2016-09-23 119 views
1

我需要与在给定端口上作为服务器运行的C++应用程序对话。它公开了一个二进制API(协议缓冲区)以获得更好的性能。我的RESTful服务是在Spring MVC和Jersey中开发的,并希望使用这个新功能。我已经能够成功地使用和生成协议缓冲区消息。Java Web应用程序和C++服务器之间的套接字通信

在我的spring web应用程序中,我最初创建了一个Apache Commons Pool来创建一个套接字连接池。这是我在读/写插槽

更新1:添加PooledObjectFactory实施

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<Socket> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public Socket create() throws Exception { 
     return new Socket(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(Socket socket) { 
     return new DefaultPooledObject<>(socket); 
    } 

    @Override 
    public void destroyObject(final PooledObject<Socket> p) throws Exception { 
     final Socket socket = p.getObject(); 
     socket.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<Socket> p) { 
     final Socket socket = p.getObject(); 
     return socket != null && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 
} 

@Service 
@Scope("prototype") 
public class Gateway { 
    @Autowired 
    private GenericObjectPool pool; 

    public Response sendAndReceive(Request request) throws CommunicationException { 
     Response response = null; 
     final Socket socket = pool.borrowObject(); 
     try { 
      request.writeDelimitedTo(socket.getOutputStream()); 
      response = Response.parseDelimitedFrom(socket.getInputStream()); 
     } catch (Exception ex) { 
      LOGGER.error("Gateway error", ex); 
      throw new CommunicationException("Gateway error", ex); 
     } finally { 
      pool.returnObject(socket); 
     } 
     return response; 
    } 
} 

这适用于第一个请求,并在池中返回任何以前使用的插座被发现套接字已经关闭。这可能是因为不同的请求正在连接到相同的输入和输出流。如果我在阅读响应后关闭套接字,那么它会胜过共享的目的。如果我使用单例套接字并注入它,它能够处理第一个请求,然后超时。

如果我在每个实例上创建套接字,那么它可以工作,并且对于每个请求,性能大约为2500微秒。我的目标是在500微秒内获得性能。

考虑到要求,最好的方法是什么?

更新2:添加服务器和客户端

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.ServerSocket; 
import java.net.Socket; 

public class TcpServer1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpServer1.class.getName()); 

    public static void main(String[] args) throws Exception { 
     ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[0])); 
     Socket socket = null; 
     while (true) { 
      try { 
       socket = serverSocket.accept(); 
      } catch (IOException e) { 
       LOGGER.warn("Could not listen on port"); 
       System.exit(-1); 
      } 

      Thread thread = new Thread(new ServerConnection1(socket)); 
      thread.start(); 
     } 
    } 
} 

class ServerConnection1 implements Runnable { 

