This commit is contained in:
thosakwe 2017-02-28 09:15:34 -05:00
parent fe1f85bfa8
commit 01db4aa9eb
8 changed files with 205 additions and 42 deletions

View file

@ -1,5 +1,5 @@
# angel_websocket # angel_websocket
[![1.0.1](https://img.shields.io/badge/pub-1.0.1-brightgreen.svg)](https://pub.dartlang.org/packages/angel_websocket) [![1.0.2](https://img.shields.io/badge/pub-1.0.2-brightgreen.svg)](https://pub.dartlang.org/packages/angel_websocket)
[![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket) [![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket)
WebSocket plugin for Angel. WebSocket plugin for Angel.
@ -84,6 +84,9 @@ main() async {
// Happens asynchronously // Happens asynchronously
Cars.create({"brand": "Toyota"}); Cars.create({"brand": "Toyota"});
// Authenticate a WebSocket, if you were not already authenticated...
app.authenticateViaJwt('<some-jwt>');
// Listen for arbitrary events // Listen for arbitrary events
app.on['custom_event'].listen((event) { app.on['custom_event'].listen((event) {
// For example, this might be sent by a // For example, this might be sent by a
@ -127,5 +130,8 @@ main() async {
// Happens asynchronously // Happens asynchronously
Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"}); Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"});
// Authenticate a WebSocket, if you were not already authenticated...
app.authenticateViaJwt('<some-jwt>');
} }
``` ```

View file

@ -1,6 +1,7 @@
/// WebSocket plugin for Angel. /// WebSocket plugin for Angel.
library angel_websocket; library angel_websocket;
const String ACTION_AUTHENTICATE = 'authenticate';
const String ACTION_INDEX = 'index'; const String ACTION_INDEX = 'index';
const String ACTION_READ = 'read'; const String ACTION_READ = 'read';
const String ACTION_CREATE = 'create'; const String ACTION_CREATE = 'create';
@ -8,6 +9,7 @@ const String ACTION_MODIFY = 'modify';
const String ACTION_UPDATE = 'update'; const String ACTION_UPDATE = 'update';
const String ACTION_REMOVE = 'remove'; const String ACTION_REMOVE = 'remove';
const String EVENT_AUTHENTICATED = 'authenticated';
const String EVENT_ERROR = 'error'; const String EVENT_ERROR = 'error';
const String EVENT_INDEXED = 'indexed'; const String EVENT_INDEXED = 'indexed';
const String EVENT_READ = 'read'; const String EVENT_READ = 'read';

View file

@ -12,11 +12,14 @@ final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// An [Angel] client that operates across WebSockets. /// An [Angel] client that operates across WebSockets.
abstract class BaseWebSocketClient extends BaseAngelClient { abstract class BaseWebSocketClient extends BaseAngelClient {
Duration _reconnectInterval;
WebSocketChannel _socket; WebSocketChannel _socket;
final StreamController _onData = new StreamController(); final StreamController _onData = new StreamController();
final StreamController<WebSocketEvent> _onAllEvents = final StreamController<WebSocketEvent> _onAllEvents =
new StreamController<WebSocketEvent>(); new StreamController<WebSocketEvent>();
final StreamController<AngelAuthResult> _onAuthenticated =
new StreamController<AngelAuthResult>();
final StreamController<AngelHttpException> _onError = final StreamController<AngelHttpException> _onError =
new StreamController<AngelHttpException>(); new StreamController<AngelHttpException>();
final StreamController<Map<String, WebSocketEvent>> _onServiceEvent = final StreamController<Map<String, WebSocketEvent>> _onServiceEvent =
@ -32,6 +35,9 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// Fired on all events. /// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream; Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired whenever a WebSocket is successfully authenticated.
Stream<AngelAuthResult> get onAuthenticated => _onAuthenticated.stream;
/// A broadcast stream of data coming from the [socket]. /// A broadcast stream of data coming from the [socket].
/// ///
/// Mostly just for internal use. /// Mostly just for internal use.
@ -51,18 +57,67 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// The [WebSocketChannel] underneath this instance. /// The [WebSocketChannel] underneath this instance.
WebSocketChannel get socket => _socket; WebSocketChannel get socket => _socket;
BaseWebSocketClient(http.BaseClient client, String basePath) /// If `true` (default), then the client will automatically try to reconnect to the server
: super(client, basePath) {} /// if the socket closes.
final bool reconnectOnClose;
/// The amount of time to wait between reconnect attempts. Default: 10 seconds.
Duration get reconnectInterval => _reconnectInterval;
BaseWebSocketClient(http.BaseClient client, String basePath,
{this.reconnectOnClose: true, Duration reconnectInterval})
: super(client, basePath) {
_reconnectInterval = reconnectInterval ?? new Duration(seconds: 10);
}
@override @override
Future close() async => _socket.sink.close(status.goingAway); Future close() async {
await _socket.sink.close(status.goingAway);
_onData.close();
_onAllEvents.close();
_onAuthenticated.close();
_onError.close();
_onServiceEvent.close();
_onWebSocketChannelException.close();
}
/// Connects the WebSocket. /// Connects the WebSocket. [timeout] is optional.
Future<WebSocketChannel> connect() async { Future<WebSocketChannel> connect({Duration timeout}) async {
if (timeout != null) {
var c = new Completer<WebSocketChannel>();
Timer timer;
timer = new Timer(timeout, () {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(new TimeoutException(
'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms',
timeout));
}
});
getConnectedWebSocket().then((socket) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.complete(socket);
}
}).catchError((e, st) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(e, st);
}
});
return await c.future.then((socket) {
_socket = socket;
listen();
});
} else {
_socket = await getConnectedWebSocket(); _socket = await getConnectedWebSocket();
listen(); listen();
return _socket; return _socket;
} }
}
/// Returns a new [WebSocketChannel], ready to be listened on. /// Returns a new [WebSocketChannel], ready to be listened on.
/// ///
@ -79,7 +134,8 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// Starts listening for data. /// Starts listening for data.
void listen() { void listen() {
_socket.stream.listen((data) { _socket?.stream?.listen(
(data) {
_onData.add(data); _onData.add(data);
if (data is WebSocketChannelException) { if (data is WebSocketChannelException) {
@ -98,6 +154,9 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
if (event.eventName == EVENT_ERROR) { if (event.eventName == EVENT_ERROR) {
var error = new AngelHttpException.fromMap(event.data ?? {}); var error = new AngelHttpException.fromMap(event.data ?? {});
_onError.add(error); _onError.add(error);
} else if (event.eventName == EVENT_AUTHENTICATED) {
var authResult = new AngelAuthResult.fromMap(event.data);
_onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) { } else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName var split = event.eventName
.split("::") .split("::")
@ -106,11 +165,28 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
if (split.length >= 2) { if (split.length >= 2) {
var serviceName = split[0], eventName = split[1]; var serviceName = split[0], eventName = split[1];
_onServiceEvent.add({serviceName: event..eventName = eventName}); _onServiceEvent
.add({serviceName: event..eventName = eventName});
} }
} }
} }
} }
},
cancelOnError: true,
onDone: () {
if (reconnectOnClose == true) {
new Timer.periodic(reconnectInterval, (Timer timer) async {
var result;
try {
result = await connect(timeout: reconnectInterval);
} catch (e) {
//
}
if (result != null) timer.cancel();
});
}
}); });
} }
@ -125,6 +201,11 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
void sendAction(WebSocketAction action) { void sendAction(WebSocketAction action) {
socket.sink.add(serialize(action)); socket.sink.add(serialize(action));
} }
/// Attempts to authenticate a WebSocket, using a valid JWT.
void authenticateViaJwt(String jwt) {
send(ACTION_AUTHENTICATE, new WebSocketAction(params: {'jwt': jwt}));
}
} }
/// A [Service] that asynchronously interacts with the server. /// A [Service] that asynchronously interacts with the server.

