Updated pub_sub

This commit is contained in:
thomashii@dukefirehawk.com 2023-12-12 09:30:25 +08:00
parent b6c1ba243a
commit dfccf23629
9 changed files with 298 additions and 158 deletions

1
.gitignore vendored
View file

@ -27,3 +27,4 @@ doc/api/
!.vscode/launch.json !.vscode/launch.json
!.vscode/extensions.json !.vscode/extensions.json
.metals/ .metals/
.DS_Store

View file

@ -1,5 +1,11 @@
# Change Log # Change Log
## 6.2.0
* Require Dart >= 3.2
* Upgraded `lints` to 3.0.0
* Refactored encode/decode message handling into `MessageHandler`
## 6.1.0 ## 6.1.0
* Upgraded `uuid` to 4.0.0 * Upgraded `uuid` to 4.0.0

View file

@ -14,7 +14,7 @@ Add `belatuk_pub_sub` as a dependency in your `pubspec.yaml` file:
```yaml ```yaml
dependencies: dependencies:
belatuk_pub_sub: ^8.0.0 belatuk_pub_sub: ^8.2.0
``` ```
Then, be sure to run `dart pub get` in your terminal. Then, be sure to run `dart pub get` in your terminal.
@ -60,8 +60,7 @@ pub_sub.IsolateClient(null);
### Access Control ### Access Control
The ID's of all *untrusted* clients who will connect to the server must be known at start-up time. The ID's of all *untrusted* clients who will connect to the server must be known at start-up time.
You may not register new clients after the server has started. This is mostly a security consideration; You may not register new clients after the server has started. This is mostly a security consideration, to make it impossible to register new clients, thus preventing malicious users from granting themselves additional privileges within the system.
mainly to make it impossible to register new clients, thus preventing malicious users from granting themselves additional privileges within the system.
```dart ```dart
import 'package:belatuk_pub_sub/belatuk_pub_sub.dart' as pub_sub; import 'package:belatuk_pub_sub/belatuk_pub_sub.dart' as pub_sub;

View file

