Skip to content

feat: operator can be restarted #1675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,14 @@ public void stop() throws OperatorException {
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
controllerManager.stop();

ExecutorServiceManager.stop();
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
kubernetesClient.close();
}

started = false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
Expand Down Expand Up @@ -40,20 +38,18 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
private final ControllerConfiguration<?> controllerConfiguration;
private final ReconciliationDispatcher<P> reconciliationDispatcher;
private final Retry retry;
private final ExecutorService executor;
private final Metrics metrics;
private final Cache<P> cache;
private final EventSourceManager<P> eventSourceManager;
private final RateLimiter<? extends RateLimitState> rateLimiter;
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
private final Map<String, Object> metricsMetadata;

private ExecutorService executor;

public EventProcessor(EventSourceManager<P> eventSourceManager) {
this(
eventSourceManager.getController().getConfiguration(),
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
ConfigurationServiceProvider.instance().getMetrics(),
eventSourceManager);
Expand All @@ -68,7 +64,6 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
this(
controllerConfiguration,
eventSourceManager.getControllerResourceEventSource(),
null,
reconciliationDispatcher,
metrics,
eventSourceManager);
Expand All @@ -78,17 +73,11 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
private EventProcessor(
ControllerConfiguration controllerConfiguration,
Cache<P> cache,
ExecutorService executor,
ReconciliationDispatcher<P> reconciliationDispatcher,
Metrics metrics,
EventSourceManager<P> eventSourceManager) {
this.controllerConfiguration = controllerConfiguration;
this.running = false;
this.executor =
executor == null
? new ScheduledThreadPoolExecutor(
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
: executor;
this.reconciliationDispatcher = reconciliationDispatcher;
this.retry = controllerConfiguration.getRetry();
this.cache = cache;
Expand Down Expand Up @@ -376,6 +365,8 @@ public synchronized void stop() {

@Override
public void start() throws OperatorException {
// on restart new executor service is created and needs to be set here
executor = ExecutorServiceManager.instance().executorService();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the executor service be shut down then in stop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but what I meant is that we're lacking symmetry between start and stop here (i.e. the executor service is stopped at another level). Maybe stop should set executor to null to prevent it from being used before being restarted? I guess this is more a theoretical question than a practical one.

this.running = true;
handleAlreadyMarkedEvents();
}
Expand Down Expand Up @@ -424,7 +415,8 @@ public void run() {

@Override
public String toString() {
return controllerName() + " -> " + executionScope;
return controllerName() + " -> "
+ (executionScope.getResource() != null ? executionScope : resourceID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public synchronized void stop() {
eventSources.additionalNamedEventSources(),
this::stopEventSource,
getThreadNamer("stop"));
eventSources.clear();
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,31 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat

private final Map<String, InformerWrapper<T>> sources = new ConcurrentHashMap<>();
private Cloner cloner;
private C configuration;
private MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
private ResourceEventHandler<T> eventHandler;
private final C configuration;
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
private final ResourceEventHandler<T> eventHandler;
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();

public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
C configuration,
ResourceEventHandler<T> eventHandler) {
this.client = client;
this.configuration = configuration;
this.eventHandler = eventHandler;
}

@Override
public void start() throws OperatorException {
initSources();
// make sure informers are all started before proceeding further
sources.values().parallelStream().forEach(InformerWrapper::start);
}

void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
C configuration, ResourceEventHandler<T> eventHandler) {
private void initSources() {
if (!sources.isEmpty()) {
throw new IllegalStateException("Some sources already initialized.");
}
cloner = ConfigurationServiceProvider.instance().getResourceCloner();
this.configuration = configuration;
this.client = client;
this.eventHandler = eventHandler;

final var targetNamespaces = configuration.getEffectiveNamespaces();
if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) {
var source = createEventSourceForNamespace(WATCH_ALL_NAMESPACES);
Expand Down Expand Up @@ -86,7 +93,6 @@ public void changeNamespaces(Set<String> namespaces) {
namespaces.forEach(ns -> {
if (!sources.containsKey(ns)) {
final InformerWrapper<T> source = createEventSourceForNamespace(ns);
source.addIndexers(this.indexers);
source.start();
log.debug("Registered new {} -> {} for namespace: {}", this, source,
ns);
Expand All @@ -106,6 +112,7 @@ private InformerWrapper<T> createEventSourceForNamespace(String namespace) {
client.inNamespace(namespace).withLabelSelector(configuration.getLabelSelector()),
eventHandler, namespace);
}
source.addIndexers(indexers);
return source;
}

Expand All @@ -130,6 +137,7 @@ public void stop() {
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
}
});
sources.clear();
}

@Override
Expand Down Expand Up @@ -177,7 +185,6 @@ private Optional<InformerWrapper<T>> getSource(String namespace) {
@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexers) {
this.indexers.putAll(indexers);
sources.values().forEach(s -> s.addIndexers(indexers));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);

protected TemporaryResourceCache<R> temporaryResourceCache;
protected InformerManager<R, C> cache = new InformerManager<>();
protected InformerManager<R, C> cache;
protected C configuration;

protected ManagedInformerEventSource(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
super(configuration.getResourceClass());
temporaryResourceCache = new TemporaryResourceCache<>(this);
manager().initSources(client, configuration, this);
cache = new InformerManager<>(client, configuration, this);
this.configuration = configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,16 +19,16 @@ public class TimerEventSource<R extends HasMetadata>
implements ResourceEventAware<R> {
private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class);

private final Timer timer = new Timer(true);
private final AtomicBoolean running = new AtomicBoolean();
private Timer timer;
private final Map<ResourceID, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();

@SuppressWarnings("unused")
public void scheduleOnce(R resource, long delay) {
scheduleOnce(ResourceID.fromResource(resource), delay);
}

public void scheduleOnce(ResourceID resourceID, long delay) {
if (!running.get()) {
if (!isRunning()) {
throw new IllegalStateException("The TimerEventSource is not running");
}

Expand All @@ -55,14 +54,19 @@ public void cancelOnceSchedule(ResourceID customResourceUid) {

@Override
public void start() {
running.set(true);
if (!isRunning()) {
super.start();
timer = new Timer(true);
}
}

@Override
public void stop() {
running.set(false);
onceTasks.keySet().forEach(this::cancelOnceSchedule);
timer.cancel();
if (isRunning()) {
onceTasks.keySet().forEach(this::cancelOnceSchedule);
timer.cancel();
super.stop();
}
}

public class EventProducerTimeTask extends TimerTask {
Expand All @@ -75,7 +79,7 @@ public EventProducerTimeTask(ResourceID customResourceUid) {

@Override
public void run() {
if (running.get()) {
if (isRunning()) {
log.debug("Producing event for custom resource id: {}", customResourceUid);
getEventHandler().handleEvent(new Event(customResourceUid));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.javaoperatorsdk.operator;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource;
import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class OperatorRestartIT {
private final static KubernetesClient client = new KubernetesClientBuilder().build();
private final static Operator operator = new Operator(o -> o.withCloseClientOnStop(false));
private final static RestartTestReconciler reconciler = new RestartTestReconciler();
private static int reconcileNumberBeforeStop = 0;

@BeforeAll
static void registerReconciler() {
LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client);
operator.register(reconciler);
}

@BeforeEach
void startOperator() {
operator.start();
}

@AfterEach
void stopOperator() {
operator.stop();
}

@Test
@Order(1)
void createResource() {
client.resource(testCustomResource()).createOrReplace();
await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0));
reconcileNumberBeforeStop = reconciler.getNumberOfExecutions();
}

@Test
@Order(2)
void reconcile() {
await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions())
.isGreaterThan(reconcileNumberBeforeStop));
}

RestartTestCustomResource testCustomResource() {
RestartTestCustomResource cr = new RestartTestCustomResource();
cr.setMetadata(new ObjectMetaBuilder()
.withName("test1")
.build());
return cr;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.javaoperatorsdk.operator.sample.restart;

import java.util.Map;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;

/**
 * Dependent resource describing the {@link ConfigMap} that belongs to a
 * {@link RestartTestCustomResource}.
 *
 * <p>The desired config map carries the label {@code app=restart-test}, which is the same label
 * used in the {@link KubernetesDependent#labelSelector()} of this class.
 */
@KubernetesDependent(labelSelector = "app=restart-test")
public class ConfigMapDependentResource
    extends CRUDKubernetesDependentResource<ConfigMap, RestartTestCustomResource> {

  /** Key of the single data entry stored in the desired config map. */
  public static final String DATA_KEY = "key";

  public ConfigMapDependentResource() {
    super(ConfigMap.class);
  }

  /**
   * Computes the desired config map for the given primary resource.
   *
   * @param primary the primary resource; its name and namespace are copied to the config map
   * @param context the reconciliation context (not used here)
   * @return a config map named like the primary, in the primary's namespace, labeled
   *         {@code app=restart-test} and holding the primary's name under {@link #DATA_KEY}
   */
  @Override
  protected ConfigMap desired(RestartTestCustomResource primary,
      Context<RestartTestCustomResource> context) {
    final var primaryMeta = primary.getMetadata();
    final var primaryName = primaryMeta.getName();

    final var desiredMeta = new ObjectMetaBuilder()
        .withName(primaryName)
        .withNamespace(primaryMeta.getNamespace())
        .withLabels(Map.of("app", "restart-test"))
        .build();

    return new ConfigMapBuilder()
        .withMetadata(desiredMeta)
        .withData(Map.of(DATA_KEY, primaryName))
        .build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.restart;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

/**
 * Namespaced custom resource used by the operator restart sample/test.
 *
 * <p>Registered under group {@code sample.javaoperatorsdk}, version {@code v1}, with the short
 * name {@code rt}. Both type parameters of {@link CustomResource} are {@link Void}, i.e. the
 * resource declares no spec and no status payload.
 */
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("rt")
public class RestartTestCustomResource
extends CustomResource<Void, Void>
implements Namespaced {
}
Loading