    static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class.getName()); 

    private Socket socket = null; 

    ServerConnection1(Socket socket) { 
     this.socket = socket; 
    } 

    public void run() { 
     try { 
      serveRequest(socket.getInputStream(), socket.getOutputStream()); 
      //socket.close(); 
     } catch (IOException ex) { 
      LOGGER.warn("Error", ex); 
     } 
    } 

    public void serveRequest(InputStream inputStream, OutputStream outputStream) { 
     try { 
      read(inputStream); 
      write(outputStream); 
     } catch (IOException ex) { 
      LOGGER.warn("ERROR", ex); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Response.Builder builder = Response.newBuilder(); 
     Response response = builder.setStatus("SUCCESS").setPing("PING").build(); 
     response.writeDelimitedTo(outputStream); 
     LOGGER.info("Server sent {}", response.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Request request = Request.parseDelimitedFrom(inputStream); 
     LOGGER.info("Server received {}", request.toString()); 
    } 
} 

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.Socket; 

public class TcpClient1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpClient1.class.getName()); 

    private Socket openConnection(final String hostName, final int port) { 
     Socket clientSocket = null; 
     try { 
      clientSocket = new Socket(hostName, port); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while connecting to server", e); 
     } 
     return clientSocket; 
    } 

    private void closeConnection(Socket clientSocket) { 
     try { 
      LOGGER.info("Closing the connection"); 
      clientSocket.close(); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while closing the connection", e); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Request.Builder builder = Request.newBuilder(); 
     Request request = builder.setPing("PING").build(); 
     request.writeDelimitedTo(outputStream); 
     LOGGER.info("Client sent {}", request.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Response response = Response.parseDelimitedFrom(inputStream); 
     LOGGER.info("Client received {}", response.toString()); 
    } 

    public static void main(String args[]) throws Exception { 
     TcpClient1 client = new TcpClient1(); 
     try { 
      Socket clientSocket = null; 

      LOGGER.info("Scenario 1 --> One socket for each call"); 
      for (int i = 0; i < 2; i++) { 
       clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
       OutputStream outputStream = clientSocket.getOutputStream(); 
       InputStream inputStream = clientSocket.getInputStream(); 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
       client.closeConnection(clientSocket); 
      } 

      LOGGER.info("Scenario 2 --> One socket for all calls"); 
      clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
      OutputStream outputStream = clientSocket.getOutputStream(); 
      InputStream inputStream = clientSocket.getInputStream(); 
      for (int i = 0; i < 2; i++) { 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
      } 
      client.closeConnection(clientSocket); 
     } catch (Exception e) { 
      LOGGER.warn("Exception occurred", e); 
      System.exit(1); 
     } 
    } 
} 

这里请求和响应协议缓冲区类。在方案1中,它能够处理两个调用,而在方案2中,它不会从第二个读取返回。似乎协议缓冲区API正在处理不同的流。下面的示例输出

17:03:10.508 [main] INFO c.d.e.socket.TcpClient1 - Scenario 1 --> One socket for each call 
17:03:10.537 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.698 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.731 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.732 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Scenario 2 --> One socket for all calls 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.735 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
+0

上没有足够的信息了'PooledObjectFactory'(由'GenericObjectPool'使用)的行为 - 也许'passivateObject'方法关闭套接字? –

+0

它是否也可能不是关闭连接的C++应用程序,比如说在闲置的特定时间段之后? – Gimby

+0

@Adrian新增工厂类别 – user2459396

回答

0

经过很大的痛苦,我才解决了这个问题。正在处理对套接字的读/写的类被定义为原型。所以一旦检索到套接字的引用,它就不会被清除(由Tomcat管理)。随后对套接字的后续调用被排队,然后超时并且Apache Commons Pool销毁该对象。

为了解决这个问题,我使用Socket的ThreadLocal创建了类SocketConnection。在处理方面,我创建了一个Callback来处理对套接字的读/写操作。下面的示例代码段:

class SocketConnection { 

    final private String identity; 
    private boolean alive; 
    final private ThreadLocal<Socket> threadLocal; 

    public SocketConnection(final String hostname, final int port) throws IOException { 
     this.identity = UUID.randomUUID().toString(); 
     this.alive = true; 
     threadLocal = ThreadLocal.withInitial(rethrowSupplier(() -> new Socket(hostname, port))); 
    } 

} 

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<SocketConnection> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 
    private SocketConnection connection = null; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public SocketConnection create() throws Exception { 
     LOGGER.info("Creating Socket"); 
     return new SocketConnection(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(SocketConnection socketConnection) { 
     return new DefaultPooledObject<>(socketConnection); 
    } 

    @Override 
    public void destroyObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
     socketConnection.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<SocketConnection> p) { 
     final SocketConnection connection = p.getObject(); 
     final Socket socket = connection.get(); 
     return connection != null && connection.isAlive() && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(true); 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
    } 

} 

class SocketCallback implements Callable<Response> { 

    private SocketConnection socketConnection; 
    private Request request; 

    public SocketCallback() { 
    } 

    public SocketCallback(SocketConnection socketConnection, Request request) { 
     this.socketConnection = socketConnection; 
     this.request = request; 
    } 

    public Response call() throws Exception { 
     final Socket socket = socketConnection.get(); 
     request.writeDelimitedTo(socket.getOutputStream()); 
     Response response = Response.parseDelimitedFrom(socket.getInputStream()); 
     return response; 
    } 

} 

@Service 
@Scope("prototype") 
public class SocketGateway { 

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketGateway.class); 

    @Autowired 
    private GenericObjectPool<SocketConnection> socketPool; 
    @Autowired 
    private ExecutorService executorService; 

    public Response eligibility(Request request) throws DataException { 
     EligibilityResponse response = null; 
     SocketConnection connection = null; 
     if (request != null) { 
      try { 
       connection = socketPool.borrowObject(); 
       Future<Response> future = executorService.submit(new SocketCallback(connection, request)); 
       response = future.get(); 
      } catch (Exception ex) { 
       LOGGER.error("Gateway error {}"); 
       throw new DataException("Gateway error", ex); 
      } finally { 
       socketPool.returnObject(connection); 
      } 
     } 

     return response; 
    } 

} 
相关问题