Camel – 流缓存不缓存/无法转换?

在阅读一次之后,我似乎正在失去我的“身体”。 请注意,我正在使用Camel的流缓存,并且输入是来自http组件的json文件。 我有一个处理器,代码如下。

log.debug("Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName()); log.debug("In msg1:" + exchange.getIn().getBody(String.class)); log.debug("In msg2:" + exchange.getIn().getBody(String.class)); 

我期望在这里看到的是msg1和msg2是相同的输出,但是msg2返回一个空字符串(非空)。 以下是TRACE级别的日志。

 1- DEBUG com.mycompany.MyProcessor : Body Type: org.apache.camel.converter.stream.InputStreamCache 2- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef 3- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String] 4- DEBUG com.mycompany.MyProcessor : In msg1:{myJson} 5- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef 6- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String] 7- DEBUG com.mycompany.MyProcessor : In msg2: 

日志中要注意的事项:

  • 第1行 – 正文类型正确显示缓存的输入流
  • 第4行转换为String 确实可以生成msg1,即使第3行(转换代码)似乎因IOException而失败
  • 第6行 – 转换失败,但重要的是要注意主体仍然是缓存流。
  • 第7行 – 我的消息丢失了。

那么msg2在哪里?

编辑

除了Peter的答案之外,还有一些事情需要提及:

Camel的MessageHelper静态类有两个有用的function:

  • resetStreamCache
  • extractBodyAsString

这两种情况都有助于解决这种情况

使用流缓存允许您在不同的处理器中多次读取流在同一处理器中仍然只能读取一次

我测试过:

 ModelCamelContext context = new DefaultCamelContext(); context.setStreamCaching(true); //!! // ... from("direct:start") .to("http://ip.jsontest.com/?callback=showMyIP") .process(new MyProcessor()) .process(new MyProcessor()); 

和:

 public class MyProcessor implements Processor { private static final Logger LOG = LoggerFactory.getLogger(HttpStreamCache.MyProcessor.class); @Override public void process(final Exchange exchange) throws Exception { LOG.info("***** Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName()); LOG.info("***** In msg1 : " + exchange.getIn().getBody(String.class)); LOG.info("***** In msg2 : " + exchange.getIn().getBody(String.class)); } }; 

这打印:

 INFO ***** Body Type: org.apache.camel.converter.stream.InputStreamCache INFO ***** In msg1 : showMyIP({"ip": "00.000.000.00"}); INFO ***** In msg2 : INFO ***** Body Type: org.apache.camel.converter.stream.InputStreamCache INFO ***** In msg1 : showMyIP({"ip": "00.000.000.00"}); INFO ***** In msg2 : 

我相信你的问题不是输入流消失了,而是它的读者位置是消费的最后一点,在这种情况下,在流的末尾。

您可以使用流“倒带”方法,该方法可以将您的读者带到流的开头 – 仅当流支持它时。

这是一个提供这种重置器的scala代码:

 object StreamResetter extends Processor { import org.apache.camel.util.MessageHelper import org.apache.camel.Exchange override def process(exchange: Exchange): Unit = MessageHelper.resetStreamCache(exchange.getIn) } 

在您的路线上,就在第二次日志呼叫之前,您可以使用

 process(StreamResetter) 

问候。

如果您的JSON输入流的内容不是非常大,您可以将输入流转换为ByteArrayInputStream ,这样您就可以在任何Processor多次读取它。 希望有所帮助。