Skip to content

Commit 2f7f18a

Browse files
committed
Workflow engine implementation (#1153)
1 parent 49a1091 commit 2f7f18a

File tree

17 files changed

+1928
-4
lines changed

17 files changed

+1928
-4
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/UpdateControl.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
import io.fabric8.kubernetes.api.model.HasMetadata;
44
import io.fabric8.kubernetes.client.CustomResource;
55

6-
public class UpdateControl<T extends HasMetadata> extends BaseControl<UpdateControl<T>> {
6+
public class UpdateControl<P extends HasMetadata> extends BaseControl<UpdateControl<P>> {
77

8-
private final T resource;
8+
private final P resource;
99
private final boolean updateStatus;
1010
private final boolean updateResource;
1111
private final boolean patch;
1212

1313
private UpdateControl(
14-
T resource, boolean updateStatus, boolean updateResource, boolean patch) {
14+
P resource, boolean updateStatus, boolean updateResource, boolean patch) {
1515
if ((updateResource || updateStatus) && resource == null) {
1616
throw new IllegalArgumentException("CustomResource cannot be null in case of update");
1717
}
@@ -92,7 +92,7 @@ public static <T extends HasMetadata> UpdateControl<T> noUpdate() {
9292
return new UpdateControl<>(null, false, false, false);
9393
}
9494

95-
public T getResource() {
95+
public P getResource() {
9696
return resource;
9797
}
9898

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.javaoperatorsdk.operator.api.reconciler.Context;
5+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
6+
7+
public interface Condition<R, P extends HasMetadata> {
8+
9+
boolean isMet(DependentResource<R, P> dependentResource, P primary, Context<P> context);
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.LinkedList;
4+
import java.util.List;
5+
import java.util.Optional;
6+
import java.util.stream.Collectors;
7+
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
10+
11+
@SuppressWarnings("rawtypes")
12+
public class DependentResourceNode<R, P extends HasMetadata> {
13+
14+
private final DependentResource<R, P> dependentResource;
15+
private Condition reconcileCondition;
16+
private Condition deletePostCondition;
17+
private Condition readyCondition;
18+
private final List<DependentResourceNode> dependsOn = new LinkedList<>();
19+
private final List<DependentResourceNode> parents = new LinkedList<>();
20+
21+
public DependentResourceNode(DependentResource<R, P> dependentResource) {
22+
this(dependentResource, null, null);
23+
}
24+
25+
public DependentResourceNode(DependentResource<R, P> dependentResource,
26+
Condition reconcileCondition) {
27+
this(dependentResource, reconcileCondition, null);
28+
}
29+
30+
public DependentResourceNode(DependentResource<R, P> dependentResource,
31+
Condition reconcileCondition, Condition deletePostCondition) {
32+
this.dependentResource = dependentResource;
33+
this.reconcileCondition = reconcileCondition;
34+
this.deletePostCondition = deletePostCondition;
35+
}
36+
37+
public DependentResource<R, P> getDependentResource() {
38+
return dependentResource;
39+
}
40+
41+
public Optional<Condition> getReconcileCondition() {
42+
return Optional.ofNullable(reconcileCondition);
43+
}
44+
45+
public Optional<Condition> getDeletePostCondition() {
46+
return Optional.ofNullable(deletePostCondition);
47+
}
48+
49+
public List<DependentResourceNode> getDependsOn() {
50+
return dependsOn;
51+
}
52+
53+
@SuppressWarnings("unchecked")
54+
public void addDependsOnRelation(DependentResourceNode node) {
55+
node.parents.add(this);
56+
dependsOn.add(node);
57+
}
58+
59+
@Override
60+
public String toString() {
61+
return "{"
62+
+ parents.stream().map(p -> p.dependentResource.toString())
63+
.collect(Collectors.joining(", ", "[", "]->"))
64+
+ "(" + dependentResource + ")"
65+
+ dependsOn.stream().map(d -> d.dependentResource.toString())
66+
.collect(Collectors.joining(", ", "->[", "]"))
67+
+ '}';
68+
}
69+
70+
public DependentResourceNode<R, P> setReconcileCondition(
71+
Condition reconcileCondition) {
72+
this.reconcileCondition = reconcileCondition;
73+
return this;
74+
}
75+
76+
public DependentResourceNode<R, P> setDeletePostCondition(Condition cleanupCondition) {
77+
this.deletePostCondition = cleanupCondition;
78+
return this;
79+
}
80+
81+
public Optional<Condition<R, P>> getReadyCondition() {
82+
return Optional.ofNullable(readyCondition);
83+
}
84+
85+
public DependentResourceNode<R, P> setReadyCondition(Condition readyCondition) {
86+
this.readyCondition = readyCondition;
87+
return this;
88+
}
89+
90+
public List<DependentResourceNode> getParents() {
91+
return parents;
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.HashSet;
4+
import java.util.Set;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
10+
import io.javaoperatorsdk.operator.api.reconciler.Context;
11+
12+
/**
13+
* Dependents definition: so if B depends on A, the B is dependent of A.
14+
*
15+
* @param <P> primary resource
16+
*/
17+
@SuppressWarnings("rawtypes")
18+
public class Workflow<P extends HasMetadata> {
19+
20+
public static final boolean THROW_EXCEPTION_AUTOMATICALLY_DEFAULT = true;
21+
22+
private final Set<DependentResourceNode> dependentResourceNodes;
23+
private final Set<DependentResourceNode> topLevelResources = new HashSet<>();
24+
private final Set<DependentResourceNode> bottomLevelResource = new HashSet<>();
25+
26+
private final boolean throwExceptionAutomatically;
27+
// it's "global" executor service shared between multiple reconciliations running parallel
28+
private ExecutorService executorService;
29+
30+
public Workflow(Set<DependentResourceNode> dependentResourceNodes) {
31+
this.executorService = ConfigurationServiceProvider.instance().getExecutorService();
32+
this.dependentResourceNodes = dependentResourceNodes;
33+
this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT;
34+
preprocessForReconcile();
35+
}
36+
37+
public Workflow(Set<DependentResourceNode> dependentResourceNodes,
38+
ExecutorService executorService, boolean throwExceptionAutomatically) {
39+
this.executorService = executorService;
40+
this.dependentResourceNodes = dependentResourceNodes;
41+
this.throwExceptionAutomatically = throwExceptionAutomatically;
42+
preprocessForReconcile();
43+
}
44+
45+
public Workflow(Set<DependentResourceNode> dependentResourceNodes, int globalParallelism) {
46+
this(dependentResourceNodes, Executors.newFixedThreadPool(globalParallelism), true);
47+
}
48+
49+
public WorkflowExecutionResult reconcile(P primary, Context<P> context) {
50+
WorkflowReconcileExecutor<P> workflowReconcileExecutor =
51+
new WorkflowReconcileExecutor<>(this, primary, context);
52+
var result = workflowReconcileExecutor.reconcile();
53+
if (throwExceptionAutomatically) {
54+
result.throwAggregateExceptionIfErrorsPresent();
55+
}
56+
return result;
57+
}
58+
59+
public WorkflowCleanupResult cleanup(P primary, Context<P> context) {
60+
WorkflowCleanupExecutor<P> workflowCleanupExecutor =
61+
new WorkflowCleanupExecutor<>(this, primary, context);
62+
var result = workflowCleanupExecutor.cleanup();
63+
if (throwExceptionAutomatically) {
64+
result.throwAggregateExceptionIfErrorsPresent();
65+
}
66+
return result;
67+
}
68+
69+
// add cycle detection?
70+
private void preprocessForReconcile() {
71+
bottomLevelResource.addAll(dependentResourceNodes);
72+
for (DependentResourceNode<?, P> node : dependentResourceNodes) {
73+
if (node.getDependsOn().isEmpty()) {
74+
topLevelResources.add(node);
75+
} else {
76+
for (DependentResourceNode dependsOn : node.getDependsOn()) {
77+
bottomLevelResource.remove(dependsOn);
78+
}
79+
}
80+
}
81+
}
82+
83+
public boolean isThrowExceptionAutomatically() {
84+
return throwExceptionAutomatically;
85+
}
86+
87+
public void setExecutorService(ExecutorService executorService) {
88+
this.executorService = executorService;
89+
}
90+
91+
Set<DependentResourceNode> getTopLevelDependentResources() {
92+
return topLevelResources;
93+
}
94+
95+
Set<DependentResourceNode> getBottomLevelResource() {
96+
return bottomLevelResource;
97+
}
98+
99+
ExecutorService getExecutorService() {
100+
return executorService;
101+
}
102+
}

0 commit comments

Comments
 (0)