2016-03-02 102 views
4

我正在设置火花流光与kinesis和红移。我每10秒钟从kinesis读取数据,处理它,并使用spark-redshift lib将它写入红移。火花红移花费很多时间写红移

问题是它只花了很多时间只写了300行。

这就是它显示我在控制台

[Stage 56:====================================================> (193 + 1)/200] 

看我的日志df.write.format是这样做的。

我在带有4GB内存和2个核心的亚马逊EC2的机器上运行spark-setup,使用--master local [*]模式运行。

这是我如何创建流

kinesisStream = KinesisUtils.createStream(ssc, APPLICATION_NAME, STREAM_NAME, ENDPOINT, REGION_NAME, INITIAL_POS, CHECKPOINT_INTERVAL, awsAccessKeyId =AWSACCESSID, awsSecretKey=AWSSECRETKEY, storageLevel=STORAGE_LEVEL)  
CHECKPOINT_INTERVAL = 60 
storageLevel = memory 

kinesisStream.foreachRDD(writeTotable) 
def WriteToTable(df, type): 
    if type in REDSHIFT_PAGEVIEW_TBL: 
     df = df.groupby([COL_STARTTIME, COL_ENDTIME, COL_CUSTOMERID, COL_PROJECTID, COL_FONTTYPE, COL_DOMAINNAME, COL_USERAGENT]).count() 
     df = df.withColumnRenamed('count', COL_PAGEVIEWCOUNT) 

     # Write back to a table 

     url = ("jdbc:redshift://" + REDSHIFT_HOSTNAME + ":" + REDSHIFT_PORT + "/" + REDSHIFT_DATABASE + "?user=" + REDSHIFT_USERNAME + "&password="+ REDSHIFT_PASSWORD) 

     s3Dir = 's3n://' + AWSACCESSID + ':' + AWSSECRETKEY + '@' + BUCKET + '/' + FOLDER 

     print 'Start writing to redshift' 
     df.write.format("com.databricks.spark.redshift").option("url", url).option("dbtable", REDSHIFT_PAGEVIEW_TBL).option('tempdir', s3Dir).mode('Append').save() 

     print 'Finished writing to redshift' 

请让我知道采取这一多时间

回答

6

写作既通过Spark和直接红移时,我有过类似的经历的原因。 spark-redshift将始终将数据写入S3,然后使用Redshift复制功能将数据写入目标表。这种方法是编写大量记录的最佳实践和最有效的方法。这种方法还会在写入时产生很多开销,特别是当每次写入的记录数量相对较少时。

看上面的输出,看起来你有大量的分区(大概是200左右)。这很可能是因为spark.sql.shuffle.partitions设置默认设置为200。你可以找到更多的细节in the Spark documentation

该组操作可能会生成200个分区。这意味着您正在对S3执行200次单独的复制操作,每次复制操作在获取连接和完成写入操作时都有相当长的延迟时间。

正如我们在下面的评论中讨论,并在聊天中,您可以在组的结果,凝聚通过为较少的分区进行以下修改到行上面:

df = df.coalesce(4).withColumnRenamed('count', COL_PAGEVIEWCOUNT) 

这将减少数量从200到4的分区以及从副本到S3的开销量为几个数量级。您可以尝试使用分区数来优化性能。您还可以更改spark.sql.shuffle.partitions设置,以根据您正在处理的数据大小和可用内核数量减少分区数量。

+0

不要只写3行的东西需要4分钟左右的时间很多。此外,即使我有5000行写,仍然4分钟是很多时间 – Nipun

+0

哇,我没有意识到它花了那么长时间。在这种情况下,可能发生的情况是分区太多(从上面的输出中可以看出这种情况)。这可能会导致从机器写入S3的瓶颈。我不确定这是否适用于流媒体,但对于常规的spark工作,如df.coalesce(1).write.format(“com.databricks.spark.redshift”)。option(“url”,url)。选项(“dbtable”,REDSHIFT_PAGEVIEW_TBL)。选项('tempdir',s3Dir).mode('Append')。save()会起作用。你可以玩分区的数量来合并。 – DemetriKots

+0

我尝试过,使用coalesce(4)和缓存,但它花费了相同的时间。这很奇怪,但是4分钟就像写了10条记录或1000条记录一样。我尝试联系AWS,但它也没有帮助。尝试从s3直接加载csv到红移使用命令,看看是否需要时间,但这也是几秒钟。 – Nipun

0

你是databrick API。这是已知的问题。我有同样的问题。我确实与Databric API团队交谈过。从Avaro文件加载时,Redshift的效果并不理想。我们确实与AWS团队交谈过。他们正在努力。 Databrick API在S3上创建avaro文件,然后复制命令将加载avaro文件。那就是性能杀手。

+0

请发表评论 – adao7000