kafka-producer-api

    2热度

    1回答

    我的流是生产类型的记录Tuple2<String,String> .toString()输出(usr12345,{"_key":"usr12345","_temperature":46.6}) 其中键是usr12345和值是{"_key":"usr12345","_temperature":46.6} 在流输出的.print()该值正确: (usr12345,{"_key":"usr12345",

    2热度

    3回答

    我创建其中前端服务推消息到卡夫卡请求“主题并监听另一“响应”主题对于一些下游后端消费者(实际上是一个复杂的系统中的系统,该系统最终推回卡夫卡),对“请求”消息进行处理,最终推到“响应”主题。 我想弄清楚最优雅的方式,以确保消费者侦听适当的分区并接收响应,并且后端推到前端使用者正在侦听的分区。我们总是需要确保响应发送给产生初始消息的相同消费者。 我有两个解决方案,截至目前,但也不是特别令人满意。任何

    0热度

    1回答

    可以来自不同组的两位消费者从同一主题和分区x中读取并且写入另一个主题和分区y? 我会实施消费者策略,其中一位消费者放弃其他消费者处理的数据。 生产者将数据保存到共享分区的顺序并不重要。 我只是想知道这是否可能

    0热度

    1回答

    我们最近遇到了一个问题,卡夫卡经纪人遇到了阻止IO的内核问题(但是我能够心跳回到zookeeper)。这样做的结果是卡夫卡经纪人留在ISR集,但实际上无法完成任何任务。 的问题是: 1)是否有什么卡夫卡检查它发出心跳之前,还是仅仅是弱智化发射心跳(我看https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Pro

    2热度

    1回答

    我正在研究kafka用例,我需要在生产者&消费者一方拥有事务性语义。我能够使用kafka事务API 0.11将事务性消息发布到kafka集群,但消费者我现在面临这个问题身边...我已isolation.level=read_committed属性文件中,但我不能消耗it..I可以看到邮件被消耗与isolation.level=read_uncommitted但这不希望.. 生产者代码 packag

    0热度

    1回答

    我有一个使用案例,其中关于传感器的事件信息在MySQL中连续插入。我们需要每隔1或2分钟通过一些卡夫卡话题处理这些信息。 我正在使用Spark将这些信息发送给Kafka主题,并在Phoenix表中维护CDC。我使用Cron作业每隔1分钟运行一次Spark任务。 我目前面临的问题是消息排序,我需要发送这些消息以升序时间戳结束系统卡夫卡主题(其中有1个分区)。但是由于多于1个火花DataFrame分区

    1热度

    2回答

    我正在设计高吞吐量系统,我要有几个生产者。 我的主题将被分区。制作人将以键值对的形式发送记录。 键将用于分区数据。 消费者将组织在消费者群体中(他们将被分配相同的群组ID,以便他们可以同时消费来自同一主题的消息,但来自不同的分区)。 卡夫卡保证单个分区内的消息顺序。 消费者将被分配他们公平份额的分区。 唯一令我担心的是,我的分区密钥不会以循环方式分发消息,有些分区可能比其他分区更繁忙。 问:可能存

    -1热度

    1回答

    我使用的是卡夫卡版本0.11.0.0,并试图建立由Avro的加载数据的输入流file.But它与异常实例化生产者未按: [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 0 ms. Exception in thread "ma

    5热度

    1回答

    所以我试图获得使用Kafka流的交互式查询。我有Zookeeper和Kafka在本地运行(在Windows上)。我在哪里使用C:\ temp作为存储文件夹,适用于Zookeeper和Kafka。 我已经安装这样 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --t

    4热度

    2回答

    我有麻烦连接从我的主机(Windows)到客户(Linux),我安装了卡夫卡。 我已经设置了一个VM(带有VirtualBox),我安装了Confluent工具。在此VM中,我运行以下命令: confluent start schema-registry 它启动zookeeper,kafka和模式注册表。 在这个虚拟机,我可以运行 kafka-console-producer --broker