我正在开发基于Scala的Apache Spark实现,用于将数据从远程位置导入HDFS,然后将数据从HDFS导入到Hive表。fs.rename(新路径(rawFileName),新路径(processFileName))不起作用
用我的第一次火花的工作,我已经onboarded数据/文件到HDFS在一个位置说 -
HDFS://sandbox.hortonworks.com:8020 /数据/分析/生/文件夹
让我们考虑一下,在上载CT_Click_Basic.csv和CT_Click_Basic1.csv.gz文件后,我在HDFS中有以下文件[在共享位置的文件名将是文件夹名称,其内容将存在于部分xxxxx文件中]:
[根@沙箱〜]#HDFS DFS -ls /数据/分析/原料/ */ 实测值3项
-rw-R - R-- 3个chauhan.bhupesh HDFS 0 2017年7月27日15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r - r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02/data/analytics/raw/CT_Click_Basic的.csv /部分-00000
-rw-R - R-- 3个chauhan.bhupesh HDFS 8395 2017年7月27日15时02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
找到2项目
-rw-R - R-- 3个chauhan.bhupesh HDFS 0 2017年7月27日15时02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-R-- R-- 3 chauhan.bhupesh HDFS 16588 2017年7月27日15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000
现在用我的另一个星火工作,我想移动这些文件是从/raw文件夹到/进程,然后最后到/归档文件根据在每个阶段执行的任务在HDFS中的文件夹。
对于这样做,我使用下面的代码首先获取所有存在的文件列表下方/生文件夹:
def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
if(!fileStatus.isDirectory()) {
filePaths+=fileStatus.getPath()
}
else {
listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
}
}
}
filePaths
}
,然后使用代码以下行,我试图重命名/移动文件在/原始文件夹到/处理文件夹:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}
但我无法使用上面的书面代码移动/重命名这些文件。我试着调试代码,发现fs.rename()返回“false”。
请注意:我能够实现文件重命名/移动,当我在/数据/分析/原始文件夹前CT.csv [或任何其他文件]手动复制任何文件 ,然后运行fs.rename ()但它不适用于Part-xxxxx文件。
有什么我失踪了吗?
任何快速帮助将不胜感激。
问候, 布佩希