让我们假设所有那些处理同步的Java类都不存在,因为这是一个综合性任务,并且您要处理的只有sychronized
,wait
和notify
。
用简单的话来回答的第一个问题是:“谁会等待什么?”
- 下载线程将等待URL下载。
- 调用者将等待该下载线程的结果。
这是什么意思?我们需要在调用者和下载线程(您的urlData
)之间至少有一个同步元素,并且为方便起见,还应该有一个数据对象处理下载数据,并检查下载是否尚未完成。
所以会发生的具体步骤:
来电请求新的下载。
create:DownloadResult
write:urlData(url - > DownloadResult)
唤醒urlData上的1个线程。
线程X必须找到数据来下载并处理它或/然后再次入睡。
读:urlData(找到第一未处理DownloadResult,否则等待urlData)
写:DownloadResult(获取它)
写:DownloadResult(下载结果)
通知:任何人在等待DownloadResult
重复
来电必须能够异步检查/等待下载结果。
读:urlData
读:DownloadResult访问对象urlData或DownloadResult时(上DownloadResult如果需要等待)
由于有读取并从在这些对象上的不同线程写入,同步是必需的。
又会有一个等待/通知协会:
- 主叫 - > urlData - > DownTh
- DownTh - > DownloadResult - >主叫
经过仔细分析下面的代码会满足要求:
public class DownloadResult {
protected final URL url; // this is for convenience
protected boolean inProgress;
protected byte[] result;
public DownloadResult(final URL url) {
this.url = url;
this.inProgress = false;
}
/* Try to lock this against tother threads if not already acquired. */
public synchronized boolean acquire() {
if (this.inProgress == false) {
this.inProgress = true;
return true;
} else {
return false;
}
}
public void download() {
final byte[] downloadedBytes = Util.download(this.url); // note how this is done outside the synchronized block to avoid unnecessarily long blockings
synchronized (this) {
this.result = downloadedBytes;
this.notifyAll(); // wake-up ALL callers
}
}
public synchronized byte[] getResult() throws InterruptedException {
while (this.result == null) {
this.wait();
}
return this.result;
}
}
protected class DownTh extends Thread {
protected final Map<URL, DownloadResult> urlData;
public DownTh(final Map<URL, DownloadResult> urlData) {
this.urlData = urlData;
this.setDaemon(true); // this allows the JVM to shut down despite DownTh threads still running
}
protected DownloadResult getTask() {
for (final DownloadResult downloadResult : urlData.values()) {
if (downloadResult.acquire()) {
return downloadResult;
}
}
return null;
}
@Override
public void run() {
DownloadResult downloadResult;
try {
while (true) {
synchronized (urlData) {
while ((downloadResult = this.getTask()) == null) {
urlData.wait();
}
}
downloadResult.download();
}
} catch (InterruptedException ex) {
// can be ignored
} catch (Error e) {
// log here
}
}
}
public class Downloader {
protected final Map<URL, DownloadResult> urlData = new HashMap<>();
// insert constructor that creates the threads here
public DownloadResult download(final URL url) {
final DownloadResult result = new DownloadResult(url);
synchronized (urlData) {
urlData.putIfAbsent(url, result);
urlData.notify(); // only one thread needs to wake up
}
return result;
}
public byte[] getData(final URL url) throws InterruptedException {
DownloadResult result;
synchronized (urlData) {
result = urlData.get(url);
}
if (result != null) {
return result.getResult();
} else {
throw new IllegalStateException("URL " + url + " not requested.");
}
}
}
在真正的Java中,事情将是d换句话说,通过使用Concurrent类和/或Atomic类,所以这仅仅是为了教育目的。有关进一步阅读,请参阅“可调用的未来”。