akka-stream

    0热度

    1回答

    我正在尝试编写Akka流图。我写的代码是 val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder => (sink1, sink2) => import GraphDSL.Implicits._ val bcast = builder.

    3热度

    1回答

    下面是使用最简单的图形中的Partition和Merge我能想出的,但运行时,它提供了以下错误: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1] 据我所知

    0热度

    1回答

    我想弄清楚如何处理在你的一个阶段中你需要进行一个返回InputStream的调用,在那里我将处理该流作为舞台的来源进一步下降。 例如 Source.map(e => Calls that return an InputStream) .via(processingFlow).runwith(sink.ignore) 我想该元素将处理流程那些从InputStream到来。这基本上是我拖尾一个文

    0热度

    1回答

    我想在Scala中实现外部合并排序。它用于排序整个主内存中不适合的大文件。 详细信息可以在这里找到: - How external merge sort algorithm works? 现在,我需要阅读的文件块,排序,并将其写入到磁盘等等等等 什么是阅读的最地道的/功能性的方式/写一个大文件的部分? 如果我使用'Source.fromFile(filename).getLines'方法,我知道我

    0热度

    2回答

    我已经使用monix和akka-streams将List [ClassA]映射到List [ClassB]的基准,但我不明白它为什么如此缓慢。 我尝试了不同的方法来映射,这里是与江铃控股的结果: [info] Benchmark Mode Cnt Score Error Units [info] MappingBenchmark.akkaLoadBalanceMap ss 2

    0热度

    1回答

    我试图解码Marc21二进制数据记录,它具有关于提供记录长度的字段的以下规范。 A计算机生成的五个字符的数字,等于整个记录的长度,包括其本身和记录终止符。数字 是正确的,未使用的位置包含零。 我想使用 阿卡流Framing.lengthField,但我只是不知道如何指定字段的大小。我想象一个角色是8位,也许是16位,我不确定,我想知道是否依赖于平台或语言。总之,问题是可以说出该领域的规模是什么知道

    0热度

    1回答

    我正在尝试创建一个可以通过类似Iterator的东西来消费的流。 我正在实现一个公开类似于迭代器的接口的库,所以这对我来说是最简单的东西。 我目前设计的图形本质上是Source<Iterator<DataRow>>。有一件事我看到到目前为止是将其压扁到Source<DataRow>然后用后跟https://docs.oracle.com/javase/8/docs/api/java/util/st

    0热度

    1回答

    我想听听使用阿卡流SQS阿卡流的状态,我得到的消息从它的Q 使用此代码段: implicit val system = ActorSystem() implicit val mat = ActorMaterializer() implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ConfigUt

    1热度

    1回答

    我有一个在云中运行某处的Kafka Broker,当我试图通过命令行消费者工具使用它时,我可以使用消息。但是,当我将相同的端点放在我的akka​​-stream卡巴消费者设置中时,它不起作用。 例如: - bin/kafka-console-consumer.sh --zookeeper主机名:2181/xxx/yyy - 主题名称 这对我有用。但是当我通过ConsumerSetting做同样的事

    0热度

    1回答

    在akka-http websocket应用程序中,我有一条Route回应给定的消息,并且我还需要在应用程序中维护状态。所以下面的工作正常: override protected def routes: Route = pathSuffix("echo") { handleWebSocketMessages(echoMessageFlow) } def