@ -1,19 +1,20 @@
import 'dart:io'; import 'dart:io';
import 'dart:isolate'; import 'dart:isolate';
import 'package:belatuk_pub_sub/isolate.dart' as pub_sub; import 'package:belatuk_pub_sub/isolate.dart';
import 'package:belatuk_pub_sub/belatuk_pub_sub.dart' as pub_sub; import 'package:belatuk_pub_sub/belatuk_pub_sub.dart';
void main() async { void main() async {
// Easily bring up a server. // Easily bring up a server.
var adapter = pub_sub.IsolateAdapter(); var adapter = IsolateAdapter();
var server = pub_sub.Server([adapter]); var server = Server([adapter]);
// You then need to create a client that will connect to the adapter. // You then need to create a client that will connect to the adapter.
// Every untrusted client in your application should be pre-registered. // Every untrusted client in your application should be pre-registered.
// //
// In the case of Isolates, however, those are always implicitly trusted. // In the case of Isolates, however, those are always implicitly trusted.
print("Register Client");
for (var i = 0; i < Platform.numberOfProcessors - 1; i++) { for (var i = 0; i < Platform.numberOfProcessors - 1; i++) {
server.registerClient(pub_sub.ClientInfo('client$i')); server.registerClient(ClientInfo('client$i'));
} }
// Start the server. // Start the server.
@ -22,6 +23,7 @@ void main() async {
// Next, let's start isolates that interact with the server. // Next, let's start isolates that interact with the server.
// //
// Fortunately, we can send SendPorts over Isolates, so this is no hassle. // Fortunately, we can send SendPorts over Isolates, so this is no hassle.
print("Create Isolate");
for (var i = 0; i < Platform.numberOfProcessors - 1; i++) { for (var i = 0; i < Platform.numberOfProcessors - 1; i++) {
await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]); await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]);
} }
@ -32,7 +34,7 @@ void main() async {
void isolateMain(List args) { void isolateMain(List args) {
// Isolates are always trusted, so technically we don't need to pass a client iD. // Isolates are always trusted, so technically we don't need to pass a client iD.
var client = pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort); var client = IsolateClient('client${args[0]}', args[1] as SendPort);
// The client will connect automatically. In the meantime, we can start subscribing to events. // The client will connect automatically. In the meantime, we can start subscribing to events.
client.subscribe('user::logged_in').then((sub) { client.subscribe('user::logged_in').then((sub) {

View file

@ -2,7 +2,9 @@ import 'dart:async';
import 'dart:collection'; import 'dart:collection';
import 'dart:isolate'; import 'dart:isolate';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../belatuk_pub_sub.dart';
import '../protocol/protocol.dart';
import 'shared.dart';
/// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s. /// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateClient extends Client { class IsolateClient extends Client {
@ -29,33 +31,39 @@ class IsolateClient extends Client {
IsolateClient(String? clientId, this.serverSendPort) { IsolateClient(String? clientId, this.serverSendPort) {
_clientId = clientId; _clientId = clientId;
receivePort.listen((data) { receivePort.listen((data) {
if (data is Map && data['request_id'] is String) { if (data is Map<String, Object?>) {
var requestId = data['request_id'] as String?; var (status, id, requestId, result, errorMessage) =
var c = _requests.remove(requestId); MessageHandler().decodeResponseMessage(data);
if (c != null && !c.isCompleted) { if (requestId != null) {
if (data['status'] is! bool) { //var requestId = data['request_id'] as String?;
c.completeError( var c = _requests.remove(requestId);
FormatException('The server sent an invalid response.'));
} else if (!(data['status'] as bool)) { if (c != null && !c.isCompleted) {
c.completeError(PubSubException(data['error_message']?.toString() ?? //if (data['status'] is! bool) {
'The server sent a failure response, but did not provide an error message.')); // c.completeError(
} else if (data['result'] is! Map) { // FormatException('The server sent an invalid response.'));
c.completeError(FormatException( //} else if (!(data['status'] as bool)) {
'The server sent a success response, but did not include a result.')); if (!status) {
} else { c.completeError(PubSubException(errorMessage ??
c.complete(data['result'] as Map?); 'The server sent a failure response, but did not provide an error message.'));
} else if (result is! Map) {
c.completeError(FormatException(
'The server sent a success response, but did not include a result.'));
} else {
c.complete(result);
}
} }
} } else if (id != null && _id == null) {
} else if (data is Map && data['id'] is String && _id == null) { _id = id;
_id = data['id'] as String?;
for (var c in _onConnect) { for (var c in _onConnect) {
if (!c.isCompleted) c.complete(_id); if (!c.isCompleted) c.complete(_id);
} }
_onConnect.clear(); _onConnect.clear();
} else if (data is List && data.length == 2 && data[0] is String) { }
} else if (data is List) {
var eventName = data[0] as String; var eventName = data[0] as String;
var event = data[1]; var event = data[1];
for (var s in _subscriptions.where((s) => s.eventName == eventName)) { for (var s in _subscriptions.where((s) => s.eventName == eventName)) {
@ -82,18 +90,13 @@ class IsolateClient extends Client {
var c = Completer<Map>(); var c = Completer<Map>();
var requestId = _uuid.v4(); var requestId = _uuid.v4();
_requests[requestId] = c; _requests[requestId] = c;
serverSendPort.send({ serverSendPort.send(MessageHandler().encodePublishRequestMessage(
'id': _id, _id, requestId, clientId, eventName, value));
'request_id': requestId,
'method': 'publish',
'params': {
'client_id': clientId,
'event_name': eventName,
'value': value
}
});
return c.future.then((result) { return c.future.then((result) {
_clientId = result['client_id'] as String?; var (_, clientId) = MessageHandler()
.decodePublishResponseMessage(result as Map<String, Object?>);
_clientId = clientId;
}); });
}); });
} }
@ -104,16 +107,14 @@ class IsolateClient extends Client {
var c = Completer<Map>(); var c = Completer<Map>();
var requestId = _uuid.v4(); var requestId = _uuid.v4();
_requests[requestId] = c; _requests[requestId] = c;
serverSendPort.send({ serverSendPort.send(MessageHandler().encodeSubscriptionRequestMessage(
'id': _id, _id, requestId, clientId, eventName));
'request_id': requestId,
'method': 'subscribe',
'params': {'client_id': clientId, 'event_name': eventName}
});
return c.future.then<ClientSubscription>((result) { return c.future.then<ClientSubscription>((result) {
_clientId = result['client_id'] as String?; var (subcriptionId, clientId) = MessageHandler()
var s = _IsolateClientSubscription( .decodeSubscriptionResponseMessage(result as Map<String, Object?>);
eventName, result['subscription_id'] as String?, this); _clientId = clientId;
var s = _IsolateClientSubscription(eventName, subcriptionId, this);
_subscriptions.add(s); _subscriptions.add(s);
return s; return s;
}); });
@ -171,14 +172,11 @@ class _IsolateClientSubscription extends ClientSubscription {
var c = Completer<Map>(); var c = Completer<Map>();
var requestId = client._uuid.v4(); var requestId = client._uuid.v4();
client._requests[requestId] = c; client._requests[requestId] = c;
client.serverSendPort.send({ client.serverSendPort.send(MessageHandler()
'id': client._id, .encodeUnsubscriptionRequestMessage(
'request_id': requestId, client._id, requestId, client.clientId, id));
'method': 'unsubscribe',
'params': {'client_id': client.clientId, 'subscription_id': id}
});
return c.future.then((_) { return c.future.then((result) {
_close(); _close();
}); });
}); });

View file

@ -1,7 +1,8 @@
import 'dart:async'; import 'dart:async';
import 'dart:isolate'; import 'dart:isolate';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../belatuk_pub_sub.dart'; import '../protocol/protocol.dart';
import 'shared.dart';
/// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s. /// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateAdapter extends Adapter { class IsolateAdapter extends Adapter {
@ -42,81 +43,50 @@ class IsolateAdapter extends Adapter {
if (data is SendPort) { if (data is SendPort) {
var id = _uuid.v4(); var id = _uuid.v4();
_clients[id] = data; _clients[id] = data;
data.send({'status': true, 'id': id}); data.send(MessageHandler().encodeSendPortResponseMessage(id));
} else if (data is Map && } else if (data is Map<String, Object?>) {
data['id'] is String && var (id, method, requestId, params) =
data['request_id'] is String && MessageHandler().decodeRequestMessage(data);
data['method'] is String && var (clientId, eventName, subscriptionId, value) =
data['params'] is Map) { MessageHandler().decodeRequestParams(params);
var id = data['id'] as String?,
requestId = data['request_id'] as String?,
method = data['method'] as String?;
var params = data['params'] as Map?;
var sp = _clients[id!];
var sp = _clients[id];
if (sp == null) { if (sp == null) {
// There's nobody to respond to, so don't send anything to anyone. Oops. // There's nobody to respond to, so don't send anything to anyone
} else if (method == 'publish') { return;
if (_isValidClientId(params!['client_id']) && }
params['event_name'] is String &&
params.containsKey('value')) { if (method == 'publish') {
var clientId = params['client_id'] as String?, if (eventName == null || value == null) {
eventName = params['event_name'] as String?; sp.send(MessageHandler().encodePublishResponseError(requestId));
var value = params['value'];
var rq = _IsolatePublishRequestImpl(
requestId, clientId, eventName, value, sp);
_onPublish.add(rq);
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message': 'Expected client_id, event_name, and value.'
});
} }
var rq = _IsolatePublishRequestImpl(
requestId, clientId, eventName, value, sp);
_onPublish.add(rq);
} else if (method == 'subscribe') { } else if (method == 'subscribe') {
if (_isValidClientId(params!['client_id']) && if (eventName == null) {
params['event_name'] is String) { sp.send(
var clientId = params['client_id'] as String?, MessageHandler().encodeSubscriptionResponseError(requestId));
eventName = params['event_name'] as String?;
var rq = _IsolateSubscriptionRequestImpl(
clientId, eventName, sp, requestId, _uuid);
_onSubscribe.add(rq);
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message': 'Expected client_id, and event_name.'
});
} }
var rq = _IsolateSubscriptionRequestImpl(
clientId, eventName, sp, requestId, _uuid);
_onSubscribe.add(rq);
} else if (method == 'unsubscribe') { } else if (method == 'unsubscribe') {
if (_isValidClientId(params!['client_id']) && if (subscriptionId == null) {
params['subscription_id'] is String) { sp.send(
var clientId = params['client_id'] as String?, MessageHandler().encodeUnsubscriptionResponseError(requestId));
subscriptionId = params['subscription_id'] as String?;
var rq = _IsolateUnsubscriptionRequestImpl(
clientId, subscriptionId, sp, requestId);
_onUnsubscribe.add(rq);
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message': 'Expected client_id, and subscription_id.'
});
} }
var rq = _IsolateUnsubscriptionRequestImpl(
clientId, subscriptionId, sp, requestId);
_onUnsubscribe.add(rq);
} else { } else {
sp.send({ sp.send(MessageHandler()
'status': false, .encodeUnknownMethodResponseError(requestId, method));
'request_id': requestId,
'error_message':
'Unrecognized method "$method". Or, you omitted id, request_id, method, or params.'
});
} }
} }
}); });
} }
bool _isValidClientId(id) => id == null || id is String;
@override @override
bool isTrustedPublishRequest(PublishRequest request) { bool isTrustedPublishRequest(PublishRequest request) {
// Isolate clients are considered trusted, because they are // Isolate clients are considered trusted, because they are
@ -138,7 +108,7 @@ class _IsolatePublishRequestImpl extends PublishRequest {
final String? eventName; final String? eventName;
@override @override
final dynamic value; final Object? value;
final SendPort sendPort; final SendPort sendPort;
@ -148,24 +118,15 @@ class _IsolatePublishRequestImpl extends PublishRequest {
this.requestId, this.clientId, this.eventName, this.value, this.sendPort); this.requestId, this.clientId, this.eventName, this.value, this.sendPort);
@override @override
void accept(PublishResponse response) { void reject(String errorMessage) {
sendPort.send({ sendPort.send(MessageHandler()
'status': true, .encodePublishResponseError(requestId, errorMessage: errorMessage));
'request_id': requestId,
'result': {
'listeners': response.listeners,
'client_id': response.clientId
}
});
} }
@override @override
void reject(String errorMessage) { void accept(PublishResponse response) {
sendPort.send({ sendPort.send(MessageHandler().encodePublishResponseMessage2(
'status': false, requestId, response.listeners, response.clientId));
'request_id': requestId,
'error_message': errorMessage
});
} }
} }
@ -187,21 +148,15 @@ class _IsolateSubscriptionRequestImpl extends SubscriptionRequest {
@override @override
void reject(String errorMessage) { void reject(String errorMessage) {
sendPort.send({ sendPort.send(MessageHandler().encodeSubscriptionResponseError(requestId,
'status': false, errorMessage: errorMessage));
'request_id': requestId,
'error_message': errorMessage
});
} }
@override @override
FutureOr<Subscription> accept(String? clientId) { FutureOr<Subscription> accept(String? clientId) {
var id = _uuid.v4(); var id = _uuid.v4();
sendPort.send({ sendPort.send(MessageHandler()
'status': true, .encodeSubscriptionResponseMessage(requestId, id, clientId));
'request_id': requestId,
'result': {'subscription_id': id, 'client_id': clientId}
});
return _IsolateSubscriptionImpl(clientId, id, eventName, sendPort); return _IsolateSubscriptionImpl(clientId, id, eventName, sendPort);
} }
} }
@ -239,15 +194,13 @@ class _IsolateUnsubscriptionRequestImpl extends UnsubscriptionRequest {
@override @override
void reject(String errorMessage) { void reject(String errorMessage) {
sendPort.send({ sendPort.send(MessageHandler().encodeUnsubscriptionResponseError(requestId,
'status': false, errorMessage: errorMessage));
'request_id': requestId,
'error_message': errorMessage
});
} }
@override @override
void accept() { void accept() {
sendPort.send({'status': true, 'request_id': requestId, 'result': {}}); sendPort
.send(MessageHandler().encodeUnsubscriptionResponseMessage(requestId));
} }
} }

