Skip to content

Commit bea2c50

Browse files
committed
Merge remote-tracking branch 'origin/master' into ticker_mode_on_scheduled_registry
# Conflicts: # src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
2 parents 5ee388c + 442edf4 commit bea2c50

File tree

6 files changed

+412
-25
lines changed

6 files changed

+412
-25
lines changed

src/main/java/org/dataloader/DataLoaderRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.function.Function;
1515

1616
/**
17-
* This allows data loaders to be registered together into a single place so
17+
* This allows data loaders to be registered together into a single place, so
1818
* they can be dispatched as one. It also allows you to retrieve data loaders by
1919
* name from a central place
2020
*/

src/main/java/org/dataloader/annotations/GuardedBy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
public @interface GuardedBy {
1616

1717
/**
18-
* The lock that should be held.
18+
* @return The lock that should be held.
1919
*/
2020
String value();
2121
}

src/main/java/org/dataloader/registries/DispatchPredicate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@
1010
*/
1111
@FunctionalInterface
1212
public interface DispatchPredicate {
13+
14+
/**
15+
* A predicate that always returns true
16+
*/
17+
DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true;
18+
/**
19+
* A predicate that always returns false
20+
*/
21+
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false;
22+
1323
/**
1424
* This predicate tests whether the data loader should be dispatched or not.
1525
*

src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java

Lines changed: 147 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,25 @@
55
import org.dataloader.annotations.ExperimentalApi;
66

77
import java.time.Duration;
8-
import java.util.HashMap;
8+
import java.util.LinkedHashMap;
99
import java.util.Map;
10+
import java.util.concurrent.ConcurrentHashMap;
1011
import java.util.concurrent.Executors;
1112
import java.util.concurrent.ScheduledExecutorService;
1213
import java.util.concurrent.TimeUnit;
1314

1415
import static org.dataloader.impl.Assertions.nonNull;
1516

1617
/**
17-
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
18-
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false,
19-
* then a task is scheduled to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
18+
* This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called
19+
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
20+
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
21+
* <p>
22+
* It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the
23+
* whole {@link ScheduledDataLoaderRegistry}.
24+
* <p>
25+
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
26+
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
2027
* <p>
2128
* In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
2229
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
@@ -35,8 +42,7 @@
3542
* <p>
3643
* When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job
3744
* on the {@link ScheduledExecutorService} that is continuously dispatching.
38-
* <p>
39-
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
45+
* <p> * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
4046
* call {@link #rescheduleNow()}.
4147
* <p>
4248
* By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you
@@ -48,19 +54,22 @@
4854
@ExperimentalApi
4955
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
5056

51-
private final ScheduledExecutorService scheduledExecutorService;
57+
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
5258
private final DispatchPredicate dispatchPredicate;
59+
private final ScheduledExecutorService scheduledExecutorService;
5360
private final Duration schedule;
5461
private final boolean tickerMode;
5562
private volatile boolean closed;
5663

5764
private ScheduledDataLoaderRegistry(Builder builder) {
65+
super();
5866
this.dataLoaders.putAll(builder.dataLoaders);
5967
this.scheduledExecutorService = builder.scheduledExecutorService;
60-
this.dispatchPredicate = builder.dispatchPredicate;
6168
this.schedule = builder.schedule;
6269
this.tickerMode = builder.tickerMode;
6370
this.closed = false;
71+
this.dispatchPredicate = builder.dispatchPredicate;
72+
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
6473
}
6574

6675
/**
@@ -85,6 +94,88 @@ public boolean isTickerMode() {
8594
return tickerMode;
8695
}
8796

97+
/**
98+
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry
99+
* and return a new combined registry
100+
*
101+
* @param registry the registry to combine into this registry
102+
*
103+
* @return a new combined registry
104+
*/
105+
public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) {
106+
Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry()
107+
.dispatchPredicate(this.dispatchPredicate);
108+
combinedBuilder.registerAll(this);
109+
combinedBuilder.registerAll(registry);
110+
return combinedBuilder.build();
111+
}
112+
113+
114+
/**
115+
* This will unregister a new dataloader
116+
*
117+
* @param key the key of the data loader to unregister
118+
*
119+
* @return this registry
120+
*/
121+
public ScheduledDataLoaderRegistry unregister(String key) {
122+
DataLoader<?, ?> dataLoader = dataLoaders.remove(key);
123+
if (dataLoader != null) {
124+
dataLoaderPredicates.remove(dataLoader);
125+
}
126+
return this;
127+
}
128+
129+
/**
130+
* @return a map of data loaders to specific dispatch predicates
131+
*/
132+
public Map<DataLoader<?, ?>, DispatchPredicate> getDataLoaderPredicates() {
133+
return new LinkedHashMap<>(dataLoaderPredicates);
134+
}
135+
136+
/**
137+
* There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry}
138+
*
139+
* @return the default dispatch predicate
140+
*/
141+
public DispatchPredicate getDispatchPredicate() {
142+
return dispatchPredicate;
143+
}
144+
145+
/**
146+
* This will register a new dataloader and dispatch predicate associated with that data loader
147+
*
148+
* @param key the key to put the data loader under
149+
* @param dataLoader the data loader to register
150+
* @param dispatchPredicate the dispatch predicate to associate with this data loader
151+
*
152+
* @return this registry
153+
*/
154+
public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
155+
dataLoaders.put(key, dataLoader);
156+
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
157+
return this;
158+
}
159+
160+
/**
161+
* Returns true if the dataloader has a predicate which returned true, OR the overall
162+
* registry predicate returned true.
163+
*
164+
* @param dataLoaderKey the key in the dataloader map
165+
* @param dataLoader the dataloader
166+
*
167+
* @return true if it should dispatch
168+
*/
169+
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
170+
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
171+
if (dispatchPredicate != null) {
172+
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
173+
return true;
174+
}
175+
}
176+
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
177+
}
178+
88179
@Override
89180
public void dispatchAll() {
90181
dispatchAllWithCount();
@@ -101,24 +192,28 @@ public int dispatchAllWithCount() {
101192
return sum;
102193
}
103194

195+
104196
/**
105197
* This will immediately dispatch the {@link DataLoader}s in the registry
106-
* without testing the predicate
198+
* without testing the predicates
107199
*/
108200
public void dispatchAllImmediately() {
109-
super.dispatchAll();
201+
dispatchAllWithCountImmediately();
110202
}
111203

112204
/**
113205
* This will immediately dispatch the {@link DataLoader}s in the registry
114-
* without testing the predicate
206+
* without testing the predicates
115207
*
116208
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
117209
*/
118210
public int dispatchAllWithCountImmediately() {
119-
return super.dispatchAllWithCount();
211+
return dataLoaders.values().stream()
212+
.mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount())
213+
.sum();
120214
}
121215

