0

IM与卡夫卡合作生产的消息,和我做了一个这样的制片人:卡夫卡是缓慢的第一秒

synchronized (obj) { 

     while (true){ 

      long start = Instant.now().toEpochMilli(); 
      for (int i=0; i< NUM_MSG_SEC ; i++) 
      { 

       PriceStreamingData data = PriceStreamingData.newBuilder() 
         .setUser(getRequest().getUser()) 
         .setSecurity(getRequest().getSecurity()) 
         .setTimestamp(Instant.now().toEpochMilli()) 
         .setPrice(new Random().nextDouble()*200) 
         .build(); 


       record = new ProducerRecord<>(topic, keyBuilder.build(data), 
         data); 



       producer.send(record,new Callback(){ 
        @Override 
        public void onCompletion(RecordMetadata arg0, Exception arg1) { 
         counter.incrementAndGet(); 
         if(arg1 != null){ 
          arg1.printStackTrace(); 
         } 


        } 
       }); 

      } 
      long diffCiclo = Instant.now().toEpochMilli() - start; 
      long diff = Instant.now().toEpochMilli() - startTime; 


      System.out.println("Number of sent: " + counter.get() + 
        " Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff); 

      try { 
       if(diffCiclo >= 1000){ 
        System.out.println("over 1 second: " + diffCiclo); 

       } 
       else { 
        obj.wait(1000 - diffCiclo); 

       } 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 




     } 
    } 

,你可以看到它是非常简单,它只是做一个新的消息,并将其发送。 如果我看到日志:

NumberOfSent/Diff(K) 

在第一个10秒就进行非常糟糕的只是

30k per second 

60秒后,我有

180k per second 

为什么呢?我怎么能已经开始这个过程已经达到180K?

我卡夫卡制片配置是Follwing

Async producer (but also with sync producer the situation dose not change) 
    ACKS_CONFIG = 0 
    BATCH_SIZE_CONFIG = 20000 
    COMPRESSION_TYPE_CONFIG = none 
    LINGER_MS_CONFIG = 0 

最后的细节:

NUM_MSG_SEC is set to 200000 or bigger number 
+0

是否有其他的东西会锁定'obj'导致延迟?什么时候'isRunning()'返回true? –

+0

没有别的东西锁在obj上,没有别的原因导致延迟,我认为延迟是在某处,但不在我的代码中,我的代码非常简单,我认为它是围绕kafka配置的东西,我想,(isRunning永远是真的) –

+0

也许在'synchronized(obj)'之后加上一个日志语句来确定你的代码何时被实际执行。也许还可以在调试中运行,以便在代码执行之前查看发生了什么。 –

回答

0

我发现我自己的解决方案,我希望这篇文章可以为别人有用的。

问题的立场在

ProducerConfig.BATCH_SIZE_CONFIG 

ProducerConfig.LINGER_MS_CONFIG 

我的参数为20000和0,为了解决这个问题,我没有他们将其设置为较高的值200000 1000最后我开始使用参数的JVM:

-XX:MinMetaspaceFreeRatio=100 
-XX:MaxMetaspaceFreeRatio=100 

因为我看到它需要更长的时间来设置元空间到一个体面的价值。

现在生产者开始直接在140k和1秒已经是180k。