2016-01-13 85 views
2

我在同一台机器上有一个Spark集群和一个Hdfs。 我已经在每台机器的本地文件系统和hdfs分布式文件系统上复制了一个单独的文本文件,大约为3Gbytes。Spark本地vs hdfs permormance

我有一个简单的字数pyspark程序。

如果我提交从本地文件系统读取文件的程序,它会持续约33秒。 如果我提交从hdfs读取文件的程序,它持续约46秒。

为什么?我期待完全相反的结果。

增加sgvd的请求后:

16奴隶1个主

星火独立的,没有特别的设置(复制因子3)

版本1.5.2

import sys 
sys.path.insert(0, '/usr/local/spark/python/') 
sys.path.insert(0, '/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip') 
import os 
os.environ['SPARK_HOME']='/usr/local/spark' 
os.environ['JAVA_HOME']='/usr/local/java' 
from pyspark import SparkContext 
#conf = pyspark.SparkConf().set<conf settings> 


if sys.argv[1] == 'local': 
    print 'Esecuzine in modalita local file' 
    sc = SparkContext('spark://192.168.2.11:7077','Test Local file') 
    rdd = sc.textFile('/root/test2') 
else: 
    print 'Esecuzine in modalita hdfs' 
    sc = SparkContext('spark://192.168.2.11:7077','Test HDFS file') 
    rdd = sc.textFile('hdfs://192.168.2.11:9000/data/test2') 


rdd1 = rdd.flatMap(lambda x: x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y) 
topFive = rdd1.takeOrdered(5,key=lambda x: -x[1]) 
print topFive 
+0

它可以依靠很多东西。你的集群有多大?你使用什么集群管理器?任何自定义设置?什么Spark版本?你能显示你的代码吗? – sgvd

+0

我在问题的空间回答。 – arj

回答

0

这是因为如何数据是分布式的,单个文档不是一个好的选择,有几个更好的选择,如parquet,如果你这样做所以您会注意到性能会显着提高,这是因为文件分区的方式允许您的群集将并行读取这些部分,从而提高性能。

1

它有点直观,但由于复制因子为3,并且您有16个节点,因此每个节点平均有20%的数据存储在本地HDFS中。然后,大约6个工作节点应该足以平均读取整个文件,而无需任何网络传输。

如果您记录运行时间与工作人员节点的数量,您应该注意,在大约6之后,从本地FS和HDFS读取数据不会有任何区别。

上述计算可以使用变量来完成,例如, x=number of worker nodes,y= replication factor,但您可以很容易地看到,由于从本地FS读取强制该文件位于所有节点上,因此最终使用x=y,并且在使用floor(x/y)节点后没有差异。这正是你所观察到的,起初它似乎不符合直觉。你会在生产中使用复制因子100%吗?

+0

更改人工因素但不包括工人人数不会改变时间。 随着6工人repfactor 3和6 datanode时间增加到1分30秒。 – arj

+0

你是如何配置的?你重启了你的集群吗?你在描述中说你有16个奴隶。 –

+0

代表事实更改尝试:我已将文件的代表事实从2更改为16.程序提交给16个从属。 节点数尝试:我已经重新配置整个群集(火花和hadoop)只有6个节点。 – arj

1

Executor,Driver和RDD特有的参数(关于Spilling和存储级别)是什么?

从星火documentation

性能的影响

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.要为洗牌组织数据,星火产生的任务集 - 地图的任务来组织数据,以及一组reduce任务汇总吧。这个术语来自MapReduce,并不直接与Spark的地图和减少操作相关。

某些洗牌操作会消耗大量的堆内存,因为它们使用内存中的数据结构在传输它们之前或之后组织记录。 Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection

我对memory/CPU core限制为Spark Job Vs memory/CPU core限制Map & Reduce任务感兴趣。

关键参数指标从Hadoop的:

yarn.nodemanager.resource.cpu-vcores 
mapreduce.map.cpu.vcores 
mapreduce.reduce.cpu.vcores 
mapreduce.map.memory.mb 
mapreduce.reduce.memory.mb 
mapreduce.reduce.shuffle.memory.limit.percent 

关键参数对Hadoop的基准SPARK params用于在对等。

spark.driver.memory 
spark.driver.cores 
spark.executor.memory 
spark.executor.cores 
spark.memory.fraction 

这些只是一些关键参数。看看SPARKMap ReduceMap Reduce的详细集

没有正确的参数设置,我们无法比较跨两种不同技术的作业性能。

相关问题