如何触发预定的Spring批处理作业?

我希望能够使用REST控制器开始我的工作,然后当作业启动时,它应该按计划运行,直到我再次使用REST停止它。

所以这是我的控制器:

@RestController public class LauncherController { @Autowired JobLauncher jobLauncher; @Autowired Job job; @RequestMapping("/launch") public String launch() throws Exception { ... jobLauncher.run(job, jobParameters); } 

这是Batch conf的一部分:

 @Configuration @EnableBatchProcessing @EnableScheduling public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Scheduled(cron = "0/5 * * * * ?") @Bean public Job job() { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .flow(step1()) .end() .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") . chunk(10) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } 

我还设置了属性spring.batch.job.enabled = false,因为我不想在Spring Boot App启动后立即运行作业。

现在我可以打电话给我的Rest api lauch,这项工作仍在进行,但只有一次。 调度程序不起作用。 我无法确定我应该在哪里定义我的@Scheduled Annotation。

我会以某种方式处理它,预定的作业始终运行,但只有当标志设置为true时才会执行某些操作:

 @Component class ScheduledJob { private final AtomicBoolean enabled = new AtomicBoolean(false); @Scheduled(fixedRate = 1000) void execute() { if (enabled.get()) { // run spring batch here. } } void toggle() { enabled.set(!enabled.get()); } } 

和一个控制器:

 @RestController class HelloController { private final ScheduledJob scheduledJob; // constructor @GetMapping("/launch") void toggle() { scheduledJob.toggle(); } } 

首先,您要定义工作:

 @Bean @Qualifier("fancyScheduledJob") public Job job() { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .flow(step1()) .end() .build(); } 

第二,你开始执行这项工作:

 @Autowired @Qualifier(value = "fancyScheduledJob") private Job job; @Autowired private JobLauncher jobLauncher; @Scheduled(cron = "0/5 * * * * ?") public void launch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobInstanceAlreadyExistsException, NoSuchJobException { jobLauncher.run(job, JobParametersBuilder() .addLong("launchTime", System.currentTimeMillis()) .toJobParameters()) } 

另请注意,引入了“launchTime”参数:默认情况下,spring批处理阻止使用相同的参数值启动作业。

虽然您的日程安排非常紧张 – 每5秒钟您应该了解并发性。 或者,如果您想确保在每个时刻只执行一个作业实例,您可以配置自定义单线程作业启动器:

 @Bean(name = "fancyJobExecutorPool") public TaskExecutor singleThreadedJobExecutorPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(100500); executor.setThreadNamePrefix("fancy-job-batch-"); return executor; } @Bean(name = "fancyJobLauncher") public JobLauncher singleThreadedJobLauncher(JobRepository jobRepository) { SimpleJobLauncher sjl = new SimpleJobLauncher(); sjl.setJobRepository(jobRepository); sjl.setTaskExecutor(singleThreadedJobExecutorPool()); return sjl; } 

并在启动期间使用此单线程作业启动器。

 @Autowired @Qualifier("fancyJobLauncher") private JobLauncher jobLauncher; 

通过这种方式,您的作业实例将逐个执行(但这不会限制并行执行作业中的步骤)。

在此解决方案中,您将能够使用http请求计划和取消计划预定义的作业。 在这个例子中,我们将创建每日,每周和一个时间作业。 该应用程序使用Quartz

   org.quartz-scheduler quartz 2.2.3   org.springframework spring-tx   org.springframework spring-context-support  

首先我们要创建一个SpringBeanJobFactory类,扩展SpringBeanJobFactory

  • {@link AdaptableJobFactory}的子类,它还支持对bean属性的Spring样式*dependency injection。 这基本上是Spring的{@link QuartzJobBean}的直接*等价物,形状为Quartz * {@link org.quartz.spi.JobFactory}。 * *

    将调度程序上下文,作业数据映射和触发器数据映射条目*应用为bean属性值。 如果未找到匹配的bean属性,则默认情况下将忽略条目*。 这类似于QuartzJobBean的行为。

 public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { private transient AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { beanFactory = applicationContext.getAutowireCapableBeanFactory(); } @Override protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); beanFactory.autowireBean(job); return job; } } 

第二部分是配置石英配置。 在这个配置中我们需要创建一个

  • SchedulerFactoryBean我们设置全局配置和应用程序上下文,
  • JobDetailFactoryBean我们设置我们的工作,jobGroup和类,

  • CronTriggerFactoryBean我们在哪里设置cron表达式。

QuartzConfig.class

 @Configuration public class QuartzConfig { @Autowired ApplicationContext context; @Bean public SchedulerFactoryBean quartzScheduler(){ SchedulerFactoryBean quartzScheduler = new SchedulerFactoryBean(); quartzScheduler.setOverwriteExistingJobs(true); quartzScheduler.setSchedulerName("job-scheduler"); AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(context); quartzScheduler.setJobFactory(jobFactory); return quartzScheduler; } @Bean @Scope(value = "prototype") public JobDetailFactoryBean getJobBean(String jobName, String jobGroup, Class clazz){ JobDetailFactoryBean bean = new JobDetailFactoryBean(); bean.setJobClass(clazz); bean.setGroup(jobGroup); bean.setName(jobName); return bean; } @Bean @Scope(value = "prototype") public CronTriggerFactoryBean getCronTriggerBean(String cronExpression, String triggerGroup){ CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); bean.setCronExpression(cronExpression); bean.setGroup(triggerGroup); return bean; } } 

因此,在配置完成后,我们现在可以创建业务逻辑所在的作业。 为此,我们必须创建一个实现Job的类。

 @Component public class DailyJob implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("Daily Job runs!"); } } 

