如何在弹簧集成中并行和同步处理?

是否有可能在Spring集成中保持通道同步(在发送消息后获得确认)但同时处理更多消息(并行处理)而不用线程创建自己的代码(即ExecutorService执行并提交worker)并等待它们? 我想通过FTP上传文件,但同时上传更多文件而不在代码中创建自己的线程。 我需要知道何时上传所有文件(这就是为什么我希望它是同步的)。 是否可以通过Spring集成配置,如果是,如何?

好吧,看起来你需要一些流程:

  1. 将文件发送到通道并等待一些结果作为确认

  2. ExecutorChannel以并行处理每个文件

  3. 上传每个文件

  4. 用于关联和分组

  5. 应该将结果发送到 ,那时是waitng。

如果有什么不清楚,请告诉我。

UPDATE

如何在Spring Integration Java DSL中做任何这样的例子?

像这样的东西:

 @Configuration @EnableIntegration @IntegrationComponentScan public class Configuration { @Bean public IntegrationFlow uploadFiles() { return f -> f.split() .handle(Ftp.outboundGateway(this.ftpSessionFactory, AbstractRemoteFileOutboundGateway.Command.PUT, "'remoteDirectory'")) .aggregate(); } } @MessagingGateway(defaultRequestChannel = "uploadFiles.input") interface FtpUploadGateway { List upload(List filesToUpload); } 

通过使用@Async任务处理,这在Spring中非常@Async

首先创建一个将异步执行任务的服务。 这里记下了@Async方法中的@Async注释,它将被spring扫描并标记为异步执行。

 import java.util.concurrent.Future; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; @Service public class AsyncTask { @Async public Future performTask(String someArgument) { // put the business logic here and collect the result below Result result = new Result(); // this is some custom bean holding your result return new AsyncResult(result); } } 

接下来创建一个组件( 可选 – 可以来自任何其他现有服务 ),它将调用上述服务。

 import java.util.concurrent.Future; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class AsyncClass { @Autowired private AsyncTask asyncTask; public void doAsyncOperation() throws Exception { List> futures = new ArrayList>(); for (int i = 1; i < 10; i++) { // Simulate multiple calls Future future = doAsync(String.valueOf(i)); futures.add(future); } for (Future future : futures) { // fetch the result Result result = future.get(); // process the result } } private Future doAsync(final String someArgument) { // this will immediately return with a placeholder Future object which // can be used later to fetch the result Future future = asyncTask.performAsync(someArgument); return future; } } 

启用异步所需的示例xml配置如下( 对于基于注释的配置使用@EnableAsync

   

有关详细文档,请参阅此处