Skip to content

WIP: feat: depends on wait condition design #994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import io.fabric8.kubernetes.api.model.HasMetadata;

@SuppressWarnings("rawtypes")
public class UpdateControl<T extends HasMetadata> extends BaseControl<UpdateControl<T>> {
public class UpdateControl<P extends HasMetadata> extends BaseControl<UpdateControl<P>> {

private final T resource;
private final P resource;
private final boolean updateStatus;
private final boolean updateResource;

private UpdateControl(
T resource, boolean updateStatus, boolean updateResource) {
P resource, boolean updateStatus, boolean updateResource) {
if ((updateResource || updateStatus) && resource == null) {
throw new IllegalArgumentException("CustomResource cannot be null in case of update");
}
Expand Down Expand Up @@ -54,7 +54,7 @@ public static <T extends HasMetadata> UpdateControl<T> noUpdate() {
return new UpdateControl<>(null, false, false);
}

public T getResource() {
public P getResource() {
return resource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ public EventSource initEventSource(EventSourceContext<P> context) {
if (informerEventSource == null) {
configureWith(context.getConfigurationService(), null, null,
KubernetesDependent.ADD_OWNER_REFERENCE_DEFAULT);
log.warn("Using default configuration for " + resourceType().getSimpleName()
+ " KubernetesDependentResource, call configureWith to provide configuration");
log.warn(
"Using default configuration for {} KubernetesDependentResource, call configureWith to provide configuration",
resourceType().getSimpleName());
}
return informerEventSource;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.javaoperatorsdk.operator.processing.dependent.waitfor;

@FunctionalInterface
public interface Condition<R> {

boolean isFulfilled(R resource);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.javaoperatorsdk.operator.processing.dependent.waitfor;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;

import static java.lang.Thread.sleep;

public class ConditionChecker<R> {

public static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(1);
public static final Duration DEFAULT_TIMEOUT = Duration.ZERO;
private static final int MIN_THREAD_SLEEP = 25;

private Duration pollingInterval;
private Duration timeout;
private UnfulfillmentHandler<?> unfulfillmentHandler;
private Condition<R> condition;

public static <T> ConditionChecker<T> checker() {
return new ConditionChecker<>();
}

public ConditionChecker() {
this(DEFAULT_POLLING_INTERVAL, DEFAULT_TIMEOUT, UpdateControl::noUpdate);
}

public ConditionChecker(Duration pollingInterval, Duration timeout,
UnfulfillmentHandler unfulfillmentHandler) {
this.pollingInterval = pollingInterval;
this.timeout = timeout;
this.unfulfillmentHandler = unfulfillmentHandler;
}

public <P extends HasMetadata> void check(DependentResource<R, P> resource, P primary) {
check(() -> resource.getResource(primary));
}

public void check(Supplier<Optional<R>> supplier) {
checkSetup();
Optional<R> resource = supplier.get();
if (timeout.isNegative() || timeout.isZero()) {
if (resource.isPresent() && condition.isFulfilled(resource.get())) {
return;
} else {
handleConditionNotMet();
}
}
var deadline = Instant.now().plus(timeout.toMillis(), ChronoUnit.MILLIS);
while (Instant.now().isBefore(deadline)) {
resource = supplier.get();
if (resource.isPresent() && condition.isFulfilled(resource.get())) {
return;
} else {
var timeLeft = Duration.between(Instant.now(), deadline);
if (timeLeft.isZero() || timeLeft.isNegative()) {
handleConditionNotMet();
} else {
sleepUntilNextPoll(timeLeft);
}
}
}
handleConditionNotMet();
}
Comment on lines +57 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is supposed to be called in a reconcile loop, then it is a synchronuous wait. IMHO, while-sleep-until timeout is not a good pattern especially because timeout can be very long and it will lock the loop. It can be start in a parrallel thread but then, I think the same waiting loop can be achieve with event source. No need to poll, the dependent should trigger an event when it change and the condition will eventually be met in a latter reconcile loop. We only miss to setup to trigger an event at most when deadline occurs.
Also, the sleep loop is not resilient to an operator restart. I expect the timeout to last from the creation. If the operator stop just after the creation and restart after the timeout, then it should reconcile and immediatly report a timeout and not wait for the timeout.

Copy link
Collaborator Author

@csviri csviri Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @scrocquesel , pls take a look here, that explains the use case for this:
#995 (comment)

TBH, I'm also kinda thinking also to put it there or not (I mean the sync wating), to explicitly support that use cases mentioned there. Since it's just for some small subset of operators. But as described, and alos shown in the integration tests, most condition check is async or that is perferred.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the sleep loop is not resilient to an operator restart. I expect the timeout to last from the creation. If the operator stop just after the creation and restart after the timeout, then it should reconcile and immediately report a timeout and not wait for the timeout.

That would require a state management, what we deliberatily want to avoid if possible.

Copy link
Collaborator Author

@csviri csviri Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that it's still a goal that on a reconciliation we reconcile every resource we manage. So if we want to have an analogy here the goal is not to have a state machine rather a workflow engine that is executed then on every reconciliation from beginning to end.
Well at least that is the idea :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @scrocquesel , pls take a look here, that explains the use case for this: #995 (comment)

TBH, I'm also kinda thinking also to put it there or not (I mean the sync wating), to explicitly support that use cases mentioned there. Since it's just for some small subset of operators. But as described, and alos shown in the integration tests, most condition check is async or that is perferred.

@csviri In the test below, what would happen if nginxNginxDeploymentDependentResource trigger an event before the checker has poll for the condition to be met ? Does a new reconcile is started on another thread or is it discarded because there is already a reconcile ongoing ? A deployment can take quite some time and the more the reconcile loop is running, the more the update is exposed to concurrency edition from the operator itself or from outside. What if an error occurs with the deployment and I want to delete the primary ? Does the check with the default infinite will be interrupted ?

Also, this is may works well for creation, but what should happen if the condition is unmet latter on. The reconcile will again wait in a loop and the status of the primary status will not reflect the actual state of the unmet condition.


private void checkSetup() {
Objects.requireNonNull(unfulfillmentHandler, "ConditionNotFulfilledHandler is not set");
Objects.requireNonNull(condition, "Condition is not set");
}

private void sleepUntilNextPoll(Duration timeLeft) {
try {
sleep(Math.max(MIN_THREAD_SLEEP, Math.min(pollingInterval.toMillis(), timeLeft.toMillis())));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread interrupted.", e);
}
}

private void handleConditionNotMet() {
throw new ConditionUnfulfilledException(unfulfillmentHandler);
}

public ConditionChecker<R> withPollingInterval(Duration pollingInterval) {
this.pollingInterval = pollingInterval;
return this;
}

public ConditionChecker<R> withTimeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public ConditionChecker<R> withUnfulfilledHandler(
UnfulfillmentHandler unFulfillmentHandler) {
this.unfulfillmentHandler = unFulfillmentHandler;
return this;
}

public ConditionChecker<R> withCondition(Condition<R> condition) {
this.condition = condition;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.javaoperatorsdk.operator.processing.dependent.waitfor;

import io.javaoperatorsdk.operator.OperatorException;

public class ConditionUnfulfilledException extends OperatorException {

private final UnfulfillmentHandler unfulfillmentHandler;

public ConditionUnfulfilledException(UnfulfillmentHandler unfulfillmentHandler) {
this.unfulfillmentHandler = unfulfillmentHandler;
}

public ConditionUnfulfilledException(String message,
UnfulfillmentHandler unfulfillmentHandler) {
super(message);
this.unfulfillmentHandler = unfulfillmentHandler;
}

public UnfulfillmentHandler getUnfulfillmentHandler() {
return unfulfillmentHandler;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.javaoperatorsdk.operator.processing.dependent.waitfor;

import io.javaoperatorsdk.operator.api.reconciler.BaseControl;

public interface UnfulfillmentHandler<P extends BaseControl<P>> {

BaseControl<P> control();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.dependent.waitfor.ConditionUnfulfilledException;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
Expand Down Expand Up @@ -139,8 +140,12 @@ private PostExecutionControl<R> reconcileExecution(ExecutionScope<R> executionSc
getName(resourceForExecution),
getVersion(resourceForExecution),
executionScope);

UpdateControl<R> updateControl = controller.reconcile(resourceForExecution, context);
UpdateControl<R> updateControl;
try {
updateControl = controller.reconcile(resourceForExecution, context);
} catch (ConditionUnfulfilledException ex) {
updateControl = (UpdateControl<R>) ex.getUnfulfillmentHandler().control();
}
R updatedCustomResource = null;
if (updateControl.isUpdateResourceAndStatus()) {
updatedCustomResource = updateCustomResource(updateControl.getResource());
Expand Down Expand Up @@ -248,8 +253,12 @@ private PostExecutionControl<R> handleCleanup(R resource, Context context) {
"Executing delete for resource: {} with version: {}",
getName(resource),
getVersion(resource));

DeleteControl deleteControl = controller.cleanup(resource, context);
DeleteControl deleteControl;
try {
deleteControl = controller.cleanup(resource, context);
} catch (ConditionUnfulfilledException ex) {
deleteControl = (DeleteControl) ex.getUnfulfillmentHandler().control();
}
final var useFinalizer = configuration().useFinalizer();
if (useFinalizer) {
// note that we don't reschedule here even if instructed. Removing finalizer means that
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.javaoperatorsdk.operator.processing.dependent.waitfor;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static io.javaoperatorsdk.operator.processing.dependent.waitfor.ConditionChecker.checker;

class ConditionCheckerTest {

@Test
void returnsIfDurationIsZeroAndConditionMet() {
checker()
.withTimeout(Duration.ZERO)
.withCondition(r -> true)
.check(() -> Optional.of(new TestCustomResource()));
}

@Test
void throwsExceptionIfDurationZeroConditionNotFulfilled() {
Assertions.assertThrows(
ConditionUnfulfilledException.class,
() -> new ConditionChecker<TestCustomResource>()
.withTimeout(Duration.ZERO)
.withCondition(r -> false)
.check(() -> Optional.of(new TestCustomResource())));
}

@Test
void throwsExceptionNoDurationIfResourceNotPresentWithinTimeout() {
Assertions.assertThrows(
ConditionUnfulfilledException.class,
() -> new ConditionChecker<TestCustomResource>()
.withTimeout(Duration.ZERO)
.withCondition(r -> true)
.check(Optional::empty));
}

@Test
@Timeout(value = 100, unit = TimeUnit.MILLISECONDS)
void waitsForTheConditionToFulfill() {
new ConditionChecker<TestCustomResource>()
.withTimeout(Duration.ofMillis(200))
.withPollingInterval(Duration.ofMillis(50))
.withCondition(r -> true)
.check(() -> Optional.of(new TestCustomResource()));
}

@Test
@Timeout(value = 230, unit = TimeUnit.MILLISECONDS)
void throwsExceptionIfConditionNotFulfilledWithinTimeout() {
Assertions.assertThrows(
ConditionUnfulfilledException.class, () -> new ConditionChecker<TestCustomResource>()
.withTimeout(Duration.ofMillis(200))
.withPollingInterval(Duration.ofMillis(50))
.withCondition(r -> false)
.check(() -> Optional.of(new TestCustomResource())));

}

@Test
@Timeout(value = 230, unit = TimeUnit.MILLISECONDS)
void throwsExceptionIfResourceNotPresentWithinTimeout() {
Assertions.assertThrows(
ConditionUnfulfilledException.class, () -> new ConditionChecker<TestCustomResource>()
.withTimeout(Duration.ofMillis(200))
.withPollingInterval(Duration.ofMillis(50))
.withCondition(r -> true)
.check(() -> Optional.empty()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.dependent.waitfor.ConditionUnfulfilledException;
import io.javaoperatorsdk.operator.processing.dependent.waitfor.UnfulfillmentHandler;
import io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.CustomResourceFacade;
import io.javaoperatorsdk.operator.sample.observedgeneration.ObservedGenCustomResource;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
Expand Down Expand Up @@ -470,6 +472,41 @@ void canSkipSchedulingMaxDelayIf() {
assertThat(control.getReScheduleDelay()).isNotPresent();
}

@Test
void handlesConditionUnfulfilledExceptionOnReconcile() {
testCustomResource.addFinalizer(DEFAULT_FINALIZER);
var uc = UpdateControl.<TestCustomResource>noUpdate()
.rescheduleAfter(1000, TimeUnit.MILLISECONDS);
UnfulfillmentHandler<UpdateControl<TestCustomResource>> unfulfillmentHandler = () -> uc;

when(reconciler.reconcile(eq(testCustomResource), any()))
.thenThrow(new ConditionUnfulfilledException(unfulfillmentHandler));

PostExecutionControl control =
reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource));

assertThat(control.getReScheduleDelay()).isPresent();
assertThat(control.getReScheduleDelay().get()).isEqualTo(1000L);
assertThat(control.getUpdatedCustomResource()).isEmpty();
}

@Test
void handlesConditionUnfulfilledExceptionOnCleanup() {
testCustomResource.addFinalizer(DEFAULT_FINALIZER);
markForDeletion(testCustomResource);
var uc = DeleteControl.noFinalizerRemoval().rescheduleAfter(1000);
UnfulfillmentHandler<DeleteControl> unfulfillmentHandler = () -> uc;
when(reconciler.cleanup(eq(testCustomResource), any()))
.thenThrow(new ConditionUnfulfilledException(unfulfillmentHandler));

PostExecutionControl control =
reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource));

verify(reconciler, times(1)).cleanup(eq(testCustomResource), any());
assertThat(control.getReScheduleDelay()).isPresent();
assertThat(control.getReScheduleDelay().get()).isEqualTo(1000L);
}

private ObservedGenCustomResource createObservedGenCustomResource() {
ObservedGenCustomResource observedGenCustomResource = new ObservedGenCustomResource();
observedGenCustomResource.setMetadata(new ObjectMeta());
Expand Down
Loading