7

我一直在观察对象的递归链中遇到一些麻烦。RxJS:可观察对象和单观察者的递归列表

我与RxJS,这是目前在1.0.10621版本,包含最基本的Rx功能,在其中Rx jQuery的协同工作。

让我为我的问题介绍一个示例场景: 我正在轮询包含特定关键字的tweets/updates的Twitter search API(JSON响应)。响应还包括一个“refresh_url”,用于生成后续请求。对后续请求的响应将再次包含一个新的refresh_url等。

Rx.jQuery允许我使Twitter搜索API调用一个可观察事件,该事件会产生一个onNext,然后完成。我到目前为止所尝试的是让onNext处理程序记住refresh_url,并在onCompleted处理程序中使用它来为下一个请求生成一个新的observable和相应的观察者。这样,一个可观察+观察者对无限期地跟随另一个。

这种方法的问题是:当前人尚未处理完毕

  1. 随访观察的/观察者已经活着。

  2. 我必须做很多讨厌的簿记保持对目前生活的观察者,其中有可能实际上是两个有效的参考。 (一个在onCompleted中,另一个在其生命周期中的其他地方)当然,这个引用需要取消订阅/处理观察者。 一个替代簿记的方法是通过“仍然运行?” - 布尔值来实现副作用,就像我在我的例子中所做的那样。

示例代码:

  running = true; 
      twitterUrl = "http://search.twitter.com/search.json"; 
      twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); 
      twitterMaxId = 0; //actually twitter ignores its since_id parameter 

      newTweetObserver = function() { 
       return Rx.Observer.create(
         function (tweet) { 
          if (tweet.id > twitterMaxId) { 
           twitterMaxId = tweet.id; 
           displayTweet(tweet); 
          } 
         } 
        ); 
      } 

      createTwitterObserver = function() { 
       twitterObserver = Rx.Observer.create(
         function (response) { 
          if (response.textStatus == "success") { 
           var data = response.data; 
           if (data.error == undefined) { 
            twitterQuery = data.refresh_url; 
            var tweetObservable; 
            tweetObservable = Rx.Observable.fromArray(data.results.reverse()); 
            tweetObservable.subscribe(newTweetObserver()); 
           } 
          } 
         }, 
         function(error) { alert(error); }, 
         function() { 
          //create and listen to new observer that includes a delay 
          if (running) { 
           twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); 
           twitterObservable.subscribe(createTwitterObserver()); 
          } 
         } 
        ); 
       return twitterObserver; 
      } 
      twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); 
      twitterObservable.subscribe(createTwitterObserver()); 

不要被观测/观察员从请求鸣叫双层上当。 我的例子主要关注第一层:从Twitter请求数据。如果在解决这个问题时第二层(将响应转换为推文)可以与第一层一起使用,那就太棒了;但我认为这是完全不同的事情。目前。

埃里克·梅蒂赫尔指出展开操作者给我(见下例),并建议Join patterns作为替代。

var ys = Observable.Expand 
(new[]{0}.ToObservable() // initial sequence 
        , i => (i == 10 ? Observable.Empty<int>() // terminate 
     : new[]{i+1}.ToObservable() // recurse 
) 
); 

ys.ToArray().Select(a => string.Join(",", a)).DumpLive(); 

这应该是可复制到LINQPad。它假设单身可观测量并产生一个最终观察者。

所以我的问题是:我怎样才能在RxJS最好的扩展技巧?

编辑:
扩展运算符可能可以实现,如this thread所示。但需要generators(我只有JS < 1.6)。
不幸的是RxJS 2.0.20304-beta没有实现Extend方法。

+2

我得出结论,解决这个问题的确是[Expand operator](http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9),该版本尚未在RxJS中实施至版本2.0.20304-beta。 – derabbink 2012-05-25 11:26:56

+1

扩展正是你所需要的。我意识到这篇文章已经过时了,但可以将您需要的运营商从未来版本的Rx移植到您自己的Rx版本中。它可能需要一些操作,但代码库并没有从v1之前大幅改变。 – 2014-06-10 17:14:33

+0

此外,升级Rx(如果可能)是一个很好的决定。最新版本中有许多错误修复和改进。 – 2014-06-10 17:15:08

回答

4

因此,我将尝试解决您的问题,与您的问题稍有不同,并采取一些自由度,您可以更轻松地解决问题。

所以1件事我不能告诉的是,你尝试以下步骤

  • 获取第一鸣叫列表,其中包含下一个URL
  • 一旦鸣叫名单收到,onNext当前观测并获得下一组鸣叫
    • 做这个无限期

还是有用户操作(获取更多/滚动到底部)。无论哪种方式,它都是同样的问题。我可能会错误地阅读你的问题。这是答案。

function getMyTweets(headUrl) { 
    return Rx.Observable.create(function(observer) { 

     innerRequest(headUrl); 
     function innerRequest(url) { 
      var next = ''; 

      // Some magic get ajax function 
      Rx.get(url).subscribe(function(res) { 
       observer.onNext(res); 
       next = res.refresh_url; 
      }, 
      function() { 
       // Some sweet handling code 
       // Perhaps get head? 
      }, 
      function() { 
       innerRequest(next); 
      }); 
     } 
    }); 
} 

这可能不是您要求的答案。如果没有,对不起!


编辑:查看代码后,它看起来像要将结果视为数组并观察它。

// From the results perform a select then a merge (if ordering does not matter). 
getMyTweets('url') 
    .selectMany(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }); 

// Ensures ordering 
getMyTweets('url') 
    .select(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }) 
    .concat();