Skip to content

Commit 3ba1346

Browse files
committed
refactor: simplify handling of reused event sources
DependentResources that provide an EventSource should implement EventSourceAware. If they want to reuse an already created event source instead of providing their own, they need to select the appropriate event source identified by the name specified by the useEventSourceNamed method.
1 parent 9c1be9d commit 3ba1346

21 files changed

+115
-175
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ public List<DependentResourceSpec> getDependentResources() {
248248
instantiateIfNotDefault(dependent.readyPostcondition(), Condition.class, context),
249249
instantiateIfNotDefault(dependent.reconcilePrecondition(), Condition.class, context),
250250
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context),
251-
dependent.provideEventSource());
251+
Constants.NO_VALUE_SET.equals(dependent.eventSource()) ? null
252+
: dependent.eventSource());
252253
specsMap.put(name, spec);
253254
}
254255

@@ -314,14 +315,12 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
314315
resourceDiscriminator =
315316
instantiateIfNotDefault(kubeDependent.resourceDiscriminator(),
316317
ResourceDiscriminator.class, context);
317-
eventSourceNameToUse = Constants.NO_VALUE_SET.equals(kubeDependent.eventSourceToUse()) ? null
318-
: kubeDependent.eventSourceToUse();
319318
}
320319

321320
config =
322321
new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS,
323322
resourceDiscriminator, onAddFilter,
324-
onUpdateFilter, onDeleteFilter, genericFilter, eventSourceNameToUse);
323+
onUpdateFilter, onDeleteFilter, genericFilter);
325324

326325
return config;
327326
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private void replaceConfig(String name, Object newConfig, DependentResourceSpec<
174174
namedDependentResourceSpecs.put(name,
175175
new DependentResourceSpec<>(current.getDependentResourceClass(), newConfig, name,
176176
current.getDependsOn(), current.getReadyCondition(), current.getReconcileCondition(),
177-
current.getDeletePostCondition(), current.provideEventSource()));
177+
current.getDeletePostCondition(), current.getEventSourceName().orElse(null)));
178178
}
179179

180180
@SuppressWarnings("unchecked")
@@ -220,7 +220,8 @@ public ControllerConfiguration<R> build() {
220220
KubernetesDependentResourceConfig c) {
221221
return new DependentResourceSpec(spec.getDependentResourceClass(),
222222
c.setNamespaces(namespaces), name, spec.getDependsOn(), spec.getReadyCondition(),
223-
spec.getReconcileCondition(), spec.getDeletePostCondition(), spec.provideEventSource());
223+
spec.getReconcileCondition(), spec.getDeletePostCondition(),
224+
(String) spec.getEventSourceName().orElse(null));
224225
}
225226

