This commit is contained in:
Tobe O 2019-01-05 21:41:46 -05:00
parent 6a5a4c35ff
commit ed71ebaaeb
14 changed files with 184 additions and 108 deletions

View file

@ -1,3 +1,6 @@
# 2.0.0
* Update to work with `client@2.0.0`.
# 2.0.0-alpha.8 # 2.0.0-alpha.8
* Support for WebSockets over HTTP/2 (though in practice this doesn't often happen, if ever). * Support for WebSockets over HTTP/2 (though in practice this doesn't often happen, if ever).

View file

@ -1,3 +1,6 @@
include: package:pedantic/analysis_options.yaml
analyzer: analyzer:
strong-mode: strong-mode:
implicit-casts: false implicit-casts: false
errors:
unawaited_futures: ignore

View file

@ -11,12 +11,7 @@ main(List<String> args) async {
var http = new AngelHttp(app); var http = new AngelHttp(app);
var ws = new AngelWebSocket(app, sendErrors: !app.isProduction); var ws = new AngelWebSocket(app, sendErrors: !app.isProduction);
var fs = const LocalFileSystem(); var fs = const LocalFileSystem();
app.logger = new Logger('angel_auth') app.logger = new Logger('angel_websocket');
..onRecord.listen((rec) {
print(rec);
if (rec.error != null) print(rec.error);
if (rec.stackTrace != null) print(rec.stackTrace);
});
// This is a plug-in. It hooks all your services, // This is a plug-in. It hooks all your services,
// to automatically broadcast events. // to automatically broadcast events.

View file

@ -1,43 +1,6 @@
/// WebSocket plugin for Angel. /// WebSocket plugin for Angel.
library angel_websocket; library angel_websocket;
const String ACTION_AUTHENTICATE = 'authenticate';
const String ACTION_INDEX = 'index';
const String ACTION_READ = 'read';
const String ACTION_CREATE = 'create';
const String ACTION_MODIFY = 'modify';
const String ACTION_UPDATE = 'update';
const String ACTION_REMOVE = 'remove';
const String EVENT_AUTHENTICATED = 'authenticated';
const String EVENT_ERROR = 'error';
const String EVENT_INDEXED = 'indexed';
const String EVENT_READ = 'read';
const String EVENT_CREATED = 'created';
const String EVENT_MODIFIED = 'modified';
const String EVENT_UPDATED = 'updated';
const String EVENT_REMOVED = 'removed';
/// The standard Angel service actions.
const List<String> ACTIONS = const [
ACTION_INDEX,
ACTION_READ,
ACTION_CREATE,
ACTION_MODIFY,
ACTION_UPDATE,
ACTION_REMOVE
];
/// The standard Angel service events.
const List<String> EVENTS = const [
EVENT_INDEXED,
EVENT_READ,
EVENT_CREATED,
EVENT_MODIFIED,
EVENT_UPDATED,
EVENT_REMOVED
];
/// A notification from the server that something has occurred. /// A notification from the server that something has occurred.
class WebSocketEvent<Data> { class WebSocketEvent<Data> {
String eventName; String eventName;

View file

@ -8,6 +8,7 @@ import 'package:http/src/base_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status; import 'package:web_socket_channel/status.dart' as status;
import 'angel_websocket.dart'; import 'angel_websocket.dart';
import 'constants.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
@ -66,9 +67,28 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// The amount of time to wait between reconnect attempts. Default: 10 seconds. /// The amount of time to wait between reconnect attempts. Default: 10 seconds.
Duration get reconnectInterval => _reconnectInterval; Duration get reconnectInterval => _reconnectInterval;
BaseWebSocketClient(http.BaseClient client, String basePath, Uri _wsUri;
{this.reconnectOnClose: true, Duration reconnectInterval})
: super(client, basePath) { /// The [Uri] to which a websocket should point.
Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl);
static Uri _toWsUri(Uri u) {
if (u.hasScheme) {
if (u.scheme == 'http') {
return u.replace(scheme: 'ws');
} else if (u.scheme == 'https') {
return u.replace(scheme: 'wss');
} else {
return u;
}
} else {
return _toWsUri(u.replace(scheme: Uri.base.scheme));
}
}
BaseWebSocketClient(http.BaseClient client, baseUrl,
{this.reconnectOnClose = true, Duration reconnectInterval})
: super(client, baseUrl) {
_reconnectInterval = reconnectInterval ?? new Duration(seconds: 10); _reconnectInterval = reconnectInterval ?? new Duration(seconds: 10);
} }
@ -159,11 +179,11 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
on._getStream(event.eventName).add(event); on._getStream(event.eventName).add(event);
} }
if (event.eventName == EVENT_ERROR) { if (event.eventName == errorEvent) {
var error = var error =
new AngelHttpException.fromMap((event.data ?? {}) as Map); new AngelHttpException.fromMap((event.data ?? {}) as Map);
_onError.add(error); _onError.add(error);
} else if (event.eventName == EVENT_AUTHENTICATED) { } else if (event.eventName == authenticatedEvent) {
var authResult = new AngelAuthResult.fromMap(event.data as Map); var authResult = new AngelAuthResult.fromMap(event.data as Map);
_onAuthenticated.add(authResult); _onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) { } else if (event.eventName?.isNotEmpty == true) {
@ -203,10 +223,6 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// Serializes data to JSON. /// Serializes data to JSON.
serialize(x) => json.encode(x); serialize(x) => json.encode(x);
/// Alternative form of [send]ing an action.
void send(String eventName, WebSocketAction action) =>
sendAction(action..eventName = eventName);
/// Sends the given [action] on the [socket]. /// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) { void sendAction(WebSocketAction action) {
if (_socket == null) if (_socket == null)
@ -217,11 +233,12 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// Attempts to authenticate a WebSocket, using a valid JWT. /// Attempts to authenticate a WebSocket, using a valid JWT.
void authenticateViaJwt(String jwt) { void authenticateViaJwt(String jwt) {
send( sendAction(new WebSocketAction(
ACTION_AUTHENTICATE, eventName: authenticateAction,
new WebSocketAction(params: { params: {
'query': {'jwt': jwt} 'query': {'jwt': jwt}
})); },
));
} }
} }
@ -306,7 +323,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
_onAllEvents.add(event); _onAllEvents.add(event);
if (event.eventName == EVENT_INDEXED) { if (event.eventName == indexedEvent) {
var d = event.data; var d = event.data;
var transformed = new WebSocketEvent( var transformed = new WebSocketEvent(
eventName: event.eventName, eventName: event.eventName,
@ -318,19 +335,19 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
var transformed = transformEvent(event).data; var transformed = transformEvent(event).data;
switch (event.eventName) { switch (event.eventName) {
case EVENT_READ: case readEvent:
_onRead.add(transformed); _onRead.add(transformed);
break; break;
case EVENT_CREATED: case createdEvent:
_onCreated.add(transformed); _onCreated.add(transformed);
break; break;
case EVENT_MODIFIED: case modifiedEvent:
_onModified.add(transformed); _onModified.add(transformed);
break; break;
case EVENT_UPDATED: case updatedEvent:
_onUpdated.add(transformed); _onUpdated.add(transformed);
break; break;
case EVENT_REMOVED: case removedEvent:
_onRemoved.add(transformed); _onRemoved.add(transformed);
break; break;
} }
@ -346,14 +363,14 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
@override @override
Future<List<Data>> index([Map<String, dynamic> params]) async { Future<List<Data>> index([Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_INDEX}', params: params ?? {})); eventName: '$path::$indexAction', params: params ?? {}));
return null; return null;
} }
@override @override
Future<Data> read(id, [Map<String, dynamic> params]) async { Future<Data> read(id, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_READ}', eventName: '$path::$readAction',
id: id.toString(), id: id.toString(),
params: params ?? {})); params: params ?? {}));
return null; return null;
@ -362,16 +379,14 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
@override @override
Future<Data> create(data, [Map<String, dynamic> params]) async { Future<Data> create(data, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_CREATE}', eventName: '$path::$createAction', data: data, params: params ?? {}));
data: data,
params: params ?? {}));
return null; return null;
} }
@override @override
Future<Data> modify(id, data, [Map<String, dynamic> params]) async { Future<Data> modify(id, data, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_MODIFY}', eventName: '$path::$modifyAction',
id: id.toString(), id: id.toString(),
data: data, data: data,
params: params ?? {})); params: params ?? {}));
@ -381,7 +396,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
@override @override
Future<Data> update(id, data, [Map<String, dynamic> params]) async { Future<Data> update(id, data, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_UPDATE}', eventName: '$path::$updateAction',
id: id.toString(), id: id.toString(),
data: data, data: data,
params: params ?? {})); params: params ?? {}));
@ -391,7 +406,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
@override @override
Future<Data> remove(id, [Map<String, dynamic> params]) async { Future<Data> remove(id, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_REMOVE}', eventName: '$path::$removeAction',
id: id.toString(), id: id.toString(),
params: params ?? {})); params: params ?? {}));
return null; return null;

