backpressure

    0热度

    1回答

    在RxJava 1/RxScala中,如何在下列情况下节流/背压可观测源? def fast: Observable[Foo] // Supports backpressure def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => Observable.from(expensiveOp(foo)) // Signa

    0热度

    1回答

    ---abcde-----f-------gh-----i----> //Events 我有一个“工作队列”,我想观察/订阅。这是一个要处理的命令对象的数组。新的作品通常以连发形式出现,并且需要连续处理(按收到的顺序逐个处理,直至完全处理)。 我正在使用RxJS 5.0.0-beta.6。 (由其他库强加的版本) 下面是一个工作示例,说明我想要的行为,但使用RxJS v4。 问题的主要代码是这

    0热度

    1回答

    我在我的android应用程序中使用RxJava。我正在使用interval()函数使用计时器,但即使添加了onBackPressureDrop(),我仍然得到Missing Backpressure异常。我还为我的订户添加了onError(),并将异常记录到Crashlytics,但它仍然崩溃。请帮忙。我花了一个星期的时间来解决问题,但无济于事。代码偶尔崩溃,我甚至一次都无法重现它。 Trace

    0热度

    1回答

    我已经编写了一个Akka应用程序,该应用程序从Kafka获取输入,然后使用分片演员处理数据并输出到Kafka。 但在某些场合分片区域不能处理负载,我也得到: 你或许应该实行流量控制,以避免水浸 远程连接。 如何在此链/流中实施背压? 卡夫卡消费 - >共享演员 - >卡夫卡生产者 从代码一些片断: ReactiveKafka kafka = new ReactiveKafka(); Subsc

    2热度

    1回答

    我一直读了几文档的背压RxJava,但我不能找到喜欢它是如何在库内部发生的详细解释,每个人都只是概括它像“生产者”是太快, “消费者”太慢了。 例如像下面的代码: Observable.interval(1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) .subscribe( i -> {

    0热度

    1回答

    我一直在挣扎一段时间,我相信这是一个非常基本的问题。 我有一个Flowable从网络中检索一捆物品并发出它们。 Flowable .create(new FlowableOnSubscribe<Item>() { @Override public void subscribe(FlowableEmitter<Item> emitter) throws Except

    2热度

    1回答

    我用阿卡流“ActorPublisher演员作为流每个连接的数据Source发送到传入的WebSocket或HTTP连接。 ActorPublisher的contract是定期通过提供需求请求数据 - 下游可接受的元素数量。如果需求为0,我不应该发送更多元素。我观察到,如果我缓冲元素,当消费者速度缓慢时,缓冲区大小在1到60之间波动,但大多数在40-50之间。 要流我使用阿卡-HTTP“s到的We

    4热度

    1回答

    最近我意识到我不明白RxJava2背压是如何工作的。 我做了小测试,我希望它应该会失败,MissingBackpressureException例外: @Test public void testBackpressureWillFail() { Observable.<Integer>create(e -> { for (int i = 0; i < 10000; i++)

    3热度

    1回答

    我想测试一些Akka流功能,如conflate。为此,我需要在简单的单元测试中构建一个不受背压影响的源。天真的尝试,如 Source.tick(1.milli, 1.milli, "tick").map(_ => Random.nextDouble()) 由于背压不起作用。 OTOH通过HTTP可能是矫枉过正。 如何创建一个简单的Source对于不受背压影响的单元测试?

    4热度

    1回答

    我正在分析Spark结构化流式处理中的背压功能。有谁知道细节?是否有可能通过代码调整处理传入记录? 谢谢