From 00b681f04b22337f77bbf8140f937e2ce7891e94 Mon Sep 17 00:00:00 2001 From: thosakwe Date: Sat, 17 Sep 2016 22:53:58 -0400 Subject: [PATCH] Client-side 'on' not working :( --- README.md | 19 ++++--- lib/cli.dart | 89 +++++++++++++++++------------ lib/server.dart | 8 +-- lib/websocket_controller.dart | 103 ++++++++++++++++++++++++++++++++++ test/server.dart | 31 ++++++++-- 5 files changed, 196 insertions(+), 54 deletions(-) create mode 100644 lib/websocket_controller.dart diff --git a/README.md b/README.md index 17474398..5dcef69e 100644 --- a/README.md +++ b/README.md @@ -27,21 +27,24 @@ main() async { **Adding Handlers within a Controller** +`WebSocketController` extends a normal `Controller`, but also listens to WebSockets. + ```dart import 'dart:async'; import "package:angel_framework/angel_framework.dart"; import "package:angel_websocket/server.dart"; @Expose("/") -class MyController extends Controller { +class MyController extends WebSocketController { @override - Future call(AngelBase app) async { - var ws = app.container.make(AngelWebSocket); - ws.onConnection.listen((WebSocketContext socket) { - socket.on["message"].listen((WebSocketEvent e) { - socket.send("new_message", { "text": e.data["text"] }); - }); - }); + void onConnect(WebSocketContext socket) { + // On connect... + } + + // Dependency injection works, too.. + @ExposeWs("read_message") + void sendMessage(WebSocketContext socket, Db db) async { + socket.send("found_message", db.collection("messages").findOne(where.id("..."))); } } ``` diff --git a/lib/cli.dart b/lib/cli.dart index 8d429e3c..ec48b16e 100644 --- a/lib/cli.dart +++ b/lib/cli.dart @@ -12,13 +12,16 @@ class WebSocketClient extends Angel { WebSocket _socket; Map> _services = {}; WebSocket get underlyingSocket => _socket; + _WebSocketEventTable on = new _WebSocketEventTable(); WebSocketClient(String wsEndpoint) : super(wsEndpoint); - onData(data) { + onData(data) async { var fromJson = JSON.decode(data); + print("a: $fromJson"); var e = new WebSocketEvent( eventName: fromJson['eventName'], data: fromJson['data']); + print("b: $e"); var split = e.eventName.split("::"); var serviceName = split[0]; var services = _services[serviceName]; @@ -30,37 +33,41 @@ class WebSocketClient extends Angel { exc.errors = exc.errors ?? []; exc.errors.addAll(e.data['errors'] ?? []); throw exc; - } else if (services != null) { - e.eventName = split[1]; + } else { + on._getStreamForEvent(serviceName).add(e.data); - for (WebSocketService service in services) { - service._onAllEvents.add(e); - switch (e.eventName) { - case srv.HookedServiceEvent.INDEXED: - service._onIndexed.add(e); - break; - case srv.HookedServiceEvent.READ: - service._onRead.add(e); - break; - case srv.HookedServiceEvent.CREATED: - service._onCreated.add(e); - break; - case srv.HookedServiceEvent.MODIFIED: - service._onModified.add(e); - break; - case srv.HookedServiceEvent.UPDATED: - service._onUpdated.add(e); - break; - case srv.HookedServiceEvent.REMOVED: - service._onRemoved.add(e); - break; - case "error": - service._onError.add(e); - break; - default: - if (service._on._events.containsKey(e.eventName)) - service._on._events[e.eventName].add(e); - break; + if (services != null) { + e.eventName = split[1]; + + for (WebSocketService service in services) { + service._onAllEvents.add(e); + switch (e.eventName) { + case srv.HookedServiceEvent.INDEXED: + service._onIndexed.add(e); + break; + case srv.HookedServiceEvent.READ: + service._onRead.add(e); + break; + case srv.HookedServiceEvent.CREATED: + service._onCreated.add(e); + break; + case srv.HookedServiceEvent.MODIFIED: + service._onModified.add(e); + break; + case srv.HookedServiceEvent.UPDATED: + service._onUpdated.add(e); + break; + case srv.HookedServiceEvent.REMOVED: + service._onRemoved.add(e); + break; + case "error": + service._onError.add(e); + break; + default: + if (service._on._events.containsKey(e.eventName)) + service._on._events[e.eventName].add(e); + break; + } } } } @@ -72,10 +79,7 @@ class WebSocketClient extends Angel { } void send(String eventName, data) { - _socket.add(JSON.encode({ - "eventName": eventName, - "data": data - })); + _socket.add(JSON.encode({"eventName": eventName, "data": data})); } @override @@ -112,7 +116,8 @@ class _WebSocketServiceTransformer stream.listen((WebSocketEvent e) { if (_outputType != null && e.eventName != "error") - e.data = god.deserialize(god.serialize(e.data), outputType: _outputType); + e.data = + god.deserialize(god.serialize(e.data), outputType: _outputType); _stream.add(e); }); @@ -205,3 +210,15 @@ class WebSocketService extends Service { eventName: "$_path::remove", id: id, params: params))); } } + +class _WebSocketEventTable { + Map> _handlers = {}; + + StreamController _getStreamForEvent(eventName) { + if (!_handlers.containsKey(eventName)) + _handlers[eventName] = new StreamController.broadcast(); + return _handlers[eventName]; + } + + Stream operator [](String key) => _getStreamForEvent(key).stream; +} diff --git a/lib/server.dart b/lib/server.dart index 1f309cc5..1dd208b7 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -3,6 +3,7 @@ library angel_websocket.server; import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'dart:mirrors'; import 'package:angel_framework/angel_framework.dart'; import 'package:json_god/json_god.dart' as god; import 'package:merge_map/merge_map.dart'; @@ -10,10 +11,7 @@ import 'angel_websocket.dart'; export 'angel_websocket.dart'; part 'websocket_context.dart'; - -class Realtime { - const Realtime(); -} +part 'websocket_controller.dart'; class AngelWebSocket extends AngelPlugin { Angel _app; @@ -130,7 +128,7 @@ class AngelWebSocket extends AngelPlugin { if (fromJson is Map && fromJson.containsKey("eventName")) { socket._onAll.add(fromJson); - socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson); + socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson["data"]); } if (action.eventName.contains("::")) { diff --git a/lib/websocket_controller.dart b/lib/websocket_controller.dart new file mode 100644 index 00000000..702831d7 --- /dev/null +++ b/lib/websocket_controller.dart @@ -0,0 +1,103 @@ +part of angel_websocket.server; + +class ExposeWs { + final String eventName; + + const ExposeWs(this.eventName); +} + +class WebSocketController extends Controller { + Map _handlers = {}; + Map _handlerSymbols = {}; + InstanceMirror _instanceMirror; + AngelWebSocket ws; + + WebSocketController():super() { + _instanceMirror = reflect(this); + } + + @override + Future call(Angel app) async { + await super.call(app); + + ClassMirror classMirror = reflectClass(this.runtimeType); + classMirror.instanceMembers.forEach((sym, mirror) { + if (mirror.isRegularMethod) { + InstanceMirror exposeMirror = mirror.metadata.firstWhere( + (mirror) => mirror.reflectee is ExposeWs, + orElse: () => null); + + if (exposeMirror != null) { + ExposeWs exposeWs = exposeMirror.reflectee; + _handlers[exposeWs.eventName] = mirror; + _handlerSymbols[exposeWs.eventName] = sym; + } + } + }); + + AngelWebSocket ws = app.container.make(AngelWebSocket); + + ws.onConnection.listen((socket) async { + await onConnect(socket); + + socket.onData.listen(onData); + + socket.onAll.listen((Map data) async { + await onAllEvents(data); + + if (_handlers.containsKey(data["eventName"])) { + var methodMirror = _handlers[data["eventName"]]; + try { + // Load parameters, and execute + List args = []; + + for (int i = 0; i < methodMirror.parameters.length; i++) { + ParameterMirror parameter = methodMirror.parameters[i]; + String name = MirrorSystem.getName(parameter.simpleName); + + if (parameter.type.reflectedType == RequestContext || + name == "req") + args.add(socket.requestContext); + else if (parameter.type.reflectedType == ResponseContext || + name == "res") + args.add(socket.responseContext); + else if (parameter.type == AngelWebSocket) + args.add(socket); + else { + if (socket.requestContext.params.containsKey(name)) { + args.add(socket.requestContext.params[name]); + } else { + try { + args.add(app.container.make(parameter.type.reflectedType)); + continue; + } catch (e) { + throw new AngelHttpException.BadRequest( + message: "Missing parameter '$name'"); + } + } + } + } + + await _instanceMirror.invoke(_handlerSymbols[data["eventName"]], args); + } catch (e) { + // Send an error + if (e is AngelHttpException) + socket.sendError(e); + else + socket.sendError(new AngelHttpException(e)); + } + } + }); + }); + } + + void broadcast(String eventName, data) { + ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data)); + } + + Future onConnect(WebSocketContext socket) async {} + + Future onAllEvents(Map data) async {} + + void onData(data) {} +} diff --git a/test/server.dart b/test/server.dart index 26d71334..79abb9ec 100644 --- a/test/server.dart +++ b/test/server.dart @@ -12,6 +12,7 @@ main() { client.WebSocketClient clientApp; client.WebSocketService clientTodos; Stream customEventStream; + Stream customEventStream2; WebSocket socket; AngelWebSocket webSocket = new AngelWebSocket("/ws"); @@ -36,11 +37,13 @@ main() { customEventStream = socket.on["custom"]; }); }); + await app.configure(new Custom2Controller()); await app.configure(startTestServer); socket = await WebSocket.connect(app.properties["ws_url"]); clientApp = new client.WebSocketClient(app.properties["ws_url"]); await clientApp.connect(); + customEventStream2 = clientApp.on["custom2"]; clientTodos = clientApp.service("api/todos", type: Todo); }); @@ -61,8 +64,7 @@ main() { String json = await socket.first; print(json); - WebSocketEvent e = - god.deserialize(json, outputType: WebSocketEvent); + WebSocketEvent e = god.deserialize(json, outputType: WebSocketEvent); expect(e.eventName, equals("api/todos::indexed")); expect(e.data[0]["when"], equals("now")); }); @@ -87,10 +89,29 @@ main() { var data = await customEventStream.first; - expect(data["eventName"], equals("custom")); - expect(data["data"]["hello"], equals("world")); + expect(data["hello"], equals("world")); + }); + + test("custom event via ws controller", () async { + clientApp.send("custom2", {"hello": "world"}); + + var data = customEventStream2.first; + print("Received data from server: $data"); }); } -@Realtime() class FakeService extends server.Service {} + +@server.Expose("/custom2") +class Custom2Controller extends WebSocketController { + @override + Future onConnect(WebSocketContext socket) async { + print( + "Got a WS connection from session #${socket.requestContext.session.id}!"); + } + + @ExposeWs("custom2") + void sayFoo(WebSocketContext socket, server.RequestContext req, AngelWebSocket ws) { + socket.send("custom2", {"franken": "stein"}); + } +}