我想读的情侣使用SparkSession
从HDFS文件夹CSV文件多张文件(即我不想读取该文件夹中的所有文件)星火会话阅读,而不是使用模式
我收到以下错误,同时运行(在末尾代码):
Path does not exist:
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv
我不想用一边看书,喜欢/home/temp/*.csv
的格局,在今后的原因是我有逻辑挑中只有一个或两个文件100个CSV文件中的文件夹
请建议
SparkSession sparkSession = SparkSession
.builder()
.appName(SparkCSVProcessors.class.getName())
.master(master).getOrCreate();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/"))
.filter(name -> name.toString().endsWith(".csv"))
.map(name -> name.toString())
.collect(Collectors.toSet());
SQLContext sqlCtx = sparkSession.sqlContext();
Dataset<Row> rawDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("com.databricks.spark.csv")
.option("delimiter", ",")
//.load(String.join(" , ", fileSet));
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " +
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
UPDATE
我可以遍历文件和下面做一个工会。请推荐,如果有一个更好的办法...
Dataset<Row> unifiedDataset = null;
for (String fileName : fileSet) {
Dataset<Row> tempDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("csv")
.option("delimiter", ",")
.load(fileName);
if (unifiedDataset != null) {
unifiedDataset= unifiedDataset.unionAll(tempDataset);
} else {
unifiedDataset = tempDataset;
}
}
你的问题似乎是对HDFS和本地路径。当您在本地模式下使用它时,它会尝试从本地文件系统读取文件,当您在服务器中使用它时,它会尝试从hdfs读取文件。解决方案将取决于您在测试和服务器环境中如何运行以及文件位于何处。 – dirceusemighini
我相信当我们给火星读取一组值时,它只接受“DIRECTORIES”而不接受单个文件。在HDFS模式和本地文件模式下,行为是否会改变? – Manjesh