@@ -153,8 +153,46 @@ func (o *operation) InitMsg(p []byte, oob []byte) {
153
153
}
154
154
}
155
155
156
+ // waitIO waits for the IO operation o to complete.
157
+ func waitIO (o * operation ) error {
158
+ fd := o .fd
159
+ if ! fd .pd .pollable () {
160
+ // The overlapped handle is not added to the runtime poller,
161
+ // the only way to wait for the IO to complete is block.
162
+ _ , err := syscall .WaitForSingleObject (fd .Sysfd , syscall .INFINITE )
163
+ return err
164
+ }
165
+ // Wait for our request to complete.
166
+ err := fd .pd .wait (int (o .mode ), fd .isFile )
167
+ switch err {
168
+ case nil , ErrNetClosing , ErrFileClosing , ErrDeadlineExceeded :
169
+ // No other error is expected.
170
+ default :
171
+ panic ("unexpected runtime.netpoll error: " + err .Error ())
172
+ }
173
+ return err
174
+ }
175
+
176
+ // cancelIO cancels the IO operation o and waits for it to complete.
177
+ func cancelIO (o * operation ) {
178
+ fd := o .fd
179
+ if ! fd .pd .pollable () {
180
+ return
181
+ }
182
+ // Cancel our request.
183
+ err := syscall .CancelIoEx (fd .Sysfd , & o .o )
184
+ // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
185
+ if err != nil && err != syscall .ERROR_NOT_FOUND {
186
+ // TODO(brainman): maybe do something else, but panic.
187
+ panic (err )
188
+ }
189
+ fd .pd .waitCanceled (int (o .mode ))
190
+ }
191
+
156
192
// execIO executes a single IO operation o.
157
193
// It supports both synchronous and asynchronous IO.
194
+ // o.qty and o.flags are set to zero before calling submit
195
+ // to avoid reusing the values from a previous call.
158
196
func execIO (o * operation , submit func (o * operation ) error ) (int , error ) {
159
197
fd := o .fd
160
198
fd .initIO ()
@@ -163,89 +201,42 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) {
163
201
if err != nil {
164
202
return 0 , err
165
203
}
166
- getOverlappedResult := func () (int , error ) {
204
+ // Start IO.
205
+ o .qty = 0
206
+ o .flags = 0
207
+ err = submit (o )
208
+ var waitErr error
209
+ if err == syscall .ERROR_IO_PENDING || (err == nil && ! o .fd .skipSyncNotif ) {
210
+ // IO started asynchronously or completed synchronously but
211
+ // a sync notification is required. Wait for it to complete.
212
+ waitErr = waitIO (o )
213
+ if waitErr != nil {
214
+ // IO interrupted by "close" or "timeout".
215
+ cancelIO (o )
216
+ // We issued a cancellation request, but the IO operation may still succeeded
217
+ // before the cancellation request runs.
218
+ }
167
219
if fd .isFile {
168
220
err = windows .GetOverlappedResult (fd .Sysfd , & o .o , & o .qty , false )
169
221
} else {
170
222
err = windows .WSAGetOverlappedResult (fd .Sysfd , & o .o , & o .qty , false , & o .flags )
171
223
}
172
- switch err {
173
- case nil :
174
- return int (o .qty ), nil
175
- case syscall .ERROR_HANDLE_EOF :
176
- // EOF reached.
177
- return int (o .qty ), io .EOF
178
- case syscall .ERROR_MORE_DATA , windows .WSAEMSGSIZE :
179
- // More data available. Return back the size of received data.
180
- return int (o .qty ), err
181
- default :
182
- return 0 , err
183
- }
184
- }
185
- // Start IO.
186
- err = submit (o )
187
- if ! fd .pd .pollable () {
188
- if err == syscall .ERROR_IO_PENDING {
189
- // The overlapped handle is not added to the runtime poller,
190
- // the only way to wait for the IO to complete is block.
191
- _ , err = syscall .WaitForSingleObject (fd .Sysfd , syscall .INFINITE )
192
- if err == nil {
193
- return getOverlappedResult ()
194
- }
195
- }
196
- if err != nil {
197
- return 0 , err
198
- }
199
- return int (o .qty ), nil
200
- }
201
- switch err {
202
- case nil :
203
- // IO completed immediately
204
- if o .fd .skipSyncNotif {
205
- // No completion message will follow, so return immediately.
206
- return int (o .qty ), nil
207
- }
208
- // Need to get our completion message anyway.
209
- case syscall .ERROR_IO_PENDING :
210
- // IO started, and we have to wait for its completion.
211
- err = nil
212
- default :
213
- return 0 , err
214
224
}
215
- // Wait for our request to complete.
216
- err = fd .pd .wait (int (o .mode ), fd .isFile )
217
- if err == nil {
218
- // All is good. Extract our IO results and return.
219
- return getOverlappedResult ()
220
- }
221
- // IO is interrupted by "close" or "timeout"
222
- netpollErr := err
223
- switch netpollErr {
224
- case ErrNetClosing , ErrFileClosing , ErrDeadlineExceeded :
225
- // will deal with those.
226
- default :
227
- panic ("unexpected runtime.netpoll error: " + netpollErr .Error ())
228
- }
229
- // Cancel our request.
230
- err = syscall .CancelIoEx (fd .Sysfd , & o .o )
231
- // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
232
- if err != nil && err != syscall .ERROR_NOT_FOUND {
233
- // TODO(brainman): maybe do something else, but panic.
234
- panic (err )
235
- }
236
- // Wait for cancellation to complete.
237
- fd .pd .waitCanceled (int (o .mode ))
238
- n , err := getOverlappedResult ()
239
- if err != nil {
240
- if err == syscall .ERROR_OPERATION_ABORTED { // IO Canceled
241
- err = netpollErr
225
+ // ERROR_OPERATION_ABORTED may have been caused by us. In that case,
226
+ // map it to our own error. Don't do more than that, each submitted
227
+ // function may have its own meaning for each error.
228
+ if err == syscall .ERROR_OPERATION_ABORTED {
229
+ if waitErr != nil {
230
+ // IO canceled by the poller while waiting for completion.
231
+ err = waitErr
232
+ } else if fd .kind == kindPipe && fd .closing () {
233
+ // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
234
+ // If the fd is a pipe and the Write was interrupted by CancelIoEx,
235
+ // we assume it is interrupted by Close.
236
+ err = errClosing (fd .isFile )
242
237
}
243
- return n , err
244
238
}
245
- // We issued a cancellation request. But, it seems, IO operation succeeded
246
- // before the cancellation request run. We need to treat the IO operation as
247
- // succeeded (the bytes are actually sent/recv from network).
248
- return n , nil
239
+ return int (o .qty ), err
249
240
}
250
241
251
242
// FD is a file descriptor. The net and os packages embed this type in
@@ -341,9 +332,6 @@ const (
341
332
kindPipe
342
333
)
343
334
344
- // logInitFD is set by tests to enable file descriptor initialization logging.
345
- var logInitFD func (net int , fd * FD , err error )
346
-
347
335
func (fd * FD ) initIO () error {
348
336
fd .initIOOnce .Do (func () {
349
337
if fd .initPollable {
@@ -358,19 +346,13 @@ func (fd *FD) initIO() error {
358
346
fd .initPollable = false
359
347
}
360
348
}
361
- if logInitFD != nil {
362
- logInitFD (int (fd .kind ), fd , fd .initIOErr )
363
- }
364
349
if ! fd .initPollable {
365
350
// Handle opened for overlapped I/O (aka non-blocking) that are not added
366
351
// to the runtime poller need special handling when reading and writing.
367
- var info windows.FILE_MODE_INFORMATION
368
- if err := windows .NtQueryInformationFile (fd .Sysfd , & windows.IO_STATUS_BLOCK {}, unsafe .Pointer (& info ), uint32 (unsafe .Sizeof (info )), windows .FileModeInformation ); err == nil {
369
- fd .isBlocking = info .Mode & (windows .FILE_SYNCHRONOUS_IO_ALERT | windows .FILE_SYNCHRONOUS_IO_NONALERT ) != 0
370
- } else {
371
- // If we fail to get the file mode information, assume the file is blocking.
372
- fd .isBlocking = true
373
- }
352
+ // If we fail to get the file mode information, assume the file is blocking.
353
+ overlapped , _ := windows .IsNonblock (fd .Sysfd )
354
+ fd .isBlocking = ! overlapped
355
+ fd .skipSyncNotif = true
374
356
} else {
375
357
fd .rop .runtimeCtx = fd .pd .runtimeCtx
376
358
fd .wop .runtimeCtx = fd .pd .runtimeCtx
@@ -379,9 +361,7 @@ func (fd *FD) initIO() error {
379
361
err := syscall .SetFileCompletionNotificationModes (fd .Sysfd ,
380
362
syscall .FILE_SKIP_SET_EVENT_ON_HANDLE | syscall .FILE_SKIP_COMPLETION_PORT_ON_SUCCESS ,
381
363
)
382
- if err == nil {
383
- fd .skipSyncNotif = true
384
- }
364
+ fd .skipSyncNotif = err == nil
385
365
}
386
366
}
387
367
})
@@ -429,11 +409,6 @@ func (fd *FD) Init(net string, pollable bool) error {
429
409
// handles and that cares about handle IOCP association errors.
430
410
// We can should do the IOCP association here.
431
411
return fd .initIO ()
432
- } else {
433
- if logInitFD != nil {
434
- // For testing.
435
- logInitFD (int (fd .kind ), fd , nil )
436
- }
437
412
}
438
413
return nil
439
414
}
@@ -508,21 +483,13 @@ func (fd *FD) Read(buf []byte) (int, error) {
508
483
return syscall .ReadFile (o .fd .Sysfd , unsafe .Slice (o .buf .Buf , o .buf .Len ), & o .qty , o .overlapped ())
509
484
})
510
485
fd .addOffset (n )
511
- if err == syscall .ERROR_HANDLE_EOF {
486
+ switch err {
487
+ case syscall .ERROR_HANDLE_EOF :
512
488
err = io .EOF
513
- }
514
- if fd .kind == kindPipe && err != nil {
515
- switch err {
516
- case syscall .ERROR_BROKEN_PIPE :
517
- // Returned by pipes when the other end is closed.
518
- err = nil
519
- case syscall .ERROR_OPERATION_ABORTED :
520
- if fd .closing () {
521
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
522
- // If the fd is a pipe and the Read was interrupted by CancelIoEx,
523
- // we assume it is interrupted by Close.
524
- err = ErrFileClosing
525
- }
489
+ case syscall .ERROR_BROKEN_PIPE :
490
+ // ReadFile only documents ERROR_BROKEN_PIPE for pipes.
491
+ if fd .kind == kindPipe {
492
+ err = io .EOF
526
493
}
527
494
}
528
495
case kindNet :
@@ -646,10 +613,8 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) {
646
613
n , err := execIO (o , func (o * operation ) error {
647
614
return syscall .ReadFile (o .fd .Sysfd , unsafe .Slice (o .buf .Buf , o .buf .Len ), & o .qty , & o .o )
648
615
})
649
- if err != nil {
650
- if err == syscall .ERROR_HANDLE_EOF {
651
- err = io .EOF
652
- }
616
+ if err == syscall .ERROR_HANDLE_EOF {
617
+ err = io .EOF
653
618
}
654
619
if len (b ) != 0 {
655
620
err = fd .eofError (n , err )
@@ -774,12 +739,6 @@ func (fd *FD) Write(buf []byte) (int, error) {
774
739
return syscall .WriteFile (o .fd .Sysfd , unsafe .Slice (o .buf .Buf , o .buf .Len ), & o .qty , o .overlapped ())
775
740
})
776
741
fd .addOffset (n )
777
- if fd .kind == kindPipe && err == syscall .ERROR_OPERATION_ABORTED && fd .closing () {
778
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
779
- // If the fd is a pipe and the Write was interrupted by CancelIoEx,
780
- // we assume it is interrupted by Close.
781
- err = ErrFileClosing
782
- }
783
742
case kindNet :
784
743
if race .Enabled {
785
744
race .ReleaseMerge (unsafe .Pointer (& ioSync ))
@@ -1185,10 +1144,10 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
1185
1144
// socket is readable. h/t https://stackoverflow.com/a/42019668/332798
1186
1145
o := & fd .rop
1187
1146
o .InitBuf (nil )
1188
- if ! fd .IsStream {
1189
- o .flags |= windows .MSG_PEEK
1190
- }
1191
1147
_ , err := execIO (o , func (o * operation ) error {
1148
+ if ! fd .IsStream {
1149
+ o .flags |= windows .MSG_PEEK
1150
+ }
1192
1151
return syscall .WSARecv (o .fd .Sysfd , & o .buf , 1 , & o .qty , & o .flags , & o .o , nil )
1193
1152
})
1194
1153
if err == windows .WSAEMSGSIZE {
0 commit comments