Skip to content

Commit 9f2d53e

Browse files
committed
refactor: event processor to controller
1 parent 0e77895 commit 9f2d53e

12 files changed

+290
-142
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.util.Collection;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import java.util.Optional;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import io.javaoperatorsdk.operator.processing.Controller;
12+
import io.javaoperatorsdk.operator.processing.LifecycleAware;
13+
14+
/**
15+
* Not confuse with controller manager form go operators. The high level aggregate is Operator in
16+
* JOSDK.
17+
*/
18+
class ControllerManager implements LifecycleAware {
19+
20+
private static final Logger log = LoggerFactory.getLogger(ControllerManager.class);
21+
22+
private final Map<String, Controller> controllers = new HashMap<>();
23+
private boolean started = false;
24+
25+
public synchronized void shouldStart() {
26+
if (started) {
27+
return;
28+
}
29+
if (controllers.isEmpty()) {
30+
throw new OperatorException("No Controller exists. Exiting!");
31+
}
32+
}
33+
34+
public synchronized void start() {
35+
controllers().parallelStream().forEach(Controller::start);
36+
started = true;
37+
}
38+
39+
public synchronized void stop() {
40+
controllers().parallelStream().forEach(closeable -> {
41+
log.debug("closing {}", closeable);
42+
closeable.stop();
43+
});
44+
45+
started = false;
46+
}
47+
48+
@SuppressWarnings("unchecked")
49+
synchronized void add(Controller controller) {
50+
final var configuration = controller.getConfiguration();
51+
final var resourceTypeName = ReconcilerUtils
52+
.getResourceTypeNameWithVersion(configuration.getResourceClass());
53+
final var existing = controllers.get(resourceTypeName);
54+
if (existing != null) {
55+
throw new OperatorException("Cannot register controller '" + configuration.getName()
56+
+ "': another controller named '" + existing.getConfiguration().getName()
57+
+ "' is already registered for resource '" + resourceTypeName + "'");
58+
}
59+
controllers.put(resourceTypeName, controller);
60+
if (started) {
61+
controller.start();
62+
}
63+
}
64+
65+
synchronized Optional<Controller> get(String name) {
66+
return controllers().stream()
67+
.filter(c -> name.equals(c.getConfiguration().getName()))
68+
.findFirst();
69+
}
70+
71+
synchronized Collection<Controller> controllers() {
72+
return controllers.values();
73+
}
74+
75+
synchronized int size() {
76+
return controllers.size();
77+
}
78+
}
79+

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

-61
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package io.javaoperatorsdk.operator;
22

3-
import java.util.Collection;
4-
import java.util.HashMap;
53
import java.util.HashSet;
6-
import java.util.Map;
74
import java.util.Optional;
85
import java.util.Set;
96
import java.util.function.Consumer;
@@ -202,62 +199,4 @@ public int getRegisteredControllersNumber() {
202199
return controllers.size();
203200
}
204201

205-
static class ControllerManager implements LifecycleAware {
206-
private final Map<String, Controller> controllers = new HashMap<>();
207-
private boolean started = false;
208-
209-
public synchronized void shouldStart() {
210-
if (started) {
211-
return;
212-
}
213-
if (controllers.isEmpty()) {
214-
throw new OperatorException("No Controller exists. Exiting!");
215-
}
216-
}
217-
218-
public synchronized void start() {
219-
controllers().parallelStream().forEach(Controller::start);
220-
started = true;
221-
}
222-
223-
public synchronized void stop() {
224-
controllers().parallelStream().forEach(closeable -> {
225-
log.debug("closing {}", closeable);
226-
closeable.stop();
227-
});
228-
229-
started = false;
230-
}
231-
232-
@SuppressWarnings("unchecked")
233-
synchronized void add(Controller controller) {
234-
final var configuration = controller.getConfiguration();
235-
final var resourceTypeName = ReconcilerUtils
236-
.getResourceTypeNameWithVersion(configuration.getResourceClass());
237-
final var existing = controllers.get(resourceTypeName);
238-
if (existing != null) {
239-
throw new OperatorException("Cannot register controller '" + configuration.getName()
240-
+ "': another controller named '" + existing.getConfiguration().getName()
241-
+ "' is already registered for resource '" + resourceTypeName + "'");
242-
}
243-
controllers.put(resourceTypeName, controller);
244-
if (started) {
245-
controller.start();
246-
}
247-
}
248-
249-
synchronized Optional<Controller> get(String name) {
250-
return controllers().stream()
251-
.filter(c -> name.equals(c.getConfiguration().getName()))
252-
.findFirst();
253-
}
254-
255-
synchronized Collection<Controller> controllers() {
256-
return controllers.values();
257-
}
258-
259-
synchronized int size() {
260-
return controllers.size();
261-
}
262-
}
263202
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.util.Optional;
34
import java.util.Set;
45
import java.util.concurrent.ExecutorService;
56
import java.util.concurrent.Executors;
@@ -132,4 +133,9 @@ default ObjectMapper getObjectMapper() {
132133
default DependentResourceFactory dependentResourceFactory() {
133134
return new DependentResourceFactory() {};
134135
}
136+
137+
default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
138+
return Optional.empty();
139+
}
140+
135141
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.util.Optional;
34
import java.util.Set;
45
import java.util.concurrent.ExecutorService;
56
import java.util.function.Consumer;
@@ -18,6 +19,7 @@ public class ConfigurationServiceOverrider {
1819
private int timeoutSeconds;
1920
private boolean closeClientOnStop;
2021
private ExecutorService executorService = null;
22+
private LeaderElectionConfiguration leaderElectionConfiguration;
2123

2224
ConfigurationServiceOverrider(ConfigurationService original) {
2325
this.original = original;
@@ -71,6 +73,11 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo
7173
return this;
7274
}
7375

76+
public LeaderElectionConfiguration withLeaderElectionConfiguration(
77+
LeaderElectionConfiguration leaderElectionConfiguration) {
78+
return leaderElectionConfiguration;
79+
}
80+
7481
public ConfigurationService build() {
7582
return new BaseConfigurationService(original.getVersion()) {
7683
@Override
@@ -121,6 +128,11 @@ public ExecutorService getExecutorService() {
121128
return super.getExecutorService();
122129
}
123130
}
131+
132+
@Override
133+
public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
134+
return Optional.ofNullable(leaderElectionConfiguration);
135+
}
124136
};
125137
}
126138

