diff --git a/.idea/libraries/Dart_Packages.xml b/.idea/libraries/Dart_Packages.xml index c3dbf8d7..edbf2248 100644 --- a/.idea/libraries/Dart_Packages.xml +++ b/.idea/libraries/Dart_Packages.xml @@ -9,10 +9,17 @@ + + + + + + - @@ -321,7 +328,8 @@ - + + diff --git a/.idea/runConfigurations/All_Tests.xml b/.idea/runConfigurations/All_Tests.xml new file mode 100644 index 00000000..a824b209 --- /dev/null +++ b/.idea/runConfigurations/All_Tests.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Server_Tests.xml b/.idea/runConfigurations/Server_Tests.xml new file mode 100644 index 00000000..0e54f842 --- /dev/null +++ b/.idea/runConfigurations/Server_Tests.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/lib/angel_websocket.dart b/lib/angel_websocket.dart index dca59e7b..1690486f 100644 --- a/lib/angel_websocket.dart +++ b/lib/angel_websocket.dart @@ -1,11 +1,10 @@ library angel_websocket; class WebSocketEvent { - String id; String eventName; var data; - WebSocketEvent({String this.id, String this.eventName, this.data}); + WebSocketEvent({String this.eventName, this.data}); } class WebSocketAction { @@ -13,4 +12,6 @@ class WebSocketAction { String eventName; var data; var params; + + WebSocketAction({String this.id, String this.eventName, this.data, this.params}); } \ No newline at end of file diff --git a/lib/server.dart b/lib/server.dart index 5969d053..d0dbbaaf 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -2,121 +2,167 @@ library angel_websocket.server; import 'dart:async'; import 'dart:io'; +import 'dart:mirrors'; import 'package:angel_framework/angel_framework.dart'; import 'package:json_god/json_god.dart' as god; +import 'package:merge_map/merge_map.dart'; import 'package:uuid/uuid.dart'; import 'angel_websocket.dart'; -typedef Future WebSocketFilter(WebsocketContext context); +part 'websocket_context.dart'; -List _clients = []; -Uuid _uuid = new Uuid(); +final AngelWebSocket websocket = new AngelWebSocket("/ws"); -class WebsocketContext { - WebSocket socket; - RequestContext request; - ResponseContext response; - - WebsocketContext(WebSocket this.socket, RequestContext this.request, - ResponseContext this.response); +class Realtime { + const Realtime(); } -_broadcast(WebSocketEvent event) { - String json = god.serialize(event); - _clients.forEach((WebsocketContext client) { - client.socket.add(json); - }); -} +class AngelWebSocket { + Angel _app; + List _clients = []; + List servicesAlreadyWired = []; + String endpoint; -_onData(Angel app) { - return (data) { - try { - WebSocketAction action = god.deserialize( - data, outputType: WebSocketAction); + AngelWebSocket(String this.endpoint); - List split = action.eventName.split("::"); - - if (split.length >= 2) { - Service service = app.service(split[0]); - - if (service != null) { - String event = split[1]; - - if (event == "index") { - - } - } - } - } catch (e) { - - } - }; -} - -_onError(e) { - -} - -class websocket { - static Map filters = {}; - - call({List endPoints: const['/ws']}) { - return (Angel app) async { - for (Pattern endPoint in endPoints) { - app.all(endPoint, (RequestContext req, ResponseContext res) async { - if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest)) { - res.write("This endpoint is only accessible via WebSockets."); - res.end(); - } else { - res - ..willCloseItself = true - ..end(); - WebSocket socket = await WebSocketTransformer.upgrade( - req.underlyingRequest); - WebsocketContext context = new WebsocketContext(socket, req, res); - _clients.add(context); - - socket.listen(_onData(app), onError: _onError, onDone: () { - _clients.remove(context); - }); - } - }); - - app.services.forEach((Pattern path, Service service) { - if (service is HookedService) { - String pathName = (path is RegExp) ? path.pattern : path; - List dispatchers = [ - service.afterIndexed, - service.afterCreated, - service.afterRead, - service.afterModified, - service.afterUpdated, - service.afterRemoved - ]; - - for (HookedServiceEventDispatcher dispatcher in dispatchers) { - dispatcher.listen((HookedServiceEvent event) async { - bool canContinue = true; - String filterName = "$pathName::${event.eventName}"; - WebSocketFilter filter = filters[filterName]; - - for (WebsocketContext client in _clients) { - if (filter != null) - canContinue = await filter(client); - } - - if (canContinue) { - WebSocketEvent socketEvent = new WebSocketEvent( - id: _uuid.v4(), - eventName: filterName, - data: event.result); - _broadcast(socketEvent); - } - }); - } - } - }); - } + _batchEvent(String path) { + return (HookedServiceEvent e) async { + var event = await transformEvent(e); + event.eventName = "$path::${event.eventName}"; + await batchEvent(event); }; } -} \ No newline at end of file + + Future batchEvent(WebSocketEvent event) async { + // Default implementation will just immediately fire events + _clients.forEach((client) { + client.add(god.serialize(event)); + }); + } + + Future> getBatchedEvents() async => []; + + Future handleAction(WebSocketAction action, WebSocketContext socket) async { + var split = action.eventName.split("::"); + + if (split.length < 2) + return socket.sendError(new AngelHttpException.BadRequest()); + + var service = _app.service(split[0]); + + if (service == null) + return socket.sendError(new AngelHttpException.NotFound( + message: "No service \"${split[0]}\" exists.")); + + var eventName = split[1]; + + var params = mergeMap([ + god.deserializeDatum(action.params), + {"provider": Providers.WEBSOCKET} + ]); + try { + if (eventName == "index") { + return socket.send("${split[0]}::" + HookedServiceEvent.INDEXED, + await service.index(params)); + } else if (eventName == "read") { + return socket.send("${split[0]}::" + HookedServiceEvent.READ, + await service.read(action.id, params)); + } else if (eventName == "create") { + return new WebSocketEvent( + eventName: "${split[0]}::" + HookedServiceEvent.CREATED, + data: await service.create(action.data, params)); + } else if (eventName == "modify") { + return new WebSocketEvent( + eventName: "${split[0]}::" + HookedServiceEvent.MODIFIED, + data: await service.modify(action.id, action.data, params)); + } else if (eventName == "update") { + return new WebSocketEvent( + eventName: "${split[0]}::" + HookedServiceEvent.UPDATED, + data: await service.update(action.id, action.data, params)); + } else if (eventName == "remove") { + return new WebSocketEvent( + eventName: "${split[0]}::" + HookedServiceEvent.REMOVED, + data: await service.remove(action.id, params)); + } else { + return socket.sendError(new AngelHttpException.MethodNotAllowed( + message: "Method Not Allowed: \"$eventName\"")); + } + } catch (e) { + if (e is AngelHttpException) return socket.sendError(e); + + return socket.sendError(new AngelHttpException(e)); + } + } + + hookupService(Pattern _path, HookedService service) { + String path = _path.toString(); + var batch = _batchEvent(path); + + service + ..afterCreated.listen(batch) + ..afterModified.listen(batch) + ..afterUpdated.listen(batch) + ..afterRemoved.listen(batch); + + servicesAlreadyWired.add(path); + } + + onData(WebSocketContext socket, data) { + try { + WebSocketAction action = + god.deserialize(data, outputType: WebSocketAction); + + if (action.eventName == null || + action.eventName is! String || + action.eventName.isEmpty) throw new AngelHttpException.BadRequest(); + + var event = handleAction(action, socket); + if (event is WebSocketEvent) { + batchEvent(event); + } + } catch (e) { + // Send an error + socket.sendError(new AngelHttpException(e)); + } + } + + Future transformEvent(HookedServiceEvent event) async { + return new WebSocketEvent(eventName: event.eventName, data: event.result); + } + + wireAllServices(Angel app) { + for (Pattern key in app.services.keys.where((x) { + return !servicesAlreadyWired.contains(x) && + app.services[x] is HookedService; + })) { + hookupService(key, app.services[key]); + } + } + + Future call(Angel app) async { + this._app = app; + + // Set up services + wireAllServices(app); + + app.onService.listen((_) { + wireAllServices(app); + }); + + app.get(endpoint, (RequestContext req, ResponseContext res) async { + if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest)) + throw new AngelHttpException.BadRequest(); + + var ws = await WebSocketTransformer.upgrade(req.underlyingRequest); + var socket = new WebSocketContext(ws, req, res); + + ws.listen((data) { + onData(socket, data); + }, onDone: () { + _clients.remove(ws); + }, onError: (e) { + _clients.remove(ws); + }, cancelOnError: true); + }); + } +} diff --git a/lib/server_old.dart b/lib/server_old.dart new file mode 100644 index 00000000..cf391caf --- /dev/null +++ b/lib/server_old.dart @@ -0,0 +1,120 @@ +library angel_websocket.server; + +import 'dart:async'; +import 'dart:io'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:json_god/json_god.dart' as god; +import 'package:uuid/uuid.dart'; +import 'angel_websocket.dart'; + +typedef Future WebSocketFilter(WebsocketContext context); + +List _clients = []; +Uuid _uuid = new Uuid(); + +class WebsocketContext { + WebSocket socket; + RequestContext request; + ResponseContext response; + + WebsocketContext(WebSocket this.socket, RequestContext this.request, + ResponseContext this.response); +} + +_broadcast(WebSocketEvent event) { + String json = god.serialize(event); + _clients.forEach((WebsocketContext client) { + client.socket.add(json); + }); +} + +_onData(Angel app) { + return (data) { + try { + WebSocketAction action = god.deserialize( + data, outputType: WebSocketAction); + + List split = action.eventName.split("::"); + + if (split.length >= 2) { + Service service = app.service(split[0]); + + if (service != null) { + String event = split[1]; + + if (event == "index") { + + } + } + } + } catch (e) { + + } + }; +} + +_onError(e) { + +} + +class websocket { + static Map filters = {}; + + call({List endPoints: const['/ws']}) { + return (Angel app) async { + for (Pattern endPoint in endPoints) { + app.all(endPoint, (RequestContext req, ResponseContext res) async { + if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest)) { + res.write("This endpoint is only accessible via WebSockets."); + res.end(); + } else { + res + ..willCloseItself = true + ..end(); + WebSocket socket = await WebSocketTransformer.upgrade( + req.underlyingRequest); + WebsocketContext context = new WebsocketContext(socket, req, res); + _clients.add(context); + + socket.listen(_onData(app), onError: _onError, onDone: () { + _clients.remove(context); + }); + } + }); + + app.services.forEach((Pattern path, Service service) { + if (service is HookedService) { + String pathName = (path is RegExp) ? path.pattern : path; + List dispatchers = [ + service.afterIndexed, + service.afterCreated, + service.afterRead, + service.afterModified, + service.afterUpdated, + service.afterRemoved + ]; + + for (HookedServiceEventDispatcher dispatcher in dispatchers) { + dispatcher.listen((HookedServiceEvent event) async { + bool canContinue = true; + String filterName = "$pathName::${event.eventName}"; + WebSocketFilter filter = filters[filterName]; + + for (WebsocketContext client in _clients) { + if (filter != null) + canContinue = await filter(client); + } + + if (canContinue) { + WebSocketEvent socketEvent = new WebSocketEvent(eventName: filterName, + data: event.result); + _broadcast(socketEvent); + } + }); + } + } + }); + } + }; + } +} \ No newline at end of file diff --git a/lib/websocket_context.dart b/lib/websocket_context.dart new file mode 100644 index 00000000..d9c92529 --- /dev/null +++ b/lib/websocket_context.dart @@ -0,0 +1,17 @@ +part of angel_websocket.server; + +class WebSocketContext { + WebSocket underlyingSocket; + RequestContext requestContext; + ResponseContext responseContext; + + WebSocketContext(WebSocket this.underlyingSocket, + RequestContext this.requestContext, ResponseContext this.responseContext); + + send(String eventName, data) { + underlyingSocket.add( + god.serialize(new WebSocketEvent(eventName: eventName, data: data))); + } + + sendError(AngelHttpException error) => send("error", error); +} \ No newline at end of file diff --git a/pubspec.yaml b/pubspec.yaml index 35334229..c8b5abde 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -4,6 +4,7 @@ version: 1.0.0-dev author: thosakwe homepage: https://github.com/angel-dart/angel_websocket dependencies: + angel_client: ">=1.0.0-dev <2.0.0" angel_framework: ">=1.0.0-dev < 2.0.0" json_god: ">=2.0.0-beta <3.0.0" jwt: ">=0.1.4 <1.0.0" diff --git a/test/all_tests.dart b/test/all_tests.dart new file mode 100644 index 00000000..b9d49b66 --- /dev/null +++ b/test/all_tests.dart @@ -0,0 +1,6 @@ +import 'package:test/test.dart'; +import 'server.dart' as server; + +main() async { + group("server", server.main); +} \ No newline at end of file diff --git a/test/common.dart b/test/common.dart new file mode 100644 index 00000000..cb09c5c0 --- /dev/null +++ b/test/common.dart @@ -0,0 +1,20 @@ +import 'dart:async'; +import 'dart:io'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/defs.dart'; + +class Todo extends MemoryModel { + String text; + String when; + + Todo({String this.text, String this.when}); +} + +Future startTestServer(Angel app) async { + var host = InternetAddress.LOOPBACK_IP_V4; + var port = 3000; + + await app.startServer(host, port); + app.properties["ws_url"] = "ws://${host.address}:$port/ws"; + print("Test server listening on ${host.address}:$port"); +} diff --git a/test/server.dart b/test/server.dart new file mode 100644 index 00000000..e51fb462 --- /dev/null +++ b/test/server.dart @@ -0,0 +1,44 @@ +import 'dart:async'; +import 'dart:io'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_websocket/angel_websocket.dart'; +import 'package:angel_websocket/server.dart'; +import 'package:json_god/json_god.dart' as god; +import 'package:test/test.dart'; +import 'common.dart'; + +main() { + Angel app; + WebSocket socket; + + setUp(() async { + app = new Angel(); + + app.use("/real", new FakeService(), hooked: false); + app.use("/api/todos", new MemoryService()); + + await app.configure(websocket); + await app.configure(startTestServer); + + socket = await WebSocket.connect(app.properties["ws_url"]); + }); + + tearDown(() async { + await app.httpServer.close(force: true); + }); + + test("find all real-time services", () { + print(websocket.servicesAlreadyWired); + expect(websocket.servicesAlreadyWired, equals(["api/todos"])); + }); + + test("index", () async { + var action = new WebSocketAction(eventName: "api/todos::index"); + socket.add(god.serialize(action)); + + print(await socket.first); + }); +} + +@Realtime() +class FakeService extends Service {} \ No newline at end of file