2014-09-01 59 views
4

这是我第一次在Java中创建一个多线程应用程序,它将持续运行直到取消,并且无法关闭/中断我的线程。如何中断循环中阻塞的可调用线程?

我有一些线程与Mediator进行通信,该Mediator封装了TransferQueue和ExecutorService,并且促进了生产线程和消费线程之间的通信。

我使用这个调解器而不是Future,因为TransferQueue对于处理多个生产者的单个消费者(生产者线程可以随时调用mediator.put(E e))和消费者线程可以在E e = mediator.take()上等待某些东西可用),并且我不想浪费CPU周期轮询。

设计非常干净,快速和有效,但是我在中断阻塞queue.take(),serverSocket.accept()并中断整个线程时遇到了问题。


生产者:

public class SocketProducer implements Colleague<Socket> { 
    private Mediator<Socket> mediator; 
    private ServerSocket serverSocket; 
    private Integer listeningPort; 

    private volatile boolean runnable = true; 

    public SocketProducer(Mediator<Socket> mediator) { 
     this.mediator = mediator; 
    } 

    public Colleague<Socket> setListeningPort(Integer listeningPort) { 
     this.listeningPort = listeningPort; 
     return this; 
    } 

    public Void call() throws Exception { 
     serverSocket = new ServerSocket(listeningPort, 10); 

     while (runnable) { 
      Socket socket = serverSocket.accept(); // blocks until connection 

      mediator.putIntoQueue(socket); 
     } 

     return null; 
    } 

    public void interrupt() { 
     // ? 
     runnable = false; 
     serverSocket.close(); 
     // ? 
    } 
} 

和消费者:

private class SocketConsumer implements Colleague<Socket> { 
    private Mediator<Socket> mediator; 
    private volatile boolean runnable = true; 

    public SomeConsumer(Mediator<Socket> mediator) { 
     this.mediator = mediator; 
    } 

    public Void call() throws Exception { 
     while (runnable) { 
      Socket socket = mediator.takeFromQueue(); // blocks until element is in queue 
     } 

     return null; 
    } 

    public void interrupt() { 
     // ? 
     runnable = false; 
     // ? 
    } 
} 

同事接口只是扩展可赎回,给一些额外的功能给中介管理其生产/消费者同事(即:要求:每个同事。中断())。

我已经尝试了很多方法,在各个地方抛出InterruptedException,在各个地方捕获InterruptedException,让线程返回其中断的中介的线程实例。我所尝试过的所有东西都是如此无效,以至于我觉得我错过了这个难题的一些关键部分。

到目前为止,我所见过的最有效的方法是毒丸(如果队列没有将NPE置于空插入状态,这将非常棒),以及我尝试引入毒物的所有方法由于ClassCastException(尝试将Object转换为套接字,试图实例化一个通用套接字等)而失败。

我真的不知道该从哪里出发。我真的希望能够按需要干净地终止这些线程。


完成的解决方案:

public class SocketProducer implements Colleague<Socket> { 
    private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName()); 
    private Mediator<Socket> mediator; 
    private ServerSocket serverSocket; 
    private Integer listeningPort; 

    private volatile boolean runnable = true; 

    public SocketProducer(Mediator<Socket> mediator) { 
     this.mediator = mediator; 
    } 

    public Colleague<Socket> setListeningPort(Integer listeningPort) { 
     this.listeningPort = listeningPort; 
     return this; 
    } 

    public Void call() throws Exception { 
     serverSocket = new ServerSocket(listeningPort, 10); 
     logger.info("Listening on port " + listeningPort); 

     while (runnable) { 
      try { 
       Socket socket = serverSocket.accept(); 
       logger.info("Connected on port " + socket.getLocalPort()); 
       mediator.putIntoQueue(socket); 
      } catch (SocketException e) { 
       logger.info("Stopped listening on port " + listeningPort); 
      } 
     } 

     return null; 
    } 

    public void interrupt() { 
     try { 
      runnable = false; 
      serverSocket.close(); 
     } catch (IOException e) { 
      logger.error(e); 
     } 
    } 

} 

public class SocketConsumer implements Colleague<Socket> { 
    private static final Logger logger = getLogger(SocketConsumer.class.getName()); 
    private Mediator<Socket> socketMediator; 

    public SocketConsumer(Mediator<Socket> mediator) { 
     this.socketMediator = mediator; 
    } 

    public Void call() throws Exception { 
     while (!Thread.currentThread().isInterrupted()) { 
      try { 
       Socket socket = socketMediator.takeFromQueue(); 
       logger.info("Received socket on port: " + socket.getLocalPort()); 
      } catch (InterruptedException e) { 
       logger.info("Interrupted."); 
       Thread.currentThread().interrupt(); 
      } 
     } 
     return null; 
    } 

    public void interrupt() { 
     Thread.currentThread().interrupt(); 
    } 

} 
+3

我现在可以看到的第一个显而易见的事情是,你在'runnable'字段旁边缺少'volatile'。由于您尝试中断另一个线程的线程,因此您需要确保字段更改的可见性。我也会把它变成普通的'boolean',而不是'Boolean'。 – 2014-09-01 20:13:17

+0

也许我可以将Socket和ServerSocket封装到它们自己的类中,这将允许我创建它们的独特毒药实例吗? – CodeShaman 2014-09-01 20:23:18

+0

如果你调用'ServerSocket#close()',那么阻塞的'accept()'调用会抛出一个异常......这样做有帮助吗? – 2014-09-01 20:30:33

回答

2

我认为毒丸只会让事情变得更加复杂,所以我会保持简单。

至于ServerSocket,this answer建议调用close()应该足以中断它。

至于BlockingQueue,消费者可以是这样的:

// you can use volatile flag instead if you like 
while (!Thread.currentThread.isInterrupted()) { 
    try { 
     Object item = queue.take(); 
     // do something with item 
    } catch (InterruptedException e) { 
     log.error("Consumer interrupted", e); 
     Thread.currentThread().interrupt(); // restore flag 
    } 
} 

然后在你的Mediator你可以调用一个消费者线程interrupt()

+0

关闭ServerSocket将不会产生IMO作为[TranferQueue.transfer(E)-method](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TransferQueue .html#transfer%28E%29)似乎是阻止顺序。关闭套接字将会关闭套接字将提供的流,但不会将套接字从队列中移除。因此,无论是中介(队列)需要中断还是替代阻塞语句,都可以使用轮询/阻塞超时方法。 – 2014-09-01 20:45:31

+0

@RomanVottner在生产者中实际上有两个阻塞操作(套接字和队列),都需要处理。 – 2014-09-01 20:49:15

+0

我发誓我已经尝试过这个,但突然它奇妙地工作。我想我希望有一种解决方案适用于这两种情况,但幸好我可以解决这个问题。 消费者中断干净,但我必须使用一个runnable标志并关闭生产者的套接字。它完全忽略了线程中断。 – CodeShaman 2014-09-01 20:54:51

1

毒丸是直线前进。

private static Socket poisonPill = new Socket(); 

public Void call() throws Exception { 
    while (runnable) { 
     Socket socket = mediator.takeFromQueue(); // blocks until element is in queue 
     if (socket == poisonPill) { 
      // quit the thread... 
     } 
    } 

    return null; 
} 

注意socket == poisonPill。这是一个平等检查,他们是完全相同的实例,所以这是poisonPill的工作方式仍然是类型安全的。