2017-10-16

I have hit a critical problem while using the RocketMQ (v4.1.0-incubating) client: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsyncImpl invoke too fast

2017-10-16 16:18:12:457[ERROR][SimpleProducer$1.onException(SimpleProducer.java:44)] - send message to mq fail: 
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsyncImpl invoke too fast 
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeAsyncImpl(NettyRemotingAbstract.java:422) 
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:488) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:368) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:455) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:156) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:417) 
    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51) 
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

I have googled a lot but found no correct answer, and I do not know how to fix it.

Here is my producer code, which sends asynchronously:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

    static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    static AtomicInteger total = new AtomicInteger(0);
    private final static CountDownLatch mCountDownLatch = new CountDownLatch(1);

    public static void main(String[] args) {
        logger.info("Bootstrap start...");
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("192.168.137.112:9876");
        try {
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(3);
            long start = System.currentTimeMillis();
            // Tight loop: the async send() returns immediately, so requests pile up.
            for (int i = 0; i < 10000000; i++) {
                try {
                    Message msg = new Message("newTopic",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                                //MsgSendResponseCounter.factory.getInstance().onSuccess();
                                logger.info("Succeeded send {} message, total {}", sendResult.getMsgId(), total.getAndIncrement());
                            }
                        }

                        @Override
                        public void onException(Throwable ee) {
                            logger.error("send message to mq fail:", ee);
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Note: nothing ever calls countDown(), so this await() never returns.
            mCountDownLatch.await();
            long end = System.currentTimeMillis();
            logger.info("Finished 10000000 msgs! elapsed time {} in all", (end - start) / 1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}

If I set the value to 0 with producer.setRetryTimesWhenSendAsyncFailed(0), I get a new exception instead: org.apache.rocketmq.client.exception.MQClientException: wait response timeout 3000ms For more information, please visit the url, http://rocketmq.apache.org/docs/faq/ at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:416) at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51)

Answer


The answer is actually quite simple: you are sending too much, too fast, and hitting the flow-control threshold.

When you send an async message, the client tries to acquire a permit; once it receives the broker's response, it releases the permit.
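To make the permit idea concrete, here is a minimal sketch using only the JDK. It is a simplified model, not RocketMQ's actual code: the class name PermitSketch, the methods trySend and onResponse, and the limit of 2 are all hypothetical and chosen for illustration.

```java
import java.util.concurrent.Semaphore;

public class PermitSketch {
    // Pool of permits; the size is a made-up value, not RocketMQ's real limit.
    private final Semaphore permits;

    public PermitSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Tries to start an async request; returns false when no permit is free. */
    public boolean trySend() {
        return permits.tryAcquire();
    }

    /** Called when the (simulated) broker response arrives. */
    public void onResponse() {
        permits.release();
    }

    public static void main(String[] args) {
        PermitSketch client = new PermitSketch(2);
        System.out.println(client.trySend()); // true
        System.out.println(client.trySend()); // true
        System.out.println(client.trySend()); // false: both permits are in flight
        client.onResponse();                  // one response comes back
        System.out.println(client.trySend()); // true
    }
}
```

The third call fails because nothing has released a permit yet, which is roughly the situation a tight send loop creates.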

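Because you send asynchronously, producer.send() returns very quickly, and you keep sending messages in the for loop without sleeping for even a few milliseconds, so you run into this problem.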

Simply put a Thread.sleep(100) after you call producer.send, or use the synchronous send, which does not need a SendCallback.
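If a fixed sleep feels too blunt, another option (my own suggestion, not something the RocketMQ client does for you) is to cap the number of in-flight sends yourself with a blocking semaphore. The sketch below uses only the JDK and simulates the async send with CompletableFuture; BoundedSendSketch and run are hypothetical names. With a real producer, the release would presumably go in both onSuccess and onException of your SendCallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSendSketch {

    /**
     * Sends `count` simulated messages with at most `maxInFlight` pending
     * at once, and returns the highest number seen in flight.
     */
    public static int run(int count, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            permits.acquireUninterruptibly();       // block here instead of failing fast
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            // Stand-in for producer.send(msg, callback).
            CompletableFuture.runAsync(() -> { /* simulated broker round trip */ })
                    .whenComplete((ok, err) -> {
                        inFlight.decrementAndGet();
                        permits.release();          // release on success and on failure
                    });
        }
        // Reclaim every permit, i.e. wait until all outstanding sends finished.
        permits.acquireUninterruptibly(maxInFlight);
        return peak.get();
    }

    public static void main(String[] args) {
        int peak = run(1000, 8);
        System.out.println("peak in flight <= 8: " + (peak <= 8));
    }
}
```

The loop slows down only when the limit is reached, and the final acquire doubles as a "wait for everything to finish" step, which the CountDownLatch in the question never achieves.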