2017-10-11 70 views
2

我在名为part-0001part-0002等的Linux机器上的单个目录中有大约200个文件。每个行拥有大约一百万行相同的列(称为'a','b'等等)。让'a','b'作为每行的关键字(包含许多重复项)。并行化在pyspark中的Spark数据帧组

同时,我建立了一个Spark主机和两个从机的Spark 2.2.0群集,共有42个内核可用。地址是spark://XXX.YYY.com:7077

然后,我使用PySpark连接到群集,并按如下方式计算每个唯一对的200个文件的计数。

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

sc = SparkContext("spark://XXX.YYY.com:7077") 
sqlContext = SQLContext(sc) 

data_path = "/location/to/my/data/part-*" 
sparkdf = sqlContext.read.csv(path=data_path, header=True) 
dfgrouped = sparkdf.groupBy(['a','b']) 
counts_by_group = dfgrouped.count() 

这样做的效果是,我可以看到Spark在一系列消息中前进,它确实返回看似合理的结果。

问题:虽然正在执行此计算,但top并未显示任何证据表明从属内核正在执行任何操作。似乎没有任何并行化。每个从机都有一个在作业之前存在的单个相关Java进程(以及来自其他用户和后台系统进程的进程)。所以看起来主人正在做所有的工作。鉴于有200个奇怪的文件,我预计会看到21个进程在每个从机上运行,​​直到事情结束(这个我看到当我在一个单独的实现中明确调用parallelize时,如下count = sc.parallelize(c=range(1, niters + 1), numSlices=ncores).map(f).reduce(add))。

问题:如何确保Spark实际上并行计数?我希望每个核心都能抓取一个或多个文件,对它在文件中看到的配对进行计数,然后将各个结果缩减为一个DataFrame。我不应该在顶部看到这个吗?我是否需要明确调用并行化?

(FWIW,我所看到的例子使用分区,但我的理解是,这是用来在文件的数据块分配处理。我的情况是,我有很多的文件。)

谢谢提前。

回答

1

TL; DR还有可能是您的部署没有问题。

我希望看到21个进程运行

除非你专门配置的火花,使用每个JVM执行单核,没有理由要做到这一点。与RDD不同,您在问题中已经提到过DataFrame API根本不使用Python工作者,Python UserDefinedFunctions除外。与此同时,JVM执行程序使用线程而不是完整的系统进程(PySpark使用后者来避免GIL)。此外,在独立模式下的默认spark.executor.cores等于the available cores on the worker的数量。因此,如果没有额外的配置,您应该看到两个执行器JVM,每个执行器使用21个数据处理线程。

总的来说,你应该检查Spark UI,如果你看到任务分配给执行者,一切都应该没问题。