0
IM与卡夫卡合作生产的消息,和我做了一个这样的制片人:卡夫卡是缓慢的第一秒
synchronized (obj) {
while (true){
long start = Instant.now().toEpochMilli();
for (int i=0; i< NUM_MSG_SEC ; i++)
{
PriceStreamingData data = PriceStreamingData.newBuilder()
.setUser(getRequest().getUser())
.setSecurity(getRequest().getSecurity())
.setTimestamp(Instant.now().toEpochMilli())
.setPrice(new Random().nextDouble()*200)
.build();
record = new ProducerRecord<>(topic, keyBuilder.build(data),
data);
producer.send(record,new Callback(){
@Override
public void onCompletion(RecordMetadata arg0, Exception arg1) {
counter.incrementAndGet();
if(arg1 != null){
arg1.printStackTrace();
}
}
});
}
long diffCiclo = Instant.now().toEpochMilli() - start;
long diff = Instant.now().toEpochMilli() - startTime;
System.out.println("Number of sent: " + counter.get() +
" Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff);
try {
if(diffCiclo >= 1000){
System.out.println("over 1 second: " + diffCiclo);
}
else {
obj.wait(1000 - diffCiclo);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
,你可以看到它是非常简单,它只是做一个新的消息,并将其发送。 如果我看到日志:
NumberOfSent/Diff(K)
在第一个10秒就进行非常糟糕的只是
30k per second
60秒后,我有
180k per second
为什么呢?我怎么能已经开始这个过程已经达到180K?
我卡夫卡制片配置是Follwing
Async producer (but also with sync producer the situation dose not change)
ACKS_CONFIG = 0
BATCH_SIZE_CONFIG = 20000
COMPRESSION_TYPE_CONFIG = none
LINGER_MS_CONFIG = 0
最后的细节:
NUM_MSG_SEC is set to 200000 or bigger number
是否有其他的东西会锁定'obj'导致延迟?什么时候'isRunning()'返回true? –
没有别的东西锁在obj上,没有别的原因导致延迟,我认为延迟是在某处,但不在我的代码中,我的代码非常简单,我认为它是围绕kafka配置的东西,我想,(isRunning永远是真的) –
也许在'synchronized(obj)'之后加上一个日志语句来确定你的代码何时被实际执行。也许还可以在调试中运行,以便在代码执行之前查看发生了什么。 –