Skip to content

Commit 5cad9c4

Browse files
committed
cleanup wip
1 parent 431ead4 commit 5cad9c4

File tree

4 files changed

+177
-113
lines changed

4 files changed

+177
-113
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import io.fabric8.kubernetes.api.model.HasMetadata;
1313
import io.javaoperatorsdk.operator.api.reconciler.Context;
14+
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
1415

1516
public class WorkflowCleanupExecutor<P extends HasMetadata> {
1617

@@ -20,10 +21,8 @@ public class WorkflowCleanupExecutor<P extends HasMetadata> {
2021
new HashMap<>();
2122
private final Map<DependentResourceNode<?, ?>, Exception> exceptionsDuringExecution =
2223
new HashMap<>();
23-
private final Set<DependentResourceNode<?, ?>> alreadyReconciled = new HashSet<>();
24+
private final Set<DependentResourceNode<?, ?>> alreadyVisited = new HashSet<>();
2425
private final Set<DependentResourceNode<?, ?>> notReady = new HashSet<>();
25-
private final Set<DependentResourceNode<?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
26-
new HashSet<>();
2726

2827
private final Workflow<P> workflow;
2928
private final P primary;
@@ -35,11 +34,13 @@ public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> conte
3534
this.context = context;
3635
}
3736

37+
// todo cleanup condition
38+
// todo error handling
3839

3940
public synchronized WorkflowCleanupResult cleanup() {
4041
for (DependentResourceNode<?, P> dependentResourceNode : workflow
4142
.getBottomLevelResource()) {
42-
handleCleanup(dependentResourceNode, false);
43+
handleCleanup(dependentResourceNode);
4344
}
4445
while (true) {
4546
try {
@@ -57,15 +58,11 @@ public synchronized WorkflowCleanupResult cleanup() {
5758
return createCleanupResult();
5859
}
5960

60-
private WorkflowCleanupResult createCleanupResult() {
61-
return new WorkflowCleanupResult();
62-
}
63-
6461
private synchronized boolean noMoreExecutionsScheduled() {
6562
return actualExecutions.isEmpty();
6663
}
6764

68-
private void handleCleanup(DependentResourceNode<?, P> dependentResourceNode, boolean b) {
65+
private synchronized void handleCleanup(DependentResourceNode<?, P> dependentResourceNode) {
6966
log.debug("Submitting for cleanup: {}", dependentResourceNode);
7067

7168
if (alreadyVisited(dependentResourceNode)
@@ -76,24 +73,30 @@ private void handleCleanup(DependentResourceNode<?, P> dependentResourceNode, bo
7673
return;
7774
}
7875

76+
Future<?> nodeFuture =
77+
workflow.getExecutorService().submit(
78+
new NodeExecutor(dependentResourceNode));
79+
actualExecutions.put(dependentResourceNode, nodeFuture);
80+
log.debug("Submitted to reconcile: {}", dependentResourceNode);
7981
}
8082

8183
private class NodeExecutor implements Runnable {
8284

8385
private final DependentResourceNode<?, P> dependentResourceNode;
84-
private final boolean onlyReconcileForPossibleDelete;
8586

86-
private NodeExecutor(DependentResourceNode<?, P> dependentResourceNode,
87-
boolean onlyReconcileForDelete) {
87+
private NodeExecutor(DependentResourceNode<?, P> dependentResourceNode) {
8888
this.dependentResourceNode = dependentResourceNode;
89-
this.onlyReconcileForPossibleDelete = onlyReconcileForDelete;
9089
}
9190

9291
@Override
9392
@SuppressWarnings("unchecked")
9493
public void run() {
9594
try {
96-
95+
if (dependentResourceNode.getDependentResource() instanceof Deleter) {
96+
// todo check if not garbage collected
97+
((Deleter<P>) dependentResourceNode.getDependentResource()).delete(primary, context);
98+
}
99+
handleDependentCleaned(dependentResourceNode);
97100
} catch (RuntimeException e) {
98101
handleExceptionInExecutor(dependentResourceNode, e);
99102
} finally {
@@ -102,12 +105,26 @@ public void run() {
102105
}
103106
}
104107

105-
private synchronized void handleExceptionInExecutor(DependentResourceNode dependentResourceNode,
108+
@SuppressWarnings("unchecked")
109+
private synchronized void handleDependentCleaned(
110+
DependentResourceNode<?, P> dependentResourceNode) {
111+
var dependOns = dependentResourceNode.getDependsOn();
112+
if (dependOns != null) {
113+
dependOns.forEach(d -> {
114+
log.debug("Handle cleanup for dependent: {} of parent:{}", d, dependentResourceNode);
115+
handleCleanup(d);
116+
});
117+
}
118+
}
119+
120+
private synchronized void handleExceptionInExecutor(
121+
DependentResourceNode<?, P> dependentResourceNode,
106122
RuntimeException e) {
107123
exceptionsDuringExecution.put(dependentResourceNode, e);
108124
}
109125

110-
private synchronized void handleNodeExecutionFinish(DependentResourceNode dependentResourceNode) {
126+
private synchronized void handleNodeExecutionFinish(
127+
DependentResourceNode<?, P> dependentResourceNode) {
111128
log.debug("Finished execution for: {}", dependentResourceNode);
112129
actualExecutions.remove(dependentResourceNode);
113130
if (actualExecutions.isEmpty()) {
@@ -119,23 +136,27 @@ private boolean isCleaningNow(DependentResourceNode<?, ?> dependentResourceNode)
119136
return actualExecutions.containsKey(dependentResourceNode);
120137
}
121138

122-
123139
private boolean alreadyVisited(
124140
DependentResourceNode<?, ?> dependentResourceNode) {
125-
return alreadyReconciled.contains(dependentResourceNode);
141+
return alreadyVisited.contains(dependentResourceNode);
126142
}
127143

128144
private boolean allParentsCleaned(
129145
DependentResourceNode<?, ?> dependentResourceNode) {
130-
return dependentResourceNode.getDependsOn().isEmpty()
131-
|| dependentResourceNode.getDependsOn().stream()
146+
var parents = workflow.getDependents().get(dependentResourceNode);
147+
return parents.isEmpty()
148+
|| parents.stream()
132149
.allMatch(d -> alreadyVisited(d) && !notReady.contains(d));
133150
}
134151

135152
private boolean hasErroredParent(
136153
DependentResourceNode<?, ?> dependentResourceNode) {
137-
return !dependentResourceNode.getDependsOn().isEmpty()
138-
&& dependentResourceNode.getDependsOn().stream()
139-
.anyMatch(exceptionsDuringExecution::containsKey);
154+
var parents = workflow.getDependents().get(dependentResourceNode);
155+
return !parents.isEmpty()
156+
&& parents.stream().anyMatch(exceptionsDuringExecution::containsKey);
157+
}
158+
159+
private WorkflowCleanupResult createCleanupResult() {
160+
return new WorkflowCleanupResult();
140161
}
141162
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collections;
5+
import java.util.List;
6+
import java.util.Optional;
7+
8+
import io.javaoperatorsdk.operator.api.reconciler.Context;
9+
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
10+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
11+
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
12+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
13+
14+
public class AbstractWorkflowExecutorTest {
15+
public static final String VALUE = "value";
16+
17+
protected TestDependent dr1 = new TestDependent("DR_1");
18+
protected TestDependent dr2 = new TestDependent("DR_2");
19+
protected TestDeleterDependent drDeleter = new TestDeleterDependent("DR_DELETER");
20+
protected TestErrorDependent drError = new TestErrorDependent("ERROR_1");
21+
22+
protected List<ReconcileRecord> executionHistory =
23+
Collections.synchronizedList(new ArrayList<>());
24+
25+
public class TestDependent implements DependentResource<String, TestCustomResource> {
26+
27+
private String name;
28+
29+
public TestDependent(String name) {
30+
this.name = name;
31+
}
32+
33+
@Override
34+
public ReconcileResult<String> reconcile(TestCustomResource primary,
35+
Context<TestCustomResource> context) {
36+
executionHistory.add(new ReconcileRecord(this));
37+
return ReconcileResult.resourceCreated(VALUE);
38+
}
39+
40+
@Override
41+
public Class<String> resourceType() {
42+
return String.class;
43+
}
44+
45+
@Override
46+
public Optional<String> getSecondaryResource(TestCustomResource primary) {
47+
return Optional.of(VALUE);
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return name;
53+
}
54+
}
55+
56+
public class TestDeleterDependent extends TestDependent implements Deleter<TestCustomResource> {
57+
58+
public TestDeleterDependent(String name) {
59+
super(name);
60+
}
61+
62+
@Override
63+
public void delete(TestCustomResource primary, Context<TestCustomResource> context) {
64+
executionHistory.add(new ReconcileRecord(this, true));
65+
}
66+
}
67+
68+
public class TestErrorDependent implements DependentResource<String, TestCustomResource> {
69+
private String name;
70+
71+
public TestErrorDependent(String name) {
72+
this.name = name;
73+
}
74+
75+
@Override
76+
public ReconcileResult<String> reconcile(TestCustomResource primary,
77+
Context<TestCustomResource> context) {
78+
executionHistory.add(new ReconcileRecord(this));
79+
throw new IllegalStateException("Test exception");
80+
}
81+
82+
@Override
83+
public Class<String> resourceType() {
84+
return String.class;
85+
}
86+
87+
@Override
88+
public Optional<String> getSecondaryResource(TestCustomResource primary) {
89+
return Optional.of(VALUE);
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return name;
95+
}
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import org.assertj.core.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
6+
import io.javaoperatorsdk.operator.processing.dependent.workflow.builder.WorkflowBuilder;
7+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
8+
9+
import static io.javaoperatorsdk.operator.processing.dependent.workflow.ExecutionAssert.assertThat;
10+
import static org.junit.jupiter.api.Assertions.*;
11+
12+
class WorkflowCleanupExecutorTest extends AbstractWorkflowExecutorTest {
13+
14+
protected TestDeleterDependent dd1 = new TestDeleterDependent("DR_DELETER_1");
15+
protected TestDeleterDependent dd2 = new TestDeleterDependent("DR_DELETER_2");
16+
protected TestDeleterDependent dd3 = new TestDeleterDependent("DR_DELETER_3");
17+
18+
@Test
19+
void cleanUpDiamondWorkflow() {
20+
var workflow = new WorkflowBuilder<TestCustomResource>()
21+
.addDependent(dd1).build()
22+
.addDependent(dr1).dependsOn(dd1).build()
23+
.addDependent(dd2).dependsOn(dd1).build()
24+
.addDependent(dd3).dependsOn(dr1,dd2).build()
25+
.build();
26+
27+
var res = workflow.cleanup(new TestCustomResource(), null);
28+
29+
30+
assertThat(executionHistory).reconciledInOrder(dd1, dd2, dd3).reconciledInOrder();
31+
}
32+
33+
34+
35+
}

0 commit comments

Comments
 (0)