Skip to content

Commit 4eb9370

Browse files
committed
feat: stream cancelling tests
1 parent a702f7f commit 4eb9370

File tree

1 file changed

+123
-28
lines changed

1 file changed

+123
-28
lines changed

tests/WebSocketStream.test.ts

Lines changed: 123 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import * as utils from '@/utils';
99
import * as messageUtils from '@/message/utils';
1010
import { StreamMessageType } from '@/message';
1111

12-
type StreamOptions = Partial<Parameters<typeof WebSocketStream.createWebSocketStream>[0]>;
12+
type StreamOptions = Partial<
13+
Parameters<typeof WebSocketStream.createWebSocketStream>[0]
14+
>;
1315

1416
// Smaller buffer size for the sake of testing
1517
const STREAM_BUFFER_SIZE = 64;
@@ -54,7 +56,7 @@ jest.mock('@/WebSocketConnection', () => {
5456
bufferSize: STREAM_BUFFER_SIZE,
5557
connection: instance.connectedConnection!,
5658
logger: logger2,
57-
...streamOptions
59+
...streamOptions,
5860
});
5961
instance.connectedConnection!.dispatchEvent(
6062
new events.EventWebSocketConnectionStream({
@@ -78,7 +80,9 @@ describe(WebSocketStream.name, () => {
7880
streamIdCounter = 0n;
7981
});
8082

81-
async function createConnectionPair(streamOptions: StreamOptions = {}): Promise<[WebSocketConnection, WebSocketConnection]> {
83+
async function createConnectionPair(
84+
streamOptions: StreamOptions = {},
85+
): Promise<[WebSocketConnection, WebSocketConnection]> {
8286
const connection1 = new (WebSocketConnection as any)(streamOptions);
8387
const connection2 = new (WebSocketConnection as any)(streamOptions);
8488
(connection1 as any).connectTo(connection2);
@@ -95,7 +99,7 @@ describe(WebSocketStream.name, () => {
9599
bufferSize: STREAM_BUFFER_SIZE,
96100
connection: connection1,
97101
logger: logger1,
98-
...streamOptions
102+
...streamOptions,
99103
});
100104
const createStream2Prom = promise<WebSocketStream>();
101105
connection2.addEventListener(
@@ -111,7 +115,9 @@ describe(WebSocketStream.name, () => {
111115
}
112116

113117
async function createStreamPair(streamOptions: StreamOptions = {}) {
114-
const [connection1, connection2] = await createConnectionPair(streamOptions);
118+
const [connection1, connection2] = await createConnectionPair(
119+
streamOptions,
120+
);
115121
return createStreamPairFrom(connection1, connection2, streamOptions);
116122
}
117123

@@ -168,30 +174,31 @@ describe(WebSocketStream.name, () => {
168174
await stream.destroy();
169175
}
170176
});
171-
test(
172-
'should propagate errors over stream for writable',
173-
async () => {
174-
const testReason = Symbol('TestReason');
175-
const codeToReason = (type, code: bigint) => {
176-
switch (code) {
177-
case 4002n:
178-
return testReason;
179-
default:
180-
return new Error(`${type.toString()} ${code.toString()}`);
181-
}
182-
};
183-
const reasonToCode = (type, reason) => {
184-
if (reason === testReason) return 4002n;
185-
return 0n;
186-
};
187-
const [stream1, stream2] = await createStreamPair({ codeToReason, reasonToCode });
177+
test('should propagate errors over stream for writable', async () => {
178+
const testReason = Symbol('TestReason');
179+
const codeToReason = (type, code: bigint) => {
180+
switch (code) {
181+
case 4002n:
182+
return testReason;
183+
default:
184+
return new Error(`${type.toString()} ${code.toString()}`);
185+
}
186+
};
187+
const reasonToCode = (type, reason) => {
188+
if (reason === testReason) return 4002n;
189+
return 0n;
190+
};
191+
const [stream1, stream2] = await createStreamPair({
192+
codeToReason,
193+
reasonToCode,
194+
});
188195

189-
const stream1Readable = stream1.readable;
190-
const stream2Writable = stream2.writable;
191-
await stream2Writable.abort(testReason);
192-
await expect(stream1Readable.getReader().read()).rejects.toBe(testReason);
193-
}
194-
);
196+
const stream1Readable = stream1.readable;
197+
const stream2Writable = stream2.writable;
198+
await stream2Writable.abort(testReason);
199+
await expect(stream1Readable.getReader().read()).rejects.toBe(testReason);
200+
await expect(stream2Writable.getWriter().write()).rejects.toBe(testReason);
201+
});
195202
testProp(
196203
'should send data over stream - single write within buffer size',
197204
[fc.uint8Array({ maxLength: STREAM_BUFFER_SIZE })],
@@ -440,4 +447,92 @@ describe(WebSocketStream.name, () => {
440447
}
441448
},
442449
);
450+
test('streams can be cancelled after data sent', async () => {
451+
const cancelReason = Symbol('CancelReason');
452+
const codeToReason = (type, code: bigint) => {
453+
switch (code) {
454+
case 4001n:
455+
return cancelReason;
456+
default:
457+
return new Error(`${type.toString()} ${code.toString()}`);
458+
}
459+
};
460+
const reasonToCode = (_type, reason) => {
461+
if (reason === cancelReason) return 4001n;
462+
return 0n;
463+
};
464+
const [_stream1, stream2] = await createStreamPair({
465+
codeToReason,
466+
reasonToCode,
467+
});
468+
469+
const writer = stream2.writable.getWriter();
470+
await writer.write(new Uint8Array(2));
471+
writer.releaseLock();
472+
await stream2.cancel(cancelReason);
473+
474+
await expect(stream2.readable.getReader().read()).rejects.toBe(
475+
cancelReason,
476+
);
477+
await expect(stream2.writable.getWriter().write()).rejects.toBe(
478+
cancelReason,
479+
);
480+
});
481+
test('streams can be cancelled with no data sent', async () => {
482+
const cancelReason = Symbol('CancelReason');
483+
const codeToReason = (type, code: bigint) => {
484+
switch (code) {
485+
case 4001n:
486+
return cancelReason;
487+
default:
488+
return new Error(`${type.toString()} ${code.toString()}`);
489+
}
490+
};
491+
const reasonToCode = (_type, reason) => {
492+
if (reason === cancelReason) return 4001n;
493+
return 0n;
494+
};
495+
const [_stream1, stream2] = await createStreamPair({
496+
codeToReason,
497+
reasonToCode,
498+
});
499+
500+
await stream2.cancel(cancelReason);
501+
502+
await expect(stream2.readable.getReader().read()).rejects.toBe(
503+
cancelReason,
504+
);
505+
await expect(stream2.writable.getWriter().write()).rejects.toBe(
506+
cancelReason,
507+
);
508+
});
509+
test('streams can be cancelled concurrently after data sent', async () => {
510+
const [stream1, stream2] = await createStreamPair();
511+
512+
const writer = stream2.writable.getWriter();
513+
await writer.write(new Uint8Array(2));
514+
515+
const reader = stream1.readable.getReader();
516+
reader.releaseLock();
517+
518+
await Promise.all([writer.close(), stream1.writable.close()]);
519+
});
520+
// test('stream can error when blocked on data', async () => {
521+
// const [stream1, stream2] = await createStreamPair();
522+
523+
// const message = new Uint8Array(STREAM_BUFFER_SIZE * 2);
524+
525+
// const stream1Writer = stream1.writable.getWriter();
526+
// void stream1Writer.write(message);
527+
// stream1Writer.releaseLock();
528+
529+
// const stream2Writer = stream2.writable.getWriter();
530+
// void stream2Writer.write(message);
531+
// stream2Writer.releaseLock();
532+
533+
// await Promise.all([
534+
// stream1Writer.abort(Error('some error')),
535+
// stream1Writer.abort(Error('some error')),
536+
// ]);
537+
// });
443538
});

0 commit comments

Comments
 (0)