如何在弹簧集成中并行和同步处理?
是否有可能在Spring集成中保持通道同步(在发送消息后获得确认)但同时处理更多消息(并行处理)而不用线程创建自己的代码(即ExecutorService执行并提交worker)并等待它们? 我想通过FTP上传文件,但同时上传更多文件而不在代码中创建自己的线程。 我需要知道何时上传所有文件(这就是为什么我希望它是同步的)。 是否可以通过Spring集成配置,如果是,如何?
好吧,看起来你需要一些流程:
-
将文件发送到通道并等待一些结果作为确认 -
到ExecutorChannel
以并行处理每个文件 -
上传每个文件 -
用于关联和分组 -
应该将结果发送到
,那时是waitng。
如果有什么不清楚,请告诉我。
UPDATE
如何在Spring Integration Java DSL中做任何这样的例子?
像这样的东西:
@Configuration @EnableIntegration @IntegrationComponentScan public class Configuration { @Bean public IntegrationFlow uploadFiles() { return f -> f.split() .handle(Ftp.outboundGateway(this.ftpSessionFactory, AbstractRemoteFileOutboundGateway.Command.PUT, "'remoteDirectory'")) .aggregate(); } } @MessagingGateway(defaultRequestChannel = "uploadFiles.input") interface FtpUploadGateway { List upload(List filesToUpload); }
通过使用@Async
任务处理,这在Spring中非常@Async
。
首先创建一个将异步执行任务的服务。 这里记下了@Async
方法中的@Async
注释,它将被spring扫描并标记为异步执行。
import java.util.concurrent.Future; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; @Service public class AsyncTask { @Async public Future performTask(String someArgument) { // put the business logic here and collect the result below Result result = new Result(); // this is some custom bean holding your result return new AsyncResult (result); } }
接下来创建一个组件( 可选 – 可以来自任何其他现有服务 ),它将调用上述服务。
import java.util.concurrent.Future; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class AsyncClass { @Autowired private AsyncTask asyncTask; public void doAsyncOperation() throws Exception { List> futures = new ArrayList>(); for (int i = 1; i < 10; i++) { // Simulate multiple calls Future future = doAsync(String.valueOf(i)); futures.add(future); } for (Future future : futures) { // fetch the result Result result = future.get(); // process the result } } private Future doAsync(final String someArgument) { // this will immediately return with a placeholder Future object which // can be used later to fetch the result Future future = asyncTask.performAsync(someArgument); return future; } }
启用异步所需的示例xml配置如下( 对于基于注释的配置使用@EnableAsync )
有关详细文档,请参阅此处