Migrated pub_sub

This commit is contained in:
thomashii@dukefirehawk.com 2021-05-01 09:54:59 +08:00
parent 9553e41a2b
commit 5994b1f405
11 changed files with 219 additions and 213 deletions

View file

@ -22,7 +22,7 @@
* Migrated jael to 4.0.0 (20/20 tests passed)
* Migrated jael_preprocessor to 3.0.0 (5/5 tests passed)
* Migrated angel_jael to 4.0.0 (1/1 test passed)
* Updated pub_sub to 3.0.0 (in progress)
* Updated pub_sub to 4.0.0 (16/16 tests passed)
* Updated production to 2.0.0 (in progress)
* Updated hot to 3.0.0 (in progress)
* Updated static to 3.0.0 (in progress)
@ -47,7 +47,7 @@
* Updated jael_preprocessor to 3.0.0 (5/5 tests passed)
* Updated test to 3.0.0 (1/1 tests passed)
* Updated angel_jael to 3.0.0 (1/1 tests passed, Issue with 2 dependencies)
* Added pub_sub and updated to 3.0.0 (8/16 tests passed)
* Added pub_sub and updated to 3.0.0 (16/16 tests passed)
* Updated production to 2.0.0 (0/0 tests passed)
* Updated hot to 3.0.0 (0/0 tests passed)
* Updated static to 3.0.0 (12/12 tests passed)

View file

