这是我第一次在Java中创建一个多线程应用程序,它将持续运行直到取消,并且无法关闭/中断我的线程。如何中断循环中阻塞的可调用线程?
我有一些线程与Mediator进行通信,该Mediator封装了TransferQueue和ExecutorService,并且促进了生产线程和消费线程之间的通信。
我使用这个调解器而不是Future,因为TransferQueue对于处理多个生产者的单个消费者(生产者线程可以随时调用mediator.put(E e))和消费者线程可以在E e = mediator.take()上等待某些东西可用),并且我不想浪费CPU周期轮询。
设计非常干净,快速和有效,但是我在中断阻塞queue.take(),serverSocket.accept()并中断整个线程时遇到了问题。
生产者:
public class SocketProducer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
while (runnable) {
Socket socket = serverSocket.accept(); // blocks until connection
mediator.putIntoQueue(socket);
}
return null;
}
public void interrupt() {
// ?
runnable = false;
serverSocket.close();
// ?
}
}
和消费者:
private class SocketConsumer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private volatile boolean runnable = true;
public SomeConsumer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Void call() throws Exception {
while (runnable) {
Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
}
return null;
}
public void interrupt() {
// ?
runnable = false;
// ?
}
}
同事接口只是扩展可赎回,给一些额外的功能给中介管理其生产/消费者同事(即:要求:每个同事。中断())。
我已经尝试了很多方法,在各个地方抛出InterruptedException,在各个地方捕获InterruptedException,让线程返回其中断的中介的线程实例。我所尝试过的所有东西都是如此无效,以至于我觉得我错过了这个难题的一些关键部分。
到目前为止,我所见过的最有效的方法是毒丸(如果队列没有将NPE置于空插入状态,这将非常棒),以及我尝试引入毒物的所有方法由于ClassCastException(尝试将Object转换为套接字,试图实例化一个通用套接字等)而失败。
我真的不知道该从哪里出发。我真的希望能够按需要干净地终止这些线程。
完成的解决方案:
public class SocketProducer implements Colleague<Socket> {
private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
logger.info("Listening on port " + listeningPort);
while (runnable) {
try {
Socket socket = serverSocket.accept();
logger.info("Connected on port " + socket.getLocalPort());
mediator.putIntoQueue(socket);
} catch (SocketException e) {
logger.info("Stopped listening on port " + listeningPort);
}
}
return null;
}
public void interrupt() {
try {
runnable = false;
serverSocket.close();
} catch (IOException e) {
logger.error(e);
}
}
}
public class SocketConsumer implements Colleague<Socket> {
private static final Logger logger = getLogger(SocketConsumer.class.getName());
private Mediator<Socket> socketMediator;
public SocketConsumer(Mediator<Socket> mediator) {
this.socketMediator = mediator;
}
public Void call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
Socket socket = socketMediator.takeFromQueue();
logger.info("Received socket on port: " + socket.getLocalPort());
} catch (InterruptedException e) {
logger.info("Interrupted.");
Thread.currentThread().interrupt();
}
}
return null;
}
public void interrupt() {
Thread.currentThread().interrupt();
}
}
我现在可以看到的第一个显而易见的事情是,你在'runnable'字段旁边缺少'volatile'。由于您尝试中断另一个线程的线程,因此您需要确保字段更改的可见性。我也会把它变成普通的'boolean',而不是'Boolean'。 – 2014-09-01 20:13:17
也许我可以将Socket和ServerSocket封装到它们自己的类中,这将允许我创建它们的独特毒药实例吗? – CodeShaman 2014-09-01 20:23:18
如果你调用'ServerSocket#close()',那么阻塞的'accept()'调用会抛出一个异常......这样做有帮助吗? – 2014-09-01 20:30:33