Subject: [PATCH] gh3857 --- Index: spring-batch-samples/src/main/java/org/springframework/batch/sample/JobConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/JobConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/JobConfiguration.java new file mode 100644 --- /dev/null (date 1689669045896) +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/JobConfiguration.java (date 1689669045896) @@ -0,0 +1,163 @@ +package org.springframework.batch.sample; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.sql.DataSource; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.job.builder.FlowBuilder; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.support.JdbcTransactionManager; + +@Configuration +@EnableBatchProcessing +public class JobConfiguration { + + //This method of dynamic construction of a parallel flow works! + @Bean + public Flow parallelFlowWorks(JobRepository jobRepository, JdbcTransactionManager transactionManager) { + SimpleAsyncTaskExecutor exe = new SimpleAsyncTaskExecutor("WorksExe"); + exe.setConcurrencyLimit(2); + + FlowBuilder myFlow = new FlowBuilder("WorkingParallelFlow"); + + //Create a list of Steps, each single step wrapped in a flow because Flow.split.add only accepts flows + List flows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + flows.add(createFlow("WorkingParallelFlow", i, jobRepository, transactionManager)); + } + + myFlow.split(exe).add(flows.toArray(new Flow[flows.size()])); //Add flows all in one 'add' operation + + return myFlow.build(); + } + + //This method of dynamic construction of a parallel flow, results in a flow that hangs. + @Bean + public Flow parallelFlowNoWork(JobRepository jobRepository, JdbcTransactionManager transactionManager) { + SimpleAsyncTaskExecutor exe = new SimpleAsyncTaskExecutor("NoWorksExe"); + exe.setConcurrencyLimit(2); + + FlowBuilder myFlow = new FlowBuilder("NonWorkingParallelFlow"); + + //Directly add flows to Flow.split.add, calling add for each flow (apparently doesn't work correctly) + FlowBuilder.SplitBuilder split = myFlow.split(exe); + for (int i = 0; i < 10; i++) { + split.add(createFlow("NonWorkingParallelFlow", i, jobRepository, transactionManager)); // <<< Key change - call split.add() multiple times + } + + return myFlow.build(); + } + + @Bean + public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager) { + return new JobBuilder("MyJob", jobRepository) + .start(parallelFlowWorks(jobRepository, transactionManager)) + .next(new StepBuilder("WorkingFlowDone", jobRepository) + .tasklet((contribution, chunkContext) -> { + System.out.println("#### WorkingFlowDone, now starting NonWorkingFlow ####"); + return RepeatStatus.FINISHED; + }, transactionManager).build()) + .next(parallelFlowNoWork(jobRepository, transactionManager)) /* This one just hangs */ + .build().build(); + } + + // + //Methods below just help in the construction + + /** + * Create a Flow two wrap a single step. + * Steps can't be directly added to a Flow.split.add, so this just wraps for that purpose. + * @param parentFlowName + * @param stepId + * @return + */ + public Flow createFlow(String parentFlowName, int stepId, JobRepository jobRepository, JdbcTransactionManager transactionManager) { + Step step = createStep(parentFlowName, stepId, stepId * 10, jobRepository, transactionManager); + + return new FlowBuilder("WrapperFlowForStep_" + step.getName()) + .start(step).build(); + } + + /** + * Create a step that just sleeps for a bit. ParentFlowName and stepId just ensure it has a unique name. + * @param parentFlowName + * @param stepId + * @param taskMillsDuration + * @return + */ + public Step createStep(String parentFlowName, int stepId, int taskMillsDuration, JobRepository jobRepository, JdbcTransactionManager transactionManager) { + String name = parentFlowName + "_Step_" + stepId + "_duration_" + taskMillsDuration; + + return new StepBuilder(name, jobRepository) + .tasklet((contribution, chunkContext) -> { + System.out.println(">>>>>>>> Begin " + name + " <<<<<<<<<<<<<<<<<<<<"); + long sleepTime = sleep(taskMillsDuration); + System.out.println(">>>>>>>> Done " + name + " Actual sleep ms = " + sleepTime + " <<<<<<<<<<<<<<<<<<<<"); + + return RepeatStatus.FINISHED; + }, transactionManager).build(); + } + + + /** + * Sleep the running thread, ignoring interruptions. + * + * @param msDuration + * @return Actual time slept. + */ + public static long sleep(int msDuration) { + long remaining = msDuration; + long start = System.currentTimeMillis(); + + while (remaining > 0) { + + try { + TimeUnit.MILLISECONDS.sleep(remaining); + } catch (InterruptedException ie) { + //ignore + } finally { + remaining = remaining - (System.currentTimeMillis() - start); + } + } + + return System.currentTimeMillis() - start; + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("/org/springframework/batch/core/schema-hsqldb.sql") + .generateUniqueName(true) + .build(); + } + + @Bean + public JdbcTransactionManager transactionManager(DataSource dataSource) { + return new JdbcTransactionManager(dataSource); + } + + public static void main(String[] args) throws Exception { + ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + jobLauncher.run(job, new JobParameters()); + } +} \ No newline at end of file