@ -6,50 +6,49 @@ import '../../pub_sub.dart';
/// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateClient extends Client {
final Queue<Completer<String>> _onConnect = new Queue<Completer<String>>();
final Queue<Completer<String>> _onConnect = Queue<Completer<String>>();
final Map<String, Completer<Map>> _requests = {};
final List<_IsolateClientSubscription> _subscriptions = [];
final Uuid _uuid = new Uuid();
final Uuid _uuid = Uuid();
String _id;
String? _id;
/// The ID of the client we are authenticating as.
///
/// May be `null`, if and only if we are marked as a trusted source on
/// the server side.
String get clientId => _clientId;
String _clientId;
String? get clientId => _clientId;
String? _clientId;
/// A server's [SendPort] that messages should be sent to.
final SendPort serverSendPort;
/// A [ReceivePort] that receives messages from the server.
final ReceivePort receivePort = new ReceivePort();
final ReceivePort receivePort = ReceivePort();
IsolateClient(String clientId, this.serverSendPort) {
IsolateClient(String? clientId, this.serverSendPort) {
_clientId = clientId;
receivePort.listen((data) {
if (data is Map && data['request_id'] is String) {
var requestId = data['request_id'] as String;
var requestId = data['request_id'] as String?;
var c = _requests.remove(requestId);
if (c != null && !c.isCompleted) {
if (data['status'] is! bool) {
c.completeError(
new FormatException('The server sent an invalid response.'));
FormatException('The server sent an invalid response.'));
} else if (!(data['status'] as bool)) {
c.completeError(new PubSubException(data['error_message']
?.toString() ??
c.completeError(PubSubException(data['error_message']?.toString() ??
'The server sent a failure response, but did not provide an error message.'));
} else if (data['result'] is! Map) {
c.completeError(new FormatException(
c.completeError(FormatException(
'The server sent a success response, but did not include a result.'));
} else {
c.complete(data['result'] as Map);
c.complete(data['result'] as Map?);
}
}
} else if (data is Map && data['id'] is String && _id == null) {
_id = data['id'] as String;
_id = data['id'] as String?;
for (var c in _onConnect) {
if (!c.isCompleted) c.complete(_id);
@ -57,7 +56,8 @@ class IsolateClient extends Client {
_onConnect.clear();
} else if (data is List && data.length == 2 && data[0] is String) {
var eventName = data[0] as String, event = data[1];
var eventName = data[0] as String;
var event = data[1];
for (var s in _subscriptions.where((s) => s.eventName == eventName)) {
if (!s._stream.isClosed) s._stream.add(event);
}
@ -66,11 +66,11 @@ class IsolateClient extends Client {
serverSendPort.send(receivePort.sendPort);
}
Future<T> _whenConnected<T>(FutureOr<T> callback()) {
if (_id != null)
return new Future<T>.sync(callback);
else {
var c = new Completer<String>();
Future<T> _whenConnected<T>(FutureOr<T> Function() callback) {
if (_id != null) {
return Future<T>.sync(callback);
} else {
var c = Completer<String>();
_onConnect.add(c);
return c.future.then<T>((_) => callback());
}
@ -79,7 +79,7 @@ class IsolateClient extends Client {
@override
Future publish(String eventName, value) {
return _whenConnected(() {
var c = new Completer<Map>();
var c = Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
serverSendPort.send({
@ -93,7 +93,7 @@ class IsolateClient extends Client {
}
});
return c.future.then((result) {
_clientId = result['client_id'] as String;
_clientId = result['client_id'] as String?;
});
});
}
@ -101,7 +101,7 @@ class IsolateClient extends Client {
@override
Future<ClientSubscription> subscribe(String eventName) {
return _whenConnected<ClientSubscription>(() {
var c = new Completer<Map>();
var c = Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
serverSendPort.send({
@ -111,9 +111,9 @@ class IsolateClient extends Client {
'params': {'client_id': clientId, 'event_name': eventName}
});
return c.future.then<ClientSubscription>((result) {
_clientId = result['client_id'] as String;
var s = new _IsolateClientSubscription(
eventName, result['subscription_id'] as String, this);
_clientId = result['client_id'] as String?;
var s = _IsolateClientSubscription(
eventName, result['subscription_id'] as String?, this);
_subscriptions.add(s);
return s;
});
@ -126,28 +126,30 @@ class IsolateClient extends Client {
for (var c in _onConnect) {
if (!c.isCompleted) {
c.completeError(new StateError(
c.completeError(StateError(
'The client was closed before the server ever accepted the connection.'));
}
}
for (var c in _requests.values) {
if (!c.isCompleted) {
c.completeError(new StateError(
c.completeError(StateError(
'The client was closed before the server responded to this request.'));
}
}
for (var s in _subscriptions) s._close();
for (var s in _subscriptions) {
s._close();
}
_requests.clear();
return new Future.value();
return Future.value();
}
}
class _IsolateClientSubscription extends ClientSubscription {
final StreamController _stream = new StreamController();
final String eventName, id;
final StreamController _stream = StreamController();
final String? eventName, id;
final IsolateClient client;
_IsolateClientSubscription(this.eventName, this.id, this.client);
@ -157,8 +159,8 @@ class _IsolateClientSubscription extends ClientSubscription {
}
@override
StreamSubscription listen(void onData(event),
{Function onError, void onDone(), bool cancelOnError}) {
StreamSubscription listen(void onData(event)?,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
return _stream.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@ -166,7 +168,7 @@ class _IsolateClientSubscription extends ClientSubscription {
@override
Future unsubscribe() {
return client._whenConnected(() {
var c = new Completer<Map>();
var c = Completer<Map>();
var requestId = client._uuid.v4();
client._requests[requestId] = c;
client.serverSendPort.send({

View file

@ -7,15 +7,15 @@ import '../../pub_sub.dart';
class IsolateAdapter extends Adapter {
final Map<String, SendPort> _clients = {};
final StreamController<PublishRequest> _onPublish =
new StreamController<PublishRequest>();
StreamController<PublishRequest>();
final StreamController<SubscriptionRequest> _onSubscribe =
new StreamController<SubscriptionRequest>();
StreamController<SubscriptionRequest>();
final StreamController<UnsubscriptionRequest> _onUnsubscribe =
new StreamController<UnsubscriptionRequest>();
final Uuid _uuid = new Uuid();
StreamController<UnsubscriptionRequest>();
final Uuid _uuid = Uuid();
/// A [ReceivePort] on which to listen for incoming data.
final ReceivePort receivePort = new ReceivePort();
final ReceivePort receivePort = ReceivePort();
@override
Stream<PublishRequest> get onPublish => _onPublish.stream;
@ -33,7 +33,7 @@ class IsolateAdapter extends Adapter {
_onPublish.close();
_onSubscribe.close();
_onUnsubscribe.close();
return new Future.value();
return Future.value();
}
@override
@ -48,22 +48,22 @@ class IsolateAdapter extends Adapter {
data['request_id'] is String &&
data['method'] is String &&
data['params'] is Map) {
var id = data['id'] as String,
requestId = data['request_id'] as String,
method = data['method'] as String;
var params = data['params'] as Map;
var sp = _clients[id];
var id = data['id'] as String?,
requestId = data['request_id'] as String?,
method = data['method'] as String?;
var params = data['params'] as Map?;
var sp = _clients[id!];
if (sp == null) {
// There's nobody to respond to, so don't send anything to anyone. Oops.
} else if (method == 'publish') {
if (_isValidClientId(params['client_id']) &&
if (_isValidClientId(params!['client_id']) &&
params['event_name'] is String &&
params.containsKey('value')) {
var clientId = params['client_id'] as String,
eventName = params['event_name'] as String;
var clientId = params['client_id'] as String?,
eventName = params['event_name'] as String?;
var value = params['value'];
var rq = new _IsolatePublishRequestImpl(
var rq = _IsolatePublishRequestImpl(
requestId, clientId, eventName, value, sp);
_onPublish.add(rq);
} else {
@ -74,11 +74,11 @@ class IsolateAdapter extends Adapter {
});
}
} else if (method == 'subscribe') {
if (_isValidClientId(params['client_id']) &&
if (_isValidClientId(params!['client_id']) &&
params['event_name'] is String) {
var clientId = params['client_id'] as String,
eventName = params['event_name'] as String;
var rq = new _IsolateSubscriptionRequestImpl(
var clientId = params['client_id'] as String?,
eventName = params['event_name'] as String?;
var rq = _IsolateSubscriptionRequestImpl(
clientId, eventName, sp, requestId, _uuid);
_onSubscribe.add(rq);
} else {
@ -89,11 +89,11 @@ class IsolateAdapter extends Adapter {
});
}
} else if (method == 'unsubscribe') {
if (_isValidClientId(params['client_id']) &&
if (_isValidClientId(params!['client_id']) &&
params['subscription_id'] is String) {
var clientId = params['client_id'] as String,
subscriptionId = params['subscription_id'] as String;
var rq = new _IsolateUnsubscriptionRequestImpl(
var clientId = params['client_id'] as String?,
subscriptionId = params['subscription_id'] as String?;
var rq = _IsolateUnsubscriptionRequestImpl(
clientId, subscriptionId, sp, requestId);
_onUnsubscribe.add(rq);
} else {
@ -132,17 +132,17 @@ class IsolateAdapter extends Adapter {
class _IsolatePublishRequestImpl extends PublishRequest {
@override
final String clientId;
final String? clientId;
@override
final String eventName;
final String? eventName;
@override
final value;
final SendPort sendPort;
final String requestId;
final String? requestId;
_IsolatePublishRequestImpl(
this.requestId, this.clientId, this.eventName, this.value, this.sendPort);
@ -171,14 +171,14 @@ class _IsolatePublishRequestImpl extends PublishRequest {
class _IsolateSubscriptionRequestImpl extends SubscriptionRequest {
@override
final String clientId;
final String? clientId;
@override
final String eventName;
final String? eventName;
final SendPort sendPort;
final String requestId;
final String? requestId;
final Uuid _uuid;
@ -195,22 +195,22 @@ class _IsolateSubscriptionRequestImpl extends SubscriptionRequest {
}
@override
FutureOr<Subscription> accept(String clientId) {
FutureOr<Subscription> accept(String? clientId) {
var id = _uuid.v4();
sendPort.send({
'status': true,
'request_id': requestId,
'result': {'subscription_id': id, 'client_id': clientId}
});
return new _IsolateSubscriptionImpl(clientId, id, eventName, sendPort);
return _IsolateSubscriptionImpl(clientId, id, eventName, sendPort);
}
}
class _IsolateSubscriptionImpl extends Subscription {
@override
final String clientId, id;
final String? clientId, id;
final String eventName;
final String? eventName;
final SendPort sendPort;
@ -225,14 +225,14 @@ class _IsolateSubscriptionImpl extends Subscription {
class _IsolateUnsubscriptionRequestImpl extends UnsubscriptionRequest {
@override
final String clientId;
final String? clientId;
@override
final String subscriptionId;
final String? subscriptionId;
final SendPort sendPort;
final String requestId;
final String? requestId;
_IsolateUnsubscriptionRequestImpl(
this.clientId, this.subscriptionId, this.sendPort, this.requestId);

View file

@ -8,43 +8,42 @@ import '../../pub_sub.dart';
class JsonRpc2Client extends Client {
final Map<String, Completer<Map>> _requests = {};
final List<_JsonRpc2ClientSubscription> _subscriptions = [];
final Uuid _uuid = new Uuid();
final Uuid _uuid = Uuid();
json_rpc_2.Peer _peer;
json_rpc_2.Peer? _peer;
/// The ID of the client we are authenticating as.
///
/// May be `null`, if and only if we are marked as a trusted source on
/// the server side.
String get clientId => _clientId;
String _clientId;
String? get clientId => _clientId;
String? _clientId;
JsonRpc2Client(String clientId, StreamChannel<String> channel) {
JsonRpc2Client(String? clientId, StreamChannel<String> channel) {
_clientId = clientId;
_peer = new json_rpc_2.Peer(channel);
_peer = json_rpc_2.Peer(channel);
_peer.registerMethod('event', (json_rpc_2.Parameters params) {
var eventName = params['event_name'].asString,
event = params['value'].value;
_peer!.registerMethod('event', (json_rpc_2.Parameters params) {
String? eventName = params['event_name'].asString;
var event = params['value'].value;
for (var s in _subscriptions.where((s) => s.eventName == eventName)) {
if (!s._stream.isClosed) s._stream.add(event);
}
});
_peer.registerFallback((json_rpc_2.Parameters params) {
_peer!.registerFallback((json_rpc_2.Parameters params) {
var c = _requests.remove(params.method);
if (c == null)
throw new json_rpc_2.RpcException.methodNotFound(params.method);
else {
if (c == null) {
throw json_rpc_2.RpcException.methodNotFound(params.method);
} else {
var data = params.asMap;
if (data['status'] is! bool) {
c.completeError(
new FormatException('The server sent an invalid response.'));
FormatException('The server sent an invalid response.'));
} else if (!(data['status'] as bool)) {
c.completeError(new PubSubException(data['error_message']
?.toString() ??
c.completeError(PubSubException(data['error_message']?.toString() ??
'The server sent a failure response, but did not provide an error message.'));
} else {
c.complete(data);
@ -52,39 +51,39 @@ class JsonRpc2Client extends Client {
}
});
_peer.listen();
_peer!.listen();
}
@override
Future publish(String eventName, value) {
var c = new Completer<Map>();
var c = Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
_peer.sendNotification('publish', {
_peer!.sendNotification('publish', {
'request_id': requestId,
'client_id': clientId,
'event_name': eventName,
'value': value
});
return c.future.then((data) {
_clientId = data['result']['client_id'] as String;
_clientId = data['result']['client_id'] as String?;
});
}
@override
Future<ClientSubscription> subscribe(String eventName) {
var c = new Completer<Map>();
var c = Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
_peer.sendNotification('subscribe', {
_peer!.sendNotification('subscribe', {
'request_id': requestId,
'client_id': clientId,
'event_name': eventName
});
return c.future.then<ClientSubscription>((result) {
_clientId = result['client_id'] as String;
var s = new _JsonRpc2ClientSubscription(
eventName, result['subscription_id'] as String, this);
_clientId = result['client_id'] as String?;
var s = _JsonRpc2ClientSubscription(
eventName, result['subscription_id'] as String?, this);
_subscriptions.add(s);
return s;
});
@ -92,11 +91,11 @@ class JsonRpc2Client extends Client {
@override
Future close() {
if (_peer?.isClosed != true) _peer.close();
if (_peer?.isClosed != true) _peer!.close();
for (var c in _requests.values) {
if (!c.isCompleted) {
c.completeError(new StateError(
c.completeError(StateError(
'The client was closed before the server responded to this request.'));
}
}
@ -104,13 +103,13 @@ class JsonRpc2Client extends Client {
for (var s in _subscriptions) s._close();
_requests.clear();
return new Future.value();
return Future.value();
}
}
class _JsonRpc2ClientSubscription extends ClientSubscription {
final StreamController _stream = new StreamController();
final String eventName, id;
final StreamController _stream = StreamController();
final String? eventName, id;
final JsonRpc2Client client;
_JsonRpc2ClientSubscription(this.eventName, this.id, this.client);
@ -120,18 +119,18 @@ class _JsonRpc2ClientSubscription extends ClientSubscription {
}
@override
StreamSubscription listen(void onData(event),
{Function onError, void onDone(), bool cancelOnError}) {
StreamSubscription listen(void onData(event)?,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
return _stream.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
Future unsubscribe() {
var c = new Completer<Map>();
var c = Completer<Map>();
var requestId = client._uuid.v4();
client._requests[requestId] = c;
client._peer.sendNotification('unsubscribe', {
client._peer!.sendNotification('unsubscribe', {
'request_id': requestId,
'client_id': client.clientId,
'subscription_id': id

View file

@ -7,16 +7,16 @@ import '../../pub_sub.dart';
/// A [Adapter] implementation that communicates via JSON RPC 2.0.
class JsonRpc2Adapter extends Adapter {
final StreamController<PublishRequest> _onPublish =
new StreamController<PublishRequest>();
StreamController<PublishRequest>();
final StreamController<SubscriptionRequest> _onSubscribe =
new StreamController<SubscriptionRequest>();
StreamController<SubscriptionRequest>();
final StreamController<UnsubscriptionRequest> _onUnsubscribe =
new StreamController<UnsubscriptionRequest>();
StreamController<UnsubscriptionRequest>();
final List<json_rpc_2.Peer> _peers = [];
final Uuid _uuid = new Uuid();
final Uuid _uuid = Uuid();
json_rpc_2.Peer _peer;
json_rpc_2.Peer? _peer;
/// A [Stream] of incoming clients, who can both send and receive string data.
final Stream<StreamChannel<String>> clientStream;
@ -44,10 +44,10 @@ class JsonRpc2Adapter extends Adapter {
Future.wait(_peers.where((s) => !s.isClosed).map((s) => s.close()))
.then((_) => _peers.clear());
return new Future.value();
return Future.value();
}
String _getClientId(json_rpc_2.Parameters params) {
String? _getClientId(json_rpc_2.Parameters params) {
try {
return params['client_id'].asString;
} catch (_) {
@ -58,14 +58,14 @@ class JsonRpc2Adapter extends Adapter {
@override
void start() {
clientStream.listen((client) {
var peer = _peer = new json_rpc_2.Peer(client);
var peer = _peer = json_rpc_2.Peer(client);
peer.registerMethod('publish', (json_rpc_2.Parameters params) async {
var requestId = params['request_id'].asString;
var clientId = _getClientId(params);
var eventName = params['event_name'].asString;
var value = params['value'].value;
var rq = new _JsonRpc2PublishRequestImpl(
var rq = _JsonRpc2PublishRequestImpl(
requestId, clientId, eventName, value, peer);
_onPublish.add(rq);
});
@ -74,7 +74,7 @@ class JsonRpc2Adapter extends Adapter {
var requestId = params['request_id'].asString;
var clientId = _getClientId(params);
var eventName = params['event_name'].asString;
var rq = new _JsonRpc2SubscriptionRequestImpl(
var rq = _JsonRpc2SubscriptionRequestImpl(
clientId, eventName, requestId, peer, _uuid);
_onSubscribe.add(rq);
});
@ -83,7 +83,7 @@ class JsonRpc2Adapter extends Adapter {
var requestId = params['request_id'].asString;
var clientId = _getClientId(params);
var subscriptionId = params['subscription_id'].asString;
var rq = new _JsonRpc2UnsubscriptionRequestImpl(
var rq = _JsonRpc2UnsubscriptionRequestImpl(
clientId, subscriptionId, peer, requestId);
_onUnsubscribe.add(rq);
});
@ -104,8 +104,14 @@ class JsonRpc2Adapter extends Adapter {
}
class _JsonRpc2PublishRequestImpl extends PublishRequest {
final String requestId, clientId, eventName;
final String requestId;
@override
final String? clientId, eventName;
@override
final value;
final json_rpc_2.Peer peer;
_JsonRpc2PublishRequestImpl(
@ -135,7 +141,7 @@ class _JsonRpc2PublishRequestImpl extends PublishRequest {
class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest {
@override
final String clientId, eventName;
final String? clientId, eventName;
final String requestId;
@ -147,7 +153,7 @@ class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest {
this.clientId, this.eventName, this.requestId, this.peer, this._uuid);
@override
FutureOr<Subscription> accept(String clientId) {
FutureOr<Subscription> accept(String? clientId) {
var id = _uuid.v4();
peer.sendNotification(requestId, {
'status': true,
@ -155,7 +161,7 @@ class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest {
'subscription_id': id,
'client_id': clientId
});
return new _JsonRpc2SubscriptionImpl(clientId, id, eventName, peer);
return _JsonRpc2SubscriptionImpl(clientId, id, eventName, peer);
}
@override
@ -170,9 +176,9 @@ class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest {
class _JsonRpc2SubscriptionImpl extends Subscription {
@override
final String clientId, id;
final String? clientId, id;
final String eventName;
final String? eventName;
final json_rpc_2.Peer peer;
@ -186,7 +192,7 @@ class _JsonRpc2SubscriptionImpl extends Subscription {
class _JsonRpc2UnsubscriptionRequestImpl extends UnsubscriptionRequest {
@override
final String clientId;
final String? clientId;
@override
final String subscriptionId;

View file

@ -1,10 +1,10 @@
/// Represents a request to publish information to other clients.
abstract class PublishRequest {
/// The ID of the client sending this request.
String get clientId;
String? get clientId;
/// The name of the event to be sent.
String get eventName;
String? get eventName;
/// The value to be published as an event.
dynamic get value;
@ -22,7 +22,7 @@ class PublishResponse {
final int listeners;
/// The client ID returned the server. Significant in cases where an ad-hoc client was registered.
final String clientId;
final String? clientId;
const PublishResponse(this.listeners, this.clientId);
}

View file

@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:math';
import 'adapter.dart';
import 'client.dart';
import 'package:collection/collection.dart' show IterableExtension;
import 'publish.dart';
import 'subscription.dart';
@ -12,32 +13,32 @@ import 'subscription.dart';
class Server {
final List<Adapter> _adapters = [];
final List<ClientInfo> _clients = [];
final _rnd = new Random.secure();
final Map<String, List<Subscription>> _subscriptions = {};
final _rnd = Random.secure();
final Map<String?, List<Subscription>> _subscriptions = {};
bool _started = false;
int _adHocIds = 0;
/// Initialize a server, optionally with a number of [adapters].
Server([Iterable<Adapter> adapters = const []]) {
_adapters.addAll(adapters ?? []);
_adapters.addAll(adapters);
}
/// Adds a new [Adapter] to adapt incoming clients from a new interface.
void addAdapter(Adapter adapter) {
if (_started)
throw new StateError(
if (_started) {
throw StateError(
'You cannot add new adapters after the server has started listening.');
else {
} else {
_adapters.add(adapter);
}
}
/// Registers a new client with the server.
void registerClient(ClientInfo client) {
if (_started)
throw new StateError(
if (_started) {
throw StateError(
'You cannot register new clients after the server has started listening.');
else {
} else {
_clients.add(client);
}
}
@ -48,25 +49,26 @@ class Server {
_adapters.clear();
_clients.clear();
_subscriptions.clear();
return new Future.value();
return Future.value();
}
String _newClientId() {
// Create an unpredictable-enough ID. The harder it is for an attacker to guess, the better.
var id =
'pub_sub::adhoc_client${_rnd.nextDouble()}::${_adHocIds++}:${new DateTime.now().millisecondsSinceEpoch * _rnd.nextDouble()}';
'pub_sub::adhoc_client${_rnd.nextDouble()}::${_adHocIds++}:${DateTime.now().millisecondsSinceEpoch * _rnd.nextDouble()}';
// This client is coming from a trusted source, and can therefore both publish and subscribe.
_clients.add(new ClientInfo(id));
_clients.add(ClientInfo(id));
return id;
}
void start() {
if (_adapters.isEmpty)
throw new StateError(
if (_adapters.isEmpty) {
throw StateError(
'Cannot start a SyncServer that has no adapters attached.');
else if (_started)
throw new StateError('A SyncServer may only be started once.');
} else if (_started) {
throw StateError('A SyncServer may only be started once.');
}
_started = true;
@ -77,15 +79,14 @@ class Server {
for (var adapter in _adapters) {
// Handle publishes
adapter.onPublish.listen((rq) {
ClientInfo client;
String clientId;
ClientInfo? client;
String? clientId;
if (rq.clientId?.isNotEmpty == true ||
adapter.isTrustedPublishRequest(rq)) {
clientId =
rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
client =
_clients.firstWhere((c) => c.id == clientId, orElse: () => null);
client = _clients.firstWhereOrNull((c) => c.id == clientId);
}
if (client == null) {
@ -98,28 +99,27 @@ class Server {
[];
if (listeners.isEmpty) {
rq.accept(new PublishResponse(0, clientId));
rq.accept(PublishResponse(0, clientId));
} else {
for (var listener in listeners) {
listener.dispatch(rq.value);
}
rq.accept(new PublishResponse(listeners.length, clientId));
rq.accept(PublishResponse(listeners.length, clientId));
}
}
});
// Listen for incoming subscriptions
adapter.onSubscribe.listen((rq) async {
ClientInfo client;
String clientId;
ClientInfo? client;
String? clientId;
if (rq.clientId?.isNotEmpty == true ||
adapter.isTrustedSubscriptionRequest(rq)) {
clientId =
rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
client =
_clients.firstWhere((c) => c.id == clientId, orElse: () => null);
client = _clients.firstWhereOrNull((c) => c.id == clientId);
}
if (client == null) {
@ -135,12 +135,11 @@ class Server {
// Unregister subscriptions on unsubscribe
adapter.onUnsubscribe.listen((rq) {
Subscription toRemove;
List<Subscription> sourceList;
Subscription? toRemove;
late List<Subscription> sourceList;
for (var list in _subscriptions.values) {
toRemove = list.firstWhere((s) => s.id == rq.subscriptionId,
orElse: () => null);
toRemove = list.firstWhereOrNull((s) => s.id == rq.subscriptionId);
if (toRemove != null) {
sourceList = list;
break;

View file

@ -3,15 +3,15 @@ import 'dart:async';
/// Represents a request to subscribe to an event.
abstract class SubscriptionRequest {
/// The ID of the client requesting to subscribe.
String get clientId;
String? get clientId;
/// The name of the event the client wants to subscribe to.
String get eventName;
String? get eventName;
/// Accept the request, and grant the client access to subscribe to the event.
///
/// Includes the client's ID, which is necessary for ad-hoc clients.
FutureOr<Subscription> accept(String clientId);
FutureOr<Subscription> accept(String? clientId);
/// Deny the request with an error message.
void reject(String errorMessage);
@ -20,10 +20,10 @@ abstract class SubscriptionRequest {
/// Represents a request to unsubscribe to an event.
abstract class UnsubscriptionRequest {
/// The ID of the client requesting to unsubscribe.
String get clientId;
String? get clientId;
/// The name of the event the client wants to unsubscribe from.
String get subscriptionId;
String? get subscriptionId;
/// Accept the request.
FutureOr<void> accept();
@ -37,10 +37,10 @@ abstract class UnsubscriptionRequest {
/// Also provides a means to fire an event.
abstract class Subscription {
/// A unique identifier for this subscription.
String get id;
String? get id;
/// The ID of the client who requested this subscription.
String get clientId;
String? get clientId;
/// Alerts a client of an event.
void dispatch(event);

View file

@ -5,11 +5,12 @@ author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/thosakwe/pub_sub
publish_to: none
environment:
sdk: ">=2.10.0 <3.0.0"
sdk: '>=2.12.0 <3.0.0'
dependencies:
json_rpc_2: ^2.0.0
stream_channel: ">=1.0.0 <3.0.0"
uuid: ^3.0.1
json_rpc_2: ^3.0.0
stream_channel: ^2.1.0
uuid: ^3.0.4
collection: ^1.15.0-nullsafety.4
dev_dependencies:
pedantic: ^1.0.0
test: ^1.0.0
pedantic: ^1.11.0
test: ^1.17.3

View file

@ -3,23 +3,23 @@ import 'package:pub_sub/pub_sub.dart';
import 'package:pub_sub/isolate.dart';
import 'package:test/test.dart';
main() {
Server server;
Client client1, client2, client3;
IsolateClient trustedClient;
IsolateAdapter adapter;
void main() {
late Server server;
late Client client1, client2, client3;
late IsolateClient trustedClient;
late IsolateAdapter adapter;
setUp(() async {
adapter = new IsolateAdapter();
adapter = IsolateAdapter();
client1 =
new IsolateClient('isolate_test::secret', adapter.receivePort.sendPort);
client2 = new IsolateClient(
'isolate_test::secret2', adapter.receivePort.sendPort);
client3 = new IsolateClient(
'isolate_test::secret3', adapter.receivePort.sendPort);
trustedClient = new IsolateClient(null, adapter.receivePort.sendPort);
IsolateClient('isolate_test::secret', adapter.receivePort.sendPort);
client2 =
IsolateClient('isolate_test::secret2', adapter.receivePort.sendPort);
client3 =
IsolateClient('isolate_test::secret3', adapter.receivePort.sendPort);
trustedClient = IsolateClient(null, adapter.receivePort.sendPort);
server = new Server([adapter])
server = Server([adapter])
..registerClient(const ClientInfo('isolate_test::secret'))
..registerClient(const ClientInfo('isolate_test::secret2'))
..registerClient(const ClientInfo('isolate_test::secret3'))
@ -51,7 +51,7 @@ main() {
expect(trustedClient.clientId, isNotNull);
});
test('can sub/unsub', () async {
String clientId;
String? clientId;
await trustedClient.publish('heyaaa', 'byeaa');
expect(clientId = trustedClient.clientId, isNotNull);
@ -88,7 +88,7 @@ main() {
group('isolate_server', () {
test('reject unknown client id', () async {
try {
var client = new IsolateClient(
var client = IsolateClient(
'isolate_test::invalid', adapter.receivePort.sendPort);
await client.publish('foo', 'bar');
throw 'Invalid client ID\'s should throw an error, but they do not.';
@ -99,7 +99,7 @@ main() {
test('reject unprivileged publish', () async {
try {
var client = new IsolateClient(
var client = IsolateClient(
'isolate_test::no_publish', adapter.receivePort.sendPort);
await client.publish('foo', 'bar');
throw 'Unprivileged publishes should throw an error, but they do not.';
@ -110,7 +110,7 @@ main() {
test('reject unprivileged subscribe', () async {
try {
var client = new IsolateClient(
var client = IsolateClient(
'isolate_test::no_subscribe', adapter.receivePort.sendPort);
await client.subscribe('foo');
throw 'Unprivileged subscribes should throw an error, but they do not.';

View file

@ -6,17 +6,17 @@ import 'package:pub_sub/json_rpc_2.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
main() {
ServerSocket serverSocket;
Server server;
Client client1, client2, client3;
JsonRpc2Client trustedClient;
void main() {
late ServerSocket serverSocket;
late Server server;
late Client client1, client2, client3;
late JsonRpc2Client trustedClient;
JsonRpc2Adapter adapter;
setUp(() async {
serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);
adapter = new JsonRpc2Adapter(
adapter = JsonRpc2Adapter(
serverSocket.map<StreamChannel<String>>(streamSocket),
isTrusted: true);
@ -29,15 +29,12 @@ main() {
var socket4 =
await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
client1 =
new JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1));
client2 =
new JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2));
client3 =
new JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3));
trustedClient = new JsonRpc2Client(null, streamSocket(socket4));
client1 = JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1));
client2 = JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2));
client3 = JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3));
trustedClient = JsonRpc2Client(null, streamSocket(socket4));
server = new Server([adapter])
server = Server([adapter])
..registerClient(const ClientInfo('json_rpc_2_test::secret'))
..registerClient(const ClientInfo('json_rpc_2_test::secret2'))
..registerClient(const ClientInfo('json_rpc_2_test::secret3'))
@ -64,7 +61,7 @@ main() {
expect(trustedClient.clientId, isNotNull);
});
test('can sub/unsub', () async {
String clientId;
String? clientId;
await trustedClient.publish('heyaaa', 'byeaa');
expect(clientId = trustedClient.clientId, isNotNull);
@ -104,7 +101,7 @@ main() {
var sock = await Socket.connect(
InternetAddress.loopbackIPv4, serverSocket.port);
var client =
new JsonRpc2Client('json_rpc_2_test::invalid', streamSocket(sock));
JsonRpc2Client('json_rpc_2_test::invalid', streamSocket(sock));
await client.publish('foo', 'bar');
throw 'Invalid client ID\'s should throw an error, but they do not.';
} on PubSubException catch (e) {
@ -116,8 +113,8 @@ main() {
try {
var sock = await Socket.connect(
InternetAddress.loopbackIPv4, serverSocket.port);
var client = new JsonRpc2Client(
'json_rpc_2_test::no_publish', streamSocket(sock));
var client =
JsonRpc2Client('json_rpc_2_test::no_publish', streamSocket(sock));
await client.publish('foo', 'bar');
throw 'Unprivileged publishes should throw an error, but they do not.';
} on PubSubException catch (e) {
@ -129,8 +126,8 @@ main() {
try {
var sock = await Socket.connect(
InternetAddress.loopbackIPv4, serverSocket.port);
var client = new JsonRpc2Client(
'json_rpc_2_test::no_subscribe', streamSocket(sock));
var client =
JsonRpc2Client('json_rpc_2_test::no_subscribe', streamSocket(sock));
await client.subscribe('foo');
throw 'Unprivileged subscribes should throw an error, but they do not.';
} on PubSubException catch (e) {
@ -141,18 +138,20 @@ main() {
}
StreamChannel<String> streamSocket(Socket socket) {
var channel = new _SocketStreamChannel(socket);
return channel.transform(new StreamChannelTransformer.fromCodec(utf8));
var channel = _SocketStreamChannel(socket);
return channel
.cast<List<int>>()
.transform(StreamChannelTransformer.fromCodec(utf8));
}
class _SocketStreamChannel extends StreamChannelMixin<List<int>> {
_SocketSink _sink;
_SocketSink? _sink;
final Socket socket;
_SocketStreamChannel(this.socket);
@override
StreamSink<List<int>> get sink => _sink ??= new _SocketSink(socket);
StreamSink<List<int>> get sink => _sink ??= _SocketSink(socket);
@override
Stream<List<int>> get stream => socket;
@ -169,7 +168,7 @@ class _SocketSink extends StreamSink<List<int>> {
}
@override
void addError(Object error, [StackTrace stackTrace]) {
void addError(Object error, [StackTrace? stackTrace]) {
Zone.current.errorCallback(error, stackTrace);
}