0

我实现了卡夫卡的消费者应用程序,我只是想知道,如果我在PM2集群模式下运行这个程序,将所有的内核消耗相同的消息或不同的消息?有没有一种方法可以验证它?在集群模式下运行此应用程序是否理想?我在集群模式下运行这个原因是因为我们的卡夫卡产生了大量的消息。卡夫卡的NodeJS与消费群PM2

也是目前如果我在PM2集群模式下运行,这所有的核心都达到了它的CPU使用率达到100%。它是否会像这样发生?

FYI:我使用https://www.npmjs.com/package/no-kafka

+0

请检查配置选项并按可用分区数拆分您的使用者。如果您的消费者正在阅读特定主题的多个分区,则会收到重复的消息, –

回答

2

将全部核消耗相同的消息或不同的消息?有没有一种方法可以验证它?

这取决于你主题配置+消费结构。我们举个例子吧。

  • 假设我们有一个包含3个分区的主题。
  • 现在我们开始1消费者流程与消费者群体“some_consumer_group”。对于comsumer小组的细节看看这里https://www.npmjs.com/package/no-kafka#groupconsumer-new-unified-consumer-api
  • 现在您的一位消费者正在倾听3个分区。
  • 因为卡夫卡维护每个主题的偏移量,每个分区对每个消费群体消费的将获得3个消息,从3个不同的分区。因此没有消息的重复。
  • 现在让我们再添加一个消费者流程。
  • 现在消费者组1的消费者组“some_consumer_group”正在监听分区0和1,而消费者组2的消费者组“some_consumer_group”正在监听分区2(它也可能会反过来)。
  • 最后,如果我们为该组添加更多的消费者,现在我们让每个消费者都聆听1个分区
  • 如果这是设置,您将不会遇到重复的消息。

也是目前如果我在PM2集群模式下运行,这所有的核心都达到了它的CPU使用率达到100%。它是否会像这样发生?

我并不真正熟悉no-kafka以及消息如何处理。

但检查时,库等待提交是否获取下一批消息之前发生。

如果不是有可能是一个机会,你的进程的信息创建过多的处理程序。

2

基于PM2的群集仅适用于网络服务器,因为群集进程共享传入网络端口并分发请求。

就你而言,数据源是消息订阅,它必须手动分发到集群的工作进程。

所以,为了安全起见,主进程应与数据源的交互,并均匀地分配消息工作进程,以便在外部,它似乎是一个单一的消费者,但仍然可以处理所有的消息CPU核心。

下面的例子演示了这样的设置,而不依赖于基于PM2集群:

const cluster = require('cluster'); 
const _ = require('lodash'); 
const os = require('os'); 

// dispatch index 
let dispatchIndex = 0; 

/** 
* Dispatches data to workers in a cyclic fashion 
* @param {*} data - data to process 
*/ 
function dispatch(data) { 

    // ensure master 
    if (!cluster.isMaster) { 
     throw new Error('Only master can dispatch'); 
    } 

    // get worker ids, sorted 
    const workersIds = _.sortBy(_.keys(cluster.workers), _.identity); 

    // ensure at least one worker is available 
    if (workersIds.length < 1) { 
     throw new Error('No worker process alive'); 
    } 

    // select next worker 
    dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex; 
    const worker = cluster.workers[workersIds[dispatchIndex]]; 
    dispatchIndex++; 

    // send data to worker 
    worker.send(data); 
} 


// Main Script 
if (cluster.isMaster) { 

    // Setup master process 
    console.info(`Master ${process.pid} started.`); 

    // fork worker processes to match available CPUs 
    const numCpu = os.cpus().length; 
    for (let i = 0; i < numCpu; i++) { 
     cluster.fork(); 
    } 

    // *** Get/Subscribe data from external source and dispatch to workers *** 
    setInterval(() => dispatch({ a: 'value' }), 1000); 

} else if (cluster.isWorker) { 

    // Setup worker process 
    console.info(`Worker ${process.pid} started.`); 

    // *** handle dispatched data *** 
    process.on('message', (data) => { 
     console.info(`Data processed by ${process.pid}`); 
    }); 
} 

这也是很好的阅读了cluster module documentation