Skip to content

Commit 73b3b61

Browse files
authored
[breaking] Simplified gRPC streams helpers (#1857)
* Simplified gRPC streams helpers * Moved FeedStreamTo and ConsumeStreamFrom into deamon package and mde them private
1 parent 4a4d1fd commit 73b3b61

File tree

4 files changed

+51
-32
lines changed

4 files changed

+51
-32
lines changed

commands/daemon/daemon.go

+8-17
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"io"
2323

2424
"github.com/arduino/arduino-cli/arduino"
25-
"github.com/arduino/arduino-cli/arduino/utils"
2625
"github.com/arduino/arduino-cli/commands"
2726
"github.com/arduino/arduino-cli/commands/board"
2827
"github.com/arduino/arduino-cli/commands/compile"
@@ -259,16 +258,14 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke
259258

260259
// Compile FIXMEDOC
261260
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
262-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
263-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
261+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
262+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
264263
compileResp, compileErr := compile.Compile(
265264
stream.Context(), req, outStream, errStream,
266265
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) },
267266
false) // Set debug to false
268267
outStream.Close()
269268
errStream.Close()
270-
<-outCtx.Done()
271-
<-errCtx.Done()
272269
var compileRespSendErr error
273270
if compileResp != nil {
274271
compileRespSendErr = stream.Send(compileResp)
@@ -346,31 +343,27 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf
346343

347344
// Upload FIXMEDOC
348345
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
349-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
350-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
346+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
347+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
351348
resp, err := upload.Upload(stream.Context(), req, outStream, errStream)
352349
outStream.Close()
353350
errStream.Close()
354351
if err != nil {
355352
return convertErrorToRPCStatus(err)
356353
}
357-
<-outCtx.Done()
358-
<-errCtx.Done()
359354
return stream.Send(resp)
360355
}
361356

362357
// UploadUsingProgrammer FIXMEDOC
363358
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
364-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
365-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
359+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
360+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
366361
resp, err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
367362
outStream.Close()
368363
errStream.Close()
369364
if err != nil {
370365
return convertErrorToRPCStatus(err)
371366
}
372-
<-outCtx.Done()
373-
<-errCtx.Done()
374367
return stream.Send(resp)
375368
}
376369

@@ -382,16 +375,14 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp
382375

383376
// BurnBootloader FIXMEDOC
384377
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
385-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
386-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
378+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
379+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
387380
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
388381
outStream.Close()
389382
errStream.Close()
390383
if err != nil {
391384
return convertErrorToRPCStatus(err)
392385
}
393-
<-outCtx.Done()
394-
<-errCtx.Done()
395386
return stream.Send(resp)
396387
}
397388

commands/daemon/debug.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"context"
2020
"os"
2121

22-
"github.com/arduino/arduino-cli/arduino/utils"
2322
cmd "github.com/arduino/arduino-cli/commands/debug"
2423
dbg "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/debug/v1"
2524
"github.com/pkg/errors"
@@ -50,9 +49,9 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
5049
// Launch debug recipe attaching stdin and out to grpc streaming
5150
signalChan := make(chan os.Signal)
5251
defer close(signalChan)
53-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
52+
outStream := feedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
5453
resp, debugErr := cmd.Debug(stream.Context(), req,
55-
utils.ConsumeStreamFrom(func() ([]byte, error) {
54+
consumeStreamFrom(func() ([]byte, error) {
5655
command, err := stream.Recv()
5756
if command.GetSendInterrupt() {
5857
signalChan <- os.Interrupt
@@ -65,7 +64,6 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
6564
if debugErr != nil {
6665
return debugErr
6766
}
68-
<-outCtx.Done()
6967
return stream.Send(resp)
7068
}
7169

arduino/utils/stream.go renamed to commands/daemon/stream.go

+35-11
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,42 @@
1313
// Arduino software without disclosing the source code of your own applications.
1414
// To purchase a commercial license, send an email to [email protected].
1515

16-
package utils
16+
package daemon
1717

1818
import (
19-
"context"
2019
"io"
20+
"sync"
2121
"time"
2222

2323
"github.com/djherbis/buffer"
2424
"github.com/djherbis/nio/v3"
2525
)
2626

27-
// FeedStreamTo creates a pipe to pass data to the writer function.
28-
// FeedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
27+
// implWriteCloser is an helper struct to implement an anonymous io.WriteCloser
28+
type implWriteCloser struct {
29+
write func(buff []byte) (int, error)
30+
close func() error
31+
}
32+
33+
func (w *implWriteCloser) Write(buff []byte) (int, error) {
34+
return w.write(buff)
35+
}
36+
37+
func (w *implWriteCloser) Close() error {
38+
return w.close()
39+
}
40+
41+
// feedStreamTo creates a pipe to pass data to the writer function.
42+
// feedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
2943
// The user must call Close() on the returned io.WriteCloser to release all the resources.
3044
// If needed, the context can be used to detect when all the data has been processed after
3145
// closing the writer.
32-
func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
33-
ctx, cancel := context.WithCancel(context.Background())
46+
func feedStreamTo(writer func(data []byte)) io.WriteCloser {
3447
r, w := nio.Pipe(buffer.New(32 * 1024))
48+
var wg sync.WaitGroup
49+
wg.Add(1)
3550
go func() {
36-
defer cancel()
51+
defer wg.Done()
3752
data := make([]byte, 16384)
3853
for {
3954
if n, err := r.Read(data); err == nil {
@@ -50,12 +65,21 @@ func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
5065
}
5166
}
5267
}()
53-
return w, ctx
68+
return &implWriteCloser{
69+
write: w.Write,
70+
close: func() error {
71+
if err := w.Close(); err != nil {
72+
return err
73+
}
74+
wg.Wait()
75+
return nil
76+
},
77+
}
5478
}
5579

56-
// ConsumeStreamFrom creates a pipe to consume data from the reader function.
57-
// ConsumeStreamFrom returns the io.Reader side of the pipe, which the user can use to consume the data
58-
func ConsumeStreamFrom(reader func() ([]byte, error)) io.Reader {
80+
// consumeStreamFrom creates a pipe to consume data from the reader function.
81+
// consumeStreamFrom returns the io.Reader side of the pipe, which the user can use to consume the data
82+
func consumeStreamFrom(reader func() ([]byte, error)) io.Reader {
5983
r, w := io.Pipe()
6084
go func() {
6185
for {

docs/UPGRADING.md

+6
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,12 @@ directory.
305305
- `github.com/arduino/arduino-cli/configuration.BundleToolsDirectories` has been renamed to `BuiltinToolsDirectories`
306306
- `github.com/arduino/arduino-cli/configuration.IDEBundledLibrariesDir` has been renamed to `IDEBuiltinLibrariesDir`
307307
308+
### Removed `utils.FeedStreamTo` and `utils.ConsumeStreamFrom`
309+
310+
`github.com/arduino/arduino-cli/arduino/utils.FeedStreamTo` and
311+
`github.com/arduino/arduino-cli/arduino/utils.ConsumeStreamFrom` are now private. They are mainly used internally for
312+
gRPC stream handling and are not suitable to be public API.
313+
308314
## 0.26.0
309315
310316
### `github.com/arduino/arduino-cli/commands.DownloadToolRelease`, and `InstallToolRelease` functions have been removed

0 commit comments

Comments
 (0)