2017-04-25 45 views
2

我有两个函数的管道,这两个函数都是重IO,并行运行在一组项目上。撰写Task.async_stream与延续传递

第一个,func1很常见,而且我经常只想要func1的响应。其他时候,我想用其他函数func2来处理func1的结果。

什么是构成Task.async_stream之间的权衡(性能/开销,地道的烦躁),即

enum 
|> Task.async_stream(Mod1, :func1, []) 
|> Task.async_stream(Mod2, :func2, []) 
... 

与传递一个延续,并使用一个Task.async_stream两个func1func2

enum 
|> Task.async_stream(Mod1, :func1_then, [&Mod2.func2/arity]) 
... 

其中func1_then调用以正常func1计算结束的功能参数(Mod2.func2)?

回答

4

如果这两个功能IO约束,那么不应该有你的第一个例子中的任何问题:

enum 
|> Task.async_stream(Mod1, :func1, []) 
|> Task.async_stream(Mod2, :func2, []) 

如果你没有要折叠的两个电话,我不会用一个延续的风格,只是他们的管道中拉姆达传递给Task.async_stream/3

enum 
|> Task.async_stream(fn x -> x |> Mod1.func1() |> M2.func2() end) 

或者,你可以考虑使用Flow

enum 
|> Flow.from_enumerable() 
|> Flow.map(&Mod1.func1/1) 
|> Flow.map(&Mod2.func2/1) 
|> Flow.run() 
+0

很好的答案,很高兴能有更多的观点来完成这一任务。流是否在Task.async_stream上引入额外开销?这两个函数是IO绑定(读/写文件),但集合本身通常会少于100个项目。 – Nolan330

+2

为了跟进此事,我做了一些非常天真的**基准测试(使用找到的示例[here](http://stackoverflow.com/a/29674651/1492117)),并发现在每次500次运行中,平均次数如下: '组成Task.async:0.204734s' '管道式Task.async:0.21415s' '流速:0.25598s' 显然与盐的任何其他上下文的晶粒取。 – Nolan330

+0

第一个例子比第二个例子好,因为你将函数分成两个不同的流,第二个例子中你只有一个流,因此操作不是在同一个层次上分离的。 – PatNowak