1
1
package io .javaoperatorsdk .operator .processing .event ;
2
2
3
- import java .util .LinkedHashSet ;
4
- import java .util .List ;
5
- import java .util .Objects ;
6
- import java .util .Set ;
3
+ import java .util .*;
7
4
import java .util .stream .Collectors ;
8
5
9
6
import org .slf4j .Logger ;
@@ -62,20 +59,22 @@ private void postProcessDefaultEventSources() {
62
59
*/
63
60
@ Override
64
61
public synchronized void start () {
65
- for (var eventSource : eventSources ) {
66
- try {
67
- logEventSourceEvent (eventSource , "Starting" );
68
- eventSource .start ();
69
- logEventSourceEvent (eventSource , "Started" );
70
- } catch (MissingCRDException e ) {
71
- throw e ; // leave untouched
72
- } catch (Exception e ) {
73
- throw new OperatorException ("Couldn't start source " + eventSource .name (), e );
74
- }
75
- }
62
+ startEventSource (eventSources .namedControllerResourceEventSource ());
63
+ eventSources .additionalNamedEventSources ().parallel ().forEach (this ::startEventSource );
76
64
eventProcessor .start ();
77
65
}
78
66
67
+ @ SuppressWarnings ("rawtypes" )
68
+
69
+
70
+ @ Override
71
+ public synchronized void stop () {
72
+ stopEventSource (eventSources .namedControllerResourceEventSource ());
73
+ eventSources .additionalNamedEventSources ().parallel ().forEach (this ::stopEventSource );
74
+ eventSources .clear ();
75
+ eventProcessor .stop ();
76
+ }
77
+
79
78
@ SuppressWarnings ("rawtypes" )
80
79
private void logEventSourceEvent (NamedEventSource eventSource , String event ) {
81
80
if (log .isDebugEnabled ()) {
@@ -89,19 +88,26 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
89
88
}
90
89
}
91
90
92
- @ Override
93
- public synchronized void stop () {
94
- for (var eventSource : eventSources ) {
95
- try {
96
- logEventSourceEvent (eventSource , "Stopping" );
97
- eventSource .stop ();
98
- logEventSourceEvent (eventSource , "Stopped" );
99
- } catch (Exception e ) {
100
- log .warn ("Error closing {} -> {}" , eventSource .name (), e );
101
- }
91
+ private void startEventSource (NamedEventSource eventSource ) {
92
+ try {
93
+ logEventSourceEvent (eventSource , "Starting" );
94
+ eventSource .start ();
95
+ logEventSourceEvent (eventSource , "Started" );
96
+ } catch (MissingCRDException e ) {
97
+ throw e ; // leave untouched
98
+ } catch (Exception e ) {
99
+ throw new OperatorException ("Couldn't start source " + eventSource .name (), e );
100
+ }
101
+ }
102
+
103
+ private void stopEventSource (NamedEventSource eventSource ) {
104
+ try {
105
+ logEventSourceEvent (eventSource , "Stopping" );
106
+ eventSource .stop ();
107
+ logEventSourceEvent (eventSource , "Stopped" );
108
+ } catch (Exception e ) {
109
+ log .warn ("Error closing {} -> {}" , eventSource .name (), e );
102
110
}
103
- eventSources .clear ();
104
- eventProcessor .stop ();
105
111
}
106
112
107
113
public final void registerEventSource (EventSource eventSource ) throws OperatorException {
@@ -127,7 +133,7 @@ public final synchronized void registerEventSource(String name, EventSource even
127
133
128
134
@ SuppressWarnings ("unchecked" )
129
135
public void broadcastOnResourceEvent (ResourceAction action , R resource , R oldResource ) {
130
- for ( var eventSource : eventSources ) {
136
+ eventSources . additionalNamedEventSources (). forEach ( eventSource -> {
131
137
if (eventSource instanceof ResourceEventAware ) {
132
138
var lifecycleAwareES = ((ResourceEventAware <R >) eventSource );
133
139
switch (action ) {
@@ -142,16 +148,19 @@ public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldRes
142
148
break ;
143
149
}
144
150
}
145
- }
151
+ });
146
152
}
147
153
148
154
public void changeNamespaces (Set <String > namespaces ) {
149
155
eventProcessor .stop ();
156
+ eventSources .controllerResourceEventSource ()
157
+ .changeNamespaces (namespaces );
150
158
eventSources
151
- .eventSources ()
159
+ .additionalEventSources ()
152
160
.filter (NamespaceChangeable .class ::isInstance )
153
161
.map (NamespaceChangeable .class ::cast )
154
162
.filter (NamespaceChangeable ::allowsNamespaceChanges )
163
+ .parallel ()
155
164
.forEach (ies -> ies .changeNamespaces (namespaces ));
156
165
eventProcessor .start ();
157
166
}
0 commit comments