diff --git a/docs/content/en/docs/patterns-and-best-practices/_index.md b/docs/content/en/docs/patterns-and-best-practices/_index.md index 422f3f7bfe..a2b3b716b6 100644 --- a/docs/content/en/docs/patterns-and-best-practices/_index.md +++ b/docs/content/en/docs/patterns-and-best-practices/_index.md @@ -120,3 +120,15 @@ might be a permission issue for some resources in another namespace. The `stopOnInformerErrorDuringStartup` has implication on [cache sync timeout](https://github.com/java-operator-sdk/java-operator-sdk/blob/114c4312c32b34688811df8dd7cea275878c9e73/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L177-L179) behavior. If true operator will stop on cache sync timeout. if `false`, after the timeout the controller will start reconcile resources even if one or more event source caches did not sync yet. + +## Graceful Shutdown + +You can provide sufficient time for the reconciler to process and complete the currently ongoing events before shutting down. +The configuration is simple. You just need to set an appropriate duration value for `reconciliationTerminationTimeout` using `ConfigurationServiceOverrider`. + +```java +final var overridden = new ConfigurationServiceOverrider(config) + .withReconciliationTerminationTimeout(Duration.ofSeconds(5)); + +final var operator = new Operator(overridden); +``` diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index d85de6b1e5..1283896a42 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -100,7 +100,7 @@ private static ConfigurationService initConfigurationService(KubernetesClient cl @SuppressWarnings("unused") public void installShutdownHook(Duration gracefulShutdownTimeout) { if (!leaderElectionManager.isLeaderElectionEnabled()) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout))); + Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); } else { log.warn("Leader election is on, shutdown hook will not be installed."); } @@ -145,15 +145,18 @@ public synchronized void start() { } } - public void stop(Duration gracefulShutdownTimeout) throws OperatorException { + @Override + public void stop() throws OperatorException { + Duration reconciliationTerminationTimeout = + configurationService.reconciliationTerminationTimeout(); if (!started) { return; } - log.info( - "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); + log.info("Operator SDK {} is shutting down...", + configurationService.getVersion().getSdkVersion()); controllerManager.stop(); - configurationService.getExecutorServiceManager().stop(gracefulShutdownTimeout); + configurationService.getExecutorServiceManager().stop(reconciliationTerminationTimeout); leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { getKubernetesClient().close(); @@ -162,11 +165,6 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException { started = false; } - @Override - public void stop() throws OperatorException { - stop(Duration.ZERO); - } - /** * Add a registration requests for the specified reconciler with this operator. The effective * registration of the reconciler is delayed till the operator is started. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 942d770506..0bd1607213 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -101,8 +101,8 @@ static ConfigurationService newOverriddenConfigurationService( * * @param reconciler the reconciler we want the configuration of * @param the {@code CustomResource} type associated with the specified reconciler - * @return the {@link ControllerConfiguration} associated with the specified reconciler or {@code - * null} if no configuration exists for the reconciler + * @return the {@link ControllerConfiguration} associated with the specified reconciler or + * {@code null} if no configuration exists for the reconciler */ ControllerConfiguration getConfigurationFor(Reconciler reconciler); @@ -211,7 +211,7 @@ default int concurrentWorkflowExecutorThreads() { /** * Override to provide a custom {@link Metrics} implementation - * + * * @return the {@link Metrics} implementation */ default Metrics getMetrics() { @@ -221,7 +221,7 @@ default Metrics getMetrics() { /** * Override to provide a custom {@link ExecutorService} implementation to change how threads * handle concurrent reconciliations - * + * * @return the {@link ExecutorService} implementation to use for concurrent reconciliation * processing */ @@ -232,7 +232,7 @@ default ExecutorService getExecutorService() { /** * Override to provide a custom {@link ExecutorService} implementation to change how dependent * workflows are processed in parallel - * + * * @return the {@link ExecutorService} implementation to use for dependent workflow processing */ default ExecutorService getWorkflowExecutorService() { @@ -242,7 +242,7 @@ default ExecutorService getWorkflowExecutorService() { /** * Determines whether the associated Kubernetes client should be closed when the associated * {@link io.javaoperatorsdk.operator.Operator} is stopped. - * + * * @return {@code true} if the Kubernetes should be closed on stop, {@code false} otherwise */ default boolean closeClientOnStop() { @@ -252,7 +252,7 @@ default boolean closeClientOnStop() { /** * Override to provide a custom {@link DependentResourceFactory} implementation to change how * {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} are instantiated - * + * * @return the custom {@link DependentResourceFactory} implementation */ @SuppressWarnings("rawtypes") @@ -264,7 +264,7 @@ default DependentResourceFactory dependentResourceFactory() { * Retrieves the optional {@link LeaderElectionConfiguration} to specify how the associated * {@link io.javaoperatorsdk.operator.Operator} handles leader election to ensure only one * instance of the operator runs on the cluster at any given time - * + * * @return the {@link LeaderElectionConfiguration} */ default Optional getLeaderElectionConfiguration() { @@ -299,6 +299,17 @@ default Duration cacheSyncTimeout() { return Duration.ofMinutes(2); } + /** + * This is the timeout value that allows the reconciliation threads to gracefully shut down. If no + * value is set, the default is immediate shutdown. + * + * @return The duration of time to wait before terminating the reconciliation threads + * @since 5.0.0 + */ + default Duration reconciliationTerminationTimeout() { + return Duration.ZERO; + } + /** * Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received * a resource that cannot be deserialized. @@ -326,7 +337,7 @@ default Optional getInformerStoppedHandler() { * Override to provide a custom {@link ManagedWorkflowFactory} implementation to change how * {@link io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow} are * instantiated - * + * * @return the custom {@link ManagedWorkflowFactory} implementation */ @SuppressWarnings("rawtypes") @@ -336,7 +347,7 @@ default ManagedWorkflowFactory getWorkflowFactory() { /** * Override to provide a custom {@link ExecutorServiceManager} implementation - * + * * @return the custom {@link ExecutorServiceManager} implementation */ default ExecutorServiceManager getExecutorServiceManager() { @@ -353,9 +364,8 @@ default ExecutorServiceManager getExecutorServiceManager() { * SSA based create/update can be still used with the legacy matching, just overriding the match * method of Kubernetes Dependent Resource. * - * @since 4.4.0 - * * @return if SSA should be used for dependent resources + * @since 4.4.0 */ default boolean ssaBasedCreateUpdateMatchForDependentResources() { return true; @@ -383,9 +393,8 @@ default Set> defaultNonSSAResource() { *

* Disable this if you want to react to your own dependent resource updates * - * @since 4.5.0 - * * @return if special annotation should be used for dependent resource to filter events + * @since 4.5.0 */ default boolean previousAnnotationForDependentResourcesEventFiltering() { return true; @@ -400,9 +409,8 @@ default boolean previousAnnotationForDependentResourcesEventFiltering() { * logic, and you want to further minimize the amount of work done / updates issued by the * operator. * - * @since 4.5.0 - * * @return if resource version should be parsed (as integer) + * @since 4.5.0 */ default boolean parseResourceVersionsForEventFilteringAndCaching() { return false; @@ -415,8 +423,8 @@ default boolean parseResourceVersionsForEventFilteringAndCaching() { * * @return {@code true} if Server-Side Apply (SSA) should be used when patching the primary * resources, {@code false} otherwise - * @since 5.0.0 * @see ConfigurationServiceOverrider#withUseSSAToPatchPrimaryResource(boolean) + * @since 5.0.0 */ default boolean useSSAToPatchPrimaryResource() { return true; @@ -427,7 +435,7 @@ default boolean useSSAToPatchPrimaryResource() { * Determines whether resources retrieved from caches such as via calls to * {@link Context#getSecondaryResource(Class)} should be defensively cloned first. *

- * + * *

* Defensive cloning to prevent problematic cache modifications (modifying the resource would * otherwise modify the stored copy in the cache) was transparently done in previous JOSDK @@ -435,10 +443,9 @@ default boolean useSSAToPatchPrimaryResource() { * Server-Side Apply, where you should create a new copy of your resource with only modified * fields, such modifications of these resources are less likely to occur. *

- * + * * @return {@code true} if resources should be defensively cloned before returning them from * caches, {@code false} otherwise - * * @since 5.0.0 */ default boolean cloneSecondaryResourcesWhenGettingFromCache() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 8da14aa0c3..e35d843cb1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -32,6 +32,7 @@ public class ConfigurationServiceOverrider { private InformerStoppedHandler informerStoppedHandler; private Boolean stopOnInformerErrorDuringStartup; private Duration cacheSyncTimeout; + private Duration reconciliationTerminationTimeout; private Boolean ssaBasedCreateUpdateMatchForDependentResources; private Set> defaultNonSSAResource; private Boolean previousAnnotationForDependentResources; @@ -127,6 +128,12 @@ public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTime return this; } + public ConfigurationServiceOverrider withReconciliationTerminationTimeout( + Duration reconciliationTerminationTimeout) { + this.reconciliationTerminationTimeout = reconciliationTerminationTimeout; + return this; + } + public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentResources( boolean value) { this.ssaBasedCreateUpdateMatchForDependentResources = value; @@ -251,6 +258,12 @@ public Duration cacheSyncTimeout() { return overriddenValueOrDefault(cacheSyncTimeout, ConfigurationService::cacheSyncTimeout); } + @Override + public Duration reconciliationTerminationTimeout() { + return overriddenValueOrDefault(reconciliationTerminationTimeout, + ConfigurationService::reconciliationTerminationTimeout); + } + @Override public boolean ssaBasedCreateUpdateMatchForDependentResources() { return overriddenValueOrDefault(ssaBasedCreateUpdateMatchForDependentResources, diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java index 4c374e09f2..0e2b8e9cc2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.api.config; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.Executors; @@ -63,6 +64,7 @@ public R clone(R object) { .withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS")) .withInformerStoppedHandler((informer, ex) -> { }) + .withReconciliationTerminationTimeout(Duration.ofSeconds(30)) .build(); assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop()); @@ -77,6 +79,8 @@ public R clone(R object) { overridden.getLeaderElectionConfiguration()); assertNotEquals(config.getInformerStoppedHandler(), overridden.getLeaderElectionConfiguration()); + assertNotEquals(config.reconciliationTerminationTimeout(), + overridden.reconciliationTerminationTimeout()); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java index 715921ecbb..e6a38326d5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java @@ -18,26 +18,21 @@ public class GracefulStopIT { public static final String TEST_1 = "test1"; - public static final String TEST_2 = "test2"; @RegisterExtension LocallyRunOperatorExtension operator = LocallyRunOperatorExtension.builder() - .withConfigurationService(o -> o.withCloseClientOnStop(false)) + .withConfigurationService(o -> o.withCloseClientOnStop(false) + .withReconciliationTerminationTimeout(Duration.ofMillis(RECONCILER_SLEEP))) .withReconciler(new GracefulStopTestReconciler()) .build(); @Test - void stopsGracefullyWIthTimeout() { - testGracefulStop(TEST_1, RECONCILER_SLEEP, 2); + void stopsGracefullyWithTimeoutConfiguration() { + testGracefulStop(TEST_1, 2); } - @Test - void stopsGracefullyWithExpiredTimeout() { - testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1); - } - - private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) { + private void testGracefulStop(String resourceName, int expectedFinalGeneration) { var testRes = operator.create(testResource(resourceName)); await().untilAsserted(() -> { var r = operator.get(GracefulStopTestCustomResource.class, resourceName); @@ -54,7 +49,7 @@ private void testGracefulStop(String resourceName, int stopTimeout, int expected () -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) .getNumberOfExecutions()).isEqualTo(2)); - operator.getOperator().stop(Duration.ofMillis(stopTimeout)); + operator.getOperator().stop(); await().untilAsserted(() -> { var r = operator.get(GracefulStopTestCustomResource.class, resourceName); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java index 3a6f4d05e9..a8288db7af 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java @@ -76,7 +76,7 @@ void beforeEach(TestInfo testInfo) { @AfterEach void cleanup() { if (operator != null) { - operator.stop(Duration.ofSeconds(1)); + operator.stop(); } adminClient.resource(dependentConfigMap()).delete(); adminClient.resource(testCustomResource()).delete(); @@ -321,6 +321,7 @@ Operator startOperator(boolean stopOnInformerErrorDuringStartup, boolean addStop co.withKubernetesClient(clientUsingServiceAccount()); co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup); co.withCacheSyncTimeout(Duration.ofMillis(3000)); + co.withReconciliationTerminationTimeout(Duration.ofSeconds(1)); if (addStopHandler) { co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true); }