@@ -190,6 +190,7 @@ describe(WebSocketStream.name, () => {
190
190
const stream2Writable = stream2 . writable ;
191
191
await stream2Writable . abort ( testReason ) ;
192
192
await expect ( stream1Readable . getReader ( ) . read ( ) ) . rejects . toBe ( testReason ) ;
193
+ await expect ( stream2Writable . getWriter ( ) . write ( ) ) . rejects . toBe ( testReason ) ;
193
194
}
194
195
) ;
195
196
testProp (
@@ -440,4 +441,73 @@ describe(WebSocketStream.name, () => {
440
441
}
441
442
} ,
442
443
) ;
444
+ test ( 'streams can be cancelled after data sent' , async ( ) => {
445
+ const cancelReason = Symbol ( 'CancelReason' ) ;
446
+ const codeToReason = ( type , code : bigint ) => {
447
+ switch ( code ) {
448
+ case 4001n :
449
+ return cancelReason ;
450
+ default :
451
+ return new Error ( `${ type . toString ( ) } ${ code . toString ( ) } ` ) ;
452
+ }
453
+ } ;
454
+ const reasonToCode = ( _type , reason ) => {
455
+ if ( reason === cancelReason ) return 4001n ;
456
+ return 0n ;
457
+ } ;
458
+ const [ _stream1 , stream2 ] = await createStreamPair ( { codeToReason, reasonToCode } ) ;
459
+
460
+ const writer = stream2 . writable . getWriter ( ) ;
461
+ await writer . write ( new Uint8Array ( 2 ) ) ;
462
+ writer . releaseLock ( ) ;
463
+ await stream2 . cancel ( cancelReason ) ;
464
+
465
+ await expect ( stream2 . readable . getReader ( ) . read ( ) ) . rejects . toBe ( cancelReason ) ;
466
+ await expect ( stream2 . writable . getWriter ( ) . write ( ) ) . rejects . toBe ( cancelReason ) ;
467
+ } ) ;
468
+ test ( 'streams can be cancelled with no data sent' , async ( ) => {
469
+ const cancelReason = Symbol ( 'CancelReason' ) ;
470
+ const codeToReason = ( type , code : bigint ) => {
471
+ switch ( code ) {
472
+ case 4001n :
473
+ return cancelReason ;
474
+ default :
475
+ return new Error ( `${ type . toString ( ) } ${ code . toString ( ) } ` ) ;
476
+ }
477
+ } ;
478
+ const reasonToCode = ( _type , reason ) => {
479
+ if ( reason === cancelReason ) return 4001n ;
480
+ return 0n ;
481
+ } ;
482
+ const [ _stream1 , stream2 ] = await createStreamPair ( { codeToReason, reasonToCode } ) ;
483
+
484
+ await stream2 . cancel ( cancelReason ) ;
485
+
486
+ await expect ( stream2 . readable . getReader ( ) . read ( ) ) . rejects . toBe ( cancelReason ) ;
487
+ await expect ( stream2 . writable . getWriter ( ) . write ( ) ) . rejects . toBe ( cancelReason ) ;
488
+ } ) ;
489
+ test ( 'streams can be cancelled concurrently after data sent' , async ( ) => {
490
+ const [ stream1 , stream2 ] = await createStreamPair ( ) ;
491
+
492
+ const writer = stream2 . writable . getWriter ( ) ;
493
+ await writer . write ( new Uint8Array ( 2 ) ) ;
494
+
495
+ const reader = stream1 . readable . getReader ( ) ;
496
+ reader . releaseLock ( ) ;
497
+
498
+ await Promise . all ( [ writer . close ( ) , stream1 . writable . close ( ) ] )
499
+ } ) ;
500
+ test ( 'stream can error when blocked on data' , async ( ) => {
501
+ const [ stream1 , stream2 ] = await createStreamPair ( ) ;
502
+
503
+ const message = new Uint8Array ( STREAM_BUFFER_SIZE * 2 ) ;
504
+
505
+ const stream1Writer = stream1 . writable . getWriter ( ) ;
506
+ stream1Writer . write ( message ) ;
507
+
508
+ const stream2Writer = stream2 . writable . getWriter ( ) ;
509
+ stream2Writer . write ( message ) ;
510
+
511
+ Promise . all ( [ stream1Writer . abort ( Error ( 'some error' ) ) , stream1Writer . abort ( Error ( 'some error' ) ) ] ) ;
512
+ } ) ;
443
513
} ) ;
0 commit comments