2014-11-14 65 views
0

我有一个Grails应用程序,每天在午夜运行一项作业。在我的示例应用程序我有10000条Person记录,并做石英工作如下:使用grails和gpars处理大量数据

package threading 

import static grails.async.Promises.task 
import static groovyx.gpars.GParsExecutorsPool.withPool 

class ComplexJob { 
    static triggers = { 
     simple repeatInterval: 30 * 1000l 
    } 

    def execute() { 
     if (Person.count == 5000) { 
      println "Executing job"     
      withPool 10000, { 
       Person.listOrderByAge(order: "asc").each { p -> 
        task { 
         log.info "Started ${p}" 
         Thread.sleep(15000l - (-1 * p.age)) 
        }.onComplete { 
         log.info "Completed ${p}" 
        } 
       } 
      }     
     } 
    } 
} 

忽略repeatInterval因为这只是用于测试目的。 当作业被执行,我得到以下异常:

2014-11-14 16:11:51,880 quartzScheduler_Worker-3 grails.plugins.quartz.listeners.ExceptionPrinterJobListener - Exception occurred in job: Grails Job 
org.quartz.JobExecutionException: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 [See nested exception: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000] 
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:111) 
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202) 
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 
Caused by: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 
    at org.grails.async.factory.gpars.LoggingPoolFactory$3.rejectedExecution(LoggingPoolFactory.groovy:100) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) 
    at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155) 
    at groovyx.gpars.group.PGroup.task(PGroup.java:305) 
    at groovyx.gpars.group.PGroup.task(PGroup.java:286) 
    at groovyx.gpars.dataflow.Dataflow.task(Dataflow.java:93) 
    at org.grails.async.factory.gpars.GparsPromise.<init>(GparsPromise.groovy:41) 
    at org.grails.async.factory.gpars.GparsPromiseFactory.createPromise(GparsPromiseFactory.groovy:68) 
    at grails.async.Promises.task(Promises.java:123) 
    at threading.ComplexJob$_execute_closure1_closure3.doCall(ComplexJob.groovy:20) 
    at threading.ComplexJob$_execute_closure1.doCall(ComplexJob.groovy:19) 
    at groovyx.gpars.GParsExecutorsPool$_withExistingPool_closure2.doCall(GParsExecutorsPool.groovy:192) 
    at groovyx.gpars.GParsExecutorsPool.withExistingPool(GParsExecutorsPool.groovy:191) 
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:162) 
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:136) 
    at threading.ComplexJob.execute(ComplexJob.groovy:18) 
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:104) 
    ... 2 more 
2014-11-14 16:12:06,756 Actor Thread 20 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368) 
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
2014-11-14 16:12:06,756 Actor Thread 5 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368) 
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

看来,如果线程池还没有被设置为10000,而我用withPool(10000) 我是不是可以这样做计算(现在只打印日志报表)大块?如果是这样,我怎么能告诉最近的项目是被处理的(例如,继续)?

+5

为什么不只是使用一个较小的池(1000例外说)?创建10000个线程来完成这项工作不可能比按顺序完成每个任务要快。 – 2014-11-14 15:22:56

+3

为什么不使用实际上为您的Grails应用程序中的Spring Batch批处理设计的内容?这就是我所做的,并且工作得很好。 – 2014-11-14 15:26:23

+0

情况并非如此,池越大,处理速度越快。前一段时间我使用了100个线程,实际上JVM的效率实际上造成了很大的问题。经过反复试验发现,15个线程完全够用了。 – Opal 2014-11-14 22:37:48

回答

0

试图将每个元素的处理包装为任务看起来并不是最优的。标准的并行处理方式是将整个任务分解为适当数量的子任务。您开始选择这个号码。对于CPU绑定任务,您可能创建N =处理器数量任务。然后你将任务分成N个子任务。就像这样:

persons = Person.listOrderByAge(order: "asc") 
nThreads = Runtime.getRuntime().availableProcessors() 
size = persons.size()/nThreads 
withPool nThreads, { 
    persons.collate(size).each { subList => 
     task { 
      subList.each { p => 
       ...  
      } 
     }   
    } 
} 
1

我怀疑withPool()方法没有任何影响,因为任务是最有可能使用默认的线程池,而不是在withPool创建的。尝试删除对withPool()的调用并查看任务是否仍然运行。

GPars中的groovyx.gpars.scheduler.DefaultPool池(默认为任务)使用任务调整大小,并限制为1000个并发线程。

我建议创建一个固定大小的池代替,例如:

def group = new DefaultPGroup(numberOfThreads) 
group.task {...} 

注:我不熟悉grails.async任务,只有核心GPars的,这样的事情可能会略有不同在grails.async中的PGroups周围。