我是BigData生态系统和入门的新手。从Kafka读取并写入实木复合地板的hdfs
我看了几篇文章有关使用火花流,但想知道是否可以使用火花的工作,而不是流从卡夫卡读读卡夫卡的话题? 如果是的话,你们可以帮我指出一些可以让我开始的文章或代码片段。
我的问题的第二部分是实木复合地板的格式写入HDFS。 一旦我从卡夫卡读到,我想我会有一个rdd。 将此rdd转换为数据帧,然后将数据帧写入parquet文件。 这是正确的做法。
任何帮助表示赞赏。
感谢
我是BigData生态系统和入门的新手。从Kafka读取并写入实木复合地板的hdfs
我看了几篇文章有关使用火花流,但想知道是否可以使用火花的工作,而不是流从卡夫卡读读卡夫卡的话题? 如果是的话,你们可以帮我指出一些可以让我开始的文章或代码片段。
我的问题的第二部分是实木复合地板的格式写入HDFS。 一旦我从卡夫卡读到,我想我会有一个rdd。 将此rdd转换为数据帧,然后将数据帧写入parquet文件。 这是正确的做法。
任何帮助表示赞赏。
感谢
对于卡夫卡读取数据并将其写入HDFS,在木地板格式,使用星火批处理作业,而不是流,你可以使用Spark Structured Streaming。
结构化数据流是建立在星火SQL引擎可扩展性和容错流处理引擎。您可以使用与在静态数据上表示批量计算的相同方式来表达流式计算。 Spark SQL引擎将负责逐步连续运行它,并在流式数据持续到达时更新最终结果。您可以在Scala,Java,Python或R中使用Dataset/DataFrame API来表示流聚合,事件时窗口,流到批处理连接等。计算在同一优化的Spark SQL引擎上执行。最后,系统通过检查点和预写日志确保端到端的准确性 - 一次容错保证。简而言之,结构化数据流提供了快速,可扩展,容错,端到端的流处理,而无需用户推断流式传输。
它配备了卡夫卡从卡夫卡一个内置的来源,即,我们可以查询数据。它与卡夫卡经纪人版本0.10.0或更高版本兼容。
对于批处理模式从卡夫卡拉低数据,则可以创建一个定义的偏置范围内的数据集/数据帧。
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
源每一行都有下面的模式:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
现在,将数据写入到HDFS在地板格式,下面的代码可以这样写:
df.write.parquet("hdfs://data.parquet")
更多结构化数据流+卡夫卡的星火信息,请参见下面的指南 - Kafka Integration Guide
我希望它有帮助!
使用卡夫卡流。 SparkStreaming是一个可怕的用词不当(它是引擎盖下的小批量)。
https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/
此答案有帮助吗? – himanshuIIITian
谢谢Himanshu,这很有帮助。似乎这需要Spark 2.2,还有其他方式可以在2.0版本的火花低版本中执行此操作。 – Henosis