diff --git a/README.md b/README.md index af853612..17474398 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,27 @@ main() async { ``` +**Adding Handlers within a Controller** + +```dart +import 'dart:async'; +import "package:angel_framework/angel_framework.dart"; +import "package:angel_websocket/server.dart"; + +@Expose("/") +class MyController extends Controller { + @override + Future call(AngelBase app) async { + var ws = app.container.make(AngelWebSocket); + ws.onConnection.listen((WebSocketContext socket) { + socket.on["message"].listen((WebSocketEvent e) { + socket.send("new_message", { "text": e.data["text"] }); + }); + }); + } +} +``` + **In the Browser** ```dart diff --git a/lib/browser.dart b/lib/browser.dart index 70c64bcc..69252e56 100644 --- a/lib/browser.dart +++ b/lib/browser.dart @@ -9,6 +9,7 @@ export 'package:angel_websocket/angel_websocket.dart'; class WebSocketClient extends Angel { WebSocket _socket; Map> _services = {}; + WebSocket get _underlyingSocket => _socket; WebSocketClient(String wsEndpoint) : super(wsEndpoint) { _socket = new WebSocket(wsEndpoint); @@ -67,6 +68,10 @@ class WebSocketClient extends Angel { }); } + void send(String eventName, data) { + _socket.send(JSON.encode({"eventName": eventName, "data": data})); + } + @override Service service(Pattern path, {Type type}) { var service = @@ -159,16 +164,11 @@ class WebSocketService extends Service { } _serialize(WebSocketAction action) { - var data = { - "id": action.id, - "eventName": action.eventName - }; + var data = {"id": action.id, "eventName": action.eventName}; - if (action.data != null) - data["data"] = action.data; + if (action.data != null) data["data"] = action.data; - if (action.params != null) - data["params"] = action.params; + if (action.params != null) data["params"] = action.params; return JSON.encode(data); } diff --git a/lib/cli.dart b/lib/cli.dart index 7802d4db..8d429e3c 100644 --- a/lib/cli.dart +++ b/lib/cli.dart @@ -11,6 +11,7 @@ export 'package:angel_websocket/angel_websocket.dart'; class WebSocketClient extends Angel { WebSocket _socket; Map> _services = {}; + WebSocket get underlyingSocket => _socket; WebSocketClient(String wsEndpoint) : super(wsEndpoint); @@ -70,6 +71,13 @@ class WebSocketClient extends Angel { _socket.listen(onData); } + void send(String eventName, data) { + _socket.add(JSON.encode({ + "eventName": eventName, + "data": data + })); + } + @override Service service(Pattern path, {Type type}) { var service = diff --git a/lib/server.dart b/lib/server.dart index 5a42b98b..1f309cc5 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -11,8 +11,6 @@ export 'angel_websocket.dart'; part 'websocket_context.dart'; -final AngelWebSocket websocket = new AngelWebSocket("/ws"); - class Realtime { const Realtime(); } @@ -20,9 +18,12 @@ class Realtime { class AngelWebSocket extends AngelPlugin { Angel _app; List _clients = []; + StreamController _onConnection = + new StreamController.broadcast(); List get clients => new List.from(_clients, growable: false); List servicesAlreadyWired = []; String endpoint; + Stream get onConnection => _onConnection.stream; AngelWebSocket(String this.endpoint); @@ -113,6 +114,7 @@ class AngelWebSocket extends AngelPlugin { onData(WebSocketContext socket, data) async { try { + socket._onData.add(data); var fromJson = JSON.decode(data); var action = new WebSocketAction( id: fromJson['id'], @@ -126,11 +128,31 @@ class AngelWebSocket extends AngelPlugin { throw new AngelHttpException.BadRequest(); } - var event = handleAction(action, socket); - if (event is Future) event = await event; + if (fromJson is Map && fromJson.containsKey("eventName")) { + socket._onAll.add(fromJson); + socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson); + } - if (event is WebSocketEvent) { - batchEvent(event); + if (action.eventName.contains("::")) { + var split = action.eventName.split("::"); + + if (split.length >= 2) { + if ([ + "index", + "read", + "create", + "modify", + "update", + "remove" + ].contains(split[1])) { + var event = handleAction(action, socket); + if (event is Future) event = await event; + + if (event is WebSocketEvent) { + batchEvent(event); + } + } + } } } catch (e) { // Send an error @@ -182,6 +204,7 @@ class AngelWebSocket extends AngelPlugin { var socket = new WebSocketContext(ws, req, res); await onConnect(socket); + _onConnection.add(socket); req.params['socket'] = socket; ws.listen((data) { diff --git a/lib/websocket_context.dart b/lib/websocket_context.dart index 82e88b91..8cdb2302 100644 --- a/lib/websocket_context.dart +++ b/lib/websocket_context.dart @@ -1,6 +1,11 @@ part of angel_websocket.server; class WebSocketContext { + StreamController _onAll = new StreamController.broadcast(); + StreamController _onData = new StreamController.broadcast(); + _WebSocketEventTable on = new _WebSocketEventTable(); + Stream get onAll => _onAll.stream; + Stream get onData => _onData.stream; WebSocket underlyingSocket; RequestContext requestContext; ResponseContext responseContext; @@ -15,3 +20,15 @@ class WebSocketContext { sendError(AngelHttpException error) => send("error", error); } + +class _WebSocketEventTable { + Map> _handlers = {}; + + StreamController _getStreamForEvent(eventName) { + if (!_handlers.containsKey(eventName)) + _handlers[eventName] = new StreamController.broadcast(); + return _handlers[eventName]; + } + + Stream operator [](String key) => _getStreamForEvent(key).stream; +} diff --git a/test/server.dart b/test/server.dart index f398198c..26d71334 100644 --- a/test/server.dart +++ b/test/server.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:io'; import 'package:angel_framework/angel_framework.dart' as server; import 'package:angel_websocket/cli.dart' as client; @@ -10,7 +11,9 @@ main() { server.Angel app; client.WebSocketClient clientApp; client.WebSocketService clientTodos; + Stream customEventStream; WebSocket socket; + AngelWebSocket webSocket = new AngelWebSocket("/ws"); setUp(() async { app = new server.Angel(); @@ -21,7 +24,18 @@ main() { .service("api/todos") .create(new Todo(text: "Clean your room", when: "now")); - await app.configure(websocket); + await app.configure(webSocket); + await app.configure((server.Angel app) async { + AngelWebSocket ws = app.container.make(AngelWebSocket); + + ws.onConnection.listen((WebSocketContext socket) { + socket.onData.listen((data) { + print("Data: $data"); + }); + + customEventStream = socket.on["custom"]; + }); + }); await app.configure(startTestServer); socket = await WebSocket.connect(app.properties["ws_url"]); @@ -36,8 +50,8 @@ main() { }); test("find all real-time services", () { - print(websocket.servicesAlreadyWired); - expect(websocket.servicesAlreadyWired, equals(["api/todos"])); + print(webSocket.servicesAlreadyWired); + expect(webSocket.servicesAlreadyWired, equals(["api/todos"])); }); test("index", () async { @@ -67,6 +81,15 @@ main() { expect(e.data.text, equals(todo.text)); expect(e.data.when, equals(todo.when)); }); + + test("custom event via controller", () async { + clientApp.send("custom", {"hello": "world"}); + + var data = await customEventStream.first; + + expect(data["eventName"], equals("custom")); + expect(data["data"]["hello"], equals("world")); + }); } @Realtime()