2017-06-13 144 views
2

使用Future.traverse的Im是执行的顺序保证。我的功能fn必须在下一个元素运行之前调用并完成未来。Future.traverse确保执行的顺序

val xs = Seq[T] ??? 
def fn(t: T): Future[Unit] = ??? 
Future.traverse(xs)(fn) 

感谢,

回答

2

实现:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = 
    in.foldLeft(successful(cbf(in))) { (fr, a) => 
     val fb = fn(a) 
     for (r <- fr; b <- fb) yield (r += b) 
    }.map(_.result()) 

val fb = fn(a)创建Future[B],然后才与先前创建的未来for (r <- fr; b <- fb) yield (r += b)组成。所以答案是否定的。没有执行订单保证。

斯卡拉2.12实施改变:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = 
    in.foldLeft(successful(cbf(in))) { 
     (fr, a) => fr.zipWith(fn(a))(_ += _) 
    }.map(_.result())(InternalCallbackExecutor) 

但同样“下一步”创建未来之前(zipWith的第一个参数是“按值调用”)与以前fr链接。

如果您需要依次遍历那么就使执行2.11变化不大:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = 
    in.foldLeft(successful(cbf(in))) { (fr, a) => 
     for (r <- fr; b <- fn(a)) yield (r += b) 
    }.map(_.result()) 
+0

我没有达到源代码的原因是希望在没有签名/文档更改的情况下实现可能会发生变化。 但是,如果信息来源本身说它是无序的,假阳性与假阴性相比是有点模糊的。 –

+0

至于我的文档需要澄清这种行为。使用示例并不直接来自方法描述。 – Zernike

1

看起来不像它给我

ScalaDocs 2.12.0

异步和非阻挡地变换一个TraversableOnce [A]成未来[TraversableOnce [B ]]使用提供的功能A =>未来[B]。这对于执行平行地图很有用。

它在文档中没有具体提及,所以这意味着如果存在更高性能的方法,合同可能会改变。它还提到了“平行地图”,这是另一个暗示它不太可能保持执行顺序。在斯卡拉2.11 traverse

2

至于其他的答案已经指出的那样:没有,traverse不(一定[1]),应用转换按顺序完成,为元素。

你可以然而,使一些相当于linearize

也许是这样的:

import scala.concurrent._ 
import scala.collection.mutable.Builder 
import scala.collection.generic.CanBuildFrom 
import language.higherKinds 

/** 
* Linearize asynchronously applies a given function in-order to a sequence of values, producing a Future with the result of the function applications. 
* Execution of subsequent entries will be aborted if an exception is thrown in the application of the function. 
*/ 
def linearize[T, U, C[T] <: Traversable[T]](s: C[T])(f: T => Future[U])(implicit cbf: CanBuildFrom[C[T], U, C[U]], e: ExecutionContext): Future[C[U]] = { 
    def next(i: Iterator[T], b: Builder[U, C[U]]): Future[C[U]] = 
    if(!i.hasNext) Future.successful(b.result) 
    else Future.unit.flatMap(_ => f(i.next()).flatMap(v => next(i, b += v))) 
    next(s.toIterator, cbf(s)) 
} 

1:你能想象一个同步EC实现顺序效果寿。