From dfccf23629c223c2f3d4005933343bc53d43a850 Mon Sep 17 00:00:00 2001 From: "thomashii@dukefirehawk.com" Date: Tue, 12 Dec 2023 09:30:25 +0800 Subject: [PATCH] Updated pub_sub --- .gitignore | 3 +- packages/pub_sub/CHANGELOG.md | 6 + packages/pub_sub/README.md | 5 +- packages/pub_sub/example/main.dart | 14 +- packages/pub_sub/lib/src/isolate/client.dart | 100 +++++----- packages/pub_sub/lib/src/isolate/server.dart | 143 +++++--------- packages/pub_sub/lib/src/isolate/shared.dart | 181 ++++++++++++++++++ .../lib/src/protocol/server/publish.dart | 2 +- packages/pub_sub/pubspec.yaml | 2 +- 9 files changed, 298 insertions(+), 158 deletions(-) create mode 100644 packages/pub_sub/lib/src/isolate/shared.dart diff --git a/.gitignore b/.gitignore index a67f375..2e43b8d 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,5 @@ doc/api/ !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json -.metals/ \ No newline at end of file +.metals/ +.DS_Store \ No newline at end of file diff --git a/packages/pub_sub/CHANGELOG.md b/packages/pub_sub/CHANGELOG.md index f0e042f..8861780 100644 --- a/packages/pub_sub/CHANGELOG.md +++ b/packages/pub_sub/CHANGELOG.md @@ -1,5 +1,11 @@ # Change Log +## 6.2.0 + +* Require Dart >= 3.2 +* Upgraded `lints` to 3.0.0 +* Refactored encode/decode message handling into `MessageHandler` + ## 6.1.0 * Upgraded `uuid` to 4.0.0 diff --git a/packages/pub_sub/README.md b/packages/pub_sub/README.md index bb4867b..1af71f7 100644 --- a/packages/pub_sub/README.md +++ b/packages/pub_sub/README.md @@ -14,7 +14,7 @@ Add `belatuk_pub_sub` as a dependency in your `pubspec.yaml` file: ```yaml dependencies: - belatuk_pub_sub: ^8.0.0 + belatuk_pub_sub: ^8.2.0 ``` Then, be sure to run `dart pub get` in your terminal. @@ -60,8 +60,7 @@ pub_sub.IsolateClient(null); ### Access Control The ID's of all *untrusted* clients who will connect to the server must be known at start-up time. -You may not register new clients after the server has started. This is mostly a security consideration; -mainly to make it impossible to register new clients, thus preventing malicious users from granting themselves additional privileges within the system. +You may not register new clients after the server has started. This is mostly a security consideration, to make it impossible to register new clients, thus preventing malicious users from granting themselves additional privileges within the system. ```dart import 'package:belatuk_pub_sub/belatuk_pub_sub.dart' as pub_sub; diff --git a/packages/pub_sub/example/main.dart b/packages/pub_sub/example/main.dart index 42c7727..de482e5 100644 --- a/packages/pub_sub/example/main.dart +++ b/packages/pub_sub/example/main.dart @@ -1,19 +1,20 @@ import 'dart:io'; import 'dart:isolate'; -import 'package:belatuk_pub_sub/isolate.dart' as pub_sub; -import 'package:belatuk_pub_sub/belatuk_pub_sub.dart' as pub_sub; +import 'package:belatuk_pub_sub/isolate.dart'; +import 'package:belatuk_pub_sub/belatuk_pub_sub.dart'; void main() async { // Easily bring up a server. - var adapter = pub_sub.IsolateAdapter(); - var server = pub_sub.Server([adapter]); + var adapter = IsolateAdapter(); + var server = Server([adapter]); // You then need to create a client that will connect to the adapter. // Every untrusted client in your application should be pre-registered. // // In the case of Isolates, however, those are always implicitly trusted. + print("Register Client"); for (var i = 0; i < Platform.numberOfProcessors - 1; i++) { - server.registerClient(pub_sub.ClientInfo('client$i')); + server.registerClient(ClientInfo('client$i')); } // Start the server. @@ -22,6 +23,7 @@ void main() async { // Next, let's start isolates that interact with the server. // // Fortunately, we can send SendPorts over Isolates, so this is no hassle. + print("Create Isolate"); for (var i = 0; i < Platform.numberOfProcessors - 1; i++) { await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]); } @@ -32,7 +34,7 @@ void main() async { void isolateMain(List args) { // Isolates are always trusted, so technically we don't need to pass a client iD. - var client = pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort); + var client = IsolateClient('client${args[0]}', args[1] as SendPort); // The client will connect automatically. In the meantime, we can start subscribing to events. client.subscribe('user::logged_in').then((sub) { diff --git a/packages/pub_sub/lib/src/isolate/client.dart b/packages/pub_sub/lib/src/isolate/client.dart index 4fab48e..cd1ce29 100644 --- a/packages/pub_sub/lib/src/isolate/client.dart +++ b/packages/pub_sub/lib/src/isolate/client.dart @@ -2,7 +2,9 @@ import 'dart:async'; import 'dart:collection'; import 'dart:isolate'; import 'package:uuid/uuid.dart'; -import '../../belatuk_pub_sub.dart'; + +import '../protocol/protocol.dart'; +import 'shared.dart'; /// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s. class IsolateClient extends Client { @@ -29,33 +31,39 @@ class IsolateClient extends Client { IsolateClient(String? clientId, this.serverSendPort) { _clientId = clientId; receivePort.listen((data) { - if (data is Map && data['request_id'] is String) { - var requestId = data['request_id'] as String?; - var c = _requests.remove(requestId); + if (data is Map) { + var (status, id, requestId, result, errorMessage) = + MessageHandler().decodeResponseMessage(data); - if (c != null && !c.isCompleted) { - if (data['status'] is! bool) { - c.completeError( - FormatException('The server sent an invalid response.')); - } else if (!(data['status'] as bool)) { - c.completeError(PubSubException(data['error_message']?.toString() ?? - 'The server sent a failure response, but did not provide an error message.')); - } else if (data['result'] is! Map) { - c.completeError(FormatException( - 'The server sent a success response, but did not include a result.')); - } else { - c.complete(data['result'] as Map?); + if (requestId != null) { + //var requestId = data['request_id'] as String?; + var c = _requests.remove(requestId); + + if (c != null && !c.isCompleted) { + //if (data['status'] is! bool) { + // c.completeError( + // FormatException('The server sent an invalid response.')); + //} else if (!(data['status'] as bool)) { + if (!status) { + c.completeError(PubSubException(errorMessage ?? + 'The server sent a failure response, but did not provide an error message.')); + } else if (result is! Map) { + c.completeError(FormatException( + 'The server sent a success response, but did not include a result.')); + } else { + c.complete(result); + } } - } - } else if (data is Map && data['id'] is String && _id == null) { - _id = data['id'] as String?; + } else if (id != null && _id == null) { + _id = id; - for (var c in _onConnect) { - if (!c.isCompleted) c.complete(_id); - } + for (var c in _onConnect) { + if (!c.isCompleted) c.complete(_id); + } - _onConnect.clear(); - } else if (data is List && data.length == 2 && data[0] is String) { + _onConnect.clear(); + } + } else if (data is List) { var eventName = data[0] as String; var event = data[1]; for (var s in _subscriptions.where((s) => s.eventName == eventName)) { @@ -82,18 +90,13 @@ class IsolateClient extends Client { var c = Completer(); var requestId = _uuid.v4(); _requests[requestId] = c; - serverSendPort.send({ - 'id': _id, - 'request_id': requestId, - 'method': 'publish', - 'params': { - 'client_id': clientId, - 'event_name': eventName, - 'value': value - } - }); + serverSendPort.send(MessageHandler().encodePublishRequestMessage( + _id, requestId, clientId, eventName, value)); + return c.future.then((result) { - _clientId = result['client_id'] as String?; + var (_, clientId) = MessageHandler() + .decodePublishResponseMessage(result as Map); + _clientId = clientId; }); }); } @@ -104,16 +107,14 @@ class IsolateClient extends Client { var c = Completer(); var requestId = _uuid.v4(); _requests[requestId] = c; - serverSendPort.send({ - 'id': _id, - 'request_id': requestId, - 'method': 'subscribe', - 'params': {'client_id': clientId, 'event_name': eventName} - }); + serverSendPort.send(MessageHandler().encodeSubscriptionRequestMessage( + _id, requestId, clientId, eventName)); + return c.future.then((result) { - _clientId = result['client_id'] as String?; - var s = _IsolateClientSubscription( - eventName, result['subscription_id'] as String?, this); + var (subcriptionId, clientId) = MessageHandler() + .decodeSubscriptionResponseMessage(result as Map); + _clientId = clientId; + var s = _IsolateClientSubscription(eventName, subcriptionId, this); _subscriptions.add(s); return s; }); @@ -171,14 +172,11 @@ class _IsolateClientSubscription extends ClientSubscription { var c = Completer(); var requestId = client._uuid.v4(); client._requests[requestId] = c; - client.serverSendPort.send({ - 'id': client._id, - 'request_id': requestId, - 'method': 'unsubscribe', - 'params': {'client_id': client.clientId, 'subscription_id': id} - }); + client.serverSendPort.send(MessageHandler() + .encodeUnsubscriptionRequestMessage( + client._id, requestId, client.clientId, id)); - return c.future.then((_) { + return c.future.then((result) { _close(); }); }); diff --git a/packages/pub_sub/lib/src/isolate/server.dart b/packages/pub_sub/lib/src/isolate/server.dart index bd9fadf..95c9f79 100644 --- a/packages/pub_sub/lib/src/isolate/server.dart +++ b/packages/pub_sub/lib/src/isolate/server.dart @@ -1,7 +1,8 @@ import 'dart:async'; import 'dart:isolate'; import 'package:uuid/uuid.dart'; -import '../../belatuk_pub_sub.dart'; +import '../protocol/protocol.dart'; +import 'shared.dart'; /// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s. class IsolateAdapter extends Adapter { @@ -42,81 +43,50 @@ class IsolateAdapter extends Adapter { if (data is SendPort) { var id = _uuid.v4(); _clients[id] = data; - data.send({'status': true, 'id': id}); - } else if (data is Map && - data['id'] is String && - data['request_id'] is String && - data['method'] is String && - data['params'] is Map) { - var id = data['id'] as String?, - requestId = data['request_id'] as String?, - method = data['method'] as String?; - var params = data['params'] as Map?; - var sp = _clients[id!]; + data.send(MessageHandler().encodeSendPortResponseMessage(id)); + } else if (data is Map) { + var (id, method, requestId, params) = + MessageHandler().decodeRequestMessage(data); + var (clientId, eventName, subscriptionId, value) = + MessageHandler().decodeRequestParams(params); + var sp = _clients[id]; if (sp == null) { - // There's nobody to respond to, so don't send anything to anyone. Oops. - } else if (method == 'publish') { - if (_isValidClientId(params!['client_id']) && - params['event_name'] is String && - params.containsKey('value')) { - var clientId = params['client_id'] as String?, - eventName = params['event_name'] as String?; - var value = params['value']; - var rq = _IsolatePublishRequestImpl( - requestId, clientId, eventName, value, sp); - _onPublish.add(rq); - } else { - sp.send({ - 'status': false, - 'request_id': requestId, - 'error_message': 'Expected client_id, event_name, and value.' - }); + // There's nobody to respond to, so don't send anything to anyone + return; + } + + if (method == 'publish') { + if (eventName == null || value == null) { + sp.send(MessageHandler().encodePublishResponseError(requestId)); } + var rq = _IsolatePublishRequestImpl( + requestId, clientId, eventName, value, sp); + _onPublish.add(rq); } else if (method == 'subscribe') { - if (_isValidClientId(params!['client_id']) && - params['event_name'] is String) { - var clientId = params['client_id'] as String?, - eventName = params['event_name'] as String?; - var rq = _IsolateSubscriptionRequestImpl( - clientId, eventName, sp, requestId, _uuid); - _onSubscribe.add(rq); - } else { - sp.send({ - 'status': false, - 'request_id': requestId, - 'error_message': 'Expected client_id, and event_name.' - }); + if (eventName == null) { + sp.send( + MessageHandler().encodeSubscriptionResponseError(requestId)); } + var rq = _IsolateSubscriptionRequestImpl( + clientId, eventName, sp, requestId, _uuid); + _onSubscribe.add(rq); } else if (method == 'unsubscribe') { - if (_isValidClientId(params!['client_id']) && - params['subscription_id'] is String) { - var clientId = params['client_id'] as String?, - subscriptionId = params['subscription_id'] as String?; - var rq = _IsolateUnsubscriptionRequestImpl( - clientId, subscriptionId, sp, requestId); - _onUnsubscribe.add(rq); - } else { - sp.send({ - 'status': false, - 'request_id': requestId, - 'error_message': 'Expected client_id, and subscription_id.' - }); + if (subscriptionId == null) { + sp.send( + MessageHandler().encodeUnsubscriptionResponseError(requestId)); } + var rq = _IsolateUnsubscriptionRequestImpl( + clientId, subscriptionId, sp, requestId); + _onUnsubscribe.add(rq); } else { - sp.send({ - 'status': false, - 'request_id': requestId, - 'error_message': - 'Unrecognized method "$method". Or, you omitted id, request_id, method, or params.' - }); + sp.send(MessageHandler() + .encodeUnknownMethodResponseError(requestId, method)); } } }); } - bool _isValidClientId(id) => id == null || id is String; - @override bool isTrustedPublishRequest(PublishRequest request) { // Isolate clients are considered trusted, because they are @@ -138,7 +108,7 @@ class _IsolatePublishRequestImpl extends PublishRequest { final String? eventName; @override - final dynamic value; + final Object? value; final SendPort sendPort; @@ -148,24 +118,15 @@ class _IsolatePublishRequestImpl extends PublishRequest { this.requestId, this.clientId, this.eventName, this.value, this.sendPort); @override - void accept(PublishResponse response) { - sendPort.send({ - 'status': true, - 'request_id': requestId, - 'result': { - 'listeners': response.listeners, - 'client_id': response.clientId - } - }); + void reject(String errorMessage) { + sendPort.send(MessageHandler() + .encodePublishResponseError(requestId, errorMessage: errorMessage)); } @override - void reject(String errorMessage) { - sendPort.send({ - 'status': false, - 'request_id': requestId, - 'error_message': errorMessage - }); + void accept(PublishResponse response) { + sendPort.send(MessageHandler().encodePublishResponseMessage2( + requestId, response.listeners, response.clientId)); } } @@ -187,21 +148,15 @@ class _IsolateSubscriptionRequestImpl extends SubscriptionRequest { @override void reject(String errorMessage) { - sendPort.send({ - 'status': false, - 'request_id': requestId, - 'error_message': errorMessage - }); + sendPort.send(MessageHandler().encodeSubscriptionResponseError(requestId, + errorMessage: errorMessage)); } @override FutureOr accept(String? clientId) { var id = _uuid.v4(); - sendPort.send({ - 'status': true, - 'request_id': requestId, - 'result': {'subscription_id': id, 'client_id': clientId} - }); + sendPort.send(MessageHandler() + .encodeSubscriptionResponseMessage(requestId, id, clientId)); return _IsolateSubscriptionImpl(clientId, id, eventName, sendPort); } } @@ -239,15 +194,13 @@ class _IsolateUnsubscriptionRequestImpl extends UnsubscriptionRequest { @override void reject(String errorMessage) { - sendPort.send({ - 'status': false, - 'request_id': requestId, - 'error_message': errorMessage - }); + sendPort.send(MessageHandler().encodeUnsubscriptionResponseError(requestId, + errorMessage: errorMessage)); } @override void accept() { - sendPort.send({'status': true, 'request_id': requestId, 'result': {}}); + sendPort + .send(MessageHandler().encodeUnsubscriptionResponseMessage(requestId)); } } diff --git a/packages/pub_sub/lib/src/isolate/shared.dart b/packages/pub_sub/lib/src/isolate/shared.dart new file mode 100644 index 0000000..d8f1728 --- /dev/null +++ b/packages/pub_sub/lib/src/isolate/shared.dart @@ -0,0 +1,181 @@ +class MessageHandler { + static const _requestId = 'request_id'; + static const _method = 'method'; + static const _clientId = 'client_id'; + static const _eventName = 'event_name'; + static const _subscriptionId = 'subscription_id'; + static const _errorMessage = 'error_message'; + static const _value = 'value'; + static const _id = 'id'; + static const _params = 'params'; + static const _status = 'status'; + static const _result = 'result'; + static const _listeners = 'listeners'; + + static const _publishErrorMsg = 'Expected client_id, event_name, and value'; + static const _subscribeErrorMsg = 'Expected client_id, and event_name'; + static const _unsubscribeErrorMsg = 'Expected client_id, and subscription_id'; + + const MessageHandler(); + + Map encodePublishResponseError(String? requestId, + {String errorMessage = _publishErrorMsg}) { + return _encodeResponseError(requestId, errorMessage); + } + + Map encodeSubscriptionResponseError(String? requestId, + {String errorMessage = _subscribeErrorMsg}) { + return _encodeResponseError(requestId, errorMessage); + } + + Map encodeUnsubscriptionResponseError(String? requestId, + {String errorMessage = _unsubscribeErrorMsg}) { + return _encodeResponseError(requestId, errorMessage); + } + + Map encodeUnknownMethodResponseError( + String? requestId, String method) { + var unknownMethodErrorMsg = + 'Unrecognized method "$method" or you have omitted id, request_id, method, or params'; + + return _encodeResponseError(requestId, unknownMethodErrorMsg); + } + + Map _encodeResponseError(String? requestId, String message) { + return { + _status: false, + _requestId: requestId ?? '', + _errorMessage: message + }; + } + + Map encodeEventMessage(String? requestId, Object message) { + return {_status: true, _requestId: requestId ?? '', _result: message}; + } + + Map encodeSubscriptionResponseMessage( + String? requestId, String? subscriptionId, String? clientId) { + return { + _status: true, + _requestId: requestId ?? '', + _result: {_subscriptionId: subscriptionId, _clientId: clientId} + }; + } + + (String?, String?) decodeSubscriptionResponseMessage( + Map message) { + var subscriptionId = message[_subscriptionId] as String?; + var clientId = message[_clientId] as String?; + + return (subscriptionId, clientId); + } + + Map encodeUnsubscriptionResponseMessage(String? requestId) { + return {_status: true, _requestId: requestId, _result: {}}; + } + + (bool, String?, Object?, String?) decodeUnsubscriptionResponseMessage( + Map message) { + var status = message[_status] as bool? ?? false; + var requestId = message[_requestId] as String?; + var result = message[_result]; + var errorMessage = message[_errorMessage] as String?; + + return (status, requestId, result, errorMessage); + } + + Map encodePublishResponseMessage2( + String? requestId, int listeners, String? clientId) { + return { + _status: true, + _requestId: requestId, + _result: {_listeners: listeners, _clientId: clientId} + }; + } + + (int, String?) decodePublishResponseMessage(Map message) { + var listeners = message[_listeners] as int; + var clientId = message[_clientId] as String?; + + return (listeners, clientId); + } + + Map encodePublishResponseMessage(String? id, + String? requestId, String? clientId, String? eventName, Object? value) { + return { + _id: id, + _requestId: requestId, + _method: 'publish', + _params: {_clientId: clientId, _eventName: eventName, _value: value} + }; + } + + Map encodeResponseMessage( + String? requestId, Object message) { + return {_status: true, _requestId: requestId ?? '', _result: message}; + } + + (bool, String?, String?, Object?, String?) decodeResponseMessage( + Map message) { + var id = message[_id] as String?; + var status = message[_status] as bool? ?? false; + var requestId = message[_requestId] as String?; + var result = message[_result]; + var errorMessage = message[_errorMessage] as String?; + + return (status, id, requestId, result, errorMessage); + } + + (String, String, String, Map) decodeRequestMessage( + Map message) { + var id = message[_id] as String? ?? ''; + var method = message[_method] as String? ?? ''; + var requestId = message[_requestId] as String? ?? ''; + var params = message[_params] as Map? ?? {}; + + return (id, method, requestId, params); + } + + Map encodeSubscriptionRequestMessage( + String? id, String? requestId, String? clientId, String? eventName) { + return { + _id: id, + _requestId: requestId, + _method: 'subscribe', + _params: {_clientId: clientId, _eventName: eventName} + }; + } + + Map encodeUnsubscriptionRequestMessage( + String? id, String? requestId, String? clientId, String? subscriptionId) { + return { + _id: id, + _requestId: requestId, + _method: 'unsubscribe', + _params: {_clientId: clientId, _subscriptionId: subscriptionId} + }; + } + + Map encodePublishRequestMessage(String? id, + String? requestId, String? clientId, String? eventName, Object? value) { + return { + _id: id, + _requestId: requestId, + _method: 'publish', + _params: {_clientId: clientId, _eventName: eventName, _value: value} + }; + } + + (String?, String?, String?, Object?) decodeRequestParams( + Map params) { + var clientId = params[_clientId] as String?; + var eventName = params[_eventName] as String?; + var value = params[_value]; + var subscriptionId = params[_subscriptionId] as String?; + return (clientId, eventName, subscriptionId, value); + } + + Map encodeSendPortResponseMessage(String id) { + return {_status: true, _id: id}; + } +} diff --git a/packages/pub_sub/lib/src/protocol/server/publish.dart b/packages/pub_sub/lib/src/protocol/server/publish.dart index b6cbf95..8aa0d76 100644 --- a/packages/pub_sub/lib/src/protocol/server/publish.dart +++ b/packages/pub_sub/lib/src/protocol/server/publish.dart @@ -7,7 +7,7 @@ abstract class PublishRequest { String? get eventName; /// The value to be published as an event. - dynamic get value; + Object? get value; /// Accept the request, with a response. void accept(PublishResponse response); diff --git a/packages/pub_sub/pubspec.yaml b/packages/pub_sub/pubspec.yaml index ca587e4..1b22f2d 100644 --- a/packages/pub_sub/pubspec.yaml +++ b/packages/pub_sub/pubspec.yaml @@ -1,5 +1,5 @@ name: belatuk_pub_sub -version: 6.1.0 +version: 6.2.0 description: Keep application instances in sync with a simple pub/sub API. homepage: https://github.com/dart-backend/belatuk-common-utilities/tree/main/packages/pub_sub environment: