2

我正在开发基于Scala的Apache Spark实现,用于将数据从远程位置导入HDFS,然后将数据从HDFS导入到Hive表。fs.rename(新路径(rawFileName),新路径(processFileName))不起作用

用我的第一次火花的工作,我已经onboarded数据/文件到HDFS在一个位置说 -

HDFS://sandbox.hortonworks.com:8020 /数据/分析/生/文件夹

让我们考虑一下,在上载CT_Click_Basic.csv和CT_Click_Basic1.csv.gz文件后,我在HDFS中有以下文件[在共享位置的文件名将是文件夹名称,其内容将存在于部分xxxxx文件中]:

[根@沙箱〜]#HDFS DFS -ls /数据/分析/原料/ */ 实测值3项

-rw-R - R-- 3个chauhan.bhupesh HDFS 0 2017年7月27日15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS

-rw-r - r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02/data/analytics/raw/CT_Click_Basic的.csv /部分-00000

-rw-R - R-- 3个chauhan.bhupesh HDFS 8395 2017年7月27日15时02 /data/analytics/raw/CT_Click_Basic.csv/part-00001

找到2项目

-rw-R - R-- 3个chauhan.bhupesh HDFS 0 2017年7月27日15时02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS

-rw-R-- R-- 3 chauhan.bhupesh HDFS 16588 2017年7月27日15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

现在用我的另一个星火工作,我想移动这些文件是从/raw文件夹到/进程,然后最后到/归档文件根据在每个阶段执行的任务在HDFS中的文件夹。

对于这样做,我使用下面的代码首先获取所有存在的文件列表下方/生文件夹:

def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = { 
val files = GlobalContext.hdfs.listStatus(new Path(filePath)) 
files.foreach { fileStatus => { 
      if(!fileStatus.isDirectory()) { 
       filePaths+=fileStatus.getPath()  
      } 
      else { 
       listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths) 
      } 
     } 
    } 
    filePaths 
} 

,然后使用代码以下行,我试图重命名/移动文件在/原始文件夹到/处理文件夹:

var inputDir = "/data/analytics/raw" 
var outputDir = "/data/analytics/process" 
var filePaths = new ListBuffer[Path]() 
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths) 
val fs= <Getting hdfs FileSystem Instance Here> 
for(path<-pathArray){ 
    var pathSplit = path.toString().split("/") 
    var pathSplitSize = pathSplit.size 
    val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    fs.rename(new Path(rawFileName), new Path(processFileName)) 
} 

但我无法使用上面的书面代码移动/重命名这些文件。我试着调试代码,发现fs.rename()返回“false”。

请注意:我能够实现文件重命名/移动,当我在/数据/分析/原始文件夹前CT.csv [或任何其他文件]手动复制任何文件 ,然后运行fs.rename ()但它不适用于Part-xxxxx文件。

有什么我失踪了吗?

任何快速帮助将不胜感激。

问候, 布佩希

回答

0

最后,我有这个问题。实际上,我试图将文件从/data/analytics/raw/folder.csv/part-xxxxx重命名为/data/analytics/process/folder.csv/part-xxxxx,其中/ data/analytics/process在HDFS中存在,但“ folder.csv“不存在;因此在重命名时它会让我失望。我在代码中添加了以下行,并且对我来说工作得很好

var inputDir = "/data/analytics/raw" 
var outputDir = "/data/analytics/process" 
var filePaths = new ListBuffer[Path]() 
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths) 
val fs= <Getting hdfs FileSystem Instance Here> 
for(path<-pathArray){ 
    var pathSplit = path.toString().split("/") 
    var pathSplitSize = pathSplit.size 

    val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 

    var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2) 
    var processFolderPath = new Path(processFolderName) 
    if(!(fs.exists(processFolderPath))) 
     fs.mkdirs(processFolderPath) 
    val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1) 
    fs.rename(new Path(rawFileName), new Path(processFileName)) 
} 
0

重命名可以返回false如果新路径(rawFileName)不存在。
之前fs.rename做检查文件是否存在:

if (fs.exists(somePath)) { 
fs.rename... 
} 

另一个原因可能是您尝试重命名使用的人该文件。或者如果您尝试重命名目录中的某些文件可以被某人使用。
要确保这是根本原因尝试在你的代码重命名别的文件:

var inputDir = "/data/analytics/raw" 
var outputDir = "/data/analytics/process" 
var filePaths = new ListBuffer[Path]() 
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths) 
val fs= <Getting hdfs FileSystem Instance Here> 
for(path<-pathArray){ 
    var pathSplit = path.toString().split("/") 
    var pathSplitSize = pathSplit.size 
    val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    fs.rename(new Path("**/TESTDIR1**"), new Path("**/TESTDIR2**")) 
} 

如果此重命名将是成功的根本原因是真正的比赛状态。