From 01db4aa9eb406429ef058e91265472347ea1705c Mon Sep 17 00:00:00 2001 From: thosakwe Date: Tue, 28 Feb 2017 09:15:34 -0500 Subject: [PATCH] 1.0.2 --- README.md | 8 +- lib/angel_websocket.dart | 2 + lib/base_websocket_client.dart | 149 +++++++++++++++++++++++++-------- lib/server.dart | 61 +++++++++++++- lib/websocket_context.dart | 17 +++- pubspec.yaml | 4 +- test/controller/io_test.dart | 2 +- test/service/common.dart | 4 +- 8 files changed, 205 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index fad0b8ef..2373bcf5 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # angel_websocket -[![1.0.1](https://img.shields.io/badge/pub-1.0.1-brightgreen.svg)](https://pub.dartlang.org/packages/angel_websocket) +[![1.0.2](https://img.shields.io/badge/pub-1.0.2-brightgreen.svg)](https://pub.dartlang.org/packages/angel_websocket) [![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket) WebSocket plugin for Angel. @@ -84,6 +84,9 @@ main() async { // Happens asynchronously Cars.create({"brand": "Toyota"}); + // Authenticate a WebSocket, if you were not already authenticated... + app.authenticateViaJwt(''); + // Listen for arbitrary events app.on['custom_event'].listen((event) { // For example, this might be sent by a @@ -127,5 +130,8 @@ main() async { // Happens asynchronously Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"}); + + // Authenticate a WebSocket, if you were not already authenticated... + app.authenticateViaJwt(''); } ``` diff --git a/lib/angel_websocket.dart b/lib/angel_websocket.dart index 03b4bb4b..9b47796c 100644 --- a/lib/angel_websocket.dart +++ b/lib/angel_websocket.dart @@ -1,6 +1,7 @@ /// WebSocket plugin for Angel. library angel_websocket; +const String ACTION_AUTHENTICATE = 'authenticate'; const String ACTION_INDEX = 'index'; const String ACTION_READ = 'read'; const String ACTION_CREATE = 'create'; @@ -8,6 +9,7 @@ const String ACTION_MODIFY = 'modify'; const String ACTION_UPDATE = 'update'; const String ACTION_REMOVE = 'remove'; +const String EVENT_AUTHENTICATED = 'authenticated'; const String EVENT_ERROR = 'error'; const String EVENT_INDEXED = 'indexed'; const String EVENT_READ = 'read'; diff --git a/lib/base_websocket_client.dart b/lib/base_websocket_client.dart index 351deeb4..6d0eb7f3 100644 --- a/lib/base_websocket_client.dart +++ b/lib/base_websocket_client.dart @@ -12,11 +12,14 @@ final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); /// An [Angel] client that operates across WebSockets. abstract class BaseWebSocketClient extends BaseAngelClient { + Duration _reconnectInterval; WebSocketChannel _socket; final StreamController _onData = new StreamController(); final StreamController _onAllEvents = new StreamController(); + final StreamController _onAuthenticated = + new StreamController(); final StreamController _onError = new StreamController(); final StreamController> _onServiceEvent = @@ -32,6 +35,9 @@ abstract class BaseWebSocketClient extends BaseAngelClient { /// Fired on all events. Stream get onAllEvents => _onAllEvents.stream; + /// Fired whenever a WebSocket is successfully authenticated. + Stream get onAuthenticated => _onAuthenticated.stream; + /// A broadcast stream of data coming from the [socket]. /// /// Mostly just for internal use. @@ -51,17 +57,66 @@ abstract class BaseWebSocketClient extends BaseAngelClient { /// The [WebSocketChannel] underneath this instance. WebSocketChannel get socket => _socket; - BaseWebSocketClient(http.BaseClient client, String basePath) - : super(client, basePath) {} + /// If `true` (default), then the client will automatically try to reconnect to the server + /// if the socket closes. + final bool reconnectOnClose; + + /// The amount of time to wait between reconnect attempts. Default: 10 seconds. + Duration get reconnectInterval => _reconnectInterval; + + BaseWebSocketClient(http.BaseClient client, String basePath, + {this.reconnectOnClose: true, Duration reconnectInterval}) + : super(client, basePath) { + _reconnectInterval = reconnectInterval ?? new Duration(seconds: 10); + } @override - Future close() async => _socket.sink.close(status.goingAway); + Future close() async { + await _socket.sink.close(status.goingAway); + _onData.close(); + _onAllEvents.close(); + _onAuthenticated.close(); + _onError.close(); + _onServiceEvent.close(); + _onWebSocketChannelException.close(); + } - /// Connects the WebSocket. - Future connect() async { - _socket = await getConnectedWebSocket(); - listen(); - return _socket; + /// Connects the WebSocket. [timeout] is optional. + Future connect({Duration timeout}) async { + if (timeout != null) { + var c = new Completer(); + Timer timer; + + timer = new Timer(timeout, () { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + c.completeError(new TimeoutException( + 'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms', + timeout)); + } + }); + + getConnectedWebSocket().then((socket) { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + c.complete(socket); + } + }).catchError((e, st) { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + c.completeError(e, st); + } + }); + + return await c.future.then((socket) { + _socket = socket; + listen(); + }); + } else { + _socket = await getConnectedWebSocket(); + listen(); + return _socket; + } } /// Returns a new [WebSocketChannel], ready to be listened on. @@ -79,39 +134,60 @@ abstract class BaseWebSocketClient extends BaseAngelClient { /// Starts listening for data. void listen() { - _socket.stream.listen((data) { - _onData.add(data); + _socket?.stream?.listen( + (data) { + _onData.add(data); - if (data is WebSocketChannelException) { - _onWebSocketChannelException.add(data); - } else if (data is String) { - var json = JSON.decode(data); + if (data is WebSocketChannelException) { + _onWebSocketChannelException.add(data); + } else if (data is String) { + var json = JSON.decode(data); - if (json is Map) { - var event = new WebSocketEvent.fromJson(json); + if (json is Map) { + var event = new WebSocketEvent.fromJson(json); - if (event.eventName?.isNotEmpty == true) { - _onAllEvents.add(event); - on._getStream(event.eventName).add(event); - } + if (event.eventName?.isNotEmpty == true) { + _onAllEvents.add(event); + on._getStream(event.eventName).add(event); + } - if (event.eventName == EVENT_ERROR) { - var error = new AngelHttpException.fromMap(event.data ?? {}); - _onError.add(error); - } else if (event.eventName?.isNotEmpty == true) { - var split = event.eventName - .split("::") - .where((str) => str.isNotEmpty) - .toList(); + if (event.eventName == EVENT_ERROR) { + var error = new AngelHttpException.fromMap(event.data ?? {}); + _onError.add(error); + } else if (event.eventName == EVENT_AUTHENTICATED) { + var authResult = new AngelAuthResult.fromMap(event.data); + _onAuthenticated.add(authResult); + } else if (event.eventName?.isNotEmpty == true) { + var split = event.eventName + .split("::") + .where((str) => str.isNotEmpty) + .toList(); - if (split.length >= 2) { - var serviceName = split[0], eventName = split[1]; - _onServiceEvent.add({serviceName: event..eventName = eventName}); + if (split.length >= 2) { + var serviceName = split[0], eventName = split[1]; + _onServiceEvent + .add({serviceName: event..eventName = eventName}); + } + } } } - } - } - }); + }, + cancelOnError: true, + onDone: () { + if (reconnectOnClose == true) { + new Timer.periodic(reconnectInterval, (Timer timer) async { + var result; + + try { + result = await connect(timeout: reconnectInterval); + } catch (e) { + // + } + + if (result != null) timer.cancel(); + }); + } + }); } /// Serializes data to JSON. @@ -125,6 +201,11 @@ abstract class BaseWebSocketClient extends BaseAngelClient { void sendAction(WebSocketAction action) { socket.sink.add(serialize(action)); } + + /// Attempts to authenticate a WebSocket, using a valid JWT. + void authenticateViaJwt(String jwt) { + send(ACTION_AUTHENTICATE, new WebSocketAction(params: {'jwt': jwt})); + } } /// A [Service] that asynchronously interacts with the server. diff --git a/lib/server.dart b/lib/server.dart index ee5f11e2..132c8edb 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:mirrors'; +import 'package:angel_auth/angel_auth.dart'; import 'package:angel_framework/angel_framework.dart'; import 'package:json_god/json_god.dart' as god; import 'package:merge_map/merge_map.dart'; @@ -35,6 +36,9 @@ class AngelWebSocket extends AngelPlugin { /// discarded, other than `params['query']`. final bool allowClientParams; + /// If `true`, then clients can authenticate their WebSockets by sending a valid JWT. + final bool allowAuth; + /// Include debug information, and send error information across WebSockets. final bool debug; @@ -51,6 +55,9 @@ class AngelWebSocket extends AngelPlugin { /// The endpoint that users should connect a WebSocket to. final String endpoint; + /// Used to notify other nodes of an event's firing. Good for scaled applications. + final WebSocketSynchronizer synchronizer; + /// Fired on any [WebSocketAction]. Stream get onAction => _onAction.stream; @@ -67,7 +74,9 @@ class AngelWebSocket extends AngelPlugin { {this.endpoint: '/ws', this.debug: false, this.allowClientParams: false, - this.register}); + this.allowAuth: true, + this.register, + this.synchronizer}); serviceHook(String path) { return (HookedServiceEvent e) async { @@ -93,7 +102,7 @@ class AngelWebSocket extends AngelPlugin { /// Slates an event to be dispatched. Future batchEvent(WebSocketEvent event, - {filter(WebSocketContext socket)}) async { + {filter(WebSocketContext socket), bool notify: true}) async { // Default implementation will just immediately fire events _clients.forEach((client) async { var result = true; @@ -105,6 +114,9 @@ class AngelWebSocket extends AngelPlugin { client.io.add(god.serialize(event.toJson())); } }); + + if (synchronizer != null && notify != false) + synchronizer.notifyOthers(event); } /// Returns a list of events yet to be sent. @@ -112,6 +124,9 @@ class AngelWebSocket extends AngelPlugin { /// Responds to an incoming action on a WebSocket. Future handleAction(WebSocketAction action, WebSocketContext socket) async { + if (action.eventName == ACTION_AUTHENTICATE) + return await handleAuth(action, socket); + var split = action.eventName.split("::"); if (split.length < 2) @@ -181,6 +196,37 @@ class AngelWebSocket extends AngelPlugin { } } + /// Authenticates a [WebSocketContext]. + Future handleAuth(WebSocketAction action, WebSocketContext socket) async { + if (allowAuth != false && + action.eventName == ACTION_AUTHENTICATE && + action.params['jwt'] is String) { + try { + var auth = socket.request.grab(AngelAuth); + var jwt = action.params['jwt'] as String; + AuthToken token; + + token = new AuthToken.validate(jwt, auth.hmac); + var user = await auth.deserializer(token.userId); + var req = socket.request; + req + ..inject(AuthToken, req.properties['token'] = token) + ..inject(user.runtimeType, req.properties["user"] = user); + socket.send(EVENT_AUTHENTICATED, + {'token': token.serialize(auth.hmac), 'data': user}); + } catch (e, st) { + // Send an error + if (e is AngelHttpException) + socket.sendError(e); + else if (debug == true) + socket.sendError(new AngelHttpException(e, + message: e.toString(), stackTrace: st, errors: [st.toString()])); + else + socket.sendError(new AngelHttpException(e)); + } + } + } + /// Hooks a service up to have its events broadcasted. hookupService(Pattern _path, HookedService service) { String path = _path.toString(); @@ -309,5 +355,16 @@ class AngelWebSocket extends AngelPlugin { } await _register(); + + if (synchronizer != null) { + synchronizer.stream.listen((e) => batchEvent(e, notify: false)); + } } } + +/// Notifies other nodes of outgoing WWebSocket events, and listens for +/// notifications from other nodes. +abstract class WebSocketSynchronizer { + Stream get stream; + void notifyOthers(WebSocketEvent e); +} diff --git a/lib/websocket_context.dart b/lib/websocket_context.dart index 98a6c78a..7c97689f 100644 --- a/lib/websocket_context.dart +++ b/lib/websocket_context.dart @@ -17,17 +17,32 @@ class WebSocketContext { StreamController _onAction = new StreamController(); + + StreamController _onClose = new StreamController(); + StreamController _onData = new StreamController(); /// Fired on any [WebSocketAction]; Stream get onAction => _onAction.stream; + /// Fired once the underlying [WebSocket] closes. + Stream get onClose => _onClose.stream; + /// Fired when any data is sent through [io]. Stream get onData => _onData.stream; WebSocketContext(WebSocket this.io, RequestContext this.request, ResponseContext this.response); + /// Closes the underlying [WebSocket]. + Future close([int code, String reason]) async { + await io.close(code, reason); + _onAction.close(); + _onData.close(); + _onClose.add(null); + _onClose.close(); + } + /// Sends an arbitrary [WebSocketEvent]; void send(String eventName, data) { io.add(god.serialize(new WebSocketEvent(eventName: eventName, data: data))); @@ -42,7 +57,7 @@ class _WebSocketEventTable { StreamController _getStreamForEvent(eventName) { if (!_handlers.containsKey(eventName)) - _handlers[eventName] = new StreamController.broadcast(); + _handlers[eventName] = new StreamController(); return _handlers[eventName]; } diff --git a/pubspec.yaml b/pubspec.yaml index e0f2ca3c..5f18af65 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -2,12 +2,12 @@ name: angel_websocket description: WebSocket plugin for Angel. environment: sdk: ">=1.19.0" -version: 1.0.1 +version: 1.0.2 author: Tobe O homepage: https://github.com/angel-dart/angel_websocket dependencies: angel_auth: "^1.0.0-dev" - angel_client: "^1.0.0-dev" + angel_client: "^1.0.0" angel_framework: "^1.0.0-dev" uuid: "^0.5.3" web_socket_channel: "^1.0.0" diff --git a/test/controller/io_test.dart b/test/controller/io_test.dart index 505c2582..0be6243a 100644 --- a/test/controller/io_test.dart +++ b/test/controller/io_test.dart @@ -29,7 +29,7 @@ main() { url = 'ws://${server.address.address}:${server.port}/ws'; client = new ws.WebSockets(url); - await client.connect(); + await client.connect(timeout: new Duration(seconds: 3)); client ..onData.listen((data) { diff --git a/test/service/common.dart b/test/service/common.dart index e71f30b5..250e94f9 100644 --- a/test/service/common.dart +++ b/test/service/common.dart @@ -10,7 +10,9 @@ class Todo extends Model { Todo({String this.text, String this.when}); } -class TodoService extends MemoryService {} +class TodoService extends TypedService { + TodoService() : super(new MapService()); +} testIndex(BaseWebSocketClient client) async { var Todos = client.service('api/todos');