2013-03-07
6

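Question: Rx GroupBy until a condition changes

I'm stuck on a particular Rx query.

A continuous stream produces many single update operations. Each operation is either an insert or a delete. I want to buffer this stream and execute a small number of operations at a time, but preserving their order is essential. In addition, the operations should be buffered and executed in sequence every X seconds.

Example:

Input:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete 

Output:

insert(3)-delete(2)-insert(1)-delete(4) 

I wrote a simple application to test this. It works more or less as I want, but it does not respect the order of the incoming inserts and deletes.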

namespace RxTests 
{ 
using System; 
using System.Collections.Generic; 
using System.Globalization; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Text; 
using System.Threading; 

internal class Program 
{ 
    private static readonly Random Random = new Random(); 

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource(); 

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>(); 

    private static void Main(string[] args) 
    { 
     Console.WriteLine("Starting production"); 
     var producerScheduler = new EventLoopScheduler(); 
     var consumerScheduler = new EventLoopScheduler(); 
     var producer = 
      Observable.Interval(TimeSpan.FromSeconds(2)) 
         .SubscribeOn(producerScheduler) 
         .Subscribe(Produce, WriteProductionCompleted); 
     var consumer = 
      operations.ObserveOn(producerScheduler) 
         .GroupBy(operation => operation.Delete) 
         .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50)) 
         .SubscribeOn(consumerScheduler) 
         .Subscribe(WriteUpdateOperations); 
     Console.WriteLine("Type any key to stop"); 
     Console.ReadKey(); 
     consumer.Dispose(); 
     producer.Dispose(); 
    } 

    private static void Produce(long time) 
    { 
     var delete = Random.NextDouble() < 0.5; 
     Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time); 
     var idString = (time + 1).ToString(CultureInfo.InvariantCulture); 
     var id = time + 1; 
     operations.OnNext(
      new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture))); 
    } 

    private static void WriteProductionCompleted() 
    { 
     Console.WriteLine("Production completed"); 
     ProducerStopped.Cancel(); 
    } 

    private static void WriteUpdateOperation(UpdateOperation updateOperation) 
    { 
     Console.WriteLine("Consuming {0}", updateOperation); 
    } 

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation) 
    { 
     foreach (var operation in updateOperation) 
     { 
      WriteUpdateOperation(operation); 
     } 
    } 

    private class UpdateOperation 
    { 
     public UpdateOperation(long id, bool delete, params string[] changes) 
     { 
      this.Id = id; 
      this.Delete = delete; 
      this.Changes = new List<string>(changes ?? Enumerable.Empty<string>()); 
     } 

     public bool Delete { get; set; } 

     public long Id { get; private set; } 

     public IList<string> Changes { get; private set; } 

     public override string ToString() 
     { 
      var stringBuilder = new StringBuilder("{UpdateOperation "); 
      stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete); 
      if (this.Changes.Count > 0) 
      { 
       stringBuilder.Append(this.Changes.First()); 
       foreach (var change in this.Changes.Skip(1)) 
       { 
        stringBuilder.AppendFormat(", {0}", change); 
       } 
      } 

      stringBuilder.Append("]}"); 
      return stringBuilder.ToString(); 
     } 
    } 
} 

}

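Can anyone help me with the correct query?

Thanks

UPDATE 2013-03-08 (as suggested by JerKimball)

The following lines are minor changes/additions to JerKimball's code to print the results: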

using(query.Subscribe(Print)) 
{ 
    Console.ReadLine(); 
    producer.Dispose();   
} 

With the following Print methods:

private static void Print(IObservable<IList<Operation>> operations) 
{ 
    operations.Subscribe(Print); 
} 

private static void Print(IList<Operation> operations) 
{ 
    var stringBuilder = new StringBuilder("["); 
    if (operations.Count > 0) 
    { 
     stringBuilder.Append(operations.First()); 
     foreach (var item in operations.Skip(1)) 
     { 
      stringBuilder.AppendFormat(", {0}", item); 
     } 
    } 

    stringBuilder.Append("]"); 
    Console.WriteLine(stringBuilder); 
} 

And the following ToString override on Operation:

public override string ToString() 
{ 
    return string.Format("{0}:{1}", this.Type, this.Seq); 
} 

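The order is preserved, but:

  • I'm not sure about subscribing inside another subscription: is that correct? (This is a question I've had for a long time, and the answer has never been clear to me.)
  • I never get more than two elements in each list, even when the stream produces more than two consecutive values of the same type.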

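'I'm not sure about subscribing inside another subscription: is that correct' <- what do you mean? Where is this happening? (edit) Oh, I see, in your 'Print'... yes, you don't want to do that, if only because you're now leaking an 'IDisposable' – JerKimball 2013-03-08 20:14:20

'Subscribe' has an overload that takes an 'Action<T>'; use that. That's why I used 'Console.WriteLine' in my example. If you change the signature of 'Print' to take an 'IList<Operation>', you can write 'using(query.Subscribe(Print))' – JerKimball 2013-03-08 20:17:12

From the Select I get an IObservable<IObservable<IList<Operation>>>, which is why I used the first Print(IObservable<IList<Operation>> operations). So I would again wrap the second operations.Subscribe(Print) in a using – fra 2013-03-08 20:22:01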

Answers

1

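Let's try a new approach (hence the new answer):

First, let's define an extension method that "collapses" a list of items into runs based on a key, while preserving order: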

public static class Ext 
{ 
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
      this IEnumerable<T> source, 
      Func<T, TKey> keySelector) 
    { 
     using (var enumerator = source.GetEnumerator()) 
     { 
      if (!enumerator.MoveNext()) 
       yield break; 

      var currentSet = new List<T>(); 

      // inspect the first item 
      var lastKey = keySelector(enumerator.Current); 
      currentSet.Add(enumerator.Current); 

      while (enumerator.MoveNext()) 
      { 
       var newKey = keySelector(enumerator.Current); 
       if (!Equals(newKey, lastKey)) 
       { 
        // A difference == new run; return what we've got thus far 
        yield return currentSet; 
        lastKey = newKey; 
        currentSet = new List<T>(); 
       } 
       currentSet.Add(enumerator.Current); 
      } 

      // Return the last run. 
      yield return currentSet; 

      // and clean up 
      currentSet = new List<T>(); 
      lastKey = default(TKey); 
     } 
    } 
} 

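Fairly simple: given an IEnumerable<T>, it returns an IEnumerable<List<T>> in which each sub-list holds consecutive items that share the same key.

Now, to feed it and use it: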
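To make the behaviour concrete, here is a minimal sketch that applies the same run-splitting idea to the exact sequence from the question. It is an illustration under assumptions, not part of the original answer: the class name RunsDemo and the helper Describe are hypothetical, plain strings stand in for operations, and ToRuns is restated in compact form only so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class; not part of the original answer.
public static class RunsDemo
{
    // Compact restatement of ToRuns: splits the source into lists of
    // consecutive items whose keys are equal, keeping the original order.
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
        this IEnumerable<T> source,
        Func<T, TKey> keySelector)
    {
        List<T> current = null;
        var lastKey = default(TKey);
        foreach (var item in source)
        {
            var key = keySelector(item);
            if (current != null && !Equals(key, lastKey))
            {
                // The key changed: emit the finished run and start a new one.
                yield return current;
                current = null;
            }
            if (current == null)
            {
                current = new List<T>();
            }
            current.Add(item);
            lastKey = key;
        }
        // Emit the trailing run, if the source was not empty.
        if (current != null)
        {
            yield return current;
        }
    }

    // Formats each run as "key(count)" and joins the runs with '-'.
    public static string Describe(IEnumerable<string> ops)
    {
        return string.Join(
            "-",
            ops.ToRuns(op => op).Select(run => run[0] + "(" + run.Count + ")"));
    }
}
```

Calling RunsDemo.Describe on insert, insert, insert, delete, delete, insert, delete, delete, delete, delete returns "insert(3)-delete(2)-insert(1)-delete(4)", which is the output the question asks for. Because the runs are cut only where the key changes, the relative order of inserts and deletes is never rearranged.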

var rnd = new Random(); 
var fakeSource = new Subject<Operation>(); 
var producer = Observable 
    .Interval(TimeSpan.FromMilliseconds(1000)) 
    .Subscribe(i => 
     { 
      var op = new Operation(); 
      op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
      fakeSource.OnNext(op); 
     });  

var singleSource = fakeSource 
    .Publish().RefCount(); 

var query = singleSource 
    // change this value to alter your "look at" time window 
    .Buffer(TimeSpan.FromSeconds(5))  
    .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0)); 

using(query.Subscribe(batch => 
{ 
    foreach(var item in batch) 
    { 
     Console.WriteLine("{0}({1})", item.First().Type, item.Count); 
    } 
})) 
{ 
    Console.ReadLine(); 
    producer.Dispose();  
} 

Give it a whirl; this is what I see in a typical run:

insert(4) 
delete(2) 
insert(1) 
delete(1) 
insert(1) 
insert(1) 
delete(1) 
insert(1) 
delete(2) 
delete(2) 
insert(2) 
delete(1) 
insert(1) 
delete(2) 
insert(2) 

Thank you very much! This is exactly what I needed. I'll award you the well-deserved bounty! – fra 2013-03-16 15:48:21

4

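By my reckoning, you can get what you're after with a mix of GroupByUntil, DistinctUntilChanged, and Buffer.

This will need some tweaking to fit your sample code, but the query (and the concept) should hold:

(Edit: doh, missed a bit there...)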

void Main() 
{ 
    var rnd = new Random(); 
    var fakeSource = new Subject<Operation>(); 
    var producer = Observable 
     .Interval(TimeSpan.FromMilliseconds(1000)) 
     .Subscribe(i => 
      { 
       var op = new Operation(); 
       op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
       fakeSource.OnNext(op); 
      });  
    var singleSource = fakeSource.Publish().RefCount(); 

    var query = singleSource 
     // We want to groupby until we see a change in the source 
     .GroupByUntil(
       i => i.Type, 
       grp => singleSource.DistinctUntilChanged(op => op.Type)) 
     // then buffer up those observed events in the groupby window 
     .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50)); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     Console.ReadLine(); 
     producer.Dispose();   
    } 
} 

public class Operation { 
    private static int _cnt = 0; 
    public Operation() { Seq = _cnt++; } 
    public int Seq {get; set;} 
    public string Type {get; set;}  
} 
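
Note that the query above has type IObservable<IObservable<IList<Operation>>>, so each element is itself a sequence of buffers. Instead of subscribing to every inner sequence from inside the outer callback (which discards the inner IDisposable), one option is to flatten the query before subscribing. The following is a minimal sketch under assumptions: FlattenSketch and Flatten are hypothetical names, and the filter on empty lists is an addition that the original query does not have.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper; not part of the original answer.
public static class FlattenSketch
{
    // Merges the per-group buffer sequences into one sequence of lists,
    // so a single Subscribe call (and a single IDisposable) covers them all.
    public static IObservable<IList<T>> Flatten<T>(
        IObservable<IObservable<IList<T>>> nested)
    {
        return nested
            .Merge()                         // subscribe to each inner sequence as it appears
            .Where(list => list.Count > 0);  // assumption: empty buffers are not interesting
    }
}
```

With this in place, something like using (FlattenSketch.Flatten(query).Subscribe(Print)) works with the Print(IList<Operation>) overload, and disposing the one subscription tears down the inner ones as well. Merge is used here rather than Concat because Concat delays subscribing to later inner sequences, and the groups produced by GroupByUntil do not replay items to late subscribers, so items could plausibly be missed.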

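Hi! Thanks, but it doesn't give me the expected output. I actually think GroupBy doesn't fit my needs, because I need to switch between two streams. Can you verify the output of your solution? Thanks – fra 2013-03-08 18:48:04

@fra Ha, missed a part... *sigh* I'll edit... OK, try the updated version? – JerKimball 2013-03-08 19:20:44

Thanks a lot, we're almost there. There are still some doubts and issues; please check my updated question – fra 2013-03-08 20:07:29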