我尝试使用spark的sc.textfile('/home/sathya/location/*.txt')
访问示例位置中的最新添加文件但是,我需要取最新添加的文件而不是获取所有文件目录下。使用spark从文件夹访问最新更改的文件
感谢, Sathiyarajan中号
我尝试使用spark的sc.textfile('/home/sathya/location/*.txt')
访问示例位置中的最新添加文件但是,我需要取最新添加的文件而不是获取所有文件目录下。使用spark从文件夹访问最新更改的文件
感谢, Sathiyarajan中号
你可以从目录中的最新修改的文件,并把它传递给sc.textFile()
火花阅读。
这里是你如何能得到最新的修改后的文件
val directory = new File("/home/sathya/location/")
val allFiles = directory.listFiles
.filter(_.isFile)
.sortBy(-_.lastModified())
.toList
val latestFile = allFiles(0)
这里latestFile
是最新修改的文件,现在你可以阅读最新的文件到火花作为
sc.textFile(latestFile)
希望这有助于!
对于您的问题没有现成的解决方案,首先找到最新的文件,然后加载它。
Java示例:
/**
* Function to get latest file in directory
*/
public static String latestFileInDir(String dir) throws IOException, InterruptedException {
//Replace hadoop home
String command = "<HADOOP_HOME>/bin/hadoop fs -ls -R " + dir + " | awk -F\" \" '{print $6\" \"$7\" \"$8}' | sort -nr | head -1";
ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", command);
String op = null;
Process process = pb.start();
int errCode = process.waitFor();
if (errCode == 0) {
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
op = br.readLine();
}
return op;
}
获取最新的目录和负载
rdd= sc.textfile(latestFileInDir("/home/sathya/location/"));
如果这个回答你的问题,你能接受作为回答,并关闭话题。这对其他人也有帮助 –
其工作,谢谢 – sathiyarajan
感谢您接受作为答案:) –