Skip to content

Differentiate between initial exception handling, recovery and recovery after subscription #2808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.1-SNAPSHOT</version>
<version>3.2.1-GH-2782-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ private void lazyListen() {
State state = this.state.get();

CompletableFuture<Void> futureToAwait = state.isPrepareListening() ? containerListenFuture
: lazyListen(this.backOff.start());
: lazyListen(new InitialBackoffExecution(this.backOff.start()));

try {
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -531,8 +531,7 @@ private void awaitRegistrationTime(CompletableFuture<Void> future) {
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException ignore) {
}
} catch (ExecutionException | TimeoutException ignore) {}
}

@Override
Expand Down Expand Up @@ -876,7 +875,7 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO

if (recoveryInterval != BackOffExecution.STOP) {
String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms",
cause, recoveryInterval);
cause, recoveryInterval);
logger.error(message, cause);
}

Expand All @@ -885,8 +884,13 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO

Runnable recoveryFunction = () -> {

CompletableFuture<Void> lazyListen = lazyListen(backOffExecution);
lazyListen.whenComplete(propagate(future));
CompletableFuture<Void> lazyListen = lazyListen(new RecoveryBackoffExecution(backOffExecution));
lazyListen.whenComplete(propagate(future)).thenRun(() -> {

if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution) {
logger.info("Subscription(s) recovered");
}
});
};

if (potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
Expand Down Expand Up @@ -980,7 +984,7 @@ private boolean hasTopics() {
private Subscriber getRequiredSubscriber() {

Assert.state(this.subscriber != null,
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called");

return this.subscriber;
}
Expand Down Expand Up @@ -1018,6 +1022,54 @@ private void logTrace(Supplier<String> message) {
}
}

BackOffExecution nextBackoffExecution(BackOffExecution backOffExecution, boolean subscribed) {

if (subscribed) {
return new RecoveryAfterSubscriptionBackoffExecution(backOff.start());
}

return backOffExecution;
}

/**
* Marker for an initial backoff.
*
* @param delegate
*/
record InitialBackoffExecution(BackOffExecution delegate) implements BackOffExecution {

@Override
public long nextBackOff() {
return delegate.nextBackOff();
}
}

/**
* Marker for a recovery after a subscription has been active previously.
*
* @param delegate
*/
record RecoveryAfterSubscriptionBackoffExecution(BackOffExecution delegate) implements BackOffExecution {

@Override
public long nextBackOff() {
return delegate.nextBackOff();
}
}

/**
* Marker for a recovery execution.
*
* @param delegate
*/
record RecoveryBackoffExecution(BackOffExecution delegate) implements BackOffExecution {

@Override
public long nextBackOff() {
return delegate.nextBackOff();
}
}

/**
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
* {@code channel or pattern}, and {@code count} and returns no result.
Expand Down Expand Up @@ -1191,18 +1243,23 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
if (connection.isSubscribed()) {

initFuture.completeExceptionally(
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));

return initFuture;
}

try {
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
} catch (Throwable t) {
handleSubscriptionException(initFuture, backOffExecution, t);
handleSubscriptionException(initFuture, nextBackoffExecution(backOffExecution, connection.isSubscribed()),
t);
}
} catch (RuntimeException ex) {
initFuture.completeExceptionally(ex);
if (backOffExecution instanceof InitialBackoffExecution) {
initFuture.completeExceptionally(ex);
} else {
handleSubscriptionException(initFuture, backOffExecution, ex);
}
}

return initFuture;
Expand All @@ -1215,8 +1272,9 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {

addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> {
subscriptionDone.complete(null);
}));

doSubscribe(connection, patterns, channels);
}
Expand Down Expand Up @@ -1381,7 +1439,10 @@ private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]
}

private void doInLock(Runnable runner) {
doInLock(() -> { runner.run(); return null; });
doInLock(() -> {
runner.run();
return null;
});
}

private <T> T doInLock(Supplier<T> supplier) {
Expand Down Expand Up @@ -1432,7 +1493,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
try {
subscribeChannel(channels.toArray(new byte[0][]));
} catch (Exception ex) {
handleSubscriptionException(subscriptionDone, backOffExecution, ex);
handleSubscriptionException(subscriptionDone, nextBackoffExecution(backOffExecution, true), ex);
}
}));
} else {
Expand All @@ -1449,7 +1510,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
closeConnection();
unsubscribeFuture.complete(null);
} catch (Throwable cause) {
handleSubscriptionException(subscriptionDone, backOffExecution, cause);
handleSubscriptionException(subscriptionDone,
nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import static org.mockito.Mockito.*;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
Expand All @@ -32,6 +37,7 @@
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
import org.springframework.util.backoff.FixedBackOff;

/**
* Unit tests for {@link RedisMessageListenerContainer}.
Expand Down Expand Up @@ -110,6 +116,7 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
@Test // GH-2335
void containerStartShouldReportFailureOnRedisUnavailability() {


when(connectionFactoryMock.getConnection()).thenThrow(new RedisConnectionFailureException("Booh"));

doAnswer(it -> {
Expand Down Expand Up @@ -147,6 +154,70 @@ void containerListenShouldReportFailureOnRedisUnavailability() {
assertThat(container.isListening()).isFalse();
}

@Test // GH-2335
void shouldRecoverFromConnectionFailure() throws Exception {

AtomicInteger requestCount = new AtomicInteger();
AtomicBoolean shouldThrowSubscriptionException = new AtomicBoolean();

container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactoryMock);
container.setBeanName("container");
container.setTaskExecutor(new SyncTaskExecutor());
container.setSubscriptionExecutor(new SimpleAsyncTaskExecutor());
container.setMaxSubscriptionRegistrationWaitingTime(1000);
container.setRecoveryBackoff(new FixedBackOff(1, 5));
container.afterPropertiesSet();

doAnswer(it -> {

int req = requestCount.incrementAndGet();
if (req == 1 || req == 3) {
return connectionMock;
}

throw new RedisConnectionFailureException("Booh");
}).when(connectionFactoryMock).getConnection();

CountDownLatch exceptionWait = new CountDownLatch(1);
CountDownLatch armed = new CountDownLatch(1);
CountDownLatch recoveryArmed = new CountDownLatch(1);

doAnswer(it -> {

SubscriptionListener listener = it.getArgument(0);
when(connectionMock.isSubscribed()).thenReturn(true);

listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);

armed.countDown();
exceptionWait.await();

if (shouldThrowSubscriptionException.compareAndSet(true, false)) {
when(connectionMock.isSubscribed()).thenReturn(false);
throw new RedisConnectionFailureException("Disconnected");
}

recoveryArmed.countDown();

return null;
}).when(connectionMock).subscribe(any(), any());

container.start();
container.addMessageListener(new MessageListenerAdapter(handler), new ChannelTopic("a"));
armed.await();

// let an exception happen
shouldThrowSubscriptionException.set(true);
exceptionWait.countDown();

// wait for subscription recovery
recoveryArmed.await();

assertThat(recoveryArmed.getCount()).isZero();

}

@Test // GH-964
void failsOnDuplicateInit() {
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());
Expand Down