Skip to content

Commit 71f27e4

Browse files
committed
feat: parallel start of additional event sources (#1284)
1 parent 19dc589 commit 71f27e4

File tree

3 files changed

+55
-64
lines changed

3 files changed

+55
-64
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

+39-30
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import java.util.LinkedHashSet;
4-
import java.util.List;
5-
import java.util.Objects;
6-
import java.util.Set;
3+
import java.util.*;
74
import java.util.stream.Collectors;
85

96
import org.slf4j.Logger;
@@ -62,20 +59,22 @@ private void postProcessDefaultEventSources() {
6259
*/
6360
@Override
6461
public synchronized void start() {
65-
for (var eventSource : eventSources) {
66-
try {
67-
logEventSourceEvent(eventSource, "Starting");
68-
eventSource.start();
69-
logEventSourceEvent(eventSource, "Started");
70-
} catch (MissingCRDException e) {
71-
throw e; // leave untouched
72-
} catch (Exception e) {
73-
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
74-
}
75-
}
62+
startEventSource(eventSources.namedControllerResourceEventSource());
63+
eventSources.additionalNamedEventSources().parallel().forEach(this::startEventSource);
7664
eventProcessor.start();
7765
}
7866

67+
@SuppressWarnings("rawtypes")
68+
69+
70+
@Override
71+
public synchronized void stop() {
72+
stopEventSource(eventSources.namedControllerResourceEventSource());
73+
eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource);
74+
eventSources.clear();
75+
eventProcessor.stop();
76+
}
77+
7978
@SuppressWarnings("rawtypes")
8079
private void logEventSourceEvent(NamedEventSource eventSource, String event) {
8180
if (log.isDebugEnabled()) {
@@ -89,19 +88,26 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
8988
}
9089
}
9190

92-
@Override
93-
public synchronized void stop() {
94-
for (var eventSource : eventSources) {
95-
try {
96-
logEventSourceEvent(eventSource, "Stopping");
97-
eventSource.stop();
98-
logEventSourceEvent(eventSource, "Stopped");
99-
} catch (Exception e) {
100-
log.warn("Error closing {} -> {}", eventSource.name(), e);
101-
}
91+
private void startEventSource(NamedEventSource eventSource) {
92+
try {
93+
logEventSourceEvent(eventSource, "Starting");
94+
eventSource.start();
95+
logEventSourceEvent(eventSource, "Started");
96+
} catch (MissingCRDException e) {
97+
throw e; // leave untouched
98+
} catch (Exception e) {
99+
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
100+
}
101+
}
102+
103+
private void stopEventSource(NamedEventSource eventSource) {
104+
try {
105+
logEventSourceEvent(eventSource, "Stopping");
106+
eventSource.stop();
107+
logEventSourceEvent(eventSource, "Stopped");
108+
} catch (Exception e) {
109+
log.warn("Error closing {} -> {}", eventSource.name(), e);
102110
}
103-
eventSources.clear();
104-
eventProcessor.stop();
105111
}
106112

107113
public final void registerEventSource(EventSource eventSource) throws OperatorException {
@@ -127,7 +133,7 @@ public final synchronized void registerEventSource(String name, EventSource even
127133

128134
@SuppressWarnings("unchecked")
129135
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
130-
for (var eventSource : eventSources) {
136+
eventSources.additionalNamedEventSources().forEach(eventSource -> {
131137
if (eventSource instanceof ResourceEventAware) {
132138
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource);
133139
switch (action) {
@@ -142,16 +148,19 @@ public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldRes
142148
break;
143149
}
144150
}
145-
}
151+
});
146152
}
147153

148154
public void changeNamespaces(Set<String> namespaces) {
149155
eventProcessor.stop();
156+
eventSources.controllerResourceEventSource()
157+
.changeNamespaces(namespaces);
150158
eventSources
151-
.eventSources()
159+
.additionalEventSources()
152160
.filter(NamespaceChangeable.class::isInstance)
153161
.map(NamespaceChangeable.class::cast)
154162
.filter(NamespaceChangeable::allowsNamespaceChanges)
163+
.parallel()
155164
.forEach(ies -> ies.changeNamespaces(namespaces));
156165
eventProcessor.start();
157166
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
1515
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
1616

17-
class EventSources<R extends HasMetadata> implements Iterable<NamedEventSource> {
17+
class EventSources<R extends HasMetadata> {
1818

1919
public static final String CONTROLLER_RESOURCE_EVENT_SOURCE_NAME =
2020
"ControllerResourceEventSource";
@@ -40,26 +40,29 @@ TimerEventSource<R> retryEventSource() {
4040
return retryAndRescheduleTimerEventSource;
4141
}
4242

43-
@Override
44-
public Iterator<NamedEventSource> iterator() {
43+
public Stream<NamedEventSource> additionalNamedEventSources() {
4544
return Stream.concat(Stream.of(
46-
new NamedEventSource(controllerResourceEventSource, CONTROLLER_RESOURCE_EVENT_SOURCE_NAME),
4745
new NamedEventSource(retryAndRescheduleTimerEventSource,
4846
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
49-
flatMappedSources()).iterator();
47+
flatMappedSources());
48+
}
49+
50+
Stream<EventSource> additionalEventSources() {
51+
return Stream.concat(
52+
Stream.of(retryEventSource()).filter(Objects::nonNull),
53+
sources.values().stream().flatMap(c -> c.values().stream()));
54+
}
55+
56+
NamedEventSource namedControllerResourceEventSource() {
57+
return new NamedEventSource(controllerResourceEventSource,
58+
CONTROLLER_RESOURCE_EVENT_SOURCE_NAME);
5059
}
5160

5261
Stream<NamedEventSource> flatMappedSources() {
5362
return sources.values().stream().flatMap(c -> c.entrySet().stream()
5463
.map(esEntry -> new NamedEventSource(esEntry.getValue(), esEntry.getKey())));
5564
}
5665

57-
Stream<EventSource> eventSources() {
58-
return Stream.concat(
59-
Stream.of(controllerResourceEventSource(), retryEventSource()).filter(Objects::nonNull),
60-
sources.values().stream().flatMap(c -> c.values().stream()));
61-
}
62-
6366
public void clear() {
6467
sources.clear();
6568
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java

+2-23
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import io.javaoperatorsdk.operator.processing.Controller;
1010
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
1111

12-
import static io.javaoperatorsdk.operator.processing.event.EventSources.CONTROLLER_RESOURCE_EVENT_SOURCE_NAME;
1312
import static io.javaoperatorsdk.operator.processing.event.EventSources.RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME;
1413
import static org.assertj.core.api.Assertions.assertThat;
1514
import static org.junit.jupiter.api.Assertions.*;
@@ -29,34 +28,14 @@ void cannotAddTwoEventSourcesWithSameName() {
2928
});
3029
}
3130

32-
@Test
33-
void allEventSourcesShouldReturnAll() {
34-
// initial state doesn't have ControllerResourceEventSource
35-
assertThat(eventSources.eventSources()).containsExactly(eventSources.retryEventSource());
36-
37-
initControllerEventSource();
38-
39-
assertThat(eventSources.eventSources()).containsExactly(
40-
eventSources.controllerResourceEventSource(),
41-
eventSources.retryEventSource());
42-
43-
final var source = mock(EventSource.class);
44-
eventSources.add(EVENT_SOURCE_NAME, source);
45-
// order matters
46-
assertThat(eventSources.eventSources())
47-
.containsExactly(eventSources.controllerResourceEventSource(),
48-
eventSources.retryEventSource(), source);
49-
}
5031

5132
@Test
52-
void eventSourcesIteratorShouldReturnControllerEventSourceAsFirst() {
33+
void eventSourcesStreamShouldNotReturnControllerEventSource() {
5334
initControllerEventSource();
5435
final var source = mock(EventSource.class);
5536
eventSources.add(EVENT_SOURCE_NAME, source);
5637

57-
assertThat(eventSources.iterator()).toIterable().containsExactly(
58-
new NamedEventSource(eventSources.controllerResourceEventSource(),
59-
CONTROLLER_RESOURCE_EVENT_SOURCE_NAME),
38+
assertThat(eventSources.additionalNamedEventSources()).containsExactly(
6039
new NamedEventSource(eventSources.retryEventSource(),
6140
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME),
6241
new NamedEventSource(source, EVENT_SOURCE_NAME));

0 commit comments

Comments
 (0)