2012-01-27 77 views
3

我希望jobs.create失败,如果一个相同的工作已经在系统中。有什么办法可以实现这个吗?kue for node.js的独特工作

我需要每24小时运行一次相同的作业,但有些作业甚至可能需要24小时以上,所以我需要确保作业尚未在系统中(活动,排队o失败)添加它。

已更新: 好的,我将简化问题,以便能够在这里解释它。 唯恐我有一个分析服务,我必须每天向我的用户发送一次报告。有时完成这些报告(只有少数情况,但有可能)需要几个小时甚至一天以上。

我需要一种方法来知道哪些是当前正在运行的作业,以避免重复作业。我找不到'''''''''API中的任何内容来知道哪些作业正在运行。当需要更多工作时,我还需要某种事件,然后致电我的生产商getMoreJobs

也许我的方法是错误的,如果是这样,请让我知道一个更好的方法来解决我的问题。

这是我的简化代码:

var kue = require('kue'), 
    cluster = require('cluster'), 
    numCPUs = require('os').cpus().length; 

numCPUs = CONFIG.sync.workers || numCPUs; 

var jobs = kue.createQueue(); 

if (cluster.isMaster) { 
    console.log('Starting master pid:' + process.pid); 
    jobs.on('job complete', function(id){ 
    kue.Job.get(id, function(err, job){ 
     if (err || !job) return; 
     job.remove(function(err){ 
      if (err) throw err; 
      console.log('removed completed job #%d', job.id); 
     }); 
    }); 

    function getMoreJobs() { 
     console.log('looking for more jobs...'); 
     getOutdateReports(function (err, reports) { 
      if (err) return setTimeout(getMoreJobs, 5 * 60 * 60 * 1000); 

      reports.forEach(function(report) { 
       jobs.create('reports', { 
        id: report.id, 
        title: report.name, 
        params: report.params 
       }).attempts(5).save(); 
      }); 

      setTimeout(getMoreJobs, 60 * 60 * 1000); 
     }); 
    } 

    //Create the jobs 
    getMoreJobs(); 

    console.log('Starting ', numCPUs, ' workers'); 
    for (var i = 0; i < numCPUs; i++) { 
     cluster.fork(); 
    } 

    cluster.on('death', function(worker) { 
     console.log('worker pid:' + worker.pid + ' died!'.bold.red); 
    }); 

} else { 
    //Process the jobs 
    console.log('Starting worker pid:' + process.pid); 
    jobs.process('reports', 20, function(job, done){ 
     //completing my work here 
     veryHardWorkGeneratingReports(function(err) { 
      if (err) return done(err); 
      return done(); 
     }); 
    }); 
} 
+0

需要更多信息,代码或其他... – Teemu 2012-01-27 11:45:32

+0

@Teemu我已更新我的问题,谢谢! – aartiles 2012-01-27 12:38:51

回答

2

https//github.com/LearnBoost/kue

在json.js脚本中检查行64-112。在那里你会找到返回一个包含作业的对象的方法,同时也用类型,状态或id范围进行过滤。 (jobRange()jobStateRange()jobTypeRange()

主要页面向下滚动到JSON API -section,你会发现返回的对象的实例。

那如何调用和使用那些你知道比我更好的方法。

jobs.create()将失败,如果您传递未知的关键字。我会创建一个函数来检查forEach -loop中的当前作业,并返回一个关键字。然后在jobs.create()-参数中调用该函数而不是文字关键字。

通过json.js中的这些方法获得的信息可以帮助您创建“moreJobToDo”事件。

+0

谢谢,这些方法对我有很大帮助 – aartiles 2012-01-28 09:34:40

3

你的问题之一的答案是,Kue将它从redis队列中弹出的作业放入“active”中,除非你找到它们,否则你将永远得不到它们。

另一个问题的答案是,您的分布式工作队列是消费者,而不是任务的生产者。像他们一样盯着它们是可以的,但是,这是一个泥泞的范例。我用Kue所做的是为kue的json api制作一个包装器,以便可以从系统中的任何位置将作业放入队列中。由于您似乎需要铲除工作,因此我建议编写一个单独的生产者应用程序,该应用程序除了获得外部工作并将其粘贴到Kue工作队列中之外什么都不做。它可以监视工作队列,以便在作业运行时间较短并加载批处理时执行,或者,我会做的就是尽可能快地铲除作业,并将多个客户应用程序实例缓存以处理负载更快速。

重新迭代:您的问题分离在这里不是很好。你应该有一个完全独立于你的任务消费者应用程序的任务生产者。这为您提供了更大的灵活性,易于扩展(只需启动另一台计算机上的另一位用户,然后您就可以进行扩展),并且整体简化了代码管理。如果可能的话,您应该允许任何人向您提供这些任务,让您“查找”访问您的Kue服务器的JSON API,而不是外出找到它们。工作生产者可以通过Kue安排自己的任务。

+2

(是的,我知道我正在回答一个老问题,但我会投入资金去看SE的Q&A是否包含正确的答案) – 2012-05-08 22:28:10