Spark流textFileStream
和fileStream
可以监控目录并处理Dstream RDD中的新文件。Spark流DStream RDD获取文件名
如何获取DStream RDD在特定时间间隔内处理的文件名?
Spark流textFileStream
和fileStream
可以监控目录并处理Dstream RDD中的新文件。Spark流DStream RDD获取文件名
如何获取DStream RDD在特定时间间隔内处理的文件名?
fileStream
产生UnionRDD
的NewHadoopRDD
s。由sc.newAPIHadoopFile
创建的关于NewHadoopRDD
的很好的部分是它们的name
被设置为它们的路径。
这里是你可以用这些知识做什么的例子:
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
.transform(rdd =>
new UnionRDD(rdd.context,
rdd.dependencies.map(dep =>
dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
)
)
)
def transformByFile[U: ClassTag](unionrdd: RDD[String],
transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
new UnionRDD(unionrdd.context,
unionrdd.dependencies.map{ dep =>
if (dep.rdd.isEmpty) None
else {
val filename = dep.rdd.name
Some(
transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
.setName(filename)
)
}
}.flatten
)
}
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("Process by file")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(30))
val dstream = namesTextFileStream(ssc, "/some/directory")
def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
rdd.map(line => (filename, line))
val transformed = dstream.
transform(rdd => transformByFile(rdd, byFileTransformer))
// Do some stuff with transformed
ssc.start()
ssc.awaitTermination()
}
感谢您的支持!虽然'val文件名'令人困惑,因为它是整个文件的路径。 '新的文件(dep.rdd.name).getName'似乎工作。 – 2017-01-27 00:30:16
对于那些想一些Java代码,而不是斯卡拉:
JavaPairInputDStream<LongWritable, Text> textFileStream =
jsc.fileStream(
inputPath,
LongWritable.class,
Text.class,
TextInputFormat.class,
FileInputDStream::defaultFilter,
false
);
JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map(depRdd -> {
if (depRdd.isEmpty()) {
return null;
}
JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
String filename = depRdd.name();
JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename);
return newDep.rdd();
}).filter(t -> t != null).collect(Collectors.toList());
Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});
重复http://stackoverflow.com/问题/ 29935732/spark-streaming-textfilestream-filestream-get-file-name – Irene 2015-06-18 13:27:30
@Irene这是在2015年3月发布的,该问题发布于2015年4月。如何重复?顺便说一句,其他问题也没有答案。 – 2015-06-19 05:20:23
hehe我错误地读了日期。另一个问题在评论中回答。 – Irene 2015-06-19 07:40:30