2014-10-03 75 views
0

我实施应遵循这些要求的自定义数据结构:让两个操作原子

  1. 元素应该是搜索(例如,查找和检索根据.equals()元素)
  2. 当我尝试找回一个元素,它不会被发现该方法应该阻止,直到这样的元素可用。如果从不,该方法永远阻塞或给定超时。

我已经实现了我自己的类 - 我开始修改LinkedBlockingQueue - 但我有一个点,我可以有一个错误的交错,我不知道如何避免。

要测试数据结构我创建了3个线程的方法。这些线程中的每一个都会尝试接收一个唯一的字符串(例如,第一个线程的“Thread1”等)。如果在数据存储中找到该字符串,它将为下一个线程插入一个字符串(例如,线程1将插入“Thread2”)。

这样,一个线程将阻塞且仅当它的信息是在商店发送消息。首先,我开始创建线程后,手动将“Thread1”添​​加到商店。这应该触发线程1得到他的价值了店门和线程插入值,那么2.线程2应通知并得到他的价值和线程3等插入值..

然而,我注意到,循环在传递消息的随机时间后停止。这是 - 我认为 - 由于在takeOrWait()方法中可能出现的错误交错(如此处所示)。我尝试了几种解决方案,但我无法想出一个办法。

问题是 - 我认为 - 我必须释放要修改的锁,然后调用sync.wait()。在这些调用之间,线程可以在队列中插入一个元素,这会导致所有等待线程错过通知。

是否有可能引发wait()之前我解除锁定?

对于completness的缘故我已经加入我用它来测试Main.java类。

BlockingDataStore

public class BlockingStore<T> { 

    // Linked list node. 
    static class Node<E> { 
     /** 
     * The item, volatile to ensure barrier separating write and read 
     */ 
     volatile E item; 
     Node<E> next; 

     Node(E x) { 
      item = x; 
     } 
    } 


    private Node<T> _head; 
    private Node<T> _lastPtr; 
    private int _size; 

    // Locks 
    private Lock changeLock = new ReentrantLock(); 
    private final Object sync = new Object(); 

    ////////////////////////////////// 
    // CONSTRUCTOR     // 
    ////////////////////////////////// 
    public BlockingStore() { 
     _head = null; 
     _lastPtr = null; 
    } 

    ////////////////////////////////// 
    // INTERNAL MODIFICATION  // 
    ////////////////////////////////// 

    /** 
    * Locates an element in the storage and removes it. 
    * Returns null if the element is not found in the list. 
    * 
    * @param toRemove Element to remove. 
    * @return Returns the removed element. 
    */ 
    private T findAndRemove(T toRemove) { 
     T result = null; 

     // Empty queue. 
     if (_head == null) 
      return result; 

     // Do we want the head? 
     if (_head.item.equals(toRemove)) { 
      result = _head.item; 
      _head = _head.next; 
      this._size--; 
      return result; 
     } 

     Node<T> previous = _head; 
     Node<T> current = previous.next; 

     while (current != null) { 
      if (current.item.equals(toRemove)) { 
       // Take the element out of the list. 
       result = current.item; 

       // we have to update the last pointer. 
       if (current == _lastPtr) 
        _lastPtr = previous.next; 
       else 
        previous.next = current.next; 
       this._size--; 
       return result; 
      } 
      previous = current; 
      current = current.next; 
     } 
     return result; 
    } 

    /** 
    * Adds an element to the end of the list. 
    * 
    * @param toAdd Element to add to the end of the list. 
    */ 
    private void addToEnd(T toAdd) { 
     // If the queue is empty 
     if (_head == null) { 
      _head = new Node<T>(toAdd); 
      _lastPtr = _head; 
     } else { 
      _lastPtr.next = new Node<T>(toAdd); 
      _lastPtr = _lastPtr.next; 
     } 
     this._size++; 
    } 

    /** 
    * Takes an element from the front of the list. 
    * Returns null if list is empty. 
    * 
    * @return Element taken from the front of the list. 
    */ 
    private T takeFromFront() { 
     // Check for empty queue. 
     if (_head == null) 
      return null; 

     T result = _head.item; 
     _head = _head.next; 
     this._size--; 
     return result; 
    } 

    ////////////////////////////////// 
    // API METHODS     // 
    ////////////////////////////////// 

