1.1.0-alpha

This commit is contained in:
Tobe O 2017-09-24 00:37:58 -04:00
parent 1108997817
commit 3c01c4b360
12 changed files with 143 additions and 168 deletions

View file

@ -23,8 +23,13 @@ import "package:angel_websocket/server.dart";
main() async {
var app = new Angel();
// Ensure this runs after all our services are in-place
app.justBeforeStart.add(new AngelWebSocket("/ws"));
var ws = new AngelWebSocket();
// Apply configuration
await app.configure(ws.configureServer);
// Listen for requests at `/ws`.
app.all('/ws', ws.handleRequest);
}
```

View file

@ -2,12 +2,12 @@ import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'package:angel_client/angel_client.dart';
import 'package:angel_client/base_angel_client.dart';
import 'package:angel_http_exception/angel_http_exception.dart';
import 'package:http/src/base_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'angel_websocket.dart';
export 'package:angel_client/angel_client.dart';
import 'package:angel_client/base_angel_client.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
@ -133,7 +133,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
Future<WebSocketChannel> getConnectedWebSocket();
@override
WebSocketsService service<T>(String path,
WebSocketsService service(String path,
{Type type, AngelDeserializer deserializer}) {
String uri = path.toString().replaceAll(_straySlashes, '');
return new WebSocketsService(socket, this, uri,

View file

@ -4,11 +4,11 @@ library angel_websocket.browser;
import 'dart:async';
import 'dart:html';
import 'package:angel_client/angel_client.dart';
import 'package:angel_http_exception/angel_http_exception.dart';
import 'package:http/browser_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/html.dart';
import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
@ -41,7 +41,7 @@ class WebSockets extends BaseWebSocketClient {
if (wnd.closed) {
ctrl.addError(new AngelHttpException.notAuthenticated(
message:
errorMessage ?? 'Authentication via popup window failed.'));
errorMessage ?? 'Authentication via popup window failed.'));
ctrl.close();
timer.cancel();
sub?.cancel();
@ -50,9 +50,9 @@ class WebSockets extends BaseWebSocketClient {
timer.cancel();
});
sub = window.on[eventName ?? 'token'].listen((CustomEvent e) {
sub = window.on[eventName ?? 'token'].listen((e) {
if (!ctrl.isClosed) {
ctrl.add(e.detail);
ctrl.add((e as CustomEvent).detail);
t.cancel();
ctrl.close();
sub.cancel();
@ -64,7 +64,9 @@ class WebSockets extends BaseWebSocketClient {
@override
Future<WebSocketChannel> getConnectedWebSocket() {
var socket = new WebSocket(authToken?.isNotEmpty == true ? '$basePath?token=$authToken' : basePath );
var socket = new WebSocket(authToken?.isNotEmpty == true
? '$basePath?token=$authToken'
: basePath);
var completer = new Completer<WebSocketChannel>();
socket
@ -72,18 +74,20 @@ class WebSockets extends BaseWebSocketClient {
if (!completer.isCompleted)
return completer.complete(new HtmlWebSocketChannel(socket));
})
..onError.listen((ErrorEvent e) {
if (!completer.isCompleted) return completer.completeError(e.error);
..onError.listen((e) {
var err = e as ErrorEvent;
if (!completer.isCompleted) return completer.completeError(err.error);
});
return completer.future;
}
@override
BrowserWebSocketsService service<T>(String path,
BrowserWebSocketsService service(String path,
{Type type, AngelDeserializer deserializer}) {
String uri = path.replaceAll(_straySlashes, '');
return new BrowserWebSocketsService(socket, this, uri, deserializer: deserializer);
return new BrowserWebSocketsService(socket, this, uri,
deserializer: deserializer);
}
}

View file

@ -3,11 +3,9 @@ library angel_websocket.flutter;
import 'dart:async';
import 'dart:io';
import 'package:angel_client/angel_client.dart';
import 'package:http/http.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'angel_websocket.dart';
import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart';

View file

@ -46,10 +46,10 @@ class WebSockets extends BaseWebSocketClient {
}
@override
IoWebSocketsService service<T>(String path,
IoWebSocketsService service(String path,
{Type type, AngelDeserializer deserializer}) {
String uri = path.replaceAll(_straySlashes, '');
return new IoWebSocketsService(socket, this, uri, T != dynamic ? T : type);
return new IoWebSocketsService(socket, this, uri, type);
}
@override

View file

@ -16,22 +16,20 @@ part 'websocket_context.dart';
part 'websocket_controller.dart';
/// Used to assign routes to a given handler.
typedef AngelWebSocketRegisterer(Angel app, RequestHandler handler);
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
class AngelWebSocket extends AngelPlugin {
Angel _app;
class AngelWebSocket {
List<WebSocketContext> _clients = [];
final List<String> _servicesAlreadyWired = [];
final StreamController<WebSocketAction> _onAction =
new StreamController<WebSocketAction>();
new StreamController<WebSocketAction>();
final StreamController _onData = new StreamController();
final StreamController<WebSocketContext> _onConnection =
new StreamController<WebSocketContext>.broadcast();
new StreamController<WebSocketContext>.broadcast();
final StreamController<WebSocketContext> _onDisconnect =
new StreamController<WebSocketContext>.broadcast();
new StreamController<WebSocketContext>.broadcast();
final Angel app;
/// If this is not `true`, then all client-side service parameters will be
/// discarded, other than `params['query']`.
@ -40,16 +38,8 @@ class AngelWebSocket extends AngelPlugin {
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
/// Include debug information, and send error information across WebSockets.
final bool debug;
bool _sendErrors;
/// Send error information across WebSockets, without including [debug] information..
bool get sendErrors => _sendErrors == true;
/// Registers this instance as a route on the server.
final AngelWebSocketRegisterer register;
final bool sendErrors;
/// A list of clients currently connected to this server via WebSockets.
List<WebSocketContext> get clients => new List.unmodifiable(_clients);
@ -58,9 +48,6 @@ class AngelWebSocket extends AngelPlugin {
List<String> get servicesAlreadyWired =>
new List.unmodifiable(_servicesAlreadyWired);
/// The endpoint that users should connect a WebSocket to.
final String endpoint;
/// Used to notify other nodes of an event's firing. Good for scaled applications.
final WebSocketSynchronizer synchronizer;
@ -82,17 +69,13 @@ class AngelWebSocket extends AngelPlugin {
/// Deserializes data from WebSockets.
Function deserializer;
AngelWebSocket({this.endpoint: '/ws',
this.debug: false,
bool sendErrors,
this.allowClientParams: false,
this.allowAuth: true,
this.register,
this.synchronizer,
this.serializer,
this.deserializer}) {
_sendErrors = sendErrors;
AngelWebSocket(this.app,
{this.sendErrors: false,
this.allowClientParams: false,
this.allowAuth: true,
this.synchronizer,
this.serializer,
this.deserializer}) {
if (serializer == null) serializer = god.serialize;
if (deserializer == null) deserializer = (params) => params;
}
@ -117,10 +100,6 @@ class AngelWebSocket extends AngelPlugin {
};
}
void _printDebug(String msg) {
if (debug == true) print(msg);
}
/// Slates an event to be dispatched.
Future batchEvent(WebSocketEvent event,
{filter(WebSocketContext socket), bool notify: true}) async {
@ -129,9 +108,6 @@ class AngelWebSocket extends AngelPlugin {
var result = true;
if (filter != null) result = await filter(client);
if (result == true) {
var serialized = event.toJson();
_printDebug('Batching this event: $serialized');
// print('Serialized: ' + JSON.encode(serialized));
client.io.add((serializer ?? god.serialize)(event.toJson()));
}
});
@ -150,7 +126,7 @@ class AngelWebSocket extends AngelPlugin {
if (split.length < 2)
return socket.sendError(new AngelHttpException.badRequest());
var service = _app.service(split[0]);
var service = app.service(split[0]);
if (service == null)
return socket.sendError(new AngelHttpException.notFound(
@ -170,7 +146,7 @@ class AngelWebSocket extends AngelPlugin {
var params = mergeMap([
(deserializer ?? (params) => params)(action.params),
{
"provider": Providers.WEBSOCKET,
"provider": Providers.websocket,
'__requestctx': socket.request,
'__responsectx': socket.response
}
@ -204,13 +180,7 @@ class AngelWebSocket extends AngelPlugin {
message: "Method Not Allowed: \"$actionName\""));
}
} catch (e, st) {
if (e is AngelHttpException)
return socket.sendError(e);
else if (debug == true || _sendErrors == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
catchError(e, st, socket);
}
}
@ -228,19 +198,13 @@ class AngelWebSocket extends AngelPlugin {
token = new AuthToken.validate(jwt, auth.hmac);
var user = await auth.deserializer(token.userId);
var req = socket.request;
req..inject(AuthToken, req.properties['token'] = token)..inject(
user.runtimeType, req.properties["user"] = user);
req
..inject(AuthToken, req.properties['token'] = token)
..inject(user.runtimeType, req.properties["user"] = user);
socket.send(EVENT_AUTHENTICATED,
{'token': token.serialize(auth.hmac), 'data': user});
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else if (debug == true || _sendErrors == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
catchError(e, st, socket);
}
} else {
socket.sendError(new AngelHttpException.badRequest(
@ -251,12 +215,15 @@ class AngelWebSocket extends AngelPlugin {
/// Hooks a service up to have its events broadcasted.
hookupService(Pattern _path, HookedService service) {
String path = _path.toString();
service.after([
HookedServiceEvent.CREATED,
HookedServiceEvent.MODIFIED,
HookedServiceEvent.UPDATED,
HookedServiceEvent.REMOVED
], serviceHook(path));
service.after(
[
HookedServiceEvent.created,
HookedServiceEvent.modified,
HookedServiceEvent.updated,
HookedServiceEvent.removed
],
serviceHook(path),
);
_servicesAlreadyWired.add(path);
}
@ -298,14 +265,24 @@ class AngelWebSocket extends AngelPlugin {
}
}
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else if (debug == true || _sendErrors == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
catchError(e, st, socket);
}
}
void catchError(e, StackTrace st, WebSocketContext socket) {
// Send an error
if (e is AngelHttpException) {
socket.sendError(e);
app.logger?.severe(e.message, e.error ?? e, e.stackTrace);
} else if (sendErrors) {
var err = new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]);
socket.sendError(err);
app.logger?.severe(err.message, e, st);
} else {
var err = new AngelHttpException(e);
socket.sendError(err);
app.logger?.severe(e.toString(), e, st);
}
}
@ -324,11 +301,9 @@ class AngelWebSocket extends AngelPlugin {
}
}
@override
Future call(Angel app) async {
if (_sendErrors == null) _sendErrors = app.isProduction;
_app = app..container.singleton(this);
/// Configiures an [Angel] instance to listen for WebSocket connections.
Future configureServer(Angel app) async {
app..container.singleton(this);
if (runtimeType != AngelWebSocket)
app.container.singleton(this, as: AngelWebSocket);
@ -340,50 +315,48 @@ class AngelWebSocket extends AngelPlugin {
wireAllServices(app);
});
handler(RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.io))
throw new AngelHttpException.badRequest();
res
..willCloseItself = true
..end();
var ws = await WebSocketTransformer.upgrade(req.io);
var socket = new WebSocketContext(ws, req, res);
_clients.add(socket);
await handleConnect(socket);
_onConnection.add(socket);
req
..properties['socket'] = socket
..inject(WebSocketContext, socket);
ws.listen((data) {
_onData.add(data);
handleData(socket, data);
}, onDone: () {
_onDisconnect.add(socket);
_clients.remove(ws);
}, onError: (e) {
_onDisconnect.add(socket);
_clients.remove(ws);
}, cancelOnError: true);
}
_register() {
if (register != null)
return register(app, handler);
else
app.get(endpoint, handler);
}
await _register();
if (synchronizer != null) {
synchronizer.stream.listen((e) => batchEvent(e, notify: false));
}
}
/// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.io))
throw new AngelHttpException.badRequest();
res
..willCloseItself = true
..end();
var ws = await WebSocketTransformer.upgrade(req.io);
var socket = new WebSocketContext(ws, req, res);
_clients.add(socket);
await handleConnect(socket);
_onConnection.add(socket);
req
..properties['socket'] = socket
..inject(WebSocketContext, socket);
ws.listen(
(data) {
_onData.add(data);
handleData(socket, data);
},
onDone: () {
_onDisconnect.add(socket);
_clients.remove(ws);
},
onError: (e) {
_onDisconnect.add(socket);
_clients.remove(ws);
},
cancelOnError: true,
);
return false;
}
}
/// Notifies other nodes of outgoing WWebSocket events, and listens for

View file

@ -78,16 +78,7 @@ class WebSocketController extends Controller {
return app.runContained(fn, socket.request, socket.response);
}
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else if (ws.debug == true || ws.sendErrors == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(),
stackTrace: st,
errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
ws.catchError(e, st, socket);
}
});
});

View file

@ -2,18 +2,18 @@ name: angel_websocket
description: WebSocket plugin for Angel.
environment:
sdk: ">=1.19.0"
version: 1.0.8
version: 1.1.0-alpha
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket
dependencies:
angel_auth: ^1.0.0-dev
angel_client: "^1.0.0"
angel_framework: ^1.0.0-dev
angel_auth: ^1.1.0-alpha
angel_client: ^1.1.0-alpha
angel_framework: ^1.1.0-alpha
http: ">=0.11.0 <0.12.0"
json_god: ^2.0.0-beta
merge_map: ^1.0.0
meta: ^1.0.0
uuid: "^0.5.3"
web_socket_channel: "^1.0.0"
dev_dependencies:
angel_diagnostics: "^1.0.0"
test: "^0.12.15"

View file

@ -1,10 +1,10 @@
import 'dart:async';
import 'package:angel_auth/angel_auth.dart';
import 'package:angel_client/io.dart' as c;
import 'package:angel_diagnostics/angel_diagnostics.dart';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_websocket/io.dart' as c;
import 'package:angel_websocket/server.dart';
import 'package:logging/logging.dart';
import 'package:test/test.dart';
const Map<String, String> USER = const {'username': 'foo', 'password': 'bar'};
@ -27,10 +27,11 @@ main() {
app.post('/auth/local', auth.authenticate('local'));
await app.configure(auth);
var sock = new AngelWebSocket();
await app.configure(sock);
await app.configure(logRequests());
await app.configure(auth.configureServer);
var sock = new AngelWebSocket(app);
await app.configure(sock.configureServer);
app.all('/ws', sock.handleRequest);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
var server = await app.startServer();
client = new c.Rest('http://${server.address.address}:${server.port}');
@ -38,12 +39,13 @@ main() {
await ws.connect();
});
tearDown(() =>
Future.wait([
app.close(),
client.close(),
ws.close()
]));
tearDown(() {
return Future.wait([
app.close(),
client.close(),
ws.close(),
]);
});
test('auth event fires', () async {
var localAuth = await client.authenticate(type: 'local', credentials: USER);

View file

@ -1,8 +1,8 @@
import 'dart:io';
import 'package:angel_diagnostics/angel_diagnostics.dart';
import 'package:angel_framework/angel_framework.dart' as srv;
import 'package:angel_websocket/io.dart' as ws;
import 'package:angel_websocket/server.dart' as srv;
import 'package:logging/logging.dart';
import 'package:test/test.dart';
import 'common.dart';
@ -16,14 +16,15 @@ main() {
setUp(() async {
app = new srv.Angel();
websockets = new srv.AngelWebSocket(debug: true)
websockets = new srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets);
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
await app.configure(new GameController());
await app.configure(logRequests(new File('log.txt')));
app.logger = new Logger('angel_auth')..onRecord.listen(print);
server = await app.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';

View file

@ -1,8 +1,8 @@
import 'dart:io';
import 'package:angel_diagnostics/angel_diagnostics.dart';
import 'package:angel_framework/angel_framework.dart' as srv;
import 'package:angel_websocket/io.dart' as ws;
import 'package:angel_websocket/server.dart' as srv;
import 'package:logging/logging.dart';
import 'package:test/test.dart';
import 'common.dart';
@ -16,13 +16,14 @@ main() {
setUp(() async {
app = new srv.Angel()..use('/api/todos', new TodoService());
websockets = new srv.AngelWebSocket(debug: true)
websockets = new srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets);
await app.configure(logRequests(new File('log.txt')));
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
server = await app.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';