diff --git a/build.gradle b/build.gradle
index 6dea10e..edc81c7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,7 +30,6 @@ def getDevelopmentVersion() {
version
}
-
def releaseVersion = System.env.RELEASE_VERSION
version = releaseVersion ? releaseVersion : getDevelopmentVersion()
group = 'com.graphql-java'
@@ -76,6 +75,7 @@ dependencies {
testImplementation 'org.awaitility:awaitility:2.0.0'
testImplementation 'io.projectreactor:reactor-core:3.6.6'
testImplementation 'com.github.ben-manes.caffeine:caffeine:2.9.0'
+ testImplementation 'io.projectreactor:reactor-core:3.6.6'
}
task sourcesJar(type: Jar) {
diff --git a/src/main/java/org/dataloader/BatchPublisher.java b/src/main/java/org/dataloader/BatchPublisher.java
new file mode 100644
index 0000000..5ab41e1
--- /dev/null
+++ b/src/main/java/org/dataloader/BatchPublisher.java
@@ -0,0 +1,22 @@
+package org.dataloader;
+
+import org.reactivestreams.Subscriber;
+
+import java.util.List;
+
+/**
+ * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
+ *
+ * The function will call the provided {@link Subscriber} to process the values it has retrieved to allow
+ * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
+ * (rather than when all values have been retrieved).
+ *
+ * NOTE: It is required that {@link Subscriber#onNext(V)} is invoked on each value in the same order as
+ * the provided keys.
+ *
+ * @param type parameter indicating the type of keys to use for data load requests.
+ * @param type parameter indicating the type of values returned
+ */
+public interface BatchPublisher {
+ void load(List keys, Subscriber subscriber);
+}
diff --git a/src/main/java/org/dataloader/BatchPublisherWithContext.java b/src/main/java/org/dataloader/BatchPublisherWithContext.java
new file mode 100644
index 0000000..effda90
--- /dev/null
+++ b/src/main/java/org/dataloader/BatchPublisherWithContext.java
@@ -0,0 +1,12 @@
+package org.dataloader;
+
+import org.reactivestreams.Subscriber;
+
+import java.util.List;
+
+/**
+ * An {@link BatchPublisher} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}.
+ */
+public interface BatchPublisherWithContext {
+ void load(List keys, Subscriber subscriber, BatchLoaderEnvironment environment);
+}
diff --git a/src/main/java/org/dataloader/DataLoaderFactory.java b/src/main/java/org/dataloader/DataLoaderFactory.java
index 013f473..db14f2e 100644
--- a/src/main/java/org/dataloader/DataLoaderFactory.java
+++ b/src/main/java/org/dataloader/DataLoaderFactory.java
@@ -278,6 +278,274 @@ public static DataLoader newMappedDataLoaderWithTry(MappedBatchLoad
return mkDataLoader(batchLoadFunction, options);
}
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size).
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newPublisherDataLoader(BatchPublisher batchLoadFunction) {
+ return newPublisherDataLoader(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function with the provided options
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newPublisherDataLoader(BatchPublisher batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size) where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then
+ * you can use this form to create the data loader.
+ *
+ * Using Try objects allows you to capture a value returned or an exception that might
+ * have occurred trying to get a value. .
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newPublisherDataLoaderWithTry(BatchPublisher> batchLoadFunction) {
+ return newPublisherDataLoaderWithTry(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and with the provided options
+ * where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ *
+ * @see #newDataLoaderWithTry(BatchLoader)
+ */
+ public static DataLoader newPublisherDataLoaderWithTry(BatchPublisher> batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size).
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newPublisherDataLoader(BatchPublisherWithContext batchLoadFunction) {
+ return newPublisherDataLoader(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function with the provided options
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newPublisherDataLoader(BatchPublisherWithContext batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size) where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then
+ * you can use this form to create the data loader.
+ *
+ * Using Try objects allows you to capture a value returned or an exception that might
+ * have occurred trying to get a value. .
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newPublisherDataLoaderWithTry(BatchPublisherWithContext> batchLoadFunction) {
+ return newPublisherDataLoaderWithTry(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and with the provided options
+ * where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ *
+ * @see #newPublisherDataLoaderWithTry(BatchPublisher)
+ */
+ public static DataLoader newPublisherDataLoaderWithTry(BatchPublisherWithContext> batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size).
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisher batchLoadFunction) {
+ return newMappedPublisherDataLoader(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function with the provided options
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisher batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size) where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then
+ * you can use this form to create the data loader.
+ *
+ * Using Try objects allows you to capture a value returned or an exception that might
+ * have occurred trying to get a value. .
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisher> batchLoadFunction) {
+ return newMappedPublisherDataLoaderWithTry(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and with the provided options
+ * where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ *
+ * @see #newDataLoaderWithTry(BatchLoader)
+ */
+ public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisher> batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size).
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisherWithContext batchLoadFunction) {
+ return newMappedPublisherDataLoader(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function with the provided options
+ *
+ * @param batchLoadFunction the batch load function to use
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisherWithContext batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and default options
+ * (batching, caching and unlimited batch size) where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then
+ * you can use this form to create the data loader.
+ *
+ * Using Try objects allows you to capture a value returned or an exception that might
+ * have occurred trying to get a value. .
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ */
+ public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisherWithContext> batchLoadFunction) {
+ return newMappedPublisherDataLoaderWithTry(batchLoadFunction, null);
+ }
+
+ /**
+ * Creates new DataLoader with the specified batch loader function and with the provided options
+ * where the batch loader function returns a list of
+ * {@link org.dataloader.Try} objects.
+ *
+ * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects
+ * @param options the options to use
+ * @param the key type
+ * @param the value type
+ *
+ * @return a new DataLoader
+ *
+ * @see #newMappedPublisherDataLoaderWithTry(MappedBatchPublisher)
+ */
+ public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisherWithContext> batchLoadFunction, DataLoaderOptions options) {
+ return mkDataLoader(batchLoadFunction, options);
+ }
+
static DataLoader mkDataLoader(Object batchLoadFunction, DataLoaderOptions options) {
return new DataLoader<>(batchLoadFunction, options);
}
diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java
index d934de2..ee8d78b 100644
--- a/src/main/java/org/dataloader/DataLoaderHelper.java
+++ b/src/main/java/org/dataloader/DataLoaderHelper.java
@@ -10,11 +10,14 @@
import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext;
import org.dataloader.stats.context.IncrementLoadCountStatisticsContext;
import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -241,10 +244,14 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List<
@SuppressWarnings("unchecked")
private CompletableFuture> dispatchQueueBatch(List keys, List