DailyJob类现在已准备就绪。 我们希望通过http请求从外部安排此作业。 在这个例子中,我们有一个控制器,我们可以发送jobname和cron表达式来安排dailyJob

 @Controller public class JobController { @Autowired private Scheduler scheduler; @Autowired private ApplicationContext context;; @ResponseBody @RequestMapping(value = "/job/create/daily", method = RequestMethod.POST) public ResponseEntity dailyJob(@RequestBody JobModel jobModel) throws SchedulerException { JobDetail jobDetail = context.getBean( JobDetail.class, jobModel.getName(), "MyDailyJob", DailyJob.class); Trigger cronTrigger = context.getBean( Trigger.class, jobModel.getCronExpression(), "MyDailyJob"); scheduler.scheduleJob(jobDetail, cronTrigger); return new ResponseEntity(jobModel, HttpStatus.CREATED); } } 

我们在这里看到的是,我们将使用JobModel作为@RequestBody发送一个post请求。 JobModel是一个简单的Pojo,有两个属性namecronExpression这两个字符串。

在这个方法中,我们必须创建之前在config类中配置的bean实例。 首先使用Quartz JobDetail.class创建JobDetail ,作业名称,组名称和应安排的类(在本例中为DailyJob.class )。 之后,我们必须使用Quartz Trigger.class ,cronExpression和组名创建Trigger。

创建两个bean后,我们需要立即安排作业。 所以我们已经自动安装了Quartz Scheduler来安排工作。 之后,该作业已启用并准备好完成其工作。

所以让我们测试一下这些东西。 启动应用程序并将发布请求发送到/job/create/daily

 {"name":"Job 1", "cronExpression":"0 * * * * ?"} 

在这里,我们说工作应该每分钟运行一次(只是为了看到一切正常)。 在你的控制台中你应该看到每一分钟的Daily Job runs!

以下是您可以做的其他一些事情。 例如,获取预定作业的列表:

  @ResponseBody @RequestMapping("job/list") public List jobList() throws SchedulerException { return scheduler.getJobGroupNames(); } 

要删除作业,您也可以创建端点。 例如:

 @ResponseBody @RequestMapping(value = "job/delete/daily", method = RequestMethod.POST) public ResponseEntity deleteJob(@RequestBody JobModel jobModel) throws SchedulerException { JobKey jobKey = new JobKey(jobModel.getName(), "MyDailyJob"); return new ResponseEntity(scheduler.deleteJob(jobKey), HttpStatus.OK); } 

您可以自由创建许多不同的端点,以获取有关当前正在运行的作业的信息,作业运行的频率,重新安排作业等。 重要的是,您的工作名称和工作组(在我们的案例中为"MyDailyJob" )是可重用的。 创建jobKey需要这些信息。

PS:只是为了显示其他工作的其他映射:

 @ResponseBody @RequestMapping(value = "/job/create/weekly", method = RequestMethod.POST) public ResponseEntity weeklyJob(@RequestBody JobModel jobModel) throws SchedulerException { JobDetail jobDetail = context.getBean(JobDetail.class, jobModel.getName(), JobGroup.WEEKLY_GROUP.name(), WeeklyJob.class); Trigger cronTrigger = context.getBean(Trigger.class, jobModel.getCronExpression(), JobGroup.WEEKLY_GROUP.name()); scheduler.scheduleJob(jobDetail, cronTrigger); return new ResponseEntity(jobModel, HttpStatus.CREATED); } @ResponseBody @RequestMapping(value = "/job/create/oneTime", method = RequestMethod.POST) public ResponseEntity oneTimeJob(@RequestBody JobModel jobModel) throws SchedulerException { JobDetail jobDetail = context.getBean(JobDetail.class, jobModel.getName(), JobGroup.ONE_TIME_GROUP.name(), OneTimeJob.class); Trigger cronTrigger = context.getBean(Trigger.class, jobModel.getCronExpression(), JobGroup.ONE_TIME_GROUP.name()); scheduler.scheduleJob(jobDetail, cronTrigger); return new ResponseEntity(jobModel, HttpStatus.CREATED); } 

完整的应用程序在github上

@Scheduled是在方法上定义的,而不是在Bean上定义的。 因此,创建一个将成为Bean的新类

 public class BatchConfiguration { ... @Bean public Job job() { return new Job(); } 

新课程:

 public class Job { @Scheduled(cron = "0/5 * * * * ?") public Job job() { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .flow(step1()) .end() .build(); }