Spring Cloud数据流类型转换在处理器组件中不起作用?

我有一个处理器,它将byte[]有效负载转换为MyClass有效负载:

 @Slf4j @EnableBinding(Processor.class) public class MyDecoder { @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public MyClass decode(final byte[] payload) { MyClass decoded = doStuff(payload); if (decoded != null) { log.info("Successfully decoded!"); } return decoded; } } 

我尝试创建以下DSL: some-source | my-decoder | some-sink some-source | my-decoder | some-sink some-source | my-decoder | some-sinksome-sink报告错误,因为它在classLoader中没有MyClass类。 这是预期的行为。

我尝试在my-decoder上应用类型转换,例如: some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink我在my-decoder日志中遇到以下错误:

 2017-01-20 21:45:17.278 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 2017-01-20 21:45:18.441 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 2017-01-20 21:45:20.512 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 2017-01-20 21:45:20.515 ERROR 9408 --- [afka-listener-2] oskafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = example.some-source, partition = 0, offset = 1, key = null, value = [B@7dfad000) org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na] at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.5.RELEASE.jar!/:na] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.5.RELEASE.jar!/:na] at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.4.RELEASE.jar!/:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.4.RELEASE.jar!/:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:778) [spring-kafka-1.0.4.RELEASE.jar!/:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111] Caused by: java.lang.IllegalArgumentException: payload must not be null at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] at org.springframework.integration.support.MutableMessage.(MutableMessage.java:57) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.support.MutableMessage.(MutableMessage.java:53) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:194) ~[spring-cloud-stream-1.1.0.RELEASE.jar!/:1.1.0.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] ... 42 common frames omitted 

我可以看到消息已从byte[]转换为MyClass ,并且不为null。 我不明白为什么我在失败前看到该消息3次因为kafka属性’retries’为0,如启动时my-decoder log中所示:

 2017-01-20 21:44:32.080 INFO 9408 --- [ main] oakclients.producer.ProducerConfig : ProducerConfig values: compression.type = none metric.reporters = [] metadata.max.age.ms = 300000 metadata.fetch.timeout.ms = 60000 reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 bootstrap.servers = [localhost:9092] retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit buffer.memory = 33554432 timeout.ms = 30000 key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX block.on.buffer.full = false ssl.key.password = null max.block.ms = 60000 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 ssl.truststore.password = null max.in.flight.requests.per.connection = 5 metrics.num.samples = 2 client.id = ssl.endpoint.identification.algorithm = null ssl.protocol = TLS request.timeout.ms = 30000 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] acks = 1 batch.size = 16384 ssl.keystore.location = null receive.buffer.bytes = 32768 ssl.cipher.suites = null ssl.truststore.type = JKS security.protocol = PLAINTEXT retries = 0 max.request.size = 1048576 value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer ssl.truststore.location = null ssl.keystore.password = null ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner send.buffer.bytes = 131072 linger.ms = 0 

我尝试编写集成测试:

 @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) @DirtiesContext public abstract class MyDecoderTests { @Autowired protected Processor channels; @Autowired protected MessageCollector collector; public static class UsingNothingIntegrationTests extends MyDecoderTest { @Test public void test() throws Exception { channels.input().send(new GenericMessage(Hex.decodeHex("ff".toCharArray()))); assertThat(collector.forChannel(channels.output()), receivesPayloadThat(instanceOf(MyClass.class))); } } @SpringBootTest("spring.cloud.stream.bindings.output.contentType=application/json") public static class UsingOutputConverterIntegrationTests extends MyDecoderTest { @Test public void test() throws Exception { channels.input().send(new GenericMessage(Hex.decodeHex("ff".toCharArray()))); assertThat(collector.forChannel(channels.output()), receivesPayloadThat(is("{\"example\": true\"}"))); } } @Configuration @EnableAutoConfiguration @Import(MyDecoderConfiguration.class) public static class MyDecoderTestApplication { } } 

测试成功运行,转换发生。

然后,我认为我的DSL不对,所以我写了一个新的源代码来测试:

 @Bean @InboundChannelAdapter(Source.OUTPUT) public MessageSource exampleSource() { return () -> new GenericMessage(getMyClassObject()); } 

以下DSL按预期将MyClass转换为JSON: my-source --spring.cloud.stream.bindings.output.contentType=application/json | some-sink my-source --spring.cloud.stream.bindings.output.contentType=application/json | some-sink

为什么我收到有关解码记录3次的消息,为什么它失败并且“有效负载必须不为空”消息? 这是我的处理器吗?

您将看到3次尝试,因为这是绑定器中输入通道的默认重试配置。

绑定器中有一个错误,如果转换器无法转换消息,它会尝试创建一个带有null载荷的消息。

它无法转换有效负载的原因是因为它看到了入站内容类型(可能是application/octet-stream ),并且无法将其转换为JSON。

解决方法是将文件添加到类路径:

 META-INF/spring.integration.properties 

并添加

 spring.integration.readOnly.headers=contentType 

它。

这可以防止入站内容类型标头传播到出站邮件。

这需要弹簧集成4.3.2或更高。

在SCST的未来版本中,默认情况下将设置此项。