2017-06-13 288 views
0

我正在做一个输入流速率计。它基本上是一个公开请求流调用的服务,并计算它可以处理的每秒消息的数量。ClientCallStreamObserver isReady永不返回真

由于客户端在发送消息时是完全异步的,我使用ClientCallStreamObserver在流准备就绪时开始发送消息,以避免内存溢出。

客户端代码如下所示:

public static void main(String[] args) throws Exception { 
    ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build(); 
    ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel); 


    StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() { 
     @Override 
     public void onNext(Empty empty) { 

     } 

     @Override 
     public void onError(Throwable throwable) { 
      logger.info("on error response stream"); 
     } 

     @Override 
     public void onCompleted() { 
      logger.info("on completed response stream"); 
     } 
    }); 

    final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream; 

    while (!clientCallObserver.isReady()) { 
     Thread.sleep(2000); 
     logger.info("stream not ready yet"); 
    } 

    counter.setLastTic(System.nanoTime()); 

    while (true) { 
     counter.inc(); 
     if (counter.getCounter() % 15000 == 0) { 
      long now = System.nanoTime(); 
      double rate = (double) NANOSEC_TO_SEC * counter.getCounter()/(now - counter.getLastTic()); 
      logger.info("rate: " + rate + " msgs per sec"); 
      counter.clear(); 
      counter.setLastTic(now); 
     } 
     inputStream.onNext(createRandomTrade()); 
    } 
} 

我过的isReady观察循环永无止境。

OBS:我正在使用kubernetes集群来服务我的测试,服务器正在接收该调用并返回一个StreamObserver实现。

回答

1

isReady最终应该返回true,只要RPC没有错误/立即完成。但是代码没有正确观察流量控制。

在每次致电onNext()后发送请求isReady()可能开始返回false。您的while (true)循环应在每次迭代开始时进行isReady()检查。

代替轮询,最好在呼叫准备发送时通知serverCallObserver.setOnReadyHandler(yourRunnable)。请注意,您应该在yourRunnable之内检查isReady(),因为可能存在虚假/过期的通知。

+0

在发送消息之前将验证更改为正确。 setOnReadyHandler看起来更好,会尝试它。 –

+1

@FelipeJun,你仍然需要阻塞main()直到RPC完成。对于你所展示的代码来说,最简单的事情就是在启动RPC和'channel.awaitTerminated()'之后'channel.shutdown()'。一旦RPC完成,通道将终止并且'awaitTerminated()'将返回。 –