library angel_websocket.server;

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:mirrors';
import 'package:angel_framework/angel_framework.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:merge_map/merge_map.dart';
import 'angel_websocket.dart';
export 'angel_websocket.dart';

part 'websocket_context.dart';
part 'websocket_controller.dart';

/// An [AngelPlugin] that upgrades requests on [endpoint] to WebSockets,
/// dispatches incoming `service::action` messages to the app's services, and
/// broadcasts events from hooked services to every connected client.
class AngelWebSocket extends AngelPlugin {
  Angel _app;
  List _clients = [];
  StreamController _onConnection = new StreamController();
  StreamController _onDisconnect = new StreamController();

  /// String forms (`Pattern.toString()`) of the service paths that
  /// [hookupService] has already wired.
  final List _servicesAlreadyWired = [];

  /// A list of clients currently connected to this server via WebSockets.
  List get clients => new List.unmodifiable(_clients);

  /// Services that have already been hooked to fire socket events.
  List get servicesAlreadyWired => new List.unmodifiable(_servicesAlreadyWired);

  /// The route on which [call] registers the WebSocket upgrade handler.
  final String endpoint;

  /// Fired on incoming connections.
  Stream get onConnection => _onConnection.stream;

  /// Fired when a user disconnects.
  Stream get onDisconnection => _onDisconnect.stream;

  AngelWebSocket(String this.endpoint);

  /// Builds a service-event listener that transforms the event, prefixes its
  /// name with `path::`, and hands it to [batchEvent].
  _batchEvent(String path) {
    return (HookedServiceEvent e) async {
      var event = await transformEvent(e);
      event.eventName = "$path::${event.eventName}";
      await batchEvent(event);
    };
  }

  /// Sends [event] to all connected clients.
  ///
  /// The default implementation fires the event immediately.
  Future batchEvent(WebSocketEvent event) async {
    // Nothing to send, so skip serialization entirely.
    if (_clients.isEmpty) return;

    // Every client receives the same payload; serialize it only once.
    var payload = god.serialize(event);

    for (var client in _clients) {
      client.add(payload);
    }
  }

  /// Returns events that are waiting to be sent.
  ///
  /// The default implementation never queues anything (see [batchEvent]), so
  /// it always returns an empty list.
  Future<List<WebSocketEvent>> getBatchedEvents() async => [];

  /// Runs the service method named by [action] (`service::method`).
  ///
  /// `index` and `read` reply directly on [socket]. `create`, `modify`,
  /// `update` and `remove` return a [WebSocketEvent] for the caller to
  /// broadcast. Malformed event names, unknown services, unknown methods and
  /// errors thrown by the service are reported through `socket.sendError`.
  Future handleAction(WebSocketAction action, WebSocketContext socket) async {
    var split = action.eventName.split("::");

    if (split.length < 2)
      return socket.sendError(new AngelHttpException.BadRequest());

    var service = _app.service(split[0]);

    if (service == null)
      return socket.sendError(new AngelHttpException.NotFound(
          message: "No service \"${split[0]}\" exists."));

    var eventName = split[1];

    // Tag the call so services can tell it arrived over a WebSocket.
    var params = mergeMap([
      god.deserializeDatum(action.params),
      {"provider": Providers.WEBSOCKET}
    ]);

    try {
      if (eventName == "index") {
        return socket.send("${split[0]}::" + HookedServiceEvent.INDEXED,
            await service.index(params));
      } else if (eventName == "read") {
        return socket.send("${split[0]}::" + HookedServiceEvent.READ,
            await service.read(action.id, params));
      } else if (eventName == "create") {
        return new WebSocketEvent(
            eventName: "${split[0]}::" + HookedServiceEvent.CREATED,
            data: await service.create(action.data, params));
      } else if (eventName == "modify") {
        return new WebSocketEvent(
            eventName: "${split[0]}::" + HookedServiceEvent.MODIFIED,
            data: await service.modify(action.id, action.data, params));
      } else if (eventName == "update") {
        return new WebSocketEvent(
            eventName: "${split[0]}::" + HookedServiceEvent.UPDATED,
            data: await service.update(action.id, action.data, params));
      } else if (eventName == "remove") {
        return new WebSocketEvent(
            eventName: "${split[0]}::" + HookedServiceEvent.REMOVED,
            data: await service.remove(action.id, params));
      } else {
        return socket.sendError(new AngelHttpException.MethodNotAllowed(
            message: "Method Not Allowed: \"$eventName\""));
      }
    } catch (e) {
      if (e is AngelHttpException) return socket.sendError(e);
      return socket.sendError(new AngelHttpException(e));
    }
  }

