2016-06-21 48 views
1

我有一个带字符串(textData)的文件和一组正则表达式过滤器(regx),我想应用并获取计数。在我们迁移到星火,我用GREP如下:Apache Spark:使用ReduceByKey的正则表达式比GREP命令慢很多

from subprocess import check_output 
result={} 
for reg in regx: # regx is a list of all the filters 
    result[reg] = system.exec('grep -e ' + reg + 'file.txt | wc -l') 

注:我意译这里“system.exec”,我实际使用check_output。

我升级到SPARK其他的东西,所以我想也在这里采取火花的好处。所以我写了这个代码。

import re 

sc = SparkContext('local[*]') 
rdd = sc.textFile('file.txt') #containing the strings as before 
result = rdd.flatMap(lambda line: [(reg, line) for reg in regx]) 
      .map(lambda line: (line[0], len(re.findall(line[0], line[1])))) 
      .reduceByKey(lambda a,b: a+b) 
      .collect() 

我以为我很聪明,但代码实际上比较慢。任何人都可以指出任何明显的错误?我正在运行它作为 火花提交 - 本地[*] filename.py

我没有在同一确切的数据上运行两个版本,以确切地检查速度有多慢。如果需要,我可以轻松地做到这一点。当我检查本地主机:4040大部分时间正在由reduceByKey作业。

为了说明所花费的时间,文件中的行数是100,000,每行的平均#chars约为1000左右。过滤器的数量len(regx)= 20。这个代码已经在具有128GB RAM的8核处理器上运行了44分钟。

编辑:只需添加,正则表达式过滤器和文本文件的数量将在最终系统中乘以100倍。而不是写入/读取文本文件中的数据,我会用SQL语句查询rdd中的数据。因此,我认为Spark是一个不错的选择。

+0

如果您发现'grep'比Spark慢得多,这将是一个有趣的问题。另一种方式,特别是在本地模式下,它甚至不是令人惊讶的。虽然你的代码可以通过相当多的方式得到改进,但是在非分布式设置中,grep是不现实的。 – zero323

+0

您是否尝试过(增加)更改'textFile'中的'minPartitions'参数? –

+0

@ zero323好吧,我目前正在开发环境,但将转向分布式设置。如果你能详细说明'在不同方面有所改进',那将会非常有用。我非常感激。 –

回答

2

我的相当重的用户排序为好,而星火不觉得快在本地安装,你应该考虑一些其他的事情:

  • 有多大规模数据集?当需要大量RAM时,将记录交换到/ tmp。
  • 你有多少RAM分配给你的Spark应用程序?默认情况下它只有1GB,这对于排序和没有RAM限制的排序命令是非常不公平的。
  • 这两个任务是在同一台机器上执行的吗? Spark机器是在“自动扩展”磁盘文件中运行的虚拟设备吗? (糟糕的表现)。
  • Spark Clusters会自动将您的任务分散到多个服务器上。如果在Hadoop上运行,请记住文件是以128MB块分区的,每个块可以是RDD分区。

I.e.在Hadoop集群中,RDD分区可以并行处理。这是你不会表现的地方。

Spark将处理Hadoop以尽最大努力实现“数据本地化”,这意味着您的进程直接针对本地硬盘驱动器运行,否则数据将在整个网络中进行复制,如执行reduce-like进程。这些是阶段。理解阶段以及数据如何在执行者之间移动会带来很好的改进,此外,考虑到排序的类型是“reduce”,并且它在Spark上触发新的执行阶段,可能在整个网络上移动数据。在正在执行映射的相同节点上拥有备用资源可以节省大量网络开销。

否则,它仍然可以工作坦率地说好,你不能错:-)

破坏HDFS文件这是你真正得到的数据和执行的性能和安全性,在传播任务并行于在自恢复执行环境中针对大量硬盘驱动器工作。

在本地设置中,您只是觉得它没有反应,主要是因为加载,启动和跟踪过程需要一些时间,但在处理多个节点上的许多GB时感觉快速且安全。

我也喜欢shell脚本,而且我经常处理合理的GB数量,但是你无法正常匹配5TB的数据而无需分配磁盘IO或支付RAM,就好像没有明天一样。