diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index df96f28d90..1007878ea9 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -21,6 +21,7 @@ jobs: - "sample-operators/mysql-schema" - "sample-operators/tomcat-operator" - "sample-operators/webpage" + - "sample-operators/leader-election" runs-on: ubuntu-latest steps: - name: Checkout diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 727349ddd3..06fa0698c0 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -683,6 +683,19 @@ See also the [integration test](https://github.com/java-operator-sdk/java-operator-sdk/blob/ec37025a15046d8f409c77616110024bf32c3416/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/changenamespace/ChangeNamespaceTestReconciler.java) for this feature. +## Leader Election + +Operators are generally deployed with a single running or active instance. However, it is +possible to deploy multiple instances in such a way that only one, called the "leader", processes the +events. This is achieved via a mechanism called "leader election". While all the instances are +running, and even start their event sources to populate the caches, only the leader will process +the events. This means that should the leader change for any reason, for example because it +crashed, the other instances are already warmed up and ready to pick up where the previous +leader left off should one of them become elected leader. + +See sample configuration in the [E2E test](https://github.com/java-operator-sdk/java-operator-sdk/blob/144947d89323f1c65de6e86bd8b9a6a8ffe714ff/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java#L26-L30) +. + ## Monitoring with Micrometer ## Automatic Generation of CRDs diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java new file mode 100644 index 0000000000..44aa430ec2 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java @@ -0,0 +1,81 @@ +package io.javaoperatorsdk.operator; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.javaoperatorsdk.operator.processing.Controller; + +/** + * Not to be confused with the controller manager concept from Go's controller-runtime project. In + * JOSDK, the equivalent concept is {@link Operator}. + */ +class ControllerManager { + + private static final Logger log = LoggerFactory.getLogger(ControllerManager.class); + + @SuppressWarnings("rawtypes") + private final Map controllers = new HashMap<>(); + private boolean started = false; + + public synchronized void shouldStart() { + if (started) { + return; + } + if (controllers.isEmpty()) { + throw new OperatorException("No Controller exists. Exiting!"); + } + } + + public synchronized void start(boolean startEventProcessor) { + controllers().parallelStream().forEach(c -> c.start(startEventProcessor)); + started = true; + } + + public synchronized void stop() { + controllers().parallelStream().forEach(closeable -> { + log.debug("closing {}", closeable); + closeable.stop(); + }); + started = false; + } + + public synchronized void startEventProcessing() { + controllers().parallelStream().forEach(Controller::startEventProcessing); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + synchronized void add(Controller controller) { + final var configuration = controller.getConfiguration(); + final var resourceTypeName = ReconcilerUtils + .getResourceTypeNameWithVersion(configuration.getResourceClass()); + final var existing = controllers.get(resourceTypeName); + if (existing != null) { + throw new OperatorException("Cannot register controller '" + configuration.getName() + + "': another controller named '" + existing.getConfiguration().getName() + + "' is already registered for resource '" + resourceTypeName + "'"); + } + controllers.put(resourceTypeName, controller); + } + + @SuppressWarnings("rawtypes") + synchronized Optional get(String name) { + return controllers().stream() + .filter(c -> name.equals(c.getConfiguration().getName())) + .findFirst(); + } + + @SuppressWarnings("rawtypes") + synchronized Collection controllers() { + return controllers.values(); + } + + synchronized int size() { + return controllers.size(); + } +} + diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java new file mode 100644 index 0000000000..94d201dce9 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java @@ -0,0 +1,85 @@ +package io.javaoperatorsdk.operator; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration; + +public class LeaderElectionManager { + + private static final Logger log = LoggerFactory.getLogger(LeaderElectionManager.class); + + private LeaderElector leaderElector = null; + private final ControllerManager controllerManager; + private String identity; + private CompletableFuture leaderElectionFuture; + + public LeaderElectionManager(ControllerManager controllerManager) { + this.controllerManager = controllerManager; + } + + public void init(LeaderElectionConfiguration config, KubernetesClient client) { + this.identity = identity(config); + Lock lock = new LeaseLock(config.getLeaseNamespace(), config.getLeaseName(), identity); + // releaseOnCancel is not used in the underlying implementation + leaderElector = new LeaderElectorBuilder(client, + ConfigurationServiceProvider.instance().getExecutorService()) + .withConfig( + new LeaderElectionConfig(lock, config.getLeaseDuration(), config.getRenewDeadline(), + config.getRetryPeriod(), leaderCallbacks(), true, config.getLeaseName())) + .build(); + } + + public boolean isLeaderElectionEnabled() { + return leaderElector != null; + } + + private LeaderCallbacks leaderCallbacks() { + return new LeaderCallbacks(this::startLeading, this::stopLeading, leader -> { + log.info("New leader with identity: {}", leader); + }); + } + + private void startLeading() { + controllerManager.startEventProcessing(); + } + + private void stopLeading() { + log.info("Stopped leading for identity: {}. Exiting.", identity); + // When leader stops leading the process ends immediately to prevent multiple reconciliations + // running parallel. + // Note that some reconciliations might run for a very long time. + System.exit(1); + } + + private String identity(LeaderElectionConfiguration config) { + String id = config.getIdentity().orElse(System.getenv("HOSTNAME")); + if (id == null || id.isBlank()) { + id = UUID.randomUUID().toString(); + } + return id; + } + + public void start() { + if (isLeaderElectionEnabled()) { + leaderElectionFuture = leaderElector.start(); + } + } + + public void stop() { + if (leaderElectionFuture != null) { + leaderElectionFuture.cancel(false); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index a17e86d8cc..57faef696a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,9 +1,6 @@ package io.javaoperatorsdk.operator; -import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -12,15 +9,10 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.Version; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider; -import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; +import io.javaoperatorsdk.operator.api.config.*; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.LifecycleAware; @@ -29,14 +21,17 @@ public class Operator implements LifecycleAware { private static final Logger log = LoggerFactory.getLogger(Operator.class); private final KubernetesClient kubernetesClient; - private final ControllerManager controllers = new ControllerManager(); + private final ControllerManager controllerManager = new ControllerManager(); + private final LeaderElectionManager leaderElectionManager = + new LeaderElectionManager(controllerManager); + private volatile boolean started = false; public Operator() { - this(new DefaultKubernetesClient(), ConfigurationServiceProvider.instance()); + this((KubernetesClient) null); } public Operator(KubernetesClient kubernetesClient) { - this(kubernetesClient, ConfigurationServiceProvider.instance()); + this(kubernetesClient, ConfigurationServiceProvider.instance(), null); } /** @@ -44,16 +39,15 @@ public Operator(KubernetesClient kubernetesClient) { */ @Deprecated public Operator(ConfigurationService configurationService) { - this(new DefaultKubernetesClient(), configurationService); + this(null, configurationService, null); } public Operator(Consumer overrider) { - this(new DefaultKubernetesClient(), overrider); + this(null, overrider); } public Operator(KubernetesClient client, Consumer overrider) { - this(client); - ConfigurationServiceProvider.overrideCurrent(overrider); + this(client, ConfigurationServiceProvider.instance(), overrider); } /** @@ -64,8 +58,19 @@ public Operator(KubernetesClient client, Consumer * @param configurationService provides configuration */ public Operator(KubernetesClient kubernetesClient, ConfigurationService configurationService) { - this.kubernetesClient = kubernetesClient; + this(kubernetesClient, configurationService, null); + } + + private Operator(KubernetesClient kubernetesClient, ConfigurationService configurationService, + Consumer overrider) { + this.kubernetesClient = + kubernetesClient != null ? kubernetesClient : new KubernetesClientBuilder().build(); ConfigurationServiceProvider.set(configurationService); + if (overrider != null) { + ConfigurationServiceProvider.overrideCurrent(overrider); + } + ConfigurationServiceProvider.instance().getLeaderElectionConfiguration() + .ifPresent(c -> leaderElectionManager.init(c, this.kubernetesClient)); } /** Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. */ @@ -84,8 +89,11 @@ public KubernetesClient getKubernetesClient() { */ public void start() { try { - controllers.shouldStart(); - + if (started) { + return; + } + started = true; + controllerManager.shouldStart(); final var version = ConfigurationServiceProvider.instance().getVersion(); log.info( "Operator SDK {} (commit: {}) built on {} starting...", @@ -95,9 +103,11 @@ public void start() { final var clientVersion = Version.clientVersion(); log.info("Client version: {}", clientVersion); - ExecutorServiceManager.init(); - controllers.start(); + // first start the controller manager before leader election, + // the leader election would start subsequently the processor if on + controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled()); + leaderElectionManager.start(); } catch (Exception e) { log.error("Error starting operator", e); stop(); @@ -111,9 +121,9 @@ public void stop() throws OperatorException { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); - controllers.stop(); - + controllerManager.stop(); ExecutorServiceManager.stop(); + leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { kubernetesClient.close(); } @@ -149,6 +159,9 @@ public

RegisteredController

register(Reconciler

re public

RegisteredController

register(Reconciler

reconciler, ControllerConfiguration

configuration) throws OperatorException { + if (started) { + throw new OperatorException("Operator already started. Register all the controllers before."); + } if (configuration == null) { throw new OperatorException( @@ -161,7 +174,7 @@ public

RegisteredController

register(Reconciler

re final var controller = new Controller<>(reconciler, configuration, kubernetesClient); - controllers.add(controller); + controllerManager.add(controller); final var watchedNS = configuration.watchAllNamespaces() ? "[all namespaces]" : configuration.getEffectiveNamespaces(); @@ -191,73 +204,15 @@ public

RegisteredController

register(Reconciler

re } public Optional getRegisteredController(String name) { - return controllers.get(name).map(RegisteredController.class::cast); + return controllerManager.get(name).map(RegisteredController.class::cast); } public Set getRegisteredControllers() { - return new HashSet<>(controllers.controllers()); + return new HashSet<>(controllerManager.controllers()); } public int getRegisteredControllersNumber() { - return controllers.size(); + return controllerManager.size(); } - static class ControllerManager implements LifecycleAware { - private final Map controllers = new HashMap<>(); - private boolean started = false; - - public synchronized void shouldStart() { - if (started) { - return; - } - if (controllers.isEmpty()) { - throw new OperatorException("No Controller exists. Exiting!"); - } - } - - public synchronized void start() { - controllers().parallelStream().forEach(Controller::start); - started = true; - } - - public synchronized void stop() { - controllers().parallelStream().forEach(closeable -> { - log.debug("closing {}", closeable); - closeable.stop(); - }); - - started = false; - } - - @SuppressWarnings("unchecked") - synchronized void add(Controller controller) { - final var configuration = controller.getConfiguration(); - final var resourceTypeName = ReconcilerUtils - .getResourceTypeNameWithVersion(configuration.getResourceClass()); - final var existing = controllers.get(resourceTypeName); - if (existing != null) { - throw new OperatorException("Cannot register controller '" + configuration.getName() - + "': another controller named '" + existing.getConfiguration().getName() - + "' is already registered for resource '" + resourceTypeName + "'"); - } - controllers.put(resourceTypeName, controller); - if (started) { - controller.start(); - } - } - - synchronized Optional get(String name) { - return controllers().stream() - .filter(c -> name.equals(c.getConfiguration().getName())) - .findFirst(); - } - - synchronized Collection controllers() { - return controllers.values(); - } - - synchronized int size() { - return controllers.size(); - } - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index a425bda5bd..f0d1455dd7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.api.config; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -132,4 +133,9 @@ default ObjectMapper getObjectMapper() { default DependentResourceFactory dependentResourceFactory() { return new DependentResourceFactory() {}; } + + default Optional getLeaderElectionConfiguration() { + return Optional.empty(); + } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index f8833231c2..20dca09f1e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.api.config; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -21,6 +22,7 @@ public class ConfigurationServiceOverrider { private boolean closeClientOnStop; private ObjectMapper objectMapper; private ExecutorService executorService = null; + private LeaderElectionConfiguration leaderElectionConfiguration; ConfigurationServiceOverrider(ConfigurationService original) { this.original = original; @@ -80,6 +82,12 @@ public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) return this; } + public ConfigurationServiceOverrider withLeaderElectionConfiguration( + LeaderElectionConfiguration leaderElectionConfiguration) { + this.leaderElectionConfiguration = leaderElectionConfiguration; + return this; + } + public ConfigurationService build() { return new BaseConfigurationService(original.getVersion(), cloner) { @Override @@ -130,6 +138,11 @@ public ExecutorService getExecutorService() { public ObjectMapper getObjectMapper() { return objectMapper; } + + @Override + public Optional getLeaderElectionConfiguration() { + return Optional.ofNullable(leaderElectionConfiguration); + } }; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/LeaderElectionConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/LeaderElectionConfiguration.java new file mode 100644 index 0000000000..5146fa6a1e --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/LeaderElectionConfiguration.java @@ -0,0 +1,85 @@ +package io.javaoperatorsdk.operator.api.config; + +import java.time.Duration; +import java.util.Optional; + +public class LeaderElectionConfiguration { + + public static final Duration LEASE_DURATION_DEFAULT_VALUE = Duration.ofSeconds(15); + public static final Duration RENEW_DEADLINE_DEFAULT_VALUE = Duration.ofSeconds(10); + public static final Duration RETRY_PERIOD_DEFAULT_VALUE = Duration.ofSeconds(2); + + private final String leaseName; + private final String leaseNamespace; + private final String identity; + + private final Duration leaseDuration; + private final Duration renewDeadline; + private final Duration retryPeriod; + + public LeaderElectionConfiguration(String leaseName, String leaseNamespace, String identity) { + this( + leaseName, + leaseNamespace, + LEASE_DURATION_DEFAULT_VALUE, + RENEW_DEADLINE_DEFAULT_VALUE, + RETRY_PERIOD_DEFAULT_VALUE, identity); + } + + public LeaderElectionConfiguration(String leaseName, String leaseNamespace) { + this( + leaseName, + leaseNamespace, + LEASE_DURATION_DEFAULT_VALUE, + RENEW_DEADLINE_DEFAULT_VALUE, + RETRY_PERIOD_DEFAULT_VALUE, null); + } + + public LeaderElectionConfiguration( + String leaseName, + String leaseNamespace, + Duration leaseDuration, + Duration renewDeadline, + Duration retryPeriod) { + this(leaseName, leaseNamespace, leaseDuration, renewDeadline, retryPeriod, null); + } + + public LeaderElectionConfiguration( + String leaseName, + String leaseNamespace, + Duration leaseDuration, + Duration renewDeadline, + Duration retryPeriod, + String identity) { + this.leaseName = leaseName; + this.leaseNamespace = leaseNamespace; + this.leaseDuration = leaseDuration; + this.renewDeadline = renewDeadline; + this.retryPeriod = retryPeriod; + this.identity = identity; + } + + public String getLeaseNamespace() { + return leaseNamespace; + } + + public String getLeaseName() { + return leaseName; + } + + public Duration getLeaseDuration() { + return leaseDuration; + } + + public Duration getRenewDeadline() { + return renewDeadline; + } + + public Duration getRetryPeriod() { + return retryPeriod; + } + + public Optional getIdentity() { + return Optional.ofNullable(identity); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 66439ab8b8..92b70e722d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -36,6 +36,7 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext; import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow; import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult; +import io.javaoperatorsdk.operator.processing.event.EventProcessor; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -44,7 +45,8 @@ @SuppressWarnings({"unchecked", "rawtypes"}) @Ignore public class Controller

- implements Reconciler

, Cleaner

, LifecycleAware, RegisteredController

{ + implements Reconciler

, LifecycleAware, Cleaner

, + RegisteredController

{ private static final Logger log = LoggerFactory.getLogger(Controller.class); @@ -58,6 +60,7 @@ public class Controller

private final ManagedWorkflow

managedWorkflow; private final GroupVersionKind associatedGVK; + private final EventProcessor

eventProcessor; public Controller(Reconciler

reconciler, ControllerConfiguration

configuration, @@ -75,6 +78,8 @@ public Controller(Reconciler

reconciler, managedWorkflow = ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources()); eventSourceManager = new EventSourceManager<>(this); + eventProcessor = new EventProcessor<>(eventSourceManager); + eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); } @Override @@ -258,6 +263,10 @@ public MixedOperation, Resource

> getCRClient() { return kubernetesClient.resources(configuration.getResourceClass()); } + public void start() throws OperatorException { + start(true); + } + /** * Registers the specified controller with this operator, overriding its default configuration by * the specified one (usually created via @@ -266,7 +275,7 @@ public MixedOperation, Resource

> getCRClient() { * * @throws OperatorException if a problem occurred during the registration process */ - public void start() throws OperatorException { + public synchronized void start(boolean startEventProcessor) throws OperatorException { final Class

resClass = configuration.getResourceClass(); final String controllerName = configuration.getName(); final var crdName = configuration.getResourceTypeName(); @@ -284,6 +293,9 @@ public void start() throws OperatorException { initAndRegisterEventSources(context); eventSourceManager.start(); + if (startEventProcessor) { + eventProcessor.start(); + } log.info("'{}' controller started, pending event sources initialization", controllerName); } catch (MissingCRDException e) { stop(); @@ -291,6 +303,7 @@ public void start() throws OperatorException { } } + private void validateCRDWithLocalModelIfRequired(Class

resClass, String controllerName, String crdName, String specVersion) { final CustomResourceDefinition crd; @@ -311,7 +324,14 @@ public void changeNamespaces(Set namespaces) { || namespaces.contains(WATCH_CURRENT_NAMESPACE)) { throw new OperatorException("Unexpected value in target namespaces: " + namespaces); } + eventProcessor.stop(); eventSourceManager.changeNamespaces(namespaces); + eventProcessor.start(); + } + + public synchronized void startEventProcessing() { + log.info("Started event processing for controller: {}", configuration.getName()); + eventProcessor.start(); } private void throwMissingCRDException(String crdName, String specVersion, String controllerName) { @@ -346,7 +366,10 @@ public EventSourceManager

getEventSourceManager() { return eventSourceManager; } - public void stop() { + public synchronized void stop() { + if (eventProcessor != null) { + eventProcessor.stop(); + } if (eventSourceManager != null) { eventSourceManager.stop(); } @@ -359,4 +382,8 @@ public boolean useFinalizer() { public GroupVersionKind getAssociatedGroupVersionKind() { return associatedGVK; } + + public EventProcessor

getEventProcessor() { + return eventProcessor; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 226a656abc..e46878f927 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -31,7 +31,7 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; -class EventProcessor implements EventHandler, LifecycleAware { +public class EventProcessor implements EventHandler, LifecycleAware { private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; @@ -48,7 +48,7 @@ class EventProcessor implements EventHandler, LifecycleAw private final ResourceStateManager resourceStateManager = new ResourceStateManager(); private final Map metricsMetadata; - EventProcessor(EventSourceManager eventSourceManager) { + public EventProcessor(EventSourceManager eventSourceManager) { this( eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index ade840ff4b..b5db5f8eee 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -23,31 +23,30 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; -public class EventSourceManager implements LifecycleAware { +public class EventSourceManager

implements LifecycleAware { private static final Logger log = LoggerFactory.getLogger(EventSourceManager.class); - private final EventSources eventSources; - private final EventProcessor eventProcessor; - private final Controller controller; + private final EventSources

eventSources; + private final Controller

controller; - public EventSourceManager(Controller controller) { + public EventSourceManager(Controller

controller) { this(controller, new EventSources<>()); } - EventSourceManager(Controller controller, EventSources eventSources) { + EventSourceManager(Controller

controller, EventSources

eventSources) { this.eventSources = eventSources; this.controller = controller; // controller event source needs to be available before we create the event processor eventSources.initControllerEventSource(controller); - this.eventProcessor = new EventProcessor<>(this); - postProcessDefaultEventSources(); + + postProcessDefaultEventSourcesAfterProcessorInitializer(); } - private void postProcessDefaultEventSources() { - eventSources.controllerResourceEventSource().setEventHandler(eventProcessor); - eventSources.retryEventSource().setEventHandler(eventProcessor); + public void postProcessDefaultEventSourcesAfterProcessorInitializer() { + eventSources.controllerResourceEventSource().setEventHandler(controller.getEventProcessor()); + eventSources.retryEventSource().setEventHandler(controller.getEventProcessor()); } /** @@ -64,7 +63,6 @@ private void postProcessDefaultEventSources() { public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); eventSources.additionalNamedEventSources().parallel().forEach(this::startEventSource); - eventProcessor.start(); } @Override @@ -72,7 +70,7 @@ public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource); eventSources.clear(); - eventProcessor.stop(); + } @SuppressWarnings("rawtypes") @@ -122,7 +120,7 @@ public final synchronized void registerEventSource(String name, EventSource even name = EventSourceInitializer.generateNameFor(eventSource); } eventSources.add(name, eventSource); - eventSource.setEventHandler(eventProcessor); + eventSource.setEventHandler(controller.getEventProcessor()); } catch (IllegalStateException | MissingCRDException e) { throw e; // leave untouched } catch (Exception e) { @@ -132,10 +130,10 @@ public final synchronized void registerEventSource(String name, EventSource even } @SuppressWarnings("unchecked") - public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) { + public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldResource) { eventSources.additionalNamedEventSources().forEach(eventSource -> { if (eventSource instanceof ResourceEventAware) { - var lifecycleAwareES = ((ResourceEventAware) eventSource); + var lifecycleAwareES = ((ResourceEventAware

) eventSource); switch (action) { case ADDED: lifecycleAwareES.onResourceCreated(resource); @@ -152,7 +150,6 @@ public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldRes } public void changeNamespaces(Set namespaces) { - eventProcessor.stop(); eventSources.controllerResourceEventSource() .changeNamespaces(namespaces); eventSources @@ -162,11 +159,6 @@ public void changeNamespaces(Set namespaces) { .filter(NamespaceChangeable::allowsNamespaceChanges) .parallel() .forEach(ies -> ies.changeNamespaces(namespaces)); - eventProcessor.start(); - } - - EventHandler getEventHandler() { - return eventProcessor; } public Set getRegisteredEventSources() { @@ -175,30 +167,30 @@ public Set getRegisteredEventSources() { .collect(Collectors.toCollection(LinkedHashSet::new)); } - public ControllerResourceEventSource getControllerResourceEventSource() { + public ControllerResourceEventSource

getControllerResourceEventSource() { return eventSources.controllerResourceEventSource(); } - ResourceEventSource getResourceEventSourceFor( + ResourceEventSource getResourceEventSourceFor( Class dependentType) { return getResourceEventSourceFor(dependentType, null); } - public List> getEventSourcesFor(Class dependentType) { + public List> getEventSourcesFor(Class dependentType) { return eventSources.getEventSources(dependentType); } - public ResourceEventSource getResourceEventSourceFor( + public ResourceEventSource getResourceEventSourceFor( Class dependentType, String qualifier) { Objects.requireNonNull(dependentType, "dependentType is Mandatory"); return eventSources.get(dependentType, qualifier); } - TimerEventSource retryEventSource() { + TimerEventSource

retryEventSource() { return eventSources.retryEventSource(); } - Controller getController() { + Controller

getController() { return controller; } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java index 1520f21134..bb19e6b505 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java @@ -30,26 +30,26 @@ /** * Handles calls and results of a Reconciler and finalizer related logic */ -class ReconciliationDispatcher { +class ReconciliationDispatcher

{ public static final int MAX_FINALIZER_REMOVAL_RETRY = 10; private static final Logger log = LoggerFactory.getLogger(ReconciliationDispatcher.class); - private final Controller controller; - private final CustomResourceFacade customResourceFacade; + private final Controller

controller; + private final CustomResourceFacade

customResourceFacade; - ReconciliationDispatcher(Controller controller, - CustomResourceFacade customResourceFacade) { + ReconciliationDispatcher(Controller

controller, + CustomResourceFacade

customResourceFacade) { this.controller = controller; this.customResourceFacade = customResourceFacade; } - public ReconciliationDispatcher(Controller controller) { + public ReconciliationDispatcher(Controller

controller) { this(controller, new CustomResourceFacade<>(controller.getCRClient())); } - public PostExecutionControl handleExecution(ExecutionScope executionScope) { + public PostExecutionControl

handleExecution(ExecutionScope

executionScope) { try { return handleDispatch(executionScope); } catch (Exception e) { @@ -58,9 +58,9 @@ public PostExecutionControl handleExecution(ExecutionScope executionScope) } } - private PostExecutionControl handleDispatch(ExecutionScope executionScope) + private PostExecutionControl

handleDispatch(ExecutionScope

executionScope) throws Exception { - R originalResource = executionScope.getResource(); + P originalResource = executionScope.getResource(); var resourceForExecution = cloneResource(originalResource); log.debug("Handling dispatch for resource {}", getName(originalResource)); @@ -73,7 +73,7 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) return PostExecutionControl.defaultDispatch(); } - Context context = + Context

context = new DefaultContext<>(executionScope.getRetryInfo(), controller, originalResource); if (markedForDeletion) { return handleCleanup(resourceForExecution, context); @@ -82,7 +82,7 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) } } - private boolean shouldNotDispatchToCleanupWhenMarkedForDeletion(R resource) { + private boolean shouldNotDispatchToCleanupWhenMarkedForDeletion(P resource) { var alreadyRemovedFinalizer = controller.useFinalizer() && !resource.hasFinalizer(configuration().getFinalizerName()); if (alreadyRemovedFinalizer) { @@ -92,9 +92,9 @@ private boolean shouldNotDispatchToCleanupWhenMarkedForDeletion(R resource) { return !controller.useFinalizer() || alreadyRemovedFinalizer; } - private PostExecutionControl handleReconcile( - ExecutionScope executionScope, R resourceForExecution, R originalResource, - Context context) throws Exception { + private PostExecutionControl

handleReconcile( + ExecutionScope

executionScope, P resourceForExecution, P originalResource, + Context

context) throws Exception { if (controller.useFinalizer() && !originalResource.hasFinalizer(configuration().getFinalizerName())) { /* @@ -114,21 +114,21 @@ private PostExecutionControl handleReconcile( } } - private R cloneResource(R resource) { + private P cloneResource(P resource) { final var cloner = ConfigurationServiceProvider.instance().getResourceCloner(); return cloner.clone(resource); } - private PostExecutionControl reconcileExecution(ExecutionScope executionScope, - R resourceForExecution, R originalResource, Context context) throws Exception { + private PostExecutionControl

reconcileExecution(ExecutionScope

executionScope, + P resourceForExecution, P originalResource, Context

context) throws Exception { log.debug( "Reconciling resource {} with version: {} with execution scope: {}", getName(resourceForExecution), getVersion(resourceForExecution), executionScope); - UpdateControl updateControl = controller.reconcile(resourceForExecution, context); - R updatedCustomResource = null; + UpdateControl

updateControl = controller.reconcile(resourceForExecution, context); + P updatedCustomResource = null; if (updateControl.isUpdateResourceAndStatus()) { updatedCustomResource = updateCustomResource(updateControl.getResource()); @@ -160,8 +160,8 @@ && shouldUpdateObservedGenerationAutomatically(resourceForExecution)) { } @SuppressWarnings("unchecked") - private PostExecutionControl handleErrorStatusHandler(R resource, R originalResource, - Context context, + private PostExecutionControl

handleErrorStatusHandler(P resource, P originalResource, + Context

context, Exception e) throws Exception { if (isErrorStatusHandlerPresent()) { try { @@ -176,11 +176,11 @@ public boolean isLastAttempt() { return controller.getConfiguration().getRetry() == null; } }); - ((DefaultContext) context).setRetryInfo(retryInfo); - var errorStatusUpdateControl = ((ErrorStatusHandler) controller.getReconciler()) + ((DefaultContext

) context).setRetryInfo(retryInfo); + var errorStatusUpdateControl = ((ErrorStatusHandler

) controller.getReconciler()) .updateErrorStatus(resource, context, e); - R updatedResource = null; + P updatedResource = null; if (errorStatusUpdateControl.getResource().isPresent()) { updatedResource = errorStatusUpdateControl.isPatch() ? customResourceFacade .patchStatus(errorStatusUpdateControl.getResource().orElseThrow(), originalResource) @@ -207,7 +207,7 @@ private boolean isErrorStatusHandlerPresent() { return controller.getReconciler() instanceof ErrorStatusHandler; } - private R updateStatusGenerationAware(R resource, R originalResource, boolean patch) { + private P updateStatusGenerationAware(P resource, P originalResource, boolean patch) { updateStatusObservedGenerationIfRequired(resource); if (patch) { return customResourceFacade.patchStatus(resource, originalResource); @@ -217,7 +217,7 @@ private R updateStatusGenerationAware(R resource, R originalResource, boolean pa } @SuppressWarnings("rawtypes") - private boolean shouldUpdateObservedGenerationAutomatically(R resource) { + private boolean shouldUpdateObservedGenerationAutomatically(P resource) { if (configuration().isGenerationAware() && resource instanceof CustomResource) { var customResource = (CustomResource) resource; var status = customResource.getStatus(); @@ -232,7 +232,7 @@ private boolean shouldUpdateObservedGenerationAutomatically(R resource) { } @SuppressWarnings("rawtypes") - private void updateStatusObservedGenerationIfRequired(R resource) { + private void updateStatusObservedGenerationIfRequired(P resource) { if (configuration().isGenerationAware() && resource instanceof CustomResource) { var customResource = (CustomResource) resource; var status = customResource.getStatus(); @@ -244,9 +244,9 @@ private void updateStatusObservedGenerationIfRequired(R resource) { } } - private PostExecutionControl createPostExecutionControl(R updatedCustomResource, - UpdateControl updateControl) { - PostExecutionControl postExecutionControl; + private PostExecutionControl

createPostExecutionControl(P updatedCustomResource, + UpdateControl

updateControl) { + PostExecutionControl

postExecutionControl; if (updatedCustomResource != null) { if (updateControl.isUpdateStatus() && updateControl.isPatch()) { postExecutionControl = @@ -262,7 +262,7 @@ private PostExecutionControl createPostExecutionControl(R updatedCustomResour } private void updatePostExecutionControlWithReschedule( - PostExecutionControl postExecutionControl, + PostExecutionControl

postExecutionControl, BaseControl baseControl) { baseControl.getScheduleDelay().ifPresentOrElse(postExecutionControl::withReSchedule, () -> controller.getConfiguration().maxReconciliationInterval() @@ -270,7 +270,7 @@ private void updatePostExecutionControlWithReschedule( } - private PostExecutionControl handleCleanup(R resource, Context context) { + private PostExecutionControl

handleCleanup(P resource, Context

context) { log.debug( "Executing delete for resource: {} with version: {}", getName(resource), @@ -283,7 +283,7 @@ private PostExecutionControl handleCleanup(R resource, Context context) { // cleanup is finished, nothing left to done final var finalizerName = configuration().getFinalizerName(); if (deleteControl.isRemoveFinalizer() && resource.hasFinalizer(finalizerName)) { - R customResource = removeFinalizer(resource, finalizerName); + P customResource = removeFinalizer(resource, finalizerName); return PostExecutionControl.customResourceFinalizerRemoved(customResource); } } @@ -293,29 +293,29 @@ private PostExecutionControl handleCleanup(R resource, Context context) { getVersion(resource), deleteControl, useFinalizer); - PostExecutionControl postExecutionControl = PostExecutionControl.defaultDispatch(); + PostExecutionControl

postExecutionControl = PostExecutionControl.defaultDispatch(); updatePostExecutionControlWithReschedule(postExecutionControl, deleteControl); return postExecutionControl; } - private R updateCustomResourceWithFinalizer(R resource) { + private P updateCustomResourceWithFinalizer(P resource) { log.debug( "Adding finalizer for resource: {} version: {}", getUID(resource), getVersion(resource)); resource.addFinalizer(configuration().getFinalizerName()); return customResourceFacade.updateResource(resource); } - private R updateCustomResource(R resource) { + private P updateCustomResource(P resource) { log.debug("Updating resource: {} with version: {}", getUID(resource), getVersion(resource)); log.trace("Resource before update: {}", resource); return customResourceFacade.updateResource(resource); } - ControllerConfiguration configuration() { + ControllerConfiguration

configuration() { return controller.getConfiguration(); } - public R removeFinalizer(R resource, String finalizer) { + public P removeFinalizer(P resource, String finalizer) { if (log.isDebugEnabled()) { log.debug("Removing finalizer on resource: {}", ResourceID.fromResource(resource)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java index b328ace132..d788f61e4a 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java @@ -4,15 +4,10 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.Operator.ControllerManager; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.Controller; -import io.javaoperatorsdk.operator.sample.simple.DuplicateCRController; -import io.javaoperatorsdk.operator.sample.simple.TestCustomReconciler; -import io.javaoperatorsdk.operator.sample.simple.TestCustomReconcilerOtherV1; -import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; -import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceOtherV1; +import io.javaoperatorsdk.operator.sample.simple.*; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index 779fe032f9..2782ed52d1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -39,7 +39,7 @@ public void registersEventSource() { Set registeredSources = eventSourceManager.getRegisteredEventSources(); assertThat(registeredSources).contains(eventSource); - verify(eventSource, times(1)).setEventHandler(eventSourceManager.getEventHandler()); + verify(eventSource, times(1)).setEventHandler(any()); } @Test diff --git a/sample-operators/leader-election/README.md b/sample-operators/leader-election/README.md new file mode 100644 index 0000000000..2a9a369a45 --- /dev/null +++ b/sample-operators/leader-election/README.md @@ -0,0 +1,6 @@ +# Leader Election E2E Test + +The purpose of this module is to e2e test leader election feature. + +The deployment is using directly pods in order to better control some aspects in test. +In real life this would be a Deployment. \ No newline at end of file diff --git a/sample-operators/leader-election/k8s/operator-instance-2.yaml b/sample-operators/leader-election/k8s/operator-instance-2.yaml new file mode 100644 index 0000000000..abde7eb56a --- /dev/null +++ b/sample-operators/leader-election/k8s/operator-instance-2.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Pod +metadata: + name: leader-election-operator-2 +spec: + serviceAccountName: leader-election-operator + containers: + - name: operator + image: leader-election-operator + imagePullPolicy: Never + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name diff --git a/sample-operators/leader-election/k8s/operator.yaml b/sample-operators/leader-election/k8s/operator.yaml new file mode 100644 index 0000000000..00ed3e7273 --- /dev/null +++ b/sample-operators/leader-election/k8s/operator.yaml @@ -0,0 +1,65 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: leader-election-operator + +--- +apiVersion: v1 +kind: Pod +metadata: + name: leader-election-operator-1 +spec: + serviceAccountName: leader-election-operator + containers: + - name: operator + image: leader-election-operator + imagePullPolicy: Never + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: operator-admin +subjects: + - kind: ServiceAccount + name: leader-election-operator +roleRef: + kind: ClusterRole + name: leader-election-operator + apiGroup: "" + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: leader-election-operator +rules: + - apiGroups: + - "apiextensions.k8s.io" + resources: + - customresourcedefinitions + verbs: + - '*' + - apiGroups: + - "sample.javaoperatorsdk" + resources: + - leaderelectiontestcustomresources + - leaderelectiontestcustomresources/status + verbs: + - '*' + - apiGroups: + - "coordination.k8s.io" + resources: + - "leases" + verbs: + - '*' + diff --git a/sample-operators/leader-election/pom.xml b/sample-operators/leader-election/pom.xml new file mode 100644 index 0000000000..8ae0e3e4ba --- /dev/null +++ b/sample-operators/leader-election/pom.xml @@ -0,0 +1,82 @@ + + + 4.0.0 + + + io.javaoperatorsdk + sample-operators + 3.2.0-SNAPSHOT + + + sample-leader-election + Operator SDK - Samples - Leader Election + An E2E test for leader election + jar + + + 11 + 11 + 3.2.1 + + + + + io.javaoperatorsdk + operator-framework + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.takes + takes + 1.21.1 + + + io.fabric8 + crd-generator-apt + provided + + + io.fabric8 + crd-generator-api + + + org.awaitility + awaitility + compile + + + io.javaoperatorsdk + operator-framework-junit-5 + ${project.version} + test + + + + + + com.google.cloud.tools + jib-maven-plugin + ${jib-maven-plugin.version} + + + gcr.io/distroless/java:11 + + + leader-election-operator + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.0 + + + + + \ No newline at end of file diff --git a/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestCustomResource.java b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestCustomResource.java new file mode 100644 index 0000000000..b440be4d18 --- /dev/null +++ b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("le") +public class LeaderElectionTestCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java new file mode 100644 index 0000000000..47a5a0ed59 --- /dev/null +++ b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java @@ -0,0 +1,34 @@ +package io.javaoperatorsdk.operator.sample; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.Operator; +import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration; + +public class LeaderElectionTestOperator { + + private static final Logger log = LoggerFactory.getLogger(LeaderElectionTestOperator.class); + + public static void main(String[] args) { + String identity = System.getenv("POD_NAME"); + String namespace = System.getenv("POD_NAMESPACE"); + + log.info("Starting operator with identity: {}", identity); + + var client = new KubernetesClientBuilder().withConfig(new ConfigBuilder() + .withNamespace(namespace) + .build()).build(); + + Operator operator = new Operator(client, + c -> c.withLeaderElectionConfiguration( + new LeaderElectionConfiguration("leader-election-test", namespace, identity))); + + operator.register(new LeaderElectionTestReconciler(identity)); + operator.installShutdownHook(); + operator.start(); + } + +} diff --git a/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestReconciler.java b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestReconciler.java new file mode 100644 index 0000000000..f6231d0a52 --- /dev/null +++ b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestReconciler.java @@ -0,0 +1,34 @@ +package io.javaoperatorsdk.operator.sample; + +import java.time.Duration; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +@ControllerConfiguration() +public class LeaderElectionTestReconciler + implements Reconciler { + + private final String reconcilerName; + + public LeaderElectionTestReconciler(String reconcilerName) { + this.reconcilerName = reconcilerName; + } + + @Override + public UpdateControl reconcile( + LeaderElectionTestCustomResource resource, + Context context) { + + if (resource.getStatus() == null) { + resource.setStatus(new LeaderElectionTestStatus()); + } + + resource.getStatus().getReconciledBy().add(reconcilerName); + // update status is with optimistic locking + return UpdateControl.updateStatus(resource).rescheduleAfter(Duration.ofSeconds(1)); + } + +} diff --git a/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestStatus.java b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestStatus.java new file mode 100644 index 0000000000..5a9e66e270 --- /dev/null +++ b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestStatus.java @@ -0,0 +1,21 @@ +package io.javaoperatorsdk.operator.sample; + +import java.util.ArrayList; +import java.util.List; + +public class LeaderElectionTestStatus { + + private List reconciledBy; + + public List getReconciledBy() { + if (reconciledBy == null) { + reconciledBy = new ArrayList<>(); + } + return reconciledBy; + } + + public LeaderElectionTestStatus setReconciledBy(List reconciledBy) { + this.reconciledBy = reconciledBy; + return this; + } +} diff --git a/sample-operators/leader-election/src/main/resources/log4j2.xml b/sample-operators/leader-election/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..0ec69bf713 --- /dev/null +++ b/sample-operators/leader-election/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/sample-operators/leader-election/src/test/java/io/javaoperatorsdk/operator/sample/LeaderElectionE2E.java b/sample-operators/leader-election/src/test/java/io/javaoperatorsdk/operator/sample/LeaderElectionE2E.java new file mode 100644 index 0000000000..60b3ad639a --- /dev/null +++ b/sample-operators/leader-election/src/test/java/io/javaoperatorsdk/operator/sample/LeaderElectionE2E.java @@ -0,0 +1,180 @@ +package io.javaoperatorsdk.operator.sample; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.OptionalInt; +import java.util.UUID; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.NamespaceBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +import static io.javaoperatorsdk.operator.junit.AbstractOperatorExtension.CRD_READY_WAIT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class LeaderElectionE2E { + + private static final Logger log = LoggerFactory.getLogger(LeaderElectionE2E.class); + public static final int POD_STARTUP_TIMEOUT = 60; + + public static final String TEST_RESOURCE_NAME = "test1"; + public static final int MINIMAL_SECONDS_FOR_RENEWAL = 3; + public static final int MAX_WAIT_SECONDS = 30; + + private static final String OPERATOR_1_POD_NAME = "leader-election-operator-1"; + private static final String OPERATOR_2_POD_NAME = "leader-election-operator-2"; + public static final int MINIMAL_EXPECTED_RECONCILIATION = 3; + + private String namespace; + private KubernetesClient client; + + @Test + // not for local mode by design + @EnabledIfSystemProperty(named = "test.deployment", matches = "remote") + void otherInstancesTakesOverWhenSteppingDown() { + log.info("Deploying operator"); + deployOperatorsInOrder(); + log.info("Applying custom resource"); + applyCustomResource(); + + log.info("Awaiting custom resource reconciliations"); + await().pollDelay(Duration.ofSeconds(MINIMAL_SECONDS_FOR_RENEWAL)) + .atMost(Duration.ofSeconds(MAX_WAIT_SECONDS)) + .untilAsserted(() -> { + var actualStatus = client.resources(LeaderElectionTestCustomResource.class) + .inNamespace(namespace).withName(TEST_RESOURCE_NAME).get().getStatus(); + + assertThat(actualStatus).isNotNull(); + assertThat(actualStatus.getReconciledBy()) + .hasSizeGreaterThan(MINIMAL_EXPECTED_RECONCILIATION); + }); + + client.pods().inNamespace(namespace).withName(OPERATOR_1_POD_NAME).delete(); + + var actualListSize = client.resources(LeaderElectionTestCustomResource.class) + .inNamespace(namespace).withName(TEST_RESOURCE_NAME).get().getStatus().getReconciledBy() + .size(); + + await().pollDelay(Duration.ofSeconds(MINIMAL_SECONDS_FOR_RENEWAL)) + .atMost(Duration.ofSeconds(240)) + .untilAsserted(() -> { + var actualStatus = client.resources(LeaderElectionTestCustomResource.class) + .inNamespace(namespace).withName(TEST_RESOURCE_NAME).get().getStatus(); + + assertThat(actualStatus).isNotNull(); + assertThat(actualStatus.getReconciledBy()) + .hasSizeGreaterThan(actualListSize + MINIMAL_EXPECTED_RECONCILIATION); + }); + + assertReconciliations( + client.resources(LeaderElectionTestCustomResource.class).inNamespace(namespace) + .withName(TEST_RESOURCE_NAME).get().getStatus().getReconciledBy()); + } + + private void assertReconciliations(List reconciledBy) { + log.info("Reconciled by content: {}", reconciledBy); + OptionalInt firstO2StatusIndex = IntStream.range(0, reconciledBy.size()) + .filter(i -> reconciledBy.get(i).equals(OPERATOR_2_POD_NAME)) + .findFirst(); + assertThat(firstO2StatusIndex).isPresent(); + assertThat(reconciledBy.subList(0, firstO2StatusIndex.getAsInt() - 1)) + .allMatch(s -> s.equals(OPERATOR_1_POD_NAME)); + assertThat(reconciledBy.subList(firstO2StatusIndex.getAsInt(), reconciledBy.size())) + .allMatch(s -> s.equals(OPERATOR_2_POD_NAME)); + } + + private void applyCustomResource() { + var res = new LeaderElectionTestCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName(TEST_RESOURCE_NAME) + .withNamespace(namespace) + .build()); + client.resource(res).create(); + } + + @BeforeEach + void setup() { + namespace = "leader-election-it-" + UUID.randomUUID(); + client = new KubernetesClientBuilder().withConfig(new ConfigBuilder() + .withNamespace(namespace) + .build()).build(); + applyCRD(); + client.namespaces().resource(new NamespaceBuilder().withNewMetadata().withName(namespace) + .endMetadata().build()).create(); + } + + @AfterEach + void tearDown() { + client.namespaces().resource(new NamespaceBuilder().withNewMetadata().withName(namespace) + .endMetadata().build()).delete(); + await() + .atMost(Duration.ofSeconds(60)) + .untilAsserted(() -> assertThat(client.namespaces().withName(namespace).get()).isNull()); + } + + private void deployOperatorsInOrder() { + applyResources("k8s/operator.yaml"); + await().atMost(Duration.ofSeconds(POD_STARTUP_TIMEOUT)).untilAsserted(() -> { + var pod = client.pods().inNamespace(namespace).withName(OPERATOR_1_POD_NAME).get(); + assertThat(pod.getStatus().getContainerStatuses().get(0).getReady()).isTrue(); + }); + + applyResources("k8s/operator-instance-2.yaml"); + await().atMost(Duration.ofSeconds(POD_STARTUP_TIMEOUT)).untilAsserted(() -> { + var pod = client.pods().inNamespace(namespace).withName(OPERATOR_2_POD_NAME).get(); + assertThat(pod.getStatus().getContainerStatuses().get(0).getReady()).isTrue(); + }); + } + + void applyCRD() { + String path = + "./target/classes/META-INF/fabric8/leaderelectiontestcustomresources.sample.javaoperatorsdk-v1.yml"; + try (InputStream is = new FileInputStream(path)) { + final var crd = client.load(is); + crd.createOrReplace(); + Thread.sleep(CRD_READY_WAIT); + log.debug("Applied CRD with name: {}", crd.get().get(0).getMetadata().getName()); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + } + + void applyResources(String path) { + try { + List resources = client.load(new FileInputStream(path)).get(); + resources.forEach(hm -> { + hm.getMetadata().setNamespace(namespace); + if (hm.getKind().toLowerCase(Locale.ROOT).equals("clusterrolebinding")) { + var crb = (ClusterRoleBinding) hm; + for (var subject : crb.getSubjects()) { + subject.setNamespace(namespace); + } + } + }); + client.resourceList(resources) + .inNamespace(namespace) + .createOrReplace(); + + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + } +} diff --git a/sample-operators/leader-election/src/test/resources/log4j2.xml b/sample-operators/leader-election/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..2b7fdd3479 --- /dev/null +++ b/sample-operators/leader-election/src/test/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/sample-operators/pom.xml b/sample-operators/pom.xml index f8772d62da..82b0e8ba00 100644 --- a/sample-operators/pom.xml +++ b/sample-operators/pom.xml @@ -22,5 +22,6 @@ tomcat-operator webpage mysql-schema + leader-election - \ No newline at end of file +