2017-10-06 103 views
1

我正在尝试将文件用作我的制作人。源文件连续增长(例如每秒记录20条记录)。下面是类似我的问题后:如何使用增长文件作为Apache Kafka制作者并只读取新添加的数据

How to write a file to Kafka Producer

但是,在这种情况下,整个文件被读取并添加到卡夫卡的话题每一个新行插入文件的时间。我只需要将新添加的行发送到主题(即,如果该文件已包含10行,并且附加了4行,则只需将这4行发送到该主题)。

有没有办法实现这个?

其它的解决方案的尝试:

  1. 阿帕奇水槽通过使用源类型为 'spooldir'。但它没有用,因为它从添加到目录中的新文件读取数据,而不是在将数据附加到已读文件时。

  2. 而且我们试图使用作为“EXEC”和命令如“尾-F /路径/文件名”水槽源类型。这似乎也不起作用。

    使用任何其他工具

建议也欢迎为我的目标是实时读取从文件中的数据(即我尽快所需要的数据,因为它被插入到该文件)。

+0

您是否试过来自Kafka Connect的[FileSource Connector](https://docs.confluent.io/current/connect/connect-filestream/filestream_connector.html)? –

+0

感谢Chin Huang。我不知道连接器。它的工作:) – Sindhu

回答

0

根据您的具体需求,您可以查看几个选项。

卡夫卡连接

正如卡夫卡连接FileSource连接上述由Chin Huang应该能够做你想做的,无需安装额外的软件。请查看Connect Quickstart以获取如何启动和运行的指导,他们实际上有一个将文件读入Kafka的示例。

Logstash

Logstash是这样的经典的选项,其Kafka输出会做只是你想要它做的事,对一个或muliple文件。下面的配置应该给你粗略的你想要的。

input { 
    file { 
    path => "/path/to/your/file" 
    } 
output { 
    kafka { 
     bootstrap_servers => "127.0.0.1:9092" 
     topic_id => "topicname" 
    } 
} 

Filebeat

Filebeat是相当类似Logstash,如果你想从文件中读取数据执行额外的处理,它只是提供了较少的功能。此外,它是用go而不是java编写的,所以它运行的机器上的占用空间应该更小。 下面列出的是让你开始一个最小配置(从内存中,你可能需要添加一个或两个参数,如果他们是强制性的):如果你想重温

filebeat.prospectors: 
- type: log 
    paths: 
    - /path/to/your/file 

output.kafka: 
    hosts: ["127.0.0.1:9092"] 
    topic: 'topicname' 

水槽

你的Flume选项,看看TaildirSource,我没有用过它,但它听起来像它应该很适合你的用例。

+0

所有的解决方案建议为我工作:)感谢一吨:) – Sindhu

相关问题