2015-03-13

Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files that appear in the DStream RDD.

How do I get the names of the files that the DStream RDD is processing during a particular interval?

Duplicate of http://stackoverflow.com/questions/29935732/spark-streaming-textfilestream-filestream-get-file-name – Irene 2015-06-18 13:27:30

@Irene This was posted in March 2015; that question was posted in April 2015. How can it be a duplicate? By the way, the other question has no answer either. – 2015-06-19 05:20:23

Hehe, I misread the dates. The other question was answered in its comments. – Irene 2015-06-19 07:40:30

Answers

fileStream produces a UnionRDD of NewHadoopRDDs. The nice part about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their name is set to their path.
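
For a quick illustration of that fact, here is a minimal sketch (assuming ssc is an existing StreamingContext and the standard Hadoop LongWritable/Text input types) that just prints each batch's incoming file paths on the driver:

    // Print the path of every file picked up in a batch. Each parent of the
    // batch UnionRDD is a NewHadoopRDD named after the file it was read from.
    ssc.fileStream[LongWritable, Text, TextInputFormat]("/some/directory")
      .foreachRDD { rdd =>
        rdd.dependencies.map(_.rdd.name).filter(_ != null).foreach(println)
      }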

Here's a fuller example of what you can do with that knowledge:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

import scala.reflect.ClassTag

// Rebuild each batch as a UnionRDD whose parents keep the file path as their name.
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform(rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map(dep =>
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]]
            .map(_._2.toString)
            .setName(dep.rdd.name)
        )
      )
    )

// Apply a per-file transformation, handing each parent RDD its file name.
def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.flatMap { dep =>
      if (dep.rdd.isEmpty()) None
      else {
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }
  )
}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namedTextFileStream(ssc, "/some/directory")

  // Pair every line with the name of the file it came from.
  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()
}

Thanks for this! Though 'val filename' is confusing, since it's the path to the whole file. 'new File(dep.rdd.name).getName' seems to work. – 2017-01-27 00:30:16
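
As a sketch of that tweak inside transformByFile (a hypothetical variant, assuming the RDD name holds a full path), org.apache.hadoop.fs.Path can be used instead of java.io.File, since it also parses hdfs:// style URIs:

    // Hypothetical variant: keep only the base file name instead of the full path.
    val filename = new org.apache.hadoop.fs.Path(dep.rdd.name).getName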

For those who want some Java code instead of Scala:

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.UnionRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.dstream.FileInputDStream;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.reflect.ClassTag;

// Open the directory as a (LongWritable, Text) stream; FileInputDStream's
// default filter skips hidden files, and `false` means existing files are
// processed as well, not only new ones.
JavaPairInputDStream<LongWritable, Text> textFileStream =
    jsc.fileStream(
        inputPath,
        LongWritable.class,
        Text.class,
        TextInputFormat.class,
        FileInputDStream::defaultFilter,
        false
    );

JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
    // Each batch is a UnionRDD with one NewHadoopRDD parent per file, named after its path.
    UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
    List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
    // Rebuild each parent as (fileName, line) pairs, skipping empty files.
    List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map(depRdd -> {
        if (depRdd.isEmpty()) {
            return null;
        }
        JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
        String filename = depRdd.name();
        JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd)
            .mapToPair(t -> new Tuple2<>(filename, t._2().toString()))
            .setName(filename);
        return newDep.rdd();
    }).filter(t -> t != null).collect(Collectors.toList());
    // Stitch the renamed parents back into a single RDD for the DStream.
    Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
    ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
    return new UnionRDD<>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});
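
A small usage sketch under the same assumptions (jsc is an existing JavaStreamingContext; the printing is illustrative only):

    // Hypothetical usage: print a few (fileName, line) pairs from each batch,
    // then run the streaming context.
    namedTextFileStream.foreachRDD(rdd -> rdd.take(5).forEach(System.out::println));
    jsc.start();
    jsc.awaitTermination();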