View file

@ -17,7 +17,7 @@ final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
class WebSockets extends BaseWebSocketClient { class WebSockets extends BaseWebSocketClient {
final List<BrowserWebSocketsService> _services = []; final List<BrowserWebSocketsService> _services = [];
WebSockets(String path) : super(new http.BrowserClient(), path); WebSockets(path) : super(new http.BrowserClient(), path);
@override @override
Future close() { Future close() {
@ -30,7 +30,7 @@ class WebSockets extends BaseWebSocketClient {
@override @override
Stream<String> authenticateViaPopup(String url, Stream<String> authenticateViaPopup(String url,
{String eventName: 'token', String errorMessage}) { {String eventName = 'token', String errorMessage}) {
var ctrl = new StreamController<String>(); var ctrl = new StreamController<String>();
var wnd = window.open(url, 'angel_client_auth_popup'); var wnd = window.open(url, 'angel_client_auth_popup');
@ -64,9 +64,15 @@ class WebSockets extends BaseWebSocketClient {
@override @override
Future<WebSocketChannel> getConnectedWebSocket() { Future<WebSocketChannel> getConnectedWebSocket() {
var socket = new WebSocket(authToken?.isNotEmpty == true var url = websocketUri;
? '$basePath?token=$authToken'
: basePath); if (authToken?.isNotEmpty == true) {
url = url.replace(
queryParameters: new Map<String, String>.from(url.queryParameters)
..['token'] = authToken);
}
var socket = new WebSocket(url.toString());
var completer = new Completer<WebSocketChannel>(); var completer = new Completer<WebSocketChannel>();
socket socket

87
lib/constants.dart Normal file
View file

@ -0,0 +1,87 @@
const String authenticateAction = 'authenticate';
const String indexAction = 'index';
const String readAction = 'read';
const String createAction = 'create';
const String modifyAction = 'modify';
const String updateAction = 'update';
const String removeAction = 'remove';
@deprecated
const String ACTION_AUTHENTICATE = authenticateAction;
@deprecated
const String ACTION_INDEX = indexAction;
@deprecated
const String ACTION_READ = readAction;
@deprecated
const String ACTION_CREATE = createAction;
@deprecated
const String ACTION_MODIFY = modifyAction;
@deprecated
const String ACTION_UPDATE = updateAction;
@deprecated
const String ACTION_REMOVE = removeAction;
const String authenticatedEvent = 'authenticated';
const String errorEvent = 'error';
const String indexedEvent = 'indexed';
const String readEvent = 'read';
const String createdEvent = 'created';
const String modifiedEvent = 'modified';
const String updatedEvent = 'updated';
const String removedEvent = 'removed';
@deprecated
const String EVENT_AUTHENTICATED = authenticatedEvent;
@deprecated
const String EVENT_ERROR = errorEvent;
@deprecated
const String EVENT_INDEXED = indexedEvent;
@deprecated
const String EVENT_READ = readEvent;
@deprecated
const String EVENT_CREATED = createdEvent;
@deprecated
const String EVENT_MODIFIED = modifiedEvent;
@deprecated
const String EVENT_UPDATED = updatedEvent;
@deprecated
const String EVENT_REMOVED = removedEvent;
/// The standard Angel service actions.
const List<String> actions = const <String>[
indexAction,
readAction,
createAction,
modifyAction,
updateAction,
removeAction
];
@deprecated
const List<String> ACTIONS = actions;
/// The standard Angel service events.
const List<String> events = const <String>[
indexedEvent,
readEvent,
createdEvent,
modifiedEvent,
updatedEvent,
removedEvent
];
@deprecated
const List<String> EVENTS = events;

View file

@ -20,7 +20,8 @@ class WebSockets extends BaseWebSocketClient {
WebSockets(String path) : super(new http.IOClient(), path); WebSockets(String path) : super(new http.IOClient(), path);
@override @override
Stream<String> authenticateViaPopup(String url, {String eventName: 'token'}) { Stream<String> authenticateViaPopup(String url,
{String eventName = 'token'}) {
throw new UnimplementedError( throw new UnimplementedError(
'Opening popup windows is not supported in the `dart:io` client.'); 'Opening popup windows is not supported in the `dart:io` client.');
} }
@ -36,7 +37,7 @@ class WebSockets extends BaseWebSocketClient {
@override @override
Future<WebSocketChannel> getConnectedWebSocket() async { Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(basePath, var socket = await WebSocket.connect(websocketUri.toString(),
headers: authToken?.isNotEmpty == true headers: authToken?.isNotEmpty == true
? {'Authorization': 'Bearer $authToken'} ? {'Authorization': 'Bearer $authToken'}
: {}); : {});

View file

@ -21,7 +21,8 @@ class WebSockets extends BaseWebSocketClient {
WebSockets(String path) : super(new http.IOClient(), path); WebSockets(String path) : super(new http.IOClient(), path);
@override @override
Stream<String> authenticateViaPopup(String url, {String eventName: 'token'}) { Stream<String> authenticateViaPopup(String url,
{String eventName = 'token'}) {
throw new UnimplementedError( throw new UnimplementedError(
'Opening popup windows is not supported in the `dart:io` client.'); 'Opening popup windows is not supported in the `dart:io` client.');
} }
@ -37,7 +38,7 @@ class WebSockets extends BaseWebSocketClient {
@override @override
Future<WebSocketChannel> getConnectedWebSocket() async { Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(basePath, var socket = await WebSocket.connect(websocketUri.toString(),
headers: authToken?.isNotEmpty == true headers: authToken?.isNotEmpty == true
? {'Authorization': 'Bearer $authToken'} ? {'Authorization': 'Bearer $authToken'}
: {}); : {});

View file

@ -14,6 +14,7 @@ import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart';
import 'angel_websocket.dart'; import 'angel_websocket.dart';
import 'constants.dart';
export 'angel_websocket.dart'; export 'angel_websocket.dart';
part 'websocket_context.dart'; part 'websocket_context.dart';
@ -82,9 +83,9 @@ class AngelWebSocket {
Function deserializer; Function deserializer;
AngelWebSocket(this.app, AngelWebSocket(this.app,
{this.sendErrors: false, {this.sendErrors = false,
this.allowClientParams: false, this.allowClientParams = false,
this.allowAuth: true, this.allowAuth = true,
this.synchronizationChannel, this.synchronizationChannel,
this.serializer, this.serializer,
this.deserializer, this.deserializer,
@ -115,8 +116,8 @@ class AngelWebSocket {
} }
/// Slates an event to be dispatched. /// Slates an event to be dispatched.
Future batchEvent(WebSocketEvent event, Future<void> batchEvent(WebSocketEvent event,
{filter(WebSocketContext socket), bool notify: true}) async { {filter(WebSocketContext socket), bool notify = true}) async {
// Default implementation will just immediately fire events // Default implementation will just immediately fire events
_clients.forEach((client) async { _clients.forEach((client) async {
dynamic result = true; dynamic result = true;
@ -172,29 +173,29 @@ class AngelWebSocket {
]); ]);
try { try {
if (actionName == ACTION_INDEX) { if (actionName == indexAction) {
socket.send( socket.send(
"${split[0]}::" + EVENT_INDEXED, await service.index(params)); "${split[0]}::" + indexedEvent, await service.index(params));
return null; return null;
} else if (actionName == ACTION_READ) { } else if (actionName == readAction) {
socket.send("${split[0]}::" + EVENT_READ, socket.send(
await service.read(action.id, params)); "${split[0]}::" + readEvent, await service.read(action.id, params));
return null; return null;
} else if (actionName == ACTION_CREATE) { } else if (actionName == createAction) {
return new WebSocketEvent( return new WebSocketEvent(
eventName: "${split[0]}::" + EVENT_CREATED, eventName: "${split[0]}::" + createdEvent,
data: await service.create(action.data, params)); data: await service.create(action.data, params));
} else if (actionName == ACTION_MODIFY) { } else if (actionName == modifyAction) {
return new WebSocketEvent( return new WebSocketEvent(
eventName: "${split[0]}::" + EVENT_MODIFIED, eventName: "${split[0]}::" + modifiedEvent,
data: await service.modify(action.id, action.data, params)); data: await service.modify(action.id, action.data, params));
} else if (actionName == ACTION_UPDATE) { } else if (actionName == updateAction) {
return new WebSocketEvent( return new WebSocketEvent(
eventName: "${split[0]}::" + EVENT_UPDATED, eventName: "${split[0]}::" + updatedEvent,
data: await service.update(action.id, action.data, params)); data: await service.update(action.id, action.data, params));
} else if (actionName == ACTION_REMOVE) { } else if (actionName == removeAction) {
return new WebSocketEvent( return new WebSocketEvent(
eventName: "${split[0]}::" + EVENT_REMOVED, eventName: "${split[0]}::" + removedEvent,
data: await service.remove(action.id, params)); data: await service.remove(action.id, params));
} else { } else {
socket.sendError(new AngelHttpException.methodNotAllowed( socket.sendError(new AngelHttpException.methodNotAllowed(
@ -209,7 +210,7 @@ class AngelWebSocket {
/// Authenticates a [WebSocketContext]. /// Authenticates a [WebSocketContext].
Future handleAuth(WebSocketAction action, WebSocketContext socket) async { Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
if (allowAuth != false && if (allowAuth != false &&
action.eventName == ACTION_AUTHENTICATE && action.eventName == authenticateAction &&
action.params['query'] is Map && action.params['query'] is Map &&
action.params['query']['jwt'] is String) { action.params['query']['jwt'] is String) {
try { try {
@ -222,7 +223,7 @@ class AngelWebSocket {
socket.request socket.request
..container.registerSingleton<AuthToken>(token) ..container.registerSingleton<AuthToken>(token)
..container.registerSingleton(user, as: user.runtimeType as Type); ..container.registerSingleton(user, as: user.runtimeType as Type);
socket.send(EVENT_AUTHENTICATED, socket.send(authenticatedEvent,
{'token': token.serialize(auth.hmac), 'data': user}); {'token': token.serialize(auth.hmac), 'data': user});
} catch (e, st) { } catch (e, st) {
catchError(e, st, socket); catchError(e, st, socket);
@ -272,14 +273,14 @@ class AngelWebSocket {
.add(fromJson["data"] as Map); .add(fromJson["data"] as Map);
} }
if (action.eventName == ACTION_AUTHENTICATE) if (action.eventName == authenticateAction)
await handleAuth(action, socket); await handleAuth(action, socket);
if (action.eventName.contains("::")) { if (action.eventName.contains("::")) {
var split = action.eventName.split("::"); var split = action.eventName.split("::");
if (split.length >= 2) { if (split.length >= 2) {
if (ACTIONS.contains(split[1])) { if (actions.contains(split[1])) {
var event = await handleAction(action, socket); var event = await handleAction(action, socket);
if (event is Future) event = await event; if (event is Future) event = await event;
} }

View file

@ -49,7 +49,7 @@ class WebSocketContext {
} }
/// Sends an error event. /// Sends an error event.
void sendError(AngelHttpException error) => send(EVENT_ERROR, error.toJson()); void sendError(AngelHttpException error) => send(errorEvent, error.toJson());
} }
class _WebSocketEventTable { class _WebSocketEventTable {

View file

@ -1,8 +1,8 @@
name: angel_websocket name: angel_websocket
description: WebSocket plugin for Angel. description: Support for using pkg:angel_client with WebSockets. Designed for Angel.
environment: environment:
sdk: ">=2.0.0-dev <3.0.0" sdk: ">=2.0.0-dev <3.0.0"
version: 2.0.0-alpha.8 version: 2.0.0
author: Tobe O <thosakwe@gmail.com> author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket homepage: https://github.com/angel-dart/angel_websocket
dependencies: dependencies:
@ -19,4 +19,5 @@ dev_dependencies:
angel_container: ^1.0.0-alpha angel_container: ^1.0.0-alpha
angel_model: ^1.0.0 angel_model: ^1.0.0
logging: ^0.11.0 logging: ^0.11.0
pedantic: ^1.0.0
test: ^1.0.0 test: ^1.0.0

View file

@ -61,7 +61,7 @@ main() {
group('controller.io', () { group('controller.io', () {
test('search', () async { test('search', () async {
client.send('search', new ws.WebSocketAction()); client.sendAction(new ws.WebSocketAction(eventName: 'search'));
var search = await client.on['searched'].first; var search = await client.on['searched'].first;
print('Searched: ${search.data}'); print('Searched: ${search.data}');
expect(new Game.fromJson(search.data as Map), equals(johnVsBob)); expect(new Game.fromJson(search.data as Map), equals(johnVsBob));

View file

@ -4,7 +4,7 @@ import 'package:angel_websocket/browser.dart';
/// Dummy app to ensure client works with DDC. /// Dummy app to ensure client works with DDC.
main() { main() {
var app = new WebSockets(window.location.origin); var app = new WebSockets(window.location.origin);
window.alert(app.basePath); window.alert(app.baseUrl.toString());
app.connect().catchError((_) { app.connect().catchError((_) {
window.alert('no websocket'); window.alert('no websocket');