diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java
index 12437155e9..b7c2f5e108 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java
@@ -132,11 +132,14 @@ public void stop() throws OperatorException {
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
controllerManager.stop();
+
ExecutorServiceManager.stop();
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
kubernetesClient.close();
}
+
+ started = false;
}
/**
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
index 59962e39f1..a75f51f981 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
@@ -5,14 +5,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
-import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
@@ -40,20 +38,18 @@ public class EventProcessor
implements EventHandler, Life
private final ControllerConfiguration> controllerConfiguration;
private final ReconciliationDispatcher
reconciliationDispatcher;
private final Retry retry;
- private final ExecutorService executor;
private final Metrics metrics;
private final Cache
cache;
private final EventSourceManager
eventSourceManager;
private final RateLimiter extends RateLimitState> rateLimiter;
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
private final Map metricsMetadata;
-
+ private ExecutorService executor;
public EventProcessor(EventSourceManager eventSourceManager) {
this(
eventSourceManager.getController().getConfiguration(),
eventSourceManager.getControllerResourceEventSource(),
- ExecutorServiceManager.instance().executorService(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
ConfigurationServiceProvider.instance().getMetrics(),
eventSourceManager);
@@ -68,7 +64,6 @@ public EventProcessor(EventSourceManager
eventSourceManager) {
this(
controllerConfiguration,
eventSourceManager.getControllerResourceEventSource(),
- null,
reconciliationDispatcher,
metrics,
eventSourceManager);
@@ -78,17 +73,11 @@ public EventProcessor(EventSourceManager
eventSourceManager) {
private EventProcessor(
ControllerConfiguration controllerConfiguration,
Cache
cache,
- ExecutorService executor,
ReconciliationDispatcher
reconciliationDispatcher,
Metrics metrics,
EventSourceManager
eventSourceManager) {
this.controllerConfiguration = controllerConfiguration;
this.running = false;
- this.executor =
- executor == null
- ? new ScheduledThreadPoolExecutor(
- ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
- : executor;
this.reconciliationDispatcher = reconciliationDispatcher;
this.retry = controllerConfiguration.getRetry();
this.cache = cache;
@@ -376,6 +365,8 @@ public synchronized void stop() {
@Override
public void start() throws OperatorException {
+ // on restart new executor service is created and needs to be set here
+ executor = ExecutorServiceManager.instance().executorService();
this.running = true;
handleAlreadyMarkedEvents();
}
@@ -424,7 +415,8 @@ public void run() {
@Override
public String toString() {
- return controllerName() + " -> " + executionScope;
+ return controllerName() + " -> "
+ + (executionScope.getResource() != null ? executionScope : resourceID);
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index 6a6aae471a..be412dcd68 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -97,7 +97,6 @@ public synchronized void stop() {
eventSources.additionalNamedEventSources(),
this::stopEventSource,
getThreadNamer("stop"));
- eventSources.clear();
}
@SuppressWarnings("rawtypes")
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index c6636a4a5d..50ad1d6567 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -41,24 +41,31 @@ public class InformerManager> sources = new ConcurrentHashMap<>();
private Cloner cloner;
- private C configuration;
- private MixedOperation, Resource> client;
- private ResourceEventHandler eventHandler;
+ private final C configuration;
+ private final MixedOperation, Resource> client;
+ private final ResourceEventHandler eventHandler;
private final Map>> indexers = new HashMap<>();
+ public InformerManager(MixedOperation, Resource> client,
+ C configuration,
+ ResourceEventHandler eventHandler) {
+ this.client = client;
+ this.configuration = configuration;
+ this.eventHandler = eventHandler;
+ }
+
@Override
public void start() throws OperatorException {
+ initSources();
// make sure informers are all started before proceeding further
sources.values().parallelStream().forEach(InformerWrapper::start);
}
- void initSources(MixedOperation, Resource> client,
- C configuration, ResourceEventHandler eventHandler) {
+ private void initSources() {
+ if (!sources.isEmpty()) {
+ throw new IllegalStateException("Some sources already initialized.");
+ }
cloner = ConfigurationServiceProvider.instance().getResourceCloner();
- this.configuration = configuration;
- this.client = client;
- this.eventHandler = eventHandler;
-
final var targetNamespaces = configuration.getEffectiveNamespaces();
if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) {
var source = createEventSourceForNamespace(WATCH_ALL_NAMESPACES);
@@ -86,7 +93,6 @@ public void changeNamespaces(Set namespaces) {
namespaces.forEach(ns -> {
if (!sources.containsKey(ns)) {
final InformerWrapper source = createEventSourceForNamespace(ns);
- source.addIndexers(this.indexers);
source.start();
log.debug("Registered new {} -> {} for namespace: {}", this, source,
ns);
@@ -106,6 +112,7 @@ private InformerWrapper createEventSourceForNamespace(String namespace) {
client.inNamespace(namespace).withLabelSelector(configuration.getLabelSelector()),
eventHandler, namespace);
}
+ source.addIndexers(indexers);
return source;
}
@@ -130,6 +137,7 @@ public void stop() {
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
}
});
+ sources.clear();
}
@Override
@@ -177,7 +185,6 @@ private Optional> getSource(String namespace) {
@Override
public void addIndexers(Map>> indexers) {
this.indexers.putAll(indexers);
- sources.values().forEach(s -> s.addIndexers(indexers));
}
@Override
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index 3e7400b90f..d4fed3b816 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -37,14 +37,14 @@ public abstract class ManagedInformerEventSource temporaryResourceCache;
- protected InformerManager cache = new InformerManager<>();
+ protected InformerManager cache;
protected C configuration;
protected ManagedInformerEventSource(
MixedOperation, Resource> client, C configuration) {
super(configuration.getResourceClass());
temporaryResourceCache = new TemporaryResourceCache<>(this);
- manager().initSources(client, configuration, this);
+ cache = new InformerManager<>(client, configuration, this);
this.configuration = configuration;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java
index f22400453a..fe641e0b0b 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java
@@ -4,7 +4,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,16 +19,16 @@ public class TimerEventSource
implements ResourceEventAware {
private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class);
- private final Timer timer = new Timer(true);
- private final AtomicBoolean running = new AtomicBoolean();
+ private Timer timer;
private final Map onceTasks = new ConcurrentHashMap<>();
+ @SuppressWarnings("unused")
public void scheduleOnce(R resource, long delay) {
scheduleOnce(ResourceID.fromResource(resource), delay);
}
public void scheduleOnce(ResourceID resourceID, long delay) {
- if (!running.get()) {
+ if (!isRunning()) {
throw new IllegalStateException("The TimerEventSource is not running");
}
@@ -55,14 +54,19 @@ public void cancelOnceSchedule(ResourceID customResourceUid) {
@Override
public void start() {
- running.set(true);
+ if (!isRunning()) {
+ super.start();
+ timer = new Timer(true);
+ }
}
@Override
public void stop() {
- running.set(false);
- onceTasks.keySet().forEach(this::cancelOnceSchedule);
- timer.cancel();
+ if (isRunning()) {
+ onceTasks.keySet().forEach(this::cancelOnceSchedule);
+ timer.cancel();
+ super.stop();
+ }
}
public class EventProducerTimeTask extends TimerTask {
@@ -75,7 +79,7 @@ public EventProducerTimeTask(ResourceID customResourceUid) {
@Override
public void run() {
- if (running.get()) {
+ if (isRunning()) {
log.debug("Producing event for custom resource id: {}", customResourceUid);
getEventHandler().handleEvent(new Event(customResourceUid));
}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java
new file mode 100644
index 0000000000..45b88a126b
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java
@@ -0,0 +1,63 @@
+package io.javaoperatorsdk.operator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource;
+import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+class OperatorRestartIT {
+ private final static KubernetesClient client = new KubernetesClientBuilder().build();
+ private final static Operator operator = new Operator(o -> o.withCloseClientOnStop(false));
+ private final static RestartTestReconciler reconciler = new RestartTestReconciler();
+ private static int reconcileNumberBeforeStop = 0;
+
+ @BeforeAll
+ static void registerReconciler() {
+ LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client);
+ operator.register(reconciler);
+ }
+
+ @BeforeEach
+ void startOperator() {
+ operator.start();
+ }
+
+ @AfterEach
+ void stopOperator() {
+ operator.stop();
+ }
+
+ @Test
+ @Order(1)
+ void createResource() {
+ client.resource(testCustomResource()).createOrReplace();
+ await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0));
+ reconcileNumberBeforeStop = reconciler.getNumberOfExecutions();
+ }
+
+ @Test
+ @Order(2)
+ void reconcile() {
+ await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions())
+ .isGreaterThan(reconcileNumberBeforeStop));
+ }
+
+ RestartTestCustomResource testCustomResource() {
+ RestartTestCustomResource cr = new RestartTestCustomResource();
+ cr.setMetadata(new ObjectMetaBuilder()
+ .withName("test1")
+ .build());
+ return cr;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java
new file mode 100644
index 0000000000..edb4e2baff
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java
@@ -0,0 +1,35 @@
+package io.javaoperatorsdk.operator.sample.restart;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource;
+import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
+
+@KubernetesDependent(labelSelector = "app=restart-test")
+public class ConfigMapDependentResource
+ extends CRUDKubernetesDependentResource {
+
+ public static final String DATA_KEY = "key";
+
+ public ConfigMapDependentResource() {
+ super(ConfigMap.class);
+ }
+
+ @Override
+ protected ConfigMap desired(RestartTestCustomResource primary,
+ Context context) {
+ return new ConfigMapBuilder()
+ .withMetadata(new ObjectMetaBuilder()
+ .withLabels(Map.of("app", "restart-test"))
+ .withName(primary.getMetadata().getName())
+ .withNamespace(primary.getMetadata().getNamespace())
+ .build())
+ .withData(Map.of(DATA_KEY, primary.getMetadata().getName()))
+ .build();
+
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java
new file mode 100644
index 0000000000..a3bcd31053
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java
@@ -0,0 +1,15 @@
+package io.javaoperatorsdk.operator.sample.restart;
+
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
+import io.fabric8.kubernetes.model.annotation.Version;
+
+@Group("sample.javaoperatorsdk")
+@Version("v1")
+@ShortNames("rt")
+public class RestartTestCustomResource
+ extends CustomResource
+ implements Namespaced {
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java
new file mode 100644
index 0000000000..decd9b597b
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java
@@ -0,0 +1,31 @@
+package io.javaoperatorsdk.operator.sample.restart;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent;
+import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider;
+
+@ControllerConfiguration(
+ dependents = @Dependent(type = ConfigMapDependentResource.class))
+public class RestartTestReconciler
+ implements Reconciler, TestExecutionInfoProvider {
+
+ private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
+
+ @Override
+ public UpdateControl reconcile(
+ RestartTestCustomResource resource,
+ Context context) {
+ numberOfExecutions.addAndGet(1);
+ return UpdateControl.noUpdate();
+ }
+
+ public int getNumberOfExecutions() {
+ return numberOfExecutions.get();
+ }
+
+}