2015-11-09 128 views

回答

2

谢谢你们,

我能够实现它。一旦数据在消费者端被接收到,那么它就是你必须编写的一个常见的java代码。

下面是将消息打印到控制台的代码行。

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); 

您可以将所有消息存储到字符串并一次全部打印到文件。

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); 
completMessage += new String(bytes, "UTF-8")+"\n"; 

new String(bytes, "UTF-8")+"\n";包含实际的消息。

最后打印所有文件。

writeDataToFile(completMessage); 

writeDataToFile包含简单的java代码来写一个字符串到文件。

谢谢。

+0

Kafka流迭代器是阻塞的,因此当从Java调用它们时,它们将继续等待新消息,并且代码无法关闭FileWriter。为此,您可以执行以下操作:
1.更改为降低超时
'props.put属性(“consumer.timeout.ms”,“1500”);'
2.现在在超时的情况下打破循环
'VAL文件=新的文件(参数(0)) VAL体重=新的BufferedWriter(新的FileWriter(文件)) 而(试行{stream.iterator()hasNext()} {捕案件 E:Throwable的= >假}) { VAL记录= stream.iterator.next()。消息() 的println(record.toString()) bw.write(record.toString()) } ' –

3

如果您正在编写自己的客户,则应该在同一个应用程序中包含写入文件的逻辑。使用预先打包的控制台使用者,您可以将其管道化为文件。例如:kafka-console-consumer > file.txt

另一个(无代码)选项将尝试StreamSets Data Collector一个开源的Apache许可工具,它也具有拖放UI。它包含用于Kafka和各种数据格式的内置连接器。

*完全披露我是这个项目的提交者。

0

这是可能的。以下是这个的工作代码。

package com.venk.prac; 

import java.io.BufferedWriter; 
import java.io.FileWriter; 
import java.io.IOException; 
import java.util.Collections; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.serialization.IntegerDeserializer; 
import org.apache.kafka.common.serialization.StringDeserializer; 

import kafka.utils.ShutdownableThread; 

public class FileConsumer extends ShutdownableThread { 

    private final KafkaConsumer<Integer, String> kafkaConsumer; 
    private String topic; 
    private String filePath; 
    private BufferedWriter buffWriter; 

    public FileConsumer(String topic, String filePath) { 

     super("FileConsumer", false); 
     Properties properties = new Properties(); 
     properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
       KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING); 
     properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer"); 
     properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); 
     properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); 
     properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
     properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); 
     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 

     kafkaConsumer = new KafkaConsumer<Integer, String>(properties); 
     this.topic = topic; 
     this.filePath = filePath; 

     try { 
      this.buffWriter = new BufferedWriter(new FileWriter(filePath)); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void doWork() { 
     // TODO Auto-generated method stub 
     kafkaConsumer.subscribe(Collections.singletonList(this.topic)); 
     ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); 
     try { 
      for (ConsumerRecord<Integer, String> record : consumerRecords) 
       buffWriter.write(record.value() + System.lineSeparator()); 
      buffWriter.flush(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public String name() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public boolean isInterruptible() { 
     return false; 
    } 

}