1

我想读的情侣使用SparkSession从HDFS文件夹CSV文件多张文件(即我不想读取该文件夹中的所有文件)星火会话阅读,而不是使用模式

我收到以下错误,同时运行(在末尾代码):

Path does not exist: 
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, 
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv 

我不想用一边看书,喜欢/home/temp/*.csv的格局,在今后的原因是我有逻辑挑中只有一个或两个文件100个CSV文件中的文件夹

请建议

SparkSession sparkSession = SparkSession 
      .builder() 
      .appName(SparkCSVProcessors.class.getName()) 
      .master(master).getOrCreate(); 
    SparkContext context = sparkSession.sparkContext(); 
    context.setLogLevel("ERROR"); 

    Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/")) 
      .filter(name -> name.toString().endsWith(".csv")) 
      .map(name -> name.toString()) 
      .collect(Collectors.toSet()); 

    SQLContext sqlCtx = sparkSession.sqlContext(); 

    Dataset<Row> rawDataset = sparkSession.read() 
      .option("inferSchema", "true") 
      .option("header", "true") 
      .format("com.databricks.spark.csv") 
      .option("delimiter", ",") 
      //.load(String.join(" , ", fileSet)); 
      .load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " + 
        "/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"); 

UPDATE

我可以遍历文件和下面做一个工会。请推荐,如果有一个更好的办法...

Dataset<Row> unifiedDataset = null; 

    for (String fileName : fileSet) { 
     Dataset<Row> tempDataset = sparkSession.read() 
       .option("inferSchema", "true") 
       .option("header", "true") 
       .format("csv") 
       .option("delimiter", ",") 
       .load(fileName); 
     if (unifiedDataset != null) { 
      unifiedDataset= unifiedDataset.unionAll(tempDataset); 
     } else { 
      unifiedDataset = tempDataset; 
     } 
    } 
+0

你的问题似乎是对HDFS和本地路径。当您在本地模式下使用它时,它会尝试从本地文件系统读取文件,当您在服务器中使用它时,它会尝试从hdfs读取文件。解决方案将取决于您在测试和服务器环境中如何运行以及文件位于何处。 – dirceusemighini

+0

我相信当我们给火星读取一组值时,它只接受“DIRECTORIES”而不接受单个文件。在HDFS模式和本地文件模式下,行为是否会改变? – Manjesh

回答

1

你的问题是,你正在创建与值的字符串

“的/ home/Cloudera公司/工作/ JavaKafkaSparkStream /输入/ INPUT_2的.csv, /home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv”

不是传递两个文件名作为参数,应该这样做:

.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv", 
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"); 

逗号必须超出限制,你应该有两个值,而不是一个字符串。

0

根据我的理解,您希望从HDFS中读取多个文件,而不使用像“/path/*.csv”这样的正则表达式。你缺少的是每个路径需要用引号分别是由“”分隔

您可以阅读使用如下代码,请确保您已经添加SPARK CSV库:

sqlContext.read.format("csv").load("/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv","/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv") 
0

模式可以还有帮助。 您想在时间选择两个文件。 如果他们是序贯那么你可以做这样的事情

.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_[1-2].csv") 

如果有更多的文件,那么就去做输入_ [1-5] .csv文件