2013-02-25 81 views
-1

我有我的C#4.0应用程序中的对象列表。假设这个列表包含100个学生类的对象。 Reactive Framework有什么方法可以每次同时执行10个对象?这可能使用Reactive Framework吗?

每个学生对象都运行一些耗时约10到15秒的方法。因此,第一次通过时,从列表中取出前10个学生对象,并等待所有10个学生对象完成其工作,然后取下10个学生对象,依此类推直到完成列表中的全部项目?

  1. 我有一个List<Student> 100计数。
  2. 首先从列表中取出10个项目,并且同时调用每个对象的长跑方法。
  3. 接收每个对象的返回值并更新UI [订阅部分]。
  4. 只有前10轮完成并释放所有内存时,才开始下一轮。
  5. 对列表中的所有项目重复相同的过程。
  6. 如何捕捉每个过程中的错误?
  7. 如何从内存中释放每个学生对象的资源和其他资源?
  8. 哪个是在Reactive Framework中完成所有这些事情的最佳方法?

回答

0

我尝试....

var students = new List<Student>(); 
{....} 
var cancel = students 
    .ToObservable(Scheduler.Default) 
    .Window(10) 
    .Merge(1) 
    .Subscribe(tenStudents => 
    { 
     tenStudents.ObserveOn(Scheduler.Default) 
      .Do(x => DoSomeWork(x)) 
      .ObserverOnDispatcher() 
      .Do(tenStudents => UpdateUI(tenStudents)) 
      .Subscribe();    
    }); 
+0

谢谢阿隆。你能解释一下你的代码吗?非常感谢 – user2017793 2013-02-26 14:12:07

+0

真的很简单。窗口(10)将工作转换为块10.合并(1)在单个线程上工作。将这10名学生转换成一个内部可观察的。呃,做一些工作吧。 ObserveOnDispatcher()返回到下一位的UI线程。 Do ...嗯...在UpdatingUI上工作。最后订阅内部可观察。冲洗并重复。 – Aron 2013-02-26 14:55:38

+0

再次感谢阿隆。我的疑问是如何释放每个10个学生对象资源。你的解释非常有帮助,非常感谢。我担心内存不足问题。请帮助我。 – user2017793 2013-02-26 15:08:15

1

这个版本将始终在同一时间运行的10名学生。当学生完成时,另一个将开始。当每个学生完成时,你可以处理它有的任何错误,然后清理它(这将在下一个学生开始运行之前发生)。

students 
    .ToObservable() 
    .Select(student => Observable.Defer(() => Observable.Start(() => 
     { 
      // do the work for this student, then return a Tuple of the student plus any error 
      try 
      { 
       student.DoWork(); 
       return { Student = student, Error = (Exception)null }; 
      } 
      catch (Exception e) 
      { 
       return { Student = student, Error = e }; 
      } 
     }))) 
    .Merge(10) // let 10 students be executing in parallel at all times 
    .Subscribe(studentResult => 
    { 
     if (studentResult.Error != null) 
     { 
      // handle error 
     } 

     studentResult.Student.Dispose(); // if your Student is IDisposable and you need to free it up. 
    }); 

这不正是问什么,因为它没有开始下一批次之前完成第一批10。这总是保持10并行运行。如果你真的想批10我会调整的代码。

+0

感谢您的回复。我对RX很陌生并试图研究它。你们都很有帮助。在这个解决方案中,我发现只需要10个学生,然后10个学生完成后,就看不到任何代码,以便接下来10个学生(如批次),直到它覆盖100个学生。学生名单集合包含100名学生。由于所有100名学生并行运行可能会导致内存异常,我的计划是分批运行学生10.请帮助。 – user2017793 2013-02-28 00:26:37

+0

'.Merge(10)'这样做。把它想象成一个酒吧里的保镖。一次只允许10名学生参加。只要其中一名学生完成并离开,那里只剩下9名学生,所以'Merge'会让另一名学生进入,直到所有100名学生都被处理完毕。 – Brandon 2013-02-28 03:31:35

+0

感谢帮助。一次真正需要10名学生,下一次需要10个学生。当它需要一批10人时,Rx中是否有任何方法可以运行,这10名学生可以并行运行。上述解决方案正在等待每个批次的完成,然后只会调用订阅。在订阅中,我有另一个消息来更新wcf客户端中的UI。但等待每批的完整执行杀死时间。我正在寻找更快的RX方式。有没有什么办法可以批量并行运行成员? – user2017793 2013-02-28 10:46:34

0

这对我来说听起来非常像TPL的问题。你有一组已知的数据。您想分割一些繁重的处理并行运行,并且希望能够批处理负载。

我没有看到问题的任何地方是异步的源代码,是运动数据的源代码或需要被动的消费者。这是我建议您使用TPL的理由。

在另一个注释中,为什么要并行处理10个幻数?这是业务需求,还是潜在的优化性能的尝试?通常最好的做法是让TaskPool根据核心数量和当前负载计算出最适合客户端CPU的最佳实践。我想,随着设备及其CPU结构(单核,多核,多核,低功耗/禁用内核等)的巨大变化,这变得越来越重要。

这里有一种方法可以做到这在LinqPad(但要注意缺乏Rx的)

void Main() 
{ 
    var source = new List<Item>(); 
    for (int i = 0; i < 100; i++){source.Add(new Item(i));} 

    //Put into batches of ten, but only then pass on the item, not the temporary tuple construct. 
    var batches = source.Select((item, idx) =>new {item, idx}) 
         .GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item); 

    //Process one batch at a time (serially), but process the items of the batch in parallel (concurrently). 
    foreach (var batch in batches) 
    { 
     "Processing batch...".Dump(); 
     var results = batch.AsParallel().Select (item => item.Process()); 
     foreach (var result in results) 
     { 
      result.Dump(); 
     } 
     "Processed batch.".Dump(); 
    } 
} 


public class Item 
{ 
    private static readonly Random _rnd = new Random(); 
    private readonly int _id; 
    public Item(int id) 
    { 
     _id = id; 
    } 

    public int Id { get {return _id;} } 

    public double Process() 
    { 
     var threadId = Thread.CurrentThread.ManagedThreadId; 
     string.Format("Processing on thread:{0}", threadId).Dump(Id); 
     var loopCount = _rnd.Next(10000,1000000); 
     Thread.SpinWait(loopCount); 
     return _rnd.NextDouble(); 
    } 
    public override string ToString() 
    { 
     return string.Format("Item:{0}", _id); 
    } 
} 

如果你有一个数据在运动问题或反应我很想找出消费者问题,但只是“淡化”了问题,以便于解释。