Skip to content

Event filter fix #1129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ public interface RecentOperationEventFilter<R> extends RecentOperationCacheFille

void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, R resource);

void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID, R resource);
void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID);

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.javaoperatorsdk.operator.processing.dependent;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
Expand All @@ -14,7 +12,6 @@ public abstract class AbstractEventSourceHolderDependentResource<R, P extends Ha
extends AbstractDependentResource<R, P>
implements EventSourceProvider<P> {
private T eventSource;
private boolean isFilteringEventSource;
private boolean isCacheFillerEventSource;

public EventSource initEventSource(EventSourceContext<P> context) {
Expand All @@ -25,9 +22,6 @@ public EventSource initEventSource(EventSourceContext<P> context) {
eventSource = createEventSource(context);
}

// but we still need to record which interfaces the event source implements even if it has
// already been set before this method is called
isFilteringEventSource = eventSource instanceof RecentOperationEventFilter;
isCacheFillerEventSource = eventSource instanceof RecentOperationCacheFiller;
return eventSource;
}
Expand All @@ -42,33 +36,6 @@ protected T eventSource() {
return eventSource;
}

protected R handleCreate(R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R created = null;
try {
prepareEventFiltering(desired, resourceID);
created = super.handleCreate(desired, primary, context);
return created;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(desired, resourceID, created);
throw e;
}
}

protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(primary);
R updated = null;
try {
prepareEventFiltering(desired, resourceID);
updated = super.handleUpdate(actual, desired, primary, context);
return updated;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(desired, resourceID, updated);
throw e;
}
}


protected void onCreated(ResourceID primaryResourceId, R created) {
if (isCacheFillerEventSource) {
recentOperationCacheFiller().handleRecentResourceCreate(primaryResourceId, created);
Expand All @@ -81,22 +48,6 @@ protected void onUpdated(ResourceID primaryResourceId, R updated, R actual) {
}
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
if (isFilteringEventSource) {
recentOperationEventFilter().prepareForCreateOrUpdateEventFiltering(resourceID, desired);
}
}

private void cleanupAfterEventFiltering(R desired, ResourceID resourceID, R created) {
if (isFilteringEventSource) {
recentOperationEventFilter().cleanupOnCreateOrUpdateEventFiltering(resourceID, created);
}
}

@SuppressWarnings("unchecked")
private RecentOperationEventFilter<R> recentOperationEventFilter() {
return (RecentOperationEventFilter<R>) eventSource;
}

@SuppressWarnings("unchecked")
private RecentOperationCacheFiller<R> recentOperationCacheFiller() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,33 @@ public void configureWith(InformerEventSource<R, P> informerEventSource) {
setEventSource(informerEventSource);
}


protected R handleCreate(R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(desired);
R created = null;
try {
prepareEventFiltering(desired, resourceID);
created = super.handleCreate(desired, primary, context);
return created;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(resourceID);
throw e;
}
}

protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
ResourceID resourceID = ResourceID.fromResource(desired);
R updated = null;
try {
prepareEventFiltering(desired, resourceID);
updated = super.handleUpdate(actual, desired, primary, context);
return updated;
} catch (RuntimeException e) {
cleanupAfterEventFiltering(resourceID);
throw e;
}
}

