diff --git a/embedder/embedder.go b/embedder/embedder.go index ffb1626d..fc2160bb 100644 --- a/embedder/embedder.go +++ b/embedder/embedder.go @@ -50,6 +50,7 @@ const ( ResultSuccess Result = C.kSuccess ResultInvalidLibraryVersion Result = C.kInvalidLibraryVersion ResultInvalidArguments Result = C.kInvalidArguments + ResultEngineNotRunning Result = -1 ) // FlutterEngine corresponds to the C.FlutterEngine with his associated callback's method. @@ -57,6 +58,9 @@ type FlutterEngine struct { // Flutter Engine. Engine C.FlutterEngine + // closed indicates if the engine has Shutdown + closed bool + sync sync.Mutex // index of the engine in the global flutterEngines slice index int @@ -125,6 +129,9 @@ func (flu *FlutterEngine) Run(userData unsafe.Pointer, vmArgs []string) Result { // Shutdown stops the Flutter engine. func (flu *FlutterEngine) Shutdown() Result { + flu.sync.Lock() + defer flu.sync.Unlock() + flu.closed = true res := C.FlutterEngineShutdown(flu.Engine) return (Result)(res) } @@ -226,6 +233,12 @@ func (p PlatformMessage) ExpectsResponse() bool { // SendPlatformMessage is used to send a PlatformMessage to the Flutter engine. func (flu *FlutterEngine) SendPlatformMessage(msg *PlatformMessage) Result { + flu.sync.Lock() + defer flu.sync.Unlock() + if flu.closed { + return ResultEngineNotRunning + } + cPlatformMessage := C.FlutterPlatformMessage{ channel: C.CString(msg.Channel), // TODO: who is responsible for free-ing this C alloc? And can they be diff --git a/plugin/event-channel.go b/plugin/event-channel.go new file mode 100644 index 00000000..b5017b66 --- /dev/null +++ b/plugin/event-channel.go @@ -0,0 +1,104 @@ +package plugin + +import ( + "fmt" + "runtime/debug" + + "github.com/pkg/errors" +) + +// EventChannel provides way for flutter applications and hosts to communicate +// using event streams. +// It must be used with a codec, for example the StandardMethodCodec. +type EventChannel struct { + messenger BinaryMessenger + channelName string + methodCodec MethodCodec + + handler StreamHandler + activeSink *EventSink +} + +// NewEventChannel creates a new event channel +func NewEventChannel(messenger BinaryMessenger, channelName string, methodCodec MethodCodec) (channel *EventChannel) { + ec := &EventChannel{ + messenger: messenger, + channelName: channelName, + methodCodec: methodCodec, + } + messenger.SetChannelHandler(channelName, ec.handleChannelMessage) + return ec +} + +// Handle registers a StreamHandler for a event channel. +// +// Consecutive calls override any existing handler registration. +// When given nil as handler, the previously registered +// handler for a method is unregistrered. +// +// When no handler is registered for a method, it will be handled silently by +// sending a nil reply which triggers the dart MissingPluginException exception. +func (e *EventChannel) Handle(handler StreamHandler) { + e.handler = handler +} + +// handleChannelMessage decodes incoming binary message to a method call, calls the +// handler, and encodes the outgoing reply. +func (e *EventChannel) handleChannelMessage(binaryMessage []byte, responseSender ResponseSender) (err error) { + methodCall, err := e.methodCodec.DecodeMethodCall(binaryMessage) + if err != nil { + return errors.Wrap(err, "failed to decode incomming message") + } + + if e.handler == nil { + fmt.Printf("go-flutter: no method handler registered for event channel '%s'\n", e.channelName) + responseSender.Send(nil) + return nil + } + + defer func() { + p := recover() + if p != nil { + fmt.Printf("go-flutter: recovered from panic while handling message for event channel '%s': %v\n", e.channelName, p) + debug.PrintStack() + } + }() + + switch methodCall.Method { + case "listen": + + binaryReply, err := e.methodCodec.EncodeSuccessEnvelope(nil) + if err != nil { + fmt.Printf("go-flutter: failed to encode listen envelope for event channel '%s', error: %v\n", e.channelName, err) + } + responseSender.Send(binaryReply) + + if e.activeSink != nil { + // Repeated calls to onListen may happen during hot restart. + // We separate them with a call to onCancel. + e.handler.OnCancel(nil) + } + + e.activeSink = &EventSink{eventChannel: e} + go e.handler.OnListen(methodCall.Arguments, e.activeSink) + + case "cancel": + if e.activeSink != nil { + e.activeSink = nil + go e.handler.OnCancel(methodCall.Arguments) + + binaryReply, _ := e.methodCodec.EncodeSuccessEnvelope(nil) + responseSender.Send(binaryReply) + } else { + fmt.Printf("go-flutter: No active stream to cancel onEventChannel '%s'\n", e.channelName) + binaryReply, _ := e.methodCodec.EncodeErrorEnvelope("error", "No active stream to cancel", nil) + responseSender.Send(binaryReply) + } + + default: + fmt.Printf("go-flutter: no StreamHandler handler registered for method '%s' on EventChannel '%s'\n", methodCall.Method, e.channelName) + responseSender.Send(nil) // MissingPluginException + } + + return nil +} diff --git a/plugin/event-channel_test.go b/plugin/event-channel_test.go deleted file mode 100644 index 90961cd0..00000000 --- a/plugin/event-channel_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package plugin - -// group('EventChannel', () { -// const MessageCodec jsonMessage = JSONMessageCodec(); -// const MethodCodec jsonMethod = JSONMethodCodec(); -// const EventChannel channel = EventChannel('ch', jsonMethod); -// void emitEvent(dynamic event) { -// BinaryMessages.handlePlatformMessage( -// 'ch', -// event, -// (ByteData reply) {}, -// ); -// } -// test('can receive event stream', () async { -// bool canceled = false; -// BinaryMessages.setMockMessageHandler( -// 'ch', -// (ByteData message) async { -// final Map methodCall = jsonMessage.decodeMessage(message); -// if (methodCall['method'] == 'listen') { -// final String argument = methodCall['args']; -// emitEvent(jsonMethod.encodeSuccessEnvelope(argument + '1')); -// emitEvent(jsonMethod.encodeSuccessEnvelope(argument + '2')); -// emitEvent(null); -// return jsonMethod.encodeSuccessEnvelope(null); -// } else if (methodCall['method'] == 'cancel') { -// canceled = true; -// return jsonMethod.encodeSuccessEnvelope(null); -// } else { -// fail('Expected listen or cancel'); -// } -// }, -// ); -// final List events = await channel.receiveBroadcastStream('hello').toList(); -// expect(events, orderedEquals(['hello1', 'hello2'])); -// await Future.delayed(Duration.zero); -// expect(canceled, isTrue); -// }); -// test('can receive error event', () async { -// BinaryMessages.setMockMessageHandler( -// 'ch', -// (ByteData message) async { -// final Map methodCall = jsonMessage.decodeMessage(message); -// if (methodCall['method'] == 'listen') { -// final String argument = methodCall['args']; -// emitEvent(jsonMethod.encodeErrorEnvelope(code: '404', message: 'Not Found.', details: argument)); -// return jsonMethod.encodeSuccessEnvelope(null); -// } else if (methodCall['method'] == 'cancel') { -// return jsonMethod.encodeSuccessEnvelope(null); -// } else { -// fail('Expected listen or cancel'); -// } -// }, -// ); -// final List events = []; -// final List errors = []; -// channel.receiveBroadcastStream('hello').listen(events.add, onError: errors.add); -// await Future.delayed(Duration.zero); -// expect(events, isEmpty); -// expect(errors, hasLength(1)); -// expect(errors[0], isInstanceOf()); -// final PlatformException error = errors[0]; -// expect(error.code, '404'); -// expect(error.message, 'Not Found.'); -// expect(error.details, 'hello'); -// }); -// }); diff --git a/plugin/event-sink.go b/plugin/event-sink.go new file mode 100644 index 00000000..1be8a0c0 --- /dev/null +++ b/plugin/event-sink.go @@ -0,0 +1,67 @@ +package plugin + +import ( + "fmt" + "sync" +) + +// StreamHandler defines the interface for a stream handler setup and tear-down +// requests. +type StreamHandler interface { + // OnListen handles a request to set up an event stream. + OnListen(arguments interface{}, sink *EventSink) + // OnCancel handles a request to tear down the most recently created event + // stream. + OnCancel(arguments interface{}) +} + +// EventSink defines the interface for producers of events to send message to +// Flutter. StreamHandler act as a clients of EventSink for sending events. +type EventSink struct { + eventChannel *EventChannel + + hasEnded bool + sync.Mutex +} + +// Success consumes a successful event. +func (es *EventSink) Success(event interface{}) { + es.Lock() + defer es.Unlock() + if es.hasEnded || es != es.eventChannel.activeSink { + return + } + + binaryMsg, err := es.eventChannel.methodCodec.EncodeSuccessEnvelope(event) + if err != nil { + fmt.Printf("go-flutter: failed to encode success envelope for event channel '%s', error: %v\n", es.eventChannel.channelName, err) + } + es.eventChannel.messenger.Send(es.eventChannel.channelName, binaryMsg) +} + +// Error consumes an error event. +func (es *EventSink) Error(errorCode string, errorMessage string, errorDetails interface{}) { + es.Lock() + defer es.Unlock() + if es.hasEnded || es != es.eventChannel.activeSink { + return + } + + binaryMsg, err := es.eventChannel.methodCodec.EncodeErrorEnvelope(errorCode, errorMessage, errorDetails) + if err != nil { + fmt.Printf("go-flutter: failed to encode success envelope for event channel '%s', error: %v\n", es.eventChannel.channelName, err) + } + es.eventChannel.messenger.Send(es.eventChannel.channelName, binaryMsg) +} + +// EndOfStream consumes end of stream. +func (es *EventSink) EndOfStream() { + es.Lock() + defer es.Unlock() + if es.hasEnded || es != es.eventChannel.activeSink { + return + } + es.hasEnded = true + + es.eventChannel.messenger.Send(es.eventChannel.channelName, nil) +}