2017-06-21 104 views
0

我有一个使用Yarn作为群集管理器[在三节点hadoop群集上运行]的三节点Spark群集。使用Spark Job将文件/数据从共享/ NFS安装位置放入HDFS时会产生间歇性问题

考虑一下,我的Hadoop集群有三个节点[Master,Slave1和Slave2] Resourcemanager在Slave1上的Master和NodaManager上运行& Slave2。 Spark Cluster也存在于三个节点上。

在主节点上,我创建了一个文件夹/ data/nfsshare,它已经在Slave1和Slave2上挂载为/ nfsshare。现在我在/ data/nfsshare文件夹中保留了一个文件abc.txt,它在/ nfsshare位置对slave1和slave2都可见。

我创建了一个小的点火工作,用于将abc.txt从/ data/nfsshare位置复制到HDFS,并执行字数计数并将结果保存在HDFS中。

DEF write2HDFS(参数:数组[字符串]){

val source = args(0) 
val destination = args(1) 
val processedDataDestination = args(2) 
val conf = new SparkConf().setAppName("WCRemoteReadHDFSWrite").set("spark.hadoop.validateOutputSpecs", "true"); 
val sc = new SparkContext(conf) 

logger.info("STARTING READ") 

val rdd = sc.textFile(source) 

logger.info("READING DONE") 
logger.info("STARTING WRITE") 
logger.info("rdd.toDebugString >>>>>> "+rdd.toDebugString) 
logger.info("rdd.getNumPartitions >>>>>>>>" +rdd.getNumPartitions) 
// rdd.coalesce(1) 
// logger.info("rdd.getNumPartitions after coalesce(1) >>>>>>>>" +rdd.getNumPartitions) 

rdd.saveAsTextFile(destination) 


logger.info("DONE") 
rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).saveAsTextFile(processedDataDestination) 

sc.stop} 

当我试图使用命令执行此代码:

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar file:///data/nfsshare/abc.txt hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWrite1MB hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWriteP1MB 

我得到以下间歇性问题:

1)InputPath不存在:file:/data/nfsshare/abc.txt,在该作业的某些运行过程中间歇性地发生[文件存在于共享位置/安装路径中]

2)有时/间歇作业状态快要到了失败,但与所需的数据都拿到创建的输出目录

3)输出目录已经存在:有时候HDFS输出目录中的现有问题来了 - >这得到了由解决增加执行程序和驱动程序内存

- >我尝试在集群和客户端部署模式下运行此作业,但在这两种情况下我都遇到同样的问题。

我不确定共享位置路径为/ Master /的/ data/nfsshare和/ nfsshare在slave上有什么区别?因为在命令行我传递/ data/nfsshare作为文件路径的位置,因此每当任何执行程序在奴隶上运行时都会查找/ data/nfsshare会失败。

我试着在所有三个节点上运行这个工作,但这些间歇性问题仍然存在。

任何专家的意见,将不胜感激。

如果有任何其他更好的方法可以将文件从任何暂存区域/挂载位置放入HDFS,那么请分享它。

问候, 布佩希

回答

0

其实我面临InputPath犯规存在:文件:由于安装的文件夹名称/data/nfsshare/abc.txt间歇。一旦我在所有节点上保留了相同的名称[/ data/nfsshare]。这个问题发生了。我假设当我在集群模式下运行我的Spark工作时,YARN决定在哪里运行驱动程序和执行程序,因此如果所有执行程序都在主节点上运行[from where/data/nfsshare]是可见的,工作正常,而对于其他执行者来说,这条路径是/ nfsshare,这个投掷路径相关的问题。一旦路径问题就解决了所有的执行者们能看到文件的路径为/数据/ nfsshare

也为输出目录已经存在,吉德拉尔维尔马的代码片断帮助。

0
  1. 最好是输入文件上传到HDFS没有火花,只是把它上传到HDFS与HDFS DFS -copyFromLocal,或者你可以尝试使用HDFS客户端库上传,但单线程没有火花的api。通常假设输入数据已经在分布式文件系统(s3,hdfs,无论)中。使用nfs时可能会看到各种效果。所以从设计的角度来看,有一部分管道将数据放入s3/hdfs中,并且只有spark的并行处理才会启动。
  2. 是的,你应该,如果你在运行同样的工作,清理你的输出目录,一遍又一遍,我觉得有火花的配置,它将允许你禁用此验证,但是更好地设计你的应用程序写入到每一次新路径
1

Q1,Q2)您提到nfs将HDFS上的/ data/nfsshare目录安装在本地/ nfsshare上。如果你已经成功完成了这个工作,并且你已经验证了它正在工作,那么为什么不使用它作为你的输入路径?当您尝试使用本地文件系统

事情变得纱线模式棘手的一种。如果您使用的是分布式计算,那么最好保留您的输入应该在HDFS中。所以,你的火花提交命令变为,

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar /nfsshare/path /user/hduser/hdfsWrite1MB /user/hduser/hdfsWriteP1MB 

注意,我省略hdfs://,这是因为在星火环境默认文件系统是HDFS。

Q3)输出目录已经存在:您可以将文件保存为解释here之前做到这一点,

val hadoopConf = new org.apache.hadoop.conf.Configuration() 
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://host:port/"), hadoopConf) 
try { hdfs.delete(new org.apache.hadoop.fs.Path(/path/to/output), true) } 
catch { case _ : Throwable => { } } 

,或者你可以当前时间戳简单地附加在你的输出路径,如果你不想删除一遍又一遍的东西。只有在处理RDD时,Dataframe API才能覆盖现有路径。

PS:file:/data/nfsshare/test-1MB你Q1显示输入路径,而在​​命令输入显示file:///data/nfsshare/abc.txt。是abc.txt的一个目录吗?

让我知道这是否有帮助。干杯。

+0

感谢您的宝贵意见吉德拉尔维尔马。我想我无法正确解释NFS安装部分。其实这是事实。我有三个节点Master,Slave1和Slave2。我创建了一个文件夹/数据/上主其上安装有SLAVE1和SLAVE2,是AS/nfsshare不是/数据/ nfsshare可见有nfsshare。 –