Skip to content

Commit 04e948b

Browse files
authored
execute: move publishing code into separate file (#3903)
1 parent d22d32d commit 04e948b

File tree

5 files changed

+429
-343
lines changed

5 files changed

+429
-343
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
import type { ObjMap } from '../jsutils/ObjMap.js';
2+
import type { Path } from '../jsutils/Path.js';
3+
import { pathToArray } from '../jsutils/Path.js';
4+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
5+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
6+
7+
import type {
8+
GraphQLError,
9+
GraphQLFormattedError,
10+
} from '../error/GraphQLError.js';
11+
12+
export interface SubsequentIncrementalExecutionResult<
13+
TData = ObjMap<unknown>,
14+
TExtensions = ObjMap<unknown>,
15+
> {
16+
hasNext: boolean;
17+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
18+
extensions?: TExtensions;
19+
}
20+
21+
export interface FormattedSubsequentIncrementalExecutionResult<
22+
TData = ObjMap<unknown>,
23+
TExtensions = ObjMap<unknown>,
24+
> {
25+
hasNext: boolean;
26+
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
27+
extensions?: TExtensions;
28+
}
29+
30+
export interface IncrementalDeferResult<
31+
TData = ObjMap<unknown>,
32+
TExtensions = ObjMap<unknown>,
33+
> {
34+
errors?: ReadonlyArray<GraphQLError>;
35+
data?: TData | null;
36+
path?: ReadonlyArray<string | number>;
37+
label?: string;
38+
extensions?: TExtensions;
39+
}
40+
41+
export interface FormattedIncrementalDeferResult<
42+
TData = ObjMap<unknown>,
43+
TExtensions = ObjMap<unknown>,
44+
> {
45+
errors?: ReadonlyArray<GraphQLFormattedError>;
46+
data?: TData | null;
47+
path?: ReadonlyArray<string | number>;
48+
label?: string;
49+
extensions?: TExtensions;
50+
}
51+
52+
export interface IncrementalStreamResult<
53+
TData = Array<unknown>,
54+
TExtensions = ObjMap<unknown>,
55+
> {
56+
errors?: ReadonlyArray<GraphQLError>;
57+
items?: TData | null;
58+
path?: ReadonlyArray<string | number>;
59+
label?: string;
60+
extensions?: TExtensions;
61+
}
62+
63+
export interface FormattedIncrementalStreamResult<
64+
TData = Array<unknown>,
65+
TExtensions = ObjMap<unknown>,
66+
> {
67+
errors?: ReadonlyArray<GraphQLFormattedError>;
68+
items?: TData | null;
69+
path?: ReadonlyArray<string | number>;
70+
label?: string;
71+
extensions?: TExtensions;
72+
}
73+
74+
export type IncrementalResult<
75+
TData = ObjMap<unknown>,
76+
TExtensions = ObjMap<unknown>,
77+
> =
78+
| IncrementalDeferResult<TData, TExtensions>
79+
| IncrementalStreamResult<TData, TExtensions>;
80+
81+
export type FormattedIncrementalResult<
82+
TData = ObjMap<unknown>,
83+
TExtensions = ObjMap<unknown>,
84+
> =
85+
| FormattedIncrementalDeferResult<TData, TExtensions>
86+
| FormattedIncrementalStreamResult<TData, TExtensions>;
87+
88+
export function yieldSubsequentPayloads(
89+
subsequentPayloads: Set<IncrementalDataRecord>,
90+
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
91+
let isDone = false;
92+
93+
async function next(): Promise<
94+
IteratorResult<SubsequentIncrementalExecutionResult, void>
95+
> {
96+
if (isDone) {
97+
return { value: undefined, done: true };
98+
}
99+
100+
await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise));
101+
102+
if (isDone) {
103+
// a different call to next has exhausted all payloads
104+
return { value: undefined, done: true };
105+
}
106+
107+
const incremental = getCompletedIncrementalResults(subsequentPayloads);
108+
const hasNext = subsequentPayloads.size > 0;
109+
110+
if (!incremental.length && hasNext) {
111+
return next();
112+
}
113+
114+
if (!hasNext) {
115+
isDone = true;
116+
}
117+
118+
return {
119+
value: incremental.length ? { incremental, hasNext } : { hasNext },
120+
done: false,
121+
};
122+
}
123+
124+
function returnStreamIterators() {
125+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
126+
subsequentPayloads.forEach((incrementalDataRecord) => {
127+
if (
128+
isStreamItemsRecord(incrementalDataRecord) &&
129+
incrementalDataRecord.asyncIterator?.return
130+
) {
131+
promises.push(incrementalDataRecord.asyncIterator.return());
132+
}
133+
});
134+
return Promise.all(promises);
135+
}
136+
137+
return {
138+
[Symbol.asyncIterator]() {
139+
return this;
140+
},
141+
next,
142+
async return(): Promise<
143+
IteratorResult<SubsequentIncrementalExecutionResult, void>
144+
> {
145+
await returnStreamIterators();
146+
isDone = true;
147+
return { value: undefined, done: true };
148+
},
149+
async throw(
150+
error?: unknown,
151+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
152+
await returnStreamIterators();
153+
isDone = true;
154+
return Promise.reject(error);
155+
},
156+
};
157+
}
158+
159+
function getCompletedIncrementalResults(
160+
subsequentPayloads: Set<IncrementalDataRecord>,
161+
): Array<IncrementalResult> {
162+
const incrementalResults: Array<IncrementalResult> = [];
163+
for (const incrementalDataRecord of subsequentPayloads) {
164+
const incrementalResult: IncrementalResult = {};
165+
if (!incrementalDataRecord.isCompleted) {
166+
continue;
167+
}
168+
subsequentPayloads.delete(incrementalDataRecord);
169+
if (isStreamItemsRecord(incrementalDataRecord)) {
170+
const items = incrementalDataRecord.items;
171+
if (incrementalDataRecord.isCompletedAsyncIterator) {
172+
// async iterable resolver just finished but there may be pending payloads
173+
continue;
174+
}
175+
(incrementalResult as IncrementalStreamResult).items = items;
176+
} else {
177+
const data = incrementalDataRecord.data;
178+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
179+
}
180+
181+
incrementalResult.path = incrementalDataRecord.path;
182+
if (incrementalDataRecord.label != null) {
183+
incrementalResult.label = incrementalDataRecord.label;
184+
}
185+
if (incrementalDataRecord.errors.length > 0) {
186+
incrementalResult.errors = incrementalDataRecord.errors;
187+
}
188+
incrementalResults.push(incrementalResult);
189+
}
190+
return incrementalResults;
191+
}
192+
193+
export function filterSubsequentPayloads(
194+
subsequentPayloads: Set<IncrementalDataRecord>,
195+
nullPath: Path,
196+
currentIncrementalDataRecord: IncrementalDataRecord | undefined,
197+
): void {
198+
const nullPathArray = pathToArray(nullPath);
199+
subsequentPayloads.forEach((incrementalDataRecord) => {
200+
if (incrementalDataRecord === currentIncrementalDataRecord) {
201+
// don't remove payload from where error originates
202+
return;
203+
}
204+
for (let i = 0; i < nullPathArray.length; i++) {
205+
if (incrementalDataRecord.path[i] !== nullPathArray[i]) {
206+
// incrementalDataRecord points to a path unaffected by this payload
207+
return;
208+
}
209+
}
210+
// incrementalDataRecord path points to nulled error field
211+
if (
212+
isStreamItemsRecord(incrementalDataRecord) &&
213+
incrementalDataRecord.asyncIterator?.return
214+
) {
215+
incrementalDataRecord.asyncIterator.return().catch(() => {
216+
// ignore error
217+
});
218+
}
219+
subsequentPayloads.delete(incrementalDataRecord);
220+
});
221+
}
222+
223+
/** @internal */
224+
export class DeferredFragmentRecord {
225+
type: 'defer';
226+
errors: Array<GraphQLError>;
227+
label: string | undefined;
228+
path: Array<string | number>;
229+
promise: Promise<void>;
230+
data: ObjMap<unknown> | null;
231+
parentContext: IncrementalDataRecord | undefined;
232+
isCompleted: boolean;
233+
_subsequentPayloads: Set<IncrementalDataRecord>;
234+
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
235+
constructor(opts: {
236+
label: string | undefined;
237+
path: Path | undefined;
238+
parentContext: IncrementalDataRecord | undefined;
239+
subsequentPayloads: Set<IncrementalDataRecord>;
240+
}) {
241+
this.type = 'defer';
242+
this.label = opts.label;
243+
this.path = pathToArray(opts.path);
244+
this.parentContext = opts.parentContext;
245+
this.errors = [];
246+
this._subsequentPayloads = opts.subsequentPayloads;
247+
this._subsequentPayloads.add(this);
248+
this.isCompleted = false;
249+
this.data = null;
250+
const { promise, resolve } = promiseWithResolvers<ObjMap<unknown> | null>();
251+
this._resolve = resolve;
252+
this.promise = promise.then((data) => {
253+
this.data = data;
254+
this.isCompleted = true;
255+
});
256+
}
257+
258+
addData(data: PromiseOrValue<ObjMap<unknown> | null>) {
259+
const parentData = this.parentContext?.promise;
260+
if (parentData) {
261+
this._resolve?.(parentData.then(() => data));
262+
return;
263+
}
264+
this._resolve?.(data);
265+
}
266+
}
267+
268+
/** @internal */
269+
export class StreamItemsRecord {
270+
type: 'stream';
271+
errors: Array<GraphQLError>;
272+
label: string | undefined;
273+
path: Array<string | number>;
274+
items: Array<unknown> | null;
275+
promise: Promise<void>;
276+
parentContext: IncrementalDataRecord | undefined;
277+
asyncIterator: AsyncIterator<unknown> | undefined;
278+
isCompletedAsyncIterator?: boolean;
279+
isCompleted: boolean;
280+
_subsequentPayloads: Set<IncrementalDataRecord>;
281+
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
282+
constructor(opts: {
283+
label: string | undefined;
284+
path: Path | undefined;
285+
asyncIterator?: AsyncIterator<unknown>;
286+
parentContext: IncrementalDataRecord | undefined;
287+
subsequentPayloads: Set<IncrementalDataRecord>;
288+
}) {
289+
this.type = 'stream';
290+
this.items = null;
291+
this.label = opts.label;
292+
this.path = pathToArray(opts.path);
293+
this.parentContext = opts.parentContext;
294+
this.asyncIterator = opts.asyncIterator;
295+
this.errors = [];
296+
this._subsequentPayloads = opts.subsequentPayloads;
297+
this._subsequentPayloads.add(this);
298+
this.isCompleted = false;
299+
this.items = null;
300+
const { promise, resolve } = promiseWithResolvers<Array<unknown> | null>();
301+
this._resolve = resolve;
302+
this.promise = promise.then((items) => {
303+
this.items = items;
304+
this.isCompleted = true;
305+
});
306+
}
307+
308+
addItems(items: PromiseOrValue<Array<unknown> | null>) {
309+
const parentData = this.parentContext?.promise;
310+
if (parentData) {
311+
this._resolve?.(parentData.then(() => items));
312+
return;
313+
}
314+
this._resolve?.(items);
315+
}
316+
317+
setIsCompletedAsyncIterator() {
318+
this.isCompletedAsyncIterator = true;
319+
}
320+
}
321+
322+
export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord;
323+
324+
function isStreamItemsRecord(
325+
incrementalDataRecord: IncrementalDataRecord,
326+
): incrementalDataRecord is StreamItemsRecord {
327+
return incrementalDataRecord.type === 'stream';
328+
}

src/execution/__tests__/defer-test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type {
20-
InitialIncrementalExecutionResult,
21-
SubsequentIncrementalExecutionResult,
22-
} from '../execute.js';
19+
import type { InitialIncrementalExecutionResult } from '../execute.js';
2320
import { execute, experimentalExecuteIncrementally } from '../execute.js';
21+
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
2422

2523
const friendType = new GraphQLObjectType({
2624
fields: {

src/execution/__tests__/stream-test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ import {
1717
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1818
import { GraphQLSchema } from '../../type/schema.js';
1919

20-
import type {
21-
InitialIncrementalExecutionResult,
22-
SubsequentIncrementalExecutionResult,
23-
} from '../execute.js';
20+
import type { InitialIncrementalExecutionResult } from '../execute.js';
2421
import { experimentalExecuteIncrementally } from '../execute.js';
22+
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
2523

2624
const friendType = new GraphQLObjectType({
2725
fields: {

0 commit comments

Comments
 (0)