2016-11-20 163 views
4

我有一个spark 2.0应用程序,它使用spark流(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息。用Spark 2.0.2读取来自Kafka的Avro消息(结构化流媒体)

结构化流看起来非常酷,所以我想尝试和迁移代码,但我无法弄清楚如何使用它。

在常规流式传输中,我使用kafkaUtils创建了Dstrean,并在传递它的参数中使用了值解串器。

在结构化流媒体文件说,我应该反序列化使用DataFrame功能,但我不能确切地说明这意味着什么。

我看着象这样的例子example,但我在卡夫卡的Avro对象退出复杂,不能简单地铸造比如上例中的字符串..

到目前为止,我尝试这种代码(我所看见的在这里一个不同的问题):

import spark.implicits._ 

    val ds1 = spark.readStream.format("kafka"). 
    option("kafka.bootstrap.servers","localhost:9092"). 
    option("subscribe","RED-test-tal4").load() 

    ds1.printSchema() 
    ds1.select("value").printSchema() 
    val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show() 
    val query = ds2.writeStream 
    .outputMode("append") 
    .format("console") 
    .start() 

,我得到 “数据类型不匹配:不能投BinaryType到StructType(StructField(....”?

我怎么能反序列化值

回答

2

我还不是很熟悉Spark的序列化如何与新的/实验性结构化流结合使用,但下面的方法确实有效 - 尽管我不确定它是否是最好的方法(恕我直言,这种方法有点尴尬看起来感觉)。

我会尝试以自定义数据类型为例(此处为Foo个案类)来代替Avro,但我希望它能帮助您。这个想法是使用Kryo序列化来序列化/反序列化您的自定义类型,请参阅Spark文档中的Tuning: Data serialization

注意:Spark支持通过内置(隐式)编码器开箱即用的外壳类序列化,您可以通过import spark.implicits._导入该编码器。但为了这个例子,我们忽略这个功能。

假设你定义了下面Foo案例类作为您的自定义类型(TL; DR提示:为防止运行到奇怪的星火系列化投诉/错误,你应该把代码放到一个单独的Foo.scala文件):

// This could also be your auto-generated Avro class/type 
case class Foo(s: String) 

现在你有以下的结构化数据流的代码读卡夫卡,其中输入主题包含了卡夫卡的信息,其信息价值的数据是二进制编码String,你的目标是什么呢基于这些信息价值创造Foo实例(即类似于你会如何反序列化的二进制数据到Avro的类的实例):

val messages: DataFrame = spark.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") 
    .option("subscribe", "my-input-topic") 
    .load() 

现在我们deserializing值放入我们的自定义Foo类型的实例,为此,我们首先需要定义一个隐含Encoder[Foo]

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo] 
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value"))) 

让我们回到你的Avro的问题,你需要做的是:

  1. 创建y的适当Encoder我们的需求。
  2. 与把你的二进制编码Avro的数据出消息值(row.getAs[Array[Byte]]("value"))的代码,反序列化二进制编码的Avro的数据转换成的Avro的POJO,即代码替换Foo(new String(row.getAs[Array[Byte]]("value"))和回报,比如说,一个Avro的GenericRecord或任何SpecificCustomAvroObject你已在其他地方定义

如果有人知道更简洁/更好/ ...的方式来回答Tal的问题,我全都是耳朵。 :-)

参见:

+0

我认为塔尔的使用情况是,他并没有对他的话题二进制编码的字符串,他有二进制编码Avro的。在这种情况下会使用bijection-avro工作吗? – zzztimbo

+0

yep @zzztimbo没错。我不得不把这个项目积压一点,所以我没有机会尝试任何新的东西。希望我很快就能看到这个主题。当我会做,我会考虑bijection-avro –

+0

@TalJoffe请让我知道你想出了什么。我试图读取一个kstream放在那里的avro,双向注入avro没有为我工作。 – zzztimbo

1

所以实际上有人在我公司解决了这个对我来说,我会在这里发布了未来的读者..

基本上我错过了什么miguno建议是解码部分:

def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String) : Iterator[<SomeObject>] = { 
val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl) 
iter.map(message => { 
    val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record] 
    val field1 = record.get("field1Name").asInstanceOf[GenericData.Record] 
    val field2 = record.get("field1Name").asInstanceOf[GenericData.String] 
     ... 
    //create an object with the fields extracted from genericRecord 
    }) 
} 

现在你可以从卡夫卡读取消息,并将它们像这样解码:

val ds = spark 
    .readStream 
    .format(config.getString(ConfigUtil.inputFormat)) 
    .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers)) 
    .option("subscribe", config.getString(ConfigUtil.subscribeTopic)) 
    .load() 
    .as[KafkaMessage] 

