kafka-producer-api

    0热度

    1回答

    我有下面的代码,它将监听卡夫卡主题并使用火花流传输原样重现文本。但是,我无法在控制台上看到文本。我没有收到控制台上的任何错误消息。我可能是错的,但我期望卡夫卡主题中的文本显示在控制台上。 object scalaSparkProcessor { def main(args: Array[String]) { if (args.length < 3) { System.err.

    0热度

    1回答

    我不得不机(机#1和机#2)之间卡夫卡集群设置和配置之间传输文件如下: 1)每个机被配置成具有一个代理和一个动物园管理员正在运行。 2)服务器和动物园管理员属性被配置为具有多代理,多节点动物园管理员。 我现在有KafkaProducer和KafkaConsumer以下认识: 1)如果我从machine#1发送文件到machine#2,它的分解在使用一些默认的分隔符(LF或\ n)的线。 2)因此,

    4热度

    1回答

    我有一个安装在具有8个内核和32GB RAM的VM上的单个kafka实例。 我从10台不同的机器写入(生产),并从一台机器中消耗,所有机器都在同一个网络中。 我生成的数据大小约为35MBit/s。 由于某些原因,大部分时间我不能消耗超过〜10MBit/s(在有限的时间段内,我设法消耗所有生成的数据),尽管kafka和消费者服务器大多数闲置(因此我不认为这是保留问题)。 kafka可能会忽略一些生成

    0热度

    2回答

    我正在尝试运行Flink流式作业。我想确定流式传输过程的吞吐量和延迟。我已经开始了卡夫卡经纪人服务器,并从卡夫卡传入消息。我如何计算每秒消息(吞吐量)? (像rdd.count。有没有类似的方法来获取传入消息的计数) (完整的scenerio:我已经通过生产者发送消息作为Json对象,我添加了一些信息,如名称作为字符串和在Json对象中也是System.currentTimeMills 在流式传输

    0热度

    1回答

    我正在调用一个从kafka生产者发送一些数据的函数,但是在它发送之后,我返回了一个不返回的响应。代码在返回时卡住了。任何人有任何想法发生什么事? 我的代码如下, def postEvent(eventData): print("The eventData is...",eventData) timestamp = datetime.now().__format__("%Y-%m

    1热度

    2回答

    鉴于:我在卡夫卡有两个主题让我们说主题A和主题B.卡夫卡流从主题A读取记录,处理它并产生多个记录(比如说recordA和recordB)对应于消耗的记录。现在,问题是如何使用Kafka Streams来实现这一点。 KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Me

    0热度

    1回答

    EDIT2的恒定延迟:最后,我已经用Java做了我自己制作,而且运作良好,因此的问题是在卡夫卡的控制台制片。卡夫卡控制台消费者运作良好。 编辑:我已经尝试过版本0.9.0.1,并具有相同的行为。 我正在研究我的单身汉最终项目,Spark Streaming和Flink之间的比较。在两个框架之前,我使用Kafka和一个脚本来生成数据(如下所述)。我的第一个测试是比较两种框架与简单工作负载之间的延迟,

    0热度

    1回答

    我在CDH 5.9上运行卡夫卡0.10.0,群集被kerborized。 我想要做的是将消息从远程机器写入我的卡夫卡经纪人。 群集(其中安装了Kafka)具有内部以及外部IP地址。 集群中机器的主机名解析为私有IP,远程机器将相同的主机名解析为公有IP地址。 我从远程机器打开必要的端口9092(我使用SASL_PLAINTEXT协议)到Kafka Broker,验证了使用telnet。 第一步 -

    1热度

    2回答

    在运行我的制片人上课的时候Eclipse的我得到这个错误没有默认值: org.apache .kafka.common.config.ConfigException:缺少必需的配置“bootstrap.servers”,它没有默认值 这里是我的制作等级: public class SimpleProducer { public static void main(String[] arg

    0热度

    1回答

    当我试图在eclipse中运行我的SupplierConsumer类时,出现这些错误。这里是我的代码: public class SupplierConsumer{ public static void main(String[] args) throws Exception{ String topicName = "SupplierTopic"; Str