consumer

    1热度

    1回答

    是否存在可从特定时间段/日期消费的低级别或高级别消费者Kafka 0.8.2? (卡夫卡客户端,火花卡夫卡,...)

    0热度

    2回答

    我试图用BlockingQueue实现一些Consumer-Producer问题。为了达到某种目的,我决定编写文件搜索工具。 我决定搜索机制是递归的工作,并且每个新目录都会有新的线程池来提高搜索速度。 我的问题是,我不知道如何实现停止打印线程(消费者)在最后的机制 - 当搜索线程完成工作。 我试图用POISON PILLS等一些想法做到这一点,但它不能很好地工作(线程在打印任何结果之前停止)。任何

    0热度

    3回答

    我正在为学校解决问题。很多这种方法已经实施,我不能做太多的改变。 其实我只能在具体点进行更改。 下面是我正在使用的方法的代码,虽然有些词语是荷兰语,但它应该是可读的。 它应该读取文件的行,从文本中创建地址(保存为(street +“”+ number +“”+ place))并将它们添加到返回的列表中。该文件以空行结束。 @Override public List<Adres> query(IS

    -1热度

    1回答

    我使用,我需要以下要求 有非常快的消费者作为我的生产者都已经非常快 在租赁需要每秒 不按规定2K的消息处理来处理的ActiveMQ /消费消息再次发生服务器崩溃或其他故障。我可以再次触发整个过程。 需要运行非常正常的配置服务器 - 4Gib RAM 下面 给出使用non-persistent delivery mode(vm://localhost)(http://activemq.apache.

    0热度

    1回答

    嗨我一直在试图学习KAFKA,并与我的远程轮询/消费者有问题。 我在AWS EC2实例中使用私有和公有IP设置了KAFKA。我的server.properties看起来像这样。 听众= PLAINTEXT://172.31.31.58:9092 #AWS专用IP advertised.listeners = PLAINTEXT:// 35 ?? ?? ??:。9092 #AWS公共IP屏蔽 我的A

    0热度

    1回答

    卡夫卡消费者民意调查api没有返回记录到低超时。 如果我增加poll中的超时值,那么记录即将到来。 我无法理解这个逻辑。请按照以下代码进行帮助: public ConsumerRecords<String, Map<String, String>> subscribeToQueue(String topic, QueueListener q) { Properties props = n

    0热度

    1回答

    我正在使用JVm-Junit库编写Pact的消费者端代码。然而,在该行: MockProviderConfig config = MockProviderConfig.createDefault(); 我收到错误“createDefault()不是未定义的类型MockProviderConfig” 我该怎么做才能继续。 我的POM文件看起来是这样的: http://maven.apache.org

    0热度

    1回答

    我必须使用Apache Kafka连接到我公司的经纪人。问题是我以前从未使用过这种技术,这是我需要澄清的一点。 实际上,我创建了一个带有Zookeeper/Server/Consumer的“本地”Kafka,它与命令〜/ bin/kafka-console-consumer.sh一起工作--zookeeper localhost:2181 --topic testGaultier --from-开

    0热度

    1回答

    我第一次使用RabbitMQ和Arduino,我需要发布数据。所以我使用了PubSubCLient类。这是代码: #include <SPI.h> #include <PubSubClient.h> #include <Dhcp.h> #include <Ethernet.h> #include <EthernetUdp.h> #include <Dns.h> #include <Et

    0热度

    2回答

    我正在使用Kafka流并希望重置一些消费者偏移量从Java到开始。 KafkaConsumer.seekToBeginning(...)听起来像做正确的事情,但我与卡夫卡流工作: KafkaStreams streams = new KafkaStreams(builder, props); ... streams.start(); 我想这取决于具体的流管线我确定这将在引擎盖下创建了几位消