1
1
package io .javaoperatorsdk .operator .processing .event ;
2
2
3
+ import java .time .Duration ;
3
4
import java .util .HashMap ;
4
5
import java .util .HashSet ;
5
6
import java .util .Map ;
20
21
import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
21
22
import io .javaoperatorsdk .operator .processing .LifecycleAware ;
22
23
import io .javaoperatorsdk .operator .processing .MDCUtils ;
24
+ import io .javaoperatorsdk .operator .processing .event .rate .RateLimiter ;
23
25
import io .javaoperatorsdk .operator .processing .event .source .Cache ;
24
26
import io .javaoperatorsdk .operator .processing .event .source .controller .ResourceAction ;
25
27
import io .javaoperatorsdk .operator .processing .event .source .controller .ResourceEvent ;
32
34
class EventProcessor <R extends HasMetadata > implements EventHandler , LifecycleAware {
33
35
34
36
private static final Logger log = LoggerFactory .getLogger (EventProcessor .class );
37
+ private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50 ;
35
38
39
+ private volatile boolean running ;
36
40
private final Set <ResourceID > underProcessing = new HashSet <>();
37
41
private final ReconciliationDispatcher <R > reconciliationDispatcher ;
38
42
private final Retry retry ;
39
43
private final Map <ResourceID , RetryExecution > retryState = new HashMap <>();
40
44
private final ExecutorService executor ;
41
45
private final String controllerName ;
42
46
private final Metrics metrics ;
43
- private volatile boolean running ;
44
47
private final Cache <R > cache ;
45
48
private final EventSourceManager <R > eventSourceManager ;
46
49
private final EventMarker eventMarker = new EventMarker ();
50
+ private final RateLimiter rateLimiter ;
47
51
48
52
EventProcessor (EventSourceManager <R > eventSourceManager ) {
49
53
this (
@@ -53,6 +57,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
53
57
new ReconciliationDispatcher <>(eventSourceManager .getController ()),
54
58
eventSourceManager .getController ().getConfiguration ().getRetry (),
55
59
ConfigurationServiceProvider .instance ().getMetrics (),
60
+ eventSourceManager .getController ().getConfiguration ().getRateLimiter (),
56
61
eventSourceManager );
57
62
}
58
63
@@ -61,6 +66,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
61
66
EventSourceManager <R > eventSourceManager ,
62
67
String relatedControllerName ,
63
68
Retry retry ,
69
+ RateLimiter rateLimiter ,
64
70
Metrics metrics ) {
65
71
this (
66
72
eventSourceManager .getControllerResourceEventSource (),
@@ -69,6 +75,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
69
75
reconciliationDispatcher ,
70
76
retry ,
71
77
metrics ,
78
+ rateLimiter ,
72
79
eventSourceManager );
73
80
}
74
81
@@ -79,6 +86,7 @@ private EventProcessor(
79
86
ReconciliationDispatcher <R > reconciliationDispatcher ,
80
87
Retry retry ,
81
88
Metrics metrics ,
89
+ RateLimiter rateLimiter ,
82
90
EventSourceManager <R > eventSourceManager ) {
83
91
this .running = false ;
84
92
this .executor =
@@ -92,6 +100,7 @@ private EventProcessor(
92
100
this .cache = cache ;
93
101
this .metrics = metrics != null ? metrics : Metrics .NOOP ;
94
102
this .eventSourceManager = eventSourceManager ;
103
+ this .rateLimiter = rateLimiter ;
95
104
}
96
105
97
106
@ Override
@@ -128,6 +137,11 @@ private void submitReconciliationExecution(ResourceID resourceID) {
128
137
Optional <R > latest = cache .get (resourceID );
129
138
latest .ifPresent (MDCUtils ::addResourceInfo );
130
139
if (!controllerUnderExecution && latest .isPresent ()) {
140
+ var rateLimiterPermission = rateLimiter .acquirePermission (resourceID );
141
+ if (rateLimiterPermission .isPresent ()) {
142
+ handleRateLimitedSubmission (resourceID , rateLimiterPermission .get ());
143
+ return ;
144
+ }
131
145
setUnderExecutionProcessing (resourceID );
132
146
final var retryInfo = retryInfo (resourceID );
133
147
ExecutionScope <R > executionScope = new ExecutionScope <>(latest .get (), retryInfo );
@@ -193,6 +207,14 @@ private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) {
193
207
return resourceEvent .getResource ().map (HasMetadata ::isMarkedForDeletion ).orElse (false );
194
208
}
195
209
210
+ private void handleRateLimitedSubmission (ResourceID resourceID , Duration minimalDuration ) {
211
+ var minimalDurationMillis = minimalDuration .toMillis ();
212
+ log .debug ("Rate limited resource: {}, rescheduled in {} millis" , resourceID ,
213
+ minimalDurationMillis );
214
+ retryEventSource ().scheduleOnce (resourceID ,
215
+ Math .max (minimalDurationMillis , MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION ));
216
+ }
217
+
196
218
private RetryInfo retryInfo (ResourceID resourceID ) {
197
219
return retryState .get (resourceID );
198
220
}
@@ -251,11 +273,10 @@ private void reScheduleExecutionIfInstructed(
251
273
postExecutionControl
252
274
.getReScheduleDelay ()
253
275
.ifPresent (delay -> {
254
- if (log .isDebugEnabled ()) {
255
- log .debug ("ReScheduling event for resource: {} with delay: {}" ,
256
- ResourceID .fromResource (customResource ), delay );
257
- }
258
- retryEventSource ().scheduleOnce (customResource , delay );
276
+ var resourceID = ResourceID .fromResource (customResource );
277
+ log .debug ("ReScheduling event for resource: {} with delay: {}" ,
278
+ resourceID , delay );
279
+ retryEventSource ().scheduleOnce (resourceID , delay );
259
280
});
260
281
}
261
282
@@ -289,7 +310,7 @@ private void handleRetryOnException(
289
310
delay ,
290
311
resourceID );
291
312
metrics .failedReconciliation (resourceID , exception );
292
- retryEventSource ().scheduleOnce (executionScope . getResource () , delay );
313
+ retryEventSource ().scheduleOnce (resourceID , delay );
293
314
},
294
315
() -> log .error ("Exhausted retries for {}" , executionScope ));
295
316
}
@@ -315,6 +336,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
315
336
private void cleanupForDeletedEvent (ResourceID resourceID ) {
316
337
log .debug ("Cleaning up for delete event for: {}" , resourceID );
317
338
eventMarker .cleanup (resourceID );
339
+ rateLimiter .clear (resourceID );
318
340
metrics .cleanupDoneFor (resourceID );
319
341
}
320
342
0 commit comments