我有一个用java编写的卡夫卡生产者代码,用于编写卡夫卡消息。以及接收这些消息的消费者代码。是否有可能使用java编写kafka消费者接收的输出到文件
是否有可能将消费者收到的消息写入java中的任何文本文件。
我有一个用java编写的卡夫卡生产者代码,用于编写卡夫卡消息。以及接收这些消息的消费者代码。是否有可能使用java编写kafka消费者接收的输出到文件
是否有可能将消费者收到的消息写入java中的任何文本文件。
谢谢你们,
我能够实现它。一旦数据在消费者端被接收到,那么它就是你必须编写的一个常见的java代码。
下面是将消息打印到控制台的代码行。
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
您可以将所有消息存储到字符串并一次全部打印到文件。
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
completMessage += new String(bytes, "UTF-8")+"\n";
new String(bytes, "UTF-8")+"\n";
包含实际的消息。
最后打印所有文件。
writeDataToFile(completMessage);
writeDataToFile
包含简单的java代码来写一个字符串到文件。
谢谢。
Kafka流迭代器是阻塞的,因此当从Java调用它们时,它们将继续等待新消息,并且代码无法关闭FileWriter。为此,您可以执行以下操作:
1.更改为降低超时
'props.put属性(“consumer.timeout.ms”,“1500”);'
2.现在在超时的情况下打破循环
'VAL文件=新的文件(参数(0)) VAL体重=新的BufferedWriter(新的FileWriter(文件)) 而(试行{stream.iterator()hasNext()} {捕案件 E:Throwable的= >假}) { VAL记录= stream.iterator.next()。消息() 的println(record.toString()) bw.write(record.toString()) } ' –
如果您正在编写自己的客户,则应该在同一个应用程序中包含写入文件的逻辑。使用预先打包的控制台使用者,您可以将其管道化为文件。例如:kafka-console-consumer > file.txt
另一个(无代码)选项将尝试StreamSets Data Collector一个开源的Apache许可工具,它也具有拖放UI。它包含用于Kafka和各种数据格式的内置连接器。
*完全披露我是这个项目的提交者。
这是可能的。以下是这个的工作代码。
package com.venk.prac;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import kafka.utils.ShutdownableThread;
public class FileConsumer extends ShutdownableThread {
private final KafkaConsumer<Integer, String> kafkaConsumer;
private String topic;
private String filePath;
private BufferedWriter buffWriter;
public FileConsumer(String topic, String filePath) {
super("FileConsumer", false);
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConsumer = new KafkaConsumer<Integer, String>(properties);
this.topic = topic;
this.filePath = filePath;
try {
this.buffWriter = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void doWork() {
// TODO Auto-generated method stub
kafkaConsumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000);
try {
for (ConsumerRecord<Integer, String> record : consumerRecords)
buffWriter.write(record.value() + System.lineSeparator());
buffWriter.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public String name() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isInterruptible() {
return false;
}
}
是的。在发布之前,您是否尝试过谷歌答案? – aviad