View file

@ -5,6 +5,7 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'dart:mirrors'; import 'dart:mirrors';
import 'package:angel_auth/angel_auth.dart';
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';
import 'package:json_god/json_god.dart' as god; import 'package:json_god/json_god.dart' as god;
import 'package:merge_map/merge_map.dart'; import 'package:merge_map/merge_map.dart';
@ -35,6 +36,9 @@ class AngelWebSocket extends AngelPlugin {
/// discarded, other than `params['query']`. /// discarded, other than `params['query']`.
final bool allowClientParams; final bool allowClientParams;
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
/// Include debug information, and send error information across WebSockets. /// Include debug information, and send error information across WebSockets.
final bool debug; final bool debug;
@ -51,6 +55,9 @@ class AngelWebSocket extends AngelPlugin {
/// The endpoint that users should connect a WebSocket to. /// The endpoint that users should connect a WebSocket to.
final String endpoint; final String endpoint;
/// Used to notify other nodes of an event's firing. Good for scaled applications.
final WebSocketSynchronizer synchronizer;
/// Fired on any [WebSocketAction]. /// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream; Stream<WebSocketAction> get onAction => _onAction.stream;
@ -67,7 +74,9 @@ class AngelWebSocket extends AngelPlugin {
{this.endpoint: '/ws', {this.endpoint: '/ws',
this.debug: false, this.debug: false,
this.allowClientParams: false, this.allowClientParams: false,
this.register}); this.allowAuth: true,
this.register,
this.synchronizer});
serviceHook(String path) { serviceHook(String path) {
return (HookedServiceEvent e) async { return (HookedServiceEvent e) async {
@ -93,7 +102,7 @@ class AngelWebSocket extends AngelPlugin {
/// Slates an event to be dispatched. /// Slates an event to be dispatched.
Future batchEvent(WebSocketEvent event, Future batchEvent(WebSocketEvent event,
{filter(WebSocketContext socket)}) async { {filter(WebSocketContext socket), bool notify: true}) async {
// Default implementation will just immediately fire events // Default implementation will just immediately fire events
_clients.forEach((client) async { _clients.forEach((client) async {
var result = true; var result = true;
@ -105,6 +114,9 @@ class AngelWebSocket extends AngelPlugin {
client.io.add(god.serialize(event.toJson())); client.io.add(god.serialize(event.toJson()));
} }
}); });
if (synchronizer != null && notify != false)
synchronizer.notifyOthers(event);
} }
/// Returns a list of events yet to be sent. /// Returns a list of events yet to be sent.
@ -112,6 +124,9 @@ class AngelWebSocket extends AngelPlugin {
/// Responds to an incoming action on a WebSocket. /// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async { Future handleAction(WebSocketAction action, WebSocketContext socket) async {
if (action.eventName == ACTION_AUTHENTICATE)
return await handleAuth(action, socket);
var split = action.eventName.split("::"); var split = action.eventName.split("::");
if (split.length < 2) if (split.length < 2)
@ -181,6 +196,37 @@ class AngelWebSocket extends AngelPlugin {
} }
} }
/// Authenticates a [WebSocketContext].
Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
if (allowAuth != false &&
action.eventName == ACTION_AUTHENTICATE &&
action.params['jwt'] is String) {
try {
var auth = socket.request.grab<AngelAuth>(AngelAuth);
var jwt = action.params['jwt'] as String;
AuthToken token;
token = new AuthToken.validate(jwt, auth.hmac);
var user = await auth.deserializer(token.userId);
var req = socket.request;
req
..inject(AuthToken, req.properties['token'] = token)
..inject(user.runtimeType, req.properties["user"] = user);
socket.send(EVENT_AUTHENTICATED,
{'token': token.serialize(auth.hmac), 'data': user});
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else if (debug == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
}
}
}
/// Hooks a service up to have its events broadcasted. /// Hooks a service up to have its events broadcasted.
hookupService(Pattern _path, HookedService service) { hookupService(Pattern _path, HookedService service) {
String path = _path.toString(); String path = _path.toString();
@ -309,5 +355,16 @@ class AngelWebSocket extends AngelPlugin {
} }
await _register(); await _register();
if (synchronizer != null) {
synchronizer.stream.listen((e) => batchEvent(e, notify: false));
} }
} }
}
/// Notifies other nodes of outgoing WWebSocket events, and listens for
/// notifications from other nodes.
abstract class WebSocketSynchronizer {
Stream<WebSocketEvent> get stream;
void notifyOthers(WebSocketEvent e);
}

