This commit is contained in:
Tobe O 2018-07-10 12:54:55 -04:00
parent 23d0ab2105
commit 03aae5624b
16 changed files with 130 additions and 81 deletions

View file

@ -1,3 +1,8 @@
# 1.1.2
* Dart 2 updates.
* Added `handleClient`, which is nice for external implementations
that plug into `AngelWebSocket`.
# 1.1.1
* Deprecated `unwrap`.
* Service streams now pump out `e.data`, rather than the actual event.

View file

@ -1,4 +1,3 @@
analyzer:
strong-mode: true
exclude:
- .scripts-bin/**/*.dart
strong-mode:
implicit-casts: false

18
example/main.dart Normal file
View file

@ -0,0 +1,18 @@
import "package:angel_framework/angel_framework.dart";
import "package:angel_websocket/server.dart";
main() async {
var app = new Angel();
var http = new AngelHttp(app);
var ws = new AngelWebSocket(app);
// This is a plug-in. It hooks all your services,
// to automatically broadcast events.
await app.configure(ws.configureServer);
// Listen for requests at `/ws`.
app.all('/ws', ws.handleRequest);
var server = await http.startServer('127.0.0.1', 3000);
print('Listening at http://${server.address.address}:${server.port}');
}

View file

@ -46,7 +46,7 @@ class WebSocketEvent {
WebSocketEvent({String this.eventName, this.data});
factory WebSocketEvent.fromJson(Map data) =>
new WebSocketEvent(eventName: data['eventName'], data: data['data']);
new WebSocketEvent(eventName: data['eventName'].toString(), data: data['data']);
Map toJson() {
return {'eventName': eventName, 'data': data};
@ -64,8 +64,8 @@ class WebSocketAction {
{String this.id, String this.eventName, this.data, this.params});
factory WebSocketAction.fromJson(Map data) => new WebSocketAction(
id: data['id'],
eventName: data['eventName'],
id: data['id'].toString(),
eventName: data['eventName'].toString(),
data: data['data'],
params: data['params']);

View file

@ -1,9 +1,9 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'package:angel_client/angel_client.dart';
import 'package:angel_client/base_angel_client.dart';
import 'package:angel_http_exception/angel_http_exception.dart';
import 'package:dart2_constant/convert.dart';
import 'package:http/src/base_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
@ -109,7 +109,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
c.complete(socket);
}
}).catchError((e, st) {
}).catchError((e, StackTrace st) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(e, st);
@ -148,10 +148,10 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
if (data is WebSocketChannelException) {
_onWebSocketChannelException.add(data);
} else if (data is String) {
var json = JSON.decode(data);
var jsons = json.decode(data);
if (json is Map) {
var event = new WebSocketEvent.fromJson(json);
if (jsons is Map) {
var event = new WebSocketEvent.fromJson(jsons);
if (event.eventName?.isNotEmpty == true) {
_onAllEvents.add(event);
@ -159,10 +159,10 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}
if (event.eventName == EVENT_ERROR) {
var error = new AngelHttpException.fromMap(event.data ?? {});
var error = new AngelHttpException.fromMap((event.data ?? {}) as Map);
_onError.add(error);
} else if (event.eventName == EVENT_AUTHENTICATED) {
var authResult = new AngelAuthResult.fromMap(event.data);
var authResult = new AngelAuthResult.fromMap(event.data as Map);
_onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName
@ -199,7 +199,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}
/// Serializes data to JSON.
serialize(x) => JSON.encode(x);
serialize(x) => json.encode(x);
/// Alternative form of [send]ing an action.
void send(String eventName, WebSocketAction action) =>
@ -289,7 +289,7 @@ class WebSocketsService extends Service {
}
/// Serializes an [action] to be sent over a WebSocket.
serialize(WebSocketAction action) => JSON.encode(action);
serialize(WebSocketAction action) => json.encode(action);
/// Deserializes data from a [WebSocketEvent].
deserialize(x) {
@ -349,7 +349,7 @@ class WebSocketsService extends Service {
@override
Future read(id, [Map params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_READ}', id: id, params: params ?? {}));
eventName: '$path::${ACTION_READ}', id: id.toString(), params: params ?? {}));
return null;
}
@ -366,7 +366,7 @@ class WebSocketsService extends Service {
Future modify(id, data, [Map params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_MODIFY}',
id: id,
id: id.toString(),
data: data,
params: params ?? {}));
return null;
@ -376,7 +376,7 @@ class WebSocketsService extends Service {
Future update(id, data, [Map params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_UPDATE}',
id: id,
id: id.toString(),
data: data,
params: params ?? {}));
return null;
@ -385,7 +385,7 @@ class WebSocketsService extends Service {
@override
Future remove(id, [Map params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_REMOVE}', id: id, params: params ?? {}));
eventName: '$path::${ACTION_REMOVE}', id: id.toString(), params: params ?? {}));
return null;
}

View file

@ -52,7 +52,7 @@ class WebSockets extends BaseWebSocketClient {
sub = window.on[eventName ?? 'token'].listen((e) {
if (!ctrl.isClosed) {
ctrl.add((e as CustomEvent).detail);
ctrl.add((e as CustomEvent).detail.toString());
t.cancel();
ctrl.close();
sub.cancel();
@ -93,7 +93,7 @@ class WebSockets extends BaseWebSocketClient {
class BrowserWebSocketsService extends WebSocketsService {
final Type type;
BrowserWebSocketsService(WebSocketChannel socket, Angel app, String uri,
BrowserWebSocketsService(WebSocketChannel socket, WebSockets app, String uri,
{this.type, AngelDeserializer deserializer})
: super(socket, app, uri, deserializer: deserializer);
}

View file

@ -10,13 +10,13 @@ import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
// final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<WebSocketsService> _services = [];
WebSockets(String path) : super(new http.Client(), path);
WebSockets(String path) : super(new http.IOClient(), path);
@override
Stream<String> authenticateViaPopup(String url, {String eventName: 'token'}) {

View file

@ -19,7 +19,7 @@ final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
class WebSockets extends BaseWebSocketClient {
final List<IoWebSocketsService> _services = [];
WebSockets(String path) : super(new http.Client(), path);
WebSockets(String path) : super(new http.IOClient(), path);
@override
Stream<String> authenticateViaPopup(String url, {String eventName: 'token'}) {
@ -59,7 +59,7 @@ class WebSockets extends BaseWebSocketClient {
class IoWebSocketsService extends WebSocketsService {
final Type type;
IoWebSocketsService(WebSocketChannel socket, Angel app, String uri, this.type)
IoWebSocketsService(WebSocketChannel socket, WebSockets app, String uri, this.type)
: super(socket, app, uri);
@override

View file

@ -2,11 +2,11 @@
library angel_websocket.server;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:mirrors';
import 'package:angel_auth/angel_auth.dart';
import 'package:angel_framework/angel_framework.dart';
import 'package:dart2_constant/convert.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:merge_map/merge_map.dart';
import 'package:web_socket_channel/io.dart';
@ -18,9 +18,11 @@ part 'websocket_context.dart';
part 'websocket_controller.dart';
typedef String WebSocketResponseSerializer(data);
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
class AngelWebSocket {
List<WebSocketContext> _clients = [];
List<WebSocketContext> _clients = <WebSocketContext>[];
final List<String> _servicesAlreadyWired = [];
final StreamController<WebSocketAction> _onAction =
@ -40,7 +42,7 @@ class AngelWebSocket {
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
/// Send error information across WebSockets, without including [debug] information..
/// Send error information across WebSockets, without including debug information..
final bool sendErrors;
/// A list of clients currently connected to this server via WebSockets.
@ -66,7 +68,7 @@ class AngelWebSocket {
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;
/// Serializes data to WebSockets.
ResponseSerializer serializer;
WebSocketResponseSerializer serializer;
/// Deserializes data from WebSockets.
Function deserializer;
@ -82,7 +84,7 @@ class AngelWebSocket {
if (deserializer == null) deserializer = (params) => params;
}
serviceHook(String path) {
HookedServiceEventListener serviceHook(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params['broadcast'] == false) return;
@ -107,7 +109,7 @@ class AngelWebSocket {
{filter(WebSocketContext socket), bool notify: true}) async {
// Default implementation will just immediately fire events
_clients.forEach((client) async {
var result = true;
dynamic result = true;
if (filter != null) result = await filter(client);
if (result == true) {
client.channel.sink.add((serializer ?? god.serialize)(event.toJson()));
@ -125,14 +127,18 @@ class AngelWebSocket {
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split("::");
if (split.length < 2)
return socket.sendError(new AngelHttpException.badRequest());
if (split.length < 2) {
socket.sendError(new AngelHttpException.badRequest());
return null;
}
var service = app.service(split[0]);
if (service == null)
return socket.sendError(new AngelHttpException.notFound(
if (service == null) {
socket.sendError(new AngelHttpException.notFound(
message: "No service \"${split[0]}\" exists."));
return null;
}
var actionName = split[1];
@ -146,7 +152,7 @@ class AngelWebSocket {
}
var params = mergeMap([
(deserializer ?? (params) => params)(action.params),
((deserializer ?? (params) => params)(action.params)) as Map,
{
"provider": Providers.websocket,
'__requestctx': socket.request,
@ -156,11 +162,13 @@ class AngelWebSocket {
try {
if (actionName == ACTION_INDEX) {
return socket.send(
socket.send(
"${split[0]}::" + EVENT_INDEXED, await service.index(params));
return null;
} else if (actionName == ACTION_READ) {
return socket.send("${split[0]}::" + EVENT_READ,
socket.send("${split[0]}::" + EVENT_READ,
await service.read(action.id, params));
return null;
} else if (actionName == ACTION_CREATE) {
return new WebSocketEvent(
eventName: "${split[0]}::" + EVENT_CREATED,
@ -178,8 +186,9 @@ class AngelWebSocket {
eventName: "${split[0]}::" + EVENT_REMOVED,
data: await service.remove(action.id, params));
} else {
return socket.sendError(new AngelHttpException.methodNotAllowed(
socket.sendError(new AngelHttpException.methodNotAllowed(
message: "Method Not Allowed: \"$actionName\""));
return null;
}
} catch (e, st) {
catchError(e, st, socket);
@ -236,8 +245,8 @@ class AngelWebSocket {
handleData(WebSocketContext socket, data) async {
try {
socket._onData.add(data);
var fromJson = JSON.decode(data);
var action = new WebSocketAction.fromJson(fromJson);
var fromJson = json.decode(data.toString());
var action = new WebSocketAction.fromJson(fromJson as Map);
_onAction.add(action);
if (action.eventName == null ||
@ -250,7 +259,7 @@ class AngelWebSocket {
socket._onAction.add(new WebSocketAction.fromJson(fromJson));
socket.on
._getStreamForEvent(fromJson["eventName"].toString())
.add(fromJson["data"]);
.add(fromJson["data"] as Map);
}
if (action.eventName == ACTION_AUTHENTICATE)
@ -261,7 +270,7 @@ class AngelWebSocket {
if (split.length >= 2) {
if (ACTIONS.contains(split[1])) {
var event = handleAction(action, socket);
var event = await handleAction(action, socket);
if (event is Future) event = await event;
}
}
@ -299,11 +308,11 @@ class AngelWebSocket {
return !_servicesAlreadyWired.contains(x) &&
app.services[x] is HookedService;
})) {
hookupService(key, app.services[key]);
hookupService(key, app.services[key] as HookedService);
}
}
/// Configiures an [Angel] instance to listen for WebSocket connections.
/// Configures an [Angel] instance to listen for WebSocket connections.
Future configureServer(Angel app) async {
app..container.singleton(this);
@ -320,45 +329,56 @@ class AngelWebSocket {
if (synchronizer != null) {
synchronizer.stream.listen((e) => batchEvent(e, notify: false));
}
app.shutdownHooks.add((_) => synchronizer?.close());
}
/// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.io))
throw new AngelHttpException.badRequest();
res
..willCloseItself = true
..end();
var ws = await WebSocketTransformer.upgrade(req.io);
var channel = new IOWebSocketChannel(ws);
var socket = new WebSocketContext(channel, req, res);
/// Handles an incoming [WebSocketContext].
Future handleClient(WebSocketContext socket) async {
_clients.add(socket);
await handleConnect(socket);
_onConnection.add(socket);
req
socket.request
..properties['socket'] = socket
..inject(WebSocketContext, socket);
ws.listen(
(data) {
socket.channel.stream.listen(
(data) {
_onData.add(data);
handleData(socket, data);
},
onDone: () {
_onDisconnect.add(socket);
_clients.remove(ws);
_clients.remove(socket);
},
onError: (e) {
_onDisconnect.add(socket);
_clients.remove(ws);
_clients.remove(socket);
},
cancelOnError: true,
);
return false;
}
/// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (req is HttpRequestContextImpl) {
if (!WebSocketTransformer.isUpgradeRequest(req.io))
throw new AngelHttpException.badRequest();
res
..willCloseItself = true
..end();
var ws = await WebSocketTransformer.upgrade(req.io);
var channel = new IOWebSocketChannel(ws);
var socket = new WebSocketContext(channel, req, res);
handleClient(socket);
return false;
} else {
throw new ArgumentError('Not an HTTP/1.1 RequestContext: $req');
}
}
}

View file

@ -55,7 +55,7 @@ class WebSocketContext {
class _WebSocketEventTable {
Map<String, StreamController<Map>> _handlers = {};
StreamController<Map> _getStreamForEvent(eventName) {
StreamController<Map> _getStreamForEvent(String eventName) {
if (!_handlers.containsKey(eventName))
_handlers[eventName] = new StreamController<Map>();
return _handlers[eventName];

View file

@ -75,7 +75,7 @@ class WebSocketController extends Controller {
if (_handlers.containsKey(action.eventName)) {
var methodMirror = _handlers[action.eventName];
var fn = instanceMirror.getField(methodMirror.simpleName).reflectee;
return app.runContained(fn, socket.request, socket.response);
return app.runContained(fn as Function, socket.request, socket.response);
}
} catch (e, st) {
ws.catchError(e, st, socket);

View file

@ -1,19 +1,19 @@
name: angel_websocket
description: WebSocket plugin for Angel.
environment:
sdk: ">=1.19.0"
version: 1.1.1
sdk: ">=1.8.0 <3.0.0"
version: 1.1.2
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket
dependencies:
angel_auth: ^1.1.0-alpha
angel_client: ^1.1.0-alpha
angel_framework: ^1.1.0-alpha
http: ">=0.11.0 <0.12.0"
http: ^0.11.0
json_god: ^2.0.0-beta
merge_map: ^1.0.0
meta: ^1.0.0
uuid: "^0.5.3"
web_socket_channel: "^1.0.0"
uuid: ^0.5.3
web_socket_channel: ^1.0.0
dev_dependencies:
test: "^0.12.15"

View file

@ -11,11 +11,13 @@ const Map<String, String> USER = const {'username': 'foo', 'password': 'bar'};
main() {
Angel app;
AngelHttp http;
c.Angel client;
c.WebSockets ws;
setUp(() async {
app = new Angel();
http = new AngelHttp(app, useZone: false);
var auth = new AngelAuth();
auth.serializer = (_) async => 'baz';
@ -33,7 +35,7 @@ main() {
app.all('/ws', sock.handleRequest);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
var server = await app.startServer();
var server = await http.startServer();
client = new c.Rest('http://${server.address.address}:${server.port}');
ws = new c.WebSockets('ws://${server.address.address}:${server.port}/ws');
await ws.connect();
@ -41,7 +43,7 @@ main() {
tearDown(() {
return Future.wait([
app.close(),
http.close(),
client.close(),
ws.close(),
]);

View file

@ -6,8 +6,9 @@ class Game {
const Game({this.playerOne, this.playerTwo});
factory Game.fromJson(Map data) =>
new Game(playerOne: data['playerOne'], playerTwo: data['playerTwo']);
factory Game.fromJson(Map data) => new Game(
playerOne: data['playerOne'].toString(),
playerTwo: data['playerTwo'].toString());
@override
bool operator ==(other) =>
@ -16,7 +17,7 @@ class Game {
other.playerTwo == playerTwo;
}
const Game JOHN_VS_BOB = const Game(playerOne: 'John', playerTwo: 'Bob');
const Game johnVsBob = const Game(playerOne: 'John', playerTwo: 'Bob');
@Expose('/game')
class GameController extends WebSocketController {
@ -25,6 +26,6 @@ class GameController extends WebSocketController {
@ExposeWs('search')
search(WebSocketContext socket) async {
print('User is searching for a game...');
socket.send('searched', JOHN_VS_BOB);
socket.send('searched', johnVsBob);
}
}

View file

@ -8,6 +8,7 @@ import 'common.dart';
main() {
srv.Angel app;
srv.AngelHttp http;
ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer server;
@ -15,6 +16,7 @@ main() {
setUp(() async {
app = new srv.Angel();
http = new srv.AngelHttp(app, useZone: false);
websockets = new srv.AngelWebSocket(app)
..onData.listen((data) {
@ -26,7 +28,7 @@ main() {
await app.configure(new GameController(websockets).configureServer);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
server = await app.startServer();
server = await http.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url);
@ -46,7 +48,7 @@ main() {
tearDown(() async {
await client.close();
await server.close(force: true);
await http.close();
app = null;
client = null;
server = null;
@ -58,7 +60,7 @@ main() {
client.send('search', new ws.WebSocketAction());
var search = await client.on['searched'].first;
print('Searched: ${search.data}');
expect(new Game.fromJson(search.data), equals(JOHN_VS_BOB));
expect(new Game.fromJson(search.data as Map), equals(johnVsBob));
});
});
}

View file

@ -8,6 +8,7 @@ import 'common.dart';
main() {
srv.Angel app;
srv.AngelHttp http;
ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer server;
@ -15,6 +16,7 @@ main() {
setUp(() async {
app = new srv.Angel()..use('/api/todos', new TodoService());
http = new srv.AngelHttp(app, useZone: false);
websockets = new srv.AngelWebSocket(app)
..onData.listen((data) {
@ -24,7 +26,7 @@ main() {
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
server = await app.startServer();
server = await http.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url);
@ -44,7 +46,7 @@ main() {
tearDown(() async {
await client.close();
await server.close(force: true);
await http.close();
app = null;
client = null;
server = null;