diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index e65f34df48..0346e25f73 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -141,4 +141,8 @@ public void put(ResourceID key, T resource) { getSource(key.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY)) .ifPresent(c -> c.put(key, resource)); } + + int numberOfSources() { + return sources.size(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index c7de41f331..57664039a9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -18,7 +18,7 @@ class InformerWrapper private final SharedIndexInformer informer; private final InformerResourceCache cache; - public InformerWrapper(SharedIndexInformer informer) { + InformerWrapper(SharedIndexInformer informer) { this.informer = informer; this.cache = new InformerResourceCache<>(informer); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 1dcfa2aa0e..9076d937a9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -1,5 +1,10 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.dsl.MixedOperation; @@ -13,15 +18,31 @@ public abstract class ManagedInformerEventSource implements ResourceEventHandler { + @SuppressWarnings("rawtypes") + private static final ConcurrentMap managedInformers = + new ConcurrentHashMap<>(); + protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { super(configuration.getResourceClass()); + initCache(configuration); manager().initSources(client, configuration, this); } + @SuppressWarnings("unchecked") + private void initCache(C configuration) { + final var key = new ResourceConfigurationAsKey(configuration); + var existing = managedInformers.get(key); + if (existing == null) { + existing = new InformerManager<>(); + managedInformers.put(key, existing); + } + cache = existing; + } + @Override protected UpdatableCache initCache() { - return new InformerManager<>(); + return null; // cache needs the configuration to be properly initialized } protected InformerManager manager() { @@ -30,13 +51,63 @@ protected InformerManager manager() { @Override public void start() { - manager().start(); - super.start(); + if (!isRunning()) { + manager().start(); + super.start(); + } } @Override public void stop() { - super.stop(); - manager().stop(); + if (isRunning()) { + super.stop(); + manager().stop(); + } + } + + @SuppressWarnings("rawtypes") + private static class ResourceConfigurationAsKey { + private final ResourceConfiguration configuration; + + private ResourceConfigurationAsKey(ResourceConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final var that = (ResourceConfigurationAsKey) o; + if (configuration == that.configuration) { + return true; + } + + return Objects.equals(getLabelSelector(), that.getLabelSelector()) + && Objects.equals(getResourceClass(), that.getResourceClass()) + && Objects.equals(getNamespaces(), that.getNamespaces()); + } + + @Override + public int hashCode() { + return Objects.hash(getLabelSelector(), getResourceClass(), getNamespaces()); + } + + public String getLabelSelector() { + return configuration.getLabelSelector(); + } + + public Class getResourceClass() { + return configuration.getResourceClass(); + } + + @SuppressWarnings("unchecked") + public Set getNamespaces() { + return configuration.getNamespaces(); + } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSourceTest.java new file mode 100644 index 0000000000..45e70f51a3 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSourceTest.java @@ -0,0 +1,53 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.MockKubernetesClient; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.Mockito.mock; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ManagedInformerEventSourceTest { + @Test + void sourcesWithSameConfigurationShouldShareInformer() { + final var namespaces = new String[] {"foo", "bar"}; + final var config = InformerConfiguration.from(mock(ConfigurationService.class), + HasMetadata.class) + .withLabelSelector("label=value").withNamespaces(namespaces) + .build(); + + final var informer1 = + new InformerEventSource<>(config, MockKubernetesClient.client(HasMetadata.class)); + final var informer2 = + new InformerEventSource<>(config, MockKubernetesClient.client(HasMetadata.class)); + + final var manager = informer1.manager(); + assertEquals(manager, informer2.manager()); + assertEquals(namespaces.length, manager.numberOfSources()); + } + + @Test + void sourcesWithDifferentConfigurationsShouldNotShareInformer() { + final var config1 = InformerConfiguration.from(mock(ConfigurationService.class), + HasMetadata.class) + .withLabelSelector("label=value").withNamespaces("foo", "bar") + .build(); + + final var config2 = InformerConfiguration.from(config1) + .withLabelSelector("label=otherValue").withNamespaces("baz") + .build(); + + final var informer1 = + new InformerEventSource<>(config1, MockKubernetesClient.client(HasMetadata.class)); + final var informer2 = + new InformerEventSource<>(config2, MockKubernetesClient.client(HasMetadata.class)); + + assertNotEquals(informer1.manager(), informer2.manager()); + assertEquals(1, informer2.manager().numberOfSources()); + } +}