Migrated websocket

This commit is contained in:
thomashii@dukefirehawk.com 2021-04-26 08:47:32 +08:00
parent ee2ac157eb
commit 41188b3df8
20 changed files with 257 additions and 210 deletions

View file

@ -1,22 +1,22 @@
# 4.0.0 (NNBD)
* Changed Dart SDK requirements for all packages to ">=2.12.0 <3.0.0" to support NNBD.
* Updated pretty_logging to 3.0.0 (0/0 tests)
* Updated angel_http_exception to 3.0.0 (0/0 tests)
* Migrated pretty_logging to 3.0.0 (0/0 tests)
* Migrated angel_http_exception to 3.0.0 (0/0 tests)
* Moved angel_cli to https://github.com/dukefirehawk/cli (Not migrated)
* Added code_buffer and updated to 2.0.0 (16/16 tests)
* Added combinator and updated to 2.0.0 (16/16 tests)
* Updated angel_route to 5.0.0 (35/35 tests passed)
* Updated angel_model to 3.0.0 (0/0 tests)
* Updated angel_container to 3.0.0 (55/55 tests passed)
* Added merge_map and updated to 2.0.0 (6/6 tests passed)
* Added mock_request and updated to 2.0.0 (0/0 tests)
* Updated angel_framework to 4.0.0 (146/149 tests passed)
* Updated angel_auth to 4.0.0 (22/32 test passed)
* Updated angel_configuration to 4.0.0 (6/8 test passed)
* Updated angel_validate to 4.0.0 (6/7 test passed)
* Updated json_god to 4.0.0 (13/13 test passed)
* Updated angel_client to 4.0.0 (6/13 test passed)
* Updated angel_websocket to 4.0.0 (in progress)
* Added code_buffer and migrated to 2.0.0 (16/16 tests)
* Added combinator and migrated to 2.0.0 (16/16 tests)
* Migrated angel_route to 5.0.0 (35/35 tests passed)
* Migrated angel_model to 3.0.0 (0/0 tests)
* Migrated angel_container to 3.0.0 (55/55 tests passed)
* Added merge_map and migrated to 2.0.0 (6/6 tests passed)
* Added mock_request and migrated to 2.0.0 (0/0 tests)
* Migrated angel_framework to 4.0.0 (146/149 tests passed)
* Migrated angel_auth to 4.0.0 (22/32 tests passed)
* Migrated angel_configuration to 4.0.0 (6/8 testspassed)
* Migrated angel_validate to 4.0.0 (6/7 tests passed)
* Migrated json_god to 4.0.0 (13/13 tests passed)
* Migrated angel_client to 4.0.0 (6/13 tests passed)
* Migrated angel_websocket to 4.0.0 (2/3 tests passed)
* Updated test to 3.0.0 (in progress)
* Updated jael to 3.0.0 (in progress)
* Updated jael_preprocessor to 3.0.0 (in progress)

View file

