From 3c01c4b3609d95d9d7a576355711491ca6f8dd03 Mon Sep 17 00:00:00 2001 From: Tobe O Date: Sun, 24 Sep 2017 00:37:58 -0400 Subject: [PATCH] 1.1.0-alpha --- README.md | 9 +- ...ysis-options.yaml => analysis_options.yaml | 0 lib/base_websocket_client.dart | 6 +- lib/browser.dart | 22 +- lib/flutter.dart | 2 - lib/io.dart | 4 +- lib/server.dart | 205 ++++++++---------- lib/websocket_controller.dart | 11 +- pubspec.yaml | 10 +- test/auth_test.dart | 24 +- test/controller/io_test.dart | 9 +- test/service/io_test.dart | 9 +- 12 files changed, 143 insertions(+), 168 deletions(-) rename .analysis-options.yaml => analysis_options.yaml (100%) diff --git a/README.md b/README.md index 646d0d69..78ec9696 100644 --- a/README.md +++ b/README.md @@ -23,8 +23,13 @@ import "package:angel_websocket/server.dart"; main() async { var app = new Angel(); - // Ensure this runs after all our services are in-place - app.justBeforeStart.add(new AngelWebSocket("/ws")); + var ws = new AngelWebSocket(); + + // Apply configuration + await app.configure(ws.configureServer); + + // Listen for requests at `/ws`. + app.all('/ws', ws.handleRequest); } ``` diff --git a/.analysis-options.yaml b/analysis_options.yaml similarity index 100% rename from .analysis-options.yaml rename to analysis_options.yaml diff --git a/lib/base_websocket_client.dart b/lib/base_websocket_client.dart index 8711a33a..a44780b1 100644 --- a/lib/base_websocket_client.dart +++ b/lib/base_websocket_client.dart @@ -2,12 +2,12 @@ import 'dart:async'; import 'dart:collection'; import 'dart:convert'; import 'package:angel_client/angel_client.dart'; +import 'package:angel_client/base_angel_client.dart'; +import 'package:angel_http_exception/angel_http_exception.dart'; import 'package:http/src/base_client.dart' as http; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/status.dart' as status; import 'angel_websocket.dart'; -export 'package:angel_client/angel_client.dart'; -import 'package:angel_client/base_angel_client.dart'; final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); @@ -133,7 +133,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient { Future getConnectedWebSocket(); @override - WebSocketsService service(String path, + WebSocketsService service(String path, {Type type, AngelDeserializer deserializer}) { String uri = path.toString().replaceAll(_straySlashes, ''); return new WebSocketsService(socket, this, uri, diff --git a/lib/browser.dart b/lib/browser.dart index c0598cbd..f1380051 100644 --- a/lib/browser.dart +++ b/lib/browser.dart @@ -4,11 +4,11 @@ library angel_websocket.browser; import 'dart:async'; import 'dart:html'; import 'package:angel_client/angel_client.dart'; +import 'package:angel_http_exception/angel_http_exception.dart'; import 'package:http/browser_client.dart' as http; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/html.dart'; import 'base_websocket_client.dart'; -export 'package:angel_client/angel_client.dart'; export 'angel_websocket.dart'; final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); @@ -41,7 +41,7 @@ class WebSockets extends BaseWebSocketClient { if (wnd.closed) { ctrl.addError(new AngelHttpException.notAuthenticated( message: - errorMessage ?? 'Authentication via popup window failed.')); + errorMessage ?? 'Authentication via popup window failed.')); ctrl.close(); timer.cancel(); sub?.cancel(); @@ -50,9 +50,9 @@ class WebSockets extends BaseWebSocketClient { timer.cancel(); }); - sub = window.on[eventName ?? 'token'].listen((CustomEvent e) { + sub = window.on[eventName ?? 'token'].listen((e) { if (!ctrl.isClosed) { - ctrl.add(e.detail); + ctrl.add((e as CustomEvent).detail); t.cancel(); ctrl.close(); sub.cancel(); @@ -64,7 +64,9 @@ class WebSockets extends BaseWebSocketClient { @override Future getConnectedWebSocket() { - var socket = new WebSocket(authToken?.isNotEmpty == true ? '$basePath?token=$authToken' : basePath ); + var socket = new WebSocket(authToken?.isNotEmpty == true + ? '$basePath?token=$authToken' + : basePath); var completer = new Completer(); socket @@ -72,18 +74,20 @@ class WebSockets extends BaseWebSocketClient { if (!completer.isCompleted) return completer.complete(new HtmlWebSocketChannel(socket)); }) - ..onError.listen((ErrorEvent e) { - if (!completer.isCompleted) return completer.completeError(e.error); + ..onError.listen((e) { + var err = e as ErrorEvent; + if (!completer.isCompleted) return completer.completeError(err.error); }); return completer.future; } @override - BrowserWebSocketsService service(String path, + BrowserWebSocketsService service(String path, {Type type, AngelDeserializer deserializer}) { String uri = path.replaceAll(_straySlashes, ''); - return new BrowserWebSocketsService(socket, this, uri, deserializer: deserializer); + return new BrowserWebSocketsService(socket, this, uri, + deserializer: deserializer); } } diff --git a/lib/flutter.dart b/lib/flutter.dart index 1186dbe8..e4ab38ad 100644 --- a/lib/flutter.dart +++ b/lib/flutter.dart @@ -3,11 +3,9 @@ library angel_websocket.flutter; import 'dart:async'; import 'dart:io'; -import 'package:angel_client/angel_client.dart'; import 'package:http/http.dart' as http; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/io.dart'; -import 'angel_websocket.dart'; import 'base_websocket_client.dart'; export 'package:angel_client/angel_client.dart'; export 'angel_websocket.dart'; diff --git a/lib/io.dart b/lib/io.dart index 19f9e648..fa89368e 100644 --- a/lib/io.dart +++ b/lib/io.dart @@ -46,10 +46,10 @@ class WebSockets extends BaseWebSocketClient { } @override - IoWebSocketsService service(String path, + IoWebSocketsService service(String path, {Type type, AngelDeserializer deserializer}) { String uri = path.replaceAll(_straySlashes, ''); - return new IoWebSocketsService(socket, this, uri, T != dynamic ? T : type); + return new IoWebSocketsService(socket, this, uri, type); } @override diff --git a/lib/server.dart b/lib/server.dart index d48f87c1..8d6a8628 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -16,22 +16,20 @@ part 'websocket_context.dart'; part 'websocket_controller.dart'; -/// Used to assign routes to a given handler. -typedef AngelWebSocketRegisterer(Angel app, RequestHandler handler); - /// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s. -class AngelWebSocket extends AngelPlugin { - Angel _app; +class AngelWebSocket { List _clients = []; final List _servicesAlreadyWired = []; final StreamController _onAction = - new StreamController(); + new StreamController(); final StreamController _onData = new StreamController(); final StreamController _onConnection = - new StreamController.broadcast(); + new StreamController.broadcast(); final StreamController _onDisconnect = - new StreamController.broadcast(); + new StreamController.broadcast(); + + final Angel app; /// If this is not `true`, then all client-side service parameters will be /// discarded, other than `params['query']`. @@ -40,16 +38,8 @@ class AngelWebSocket extends AngelPlugin { /// If `true`, then clients can authenticate their WebSockets by sending a valid JWT. final bool allowAuth; - /// Include debug information, and send error information across WebSockets. - final bool debug; - - bool _sendErrors; - /// Send error information across WebSockets, without including [debug] information.. - bool get sendErrors => _sendErrors == true; - - /// Registers this instance as a route on the server. - final AngelWebSocketRegisterer register; + final bool sendErrors; /// A list of clients currently connected to this server via WebSockets. List get clients => new List.unmodifiable(_clients); @@ -58,9 +48,6 @@ class AngelWebSocket extends AngelPlugin { List get servicesAlreadyWired => new List.unmodifiable(_servicesAlreadyWired); - /// The endpoint that users should connect a WebSocket to. - final String endpoint; - /// Used to notify other nodes of an event's firing. Good for scaled applications. final WebSocketSynchronizer synchronizer; @@ -82,17 +69,13 @@ class AngelWebSocket extends AngelPlugin { /// Deserializes data from WebSockets. Function deserializer; - AngelWebSocket({this.endpoint: '/ws', - this.debug: false, - bool sendErrors, - this.allowClientParams: false, - this.allowAuth: true, - this.register, - this.synchronizer, - this.serializer, - this.deserializer}) { - _sendErrors = sendErrors; - + AngelWebSocket(this.app, + {this.sendErrors: false, + this.allowClientParams: false, + this.allowAuth: true, + this.synchronizer, + this.serializer, + this.deserializer}) { if (serializer == null) serializer = god.serialize; if (deserializer == null) deserializer = (params) => params; } @@ -117,10 +100,6 @@ class AngelWebSocket extends AngelPlugin { }; } - void _printDebug(String msg) { - if (debug == true) print(msg); - } - /// Slates an event to be dispatched. Future batchEvent(WebSocketEvent event, {filter(WebSocketContext socket), bool notify: true}) async { @@ -129,9 +108,6 @@ class AngelWebSocket extends AngelPlugin { var result = true; if (filter != null) result = await filter(client); if (result == true) { - var serialized = event.toJson(); - _printDebug('Batching this event: $serialized'); - // print('Serialized: ' + JSON.encode(serialized)); client.io.add((serializer ?? god.serialize)(event.toJson())); } }); @@ -150,7 +126,7 @@ class AngelWebSocket extends AngelPlugin { if (split.length < 2) return socket.sendError(new AngelHttpException.badRequest()); - var service = _app.service(split[0]); + var service = app.service(split[0]); if (service == null) return socket.sendError(new AngelHttpException.notFound( @@ -170,7 +146,7 @@ class AngelWebSocket extends AngelPlugin { var params = mergeMap([ (deserializer ?? (params) => params)(action.params), { - "provider": Providers.WEBSOCKET, + "provider": Providers.websocket, '__requestctx': socket.request, '__responsectx': socket.response } @@ -204,13 +180,7 @@ class AngelWebSocket extends AngelPlugin { message: "Method Not Allowed: \"$actionName\"")); } } catch (e, st) { - if (e is AngelHttpException) - return socket.sendError(e); - else if (debug == true || _sendErrors == true) - socket.sendError(new AngelHttpException(e, - message: e.toString(), stackTrace: st, errors: [st.toString()])); - else - socket.sendError(new AngelHttpException(e)); + catchError(e, st, socket); } } @@ -228,19 +198,13 @@ class AngelWebSocket extends AngelPlugin { token = new AuthToken.validate(jwt, auth.hmac); var user = await auth.deserializer(token.userId); var req = socket.request; - req..inject(AuthToken, req.properties['token'] = token)..inject( - user.runtimeType, req.properties["user"] = user); + req + ..inject(AuthToken, req.properties['token'] = token) + ..inject(user.runtimeType, req.properties["user"] = user); socket.send(EVENT_AUTHENTICATED, {'token': token.serialize(auth.hmac), 'data': user}); } catch (e, st) { - // Send an error - if (e is AngelHttpException) - socket.sendError(e); - else if (debug == true || _sendErrors == true) - socket.sendError(new AngelHttpException(e, - message: e.toString(), stackTrace: st, errors: [st.toString()])); - else - socket.sendError(new AngelHttpException(e)); + catchError(e, st, socket); } } else { socket.sendError(new AngelHttpException.badRequest( @@ -251,12 +215,15 @@ class AngelWebSocket extends AngelPlugin { /// Hooks a service up to have its events broadcasted. hookupService(Pattern _path, HookedService service) { String path = _path.toString(); - service.after([ - HookedServiceEvent.CREATED, - HookedServiceEvent.MODIFIED, - HookedServiceEvent.UPDATED, - HookedServiceEvent.REMOVED - ], serviceHook(path)); + service.after( + [ + HookedServiceEvent.created, + HookedServiceEvent.modified, + HookedServiceEvent.updated, + HookedServiceEvent.removed + ], + serviceHook(path), + ); _servicesAlreadyWired.add(path); } @@ -298,14 +265,24 @@ class AngelWebSocket extends AngelPlugin { } } } catch (e, st) { - // Send an error - if (e is AngelHttpException) - socket.sendError(e); - else if (debug == true || _sendErrors == true) - socket.sendError(new AngelHttpException(e, - message: e.toString(), stackTrace: st, errors: [st.toString()])); - else - socket.sendError(new AngelHttpException(e)); + catchError(e, st, socket); + } + } + + void catchError(e, StackTrace st, WebSocketContext socket) { + // Send an error + if (e is AngelHttpException) { + socket.sendError(e); + app.logger?.severe(e.message, e.error ?? e, e.stackTrace); + } else if (sendErrors) { + var err = new AngelHttpException(e, + message: e.toString(), stackTrace: st, errors: [st.toString()]); + socket.sendError(err); + app.logger?.severe(err.message, e, st); + } else { + var err = new AngelHttpException(e); + socket.sendError(err); + app.logger?.severe(e.toString(), e, st); } } @@ -324,11 +301,9 @@ class AngelWebSocket extends AngelPlugin { } } - @override - Future call(Angel app) async { - if (_sendErrors == null) _sendErrors = app.isProduction; - - _app = app..container.singleton(this); + /// Configiures an [Angel] instance to listen for WebSocket connections. + Future configureServer(Angel app) async { + app..container.singleton(this); if (runtimeType != AngelWebSocket) app.container.singleton(this, as: AngelWebSocket); @@ -340,50 +315,48 @@ class AngelWebSocket extends AngelPlugin { wireAllServices(app); }); - handler(RequestContext req, ResponseContext res) async { - if (!WebSocketTransformer.isUpgradeRequest(req.io)) - throw new AngelHttpException.badRequest(); - - res - ..willCloseItself = true - ..end(); - - var ws = await WebSocketTransformer.upgrade(req.io); - var socket = new WebSocketContext(ws, req, res); - _clients.add(socket); - await handleConnect(socket); - - _onConnection.add(socket); - - req - ..properties['socket'] = socket - ..inject(WebSocketContext, socket); - - ws.listen((data) { - _onData.add(data); - handleData(socket, data); - }, onDone: () { - _onDisconnect.add(socket); - _clients.remove(ws); - }, onError: (e) { - _onDisconnect.add(socket); - _clients.remove(ws); - }, cancelOnError: true); - } - - _register() { - if (register != null) - return register(app, handler); - else - app.get(endpoint, handler); - } - - await _register(); - if (synchronizer != null) { synchronizer.stream.listen((e) => batchEvent(e, notify: false)); } } + + /// Handles an incoming HTTP request. + Future handleRequest(RequestContext req, ResponseContext res) async { + if (!WebSocketTransformer.isUpgradeRequest(req.io)) + throw new AngelHttpException.badRequest(); + + res + ..willCloseItself = true + ..end(); + + var ws = await WebSocketTransformer.upgrade(req.io); + var socket = new WebSocketContext(ws, req, res); + _clients.add(socket); + await handleConnect(socket); + + _onConnection.add(socket); + + req + ..properties['socket'] = socket + ..inject(WebSocketContext, socket); + + ws.listen( + (data) { + _onData.add(data); + handleData(socket, data); + }, + onDone: () { + _onDisconnect.add(socket); + _clients.remove(ws); + }, + onError: (e) { + _onDisconnect.add(socket); + _clients.remove(ws); + }, + cancelOnError: true, + ); + return false; + } } /// Notifies other nodes of outgoing WWebSocket events, and listens for diff --git a/lib/websocket_controller.dart b/lib/websocket_controller.dart index f692df71..4aad8d9c 100644 --- a/lib/websocket_controller.dart +++ b/lib/websocket_controller.dart @@ -78,16 +78,7 @@ class WebSocketController extends Controller { return app.runContained(fn, socket.request, socket.response); } } catch (e, st) { - // Send an error - if (e is AngelHttpException) - socket.sendError(e); - else if (ws.debug == true || ws.sendErrors == true) - socket.sendError(new AngelHttpException(e, - message: e.toString(), - stackTrace: st, - errors: [st.toString()])); - else - socket.sendError(new AngelHttpException(e)); + ws.catchError(e, st, socket); } }); }); diff --git a/pubspec.yaml b/pubspec.yaml index fe514dbe..828ce5d3 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -2,18 +2,18 @@ name: angel_websocket description: WebSocket plugin for Angel. environment: sdk: ">=1.19.0" -version: 1.0.8 +version: 1.1.0-alpha author: Tobe O homepage: https://github.com/angel-dart/angel_websocket dependencies: - angel_auth: ^1.0.0-dev - angel_client: "^1.0.0" - angel_framework: ^1.0.0-dev + angel_auth: ^1.1.0-alpha + angel_client: ^1.1.0-alpha + angel_framework: ^1.1.0-alpha http: ">=0.11.0 <0.12.0" json_god: ^2.0.0-beta merge_map: ^1.0.0 + meta: ^1.0.0 uuid: "^0.5.3" web_socket_channel: "^1.0.0" dev_dependencies: - angel_diagnostics: "^1.0.0" test: "^0.12.15" diff --git a/test/auth_test.dart b/test/auth_test.dart index 1118affc..0a79c973 100644 --- a/test/auth_test.dart +++ b/test/auth_test.dart @@ -1,10 +1,10 @@ import 'dart:async'; import 'package:angel_auth/angel_auth.dart'; import 'package:angel_client/io.dart' as c; -import 'package:angel_diagnostics/angel_diagnostics.dart'; import 'package:angel_framework/angel_framework.dart'; import 'package:angel_websocket/io.dart' as c; import 'package:angel_websocket/server.dart'; +import 'package:logging/logging.dart'; import 'package:test/test.dart'; const Map USER = const {'username': 'foo', 'password': 'bar'}; @@ -27,10 +27,11 @@ main() { app.post('/auth/local', auth.authenticate('local')); - await app.configure(auth); - var sock = new AngelWebSocket(); - await app.configure(sock); - await app.configure(logRequests()); + await app.configure(auth.configureServer); + var sock = new AngelWebSocket(app); + await app.configure(sock.configureServer); + app.all('/ws', sock.handleRequest); + app.logger = new Logger('angel_auth')..onRecord.listen(print); var server = await app.startServer(); client = new c.Rest('http://${server.address.address}:${server.port}'); @@ -38,12 +39,13 @@ main() { await ws.connect(); }); - tearDown(() => - Future.wait([ - app.close(), - client.close(), - ws.close() - ])); + tearDown(() { + return Future.wait([ + app.close(), + client.close(), + ws.close(), + ]); + }); test('auth event fires', () async { var localAuth = await client.authenticate(type: 'local', credentials: USER); diff --git a/test/controller/io_test.dart b/test/controller/io_test.dart index 0be6243a..7e78b796 100644 --- a/test/controller/io_test.dart +++ b/test/controller/io_test.dart @@ -1,8 +1,8 @@ import 'dart:io'; -import 'package:angel_diagnostics/angel_diagnostics.dart'; import 'package:angel_framework/angel_framework.dart' as srv; import 'package:angel_websocket/io.dart' as ws; import 'package:angel_websocket/server.dart' as srv; +import 'package:logging/logging.dart'; import 'package:test/test.dart'; import 'common.dart'; @@ -16,14 +16,15 @@ main() { setUp(() async { app = new srv.Angel(); - websockets = new srv.AngelWebSocket(debug: true) + websockets = new srv.AngelWebSocket(app) ..onData.listen((data) { print('Received by server: $data'); }); - await app.configure(websockets); + await app.configure(websockets.configureServer); + app.all('/ws', websockets.handleRequest); await app.configure(new GameController()); - await app.configure(logRequests(new File('log.txt'))); + app.logger = new Logger('angel_auth')..onRecord.listen(print); server = await app.startServer(); url = 'ws://${server.address.address}:${server.port}/ws'; diff --git a/test/service/io_test.dart b/test/service/io_test.dart index e3d8f9d7..883a64f2 100644 --- a/test/service/io_test.dart +++ b/test/service/io_test.dart @@ -1,8 +1,8 @@ import 'dart:io'; -import 'package:angel_diagnostics/angel_diagnostics.dart'; import 'package:angel_framework/angel_framework.dart' as srv; import 'package:angel_websocket/io.dart' as ws; import 'package:angel_websocket/server.dart' as srv; +import 'package:logging/logging.dart'; import 'package:test/test.dart'; import 'common.dart'; @@ -16,13 +16,14 @@ main() { setUp(() async { app = new srv.Angel()..use('/api/todos', new TodoService()); - websockets = new srv.AngelWebSocket(debug: true) + websockets = new srv.AngelWebSocket(app) ..onData.listen((data) { print('Received by server: $data'); }); - await app.configure(websockets); - await app.configure(logRequests(new File('log.txt'))); + await app.configure(websockets.configureServer); + app.all('/ws', websockets.handleRequest); + app.logger = new Logger('angel_auth')..onRecord.listen(print); server = await app.startServer(); url = 'ws://${server.address.address}:${server.port}/ws';