11
11
import io .fabric8 .kubernetes .api .model .HasMetadata ;
12
12
import io .fabric8 .kubernetes .client .DefaultKubernetesClient ;
13
13
import io .fabric8 .kubernetes .client .KubernetesClient ;
14
+ import io .fabric8 .kubernetes .client .NamespacedKubernetesClient ;
14
15
import io .fabric8 .kubernetes .client .Version ;
15
- import io .javaoperatorsdk . operator . api . config . ConfigurationService ;
16
- import io .javaoperatorsdk . operator . api . config . ConfigurationServiceOverrider ;
17
- import io .javaoperatorsdk . operator . api . config . ConfigurationServiceProvider ;
18
- import io .javaoperatorsdk . operator . api . config . ControllerConfiguration ;
19
- import io .javaoperatorsdk . operator . api . config . ControllerConfigurationOverrider ;
20
- import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
16
+ import io .fabric8 . kubernetes . client . extended . leaderelection . LeaderElectionConfig ;
17
+ import io .fabric8 . kubernetes . client . extended . leaderelection . LeaderElector ;
18
+ import io .fabric8 . kubernetes . client . extended . leaderelection . LeaderElectorBuilder ;
19
+ import io .fabric8 . kubernetes . client . extended . leaderelection . resourcelock . LeaseLock ;
20
+ import io .fabric8 . kubernetes . client . extended . leaderelection . resourcelock . Lock ;
21
+ import io .javaoperatorsdk .operator .api .config .* ;
21
22
import io .javaoperatorsdk .operator .api .reconciler .Reconciler ;
22
23
import io .javaoperatorsdk .operator .processing .Controller ;
23
24
import io .javaoperatorsdk .operator .processing .LifecycleAware ;
@@ -27,6 +28,7 @@ public class Operator implements LifecycleAware {
27
28
private static final Logger log = LoggerFactory .getLogger (Operator .class );
28
29
private final KubernetesClient kubernetesClient ;
29
30
private final ControllerManager controllers = new ControllerManager ();
31
+ private LeaderElector leaderElector ;
30
32
31
33
public Operator () {
32
34
this (new DefaultKubernetesClient (), ConfigurationServiceProvider .instance ());
@@ -49,8 +51,9 @@ public Operator(Consumer<ConfigurationServiceOverrider> overrider) {
49
51
}
50
52
51
53
public Operator (KubernetesClient client , Consumer <ConfigurationServiceOverrider > overrider ) {
52
- this ( client ) ;
54
+ this . kubernetesClient = client ;
53
55
ConfigurationServiceProvider .overrideCurrent (overrider );
56
+
54
57
}
55
58
56
59
/**
@@ -81,6 +84,7 @@ public KubernetesClient getKubernetesClient() {
81
84
*/
82
85
public void start () {
83
86
try {
87
+
84
88
controllers .shouldStart ();
85
89
86
90
final var version = ConfigurationServiceProvider .instance ().getVersion ();
@@ -199,4 +203,26 @@ public int getRegisteredControllersNumber() {
199
203
return controllers .size ();
200
204
}
201
205
206
+ private void initLeaderElector (KubernetesClient client ) {
207
+
208
+ var leaderElectionConfig =
209
+ ConfigurationServiceProvider .instance ().getLeaderElectionConfiguration ();
210
+ if (leaderElectionConfig .isEmpty ()) {
211
+ return ;
212
+ }
213
+ var conf = leaderElectionConfig .get ();
214
+
215
+ // todo discuss openshift client not a NamespacedKubernetesClient?
216
+ // name of the pod
217
+ // todo configurable
218
+ String identity = System .getenv ("HOSTNAME" );
219
+
220
+ Lock lock = new LeaseLock (conf .getLeaseNamespace (), conf .getLeaseName (), identity );
221
+ // todo check release on cancel
222
+ leaderElector = new LeaderElectorBuilder <>((NamespacedKubernetesClient ) client )
223
+ .withConfig (new LeaderElectionConfig (lock , conf .getLeaseDuration (), conf .getRenewDeadline (),
224
+ conf .getRetryPeriod (), null , true , conf .getLeaseName ()))
225
+ .build ();
226
+ }
227
+
202
228
}
0 commit comments