Camel 2.11批量聚合如何与单独的路由一起工作?

首先,有一个类似的未回答的问题将路由连接到单个聚合器

我们有一些消费者路线(ftp,file,smb)从远程系统读取文件。 使用直接路由进行简化测试,但与批量使用者的行为类似:

from("direct:"+routeId).id(routeId) .setProperty(AGGREGATION_PROPERTY, constant(routeId)) .log(String.format("Sending (${body}) to %s", "direct:start1")) .to("direct:aggregate"); 

转换后,一次轮询的所有结果将在一个单独的路径中按批次聚合:

 from("direct:aggregate") .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy()) .completionFromBatchConsumer() .to("log:result", "mock:result"); 

如果每个消费者分开运行,一切正常。 但如果多个消费者并行运行,聚合将拆分民意调查。 例如,如果文件消费者轮询500条消息,而第二条路线开始从ftp读取6个文件,那么我们得到2个聚合1,其中500条来自文件,1条来自ftp的6条消息。

测试用例:

 public void testAggregateByProperty() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z"); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3); template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3); template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1); template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7); assertMockEndpointsSatisfied(); } 

结果是:“A + A”,“B”,“A”,“B”,“A”而不是预期的“A + A + A”,“B + B”,“A”,“Z” 。 问题:

  1. 我们对聚合的假设是错误的吗?
  2. 我们如何实现预期的行为?
  3. 如果我们设置了completionTimeout,那么它会在第一次交换时发生超时 – 如果还有新的交换,那么它是独立的吗?

你几乎让它工作了。 这是你需要的改变(在我解释之后)。

 from("direct:aggregate").id("aggregate") .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy()) .completionSize(property(Exchange.BATCH_SIZE)) .to("log:result", "mock:result") 

结果将是:

 Exchange received, body: A+A+A Exchange received, body: B+B Exchange received, body: A 

注意:由于批次大小为7您将不会收到"Z"的结果。

要解释 – 正如您所读到的, 聚合器是一个多function的骆驼组件,正确定义的关键是:

  • 聚合表达式
  • 完成规则

现在,在您的情况下,您正在聚合属性AGGREGATION_PROPERTY ,它将是ABZ 此外,您还指定了批量大小。

但是,您没有在路线中表达completionSize() 。 相反,你使用的是completionFromBatchConsumer – 它做了一些不同的事情(代码声明它寻找Exchange#BATCH_COMPLETE属性),因此产生了奇怪的结果。

无论如何, .completionSize(Exchange.BATCH_SIZE)将使您的测试运行。

祝你好运。