Skip to content

Commit 5fc74d6

Browse files
committed
Implement auto fork joining
1 parent d3bb412 commit 5fc74d6

File tree

3 files changed

+80
-23
lines changed

3 files changed

+80
-23
lines changed

packages/toolkit/src/listenerMiddleware/index.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import type {
2323
TaskResult,
2424
AbortSignalWithReason,
2525
UnsubscribeListenerOptions,
26+
ForkOptions,
2627
} from './types'
2728
import {
2829
abortControllerWithReason,
@@ -78,13 +79,19 @@ const INTERNAL_NIL_TOKEN = {} as const
7879

7980
const alm = 'listenerMiddleware' as const
8081

81-
const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
82+
const createFork = (
83+
parentAbortSignal: AbortSignalWithReason<unknown>,
84+
parentBlockingPromises: Promise<any>[]
85+
) => {
8286
const linkControllers = (controller: AbortController) =>
8387
addAbortSignalListener(parentAbortSignal, () =>
8488
abortControllerWithReason(controller, parentAbortSignal.reason)
8589
)
8690

87-
return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
91+
return <T>(
92+
taskExecutor: ForkedTaskExecutor<T>,
93+
opts?: ForkOptions
94+
): ForkedTask<T> => {
8895
assertFunction(taskExecutor, 'taskExecutor')
8996
const childAbortController = new AbortController()
9097

@@ -105,6 +112,10 @@ const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
105112
() => abortControllerWithReason(childAbortController, taskCompleted)
106113
)
107114

115+
if (opts?.autoJoin) {
116+
parentBlockingPromises.push(result)
117+
}
118+
108119
return {
109120
result: createPause<TaskResult<T>>(parentAbortSignal)(result),
110121
cancel() {
@@ -376,6 +387,7 @@ export function createListenerMiddleware<
376387
startListening,
377388
internalTaskController.signal
378389
)
390+
const autoJoinPromises: Promise<any>[] = []
379391

380392
try {
381393
entry.pending.add(internalTaskController)
@@ -394,7 +406,7 @@ export function createListenerMiddleware<
394406
pause: createPause<any>(internalTaskController.signal),
395407
extra,
396408
signal: internalTaskController.signal,
397-
fork: createFork(internalTaskController.signal),
409+
fork: createFork(internalTaskController.signal, autoJoinPromises),
398410
unsubscribe: entry.unsubscribe,
399411
subscribe: () => {
400412
listenerMap.set(entry.id, entry)
@@ -417,6 +429,8 @@ export function createListenerMiddleware<
417429
})
418430
}
419431
} finally {
432+
await Promise.allSettled(autoJoinPromises)
433+
420434
abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed
421435
entry.pending.delete(internalTaskController)
422436
}

packages/toolkit/src/listenerMiddleware/tests/fork.test.ts

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
listenerCancelled,
1313
listenerCompleted,
1414
taskCancelled,
15+
taskCompleted,
1516
} from '../exceptions'
1617

1718
function delay(ms: number) {
@@ -349,28 +350,60 @@ describe('fork', () => {
349350
)
350351
})
351352

352-
test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => {
353-
let deferredResult = deferred()
353+
it.each([
354+
{
355+
autoJoin: true,
356+
expectedAbortReason: taskCompleted,
357+
cancelListener: false,
358+
},
359+
{
360+
autoJoin: false,
361+
expectedAbortReason: listenerCompleted,
362+
cancelListener: false,
363+
},
364+
{
365+
autoJoin: true,
366+
expectedAbortReason: listenerCancelled,
367+
cancelListener: true,
368+
},
369+
{
370+
autoJoin: false,
371+
expectedAbortReason: listenerCancelled,
372+
cancelListener: true,
373+
},
374+
])(
375+
'signal is $expectedAbortReason when autoJoin: $autoJoin, cancelListener: $cancelListener',
376+
async ({ autoJoin, cancelListener, expectedAbortReason }) => {
377+
let deferredResult = deferred()
378+
379+
const unsubscribe = startListening({
380+
actionCreator: increment,
381+
async effect(_, listenerApi) {
382+
listenerApi.fork(
383+
async (forkApi) => {
384+
forkApi.signal.addEventListener('abort', () => {
385+
deferredResult.resolve(
386+
(forkApi.signal as AbortSignalWithReason<unknown>).reason
387+
)
388+
})
389+
390+
await forkApi.delay(10)
391+
},
392+
{ autoJoin }
393+
)
394+
},
395+
})
354396

355-
startListening({
356-
actionCreator: increment,
357-
async effect(_, listenerApi) {
358-
const wronglyDoNotAwaitResultOfTask = listenerApi.fork(
359-
async (forkApi) => {
360-
forkApi.signal.addEventListener('abort', () => {
361-
deferredResult.resolve(
362-
(forkApi.signal as AbortSignalWithReason<unknown>).reason
363-
)
364-
})
365-
}
366-
)
367-
},
368-
})
397+
store.dispatch(increment())
369398

370-
store.dispatch(increment())
399+
// let task start
400+
await Promise.resolve()
371401

372-
expect(await deferredResult).toBe(listenerCompleted)
373-
})
402+
if (cancelListener) unsubscribe({ cancelActive: true })
403+
404+
expect(await deferredResult).toBe(expectedAbortReason)
405+
}
406+
)
374407

375408
test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => {
376409
let deferredCompletedEvt = deferred()

packages/toolkit/src/listenerMiddleware/types.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,15 @@ export interface ForkedTask<T> {
131131
cancel(): void
132132
}
133133

134+
/** @public */
135+
export interface ForkOptions {
136+
/**
137+
* If true, causes the parent task to not be marked as complete until
138+
* all autoJoined forks have completed or failed.
139+
*/
140+
autoJoin: boolean;
141+
}
142+
134143
/** @public */
135144
export interface ListenerEffectAPI<
136145
State,
@@ -238,8 +247,9 @@ export interface ListenerEffectAPI<
238247
/**
239248
* Queues in the next microtask the execution of a task.
240249
* @param executor
250+
* @param options
241251
*/
242-
fork<T>(executor: ForkedTaskExecutor<T>): ForkedTask<T>
252+
fork<T>(executor: ForkedTaskExecutor<T>, options?: ForkOptions): ForkedTask<T>
243253
/**
244254
* Returns a promise that resolves when `waitFor` resolves or
245255
* rejects if the listener has been cancelled or is completed.

0 commit comments

Comments
 (0)