2016-05-30 133 views
4

我是Spark/Scala中的新成员。我知道如何加载CSV文件:Spark Scala Streaming CSV

sqlContext.read.format("csv") 

以及如何读取文本流和文件流:

scc.textFileStream("""file:///c:\path\filename"""); 
    scc.fileStream[LongWritable, Text, TextInputFormat](...) 

但如何读取CSV格式文本流?谢谢,列维

回答

3

在这里你去:

val ssc = new StreamingContext(sparkConf, Seconds(5)) 


    // Create the FileInputDStream on the directory 
    val lines = ssc.textFileStream("file:///C:/foo/bar") 

    lines.foreachRDD(rdd => { 
     if (!rdd.isEmpty()) { 
      println("RDD row count: " + rdd.count()) 
     // Now you can convert this RDD to DataFrame/DataSet and perform business logic. 

     } 
     } 
    }) 

    ssc.start() 
    ssc.awaitTermination() 
    } 
0

你可以串流播放您的CSV利用火花2.2结构容易流文件。

您可以参考here