Camel – 流缓存不缓存/无法转换?
在阅读一次之后,我似乎正在失去我的“身体”。 请注意,我正在使用Camel的流缓存,并且输入是来自http组件的json文件。 我有一个处理器,代码如下。
log.debug("Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName()); log.debug("In msg1:" + exchange.getIn().getBody(String.class)); log.debug("In msg2:" + exchange.getIn().getBody(String.class));
我期望在这里看到的是msg1和msg2是相同的输出,但是msg2返回一个空字符串(非空)。 以下是TRACE级别的日志。
1- DEBUG com.mycompany.MyProcessor : Body Type: org.apache.camel.converter.stream.InputStreamCache 2- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef 3- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String] 4- DEBUG com.mycompany.MyProcessor : In msg1:{myJson} 5- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef 6- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String] 7- DEBUG com.mycompany.MyProcessor : In msg2:
日志中要注意的事项:
- 第1行 – 正文类型正确显示缓存的输入流
- 第4行转换为String 确实可以生成msg1,即使第3行(转换代码)似乎因IOException而失败
- 第6行 – 转换失败,但重要的是要注意主体仍然是缓存流。
- 第7行 – 我的消息丢失了。
那么msg2在哪里?
编辑
除了Peter的答案之外,还有一些事情需要提及:
Camel的MessageHelper静态类有两个有用的function:
-
resetStreamCache
-
extractBodyAsString
这两种情况都有助于解决这种情况
使用流缓存允许您在不同的处理器中多次读取流,但在同一处理器中仍然只能读取一次 。
我测试过:
ModelCamelContext context = new DefaultCamelContext(); context.setStreamCaching(true); //!! // ... from("direct:start") .to("http://ip.jsontest.com/?callback=showMyIP") .process(new MyProcessor()) .process(new MyProcessor());
和:
public class MyProcessor implements Processor { private static final Logger LOG = LoggerFactory.getLogger(HttpStreamCache.MyProcessor.class); @Override public void process(final Exchange exchange) throws Exception { LOG.info("***** Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName()); LOG.info("***** In msg1 : " + exchange.getIn().getBody(String.class)); LOG.info("***** In msg2 : " + exchange.getIn().getBody(String.class)); } };
这打印:
INFO ***** Body Type: org.apache.camel.converter.stream.InputStreamCache INFO ***** In msg1 : showMyIP({"ip": "00.000.000.00"}); INFO ***** In msg2 : INFO ***** Body Type: org.apache.camel.converter.stream.InputStreamCache INFO ***** In msg1 : showMyIP({"ip": "00.000.000.00"}); INFO ***** In msg2 :
我相信你的问题不是输入流消失了,而是它的读者位置是消费的最后一点,在这种情况下,在流的末尾。
您可以使用流“倒带”方法,该方法可以将您的读者带到流的开头 – 仅当流支持它时。
这是一个提供这种重置器的scala代码:
object StreamResetter extends Processor { import org.apache.camel.util.MessageHelper import org.apache.camel.Exchange override def process(exchange: Exchange): Unit = MessageHelper.resetStreamCache(exchange.getIn) }
在您的路线上,就在第二次日志呼叫之前,您可以使用
process(StreamResetter)
问候。
如果您的JSON输入流的内容不是非常大,您可以将输入流转换为ByteArrayInputStream
,这样您就可以在任何Processor
多次读取它。 希望有所帮助。