如何触发预定的Spring批处理作业?
我希望能够使用REST控制器开始我的工作,然后当作业启动时,它应该按计划运行,直到我再次使用REST停止它。
所以这是我的控制器:
@RestController public class LauncherController { @Autowired JobLauncher jobLauncher; @Autowired Job job; @RequestMapping("/launch") public String launch() throws Exception { ... jobLauncher.run(job, jobParameters); }
这是Batch conf的一部分:
@Configuration @EnableBatchProcessing @EnableScheduling public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Scheduled(cron = "0/5 * * * * ?") @Bean public Job job() { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .flow(step1()) .end() .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") . chunk(10) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
我还设置了属性spring.batch.job.enabled = false,因为我不想在Spring Boot App启动后立即运行作业。
现在我可以打电话给我的Rest api lauch,这项工作仍在进行,但只有一次。 调度程序不起作用。 我无法确定我应该在哪里定义我的@Scheduled Annotation。
我会以某种方式处理它,预定的作业始终运行,但只有当标志设置为true时才会执行某些操作:
@Component class ScheduledJob { private final AtomicBoolean enabled = new AtomicBoolean(false); @Scheduled(fixedRate = 1000) void execute() { if (enabled.get()) { // run spring batch here. } } void toggle() { enabled.set(!enabled.get()); } }
和一个控制器:
@RestController class HelloController { private final ScheduledJob scheduledJob; // constructor @GetMapping("/launch") void toggle() { scheduledJob.toggle(); } }
首先,您要定义工作:
@Bean @Qualifier("fancyScheduledJob") public Job job() { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .flow(step1()) .end() .build(); }
第二,你开始执行这项工作:
@Autowired @Qualifier(value = "fancyScheduledJob") private Job job; @Autowired private JobLauncher jobLauncher; @Scheduled(cron = "0/5 * * * * ?") public void launch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobInstanceAlreadyExistsException, NoSuchJobException { jobLauncher.run(job, JobParametersBuilder() .addLong("launchTime", System.currentTimeMillis()) .toJobParameters()) }
另请注意,引入了“launchTime”参数:默认情况下,spring批处理阻止使用相同的参数值启动作业。
虽然您的日程安排非常紧张 – 每5秒钟您应该了解并发性。 或者,如果您想确保在每个时刻只执行一个作业实例,您可以配置自定义单线程作业启动器:
@Bean(name = "fancyJobExecutorPool") public TaskExecutor singleThreadedJobExecutorPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(100500); executor.setThreadNamePrefix("fancy-job-batch-"); return executor; } @Bean(name = "fancyJobLauncher") public JobLauncher singleThreadedJobLauncher(JobRepository jobRepository) { SimpleJobLauncher sjl = new SimpleJobLauncher(); sjl.setJobRepository(jobRepository); sjl.setTaskExecutor(singleThreadedJobExecutorPool()); return sjl; }
并在启动期间使用此单线程作业启动器。
@Autowired @Qualifier("fancyJobLauncher") private JobLauncher jobLauncher;
通过这种方式,您的作业实例将逐个执行(但这不会限制并行执行作业中的步骤)。
在此解决方案中,您将能够使用http请求计划和取消计划预定义的作业。 在这个例子中,我们将创建每日,每周和一个时间作业。 该应用程序使用Quartz
。
org.quartz-scheduler quartz 2.2.3 org.springframework spring-tx org.springframework spring-context-support
首先我们要创建一个SpringBeanJobFactory
类,扩展SpringBeanJobFactory
。
- {@link AdaptableJobFactory}的子类,它还支持对bean属性的Spring样式*dependency injection。 这基本上是Spring的{@link QuartzJobBean}的直接*等价物,形状为Quartz * {@link org.quartz.spi.JobFactory}。 * *
将调度程序上下文,作业数据映射和触发器数据映射条目*应用为bean属性值。 如果未找到匹配的bean属性,则默认情况下将忽略条目*。 这类似于QuartzJobBean的行为。
public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { private transient AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { beanFactory = applicationContext.getAutowireCapableBeanFactory(); } @Override protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); beanFactory.autowireBean(job); return job; } }
第二部分是配置石英配置。 在这个配置中我们需要创建一个
-
SchedulerFactoryBean
我们设置全局配置和应用程序上下文, -
JobDetailFactoryBean
我们设置我们的工作,jobGroup和类, -
CronTriggerFactoryBean
我们在哪里设置cron表达式。
QuartzConfig.class
@Configuration public class QuartzConfig { @Autowired ApplicationContext context; @Bean public SchedulerFactoryBean quartzScheduler(){ SchedulerFactoryBean quartzScheduler = new SchedulerFactoryBean(); quartzScheduler.setOverwriteExistingJobs(true); quartzScheduler.setSchedulerName("job-scheduler"); AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(context); quartzScheduler.setJobFactory(jobFactory); return quartzScheduler; } @Bean @Scope(value = "prototype") public JobDetailFactoryBean getJobBean(String jobName, String jobGroup, Class> clazz){ JobDetailFactoryBean bean = new JobDetailFactoryBean(); bean.setJobClass(clazz); bean.setGroup(jobGroup); bean.setName(jobName); return bean; } @Bean @Scope(value = "prototype") public CronTriggerFactoryBean getCronTriggerBean(String cronExpression, String triggerGroup){ CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); bean.setCronExpression(cronExpression); bean.setGroup(triggerGroup); return bean; } }
因此,在配置完成后,我们现在可以创建业务逻辑所在的作业。 为此,我们必须创建一个实现Job
的类。
@Component public class DailyJob implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("Daily Job runs!"); } }
DailyJob
类现在已准备就绪。 我们希望通过http请求从外部安排此作业。 在这个例子中,我们有一个控制器,我们可以发送jobname和cron表达式来安排dailyJob
。
@Controller public class JobController { @Autowired private Scheduler scheduler; @Autowired private ApplicationContext context;; @ResponseBody @RequestMapping(value = "/job/create/daily", method = RequestMethod.POST) public ResponseEntity dailyJob(@RequestBody JobModel jobModel) throws SchedulerException { JobDetail jobDetail = context.getBean( JobDetail.class, jobModel.getName(), "MyDailyJob", DailyJob.class); Trigger cronTrigger = context.getBean( Trigger.class, jobModel.getCronExpression(), "MyDailyJob"); scheduler.scheduleJob(jobDetail, cronTrigger); return new ResponseEntity (jobModel, HttpStatus.CREATED); } }
我们在这里看到的是,我们将使用JobModel
作为@RequestBody
发送一个post请求。 JobModel
是一个简单的Pojo,有两个属性name
和cronExpression
这两个字符串。
在这个方法中,我们必须创建之前在config类中配置的bean实例。 首先使用Quartz JobDetail.class
创建JobDetail
,作业名称,组名称和应安排的类(在本例中为DailyJob.class
)。 之后,我们必须使用Quartz Trigger.class
,cronExpression和组名创建Trigger。
创建两个bean后,我们需要立即安排作业。 所以我们已经自动安装了Quartz Scheduler
来安排工作。 之后,该作业已启用并准备好完成其工作。
所以让我们测试一下这些东西。 启动应用程序并将发布请求发送到/job/create/daily
:
{"name":"Job 1", "cronExpression":"0 * * * * ?"}
在这里,我们说工作应该每分钟运行一次(只是为了看到一切正常)。 在你的控制台中你应该看到每一分钟的Daily Job runs!
。
以下是您可以做的其他一些事情。 例如,获取预定作业的列表:
@ResponseBody @RequestMapping("job/list") public List jobList() throws SchedulerException { return scheduler.getJobGroupNames(); }
要删除作业,您也可以创建端点。 例如:
@ResponseBody @RequestMapping(value = "job/delete/daily", method = RequestMethod.POST) public ResponseEntity deleteJob(@RequestBody JobModel jobModel) throws SchedulerException { JobKey jobKey = new JobKey(jobModel.getName(), "MyDailyJob"); return new ResponseEntity (scheduler.deleteJob(jobKey), HttpStatus.OK); }
您可以自由创建许多不同的端点,以获取有关当前正在运行的作业的信息,作业运行的频率,重新安排作业等。 重要的是,您的工作名称和工作组(在我们的案例中为"MyDailyJob"
)是可重用的。 创建jobKey需要这些信息。
PS:只是为了显示其他工作的其他映射:
@ResponseBody @RequestMapping(value = "/job/create/weekly", method = RequestMethod.POST) public ResponseEntity weeklyJob(@RequestBody JobModel jobModel) throws SchedulerException { JobDetail jobDetail = context.getBean(JobDetail.class, jobModel.getName(), JobGroup.WEEKLY_GROUP.name(), WeeklyJob.class); Trigger cronTrigger = context.getBean(Trigger.class, jobModel.getCronExpression(), JobGroup.WEEKLY_GROUP.name()); scheduler.scheduleJob(jobDetail, cronTrigger); return new ResponseEntity (jobModel, HttpStatus.CREATED); } @ResponseBody @RequestMapping(value = "/job/create/oneTime", method = RequestMethod.POST) public ResponseEntity oneTimeJob(@RequestBody JobModel jobModel) throws SchedulerException { JobDetail jobDetail = context.getBean(JobDetail.class, jobModel.getName(), JobGroup.ONE_TIME_GROUP.name(), OneTimeJob.class); Trigger cronTrigger = context.getBean(Trigger.class, jobModel.getCronExpression(), JobGroup.ONE_TIME_GROUP.name()); scheduler.scheduleJob(jobDetail, cronTrigger); return new ResponseEntity (jobModel, HttpStatus.CREATED); }
完整的应用程序在github上
@Scheduled
是在方法上定义的,而不是在Bean上定义的。 因此,创建一个将成为Bean的新类
public class BatchConfiguration { ... @Bean public Job job() { return new Job(); }
新课程:
public class Job { @Scheduled(cron = "0/5 * * * * ?") public Job job() { return jobBuilderFactory.get("job") .incrementer(new RunIdIncrementer()) .flow(step1()) .end() .build(); }