    /** 
    * Takes an element from the datastore, 
    * if it is not found the method blocks 
    * and retries every time a new object 
    * is inserted into the store. 
    * 
    * @param toTake 
    * @return 
    */ 
    public T takeOrWait(T toTake) { 
     T value; 
     changeLock.lock(); 
     value = findAndRemove(toTake); 

     // Wait until somebody adds to the store 
     // and then try again. 
     while (value == null) 
      // Sync on the notification object 
      // such that we are waken up when there 
      // is a new element. 
      synchronized (sync) { 
       changeLock.unlock(); // allow writes. 
       // I think I can have bad inter-leavings here. 
       // If an insert is interleaved here, the thread 
       // will never be notified.. 
       try { 
        sync.wait(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       changeLock.lock(); 
       value = findAndRemove(toTake); 
      } 

     changeLock.unlock(); 
     return value; 
    } 

    public T dequeue() { 
     T value; 
     changeLock.lock(); 
     value = takeFromFront(); 
     changeLock.unlock(); 
     return value; 
    } 

    public void enqueue(T toPut) { 
     changeLock.lock(); 
     addToEnd(toPut); 

     // Notify about new element in queue. 
     synchronized (sync) { 
      sync.notifyAll(); 
      changeLock.unlock(); 
     } 

    } 

    public int size() { 
     return _size; 
    } 
} 

** ** Main.java

public class Main { 
    public static void main(String[] args) throws InterruptedException { 
     final BlockingStore<String> q = new BlockingStore<String>(); 

     new Thread(new Runnable() { 
      public String name = "Thread 1: "; 
      public String to = "Thread 2: "; 

      public void print(String message) { 
       System.out.println(name + message); 
      } 

      @Override 
      public void run() { 
       while (true) { 
        String value = q.takeOrWait(name); 
        print("Got: " + value); 
        q.enqueue(to); 
       } 
      } 
     }).start(); 

     new Thread(new Runnable() { 
      public String name = "Thread 2: "; 
      public String to = "Thread 3: "; 

      public void print(String message) { 
       System.out.println(name + message); 
      } 

      @Override 
      public void run() { 
       while (true) { 
        String value = q.takeOrWait(name); 
        print("Got: " + value); 
        q.enqueue(to); 
       } 
      } 
     }).start(); 

     new Thread(new Runnable() { 
      public String name = "Thread 3: "; 
      public String to = "Thread 1: "; 

      public void print(String message) { 
       System.out.println(name + message); 
      } 

      @Override 
      public void run() { 
       while (true) { 
        String value = q.takeOrWait(name); 
        print("Got: " + value); 
        q.enqueue(to); 
       } 
      } 
     }).start(); 

     Thread.sleep(1000); 
     System.out.println("Main: Sending new message to queue for thread 1"); 
     q.enqueue("Thread 1: "); 

    } 

} 

回答

1

的问题是 - 我想 - 我必须解除锁定修改,随后来电sync.wait()

听起来像丢失通知。了解sync.notify()什么都不做,如果没有其他线程已经在sync.wait()被阻止。 sync对象不记得它已被通知。

这不起作用(根据您的例子):

public void waitForFlag() { 
    ... 
    while (! flag) { 
     synchronized (sync) { 
      try { 
       sync.wait(); 
      } catch (InterruptedException e) { ... } 
     } 
    } 
} 

public void setFlag() { 
    flag = true; 
    synchronized (sync) { 
     sync.notifyAll(); 
    } 
} 

假设线程A调用waitForFlag(),发现标志是假的,然后被抢占。然后,线程B调用setFlag(),不通知任何人。最后,线程A调用sync.wait()。

现在你已经设置了标志,并且线程A在wait()调用中被阻塞,等待有人设置标志。这就是丢失通知的样子。

下面是它应该是什么样子:

public void waitForFlag() { 
    ... 
    synchronized(sync) { 
     while (! flag) { 
      try { 
       sync.wait(); 
      } catch (InterruptedException e) { ... } 
     } 
    } 
} 


public void setFlag() { 
    synchronized (sync) { 
     flag = true; 
     sync.notifyAll(); 
    } 
} 

这种方式,通知不能丢,因为设置标志的声明和测试标志的声明都 synchronized块。