  /// Subscribes to the created/modified/updated/removed events of [service]
  /// so they are broadcast under the `_path::` prefix, and records the path
  /// as wired.
  hookupService(Pattern _path, HookedService service) {
    String path = _path.toString();
    var batch = _batchEvent(path);

    service
      ..afterCreated.listen(batch)
      ..afterModified.listen(batch)
      ..afterUpdated.listen(batch)
      ..afterRemoved.listen(batch);

    _servicesAlreadyWired.add(path);
  }

  /// Hook awaited for every new connection before it is announced on
  /// [onConnection]. Does nothing by default.
  Future onConnect(WebSocketContext socket) async {}

  /// Handles one raw message received on [socket].
  ///
  /// The message is forwarded to the socket's own streams, decoded as JSON,
  /// and, when its event name is a `service::action` pair naming a service
  /// method, executed through [handleAction]. Any failure is reported back on
  /// the socket instead of being thrown.
  onData(WebSocketContext socket, data) async {
    try {
      socket._onData.add(data);

      var fromJson = JSON.decode(data);

      // Only JSON objects can carry an action; reject arrays and scalars with
      // a clear 400 instead of failing on the key lookups below.
      if (fromJson is! Map) {
        throw new AngelHttpException.BadRequest();
      }

      var action = new WebSocketAction(
          id: fromJson['id'],
          eventName: fromJson['eventName'],
          data: fromJson['data'],
          params: fromJson['params']);

      if (action.eventName == null ||
          action.eventName is! String ||
          action.eventName.isEmpty) {
        throw new AngelHttpException.BadRequest();
      }

      if (fromJson.containsKey("eventName")) {
        socket._onAll.add(fromJson);
        socket.on
            ._getStreamForEvent(fromJson["eventName"].toString())
            .add(fromJson["data"]);
      }

      if (action.eventName.contains("::")) {
        var split = action.eventName.split("::");

        if (split.length >= 2) {
          if ([
            "index",
            "read",
            "create",
            "modify",
            "update",
            "remove"
          ].contains(split[1])) {
            var event = await handleAction(action, socket);

            if (event is WebSocketEvent) {
              // Awaited so a failing broadcast reaches the catch below rather
              // than surfacing as an unhandled asynchronous error.
              await batchEvent(event);
            }
          }
        }
      }
    } catch (e) {
      // Send an error
      if (e is AngelHttpException)
        socket.sendError(e);
      else
        socket.sendError(new AngelHttpException(e));
    }
  }

  /// Converts a service [event] into the [WebSocketEvent] sent to clients,
  /// copying its event name and result.
  Future transformEvent(HookedServiceEvent event) async {
    return new WebSocketEvent(eventName: event.eventName, data: event.result);
  }

  /// Hooks up every [HookedService] registered on [app] that has not been
  /// wired yet.
  wireAllServices(Angel app) {
    for (Pattern key in app.services.keys) {
      var service = app.services[key];

      // hookupService records paths as strings, so compare string forms;
      // otherwise a non-String Pattern would be hooked again on every call.
      if (service is HookedService &&
          !_servicesAlreadyWired.contains(key.toString())) {
        hookupService(key, service);
      }
    }
  }

  /// Installs the plugin on [app]: registers this instance in the container,
  /// wires existing and future services, and mounts the upgrade route at
  /// [endpoint].
  @override
  Future call(Angel app) async {
    this._app = app..container.singleton(this);

    // Subclass instances are additionally registered under the base type.
    if (runtimeType != AngelWebSocket)
      app.container.singleton(this, as: AngelWebSocket);

    // Set up services
    wireAllServices(app);

    app.onService.listen((_) {
      wireAllServices(app);
    });

    app.get(endpoint, (RequestContext req, ResponseContext res) async {
      if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest))
        throw new AngelHttpException.BadRequest();

      res
        ..willCloseItself = true
        ..end();

      var ws = await WebSocketTransformer.upgrade(req.underlyingRequest);
      _clients.add(ws);

      var socket = new WebSocketContext(ws, req, res);
      await onConnect(socket);

      _onConnection.add(socket);
      req.params['socket'] = socket;

      // A clean close and a stream error end the connection the same way.
      void disconnect() {
        _onDisconnect.add(socket);
        _clients.remove(ws);
      }

      ws.listen((data) {
        onData(socket, data);
      }, onDone: disconnect, onError: (e) => disconnect(), cancelOnError: true);
    });
  }
}