骆驼聚合策略

我正在解析一个CSV文件,将其拆分并通过camel中的多个处理器进行路由。 有两个端点,一个具有错误数据,另一个端点具有validation数据。

我需要汇总数据的建议。

假设CSV文件有10条记录,其中6条记录到达一个端点,而4条记录到达另一个端点。 如何从每个端点的文件中完成所有10个并完成聚合器。 我需要创建两个文件,一个包含有效数据,另一个包含来自单个文件的损坏数据。

让我们看一下分离器返回的内容。

根据Camel 2.2的文档。 或者更早,拆分器将默认使用您的示例返回最后一条拆分消息,这可能是完成其处理器的最后一行,因此它可能不是第10行(使用您的示例)。

在Camel 2.3及更新版本中,分割器默认返回原始输入消息,即全部10行。 这是默认行为,您无需编写任何代码即可使用。 默认情况下,当拆分器完成时,它会将此消息传递给下一个端点。

所以,如果我在Camel 2.3或更新版本上使用以下DSL:

    A\nB\nC          

以下内容将出现在日志中:

  INFO The message body before the splitter contains A B C INFO Split line A INFO Split line B INFO Split line C INFO The message body after the splitter contains A B C 

正如您所看到的,默认情况下,camel会在拆分器返回后将消息组合回一个消息。 要覆盖此行为,您需要实现自己的聚合器。 为此,创建一个类让我们调用它MyAggregationStrategy并使该类实现AggregationStrategy 。 我在这里使用了apache文档中的示例。 例如,我们将汇总收到的出价,并希望汇总最高出价。

 private static class MyAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { // the first time we only have the new exchange so it wins the first round return newExchange; } int oldPrice = oldExchange.getIn().getBody(Integer.class); int newPrice = newExchange.getIn().getBody(Integer.class); // return the "winner" that has the highest price return newPrice > oldPrice ? newExchange : oldExchange; } } 

完成此操作后,您可以通过执行以下操作告诉拆分器使用您的聚合器:

Spring / XML DSL:

  

在Java中:

 from("direct:start") // aggregated by header id and use our own strategy how to aggregate .aggregate(new MyAggregationStrategy()) 

希望这能让您充分了解分离器的工作原理。 在你的情况下,我可能会为每一行设置一个标题值,表明它是成功还是失败然后我会使用我的客户聚合器创建一个失败的新消息,并将成功分组为两个列表作为消息正文。 一个列表包含失败,一个列表包含已完成的订单项。

然后可以将该新的聚合消息发送到处理器或另一个端点以进行进一步处理。 例如,您可以获取失败的列表并将其发送到生成文件的路由。 seda组件可以在这里提供很多帮助。