1
1
import type { StreamCodeToReason , StreamReasonToCode } from './types' ;
2
2
import type WebSocketConnection from './WebSocketConnection' ;
3
3
import type { StreamId , StreamMessage , VarInt } from './message' ;
4
+ import {
5
+ ReadableStream ,
6
+ WritableStream ,
7
+ CountQueuingStrategy ,
8
+ } from 'stream/web' ;
4
9
import { CreateDestroy , status } from '@matrixai/async-init/dist/CreateDestroy' ;
5
10
import Logger from '@matrixai/logger' ;
6
11
import { promise } from './utils' ;
@@ -13,12 +18,6 @@ import {
13
18
StreamMessageType ,
14
19
StreamShutdown ,
15
20
} from './message' ;
16
- import {
17
- ReadableStream ,
18
- WritableStream ,
19
- CountQueuingStrategy ,
20
- ReadableByteStreamController ,
21
- } from 'stream/web' ;
22
21
import WebSocketStreamQueue from './WebSocketStreamQueue' ;
23
22
24
23
interface WebSocketStream extends CreateDestroy { }
@@ -129,14 +128,14 @@ class WebSocketStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
129
128
return ;
130
129
}
131
130
132
- // reset the promise before a read from the queue to wait until the queue has items
131
+ // Reset the promise before a read from the queue to wait until the queue has items
133
132
if ( this . readableQueue . count === 0 ) {
134
133
this . readableBufferReady . resolveP ( ) ;
135
134
this . readableBufferReady = promise < void > ( ) ;
136
135
}
137
136
await this . readableBufferReady . p ;
138
137
139
- // data will be null in the case of stream destruction before the readable buffer is blocked
138
+ // Data will be null in the case of stream destruction before the readable buffer is blocked
140
139
// we're going to just enqueue an empty buffer in case it is null for some other reason, so that the next read is able to complete
141
140
const data = this . readableQueue . dequeue ( ) ;
142
141
if ( data == null ) {
@@ -146,7 +145,9 @@ class WebSocketStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
146
145
const readBytes = data . length ;
147
146
controller . enqueue ( data ) ;
148
147
149
- this . logger . debug ( `${ readBytes } bytes have been pushed onto stream buffer` )
148
+ this . logger . debug (
149
+ `${ readBytes } bytes have been pushed onto stream buffer` ,
150
+ ) ;
150
151
151
152
await this . streamSend ( {
152
153
type : StreamMessageType . Ack ,
@@ -158,9 +159,9 @@ class WebSocketStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
158
159
await this . signalReadableEnd ( true , reason ) ;
159
160
} ,
160
161
} ,
161
- {
162
+ new CountQueuingStrategy ( {
162
163
highWaterMark : 1 ,
163
- }
164
+ } ) ,
164
165
) ;
165
166
166
167
this . writable = new WritableStream (
@@ -180,25 +181,28 @@ class WebSocketStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
180
181
181
182
await this . writableDesiredSizeProm . p ;
182
183
183
- // chunking
184
- let data : Uint8Array ;
184
+ // Chunking
185
+ // .subarray parameters begin and end are clamped to the size of the Uint8Array
186
+ const data = chunk . subarray ( 0 , this . writableDesiredSize ) ;
185
187
if ( chunk . length > this . writableDesiredSize ) {
186
188
this . logger . debug (
187
189
`this chunk will be split into sizes of ${ this . writableDesiredSize } bytes` ,
188
190
) ;
189
191
}
190
- // .subarray parameters begin and end are clamped to the size of the Uint8Array
191
- data = chunk . subarray ( 0 , this . writableDesiredSize ) ;
192
192
193
193
const bytesWritten = data . length ;
194
194
if ( this . writableDesiredSize === bytesWritten ) {
195
- this . logger . debug ( `this chunk will trigger receiver to send an ACK` ) ;
195
+ this . logger . debug (
196
+ `this chunk will trigger receiver to send an ACK` ,
197
+ ) ;
196
198
// Reset the promise to wait for another ACK
197
199
this . writableDesiredSizeProm = promise ( ) ;
198
200
}
199
201
// Decrement the desired size by the amount of bytes written
200
202
this . writableDesiredSize -= bytesWritten ;
201
- this . logger . debug ( `writableDesiredSize is now ${ this . writableDesiredSize } due to write` ) ;
203
+ this . logger . debug (
204
+ `writableDesiredSize is now ${ this . writableDesiredSize } due to write` ,
205
+ ) ;
202
206
await this . streamSend ( {
203
207
type : StreamMessageType . Data ,
204
208
payload : data ,
@@ -284,13 +288,16 @@ class WebSocketStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
284
288
if ( parsedMessage . type === StreamMessageType . Ack ) {
285
289
this . writableDesiredSize += parsedMessage . payload ;
286
290
this . writableDesiredSizeProm . resolveP ( ) ;
287
- this . logger . debug ( `writableDesiredSize is now ${ this . writableDesiredSize } due to ACK` ) ;
291
+ this . logger . debug (
292
+ `writableDesiredSize is now ${ this . writableDesiredSize } due to ACK` ,
293
+ ) ;
288
294
} else if ( parsedMessage . type === StreamMessageType . Data ) {
289
295
if ( this . _readableEnded ) {
290
296
return ;
291
297
}
292
298
if (
293
- parsedMessage . payload . length > ( this . readableQueueBufferSize - this . readableQueue . length )
299
+ parsedMessage . payload . length >
300
+ this . readableQueueBufferSize - this . readableQueue . length
294
301
) {
295
302
await this . signalReadableEnd (
296
303
true ,
@@ -372,7 +379,7 @@ class WebSocketStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
372
379
this . _readableEnded = true ;
373
380
// Resolve readable promise in case blocking
374
381
this . readableBufferReady . resolveP ( ) ;
375
- // clear the readable queue
382
+ // Clear the readable queue
376
383
this . readableQueue . clear ( ) ;
377
384
// Shutdown the write side of the other stream
378
385
if ( isError ) {
0 commit comments