Skip to content

Commit 6f12061

Browse files
committed
draft of changes to simplify the recording mechanism
1 parent 3b72bb4 commit 6f12061

File tree

7 files changed

+68
-426
lines changed

7 files changed

+68
-426
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationEventFilter.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public abstract class KubernetesDependentResource<R extends HasMetadata, P exten
3737
implements KubernetesClientAware,
3838
DependentResourceConfigurator<KubernetesDependentResourceConfig<R>> {
3939

40+
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
41+
4042
private static final Logger log = LoggerFactory.getLogger(KubernetesDependentResource.class);
4143

4244
protected KubernetesClient client;
@@ -103,35 +105,14 @@ public void configureWith(InformerEventSource<R, P> informerEventSource) {
103105
setEventSource(informerEventSource);
104106
}
105107

106-
107-
protected R handleCreate(R desired, P primary, Context<P> context) {
108-
ResourceID resourceID = ResourceID.fromResource(desired);
109-
try {
110-
prepareEventFiltering(desired, resourceID);
111-
return super.handleCreate(desired, primary, context);
112-
} catch (RuntimeException e) {
113-
cleanupAfterEventFiltering(resourceID);
114-
throw e;
115-
}
116-
}
117-
118-
protected R handleUpdate(R actual, R desired, P primary, Context<P> context) {
119-
ResourceID resourceID = ResourceID.fromResource(desired);
120-
try {
121-
prepareEventFiltering(desired, resourceID);
122-
return super.handleUpdate(actual, desired, primary, context);
123-
} catch (RuntimeException e) {
124-
cleanupAfterEventFiltering(resourceID);
125-
throw e;
126-
}
127-
}
128-
129108
@SuppressWarnings("unused")
130109
public R create(R target, P primary, Context<P> context) {
131110
if (useSSA(context)) {
132111
// setting resource version for SSA so only created if it doesn't exist already
133112
target.getMetadata().setResourceVersion("1");
134113
}
114+
String id = ((InformerEventSource<R, P>) eventSource().orElseThrow()).getId();
115+
target.getMetadata().getAnnotations().put(PREVIOUS_ANNOTATION_KEY, id);
135116
final var resource = prepare(target, primary, "Creating");
136117
return useSSA(context)
137118
? resource
@@ -147,20 +128,24 @@ public R update(R actual, R target, P primary, Context<P> context) {
147128
actual.getMetadata().getResourceVersion());
148129
}
149130
R updatedResource;
131+
target.getMetadata().setResourceVersion(actual.getMetadata().getResourceVersion());
132+
String id = ((InformerEventSource<R, P>) eventSource().orElseThrow()).getId();
133+
target.getMetadata().getAnnotations().put(PREVIOUS_ANNOTATION_KEY,
134+
id + "," + actual.getMetadata().getResourceVersion());
150135
if (useSSA(context)) {
151-
target.getMetadata().setResourceVersion(actual.getMetadata().getResourceVersion());
152136
updatedResource = prepare(target, primary, "Updating")
153137
.fieldManager(context.getControllerConfiguration().fieldManager())
154138
.forceConflicts().serverSideApply();
155139
} else {
156140
var updatedActual = updaterMatcher.updateResource(actual, target, context);
157-
updatedResource = prepare(updatedActual, primary, "Updating").replace();
141+
updatedResource = prepare(updatedActual, primary, "Updating").update();
158142
}
159143
log.debug("Resource version after update: {}",
160144
updatedResource.getMetadata().getResourceVersion());
161145
return updatedResource;
162146
}
163147

148+
@Override
164149
public Result<R> match(R actualResource, P primary, Context<P> context) {
165150
final var desired = desired(primary, context);
166151
final boolean matches;
@@ -193,6 +178,7 @@ private boolean useSSA(Context<P> context) {
193178
.ssaBasedCreateUpdateMatchForDependentResources();
194179
}
195180

181+
@Override
196182
protected void handleDelete(P primary, R secondary, Context<P> context) {
197183
if (secondary != null) {
198184
client.resource(secondary).delete();
@@ -290,16 +276,6 @@ protected R desired(P primary, Context<P> context) {
290276
return super.desired(primary, context);
291277
}
292278

293-
private void prepareEventFiltering(R desired, ResourceID resourceID) {
294-
((InformerEventSource<R, P>) eventSource().orElseThrow())
295-
.prepareForCreateOrUpdateEventFiltering(resourceID, desired);
296-
}
297-
298-
private void cleanupAfterEventFiltering(ResourceID resourceID) {
299-
((InformerEventSource<R, P>) eventSource().orElseThrow())
300-
.cleanupOnCreateOrUpdateEventFiltering(resourceID);
301-
}
302-
303279
@Override
304280
public Optional<KubernetesDependentResourceConfig<R>> configuration() {
305281
return Optional.ofNullable(kubernetesDependentResourceConfig);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventRecorder.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 42 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Map;
66
import java.util.Optional;
77
import java.util.Set;
8+
import java.util.UUID;
89
import java.util.function.Function;
910
import java.util.stream.Collectors;
1011

@@ -18,7 +19,7 @@
1819
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
1920
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
2021
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
21-
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter;
22+
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
2223
import io.javaoperatorsdk.operator.processing.event.Event;
2324
import io.javaoperatorsdk.operator.processing.event.EventHandler;
2425
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -72,17 +73,16 @@
7273
*/
7374
public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7475
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
75-
implements ResourceEventHandler<R>, RecentOperationEventFilter<R> {
76+
implements ResourceEventHandler<R> {
7677

7778
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
7879

7980
private final InformerConfiguration<R> configuration;
80-
// always called from a synchronized method
81-
private final EventRecorder<R> eventRecorder = new EventRecorder<>();
8281
// we need direct control for the indexer to propagate the just update resource also to the index
8382
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
8483
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
8584
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
85+
private String id = UUID.randomUUID().toString();
8686

8787
public InformerEventSource(
8888
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
@@ -110,6 +110,10 @@ public InformerEventSource(InformerConfiguration<R> configuration, KubernetesCli
110110
genericFilter = configuration.genericFilter().orElse(null);
111111
}
112112

113+
public String getId() {
114+
return id;
115+
}
116+
113117
@Override
114118
public void onAdd(R newResource) {
115119
if (log.isDebugEnabled()) {
@@ -154,12 +158,22 @@ public void onDelete(R resource, boolean b) {
154158
private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject,
155159
Runnable superOnOp) {
156160
var resourceID = ResourceID.fromResource(newObject);
157-
if (eventRecorder.isRecordingFor(resourceID)) {
158-
log.debug("Recording event for: {}", resourceID);
159-
eventRecorder.recordEvent(newObject);
160-
return;
161+
162+
String previous = newObject.getMetadata().getAnnotations()
163+
.get(KubernetesDependentResource.PREVIOUS_ANNOTATION_KEY);
164+
boolean known = false;
165+
if (previous != null) {
166+
String[] parts = previous.split(",");
167+
if (id.equals(parts[0])) {
168+
if (oldObject == null && parts.length == 1) {
169+
known = true;
170+
} else if (oldObject != null && parts.length == 2
171+
&& oldObject.getMetadata().getResourceVersion().equals(parts[1])) {
172+
known = true;
173+
}
174+
}
161175
}
162-
if (temporaryCacheHasResourceWithSameVersionAs(newObject)) {
176+
if (known || temporaryCacheHasResourceWithSameVersionAs(newObject)) {
163177
log.debug(
164178
"Skipping event propagation for {}, since was a result of a reconcile action. Resource ID: {}",
165179
operation,
@@ -239,99 +253,37 @@ public InformerConfiguration<R> getConfiguration() {
239253
@Override
240254
public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource,
241255
R previousVersionOfResource) {
242-
handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource,
243-
() -> super.handleRecentResourceUpdate(resourceID, resource, previousVersionOfResource));
256+
handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource);
244257
}
245258

246259
@Override
247260
public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) {
248-
handleRecentCreateOrUpdate(Operation.ADD, resource, null,
249-
() -> super.handleRecentResourceCreate(resourceID, resource));
261+
handleRecentCreateOrUpdate(Operation.ADD, resource, null);
250262
}
251263

252-
private void handleRecentCreateOrUpdate(Operation operation, R resource, R oldResource,
253-
Runnable runnable) {
254-
primaryToSecondaryIndex.onAddOrUpdate(resource);
255-
if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) {
256-
handleRecentResourceOperationAndStopEventRecording(operation, resource, oldResource);
257-
} else {
258-
runnable.run();
259-
}
260-
}
261-
262-
/**
263-
* There can be the following cases:
264-
* <ul>
265-
* <li>1. Did not receive the event yet for the target resource, then we need to put it to temp
266-
* cache. Because event will arrive. Note that this not necessary mean that the even is not sent
267-
* yet (we are in sync context). Also does not mean that there are no more events received after
268-
* that. But during the event processing (onAdd, onUpdate) we make sure that the propagation just
269-
* skipped for the right event.</li>
270-
* <li>2. Received the event about the operation already, it was the last. This means already is
271-
* on cache of informer. So we have to do nothing. Since it was just recorded and not propagated.
272-
* </li>
273-
* <li>3. Received the event but more events received since, so those were not propagated yet. So
274-
* an event needs to be propagated to compensate.</li>
275-
* </ul>
276-
*
277-
* @param newResource just created or updated resource
278-
*/
279-
private void handleRecentResourceOperationAndStopEventRecording(Operation operation,
280-
R newResource, R oldResource) {
264+
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
265+
primaryToSecondaryIndex.onAddOrUpdate(newResource);
281266
ResourceID resourceID = ResourceID.fromResource(newResource);
282-
try {
283-
if (!eventRecorder.containsEventWithResourceVersion(
284-
resourceID, newResource.getMetadata().getResourceVersion())) {
285-
log.debug(
286-
"Did not found event in buffer with target version and resource id: {}", resourceID);
287-
temporaryResourceCache.unconditionallyCacheResource(newResource);
288-
} else {
289-
// if the resource is not added to the temp cache, it is cleared, since
290-
// the cache is cleared by subsequent events after updates, but if those did not receive
291-
// the temp cache is still filled at this point with an old resource
292-
log.debug("Cleaning temporary cache for resource id: {}", resourceID);
293-
temporaryResourceCache.removeResourceFromCache(newResource);
294-
if (eventRecorder.containsEventWithVersionButItsNotLastOne(
295-
resourceID, newResource.getMetadata().getResourceVersion())) {
296-
R lastEvent = eventRecorder.getLastEvent(resourceID);
297-
298-
log.debug(
299-
"Found events in event buffer but the target event is not last for id: {}. Propagating event.",
300-
resourceID);
301-
if (eventAcceptedByFilter(operation, newResource, oldResource)) {
302-
propagateEvent(lastEvent);
303-
}
304-
}
305-
}
306-
} finally {
307-
log.debug("Stopping event recording for: {}", resourceID);
308-
eventRecorder.stopEventRecording(resourceID);
267+
R current = get(resourceID).orElse(null);
268+
if ((oldResource == null && current == null) || (current != null && oldResource != null
269+
&& current.getMetadata().getResourceVersion().equals(oldResource.getMetadata().getResourceVersion()))) {
270+
log.debug(
271+
"Temporarily moving ahead to target version {} for resource id: {}",
272+
newResource.getMetadata().getResourceVersion(), resourceID);
273+
temporaryResourceCache.unconditionallyCacheResource(newResource);
274+
} else {
275+
// if the resource is not added to the temp cache, it is cleared, since
276+
// the cache is cleared by subsequent events after updates, but if those did not receive
277+
// the temp cache is still filled at this point with an old resource
278+
log.debug("Cleaning temporary cache for resource id: {}", resourceID);
279+
temporaryResourceCache.removeResourceFromCache(newResource);
309280
}
310281
}
311282

312283
private boolean useSecondaryToPrimaryIndex() {
313284
return this.primaryToSecondaryMapper == null;
314285
}
315286

316-
@Override
317-
public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID,
318-
R resource) {
319-
log.debug("Starting event recording for: {}", resourceID);
320-
eventRecorder.startEventRecording(resourceID);
321-
}
322-
323-
/**
324-
* Mean to be called to clean up in case of an exception from the client. Usually in a catch
325-
* block.
326-
*
327-
* @param resourceID to cleanup
328-
*/
329-
@Override
330-
public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resourceID) {
331-
log.debug("Stopping event recording for: {}", resourceID);
332-
eventRecorder.stopEventRecording(resourceID);
333-
}
334-
335287
@Override
336288
public boolean allowsNamespaceChanges() {
337289
return getConfiguration().followControllerNamespaceChanges();
@@ -361,13 +313,15 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
361313

362314
// Since this event source instance is created by the user, the ConfigurationService is actually
363315
// injected after it is registered. Some of the subcomponents are initialized at that time here.
316+
@Override
364317
public void setConfigurationService(ConfigurationService configurationService) {
365318
super.setConfigurationService(configurationService);
366319

367320
cache.addIndexers(indexerBuffer);
368321
indexerBuffer = null;
369322
}
370323

324+
@Override
371325
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
372326
if (indexerBuffer == null) {
373327
throw new OperatorException("Cannot add indexers after InformerEventSource started.");

0 commit comments

Comments
 (0)