我实现了卡夫卡的消费者应用程序,我只是想知道,如果我在PM2集群模式下运行这个程序,将所有的内核消耗相同的消息或不同的消息?有没有一种方法可以验证它?在集群模式下运行此应用程序是否理想?我在集群模式下运行这个原因是因为我们的卡夫卡产生了大量的消息。卡夫卡的NodeJS与消费群PM2
也是目前如果我在PM2集群模式下运行,这所有的核心都达到了它的CPU使用率达到100%。它是否会像这样发生?
FYI:我使用https://www.npmjs.com/package/no-kafka
我实现了卡夫卡的消费者应用程序,我只是想知道,如果我在PM2集群模式下运行这个程序,将所有的内核消耗相同的消息或不同的消息?有没有一种方法可以验证它?在集群模式下运行此应用程序是否理想?我在集群模式下运行这个原因是因为我们的卡夫卡产生了大量的消息。卡夫卡的NodeJS与消费群PM2
也是目前如果我在PM2集群模式下运行,这所有的核心都达到了它的CPU使用率达到100%。它是否会像这样发生?
FYI:我使用https://www.npmjs.com/package/no-kafka
将全部核消耗相同的消息或不同的消息?有没有一种方法可以验证它?
这取决于你主题配置+消费结构。我们举个例子吧。
也是目前如果我在PM2集群模式下运行,这所有的核心都达到了它的CPU使用率达到100%。它是否会像这样发生?
我并不真正熟悉no-kafka以及消息如何处理。
但检查时,库等待提交是否获取下一批消息之前发生。
如果不是有可能是一个机会,你的进程的信息创建过多的处理程序。
基于PM2的群集仅适用于网络服务器,因为群集进程共享传入网络端口并分发请求。
就你而言,数据源是消息订阅,它必须手动分发到集群的工作进程。
所以,为了安全起见,主进程应与数据源的交互,并均匀地分配消息工作进程,以便在外部,它似乎是一个单一的消费者,但仍然可以处理所有的消息CPU核心。
下面的例子演示了这样的设置,而不依赖于基于PM2集群:
const cluster = require('cluster');
const _ = require('lodash');
const os = require('os');
// dispatch index
let dispatchIndex = 0;
/**
* Dispatches data to workers in a cyclic fashion
* @param {*} data - data to process
*/
function dispatch(data) {
// ensure master
if (!cluster.isMaster) {
throw new Error('Only master can dispatch');
}
// get worker ids, sorted
const workersIds = _.sortBy(_.keys(cluster.workers), _.identity);
// ensure at least one worker is available
if (workersIds.length < 1) {
throw new Error('No worker process alive');
}
// select next worker
dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex;
const worker = cluster.workers[workersIds[dispatchIndex]];
dispatchIndex++;
// send data to worker
worker.send(data);
}
// Main Script
if (cluster.isMaster) {
// Setup master process
console.info(`Master ${process.pid} started.`);
// fork worker processes to match available CPUs
const numCpu = os.cpus().length;
for (let i = 0; i < numCpu; i++) {
cluster.fork();
}
// *** Get/Subscribe data from external source and dispatch to workers ***
setInterval(() => dispatch({ a: 'value' }), 1000);
} else if (cluster.isWorker) {
// Setup worker process
console.info(`Worker ${process.pid} started.`);
// *** handle dispatched data ***
process.on('message', (data) => {
console.info(`Data processed by ${process.pid}`);
});
}
这也是很好的阅读了cluster module documentation。
请检查配置选项并按可用分区数拆分您的使用者。如果您的消费者正在阅读特定主题的多个分区,则会收到重复的消息, –