Use schema to convert ConsumerRecord value to Dataframe in spark-kafka

2017-09-13 384 views
2

I am using Spark 2.0.2 with Kafka 0.11.0, and I am trying to consume messages from Kafka in Spark Streaming. Here is the code:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070",
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)

// Print every record value in each batch
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  }
}
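Note that SparkStream.ssc is not defined in the post; a minimal hypothetical stand-in for it might look like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical holder for the StreamingContext referenced as SparkStream.ssc above --
// the actual definition is not shown in the question.
object SparkStream {
  val conf = new SparkConf().setAppName("kafka-avro-stream").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(5))
}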

If the Kafka messages contain records, the output looks like this:

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"} 
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"} 

So the received messages are Avro-decoded, as can be seen from the ConsumerRecord values. Now I need to get these records into a DataFrame, but I do not know how to proceed from here, even with the schema in hand:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.Schema

// Fetch the latest value schema for the topic from the Schema Registry
val sr: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val schemaRegistry: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)

which prints the schema as follows:

{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}

Can anyone help me understand how to get a DataFrame from the ConsumerRecord value? I have looked at other questions, such as Use schema to convert AVRO messages with Spark to DataFrame and Handling schema changes in running Spark Streaming application, but they do not deal with the ConsumerRecord in the first place.

+0

I ran into a similar situation. Were you able to figure this out?

Answers

0

I am new to Scala/Kafka/Spark myself, so I am not sure whether this fully answers the question, but it helped me. I am sure there are better ways than this, so hopefully someone with more experience can step in and provide a better answer.

import org.apache.spark.sql.SaveMode

// KafkaRDD
stream.foreachRDD { rdd =>

  // pull the values I'm looking for into a string array on the driver
  val x = rdd.map(row => row.value()).collect()

  // convert to a single-column DataFrame
  import spark.implicits._
  val df = x.toSeq.toDF("record")

  // write the data frame to a datastore (MySQL in my case)
  df.write
    .mode(SaveMode.Append)
    .jdbc(url, table, props)
}
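The url, table, and props referenced in the jdbc() call are assumed to be defined elsewhere; hypothetical values might look like:

import java.util.Properties

// Hypothetical MySQL connection settings for the jdbc() call above --
// substitute your own host, database, table, and credentials.
val url = "jdbc:mysql://localhost:3306/mydb"
val table = "notes"
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "secret")
props.setProperty("driver", "com.mysql.jdbc.Driver")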
1

You can use the code snippet below; stream is the DStream of ConsumerRecord returned from the kafka010 KafkaUtils API:

import org.apache.spark.sql.SQLContext

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    // Each record value's toString is the JSON form of the Avro record,
    // so a DataFrame schema can be inferred directly from the JSON strings
    val topicValueStrings = rdd.map(record => record.value().toString)
    val df = sqlContext.read.json(topicValueStrings)
    df.show()
  }
}
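Since the question already fetches the Avro schema from the registry, one possible refinement is to convert that schema to a Spark StructType and pass it to the JSON reader, so the field types come from the registry rather than being inferred per batch. A sketch, assuming the com.databricks:spark-avro artifact (whose SchemaConverters maps Avro schemas to Spark SQL types) is on the classpath:

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

// Translate the Avro schema parsed earlier (avroSchema) into a Spark SQL schema;
// for a record schema, toSqlType's dataType is a StructType.
val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    val topicValueStrings = rdd.map(record => record.value().toString)
    // Explicit schema instead of per-batch JSON inference
    val df = sqlContext.read.schema(sparkSchema).json(topicValueStrings)
    df.show()
  }
}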