2013-07-09 27 views
4

我正在寻找一种不间断地动态合并数据源的方法。真实世界的情景会像来自多个来源的数据一样拉动,而不考虑冗余信息。Rx,动态合并源

为了简化代码,我用一个简单的数字生成器代替了更复杂的代码,它将连续生成数据。这可以通过读取来自多个外部服务器的连续数据流进行比较。

我希望能够合并两个来源并将结果(适当时)输出到控制台,这部分工作得很好。当我们终止这两个来源,并在另一个来源合并是事情按预期停止工作的地方。在这种情况下,我们可以轻松地重新连接mergedStreamObserver,但是,在一个更大的应用程序中,我们不得不关注数据中的差距,并跟踪观察者订阅的内容。

有没有办法解决这个问题?

// imports 
using System; 
using System.Reactive.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

static void Main(string[] args) { 
    // base "stream of results" as we will want to randomly add (and terminate other sources) 
    IObservable<int> merged = Observable.Empty<int>(); 

    // source 1 
    var tokenSource1 = new CancellationTokenSource(); 
    IObservable<int> xs = Generate(tokenSource1, "A"); 

    // to avoid generating the same numbers, which does happen, 
    // sleep some amount of time before calling generate again 
    Thread.Sleep(100); 

    // source 2 
    var tokenSource2 = new CancellationTokenSource(); 
    IObservable<int> xt = Generate(tokenSource2, "B"); 

    // odd queries 
    var seq1 = from n in xs where n % 2 == 1 select n; 

    // even queries 
    var seq2 = from n in xt where n % 2 == 0 select n; 

    // merge everything together 
    merged = merged.Merge<int>(seq1); 
    merged = merged.Merge<int>(seq2); 

    // observer for the merged "streams" 
    // NOTE: while this does not appear to be working correctly, 
    // remember you have 2 streams and 2 queries at work. It 
    // really is doing what it's expected to here. 
    IDisposable mergedStreamObserver = merged.Subscribe(str => { Console.WriteLine(str); }); 

    // kill both sources 
    Console.ReadKey(); 

    tokenSource1.Cancel(); 
    tokenSource2.Cancel(); 

    // start source and query for evens 
    // try to merge it 
    Console.ReadKey(); 

    tokenSource2 = new CancellationTokenSource(); 
    xt = Generate(tokenSource2, "B"); 

    seq2 = from n in xt where n % 2 == 0 select n; 

    merged = merged.Merge(seq2); 

    // Nothing is happening because the merged stream was modified. 
    // How do we create a composite Observable from multiple sources 
    // and dynamically add/terminate those sources? 

    Console.ReadKey(); 

    tokenSource2.Cancel(); 
    mergedStreamObserver.Dispose(); 
    Console.ReadKey(); 
} 

static IObservable<int> Generate(CancellationTokenSource tokenSource, string name) { 
    Random random = new Random(); 

    Action<int> observer = _ => { }; /* We could use null, but then at every invocation 
              * we'd have to copy to a local and check for null. 
              */ 

    Task.Factory.StartNew(() => { 
      while(!tokenSource.IsCancellationRequested) { 
       var t = random.Next(0, 100); 
       Console.WriteLine("From Generator {0}: {1}", name, t); 

       observer(t); 

       Thread.Sleep(1000); 
      } 

      Console.WriteLine("Random Generator Stopped"); 
     }, tokenSource.Token); 

    return Observable.FromEvent<int>(
     eh => observer += eh, 
     eh => observer -= eh); 
} 

回答

5

使用一个主题,订阅合并的流创建流前:

var streams = new Subject<IObservable<int>>(); 
var mergedStreams = streams.Merge(); 
var mergedObserver = mergedStreams.Subscribe(...); 

// now create your streams 
... 

// add them to the streams subject 
streams.OnNext(seq1); 
streams.OnNext(seq2); 
... 

streams.OnNext(seq3); 
streams.OnNext(seq4); 

... 
// If we know there will be no more streams, tell the Subject... 
streams.OnCompleted(); 
+0

完美,谢谢!来自SI 1.2的心态,我试图错误地使用这个主题。 – codeape