View file

@ -0,0 +1,181 @@
class MessageHandler {
static const _requestId = 'request_id';
static const _method = 'method';
static const _clientId = 'client_id';
static const _eventName = 'event_name';
static const _subscriptionId = 'subscription_id';
static const _errorMessage = 'error_message';
static const _value = 'value';
static const _id = 'id';
static const _params = 'params';
static const _status = 'status';
static const _result = 'result';
static const _listeners = 'listeners';
static const _publishErrorMsg = 'Expected client_id, event_name, and value';
static const _subscribeErrorMsg = 'Expected client_id, and event_name';
static const _unsubscribeErrorMsg = 'Expected client_id, and subscription_id';
const MessageHandler();
Map<String, dynamic> encodePublishResponseError(String? requestId,
{String errorMessage = _publishErrorMsg}) {
return _encodeResponseError(requestId, errorMessage);
}
Map<String, dynamic> encodeSubscriptionResponseError(String? requestId,
{String errorMessage = _subscribeErrorMsg}) {
return _encodeResponseError(requestId, errorMessage);
}
Map<String, dynamic> encodeUnsubscriptionResponseError(String? requestId,
{String errorMessage = _unsubscribeErrorMsg}) {
return _encodeResponseError(requestId, errorMessage);
}
Map<String, dynamic> encodeUnknownMethodResponseError(
String? requestId, String method) {
var unknownMethodErrorMsg =
'Unrecognized method "$method" or you have omitted id, request_id, method, or params';
return _encodeResponseError(requestId, unknownMethodErrorMsg);
}
Map<String, dynamic> _encodeResponseError(String? requestId, String message) {
return {
_status: false,
_requestId: requestId ?? '',
_errorMessage: message
};
}
Map<String, dynamic> encodeEventMessage(String? requestId, Object message) {
return {_status: true, _requestId: requestId ?? '', _result: message};
}
Map<String, dynamic> encodeSubscriptionResponseMessage(
String? requestId, String? subscriptionId, String? clientId) {
return {
_status: true,
_requestId: requestId ?? '',
_result: {_subscriptionId: subscriptionId, _clientId: clientId}
};
}
(String?, String?) decodeSubscriptionResponseMessage(
Map<String, Object?> message) {
var subscriptionId = message[_subscriptionId] as String?;
var clientId = message[_clientId] as String?;
return (subscriptionId, clientId);
}
Map<String, dynamic> encodeUnsubscriptionResponseMessage(String? requestId) {
return {_status: true, _requestId: requestId, _result: {}};
}
(bool, String?, Object?, String?) decodeUnsubscriptionResponseMessage(
Map<String, Object?> message) {
var status = message[_status] as bool? ?? false;
var requestId = message[_requestId] as String?;
var result = message[_result];
var errorMessage = message[_errorMessage] as String?;
return (status, requestId, result, errorMessage);
}
Map<String, Object?> encodePublishResponseMessage2(
String? requestId, int listeners, String? clientId) {
return {
_status: true,
_requestId: requestId,
_result: {_listeners: listeners, _clientId: clientId}
};
}
(int, String?) decodePublishResponseMessage(Map<String, Object?> message) {
var listeners = message[_listeners] as int;
var clientId = message[_clientId] as String?;
return (listeners, clientId);
}
Map<String, Object?> encodePublishResponseMessage(String? id,
String? requestId, String? clientId, String? eventName, Object? value) {
return {
_id: id,
_requestId: requestId,
_method: 'publish',
_params: {_clientId: clientId, _eventName: eventName, _value: value}
};
}
Map<String, dynamic> encodeResponseMessage(
String? requestId, Object message) {
return {_status: true, _requestId: requestId ?? '', _result: message};
}
(bool, String?, String?, Object?, String?) decodeResponseMessage(
Map<String, Object?> message) {
var id = message[_id] as String?;
var status = message[_status] as bool? ?? false;
var requestId = message[_requestId] as String?;
var result = message[_result];
var errorMessage = message[_errorMessage] as String?;
return (status, id, requestId, result, errorMessage);
}
(String, String, String, Map<String, Object?>) decodeRequestMessage(
Map<String, Object?> message) {
var id = message[_id] as String? ?? '';
var method = message[_method] as String? ?? '';
var requestId = message[_requestId] as String? ?? '';
var params = message[_params] as Map<String, Object?>? ?? {};
return (id, method, requestId, params);
}
Map<String, Object?> encodeSubscriptionRequestMessage(
String? id, String? requestId, String? clientId, String? eventName) {
return {
_id: id,
_requestId: requestId,
_method: 'subscribe',
_params: {_clientId: clientId, _eventName: eventName}
};
}
Map<String, Object?> encodeUnsubscriptionRequestMessage(
String? id, String? requestId, String? clientId, String? subscriptionId) {
return {
_id: id,
_requestId: requestId,
_method: 'unsubscribe',
_params: {_clientId: clientId, _subscriptionId: subscriptionId}
};
}
Map<String, Object?> encodePublishRequestMessage(String? id,
String? requestId, String? clientId, String? eventName, Object? value) {
return {
_id: id,
_requestId: requestId,
_method: 'publish',
_params: {_clientId: clientId, _eventName: eventName, _value: value}
};
}
(String?, String?, String?, Object?) decodeRequestParams(
Map<String, Object?> params) {
var clientId = params[_clientId] as String?;
var eventName = params[_eventName] as String?;
var value = params[_value];
var subscriptionId = params[_subscriptionId] as String?;
return (clientId, eventName, subscriptionId, value);
}
Map<String, Object> encodeSendPortResponseMessage(String id) {
return {_status: true, _id: id};
}
}

View file

@ -7,7 +7,7 @@ abstract class PublishRequest {
String? get eventName; String? get eventName;
/// The value to be published as an event. /// The value to be published as an event.
dynamic get value; Object? get value;
/// Accept the request, with a response. /// Accept the request, with a response.
void accept(PublishResponse response); void accept(PublishResponse response);

View file

@ -1,5 +1,5 @@
name: belatuk_pub_sub name: belatuk_pub_sub
version: 6.1.0 version: 6.2.0
description: Keep application instances in sync with a simple pub/sub API. description: Keep application instances in sync with a simple pub/sub API.
homepage: https://github.com/dart-backend/belatuk-common-utilities/tree/main/packages/pub_sub homepage: https://github.com/dart-backend/belatuk-common-utilities/tree/main/packages/pub_sub
environment: environment: