platform/lib/server.dart

394 lines
13 KiB
Dart
Raw Normal View History

2016-12-23 20:57:46 +00:00
/// Server-side support for WebSockets.
2016-04-29 01:19:09 +00:00
library angel_websocket.server;
2016-06-27 00:42:21 +00:00
import 'dart:async';
2016-07-06 13:33:40 +00:00
import 'dart:convert';
2016-04-29 01:19:09 +00:00
import 'dart:io';
2016-09-18 02:53:58 +00:00
import 'dart:mirrors';
2017-02-28 14:15:34 +00:00
import 'package:angel_auth/angel_auth.dart';
2016-04-29 01:19:09 +00:00
import 'package:angel_framework/angel_framework.dart';
2016-06-27 00:42:21 +00:00
import 'package:json_god/json_god.dart' as god;
import 'package:merge_map/merge_map.dart';
2016-06-27 00:42:21 +00:00
import 'angel_websocket.dart';
2016-09-03 12:34:01 +00:00
export 'angel_websocket.dart';
2016-04-29 01:19:09 +00:00
part 'websocket_context.dart';
2017-06-03 18:13:38 +00:00
2016-09-18 02:53:58 +00:00
part 'websocket_controller.dart';
2016-04-29 01:19:09 +00:00
2017-01-28 21:38:26 +00:00
/// Used to assign routes to a given handler.
typedef AngelWebSocketRegisterer(Angel app, RequestHandler handler);
2016-12-23 20:57:46 +00:00
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
2016-09-17 20:00:17 +00:00
class AngelWebSocket extends AngelPlugin {
Angel _app;
2017-01-29 20:02:19 +00:00
List<WebSocketContext> _clients = [];
2016-12-23 10:47:21 +00:00
final List<String> _servicesAlreadyWired = [];
2016-12-23 20:57:46 +00:00
final StreamController<WebSocketAction> _onAction =
2017-06-03 18:13:38 +00:00
new StreamController<WebSocketAction>();
2016-12-23 20:57:46 +00:00
final StreamController _onData = new StreamController();
final StreamController<WebSocketContext> _onConnection =
2017-06-03 18:13:38 +00:00
new StreamController<WebSocketContext>.broadcast();
2016-12-23 20:57:46 +00:00
final StreamController<WebSocketContext> _onDisconnect =
2017-06-03 18:13:38 +00:00
new StreamController<WebSocketContext>.broadcast();
2016-12-23 20:57:46 +00:00
2017-02-22 22:34:35 +00:00
/// If this is not `true`, then all client-side service parameters will be
/// discarded, other than `params['query']`.
final bool allowClientParams;
2017-02-28 14:15:34 +00:00
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
2016-12-23 20:57:46 +00:00
/// Include debug information, and send error information across WebSockets.
final bool debug;
2017-04-23 18:40:30 +00:00
bool _sendErrors;
/// Send error information across WebSockets, without including [debug] information..
bool get sendErrors => _sendErrors == true;
2017-01-28 21:38:26 +00:00
/// Registers this instance as a route on the server.
final AngelWebSocketRegisterer register;
2016-12-23 10:47:21 +00:00
/// A list of clients currently connected to this server via WebSockets.
2017-01-29 20:02:19 +00:00
List<WebSocketContext> get clients => new List.unmodifiable(_clients);
2016-12-23 10:47:21 +00:00
/// Services that have already been hooked to fire socket events.
2016-12-23 20:57:46 +00:00
List<String> get servicesAlreadyWired =>
new List.unmodifiable(_servicesAlreadyWired);
2016-12-23 10:47:21 +00:00
2016-12-23 20:57:46 +00:00
/// The endpoint that users should connect a WebSocket to.
2016-12-23 10:47:21 +00:00
final String endpoint;
2017-02-28 14:15:34 +00:00
/// Used to notify other nodes of an event's firing. Good for scaled applications.
final WebSocketSynchronizer synchronizer;
2016-12-23 20:57:46 +00:00
/// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired whenever a WebSocket sends data.
Stream get onData => _onData.stream;
2016-12-23 10:47:21 +00:00
/// Fired on incoming connections.
2016-09-18 01:35:16 +00:00
Stream<WebSocketContext> get onConnection => _onConnection.stream;
2016-12-23 10:47:21 +00:00
/// Fired when a user disconnects.
2016-09-18 03:16:51 +00:00
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;
2016-04-29 01:19:09 +00:00
2017-04-23 18:40:30 +00:00
/// Serializes data to WebSockets.
ResponseSerializer serializer;
/// Deserializes data from WebSockets.
Function deserializer;
2017-06-03 18:13:38 +00:00
AngelWebSocket({this.endpoint: '/ws',
this.debug: false,
bool sendErrors,
this.allowClientParams: false,
this.allowAuth: true,
this.register,
this.synchronizer,
this.serializer,
this.deserializer}) {
2017-04-23 18:40:30 +00:00
_sendErrors = sendErrors;
if (serializer == null) serializer = god.serialize;
if (deserializer == null) deserializer = (params) => params;
}
2017-02-22 22:34:35 +00:00
serviceHook(String path) {
return (HookedServiceEvent e) async {
2017-02-22 22:34:35 +00:00
if (e.params != null && e.params['broadcast'] == false) return;
var event = await transformEvent(e);
event.eventName = "$path::${event.eventName}";
2017-01-29 20:02:19 +00:00
_filter(WebSocketContext socket) {
if (e.service.properties.containsKey('ws:filter'))
2017-04-10 01:45:45 +00:00
return e.service.properties['ws:filter'](e, socket);
2017-01-29 20:02:19 +00:00
else
return true;
}
await batchEvent(event, filter: _filter);
};
}
2017-02-12 05:05:57 +00:00
void _printDebug(String msg) {
if (debug == true) print(msg);
}
2016-12-23 20:57:46 +00:00
/// Slates an event to be dispatched.
2017-01-29 20:02:19 +00:00
Future batchEvent(WebSocketEvent event,
2017-02-28 14:15:34 +00:00
{filter(WebSocketContext socket), bool notify: true}) async {
// Default implementation will just immediately fire events
2017-01-29 20:02:19 +00:00
_clients.forEach((client) async {
var result = true;
if (filter != null) result = await filter(client);
2017-02-12 05:05:57 +00:00
if (result == true) {
var serialized = event.toJson();
_printDebug('Batching this event: $serialized');
2017-02-12 20:06:54 +00:00
// print('Serialized: ' + JSON.encode(serialized));
2017-04-23 18:40:30 +00:00
client.io.add((serializer ?? god.serialize)(event.toJson()));
2017-02-12 05:05:57 +00:00
}
});
2017-02-28 14:15:34 +00:00
if (synchronizer != null && notify != false)
synchronizer.notifyOthers(event);
}
2016-12-23 20:57:46 +00:00
/// Returns a list of events yet to be sent.
Future<List<WebSocketEvent>> getBatchedEvents() async => [];
2016-06-27 00:42:21 +00:00
2016-12-23 20:57:46 +00:00
/// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split("::");
2016-06-27 00:42:21 +00:00
if (split.length < 2)
2017-01-28 21:38:26 +00:00
return socket.sendError(new AngelHttpException.badRequest());
2016-06-27 00:42:21 +00:00
var service = _app.service(split[0]);
2016-04-29 01:19:09 +00:00
if (service == null)
2017-01-28 21:38:26 +00:00
return socket.sendError(new AngelHttpException.notFound(
message: "No service \"${split[0]}\" exists."));
2016-12-23 20:57:46 +00:00
var actionName = split[1];
2017-02-22 22:34:35 +00:00
if (action.params is! Map) action.params = {};
if (allowClientParams != true) {
if (action.params['query'] is Map)
action.params = {'query': action.params['query']};
else
action.params = {};
}
var params = mergeMap([
2017-04-23 18:40:30 +00:00
(deserializer ?? (params) => params)(action.params),
2017-02-12 20:06:54 +00:00
{
"provider": Providers.WEBSOCKET,
'__requestctx': socket.request,
'__responsectx': socket.response
}
]);
2016-07-06 13:33:40 +00:00
try {
2016-12-23 20:57:46 +00:00
if (actionName == ACTION_INDEX) {
return socket.send(
"${split[0]}::" + EVENT_INDEXED, await service.index(params));
} else if (actionName == ACTION_READ) {
return socket.send("${split[0]}::" + EVENT_READ,
await service.read(action.id, params));
2016-12-23 20:57:46 +00:00
} else if (actionName == ACTION_CREATE) {
return new WebSocketEvent(
2016-12-23 20:57:46 +00:00
eventName: "${split[0]}::" + EVENT_CREATED,
data: await service.create(action.data, params));
2016-12-23 20:57:46 +00:00
} else if (actionName == ACTION_MODIFY) {
return new WebSocketEvent(
2016-12-23 20:57:46 +00:00
eventName: "${split[0]}::" + EVENT_MODIFIED,
data: await service.modify(action.id, action.data, params));
2016-12-23 20:57:46 +00:00
} else if (actionName == ACTION_UPDATE) {
return new WebSocketEvent(
2016-12-23 20:57:46 +00:00
eventName: "${split[0]}::" + EVENT_UPDATED,
data: await service.update(action.id, action.data, params));
2016-12-23 20:57:46 +00:00
} else if (actionName == ACTION_REMOVE) {
return new WebSocketEvent(
2016-12-23 20:57:46 +00:00
eventName: "${split[0]}::" + EVENT_REMOVED,
data: await service.remove(action.id, params));
} else {
2017-01-28 21:38:26 +00:00
return socket.sendError(new AngelHttpException.methodNotAllowed(
2016-12-23 20:57:46 +00:00
message: "Method Not Allowed: \"$actionName\""));
2016-04-29 01:19:09 +00:00
}
2016-12-23 20:57:46 +00:00
} catch (e, st) {
if (e is AngelHttpException)
return socket.sendError(e);
2017-04-23 18:40:30 +00:00
else if (debug == true || _sendErrors == true)
2016-12-23 20:57:46 +00:00
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
2016-04-29 01:19:09 +00:00
}
}
2016-04-29 01:19:09 +00:00
2017-02-28 14:15:34 +00:00
/// Authenticates a [WebSocketContext].
Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
if (allowAuth != false &&
action.eventName == ACTION_AUTHENTICATE &&
2017-04-23 18:53:12 +00:00
action.params['query'] is Map &&
action.params['query']['jwt'] is String) {
2017-02-28 14:15:34 +00:00
try {
var auth = socket.request.grab<AngelAuth>(AngelAuth);
2017-04-23 18:53:12 +00:00
var jwt = action.params['query']['jwt'] as String;
2017-02-28 14:15:34 +00:00
AuthToken token;
token = new AuthToken.validate(jwt, auth.hmac);
var user = await auth.deserializer(token.userId);
var req = socket.request;
2017-06-03 18:13:38 +00:00
req..inject(AuthToken, req.properties['token'] = token)..inject(
user.runtimeType, req.properties["user"] = user);
2017-02-28 14:15:34 +00:00
socket.send(EVENT_AUTHENTICATED,
{'token': token.serialize(auth.hmac), 'data': user});
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
2017-04-23 18:40:30 +00:00
else if (debug == true || _sendErrors == true)
2017-02-28 14:15:34 +00:00
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
}
2017-04-23 18:53:12 +00:00
} else {
socket.sendError(new AngelHttpException.badRequest(
message: 'No JWT provided for authentication.'));
2017-02-28 14:15:34 +00:00
}
}
2016-12-23 20:57:46 +00:00
/// Hooks a service up to have its events broadcasted.
hookupService(Pattern _path, HookedService service) {
String path = _path.toString();
2017-04-17 12:37:17 +00:00
service.after([
HookedServiceEvent.CREATED,
HookedServiceEvent.MODIFIED,
HookedServiceEvent.UPDATED,
HookedServiceEvent.REMOVED
], serviceHook(path));
2016-12-23 10:47:21 +00:00
_servicesAlreadyWired.add(path);
}
2016-12-23 20:57:46 +00:00
/// Runs before firing [onConnection].
Future handleConnect(WebSocketContext socket) async {}
2016-07-06 13:33:40 +00:00
2016-12-23 20:57:46 +00:00
/// Handles incoming data from a WebSocket.
handleData(WebSocketContext socket, data) async {
try {
2016-09-18 01:35:16 +00:00
socket._onData.add(data);
2016-07-06 13:33:40 +00:00
var fromJson = JSON.decode(data);
2016-12-23 20:57:46 +00:00
var action = new WebSocketAction.fromJson(fromJson);
_onAction.add(action);
if (action.eventName == null ||
action.eventName is! String ||
2016-07-06 13:33:40 +00:00
action.eventName.isEmpty) {
2017-01-28 21:38:26 +00:00
throw new AngelHttpException.badRequest();
2016-07-06 13:33:40 +00:00
}
2016-05-03 23:42:06 +00:00
2016-09-18 01:35:16 +00:00
if (fromJson is Map && fromJson.containsKey("eventName")) {
2016-12-23 20:57:46 +00:00
socket._onAction.add(new WebSocketAction.fromJson(fromJson));
socket.on
._getStreamForEvent(fromJson["eventName"].toString())
.add(fromJson["data"]);
2016-09-18 01:35:16 +00:00
}
2016-07-06 13:33:40 +00:00
2017-06-03 18:13:38 +00:00
if (action.eventName == ACTION_AUTHENTICATE)
await handleAuth(action, socket);
2016-09-18 01:35:16 +00:00
if (action.eventName.contains("::")) {
var split = action.eventName.split("::");
if (split.length >= 2) {
2016-12-23 20:57:46 +00:00
if (ACTIONS.contains(split[1])) {
2016-09-18 01:35:16 +00:00
var event = handleAction(action, socket);
if (event is Future) event = await event;
}
}
2016-04-29 01:19:09 +00:00
}
2016-12-23 20:57:46 +00:00
} catch (e, st) {
// Send an error
2016-07-06 13:33:40 +00:00
if (e is AngelHttpException)
socket.sendError(e);
2017-04-23 18:40:30 +00:00
else if (debug == true || _sendErrors == true)
2016-12-23 20:57:46 +00:00
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
2016-07-06 13:33:40 +00:00
else
socket.sendError(new AngelHttpException(e));
}
}
2016-12-23 20:57:46 +00:00
/// Transforms a [HookedServiceEvent], so that it can be broadcasted.
Future<WebSocketEvent> transformEvent(HookedServiceEvent event) async {
return new WebSocketEvent(eventName: event.eventName, data: event.result);
}
2016-12-23 20:57:46 +00:00
/// Hooks any [HookedService]s that are not being broadcasted yet.
wireAllServices(Angel app) {
for (Pattern key in app.services.keys.where((x) {
2016-12-23 10:47:21 +00:00
return !_servicesAlreadyWired.contains(x) &&
app.services[x] is HookedService;
})) {
hookupService(key, app.services[key]);
}
2016-06-27 00:42:21 +00:00
}
2016-09-17 20:00:17 +00:00
@override
Future call(Angel app) async {
2017-04-23 18:40:30 +00:00
if (_sendErrors == null) _sendErrors = app.isProduction;
2016-12-23 20:57:46 +00:00
_app = app..container.singleton(this);
2016-09-17 20:00:17 +00:00
if (runtimeType != AngelWebSocket)
app.container.singleton(this, as: AngelWebSocket);
// Set up services
wireAllServices(app);
app.onService.listen((_) {
wireAllServices(app);
});
2017-01-28 21:38:26 +00:00
handler(RequestContext req, ResponseContext res) async {
2016-12-23 20:57:46 +00:00
if (!WebSocketTransformer.isUpgradeRequest(req.io))
2017-01-28 21:38:26 +00:00
throw new AngelHttpException.badRequest();
2016-09-03 12:43:34 +00:00
res
..willCloseItself = true
..end();
2016-12-23 20:57:46 +00:00
var ws = await WebSocketTransformer.upgrade(req.io);
var socket = new WebSocketContext(ws, req, res);
2017-01-29 20:02:19 +00:00
_clients.add(socket);
2016-12-23 20:57:46 +00:00
await handleConnect(socket);
2016-09-18 01:35:16 +00:00
_onConnection.add(socket);
2017-01-29 20:02:19 +00:00
req
..properties['socket'] = socket
..inject(WebSocketContext, socket);
2016-09-17 20:00:17 +00:00
ws.listen((data) {
2016-12-23 20:57:46 +00:00
_onData.add(data);
handleData(socket, data);
}, onDone: () {
2016-09-18 03:16:51 +00:00
_onDisconnect.add(socket);
_clients.remove(ws);
}, onError: (e) {
2016-09-18 03:16:51 +00:00
_onDisconnect.add(socket);
_clients.remove(ws);
}, cancelOnError: true);
2017-01-28 21:38:26 +00:00
}
_register() {
if (register != null)
return register(app, handler);
else
app.get(endpoint, handler);
}
await _register();
2017-02-28 14:15:34 +00:00
if (synchronizer != null) {
synchronizer.stream.listen((e) => batchEvent(e, notify: false));
}
}
}
2017-02-28 14:15:34 +00:00
/// Notifies other nodes of outgoing WWebSocket events, and listens for
/// notifications from other nodes.
abstract class WebSocketSynchronizer {
Stream<WebSocketEvent> get stream;
2017-06-03 18:13:38 +00:00
2017-02-28 14:15:34 +00:00
void notifyOthers(WebSocketEvent e);
}