diff --git a/pom.xml b/pom.xml
index 0624236f7f..c577bb1491 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-redis
- 3.2.1-SNAPSHOT
+ 3.2.1-GH-2782-SNAPSHOT
Spring Data Redis
Spring Data module for Redis
diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
index cc083f14d3..ab98979117 100644
--- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
+++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
@@ -370,7 +370,7 @@ private void lazyListen() {
State state = this.state.get();
CompletableFuture futureToAwait = state.isPrepareListening() ? containerListenFuture
- : lazyListen(this.backOff.start());
+ : lazyListen(new InitialBackoffExecution(this.backOff.start()));
try {
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
@@ -531,8 +531,7 @@ private void awaitRegistrationTime(CompletableFuture future) {
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
- } catch (ExecutionException | TimeoutException ignore) {
- }
+ } catch (ExecutionException | TimeoutException ignore) {}
}
@Override
@@ -876,7 +875,7 @@ protected void handleSubscriptionException(CompletableFuture future, BackO
if (recoveryInterval != BackOffExecution.STOP) {
String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms",
- cause, recoveryInterval);
+ cause, recoveryInterval);
logger.error(message, cause);
}
@@ -885,8 +884,13 @@ protected void handleSubscriptionException(CompletableFuture future, BackO
Runnable recoveryFunction = () -> {
- CompletableFuture lazyListen = lazyListen(backOffExecution);
- lazyListen.whenComplete(propagate(future));
+ CompletableFuture lazyListen = lazyListen(new RecoveryBackoffExecution(backOffExecution));
+ lazyListen.whenComplete(propagate(future)).thenRun(() -> {
+
+ if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution) {
+ logger.info("Subscription(s) recovered");
+ }
+ });
};
if (potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
@@ -980,7 +984,7 @@ private boolean hasTopics() {
private Subscriber getRequiredSubscriber() {
Assert.state(this.subscriber != null,
- "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
+ "Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called");
return this.subscriber;
}
@@ -1018,6 +1022,54 @@ private void logTrace(Supplier message) {
}
}
+ BackOffExecution nextBackoffExecution(BackOffExecution backOffExecution, boolean subscribed) {
+
+ if (subscribed) {
+ return new RecoveryAfterSubscriptionBackoffExecution(backOff.start());
+ }
+
+ return backOffExecution;
+ }
+
+ /**
+ * Marker for an initial backoff.
+ *
+ * @param delegate
+ */
+ record InitialBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
+
+ @Override
+ public long nextBackOff() {
+ return delegate.nextBackOff();
+ }
+ }
+
+ /**
+ * Marker for a recovery after a subscription has been active previously.
+ *
+ * @param delegate
+ */
+ record RecoveryAfterSubscriptionBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
+
+ @Override
+ public long nextBackOff() {
+ return delegate.nextBackOff();
+ }
+ }
+
+ /**
+ * Marker for a recovery execution.
+ *
+ * @param delegate
+ */
+ record RecoveryBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
+
+ @Override
+ public long nextBackOff() {
+ return delegate.nextBackOff();
+ }
+ }
+
/**
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
* {@code channel or pattern}, and {@code count} and returns no result.
@@ -1191,7 +1243,7 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col
if (connection.isSubscribed()) {
initFuture.completeExceptionally(
- new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
+ new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
return initFuture;
}
@@ -1199,10 +1251,15 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col
try {
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
} catch (Throwable t) {
- handleSubscriptionException(initFuture, backOffExecution, t);
+ handleSubscriptionException(initFuture, nextBackoffExecution(backOffExecution, connection.isSubscribed()),
+ t);
}
} catch (RuntimeException ex) {
- initFuture.completeExceptionally(ex);
+ if (backOffExecution instanceof InitialBackoffExecution) {
+ initFuture.completeExceptionally(ex);
+ } else {
+ handleSubscriptionException(initFuture, backOffExecution, ex);
+ }
}
return initFuture;
@@ -1215,8 +1272,9 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
CompletableFuture subscriptionDone, Collection patterns, Collection channels) {
- addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
- () -> subscriptionDone.complete(null)));
+ addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> {
+ subscriptionDone.complete(null);
+ }));
doSubscribe(connection, patterns, channels);
}
@@ -1381,7 +1439,10 @@ private void doWithSubscription(byte[][] data, BiConsumer { runner.run(); return null; });
+ doInLock(() -> {
+ runner.run();
+ return null;
+ });
}
private T doInLock(Supplier supplier) {
@@ -1432,7 +1493,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
try {
subscribeChannel(channels.toArray(new byte[0][]));
} catch (Exception ex) {
- handleSubscriptionException(subscriptionDone, backOffExecution, ex);
+ handleSubscriptionException(subscriptionDone, nextBackoffExecution(backOffExecution, true), ex);
}
}));
} else {
@@ -1449,7 +1510,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
closeConnection();
unsubscribeFuture.complete(null);
} catch (Throwable cause) {
- handleSubscriptionException(subscriptionDone, backOffExecution, cause);
+ handleSubscriptionException(subscriptionDone,
+ nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause);
}
});
}
diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java
index ebb268dcc8..5c82c1da82 100644
--- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java
@@ -19,10 +19,15 @@
import static org.mockito.Mockito.*;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
@@ -32,6 +37,7 @@
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
+import org.springframework.util.backoff.FixedBackOff;
/**
* Unit tests for {@link RedisMessageListenerContainer}.
@@ -110,6 +116,7 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
@Test // GH-2335
void containerStartShouldReportFailureOnRedisUnavailability() {
+
when(connectionFactoryMock.getConnection()).thenThrow(new RedisConnectionFailureException("Booh"));
doAnswer(it -> {
@@ -147,6 +154,70 @@ void containerListenShouldReportFailureOnRedisUnavailability() {
assertThat(container.isListening()).isFalse();
}
+ @Test // GH-2335
+ void shouldRecoverFromConnectionFailure() throws Exception {
+
+ AtomicInteger requestCount = new AtomicInteger();
+ AtomicBoolean shouldThrowSubscriptionException = new AtomicBoolean();
+
+ container = new RedisMessageListenerContainer();
+ container.setConnectionFactory(connectionFactoryMock);
+ container.setBeanName("container");
+ container.setTaskExecutor(new SyncTaskExecutor());
+ container.setSubscriptionExecutor(new SimpleAsyncTaskExecutor());
+ container.setMaxSubscriptionRegistrationWaitingTime(1000);
+ container.setRecoveryBackoff(new FixedBackOff(1, 5));
+ container.afterPropertiesSet();
+
+ doAnswer(it -> {
+
+ int req = requestCount.incrementAndGet();
+ if (req == 1 || req == 3) {
+ return connectionMock;
+ }
+
+ throw new RedisConnectionFailureException("Booh");
+ }).when(connectionFactoryMock).getConnection();
+
+ CountDownLatch exceptionWait = new CountDownLatch(1);
+ CountDownLatch armed = new CountDownLatch(1);
+ CountDownLatch recoveryArmed = new CountDownLatch(1);
+
+ doAnswer(it -> {
+
+ SubscriptionListener listener = it.getArgument(0);
+ when(connectionMock.isSubscribed()).thenReturn(true);
+
+ listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);
+
+ armed.countDown();
+ exceptionWait.await();
+
+ if (shouldThrowSubscriptionException.compareAndSet(true, false)) {
+ when(connectionMock.isSubscribed()).thenReturn(false);
+ throw new RedisConnectionFailureException("Disconnected");
+ }
+
+ recoveryArmed.countDown();
+
+ return null;
+ }).when(connectionMock).subscribe(any(), any());
+
+ container.start();
+ container.addMessageListener(new MessageListenerAdapter(handler), new ChannelTopic("a"));
+ armed.await();
+
+ // let an exception happen
+ shouldThrowSubscriptionException.set(true);
+ exceptionWait.countDown();
+
+ // wait for subscription recovery
+ recoveryArmed.await();
+
+ assertThat(recoveryArmed.getCount()).isZero();
+
+ }
+
@Test // GH-964
void failsOnDuplicateInit() {
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());