2017-04-05 190 views
1

我正在写一个程序,将时间系列数据从kafka保存到hadoop。我这样设计的目录结构:将时间序列数据写入按月份和日期分区的hdfs中?

event_data 
|-2016 
    |-01 
    |-data01 
    |-data02 
    |-data03 
|-2017 
    |-01 
    |-data01 

因为是一个守护任务,我写了一个基于LRU经理来管理打开的文件并关闭非活动文件中的时间,以避免资源泄漏,但收入数据流不按时间排序,再次打开已存在的文件以追加新数据是很常见的。

我试图使用FileSystem#append()方法打开OutputStream当文件存在,但它在我的HDFS集群上运行的错误(对不起,我不能在这里提供的特定错误,因为这是几个月前,现在我尝试另一种解决方案)。

然后我使用另一种方式来实现我的目标: 当同名文件存在时,向文件名添加序列后缀。现在我在我的hdfs中有很多文件。它看起来很脏。

我的问题是:这种情况的最佳做法是什么?

+0

HDFS并非真正设计用于追加到文件 –

回答

0

对不起,这不是直接回答你的编程问题,但如果你打开所有选项而不是自己实现它,我想分享你我们的经验fluentd和它的HDFS (WebHDFS) Output Plugin。 Fluentd是一个开源的,可插拔的数据收集器,通过它你可以轻松地构建数据管道,它将从输入中读取数据,处理它,然后将其写入指定的输出,在您的场景中,输入是kafka,输出为HDFS。你需要做的是:

  • 配置fluentd input以下fluentd kafka plugin,你将与你的卡夫卡/话题资讯配置的source部分
  • 启用webhdfs,并为您的HDFS集群append操作,你可以找到如何请执行以下操作HDFS (WebHDFS) Output Plugin
  • 配置您的match部分将您的数据写入HDFS,在插件文档页面上有示例。对于分区中的数据通过月份和日期,则可以配置path参数与时间片的占位符,是这样的:

    path "/event_data/%Y/%m/data%d"

使用此选项来收集数据,那么你可以写你的MapReduce作业做ETL或任何你喜欢的东西。

我不知道这是否适合您的问题,只需在此处提供一个选项即可。

+0

我打开任何选项只有它可以解决问题。事实上,我的问题中并没有公布一些细节。 kafka中的消息不是纯文本,而是“protobuf”消息,“timestamp”是“protobuf”消息中的字段。我现在还没有在'fluentd'中找到'protobuf'的明确支持。另一个细节是hdfs中的消息存储格式,是带有utf-8编码的'json'行。 – aLeX

+0

您可以在您的服务中始终处理/转换来自kafka的数据,并将格式良好的数据推送到第三方服务(如fluentd),以负责将数据加载到HDFS的过程,这使您的服务专注于数据转换/只处理并成为整个数据管道中的一个步骤。无论如何,这是您根据您的技术堆栈和偏好选择的选择。对不起,没有线索为您的错误,我只是不能再现它。 – shizhz

相关问题