这里是一个解决方案,使用OODK-JS通过webworkers计算1.000.000条目数组的总和。
该解决方案使用SynchronizedQueue基础类实现生产者/消费者设计模式:生产者(主线程)为数组的每个块生成一个任务并将其添加到队列中。消费者(webworker)从队列中取出任务并执行,直到队列中没有剩余任务为止。一旦所有任务执行完毕,生产者就会显示最终结果。
// main.js (producer)
// Path aliases; they are referenced as {oodk} and {workspace} in $.import calls.
OODK.config({
  path: {
    oodk: '../src',
    workspace: 'workspace'
  }
});
OODK(function($, _){
$.import('{oodk}/foundation/utility/Thread', '[util.concurrent]', '{workspace}/project/Task');
// Static helper class for generating and chunking arrays.
var ArrayHelper = $.class(function($, µ, _){
  $.static(function($, µ, _){
    /**
     * Split an array into consecutive chunks.
     * The element at index i is placed in chunk number
     * Math.floor(i / chunkLength).
     *
     * @param {Array} arr - the array to split
     * @param {number} chunkLength - number of entries per chunk, must be > 0
     * @returns {Array<Array>} the chunks, in original order (the last one may be shorter)
     * @throws {RangeError} if chunkLength is not greater than 0
     */
    $.public(function slice(arr, chunkLength){
      // Without this guard a chunkLength of 0, a negative value, NaN or
      // undefined yields chunk indexes such as Infinity, NaN or -1: the
      // values end up on non-index properties and the result has length 0.
      if(!(chunkLength > 0)){
        throw new RangeError('chunkLength must be a positive number');
      }
      return arr.reduce(function(chunks, val, index){
        var chunkIndex = Math.floor(index / chunkLength);
        if(!chunks[chunkIndex]){
          chunks[chunkIndex] = [];
        }
        chunks[chunkIndex].push(val);
        return chunks;
      }, []);
    });
    /**
     * Generate an array of random numbers.
     *
     * @param {number} len - number of entries to generate
     * @returns {Array<number>} len values, each Math.random() * 10
     */
    $.public(function random(len){
      var arr = [];
      for(var i = 0; i < len; i++){
        arr.push(Math.random() * 10);
      }
      return arr;
    });
  });
});
// Pool that starts a fixed number of consumer threads and listens to one queue.
var ThreadPool = $.class(function($, µ, _){
  // how many consumer threads start() creates
  $.private('num');
  // the queue whose 'synchronizedQueue.taskDone' events are observed
  $.private('queue');

  // constructor: store the thread count and the queue
  $.public(function __initialize(num, queue){
    _.num = num;
    _.queue = queue;
  });

  // Create and start the consumer threads, then listen for finished tasks.
  $.public(function start(){
    // a single Producer instance handles the events of every thread and of the queue
    var listener = $.new(Producer);
    var started = 0;
    while(started < _.num){
      // each consumer runs the consumer.js script
      var worker = $.new(OODK.foundation.util.Thread, "consumer.js");
      $.on(worker, 'thread.ready', listener);
      worker.start();
      started++;
    }
    // task completion is reported through the queue
    $.on(_.queue, 'synchronizedQueue.taskDone', listener);
  });
});
// Event listener used by the producer side: it reacts to 'thread.ready'
// events of the consumer threads and to 'synchronizedQueue.taskDone'
// events of the queue, accumulating the partial sums.
var Producer = $.implements(OODK.foundation.EventListener).class(function($, µ, _){
  // number of tasks reported as done so far
  $.private('taskDone', 0);
  // running total of all chunk results received so far
  $.private('finalResult', 0);
  // NOTE(review): the consumer-side listener declares __processEvent as
  // protected while this one is private -- confirm which visibility is intended.
  $.private(function __processEvent(evt){
    var type = evt.getType();
    if(type === 'thread.ready'){
      // the thread is ready: synchronize the queue with it.
      // `queue` is the variable declared in the enclosing scope.
      queue.synchronize(evt.getTarget());
    }else if(type === 'synchronizedQueue.taskDone'){
      // a consumer reports that it has performed a task
      _.taskDone++;
      var cqueue = evt.getTarget();
      var chunkResult = evt.getData();
      _.finalResult += chunkResult;
      jQuery('#chunksDone').text(_.taskDone);
      // loose comparison kept on purpose: the return type of getCapacity()
      // is not visible from here
      if(cqueue.getCapacity() == _.taskDone){
        // every task has been performed: display the final result
        $.log('final sum is ' + _.finalResult);
      }else{
        // otherwise display the intermediate result after each chunk
        $.log('intermediate result ' + _.finalResult);
      }
    }
  });
});
// build the input: an array of 1.000.000 random values
var myHugeArray = ArrayHelper.self.random(1000000);
// cut it into chunks of 25000 entries each
var chunks = ArrayHelper.self.slice(myHugeArray, 25000);
// synchronized queue sized to the number of chunks
var queue = $.new(OODK.foundation.util.concurrent.SynchronizedQueue, chunks.length);
// wrap every chunk in a Task and put it on the queue
chunks.forEach(function(chunk){
  queue.put(OODK.project.Task.self.factory(chunk));
});
// pool of 2 threads working on that queue
var threadPool = $.new(ThreadPool, 2, queue);
// launch the consumers
threadPool.start();
$.log('calculate the sum of an array of 1.000.000 entries using 2 threads ...');
});
消费者(webworker)
//consumer.js
// Path aliases; {workspace} is referenced in the $.import call below.
OODK.config({
  path: {
    oodk: '../src',
    workspace: 'workspace'
  }
});
OODK(function($, _){
  // import the concurrent API package as well as the task class
  $.import('[util.concurrent]', '{workspace}/project/Task');
  // start the synchronizer
  OODK.foundation.util.concurrent.SynchronizedObject.self.start();
  // Event listener handling the synchronized queue events on the worker side:
  // it takes tasks from the queue, executes them and reports each result.
  $.implements(OODK.foundation.EventListener).class(function Consumer($, µ, _){
    $.protected(function __processEvent(evt){
      var type = evt.getType();
      // declared once; both branches assign it from the event target
      var queue;
      if(type === 'synchronizedQueue.ready'){
        // the queue is synchronized with this thread
        queue = evt.getTarget();
        // listen for the tasks handed out by the queue
        $.on(queue, 'synchronizedQueue.elementRetrieved', this);
        // request a first task; it is handled by the
        // 'synchronizedQueue.elementRetrieved' branch below
        queue.take();
      }else if(type === 'synchronizedQueue.elementRetrieved'){
        // a task has been retrieved from the queue
        var task = evt.getData();
        queue = evt.getTarget();
        // execute the task
        var result = task.execute();
        // notify the producer that the task is done
        queue.notify('synchronizedQueue.taskDone', result);
        if(queue.remainingElements() > 0){
          // at least one task is still in the queue, take it
          queue.take();
        }
      }
    });
  });
  var threadListener = $.new(_.Consumer);
  // global listener for the synchronizedQueue.ready event,
  // triggered when the synchronized queue is synchronized with this thread
  $.on('synchronizedQueue.ready', threadListener);
});
用于实现自定义逻辑的任务类:
OODK('project', function($, _){
  // Serializable unit of work: holds one array chunk and computes its sum.
  $.public().implements(OODK.foundation.Serializable).class(function Task($, µ, _){
    // the array chunk this task sums
    $.private('chunk');

    // constructor: store the chunk to work on
    $.public(function __initialize(chunk){
      _.chunk = chunk;
    });

    // Custom logic of the task: add up every entry of the chunk,
    // front to back, and return the total.
    $.public(function execute(){
      var values = _.chunk;
      var total = 0;
      var idx = 0;
      while(idx < values.length){
        total += values[idx];
        idx++;
      }
      return total;
    });

    $.static(function($, µ, _){
      // build a Task for the given chunk
      $.public(function factory(chunk){
        return $.new($.ns.Task, chunk);
      });
    });
  });
});
'setTimeout'可以用,你的问题是什么?你试过了吗?请贴出你已经尝试过的代码,最好放在jsfiddle.net上并发布链接。 – mikeb
使用webworkers https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers – rafaelcastrocouto
好的,谢谢你的建议。我从来没试过。也许有人有可运行的好例子? –