我试图创建一个Tcp服务器,它接受入站连接,并异步向连接的客户端发送消息。 有一个Tcp服务器的示例,但它使用网关,它是请求/响应,不支持异步。与Spring集成框架的Tcp连接
我的目标,
- 服务器听插座,例如9000
- 一个tcp客户端连接到9000
- 服务器接受连接和接收消息。 (使用
TcpReceivingChannelAdapter
?) - 服务器保持连接/套接字并记下ip_connectId标头。
- 当某个事件或计划任务为客户端产生消息时,它查找ip_connectId并向该客户端发送消息。 (使用
TcpSendingMessageHandler
?)
从参考文档中,我应该使用协作出站和入站通道适配器。但没有java配置示例。我不明白如何用java配置来做到这一点,尤其是如何以及在哪里寻找客户端发送。
我需要两个通道吗?一个用于入站,一个用于出站? inboundAdapter-> fromTcpChannel->消费 生产者 - > outboundAdapter-> toTcpChannel
做我创建ServiceActivator
或端点充当生产者/消费者? 默认弹簧集成是否保持连接活着?并且当我需要发送消息给它时,只需将ip_connectId头添加到消息中? 我是否使用TcpSendingMessageHandler
将消息发送给客户端,还是需要实施gateway
?
清理我的代码并在Gary的帮助后再次测试,这是我的代码。
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class IntegrationConfig implements
ApplicationListener<TcpConnectionEvent> {
@Value("${listen.port:8000}")
private int port;
@Bean //for accepting text message from TCP, putty
public MessageChannel fromTcp() {
return new DirectChannel();
}
@Bean //for sending text message to TCP client, outbound
public MessageChannel toTcp() {
return new DirectChannel();
}
// receive from MVC controller
@Bean
public MessageChannel invokeChannel() {
return new DirectChannel();
}
@Bean //inbound, it is working, I could read the inbound message while debugging
public TcpReceivingChannelAdapter in(
AbstractServerConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setOutputChannel(fromTcp());
adapter.setConnectionFactory(connectionFactory);
return adapter;
}
//transform TCP bytes to string message, working
@Transformer(inputChannel = "fromTcp", outputChannel = "toCollaborate")
public String convert(byte[] bytes) {
return new String(bytes);
}
MessageHeaders staticheader; //save ip_connectinId, use this to collaborate outbound message later, for testing purpose only
@ServiceActivator(inputChannel = "toCollaborate", outputChannel = "toTcp")
public Message<String> handleTcpMessage(Message<String> stringMsg) {
staticheader = stringMsg.getHeaders();
return stringMsg;
// save the header, collaborate to output channel
}
//collaborate message from REST API invokeChannel to a outbound tcp client, this fail
@Transformer(inputChannel = "invokeChannel", outputChannel = "toTcp")
public Message<String> headerBeforeSend(String test) {
GenericMessage<String> msg = new GenericMessage<String>(
"from rest api");
if (staticheader != null) {
MessageBuilder
.fromMessage(msg)
.setHeader("ip_connectionId",
staticheader.get("ip_connectionId")).build();
}
return msg;
}
@ServiceActivator(inputChannel = "toTcp")
@Bean
public TcpSendingMessageHandler out(
AbstractServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler();
tcpOutboundAdp.setConnectionFactory(connectionFactory);
return tcpOutboundAdp;
}
// should need only 1 factory? and keep connectin alive
// server for in coming connection
@Bean
public AbstractServerConnectionFactory serverCF() {
return new TcpNetServerConnectionFactory(this.port);
}
@Override
public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
// TODO Auto-generated method stub
TcpConnection source = (TcpConnection) tcpEvent.getSource();
}
}
//The MVC controller
@Autowired
MessageChannel invokeChannel;
@RequestMapping(value="/invoke")
public String sayHello()
{
//trigger gateway to send a message
String msg = "hello";
MessagingTemplate template = new MessagingTemplate();
template.send(invokeChannel, new GenericMessage<String>(msg));
return msg;
}
测试结果: 1.油灰连接确定,发送文本消息 2. SI接收消息确定 3.使用REST API localhost/webappname/rest/invoke
将消息发送到invokeChannel,OK 4. transformer
组消息头 5.异常作为遵循
例外org.springframework.web.util.NestedServletException:请求失败 处理;嵌套的例外是 org.springframework.messaging.MessageHandlingException:无法找到 出境插座 org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) org.springframework.web.servlet.FrameworkServlet.doGet( FrameworkServlet.java:860) javax.servlet.http.HttpServlet.service(HttpServlet.java:622) org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:845) javax.servlet.http.HttpServlet 。服务(HttpServlet.java:729) org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
另请参见参考手册中的'@ Configuration'样本:http://docs.spring.io /spring-integration/reference/html/ip.html#ip-annotation –
加里,我还是不完全明白。 当我创建'TcpSendingMessageHandler' bean时,我应该为其参数分配一个'AbstractServerConnectionFactory'对象吗? 当有消息输出到'toClientChannel'通道时,如何在out()中获取这个消息对象? 如何通过此'out()'处理程序中的TCP发送消息? –
如何从'newRequest()'返回一个频道?我的频道是在“Endpoint”类的外部定义的。 @EnableIntegration @IntegrationComponentScan @Configuration 公共类IntegrationConfig { @MessageEndpoint \t公共静态类RelayService { @Bean 公共TcpReceivingChannelAdapter()的(){ ... adapter.setOutputChannel(newRequests() ); } } } –