feat: support for graceful shutdown based on configuration #2479

Merged: 4 commits, Aug 6, 2024
12 changes: 12 additions & 0 deletions docs/content/en/docs/patterns-and-best-practices/_index.md
Original file line number Diff line number Diff line change
@@ -120,3 +120,15 @@ might be a permission issue for some resources in another namespace.
The `stopOnInformerErrorDuringStartup` setting also affects [cache sync timeout](https://github.com/java-operator-sdk/java-operator-sdk/blob/114c4312c32b34688811df8dd7cea275878c9e73/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L177-L179)
behavior. If `true`, the operator stops on cache sync timeout. If `false`, the controller starts reconciling
resources after the timeout even if one or more event source caches have not synced yet.

## Graceful Shutdown

You can give the reconciler time to finish reconciliations that are already in progress before the operator shuts down.
To do so, set `reconciliationTerminationTimeout` to a suitable duration using `ConfigurationServiceOverrider`.
If no value is set, the default is `Duration.ZERO`, which means immediate shutdown.

```java
final var overridden = new ConfigurationServiceOverrider(config)
    .withReconciliationTerminationTimeout(Duration.ofSeconds(5));

final var operator = new Operator(overridden);
```
Contributor Author

@10000-ki 10000-ki Aug 4, 2024

In my opinion, it is beneficial for users to be aware of this feature, so I have documented it.

However, in most users' cases, the graceful shutdown feature may not be necessary.
If you think this documentation is unnecessary, please let me know and I will remove it.
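
To make the effect of the timeout concrete, here is a minimal JDK-only sketch. It assumes that stopping the executors follows the usual `shutdown()` / `awaitTermination()` / `shutdownNow()` idiom, which this PR does not show. `TerminationTimeoutSketch`, `stop` and `reconciliationCompletes` are hypothetical names used only for illustration, not JOSDK API.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TerminationTimeoutSketch {

  /** Waits up to {@code timeout} for running tasks; returns true if all finished in time. */
  static boolean stop(ExecutorService executor, Duration timeout) {
    executor.shutdown(); // reject new tasks, let already submitted ones continue
    try {
      if (executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
        return true;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
    executor.shutdownNow(); // timeout expired: interrupt whatever is still running
    return false;
  }

  /** Simulates one reconciliation of length {@code work}; reports whether it ran to the end. */
  static boolean reconciliationCompletes(Duration work, Duration timeout) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean completed = new AtomicBoolean(false);
    executor.submit(() -> {
      try {
        Thread.sleep(work.toMillis()); // stand-in for the reconciliation work
        completed.set(true);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // cut short by shutdownNow()
      }
    });
    stop(executor, timeout);
    return completed.get();
  }

  public static void main(String[] args) {
    // Zero timeout (the documented default): the in-flight task is interrupted.
    System.out.println(reconciliationCompletes(Duration.ofMillis(200), Duration.ZERO)); // false
    // Timeout longer than the work: the task is allowed to finish.
    System.out.println(
        reconciliationCompletes(Duration.ofMillis(200), Duration.ofSeconds(2))); // true
  }
}
```

Under these assumptions, a timeout at least as long as the slowest expected reconciliation lets ongoing work complete, while `Duration.ZERO` gives up on it immediately.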

Original file line number Diff line number Diff line change
@@ -100,7 +100,7 @@ private static ConfigurationService initConfigurationService(KubernetesClient cl
@SuppressWarnings("unused")
public void installShutdownHook(Duration gracefulShutdownTimeout) {
if (!leaderElectionManager.isLeaderElectionEnabled()) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout)));
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
} else {
log.warn("Leader election is on, shutdown hook will not be installed.");
}
@@ -145,15 +145,18 @@ public synchronized void start() {
}
}

public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
@Override
public void stop() throws OperatorException {
Duration reconciliationTerminationTimeout =
configurationService.reconciliationTerminationTimeout();
if (!started) {
return;
}
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
log.info("Operator SDK {} is shutting down...",
configurationService.getVersion().getSdkVersion());
controllerManager.stop();

configurationService.getExecutorServiceManager().stop(gracefulShutdownTimeout);
configurationService.getExecutorServiceManager().stop(reconciliationTerminationTimeout);
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
getKubernetesClient().close();
@@ -162,11 +165,6 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
started = false;
}

@Override
public void stop() throws OperatorException {
stop(Duration.ZERO);
}

/**
* Add a registration requests for the specified reconciler with this operator. The effective
* registration of the reconciler is delayed till the operator is started.
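
The shape of this refactoring can be modelled in a few lines of plain Java: the caller no longer passes a timeout to `stop`, and the single no-arg `stop()` reads it from configuration. The sketch below is a simplified stand-in, not the actual `Operator` class. `StopFromConfigSketch`, `Config`, `MiniOperator` and `demo` are hypothetical names, and the list of recorded timeouts replaces the real call to the executor manager.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class StopFromConfigSketch {

  /** Hypothetical stand-in for the configuration service; only the timeout is modelled. */
  interface Config {
    default Duration reconciliationTerminationTimeout() {
      return Duration.ZERO; // immediate shutdown unless overridden
    }
  }

  /** Hypothetical stand-in for the operator lifecycle. */
  static class MiniOperator {
    private final Config config;
    private boolean started;
    // Records every timeout that would have been handed to the executor manager.
    final List<Duration> executorStops = new ArrayList<>();

    MiniOperator(Config config) {
      this.config = config;
    }

    void start() {
      started = true;
    }

    /** Single no-arg stop: the timeout comes from configuration, not from the caller. */
    void stop() {
      Duration timeout = config.reconciliationTerminationTimeout();
      if (!started) {
        return; // stopping an operator that is not running is a no-op
      }
      executorStops.add(timeout);
      started = false;
    }
  }

  /** Runs a stop/start/stop/stop sequence and returns the recorded timeouts. */
  static List<Duration> demo() {
    MiniOperator operator = new MiniOperator(new Config() {
      @Override
      public Duration reconciliationTerminationTimeout() {
        return Duration.ofSeconds(5);
      }
    });
    operator.stop();  // ignored: not started yet
    operator.start();
    operator.stop();  // uses the configured 5 seconds
    operator.stop();  // ignored: already stopped
    return operator.executorStops;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [PT5S]
  }
}
```

One consequence of this design is that a JVM shutdown hook and an explicit `stop()` call now share the same timeout, since both go through the same no-arg method.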
Original file line number Diff line number Diff line change
@@ -101,8 +101,8 @@ static ConfigurationService newOverriddenConfigurationService(
*
* @param reconciler the reconciler we want the configuration of
* @param <R> the {@code CustomResource} type associated with the specified reconciler
* @return the {@link ControllerConfiguration} associated with the specified reconciler or {@code
* null} if no configuration exists for the reconciler
* @return the {@link ControllerConfiguration} associated with the specified reconciler or
* {@code null} if no configuration exists for the reconciler
*/
<R extends HasMetadata> ControllerConfiguration<R> getConfigurationFor(Reconciler<R> reconciler);

@@ -211,7 +211,7 @@ default int concurrentWorkflowExecutorThreads() {

/**
* Override to provide a custom {@link Metrics} implementation
*
*
* @return the {@link Metrics} implementation
*/
default Metrics getMetrics() {
@@ -221,7 +221,7 @@ default Metrics getMetrics() {
/**
* Override to provide a custom {@link ExecutorService} implementation to change how threads
* handle concurrent reconciliations
*
*
* @return the {@link ExecutorService} implementation to use for concurrent reconciliation
* processing
*/
@@ -232,7 +232,7 @@ default ExecutorService getExecutorService() {
/**
* Override to provide a custom {@link ExecutorService} implementation to change how dependent
* workflows are processed in parallel
*
*
* @return the {@link ExecutorService} implementation to use for dependent workflow processing
*/
default ExecutorService getWorkflowExecutorService() {
@@ -242,7 +242,7 @@ default ExecutorService getWorkflowExecutorService() {
/**
* Determines whether the associated Kubernetes client should be closed when the associated
* {@link io.javaoperatorsdk.operator.Operator} is stopped.
*
*
* @return {@code true} if the Kubernetes should be closed on stop, {@code false} otherwise
*/
default boolean closeClientOnStop() {
@@ -252,7 +252,7 @@ default boolean closeClientOnStop() {
/**
* Override to provide a custom {@link DependentResourceFactory} implementation to change how
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} are instantiated
*
*
* @return the custom {@link DependentResourceFactory} implementation
*/
@SuppressWarnings("rawtypes")
@@ -264,7 +264,7 @@ default DependentResourceFactory dependentResourceFactory() {
* Retrieves the optional {@link LeaderElectionConfiguration} to specify how the associated
* {@link io.javaoperatorsdk.operator.Operator} handles leader election to ensure only one
* instance of the operator runs on the cluster at any given time
*
*
* @return the {@link LeaderElectionConfiguration}
*/
default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
@@ -299,6 +299,17 @@ default Duration cacheSyncTimeout() {
return Duration.ofMinutes(2);
}

/**
* This is the timeout value that allows the reconciliation threads to gracefully shut down. If no
* value is set, the default is immediate shutdown.
*
* @return The duration of time to wait before terminating the reconciliation threads
* @since 5.0.0
*/
default Duration reconciliationTerminationTimeout() {
return Duration.ZERO;
Contributor Author

The default is immediate shutdown

}

/**
* Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received
* a resource that cannot be deserialized.
@@ -326,7 +337,7 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
* Override to provide a custom {@link ManagedWorkflowFactory} implementation to change how
* {@link io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow} are
* instantiated
*
*
* @return the custom {@link ManagedWorkflowFactory} implementation
*/
@SuppressWarnings("rawtypes")
@@ -336,7 +347,7 @@ default ManagedWorkflowFactory getWorkflowFactory() {

/**
* Override to provide a custom {@link ExecutorServiceManager} implementation
*
*
* @return the custom {@link ExecutorServiceManager} implementation
*/
default ExecutorServiceManager getExecutorServiceManager() {
@@ -353,9 +364,8 @@ default ExecutorServiceManager getExecutorServiceManager() {
* SSA based create/update can be still used with the legacy matching, just overriding the match
* method of Kubernetes Dependent Resource.
*
* @since 4.4.0
*
* @return if SSA should be used for dependent resources
* @since 4.4.0
*/
default boolean ssaBasedCreateUpdateMatchForDependentResources() {
return true;
@@ -383,9 +393,8 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
* <p>
* Disable this if you want to react to your own dependent resource updates
*
* @since 4.5.0
*
* @return if special annotation should be used for dependent resource to filter events
* @since 4.5.0
*/
default boolean previousAnnotationForDependentResourcesEventFiltering() {
return true;
@@ -400,9 +409,8 @@ default boolean previousAnnotationForDependentResourcesEventFiltering() {
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
*
* @since 4.5.0
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
@@ -415,8 +423,8 @@ default boolean parseResourceVersionsForEventFilteringAndCaching() {
*
* @return {@code true} if Server-Side Apply (SSA) should be used when patching the primary
* resources, {@code false} otherwise
* @since 5.0.0
* @see ConfigurationServiceOverrider#withUseSSAToPatchPrimaryResource(boolean)
* @since 5.0.0
*/
default boolean useSSAToPatchPrimaryResource() {
return true;
@@ -427,18 +435,17 @@ default boolean useSSAToPatchPrimaryResource() {
* Determines whether resources retrieved from caches such as via calls to
* {@link Context#getSecondaryResource(Class)} should be defensively cloned first.
* </p>
*
*
* <p>
* Defensive cloning to prevent problematic cache modifications (modifying the resource would
* otherwise modify the stored copy in the cache) was transparently done in previous JOSDK
* versions. This might have performance consequences and, with the more prevalent use of
* Server-Side Apply, where you should create a new copy of your resource with only modified
* fields, such modifications of these resources are less likely to occur.
* </p>
*
*
* @return {@code true} if resources should be defensively cloned before returning them from
* caches, {@code false} otherwise
*
* @since 5.0.0
*/
default boolean cloneSecondaryResourcesWhenGettingFromCache() {
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ public class ConfigurationServiceOverrider {
private InformerStoppedHandler informerStoppedHandler;
private Boolean stopOnInformerErrorDuringStartup;
private Duration cacheSyncTimeout;
private Duration reconciliationTerminationTimeout;
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
private Boolean previousAnnotationForDependentResources;
@@ -127,6 +128,12 @@ public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTime
return this;
}

public ConfigurationServiceOverrider withReconciliationTerminationTimeout(
Duration reconciliationTerminationTimeout) {
this.reconciliationTerminationTimeout = reconciliationTerminationTimeout;
return this;
}

public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentResources(
boolean value) {
this.ssaBasedCreateUpdateMatchForDependentResources = value;
@@ -251,6 +258,12 @@ public Duration cacheSyncTimeout() {
return overriddenValueOrDefault(cacheSyncTimeout, ConfigurationService::cacheSyncTimeout);
}

@Override
public Duration reconciliationTerminationTimeout() {
return overriddenValueOrDefault(reconciliationTerminationTimeout,
ConfigurationService::reconciliationTerminationTimeout);
}

@Override
public boolean ssaBasedCreateUpdateMatchForDependentResources() {
return overriddenValueOrDefault(ssaBasedCreateUpdateMatchForDependentResources,
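
The new field, `with...` setter and accessor follow the same override-or-default pattern as `cacheSyncTimeout`. The body of `overriddenValueOrDefault` is not part of this diff, so the sketch below assumes it simply falls back to the original configuration when no override was set. `OverrideOrDefaultSketch`, `Settings`, `Overrider` and `effectiveTimeout` are hypothetical names for illustration only.

```java
import java.time.Duration;
import java.util.function.Function;

public class OverrideOrDefaultSketch {

  /** Hypothetical stand-in for the configuration interface with its default value. */
  interface Settings {
    default Duration reconciliationTerminationTimeout() {
      return Duration.ZERO;
    }
  }

  /** Hypothetical stand-in for the overrider: a nullable field means "not overridden". */
  static class Overrider {
    private final Settings original;
    private Duration reconciliationTerminationTimeout;

    Overrider(Settings original) {
      this.original = original;
    }

    Overrider withReconciliationTerminationTimeout(Duration value) {
      this.reconciliationTerminationTimeout = value;
      return this;
    }

    Settings build() {
      final Duration timeout = reconciliationTerminationTimeout; // snapshot at build time
      return new Settings() {
        @Override
        public Duration reconciliationTerminationTimeout() {
          return overriddenValueOrDefault(timeout, Settings::reconciliationTerminationTimeout);
        }
      };
    }

    // Assumed behaviour: use the override if present, else ask the original configuration.
    private <T> T overriddenValueOrDefault(T value, Function<Settings, T> defaultValue) {
      return value != null ? value : defaultValue.apply(original);
    }
  }

  /** Returns the timeout seen after optionally applying an override (null = no override). */
  static Duration effectiveTimeout(Duration overrideOrNull) {
    Overrider overrider = new Overrider(new Settings() {});
    if (overrideOrNull != null) {
      overrider.withReconciliationTerminationTimeout(overrideOrNull);
    }
    return overrider.build().reconciliationTerminationTimeout();
  }

  public static void main(String[] args) {
    System.out.println(effectiveTimeout(null));                   // PT0S
    System.out.println(effectiveTimeout(Duration.ofSeconds(30))); // PT30S
  }
}
```

Keeping the field nullable is what lets an unset override fall through to the interface default of `Duration.ZERO`.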
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;

@@ -63,6 +64,7 @@ public <R extends HasMetadata> R clone(R object) {
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
.withInformerStoppedHandler((informer, ex) -> {
})
.withReconciliationTerminationTimeout(Duration.ofSeconds(30))
.build();

assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -77,6 +79,8 @@ public <R extends HasMetadata> R clone(R object) {
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.getInformerStoppedHandler(),
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.reconciliationTerminationTimeout(),
overridden.reconciliationTerminationTimeout());
}

}
Original file line number Diff line number Diff line change
@@ -18,26 +18,21 @@
public class GracefulStopIT {

public static final String TEST_1 = "test1";
public static final String TEST_2 = "test2";

@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
.withConfigurationService(o -> o.withCloseClientOnStop(false))
.withConfigurationService(o -> o.withCloseClientOnStop(false)
.withReconciliationTerminationTimeout(Duration.ofMillis(RECONCILER_SLEEP)))
.withReconciler(new GracefulStopTestReconciler())
.build();

@Test
void stopsGracefullyWIthTimeout() {
Contributor Author

I also corrected a typo in the method name while I was at it.

testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
void stopsGracefullyWithTimeoutConfiguration() {
testGracefulStop(TEST_1, 2);
}

@Test
void stopsGracefullyWithExpiredTimeout() {
testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1);
}

private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) {
private void testGracefulStop(String resourceName, int expectedFinalGeneration) {
var testRes = operator.create(testResource(resourceName));
await().untilAsserted(() -> {
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
@@ -54,7 +49,7 @@ private void testGracefulStop(String resourceName, int stopTimeout, int expected
() -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
.getNumberOfExecutions()).isEqualTo(2));

operator.getOperator().stop(Duration.ofMillis(stopTimeout));
operator.getOperator().stop();

await().untilAsserted(() -> {
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ void beforeEach(TestInfo testInfo) {
@AfterEach
void cleanup() {
if (operator != null) {
operator.stop(Duration.ofSeconds(1));
operator.stop();
}
adminClient.resource(dependentConfigMap()).delete();
adminClient.resource(testCustomResource()).delete();
@@ -321,6 +321,7 @@ Operator startOperator(boolean stopOnInformerErrorDuringStartup, boolean addStop
co.withKubernetesClient(clientUsingServiceAccount());
co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup);
co.withCacheSyncTimeout(Duration.ofMillis(3000));
co.withReconciliationTerminationTimeout(Duration.ofSeconds(1));
if (addStopHandler) {
co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true);
}