5

我想从kinesis firehose中将数据接收到s3中,格式化为实木复合地板。到目前为止,我刚刚找到一个解决方案,意味着创建一个EMR,但我正在寻找更便宜和更快的东西,比如将接收的json直接从firehose存储为parquet或使用Lambda函数。从AWS Kinesis fireshose写入实木复合地板到AWS S3

非常感谢你, 哈维。

回答

8

对付AWS支持服务之后,不同的实现百,我想解释一下我所取得的成就。

最后,我已经创建了一个处理由室壁运动流水生成的每个文件lambda函数,根据有效载荷分类我的活动,并将结果存储在S3木地板的文件。

这样做,是不是很容易的:

你应该创建一个Python虚拟ENV,包括所有需要的库文件(在我的情况熊猫,NumPy的,Fastparquet等)的
  1. 第一。 由于结果文件(包括所有库和我的Lambda函数很重,需要启动一个EC2实例,我使用了免费层中包含的实例)。要创建虚拟ENV请按照下列步骤操作:

    • 登录在EC2
    • 创建一个文件夹名为拉姆达(或任何其他名称)
    • 须藤荫-y更新
    • 须藤荫-y升级
    • 须藤荫-y groupinstall “开发工具”
    • 须藤荫-y安装BLAS
    • 须藤荫-y安装LAPACK
    • 须藤荫-y安装atlas-SSE3-devel的
    • 须藤荫安装python27-devel的python27-PIP GCC
    • VIRTUALENV ENV
    • 源ENV /斌/激活
    • PIP安装boto3
    • PIP安装fastparquet
    • PIP安装熊猫
    • PIP安装thriftpy
    • PIP安装s3fs
    • pip install(任何其他需要的库)
    • find〜/ lambda/env/lib */python2.7/site-packages/-name“* .so”| xargs的剥离
    • PUSHD ENV/LIB/python2.7 /站点包/
    • 拉链-r -9 -q〜/ lambda.zip *
    • Popd的
    • PUSHD ENV/lib64下/ python2.7 /站点包/
    • 拉链-r -9 -q〜/ lambda.zip *
    • Popd的
  2. 创建lambda_function propertly:

    import json 
    import boto3 
    import datetime as dt 
    import urllib 
    import zlib 
    import s3fs 
    from fastparquet import write 
    import pandas as pd 
    import numpy as np 
    import time 
    
    def _send_to_s3_parquet(df): 
        s3_fs = s3fs.S3FileSystem() 
        s3_fs_open = s3_fs.open 
        # FIXME add something else to the key or it will overwrite the file 
        key = 'mybeautifullfile.parquet.gzip' 
        # Include partitions! key1 and key2 
        write('ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df, 
          compression='GZIP',open_with=s3_fs_open)    
    
    def lambda_handler(event, context): 
        # Get the object from the event and show its content type 
        bucket = event['Records'][0]['s3']['bucket']['name'] 
        key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']) 
        try: 
         s3 = boto3.client('s3') 
         response = s3.get_object(Bucket=bucket, Key=key) 
         data = response['Body'].read() 
         decoded = data.decode('utf-8') 
         lines = decoded.split('\n') 
         # Do anything you like with the dataframe (Here what I do is to classify them 
         # and write to different folders in S3 according to the values of 
         # the columns that I want 
         df = pd.DataFrame(lines) 
         _send_to_s3_parquet(df) 
        except Exception as e: 
         print('Error getting object {} from bucket {}.'.format(key, bucket)) 
         raise e 
    
  3. 复制lambda函数的lambda.zip和部署lambda_function:

    • 回到你的EC2实例,并添加所需的拉链lambda函数:拉链-9 lambda.zip lambda_function.py (lambda_function.py是在步骤2中生成的文件)
    • 将生成的zip文件复制到S3,因为它非常繁重,无法通过S3进行部署。 aws s3 cp lambda.zip s3:// support-bucket/lambda_packages/
    • 部署lambda函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages /lambda.zip
  4. 触发时你喜欢的要执行,例如,每一个新的文件在S3创建,甚至你可以在lambda函数到流水相关联的时间。 (我没有选择这个选项,因为'lambda'限制低于Firehose限制,您可以配置Firehose每个128Mb或15分钟写入一个文件,但是如果将此lambda函数与Firehose关联,则将执行lambda函数每3分钟或5MB,在我的情况下,我产生了很多小木地板文件的问题,因为每次启动lambda函数时,我都会生成至少10个文件)。

7

亚马逊室壁运动流水接收流式传输的记录,并且可以将它们存储在亚马逊S3(或Amazon红移或Amazon Elasticsearch服务)。

每条记录​​最大可达1000KB。

Kinesis flow

然而,记录被一起追加到一个文本文件中,有基于时间或大小配料。传统上,记录是JSON格式。

您将是无法发送实木复合地板文件,因为它不符合此文件格式。

是可能的触发LAMBDA数据变换功能,但这将不能够或者输出一个拼花文件。

实际上,考虑到镶木地板文件的性质,您不可能一次构建一个记录。作为一种列式存储格式,我怀疑它们确实需要在批处理中创建,而不是每个记录附加数据。

底线:都能跟得上。

+0

非常感谢你。 – bracana

+0

Hi @Javi,如果这个或任何答案已解决您的问题,请点击复选标记考虑[接受它](http://meta.stackexchange.com/q/5234/179419)。这向更广泛的社区表明,您已经找到了解决方案,并为答复者和您自己提供了一些声誉。没有义务这样做。 –

+0

@JohnRotenstein您可以让lambda对Firehose的每个缓冲时间/大小的批处理进行转换,并且稍后每隔几个小时将Parquet文件拼接成更大的大小?这可让您通过FireHose将JSON流式传输到Parquet,以便在Athena中获得接近实时的数据,并且仍然可以获得Parquet的性能优势。 – earmouse