2015-12-22 56 views
0

我正在尝试创建一个模拟服务器,以帮助更轻松地为在事件流上运行的客户端编写测试。我如何使用TestScheduler和RxNetty http服务器

我正在用RxNetty实现服务器和客户端,我的问题是如何使用RxNetty的TestScheduler来控制何时发生事件。

这是我(简化)服务器:

final Observable<Event> events = Observable.just(...); 
    final TestScheduler testScheduler = new TestScheduler(); 
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, (request, response) -> { 

     // [snip] validate request... 

     return request.getContent().flatMap(buf -> 
      events 
       .zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, testScheduler), (e, t) -> e) 
       .map(Encoder::eventToBytes) 
       .flatMap(response::writeBytesAndFlush) 
     ); 
    }); 
    server.start(); 

然而,当我指定interval应该使用testScheduler服务器永远不会发送任何事件。如果我删除testSchedulerSchedulers.computation()Schedulers.io()服务器能够发送事件。

下面是能接收来自服务器事件的客户端/使用侧:

final int serverPort = server.getServerPort(); 
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort)); 
    final HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort()) 
     .pipelineConfigurator(new HttpClientPipelineConfigurator<>()) 
     .build(); 

    final Event event = new Event("some event") 

    final TestScheduler clientReceiveScheduler = new TestScheduler(); 
    final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath()) 
     .withHeader("Content-Type", "application/x-protobuf") 
     .withHeader("Accept", "application/x-protobuf") 
     .withContent(event.toByteArray()); 

    final Observable<byte[]> observable = client.submit(request) 
     .flatMap(AbstractHttpContentHolder::getContent) 
     .map(BufUtils::bufToBytes) 
     .zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, clientReceiveScheduler), (b, l) -> b) 
     .doOnNext((b) -> LOGGER.info("Received bytes: {}", Arrays.toString(b))); 

    final TestSubscriber<Event> sub = new TestSubscriber<>(); 
    observable.subscribe(sub); 

    testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); 
    clientReceiveScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); 
    sub.getOnNextEvents() 
     .stream() 
     .forEach(System.out::println); 

这里是一个链接到我的full servertest case试图使用的服务器。

回答

0

TestScheduler要求您提前使用其advanceTime*方法来触发任何计划的操作。 Observable.interval()在提供的调度程序上安排一个操作,在本例中为TestScheduler。如果没有在TestScheduler中推进时间,它将永远不会执行这些操作;在这种情况下,打勾号为Observable.interval()

删除RxNetty,如果你看一下这个代码:

TestScheduler scheduler = Schedulers.test(); 
    TestSubscriber<Long> subscriber = new TestSubscriber<>(); 
    Observable.interval(1, TimeUnit.MILLISECONDS, scheduler) 
       .take(1) 
       .subscribe(subscriber); 

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); 

    System.out.println(subscriber.getOnNextEvents().size()); 

输出始终为1 OTOH,如果你注释掉scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);这将始终为0

+0

感谢@ user2607715我一直能够以类似于您在此处描述的方式获得间隔内容与测试计划程序一起工作,我甚至已经将它与RxNetty一起用于接收事件流。但是,我无法使它与发送消息的服务器一起工作。我会更新我的问题,以显示我在客户端做的事情。 –

+0

@BenWhitehead代码的问题在于,服务器接收到的请求与'testScheduler'时间先进之间存在固有的竞争条件。 为了证明这个假设,如果你在推进测试调度器之前添加睡眠,你会看到你得到了预期的响应。 –

+0

感谢@Nitesh康德我会研究我能做些什么来解决这个问题。我错误地猜测服务器线程不会受到testScheduler的影响。 –