Skip to content

Commit 4676ce6

Browse files
csviri and metacosm committed
feat: support leader Election (#1358)
Fixes #411 Co-authored-by: Chris Laprun <[email protected]>
1 parent 51e16ed commit 4676ce6

File tree

26 files changed

+906
-165
lines changed

26 files changed

+906
-165
lines changed

.github/workflows/e2e-test.yml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ jobs:
2121
- "sample-operators/mysql-schema"
2222
- "sample-operators/tomcat-operator"
2323
- "sample-operators/webpage"
24+
- "sample-operators/leader-election"
2425
runs-on: ubuntu-latest
2526
steps:
2627
- name: Checkout

docs/documentation/features.md

+13
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,19 @@ See also
683683
the [integration test](https://github.com/java-operator-sdk/java-operator-sdk/blob/ec37025a15046d8f409c77616110024bf32c3416/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/changenamespace/ChangeNamespaceTestReconciler.java)
684684
for this feature.
685685

686+
## Leader Election
687+
688+
Operators are generally deployed with a single running or active instance. However, it is
689+
possible to deploy multiple instances in such a way that only one, called the "leader", processes the
690+
events. This is achieved via a mechanism called "leader election". While all the instances are
691+
running, and even start their event sources to populate the caches, only the leader will process
692+
the events. This means that should the leader change for any reason, for example because it
693+
crashed, the other instances are already warmed up and ready to pick up where the previous
694+
leader left off as soon as one of them is elected leader.
695+
696+
See the sample configuration in the [E2E test](https://github.com/java-operator-sdk/java-operator-sdk/blob/144947d89323f1c65de6e86bd8b9a6a8ffe714ff/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java#L26-L30)
697+
.
698+
686699
## Monitoring with Micrometer
687700

688701
## Automatic Generation of CRDs
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.util.Collection;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import java.util.Optional;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import io.javaoperatorsdk.operator.processing.Controller;
12+
13+
/**
14+
* Not to be confused with the controller manager concept from Go's controller-runtime project. In
15+
* JOSDK, the equivalent concept is {@link Operator}.
16+
*/
17+
class ControllerManager {
18+
19+
private static final Logger log = LoggerFactory.getLogger(ControllerManager.class);
20+
21+
@SuppressWarnings("rawtypes")
22+
private final Map<String, Controller> controllers = new HashMap<>();
23+
private boolean started = false;
24+
25+
public synchronized void shouldStart() {
26+
if (started) {
27+
return;
28+
}
29+
if (controllers.isEmpty()) {
30+
throw new OperatorException("No Controller exists. Exiting!");
31+
}
32+
}
33+
34+
public synchronized void start(boolean startEventProcessor) {
35+
controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
36+
started = true;
37+
}
38+
39+
public synchronized void stop() {
40+
controllers().parallelStream().forEach(closeable -> {
41+
log.debug("closing {}", closeable);
42+
closeable.stop();
43+
});
44+
started = false;
45+
}
46+
47+
public synchronized void startEventProcessing() {
48+
controllers().parallelStream().forEach(Controller::startEventProcessing);
49+
}
50+
51+
@SuppressWarnings({"unchecked", "rawtypes"})
52+
synchronized void add(Controller controller) {
53+
final var configuration = controller.getConfiguration();
54+
final var resourceTypeName = ReconcilerUtils
55+
.getResourceTypeNameWithVersion(configuration.getResourceClass());
56+
final var existing = controllers.get(resourceTypeName);
57+
if (existing != null) {
58+
throw new OperatorException("Cannot register controller '" + configuration.getName()
59+
+ "': another controller named '" + existing.getConfiguration().getName()
60+
+ "' is already registered for resource '" + resourceTypeName + "'");
61+
}
62+
controllers.put(resourceTypeName, controller);
63+
}
64+
65+
@SuppressWarnings("rawtypes")
66+
synchronized Optional<Controller> get(String name) {
67+
return controllers().stream()
68+
.filter(c -> name.equals(c.getConfiguration().getName()))
69+
.findFirst();
70+
}
71+
72+
@SuppressWarnings("rawtypes")
73+
synchronized Collection<Controller> controllers() {
74+
return controllers.values();
75+
}
76+
77+
synchronized int size() {
78+
return controllers.size();
79+
}
80+
}
81+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.util.UUID;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.fabric8.kubernetes.client.KubernetesClient;
10+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
11+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
12+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
13+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder;
14+
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
15+
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock;
16+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
17+
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
18+
19+
public class LeaderElectionManager {
20+
21+
private static final Logger log = LoggerFactory.getLogger(LeaderElectionManager.class);
22+
23+
private LeaderElector leaderElector = null;
24+
private final ControllerManager controllerManager;
25+
private String identity;
26+
private CompletableFuture<?> leaderElectionFuture;
27+
28+
public LeaderElectionManager(ControllerManager controllerManager) {
29+
this.controllerManager = controllerManager;
30+
}
31+
32+
public void init(LeaderElectionConfiguration config, KubernetesClient client) {
33+
this.identity = identity(config);
34+
Lock lock = new LeaseLock(config.getLeaseNamespace(), config.getLeaseName(), identity);
35+
// releaseOnCancel is not used in the underlying implementation
36+
leaderElector = new LeaderElectorBuilder(client,
37+
ConfigurationServiceProvider.instance().getExecutorService())
38+
.withConfig(
39+
new LeaderElectionConfig(lock, config.getLeaseDuration(), config.getRenewDeadline(),
40+
config.getRetryPeriod(), leaderCallbacks(), true, config.getLeaseName()))
41+
.build();
42+
}
43+
44+
public boolean isLeaderElectionEnabled() {
45+
return leaderElector != null;
46+
}
47+
48+
private LeaderCallbacks leaderCallbacks() {
49+
return new LeaderCallbacks(this::startLeading, this::stopLeading, leader -> {
50+
log.info("New leader with identity: {}", leader);
51+
});
52+
}
53+
54+
private void startLeading() {
55+
controllerManager.startEventProcessing();
56+
}
57+
58+
private void stopLeading() {
59+
log.info("Stopped leading for identity: {}. Exiting.", identity);
60+
// When leader stops leading the process ends immediately to prevent multiple reconciliations
61+
// running parallel.
62+
// Note that some reconciliations might run for a very long time.
63+
System.exit(1);
64+
}
65+
66+
private String identity(LeaderElectionConfiguration config) {
67+
String id = config.getIdentity().orElse(System.getenv("HOSTNAME"));
68+
if (id == null || id.isBlank()) {
69+
id = UUID.randomUUID().toString();
70+
}
71+
return id;
72+
}
73+
74+
public void start() {
75+
if (isLeaderElectionEnabled()) {
76+
leaderElectionFuture = leaderElector.start();
77+
}
78+
}
79+
80+
public void stop() {
81+
if (leaderElectionFuture != null) {
82+
leaderElectionFuture.cancel(false);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)