0
public class MessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
/**
 * Consumes the topic {@code "mytopic"} through the high-level consumer, using one
 * worker thread per stream, and logs every received message.
 *
 * <p>This method blocks until the worker threads have finished. The connector is
 * shut down exactly once, either from a JVM shutdown hook (e.g. Ctrl-C) or when the
 * calling thread is interrupted or setup fails. Shutting the connector down right
 * after submitting the workers (as an earlier version did) closes the consumer's
 * connections while the workers are still starting up.
 */
private void run() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "testgroup");
    // NOTE(review): 500 ms is a very short ZooKeeper session timeout - confirm it is intended.
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    final ConsumerConnector consumerConnector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ensures the connector is shut down at most once, whichever path gets there first.
    final java.util.concurrent.atomic.AtomicBoolean stopped =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    final Runnable stopConnector = new Runnable() {
        @Override
        public void run() {
            if (stopped.compareAndSet(false, true)) {
                consumerConnector.shutdown();
            }
        }
    };

    // One stream per worker thread; the same number sizes the thread pool.
    final int streamCount = 2;
    ExecutorService executorService = Executors.newFixedThreadPool(streamCount);
    try {
        String topic = "mytopic";
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, streamCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumerConnector.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        LOGGER.info("size: {}", streams.size());

        int thread = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            final int tid = thread++;
            LOGGER.info("submit thread {}", tid);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    // Presumably this loop ends once the connector is shut down - verify
                    // against the Kafka version in use.
                    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                        byte[] key = messageAndMetadata.key();
                        // Decode with an explicit charset instead of the platform default.
                        String message = new String(messageAndMetadata.message(),
                                java.nio.charset.StandardCharsets.UTF_8);
                        LOGGER.info("key: {} message: {} thread: {}", key, message, tid);
                    }
                }
            });
        }

        // No further tasks will be submitted; already-submitted workers keep running.
        executorService.shutdown();

        // On JVM shutdown, stop the connector so the workers can finish.
        Runtime.getRuntime().addShutdownHook(new Thread(stopConnector, "consumer-shutdown-hook"));

        // Keep the connector open while the workers are consuming.
        executorService.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag for the caller; cleanup happens in finally.
        Thread.currentThread().interrupt();
    } finally {
        stopConnector.run();
        executorService.shutdownNow();
    }
}
/**
 * Program entry point: creates a {@code MessageHandler} and runs it.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    MessageHandler handler = new MessageHandler();
    handler.run();
}
}

Kafka 多线程 consumer 抛出 ClosedChannelException
运行此消费者后,我得到这个异常:
WARN 2016-08-13 22:46:56.969] [testgroup_debian-1471099616127-8c8586c4-leader-finder-thread] kafka.utils.Logging$class.warn(Logging.scala:89) [Fetching topic metadata with correlation id 0 for topics [Set(mytopic)] from broker [BrokerEndPoint(0,debian,9092)] failed]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
为什么会出现这种异常? broker 和 ZooKeeper 的配置应该没问题,因为我可以使用控制台生产者/消费者发送/接收消息。
那么,如果我只能使用一个线程,那么获取多个流(topicCount > 1)有什么意义?迭代器会阻塞,我永远无法访问其他流。