226227
public static <R extends HasMetadata> ControllerConfigurationOverrider<R> override(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,20 @@ public class DependentResourceSpec<T extends DependentResource<?, ?>, C> {
2323

2424
private final Condition<?, ?> deletePostCondition;
2525

26-
private final boolean provideEventSource;
26+
private final String eventSourceName;
2727

2828
public DependentResourceSpec(Class<T> dependentResourceClass, C dependentResourceConfig,
2929
String name, Set<String> dependsOn, Condition<?, ?> readyCondition,
3030
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition,
31-
boolean provideEventSource) {
31+
String eventSourceName) {
3232
this.dependentResourceClass = dependentResourceClass;
3333
this.dependentResourceConfig = dependentResourceConfig;
3434
this.name = name;
3535
this.dependsOn = dependsOn;
3636
this.readyCondition = readyCondition;
3737
this.reconcileCondition = reconcileCondition;
3838
this.deletePostCondition = deletePostCondition;
39-
this.provideEventSource = provideEventSource;
39+
this.eventSourceName = eventSourceName;
4040
}
4141

4242
public Class<T> getDependentResourceClass() {
@@ -94,7 +94,7 @@ public Condition getDeletePostCondition() {
9494
return deletePostCondition;
9595
}
9696

97-
public boolean provideEventSource() {
98-
return provideEventSource;
97+
public Optional<String> getEventSourceName() {
98+
return Optional.ofNullable(eventSourceName);
9999
}
100100
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContext.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import io.fabric8.kubernetes.api.model.HasMetadata;
44
import io.fabric8.kubernetes.client.KubernetesClient;
55
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
6+
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
7+
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
68
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
79
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
810

@@ -13,14 +15,14 @@
1315
*/
1416
public class EventSourceContext<P extends HasMetadata> {
1517

16-
private final IndexerResourceCache<P> primaryCache;
18+
private final EventSourceManager<P> eventSourceManager;
1719
private final ControllerConfiguration<P> controllerConfiguration;
1820
private final KubernetesClient client;
1921

20-
public EventSourceContext(IndexerResourceCache<P> primaryCache,
22+
public EventSourceContext(EventSourceManager<P> eventSourceManager,
2123
ControllerConfiguration<P> controllerConfiguration,
2224
KubernetesClient client) {
23-
this.primaryCache = primaryCache;
25+
this.eventSourceManager = eventSourceManager;
2426
this.controllerConfiguration = controllerConfiguration;
2527
this.client = client;
2628
}
@@ -31,7 +33,7 @@ public EventSourceContext(IndexerResourceCache<P> primaryCache,
3133
* @return the primary resource cache
3234
*/
3335
public IndexerResourceCache<P> getPrimaryCache() {
34-
return primaryCache;
36+
return eventSourceManager.getControllerResourceEventSource();
3537
}
3638

3739
/**
@@ -54,4 +56,8 @@ public ControllerConfiguration<P> getControllerConfiguration() {
5456
public KubernetesClient getClient() {
5557
return client;
5658
}
59+
60+
public EventSourceRetriever<P> getEventSourceRetriever() {
61+
return eventSourceManager;
62+
}
5763
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceInitializer.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3+
import java.util.Arrays;
34
import java.util.Collections;
45
import java.util.HashMap;
56
import java.util.Map;
6-
import java.util.Optional;
77

88
import io.fabric8.kubernetes.api.model.HasMetadata;
99
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
10+
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
1011
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
11-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
1212

1313
/**
1414
* An interface that a {@link Reconciler} can implement to have the SDK register the provided
@@ -46,13 +46,12 @@ static Map<String, EventSource> nameEventSources(EventSource... eventSources) {
4646
@SuppressWarnings("unchecked,rawtypes")
4747
static <K extends HasMetadata> Map<String, EventSource> nameEventSourcesFromDependentResource(
4848
EventSourceContext<K> context, DependentResource... dependentResources) {
49-
5049
if (dependentResources != null) {
5150
Map<String, EventSource> eventSourceMap = new HashMap<>(dependentResources.length);
52-
for (DependentResource dependentResource : dependentResources) {
53-
Optional<ResourceEventSource> es = dependentResource.eventSource(context);
54-
es.ifPresent(e -> eventSourceMap.put(generateNameFor(e), e));
55-
}
51+
Arrays.stream(dependentResources)
52+
.filter(EventSourceAware.class::isInstance)
53+
.forEach(esa -> ((EventSourceAware<?, K>) esa).eventSource(context)
54+
.ifPresent(es -> eventSourceMap.put(generateNameFor(es), es)));
5655
return eventSourceMap;
5756
} else {
5857
return Collections.emptyMap();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,5 @@
6565
*
6666
* @return if the event source (if any) provided by the dependent resource should be used or not.
6767
*/
68-
boolean provideEventSource() default true;
68+
String eventSource() default NO_VALUE_SET;
6969
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
import io.fabric8.kubernetes.api.model.HasMetadata;
66
import io.javaoperatorsdk.operator.api.reconciler.Context;
7-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
8-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
97

108
/**
119
* An interface to implement and provide dependent resource support.
@@ -31,31 +29,6 @@ public interface DependentResource<R, P extends HasMetadata> {
3129
*/
3230
Class<R> resourceType();
3331

34-
/**
35-
* Dependent resources are designed to by default provide event sources. There are cases where it
36-
* might not:
37-
* <ul>
38-
* <li>If an event source is shared between multiple dependent resources. In this case only one or
39-
* none of the dependent resources sharing the event source should provide one.</li>
40-
* <li>Some special implementation of an event source. That just execute some action might not
41-
* provide one.</li>
42-
* </ul>
43-
*
44-
* @param eventSourceContext context of event source initialization
45-
* @return an optional event source
46-
*/
47-
default Optional<ResourceEventSource<R, P>> eventSource(
48-
EventSourceContext<P> eventSourceContext) {
49-
return Optional.empty();
50-
}
51-
52-
/**
53-
* Calling this method, instructs the implementation to not provide an event source, even if it
54-
* normally does.
55-
*/
56-
void doNotProvideEventSource();
57-
58-
5932
default Optional<R> getSecondaryResource(P primary, Context<P> context) {
6033
return Optional.empty();
6134
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,36 @@
11
package io.javaoperatorsdk.operator.api.reconciler.dependent;
22

3+
import java.util.Optional;
4+
35
import io.fabric8.kubernetes.api.model.HasMetadata;
4-
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
6+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
7+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
8+
9+
public interface EventSourceAware<R, P extends HasMetadata> {
10+
11+
/**
12+
* Dependent resources are designed to by default provide event sources. There are cases where it
13+
* might not:
14+
* <ul>
15+
* <li>If an event source is shared between multiple dependent resources. In this case only one or
16+
* none of the dependent resources sharing the event source should provide one.</li>
17+
* <li>Some special implementation of an event source. That just execute some action might not
18+
* provide one.</li>
19+
* </ul>
20+
*
21+
* @param context context of event source initialization
22+
* @return an optional event source
23+
*/
24+
default Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> context) {
25+
return getUsedEventSourceName().map(
26+
name -> context.getEventSourceRetriever().getResourceEventSourceFor(resourceType(), name));
27+
}
528

6-
public interface EventSourceAware<P extends HasMetadata> {
29+
Class<R> resourceType();
730

8-
void selectEventSources(EventSourceRetriever<P> eventSourceRetriever);
31+
void useEventSourceNamed(String eventSourceName);
932

33+
default Optional<String> getUsedEventSourceName() {
34+
return Optional.empty();
35+
}
1036
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import io.javaoperatorsdk.operator.processing.event.EventProcessor;
4141
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
4242
import io.javaoperatorsdk.operator.processing.event.ResourceID;
43-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
4443

4544
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE;
4645

@@ -214,25 +213,21 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
214213
final var ownSources = provider.prepareEventSources(context);
215214
ownSources.forEach(eventSourceManager::registerEventSource);
216215
}
217-
managedWorkflow
218-
.getDependentResourcesByName().entrySet().stream()
219-
.forEach(drEntry -> {
220-
if (drEntry.getValue() instanceof EventSourceProvider) {
221-
final var provider = (EventSourceProvider) drEntry.getValue();
222-
final var source = provider.initEventSource(context);
223-
eventSourceManager.registerEventSource(drEntry.getKey(), source);
224-
} else {
225-
Optional<ResourceEventSource> eventSource =
226-
drEntry.getValue().eventSource(context);
227-
eventSource.ifPresent(es -> {
228-
eventSourceManager.registerEventSource(drEntry.getKey(), es);
229-
});
230-
}
231-
});
232-
managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue)
233-
.filter(EventSourceAware.class::isInstance)
234-
.forEach(dr -> ((EventSourceAware) dr)
235-
.selectEventSources(eventSourceManager));
216+
managedWorkflow.getDependentResourcesByName().entrySet().stream().filter(entry -> {
217+
final var value = entry.getValue();
218+
return value instanceof EventSourceProvider || value instanceof EventSourceAware;
219+
}).forEach(entry -> {
220+
final var value = entry.getValue();
221+
final var key = entry.getKey();
222+
if (value instanceof EventSourceProvider) {
223+
final var provider = (EventSourceProvider) value;
224+
final var source = provider.initEventSource(context);
225+
eventSourceManager.registerEventSource(key, source);
226+
} else {
227+
((EventSourceAware<?, P>) value).eventSource(context)
228+
.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
229+
}
230+
});
236231
}
237232

238233
@Override
@@ -299,8 +294,8 @@ public synchronized void start(boolean startEventProcessor) throws OperatorExcep
299294
try {
300295
// check that the custom resource is known by the cluster if configured that way
301296
validateCRDWithLocalModelIfRequired(resClass, controllerName, crdName, specVersion);
302-
final var context = new EventSourceContext<>(
303-
eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient);
297+
final var context =
298+
new EventSourceContext<>(eventSourceManager, configuration, kubernetesClient);
304299

305300
initAndRegisterEventSources(context);
306301
eventSourceManager.start();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
1111
import io.javaoperatorsdk.operator.api.reconciler.Context;
12-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
1312
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
1413
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
1514
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
1615
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
1716
import io.javaoperatorsdk.operator.processing.event.ResourceID;
18-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
1917

2018
@Ignore
2119
public abstract class AbstractDependentResource<R, P extends HasMetadata>
@@ -29,7 +27,6 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
2927
protected Creator<R, P> creator;
3028
protected Updater<R, P> updater;
3129
protected BulkDependentResource<R, P> bulkDependentResource;
32-
private boolean returnEventSource = true;
3330

3431
protected List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);
3532

@@ -41,23 +38,6 @@ public AbstractDependentResource() {
4138
bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
4239
}
4340

44-
@Override
45-
public void doNotProvideEventSource() {
46-
this.returnEventSource = false;
47-
}
48-
49-
@Override
50-
public Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> eventSourceContext) {
51-
if (!returnEventSource) {
52-
return Optional.empty();
53-
} else {
54-
return Optional.of(provideEventSource(eventSourceContext));
55-
}
56-
}
57-
58-
protected abstract ResourceEventSource<R, P> provideEventSource(
59-
EventSourceContext<P> eventSourceContext);
60-
6141
@Override
6242
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
6343
if (bulk) {
@@ -238,8 +218,4 @@ public ResourceDiscriminator<R, P> getResourceDiscriminator() {
238218
protected int lastKnownBulkSize() {
239219
return resourceDiscriminator.size();
240220
}
241-
242-
protected boolean getReturnEventSource() {
243-
return returnEventSource;
244-
}
245221
}

0 commit comments

Comments
 (0)