Skip to content

Commit 632cb1c

Browse files
authored
feat: EventChannel (#178)
* feat: EventChannel * hot-restart compliant * feat: add custom return type (ResultEngineNotRunning) * review: spelling & and formatting & delete test
1 parent 897675a commit 632cb1c

File tree

4 files changed

+184
-67
lines changed

4 files changed

+184
-67
lines changed

embedder/embedder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,17 @@ const (
5050
ResultSuccess Result = C.kSuccess
5151
ResultInvalidLibraryVersion Result = C.kInvalidLibraryVersion
5252
ResultInvalidArguments Result = C.kInvalidArguments
53+
ResultEngineNotRunning Result = -1
5354
)
5455

5556
// FlutterEngine corresponds to the C.FlutterEngine with his associated callback's method.
5657
type FlutterEngine struct {
5758
// Flutter Engine.
5859
Engine C.FlutterEngine
5960

61+
// closed indicates if the engine has Shutdown
62+
closed bool
63+
sync sync.Mutex
6064
// index of the engine in the global flutterEngines slice
6165
index int
6266

@@ -125,6 +129,9 @@ func (flu *FlutterEngine) Run(userData unsafe.Pointer, vmArgs []string) Result {
125129

126130
// Shutdown stops the Flutter engine.
127131
func (flu *FlutterEngine) Shutdown() Result {
132+
flu.sync.Lock()
133+
defer flu.sync.Unlock()
134+
flu.closed = true
128135
res := C.FlutterEngineShutdown(flu.Engine)
129136
return (Result)(res)
130137
}
@@ -248,6 +255,12 @@ func (p PlatformMessage) ExpectsResponse() bool {
248255

249256
// SendPlatformMessage is used to send a PlatformMessage to the Flutter engine.
250257
func (flu *FlutterEngine) SendPlatformMessage(msg *PlatformMessage) Result {
258+
flu.sync.Lock()
259+
defer flu.sync.Unlock()
260+
if flu.closed {
261+
return ResultEngineNotRunning
262+
}
263+
251264
cPlatformMessage := C.FlutterPlatformMessage{
252265
channel: C.CString(msg.Channel),
253266
// TODO: who is responsible for free-ing this C alloc? And can they be

plugin/event-channel.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package plugin
2+
3+
import (
4+
"fmt"
5+
"runtime/debug"
6+
7+
"github.com/pkg/errors"
8+
)
9+
10+
// EventChannel provides way for flutter applications and hosts to communicate
11+
// using event streams.
12+
// It must be used with a codec, for example the StandardMethodCodec.
13+
type EventChannel struct {
14+
messenger BinaryMessenger
15+
channelName string
16+
methodCodec MethodCodec
17+
18+
handler StreamHandler
19+
activeSink *EventSink
20+
}
21+
22+
// NewEventChannel creates a new event channel
23+
func NewEventChannel(messenger BinaryMessenger, channelName string, methodCodec MethodCodec) (channel *EventChannel) {
24+
ec := &EventChannel{
25+
messenger: messenger,
26+
channelName: channelName,
27+
methodCodec: methodCodec,
28+
}
29+
messenger.SetChannelHandler(channelName, ec.handleChannelMessage)
30+
return ec
31+
}
32+
33+
// Handle registers a StreamHandler for a event channel.
34+
//
35+
// Consecutive calls override any existing handler registration.
36+
// When given nil as handler, the previously registered
37+
// handler for a method is unregistrered.
38+
//
39+
// When no handler is registered for a method, it will be handled silently by
40+
// sending a nil reply which triggers the dart MissingPluginException exception.
41+
func (e *EventChannel) Handle(handler StreamHandler) {
42+
e.handler = handler
43+
}
44+
45+
// handleChannelMessage decodes incoming binary message to a method call, calls the
46+
// handler, and encodes the outgoing reply.
47+
func (e *EventChannel) handleChannelMessage(binaryMessage []byte, responseSender ResponseSender) (err error) {
48+
methodCall, err := e.methodCodec.DecodeMethodCall(binaryMessage)
49+
if err != nil {
50+
return errors.Wrap(err, "failed to decode incomming message")
51+
}
52+
53+
if e.handler == nil {
54+
fmt.Printf("go-flutter: no method handler registered for event channel '%s'\n", e.channelName)
55+
responseSender.Send(nil)
56+
return nil
57+
}
58+
59+
defer func() {
60+
p := recover()
61+
if p != nil {
62+
fmt.Printf("go-flutter: recovered from panic while handling message for event channel '%s': %v\n", e.channelName, p)
63+
debug.PrintStack()
64+
}
65+
}()
66+
67+
switch methodCall.Method {
68+
case "listen":
69+
70+
binaryReply, err := e.methodCodec.EncodeSuccessEnvelope(nil)
71+
if err != nil {
72+
fmt.Printf("go-flutter: failed to encode listen envelope for event channel '%s', error: %v\n", e.channelName, err)
73+
}
74+
responseSender.Send(binaryReply)
75+
76+
if e.activeSink != nil {
77+
// Repeated calls to onListen may happen during hot restart.
78+
// We separate them with a call to onCancel.
79+
e.handler.OnCancel(nil)
80+
}
81+
82+
e.activeSink = &EventSink{eventChannel: e}
83+
go e.handler.OnListen(methodCall.Arguments, e.activeSink)
84+
85+
case "cancel":
86+
if e.activeSink != nil {
87+
e.activeSink = nil
88+
go e.handler.OnCancel(methodCall.Arguments)
89+
90+
binaryReply, _ := e.methodCodec.EncodeSuccessEnvelope(nil)
91+
responseSender.Send(binaryReply)
92+
} else {
93+
fmt.Printf("go-flutter: No active stream to cancel onEventChannel '%s'\n", e.channelName)
94+
binaryReply, _ := e.methodCodec.EncodeErrorEnvelope("error", "No active stream to cancel", nil)
95+
responseSender.Send(binaryReply)
96+
}
97+
98+
default:
99+
fmt.Printf("go-flutter: no StreamHandler handler registered for method '%s' on EventChannel '%s'\n", methodCall.Method, e.channelName)
100+
responseSender.Send(nil) // MissingPluginException
101+
}
102+
103+
return nil
104+
}

plugin/event-channel_test.go

Lines changed: 0 additions & 67 deletions
This file was deleted.

plugin/event-sink.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package plugin
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
)
7+
8+
// StreamHandler defines the interface for a stream handler setup and tear-down
9+
// requests.
10+
type StreamHandler interface {
11+
// OnListen handles a request to set up an event stream.
12+
OnListen(arguments interface{}, sink *EventSink)
13+
// OnCancel handles a request to tear down the most recently created event
14+
// stream.
15+
OnCancel(arguments interface{})
16+
}
17+
18+
// EventSink defines the interface for producers of events to send message to
19+
// Flutter. StreamHandler act as a clients of EventSink for sending events.
20+
type EventSink struct {
21+
eventChannel *EventChannel
22+
23+
hasEnded bool
24+
sync.Mutex
25+
}
26+
27+
// Success consumes a successful event.
28+
func (es *EventSink) Success(event interface{}) {
29+
es.Lock()
30+
defer es.Unlock()
31+
if es.hasEnded || es != es.eventChannel.activeSink {
32+
return
33+
}
34+
35+
binaryMsg, err := es.eventChannel.methodCodec.EncodeSuccessEnvelope(event)
36+
if err != nil {
37+
fmt.Printf("go-flutter: failed to encode success envelope for event channel '%s', error: %v\n", es.eventChannel.channelName, err)
38+
}
39+
es.eventChannel.messenger.Send(es.eventChannel.channelName, binaryMsg)
40+
}
41+
42+
// Error consumes an error event.
43+
func (es *EventSink) Error(errorCode string, errorMessage string, errorDetails interface{}) {
44+
es.Lock()
45+
defer es.Unlock()
46+
if es.hasEnded || es != es.eventChannel.activeSink {
47+
return
48+
}
49+
50+
binaryMsg, err := es.eventChannel.methodCodec.EncodeErrorEnvelope(errorCode, errorMessage, errorDetails)
51+
if err != nil {
52+
fmt.Printf("go-flutter: failed to encode success envelope for event channel '%s', error: %v\n", es.eventChannel.channelName, err)
53+
}
54+
es.eventChannel.messenger.Send(es.eventChannel.channelName, binaryMsg)
55+
}
56+
57+
// EndOfStream consumes end of stream.
58+
func (es *EventSink) EndOfStream() {
59+
es.Lock()
60+
defer es.Unlock()
61+
if es.hasEnded || es != es.eventChannel.activeSink {
62+
return
63+
}
64+
es.hasEnded = true
65+
66+
es.eventChannel.messenger.Send(es.eventChannel.channelName, nil)
67+
}

0 commit comments

Comments
 (0)