2017-10-08 168 views
1

我在S3中存在大约15000个文件(ORC),其中每个文件包含几分钟的数据和每个文件的大小在300-700MB之间变化。由于递归循环YYYY/MM/DD/HH24/MIN格式的目录非常昂贵,我创建了一个包含给定日期的所有S3文件列表的文件(objects_list.txt)并传递此文件作为输入到火花读APISpark EMR S3处理大量文件

val file_list = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/objects_list.txt")) 
val paths: mutable.Set[String] = mutable.Set[String]() 
    for (line <- file_list.getLines()) { 
     if(line.length > 0 && line.contains("part")) 
     paths.add(line.trim) 
    } 

val eventsDF = spark.read.format("orc").option("spark.sql.orc.filterPushdown","true").load(paths.toSeq: _*) 
eventsDF.createOrReplaceTempView("events") 

所述簇的大小是10个r3.4xlarge机(工人)(其中每个节点:120GB RAM和16个核心)和主是m3.2xlarge配置(

我面临的问题是,火花阅读运行不断,我看到只有司机工作和休息所有节点没有做任何事情,我不知道为什么驱动程序我因为AFAIK火花懒惰地工作,所以直到一个行动被称为阅读不应该发生,我认为它列出每个文件,并收集一些与它相关的元数据。

但是为什么只有驱动程序正在工作并休息所有节点都没有做任何事情,我如何让这个操作在所有工作节点上并行运行?

我所遇到的这些文章https://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219https://gist.github.com/snowindy/d438cb5256f9331f5eec,但这里的整个文件的内容被解读为RDD,但我的使用情况取决于列唯一被提及的那些块/数据的列应该从S3中获取(给出ORC的列式访问是我的存储)。在S3文件大约有130列,但只有20场正在使用被称为和处理数据帧API的

Sample Log Messages: 
17/10/08 18:31:15 INFO S3NativeFileSystem: Opening 's3://xxxx/flattenedDataOrc/data=eventsTable/y=2017/m=09/d=20/h=09/min=00/part-r-00199-e4ba7eee-fb98-4d4f-aecc-3f5685ff64a8.zlib.orc' for reading 
17/10/08 18:31:15 INFO S3NativeFileSystem: Opening 's3://xxxx/flattenedDataOrc/data=eventsTable/y=2017/m=09/d=20/h=19/min=00/part-r-00023-5e53e661-82ec-4ff1-8f4c-8e9419b2aadc.zlib.orc' for reading 

下面你可以看到,只有一个执行程序运行到驱动程序的任务节点之一(集群模式)而CPU是在其他节点的剩余部分(即工人),0%,甚至后3-4小时处理的,这种情况是在文件的同一给定数量庞大的已被处理 Only One Executor is Active i.e Driver

任何指针怎么可以避免这个问题,即加快负载和进程?

回答

2

有一种解决方案可以帮助您基于AWS Glue。

你在S3中分割了很多文件。但是你有基于时间戳的分区。所以使用胶水,您可以在S3中使用您的对象,如EMR中的“配置表”。

首先,你需要创建一个EMR有5.8+版本,你将能够看到这一点:

enter image description here

您可以设置这个检查这两个选项。这将允许访问AWS胶粘数据目录。

之后,您需要将您的根文件夹添加到AWS胶水目录。快速的方法是使用Glue Crawler。该工具将抓取您的数据并根据需要创建目录。

我会建议你看看here

履带运行后,这会对你的表中的目录,你可以在AWS Athena看到的元数据。

在Athena,你可以看到你的数据是否被抓取者正确识别。

该解决方案将使您的火花接近真正的HDFS。由于元数据将在数据目录中正确显示。而你的应用程序正在寻找“索引”的时间将允许更快地运行作业。

在这里使用此功能,我能够改善查询,并且使用胶水处理分区效果更好。所以,试试这可能会有助于演出。

+0

感谢您的详细解答。但除了AWS Glue之外,还有其他的通用解决方案吗?如果有人在GCE或Azure上运行他们的应用程序,该怎么办? ,我认为这是一个非常普遍的问题,人们可能正在做一些事情来摆脱这个瓶颈,有兴趣了解该解决方案 –

+0

在另一个说明中,是否有任何关于如何从EMR引用/连接到Glue目录表的参考,他们的文档没有任何样本/例子 –

+1

关于这一点并不多。如果您创建一个EMR集群来检查上述两件事情。当您使用spark指向一个Hive表时,您可以指向该表。就像这样使用:'val myDf = spark.table(“database.table”)'你有你的数据框。 –