diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index 2c18fa55d3..c4854a1422 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -92,4 +92,8 @@ default String fieldManager() { } C getConfigurationFor(DependentResourceSpec spec); + + default boolean reconcileOnPrimaryDelete() { + return false; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java index e5fbaad68e..2996c813ee 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java @@ -55,6 +55,8 @@ default Stream getSecondaryResourcesAsStream(Class expectedType) { @SuppressWarnings("unused") IndexedResourceCache

getPrimaryCache(); + boolean isPrimaryDeleted(); + /** * Determines whether a new reconciliation will be triggered right after the current * reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index d407ed0fc6..30ac9ffc4b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -77,4 +77,10 @@ MaxReconciliationInterval maxReconciliationInterval() default * @return the name used as field manager for SSA operations */ String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER; + + /** + * Will trigger reconciliation on delete event of the primary resource. Can be set to true only if + * the reconciler does not implement {@link Cleaner} interface. + */ + boolean reconcileOnPrimaryDelete() default false; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index b5ea66f8bc..6363e6892e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -24,12 +24,15 @@ public class DefaultContext

implements Context

{ private final ControllerConfiguration

controllerConfiguration; private final DefaultManagedWorkflowAndDependentResourceContext

defaultManagedDependentResourceContext; + private final boolean isPrimaryDeleted; - public DefaultContext(RetryInfo retryInfo, Controller

controller, P primaryResource) { + public DefaultContext( + RetryInfo retryInfo, Controller

controller, P primaryResource, boolean isPrimaryDeleted) { this.retryInfo = retryInfo; this.controller = controller; this.primaryResource = primaryResource; this.controllerConfiguration = controller.getConfiguration(); + this.isPrimaryDeleted = isPrimaryDeleted; this.defaultManagedDependentResourceContext = new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this); } @@ -49,6 +52,11 @@ public IndexedResourceCache

getPrimaryCache() { return controller.getEventSourceManager().getControllerEventSource(); } + @Override + public boolean isPrimaryDeleted() { + return isPrimaryDeleted; + } + @Override public boolean isNextReconciliationImminent() { return controller diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index f9af175053..360be6f20f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -43,10 +43,11 @@ public class EventProcessor

implements EventHandler, Life private final Cache

cache; private final EventSourceManager

eventSourceManager; private final RateLimiter rateLimiter; - private final ResourceStateManager resourceStateManager = new ResourceStateManager(); + private final ResourceStateManager

resourceStateManager = new ResourceStateManager<>(); private final Map metricsMetadata; private ExecutorService executor; + // todo handle/test case when there is finalizer but not ours public EventProcessor( EventSourceManager

eventSourceManager, ConfigurationService configurationService) { this( @@ -121,19 +122,24 @@ public synchronized void handleEvent(Event event) { } } - private void handleMarkedEventForResource(ResourceState state) { - if (state.deleteEventPresent()) { + private void handleMarkedEventForResource(ResourceState

state) { + if (doCleanupForDeleteEvent(state)) { cleanupForDeletedEvent(state.getId()); - } else if (!state.processedMarkForDeletionPresent()) { + } else if (!state.processedMarkForDeletionPresent() + && !state.deleteEventReconciliationSubmitted()) { submitReconciliationExecution(state); } } - private void submitReconciliationExecution(ResourceState state) { + private boolean doCleanupForDeleteEvent(ResourceState

state) { + return state.deleteEventPresent() && !controllerConfiguration.reconcileOnPrimaryDelete(); + } + + private void submitReconciliationExecution(ResourceState

state) { try { boolean controllerUnderExecution = isControllerUnderExecution(state); final var resourceID = state.getId(); - Optional

maybeLatest = cache.get(resourceID); + Optional

maybeLatest = getCachedResource(resourceID, state); maybeLatest.ifPresent(MDCUtils::addResourceInfo); if (!controllerUnderExecution && maybeLatest.isPresent()) { var rateLimit = state.getRateLimit(); @@ -141,6 +147,7 @@ private void submitReconciliationExecution(ResourceState state) { rateLimit = rateLimiter.initState(); state.setRateLimit(rateLimit); } + // todo rate limit handling var rateLimiterPermission = rateLimiter.isLimited(rateLimit); if (rateLimiterPermission.isPresent()) { handleRateLimitedSubmission(resourceID, rateLimiterPermission.get()); @@ -148,11 +155,21 @@ private void submitReconciliationExecution(ResourceState state) { } state.setUnderProcessing(true); final var latest = maybeLatest.get(); - ExecutionScope

executionScope = new ExecutionScope<>(state.getRetry()); - state.unMarkEventReceived(); + ExecutionScope

executionScope = + new ExecutionScope<>(state.getRetry(), state.deleteEventPresent()); + + if (state.deleteEventPresent()) { + state.markDeleteEventReconciliationSubmitted(); + } else if (!state.deleteEventReconciliationSubmitted()) { // if there is a retry + state.unMarkEventReceived(); + } metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata); log.debug("Executing events for custom resource. Scope: {}", executionScope); - executor.execute(new ReconcilerExecutor(resourceID, executionScope)); + executor.execute( + new ReconcilerExecutor( + resourceID, + executionScope, + state.deleteEventReconciliationSubmitted() ? latest : null)); } else { log.debug( "Skipping executing controller for resource id: {}. Controller in execution: {}. Latest" @@ -164,7 +181,7 @@ private void submitReconciliationExecution(ResourceState state) { // there can be multiple reasons why the primary resource is not present, one is that the // informer is currently disconnected from k8s api server, but will eventually receive the // resource. Other is that simply there is no primary resource present for an event, this - // might indicate issue with the implementation, but could happen also naturally, thus + // might indicate an issue with the implementation, but could happen also naturally, thus // this is not necessarily a problem. log.debug("no primary resource found in cache with resource id: {}", resourceID); } @@ -174,17 +191,35 @@ private void submitReconciliationExecution(ResourceState state) { } } - private void handleEventMarking(Event event, ResourceState state) { + private Optional

getCachedResource(ResourceID resourceID, ResourceState

state) { + var resource = cache.get(resourceID); + if (resource.isPresent()) { + return resource; + } + if (controllerConfiguration.reconcileOnPrimaryDelete() + && (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted())) { + return Optional.of(state.getDeletedResource()); + } + return Optional.empty(); + } + + @SuppressWarnings("unchecked") + private void handleEventMarking(Event event, ResourceState

state) { final var relatedCustomResourceID = event.getRelatedCustomResourceID(); if (event instanceof ResourceEvent resourceEvent) { if (resourceEvent.getAction() == ResourceAction.DELETED) { log.debug("Marking delete event received for: {}", relatedCustomResourceID); - state.markDeleteEventReceived(); + // todo check can there be delete event without resource? + state.markDeleteEventReceived((P) resourceEvent.getResource().orElseThrow()); } else { - if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) { + if (state.deleteEventReconciliationSubmitted() + || (state.processedMarkForDeletionPresent() + && isResourceMarkedForDeletion(resourceEvent))) { log.debug( - "Skipping mark of event received, since already processed mark for deletion and" - + " resource marked for deletion: {}", + "Skipping mark of event received, delete event reconciliation submitted ({}), or" + + " marked for deletion but already processed mark. resource marked for deletion:" + + " {}", + state.deleteEventReconciliationSubmitted(), relatedCustomResourceID); return; } @@ -196,18 +231,21 @@ private void handleEventMarking(Event event, ResourceState state) { // event as below. markEventReceived(state); } - } else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) { + } else if (!state.deleteEventPresent() + && !state.processedMarkForDeletionPresent() + && !state.deleteEventReconciliationSubmitted()) { markEventReceived(state); } else if (log.isDebugEnabled()) { log.debug( "Skipped marking event as received. Delete event present: {}, processed mark for" - + " deletion: {}", + + " deletion: {}, delete event reconciliation submitted: {}", state.deleteEventPresent(), - state.processedMarkForDeletionPresent()); + state.processedMarkForDeletionPresent(), + state.deleteEventReconciliationSubmitted()); } } - private void markEventReceived(ResourceState state) { + private void markEventReceived(ResourceState

state) { log.debug("Marking event received for: {}", state.getId()); state.markEventReceived(); } @@ -251,7 +289,7 @@ synchronized void eventProcessingFinished( } cleanupOnSuccessfulExecution(executionScope); metrics.finishedReconciliation(executionScope.getResource(), metricsMetadata); - if (state.deleteEventPresent()) { + if (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted()) { cleanupForDeletedEvent(executionScope.getResourceID()); } else if (postExecutionControl.isFinalizerRemoved()) { state.markProcessedMarkForDeletion(); @@ -383,7 +421,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope

executionScope) { retryEventSource().cancelOnceSchedule(executionScope.getResourceID()); } - private ResourceState getOrInitRetryExecution(ExecutionScope

executionScope) { + private ResourceState

getOrInitRetryExecution(ExecutionScope

executionScope) { final var state = resourceStateManager.getOrCreate(executionScope.getResourceID()); RetryExecution retryExecution = state.getRetry(); if (retryExecution == null) { @@ -399,7 +437,7 @@ private void cleanupForDeletedEvent(ResourceID resourceID) { metrics.cleanupDoneFor(resourceID, metricsMetadata); } - private boolean isControllerUnderExecution(ResourceState state) { + private boolean isControllerUnderExecution(ResourceState

state) { return state.isUnderProcessing(); } @@ -443,10 +481,13 @@ private void handleAlreadyMarkedEvents() { private class ReconcilerExecutor implements Runnable { private final ExecutionScope

executionScope; private final ResourceID resourceID; + private final P deleteEventResource; - private ReconcilerExecutor(ResourceID resourceID, ExecutionScope

executionScope) { + private ReconcilerExecutor( + ResourceID resourceID, ExecutionScope

executionScope, P deleteEventResource) { this.executionScope = executionScope; this.resourceID = resourceID; + this.deleteEventResource = deleteEventResource; } @Override @@ -462,7 +503,9 @@ public void run() { final var thread = Thread.currentThread(); final var name = thread.getName(); try { - var actualResource = cache.get(resourceID); + + var actualResource = + deleteEventResource != null ? Optional.of(deleteEventResource) : cache.get(resourceID); if (actualResource.isEmpty()) { log.debug("Skipping execution; primary resource missing from cache: {}", resourceID); return; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java index 90899a6e1a..faac3a8deb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java @@ -8,9 +8,11 @@ class ExecutionScope { // the latest custom resource from cache private R resource; private final RetryInfo retryInfo; + private boolean isPrimaryDeleted = false; - ExecutionScope(RetryInfo retryInfo) { + ExecutionScope(RetryInfo retryInfo, boolean isPrimaryDeleted) { this.retryInfo = retryInfo; + this.isPrimaryDeleted = isPrimaryDeleted; } public ExecutionScope setResource(R resource) { @@ -42,4 +44,12 @@ public String toString() { public RetryInfo getRetryInfo() { return retryInfo; } + + public void setPrimaryDeleted(boolean primaryDeleted) { + isPrimaryDeleted = primaryDeleted; + } + + public boolean isPrimaryDeleted() { + return isPrimaryDeleted; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java index ee861982b1..08b2e608aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java @@ -90,7 +90,11 @@ private PostExecutionControl

handleDispatch(ExecutionScope

executionScope) } Context

context = - new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution); + new DefaultContext<>( + executionScope.getRetryInfo(), + controller, + resourceForExecution, + executionScope.isPrimaryDeleted()); if (markedForDeletion) { return handleCleanup(resourceForExecution, originalResource, context); } else { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java index 5d4e74d681..b963f94161 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java @@ -1,9 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; -class ResourceState { +class ResourceState

{ /** * Manages the state of received events. Basically there can be only three distinct states @@ -21,6 +22,7 @@ private enum EventingState { PROCESSED_MARK_FOR_DELETION, /** Delete event present, from this point other events are not relevant */ DELETE_EVENT_PRESENT, + DELETE_EVENT_RECONCILIATION_SUBMITTED } private final ResourceID id; @@ -30,6 +32,8 @@ private enum EventingState { private EventingState eventing; private RateLimitState rateLimit; + private P deletedResource; + public ResourceState(ResourceID id) { this.id = id; eventing = EventingState.NO_EVENT_PRESENT; @@ -63,8 +67,13 @@ public void setUnderProcessing(boolean underProcessing) { this.underProcessing = underProcessing; } - public void markDeleteEventReceived() { + public void markDeleteEventReceived(P deletedResource) { eventing = EventingState.DELETE_EVENT_PRESENT; + this.deletedResource = deletedResource; + } + + public void markDeleteEventReconciliationSubmitted() { + this.eventing = EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED; } public boolean deleteEventPresent() { @@ -75,8 +84,16 @@ public boolean processedMarkForDeletionPresent() { return eventing == EventingState.PROCESSED_MARK_FOR_DELETION; } + public boolean deleteEventReconciliationSubmitted() { + return eventing == EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED; + } + + public P getDeletedResource() { + return deletedResource; + } + public void markEventReceived() { - if (deleteEventPresent()) { + if (deleteEventPresent() || deleteEventReconciliationSubmitted()) { throw new IllegalStateException("Cannot receive event after a delete event received"); } eventing = EventingState.EVENT_PRESENT; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java index 6932e1ca5e..87051f028e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java @@ -5,17 +5,19 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -class ResourceStateManager { +import io.fabric8.kubernetes.api.model.HasMetadata; + +class ResourceStateManager

{ // maybe we should have a way for users to specify a hint on the amount of CRs their reconciler // will process to avoid under- or over-sizing the state maps and avoid too many resizing that // take time and memory? - private final Map states = new ConcurrentHashMap<>(100); + private final Map> states = new ConcurrentHashMap<>(100); - public ResourceState getOrCreate(ResourceID resourceID) { + public ResourceState

getOrCreate(ResourceID resourceID) { return states.computeIfAbsent(resourceID, ResourceState::new); } - public ResourceState remove(ResourceID resourceID) { + public ResourceState

remove(ResourceID resourceID) { return states.remove(resourceID); } @@ -23,7 +25,7 @@ public boolean contains(ResourceID resourceID) { return states.containsKey(resourceID); } - public List resourcesWithEventPresent() { + public List> resourcesWithEventPresent() { return states.values().stream() .filter(state -> !state.noEventPresent()) .collect(Collectors.toList());