kafka-producer-api

    0热度

    1回答

    我们目前每个主题都使用一个生产者。由于显而易见的原因,我们正在考虑转换为一个制作人员来处理多个主题 现在,有没有这样的情况,一个主题可能由于某种原因而较慢,而不是其他?如果有这样的情况,它将如何与一个单一的生产者生产多个主题?会影响其他主题的吞吐量吗? 是否有任何其他情况下,使用单个生产者的多个主题可能不是一个好主意? 谢谢! 更新: 这里是我们到目前为止已经试过: 我们简要地看了看代码,并在这里

    3热度

    1回答

    下面两个代码片段发布消息的行为有什么不同? 方法1 Message<String> message = MessageBuilder.withPayload("testmsg") .setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build(); ListenableF

    2热度

    2回答

    我写在斯卡拉生产者和我想要做配料。批量应该工作的方式是,它应该将队列中的消息保存到队列中,然后将所有消息一起发布到主题上。但不知何故,它不工作。当我开始发送消息时,它开始逐一发布消息。有谁知道如何在卡夫卡生产者中使用配料。 val kafkaStringSerializer = "org.apache.kafka.common.serialization.StringSerializer"

    0热度

    1回答

    我们在我们的系统中使用kafka进行流处理。输入数据\消息的结构非常复杂。所以如何定义输入消息的结构。这是输入数据\消息的最合适的结构和序列化机制。

    3热度

    1回答

    我们有一个spark插件应用程序(以下是代码),它可以从kafka获取数据,并在将数据插入MongoDB之前对每条消息进行一些转换。我们有一个中间件应用程序,将消息(批量)推送到Kafka并等待来自Spark流应用程序的确认(针对每条消息)。如果在将消息发送到Kafka后的特定时间段(5秒)内中间件没有收到确认,则中间件应用程序会重新发送消息。火花流应用程序能够接收大约50-100条消息(在一批中

    0热度

    1回答

    我试图通过Cloudera Data Science Workbench在我们的内部Hadoop集群上实施GitHub项目(https://github.com/tomatoTomahto/CDH-Sensor-Analytics)。 在Cloudera Data Science Workbench上运行项目时,尝试通过Python api连接到Kafka时,出现错误“No Brokers ava

    1热度

    2回答

    如何使用消息批处理或使用pykafka缓冲区生成kafka主题。我的意思是一个生产者可以在一个生产过程中产生很多信息我知道使用消息批处理或缓冲区消息的概念,但我不知道如何实现它。我希望有人可以帮助我在这里

    0热度

    1回答

    你好,我正在写一个小的cassandra触发器,它在插入到某个表后发送信息给kafka。这里是我的触发代码: public class InsertDataTrigger implements ITrigger { public Collection<Mutation> augment(Partition update) { //checking if trigger w

    0热度

    2回答

    我有一个分区主题,它有X分区。 截至目前,在生成消息时,我创建了仅指定topic和value的卡夫卡ProducerRecord。我没有定义key。 据我所知,我的消息将使用默认的内置分区器在分区间均匀分配。 另一方面,我有一个卡夫卡消费者的线程池。每个卡夫卡消费者都将在自己的专用线程中运行消耗该主题的消息。这些消费者中的每一个都被赋予相同的group.id。这将允许消费并行消息。每个消费者将被分

    0热度

    1回答

    我无法理解ProducerRecord。 以前我是构建ProducerRecord这样的: new ProducerRecord<String, String>("my-topic", "key", "value") 我想时间戳通过附加所以我决定检查文档,发现该构造的确可以传递一个时间戳。但它也需要指定一个分区是这样的: new ProducerRecord(String topic,