diff --git a/CHANGELOG.md b/CHANGELOG.md index 323832a9..25e98b6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ * Migrated jael to 4.0.0 (20/20 tests passed) * Migrated jael_preprocessor to 3.0.0 (5/5 tests passed) * Migrated angel_jael to 4.0.0 (1/1 test passed) -* Updated pub_sub to 3.0.0 (in progress) +* Updated pub_sub to 4.0.0 (16/16 tests passed) * Updated production to 2.0.0 (in progress) * Updated hot to 3.0.0 (in progress) * Updated static to 3.0.0 (in progress) @@ -47,7 +47,7 @@ * Updated jael_preprocessor to 3.0.0 (5/5 tests passed) * Updated test to 3.0.0 (1/1 tests passed) * Updated angel_jael to 3.0.0 (1/1 tests passed, Issue with 2 dependencies) -* Added pub_sub and updated to 3.0.0 (8/16 tests passed) +* Added pub_sub and updated to 3.0.0 (16/16 tests passed) * Updated production to 2.0.0 (0/0 tests passed) * Updated hot to 3.0.0 (0/0 tests passed) * Updated static to 3.0.0 (12/12 tests passed) diff --git a/packages/pub_sub/lib/src/isolate/client.dart b/packages/pub_sub/lib/src/isolate/client.dart index 2776615f..3c8e7d55 100644 --- a/packages/pub_sub/lib/src/isolate/client.dart +++ b/packages/pub_sub/lib/src/isolate/client.dart @@ -6,50 +6,49 @@ import '../../pub_sub.dart'; /// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s. class IsolateClient extends Client { - final Queue> _onConnect = new Queue>(); + final Queue> _onConnect = Queue>(); final Map> _requests = {}; final List<_IsolateClientSubscription> _subscriptions = []; - final Uuid _uuid = new Uuid(); + final Uuid _uuid = Uuid(); - String _id; + String? _id; /// The ID of the client we are authenticating as. /// /// May be `null`, if and only if we are marked as a trusted source on /// the server side. - String get clientId => _clientId; - String _clientId; + String? get clientId => _clientId; + String? _clientId; /// A server's [SendPort] that messages should be sent to. final SendPort serverSendPort; /// A [ReceivePort] that receives messages from the server. - final ReceivePort receivePort = new ReceivePort(); + final ReceivePort receivePort = ReceivePort(); - IsolateClient(String clientId, this.serverSendPort) { + IsolateClient(String? clientId, this.serverSendPort) { _clientId = clientId; receivePort.listen((data) { if (data is Map && data['request_id'] is String) { - var requestId = data['request_id'] as String; + var requestId = data['request_id'] as String?; var c = _requests.remove(requestId); if (c != null && !c.isCompleted) { if (data['status'] is! bool) { c.completeError( - new FormatException('The server sent an invalid response.')); + FormatException('The server sent an invalid response.')); } else if (!(data['status'] as bool)) { - c.completeError(new PubSubException(data['error_message'] - ?.toString() ?? + c.completeError(PubSubException(data['error_message']?.toString() ?? 'The server sent a failure response, but did not provide an error message.')); } else if (data['result'] is! Map) { - c.completeError(new FormatException( + c.completeError(FormatException( 'The server sent a success response, but did not include a result.')); } else { - c.complete(data['result'] as Map); + c.complete(data['result'] as Map?); } } } else if (data is Map && data['id'] is String && _id == null) { - _id = data['id'] as String; + _id = data['id'] as String?; for (var c in _onConnect) { if (!c.isCompleted) c.complete(_id); @@ -57,7 +56,8 @@ class IsolateClient extends Client { _onConnect.clear(); } else if (data is List && data.length == 2 && data[0] is String) { - var eventName = data[0] as String, event = data[1]; + var eventName = data[0] as String; + var event = data[1]; for (var s in _subscriptions.where((s) => s.eventName == eventName)) { if (!s._stream.isClosed) s._stream.add(event); } @@ -66,11 +66,11 @@ class IsolateClient extends Client { serverSendPort.send(receivePort.sendPort); } - Future _whenConnected(FutureOr callback()) { - if (_id != null) - return new Future.sync(callback); - else { - var c = new Completer(); + Future _whenConnected(FutureOr Function() callback) { + if (_id != null) { + return Future.sync(callback); + } else { + var c = Completer(); _onConnect.add(c); return c.future.then((_) => callback()); } @@ -79,7 +79,7 @@ class IsolateClient extends Client { @override Future publish(String eventName, value) { return _whenConnected(() { - var c = new Completer(); + var c = Completer(); var requestId = _uuid.v4(); _requests[requestId] = c; serverSendPort.send({ @@ -93,7 +93,7 @@ class IsolateClient extends Client { } }); return c.future.then((result) { - _clientId = result['client_id'] as String; + _clientId = result['client_id'] as String?; }); }); } @@ -101,7 +101,7 @@ class IsolateClient extends Client { @override Future subscribe(String eventName) { return _whenConnected(() { - var c = new Completer(); + var c = Completer(); var requestId = _uuid.v4(); _requests[requestId] = c; serverSendPort.send({ @@ -111,9 +111,9 @@ class IsolateClient extends Client { 'params': {'client_id': clientId, 'event_name': eventName} }); return c.future.then((result) { - _clientId = result['client_id'] as String; - var s = new _IsolateClientSubscription( - eventName, result['subscription_id'] as String, this); + _clientId = result['client_id'] as String?; + var s = _IsolateClientSubscription( + eventName, result['subscription_id'] as String?, this); _subscriptions.add(s); return s; }); @@ -126,28 +126,30 @@ class IsolateClient extends Client { for (var c in _onConnect) { if (!c.isCompleted) { - c.completeError(new StateError( + c.completeError(StateError( 'The client was closed before the server ever accepted the connection.')); } } for (var c in _requests.values) { if (!c.isCompleted) { - c.completeError(new StateError( + c.completeError(StateError( 'The client was closed before the server responded to this request.')); } } - for (var s in _subscriptions) s._close(); + for (var s in _subscriptions) { + s._close(); + } _requests.clear(); - return new Future.value(); + return Future.value(); } } class _IsolateClientSubscription extends ClientSubscription { - final StreamController _stream = new StreamController(); - final String eventName, id; + final StreamController _stream = StreamController(); + final String? eventName, id; final IsolateClient client; _IsolateClientSubscription(this.eventName, this.id, this.client); @@ -157,8 +159,8 @@ class _IsolateClientSubscription extends ClientSubscription { } @override - StreamSubscription listen(void onData(event), - {Function onError, void onDone(), bool cancelOnError}) { + StreamSubscription listen(void onData(event)?, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { return _stream.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); } @@ -166,7 +168,7 @@ class _IsolateClientSubscription extends ClientSubscription { @override Future unsubscribe() { return client._whenConnected(() { - var c = new Completer(); + var c = Completer(); var requestId = client._uuid.v4(); client._requests[requestId] = c; client.serverSendPort.send({ diff --git a/packages/pub_sub/lib/src/isolate/server.dart b/packages/pub_sub/lib/src/isolate/server.dart index c442d461..6d75b9f1 100644 --- a/packages/pub_sub/lib/src/isolate/server.dart +++ b/packages/pub_sub/lib/src/isolate/server.dart @@ -7,15 +7,15 @@ import '../../pub_sub.dart'; class IsolateAdapter extends Adapter { final Map _clients = {}; final StreamController _onPublish = - new StreamController(); + StreamController(); final StreamController _onSubscribe = - new StreamController(); + StreamController(); final StreamController _onUnsubscribe = - new StreamController(); - final Uuid _uuid = new Uuid(); + StreamController(); + final Uuid _uuid = Uuid(); /// A [ReceivePort] on which to listen for incoming data. - final ReceivePort receivePort = new ReceivePort(); + final ReceivePort receivePort = ReceivePort(); @override Stream get onPublish => _onPublish.stream; @@ -33,7 +33,7 @@ class IsolateAdapter extends Adapter { _onPublish.close(); _onSubscribe.close(); _onUnsubscribe.close(); - return new Future.value(); + return Future.value(); } @override @@ -48,22 +48,22 @@ class IsolateAdapter extends Adapter { data['request_id'] is String && data['method'] is String && data['params'] is Map) { - var id = data['id'] as String, - requestId = data['request_id'] as String, - method = data['method'] as String; - var params = data['params'] as Map; - var sp = _clients[id]; + var id = data['id'] as String?, + requestId = data['request_id'] as String?, + method = data['method'] as String?; + var params = data['params'] as Map?; + var sp = _clients[id!]; if (sp == null) { // There's nobody to respond to, so don't send anything to anyone. Oops. } else if (method == 'publish') { - if (_isValidClientId(params['client_id']) && + if (_isValidClientId(params!['client_id']) && params['event_name'] is String && params.containsKey('value')) { - var clientId = params['client_id'] as String, - eventName = params['event_name'] as String; + var clientId = params['client_id'] as String?, + eventName = params['event_name'] as String?; var value = params['value']; - var rq = new _IsolatePublishRequestImpl( + var rq = _IsolatePublishRequestImpl( requestId, clientId, eventName, value, sp); _onPublish.add(rq); } else { @@ -74,11 +74,11 @@ class IsolateAdapter extends Adapter { }); } } else if (method == 'subscribe') { - if (_isValidClientId(params['client_id']) && + if (_isValidClientId(params!['client_id']) && params['event_name'] is String) { - var clientId = params['client_id'] as String, - eventName = params['event_name'] as String; - var rq = new _IsolateSubscriptionRequestImpl( + var clientId = params['client_id'] as String?, + eventName = params['event_name'] as String?; + var rq = _IsolateSubscriptionRequestImpl( clientId, eventName, sp, requestId, _uuid); _onSubscribe.add(rq); } else { @@ -89,11 +89,11 @@ class IsolateAdapter extends Adapter { }); } } else if (method == 'unsubscribe') { - if (_isValidClientId(params['client_id']) && + if (_isValidClientId(params!['client_id']) && params['subscription_id'] is String) { - var clientId = params['client_id'] as String, - subscriptionId = params['subscription_id'] as String; - var rq = new _IsolateUnsubscriptionRequestImpl( + var clientId = params['client_id'] as String?, + subscriptionId = params['subscription_id'] as String?; + var rq = _IsolateUnsubscriptionRequestImpl( clientId, subscriptionId, sp, requestId); _onUnsubscribe.add(rq); } else { @@ -132,17 +132,17 @@ class IsolateAdapter extends Adapter { class _IsolatePublishRequestImpl extends PublishRequest { @override - final String clientId; + final String? clientId; @override - final String eventName; + final String? eventName; @override final value; final SendPort sendPort; - final String requestId; + final String? requestId; _IsolatePublishRequestImpl( this.requestId, this.clientId, this.eventName, this.value, this.sendPort); @@ -171,14 +171,14 @@ class _IsolatePublishRequestImpl extends PublishRequest { class _IsolateSubscriptionRequestImpl extends SubscriptionRequest { @override - final String clientId; + final String? clientId; @override - final String eventName; + final String? eventName; final SendPort sendPort; - final String requestId; + final String? requestId; final Uuid _uuid; @@ -195,22 +195,22 @@ class _IsolateSubscriptionRequestImpl extends SubscriptionRequest { } @override - FutureOr accept(String clientId) { + FutureOr accept(String? clientId) { var id = _uuid.v4(); sendPort.send({ 'status': true, 'request_id': requestId, 'result': {'subscription_id': id, 'client_id': clientId} }); - return new _IsolateSubscriptionImpl(clientId, id, eventName, sendPort); + return _IsolateSubscriptionImpl(clientId, id, eventName, sendPort); } } class _IsolateSubscriptionImpl extends Subscription { @override - final String clientId, id; + final String? clientId, id; - final String eventName; + final String? eventName; final SendPort sendPort; @@ -225,14 +225,14 @@ class _IsolateSubscriptionImpl extends Subscription { class _IsolateUnsubscriptionRequestImpl extends UnsubscriptionRequest { @override - final String clientId; + final String? clientId; @override - final String subscriptionId; + final String? subscriptionId; final SendPort sendPort; - final String requestId; + final String? requestId; _IsolateUnsubscriptionRequestImpl( this.clientId, this.subscriptionId, this.sendPort, this.requestId); diff --git a/packages/pub_sub/lib/src/json_rpc/client.dart b/packages/pub_sub/lib/src/json_rpc/client.dart index e72cbedc..e9146e04 100644 --- a/packages/pub_sub/lib/src/json_rpc/client.dart +++ b/packages/pub_sub/lib/src/json_rpc/client.dart @@ -8,43 +8,42 @@ import '../../pub_sub.dart'; class JsonRpc2Client extends Client { final Map> _requests = {}; final List<_JsonRpc2ClientSubscription> _subscriptions = []; - final Uuid _uuid = new Uuid(); + final Uuid _uuid = Uuid(); - json_rpc_2.Peer _peer; + json_rpc_2.Peer? _peer; /// The ID of the client we are authenticating as. /// /// May be `null`, if and only if we are marked as a trusted source on /// the server side. - String get clientId => _clientId; - String _clientId; + String? get clientId => _clientId; + String? _clientId; - JsonRpc2Client(String clientId, StreamChannel channel) { + JsonRpc2Client(String? clientId, StreamChannel channel) { _clientId = clientId; - _peer = new json_rpc_2.Peer(channel); + _peer = json_rpc_2.Peer(channel); - _peer.registerMethod('event', (json_rpc_2.Parameters params) { - var eventName = params['event_name'].asString, - event = params['value'].value; + _peer!.registerMethod('event', (json_rpc_2.Parameters params) { + String? eventName = params['event_name'].asString; + var event = params['value'].value; for (var s in _subscriptions.where((s) => s.eventName == eventName)) { if (!s._stream.isClosed) s._stream.add(event); } }); - _peer.registerFallback((json_rpc_2.Parameters params) { + _peer!.registerFallback((json_rpc_2.Parameters params) { var c = _requests.remove(params.method); - if (c == null) - throw new json_rpc_2.RpcException.methodNotFound(params.method); - else { + if (c == null) { + throw json_rpc_2.RpcException.methodNotFound(params.method); + } else { var data = params.asMap; if (data['status'] is! bool) { c.completeError( - new FormatException('The server sent an invalid response.')); + FormatException('The server sent an invalid response.')); } else if (!(data['status'] as bool)) { - c.completeError(new PubSubException(data['error_message'] - ?.toString() ?? + c.completeError(PubSubException(data['error_message']?.toString() ?? 'The server sent a failure response, but did not provide an error message.')); } else { c.complete(data); @@ -52,39 +51,39 @@ class JsonRpc2Client extends Client { } }); - _peer.listen(); + _peer!.listen(); } @override Future publish(String eventName, value) { - var c = new Completer(); + var c = Completer(); var requestId = _uuid.v4(); _requests[requestId] = c; - _peer.sendNotification('publish', { + _peer!.sendNotification('publish', { 'request_id': requestId, 'client_id': clientId, 'event_name': eventName, 'value': value }); return c.future.then((data) { - _clientId = data['result']['client_id'] as String; + _clientId = data['result']['client_id'] as String?; }); } @override Future subscribe(String eventName) { - var c = new Completer(); + var c = Completer(); var requestId = _uuid.v4(); _requests[requestId] = c; - _peer.sendNotification('subscribe', { + _peer!.sendNotification('subscribe', { 'request_id': requestId, 'client_id': clientId, 'event_name': eventName }); return c.future.then((result) { - _clientId = result['client_id'] as String; - var s = new _JsonRpc2ClientSubscription( - eventName, result['subscription_id'] as String, this); + _clientId = result['client_id'] as String?; + var s = _JsonRpc2ClientSubscription( + eventName, result['subscription_id'] as String?, this); _subscriptions.add(s); return s; }); @@ -92,11 +91,11 @@ class JsonRpc2Client extends Client { @override Future close() { - if (_peer?.isClosed != true) _peer.close(); + if (_peer?.isClosed != true) _peer!.close(); for (var c in _requests.values) { if (!c.isCompleted) { - c.completeError(new StateError( + c.completeError(StateError( 'The client was closed before the server responded to this request.')); } } @@ -104,13 +103,13 @@ class JsonRpc2Client extends Client { for (var s in _subscriptions) s._close(); _requests.clear(); - return new Future.value(); + return Future.value(); } } class _JsonRpc2ClientSubscription extends ClientSubscription { - final StreamController _stream = new StreamController(); - final String eventName, id; + final StreamController _stream = StreamController(); + final String? eventName, id; final JsonRpc2Client client; _JsonRpc2ClientSubscription(this.eventName, this.id, this.client); @@ -120,18 +119,18 @@ class _JsonRpc2ClientSubscription extends ClientSubscription { } @override - StreamSubscription listen(void onData(event), - {Function onError, void onDone(), bool cancelOnError}) { + StreamSubscription listen(void onData(event)?, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { return _stream.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); } @override Future unsubscribe() { - var c = new Completer(); + var c = Completer(); var requestId = client._uuid.v4(); client._requests[requestId] = c; - client._peer.sendNotification('unsubscribe', { + client._peer!.sendNotification('unsubscribe', { 'request_id': requestId, 'client_id': client.clientId, 'subscription_id': id diff --git a/packages/pub_sub/lib/src/json_rpc/server.dart b/packages/pub_sub/lib/src/json_rpc/server.dart index 5cf4fb84..e7080ce1 100644 --- a/packages/pub_sub/lib/src/json_rpc/server.dart +++ b/packages/pub_sub/lib/src/json_rpc/server.dart @@ -7,16 +7,16 @@ import '../../pub_sub.dart'; /// A [Adapter] implementation that communicates via JSON RPC 2.0. class JsonRpc2Adapter extends Adapter { final StreamController _onPublish = - new StreamController(); + StreamController(); final StreamController _onSubscribe = - new StreamController(); + StreamController(); final StreamController _onUnsubscribe = - new StreamController(); + StreamController(); final List _peers = []; - final Uuid _uuid = new Uuid(); + final Uuid _uuid = Uuid(); - json_rpc_2.Peer _peer; + json_rpc_2.Peer? _peer; /// A [Stream] of incoming clients, who can both send and receive string data. final Stream> clientStream; @@ -44,10 +44,10 @@ class JsonRpc2Adapter extends Adapter { Future.wait(_peers.where((s) => !s.isClosed).map((s) => s.close())) .then((_) => _peers.clear()); - return new Future.value(); + return Future.value(); } - String _getClientId(json_rpc_2.Parameters params) { + String? _getClientId(json_rpc_2.Parameters params) { try { return params['client_id'].asString; } catch (_) { @@ -58,14 +58,14 @@ class JsonRpc2Adapter extends Adapter { @override void start() { clientStream.listen((client) { - var peer = _peer = new json_rpc_2.Peer(client); + var peer = _peer = json_rpc_2.Peer(client); peer.registerMethod('publish', (json_rpc_2.Parameters params) async { var requestId = params['request_id'].asString; var clientId = _getClientId(params); var eventName = params['event_name'].asString; var value = params['value'].value; - var rq = new _JsonRpc2PublishRequestImpl( + var rq = _JsonRpc2PublishRequestImpl( requestId, clientId, eventName, value, peer); _onPublish.add(rq); }); @@ -74,7 +74,7 @@ class JsonRpc2Adapter extends Adapter { var requestId = params['request_id'].asString; var clientId = _getClientId(params); var eventName = params['event_name'].asString; - var rq = new _JsonRpc2SubscriptionRequestImpl( + var rq = _JsonRpc2SubscriptionRequestImpl( clientId, eventName, requestId, peer, _uuid); _onSubscribe.add(rq); }); @@ -83,7 +83,7 @@ class JsonRpc2Adapter extends Adapter { var requestId = params['request_id'].asString; var clientId = _getClientId(params); var subscriptionId = params['subscription_id'].asString; - var rq = new _JsonRpc2UnsubscriptionRequestImpl( + var rq = _JsonRpc2UnsubscriptionRequestImpl( clientId, subscriptionId, peer, requestId); _onUnsubscribe.add(rq); }); @@ -104,8 +104,14 @@ class JsonRpc2Adapter extends Adapter { } class _JsonRpc2PublishRequestImpl extends PublishRequest { - final String requestId, clientId, eventName; + final String requestId; + + @override + final String? clientId, eventName; + + @override final value; + final json_rpc_2.Peer peer; _JsonRpc2PublishRequestImpl( @@ -135,7 +141,7 @@ class _JsonRpc2PublishRequestImpl extends PublishRequest { class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest { @override - final String clientId, eventName; + final String? clientId, eventName; final String requestId; @@ -147,7 +153,7 @@ class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest { this.clientId, this.eventName, this.requestId, this.peer, this._uuid); @override - FutureOr accept(String clientId) { + FutureOr accept(String? clientId) { var id = _uuid.v4(); peer.sendNotification(requestId, { 'status': true, @@ -155,7 +161,7 @@ class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest { 'subscription_id': id, 'client_id': clientId }); - return new _JsonRpc2SubscriptionImpl(clientId, id, eventName, peer); + return _JsonRpc2SubscriptionImpl(clientId, id, eventName, peer); } @override @@ -170,9 +176,9 @@ class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest { class _JsonRpc2SubscriptionImpl extends Subscription { @override - final String clientId, id; + final String? clientId, id; - final String eventName; + final String? eventName; final json_rpc_2.Peer peer; @@ -186,7 +192,7 @@ class _JsonRpc2SubscriptionImpl extends Subscription { class _JsonRpc2UnsubscriptionRequestImpl extends UnsubscriptionRequest { @override - final String clientId; + final String? clientId; @override final String subscriptionId; diff --git a/packages/pub_sub/lib/src/protocol/server/publish.dart b/packages/pub_sub/lib/src/protocol/server/publish.dart index 03fb7903..b6cbf95b 100644 --- a/packages/pub_sub/lib/src/protocol/server/publish.dart +++ b/packages/pub_sub/lib/src/protocol/server/publish.dart @@ -1,10 +1,10 @@ /// Represents a request to publish information to other clients. abstract class PublishRequest { /// The ID of the client sending this request. - String get clientId; + String? get clientId; /// The name of the event to be sent. - String get eventName; + String? get eventName; /// The value to be published as an event. dynamic get value; @@ -22,7 +22,7 @@ class PublishResponse { final int listeners; /// The client ID returned the server. Significant in cases where an ad-hoc client was registered. - final String clientId; + final String? clientId; const PublishResponse(this.listeners, this.clientId); } diff --git a/packages/pub_sub/lib/src/protocol/server/server.dart b/packages/pub_sub/lib/src/protocol/server/server.dart index 0ee8fdb5..b0cc08d7 100644 --- a/packages/pub_sub/lib/src/protocol/server/server.dart +++ b/packages/pub_sub/lib/src/protocol/server/server.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:math'; import 'adapter.dart'; import 'client.dart'; +import 'package:collection/collection.dart' show IterableExtension; import 'publish.dart'; import 'subscription.dart'; @@ -12,32 +13,32 @@ import 'subscription.dart'; class Server { final List _adapters = []; final List _clients = []; - final _rnd = new Random.secure(); - final Map> _subscriptions = {}; + final _rnd = Random.secure(); + final Map> _subscriptions = {}; bool _started = false; int _adHocIds = 0; /// Initialize a server, optionally with a number of [adapters]. Server([Iterable adapters = const []]) { - _adapters.addAll(adapters ?? []); + _adapters.addAll(adapters); } /// Adds a new [Adapter] to adapt incoming clients from a new interface. void addAdapter(Adapter adapter) { - if (_started) - throw new StateError( + if (_started) { + throw StateError( 'You cannot add new adapters after the server has started listening.'); - else { + } else { _adapters.add(adapter); } } /// Registers a new client with the server. void registerClient(ClientInfo client) { - if (_started) - throw new StateError( + if (_started) { + throw StateError( 'You cannot register new clients after the server has started listening.'); - else { + } else { _clients.add(client); } } @@ -48,25 +49,26 @@ class Server { _adapters.clear(); _clients.clear(); _subscriptions.clear(); - return new Future.value(); + return Future.value(); } String _newClientId() { // Create an unpredictable-enough ID. The harder it is for an attacker to guess, the better. var id = - 'pub_sub::adhoc_client${_rnd.nextDouble()}::${_adHocIds++}:${new DateTime.now().millisecondsSinceEpoch * _rnd.nextDouble()}'; + 'pub_sub::adhoc_client${_rnd.nextDouble()}::${_adHocIds++}:${DateTime.now().millisecondsSinceEpoch * _rnd.nextDouble()}'; // This client is coming from a trusted source, and can therefore both publish and subscribe. - _clients.add(new ClientInfo(id)); + _clients.add(ClientInfo(id)); return id; } void start() { - if (_adapters.isEmpty) - throw new StateError( + if (_adapters.isEmpty) { + throw StateError( 'Cannot start a SyncServer that has no adapters attached.'); - else if (_started) - throw new StateError('A SyncServer may only be started once.'); + } else if (_started) { + throw StateError('A SyncServer may only be started once.'); + } _started = true; @@ -77,15 +79,14 @@ class Server { for (var adapter in _adapters) { // Handle publishes adapter.onPublish.listen((rq) { - ClientInfo client; - String clientId; + ClientInfo? client; + String? clientId; if (rq.clientId?.isNotEmpty == true || adapter.isTrustedPublishRequest(rq)) { clientId = rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId(); - client = - _clients.firstWhere((c) => c.id == clientId, orElse: () => null); + client = _clients.firstWhereOrNull((c) => c.id == clientId); } if (client == null) { @@ -98,28 +99,27 @@ class Server { []; if (listeners.isEmpty) { - rq.accept(new PublishResponse(0, clientId)); + rq.accept(PublishResponse(0, clientId)); } else { for (var listener in listeners) { listener.dispatch(rq.value); } - rq.accept(new PublishResponse(listeners.length, clientId)); + rq.accept(PublishResponse(listeners.length, clientId)); } } }); // Listen for incoming subscriptions adapter.onSubscribe.listen((rq) async { - ClientInfo client; - String clientId; + ClientInfo? client; + String? clientId; if (rq.clientId?.isNotEmpty == true || adapter.isTrustedSubscriptionRequest(rq)) { clientId = rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId(); - client = - _clients.firstWhere((c) => c.id == clientId, orElse: () => null); + client = _clients.firstWhereOrNull((c) => c.id == clientId); } if (client == null) { @@ -135,12 +135,11 @@ class Server { // Unregister subscriptions on unsubscribe adapter.onUnsubscribe.listen((rq) { - Subscription toRemove; - List sourceList; + Subscription? toRemove; + late List sourceList; for (var list in _subscriptions.values) { - toRemove = list.firstWhere((s) => s.id == rq.subscriptionId, - orElse: () => null); + toRemove = list.firstWhereOrNull((s) => s.id == rq.subscriptionId); if (toRemove != null) { sourceList = list; break; diff --git a/packages/pub_sub/lib/src/protocol/server/subscription.dart b/packages/pub_sub/lib/src/protocol/server/subscription.dart index 57efcbf5..9f5db231 100644 --- a/packages/pub_sub/lib/src/protocol/server/subscription.dart +++ b/packages/pub_sub/lib/src/protocol/server/subscription.dart @@ -3,15 +3,15 @@ import 'dart:async'; /// Represents a request to subscribe to an event. abstract class SubscriptionRequest { /// The ID of the client requesting to subscribe. - String get clientId; + String? get clientId; /// The name of the event the client wants to subscribe to. - String get eventName; + String? get eventName; /// Accept the request, and grant the client access to subscribe to the event. /// /// Includes the client's ID, which is necessary for ad-hoc clients. - FutureOr accept(String clientId); + FutureOr accept(String? clientId); /// Deny the request with an error message. void reject(String errorMessage); @@ -20,10 +20,10 @@ abstract class SubscriptionRequest { /// Represents a request to unsubscribe to an event. abstract class UnsubscriptionRequest { /// The ID of the client requesting to unsubscribe. - String get clientId; + String? get clientId; /// The name of the event the client wants to unsubscribe from. - String get subscriptionId; + String? get subscriptionId; /// Accept the request. FutureOr accept(); @@ -37,10 +37,10 @@ abstract class UnsubscriptionRequest { /// Also provides a means to fire an event. abstract class Subscription { /// A unique identifier for this subscription. - String get id; + String? get id; /// The ID of the client who requested this subscription. - String get clientId; + String? get clientId; /// Alerts a client of an event. void dispatch(event); diff --git a/packages/pub_sub/pubspec.yaml b/packages/pub_sub/pubspec.yaml index f214636a..90fd4058 100644 --- a/packages/pub_sub/pubspec.yaml +++ b/packages/pub_sub/pubspec.yaml @@ -5,11 +5,12 @@ author: Tobe O homepage: https://github.com/thosakwe/pub_sub publish_to: none environment: - sdk: ">=2.10.0 <3.0.0" + sdk: '>=2.12.0 <3.0.0' dependencies: - json_rpc_2: ^2.0.0 - stream_channel: ">=1.0.0 <3.0.0" - uuid: ^3.0.1 + json_rpc_2: ^3.0.0 + stream_channel: ^2.1.0 + uuid: ^3.0.4 + collection: ^1.15.0-nullsafety.4 dev_dependencies: - pedantic: ^1.0.0 - test: ^1.0.0 + pedantic: ^1.11.0 + test: ^1.17.3 diff --git a/packages/pub_sub/test/isolate_test.dart b/packages/pub_sub/test/isolate_test.dart index 1d1c0136..5a3ac8d8 100644 --- a/packages/pub_sub/test/isolate_test.dart +++ b/packages/pub_sub/test/isolate_test.dart @@ -3,23 +3,23 @@ import 'package:pub_sub/pub_sub.dart'; import 'package:pub_sub/isolate.dart'; import 'package:test/test.dart'; -main() { - Server server; - Client client1, client2, client3; - IsolateClient trustedClient; - IsolateAdapter adapter; +void main() { + late Server server; + late Client client1, client2, client3; + late IsolateClient trustedClient; + late IsolateAdapter adapter; setUp(() async { - adapter = new IsolateAdapter(); + adapter = IsolateAdapter(); client1 = - new IsolateClient('isolate_test::secret', adapter.receivePort.sendPort); - client2 = new IsolateClient( - 'isolate_test::secret2', adapter.receivePort.sendPort); - client3 = new IsolateClient( - 'isolate_test::secret3', adapter.receivePort.sendPort); - trustedClient = new IsolateClient(null, adapter.receivePort.sendPort); + IsolateClient('isolate_test::secret', adapter.receivePort.sendPort); + client2 = + IsolateClient('isolate_test::secret2', adapter.receivePort.sendPort); + client3 = + IsolateClient('isolate_test::secret3', adapter.receivePort.sendPort); + trustedClient = IsolateClient(null, adapter.receivePort.sendPort); - server = new Server([adapter]) + server = Server([adapter]) ..registerClient(const ClientInfo('isolate_test::secret')) ..registerClient(const ClientInfo('isolate_test::secret2')) ..registerClient(const ClientInfo('isolate_test::secret3')) @@ -51,7 +51,7 @@ main() { expect(trustedClient.clientId, isNotNull); }); test('can sub/unsub', () async { - String clientId; + String? clientId; await trustedClient.publish('heyaaa', 'byeaa'); expect(clientId = trustedClient.clientId, isNotNull); @@ -88,7 +88,7 @@ main() { group('isolate_server', () { test('reject unknown client id', () async { try { - var client = new IsolateClient( + var client = IsolateClient( 'isolate_test::invalid', adapter.receivePort.sendPort); await client.publish('foo', 'bar'); throw 'Invalid client ID\'s should throw an error, but they do not.'; @@ -99,7 +99,7 @@ main() { test('reject unprivileged publish', () async { try { - var client = new IsolateClient( + var client = IsolateClient( 'isolate_test::no_publish', adapter.receivePort.sendPort); await client.publish('foo', 'bar'); throw 'Unprivileged publishes should throw an error, but they do not.'; @@ -110,7 +110,7 @@ main() { test('reject unprivileged subscribe', () async { try { - var client = new IsolateClient( + var client = IsolateClient( 'isolate_test::no_subscribe', adapter.receivePort.sendPort); await client.subscribe('foo'); throw 'Unprivileged subscribes should throw an error, but they do not.'; diff --git a/packages/pub_sub/test/json_rpc_2_test.dart b/packages/pub_sub/test/json_rpc_2_test.dart index 2e959552..ff02ae3b 100644 --- a/packages/pub_sub/test/json_rpc_2_test.dart +++ b/packages/pub_sub/test/json_rpc_2_test.dart @@ -6,17 +6,17 @@ import 'package:pub_sub/json_rpc_2.dart'; import 'package:stream_channel/stream_channel.dart'; import 'package:test/test.dart'; -main() { - ServerSocket serverSocket; - Server server; - Client client1, client2, client3; - JsonRpc2Client trustedClient; +void main() { + late ServerSocket serverSocket; + late Server server; + late Client client1, client2, client3; + late JsonRpc2Client trustedClient; JsonRpc2Adapter adapter; setUp(() async { serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0); - adapter = new JsonRpc2Adapter( + adapter = JsonRpc2Adapter( serverSocket.map>(streamSocket), isTrusted: true); @@ -29,15 +29,12 @@ main() { var socket4 = await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port); - client1 = - new JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1)); - client2 = - new JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2)); - client3 = - new JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3)); - trustedClient = new JsonRpc2Client(null, streamSocket(socket4)); + client1 = JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1)); + client2 = JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2)); + client3 = JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3)); + trustedClient = JsonRpc2Client(null, streamSocket(socket4)); - server = new Server([adapter]) + server = Server([adapter]) ..registerClient(const ClientInfo('json_rpc_2_test::secret')) ..registerClient(const ClientInfo('json_rpc_2_test::secret2')) ..registerClient(const ClientInfo('json_rpc_2_test::secret3')) @@ -64,7 +61,7 @@ main() { expect(trustedClient.clientId, isNotNull); }); test('can sub/unsub', () async { - String clientId; + String? clientId; await trustedClient.publish('heyaaa', 'byeaa'); expect(clientId = trustedClient.clientId, isNotNull); @@ -104,7 +101,7 @@ main() { var sock = await Socket.connect( InternetAddress.loopbackIPv4, serverSocket.port); var client = - new JsonRpc2Client('json_rpc_2_test::invalid', streamSocket(sock)); + JsonRpc2Client('json_rpc_2_test::invalid', streamSocket(sock)); await client.publish('foo', 'bar'); throw 'Invalid client ID\'s should throw an error, but they do not.'; } on PubSubException catch (e) { @@ -116,8 +113,8 @@ main() { try { var sock = await Socket.connect( InternetAddress.loopbackIPv4, serverSocket.port); - var client = new JsonRpc2Client( - 'json_rpc_2_test::no_publish', streamSocket(sock)); + var client = + JsonRpc2Client('json_rpc_2_test::no_publish', streamSocket(sock)); await client.publish('foo', 'bar'); throw 'Unprivileged publishes should throw an error, but they do not.'; } on PubSubException catch (e) { @@ -129,8 +126,8 @@ main() { try { var sock = await Socket.connect( InternetAddress.loopbackIPv4, serverSocket.port); - var client = new JsonRpc2Client( - 'json_rpc_2_test::no_subscribe', streamSocket(sock)); + var client = + JsonRpc2Client('json_rpc_2_test::no_subscribe', streamSocket(sock)); await client.subscribe('foo'); throw 'Unprivileged subscribes should throw an error, but they do not.'; } on PubSubException catch (e) { @@ -141,18 +138,20 @@ main() { } StreamChannel streamSocket(Socket socket) { - var channel = new _SocketStreamChannel(socket); - return channel.transform(new StreamChannelTransformer.fromCodec(utf8)); + var channel = _SocketStreamChannel(socket); + return channel + .cast>() + .transform(StreamChannelTransformer.fromCodec(utf8)); } class _SocketStreamChannel extends StreamChannelMixin> { - _SocketSink _sink; + _SocketSink? _sink; final Socket socket; _SocketStreamChannel(this.socket); @override - StreamSink> get sink => _sink ??= new _SocketSink(socket); + StreamSink> get sink => _sink ??= _SocketSink(socket); @override Stream> get stream => socket; @@ -169,7 +168,7 @@ class _SocketSink extends StreamSink> { } @override - void addError(Object error, [StackTrace stackTrace]) { + void addError(Object error, [StackTrace? stackTrace]) { Zone.current.errorCallback(error, stackTrace); }