5

增量S3文件,我做了如下管道: 任务管理器 - > SQS - >刮板工人(我的应用程序) - > AWS流水 - > S3文件 - >星火 - >红移(?)。如何处理在星火

有些事情我试图解决/改进,我会很乐意指导:

  1. 刮板可能得到复制数据,并再次刷新他们流水,这将导致火花的DUP。我应该在开始计算之前使用Distinct函数在火花中解决这个问题吗?
  2. 我并没有删除S3处理过的文件,所以数据越来越大。这是一个很好的做法吗? (以s3作为输入数据库)或者我应该处理每个文件并在spark完成后删除它?目前我正在做sc.textFile("s3n://...../*/*/*") - 这将收集我所有的桶文件并运行计算。
  3. 要将结果放入Redshift(或s3) - >我该如何增量执行此操作?也就是说,如果s3变得越来越大,那么红移会有重复的数据......我以前总是冲洗它吗?怎么样?
+0

你可以有你的水桶要处理的元素,一旦他们已推,将它们移动到另一个桶,所以你保留一份副本如果需要的话,但你不会处理它们第二次 –

回答

0

我曾经在一个流水线前,虽然没有遇到这些问题。这是我做的。

  1. 删除重复

    一个。我用BloomFilter删除本地重复。请注意,文档相对不完整,但您可以轻松地保存/加载/合并/交叉布隆过滤器对象。你甚至可以在过滤器上做reduce

    b。如果直接将数据从Spark保存到RedShift,则可能需要花费一些时间和精力为当前批次更新BloomFilter,进行广播,然后过滤以确保全局不存在重复。在我使用RDS中的UNIQUE约束之前,忽略错误,但不幸的是RedShift does not honour the constraint

  2. 和3.数据挺大

我使用EMR集群运行s3-dist-cp command移动&合并数据(因为通常有很多小的日志文件,其影响斯巴克的性能)。如果您碰巧使用EMR托管您的Spark群集,只需在分析之前添加一个步骤即可将数据从一个存储桶移至另一个存储桶。步骤采取command-runner.jar作为自定义罐子,命令看起来像

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess 

而且,原distcp不支持合并的文件。

通常,应该尽量避免在同一个桶(或至少,路径)一起具有加工和未加工的数据。