我试图建立管道与Apache水槽: spooldir - >卡夫卡通道 - > HDFS沉阿帕奇水槽:kafka.consumer.ConsumerTimeoutException
活动去卡夫卡的话题没有问题,我可以看到他们kafkacat请求。但kafka频道无法通过接收器将文件写入hdfs。错误是:
超时等待数据来自卡夫卡
完整的日志:
2016年2月26日18:25:17125 (SinkRunner-PollingRunner-DefaultSinkProcessor -SendThread(zoo02:2181)) [调试 - org.apache.zookeeper.ClientCnxn $ SendThread.readResponse(ClientCnxn.java:717)的会话ID 得到ping响应:0毫秒
后0x2524a81676d02aa2016年2月26日18:25:19127 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [调试 - org.apache.zookeeper.ClientCnxn $ SendThread.readResponse(ClientCnxn.java:717)用于的sessionid GOT ping响应:后1毫秒
2016年2月26日18 0x2524a81676d02aa:25:21129 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [调试 - org.apache.zookeeper .ClientCnxn $ SendThread.readResponse(ClientCnxn.java:717)] 0ms后得到对sessionid:0x2524a81676d02aa的ping响应
2016年2月26日18:25:21775 (SinkRunner-PollingRunner-DefaultSinkProcessor)[调试 - org.apache.flume.channel.kafka.KafkaChannel $ KafkaTransaction.doTake(KafkaChannel.java:327)] 超时在等待数据时来自卡夫卡 kafka.consumer.ConsumerTimeoutException在 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69) 在 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) 在 在kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) 在 org.apache.flume.chan nel.kafka.KafkaChannel $ KafkaTransaction.doTake(KafkaChannel.java:306) 在 org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) 在 org.apache.flume.channel.BasicChannelSemantics。采取(BasicChannelSemantics.java:95) 在 org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) 在 org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java: 68) 在org.apache.flume.SinkRunner $ PollingRunner.run(SinkRunner.java:147) 在java.lang.Thread.run(Thread.java:745)
我的水槽的配置是:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c2
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/alex/spoolFlume
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.12.0.1:54310/logs/flumetest/
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic=flume_test_001
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181
# Bind the source and sink to the channel
a1.sources.r1.channels = c2
a1.sinks.k1.channel = c2
随着内存通道,而不是卡夫卡频道的所有作品好。
感谢提前任何想法!
你找到一个解决这个问题,我也很是停留在同一个地方,甚至当我通过配置设置consumer.timeout.ms,它没有得到重写和水槽持续100毫秒默认工作 – mbaxi
都能跟得上我没有找到卡夫卡通道的任何解决方案。内存频道运行良好,在制作6个月内没有问题。 – Samriang
我是能够解决的超时问题,在水槽卡夫卡通道代码硬编码值设置为财产consumer.timeout.ms 100 ms..I固定的代码从配置看,而不是就不见了错误 – mbaxi