I am trying to do stream processing and CEP on a Kafka message stream. For this I picked Apache Ignite to build a first prototype, but I cannot connect to the queue: Apache Ignite Kafka connection problem
Using kafka_2.11-0.10.1.0 and apache-ignite-fabric-1.8.0-bin.

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Kafka works correctly; I tested it with a consumer. Then I started Ignite and ran the following Spring Boot command-line application.
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

Ignition.setClientMode(true);
Ignite ignite = Ignition.start();

Properties settings = new Properties();
// Set a few key parameters
settings.put("bootstrap.servers", "localhost:9092");
settings.put("group.id", "test");
settings.put("zookeeper.connect", "localhost:2181");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Create an instance of StreamsConfig from the Properties instance
kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
    // allow overwriting cache data
    stmr.allowOverwrite(true);

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    // set the topic
    kafkaStreamer.setTopic("test");
    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);
    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(config);
    // set decoders
    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    kafkaStreamer.setKeyDecoder(keyDecoder);
    kafkaStreamer.setValueDecoder(valueDecoder);

    kafkaStreamer.start();
} finally {
    kafkaStreamer.stop();
}
When the application starts I get:
2017-02-23 10:25:23.409  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid
2017-02-23 10:25:23.410  INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property group.id is overridden to test
2017-02-23 10:25:23.410  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid
2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.serializer is not valid
2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid
2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.serializer is not valid
2017-02-23 10:25:23.411  INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181
and then:
2017-02-23 10:25:24.057  WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed
java.nio.channels.ClosedChannelException: null
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na]
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na]
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]
Reading from the queue does not work. Does anyone have an idea how to fix this?
EDIT: If I comment out the contents of the finally block, the following error appears instead:
2017-02-27 16:42:27.780 ... 29946 ... [pool-3-thread-1] ... : Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic=1, attributes=0, CreateTime=-1, crc=2558126716, key=java.nio.HeapByteBuffer[pos=0 lim=1 cap=79], payload=java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]
java.lang.IllegalStateException: Data streamer has been closed.
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0]
    at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Thanks!
Thanks for your answer. If I comment out the contents of the 'finally' block, I get the error posted above (in the edit). – razvan
That is because you are also closing the 'IgniteDataStreamer'. Get rid of the try-with-resources block and it will work. –
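A minimal sketch of what that fix could look like, based on the code in the question (the cache name, topic, and config are the ones used above; the exact shutdown point depends on your application): the try-with-resources closed the streamer as soon as start() returned, while KafkaStreamer's worker threads were still calling addData() on it, hence the IllegalStateException. Keeping the streamer open and closing it explicitly at shutdown avoids that.

```java
// Do NOT wrap the streamer in try-with-resources: it must stay open
// for as long as KafkaStreamer's threads are feeding data into it.
IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
stmr.allowOverwrite(true);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
kafkaStreamer.setTopic("test");
kafkaStreamer.setThreads(1);
kafkaStreamer.setConsumerConfig(config);
kafkaStreamer.setKeyDecoder(new StringDecoder(null));
kafkaStreamer.setValueDecoder(new StringDecoder(null));

kafkaStreamer.start();

// ... keep the application running while messages stream in ...

// Close both only on shutdown, stopping the Kafka threads first so
// nothing calls addData() on an already-closed streamer:
kafkaStreamer.stop();
stmr.close();
```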
Hi, I still haven't got the application fully running (because I don't know how to read from the cache), but at least I don't get the error anymore, so I'll mark this as the answer and open a new question for the rest. Thanks again; maybe you can also take a look at https://stackoverflow.com/questions/42562766/how-to-properly-read-from-ignite-cache – razvan