Migrated angel_websocket

This commit is contained in:
thomashii 2021-04-10 23:12:43 +08:00
parent 037e82d699
commit 7186fa7990
19 changed files with 122 additions and 100 deletions

View file

@ -16,7 +16,7 @@
* Updated angel_validate to 4.0.0 (6/7 test passed) * Updated angel_validate to 4.0.0 (6/7 test passed)
* Updated json_god to 4.0.0 (13/13 test passed) * Updated json_god to 4.0.0 (13/13 test passed)
* Updated angel_client to 4.0.0 (6/13 test passed) * Updated angel_client to 4.0.0 (6/13 test passed)
* Updated angel_websocket to 3.0.0 (in progress) * Updated angel_websocket to 4.0.0 (in progress)
* Updated test to 3.0.0 (in progress) * Updated test to 3.0.0 (in progress)
* Updated jael to 3.0.0 (in progress) * Updated jael to 3.0.0 (in progress)
* Updated jael_preprocessor to 3.0.0 (in progress) * Updated jael_preprocessor to 3.0.0 (in progress)

View file

@ -544,8 +544,8 @@ class HookedServiceEvent<Id, Data, T extends Service<Id, Data?>> {
} }
/// Triggered on a hooked service event. /// Triggered on a hooked service event.
typedef FutureOr HookedServiceEventListener<Id, Data, typedef HookedServiceEventListener<Id, Data, T extends Service<Id, Data>>
T extends Service<Id, Data>>(HookedServiceEvent<Id, Data, T> event); = FutureOr<dynamic> Function(HookedServiceEvent<Id, Data, T> event);
/// Can be listened to, but events may be canceled. /// Can be listened to, but events may be canceled.
class HookedServiceEventDispatcher<Id, Data, T extends Service<Id, Data>> { class HookedServiceEventDispatcher<Id, Data, T extends Service<Id, Data>> {

View file

@ -15,7 +15,8 @@ import 'service.dart';
final RegExp _straySlashes = RegExp(r'(^/+)|(/+$)'); final RegExp _straySlashes = RegExp(r'(^/+)|(/+$)');
/// A function that receives an incoming [RequestContext] and responds to it. /// A function that receives an incoming [RequestContext] and responds to it.
typedef FutureOr RequestHandler(RequestContext req, ResponseContext res); typedef RequestHandler = FutureOr<dynamic> Function(
RequestContext<dynamic> req, ResponseContext<dynamic> res);
/// Sequentially runs a list of [handlers] of middleware, and returns early if any does not /// Sequentially runs a list of [handlers] of middleware, and returns early if any does not
/// return `true`. Works well with [Router].chain. /// return `true`. Works well with [Router].chain.

View file

@ -22,10 +22,10 @@ import 'service.dart';
//final RegExp _straySlashes = RegExp(r'(^/+)|(/+$)'); //final RegExp _straySlashes = RegExp(r'(^/+)|(/+$)');
/// A function that configures an [Angel] server in some way. /// A function that configures an [Angel] server in some way.
typedef FutureOr<void> AngelConfigurer(Angel app); typedef AngelConfigurer = FutureOr<void> Function(Angel app);
/// A function that asynchronously generates a view from the given path and data. /// A function that asynchronously generates a view from the given path and data.
typedef FutureOr<String> ViewGenerator(String path, typedef ViewGenerator = FutureOr<String> Function(String path,
[Map<String, dynamic>? data]); [Map<String, dynamic>? data]);
/// A powerful real-time/REST/MVC server class. /// A powerful real-time/REST/MVC server class.

View file

@ -1,6 +1,6 @@
import 'dart:async'; import 'dart:async';
typedef void _InitCallback(); typedef _InitCallback = void Function();
/// A [StreamController] boilerplate that prevents memory leaks. /// A [StreamController] boilerplate that prevents memory leaks.
abstract class SafeCtrl<T> { abstract class SafeCtrl<T> {

View file

@ -6,12 +6,12 @@ import 'package:angel_websocket/server.dart';
import 'package:file/local.dart'; import 'package:file/local.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
main(List<String> args) async { void main(List<String> args) async {
var app = new Angel(); var app = Angel();
var http = new AngelHttp(app); var http = AngelHttp(app);
var ws = new AngelWebSocket(app, sendErrors: !app.environment.isProduction); var ws = AngelWebSocket(app, sendErrors: !app.environment.isProduction);
var fs = const LocalFileSystem(); var fs = const LocalFileSystem();
app.logger = new Logger('angel_websocket'); app.logger = Logger('angel_websocket');
// This is a plug-in. It hooks all your services, // This is a plug-in. It hooks all your services,
// to automatically broadcast events. // to automatically broadcast events.
@ -31,7 +31,7 @@ main(List<String> args) async {
}); });
if (args.contains('http2')) { if (args.contains('http2')) {
var ctx = new SecurityContext() var ctx = SecurityContext()
..useCertificateChain('dev.pem') ..useCertificateChain('dev.pem')
..usePrivateKey('dev.key', password: 'dartdart'); ..usePrivateKey('dev.key', password: 'dartdart');
@ -45,7 +45,7 @@ main(List<String> args) async {
); );
} }
var http2 = new AngelHttp2(app, ctx); var http2 = AngelHttp2(app, ctx);
http2.onHttp1.listen(http.handleRequest); http2.onHttp1.listen(http.handleRequest);
await http2.startServer('127.0.0.1', 3000); await http2.startServer('127.0.0.1', 3000);
print('Listening at ${http2.uri}'); print('Listening at ${http2.uri}');

View file

@ -6,7 +6,7 @@ class WebSocketEvent<Data> {
String eventName; String eventName;
Data data; Data data;
WebSocketEvent({String this.eventName, this.data}); WebSocketEvent({this.eventName, this.data});
factory WebSocketEvent.fromJson(Map data) => WebSocketEvent( factory WebSocketEvent.fromJson(Map data) => WebSocketEvent(
eventName: data['eventName'].toString(), data: data['data'] as Data); eventName: data['eventName'].toString(), data: data['data'] as Data);
@ -31,8 +31,7 @@ class WebSocketAction {
var data; var data;
Map<String, dynamic> params; Map<String, dynamic> params;
WebSocketAction( WebSocketAction({this.id, this.eventName, this.data, this.params});
{String this.id, String this.eventName, this.data, this.params});
factory WebSocketAction.fromJson(Map data) => WebSocketAction( factory WebSocketAction.fromJson(Map data) => WebSocketAction(
id: data['id'].toString(), id: data['id'].toString(),

View file

@ -10,7 +10,7 @@ import 'package:web_socket_channel/status.dart' as status;
import 'angel_websocket.dart'; import 'angel_websocket.dart';
import 'constants.dart'; import 'constants.dart';
final RegExp _straySlashes = RegExp(r"(^/)|(/+$)"); final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// An [Angel] client that operates across WebSockets. /// An [Angel] client that operates across WebSockets.
abstract class BaseWebSocketClient extends BaseAngelClient { abstract class BaseWebSocketClient extends BaseAngelClient {
@ -38,6 +38,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream; Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired whenever a WebSocket is successfully authenticated. /// Fired whenever a WebSocket is successfully authenticated.
@override
Stream<AngelAuthResult> get onAuthenticated => _onAuthenticated.stream; Stream<AngelAuthResult> get onAuthenticated => _onAuthenticated.stream;
/// A broadcast stream of data coming from the [socket]. /// A broadcast stream of data coming from the [socket].
@ -143,6 +144,8 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
return await c.future.then((socket) { return await c.future.then((socket) {
_socket = socket; _socket = socket;
listen(); listen();
return _socket;
}); });
} else { } else {
_socket = await getConnectedWebSocket(); _socket = await getConnectedWebSocket();
@ -159,7 +162,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
@override @override
WebSocketsService<Id, Data> service<Id, Data>(String path, WebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) { {Type type, AngelDeserializer<Data> deserializer}) {
String uri = path.toString().replaceAll(_straySlashes, ''); var uri = path.toString().replaceAll(_straySlashes, '');
return WebSocketsService<Id, Data>(socket, this, uri, return WebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer); deserializer: deserializer);
} }
@ -192,7 +195,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
_onAuthenticated.add(authResult); _onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) { } else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName var split = event.eventName
.split("::") .split('::')
.where((str) => str.isNotEmpty) .where((str) => str.isNotEmpty)
.toList(); .toList();
@ -225,14 +228,15 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
} }
/// Serializes data to JSON. /// Serializes data to JSON.
serialize(x) => json.encode(x); dynamic serialize(x) => json.encode(x);
/// Sends the given [action] on the [socket]. /// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) { void sendAction(WebSocketAction action) {
if (_socket == null) if (_socket == null) {
_queue.addLast(action); _queue.addLast(action);
else } else {
socket.sink.add(serialize(action)); socket.sink.add(serialize(action));
}
} }
/// Attempts to authenticate a WebSocket, using a valid JWT. /// Attempts to authenticate a WebSocket, using a valid JWT.
@ -274,27 +278,34 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream; Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired on `index` events. /// Fired on `index` events.
@override
Stream<List<Data>> get onIndexed => _onIndexed.stream; Stream<List<Data>> get onIndexed => _onIndexed.stream;
/// Fired on `read` events. /// Fired on `read` events.
@override
Stream<Data> get onRead => _onRead.stream; Stream<Data> get onRead => _onRead.stream;
/// Fired on `created` events. /// Fired on `created` events.
@override
Stream<Data> get onCreated => _onCreated.stream; Stream<Data> get onCreated => _onCreated.stream;
/// Fired on `modified` events. /// Fired on `modified` events.
@override
Stream<Data> get onModified => _onModified.stream; Stream<Data> get onModified => _onModified.stream;
/// Fired on `updated` events. /// Fired on `updated` events.
@override
Stream<Data> get onUpdated => _onUpdated.stream; Stream<Data> get onUpdated => _onUpdated.stream;
/// Fired on `removed` events. /// Fired on `removed` events.
@override
Stream<Data> get onRemoved => _onRemoved.stream; Stream<Data> get onRemoved => _onRemoved.stream;
WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) { WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) {
listen(); listen();
} }
@override
Future close() async { Future close() async {
await _onAllEvents.close(); await _onAllEvents.close();
await _onCreated.close(); await _onCreated.close();
@ -306,7 +317,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
} }
/// Serializes an [action] to be sent over a WebSocket. /// Serializes an [action] to be sent over a WebSocket.
serialize(WebSocketAction action) => json.encode(action); dynamic serialize(WebSocketAction action) => json.encode(action);
/// Deserializes data from a [WebSocketEvent]. /// Deserializes data from a [WebSocketEvent].
Data deserialize(x) { Data deserialize(x) {
@ -423,18 +434,20 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
/// Contains a dynamic Map of [WebSocketEvent] streams. /// Contains a dynamic Map of [WebSocketEvent] streams.
class WebSocketExtraneousEventHandler { class WebSocketExtraneousEventHandler {
Map<String, StreamController<WebSocketEvent>> _events = {}; final Map<String, StreamController<WebSocketEvent>> _events = {};
StreamController<WebSocketEvent> _getStream(String index) { StreamController<WebSocketEvent> _getStream(String index) {
if (_events[index] == null) if (_events[index] == null) {
_events[index] = StreamController<WebSocketEvent>(); _events[index] = StreamController<WebSocketEvent>();
}
return _events[index]; return _events[index];
} }
Stream<WebSocketEvent> operator [](String index) { Stream<WebSocketEvent> operator [](String index) {
if (_events[index] == null) if (_events[index] == null) {
_events[index] = StreamController<WebSocketEvent>(); _events[index] = StreamController<WebSocketEvent>();
}
return _events[index].stream; return _events[index].stream;
} }

View file

@ -11,7 +11,7 @@ import 'package:web_socket_channel/html.dart';
import 'base_websocket_client.dart'; import 'base_websocket_client.dart';
export 'angel_websocket.dart'; export 'angel_websocket.dart';
final RegExp _straySlashes = RegExp(r"(^/)|(/+$)"); final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// Queries an Angel server via WebSockets. /// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient { class WebSockets extends BaseWebSocketClient {
@ -50,8 +50,9 @@ class WebSockets extends BaseWebSocketClient {
timer.cancel(); timer.cancel();
sub?.cancel(); sub?.cancel();
} }
} else } else {
timer.cancel(); timer.cancel();
}
}); });
sub = window.on[eventName ?? 'token'].listen((e) { sub = window.on[eventName ?? 'token'].listen((e) {
@ -81,12 +82,14 @@ class WebSockets extends BaseWebSocketClient {
socket socket
..onOpen.listen((_) { ..onOpen.listen((_) {
if (!completer.isCompleted) if (!completer.isCompleted) {
return completer.complete(HtmlWebSocketChannel(socket)); return completer.complete(HtmlWebSocketChannel(socket));
}
}) })
..onError.listen((e) { ..onError.listen((e) {
if (!completer.isCompleted) if (!completer.isCompleted) {
return completer.completeError(e is ErrorEvent ? e.error : e); return completer.completeError(e is ErrorEvent ? e.error : e);
}
}); });
return completer.future; return completer.future;
@ -95,7 +98,7 @@ class WebSockets extends BaseWebSocketClient {
@override @override
BrowserWebSocketsService<Id, Data> service<Id, Data>(String path, BrowserWebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) { {Type type, AngelDeserializer<Data> deserializer}) {
String uri = path.replaceAll(_straySlashes, ''); var uri = path.replaceAll(_straySlashes, '');
return BrowserWebSocketsService<Id, Data>(socket, this, uri, return BrowserWebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer); deserializer: deserializer);
} }

View file

@ -3,7 +3,6 @@ library angel_websocket.flutter;
import 'dart:async'; import 'dart:async';
import 'dart:io'; import 'dart:io';
import 'package:http/http.dart' as http;
import 'package:http/io_client.dart' as http; import 'package:http/io_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/io.dart';

View file

@ -7,8 +7,8 @@ import 'package:angel_framework/angel_framework.dart';
HookedServiceEventListener doNotBroadcast([provider]) { HookedServiceEventListener doNotBroadcast([provider]) {
return (HookedServiceEvent e) { return (HookedServiceEvent e) {
if (e.params != null && e.params.containsKey('provider')) { if (e.params != null && e.params.containsKey('provider')) {
bool deny = false; var deny = false;
Iterable providers = provider is Iterable ? provider : [provider]; var providers = provider is Iterable ? provider : [provider];
for (var p in providers) { for (var p in providers) {
if (deny) break; if (deny) break;
@ -19,9 +19,10 @@ HookedServiceEventListener doNotBroadcast([provider]) {
e.params['provider'] == p.via; e.params['provider'] == p.via;
} else if (p == null) { } else if (p == null) {
deny = true; deny = true;
} else } else {
deny = deny =
deny || (e.params['provider'] as Providers).via == p.toString(); deny || (e.params['provider'] as Providers).via == p.toString();
}
} }
e.params['broadcast'] = false; e.params['broadcast'] = false;

View file

@ -4,7 +4,6 @@ library angel_websocket.io;
import 'dart:async'; import 'dart:async';
import 'dart:io'; import 'dart:io';
import 'package:angel_client/angel_client.dart'; import 'package:angel_client/angel_client.dart';
import 'package:http/http.dart' as http;
import 'package:http/io_client.dart' as http; import 'package:http/io_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/io.dart';
@ -12,7 +11,7 @@ import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart'; export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart'; export 'angel_websocket.dart';
final RegExp _straySlashes = RegExp(r"(^/)|(/+$)"); final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// Queries an Angel server via WebSockets. /// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient { class WebSockets extends BaseWebSocketClient {

View file

@ -21,11 +21,11 @@ part 'websocket_context.dart';
part 'websocket_controller.dart'; part 'websocket_controller.dart';
typedef String WebSocketResponseSerializer(data); typedef WebSocketResponseSerializer = String Function(dynamic data);
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s. /// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
class AngelWebSocket { class AngelWebSocket {
List<WebSocketContext> _clients = <WebSocketContext>[]; final List<WebSocketContext> _clients = <WebSocketContext>[];
final List<String> _servicesAlreadyWired = []; final List<String> _servicesAlreadyWired = [];
final StreamController<WebSocketAction> _onAction = final StreamController<WebSocketAction> _onAction =
@ -118,7 +118,7 @@ class AngelWebSocket {
/// Slates an event to be dispatched. /// Slates an event to be dispatched.
Future<void> batchEvent(WebSocketEvent event, Future<void> batchEvent(WebSocketEvent event,
{filter(WebSocketContext socket), bool notify = true}) async { {Function(WebSocketContext socket) filter, bool notify = true}) async {
// Default implementation will just immediately fire events // Default implementation will just immediately fire events
_clients.forEach((client) async { _clients.forEach((client) async {
dynamic result = true; dynamic result = true;
@ -128,8 +128,9 @@ class AngelWebSocket {
} }
}); });
if (synchronizationChannel != null && notify != false) if (synchronizationChannel != null && notify != false) {
synchronizationChannel.sink.add(event); synchronizationChannel.sink.add(event);
}
} }
/// Returns a list of events yet to be sent. /// Returns a list of events yet to be sent.
@ -137,7 +138,7 @@ class AngelWebSocket {
/// Responds to an incoming action on a WebSocket. /// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async { Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split("::"); var split = action.eventName.split('::');
if (split.length < 2) { if (split.length < 2) {
socket.sendError(AngelHttpException.badRequest()); socket.sendError(AngelHttpException.badRequest());
@ -148,7 +149,7 @@ class AngelWebSocket {
if (service == null) { if (service == null) {
socket.sendError(AngelHttpException.notFound( socket.sendError(AngelHttpException.notFound(
message: "No service \"${split[0]}\" exists.")); message: 'No service \"${split[0]}\" exists.'));
return null; return null;
} }
@ -157,17 +158,18 @@ class AngelWebSocket {
if (action.params is! Map) action.params = <String, dynamic>{}; if (action.params is! Map) action.params = <String, dynamic>{};
if (allowClientParams != true) { if (allowClientParams != true) {
if (action.params['query'] is Map) if (action.params['query'] is Map) {
action.params = {'query': action.params['query']}; action.params = {'query': action.params['query']};
else } else {
action.params = {}; action.params = {};
}
} }
var params = mergeMap<String, dynamic>([ var params = mergeMap<String, dynamic>([
((deserializer ?? (params) => params)(action.params)) ((deserializer ?? (params) => params)(action.params))
as Map<String, dynamic>, as Map<String, dynamic>,
{ {
"provider": Providers.websocket, 'provider': Providers.websocket,
'__requestctx': socket.request, '__requestctx': socket.request,
'__responsectx': socket.response '__responsectx': socket.response
} }
@ -176,31 +178,31 @@ class AngelWebSocket {
try { try {
if (actionName == indexAction) { if (actionName == indexAction) {
socket.send( socket.send(
"${split[0]}::" + indexedEvent, await service.index(params)); '${split[0]}::' + indexedEvent, await service.index(params));
return null; return null;
} else if (actionName == readAction) { } else if (actionName == readAction) {
socket.send( socket.send(
"${split[0]}::" + readEvent, await service.read(action.id, params)); '${split[0]}::' + readEvent, await service.read(action.id, params));
return null; return null;
} else if (actionName == createAction) { } else if (actionName == createAction) {
return WebSocketEvent( return WebSocketEvent(
eventName: "${split[0]}::" + createdEvent, eventName: '${split[0]}::' + createdEvent,
data: await service.create(action.data, params)); data: await service.create(action.data, params));
} else if (actionName == modifyAction) { } else if (actionName == modifyAction) {
return WebSocketEvent( return WebSocketEvent(
eventName: "${split[0]}::" + modifiedEvent, eventName: '${split[0]}::' + modifiedEvent,
data: await service.modify(action.id, action.data, params)); data: await service.modify(action.id, action.data, params));
} else if (actionName == updateAction) { } else if (actionName == updateAction) {
return WebSocketEvent( return WebSocketEvent(
eventName: "${split[0]}::" + updatedEvent, eventName: '${split[0]}::' + updatedEvent,
data: await service.update(action.id, action.data, params)); data: await service.update(action.id, action.data, params));
} else if (actionName == removeAction) { } else if (actionName == removeAction) {
return WebSocketEvent( return WebSocketEvent(
eventName: "${split[0]}::" + removedEvent, eventName: '${split[0]}::' + removedEvent,
data: await service.remove(action.id, params)); data: await service.remove(action.id, params));
} else { } else {
socket.sendError(AngelHttpException.methodNotAllowed( socket.sendError(AngelHttpException.methodNotAllowed(
message: "Method Not Allowed: \"$actionName\"")); message: 'Method Not Allowed: \"$actionName\"'));
return null; return null;
} }
} catch (e, st) { } catch (e, st) {
@ -223,7 +225,7 @@ class AngelWebSocket {
var user = await auth.deserializer(token.userId); var user = await auth.deserializer(token.userId);
socket.request socket.request
..container.registerSingleton<AuthToken>(token) ..container.registerSingleton<AuthToken>(token)
..container.registerSingleton(user, as: user.runtimeType as Type); ..container.registerSingleton(user, as: user.runtimeType);
socket._onAuthenticated.add(null); socket._onAuthenticated.add(null);
socket.send(authenticatedEvent, socket.send(authenticatedEvent,
{'token': token.serialize(auth.hmac), 'data': user}); {'token': token.serialize(auth.hmac), 'data': user});
@ -257,7 +259,7 @@ class AngelWebSocket {
Future handleConnect(WebSocketContext socket) async {} Future handleConnect(WebSocketContext socket) async {}
/// Handles incoming data from a WebSocket. /// Handles incoming data from a WebSocket.
handleData(WebSocketContext socket, data) async { dynamic handleData(WebSocketContext socket, data) async {
try { try {
socket._onData.add(data); socket._onData.add(data);
var fromJson = json.decode(data.toString()); var fromJson = json.decode(data.toString());
@ -270,18 +272,19 @@ class AngelWebSocket {
throw AngelHttpException.badRequest(); throw AngelHttpException.badRequest();
} }
if (fromJson is Map && fromJson.containsKey("eventName")) { if (fromJson is Map && fromJson.containsKey('eventName')) {
socket._onAction.add(WebSocketAction.fromJson(fromJson)); socket._onAction.add(WebSocketAction.fromJson(fromJson));
socket.on socket.on
._getStreamForEvent(fromJson["eventName"].toString()) ._getStreamForEvent(fromJson['eventName'].toString())
.add(fromJson["data"] as Map); .add(fromJson['data'] as Map);
} }
if (action.eventName == authenticateAction) if (action.eventName == authenticateAction) {
await handleAuth(action, socket); await handleAuth(action, socket);
}
if (action.eventName.contains("::")) { if (action.eventName.contains('::')) {
var split = action.eventName.split("::"); var split = action.eventName.split('::');
if (split.length >= 2) { if (split.length >= 2) {
if (actions.contains(split[1])) { if (actions.contains(split[1])) {
@ -318,8 +321,8 @@ class AngelWebSocket {
} }
/// Hooks any [HookedService]s that are not being broadcasted yet. /// Hooks any [HookedService]s that are not being broadcasted yet.
wireAllServices(Angel app) { void wireAllServices(Angel app) {
for (Pattern key in app.services.keys.where((x) { for (var key in app.services.keys.where((x) {
return !_servicesAlreadyWired.contains(x) && return !_servicesAlreadyWired.contains(x) &&
app.services[x] is HookedService; app.services[x] is HookedService;
})) { })) {
@ -329,10 +332,11 @@ class AngelWebSocket {
/// Configures an [Angel] instance to listen for WebSocket connections. /// Configures an [Angel] instance to listen for WebSocket connections.
Future configureServer(Angel app) async { Future configureServer(Angel app) async {
app..container.registerSingleton(this); app.container.registerSingleton(this);
if (runtimeType != AngelWebSocket) if (runtimeType != AngelWebSocket) {
app..container.registerSingleton<AngelWebSocket>(this); app.container.registerSingleton<AngelWebSocket>(this);
}
// Set up services // Set up services
wireAllServices(app); wireAllServices(app);
@ -384,9 +388,10 @@ class AngelWebSocket {
/// Handles an incoming HTTP request. /// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async { Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (req is HttpRequestContext && res is HttpResponseContext) { if (req is HttpRequestContext && res is HttpResponseContext) {
if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest)) if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest)) {
throw AngelHttpException.badRequest(); throw AngelHttpException.badRequest();
await res.detach(); }
res.detach();
var ws = await WebSocketTransformer.upgrade(req.rawRequest); var ws = await WebSocketTransformer.upgrade(req.rawRequest);
var channel = IOWebSocketChannel(ws); var channel = IOWebSocketChannel(ws);
var socket = WebSocketContext(channel, req, res); var socket = WebSocketContext(channel, req, res);
@ -437,12 +442,12 @@ class AngelWebSocket {
} }
var sink = utf8.encoder.startChunkedConversion(ctrl.foreign.sink); var sink = utf8.encoder.startChunkedConversion(ctrl.foreign.sink);
sink.add("HTTP/1.1 101 Switching Protocols\r\n" sink.add('HTTP/1.1 101 Switching Protocols\r\n'
"Upgrade: websocket\r\n" 'Upgrade: websocket\r\n'
"Connection: Upgrade\r\n" 'Connection: Upgrade\r\n'
"Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n"); 'Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n');
if (protocol != null) sink.add("Sec-WebSocket-Protocol: $protocol\r\n"); if (protocol != null) sink.add('Sec-WebSocket-Protocol: $protocol\r\n');
sink.add("\r\n"); sink.add('\r\n');
var ws = WebSocketChannel(ctrl.foreign); var ws = WebSocketChannel(ctrl.foreign);
var socket = WebSocketContext(ws, req, res); var socket = WebSocketContext(ws, req, res);

View file

@ -15,14 +15,14 @@ class WebSocketContext {
/// The original [ResponseContext]. /// The original [ResponseContext].
final ResponseContext response; final ResponseContext response;
StreamController<WebSocketAction> _onAction = final StreamController<WebSocketAction> _onAction =
StreamController<WebSocketAction>(); StreamController<WebSocketAction>();
StreamController<void> _onAuthenticated = StreamController(); final StreamController<void> _onAuthenticated = StreamController();
StreamController<Null> _onClose = StreamController<Null>(); final StreamController<Null> _onClose = StreamController<Null>();
StreamController _onData = StreamController(); final StreamController _onData = StreamController();
/// Fired on any [WebSocketAction]; /// Fired on any [WebSocketAction];
Stream<WebSocketAction> get onAction => _onAction.stream; Stream<WebSocketAction> get onAction => _onAction.stream;
@ -45,7 +45,7 @@ class WebSocketContext {
await _onAction.close(); await _onAction.close();
await _onAuthenticated.close(); await _onAuthenticated.close();
await _onData.close(); await _onData.close();
await _onClose.add(null); _onClose.add(null);
await _onClose.close(); await _onClose.close();
}); });
} }
@ -61,11 +61,12 @@ class WebSocketContext {
} }
class _WebSocketEventTable { class _WebSocketEventTable {
Map<String, StreamController<Map>> _handlers = {}; final Map<String, StreamController<Map>> _handlers = {};
StreamController<Map> _getStreamForEvent(String eventName) { StreamController<Map> _getStreamForEvent(String eventName) {
if (!_handlers.containsKey(eventName)) if (!_handlers.containsKey(eventName)) {
_handlers[eventName] = StreamController<Map>(); _handlers[eventName] = StreamController<Map>();
}
return _handlers[eventName]; return _handlers[eventName];
} }

View file

@ -12,44 +12,46 @@ class WebSocketController extends Controller {
/// The plug-in instance powering this controller. /// The plug-in instance powering this controller.
final AngelWebSocket ws; final AngelWebSocket ws;
Map<String, MethodMirror> _handlers = {}; final Map<String, MethodMirror> _handlers = {};
Map<String, Symbol> _handlerSymbols = {}; final Map<String, Symbol> _handlerSymbols = {};
WebSocketController(this.ws) : super(); WebSocketController(this.ws) : super();
/// Sends an event to all clients. /// Sends an event to all clients.
void broadcast(String eventName, data, {filter(WebSocketContext socket)}) { void broadcast(String eventName, data,
{Function(WebSocketContext socket) filter}) {
ws.batchEvent(WebSocketEvent(eventName: eventName, data: data), ws.batchEvent(WebSocketEvent(eventName: eventName, data: data),
filter: filter); filter: filter);
} }
/// Fired on new connections. /// Fired on new connections.
onConnect(WebSocketContext socket) {} dynamic onConnect(WebSocketContext socket) {}
/// Fired on disconnections. /// Fired on disconnections.
onDisconnect(WebSocketContext socket) {} dynamic onDisconnect(WebSocketContext socket) {}
/// Fired on all incoming actions. /// Fired on all incoming actions.
onAction(WebSocketAction action, WebSocketContext socket) async {} dynamic onAction(WebSocketAction action, WebSocketContext socket) async {}
/// Fired on arbitrary incoming data. /// Fired on arbitrary incoming data.
onData(data, WebSocketContext socket) {} dynamic onData(data, WebSocketContext socket) {}
@override @override
Future configureServer(Angel app) async { Future configureServer(Angel app) async {
if (findExpose(app.container.reflector) != null) if (findExpose(app.container.reflector) != null) {
await super.configureServer(app); await super.configureServer(app);
}
InstanceMirror instanceMirror = reflect(this); var instanceMirror = reflect(this);
ClassMirror classMirror = reflectClass(this.runtimeType); var classMirror = reflectClass(runtimeType);
classMirror.instanceMembers.forEach((sym, mirror) { classMirror.instanceMembers.forEach((sym, mirror) {
if (mirror.isRegularMethod) { if (mirror.isRegularMethod) {
InstanceMirror exposeMirror = mirror.metadata.firstWhere( var exposeMirror = mirror.metadata.firstWhere(
(mirror) => mirror.reflectee is ExposeWs, (mirror) => mirror.reflectee is ExposeWs,
orElse: () => null); orElse: () => null);
if (exposeMirror != null) { if (exposeMirror != null) {
ExposeWs exposeWs = exposeMirror.reflectee as ExposeWs; var exposeWs = exposeMirror.reflectee as ExposeWs;
_handlers[exposeWs.eventName] = mirror; _handlers[exposeWs.eventName] = mirror;
_handlerSymbols[exposeWs.eventName] = sym; _handlerSymbols[exposeWs.eventName] = sym;
} }

View file

@ -1,4 +1,3 @@
import 'dart:async';
import 'package:angel_auth/angel_auth.dart'; import 'package:angel_auth/angel_auth.dart';
import 'package:angel_client/io.dart' as c; import 'package:angel_client/io.dart' as c;
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';

View file

@ -28,7 +28,7 @@ class GameController extends WebSocketController {
GameController(AngelWebSocket ws) : super(ws); GameController(AngelWebSocket ws) : super(ws);
@ExposeWs('search') @ExposeWs('search')
search(WebSocketContext socket) async { dynamic search(WebSocketContext socket) async {
print('User is searching for a game...'); print('User is searching for a game...');
socket.send('searched', johnVsBob); socket.send('searched', johnVsBob);
} }

View file

@ -10,7 +10,7 @@ class Todo extends Model {
String text; String text;
String when; String when;
Todo({String this.text, String this.when}); Todo({this.text, this.when});
} }
class TodoService extends MapService { class TodoService extends MapService {
@ -23,7 +23,7 @@ class TodoService extends MapService {
} }
} }
testIndex(BaseWebSocketClient client) async { dynamic testIndex(BaseWebSocketClient client) async {
var todoService = client.service('api/todos'); var todoService = client.service('api/todos');
scheduleMicrotask(() => todoService.index()); scheduleMicrotask(() => todoService.index());

View file

@ -2,8 +2,8 @@ import 'dart:html';
import 'package:angel_websocket/browser.dart'; import 'package:angel_websocket/browser.dart';
/// Dummy app to ensure client works with DDC. /// Dummy app to ensure client works with DDC.
main() { void main() {
var app = new WebSockets(window.location.origin); var app = WebSockets(window.location.origin);
window.alert(app.baseUrl.toString()); window.alert(app.baseUrl.toString());
app.connect().catchError((_) { app.connect().catchError((_) {