2017-10-08 93 views
0

我基本上想要使用来自Kafka的数据并将其写入HDFS。但发生的情况是,它不是在hdfs中编写任何文件。它会创建空文件。来自Kafka的Spark流式传输和Avro格式的HDFS写入

而且请指导我,如果我想写在HDF格式的HDFS我如何修改代码。

为了简单起见,我写了本地C盘。

import org.apache.spark.SparkConf 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.SparkContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.KafkaUtils 
import 
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaStreaming extends App{ 
val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming") 
val conext = new SparkContext(conf) 
val ssc = new StreamingContext(conext, org.apache.spark.streaming.Milliseconds(1)) 
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "group", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (true: java.lang.Boolean)) 
val topics = Array("topic") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams)) 
val lines = stream.map(_.value) 
stream.foreachRDD(rdd => { 
    rdd.coalesce(1).saveAsTextFile("C:/data/spark/") 
}) 
ssc.start() 
ssc.awaitTermination()} 

而且下面是build.sbt

name := "spark-streaming" 
version := "1.0" 
scalaVersion := "2.11.8" 
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0" 
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0" 
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0- 
10_2.11" % "2.2.0" 
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.1" 
+0

它所需要的是一个过滤器,以检查是否该批次是空.. stream.map(_。值).foreachRDD(RDD => { rdd.foreach(的println ) 如果){ rdd.saveAsTextFile( “C:/数据/火花/”) }(rdd.isEmpty(!) }) 但我仍然面对的问题是,新一批覆盖旧数据。我希望所有的数据被追加到文件中.. –

回答

0

下面点上运行您的卡夫卡消费者应用程序之前必须检查:在卡夫卡或

  • 检查数据可用不是

  • 更改auto.offset.resetearliest 这里最早的意思是kafka自动将偏移重置为最早的偏移量。

  • 启动Kafka控制台生产者应用程序并开始键入一些消息。然后启动卡夫卡消费者代码,再次在卡夫卡控制台制作者上输入一些消息,然后检查消息是否打印到消费者控制台。

您可以使用下面的代码

spark.write.avro("<path>") 

行写Avro的格式输出我希望这将有助于你

+0

嗨谢谢你是卡夫卡正在运行,消息在卡夫卡。感谢您为avro提供的输入 –

1

HDFS中不写任何文件。它会创建空文件。

请检查这里怎么调试

Unable to see messages from Kafka Stream in Spark

请指导我,如果我想在Avro的格式写在HDFS

https://github.com/sryza/simplesparkavroapp

package com.cloudera.sparkavro 

import org.apache.avro.mapred.AvroKey 
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat} 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.io.NullWritable 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.SparkContext._ 

object SparkSpecificAvroWriter { 
    def main(args: Array[String]) { 
    val outPath = args(0) 

    val sparkConf = new SparkConf().setAppName("Spark Avro") 
    MyKryoRegistrator.register(sparkConf) 
    val sc = new SparkContext(sparkConf) 

    val user1 = new User("Alyssa", 256, null) 
    val user2 = new User("Ben", 7, "red") 

    val records = sc.parallelize(Array(user1, user2)) 
    val withValues = records.map((x) => (new AvroKey(x), NullWritable.get)) 

    val conf = new Job() 
    FileOutputFormat.setOutputPath(conf, new Path(outPath)) 
    val schema = User.SCHEMA$ 
    AvroJob.setOutputKeySchema(conf, schema) 
    conf.setOutputFormatClass(classOf[AvroKeyOutputFormat[User]]) 
    withValues.saveAsNewAPIHadoopDataset(conf.getConfiguration) 
    } 
} 
+0

Kafka中存在消息,问题是当新批处理覆盖现有数据时。现在我添加了空批次的过滤器。所以现在我的零件文件不再是空的了。但是,当它回顾一个新的批处理时,它会覆盖旧的内容。你能帮我如何追加文件。以供参考我的代码在git中心:https://github.com/Viyaan/spark-kafka-hdfs/blob/master/src/main/scala/com/spark/streaming/KafkaStreaming.scala –

+0

尝试使用kafdrop检查卡夫卡消息。这可以帮助你从Kafka的最后调试它。 –

1

查看您的代码,您可以简单地将当前时间戳添加到您正在编写的文件中。

这应该解决您的问题。 :)

==========

如果你想所有的文件追加到一个文件,那么你可以使用dataframes如下:

我不会推荐使用追加因为这个文件系统的设计方式在HDFS中。但这里是你可以尝试的。

  1. 从您的RDD
  2. 创建一个数据框使用数据框的节省模式(“追加”),然后写入文件。

e.g:

VAL数据帧= youRdd.toDF(); dataframe.write()。mode(SaveMode.Append).format(FILE_FORMAT).. save(path);

看看是否有帮助

+0

是的,但会创建很多文件,我只想要一个文件。谢谢你的建议你 –

+0

已经更新了上面的答案。看看是否有助于你的用例 –