2017-06-15 73 views
0

您如何在流程上创建Flow?比方说,我有一个进程通过stdin获取输入并通过stdout发送输出。我想有一个流量。因此,流程将首先启动流程,控制其输入和输出流,然后充当Map,通过将输入输出到流程来将输入映射为输出?最终在流结束时终止流程。在这种情况下,背压控制又是如何工作的?Akka Streams:正在运行的流程的流程

回答

0

这是一个非常基本的Flow that is "coupled",因此它由于来自输入或输出(接收器/源)的中断而终止。为简单起见,我假定在没有标准输入的情况下,但你能适应这种添加Sink没有被忽略,pipes input into the process

 
// capture stdout from the "ls" command 
val runCmd: Source[Message, NotUsed] = { 
    Source.fromIterator[Message](() => Process("ls").lineStream.toIterator.map(line => TextMessage.Strict(line))) 
    } 

// create flow for emitting stdout source to client 
val messageStdout: Flow[Any, Message, NotUsed] = Flow 
    .fromSinkAndSourceCoupled(Sink.ignore, runCmd) 

至于背压,我上面的代码示例通知是阻塞操作(一次为)。随着处理速度的增长,您可以开发一个异步解决方案,并且具有最大并行度的服务,如mapasync所定义的那样。