Original file line numberDiff line numberDiff line change
@@ -1,13 +1,64 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
4+
5+
// todo discuss leader election with lease vs for life, other options:
6+
// see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/manager#Options
37
public class LeaderElectionConfiguration {
48

5-
private String leaderElectionNamespaces;
6-
private String leaderElectionID;
9+
public static final Duration LEASE_DURATION_DEFAULT_VALUE = Duration.ofSeconds(15);
10+
public static final Duration RENEW_DEADLINE_DEFAULT_VALUE = Duration.ofSeconds(10);
11+
public static final Duration RETRY_PERIOD_DEFAULT_VALUE = Duration.ofSeconds(2);
712

813
// todo discuss
9-
private boolean syncEventSources;
14+
// private boolean syncEventSources;
15+
16+
private final String leaseName;
17+
private final String leaseNamespace;
18+
19+
private final Duration leaseDuration;
20+
private final Duration renewDeadline;
21+
private final Duration retryPeriod;
22+
23+
public LeaderElectionConfiguration(String leaseName, String leaseNamespace) {
24+
this(
25+
leaseName,
26+
leaseNamespace,
27+
LEASE_DURATION_DEFAULT_VALUE,
28+
RENEW_DEADLINE_DEFAULT_VALUE,
29+
RETRY_PERIOD_DEFAULT_VALUE);
30+
}
31+
32+
public LeaderElectionConfiguration(
33+
String leaseName,
34+
String leaseNamespace,
35+
Duration leaseDuration,
36+
Duration renewDeadline,
37+
Duration retryPeriod) {
38+
this.leaseName = leaseName;
39+
this.leaseNamespace = leaseNamespace;
40+
this.leaseDuration = leaseDuration;
41+
this.renewDeadline = renewDeadline;
42+
this.retryPeriod = retryPeriod;
43+
}
44+
45+
public String getLeaseNamespace() {
46+
return leaseNamespace;
47+
}
48+
49+
public String getLeaseName() {
50+
return leaseName;
51+
}
52+
53+
public Duration getLeaseDuration() {
54+
return leaseDuration;
55+
}
56+
57+
public Duration getRenewDeadline() {
58+
return renewDeadline;
59+
}
1060

11-
// todo leader election with lease vs for life, other options:
12-
// see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/manager#Options
61+
public Duration getRetryPeriod() {
62+
return retryPeriod;
63+
}
1364
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import java.time.Duration;
4+
5+
import static io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration.*;
6+
7+
public final class LeaderElectionConfigurationBuilder {
8+
9+
private String leaseName;
10+
private String leaseNamespace;
11+
private Duration leaseDuration = LEASE_DURATION_DEFAULT_VALUE;
12+
private Duration renewDeadline = RENEW_DEADLINE_DEFAULT_VALUE;
13+
private Duration retryPeriod = RETRY_PERIOD_DEFAULT_VALUE;
14+
15+
private LeaderElectionConfigurationBuilder() {}
16+
17+
public static LeaderElectionConfigurationBuilder aLeaderElectionConfiguration() {
18+
return new LeaderElectionConfigurationBuilder();
19+
}
20+
21+
public LeaderElectionConfigurationBuilder withLeaseName(String leaseName) {
22+
this.leaseName = leaseName;
23+
return this;
24+
}
25+
26+
public LeaderElectionConfigurationBuilder withLeaseNamespace(String leaseNamespace) {
27+
this.leaseNamespace = leaseNamespace;
28+
return this;
29+
}
30+
31+
public LeaderElectionConfigurationBuilder withLeaseDuration(Duration leaseDuration) {
32+
this.leaseDuration = leaseDuration;
33+
return this;
34+
}
35+
36+
public LeaderElectionConfigurationBuilder withRenewDeadline(Duration renewDeadline) {
37+
this.renewDeadline = renewDeadline;
38+
return this;
39+
}
40+
41+
public LeaderElectionConfigurationBuilder withRetryPeriod(Duration retryPeriod) {
42+
this.retryPeriod = retryPeriod;
43+
return this;
44+
}
45+
46+
public LeaderElectionConfiguration build() {
47+
return new LeaderElectionConfiguration(leaseName, leaseNamespace, leaseDuration, renewDeadline,
48+
retryPeriod);
49+
}
50+
}

0 commit comments

Comments
 (0)