@SuppressWarnings("unused")
public R create(R target, P primary, Context<P> context) {
return prepare(target, primary, "Creating").create(target);
Expand Down Expand Up @@ -152,4 +179,13 @@ public KubernetesClient getKubernetesClient() {
protected R desired(P primary, Context<P> context) {
return super.desired(primary, context);
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
eventSource().prepareForCreateOrUpdateEventFiltering(resourceID, desired);
}

private void cleanupAfterEventFiltering(ResourceID resourceID) {
eventSource().cleanupOnCreateOrUpdateEventFiltering(resourceID);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,10 @@ public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resou
* Mean to be called to clean up in case of an exception from the client. Usually in a catch
* block.
*
* @param resource handled by the informer
* @param resourceID to cleanup
*/
@Override
public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID,
R resource) {
public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID) {
log.debug("Stopping event recording for: {}", resourceID);
eventRecorder.stopEventRecording(resourceID);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.javaoperatorsdk.operator;

import java.time.Duration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.javaoperatorsdk.operator.junit.OperatorExtension;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.ConfigMapDependentResource;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.OperationEventFilterCustomResource;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.OperationEventFilterCustomResourceSpec;
import io.javaoperatorsdk.operator.sample.operationeventfiltering.OperationEventFilterCustomResourceTestReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class OperationEventFilterIT {

public static final String TEST = "test";
public static final String SPEC_VAL_1 = "val1";
public static final String SPEC_VAL_2 = "val2";

@RegisterExtension
OperatorExtension operator =
OperatorExtension.builder()
.withReconciler(new OperationEventFilterCustomResourceTestReconciler())
.build();

@Test
void reconcileNotTriggeredWithDependentResourceCreateOrUpdate() {
var resource = operator.create(OperationEventFilterCustomResource.class, createTestResource());

await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
.until(
() -> ((OperationEventFilterCustomResourceTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() == 1);
assertThat(operator.get(ConfigMap.class, TEST).getData())
.containsEntry(ConfigMapDependentResource.KEY, SPEC_VAL_1);

resource.getSpec().setValue(SPEC_VAL_2);
operator.replace(OperationEventFilterCustomResource.class, resource);

await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
.until(
() -> ((OperationEventFilterCustomResourceTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() == 2);
assertThat(operator.get(ConfigMap.class, TEST).getData())
.containsEntry(ConfigMapDependentResource.KEY, SPEC_VAL_2);
}


private OperationEventFilterCustomResource createTestResource() {
OperationEventFilterCustomResource cr = new OperationEventFilterCustomResource();
cr.setMetadata(new ObjectMeta());
cr.getMetadata().setName(TEST);
cr.setSpec(new OperationEventFilterCustomResourceSpec());
cr.getSpec().setValue(SPEC_VAL_1);
return cr;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ void managedDependentsAreReconciledInOrder() {

await().atMost(Duration.ofSeconds(5))
.until(() -> ((OrderedManagedDependentTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() >= 1);
// todo change to more precise values when event filtering is fixed
// assertThat(OrderedManagedDependentTestReconciler.dependentExecution).hasSize(4);
.getNumberOfExecutions() == 1);

assertThat(OrderedManagedDependentTestReconciler.dependentExecution.get(0))
.isEqualTo(ConfigMapDependentResource1.class);
assertThat(OrderedManagedDependentTestReconciler.dependentExecution.get(1))
.isEqualTo(ConfigMapDependentResource2.class);

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public UpdateControl<CreateUpdateEventFilterTestCustomResource> reconcile(
informerEventSource.handleRecentResourceCreate(resourceID, configMap);
} catch (RuntimeException e) {
informerEventSource
.cleanupOnCreateOrUpdateEventFiltering(resourceID, configMapToCreate);
.cleanupOnCreateOrUpdateEventFiltering(resourceID);
throw e;
}
} else {
Expand All @@ -76,7 +76,7 @@ public UpdateControl<CreateUpdateEventFilterTestCustomResource> reconcile(
newConfigMap, configMap);
} catch (RuntimeException e) {
informerEventSource
.cleanupOnCreateOrUpdateEventFiltering(resourceID, configMap);
.cleanupOnCreateOrUpdateEventFiltering(resourceID);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

import java.util.HashMap;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUKubernetesDependentResource;

public class ConfigMapDependentResource extends
CRUKubernetesDependentResource<ConfigMap, OperationEventFilterCustomResource> {

public static final String KEY = "key1";

public ConfigMapDependentResource() {
super(ConfigMap.class);
}

@Override
protected ConfigMap desired(OperationEventFilterCustomResource primary,
Context<OperationEventFilterCustomResource> context) {

ConfigMap configMap = new ConfigMap();
configMap.setMetadata(new ObjectMeta());
configMap.getMetadata().setName(primary.getMetadata().getName());
configMap.getMetadata().setNamespace(primary.getMetadata().getNamespace());
HashMap<String, String> data = new HashMap<>();
data.put(KEY, primary.getSpec().getValue());
configMap.setData(data);
return configMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Kind;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@Kind("OperationEventFilterCustomResource")
@ShortNames("oef")
public class OperationEventFilterCustomResource
extends CustomResource<OperationEventFilterCustomResourceSpec, String>
implements Namespaced {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

public class OperationEventFilterCustomResourceSpec {

private String value;

public String getValue() {
return value;
}

public OperationEventFilterCustomResourceSpec setValue(String value) {
this.value = value;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.sample.operationeventfiltering;

import java.util.concurrent.atomic.AtomicInteger;

import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent;
import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider;

@ControllerConfiguration(
namespaces = Constants.WATCH_CURRENT_NAMESPACE,
dependents = {
@Dependent(type = ConfigMapDependentResource.class),
})
public class OperationEventFilterCustomResourceTestReconciler
implements Reconciler<OperationEventFilterCustomResource>,
TestExecutionInfoProvider {

private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

@Override
public UpdateControl<OperationEventFilterCustomResource> reconcile(
OperationEventFilterCustomResource resource,
Context<OperationEventFilterCustomResource> context) {
numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
}

public int getNumberOfExecutions() {
return numberOfExecutions.get();
}

}