2017-03-09 62 views
0

我有一个AWS IoT规则将传入的JSON发送到Kinesis Firehose。如何过滤进入AWS Hive表的多行JSON数据

从我的物联网发布的JSON数据是全部在一行上 - 例如:

{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"} 

在管理界面的IOT测试“测试”部分允许你发布的消息,默认为以下(注格式化多-line JSON):

{ 
    "message": "Hello from AWS IoT console" 
} 

我流的流水到S3,然后通过EMR转换为柱状格式最终由雅典娜使用。

问题是,在转换为列格式时,Hive(特别是JSON SerDe)无法处理跨越多行的JSON对象。它会炸毁转换,而不会转换好的单行JSON记录。

我的问题是

  • 你如何设置流水忽略多行JSON?
  • 如果不可能,如何告诉Hive在载入表之前删除换行符,或者至少捕获异常并尝试继续?

我已经开始尝试定义蜂巢表时忽略畸形的JSON:

DROP TABLE site_sensor_data_raw; 
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,  
dateTime8601 timestamp 
) 
PARTITIONED BY(year int, month int, day int, hour int) 
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' 
with serdeproperties (
'ignore.malformed.json' = 'true', 
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis" 
) 
LOCATION 's3://...'; 

这里是我的全HQL,做转换:

--Example of converting to OEX/columnar formats 
DROP TABLE site_sensor_data_raw; 
CREATE EXTERNAL TABLE site_sensor_data_raw (
    count int, 
    dateTime8601 timestamp 
) 
PARTITIONED BY(year int, month int, day int, hour int) 
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' 
with serdeproperties (
'ignore.malformed.json' = 'true', 
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis" 
) 
LOCATION 's3://bucket.me.com/raw/all-sites/'; 

ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15'; 
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16'; 
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17'; 

DROP TABLE to_orc; 
CREATE EXTERNAL TABLE to_orc (
     count int, 
     dateTime8601 timestamp 
) 
STORED AS ORC 
LOCATION 's3://bucket.me.com/orc' 
TBLPROPERTIES ("orc.compress"="ZLIB"); 

INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15; 

回答

2

好了,默认JSON SERDE的使用在EMR和雅典娜不能在多行json记录上工作。每个JSON记录应该在一行中。

在多线JSON,我看到蜂巢/ Hadoop的两个问题,甚至普雷斯托的(在Athean使用)的角度

  • 给定一个文件,其明显的蜂巢/ Hadoop和JSON的SERDE的将不能识别json记录的结束和开始以返回其对象表示。
  • 鉴于多个文件,多行JSON文件不像正常/ n分隔的JSON文件那样可拆分。

为了解决从EMR /雅典娜结束这个问题,你需要编写自己的定制SERDE的基于数据结构和捕获异常等

你如何设置流水忽略多行JSON?

Firehose无法忽略特定的格式。它将采用任何正在使用其API(PutRecord或PutRecordBatch)作为数据blob并将它发送到目的地。

http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

不管怎样,AWS流水提供数据转换与AWS LAMBDA在那里你可以使用lambda函数来转换数据上的流水输入数据,并把转换后的数据到目的地。所以,你可能会使用该功能来识别并压扁多线JSON。如果格式不正确,你也可能会丢失记录。你将需要探索IOT如何发送多行json数据来排出(比如逐行等)来编写你自己的函数。

https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/

如果不可能,怎么你告诉蜂房之前 装载除去新行到表或至少捕获异常,并试图继续?

如果您的firehose目标中仍然有多行JSON,由于您在ETL中有EMR,因此可以使用其计算而不是Lambda来平展JSON。火花上的这个功能也可以帮助你。 https://issues.apache.org/jira/browse/SPARK-18352

然后,您可以获取这些数据,创建为雅典娜工作的柱状格式。