我想从kinesis firehose中将数据接收到s3中,格式化为实木复合地板。到目前为止,我刚刚找到一个解决方案,意味着创建一个EMR,但我正在寻找更便宜和更快的东西,比如将接收的json直接从firehose存储为parquet或使用Lambda函数。从AWS Kinesis fireshose写入实木复合地板到AWS S3
非常感谢你, 哈维。
我想从kinesis firehose中将数据接收到s3中,格式化为实木复合地板。到目前为止,我刚刚找到一个解决方案,意味着创建一个EMR,但我正在寻找更便宜和更快的东西,比如将接收的json直接从firehose存储为parquet或使用Lambda函数。从AWS Kinesis fireshose写入实木复合地板到AWS S3
非常感谢你, 哈维。
对付AWS支持服务之后,不同的实现百,我想解释一下我所取得的成就。
最后,我已经创建了一个处理由室壁运动流水生成的每个文件lambda函数,根据有效载荷分类我的活动,并将结果存储在S3木地板的文件。
这样做,是不是很容易的:
你应该创建一个Python虚拟ENV,包括所有需要的库文件(在我的情况熊猫,NumPy的,Fastparquet等)的第一。 由于结果文件(包括所有库和我的Lambda函数很重,需要启动一个EC2实例,我使用了免费层中包含的实例)。要创建虚拟ENV请按照下列步骤操作:
创建lambda_function propertly:
import json
import boto3
import datetime as dt
import urllib
import zlib
import s3fs
from fastparquet import write
import pandas as pd
import numpy as np
import time
def _send_to_s3_parquet(df):
s3_fs = s3fs.S3FileSystem()
s3_fs_open = s3_fs.open
# FIXME add something else to the key or it will overwrite the file
key = 'mybeautifullfile.parquet.gzip'
# Include partitions! key1 and key2
write('ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
compression='GZIP',open_with=s3_fs_open)
def lambda_handler(event, context):
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
try:
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=key)
data = response['Body'].read()
decoded = data.decode('utf-8')
lines = decoded.split('\n')
# Do anything you like with the dataframe (Here what I do is to classify them
# and write to different folders in S3 according to the values of
# the columns that I want
df = pd.DataFrame(lines)
_send_to_s3_parquet(df)
except Exception as e:
print('Error getting object {} from bucket {}.'.format(key, bucket))
raise e
复制lambda函数的lambda.zip和部署lambda_function:
触发时你喜欢的要执行,例如,每一个新的文件在S3创建,甚至你可以在lambda函数到流水相关联的时间。 (我没有选择这个选项,因为'lambda'限制低于Firehose限制,您可以配置Firehose每个128Mb或15分钟写入一个文件,但是如果将此lambda函数与Firehose关联,则将执行lambda函数每3分钟或5MB,在我的情况下,我产生了很多小木地板文件的问题,因为每次启动lambda函数时,我都会生成至少10个文件)。
亚马逊室壁运动流水接收流式传输的记录,并且可以将它们存储在亚马逊S3(或Amazon红移或Amazon Elasticsearch服务)。
每条记录最大可达1000KB。
然而,记录被一起追加到一个文本文件中,有基于时间或大小配料。传统上,记录是JSON格式。
您将是无法发送实木复合地板文件,因为它不符合此文件格式。
是可能的触发LAMBDA数据变换功能,但这将不能够或者输出一个拼花文件。
实际上,考虑到镶木地板文件的性质,您不可能一次构建一个记录。作为一种列式存储格式,我怀疑它们确实需要在批处理中创建,而不是每个记录附加数据。
底线:都能跟得上。
非常感谢你。 – bracana
Hi @Javi,如果这个或任何答案已解决您的问题,请点击复选标记考虑[接受它](http://meta.stackexchange.com/q/5234/179419)。这向更广泛的社区表明,您已经找到了解决方案,并为答复者和您自己提供了一些声誉。没有义务这样做。 –
@JohnRotenstein您可以让lambda对Firehose的每个缓冲时间/大小的批处理进行转换,并且稍后每隔几个小时将Parquet文件拼接成更大的大小?这可让您通过FireHose将JSON流式传输到Parquet,以便在Athena中获得接近实时的数据,并且仍然可以获得Parquet的性能优势。 – earmouse