11
11
import io .fabric8 .kubernetes .api .model .HasMetadata ;
12
12
import io .fabric8 .kubernetes .client .DefaultKubernetesClient ;
13
13
import io .fabric8 .kubernetes .client .KubernetesClient ;
14
- import io .fabric8 .kubernetes .client .NamespacedKubernetesClient ;
15
14
import io .fabric8 .kubernetes .client .Version ;
16
- import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderElectionConfig ;
17
- import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderElector ;
18
- import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderElectorBuilder ;
19
- import io .fabric8 .kubernetes .client .extended .leaderelection .resourcelock .LeaseLock ;
20
- import io .fabric8 .kubernetes .client .extended .leaderelection .resourcelock .Lock ;
21
15
import io .javaoperatorsdk .operator .api .config .*;
22
16
import io .javaoperatorsdk .operator .api .reconciler .Reconciler ;
23
17
import io .javaoperatorsdk .operator .processing .Controller ;
27
21
public class Operator implements LifecycleAware {
28
22
private static final Logger log = LoggerFactory .getLogger (Operator .class );
29
23
private final KubernetesClient kubernetesClient ;
30
- private final ControllerManager controllers = new ControllerManager ();
31
- private LeaderElector leaderElector ;
24
+ private final ControllerManager controllerManager = new ControllerManager ();
25
+ private final LeaderElectionManager leaderElectionManager =
26
+ new LeaderElectionManager (controllerManager );
32
27
33
28
public Operator () {
34
29
this (new DefaultKubernetesClient (), ConfigurationServiceProvider .instance ());
@@ -53,7 +48,8 @@ public Operator(Consumer<ConfigurationServiceOverrider> overrider) {
53
48
public Operator (KubernetesClient client , Consumer <ConfigurationServiceOverrider > overrider ) {
54
49
this .kubernetesClient = client ;
55
50
ConfigurationServiceProvider .overrideCurrent (overrider );
56
-
51
+ ConfigurationServiceProvider .instance ().getLeaderElectionConfiguration ()
52
+ .ifPresent (c -> leaderElectionManager .init (c , kubernetesClient ));
57
53
}
58
54
59
55
/**
@@ -66,6 +62,8 @@ public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider>
66
62
public Operator (KubernetesClient kubernetesClient , ConfigurationService configurationService ) {
67
63
this .kubernetesClient = kubernetesClient ;
68
64
ConfigurationServiceProvider .set (configurationService );
65
+ configurationService .getLeaderElectionConfiguration ()
66
+ .ifPresent (c -> leaderElectionManager .init (c , kubernetesClient ));
69
67
}
70
68
71
69
/** Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. */
@@ -85,7 +83,7 @@ public KubernetesClient getKubernetesClient() {
85
83
public void start () {
86
84
try {
87
85
88
- controllers .shouldStart ();
86
+ controllerManager .shouldStart ();
89
87
90
88
final var version = ConfigurationServiceProvider .instance ().getVersion ();
91
89
log .info (
@@ -98,7 +96,7 @@ public void start() {
98
96
log .info ("Client version: {}" , clientVersion );
99
97
100
98
ExecutorServiceManager .init ();
101
- controllers .start ();
99
+ controllerManager .start ();
102
100
} catch (Exception e ) {
103
101
log .error ("Error starting operator" , e );
104
102
stop ();
@@ -112,7 +110,7 @@ public void stop() throws OperatorException {
112
110
log .info (
113
111
"Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
114
112
115
- controllers .stop ();
113
+ controllerManager .stop ();
116
114
117
115
ExecutorServiceManager .stop ();
118
116
if (configurationService .closeClientOnStop ()) {
@@ -162,7 +160,7 @@ public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> re
162
160
163
161
final var controller = new Controller <>(reconciler , configuration , kubernetesClient );
164
162
165
- controllers .add (controller );
163
+ controllerManager .add (controller );
166
164
167
165
final var watchedNS = configuration .watchAllNamespaces () ? "[all namespaces]"
168
166
: configuration .getEffectiveNamespaces ();
@@ -192,37 +190,15 @@ public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> re
192
190
}
193
191
194
192
public Optional <RegisteredController > getRegisteredController (String name ) {
195
- return controllers .get (name ).map (RegisteredController .class ::cast );
193
+ return controllerManager .get (name ).map (RegisteredController .class ::cast );
196
194
}
197
195
198
196
public Set <RegisteredController > getRegisteredControllers () {
199
- return new HashSet <>(controllers .controllers ());
197
+ return new HashSet <>(controllerManager .controllers ());
200
198
}
201
199
202
200
public int getRegisteredControllersNumber () {
203
- return controllers .size ();
204
- }
205
-
206
- private void initLeaderElector (KubernetesClient client ) {
207
-
208
- var leaderElectionConfig =
209
- ConfigurationServiceProvider .instance ().getLeaderElectionConfiguration ();
210
- if (leaderElectionConfig .isEmpty ()) {
211
- return ;
212
- }
213
- var conf = leaderElectionConfig .get ();
214
-
215
- // todo discuss openshift client not a NamespacedKubernetesClient?
216
- // name of the pod
217
- // todo configurable
218
- String identity = System .getenv ("HOSTNAME" );
219
-
220
- Lock lock = new LeaseLock (conf .getLeaseNamespace (), conf .getLeaseName (), identity );
221
- // todo check release on cancel
222
- leaderElector = new LeaderElectorBuilder <>((NamespacedKubernetesClient ) client )
223
- .withConfig (new LeaderElectionConfig (lock , conf .getLeaseDuration (), conf .getRenewDeadline (),
224
- conf .getRetryPeriod (), null , true , conf .getLeaseName ()))
225
- .build ();
201
+ return controllerManager .size ();
226
202
}
227
203
228
204
}
0 commit comments