2016-08-16 45 views
1

我在我的服务器机器上运行单节点kafka。 我使用以下命令来创建主题“bin/kafka-topics.sh --create --zookeeper localhost:2181 - 复制因子1 - 部分1 - 主题测试”。 我有两个logstash实例正在运行。第一个从一些Java应用程序日志文件读取数据将同样注入到kafka中。 它工作正常,我可以在控制台上使用“bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning”命令查看数据。但是另一个从kafka(相同主题“测试”)读入并注入elasicsearch的logstash实例失败了。 Logstash的第二个实例无法读取数据。我改变了它的配置文件从卡夫卡到阅读和打印在控制台上,那么也不会输出anything.Here是未能logstash配置文件:Logsatsh和kafka

// config file 
    input { 
     kafka { 
     zk_connect => "localhost:2181" 
     topic_id => "test" 
     } 
     } 
     output { 
     stdout{} 
     } 

Logstash没有打印任何东西,也不抛出任何错误。 我正在使用Logstash 2.4和kafka 0.10。 我用卡夫卡快速入门指南(http://kafka.apache.org/documentation.html#quickstart

+0

你确定你有一个Zookeeper实例在localhost上运行吗? – Val

回答

1

如果你看卡夫卡输入plugin configuration,你可以看到一个重要的参数,它允许连接到卡夫卡集​​群:zk_connect

根据文档,它默认设置为localhost:2181。确保将它设置为Kafka集群实例,或者理想情况下为多个实例,具体取决于您的设置。

例如,假设您要连接到带有JSON主题的三节点Kafka集群。配置如下:

kafka { 
topic_id => "your_topic" 
zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181" 
} 

此外,重要的是配置主题的正确编解码器。上面的例子将使用JSON事件。如果您使用Avro,则需要设置另一个参数 - codec。有关如何配置的详细信息,请参阅on the documentation page。它基本上需要指向Avro模式文件,这可以作为avsc文件或模式注册表端点(在我看来更好的解决方案)给出。

如果您的架构注册表在您的Kafka环境中运行,您可以将编解码器指向它的url。一个完整的例子是:

kafka { 
codec => avro_schema_registry { endpoint => "http://kc1.host:8081"} 
topic_id => "your_topic" 
zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181" 
} 

希望它的作品!

+0

嗨wjp,我正在运行单节点kafka集群,kafka中的数据是一些xmls。没有Schema Registry正在运行。我检查了zookeeper,它也在运行。 –

1
@wjp 
Hi wjp, I am running single node kafka cluster. There is no Schema Registry running. zookeeper is also running. I used following command to create topic "bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test". I have two logstash instances running. First one reads data from some java application log file inject the same to the kafka. It works fine, I can see data in kafka on console using "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning" command. But the other logstash instance which reads from kafka(same topic "test") and injects into elasicsearch, is failing. This second instance of logstash fails to read data. I changed its configuration file to read from kafka and print on console, then also it does not output anything.Here is the config file for failing logstash: 
input { 
kafka { 
zk_connect => "localhost:2181" 
topic_id => "test" 
} 
} 
output { 
stdout{} 
} 
Logstash neither print anything nor it throws any error. 
I am using Logstash 2.4 and kafka 0.10. 
I used kafka quick start guide (http://kafka.apache.org/documentation.html#quickstart)