我实施应遵循这些要求的自定义数据结构:让两个操作原子
- 元素应该是搜索(例如,查找和检索根据
.equals()
元素) - 当我尝试找回一个元素,它不会被发现该方法应该阻止,直到这样的元素可用。如果从不,该方法永远阻塞或给定超时。
我已经实现了我自己的类 - 我开始修改LinkedBlockingQueue - 但我有一个点,我可以有一个错误的交错,我不知道如何避免。
要测试数据结构我创建了3个线程的方法。这些线程中的每一个都会尝试接收一个唯一的字符串(例如,第一个线程的“Thread1”等)。如果在数据存储中找到该字符串,它将为下一个线程插入一个字符串(例如,线程1将插入“Thread2”)。
这样,一个线程将阻塞且仅当它的信息是在商店发送消息。首先,我开始创建线程后,手动将“Thread1”添加到商店。这应该触发线程1得到他的价值了店门和线程插入值,那么2.线程2应通知并得到他的价值和线程3等插入值..
然而,我注意到,循环在传递消息的随机时间后停止。这是 - 我认为 - 由于在takeOrWait()
方法中可能出现的错误交错(如此处所示)。我尝试了几种解决方案,但我无法想出一个办法。
问题是 - 我认为 - 我必须释放要修改的锁,然后调用sync.wait()
。在这些调用之间,线程可以在队列中插入一个元素,这会导致所有等待线程错过通知。
是否有可能引发wait()
之前我解除锁定?
对于completness的缘故我已经加入我用它来测试Main.java类。
BlockingDataStore
public class BlockingStore<T> {
// Linked list node.
static class Node<E> {
/**
* The item, volatile to ensure barrier separating write and read
*/
volatile E item;
Node<E> next;
Node(E x) {
item = x;
}
}
private Node<T> _head;
private Node<T> _lastPtr;
private int _size;
// Locks
private Lock changeLock = new ReentrantLock();
private final Object sync = new Object();
//////////////////////////////////
// CONSTRUCTOR //
//////////////////////////////////
public BlockingStore() {
_head = null;
_lastPtr = null;
}
//////////////////////////////////
// INTERNAL MODIFICATION //
//////////////////////////////////
/**
* Locates an element in the storage and removes it.
* Returns null if the element is not found in the list.
*
* @param toRemove Element to remove.
* @return Returns the removed element.
*/
private T findAndRemove(T toRemove) {
T result = null;
// Empty queue.
if (_head == null)
return result;
// Do we want the head?
if (_head.item.equals(toRemove)) {
result = _head.item;
_head = _head.next;
this._size--;
return result;
}
Node<T> previous = _head;
Node<T> current = previous.next;
while (current != null) {
if (current.item.equals(toRemove)) {
// Take the element out of the list.
result = current.item;
// we have to update the last pointer.
if (current == _lastPtr)
_lastPtr = previous.next;
else
previous.next = current.next;
this._size--;
return result;
}
previous = current;
current = current.next;
}
return result;
}
/**
* Adds an element to the end of the list.
*
* @param toAdd Element to add to the end of the list.
*/
private void addToEnd(T toAdd) {
// If the queue is empty
if (_head == null) {
_head = new Node<T>(toAdd);
_lastPtr = _head;
} else {
_lastPtr.next = new Node<T>(toAdd);
_lastPtr = _lastPtr.next;
}
this._size++;
}
/**
* Takes an element from the front of the list.
* Returns null if list is empty.
*
* @return Element taken from the front of the list.
*/
private T takeFromFront() {
// Check for empty queue.
if (_head == null)
return null;
T result = _head.item;
_head = _head.next;
this._size--;
return result;
}
//////////////////////////////////
// API METHODS //
//////////////////////////////////
/**
* Takes an element from the datastore,
* if it is not found the method blocks
* and retries every time a new object
* is inserted into the store.
*
* @param toTake
* @return
*/
public T takeOrWait(T toTake) {
T value;
changeLock.lock();
value = findAndRemove(toTake);
// Wait until somebody adds to the store
// and then try again.
while (value == null)
// Sync on the notification object
// such that we are waken up when there
// is a new element.
synchronized (sync) {
changeLock.unlock(); // allow writes.
// I think I can have bad inter-leavings here.
// If an insert is interleaved here, the thread
// will never be notified..
try {
sync.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
changeLock.lock();
value = findAndRemove(toTake);
}
changeLock.unlock();
return value;
}
public T dequeue() {
T value;
changeLock.lock();
value = takeFromFront();
changeLock.unlock();
return value;
}
public void enqueue(T toPut) {
changeLock.lock();
addToEnd(toPut);
// Notify about new element in queue.
synchronized (sync) {
sync.notifyAll();
changeLock.unlock();
}
}
public int size() {
return _size;
}
}
** ** Main.java
public class Main {
public static void main(String[] args) throws InterruptedException {
final BlockingStore<String> q = new BlockingStore<String>();
new Thread(new Runnable() {
public String name = "Thread 1: ";
public String to = "Thread 2: ";
public void print(String message) {
System.out.println(name + message);
}
@Override
public void run() {
while (true) {
String value = q.takeOrWait(name);
print("Got: " + value);
q.enqueue(to);
}
}
}).start();
new Thread(new Runnable() {
public String name = "Thread 2: ";
public String to = "Thread 3: ";
public void print(String message) {
System.out.println(name + message);
}
@Override
public void run() {
while (true) {
String value = q.takeOrWait(name);
print("Got: " + value);
q.enqueue(to);
}
}
}).start();
new Thread(new Runnable() {
public String name = "Thread 3: ";
public String to = "Thread 1: ";
public void print(String message) {
System.out.println(name + message);
}
@Override
public void run() {
while (true) {
String value = q.takeOrWait(name);
print("Got: " + value);
q.enqueue(to);
}
}
}).start();
Thread.sleep(1000);
System.out.println("Main: Sending new message to queue for thread 1");
q.enqueue("Thread 1: ");
}
}