2017-06-01 79 views
0

我为卡夫卡客户端和生产者创建了单一类,只创建一个对象。 我需要多次发布相同的主题,而无需创建新的客户端和生产者实例。 我发现producer.on('ready',fn(){})没有使用相同的客户端和生产者实例触发,只有当我有新的客户端和生产者对象时它才会被触发。就绪事件上的kafka节点没有被触发

在这里,示例代码:

Singleton类:

const kafka = require('kafka-node'); 
const logger = require('./../../../../applogger'); 
const kafkaConfig = require('./../../../../config/config'); 

function ConnectionProvider() { 
    let kafkaConnection = undefined; 
    let client = undefined; 

    this.getConnection =() => { 

     if (!this.kafkaConnection) { 
      logger.info("Creating new kafka connection ------------------------------------- "); 
      this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST); 
      this.kafkaConnection = new kafka.Producer(this.client); 
     } 
     return this.kafkaConnection; 
    }; 
    this.getClient =() => { 
     if (!this.client) { 
      logger.info("Creating new kafka Client ------------------------------------- "); 
      this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST); 
     } 
     return this.client; 

    } 
    process.on('SIGINT', function() { 
     logger.info("Going to terminate kafka connection...!"); 
     process.exit(0); 
    }); 
} 
module.exports = exports = new ConnectionProvider; 

主题发布方法:

const kafkaClient = require('./../core/kafkaConnection'); 

    const publishToKafka = function(dataPayload, callback) { 
     logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload); 
     let producer = kafkaClient.getConnection(); 

     producer.on('ready', function() { 
      let payloads = dataPayload; 
      producer.send(payloads, function(err, data) { 
       if (err) { 
        logger.error(
         'Error in publishing message to messaging pipeline ', err 
        ); 
        callback(err, null); 
        return; 
       } 

       logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data); 

       callback(null, data); 
       return; 
      }); 
     }); 

     producer.on('error', function(err) { 
      logger.error(
       'Error in publishing message to messaging pipeline ', err 
      ); 
      producer.close(); 
     }); 

    }; 

DataPayload是: 让dataPayload = [{主题:某话题,消息:someMessage }]

我需要调用PublishToKafka方法mult很多次,但只想创建一个kafka客户端和生产者实例。 但生产者没有发布主题,因为producer.on('ready',function(){})在使用客户端和生产者的同一对象时没有被触发。

在此先感谢。

回答

0

我通过在每次调用后关闭kafka生产者和客户端实例解决了这个问题,因为我需要多次发布到kafka生产者,但默认情况下kafka zookeeper只允许60个最大连接(如果我们想要增加连接的价值)。所以这就是为什么为单个卡夫卡实例创建单例类。

但是在创建kafka的单个实例后,其producer.on('ready')事件不会被触发,因为第二次当我们使用已经处于就绪状态的kafka生产者的同一对象时。所以我们需要每一次发布新的制作者实例。

const publishToKafka = function(topicName, dataPayload, callback) { 
    logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload); 
    let client = new kafka.Client(kakfaConfig.ZOOKPER_HOST); 
    let producer = new kafka.Producer(client); 


    producer.on('ready', function() { 
     let payloads = dataPayload; 
     producer.send(payloads, function(err, data) { 
      if (err) { 
       logger.error(
        'Error in publishing message to messaging pipeline ', err 
       ); 
       callback(err, null); 
       return; 
      } 

      logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data); 
      producer.close(); 
      client.close(); 
      callback(null, data); 

      return; 
     }); 
    }); 

    producer.on('error', function(err) { 
     logger.error(
      'Error in publishing message to messaging pipeline ', err 
     ); 
     producer.close(); 
    }); 

}; 

无需为单个对象创建单例类。