我有一个spark 2.0应用程序,它使用spark流(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息。用Spark 2.0.2读取来自Kafka的Avro消息(结构化流媒体)
结构化流看起来非常酷,所以我想尝试和迁移代码,但我无法弄清楚如何使用它。
在常规流式传输中,我使用kafkaUtils创建了Dstrean,并在传递它的参数中使用了值解串器。
在结构化流媒体文件说,我应该反序列化使用DataFrame功能,但我不能确切地说明这意味着什么。
我看着象这样的例子example,但我在卡夫卡的Avro对象退出复杂,不能简单地铸造比如上例中的字符串..
到目前为止,我尝试这种代码(我所看见的在这里一个不同的问题):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
,我得到 “数据类型不匹配:不能投BinaryType到StructType(StructField(....”?
我怎么能反序列化值
我认为塔尔的使用情况是,他并没有对他的话题二进制编码的字符串,他有二进制编码Avro的。在这种情况下会使用bijection-avro工作吗? – zzztimbo
yep @zzztimbo没错。我不得不把这个项目积压一点,所以我没有机会尝试任何新的东西。希望我很快就能看到这个主题。当我会做,我会考虑bijection-avro –
@TalJoffe请让我知道你想出了什么。我试图读取一个kstream放在那里的avro,双向注入avro没有为我工作。 – zzztimbo