Skip to content

Commit 2e82858

Browse files
Add README sections for *BatchPublisher
1 parent 3c3cc99 commit 2e82858

File tree

2 files changed

+90
-1
lines changed

2 files changed

+90
-1
lines changed

README.md

+66
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,66 @@ For example, let's assume you want to load users from a database, you could prob
286286
// ...
287287
```
288288

289+
### Returning a stream of results from your batch publisher
290+
291+
It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.
292+
293+
For example, let's say you wanted to load many users from a service without forcing the service to load all
294+
users into its memory (which may exert considerable pressure on it).
295+
296+
A `org.dataloader.BatchPublisher` may be used to load this data:
297+
298+
```java
299+
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
300+
@Override
301+
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
302+
userManager.publishUsersById(userIds, userSubscriber);
303+
}
304+
};
305+
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);
306+
307+
// ...
308+
```
309+
310+
Rather than waiting for all values to be returned, this `DataLoader` will complete
311+
the `CompletableFuture<User>` returned by `Dataloader#load(Long)` as each value is
312+
processed.
313+
314+
If an exception is thrown, the remaining futures yet to be completed are completed
315+
exceptionally.
316+
317+
You *MUST* ensure that the values are streamed in the same order as the keys provided,
318+
with the same cardinality (i.e. the number of values must match the number of keys).
319+
Failing to do so will result in incorrect data being returned from `DataLoader#load`.
320+
321+
322+
### Returning a mapped stream of results from your batch publisher
323+
324+
Your publisher may not necessarily return values in the same order in which it processes keys.
325+
326+
For example, let's say your batch publisher function loads user data which is spread across shards,
327+
with some shards responding more quickly than others.
328+
329+
In instances like these, `org.dataloader.MappedBatchPublisher` can be used.
330+
331+
```java
332+
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
333+
@Override
334+
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
335+
userManager.publishUsersById(userIds, userEntrySubscriber);
336+
}
337+
};
338+
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);
339+
340+
// ...
341+
```
342+
343+
Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed
344+
exceptionally.
345+
346+
Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys,
347+
or even the same number of values.
348+
289349
### Error object is not a thing in a type safe Java world
290350

291351
In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected
@@ -541,6 +601,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo
541601
return scheduledCall.invoke();
542602
}).thenCompose(Function.identity());
543603
}
604+
605+
@Override
606+
public <K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
607+
snooze(10);
608+
scheduledCall.invoke();
609+
}
544610
};
545611
```
546612

src/test/java/ReadmeExamples.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import org.dataloader.BatchLoader;
22
import org.dataloader.BatchLoaderEnvironment;
33
import org.dataloader.BatchLoaderWithContext;
4+
import org.dataloader.BatchPublisher;
45
import org.dataloader.CacheMap;
56
import org.dataloader.DataLoader;
67
import org.dataloader.DataLoaderFactory;
78
import org.dataloader.DataLoaderOptions;
89
import org.dataloader.MappedBatchLoaderWithContext;
10+
import org.dataloader.MappedBatchPublisher;
911
import org.dataloader.Try;
1012
import org.dataloader.fixtures.SecurityCtx;
1113
import org.dataloader.fixtures.User;
@@ -15,6 +17,7 @@
1517
import org.dataloader.scheduler.BatchLoaderScheduler;
1618
import org.dataloader.stats.Statistics;
1719
import org.dataloader.stats.ThreadLocalStatisticsCollector;
20+
import org.reactivestreams.Subscriber;
1821

1922
import java.time.Duration;
2023
import java.util.ArrayList;
@@ -171,7 +174,7 @@ private void tryExample() {
171174
}
172175
}
173176

174-
private void tryBatcLoader() {
177+
private void tryBatchLoader() {
175178
DataLoader<String, User> dataLoader = DataLoaderFactory.newDataLoaderWithTry(new BatchLoader<String, Try<User>>() {
176179
@Override
177180
public CompletionStage<List<Try<User>>> load(List<String> keys) {
@@ -187,6 +190,26 @@ public CompletionStage<List<Try<User>>> load(List<String> keys) {
187190
});
188191
}
189192

193+
private void batchPublisher() {
194+
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
195+
@Override
196+
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
197+
userManager.publishUsersById(userIds, userSubscriber);
198+
}
199+
};
200+
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);
201+
}
202+
203+
private void mappedBatchPublisher() {
204+
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
205+
@Override
206+
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
207+
userManager.publishUsersById(userIds, userEntrySubscriber);
208+
}
209+
};
210+
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);
211+
}
212+
190213
DataLoader<String, User> userDataLoader;
191214

192215
private void clearCacheOnError() {

0 commit comments

Comments
 (0)