2016-02-25 70 views
1

我需要能够为模型对象找到排队和/或工作作业和/或失败的作业,比如说,当模型对象被销毁时,我们想要查找所有模型对象并决定不删除或销毁作业(有条件的)。如何查找ActiveRecord模型对象的关联Resque作业?

有没有推荐的方法来做到这一点之前,我重新发明轮子?

例子:

如果你想创建一个before_destroy回调时破坏的对象被销毁(排队和失败的作业),且仅当没有工作职位

的一些伪代码摧毁所有作业我在想什么在这个例子中使用的情况下,要做到:

报表模型

class Report < ActiveRecord::Base 
    before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs 

    def check_if_working_jobs 
    # find all working jobs related to this report object 
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty? 
    end 

    def destroy_queued_and_failed_jobs 
    # find all jobs related to this report object 
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # destroy/remove all jobs found 
    (queued_jobs + failed_jobs).each do |job| 
     # destroy the job here ... commands? 
    end 

    end 
end 

报表处理工作呃类为resque/redis支持作业

class ProcessReportWorker 

    # find the jobs by report id which is one of the arguments for the job? 
    # envisioned as separate methods so they can be used independently as needed 

    def self.find_queued_jobs_by_report_id(id) 
    # parse all jobs in all queues to find based on the report id argument? 
    end 

    def self.find_working_jobs_by_report_id(id) 
    # parse all jobs in working queues to find based on the report id argument? 
    end 

    def self.find_failed_jobs_by_report_id(id) 
    # parse all jobs in failed queue to find based on the report id argument? 
    end 
end 

这种方法正在跟踪什么需要发生?

以上缺失的部分是什么,通过模型对象ID找到排队或工作的作业,然后将其销毁?

是否已有方法查找和/或销毁我在文档或我的搜索中遗漏的关联模型对象ID?

更新:修改了使用示例,只使用working_jobs作为检查是否应该删除的一种方法vs建议我们也会尝试删除working_jobs。 (因为删除工作任务比简单地删除重拨键项更涉及)

回答

1

它在这里很安静,没有任何反应,所以我设法花一天时间去解决这个问题,遵循我在我的问题中指出的路径。可能有更好的解决方案或其他方法可用,但这似乎完成了迄今为止完成的工作。如果在此使用的方法有更好的选择,或者可以进一步改进,请随时发表评论。

这里的总的做法是,你需要通过所有作业进行搜索(排队,工作,失败),并过滤掉只为classqueue是相关的,那场比赛的对象记录ID,你在正确的找工作args数组的索引位置。例如(确认classqueue匹配后)如果参数位置0是对象ID的位置,则可以测试以查看args[0]是否与对象ID匹配。

从本质上讲,作业相关的对象ID,如果:job_class == class.name && job_queue == @queue && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id

  • 排队的作业:找到你需要收集所有的Redis 项名为queue:#{@queue}键,其中@queue是所有排队的作业您工作人员正在使用的队列的名称为 。如果您使用多个队列作为 某个特定的工人类,则相应地修改 循环多个队列。Resque.redis.lrange("queue:#{@queue}",0,-1)
  • 失败的作业:要找到所有排队的,你需要收集所有的Redis 项名为failed键(除非你使用多个 故障队列或比默认安装一些其他的)工作。 Resque.redis.lrange("failed",0,-1)
  • 工作职位:要找到可以使用的所有工作职位Resque.workers 它包含了所有的工人和正在运行的作业的阵列。 Resque.workers.map(&:job)
  • 作业:上面每个列表中的每个作业都是一个编码散列。您可以使用Resque.decode(job)将作业解码为红宝石哈希。
  • 类和args:对于排队作业,所述classargsjob["class"]job["args"]。对于失败工作工作这些是job["payload"]["class"]job["payload"]["args"]
  • 队列:对于每个失败工作职位发现,队列会job["queue"]。在测试对象ID的参数列表之前,您只需要匹配classqueue的作业。排队工作列表将已被限制在您收集的队列中。

以下是查找(和删除)与示例模型对象(报表)关联的作业的示例工人类和模型方法。

为resque/redis的支持工作

报告处理工人阶级

class ProcessReportWorker 
    # queue name 
    @queue = :report_processing 
    # tell the worker class where the report id is in the arguments list 
    REPORT_ID_ARGS_INDEX = 0 

    # <snip> rest of class, not needed here for this answer 

    # find jobs methods - find by report id (report is the 'associated' object) 

    def self.find_queued_jobs_by_report_id report_id 
    queued_jobs(@queue).select do |job| 
     is_job_for_report? :queued, job, report_id 
    end 
    end 

    def self.find_failed_jobs_by_report_id report_id 
    failed_jobs.select do |job| 
     is_job_for_report? :failed, job, report_id 
    end 
    end 

    def self.find_working_jobs_by_report_id report_id 
    working_jobs.select do |worker,job| 
     is_job_for_report? :working, job, report_id 
    end 
    end 

    # association test method - determine if this job is associated  

    def self.is_job_for_report? state, job, report_id 
    attributes = job_attributes(state, job) 
    attributes[:klass] == self.name && 
     attributes[:queue] == @queue && 
     attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id 
    end 

    # remove jobs methods 

    def self.remove_failed_jobs_by_report_id report_id 
    find_failed_jobs_by_report_id(report_id).each do |job| 
     Resque::Failure.remove(job["index"]) 
    end 
    end 

    def self.remove_queued_jobs_by_report_id report_id 
    find_queued_jobs_by_report_id(report_id).each do |job| 
     Resque::Job.destroy(@queue,job["class"],*job["args"]) 
    end 
    end 

    # reusable methods - these methods could go elsewhere and be reusable across worker classes 

    # job attributes method 

    def self.job_attributes(state, job) 
    if state == :queued && job["args"].present? 
     args = job["args"] 
     klass = job["class"] 
    elsif job["payload"] && job["payload"]["args"].present? 
     args = job["payload"]["args"] 
     klass = job["payload"]["class"] 
    else 
     return {args: nil, klass: nil, queue: nil} 
    end 
    {args: args, klass: klass, queue: job["queue"]} 
    end 

    # jobs list methods 

    def self.queued_jobs queue 
    Resque.redis.lrange("queue:#{queue}", 0, -1) 
     .collect do |job| 
     job = Resque.decode(job) 
     job["queue"] = queue # for consistency only 
     job 
     end 
    end 

    def self.failed_jobs 
    Resque.redis.lrange("failed", 0, -1) 
     .each_with_index.collect do |job,index| 
     job = Resque.decode(job) 
     job["index"] = index # required if removing 
     job 
     end 
    end 

    def self.working_jobs 
    Resque.workers.zip(Resque.workers.map(&:job)) 
     .reject { |w, j| w.idle? || j['queue'].nil? } 
    end 

end 

那么接下来的报表模型的使用示例变得

class Report < ActiveRecord::Base 
    before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs 

    def check_if_working_jobs 
    # find all working jobs related to this report object 
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty? 
    end 

    def remove_queued_and_failed_jobs 
    # find all jobs related to this report object 
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # extra code and conditionals here for example only as all that is really 
    # needed is to call the remove methods without first finding or checking 

    unless queued_jobs.empty? 
     ProcessReportWorker.remove_queued_jobs_by_report_id(self.id) 
    end 

    unless failed_jobs.empty? 
     ProcessReportWorker.remove_failed_jobs_by_report_id(self.id) 
    end 

    end 
end 

解决方案需要,如果你使用的多个队列进行修改工人类,或者如果你有多个失败队列。另外,还使用了redis失败后端。如果使用不同的失败后端,则可能需要进行更改。