Commit 1d45ed3

BATCH-1767: fix optimistic locking exception in multi-threaded step
Currently, when the commit of a chunk fails in a multi-threaded step, the step execution is rolled back to a previous version that may already be obsolete, because another thread could have modified the step execution in the meantime. In this case, an OptimisticLockingFailureException is thrown when trying to persist the step execution, leaving the step in an UNKNOWN state when it should be FAILED. This commit fixes the issue by refreshing the step execution to the latest correctly persisted version before applying the step contribution, so that the contribution is applied to a fresh, correct state. Resolves BATCH-1767
1 parent ea6c312 commit 1d45ed3
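
The failure mode described in the commit message can be illustrated with a minimal, self-contained sketch. `Row`, `Repository`, `applyOnStale`, and `applyOnFresh` are hypothetical stand-ins, not Spring Batch classes. The sketch only assumes that the repository rejects an update whose version does not match the stored one, which is the general idea behind optimistic locking.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-ins for illustration; not Spring Batch classes. */
public class OptimisticLockSketch {

    /** Immutable snapshot of a persisted row: a version plus one counter. */
    static final class Row {
        final int version;
        final int writeCount;

        Row(int version, int writeCount) {
            this.version = version;
            this.writeCount = writeCount;
        }
    }

    /** In-memory "repository" that rejects updates based on a stale version. */
    static final class Repository {
        private final AtomicReference<Row> stored = new AtomicReference<>(new Row(0, 0));

        Row latest() {
            return stored.get();
        }

        /** Succeeds only if expectedVersion matches the currently stored version. */
        boolean update(int expectedVersion, int newWriteCount) {
            Row current = stored.get();
            if (current.version != expectedVersion) {
                // A real repository would typically signal this with an exception.
                return false;
            }
            return stored.compareAndSet(current, new Row(expectedVersion + 1, newWriteCount));
        }
    }

    /** Applies a contribution on top of a stale snapshot. */
    static boolean applyOnStale(Repository repo, Row staleSnapshot, int contribution) {
        return repo.update(staleSnapshot.version, staleSnapshot.writeCount + contribution);
    }

    /** Refreshes to the latest persisted state first, then applies the contribution. */
    static boolean applyOnFresh(Repository repo, int contribution) {
        Row fresh = repo.latest();
        return repo.update(fresh.version, fresh.writeCount + contribution);
    }

    public static void main(String[] args) {
        Repository repo = new Repository();
        Row snapshotOfThreadA = repo.latest(); // thread A captures version 0
        repo.update(0, 3);                     // thread B commits first; version becomes 1
        System.out.println(applyOnStale(repo, snapshotOfThreadA, 3)); // rejected
        System.out.println(applyOnFresh(repo, 3));                    // accepted
    }
}
```

The second call succeeds because it reads the latest version before writing, which mirrors the "refresh, then apply the contribution" ordering that the commit message describes.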

3 files changed: +308 -8 lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java

Lines changed: 21 additions & 4 deletions
@@ -19,6 +19,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.springframework.batch.core.BatchStatus;
 import org.springframework.batch.core.ChunkListener;
+import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.JobInterruptedException;
 import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.StepExecution;
@@ -338,6 +339,7 @@ private class ChunkTransactionCallback extends TransactionSynchronizationAdapter
         private boolean stepExecutionUpdated = false;

         private StepExecution oldVersion;
+        private ExecutionContext oldExecutionContext;

         private boolean locked = false;

@@ -359,6 +361,7 @@ public void afterCompletion(int status) {
                     logger.info("Commit failed while step execution data was already updated. "
                             + "Reverting to old version.");
                     copy(oldVersion, stepExecution);
+                    stepExecution.setExecutionContext(oldExecutionContext);
                     if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
                         rollback(stepExecution);
                     }
@@ -396,8 +399,7 @@ public RepeatStatus doInTransaction(TransactionStatus status) {

             // In case we need to push it back to its old value
             // after a commit fails...
-            oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
-            copy(stepExecution, oldVersion);
+            oldExecutionContext = new ExecutionContext(stepExecution.getExecutionContext());

             try {

@@ -432,6 +434,23 @@ public RepeatStatus doInTransaction(TransactionStatus status) {
                         Thread.currentThread().interrupt();
                     }

+                    // Refresh stepExecution to the latest correctly persisted
+                    // state in order to apply the contribution on the latest version
+                    String stepName = stepExecution.getStepName();
+                    JobExecution jobExecution = stepExecution.getJobExecution();
+                    StepExecution lastStepExecution = getJobRepository()
+                            .getLastStepExecution(jobExecution.getJobInstance(), stepName);
+                    if (lastStepExecution != null &&
+                            !lastStepExecution.getVersion().equals(stepExecution.getVersion())) {
+                        copy(lastStepExecution, stepExecution);
+                    }
+
+                    // Take a copy of the stepExecution in case we need to
+                    // undo the current contribution to the in memory instance
+                    // if the commit fails
+                    oldVersion = new StepExecution(stepName, jobExecution);
+                    copy(stepExecution, oldVersion);
+
                     // Apply the contribution to the step
                     // even if unsuccessful
                     if (logger.isDebugEnabled()) {
@@ -498,11 +517,9 @@ private void rollback(StepExecution stepExecution) {
         }

         private void copy(final StepExecution source, final StepExecution target) {
-            target.setVersion(source.getVersion());
             target.setWriteCount(source.getWriteCount());
             target.setFilterCount(source.getFilterCount());
             target.setCommitCount(source.getCommitCount());
-            target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
         }

     }
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.step.item;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.batch.core.BatchStatus;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.batch.core.listener.ChunkListenerSupport;
+import org.springframework.batch.core.listener.JobExecutionListenerSupport;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.support.DefaultTransactionStatus;
+
+import javax.sql.DataSource;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the behavior of a multi-threaded TaskletStep.
+ *
+ * @author Mahmoud Ben Hassine
+ */
+public class MultiThreadedTaskletStepIntegrationTests {
+
+    @Test
+    public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception {
+        // given
+        Class[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class};
+        ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
+        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
+        Job job = context.getBean(Job.class);
+
+        // when
+        JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
+
+        // then
+        Assert.assertNotNull(jobExecution);
+        Assert.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
+        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
+        Assert.assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
+        assertEquals(0, stepExecution.getFailureExceptions().size());
+        // assert that metrics are correct
+        assertEquals(9, stepExecution.getReadCount());
+        assertEquals(9, stepExecution.getWriteCount());
+        // assert that execution context is correct
+        assertTrue(stepExecution.getExecutionContext().containsKey("spring-batch-worker-thread-1"));
+        assertTrue(stepExecution.getExecutionContext().containsKey("spring-batch-worker-thread-2"));
+        assertTrue(stepExecution.getExecutionContext().containsKey("spring-batch-worker-thread-3"));
+    }
+
+    @Test
+    public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception {
+        // given
+        Class[] configurationClasses = {JobConfiguration.class, CommitFailingTransactionManagerConfiguration.class};
+        ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
+        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
+        Job job = context.getBean(Job.class);
+
+        // when
+        JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
+
+        // then
+        Assert.assertNotNull(jobExecution);
+        Assert.assertEquals(BatchStatus.FAILED, jobExecution.getStatus());
+        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
+        Assert.assertEquals(BatchStatus.FAILED, stepExecution.getStatus());
+        Throwable e = stepExecution.getFailureExceptions().get(0);
+        assertEquals("Planned commit exception!", e.getMessage());
+        // assert that metrics are correct (contribution of thread 2 is rolled back)
+        assertEquals(9, stepExecution.getReadCount());
+        assertEquals(6, stepExecution.getWriteCount());
+        // No assertions on execution context because it is undefined in this case
+    }
+
+    @Test
+    public void testMultiThreadedTaskletExecutionWhenRollbackFails() throws Exception {
+        // given
+        Class[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class};
+        ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
+        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
+        Job job = context.getBean(Job.class);
+
+        // when
+        JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
+
+        // then
+        Assert.assertNotNull(jobExecution);
+        Assert.assertEquals(BatchStatus.UNKNOWN, jobExecution.getStatus());
+        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
+        Assert.assertEquals(BatchStatus.UNKNOWN, stepExecution.getStatus());
+        Throwable e = stepExecution.getFailureExceptions().get(0);
+        assertEquals("Planned rollback exception!", e.getMessage());
+        // assert that metrics are correct (contribution of thread 2 is rolled back)
+        assertEquals(9, stepExecution.getReadCount());
+        assertEquals(6, stepExecution.getWriteCount());
+        // No assertions on execution context because it is undefined in this case
+    }
+
+    @Configuration
+    @EnableBatchProcessing
+    public static class JobConfiguration {
+
+        @Autowired
+        private JobBuilderFactory jobBuilderFactory;
+        @Autowired
+        private StepBuilderFactory stepBuilderFactory;
+
+        @Bean
+        public TaskletStep step() {
+            return stepBuilderFactory.get("step")
+                    .<Integer, Integer>chunk(3)
+                    .reader(itemReader())
+                    .writer(itemWriter())
+                    .taskExecutor(taskExecutor())
+                    .listener(new ChunkListenerSupport() {
+                        @Override
+                        public void beforeChunk(ChunkContext context) {
+                            context
+                                    .getStepContext()
+                                    .getStepExecution()
+                                    .getExecutionContext()
+                                    .put(Thread.currentThread().getName(), "done");
+                        }
+                    })
+                    .build();
+        }
+
+        @Bean
+        public Job job(ThreadPoolTaskExecutor taskExecutor) {
+            return jobBuilderFactory.get("job")
+                    .start(step())
+                    .listener(new JobExecutionListenerSupport() {
+                        @Override
+                        public void afterJob(JobExecution jobExecution) {
+                            taskExecutor.shutdown();
+                        }
+                    })
+                    .build();
+        }
+
+        @Bean
+        public ThreadPoolTaskExecutor taskExecutor() {
+            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+            taskExecutor.setCorePoolSize(3);
+            taskExecutor.setMaxPoolSize(3);
+            taskExecutor.setThreadNamePrefix("spring-batch-worker-thread-");
+            return taskExecutor;
+        }
+
+        @Bean
+        public ItemReader<Integer> itemReader() {
+            return new ItemReader<Integer>() {
+                private AtomicInteger atomicInteger = new AtomicInteger();
+
+                @Override
+                public synchronized Integer read() {
+                    int value = atomicInteger.incrementAndGet();
+                    return value <= 9 ? value : null;
+                }
+            };
+        }
+
+        @Bean
+        public ItemWriter<Integer> itemWriter() {
+            return items -> {
+            };
+        }
+    }
+
+    @Configuration
+    public static class DataSourceConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return new EmbeddedDatabaseBuilder()
+                    .setType(EmbeddedDatabaseType.HSQL)
+                    .addScript("org/springframework/batch/core/schema-drop-hsqldb.sql")
+                    .addScript("org/springframework/batch/core/schema-hsqldb.sql")
+                    .build();
+        }
+
+    }
+
+    @Configuration
+    @Import(DataSourceConfiguration.class)
+    public static class TransactionManagerConfiguration {
+
+        @Bean
+        public PlatformTransactionManager transactionManager(DataSource dataSource) {
+            return new DataSourceTransactionManager(dataSource);
+        }
+
+    }
+
+    @Configuration
+    @Import(DataSourceConfiguration.class)
+    public static class CommitFailingTransactionManagerConfiguration {
+
+        @Bean
+        public PlatformTransactionManager transactionManager(DataSource dataSource) {
+            return new DataSourceTransactionManager(dataSource) {
+                @Override
+                protected void doCommit(DefaultTransactionStatus status) {
+                    super.doCommit(status);
+                    if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
+                        throw new RuntimeException("Planned commit exception!");
+                    }
+                }
+            };
+        }
+
+    }
+
+    @Configuration
+    @Import(DataSourceConfiguration.class)
+    public static class RollbackFailingTransactionManagerConfiguration {
+
+        @Bean
+        public PlatformTransactionManager transactionManager(DataSource dataSource) {
+            return new DataSourceTransactionManager(dataSource) {
+                @Override
+                protected void doCommit(DefaultTransactionStatus status) {
+                    super.doCommit(status);
+                    if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
+                        throw new RuntimeException("Planned commit exception!");
+                    }
+                }
+
+                @Override
+                protected void doRollback(DefaultTransactionStatus status) {
+                    super.doRollback(status);
+                    if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
+                        throw new RuntimeException("Planned rollback exception!");
+                    }
+                }
+            };
+        }
+
+    }
+
+}

spring-batch-docs/asciidoc/scalability.adoc

Lines changed: 10 additions & 4 deletions
@@ -123,7 +123,9 @@ your step, such as a `DataSource`. Be sure to make the pool in those resources
 as large as the desired number of concurrent threads in the step.

 There are some practical limitations of using multi-threaded `Step` implementations for
-some common batch use cases. Many participants in a `Step` (such as readers and writers)
+some common batch use cases:
+
+* Many participants in a `Step` (such as readers and writers)
 are stateful. If the state is not segregated by thread, then those components are not
 usable in a multi-threaded `Step`. In particular, most of the off-the-shelf readers and
 writers from Spring Batch are not designed for multi-threaded use. It is, however,
@@ -132,9 +134,8 @@ possible to work with stateless or thread safe readers and writers, and there is
 https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples[Spring
 Batch Samples] that shows the use of a process indicator (see
 <<readersAndWriters.adoc#process-indicator,Preventing State Persistence>>) to keep track
-of items that have been processed in a database input table.
-
-Spring Batch provides some implementations of `ItemWriter` and `ItemReader`. Usually,
+of items that have been processed in a database input table. Spring Batch provides some
+implementations of `ItemWriter` and `ItemReader`. Usually,
 they say in the Javadoc if they are thread safe or not or what you have to do to avoid
 problems in a concurrent environment. If there is no information in the Javadoc, you can
 check the implementation to see if there is any state. If a reader is not thread safe,
@@ -143,6 +144,11 @@ synchronizing delegator. You can synchronize the call to `read()` and as long as
 processing and writing is the most expensive part of the chunk, your step may still
 complete much faster than it would in a single threaded configuration.

+* In a multi-threaded `Step`, each thread runs in its own transaction and the `ChunkContext`
+is shared between threads. This shared state might end up in an inconsistent state
+if one of the transactions is rolled back. Hence, we recommend avoiding `ExecutionContext`
+manipulation in a multi-threaded `Step`.
+
 [[scalabilityParallelSteps]]

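
The synchronizing delegator described in the documentation change above could look roughly like the following sketch. `SimpleReader`, `ListReader`, `SynchronizedReader`, and `drainConcurrently` are hypothetical names used only for illustration. `SimpleReader` merely mimics the shape of Spring Batch's `ItemReader` (a `read()` method that returns `null` at the end of the input) so that the example runs without any dependency.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a synchronizing delegator; not part of Spring Batch. */
public class SynchronizedReaderSketch {

    /** Minimal reader contract (assumption): returns null when input is exhausted. */
    interface SimpleReader<T> {
        T read() throws Exception;
    }

    /** Stateful reader that is not thread safe: the iterator is shared state. */
    static final class ListReader<T> implements SimpleReader<T> {
        private final Iterator<T> iterator;

        ListReader(List<T> items) {
            this.iterator = items.iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    /** Delegator that serializes every call to read() on the wrapped reader. */
    static final class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() throws Exception {
            return delegate.read();
        }
    }

    /** Drains the reader from several threads and returns the distinct items seen. */
    static Set<Integer> drainConcurrently(SimpleReader<Integer> reader, int threads)
            throws InterruptedException {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    Integer item;
                    while ((item = reader.read()) != null) {
                        seen.add(item);
                    }
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        SimpleReader<Integer> reader = new SynchronizedReader<>(
                new ListReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)));
        System.out.println(drainConcurrently(reader, 3).size());
    }
}
```

Only `read()` is serialized in this sketch, so any work a thread does with an item after reading it can still proceed in parallel, which matches the trade-off the documentation describes.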
