11
11
import io .javaoperatorsdk .operator .api .reconciler .Context ;
12
12
import io .javaoperatorsdk .operator .api .reconciler .dependent .Deleter ;
13
13
import io .javaoperatorsdk .operator .api .reconciler .dependent .DependentResource ;
14
+ import io .javaoperatorsdk .operator .api .reconciler .dependent .GarbageCollected ;
14
15
15
16
public class WorkflowReconcileExecutor <P extends HasMetadata > {
16
17
17
18
private static final Logger log = LoggerFactory .getLogger (WorkflowReconcileExecutor .class );
18
19
19
20
private final Workflow <P > workflow ;
20
21
22
+ /** Covers both deleted and reconciled */
21
23
private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
22
24
private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
23
- private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
24
- new HashSet <>();
25
25
private final Map <DependentResourceNode <?, ?>, Future <?>> actualExecutions =
26
26
new HashMap <>();
27
27
private final Map <DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
28
28
new HashMap <>();
29
29
30
+ private final Set <DependentResourceNode <?, ?>> markedForDelete = new HashSet <>();
31
+ private final Set <DependentResourceNode <?, ?>> deleteConditionNotMet = new HashSet <>();
32
+
30
33
private final P primary ;
31
34
private final Context <P > context ;
32
35
@@ -43,7 +46,7 @@ public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> con
43
46
public synchronized WorkflowExecutionResult reconcile () {
44
47
for (DependentResourceNode <?, P > dependentResourceNode : workflow
45
48
.getTopLevelDependentResources ()) {
46
- handleReconcile (dependentResourceNode , false );
49
+ handleReconcile (dependentResourceNode );
47
50
}
48
51
while (true ) {
49
52
try {
@@ -62,32 +65,58 @@ public synchronized WorkflowExecutionResult reconcile() {
62
65
}
63
66
64
67
private synchronized void handleReconcile (
65
- DependentResourceNode <?, P > dependentResourceNode ,
66
- boolean onlyReconcileForPossibleDelete ) {
68
+ DependentResourceNode <?, P > dependentResourceNode ) {
67
69
log .debug ("Submitting for reconcile: {}" , dependentResourceNode );
68
70
69
71
if (alreadyReconciled (dependentResourceNode )
70
72
|| isReconcilingNow (dependentResourceNode )
71
73
|| !allParentsReconciledAndReady (dependentResourceNode )
74
+ || markedForDelete .contains (dependentResourceNode )
72
75
|| hasErroredParent (dependentResourceNode )) {
73
76
log .debug ("Skipping submit of: {}, " , dependentResourceNode );
74
77
return ;
75
78
}
76
79
77
- if (onlyReconcileForPossibleDelete ) {
78
- ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
80
+ boolean reconcileConditionMet = dependentResourceNode .getReconcileCondition ().map (
81
+ rc -> rc .isMet (dependentResourceNode .getDependentResource (), primary , context ))
82
+ .orElse (true );
83
+
84
+ if (!reconcileConditionMet ) {
85
+ handleReconcileConditionNotMet (dependentResourceNode );
79
86
} else {
80
- dependentResourceNode .getReconcileCondition ()
81
- .ifPresent (reconcileCondition -> handleReconcileCondition (dependentResourceNode ,
82
- reconcileCondition ));
87
+ Future <?> nodeFuture =
88
+ workflow
89
+ .getExecutorService ()
90
+ .submit (
91
+ new NodeReconcileExecutor (
92
+ dependentResourceNode ));
93
+ actualExecutions .put (dependentResourceNode , nodeFuture );
94
+ log .debug ("Submitted to reconcile: {}" , dependentResourceNode );
95
+ }
96
+ }
97
+
98
+ private void handleDelete (DependentResourceNode dependentResourceNode ) {
99
+ log .debug ("Submitting for delete: {}" , dependentResourceNode );
100
+
101
+ if (alreadyReconciled (dependentResourceNode )
102
+ || isReconcilingNow (dependentResourceNode )
103
+ || !markedForDelete .contains (dependentResourceNode )
104
+ || !allDependentsDeletedAlready (dependentResourceNode )) {
105
+ log .debug ("Skipping submit for delete of: {}, " , dependentResourceNode );
106
+ return ;
83
107
}
84
108
85
109
Future <?> nodeFuture =
86
- workflow .getExecutorService ().submit (
87
- new NodeExecutor (dependentResourceNode ,
88
- ownOrParentsReconcileConditionNotMet (dependentResourceNode )));
110
+ workflow .getExecutorService ()
111
+ .submit (new NodeDeleteExecutor (dependentResourceNode ));
89
112
actualExecutions .put (dependentResourceNode , nodeFuture );
90
- log .debug ("Submitted to reconcile: {}" , dependentResourceNode );
113
+ log .debug ("Submitted to delete: {}" , dependentResourceNode );
114
+ }
115
+
116
+ private boolean allDependentsDeletedAlready (DependentResourceNode dependentResourceNode ) {
117
+ var dependents = workflow .getDependents (dependentResourceNode );
118
+ return dependents .stream ().allMatch (d -> alreadyReconciled .contains (d ) && !notReady .contains (d )
119
+ && !exceptionsDuringExecution .containsKey (d ));
91
120
}
92
121
93
122
@@ -112,22 +141,12 @@ private synchronized void setAlreadyReconciledButNotReady(
112
141
notReady .add (dependentResourceNode );
113
142
}
114
143
115
- private boolean ownOrParentsReconcileConditionNotMet (
116
- DependentResourceNode <?, ?> dependentResourceNode ) {
117
- return ownOrAncestorReconcileConditionConditionNotMet .contains (dependentResourceNode ) ||
118
- dependentResourceNode .getDependsOn ().stream ()
119
- .anyMatch (ownOrAncestorReconcileConditionConditionNotMet ::contains );
120
- }
121
-
122
- private class NodeExecutor implements Runnable {
144
+ private class NodeReconcileExecutor implements Runnable {
123
145
124
146
private final DependentResourceNode <?, P > dependentResourceNode ;
125
- private final boolean onlyReconcileForPossibleDelete ;
126
147
127
- private NodeExecutor (DependentResourceNode <?, P > dependentResourceNode ,
128
- boolean onlyReconcileForDelete ) {
148
+ private NodeReconcileExecutor (DependentResourceNode <?, P > dependentResourceNode ) {
129
149
this .dependentResourceNode = dependentResourceNode ;
130
- this .onlyReconcileForPossibleDelete = onlyReconcileForDelete ;
131
150
}
132
151
133
152
@ Override
@@ -136,23 +155,17 @@ public void run() {
136
155
try {
137
156
DependentResource dependentResource = dependentResourceNode .getDependentResource ();
138
157
boolean ready = true ;
139
- if (onlyReconcileForPossibleDelete ) {
140
- if (dependentResource instanceof Deleter ) {
141
- ((Deleter <P >) dependentResource ).delete (primary , context );
142
- }
143
- } else {
144
- dependentResource .reconcile (primary , context );
145
- if (dependentResourceNode .getReadyCondition ().isPresent ()
146
- && !dependentResourceNode .getReadyCondition ().get ()
147
- .isMet (dependentResource , primary , context )) {
148
- ready = false ;
149
- }
150
- }
151
158
159
+ dependentResource .reconcile (primary , context );
160
+ if (dependentResourceNode .getReadyCondition ().isPresent ()
161
+ && !dependentResourceNode .getReadyCondition ().get ()
162
+ .isMet (dependentResource , primary , context )) {
163
+ ready = false ;
164
+ }
152
165
if (ready ) {
153
166
log .debug ("Setting already reconciled for: {}" , dependentResourceNode );
154
167
alreadyReconciled .add (dependentResourceNode );
155
- handleDependentsReconcile (dependentResourceNode , onlyReconcileForPossibleDelete );
168
+ handleDependentsReconcile (dependentResourceNode );
156
169
} else {
157
170
setAlreadyReconciledButNotReady (dependentResourceNode );
158
171
}
@@ -164,16 +177,59 @@ public void run() {
164
177
}
165
178
}
166
179
180
+ private class NodeDeleteExecutor implements Runnable {
181
+
182
+ private final DependentResourceNode <?, P > dependentResourceNode ;
183
+
184
+ private NodeDeleteExecutor (DependentResourceNode <?, P > dependentResourceNode ) {
185
+ this .dependentResourceNode = dependentResourceNode ;
186
+ }
187
+
188
+ @ Override
189
+ @ SuppressWarnings ("unchecked" )
190
+ public void run () {
191
+ try {
192
+ DependentResource dependentResource = dependentResourceNode .getDependentResource ();
193
+ var deletePostCondition = dependentResourceNode .getDeletePostCondition ();
194
+
195
+ if (dependentResource instanceof Deleter
196
+ && !(dependentResource instanceof GarbageCollected )) {
197
+ ((Deleter <P >) dependentResourceNode .getDependentResource ()).delete (primary , context );
198
+ }
199
+ alreadyReconciled .add (dependentResourceNode );
200
+ boolean deletePostConditionMet =
201
+ deletePostCondition .map (c -> c .isMet (dependentResource , primary , context )).orElse (true );
202
+ if (deletePostConditionMet ) {
203
+ handleDependentDeleted (dependentResourceNode );
204
+ } else {
205
+ deleteConditionNotMet .add (dependentResourceNode );
206
+ }
207
+ } catch (RuntimeException e ) {
208
+ handleExceptionInExecutor (dependentResourceNode , e );
209
+ } finally {
210
+ handleNodeExecutionFinish (dependentResourceNode );
211
+ }
212
+ }
213
+ }
214
+
215
+ private synchronized void handleDependentDeleted (
216
+ DependentResourceNode <?, P > dependentResourceNode ) {
217
+ dependentResourceNode .getDependsOn ().forEach (dr -> {
218
+ log .debug ("Handle deleted for: {} with dependent: {}" , dr , dependentResourceNode );
219
+ handleDelete (dr );
220
+ });
221
+ }
222
+
167
223
private boolean isReconcilingNow (DependentResourceNode <?, ?> dependentResourceNode ) {
168
224
return actualExecutions .containsKey (dependentResourceNode );
169
225
}
170
226
171
227
private synchronized void handleDependentsReconcile (
172
- DependentResourceNode <?, P > dependentResourceNode , boolean onlyReconcileForPossibleDelete ) {
228
+ DependentResourceNode <?, P > dependentResourceNode ) {
173
229
var dependents = workflow .getDependents (dependentResourceNode );
174
230
dependents .forEach (d -> {
175
231
log .debug ("Handle reconcile for dependent: {} of parent:{}" , d , dependentResourceNode );
176
- handleReconcile (d , onlyReconcileForPossibleDelete );
232
+ handleReconcile (d );
177
233
});
178
234
}
179
235
@@ -187,12 +243,20 @@ private boolean alreadyReconciled(
187
243
}
188
244
189
245
190
- private void handleReconcileCondition (DependentResourceNode <?, ?> dependentResourceNode ,
191
- Condition reconcileCondition ) {
192
- boolean conditionMet =
193
- reconcileCondition .isMet (dependentResourceNode .getDependentResource (), primary , context );
194
- if (!conditionMet ) {
195
- ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
246
+ private void handleReconcileConditionNotMet (DependentResourceNode <?, ?> dependentResourceNode ) {
247
+ Set <DependentResourceNode > bottomNodes = new HashSet <>();
248
+ markDependentsForDelete (dependentResourceNode , bottomNodes );
249
+ bottomNodes .forEach (bn -> handleDelete (bn ));
250
+ }
251
+
252
+ private void markDependentsForDelete (DependentResourceNode <?, ?> dependentResourceNode ,
253
+ Set <DependentResourceNode > bottomNodes ) {
254
+ markedForDelete .add (dependentResourceNode );
255
+ var dependents = workflow .getDependents (dependentResourceNode );
256
+ if (dependents .isEmpty ()) {
257
+ bottomNodes .add (dependentResourceNode );
258
+ } else {
259
+ dependents .forEach (d -> markDependentsForDelete (d , bottomNodes ));
196
260
}
197
261
}
198
262
0 commit comments