2016-07-26 88 views
3

我有这一小段代码,模拟使用大对象(那巨大的byte[])的流量。对于序列中的每个项目,调用一个异步方法来获得一些结果。问题?事实上,它会抛出OutOfMemoryException无效扩展SelectMany与大对象

代码LINQPad(C#程序)兼容:

void Main() 
{ 
    var selectMany = Enumerable.Range(1, 100) 
        .Select(i => new LargeObject(i)) 
        .ToObservable() 
        .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o))); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 
} 


private static async Task<int> DoSomethingAsync(LargeObject lo) 
{ 
    await Task.Delay(10000); 
    return lo.Id; 
} 

internal class LargeObject 
{ 
    public int Id { get; } 

    public LargeObject(int id) 
    { 
     this.Id = id; 
    } 

    public byte[] Data { get; } = new byte[10000000]; 
} 

似乎它创建的同时所有对象。我该如何正确地做到这一点?

其基本思想是调用DoSomethingAsync以获得每个对象的一些结果,所以这就是为什么我使用SelectMany。为了简化,我只介绍了一个Task.Delay,但在现实生活中它是一个可以同时处理一些项目的服务,所以我想引入一些并发机制来获得它的优势。

请注意,从理论上讲,处理少量项目的时间不应该填满内存。实际上,我们只需要每个“大对象”来获取DoSomethingAsync方法的结果。在那之后,大对象不再被使用。

+0

我不知道你的问题是否与你的测试代码(Enumerable.Range'急切地创建所有大对象),或者你在生产中看到这个问题?无论哪种方式,如果某个序列创建了许多LargeObjects,并且它们仍在使用,那么不能被GC'ed,那么您会得到一个OOM异常。 –

回答

4

我觉得我是repeating myself。与您最后一个问题和我的最后一个答案类似,您需要做的是限制要并发创建的bigObjects™的数量。

为此,您需要将对象创建和处理合并到一个线程池中。现在的问题是,我们使用异步方法来允许线程在我们的异步方法运行时执行其他操作。由于缓慢的网络调用是异步的,因此您的(快速)对象创建代码将不断创建大型对象。

取而代之,我们可以使用Rx通过将对象创建与异步调用相结合来保持并发观察对象的数量,并使用.Merge(maxConcurrent)来限制并发性。

作为奖励,我们还可以设置查询执行的最短时间。只需要Zip以最少的延迟。

static void Main() 
{ 
    var selectMany = Enumerable.Range(1, 100) 
         .ToObservable() 
         .Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i))) 
          .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o))) 
          .Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el) 
         ).Merge(4); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 

    Console.ReadLine(); 
} 


private static async Task<int> DoSomethingAsync(LargeObject lo) 
{ 
    await Task.Delay(10000); 
    return lo.Id; 
} 

internal class LargeObject 
{ 
    public int Id { get; } 

    public LargeObject(int id) 
    { 
     this.Id = id; 
     Console.WriteLine(id + "!"); 
    } 

    public byte[] Data { get; } = new byte[10000000]; 
} 
0

您可以引入一个时间间隔延迟是这样的:

var source = Enumerable.Range(1, 100) 
    .ToObservable() 
    .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i) 
    .Select(i => new LargeObject(i)) 
    .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o))); 

所以不要扯所有100个整数一次,立即将它们转换为LargeObject则呼吁所有100 DoSomethingAsync的,它淌的整数出来一个接一个地隔开一秒钟。


这就是TPL + Rx解决方案的样子。不用说,它比单独的Rx或TPL单独不那么优雅。不过,我不认为这个问题是非常适合的Rx:

void Main() 
{ 
    var source = Observable.Range(1, 100); 

    const int MaxParallelism = 5; 
    var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)), 
     new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism }); 
    source.Subscribe(transformBlock.AsObserver()); 
    var selectMany = transformBlock.AsObservable(); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 
} 
+0

我可以理解这可能在实践中有效,但选择1秒延迟是任意的,并且仍然可以允许内存不足错误或者它可以显着减慢计算。这不是一个可靠的解决方案。 – Enigmativity

+0

编辑添加TPL答案。 Rx不会在这里发光。 – Shlomo

2

看来,它创建在同一时间的所有对象。

是的,因为您一次都创建它们。

如果我简化你的代码,我可以告诉你为什么:

void Main() 
{ 
    var selectMany = 
     Enumerable 
      .Range(1, 5) 
      .Do(x => Console.WriteLine($"{x}!")) 
      .ToObservable() 
      .SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i))); 

    selectMany 
     .Subscribe(r => Console.WriteLine(r)); 
} 

private static async Task<int> DoSomethingAsync(int i) 
{ 
    await Task.Delay(1); 
    return i; 
} 

运行该生产:

 
1! 
2! 
3! 
4! 
5! 
4 
3 
5 
2 
1 

因为Observable.FromAsync的你是允许源之前,任何运行至结束结果返回。换句话说,您正在快速构建所有大型对象,但要慢慢处理它们。

您应该允许Rx同步运行,但在默认调度程序上,以便您的主线程不被阻塞。代码将运行,没有任何内存问题,您的程序将在主线程上保持响应。

下面是这个代码:我试图测试其他选项

var selectMany = 
    Observable 
     .Range(1, 100, Scheduler.Default) 
     .Select(i => new LargeObject(i)) 
     .Select(o => DoSomethingAsync(o)) 
     .Select(t => t.Result); 

(我已经有效地代替Enumerable.Range(1, 100).ToObservable()Observable.Range(1, 100)为也将有一些问题,帮助)

,但如此远远地允许DoSomethingAsync以异步方式运行的任何内容都会出现内存不足错误。

+0

感谢您的回答,@Enigmativity,但我想我错过了一些东西。我正在调用的异步方法是一个可以同时处理项目的远程服务。在处理另一个物品之前等待一个物品处理是不理想的。你认为我可以同时处理多个项目(3个或4个)以获得并发优势而不会遇到内存问题吗? – SuperJMN

+1

@SuperJMN,如果您尝试创建比您可以分配更多的'LargeObject',则会收到OOM异常。这与Rx有关。如果它们需要独立于正在执行的DoSomethingAsync被创建,那么你有麻烦了。在我看来,你实际上想要坚持到队列中并突破Rx。 –

+1

@SuperJMN - 我尝试了一堆东西让它扼杀处理,但它不工作。一次处理一个对象时效率不高,但它具有高效的内存。这取决于你要达到的效率。对于计算效率来说,Shlomo的回答更加糟糕。如果我想到什么,我会让你知道。 – Enigmativity

0

ConcatMap支持开箱即用。我知道这个运算符在.net中不可用,但是你可以使用Concat运算符来完成相同的操作,这个运算符会延迟订阅每个内部源,直到前一个完成。

+0

您可以发布ConcatMap操作符的代码和一些示例吗?谢谢! – SuperJMN

+0

http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ –

+0

http://reactivex.io/documentation/operators/concat.html –