View file

@ -17,17 +17,32 @@ class WebSocketContext {
StreamController<WebSocketAction> _onAction = StreamController<WebSocketAction> _onAction =
new StreamController<WebSocketAction>(); new StreamController<WebSocketAction>();
StreamController<Null> _onClose = new StreamController<Null>();
StreamController _onData = new StreamController(); StreamController _onData = new StreamController();
/// Fired on any [WebSocketAction]; /// Fired on any [WebSocketAction];
Stream<WebSocketAction> get onAction => _onAction.stream; Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired once the underlying [WebSocket] closes.
Stream<Null> get onClose => _onClose.stream;
/// Fired when any data is sent through [io]. /// Fired when any data is sent through [io].
Stream get onData => _onData.stream; Stream get onData => _onData.stream;
WebSocketContext(WebSocket this.io, RequestContext this.request, WebSocketContext(WebSocket this.io, RequestContext this.request,
ResponseContext this.response); ResponseContext this.response);
/// Closes the underlying [WebSocket].
Future close([int code, String reason]) async {
await io.close(code, reason);
_onAction.close();
_onData.close();
_onClose.add(null);
_onClose.close();
}
/// Sends an arbitrary [WebSocketEvent]; /// Sends an arbitrary [WebSocketEvent];
void send(String eventName, data) { void send(String eventName, data) {
io.add(god.serialize(new WebSocketEvent(eventName: eventName, data: data))); io.add(god.serialize(new WebSocketEvent(eventName: eventName, data: data)));
@ -42,7 +57,7 @@ class _WebSocketEventTable {
StreamController<Map> _getStreamForEvent(eventName) { StreamController<Map> _getStreamForEvent(eventName) {
if (!_handlers.containsKey(eventName)) if (!_handlers.containsKey(eventName))
_handlers[eventName] = new StreamController<Map>.broadcast(); _handlers[eventName] = new StreamController<Map>();
return _handlers[eventName]; return _handlers[eventName];
} }

View file

@ -2,12 +2,12 @@ name: angel_websocket
description: WebSocket plugin for Angel. description: WebSocket plugin for Angel.
environment: environment:
sdk: ">=1.19.0" sdk: ">=1.19.0"
version: 1.0.1 version: 1.0.2
author: Tobe O <thosakwe@gmail.com> author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket homepage: https://github.com/angel-dart/angel_websocket
dependencies: dependencies:
angel_auth: "^1.0.0-dev" angel_auth: "^1.0.0-dev"
angel_client: "^1.0.0-dev" angel_client: "^1.0.0"
angel_framework: "^1.0.0-dev" angel_framework: "^1.0.0-dev"
uuid: "^0.5.3" uuid: "^0.5.3"
web_socket_channel: "^1.0.0" web_socket_channel: "^1.0.0"

View file

@ -29,7 +29,7 @@ main() {
url = 'ws://${server.address.address}:${server.port}/ws'; url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url); client = new ws.WebSockets(url);
await client.connect(); await client.connect(timeout: new Duration(seconds: 3));
client client
..onData.listen((data) { ..onData.listen((data) {

View file

@ -10,7 +10,9 @@ class Todo extends Model {
Todo({String this.text, String this.when}); Todo({String this.text, String this.when});
} }
class TodoService extends MemoryService<Todo> {} class TodoService extends TypedService<Todo> {
TodoService() : super(new MapService());
}
testIndex(BaseWebSocketClient client) async { testIndex(BaseWebSocketClient client) async {
var Todos = client.service('api/todos'); var Todos = client.service('api/todos');