-
Notifications
You must be signed in to change notification settings - Fork 218
Leader Election #1358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Leader Election #1358
Changes from all commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
e0f3009
feat: leader election
csviri 5a19f86
refactor: event processor to controller
csviri 9837ccc
wip
csviri f035fb3
wip
csviri 3f3923c
migrate to v6
csviri dc3f2a9
leader election impl
csviri 866f06a
leader election IT
csviri 4edd392
format
csviri 53cd42f
e2e test
csviri 5a1cdff
e2e improvements
csviri bb4eb9d
pod startup timeout
csviri f0fddf3
pod
csviri f81be31
image pull policy
csviri 382cfd1
disable test for local mode
csviri c2d2211
docs
csviri d29f51b
fixes on rebase
csviri 1e4f64c
docs: improve wording
metacosm 6361b36
fix: match other methods' usage pattern
metacosm 440bf0f
refactor: make sure all constructors cascade
metacosm 0cd2a90
docs: improve wording
metacosm 93a260a
refactor: move identity generation to configuration
metacosm b09bf5c
refactor: isLeaderElectionOn -> isLeaderElectionEnabled
metacosm 951d28d
refactor: restore LifecycleAware compatibility
metacosm a2b79d8
Revert "refactor: move identity generation to configuration"
csviri df6f2ef
removed builder
csviri e7a80ee
no builder in e2e test
csviri f8e2211
put back test constraint
csviri 5dbef65
improve expressiveness on constat
csviri aec94c2
update stat with optimistic locking
csviri e58457f
fix config override sequence issue
csviri 24e2e3f
added back test run condition
csviri File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
81 changes: 81 additions & 0 deletions
81
operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package io.javaoperatorsdk.operator; | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import io.javaoperatorsdk.operator.processing.Controller; | ||
|
||
/** | ||
* Not to be confused with the controller manager concept from Go's controller-runtime project. In | ||
* JOSDK, the equivalent concept is {@link Operator}. | ||
*/ | ||
class ControllerManager { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(ControllerManager.class); | ||
|
||
@SuppressWarnings("rawtypes") | ||
private final Map<String, Controller> controllers = new HashMap<>(); | ||
private boolean started = false; | ||
|
||
public synchronized void shouldStart() { | ||
if (started) { | ||
return; | ||
} | ||
if (controllers.isEmpty()) { | ||
throw new OperatorException("No Controller exists. Exiting!"); | ||
} | ||
} | ||
|
||
public synchronized void start(boolean startEventProcessor) { | ||
controllers().parallelStream().forEach(c -> c.start(startEventProcessor)); | ||
started = true; | ||
} | ||
|
||
public synchronized void stop() { | ||
controllers().parallelStream().forEach(closeable -> { | ||
log.debug("closing {}", closeable); | ||
closeable.stop(); | ||
}); | ||
started = false; | ||
} | ||
|
||
public synchronized void startEventProcessing() { | ||
controllers().parallelStream().forEach(Controller::startEventProcessing); | ||
} | ||
|
||
@SuppressWarnings({"unchecked", "rawtypes"}) | ||
synchronized void add(Controller controller) { | ||
final var configuration = controller.getConfiguration(); | ||
final var resourceTypeName = ReconcilerUtils | ||
.getResourceTypeNameWithVersion(configuration.getResourceClass()); | ||
final var existing = controllers.get(resourceTypeName); | ||
if (existing != null) { | ||
throw new OperatorException("Cannot register controller '" + configuration.getName() | ||
+ "': another controller named '" + existing.getConfiguration().getName() | ||
+ "' is already registered for resource '" + resourceTypeName + "'"); | ||
} | ||
controllers.put(resourceTypeName, controller); | ||
} | ||
|
||
@SuppressWarnings("rawtypes") | ||
synchronized Optional<Controller> get(String name) { | ||
return controllers().stream() | ||
.filter(c -> name.equals(c.getConfiguration().getName())) | ||
.findFirst(); | ||
} | ||
|
||
@SuppressWarnings("rawtypes") | ||
synchronized Collection<Controller> controllers() { | ||
return controllers.values(); | ||
} | ||
|
||
synchronized int size() { | ||
return controllers.size(); | ||
} | ||
} | ||
|
85 changes: 85 additions & 0 deletions
85
operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package io.javaoperatorsdk.operator; | ||
|
||
import java.util.UUID; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import io.fabric8.kubernetes.client.KubernetesClient; | ||
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; | ||
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; | ||
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; | ||
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder; | ||
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; | ||
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock; | ||
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; | ||
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration; | ||
|
||
public class LeaderElectionManager { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(LeaderElectionManager.class); | ||
|
||
private LeaderElector leaderElector = null; | ||
private final ControllerManager controllerManager; | ||
private String identity; | ||
private CompletableFuture<?> leaderElectionFuture; | ||
|
||
public LeaderElectionManager(ControllerManager controllerManager) { | ||
this.controllerManager = controllerManager; | ||
} | ||
|
||
public void init(LeaderElectionConfiguration config, KubernetesClient client) { | ||
this.identity = identity(config); | ||
Lock lock = new LeaseLock(config.getLeaseNamespace(), config.getLeaseName(), identity); | ||
// releaseOnCancel is not used in the underlying implementation | ||
leaderElector = new LeaderElectorBuilder(client, | ||
ConfigurationServiceProvider.instance().getExecutorService()) | ||
.withConfig( | ||
new LeaderElectionConfig(lock, config.getLeaseDuration(), config.getRenewDeadline(), | ||
config.getRetryPeriod(), leaderCallbacks(), true, config.getLeaseName())) | ||
.build(); | ||
} | ||
|
||
public boolean isLeaderElectionEnabled() { | ||
return leaderElector != null; | ||
} | ||
|
||
private LeaderCallbacks leaderCallbacks() { | ||
return new LeaderCallbacks(this::startLeading, this::stopLeading, leader -> { | ||
log.info("New leader with identity: {}", leader); | ||
}); | ||
} | ||
|
||
private void startLeading() { | ||
controllerManager.startEventProcessing(); | ||
} | ||
|
||
private void stopLeading() { | ||
log.info("Stopped leading for identity: {}. Exiting.", identity); | ||
// When leader stops leading the process ends immediately to prevent multiple reconciliations | ||
// running parallel. | ||
// Note that some reconciliations might run for a very long time. | ||
System.exit(1); | ||
} | ||
|
||
private String identity(LeaderElectionConfiguration config) { | ||
String id = config.getIdentity().orElse(System.getenv("HOSTNAME")); | ||
if (id == null || id.isBlank()) { | ||
id = UUID.randomUUID().toString(); | ||
} | ||
return id; | ||
} | ||
|
||
public void start() { | ||
if (isLeaderElectionEnabled()) { | ||
leaderElectionFuture = leaderElector.start(); | ||
} | ||
} | ||
|
||
public void stop() { | ||
if (leaderElectionFuture != null) { | ||
leaderElectionFuture.cancel(false); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API is quite awkward, imo… maybe we should improve it in the Fabric8 client project?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yes, this could be better there.