2016-07-25 107 views
0

我有一个Spark Streaming作业输出一些当前存储在HDFS中的日志,我想用logstash处理它们。不幸的是,虽然有一个插件可以在hdfs中写入logstash,但它实际上是从hdfs中读取的如何将Spark输出链接到Logstash输入

我有搜索解决方案来链接这两个部分,但在python api的Spark流中,存储某些东西的唯一方法是将它作为文本文件写入hdfs,所以我必须从hdfs读取! 我无法在本地保存它们,因为Spark在群集上运行,并且我不想从每个节点获取所有数据。

目前我运行一个非常脏的脚本,每2秒将hdfs目录的内容复制到本地。但是这个解决方案显然不令人满意。

有没有人知道一个软件可以帮助我把Spark的输出发送到Logstash?

在此先感谢!

编辑:我使用Python &星火1.6.0

+0

这些是由Log4j生成的日志吗? –

+0

不,这是由Spark处理的apache日志,它基于机器学习算法增加了一些功能。 –

回答

0

这似乎是使用Kafka十全十美的工作。在Spark Streaming作业中,写入Kafka,然后使用Logstash中的记录。

stream.foreachRDD { rdd => 
    rdd.foreachPartition { partition => 
    val producer = createKafkaProducer() 
    partition.foreach { message => 
     val record = ... // convert message to record 
     producer.send(record) 
    } 
    producer.close() 
    } 
}