-
Notifications
You must be signed in to change notification settings - Fork 218
Custom event filter for controllers #457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d0582ca
ff9ba8d
da3437d
3d849dc
ad84c2f
c136f09
9e02044
c0b8f5a
5d60b4a
8dd8302
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import java.util.Set; | ||
|
||
import io.fabric8.kubernetes.client.CustomResource; | ||
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventFilter; | ||
|
||
public class ControllerConfigurationOverrider<R extends CustomResource<?, ?>> { | ||
|
||
|
@@ -13,6 +14,7 @@ public class ControllerConfigurationOverrider<R extends CustomResource<?, ?>> { | |
private final Set<String> namespaces; | ||
private RetryConfiguration retry; | ||
private String labelSelector; | ||
private CustomResourceEventFilter<R> customResourcePredicate; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also a list of predicates. Also the name of the field should correlated with the type IMHO. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the name here is a leftover from the previous iteration, will fix it |
||
private final ControllerConfiguration<R> original; | ||
|
||
private ControllerConfigurationOverrider(ControllerConfiguration<R> original) { | ||
|
@@ -21,6 +23,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) { | |
namespaces = new HashSet<>(original.getNamespaces()); | ||
retry = original.getRetryConfiguration(); | ||
labelSelector = original.getLabelSelector(); | ||
customResourcePredicate = original.getEventFilter(); | ||
this.original = original; | ||
} | ||
|
||
|
@@ -65,6 +68,12 @@ public ControllerConfigurationOverrider<R> withLabelSelector(String labelSelecto | |
return this; | ||
} | ||
|
||
public ControllerConfigurationOverrider<R> withCustomResourcePredicate( | ||
CustomResourceEventFilter<R> customResourcePredicate) { | ||
this.customResourcePredicate = customResourcePredicate; | ||
return this; | ||
} | ||
|
||
public ControllerConfiguration<R> build() { | ||
return new DefaultControllerConfiguration<>( | ||
original.getAssociatedControllerClassName(), | ||
|
@@ -75,6 +84,7 @@ public ControllerConfiguration<R> build() { | |
namespaces, | ||
retry, | ||
labelSelector, | ||
customResourcePredicate, | ||
original.getCustomResourceClass(), | ||
original.getConfigurationService()); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,13 +23,13 @@ | |
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; | ||
|
||
@SuppressWarnings("rawtypes") | ||
public class CustomResourceCache { | ||
public class CustomResourceCache<T extends CustomResource<?, ?>> { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(CustomResourceCache.class); | ||
private static final Predicate passthrough = o -> true; | ||
|
||
private final ObjectMapper objectMapper; | ||
private final ConcurrentMap<String, CustomResource> resources; | ||
private final ConcurrentMap<String, T> resources; | ||
private final Lock lock = new ReentrantLock(); | ||
|
||
public CustomResourceCache() { | ||
|
@@ -44,19 +44,21 @@ public CustomResourceCache(ObjectMapper objectMapper, Metrics metrics) { | |
resources = metrics.monitorSizeOf(new ConcurrentHashMap<>(), "cache"); | ||
} | ||
|
||
public void cacheResource(CustomResource resource) { | ||
@SuppressWarnings("unchecked") | ||
public void cacheResource(T resource) { | ||
cacheResource(resource, passthrough); | ||
} | ||
|
||
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) { | ||
public void cacheResource(T resource, Predicate<T> predicate) { | ||
try { | ||
lock.lock(); | ||
final var uid = getUID(resource); | ||
if (predicate.test(resources.get(uid))) { | ||
if (passthrough != predicate) { | ||
log.trace("Update cache after condition is true: {}", getName(resource)); | ||
} | ||
resources.put(uid, resource); | ||
// defensive copy | ||
resources.put(getUID(resource), clone(resource)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we cloning here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason is that as example, if for some reason the process that applies the finalizer fails when updating the object, then the cache would still reporting the object as having the finalizer which is not correct. it probably won't matter so if it should be removed, I ca do it in a subsequent pr |
||
} | ||
} finally { | ||
lock.unlock(); | ||
|
@@ -70,11 +72,11 @@ public void cacheResource(CustomResource resource, Predicate<CustomResource> pre | |
* @param uuid | ||
* @return | ||
*/ | ||
public Optional<CustomResource> getLatestResource(String uuid) { | ||
public Optional<T> getLatestResource(String uuid) { | ||
return Optional.ofNullable(resources.get(uuid)).map(this::clone); | ||
} | ||
|
||
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) { | ||
public List<T> getLatestResources(Predicate<CustomResource> selector) { | ||
try { | ||
lock.lock(); | ||
return resources.values().stream() | ||
|
@@ -98,16 +100,17 @@ public Set<String> getLatestResourcesUids(Predicate<CustomResource> selector) { | |
} | ||
} | ||
|
||
private CustomResource clone(CustomResource customResource) { | ||
@SuppressWarnings("unchecked") | ||
private T clone(CustomResource customResource) { | ||
try { | ||
return objectMapper.readValue( | ||
return (T) objectMapper.readValue( | ||
objectMapper.writeValueAsString(customResource), customResource.getClass()); | ||
} catch (JsonProcessingException e) { | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
|
||
public CustomResource cleanup(String customResourceUid) { | ||
public T cleanup(String customResourceUid) { | ||
return resources.remove(customResourceUid); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package io.javaoperatorsdk.operator.processing.event.internal; | ||
|
||
import io.fabric8.kubernetes.client.CustomResource; | ||
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; | ||
|
||
/** | ||
* A functional interface to determine whether resource events should be processed by the SDK. This | ||
* allows users to more finely tuned which events trigger a reconciliation than was previously | ||
* possible (where the logic was limited to generation-based checking). | ||
* | ||
* @param <T> the type of custom resources handled by this filter | ||
*/ | ||
@FunctionalInterface | ||
public interface CustomResourceEventFilter<T extends CustomResource> { | ||
|
||
/** | ||
* Determines whether the change between the old version of the resource and the new one needs to | ||
* be propagated to the controller or not. | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have filter on Creates(Deletes)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
* @param configuration the target controller's configuration | ||
* @param oldResource the old version of the resource, null if no old resource available | ||
* @param newResource the new version of the resource | ||
* @return {@code true} if the change needs to be propagated to the controller, {@code false} | ||
* otherwise | ||
*/ | ||
boolean acceptChange(ControllerConfiguration<T> configuration, T oldResource, T newResource); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to discuss this @lburgazzoli @metacosm . The old object should not be here as I see this now. |
||
|
||
/** | ||
* Combines this filter with the provided one with an AND logic, i.e. the resulting filter will | ||
* only accept the change if both this and the other filter accept it, reject it otherwise. | ||
* | ||
* @param other the possibly {@code null} other filter to combine this one with | ||
* @return a composite filter implementing the AND logic between this and the provided filter | ||
*/ | ||
default CustomResourceEventFilter<T> and(CustomResourceEventFilter<T> other) { | ||
return other == null ? this | ||
: (ControllerConfiguration<T> configuration, T oldResource, T newResource) -> { | ||
boolean result = acceptChange(configuration, oldResource, newResource); | ||
return result && other.acceptChange(configuration, oldResource, newResource); | ||
}; | ||
} | ||
|
||
/** | ||
* Combines this filter with the provided one with an OR logic, i.e. the resulting filter will | ||
* accept the change if any of this or the other filter accept it, rejecting it only if both | ||
* reject it. | ||
* | ||
* @param other the possibly {@code null} other filter to combine this one with | ||
* @return a composite filter implementing the OR logic between this and the provided filter | ||
*/ | ||
default CustomResourceEventFilter<T> or(CustomResourceEventFilter<T> other) { | ||
return other == null ? this | ||
: (ControllerConfiguration<T> configuration, T oldResource, T newResource) -> { | ||
boolean result = acceptChange(configuration, oldResource, newResource); | ||
return result || other.acceptChange(configuration, oldResource, newResource); | ||
}; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a list too, or?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be a list but the it will become an AND so the rationale here is that if you have to chain multiple filers, then you can probably chain them using the
and
/or
operations provided by the interface so you are free to compose them as you like.But no problem to make this a list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You right, is might be a too strong assumption to make. Thus that this would be usually an "AND" relationship in those filters.