2016-04-03 33 views
-2

假设我有需要处理的x个不同类型的项目。 不断从输入源中读取新项目并传递到此线程进行处理。独特的锁池 - 基于类别的队列

下面是规则:

  1. 只有一种类型的每一种类型的可以在给定时间进行处理。
  2. 不同时间的项目可以同时处理?
  3. 根据类别类型需要处理的顺序。假设有3个新项目,其中2个是1类,2个是1类,2类或1类都可以消耗。但是,当消费第一类物品时,应该保持它们到达的顺序。

如果有x个不同的类型,并且每种类型的项目已被捕获。我应该能够同时处理x个项目。

1类 -  项目-C1-1  项目-C1-2
2类 -  项目-C2-1  项目-C2-2  项目-C2-3
类别-3 -  项目-C3-1
类别-4- -  项目-C4-1  项目-C4-2

在此示例中,每个类别的项目都存在,每个队列中的一个项目应该是可消费的。

假设这里的每个类别是BlockingQueue。并且每个类别都有一个单独的处理线程。

什么是最好的方式来存储传入的项目,使不同的线程试图从每个队列中挑选项目可以同时并且有效地进行?

什么是合适的数据结构来保存BlockingQueue的这样的项目可以有效地获取?

如果您可以建议更好的结构来保存传入值,请分享。

+1

“*我可以同时处理它们,但每种类型只能处理一个类型。”“这是自相矛盾的,就像是说我想要做,但我不想做..请更清楚地说明你的要求.. – hagrawal

+0

说在问题中。提示:编辑它。 –

+1

@ bhrt93使用“*我可以同时处理它们,但是每种类型只能处理一种类型*”有了读者如何理解您想要以不同类别处理类别和项目的方式,与你原来的问题是自相矛盾的。只有在你编辑你的问题并增加了更多的信息之后,你才会明白你想达到什么。 – hagrawal

回答

2

以下是一个队列管理器,可以帮助您查找所需内容。它为每个支持的类型创建QueueLock。对于任何对象,可以调用getTypeQueue来获取包含QueueLock的对象。您有责任确保没有线程访问队列,除非它首先获取锁,并且它保持该锁,直到它完成处理该项。

public final class QueueManager { 
    // maps item classes to their TypeQueue objects 
    private static final Map<Class<?>, TypeQueue<?>> entryMap = new HashMap<>(); 

    // add all the supported types here 
    static { 
     entryMap.put(Item1.class, new TypeQueue<Item1>()); 
     entryMap.put(Item2.class, new TypeQueue<Item2>()); 
    } 

    /** 
    * @param o The object for which the TypeQueue is to be retrieved 
    * @return The TypeQueue associated with this object. 
    * @throw IllegalArgumentException if the specified object is not supported 
    *  by this QueueManager 
    */ 
    @SuppressWarnings("unchecked") 
    public static <T> TypeQueue<T> getTypeQueue(T o){ 
     TypeQueue<?> res = entryMap.get(o.getClass()); 
     if(res == null) 
      throw new IllegalArgumentException("Cannot get queue for specified object class."); 
     /* 
     * cast is safe since res is non-null implies T is a supported class 
     * which has a static entry in the entryMap 
     */ 
     return (TypeQueue<T>) res; 
    } 

    //private to prevent instantiation 
    private QueueManager() { 
     throw new AssertionError(); 
    } 

    // an entry houses the lock and the queue for each item type 
    static class TypeQueue<T> { 
     final private Lock lock; 
     final private Queue<T> queue; 

     private TypeQueue() { 
      this.lock = new ReentrantLock(); 
      this.queue = new LinkedList<>(); 
     } 

     public Lock getReentrantLock(){ return lock;} 
     public Queue<T> getQueue(){ return queue;} 
    } 
} 
2

让我们假设有N类

创建项目主队列。

创建一个N个布尔值(或AtomicBoolean)的数组,初始化为false。这些代表每个类别的“繁忙”标志。

创建N个“延迟”队列的数组,每个类别一个队列。

算法:

  • 当新邮件到达,将其添加到队列中。

  • 当工作线程正在找工作,它执行以下操作:

    1. 采取的第一个项目从队列
    2. 获得该项目的类别
    3. 原子:
      • 检查项目类别标志是否“繁忙”。
      • 如果是,该项目添加到“延迟”队列类别和去1
      • 如果没有,设置项目类别标志,以“忙”
    4. 开始处理项目
  • 当工人线程完成一个项目:

    1. 得到处理项目的类别
    2. 原子:
      • 如果“延迟”队列中类非空,采取的第一个元素
      • 如果“延迟”队列为空,清除该类别的“忙”标志。
    3. 如果我们从延期队列中取一个项目,开始处理它。否则,在主队列中寻找新的工作。

注意,在上述的同步是“粗略”,和代码有轻微的“肉酱”气味给它。嘿,这是伪代码!

以上将公平地处理项目(即按照到达顺序),受限于处理任何给定类别中任何一个项目的大部分时间。

  • 它不需要每个类别的工作线程。
  • 它不会阻止工作线程,如果有工作可以做(除类别约束)。
  • 如果队列中存在N个给定类别的项目,在寻找工作时不会出现最坏情况的O(N)减速。事实上,根本没有放缓。

一种更简单的方法是创建为每个类别一个单独的队列,并为每个队列的单个工作线程。新项目只是简单地添加到队列中用于其类别,并按该类别的工作线程按顺序处理。

如果N(您的类别数量)相对较小,那么拥有一堆空闲线程和/或线程争用的开销应该不成问题。如果N是可用物理内核数量的10倍以上,那么我会开始担心。

(注:这个就不用你的问题的原始版本工作,在那里,似乎到n 1000)

+0

这种方法克服了我自己的答案中的几个问题,特别是找到可用项目队列的挑战。 +1 –

+0

是的......它花了一些思考。 –

+0

@StephenC - 非常感谢,但我看到的东西似乎有所不同。我已经标记了这个问题的编辑。 – bhrt

1

除非你有1000级的CPU,你不能同时处理它们。我建议,而不是你有N个单线程的执行者,其中大约是双CPU的,你有这样的

public class WorkerPool { 
    final ExecutorService[] executors; 

    public WorkerPool() { 
     this(Runtime.getRuntime().availableProcessors()*2); 
    } 

    public WorkerPool(int workers) { 
     executors = IntStream.range(0, workers) 
       .mapToObj(i -> Executors.newSingleThreadExecutor()) 
       .toArray(ExecutorService[]::new); 
    } 

    public Future<?> submit(Object key, Runnable runnable) { 
     int w = hash(key); 
     return executors[w].submit(runnable); 
    } 

    public <T> Future<T> submit(Object key, Callable<T> callable) { 
     int w = hash(key); 
     return executors[w].submit(callable); 
    } 

    private int hash(Object key) { 
     return (key.hashCode() & 0x7FFF_FFFF) % executors.length; 
    } 
} 

数这将确保你有足够的就业岗位,保证所有CPU的繁忙,而无需使用锁的开销。

例如

WorkerPool pool = new WorkerPool(); 
pool.submit("2", task1); 
pool.submit("2", task2); 
pool.submit("3", task3); 
pool.submit("2", task4); 

在这种情况下,task2无法启动,直到task1已完成,task4等待task2然而task3可能会立即启动。