@ -6,7 +6,6 @@ import 'package:collection/collection.dart';
import 'dart:convert';
import 'package:http/http.dart' as http;
export 'package:angel_http_exception/angel_http_exception.dart';
import 'package:meta/meta.dart';
/// A function that configures an [Angel] client in some way.
typedef AngelConfigurer = FutureOr<void> Function(Angel app);
@ -29,7 +28,7 @@ abstract class Angel extends http.BaseClient {
final Uri baseUrl;
Angel(baseUrl)
: this.baseUrl = baseUrl is Uri ? baseUrl : Uri.parse(baseUrl.toString());
: baseUrl = baseUrl is Uri ? baseUrl : Uri.parse(baseUrl.toString());
/// Prefer to use [baseUrl] instead.
@deprecated

View file

@ -21,9 +21,7 @@ Map<String, String>? _buildQuery(Map<String, dynamic>? params) {
}
bool _invalid(http.Response response) =>
response.statusCode == null ||
response.statusCode < 200 ||
response.statusCode >= 300;
response.statusCode < 200 || response.statusCode >= 300;
AngelHttpException failure(http.Response response,
{error, String? message, StackTrace? stack}) {
@ -249,7 +247,7 @@ class BaseAngelService<Id, Data> extends Service<Id, Data?> {
}
BaseAngelService(this.client, this.app, baseUrl, {this.deserializer})
: this.baseUrl = baseUrl is Uri ? baseUrl : Uri.parse(baseUrl.toString());
: baseUrl = baseUrl is Uri ? baseUrl : Uri.parse(baseUrl.toString());
/// Use [baseUrl] instead.
@deprecated

View file

@ -191,16 +191,26 @@ abstract class Driver<
message: e?.toString() ?? '500 Internal Server Error');
}, test: (e) => e is AngelHttpException).catchError(
(ee, StackTrace st) {
var e = ee as AngelHttpException;
//print(">>>> Framework error: $ee");
//var t = (st).runtimeType;
//print(">>>> StackTrace: $t");
AngelHttpException e;
if (ee is AngelHttpException) {
e = ee;
} else {
e = AngelHttpException(ee,
stackTrace: st,
statusCode: 500,
message: ee?.toString() ?? '500 Internal Server Error');
}
if (app.logger != null) {
var error = e.error ?? e;
var trace = Trace.from(e.stackTrace ?? StackTrace.current).terse;
var trace = Trace.from(StackTrace.current).terse;
app.logger?.severe(e.message, error, trace);
}
return handleAngelHttpException(
e, e.stackTrace ?? st, req, res, request, response);
return handleAngelHttpException(e, st, req, res, request, response);
});
} else {
var zoneSpec = ZoneSpecification(

View file

@ -38,7 +38,7 @@ void main(List<String> args) async {
try {
ctx.setAlpnProtocols(['h2'], true);
} catch (e, st) {
app.logger.severe(
app.logger!.severe(
'Cannot set ALPN protocol on server to `h2`. The server will only serve HTTP/1.x.',
e,
st,

View file

@ -3,19 +3,19 @@ library angel_websocket;
/// A notification from the server that something has occurred.
class WebSocketEvent<Data> {
String eventName;
Data data;
String? eventName;
Data? data;
WebSocketEvent({this.eventName, this.data});
factory WebSocketEvent.fromJson(Map data) => WebSocketEvent(
eventName: data['eventName'].toString(), data: data['data'] as Data);
eventName: data['eventName'].toString(), data: data['data'] as Data?);
WebSocketEvent<T> cast<T>() {
if (T == Data) {
return this as WebSocketEvent<T>;
} else {
return WebSocketEvent<T>(eventName: eventName, data: data as T);
return WebSocketEvent<T>(eventName: eventName, data: data as T?);
}
}
@ -26,10 +26,10 @@ class WebSocketEvent<Data> {
/// A command sent to the server, usually corresponding to a service method.
class WebSocketAction {
String id;
String eventName;
String? id;
String? eventName;
var data;
Map<String, dynamic> params;
Map<String, dynamic>? params;
WebSocketAction({this.id, this.eventName, this.data, this.params});
@ -37,7 +37,7 @@ class WebSocketAction {
id: data['id'].toString(),
eventName: data['eventName'].toString(),
data: data['data'],
params: data['params'] as Map<String, dynamic>);
params: data['params'] as Map<String, dynamic>?);
Map<String, dynamic> toJson() {
return {'id': id, 'eventName': eventName, 'data': data, 'params': params};

View file

@ -14,8 +14,8 @@ final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// An [Angel] client that operates across WebSockets.
abstract class BaseWebSocketClient extends BaseAngelClient {
Duration _reconnectInterval;
WebSocketChannel _socket;
Duration? _reconnectInterval;
WebSocketChannel? _socket;
final Queue<WebSocketAction> _queue = Queue<WebSocketAction>();
final StreamController _onData = StreamController();
@ -58,16 +58,16 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
_onWebSocketChannelException.stream;
/// The [WebSocketChannel] underneath this instance.
WebSocketChannel get socket => _socket;
WebSocketChannel? get socket => _socket;
/// If `true` (default), then the client will automatically try to reconnect to the server
/// if the socket closes.
final bool reconnectOnClose;
/// The amount of time to wait between reconnect attempts. Default: 10 seconds.
Duration get reconnectInterval => _reconnectInterval;
Duration? get reconnectInterval => _reconnectInterval;
Uri _wsUri;
Uri? _wsUri;
/// The [Uri] to which a websocket should point.
Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl);
@ -87,7 +87,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}
BaseWebSocketClient(http.BaseClient client, baseUrl,
{this.reconnectOnClose = true, Duration reconnectInterval})
{this.reconnectOnClose = true, Duration? reconnectInterval})
: super(client, baseUrl) {
_reconnectInterval = reconnectInterval ?? Duration(seconds: 10);
}
@ -96,7 +96,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
Future close() async {
on._close();
scheduleMicrotask(() async {
await _socket.sink.close(status.goingAway);
await _socket!.sink.close(status.goingAway);
await _onData.close();
await _onAllEvents.close();
await _onAuthenticated.close();
@ -107,10 +107,10 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}
/// Connects the WebSocket. [timeout] is optional.
Future<WebSocketChannel> connect({Duration timeout}) async {
Future<WebSocketChannel?> connect({Duration? timeout}) async {
if (timeout != null) {
var c = Completer<WebSocketChannel>();
Timer timer;
late Timer timer;
timer = Timer(timeout, () {
if (!c.isCompleted) {
@ -122,7 +122,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
});
scheduleMicrotask(() {
return getConnectedWebSocket().then((socket) {
getConnectedWebSocket().then((socket) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
@ -135,8 +135,13 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}
}).catchError((e, StackTrace st) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(e, st);
if (timer.isActive) {
timer.cancel();
}
// TODO: Re-evaluate this error
var obj = 'Error';
c.completeError(obj, st);
}
});
});
@ -160,16 +165,17 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
Future<WebSocketChannel> getConnectedWebSocket();
@override
WebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) {
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
var uri = path.toString().replaceAll(_straySlashes, '');
return WebSocketsService<Id, Data>(socket, this, uri,
var wsService = WebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer);
return wsService as Service<Id, Data>;
}
/// Starts listening for data.
void listen() {
_socket?.stream?.listen(
_socket?.stream.listen(
(data) {
_onData.add(data);
@ -183,7 +189,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
if (event.eventName?.isNotEmpty == true) {
_onAllEvents.add(event);
on._getStream(event.eventName).add(event);
on._getStream(event.eventName)!.add(event);
}
if (event.eventName == errorEvent) {
@ -191,10 +197,10 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
AngelHttpException.fromMap((event.data ?? {}) as Map);
_onError.add(error);
} else if (event.eventName == authenticatedEvent) {
var authResult = AngelAuthResult.fromMap(event.data as Map);
var authResult = AngelAuthResult.fromMap(event.data as Map?);
_onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName
var split = event.eventName!
.split('::')
.where((str) => str.isNotEmpty)
.toList();
@ -212,7 +218,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
onDone: () {
_socket = null;
if (reconnectOnClose == true) {
Timer.periodic(reconnectInterval, (Timer timer) async {
Timer.periodic(reconnectInterval!, (Timer timer) async {
var result;
try {
@ -235,12 +241,12 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
if (_socket == null) {
_queue.addLast(action);
} else {
socket.sink.add(serialize(action));
socket!.sink.add(serialize(action));
}
}
/// Attempts to authenticate a WebSocket, using a valid JWT.
void authenticateViaJwt(String jwt) {
void authenticateViaJwt(String? jwt) {
sendAction(WebSocketAction(
eventName: authenticateAction,
params: {
@ -251,55 +257,55 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}
/// A [Service] that asynchronously interacts with the server.
class WebSocketsService<Id, Data> extends Service<Id, Data> {
class WebSocketsService<Id, Data> extends Service<Id, Data?> {
/// The [BaseWebSocketClient] that spawned this service.
@override
final BaseWebSocketClient app;
/// Used to deserialize JSON into typed data.
final AngelDeserializer<Data> deserializer;
final AngelDeserializer<Data>? deserializer;
/// The [WebSocketChannel] to listen to, and send data across.
final WebSocketChannel socket;
final WebSocketChannel? socket;
/// The service path to listen to.
final String path;
final StreamController<WebSocketEvent> _onAllEvents =
StreamController<WebSocketEvent>();
final StreamController<List<Data>> _onIndexed = StreamController();
final StreamController<Data> _onRead = StreamController<Data>();
final StreamController<Data> _onCreated = StreamController<Data>();
final StreamController<Data> _onModified = StreamController<Data>();
final StreamController<Data> _onUpdated = StreamController<Data>();
final StreamController<Data> _onRemoved = StreamController<Data>();
final StreamController<List<Data?>> _onIndexed = StreamController();
final StreamController<Data?> _onRead = StreamController<Data>();
final StreamController<Data?> _onCreated = StreamController<Data>();
final StreamController<Data?> _onModified = StreamController<Data>();
final StreamController<Data?> _onUpdated = StreamController<Data>();
final StreamController<Data?> _onRemoved = StreamController<Data>();
/// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired on `index` events.
@override
Stream<List<Data>> get onIndexed => _onIndexed.stream;
Stream<List<Data?>> get onIndexed => _onIndexed.stream;
/// Fired on `read` events.
@override
Stream<Data> get onRead => _onRead.stream;
Stream<Data?> get onRead => _onRead.stream;
/// Fired on `created` events.
@override
Stream<Data> get onCreated => _onCreated.stream;
Stream<Data?> get onCreated => _onCreated.stream;
/// Fired on `modified` events.
@override
Stream<Data> get onModified => _onModified.stream;
Stream<Data?> get onModified => _onModified.stream;
/// Fired on `updated` events.
@override
Stream<Data> get onUpdated => _onUpdated.stream;
Stream<Data?> get onUpdated => _onUpdated.stream;
/// Fired on `removed` events.
@override
Stream<Data> get onRemoved => _onRemoved.stream;
Stream<Data?> get onRemoved => _onRemoved.stream;
WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) {
listen();
@ -320,8 +326,8 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
dynamic serialize(WebSocketAction action) => json.encode(action);
/// Deserializes data from a [WebSocketEvent].
Data deserialize(x) {
return deserializer != null ? deserializer(x) : x as Data;
Data? deserialize(x) {
return deserializer != null ? deserializer!(x) : x as Data?;
}
/// Deserializes the contents of an [event].
@ -334,7 +340,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
void listen() {
app.onServiceEvent.listen((map) {
if (map.containsKey(path)) {
var event = map[path];
var event = map[path]!;
_onAllEvents.add(event);
@ -343,7 +349,9 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
var transformed = WebSocketEvent(
eventName: event.eventName,
data: d is Iterable ? d.map(deserialize).toList() : null);
if (transformed.data != null) _onIndexed.add(transformed.data);
if (transformed.data != null) {
_onIndexed.add(transformed.data!);
}
return;
}
@ -376,14 +384,14 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
}
@override
Future<List<Data>> index([Map<String, dynamic> params]) async {
Future<List<Data>?> index([Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$indexAction', params: params ?? {}));
return null;
}
@override
Future<Data> read(id, [Map<String, dynamic> params]) async {
Future<Data?> read(id, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$readAction',
id: id.toString(),
@ -392,14 +400,14 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
}
@override
Future<Data> create(data, [Map<String, dynamic> params]) async {
Future<Data?> create(data, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$createAction', data: data, params: params ?? {}));
return null;
}
@override
Future<Data> modify(id, data, [Map<String, dynamic> params]) async {
Future<Data?> modify(id, data, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$modifyAction',
id: id.toString(),
@ -409,7 +417,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
}
@override
Future<Data> update(id, data, [Map<String, dynamic> params]) async {
Future<Data?> update(id, data, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$updateAction',
id: id.toString(),
@ -419,7 +427,7 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
}
@override
Future<Data> remove(id, [Map<String, dynamic> params]) async {
Future<Data?> remove(id, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$removeAction',
id: id.toString(),
@ -434,9 +442,9 @@ class WebSocketsService<Id, Data> extends Service<Id, Data> {
/// Contains a dynamic Map of [WebSocketEvent] streams.
class WebSocketExtraneousEventHandler {
final Map<String, StreamController<WebSocketEvent>> _events = {};
final Map<String?, StreamController<WebSocketEvent>> _events = {};
StreamController<WebSocketEvent> _getStream(String index) {
StreamController<WebSocketEvent>? _getStream(String? index) {
if (_events[index] == null) {
_events[index] = StreamController<WebSocketEvent>();
}
@ -449,7 +457,7 @@ class WebSocketExtraneousEventHandler {
_events[index] = StreamController<WebSocketEvent>();
}
return _events[index].stream;
return _events[index]!.stream;
}
void _close() {

View file

@ -18,7 +18,7 @@ class WebSockets extends BaseWebSocketClient {
final List<BrowserWebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration reconnectInterval})
{bool reconnectOnClose = true, Duration? reconnectInterval})
: super(http.BrowserClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@ -34,15 +34,15 @@ class WebSockets extends BaseWebSocketClient {
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token', String errorMessage}) {
{String eventName = 'token', String? errorMessage}) {
var ctrl = StreamController<String>();
var wnd = window.open(url, 'angel_client_auth_popup');
Timer t;
StreamSubscription<Event> sub;
StreamSubscription<Event>? sub;
t = Timer.periodic(Duration(milliseconds: 500), (timer) {
if (!ctrl.isClosed) {
if (wnd.closed) {
if (wnd.closed!) {
ctrl.addError(AngelHttpException.notAuthenticated(
message:
errorMessage ?? 'Authentication via popup window failed.'));
@ -55,12 +55,12 @@ class WebSockets extends BaseWebSocketClient {
}
});
sub = window.on[eventName ?? 'token'].listen((e) {
sub = window.on[eventName].listen((e) {
if (!ctrl.isClosed) {
ctrl.add((e as CustomEvent).detail.toString());
t.cancel();
ctrl.close();
sub.cancel();
sub!.cancel();
}
});
@ -73,7 +73,7 @@ class WebSockets extends BaseWebSocketClient {
if (authToken?.isNotEmpty == true) {
url = url.replace(
queryParameters: Map<String, String>.from(url.queryParameters)
queryParameters: Map<String, String?>.from(url.queryParameters)
..['token'] = authToken);
}
@ -88,7 +88,7 @@ class WebSockets extends BaseWebSocketClient {
})
..onError.listen((e) {
if (!completer.isCompleted) {
return completer.completeError(e is ErrorEvent ? e.error : e);
return completer.completeError(e is ErrorEvent ? e.error! : e);
}
});
@ -96,18 +96,18 @@ class WebSockets extends BaseWebSocketClient {
}
@override
BrowserWebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) {
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
var uri = path.replaceAll(_straySlashes, '');
return BrowserWebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer);
deserializer: deserializer) as Service<Id, Data>;
}
}
class BrowserWebSocketsService<Id, Data> extends WebSocketsService<Id, Data> {
final Type type;
final Type? type;
BrowserWebSocketsService(WebSocketChannel socket, WebSockets app, String uri,
{this.type, AngelDeserializer<Data> deserializer})
BrowserWebSocketsService(WebSocketChannel? socket, WebSockets app, String uri,
{this.type, AngelDeserializer<Data>? deserializer})
: super(socket, app, uri, deserializer: deserializer);
}

View file

@ -17,7 +17,7 @@ class WebSockets extends BaseWebSocketClient {
final List<WebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration reconnectInterval})
{bool reconnectOnClose = true, Duration? reconnectInterval})
: super(http.IOClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);

View file

@ -6,7 +6,7 @@ import 'package:angel_framework/angel_framework.dart';
/// If [provider] is `null`, any provider will be blocked.
HookedServiceEventListener doNotBroadcast([provider]) {
return (HookedServiceEvent e) {
if (e.params != null && e.params.containsKey('provider')) {
if (e.params != null && e.params!.containsKey('provider')) {
var deny = false;
var providers = provider is Iterable ? provider : [provider];
@ -15,17 +15,17 @@ HookedServiceEventListener doNotBroadcast([provider]) {
if (p is Providers) {
deny = deny ||
p == e.params['provider'] ||
e.params['provider'] == p.via;
p == e.params!['provider'] ||
e.params!['provider'] == p.via;
} else if (p == null) {
deny = true;
} else {
deny =
deny || (e.params['provider'] as Providers).via == p.toString();
deny || (e.params!['provider'] as Providers).via == p.toString();
}
}
e.params['broadcast'] = false;
e.params!['broadcast'] = false;
}
};
}

View file

@ -18,7 +18,7 @@ class WebSockets extends BaseWebSocketClient {
final List<IoWebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration reconnectInterval})
{bool reconnectOnClose = true, Duration? reconnectInterval})
: super(http.IOClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@ -49,17 +49,18 @@ class WebSockets extends BaseWebSocketClient {
}
@override
IoWebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) {
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
var uri = path.replaceAll(_straySlashes, '');
return IoWebSocketsService<Id, Data>(socket, this, uri, type);
return IoWebSocketsService<Id, Data>(socket, this, uri, type)
as Service<Id, Data>;
}
}
class IoWebSocketsService<Id, Data> extends WebSocketsService<Id, Data> {
final Type type;
final Type? type;
IoWebSocketsService(
WebSocketChannel socket, WebSockets app, String uri, this.type)
WebSocketChannel? socket, WebSockets app, String uri, this.type)
: super(socket, app, uri);
}

View file

@ -13,6 +13,7 @@ import 'package:merge_map/merge_map.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:collection/collection.dart' show IterableExtension;
import 'angel_websocket.dart';
import 'constants.dart';
export 'angel_websocket.dart';
@ -36,17 +37,17 @@ class AngelWebSocket {
final StreamController<WebSocketContext> _onDisconnect =
StreamController<WebSocketContext>.broadcast();
final Angel app;
final Angel? app;
/// If this is not `true`, then all client-side service parameters will be
/// discarded, other than `params['query']`.
final bool allowClientParams;
/// An optional whitelist of allowed client origins, or [:null:].
final List<String> allowedOrigins;
final List<String>? allowedOrigins;
/// An optional whitelist of allowed client protocols, or [:null:].
final List<String> allowedProtocols;
final List<String>? allowedProtocols;
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
@ -62,7 +63,7 @@ class AngelWebSocket {
List.unmodifiable(_servicesAlreadyWired);
/// Used to notify other nodes of an event's firing. Good for scaled applications.
final StreamChannel<WebSocketEvent> synchronizationChannel;
final StreamChannel<WebSocketEvent>? synchronizationChannel;
/// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream;
@ -77,10 +78,10 @@ class AngelWebSocket {
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;
/// Serializes data to WebSockets.
WebSocketResponseSerializer serializer;
WebSocketResponseSerializer? serializer;
/// Deserializes data from WebSockets.
Function deserializer;
Function? deserializer;
AngelWebSocket(this.app,
{this.sendErrors = false,
@ -95,7 +96,11 @@ class AngelWebSocket {
deserializer ??= (params) => params;
}
HookedServiceEventListener serviceHook(String path) {
/*
* Deprecated. Original code that failed to compile after upgrading
*/
/*
HookedServiceEventListener serviceHookOriginal(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params['broadcast'] == false) return;
@ -115,10 +120,33 @@ class AngelWebSocket {
await batchEvent(event, filter: _filter);
};
}
*/
FutureOr<dynamic> Function(HookedServiceEvent<dynamic, dynamic, Service> e)
serviceHook(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params!['broadcast'] == false) return;
var event = await transformEvent(e);
event.eventName = '$path::${event.eventName}';
dynamic _filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter')) {
return e.service.configuration['ws:filter'](e, socket);
} else if (e.params != null && e.params!.containsKey('ws:filter')) {
return e.params!['ws:filter'](e, socket);
} else {
return true;
}
}
await batchEvent(event, filter: _filter);
};
}
/// Slates an event to be dispatched.
Future<void> batchEvent(WebSocketEvent event,
{Function(WebSocketContext socket) filter, bool notify = true}) async {
{Function(WebSocketContext socket)? filter, bool notify = true}) async {
// Default implementation will just immediately fire events
_clients.forEach((client) async {
dynamic result = true;
@ -129,7 +157,7 @@ class AngelWebSocket {
});
if (synchronizationChannel != null && notify != false) {
synchronizationChannel.sink.add(event);
synchronizationChannel!.sink.add(event);
}
}
@ -138,14 +166,14 @@ class AngelWebSocket {
/// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split('::');
var split = action.eventName!.split('::');
if (split.length < 2) {
socket.sendError(AngelHttpException.badRequest());
return null;
}
var service = app.findService(split[0]);
var service = app!.findService(split[0]);
if (service == null) {
socket.sendError(AngelHttpException.notFound(
@ -158,16 +186,16 @@ class AngelWebSocket {
if (action.params is! Map) action.params = <String, dynamic>{};
if (allowClientParams != true) {
if (action.params['query'] is Map) {
action.params = {'query': action.params['query']};
if (action.params!['query'] is Map) {
action.params = {'query': action.params!['query']};
} else {
action.params = {};
}
}
var params = mergeMap<String, dynamic>([
((deserializer ?? (params) => params)(action.params))
as Map<String, dynamic>,
(((deserializer ?? (params) => params)(action.params))
as Map<String, dynamic>?)!,
{
'provider': Providers.websocket,
'__requestctx': socket.request,
@ -214,21 +242,21 @@ class AngelWebSocket {
Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
if (allowAuth != false &&
action.eventName == authenticateAction &&
action.params['query'] is Map &&
action.params['query']['jwt'] is String) {
action.params!['query'] is Map &&
action.params!['query']['jwt'] is String) {
try {
var auth = socket.request.container.make<AngelAuth>();
var jwt = action.params['query']['jwt'] as String;
var auth = socket.request.container!.make<AngelAuth>()!;
var jwt = action.params!['query']['jwt'] as String;
AuthToken token;
token = AuthToken.validate(jwt, auth.hmac);
var user = await auth.deserializer(token.userId);
token = AuthToken.validate(jwt, auth.hmac!);
var user = await auth.deserializer!(token.userId);
socket.request
..container.registerSingleton<AuthToken>(token)
..container.registerSingleton(user, as: user.runtimeType);
..container!.registerSingleton<AuthToken>(token)
..container!.registerSingleton(user, as: user.runtimeType);
socket._onAuthenticated.add(null);
socket.send(authenticatedEvent,
{'token': token.serialize(auth.hmac), 'data': user});
{'token': token.serialize(auth.hmac!), 'data': user});
} catch (e, st) {
catchError(e, st, socket);
}
@ -242,7 +270,6 @@ class AngelWebSocket {
dynamic hookupService(Pattern _path, HookedService service) {
var path = _path.toString();
// TODO: Relook at this code
service.after(
[
HookedServiceEvent.created,
@ -268,23 +295,23 @@ class AngelWebSocket {
if (action.eventName == null ||
action.eventName is! String ||
action.eventName.isEmpty) {
action.eventName!.isEmpty) {
throw AngelHttpException.badRequest();
}
if (fromJson is Map && fromJson.containsKey('eventName')) {
socket._onAction.add(WebSocketAction.fromJson(fromJson));
socket.on
._getStreamForEvent(fromJson['eventName'].toString())
.add(fromJson['data'] as Map);
._getStreamForEvent(fromJson['eventName'].toString())!
.add(fromJson['data'] as Map?);
}
if (action.eventName == authenticateAction) {
await handleAuth(action, socket);
}
if (action.eventName.contains('::')) {
var split = action.eventName.split('::');
if (action.eventName!.contains('::')) {
var split = action.eventName!.split('::');
if (split.length >= 2) {
if (actions.contains(split[1])) {
@ -302,16 +329,16 @@ class AngelWebSocket {
// Send an error
if (e is AngelHttpException) {
socket.sendError(e);
app.logger?.severe(e.message, e.error ?? e, e.stackTrace);
app!.logger?.severe(e.message, e.error ?? e, e.stackTrace);
} else if (sendErrors) {
var err = AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]);
socket.sendError(err);
app.logger?.severe(err.message, e, st);
app!.logger?.severe(err.message, e, st);
} else {
var err = AngelHttpException(e);
socket.sendError(err);
app.logger?.severe(e.toString(), e, st);
app!.logger?.severe(e.toString(), e, st);
}
}
@ -332,10 +359,10 @@ class AngelWebSocket {
/// Configures an [Angel] instance to listen for WebSocket connections.
Future configureServer(Angel app) async {
app.container.registerSingleton(this);
app.container!.registerSingleton(this);
if (runtimeType != AngelWebSocket) {
app.container.registerSingleton<AngelWebSocket>(this);
app.container!.registerSingleton<AngelWebSocket>(this);
}
// Set up services
@ -346,16 +373,17 @@ class AngelWebSocket {
});
if (synchronizationChannel != null) {
synchronizationChannel.stream.listen((e) => batchEvent(e, notify: false));
synchronizationChannel!.stream
.listen((e) => batchEvent(e, notify: false));
}
app.shutdownHooks.add((_) => synchronizationChannel?.sink?.close());
app.shutdownHooks.add((_) => synchronizationChannel?.sink.close());
}
/// Handles an incoming [WebSocketContext].
Future<void> handleClient(WebSocketContext socket) async {
var origin = socket.request.headers.value('origin');
if (allowedOrigins != null && !allowedOrigins.contains(origin)) {
var origin = socket.request.headers!.value('origin');
if (allowedOrigins != null && !allowedOrigins!.contains(origin)) {
throw AngelHttpException.forbidden(
message:
'WebSocket connections are not allowed from the origin "$origin".');
@ -366,7 +394,7 @@ class AngelWebSocket {
_onConnection.add(socket);
socket.request.container.registerSingleton<WebSocketContext>(socket);
socket.request.container!.registerSingleton<WebSocketContext>(socket);
socket.channel.stream.listen(
(data) {
@ -388,22 +416,22 @@ class AngelWebSocket {
/// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (req is HttpRequestContext && res is HttpResponseContext) {
if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest)) {
if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest!)) {
throw AngelHttpException.badRequest();
}
res.detach();
var ws = await WebSocketTransformer.upgrade(req.rawRequest);
var ws = await WebSocketTransformer.upgrade(req.rawRequest!);
var channel = IOWebSocketChannel(ws);
var socket = WebSocketContext(channel, req, res);
scheduleMicrotask(() => handleClient(socket));
return false;
} else if (req is Http2RequestContext && res is Http2ResponseContext) {
var connection =
req.headers['connection']?.map((s) => s.toLowerCase().trim());
var upgrade = req.headers.value('upgrade')?.toLowerCase();
var version = req.headers.value('sec-websocket-version');
var key = req.headers.value('sec-websocket-key');
var protocol = req.headers.value('sec-websocket-protocol');
req.headers!['connection']?.map((s) => s.toLowerCase().trim());
var upgrade = req.headers!.value('upgrade')?.toLowerCase();
var version = req.headers!.value('sec-websocket-version');
var key = req.headers!.value('sec-websocket-key');
var protocol = req.headers!.value('sec-websocket-protocol');
if (connection == null) {
throw AngelHttpException.badRequest(
@ -422,7 +450,7 @@ class AngelWebSocket {
message: 'Missing `sec-websocket-key` header.');
} else if (protocol != null &&
allowedProtocols != null &&
!allowedProtocols.contains(protocol)) {
!allowedProtocols!.contains(protocol)) {
throw AngelHttpException.badRequest(
message: 'Disallowed `sec-websocket-protocol` header "$protocol".');
} else {

View file

@ -61,14 +61,14 @@ class WebSocketContext {
}
class _WebSocketEventTable {
final Map<String, StreamController<Map>> _handlers = {};
final Map<String, StreamController<Map?>> _handlers = {};
StreamController<Map> _getStreamForEvent(String eventName) {
StreamController<Map?>? _getStreamForEvent(String eventName) {
if (!_handlers.containsKey(eventName)) {
_handlers[eventName] = StreamController<Map>();
_handlers[eventName] = StreamController<Map?>();
}
return _handlers[eventName];
}
Stream<Map> operator [](String key) => _getStreamForEvent(key).stream;
Stream<Map?> operator [](String key) => _getStreamForEvent(key)!.stream;
}

View file

@ -19,7 +19,7 @@ class WebSocketController extends Controller {
/// Sends an event to all clients.
void broadcast(String eventName, data,
{Function(WebSocketContext socket) filter}) {
{Function(WebSocketContext socket)? filter}) {
ws.batchEvent(WebSocketEvent(eventName: eventName, data: data),
filter: filter);
}
@ -38,7 +38,7 @@ class WebSocketController extends Controller {
@override
Future configureServer(Angel app) async {
if (findExpose(app.container.reflector) != null) {
if (findExpose(app.container!.reflector) != null) {
await super.configureServer(app);
}
@ -46,9 +46,8 @@ class WebSocketController extends Controller {
var classMirror = reflectClass(runtimeType);
classMirror.instanceMembers.forEach((sym, mirror) {
if (mirror.isRegularMethod) {
var exposeMirror = mirror.metadata.firstWhere(
(mirror) => mirror.reflectee is ExposeWs,
orElse: () => null);
var exposeMirror = mirror.metadata
.firstWhereOrNull((mirror) => mirror.reflectee is ExposeWs);
if (exposeMirror != null) {
var exposeWs = exposeMirror.reflectee as ExposeWs;
@ -59,8 +58,8 @@ class WebSocketController extends Controller {
});
ws.onConnection.listen((socket) async {
if (!socket.request.container.has<WebSocketContext>()) {
socket.request.container.registerSingleton<WebSocketContext>(socket);
if (!socket.request.container!.has<WebSocketContext>()) {
socket.request.container!.registerSingleton<WebSocketContext>(socket);
}
await onConnect(socket);
@ -68,14 +67,14 @@ class WebSocketController extends Controller {
socket.onData.listen((data) => onData(data, socket));
socket.onAction.listen((WebSocketAction action) async {
var container = socket.request.container.createChild();
var container = socket.request.container!.createChild();
container.registerSingleton<WebSocketAction>(action);
try {
await onAction(action, socket);
if (_handlers.containsKey(action.eventName)) {
var methodMirror = _handlers[action.eventName];
var methodMirror = _handlers[action.eventName!]!;
var fn = instanceMirror.getField(methodMirror.simpleName).reflectee;
return app.runContained(
fn as Function, socket.request, socket.response, container);

View file

@ -1,7 +1,7 @@
name: angel_websocket
description: Support for using pkg:angel_client with WebSockets. Designed for Angel.
environment:
sdk: ">=2.10.0 <3.0.0"
sdk: '>=2.12.0 <3.0.0'
version: 3.0.0
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket
@ -36,6 +36,7 @@ dependencies:
meta: ^1.3.0
stream_channel: ^2.1.0
web_socket_channel: ^2.0.0
collection: ^1.15.0
dev_dependencies:
angel_container:
git:
@ -50,4 +51,7 @@ dev_dependencies:
# logging: ^0.11.0
pedantic: ^1.11.0
test: ^1.16.8
dependency_overrides:
angel_framework:
path: ../framework

View file

@ -11,9 +11,9 @@ const Map<String, String> USER = {'username': 'foo', 'password': 'bar'};
void main() {
Angel app;
AngelHttp http;
c.Angel client;
c.WebSockets ws;
late AngelHttp http;
late c.Angel client;
late c.WebSockets ws;
setUp(() async {
app = Angel();

View file

@ -2,7 +2,7 @@ import 'package:angel_framework/angel_framework.dart';
import 'package:angel_websocket/server.dart';
class Game {
final String playerOne, playerTwo;
final String? playerOne, playerTwo;
const Game({this.playerOne, this.playerTwo});

View file

@ -9,37 +9,37 @@ import 'package:test/test.dart';
import 'common.dart';
void main() {
srv.Angel app;
srv.AngelHttp http;
ws.WebSockets client;
srv.Angel? app;
late srv.AngelHttp http;
ws.WebSockets? client;
srv.AngelWebSocket websockets;
HttpServer server;
String url;
HttpServer? server;
String? url;
setUp(() async {
app = srv.Angel(reflector: const MirrorsReflector());
http = srv.AngelHttp(app, useZone: false);
http = srv.AngelHttp(app!, useZone: false);
websockets = srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
await app.configure(GameController(websockets).configureServer);
app.logger = Logger('angel_auth')..onRecord.listen(print);
await app!.configure(websockets.configureServer);
app!.all('/ws', websockets.handleRequest);
await app!.configure(GameController(websockets).configureServer);
app!.logger = Logger('angel_auth')..onRecord.listen(print);
server = await http.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
url = 'ws://${server!.address.address}:${server!.port}/ws';
client = ws.WebSockets(url);
await client.connect(timeout: Duration(seconds: 3));
await client!.connect(timeout: Duration(seconds: 3));
print('Connected');
client
..onData.listen((data) {
?..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
@ -51,7 +51,7 @@ void main() {
});
tearDown(() async {
await client.close();
await client!.close();
await http.close();
app = null;
client = null;
@ -61,8 +61,8 @@ void main() {
group('controller.io', () {
test('search', () async {
client.sendAction(ws.WebSocketAction(eventName: 'search'));
var search = await client.on['searched'].first;
client!.sendAction(ws.WebSocketAction(eventName: 'search'));
var search = await client!.on['searched'].first;
print('Searched: ${search.data}');
expect(Game.fromJson(search.data as Map), equals(johnVsBob));
});

View file

@ -7,8 +7,8 @@ import 'package:angel_websocket/server.dart';
import 'package:test/test.dart';
class Todo extends Model {
String text;
String when;
String? text;
String? when;
Todo({this.text, this.when});
}

View file

@ -9,34 +9,34 @@ import 'package:test/test.dart';
import 'common.dart';
void main() {
srv.Angel app;
srv.AngelHttp http;
ws.WebSockets client;
srv.Angel? app;
late srv.AngelHttp http;
ws.WebSockets? client;
srv.AngelWebSocket websockets;
HttpServer server;
String url;
HttpServer? server;
String? url;
setUp(() async {
app = srv.Angel(reflector: MirrorsReflector())
..use('/api/todos', TodoService());
http = srv.AngelHttp(app, useZone: false);
http = srv.AngelHttp(app!, useZone: false);
websockets = srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
app.logger = Logger('angel_auth')..onRecord.listen(print);
await app!.configure(websockets.configureServer);
app!.all('/ws', websockets.handleRequest);
app!.logger = Logger('angel_auth')..onRecord.listen(print);
server = await http.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
url = 'ws://${server!.address.address}:${server!.port}/ws';
client = ws.WebSockets(url);
await client.connect();
await client!.connect();
client
..onData.listen((data) {
?..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
@ -48,8 +48,8 @@ void main() {
});
tearDown(() async {
await client.close();
await http.server.close(force: true);
await client!.close();
await http.server!.close(force: true);
app = null;
client = null;
@ -59,6 +59,6 @@ void main() {
});
group('service.io', () {
test('index', () => testIndex(client));
test('index', () => testIndex(client!));
});
}