我在一个特定的主题中有多个消息(更具体地说是日志消息),这些消息具有相同的消息块ID(这些ID不断变化,但对于某个消息块保持不变),我需要找到一种组所有具有该ID的消息或共享消息组中所有消费者之间具有相同ID的消息中包含的数据。 那么有什么方法可以在消费者群体中的各种消费者之间共享数据?Kafka:消费者群体中的消费者可以共享数据吗?
1
A
回答
0
这听起来像是一个会话用例。 Kafka没有提供任何将消息分组或嵌套在一起的方法,因此您必须通过在处理消息的同时保持状态并使用某种标头来包装消息组来实现这一点。然后,您可以将其推送到包裹消息组的新主题。
更好的方法可能是利用外部数据库或其他系统,以更灵活的方式选择或组织基于字段的数据。有关使用Spark streaming + HBase的示例,您可以查看this blogpost。
0
有两种方法可以做到这一点。
当您发布消息本身时,使用分区键创建一条消息,因此所有具有相同id的消息都转到单个分区。那么在消费方面,它总是会被接收者接收。[https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example]
如果您使用的火花流在消费者方面,你可以使用滑动窗口的概念将所有相同的ID消息。[http://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations]
相关问题
- 1. Kafka:多个独立消费者群体可以消费一个主题吗?
- 2. kafka消费者上的AssertionError
- 3. clj-kafka - 消费者空
- 4. 单个生产者到多个消费者(相同的消费者群体)
- 5. 在哪里定义特定消费者群体的消费者数量?
- 6. Kafka transactional生产者和消费者
- 7. python-kafka:消费者可以根据消息属性跳过消息吗?
- 8. spring-cloud-stream-kafka不尊重单个消费者的群体
- 9. 消费群体负载均衡读者
- 10. ActiveMQ - 消费者不共享负载
- 11. kafka中的多个消费群体
- 12. 在Rust消费者中消费多个Kafka主题
- 13. 生产者/消费者线程中的油门消费者
- 14. 消费者生产者多线程消费者不会消逝
- 15. 可以在消费者或生产者中使用kafka broker ip吗?
- 16. Java生产者 - 消费者:生产者不“通知()”消费者
- 17. 来自kafka消费者的InstanceAlreadyExistsException
- 18. kafka-python消费者给出的错误
- 19. Kafka Listener方法未被调用。消费者不消费。
- 20. Kafka关于消费群体的问题
- 21. 消费者池
- 22. RabbitMQ消费者
- 23. 消费者过滤的生产者 - 消费者阻塞队列
- 24. 伪装Confluent .NET Kafka消费者
- 25. Kafka高级消费者error_code = 15
- 26. Flask API作为实时kafka消费者
- 27. KAFKA Java消费者不工作
- 28. Apache kafka - 消费者延迟选项
- 29. 生产者/消费者:消费者需要从共享队列中删除元素
- 30. 如何在使用Semphores的生产者 - 消费者中消费?