2017-03-03 98 views
0

我有,随着每一次 初始化连接产生数据的一个卡夫卡生产类,这是耗时的过程, 因此使其更加快我想实施卡夫卡连接 池。我搜索了很多解决方案,但没有找到合适的 one.Please重定向我正确solution.Thanks。我的卡夫卡生产者 类是:如何实现卡夫卡连接池一样的JDBC连接池

import java.util.Properties; 
import org.apache.log4j.Logger; 
import com.bisil.report.queue.QueueDBFeederService; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("deprecation") 
public class KafkaProducer1 implements ProducerService { 
      private static Producer<Integer, String> producer; 
      private static final String topic= "mytopic1"; 
      private Logger logger = Logger.getLogger(KafkaProducer1.class); 

@Override 
public void initialize() { 
     try { 
      Properties producerProps = new Properties(); 
      producerProps.put("metadata.broker.list", "192.168.21.182:9092"); 
      producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
      producerProps.put("request.required.acks", "1"); 
      ProducerConfig producerConfig = new ProducerConfig(producerProps); 
      producer = new Producer<Integer, String>(producerConfig); 
     } catch (Exception e) { 
      logger.error("Exception while sending data to server "+e,e); 

     } 
     logger.info("Test Message"); 
    } 

    @Override 
public void publishMessage(String jsonPacket) { 
      KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String >(topic, jsonPacket); 
      producer.send(keyedMsg); 
     // This publishes message on given topic 
    } 

    @Override 
public void callMessage(String jsonPacket){ 
      initialize(); 
      // Publish message 
      publishMessage(jsonPacket); 
      //Close the producer 
      producer.close(); 

    } 

} 
+0

在initialize方法内部,您可以检查生产者是否已经存在return如果已经初始化。完成所有处理/应用程序关闭后关闭生产者。我建议最好的方法是,使用依赖容器和注入独立生产者使用依赖注入。 – Kaushal

回答

0

你可以把所有的消息在阵列中,反复将其发布到主题,然后关闭生产时done.This方式只有一次初始化和一次接近或破坏被称为。你可以做这样的事情

String[] jsonPacket/// your message array 
for (long i = 0; i < jsonPacket.length; i++) { 
      producer.send(new KeyedMessage<Integer, String>(topic, jsonPacket[i])); 
     } 
     producer.close(); 
+0

其实情况是我有一个表单,当我创建新的或更新现有的时间跨度而不是连续的,数据存储在jsonPacket(单个变量)中时,那么您的建议对我的方案有效吗? –

+0

Ahh ..在这种情况下,你可以检查kaushal答案。你也可以把每几秒钟的应用程序更新,并写入kafka话题。您需要根据您的应用程序设计进行实验 –

0

如果我的理解是正确的,你需要制作的对象池可以是始终可用,当一个新的发布请求时,等待其他请求,当任务完成后,你的要求可能匹配'对象池'(一个对象工厂与在Java中执行器框架工作(池)),这是由Apache公共实现的,因为您需要从池中获取KafkaProducer对象。在apache commons jar中实现并可用的对象池概念。 https://dzone.com/articles/creating-object-pool-java