0

我有一个使用案例,其中关于传感器的事件信息在MySQL中连续插入。我们需要每隔1或2分钟通过一些卡夫卡话题处理这些信息。如何从Apache Spark向Kafka发送订单主题

我正在使用Spark将这些信息发送给Kafka主题,并在Phoenix表中维护CDC。我使用Cron作业每隔1分钟运行一次Spark任务。

我目前面临的问题是消息排序,我需要发送这些消息以升序时间戳结束系统卡夫卡主题(其中有1个分区)。但是由于多于1个火花DataFrame分区同时向卡夫卡主题发送信息,所以大部分消息订购都会丢失。

目前作为一种解决方法,我将我的DataFrame重新分区为1,以维护消息排序,但这不是一个长期解决方案,因为我失去了火花分布式计算。

如果你们有更好的解决方案设计围绕此请建议。

+0

你能告诉你如何插入数据到MySQL吗? – user8371915

+0

@ user8371915数据由应用程序插入,这些应用程序的职责是捕获传感器事件并插入到mysql数据库中,这些应用程序不在我的控制之下。 – nilesh1212

+0

所以MySQL是源代码,Kafka是一个接收器?目前尚不清楚为什么订单是相关的,但总的来说,您无法获得订单保证和端到端并行性。 – user8371915

回答

0

我能够根据升序时间戳实现消息排序,以便通过使用密钥对数据进行赔偿并通过在分区内应用排序来扩展。

val pairJdbcDF = jdbcTable.map(row => ((row.getInt(0), row.getString(4)), s"${row.getInt(0)},${row.getString(1)},${row.getLong(2)},${row. /*getDecimal*/ getString(3)},${row.getString(4)}")) 
     .toDF("Asset", "Message") 
val repartitionedDF = pairJdbcDF.repartition(getPartitionCount, $"Asset") 
     .select($"Message") 
     .select(expr("(split(Message, ','))[0]").cast("Int").as("Col1"), 
      expr("(split(Message, ','))[1]").cast("String").as("TS"), 
      expr("(split(Message, ','))[2]").cast("Long").as("Col3"), 
      expr("(split(Message, ','))[3]").cast("String").as("Col4"), 
      expr("(split(Message, ','))[4]").cast("String").as("Value")) 
     .sortWithinPartitions($"TS", $"Value") 
相关问题