val decodedDs = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl)) 

* KafkaMessage是一个简单的情况下类,包含通用对象从卡夫卡(键,值,主题阅读时,你得到的,分区,偏移,时间戳)

希望这可以帮助别人

+0

那么你的意思是我们需要提供除avro生成的类之外的相关案例类吗?你能告诉我们你的进口声明吗?在这份声明中,何处获得“Deocder”类? val解码器= AvroTo Decoder.getDecoder(schemaRegistryUrl) –

2

如上所述,星火2.1.0有与该批次读者的Avro支持,但不能与SparkSession.readStream()。以下是我如何根据其他响应在Scala中工作的方式。为简洁起见,我简化了架构。

package com.sevone.sparkscala.mypackage 

import org.apache.spark.sql._ 
import org.apache.avro.io.DecoderFactory 
import org.apache.avro.Schema 
import org.apache.avro.generic.{GenericDatumReader, GenericRecord} 

object MyMain { 

    // Create avro schema and reader 
    case class KafkaMessage (
     deviceId: Int, 
     deviceName: String 
    ) 
    val schemaString = """{ 
     "fields": [ 
      { "name": "deviceId",  "type": "int"}, 
      { "name": "deviceName", "type": "string"}, 
     ], 
     "name": "kafkamsg", 
     "type": "record" 
    }"""" 
    val messageSchema = new Schema.Parser().parse(schemaString) 
    val reader = new GenericDatumReader[GenericRecord](messageSchema) 
    // Factory to deserialize binary avro data 
    val avroDecoderFactory = DecoderFactory.get() 
    // Register implicit encoder for map operation 
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord] 

    def main(args: Array[String]) { 

     val KafkaBroker = args(0); 
     val InTopic = args(1); 
     val OutTopic = args(2); 

     // Get Spark session 
     val session = SparkSession 
       .builder 
       .master("local[*]") 
       .appName("myapp") 
       .getOrCreate() 

     // Load streaming data 
     import session.implicits._ 
     val data = session 
       .readStream 
       .format("kafka") 
       .option("kafka.bootstrap.servers", KafkaBroker) 
       .option("subscribe", InTopic) 
       .load() 
       .select($"value".as[Array[Byte]]) 
       .map(d => { 
        val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null)) 
        val deviceId = rec.get("deviceId").asInstanceOf[Int] 
        val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString 
        new KafkaMessage(deviceId, deviceName) 
       }) 
+0

它没有为我工作,导致:java.io.EOFException错误 –

+1

此解决方案不适用于模式注册表已启用kafka。它报告“引起:org.apache.avro.AvroRuntimeException:格式错误的数据。长度为负值:-13” –

0

使用以下步骤:

  • 定义卡夫卡消息。
  • 定义一个consumer Utility,它返回YourAvroObject的一个DataSet。
  • 定义您的逻辑代码。

卡夫卡消息:

case class KafkaMessage(key: String, value: Array[Byte], 
            topic: String, partition: String, offset: Long, timestamp: Timestamp) 

卡夫卡消费者:

import java.util.Collections 

import com.typesafe.config.{Config, ConfigFactory} 
import io.confluent.kafka.serializers.KafkaAvroDeserializer 
import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.spark.sql.SparkSession 

import scala.reflect.runtime.universe._ 


object KafkaAvroConsumer { 

    private val conf: Config = ConfigFactory.load().getConfig("kafka.consumer") 
    val valueDeserializer = new KafkaAvroDeserializer() 
    valueDeserializer.configure(Collections.singletonMap("schema.registry.url", 
    conf.getString("schema.registry.url")), false) 

    def transform[T <: GenericRecord : TypeTag](msg: KafkaMessage, schemaStr: String) = { 
    val schema = new Schema.Parser().parse(schemaStr) 
    Utils.convert[T](schema)(valueDeserializer.deserialize(msg.topic, msg.value)) 
    } 

    def createDataStream[T <: GenericRecord with Product with Serializable : TypeTag] 
    (schemaStr: String) 
    (subscribeType: String, topics: String, appName: String, startingOffsets: String = "latest") = { 

    val spark = SparkSession 
     .builder 
     .master("local[*]") 
     .appName(appName) 
     .getOrCreate() 

    import spark.implicits._ 

    // Create DataSet representing the stream of KafkaMessage from kafka 
    val ds = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers")) 
     .option(subscribeType, topics) 
     .option("startingOffsets", "earliest") 
     .load() 
     .as[KafkaMessage] 
     .map(msg => KafkaAvroConsumer.transform[T](msg, schemaStr)) // Transform it Avro object. 

    ds 
    } 

} 
+1

您能举出完整的示例或在github中共享代码 –

+0

Utils.convert方法如何定义? –

相关问题