Tag: spring integration

如何从Http集成流程创建Spring Reactor Flux?

我有一个与此非常类似的问题如何从ActiveMQ队列创建Spring Reactor Flux? 一个区别是消息来自Http端点而不是JMS队列。 问题是消息通道由于某种原因没有填充,或者Flux.from()没有获取它。 日志条目显示GenericMessage是从Http Integration流创建的,有效负载作为路径变量,但不会入队/发布到通道? 我试过.channel(MessageChannels.queue())和.channel(MessageChannels.publishSubscribe())没有任何区别,事件流是空的。 这是代码: @Bean public Publisher<Message> httpReactiveSource() { return IntegrationFlows. from(Http.inboundChannelAdapter(“/eventmessage/{id}”) .requestMapping(r -> r .methods(HttpMethod.POST) ) .payloadExpression(“#pathVariables.id”) ) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @GetMapping(value=”eventmessagechannel/{id}”, produces=MediaType.TEXT_EVENT_STREAM_VALUE) public Flux eventMessages(@PathVariable String id){ return Flux.from(httpReactiveSource()) .map(Message::getPayload); } UPDATE1: 的build.gradle buildscript { ext { springBootVersion = ‘2.0.0.M2’ } repositories { mavenCentral() maven { […]

如何从两个MessageProducerSpec创建Spring Integration Flow?

我正在使用Spring Integration,Java DSL(版本1.1.3)我将org.springframework.integration.dsl.IntegrationFlow定义如下 return IntegrationFlows.from(messageProducerSpec) .handle(handler) .handle(aggregator) .handle(endpoint) .get(); } messageProducerSpec是org.springframework.integration.dsl.amqp.AmqpBaseInboundChannelAdapterSpec实例 我希望我的集成流程能够使用来自两个独立的messageProducerSpecs消息(两个独立的SimpleMessageListenerContainers ,每个都使用不同的ConnectionFactory )。 如何从多个messageProducerSpec构造integrationFlow? 我看不到任何集成组件能够使用来自多个源的消息。

如何从ActiveMQ队列创建Spring Reactor Flux?

我正在试验Spring Reactor 3组件和Spring Integration来从JMS队列创建一个反应流(Flux)。 我试图从JMS队列(使用Spring Integration的ActiveMQ)创建一个反应流(Spring Reactor 3 Flux),以便客户端异步获取JMS消息。 我相信我已正确连接所有内容但客户端在服务器停止之前不会收到任何JMS消息。 然后,所有消息被“推送”到客户端一次。 任何帮助,将不胜感激。 这是我用来配置JMS,Integration组件和被动发布者的配置文件: @Configuration @EnableJms @EnableIntegration public class JmsConfiguration { @Value(“${spring.activemq.broker-url:tcp://localhost:61616}”) private String defaultBrokerUrl; @Value(“${queues.patient:patient}”) private String patientQueue; @Autowired MessageListenerAdapter messageListenerAdapter; @Bean public DefaultJmsListenerContainerFactory myFactory( DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, jmsConnectionFactory()); return factory; } @Bean public Queue patientQueue() { return new […]

如何连接消息驱动的适配器列表而不实际写出每个适配器?

嘿所以我需要像十几个队列一样收听,或多或少地通过相同的处理流程将所有传入的消息放入其中。 我有消息驱动的通道适配器连接到每个队列: ……等等。 收到消息后,我将它们全部路由到同一个频道。 但是我仍然需要知道消息来自哪里,所以在我实际路由它们之前,我有点使用标头扩充器来为消息添加队列名称。 有没有办法让我喜欢迭代队列名称列表并动态创建这些适配器? 也许使用java配置? 提前致谢。

Spring集成 – 外部化JDBC查询

有没有一种简单的方法可以从jdbc出站网关外部化大sql查询,而不是内联它? 原因是我们需要做很多大的查询,我们希望将它们放在自己的文件中,或者至少将它们外化到bean中。 一些警告: 我无法控制数据库,所以我无法在那里创建任何东西(例如存储过程) 我不想仅为这个问题创建类,我只是想组织/重构它,而不是让它更复杂,引入许多其他步骤 我更喜欢创建裸.sql文件,但是将查询放在带有bean的xml中也是可以的 我没有选择使用hibernate,坚持使用spring集成jdbc 关于如何更好地组织这个的建议,考虑到我们将有许多其他出站网关,欢迎:) 例如,我不希望在“int-jdbc:outbound-gateway”元素中使用SQL内联,如下所示: 我用答案做了什么 只是: 它也适用于bean内部使用的“:payload”参数。

使用Spring集成IMAP适配器,如何手动获取标记为“未读”的电子邮件?

我有一段代码,它使用spring集成的IMAP适配器轮询收件箱,以读取所有未读的电子邮件,并且工作正常。 但是,如果我打开任何电子邮件,然后在我的Outlook收件箱中将其标记为“未读”,则轮询器不会获取标记的电子邮件。 我可以使用pop3适配器来获取所有电子邮件,但删除它们后,但我想将电子邮件保存在我的收件箱中,我希望轮询器能够获取所有看不见的电子邮件。 有什么建议来处理这个问题吗? 我一直在搜索和阅读有关电子邮件适配器的文章,但没有找到任何有用 提前致谢。

Spring集成 – Queue / Poller似乎在没有任何动作的情况下耗尽线程池

我有一个Spring集成应用程序,附加到AMQP代理。 我想从amqp队列接收消息,并更新db记录。 为了提高性能,我有一个工作池允许多个更新同时发生。 我有以下配置: 如果我开始运行,没有在AMQP频道上处理的入站消息,我很快就会看到thredpool耗尽,并开始拒绝。 这是日志: [Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) – Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) – Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) […]

如何在spring集成中动态创建ftp适配器?

谢谢你的关注 我在我的项目中使用了spring集成,我想从多个ftp服务器检索许多输入文件,其中不同的地址如下图所示: 如何在我的项目中创建动态inbound-adapter来轮询和检索服务器中的文件?

如何在弹簧集成中并行和同步处理?

是否有可能在Spring集成中保持通道同步(在发送消息后获得确认)但同时处理更多消息(并行处理)而不用线程创建自己的代码(即ExecutorService执行并提交worker)并等待它们? 我想通过FTP上传文件,但同时上传更多文件而不在代码中创建自己的线程。 我需要知道何时上传所有文件(这就是为什么我希望它是同步的)。 是否可以通过Spring集成配置,如果是,如何?

Spring Integration通过控制总线手动启动/停止通道适配器

反正手动启动/初始化通道适配器? 我在context.xml中有两对入站/出站适配器,并希望在运行时决定我想要启动哪一个。 编辑: 具体方案: 我有一个客户端,可以在运行时配置为mqtt发布者或订阅者。 我的context.xml看起来像这样: 如您所见,我有两个设置: 1.订户案例:读取mqtt消息 – >写入文件 2.发布者案例:从目录中轮询文件 – >通过mqtt发送 我在运行时决定应用什么设置。 那么你能告诉我这个控制总线的东西究竟适合这里吗?