2010-05-13 107 views
1

我正在从事一个基本上是聊天室的java程序。这是班级的任务,所以没有代码请,我只是有一些问题,确定处理我需要做的最可行的方法。我已经为使用线程获取数据输入流的单个客户端设置了服务器程序,并且有一个线程用于处理数据输出流上的发送。我现在需要做的是为每个传入请求创建一个新线程。使用线程来处理套接字

我的想法是创建一个链表来包含客户端套接字或可能的线程。我磕磕绊绊的地方是搞清楚如何处理将消息发送给所有客户端。如果我为每个传入消息都有一个线程,那么我该如何转向并将其发送到每个客户端套接字。

我在想,如果我有一个clientsockets的链表,我可以遍历这个链表并将它发送给每个链表,但是我必须每次都创建一个dataoutputstream。我可以创建一个数据输出流的链表吗​​?对不起,如果听起来像我漫不经心,但我不想开始编码这个,它可能会得到混乱,没有一个好的计划。谢谢!

编辑 我决定发布我到目前为止的代码。我还没有机会测试它,所以任何评论都会很棒。谢谢!

import java.io.BufferedReader; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.Socket; 
import java.net.ServerSocket; 
import java.util.LinkedList; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class prog4_server { 

    // A Queue of Strings used to hold out bound Messages 
    // It blocks till on is available 
    static BlockingQueue<String> outboundMessages = new LinkedBlockingQueue<String>(); 

    // A linked list of data output streams 
    // to all the clients 
    static LinkedList<DataOutputStream> outputstreams; 

    // public variables to track the number of clients 
    // and the state of the server 
    static Boolean serverstate = true; 
    static int clients = 0; 

    public static void main(String[] args) throws IOException{ 

     //create a server socket and a clientSocket 
     ServerSocket serverSocket = null; 
     try { 
      serverSocket = new ServerSocket(6789); 
     } catch (IOException e) { 
      System.out.println("Could not listen on port: 6789"); 
      System.exit(-1); 
     }// try{...}catch(IOException e){...} 

     Socket clientSocket; 

     // start the output thread which waits for elements 
     // in the message queue 
     OutputThread out = new OutputThread(); 
     out.start(); 

     while(serverstate){ 

      try { 

       // wait and accept a new client 
       // pass the socket to a new Input Thread 
       clientSocket = serverSocket.accept(); 
       DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream()); 
       InputThread in = new InputThread(clientSocket, clients); 
       in.start(); 
       outputstreams.add(ServerOut); 

      } catch (IOException e) { 

       System.out.println("Accept failed: 6789"); 
       System.exit(-1); 
      }// try{...}catch{..} 

      // increment the number of clients and report 
      clients = clients++; 

      System.out.println("Client #" + clients + "Accepted"); 

     }//while(serverstate){... 

    }//public static void main 

    public static class OutputThread extends Thread { 

     //OutputThread Class Constructor 
     OutputThread() { 
     }//OutputThread(...){... 

     public void run() { 

      //string variable to contain the message 
      String msg = null; 

      while(!this.interrupted()) { 

       try { 

        msg = outboundMessages.take(); 

        for(int i=0;i<outputstreams.size();i++){ 

         outputstreams.get(i).writeBytes(msg + '\n'); 

        }// for(...){... 

       } catch (IOException e) { 

        System.out.println(e); 

       } catch (InterruptedException e){ 

        System.out.println(e); 

       }//try{...}catch{...} 

      }//while(...){ 

     }//public void run(){... 

    }// public OutputThread(){... 

    public static class InputThread extends Thread { 

     Boolean threadstate = true; 
     BufferedReader ServerIn; 
     String user; 
     int threadID; 
     //SocketThread Class Constructor 
     InputThread(Socket clientSocket, int ID) { 

      threadID = ID; 

      try{ 
       ServerIn = new BufferedReader(
        new InputStreamReader(clientSocket.getInputStream())); 
        user = ServerIn.readLine(); 
      } 
      catch(IOException e){ 
       System.out.println(e); 
      } 

     }// InputThread(...){... 

     public void run() { 

      String msg = null; 

     while (threadstate) { 

       try { 

        msg = ServerIn.readLine(); 

        if(msg.equals("EXITEXIT")){ 

         // if the client is exiting close the thread 
         // close the output stream with the same ID 
         // and decrement the number of clients 
      threadstate = false; 
         outputstreams.get(threadID).close(); 
         outputstreams.remove(threadID); 
         clients = clients--; 
         if(clients == 0){ 
          // if the number of clients has dropped to zero 
          // close the server 
          serverstate = false; 
          ServerIn.close(); 
         }// if(clients == 0){... 
        }else{ 

         // add a message to the message queue 
         outboundMessages.add(user + ": " + msg); 

        }//if..else... 

       } catch (IOException e) { 

        System.out.println(e); 

       }// try { ... } catch { ...} 

     }// while 

     }// public void run() { ... 
    } 

    public static class ServerThread extends Thread { 

     //public variable declaration 
     BufferedReader UserIn = 
       new BufferedReader(new InputStreamReader(System.in)); 

     //OutputThread Class Constructor 
     ServerThread() { 

     }//OutputThread(...){... 

     public void run() { 

      //string variable to contain the message 
      String msg = null; 

      try { 

       //while loop will continue until 
       //exit command is received 
       //then send the exit command to all clients 

       msg = UserIn.readLine(); 

       while (!msg.equals("EXITEXIT")) { 

        System.out.println("Enter Message: "); 
        msg = UserIn.readLine(); 

       }//while(...){ 

       outboundMessages.add(msg); 
       serverstate = false; 
       UserIn.close(); 

      } catch (IOException e) { 
       System.out.println(e); 

      }//try{...}catch{...} 


     }//public void run(){... 
    }// public serverThread(){... 

}// public class prog4_server 
+1

