Spring集成多个UDP入站/出站通道

我正在尝试使用Spring引导构建一个部署在多个节点上的模块。 由于特定应用程序的时间限制,我必须使用UDP并且不能依赖Spring提供的易于使用的REST工具。

我必须能够将数据报发送到可能随时间变化的一组节点 (即,集合可能增大或缩小,或者某些节点可能移动到新的ip / port“坐标”)。 沟通必须是单播的

我一直在阅读有关TCP和UDP支持TCP和UDP支持的官方文档,但它相当……紧凑,不透明。 org.springframework.integration类上的javadoc也非常简短。 根据我的理解,“入站”通道用于发送数据包,而出站通道用于接收数据包。

到目前为止,我还没能找到入站的以下问题的答案(即“发送”频道,如果我理解的话): – 如何在运行时创建更多频道,将数据包发送到多个目的地? – 如果主机被移动,我应该销毁通道并设置一个新通道,还是可以在运行时更改通道的参数(目标IP /端口)?

对于出站通道(如果我理解的话,“接收”通道),我有与上述相似的问题,如: – 如何在运行时设置多个通道? – 如何在运行时更改现有通道的目标,而不是必须将其拆除并重新设置? – 我应该只打开/关闭“原始”UDP套接字吗?

您有入站和出站逆转。

这是一个应该为您提供所需内容的示例; 它使用发布/订阅频道进行广播…

@SpringBootApplication public class So48213450Application { private final Map registrations = new HashMap<>(); public static void main(String[] args) { SpringApplication.run(So48213450Application.class, args); } @Bean public PublishSubscribeChannel channel() { return new PublishSubscribeChannel(); } @Bean public ApplicationRunner runner(PublishSubscribeChannel channel) { return args -> { makeANewUdpAdapter(1234); makeANewUdpAdapter(1235); channel.send(MessageBuilder.withPayload("foo\n").build()); registrations.values().forEach(r -> { r.stop(); r.destroy(); }); }; } @Autowired private IntegrationFlowContext flowContext; public void makeANewUdpAdapter(int port) { System.out.println("Creating an adapter to send to port " + port); IntegrationFlow flow = IntegrationFlows.from(channel()) .handle(Udp.outboundAdapter("localhost", port)) .get(); IntegrationFlowRegistration registration = flowContext.registration(flow).register(); registrations.put(port, registration); } } 

结果:

 $ nc -u -l 1234 & [1] 56730 $ nc -u -l 1235 & [2] 56739 $ jobs [1]- Running nc -u -l 1234 & [2]+ Running nc -u -l 1235 & $ foo foo 

您无法在运行时更改参数,您必须创建新参数。

编辑

回应下面的评论……

你不能混合和匹配弹簧集成jar(2.1.x和5.0.x); 他们必须都是相同的版本。 我上面的例子使用了Boot 2.0.0.M7(boot 2计划在下个月发布)。

Udp工厂类已添加到5.0.0中的spring-integration-ip中。

下面是一个类似的示例(也添加了接收适配器),用于启动1.5.9和弹簧集成4.3.13 ……

 @SpringBootApplication public class So482134501Application { private final Map registrations = new HashMap<>(); @Autowired private IntegrationFlowContext flowContext; public static void main(String[] args) { SpringApplication.run(So482134501Application.class, args); } @Bean public PublishSubscribeChannel channel() { return new PublishSubscribeChannel(); } @Bean public ApplicationRunner runner(PublishSubscribeChannel channel) { return args -> { makeANewUdpInbound(1234); makeANewUdpInbound(1235); makeANewUdpOutbound(1234); makeANewUdpOutbound(1235); Thread.sleep(5_000); channel.send(MessageBuilder.withPayload("foo\n").build()); this.registrations.values().forEach(r -> { r.stop(); r.destroy(); }); this.registrations.clear(); }; } public void makeANewUdpOutbound(int port) { System.out.println("Creating an adapter to send to port " + port); IntegrationFlow flow = IntegrationFlows.from(channel()) .handle(new UnicastSendingMessageHandler("localhost", port)) .get(); IntegrationFlowRegistration registration = flowContext.registration(flow).register(); registrations.put(port, registration); } public void makeANewUdpInbound(int port) { System.out.println("Creating an adapter to receive from port " + port); IntegrationFlow flow = IntegrationFlows.from(new UnicastReceivingChannelAdapter(port)) .transform(String::new) .handle(System.out::println) .get(); IntegrationFlowRegistration registration = flowContext.registration(flow).register(); registrations.put(port, registration); } } 

结果:

 GenericMessage [payload=foo , headers={ip_packetAddress=localhost/127.0.0.1:54881, ip_address=127.0.0.1, id=db7dae61-078c-5eb6-dde4-f83fc6c591d1, ip_port=54881, ip_hostname=localhost, timestamp=1515764556722}] GenericMessage [payload=foo , headers={ip_packetAddress=localhost/127.0.0.1:54880, ip_address=127.0.0.1, id=d1f79e79-569b-637b-57c5-549051f1b031, ip_port=54880, ip_hostname=localhost, timestamp=1515764556722}]