project-reactor

    0热度

    1回答

    我对开始时的反应式编程有点新兴。在参加了关于reactor和spring 5.0的一些讨论之后,我想自己试试这个编程模型。 我有一个应用程序使用WebClient从不同的API构建两个Flux对象。我想将这2个对象组合成一个并将其返回给用户。 的代码示例是这样的: public class User { private String username; //getters an

    0热度

    1回答

    我需要在用户注册后发送一些数据。我想在主线程中进行第一次尝试,但是如果有任何错误,我想以10分钟的时间间隔重试5次。 @Override public void sendRegisterInfo(MailData data) { Mono.just(data) .doOnNext(this::send) .doOnError(ex -> logger.warn(

    2热度

    2回答

    这需要回压还是有一个更简单的方法? 例如在下面的代码中,我希望每2秒调用一次旋转函数。有时'旋转'可能需要更长的时间来计算超过2秒的间隔,在这种情况下,我不希望任何间隔排放排队。但在下面的代码中,它们排队。 在下面的代码中,前4个旋转函数调用需要10秒,其余的需要1秒。因此,一旦功能变得更快,Flux.interval排放就会“赶上”。不过,我不希望任何“跟上”发生 import reactor.

    0热度

    1回答

    在reference guide的错误处理部分,我阅读并理解下面的注释,即使错误将通过错误处理运营商处理,原始序列终止。有没有办法处理错误,例如流中的错误被替换为一个值,并且原始序列继续? “在您了解错误处理操作符之前,您必须记住,反应序列中的任何错误都是终止事件,即使使用了错误处理操作符,它也不允许原始序列继续,而是将onError信号转换为新序列的开始(后退一个),因此它将上游序列替换掉。“

    2热度

    1回答

    我试图用kotlin使用新的Spring WebFlux框架。我找不到在哪里我错了,此代码(为myService): fun foo(): Flux<ByteArray> { val client = WebClient.create("http://byte-array-service") return client .get() .uri("/i

    1热度

    1回答

    反应式编程的主要优点是容错,并且可以处理比阻塞实现更多的事件,尽管事件处理通常会发生比较慢。 我不完全理解的是事件的存储方式和位置。我知道有一个事件缓冲区,它可以被调整,但如果队列没有被绑定,缓冲区可以轻松地超载内存,不是吗?该缓冲区可以刷新到磁盘上吗?在内存中使用它不是一个好习惯吗?它可以配置成类似于Lagom事件采集或持久Akka演员事件可以存储在数据库中吗?

    0热度

    1回答

    不会不工作的方法,我试图整合阻挡消费者在反应堆铝SR1一个通量用户。我想使用并行的调度程序来同时执行阻塞操作。 我已经实现一个主类来描述我的意图: package etienne.peiniau; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.co

    0热度

    2回答

    我正在尝试使用自动确认功能使用Reactor Kafka实现卡夫卡主题分区的并发处理。这里的文档使它看起来这是可能的: http://projectreactor.io/docs/kafka/milestone/reference/#concurrent-ordered 说什么我试图是我使用自动应答之间的唯一区别。 我有以下代码(相关法是receiveAuto): public class Kaf

    1热度

    2回答

    我正试图在Flux的Fluxes上实现Reactor中的缓冲过程。内部通量的每个发射都按某种属性分组,并在缓冲区持续时间到期后发射。 以下测试(简化为暴露问题)示出所期望的行为: private static final long STEP_MILLIS = 50; private static final Duration BUFFER_DURATION = Duration.ofMilli

    2热度

    2回答

    一个Flux.take(持续时间) 我使用Spring反应堆堆芯3.0.6,我有一个方法返回一个流量: public Flux<Foo> createFlux(){ return Flux.<List<Foo>,String>generate(/* generator omitted for clarity's sake */) .take(Duration.ofSecond