From 74567fe99051ea6db7329bbe49683424cebcd3ed Mon Sep 17 00:00:00 2001 From: bbaker Date: Tue, 21 May 2024 10:34:56 +1000 Subject: [PATCH 1/5] Making the Subscribers use a common base class --- .../java/org/dataloader/DataLoaderHelper.java | 167 +++++++++--------- 1 file changed, 88 insertions(+), 79 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index ee8d78b..3e4bf6e 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -618,24 +618,23 @@ private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } - private class DataLoaderSubscriber implements Subscriber { + private abstract class DataLoaderSubscriberBase implements Subscriber { - private final CompletableFuture> valuesFuture; - private final List keys; - private final List callContexts; - private final List> queuedFutures; + final CompletableFuture> valuesFuture; + final List keys; + final List callContexts; + final List> queuedFutures; - private final List clearCacheKeys = new ArrayList<>(); - private final List completedValues = new ArrayList<>(); - private int idx = 0; - private boolean onErrorCalled = false; - private boolean onCompleteCalled = false; + List clearCacheKeys = new ArrayList<>(); + List completedValues = new ArrayList<>(); + boolean onErrorCalled = false; + boolean onCompleteCalled = false; - private DataLoaderSubscriber( - CompletableFuture> valuesFuture, - List keys, - List callContexts, - List> queuedFutures + DataLoaderSubscriberBase( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures ) { this.valuesFuture = valuesFuture; this.keys = keys; @@ -648,40 +647,87 @@ public void onSubscribe(Subscription subscription) { subscription.request(keys.size()); } - // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee - // correctness (at the cost of speed). @Override - public synchronized void onNext(V value) { + public void onNext(T v) { assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked."); assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked."); + } - K key = keys.get(idx); - Object callContext = callContexts.get(idx); - CompletableFuture future = queuedFutures.get(idx); + @Override + public void onComplete() { + assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); + onCompleteCalled = true; + } + + @Override + public void onError(Throwable throwable) { + assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); + onErrorCalled = true; + + stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + } + + /* + * A value has arrived - how do we complete the future that's associated with it in a common way + */ + void onNextValue(K key, V value, Object callContext, CompletableFuture future) { if (value instanceof Try) { // we allow the batch loader to return a Try so we can better represent a computation // that might have worked or not. + //noinspection unchecked Try tryValue = (Try) value; if (tryValue.isSuccess()) { future.complete(tryValue.get()); } else { stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); future.completeExceptionally(tryValue.getThrowable()); - clearCacheKeys.add(keys.get(idx)); + clearCacheKeys.add(key); } } else { future.complete(value); } + } + + Throwable unwrapThrowable(Throwable ex) { + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + return ex; + } + } + + private class DataLoaderSubscriber extends DataLoaderSubscriberBase { + + private int idx = 0; + + private DataLoaderSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures + ) { + super(valuesFuture, keys, callContexts, queuedFutures); + } + + // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee + // correctness (at the cost of speed). + @Override + public synchronized void onNext(V value) { + super.onNext(value); + + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + onNextValue(key, value, callContext, future); completedValues.add(value); idx++; } + @Override public void onComplete() { - assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); - onCompleteCalled = true; - + super.onComplete(); assertResultSize(keys, completedValues); possiblyClearCacheEntriesOnExceptions(clearCacheKeys); @@ -690,13 +736,8 @@ public void onComplete() { @Override public void onError(Throwable ex) { - assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); - onErrorCalled = true; - - stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); - if (ex instanceof CompletionException) { - ex = ex.getCause(); - } + super.onError(ex); + ex = unwrapThrowable(ex); // Set the remaining keys to the exception. for (int i = idx; i < queuedFutures.size(); i++) { K key = keys.get(i); @@ -706,32 +747,23 @@ public void onError(Throwable ex) { dataLoader.clear(key); } } + } - private class DataLoaderMapEntrySubscriber implements Subscriber> { - private final CompletableFuture> valuesFuture; - private final List keys; - private final List callContexts; - private final List> queuedFutures; + private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase> { + private final Map callContextByKey; private final Map> queuedFutureByKey; - - private final List clearCacheKeys = new ArrayList<>(); private final Map completedValuesByKey = new HashMap<>(); - private boolean onErrorCalled = false; - private boolean onCompleteCalled = false; + private DataLoaderMapEntrySubscriber( - CompletableFuture> valuesFuture, - List keys, - List callContexts, - List> queuedFutures + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures ) { - this.valuesFuture = valuesFuture; - this.keys = keys; - this.callContexts = callContexts; - this.queuedFutures = queuedFutures; - + super(valuesFuture,keys,callContexts,queuedFutures); this.callContextByKey = new HashMap<>(); this.queuedFutureByKey = new HashMap<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { @@ -743,42 +775,24 @@ private DataLoaderMapEntrySubscriber( } } - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(keys.size()); - } @Override public void onNext(Map.Entry entry) { - assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked."); - assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked."); + super.onNext(entry); K key = entry.getKey(); V value = entry.getValue(); Object callContext = callContextByKey.get(key); CompletableFuture future = queuedFutureByKey.get(key); - if (value instanceof Try) { - // we allow the batch loader to return a Try so we can better represent a computation - // that might have worked or not. - Try tryValue = (Try) value; - if (tryValue.isSuccess()) { - future.complete(tryValue.get()); - } else { - stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); - future.completeExceptionally(tryValue.getThrowable()); - clearCacheKeys.add(key); - } - } else { - future.complete(value); - } + + onNextValue(key, value, callContext, future); completedValuesByKey.put(key, value); } @Override public void onComplete() { - assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); - onCompleteCalled = true; + super.onComplete(); possiblyClearCacheEntriesOnExceptions(clearCacheKeys); List values = new ArrayList<>(keys.size()); @@ -791,13 +805,8 @@ public void onComplete() { @Override public void onError(Throwable ex) { - assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); - onErrorCalled = true; - - stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); - if (ex instanceof CompletionException) { - ex = ex.getCause(); - } + super.onError(ex); + ex = unwrapThrowable(ex); // Complete the futures for the remaining keys with the exception. for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); From 4396624894af67de8462458133c81d4827283bb4 Mon Sep 17 00:00:00 2001 From: bbaker Date: Tue, 21 May 2024 10:38:24 +1000 Subject: [PATCH 2/5] Making the Subscribers use a common base class- synchronized on each method --- src/main/java/org/dataloader/DataLoaderHelper.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 3e4bf6e..f88890c 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -726,7 +726,7 @@ public synchronized void onNext(V value) { @Override - public void onComplete() { + public synchronized void onComplete() { super.onComplete(); assertResultSize(keys, completedValues); @@ -735,7 +735,7 @@ public void onComplete() { } @Override - public void onError(Throwable ex) { + public synchronized void onError(Throwable ex) { super.onError(ex); ex = unwrapThrowable(ex); // Set the remaining keys to the exception. @@ -777,7 +777,7 @@ private DataLoaderMapEntrySubscriber( @Override - public void onNext(Map.Entry entry) { + public synchronized void onNext(Map.Entry entry) { super.onNext(entry); K key = entry.getKey(); V value = entry.getValue(); @@ -791,7 +791,7 @@ public void onNext(Map.Entry entry) { } @Override - public void onComplete() { + public synchronized void onComplete() { super.onComplete(); possiblyClearCacheEntriesOnExceptions(clearCacheKeys); @@ -804,7 +804,7 @@ public void onComplete() { } @Override - public void onError(Throwable ex) { + public synchronized void onError(Throwable ex) { super.onError(ex); ex = unwrapThrowable(ex); // Complete the futures for the remaining keys with the exception. From 8a6448363e67ee1262bce947775c34aeeff7735a Mon Sep 17 00:00:00 2001 From: bbaker Date: Tue, 21 May 2024 13:27:54 +1000 Subject: [PATCH 3/5] Making the Subscribers use a common base class- now with failing test case --- .../java/org/dataloader/DataLoaderBatchPublisherTest.java | 6 ++++-- .../org/dataloader/DataLoaderMappedBatchPublisherTest.java | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java b/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java index e14d9f7..607c1e6 100644 --- a/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java +++ b/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java @@ -385,18 +385,20 @@ public void should_Resolve_to_error_to_indicate_failure() throws ExecutionExcept DataLoader evenLoader = idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = evenLoader.load(1); - evenLoader.dispatch(); + CompletableFuture> dispatchCF = evenLoader.dispatch(); await().until(future1::isDone); assertThat(future1.isCompletedExceptionally(), is(true)); assertThat(cause(future1), instanceOf(IllegalStateException.class)); + assertThat(dispatchCF.isCompletedExceptionally(), is(true)); CompletableFuture future2 = evenLoader.load(2); - evenLoader.dispatch(); + dispatchCF = evenLoader.dispatch(); await().until(future2::isDone); assertThat(future2.get(), equalTo(2)); assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(2)))); + assertThat(dispatchCF.isCompletedExceptionally(), is(true)); } // Accept any kind of key. diff --git a/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java b/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java index 5b9ca0b..aa9ee7b 100644 --- a/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java +++ b/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java @@ -116,7 +116,7 @@ public void should_Propagate_error_to_all_loads() { CompletableFuture future1 = errorLoader.load(1); CompletableFuture future2 = errorLoader.load(2); - errorLoader.dispatch(); + CompletableFuture> dispatchedCF = errorLoader.dispatch(); await().until(future1::isDone); @@ -132,6 +132,8 @@ public void should_Propagate_error_to_all_loads() { assertThat(cause.getMessage(), equalTo(cause.getMessage())); assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); + + assertThat(dispatchedCF.isCompletedExceptionally(),equalTo(true)); } @Test From 3e8ac9cf5a1123304329fb08dd57811f181669ca Mon Sep 17 00:00:00 2001 From: bbaker Date: Tue, 21 May 2024 16:02:27 +1000 Subject: [PATCH 4/5] Making the Subscribers use a common base class- fail the overall CF onError --- src/main/java/org/dataloader/DataLoaderHelper.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index f88890c..7b3a423 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -746,6 +746,7 @@ public synchronized void onError(Throwable ex) { // clear any cached view of this key because they all failed dataLoader.clear(key); } + valuesFuture.completeExceptionally(ex); } } @@ -763,7 +764,7 @@ private DataLoaderMapEntrySubscriber( List callContexts, List> queuedFutures ) { - super(valuesFuture,keys,callContexts,queuedFutures); + super(valuesFuture, keys, callContexts, queuedFutures); this.callContextByKey = new HashMap<>(); this.queuedFutureByKey = new HashMap<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { @@ -817,6 +818,8 @@ public synchronized void onError(Throwable ex) { dataLoader.clear(key); } } + valuesFuture.completeExceptionally(ex); } + } } From 6d3c4eb817a3d1f6b35047a9f9ff48c6f6381168 Mon Sep 17 00:00:00 2001 From: bbaker Date: Wed, 22 May 2024 10:55:21 +1000 Subject: [PATCH 5/5] Making the Subscribers use a common base class - merged in main branch --- .../java/org/dataloader/DataLoaderHelper.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 7b3a423..d01b930 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -155,11 +155,13 @@ CompletableFuture load(K key, Object loadContext) { } } + @SuppressWarnings("unchecked") Object getCacheKey(K key) { return loaderOptions.cacheKeyFunction().isPresent() ? loaderOptions.cacheKeyFunction().get().getKey(key) : key; } + @SuppressWarnings("unchecked") Object getCacheKeyWithContext(K key, Object context) { return loaderOptions.cacheKeyFunction().isPresent() ? loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; @@ -511,6 +513,7 @@ private CompletableFuture> invokeBatchPublisher(List keys, List loadFunction = (BatchPublisherWithContext) batchLoadFunction; if (batchLoaderScheduler != null) { BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment); @@ -519,6 +522,7 @@ private CompletableFuture> invokeBatchPublisher(List keys, List loadFunction = (BatchPublisher) batchLoadFunction; if (batchLoaderScheduler != null) { BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber); @@ -536,6 +540,7 @@ private CompletableFuture> invokeMappedBatchPublisher(List keys, List BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { + //noinspection unchecked MappedBatchPublisherWithContext loadFunction = (MappedBatchPublisherWithContext) batchLoadFunction; if (batchLoaderScheduler != null) { BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment); @@ -544,6 +549,7 @@ private CompletableFuture> invokeMappedBatchPublisher(List keys, List loadFunction.load(keys, subscriber, environment); } } else { + //noinspection unchecked MappedBatchPublisher loadFunction = (MappedBatchPublisher) batchLoadFunction; if (batchLoaderScheduler != null) { BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber); @@ -670,21 +676,21 @@ public void onError(Throwable throwable) { /* * A value has arrived - how do we complete the future that's associated with it in a common way */ - void onNextValue(K key, V value, Object callContext, CompletableFuture future) { + void onNextValue(K key, V value, Object callContext, List> futures) { if (value instanceof Try) { // we allow the batch loader to return a Try so we can better represent a computation // that might have worked or not. //noinspection unchecked Try tryValue = (Try) value; if (tryValue.isSuccess()) { - future.complete(tryValue.get()); + futures.forEach(f -> f.complete(tryValue.get())); } else { stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); - future.completeExceptionally(tryValue.getThrowable()); + futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable())); clearCacheKeys.add(key); } } else { - future.complete(value); + futures.forEach(f -> f.complete(value)); } } @@ -718,7 +724,7 @@ public synchronized void onNext(V value) { K key = keys.get(idx); Object callContext = callContexts.get(idx); CompletableFuture future = queuedFutures.get(idx); - onNextValue(key, value, callContext, future); + onNextValue(key, value, callContext, List.of(future)); completedValues.add(value); idx++; @@ -754,7 +760,7 @@ public synchronized void onError(Throwable ex) { private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase> { private final Map callContextByKey; - private final Map> queuedFutureByKey; + private final Map>> queuedFuturesByKey; private final Map completedValuesByKey = new HashMap<>(); @@ -766,13 +772,13 @@ private DataLoaderMapEntrySubscriber( ) { super(valuesFuture, keys, callContexts, queuedFutures); this.callContextByKey = new HashMap<>(); - this.queuedFutureByKey = new HashMap<>(); + this.queuedFuturesByKey = new HashMap<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); Object callContext = callContexts.get(idx); CompletableFuture queuedFuture = queuedFutures.get(idx); callContextByKey.put(key, callContext); - queuedFutureByKey.put(key, queuedFuture); + queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture); } } @@ -784,9 +790,9 @@ public synchronized void onNext(Map.Entry entry) { V value = entry.getValue(); Object callContext = callContextByKey.get(key); - CompletableFuture future = queuedFutureByKey.get(key); + List> futures = queuedFuturesByKey.get(key); - onNextValue(key, value, callContext, future); + onNextValue(key, value, callContext, futures); completedValuesByKey.put(key, value); } @@ -811,15 +817,16 @@ public synchronized void onError(Throwable ex) { // Complete the futures for the remaining keys with the exception. for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); - CompletableFuture future = queuedFutureByKey.get(key); + List> futures = queuedFuturesByKey.get(key); if (!completedValuesByKey.containsKey(key)) { - future.completeExceptionally(ex); + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } // clear any cached view of this key because they all failed dataLoader.clear(key); } } valuesFuture.completeExceptionally(ex); } - } }