-2
我是新来的Apache火花,并试图理解结构化流与Apache卡夫卡在斯卡拉,但没有工作在我的青睐,直到现在基本上我想发送JSON从卡夫卡过程它使用火花结构化流并发回到卡夫卡。我试着在网站上给出的例子,但它不工作。试图理解结构化流
这里是我的代码:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object dataset_kafka {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.load()
df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.trigger(Trigger.ProcessingTime("5 seconds"))
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("topic", "test1")
.option("checkpointLocation", "/home/hduser/Desktop/tempo")
.start()
.awaitTermination()
}
}
与我要去哪里不对任何帮助?
我以这种格式发送卡夫卡JSON:
{"schema":"Hiren","payload":"123"}
欢迎来到SO!请参阅这里了解如何发布一个很好的问题,这个问题可能不会被关闭,甚至可能会被回答:https://stackoverflow.com/help/how-to-ask –
我的问题无效吗? –
你需要显示你自己的一些代码没有工作/你自己的一些努力。你所要求的是称为教程 – radumanolescu