“每个请求的线程数”或“每个线程的线程数”不会扩展 - 请考虑5K客户端连接到服务器时会发生什么情况。 – 2010-05-13 16:29:15

+0

我不确定你的意思,你是说我需要限制线程数? – Levi 2010-05-13 16:46:03

+0

尼古拉,如果你不提供更好的主意,你的'抱怨'有什么意义?我也会有兴趣知道如何正确地'缩放':) – 2010-05-13 16:46:25

回答

3

我已经通过定义每个客户端连接的“MessageHandler”级,负责入站/出站邮件流量在过去的解决了这个问题。处理程序在内部使用一个BlockingQueue实现,出站消息放置在该实现上(通过内部工作线程)。 I/O发送者线程不断尝试从队列中读取(如果需要,阻塞),并将检索到的每个消息发送给客户端。

这里的一些骨架示例代码(未测试):

/** 
* Our Message definition. A message is capable of writing itself to 
* a DataOutputStream. 
*/ 
public interface Message { 
    void writeTo(DataOutputStream daos) throws IOException; 
} 

/** 
* Handler definition. The handler contains two threads: One for sending 
* and one for receiving messages. It is initialised with an open socket. 
*/  
public class MessageHandler { 
    private final DataOutputStream daos; 
    private final DataInputStream dais; 
    private final Thread sender; 
    private final Thread receiver; 
    private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>(); 

    public MessageHandler(Socket skt) throws IOException { 
    this.daos = new DataOutputStream(skt.getOutputStream()); 
    this.dais = new DataInputStream(skt.getInputStream()); 

    // Create sender and receiver threads responsible for performing the I/O. 
    this.sender = new Thread(new Runnable() { 
     public void run() { 
     while (!Thread.interrupted()) { 
      Message msg = outboundMessages.take(); // Will block until a message is available. 

      try { 
      msg.writeTo(daos); 
      } catch(IOException ex) { 
      // TODO: Handle exception 
      } 
     } 
     } 
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress())); 

    this.receiver = new Thread(new Runnable() { 
     public void run() { 
     // TODO: Read from DataInputStream and create inbound message. 
     } 
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress())); 

    sender.start(); 
    receiver.start(); 
    } 

    /** 
    * Submits a message to the outbound queue, ready for sending. 
    */ 
    public void sendOutboundMessage(Message msg) { 
    outboundMessages.add(msg); 
    } 

    public void destroy() { 
    // TODO: Interrupt and join with threads. Close streams and socket. 
    } 
} 

注意尼古拉是在I/O使用1(或2)每连接线程不是一个可扩展的解决方案,并且通常应用可能是阻挡正确使用Java NIO编写来解决这个问题。但是,实际上,除非你正在编写一个有数千个客户端同时连接的企业服务器,否则这不是一个真正的问题。使用Java NIO编写无错误的可伸缩应用程序是困难,当然不是我推荐的东西。

+0

谢谢。我之前从未使用过阻塞队列,但我可以看到它如何适合我的原始设计。我知道这不是可扩展的,但这不属于本作业的范围。 我看到的是,我最初使用两个线程可以使用全局消息变量并在输出线程中阻塞队列,以便在所有活动客户端端口上发送消息。 我打算看看这是否可行。我也需要找到一种方法来关闭与一个套接字相关的线程。 – Levi 2010-05-13 17:16:47

+0

是的 - 您可以创建一个消息,然后将其传递给每个处理程序。要关闭你的线程,你需要调用thread.interrupt(),然后再调用thread.join()。然而,为了这个工作,重要的是两个线程通过Thread.interrupted()定期检查它们的中断状态。 – Adamski 2010-05-13 17:34:44

+0

我之前的版本基于用户输入而退出。这对于输入线程来说很简单,但我很难看到如何保持该输入线程与其相应的输出线程之间的关联。 再次感谢您的帮助,我喜欢学习新的更好的方式来做事情。 – Levi 2010-05-13 18:04:29