我在名为part-0001
,part-0002
等的Linux机器上的单个目录中有大约200个文件。每个行拥有大约一百万行相同的列(称为'a','b'等等)。让'a','b'作为每行的关键字(包含许多重复项)。并行化在pyspark中的Spark数据帧组
同时,我建立了一个Spark主机和两个从机的Spark 2.2.0群集,共有42个内核可用。地址是spark://XXX.YYY.com:7077
。
然后,我使用PySpark连接到群集,并按如下方式计算每个唯一对的200个文件的计数。
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext("spark://XXX.YYY.com:7077")
sqlContext = SQLContext(sc)
data_path = "/location/to/my/data/part-*"
sparkdf = sqlContext.read.csv(path=data_path, header=True)
dfgrouped = sparkdf.groupBy(['a','b'])
counts_by_group = dfgrouped.count()
这样做的效果是,我可以看到Spark在一系列消息中前进,它确实返回看似合理的结果。
问题:虽然正在执行此计算,但top并未显示任何证据表明从属内核正在执行任何操作。似乎没有任何并行化。每个从机都有一个在作业之前存在的单个相关Java进程(以及来自其他用户和后台系统进程的进程)。所以看起来主人正在做所有的工作。鉴于有200个奇怪的文件,我预计会看到21个进程在每个从机上运行,直到事情结束(这个是我看到当我在一个单独的实现中明确调用parallelize
时,如下count = sc.parallelize(c=range(1, niters + 1), numSlices=ncores).map(f).reduce(add)
)。
问题:如何确保Spark实际上并行计数?我希望每个核心都能抓取一个或多个文件,对它在文件中看到的配对进行计数,然后将各个结果缩减为一个DataFrame
。我不应该在顶部看到这个吗?我是否需要明确调用并行化?
(FWIW,我所看到的例子使用分区,但我的理解是,这是用来在单文件的数据块分配处理。我的情况是,我有很多的文件。)
谢谢提前。