216+
122217
/**
123218
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
124219
* a pre check of the preodicate like {@link #dispatchAll()} would
@@ -136,7 +231,7 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
136231

137232
private int dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
138233
int sum = 0;
139-
boolean shouldDispatch = dispatchPredicate.test(key, dataLoader);
234+
boolean shouldDispatch = shouldDispatch(key, dataLoader);
140235
if (shouldDispatch) {
141236
sum = dataLoader.dispatchWithCounts().getKeysCount();
142237
}
@@ -147,8 +242,8 @@ private int dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
147242
}
148243

149244
/**
150-
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
151-
* and a schedule duration of 10 milli seconds.
245+
* By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
246+
* and a schedule duration of 10 milliseconds.
152247
*
153248
* @return A builder of {@link ScheduledDataLoaderRegistry}s
154249
*/
@@ -158,10 +253,11 @@ public static Builder newScheduledRegistry() {
158253

159254
public static class Builder {
160255

256+
private final Map<String, DataLoader<?, ?>> dataLoaders = new LinkedHashMap<>();
257+
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>();
258+
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
161259
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
162-
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
163260
private Duration schedule = Duration.ofMillis(10);
164-
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
165261
private boolean tickerMode = false;
166262

167263
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
@@ -174,11 +270,6 @@ public Builder schedule(Duration schedule) {
174270
return this;
175271
}
176272

177-
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
178-
this.dispatchPredicate = nonNull(dispatchPredicate);
179-
return this;
180-
}
181-
182273
/**
183274
* This will register a new dataloader
184275
*
@@ -192,8 +283,24 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
192283
return this;
193284
}
194285

286+
287+
/**
288+
* This will register a new dataloader with a specific {@link DispatchPredicate}
289+
*
290+
* @param key the key to put the data loader under
291+
* @param dataLoader the data loader to register
292+
* @param dispatchPredicate the dispatch predicate
293+
*
294+
* @return this builder for a fluent pattern
295+
*/
296+
public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
297+
register(key, dataLoader);
298+
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
299+
return this;
300+
}
301+
195302
/**
196-
* This will combine together the data loaders in this builder with the ones
303+
* This will combine the data loaders in this builder with the ones
197304
* from a previous {@link DataLoaderRegistry}
198305
*
199306
* @param otherRegistry the previous {@link DataLoaderRegistry}
@@ -202,6 +309,23 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
202309
*/
203310
public Builder registerAll(DataLoaderRegistry otherRegistry) {
204311
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
312+
if (otherRegistry instanceof ScheduledDataLoaderRegistry) {
313+
ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry;
314+
dataLoaderPredicates.putAll(other.dataLoaderPredicates);
315+
}
316+
return this;
317+
}
318+
319+
/**
320+
* This sets a default predicate on the {@link DataLoaderRegistry} that will control
321+
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
322+
*
323+
* @param dispatchPredicate the predicate
324+
*
325+
* @return this builder for a fluent pattern
326+
*/
327+
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
328+
this.dispatchPredicate = dispatchPredicate;
205329
return this;
206330
}
207331

src/test/java/org/dataloader/fixtures/TestKit.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111
import java.time.Duration;
1212
import java.util.ArrayList;
13+
import java.util.Arrays;
1314
import java.util.Collection;
15+
import java.util.LinkedHashSet;
1416
import java.util.HashMap;
1517
import java.util.List;
18+
import java.util.Set;
1619
import java.util.Map;
1720
import java.util.Set;
1821
import java.util.concurrent.CompletableFuture;
@@ -120,4 +123,12 @@ public static void snooze(long millis) {
120123
public static <T> List<T> sort(Collection<? extends T> collection) {
121124
return collection.stream().sorted().collect(toList());
122125
}
126+
127+
public static <T> Set<T> asSet(T... elements) {
128+
return new LinkedHashSet<>(Arrays.asList(elements));
129+
}
130+
131+
public static <T> Set<T> asSet(Collection<T> elements) {
132+
return new LinkedHashSet<>(elements);
133+
}
123134
}

0 commit comments

Comments
 (0)