2016-02-26 163 views
0

我试图建立管道与Apache水槽: spooldir - >卡夫卡通道 - > HDFS沉阿帕奇水槽:kafka.consumer.ConsumerTimeoutException

活动去卡夫卡的话题没有问题,我可以看到他们kafkacat请求。但kafka频道无法通过接收器将文件写入hdfs。错误是:

超时等待数据来自卡夫卡

完整的日志:

2016年2月26日18:25:17125 (SinkRunner-PollingRunner-DefaultSinkProcessor -SendThread(zoo02:2181)) [调试 - org.apache.zookeeper.ClientCnxn $ SendThread.readResponse(ClientCnxn.java:717)的会话ID 得到ping响应:0毫秒

后0x2524a81676d02aa

2016年2月26日18:25:19127 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [调试 - org.apache.zookeeper.ClientCnxn $ SendThread.readResponse(ClientCnxn.java:717)用于的sessionid GOT ping响应:后1毫秒

2016年2月26日18 0x2524a81676d02aa:25:21129 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [调试 - org.apache.zookeeper .ClientCnxn $ SendThread.readResponse(ClientCnxn.java:717)] 0ms后得到对sessionid:0x2524a81676d02aa的ping响应

2016年2月26日18:25:21775 (SinkRunner-PollingRunner-DefaultSinkProcessor)[调试 - org.apache.flume.channel.kafka.KafkaChannel $ KafkaTransaction.doTake(KafkaChannel.java:327)] 超时在等待数据时来自卡夫卡 kafka.consumer.ConsumerTimeoutException在 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69) 在 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) 在 在kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) 在 org.apache.flume.chan nel.kafka.KafkaChannel $ KafkaTransaction.doTake(KafkaChannel.java:306) 在 org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) 在 org.apache.flume.channel.BasicChannelSemantics。采取(BasicChannelSemantics.java:95) 在 org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) 在 org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java: 68) 在org.apache.flume.SinkRunner $ PollingRunner.run(SinkRunner.java:147) 在java.lang.Thread.run(Thread.java:745)

我的水槽的配置是:

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c2 

# Describe/configure the source 
a1.sources.r1.type = spooldir 
a1.sources.r1.spoolDir = /home/alex/spoolFlume 

a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://10.12.0.1:54310/logs/flumetest/ 
a1.sinks.k1.hdfs.filePrefix = flume- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel 
a1.channels.c2.capacity = 10000 
a1.channels.c2.transactionCapacity = 1000 
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092 
a1.channels.c2.topic=flume_test_001 
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c2 
a1.sinks.k1.channel = c2 

随着内存通道,而不是卡夫卡频道的所有作品好。

感谢提前任何想法!

+0

你找到一个解决这个问题,我也很是停留在同一个地方,甚至当我通过配置设置consumer.timeout.ms,它没有得到重写和水槽持续100毫秒默认工作 – mbaxi

+0

都能跟得上我没有找到卡夫卡通道的任何解决方案。内存频道运行良好,在制作6个月内没有问题。 – Samriang

+1

我是能够解决的超时问题,在水槽卡夫卡通道代码硬编码值设置为财产consumer.timeout.ms 100 ms..I固定的代码从配置看,而不是就不见了错误 – mbaxi

回答

0

ConsumerTimeoutException意味着有很长一段时间没有新的消息,并不意味着连接超时卡夫卡。

http://kafka.apache.org/documentation.html

consumer.timeout.ms -1抛出一个超时异常消费者如果指定的时间间隔后没有消息可供消费

+0

你的意思是我必须改变一些卡夫卡属性? 你怎样看待管道“spooldir - >卡夫卡通道 - > HDFS汇”有什么看法?可能应该有一些技巧 - 另一个水槽实例等.. – Samriang

0

卡夫卡的ConsumerConfig类有“consumer.timeout.ms”配置属性,Kafka默认设置为-1。预计任何新的卡夫卡消费者都将以合适的价格取代该物业。

下面是从Kafka documentation的引用:

consumer.timeout.ms  -1 
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value. 

当水槽创建卡夫卡信道,它是在timeout.ms值设置为100,上在INFO级的水槽日志所见。这就解释了为什么我们会看到大量的这些ConsumerTimeoutException。

level: INFO Post-validation flume configuration contains configuration for agents: [agent] 
level: INFO Creating channels 
level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type 
level: INFO Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel 
level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type 
level: INFO Group ID was not specified. Using flume as the group id. 
level: INFO {metadata.broker.list=kafka:9092, request.required.acks=-1, group.id=flume, 
       zookeeper.connect=zookeeper:2181, **consumer.timeout.ms=100**, auto.commit.enable=false} 
level: INFO Created channel c1 

通过Kafka channel settings水槽用户指南去,我试图重写通过指定低于此值,但似乎并没有工作,虽然:

agent.channels.c1.kafka.consumer.timeout.ms=5000 

此外,我们做了一个负载通过频道不停地敲击数据进行测试,并且在测试过程中没有发生这种异常。

+0

是的。我可以确认设置选项'agent.channels.c1.kafka.consumer.timeout.ms'在这种情况下是无用的。 – Samriang

0

我读水槽的源代码,并发现水槽中读取密钥“超时”为“consumer.timeout.ms”的价值。

所以,你可以为“consumer.timeout.ms”像这样的配置值:

agent1.channels.kafka_channel.timeout=-1