Skip to content

Commit 326a3ad

Browse files
committed
Custom event filter for controllers
1 parent 8d1b15e commit 326a3ad

File tree

6 files changed

+105
-60
lines changed

6 files changed

+105
-60
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io.javaoperatorsdk.operator.ControllerUtils;
55
import io.javaoperatorsdk.operator.api.Controller;
66
import java.lang.reflect.ParameterizedType;
7+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicate;
8+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicates;
79
import java.util.Collections;
810
import java.util.Set;
911

@@ -97,4 +99,10 @@ default void setConfigurationService(ConfigurationService service) {}
9799
default boolean useFinalizer() {
98100
return !Controller.NO_FINALIZER.equals(getFinalizer());
99101
}
102+
103+
default CustomResourcePredicate<R> getPredicate() {
104+
return isGenerationAware()
105+
? CustomResourcePredicates.generationAware()
106+
: CustomResourcePredicates.passthrough();
107+
}
100108
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import org.slf4j.LoggerFactory;
2020

2121
@SuppressWarnings("rawtypes")
22-
public class CustomResourceCache {
22+
public class CustomResourceCache<T extends CustomResource<?, ?>> {
2323

2424
private static final Logger log = LoggerFactory.getLogger(CustomResourceCache.class);
2525

2626
private final ObjectMapper objectMapper;
27-
private final ConcurrentMap<String, CustomResource> resources = new ConcurrentHashMap<>();
27+
private final ConcurrentMap<String, T> resources = new ConcurrentHashMap<>();
2828
private final Lock lock = new ReentrantLock();
2929

3030
public CustomResourceCache() {
@@ -35,21 +35,24 @@ public CustomResourceCache(ObjectMapper objectMapper) {
3535
this.objectMapper = objectMapper;
3636
}
3737

38-
public void cacheResource(CustomResource resource) {
38+
public void cacheResource(T resource) {
3939
try {
4040
lock.lock();
41-
resources.put(KubernetesResourceUtils.getUID(resource), resource);
41+
42+
// defensive copy
43+
resources.put(KubernetesResourceUtils.getUID(resource), clone(resource));
4244
} finally {
4345
lock.unlock();
4446
}
4547
}
4648

47-
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) {
49+
public void cacheResource(T resource, Predicate<CustomResource> predicate) {
4850
try {
4951
lock.lock();
5052
if (predicate.test(resources.get(KubernetesResourceUtils.getUID(resource)))) {
5153
log.trace("Update cache after condition is true: {}", getName(resource));
52-
resources.put(getUID(resource), resource);
54+
// defensive copy
55+
resources.put(getUID(resource), clone(resource));
5356
}
5457
} finally {
5558
lock.unlock();
@@ -63,11 +66,11 @@ public void cacheResource(CustomResource resource, Predicate<CustomResource> pre
6366
* @param uuid
6467
* @return
6568
*/
66-
public Optional<CustomResource> getLatestResource(String uuid) {
69+
public Optional<T> getLatestResource(String uuid) {
6770
return Optional.ofNullable(resources.get(uuid)).map(this::clone);
6871
}
6972

70-
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
73+
public List<T> getLatestResources(Predicate<CustomResource> selector) {
7174
try {
7275
lock.lock();
7376
return resources.values().stream()
@@ -91,16 +94,17 @@ public Set<String> getLatestResourcesUids(Predicate<CustomResource> selector) {
9194
}
9295
}
9396

94-
private CustomResource clone(CustomResource customResource) {
97+
@SuppressWarnings("unchecked")
98+
private T clone(CustomResource customResource) {
9599
try {
96-
return objectMapper.readValue(
100+
return (T) objectMapper.readValue(
97101
objectMapper.writeValueAsString(customResource), customResource.getClass());
98102
} catch (JsonProcessingException e) {
99103
throw new IllegalStateException(e);
100104
}
101105
}
102106

103-
public CustomResource cleanup(String customResourceUid) {
107+
public T cleanup(String customResourceUid) {
104108
return resources.remove(customResourceUid);
105109
}
106110
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

Lines changed: 18 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,6 @@
44
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
55
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
66

7-
import java.io.IOException;
8-
import java.util.LinkedList;
9-
import java.util.List;
10-
import java.util.Map;
11-
import java.util.concurrent.ConcurrentHashMap;
12-
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
167
import io.fabric8.kubernetes.api.model.ListOptions;
178
import io.fabric8.kubernetes.client.CustomResource;
189
import io.fabric8.kubernetes.client.Watch;
@@ -24,22 +15,28 @@
2415
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
2516
import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils;
2617
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
18+
import java.io.IOException;
19+
import java.util.LinkedList;
20+
import java.util.List;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2723

28-
/** This is a special case since is not bound to a single custom resource */
24+
/**
25+
* This is a special case since is not bound to a single custom resource
26+
*/
2927
public class CustomResourceEventSource<T extends CustomResource<?, ?>> extends AbstractEventSource
3028
implements Watcher<T> {
3129

3230
private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class);
3331

3432
private final ConfiguredController<T> controller;
35-
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
3633
private final List<Watch> watches;
37-
private final CustomResourceCache customResourceCache;
34+
private final CustomResourceCache<T> customResourceCache;
3835

3936
public CustomResourceEventSource(ConfiguredController<T> controller) {
4037
this.controller = controller;
4138
this.watches = new LinkedList<>();
42-
this.customResourceCache = new CustomResourceCache(
39+
this.customResourceCache = new CustomResourceCache<>(
4340
controller.getConfiguration().getConfigurationService().getObjectMapper());
4441
}
4542

@@ -86,6 +83,9 @@ public void eventReceived(Watcher.Action action, T customResource) {
8683
log.debug(
8784
"Event received for action: {}, resource: {}", action.name(), getName(customResource));
8885

86+
final String uuid = KubernetesResourceUtils.getUID(customResource);
87+
final T oldResource = customResourceCache.getLatestResource(uuid).orElse(null);
88+
8989
// cache the latest version of the CR
9090
customResourceCache.cacheResource(customResource);
9191

@@ -98,9 +98,11 @@ public void eventReceived(Watcher.Action action, T customResource) {
9898
return;
9999
}
100100

101-
if (!skipBecauseOfGeneration(customResource)) {
101+
boolean fire = controller.getConfiguration().getPredicate().test(
102+
controller.getConfiguration(), oldResource, customResource);
103+
104+
if (fire) {
102105
eventHandler.handleEvent(new CustomResourceEvent(action, customResource, this));
103-
markLastGenerationProcessed(customResource);
104106
} else {
105107
log.debug(
106108
"Skipping event handling resource {} with version: {}",
@@ -109,38 +111,6 @@ public void eventReceived(Watcher.Action action, T customResource) {
109111
}
110112
}
111113

112-
private void markLastGenerationProcessed(T resource) {
113-
if (controller.getConfiguration().isGenerationAware()
114-
&& resource.hasFinalizer(controller.getConfiguration().getFinalizer())) {
115-
lastGenerationProcessedSuccessfully.put(
116-
KubernetesResourceUtils.getUID(resource), resource.getMetadata().getGeneration());
117-
}
118-
}
119-
120-
private boolean skipBecauseOfGeneration(T customResource) {
121-
if (!controller.getConfiguration().isGenerationAware()) {
122-
return false;
123-
}
124-
// if CR being deleted generation is naturally not changing, so we process all the events
125-
if (customResource.isMarkedForDeletion()) {
126-
return false;
127-
}
128-
129-
// only proceed if we haven't already seen this custom resource generation
130-
Long lastGeneration =
131-
lastGenerationProcessedSuccessfully.get(customResource.getMetadata().getUid());
132-
if (lastGeneration == null) {
133-
return false;
134-
} else {
135-
return customResource.getMetadata().getGeneration() <= lastGeneration;
136-
}
137-
}
138-
139-
@Override
140-
public void eventSourceDeRegisteredForResource(String customResourceUid) {
141-
lastGenerationProcessedSuccessfully.remove(customResourceUid);
142-
}
143-
144114
@Override
145115
public void onClose(WatcherException e) {
146116
if (e == null) {
@@ -164,7 +134,7 @@ public void onClose(WatcherException e) {
164134
}
165135

166136
// todo: remove
167-
public CustomResourceCache getCache() {
137+
public CustomResourceCache<T> getCache() {
168138
return customResourceCache;
169139
}
170140
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
5+
6+
@FunctionalInterface
7+
public interface CustomResourcePredicate<T extends CustomResource> {
8+
boolean test(ControllerConfiguration<T> configuration, T oldResource, T newResource);
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
5+
6+
public final class CustomResourcePredicates {
7+
8+
private static final CustomResourcePredicate<CustomResource> GENERATION_AWARE =
9+
new CustomResourcePredicate<>() {
10+
@Override
11+
public boolean test(
12+
ControllerConfiguration configuration,
13+
CustomResource oldResource,
14+
CustomResource newResource) {
15+
if (oldResource == null) {
16+
return true;
17+
}
18+
if (newResource.isMarkedForDeletion()) {
19+
return true;
20+
}
21+
if (configuration.useFinalizer()) {
22+
boolean oldFinalizer = oldResource.hasFinalizer(configuration.getFinalizer());
23+
boolean newFinalizer = newResource.hasFinalizer(configuration.getFinalizer());
24+
25+
return !newFinalizer || !oldFinalizer;
26+
}
27+
return oldResource.getMetadata().getGeneration() < newResource.getMetadata()
28+
.getGeneration();
29+
}
30+
};
31+
32+
private static final CustomResourcePredicate<CustomResource> PASSTHROUGH =
33+
new CustomResourcePredicate<>() {
34+
@Override
35+
public boolean test(
36+
ControllerConfiguration configuration,
37+
CustomResource oldResource,
38+
CustomResource newResource) {
39+
return true;
40+
}
41+
};
42+
43+
private CustomResourcePredicates() {}
44+
45+
@SuppressWarnings("unchecked")
46+
public static <T extends CustomResource> CustomResourcePredicate<T> passthrough() {
47+
return (CustomResourcePredicate<T>) PASSTHROUGH;
48+
}
49+
50+
@SuppressWarnings("unchecked")
51+
public static <T extends CustomResource> CustomResourcePredicate<T> generationAware() {
52+
return (CustomResourcePredicate<T>) GENERATION_AWARE;
53+
}
54+
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceSelectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public static class MyConfiguration implements ControllerConfiguration<TestCusto
144144

145145
public MyConfiguration(ConfigurationService configurationService, String labelSelector) {
146146
this.labelSelector = labelSelector;
147-
service = configurationService;
147+
this.service = configurationService;
148148
}
149149

150150
@Override

0 commit comments

Comments
 (0)