我正在尝试创建一个模拟服务器,以帮助更轻松地为在事件流上运行的客户端编写测试。我如何使用TestScheduler和RxNetty http服务器
我正在用RxNetty实现服务器和客户端,我的问题是如何使用RxNetty的TestScheduler来控制何时发生事件。
这是我(简化)服务器:
final Observable<Event> events = Observable.just(...);
final TestScheduler testScheduler = new TestScheduler();
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, (request, response) -> {
// [snip] validate request...
return request.getContent().flatMap(buf ->
events
.zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, testScheduler), (e, t) -> e)
.map(Encoder::eventToBytes)
.flatMap(response::writeBytesAndFlush)
);
});
server.start();
然而,当我指定interval
应该使用testScheduler
服务器永远不会发送任何事件。如果我删除testScheduler
或Schedulers.computation()
或Schedulers.io()
服务器能够发送事件。
下面是能接收来自服务器事件的客户端/使用侧:
final int serverPort = server.getServerPort();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
final HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final Event event = new Event("some event")
final TestScheduler clientReceiveScheduler = new TestScheduler();
final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
.withHeader("Content-Type", "application/x-protobuf")
.withHeader("Accept", "application/x-protobuf")
.withContent(event.toByteArray());
final Observable<byte[]> observable = client.submit(request)
.flatMap(AbstractHttpContentHolder::getContent)
.map(BufUtils::bufToBytes)
.zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, clientReceiveScheduler), (b, l) -> b)
.doOnNext((b) -> LOGGER.info("Received bytes: {}", Arrays.toString(b)));
final TestSubscriber<Event> sub = new TestSubscriber<>();
observable.subscribe(sub);
testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
clientReceiveScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
sub.getOnNextEvents()
.stream()
.forEach(System.out::println);
这里是一个链接到我的full server和test case试图使用的服务器。
感谢@ user2607715我一直能够以类似于您在此处描述的方式获得间隔内容与测试计划程序一起工作,我甚至已经将它与RxNetty一起用于接收事件流。但是,我无法使它与发送消息的服务器一起工作。我会更新我的问题,以显示我在客户端做的事情。 –
@BenWhitehead代码的问题在于,服务器接收到的请求与'testScheduler'时间先进之间存在固有的竞争条件。 为了证明这个假设,如果你在推进测试调度器之前添加睡眠,你会看到你得到了预期的响应。 –
感谢@Nitesh康德我会研究我能做些什么来解决这个问题。我错误地猜测服务器线程不会受到testScheduler的影响。 –