diff --git a/packages/pub_sub/.gitignore b/packages/pub_sub/.gitignore new file mode 100644 index 00000000..321543c9 --- /dev/null +++ b/packages/pub_sub/.gitignore @@ -0,0 +1,13 @@ +# See https://www.dartlang.org/tools/private-files.html + +# Files and directories created by pub +.packages +.pub/ +build/ +# If you're building an application, you may want to check-in your pubspec.lock +pubspec.lock + +# Directory created by dartdoc +# If you don't generate documentation locally you can remove this line. +doc/api/ +.dart_tool \ No newline at end of file diff --git a/packages/pub_sub/.travis.yml b/packages/pub_sub/.travis.yml new file mode 100644 index 00000000..de2210c9 --- /dev/null +++ b/packages/pub_sub/.travis.yml @@ -0,0 +1 @@ +language: dart \ No newline at end of file diff --git a/packages/pub_sub/CHANGELOG.md b/packages/pub_sub/CHANGELOG.md new file mode 100644 index 00000000..67cad18c --- /dev/null +++ b/packages/pub_sub/CHANGELOG.md @@ -0,0 +1,13 @@ +# 2.3.0 +* Allow `2.x` versions of `stream_channel`. +* Apply `package:pedantic` lints. + +# 2.2.0 +* Upgrade `uuid`. + +# 2.1.0 +* Allow for "trusted clients," which are implicitly-registered clients. +This makes using `package:pub_sub` easier, as well making it easier to scale. + +# 2.0.0 +* Dart 2 updates. \ No newline at end of file diff --git a/packages/pub_sub/LICENSE b/packages/pub_sub/LICENSE new file mode 100644 index 00000000..8864d4a3 --- /dev/null +++ b/packages/pub_sub/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/pub_sub/README.md b/packages/pub_sub/README.md new file mode 100644 index 00000000..1c75f028 --- /dev/null +++ b/packages/pub_sub/README.md @@ -0,0 +1,220 @@ +# pub_sub +[![Pub](https://img.shields.io/pub/v/pub_sub.svg)](https://pub.dartlang.org/packages/pub_sub) +[![build status](https://travis-ci.org/thosakwe/pub_sub.svg)](https://travis-ci.org/thosakwe/pub_sub) + +Keep application instances in sync with a simple pub/sub API. + +# Installation +Add `pub_sub` as a dependency in your `pubspec.yaml` file: + +```yaml +dependencies: + pub_sub: ^1.0.0 +``` + +Then, be sure to run `pub get` in your terminal. + +# Usage +`pub_sub` is your typical pub/sub API. However, `pub_sub` enforces authentication of every +request. It is very possible that `pub_sub` will run on both servers and in the browser, +or on a platform like Flutter. Thus, there are provisions available to limit +access. + +**Be careful to not leak any `pub_sub` client ID's if operating over a network.** +If you do, you risk malicious users injecting events into your application, which +could ultimately spell *disaster*. + +A `pub_sub` server can operate across multiple *adapters*, which take care of interfacing data over different +media. For example, a single server can handle pub/sub between multiple Isolates and TCP Sockets, as well as +WebSockets, simultaneously. + +```dart +import 'package:pub_sub/pub_sub.dart' as pub_sub; + +main() async { + var server = new pub_sub.Server([ + new FooAdapter(...), + new BarAdapter(...) + ]); + + server.addAdapter(new BazAdapter(...)); + + // Call `start` to activate adapters, and begin handling requests. + server.start(); +} +``` +### Trusted Clients +You can use `package:pub_sub` without explicitly registering +clients, *if and only if* those clients come from trusted sources. + +Clients via `Isolate` are always trusted. + +Clients via `package:json_rpc_2` must be explicitly marked +as trusted (i.e. using an IP whitelist mechanism): + +```dart +new JsonRpc2Adapter(..., isTrusted: false); + +// Pass `null` as Client ID when trusted... +new pub_sub.IsolateClient(null); +``` + +### Access Control +The ID's of all *untrusted* clients who will connect to the server must be known at start-up time. +You may not register new clients after the server has started. This is mostly a security consideration; +if it is impossible to register new clients, then malicious users cannot grant themselves additional +privileges within the system. + +```dart +import 'package:pub_sub/pub_sub.dart' as pub_sub; + +main() async { + // ... + server.registerClient(const ClientInfo('')); + + // Create a user who can subscribe, but not publish. + server.registerClient(const ClientInfo('', canPublish: false)); + + // Create a user who can publish, but not subscribe. + server.registerClient(const ClientInfo('', canSubscribe: false)); + + // Create a user with no privileges whatsoever. + server.registerClient(const ClientInfo('', canPublish: false, canSubscribe: false)); + + server.start(); +} +``` + +## Isolates +If you are just running multiple instances of a server, +use `package:pub_sub/isolate.dart`. + +You'll need one isolate to be the master. Typically this is the first isolate you create. + +```dart +import 'dart:io'; +import 'dart:isolate'; +import 'package:pub_sub/isolate.dart' as pub_sub; +import 'package:pub_sub/pub_sub.dart' as pub_sub; + +main() async { + // Easily bring up a server. + var adapter = new pub_sub.IsolateAdapter(); + var server = new pub_sub.Server([adapter]); + + // You then need to create a client that will connect to the adapter. + // Each isolate in your application should contain a client. + for (int i = 0; i < Platform.numberOfProcessors - 1; i++) { + server.registerClient(new pub_sub.ClientInfo('client$i')); + } + + // Start the server. + server.start(); + + // Next, let's start isolates that interact with the server. + // + // Fortunately, we can send SendPorts over Isolates, so this is no hassle. + for (int i = 0; i < Platform.numberOfProcessors - 1; i++) + Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]); + + // It's possible that you're running your application in the server isolate as well: + isolateMain([0, adapter.receivePort.sendPort]); +} + +void isolateMain(List args) { + var client = + new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort); + + // The client will connect automatically. In the meantime, we can start subscribing to events. + client.subscribe('user::logged_in').then((sub) { + // The `ClientSubscription` class extends `Stream`. Hooray for asynchrony! + sub.listen((msg) { + print('Logged in: $msg'); + }); + }); +} + +``` + +## JSON RPC 2.0 +If you are not running on isolates, you need to import +`package:pub_sub/json_rpc_2.dart`. This library leverages `package:json_rpc_2` and +`package:stream_channel` to create clients and servers that can hypothetically run on any +medium, i.e. WebSockets, or TCP Sockets. + +Check out `test/json_rpc_2_test.dart` for an example of serving `pub_sub` over TCP sockets. + +# Protocol +`pub_sub` is built upon a simple RPC, and this package includes +an implementation that runs via `SendPort`s and `ReceivePort`s, as +well as one that runs on any `StreamChannel`. + +Data sent over the wire looks like the following: + +```typescript +// Sent by a client to initiate an exchange. +interface Request { + // This is an arbitrary string, assigned by your client, but in every case, + // the client uses this to match your requests with asynchronous responses. + request_id: string, + + // The ID of the client to authenticate as. + // + // As you can imagine, this should be kept secret, to prevent breaches. + client_id: string, + + // Required for *every* request. + params: { + // A value to be `publish`ed. + value?: any, + + // The name of an event to `publish`. + event_name?: string, + + // The ID of a subscription to be cancelled. + subscription_id?: string + } +} + +/// Sent by the server in response to a request. +interface Response { + // `true` for success, `false` for failures. + status: boolean, + + // Only appears if `status` is `false`; explains why an operation failed. + error_message?: string, + + // Matches the request_id sent by the client. + request_id: string, + + result?: { + // The number of other clients to whom an event was `publish`ed. + listeners:? number, + + // The ID of a created subscription. + subscription_id?: string + } +} +``` + +When sending via JSON_RPC 2.0, the `params` of a `Request` are simply folded into the object +itself, for simplicity's sake. In this case, a response will be sent as a notification whose +name is the `request_id`. + +In the case of Isolate clients/servers, events will be simply sent as Lists: + +```dart +['', value] +``` + +Clients can send the following (3) methods: + +* `subscribe` (`event_name`:string): Subscribe to an event. +* `unsubscribe` (`subscription_id`:string): Unsubscribe from an event you previously subscribed to. +* `publish` (`event_name`:string, `value`:any): Publish an event to all other clients who are subscribed. + +The client and server in `package:pub_sub/isolate.dart` must make extra +provisions to keep track of client ID's. Since `SendPort`s and `ReceivePort`s +do not have any sort of guaranteed-unique ID's, new clients must send their +`SendPort` to the server before sending any requests. The server then responds +with an `id` that must be used to identify a `SendPort` to send a response to. \ No newline at end of file diff --git a/packages/pub_sub/analysis_options.yaml b/packages/pub_sub/analysis_options.yaml new file mode 100644 index 00000000..c230cee7 --- /dev/null +++ b/packages/pub_sub/analysis_options.yaml @@ -0,0 +1,4 @@ +include: package:pedantic/analysis_options.yaml +analyzer: + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/packages/pub_sub/example/main.dart b/packages/pub_sub/example/main.dart new file mode 100644 index 00000000..6a1db68a --- /dev/null +++ b/packages/pub_sub/example/main.dart @@ -0,0 +1,44 @@ +import 'dart:io'; +import 'dart:isolate'; +import 'package:pub_sub/isolate.dart' as pub_sub; +import 'package:pub_sub/pub_sub.dart' as pub_sub; + +main() async { + // Easily bring up a server. + var adapter = new pub_sub.IsolateAdapter(); + var server = new pub_sub.Server([adapter]); + + // You then need to create a client that will connect to the adapter. + // Every untrusted client in your application should be pre-registered. + // + // In the case of Isolates, however, those are always implicitly trusted. + for (int i = 0; i < Platform.numberOfProcessors - 1; i++) { + server.registerClient(new pub_sub.ClientInfo('client$i')); + } + + // Start the server. + server.start(); + + // Next, let's start isolates that interact with the server. + // + // Fortunately, we can send SendPorts over Isolates, so this is no hassle. + for (int i = 0; i < Platform.numberOfProcessors - 1; i++) + await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]); + + // It's possible that you're running your application in the server isolate as well: + isolateMain([0, adapter.receivePort.sendPort]); +} + +void isolateMain(List args) { + // Isolates are always trusted, so technically we don't need to pass a client iD. + var client = + new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort); + + // The client will connect automatically. In the meantime, we can start subscribing to events. + client.subscribe('user::logged_in').then((sub) { + // The `ClientSubscription` class extends `Stream`. Hooray for asynchrony! + sub.listen((msg) { + print('Logged in: $msg'); + }); + }); +} diff --git a/packages/pub_sub/lib/isolate.dart b/packages/pub_sub/lib/isolate.dart new file mode 100644 index 00000000..0fcf44b3 --- /dev/null +++ b/packages/pub_sub/lib/isolate.dart @@ -0,0 +1,2 @@ +export 'src/isolate/client.dart'; +export 'src/isolate/server.dart'; diff --git a/packages/pub_sub/lib/json_rpc_2.dart b/packages/pub_sub/lib/json_rpc_2.dart new file mode 100644 index 00000000..41bd3b00 --- /dev/null +++ b/packages/pub_sub/lib/json_rpc_2.dart @@ -0,0 +1,2 @@ +export 'src/json_rpc/client.dart'; +export 'src/json_rpc/server.dart'; diff --git a/packages/pub_sub/lib/pub_sub.dart b/packages/pub_sub/lib/pub_sub.dart new file mode 100644 index 00000000..84505470 --- /dev/null +++ b/packages/pub_sub/lib/pub_sub.dart @@ -0,0 +1 @@ +export 'src/protocol/protocol.dart'; diff --git a/packages/pub_sub/lib/src/isolate/client.dart b/packages/pub_sub/lib/src/isolate/client.dart new file mode 100644 index 00000000..2776615f --- /dev/null +++ b/packages/pub_sub/lib/src/isolate/client.dart @@ -0,0 +1,184 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:isolate'; +import 'package:uuid/uuid.dart'; +import '../../pub_sub.dart'; + +/// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s. +class IsolateClient extends Client { + final Queue> _onConnect = new Queue>(); + final Map> _requests = {}; + final List<_IsolateClientSubscription> _subscriptions = []; + final Uuid _uuid = new Uuid(); + + String _id; + + /// The ID of the client we are authenticating as. + /// + /// May be `null`, if and only if we are marked as a trusted source on + /// the server side. + String get clientId => _clientId; + String _clientId; + + /// A server's [SendPort] that messages should be sent to. + final SendPort serverSendPort; + + /// A [ReceivePort] that receives messages from the server. + final ReceivePort receivePort = new ReceivePort(); + + IsolateClient(String clientId, this.serverSendPort) { + _clientId = clientId; + receivePort.listen((data) { + if (data is Map && data['request_id'] is String) { + var requestId = data['request_id'] as String; + var c = _requests.remove(requestId); + + if (c != null && !c.isCompleted) { + if (data['status'] is! bool) { + c.completeError( + new FormatException('The server sent an invalid response.')); + } else if (!(data['status'] as bool)) { + c.completeError(new PubSubException(data['error_message'] + ?.toString() ?? + 'The server sent a failure response, but did not provide an error message.')); + } else if (data['result'] is! Map) { + c.completeError(new FormatException( + 'The server sent a success response, but did not include a result.')); + } else { + c.complete(data['result'] as Map); + } + } + } else if (data is Map && data['id'] is String && _id == null) { + _id = data['id'] as String; + + for (var c in _onConnect) { + if (!c.isCompleted) c.complete(_id); + } + + _onConnect.clear(); + } else if (data is List && data.length == 2 && data[0] is String) { + var eventName = data[0] as String, event = data[1]; + for (var s in _subscriptions.where((s) => s.eventName == eventName)) { + if (!s._stream.isClosed) s._stream.add(event); + } + } + }); + serverSendPort.send(receivePort.sendPort); + } + + Future _whenConnected(FutureOr callback()) { + if (_id != null) + return new Future.sync(callback); + else { + var c = new Completer(); + _onConnect.add(c); + return c.future.then((_) => callback()); + } + } + + @override + Future publish(String eventName, value) { + return _whenConnected(() { + var c = new Completer(); + var requestId = _uuid.v4(); + _requests[requestId] = c; + serverSendPort.send({ + 'id': _id, + 'request_id': requestId, + 'method': 'publish', + 'params': { + 'client_id': clientId, + 'event_name': eventName, + 'value': value + } + }); + return c.future.then((result) { + _clientId = result['client_id'] as String; + }); + }); + } + + @override + Future subscribe(String eventName) { + return _whenConnected(() { + var c = new Completer(); + var requestId = _uuid.v4(); + _requests[requestId] = c; + serverSendPort.send({ + 'id': _id, + 'request_id': requestId, + 'method': 'subscribe', + 'params': {'client_id': clientId, 'event_name': eventName} + }); + return c.future.then((result) { + _clientId = result['client_id'] as String; + var s = new _IsolateClientSubscription( + eventName, result['subscription_id'] as String, this); + _subscriptions.add(s); + return s; + }); + }); + } + + @override + Future close() { + receivePort.close(); + + for (var c in _onConnect) { + if (!c.isCompleted) { + c.completeError(new StateError( + 'The client was closed before the server ever accepted the connection.')); + } + } + + for (var c in _requests.values) { + if (!c.isCompleted) { + c.completeError(new StateError( + 'The client was closed before the server responded to this request.')); + } + } + + for (var s in _subscriptions) s._close(); + + _requests.clear(); + return new Future.value(); + } +} + +class _IsolateClientSubscription extends ClientSubscription { + final StreamController _stream = new StreamController(); + final String eventName, id; + final IsolateClient client; + + _IsolateClientSubscription(this.eventName, this.id, this.client); + + void _close() { + if (!_stream.isClosed) _stream.close(); + } + + @override + StreamSubscription listen(void onData(event), + {Function onError, void onDone(), bool cancelOnError}) { + return _stream.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + @override + Future unsubscribe() { + return client._whenConnected(() { + var c = new Completer(); + var requestId = client._uuid.v4(); + client._requests[requestId] = c; + client.serverSendPort.send({ + 'id': client._id, + 'request_id': requestId, + 'method': 'unsubscribe', + 'params': {'client_id': client.clientId, 'subscription_id': id} + }); + + return c.future.then((_) { + _close(); + }); + }); + } +} diff --git a/packages/pub_sub/lib/src/isolate/server.dart b/packages/pub_sub/lib/src/isolate/server.dart new file mode 100644 index 00000000..c442d461 --- /dev/null +++ b/packages/pub_sub/lib/src/isolate/server.dart @@ -0,0 +1,253 @@ +import 'dart:async'; +import 'dart:isolate'; +import 'package:uuid/uuid.dart'; +import '../../pub_sub.dart'; + +/// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s. +class IsolateAdapter extends Adapter { + final Map _clients = {}; + final StreamController _onPublish = + new StreamController(); + final StreamController _onSubscribe = + new StreamController(); + final StreamController _onUnsubscribe = + new StreamController(); + final Uuid _uuid = new Uuid(); + + /// A [ReceivePort] on which to listen for incoming data. + final ReceivePort receivePort = new ReceivePort(); + + @override + Stream get onPublish => _onPublish.stream; + + @override + Stream get onSubscribe => _onSubscribe.stream; + + @override + Stream get onUnsubscribe => _onUnsubscribe.stream; + + @override + Future close() { + receivePort.close(); + _clients.clear(); + _onPublish.close(); + _onSubscribe.close(); + _onUnsubscribe.close(); + return new Future.value(); + } + + @override + void start() { + receivePort.listen((data) { + if (data is SendPort) { + var id = _uuid.v4(); + _clients[id] = data; + data.send({'status': true, 'id': id}); + } else if (data is Map && + data['id'] is String && + data['request_id'] is String && + data['method'] is String && + data['params'] is Map) { + var id = data['id'] as String, + requestId = data['request_id'] as String, + method = data['method'] as String; + var params = data['params'] as Map; + var sp = _clients[id]; + + if (sp == null) { + // There's nobody to respond to, so don't send anything to anyone. Oops. + } else if (method == 'publish') { + if (_isValidClientId(params['client_id']) && + params['event_name'] is String && + params.containsKey('value')) { + var clientId = params['client_id'] as String, + eventName = params['event_name'] as String; + var value = params['value']; + var rq = new _IsolatePublishRequestImpl( + requestId, clientId, eventName, value, sp); + _onPublish.add(rq); + } else { + sp.send({ + 'status': false, + 'request_id': requestId, + 'error_message': 'Expected client_id, event_name, and value.' + }); + } + } else if (method == 'subscribe') { + if (_isValidClientId(params['client_id']) && + params['event_name'] is String) { + var clientId = params['client_id'] as String, + eventName = params['event_name'] as String; + var rq = new _IsolateSubscriptionRequestImpl( + clientId, eventName, sp, requestId, _uuid); + _onSubscribe.add(rq); + } else { + sp.send({ + 'status': false, + 'request_id': requestId, + 'error_message': 'Expected client_id, and event_name.' + }); + } + } else if (method == 'unsubscribe') { + if (_isValidClientId(params['client_id']) && + params['subscription_id'] is String) { + var clientId = params['client_id'] as String, + subscriptionId = params['subscription_id'] as String; + var rq = new _IsolateUnsubscriptionRequestImpl( + clientId, subscriptionId, sp, requestId); + _onUnsubscribe.add(rq); + } else { + sp.send({ + 'status': false, + 'request_id': requestId, + 'error_message': 'Expected client_id, and subscription_id.' + }); + } + } else { + sp.send({ + 'status': false, + 'request_id': requestId, + 'error_message': + 'Unrecognized method "$method". Or, you omitted id, request_id, method, or params.' + }); + } + } + }); + } + + bool _isValidClientId(id) => id == null || id is String; + + @override + bool isTrustedPublishRequest(PublishRequest request) { + // Isolate clients are considered trusted, because they are + // running in the same process as the central server. + return true; + } + + @override + bool isTrustedSubscriptionRequest(SubscriptionRequest request) { + return true; + } +} + +class _IsolatePublishRequestImpl extends PublishRequest { + @override + final String clientId; + + @override + final String eventName; + + @override + final value; + + final SendPort sendPort; + + final String requestId; + + _IsolatePublishRequestImpl( + this.requestId, this.clientId, this.eventName, this.value, this.sendPort); + + @override + void accept(PublishResponse response) { + sendPort.send({ + 'status': true, + 'request_id': requestId, + 'result': { + 'listeners': response.listeners, + 'client_id': response.clientId + } + }); + } + + @override + void reject(String errorMessage) { + sendPort.send({ + 'status': false, + 'request_id': requestId, + 'error_message': errorMessage + }); + } +} + +class _IsolateSubscriptionRequestImpl extends SubscriptionRequest { + @override + final String clientId; + + @override + final String eventName; + + final SendPort sendPort; + + final String requestId; + + final Uuid _uuid; + + _IsolateSubscriptionRequestImpl( + this.clientId, this.eventName, this.sendPort, this.requestId, this._uuid); + + @override + void reject(String errorMessage) { + sendPort.send({ + 'status': false, + 'request_id': requestId, + 'error_message': errorMessage + }); + } + + @override + FutureOr accept(String clientId) { + var id = _uuid.v4(); + sendPort.send({ + 'status': true, + 'request_id': requestId, + 'result': {'subscription_id': id, 'client_id': clientId} + }); + return new _IsolateSubscriptionImpl(clientId, id, eventName, sendPort); + } +} + +class _IsolateSubscriptionImpl extends Subscription { + @override + final String clientId, id; + + final String eventName; + + final SendPort sendPort; + + _IsolateSubscriptionImpl( + this.clientId, this.id, this.eventName, this.sendPort); + + @override + void dispatch(event) { + sendPort.send([eventName, event]); + } +} + +class _IsolateUnsubscriptionRequestImpl extends UnsubscriptionRequest { + @override + final String clientId; + + @override + final String subscriptionId; + + final SendPort sendPort; + + final String requestId; + + _IsolateUnsubscriptionRequestImpl( + this.clientId, this.subscriptionId, this.sendPort, this.requestId); + + @override + void reject(String errorMessage) { + sendPort.send({ + 'status': false, + 'request_id': requestId, + 'error_message': errorMessage + }); + } + + @override + void accept() { + sendPort.send({'status': true, 'request_id': requestId, 'result': {}}); + } +} diff --git a/packages/pub_sub/lib/src/json_rpc/client.dart b/packages/pub_sub/lib/src/json_rpc/client.dart new file mode 100644 index 00000000..e72cbedc --- /dev/null +++ b/packages/pub_sub/lib/src/json_rpc/client.dart @@ -0,0 +1,144 @@ +import 'dart:async'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2; +import 'package:uuid/uuid.dart'; +import '../../pub_sub.dart'; + +/// A [Client] implementation that communicates via JSON RPC 2.0. +class JsonRpc2Client extends Client { + final Map> _requests = {}; + final List<_JsonRpc2ClientSubscription> _subscriptions = []; + final Uuid _uuid = new Uuid(); + + json_rpc_2.Peer _peer; + + /// The ID of the client we are authenticating as. + /// + /// May be `null`, if and only if we are marked as a trusted source on + /// the server side. + String get clientId => _clientId; + String _clientId; + + JsonRpc2Client(String clientId, StreamChannel channel) { + _clientId = clientId; + _peer = new json_rpc_2.Peer(channel); + + _peer.registerMethod('event', (json_rpc_2.Parameters params) { + var eventName = params['event_name'].asString, + event = params['value'].value; + for (var s in _subscriptions.where((s) => s.eventName == eventName)) { + if (!s._stream.isClosed) s._stream.add(event); + } + }); + + _peer.registerFallback((json_rpc_2.Parameters params) { + var c = _requests.remove(params.method); + + if (c == null) + throw new json_rpc_2.RpcException.methodNotFound(params.method); + else { + var data = params.asMap; + + if (data['status'] is! bool) { + c.completeError( + new FormatException('The server sent an invalid response.')); + } else if (!(data['status'] as bool)) { + c.completeError(new PubSubException(data['error_message'] + ?.toString() ?? + 'The server sent a failure response, but did not provide an error message.')); + } else { + c.complete(data); + } + } + }); + + _peer.listen(); + } + + @override + Future publish(String eventName, value) { + var c = new Completer(); + var requestId = _uuid.v4(); + _requests[requestId] = c; + _peer.sendNotification('publish', { + 'request_id': requestId, + 'client_id': clientId, + 'event_name': eventName, + 'value': value + }); + return c.future.then((data) { + _clientId = data['result']['client_id'] as String; + }); + } + + @override + Future subscribe(String eventName) { + var c = new Completer(); + var requestId = _uuid.v4(); + _requests[requestId] = c; + _peer.sendNotification('subscribe', { + 'request_id': requestId, + 'client_id': clientId, + 'event_name': eventName + }); + return c.future.then((result) { + _clientId = result['client_id'] as String; + var s = new _JsonRpc2ClientSubscription( + eventName, result['subscription_id'] as String, this); + _subscriptions.add(s); + return s; + }); + } + + @override + Future close() { + if (_peer?.isClosed != true) _peer.close(); + + for (var c in _requests.values) { + if (!c.isCompleted) { + c.completeError(new StateError( + 'The client was closed before the server responded to this request.')); + } + } + + for (var s in _subscriptions) s._close(); + + _requests.clear(); + return new Future.value(); + } +} + +class _JsonRpc2ClientSubscription extends ClientSubscription { + final StreamController _stream = new StreamController(); + final String eventName, id; + final JsonRpc2Client client; + + _JsonRpc2ClientSubscription(this.eventName, this.id, this.client); + + void _close() { + if (!_stream.isClosed) _stream.close(); + } + + @override + StreamSubscription listen(void onData(event), + {Function onError, void onDone(), bool cancelOnError}) { + return _stream.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + @override + Future unsubscribe() { + var c = new Completer(); + var requestId = client._uuid.v4(); + client._requests[requestId] = c; + client._peer.sendNotification('unsubscribe', { + 'request_id': requestId, + 'client_id': client.clientId, + 'subscription_id': id + }); + + return c.future.then((_) { + _close(); + }); + } +} diff --git a/packages/pub_sub/lib/src/json_rpc/server.dart b/packages/pub_sub/lib/src/json_rpc/server.dart new file mode 100644 index 00000000..5cf4fb84 --- /dev/null +++ b/packages/pub_sub/lib/src/json_rpc/server.dart @@ -0,0 +1,214 @@ +import 'dart:async'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2; +import 'package:uuid/uuid.dart'; +import '../../pub_sub.dart'; + +/// A [Adapter] implementation that communicates via JSON RPC 2.0. +class JsonRpc2Adapter extends Adapter { + final StreamController _onPublish = + new StreamController(); + final StreamController _onSubscribe = + new StreamController(); + final StreamController _onUnsubscribe = + new StreamController(); + + final List _peers = []; + final Uuid _uuid = new Uuid(); + + json_rpc_2.Peer _peer; + + /// A [Stream] of incoming clients, who can both send and receive string data. + final Stream> clientStream; + + /// If `true`, clients can connect through this endpoint, *without* providing a client ID. + /// + /// This can be a security vulnerability if you don't know what you're doing. + /// If you *must* use this over the Internet, use an IP whitelist. + final bool isTrusted; + + JsonRpc2Adapter(this.clientStream, {this.isTrusted = false}); + + @override + Stream get onPublish => _onPublish.stream; + + @override + Stream get onSubscribe => _onSubscribe.stream; + + @override + Stream get onUnsubscribe => _onUnsubscribe.stream; + + @override + Future close() { + if (_peer?.isClosed != true) _peer?.close(); + + Future.wait(_peers.where((s) => !s.isClosed).map((s) => s.close())) + .then((_) => _peers.clear()); + return new Future.value(); + } + + String _getClientId(json_rpc_2.Parameters params) { + try { + return params['client_id'].asString; + } catch (_) { + return null; + } + } + + @override + void start() { + clientStream.listen((client) { + var peer = _peer = new json_rpc_2.Peer(client); + + peer.registerMethod('publish', (json_rpc_2.Parameters params) async { + var requestId = params['request_id'].asString; + var clientId = _getClientId(params); + var eventName = params['event_name'].asString; + var value = params['value'].value; + var rq = new _JsonRpc2PublishRequestImpl( + requestId, clientId, eventName, value, peer); + _onPublish.add(rq); + }); + + peer.registerMethod('subscribe', (json_rpc_2.Parameters params) async { + var requestId = params['request_id'].asString; + var clientId = _getClientId(params); + var eventName = params['event_name'].asString; + var rq = new _JsonRpc2SubscriptionRequestImpl( + clientId, eventName, requestId, peer, _uuid); + _onSubscribe.add(rq); + }); + + peer.registerMethod('unsubscribe', (json_rpc_2.Parameters params) async { + var requestId = params['request_id'].asString; + var clientId = _getClientId(params); + var subscriptionId = params['subscription_id'].asString; + var rq = new _JsonRpc2UnsubscriptionRequestImpl( + clientId, subscriptionId, peer, requestId); + _onUnsubscribe.add(rq); + }); + + peer.listen(); + }); + } + + @override + bool isTrustedPublishRequest(PublishRequest request) { + return isTrusted; + } + + @override + bool isTrustedSubscriptionRequest(SubscriptionRequest request) { + return isTrusted; + } +} + +class _JsonRpc2PublishRequestImpl extends PublishRequest { + final String requestId, clientId, eventName; + final value; + final json_rpc_2.Peer peer; + + _JsonRpc2PublishRequestImpl( + this.requestId, this.clientId, this.eventName, this.value, this.peer); + + @override + void accept(PublishResponse response) { + peer.sendNotification(requestId, { + 'status': true, + 'request_id': requestId, + 'result': { + 'listeners': response.listeners, + 'client_id': response.clientId + } + }); + } + + @override + void reject(String errorMessage) { + peer.sendNotification(requestId, { + 'status': false, + 'request_id': requestId, + 'error_message': errorMessage + }); + } +} + +class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest { + @override + final String clientId, eventName; + + final String requestId; + + final json_rpc_2.Peer peer; + + final Uuid _uuid; + + _JsonRpc2SubscriptionRequestImpl( + this.clientId, this.eventName, this.requestId, this.peer, this._uuid); + + @override + FutureOr accept(String clientId) { + var id = _uuid.v4(); + peer.sendNotification(requestId, { + 'status': true, + 'request_id': requestId, + 'subscription_id': id, + 'client_id': clientId + }); + return new _JsonRpc2SubscriptionImpl(clientId, id, eventName, peer); + } + + @override + void reject(String errorMessage) { + peer.sendNotification(requestId, { + 'status': false, + 'request_id': requestId, + 'error_message': errorMessage + }); + } +} + +class _JsonRpc2SubscriptionImpl extends Subscription { + @override + final String clientId, id; + + final String eventName; + + final json_rpc_2.Peer peer; + + _JsonRpc2SubscriptionImpl(this.clientId, this.id, this.eventName, this.peer); + + @override + void dispatch(event) { + peer.sendNotification('event', {'event_name': eventName, 'value': event}); + } +} + +class _JsonRpc2UnsubscriptionRequestImpl extends UnsubscriptionRequest { + @override + final String clientId; + + @override + final String subscriptionId; + + final json_rpc_2.Peer peer; + + final String requestId; + + _JsonRpc2UnsubscriptionRequestImpl( + this.clientId, this.subscriptionId, this.peer, this.requestId); + + @override + void accept() { + peer.sendNotification(requestId, {'status': true, 'result': {}}); + } + + @override + void reject(String errorMessage) { + peer.sendNotification(requestId, { + 'status': false, + 'request_id': requestId, + 'error_message': errorMessage + }); + } +} diff --git a/packages/pub_sub/lib/src/protocol/client/client.dart b/packages/pub_sub/lib/src/protocol/client/client.dart new file mode 100644 index 00000000..4a327160 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/client/client.dart @@ -0,0 +1,30 @@ +import 'dart:async'; + +/// Queries a `pub_sub` server. +abstract class Client { + /// Publishes an event to the server. + Future publish(String eventName, value); + + /// Request a [ClientSubscription] to the desired [eventName] from the server. + Future subscribe(String eventName); + + /// Disposes of this client. + Future close(); +} + +/// A client-side implementation of a subscription, which acts as a [Stream], and can be cancelled easily. +abstract class ClientSubscription extends Stream { + /// Stops listening for new events, and instructs the server to cancel the subscription. + Future unsubscribe(); +} + +/// Thrown as the result of an invalid request, or an attempt to perform an action without the correct privileges. +class PubSubException implements Exception { + /// The error message sent by the server. + final String message; + + const PubSubException(this.message); + + @override + String toString() => '`pub_sub` exception: $message'; +} diff --git a/packages/pub_sub/lib/src/protocol/client/sync_client.dart b/packages/pub_sub/lib/src/protocol/client/sync_client.dart new file mode 100644 index 00000000..93a32575 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/client/sync_client.dart @@ -0,0 +1 @@ +export 'client.dart'; diff --git a/packages/pub_sub/lib/src/protocol/protocol.dart b/packages/pub_sub/lib/src/protocol/protocol.dart new file mode 100644 index 00000000..9bf74c6a --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/protocol.dart @@ -0,0 +1,2 @@ +export 'client/sync_client.dart'; +export 'server/sync_server.dart'; diff --git a/packages/pub_sub/lib/src/protocol/server/adapter.dart b/packages/pub_sub/lib/src/protocol/server/adapter.dart new file mode 100644 index 00000000..e129b4af --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/server/adapter.dart @@ -0,0 +1,29 @@ +import 'dart:async'; +import 'publish.dart'; +import 'subscription.dart'; + +/// Adapts an abstract medium to serve the `pub_sub` RPC protocol. +abstract class Adapter { + /// Determines if a given [request] comes from a trusted source. + /// + /// If so, the request does not have to provide a pre-established ID, + /// and instead will be assigned one. + bool isTrustedPublishRequest(PublishRequest request); + + bool isTrustedSubscriptionRequest(SubscriptionRequest request); + + /// Fires an event whenever a client tries to publish data. + Stream get onPublish; + + /// Fires whenever a client tries to subscribe to an event. + Stream get onSubscribe; + + /// Fires whenever a client cancels a subscription. + Stream get onUnsubscribe; + + /// Disposes of this adapter. + Future close(); + + /// Start listening for incoming clients. + void start(); +} diff --git a/packages/pub_sub/lib/src/protocol/server/client.dart b/packages/pub_sub/lib/src/protocol/server/client.dart new file mode 100644 index 00000000..976e3fc4 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/server/client.dart @@ -0,0 +1,14 @@ +/// Represents information about a client that will be accessing +/// this `angel_sync` server. +class ClientInfo { + /// A unique identifier for this client. + final String id; + + /// If `true` (default), then the client is allowed to publish events. + final bool canPublish; + + /// If `true` (default), then the client can subscribe to events. + final bool canSubscribe; + + const ClientInfo(this.id, {this.canPublish = true, this.canSubscribe = true}); +} diff --git a/packages/pub_sub/lib/src/protocol/server/publish.dart b/packages/pub_sub/lib/src/protocol/server/publish.dart new file mode 100644 index 00000000..03fb7903 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/server/publish.dart @@ -0,0 +1,28 @@ +/// Represents a request to publish information to other clients. +abstract class PublishRequest { + /// The ID of the client sending this request. + String get clientId; + + /// The name of the event to be sent. + String get eventName; + + /// The value to be published as an event. + dynamic get value; + + /// Accept the request, with a response. + void accept(PublishResponse response); + + /// Deny the request with an error message. + void reject(String errorMessage); +} + +/// A response to a publish request. Informs the caller of how much clients received the event. +class PublishResponse { + /// The number of unique listeners to whom this event was propogated. + final int listeners; + + /// The client ID returned the server. Significant in cases where an ad-hoc client was registered. + final String clientId; + + const PublishResponse(this.listeners, this.clientId); +} diff --git a/packages/pub_sub/lib/src/protocol/server/server.dart b/packages/pub_sub/lib/src/protocol/server/server.dart new file mode 100644 index 00000000..0ee8fdb5 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/server/server.dart @@ -0,0 +1,161 @@ +import 'dart:async'; +import 'dart:math'; +import 'adapter.dart'; +import 'client.dart'; +import 'publish.dart'; +import 'subscription.dart'; + +/// A server that implements the `pub_sub` protocol. +/// +/// It can work using multiple [Adapter]s, to simultaneously +/// serve local and remote clients alike. +class Server { + final List _adapters = []; + final List _clients = []; + final _rnd = new Random.secure(); + final Map> _subscriptions = {}; + bool _started = false; + int _adHocIds = 0; + + /// Initialize a server, optionally with a number of [adapters]. + Server([Iterable adapters = const []]) { + _adapters.addAll(adapters ?? []); + } + + /// Adds a new [Adapter] to adapt incoming clients from a new interface. + void addAdapter(Adapter adapter) { + if (_started) + throw new StateError( + 'You cannot add new adapters after the server has started listening.'); + else { + _adapters.add(adapter); + } + } + + /// Registers a new client with the server. + void registerClient(ClientInfo client) { + if (_started) + throw new StateError( + 'You cannot register new clients after the server has started listening.'); + else { + _clients.add(client); + } + } + + /// Disposes of this server, and closes all of its adapters. + Future close() { + Future.wait(_adapters.map((a) => a.close())); + _adapters.clear(); + _clients.clear(); + _subscriptions.clear(); + return new Future.value(); + } + + String _newClientId() { + // Create an unpredictable-enough ID. The harder it is for an attacker to guess, the better. + var id = + 'pub_sub::adhoc_client${_rnd.nextDouble()}::${_adHocIds++}:${new DateTime.now().millisecondsSinceEpoch * _rnd.nextDouble()}'; + + // This client is coming from a trusted source, and can therefore both publish and subscribe. + _clients.add(new ClientInfo(id)); + return id; + } + + void start() { + if (_adapters.isEmpty) + throw new StateError( + 'Cannot start a SyncServer that has no adapters attached.'); + else if (_started) + throw new StateError('A SyncServer may only be started once.'); + + _started = true; + + for (var adapter in _adapters) { + adapter.start(); + } + + for (var adapter in _adapters) { + // Handle publishes + adapter.onPublish.listen((rq) { + ClientInfo client; + String clientId; + + if (rq.clientId?.isNotEmpty == true || + adapter.isTrustedPublishRequest(rq)) { + clientId = + rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId(); + client = + _clients.firstWhere((c) => c.id == clientId, orElse: () => null); + } + + if (client == null) { + rq.reject('Unrecognized client ID "${clientId ?? ''}".'); + } else if (!client.canPublish) { + rq.reject('You are not allowed to publish events.'); + } else { + var listeners = _subscriptions[rq.eventName] + ?.where((s) => s.clientId != clientId) ?? + []; + + if (listeners.isEmpty) { + rq.accept(new PublishResponse(0, clientId)); + } else { + for (var listener in listeners) { + listener.dispatch(rq.value); + } + + rq.accept(new PublishResponse(listeners.length, clientId)); + } + } + }); + + // Listen for incoming subscriptions + adapter.onSubscribe.listen((rq) async { + ClientInfo client; + String clientId; + + if (rq.clientId?.isNotEmpty == true || + adapter.isTrustedSubscriptionRequest(rq)) { + clientId = + rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId(); + client = + _clients.firstWhere((c) => c.id == clientId, orElse: () => null); + } + + if (client == null) { + rq.reject('Unrecognized client ID "${clientId ?? ''}".'); + } else if (!client.canSubscribe) { + rq.reject('You are not allowed to subscribe to events.'); + } else { + var sub = await rq.accept(clientId); + var list = _subscriptions.putIfAbsent(rq.eventName, () => []); + list.add(sub); + } + }); + + // Unregister subscriptions on unsubscribe + adapter.onUnsubscribe.listen((rq) { + Subscription toRemove; + List sourceList; + + for (var list in _subscriptions.values) { + toRemove = list.firstWhere((s) => s.id == rq.subscriptionId, + orElse: () => null); + if (toRemove != null) { + sourceList = list; + break; + } + } + + if (toRemove == null) { + rq.reject('The specified subscription does not exist.'); + } else if (toRemove.clientId != rq.clientId) { + rq.reject('That is not your subscription to cancel.'); + } else { + sourceList.remove(toRemove); + rq.accept(); + } + }); + } + } +} diff --git a/packages/pub_sub/lib/src/protocol/server/subscription.dart b/packages/pub_sub/lib/src/protocol/server/subscription.dart new file mode 100644 index 00000000..57efcbf5 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/server/subscription.dart @@ -0,0 +1,47 @@ +import 'dart:async'; + +/// Represents a request to subscribe to an event. +abstract class SubscriptionRequest { + /// The ID of the client requesting to subscribe. + String get clientId; + + /// The name of the event the client wants to subscribe to. + String get eventName; + + /// Accept the request, and grant the client access to subscribe to the event. + /// + /// Includes the client's ID, which is necessary for ad-hoc clients. + FutureOr accept(String clientId); + + /// Deny the request with an error message. + void reject(String errorMessage); +} + +/// Represents a request to unsubscribe to an event. +abstract class UnsubscriptionRequest { + /// The ID of the client requesting to unsubscribe. + String get clientId; + + /// The name of the event the client wants to unsubscribe from. + String get subscriptionId; + + /// Accept the request. + FutureOr accept(); + + /// Deny the request with an error message. + void reject(String errorMessage); +} + +/// Represents a client's subscription to an event. +/// +/// Also provides a means to fire an event. +abstract class Subscription { + /// A unique identifier for this subscription. + String get id; + + /// The ID of the client who requested this subscription. + String get clientId; + + /// Alerts a client of an event. + void dispatch(event); +} diff --git a/packages/pub_sub/lib/src/protocol/server/sync_server.dart b/packages/pub_sub/lib/src/protocol/server/sync_server.dart new file mode 100644 index 00000000..5777b272 --- /dev/null +++ b/packages/pub_sub/lib/src/protocol/server/sync_server.dart @@ -0,0 +1,5 @@ +export 'adapter.dart'; +export 'client.dart'; +export 'publish.dart'; +export 'server.dart'; +export 'subscription.dart'; diff --git a/packages/pub_sub/pubspec.yaml b/packages/pub_sub/pubspec.yaml new file mode 100644 index 00000000..f214636a --- /dev/null +++ b/packages/pub_sub/pubspec.yaml @@ -0,0 +1,15 @@ +name: pub_sub +version: 3.0.0 +description: Keep application instances in sync with a simple pub/sub API. +author: Tobe O +homepage: https://github.com/thosakwe/pub_sub +publish_to: none +environment: + sdk: ">=2.10.0 <3.0.0" +dependencies: + json_rpc_2: ^2.0.0 + stream_channel: ">=1.0.0 <3.0.0" + uuid: ^3.0.1 +dev_dependencies: + pedantic: ^1.0.0 + test: ^1.0.0 diff --git a/packages/pub_sub/test/isolate_test.dart b/packages/pub_sub/test/isolate_test.dart new file mode 100644 index 00000000..1d1c0136 --- /dev/null +++ b/packages/pub_sub/test/isolate_test.dart @@ -0,0 +1,122 @@ +import 'dart:async'; +import 'package:pub_sub/pub_sub.dart'; +import 'package:pub_sub/isolate.dart'; +import 'package:test/test.dart'; + +main() { + Server server; + Client client1, client2, client3; + IsolateClient trustedClient; + IsolateAdapter adapter; + + setUp(() async { + adapter = new IsolateAdapter(); + client1 = + new IsolateClient('isolate_test::secret', adapter.receivePort.sendPort); + client2 = new IsolateClient( + 'isolate_test::secret2', adapter.receivePort.sendPort); + client3 = new IsolateClient( + 'isolate_test::secret3', adapter.receivePort.sendPort); + trustedClient = new IsolateClient(null, adapter.receivePort.sendPort); + + server = new Server([adapter]) + ..registerClient(const ClientInfo('isolate_test::secret')) + ..registerClient(const ClientInfo('isolate_test::secret2')) + ..registerClient(const ClientInfo('isolate_test::secret3')) + ..registerClient( + const ClientInfo('isolate_test::no_publish', canPublish: false)) + ..registerClient( + const ClientInfo('isolate_test::no_subscribe', canSubscribe: false)) + ..start(); + + var sub = await client3.subscribe('foo'); + sub.listen((data) { + print('Client3 caught foo: $data'); + }); + }); + + tearDown(() { + Future.wait([ + server.close(), + client1.close(), + client2.close(), + client3.close(), + trustedClient.close() + ]); + }); + + group('trusted', () { + test('can publish', () async { + await trustedClient.publish('hey', 'bye'); + expect(trustedClient.clientId, isNotNull); + }); + test('can sub/unsub', () async { + String clientId; + await trustedClient.publish('heyaaa', 'byeaa'); + expect(clientId = trustedClient.clientId, isNotNull); + + var sub = await trustedClient.subscribe('yeppp'); + expect(trustedClient.clientId, clientId); + + await sub.unsubscribe(); + expect(trustedClient.clientId, clientId); + }); + }); + + test('subscribers receive published events', () async { + var sub = await client2.subscribe('foo'); + await client1.publish('foo', 'bar'); + expect(await sub.first, 'bar'); + }); + + test('subscribers are not sent their own events', () async { + var sub = await client1.subscribe('foo'); + await client1.publish('foo', + ''); + await sub.unsubscribe(); + expect(await sub.isEmpty, isTrue); + }); + + test('can unsubscribe', () async { + var sub = await client2.subscribe('foo'); + await client1.publish('foo', 'bar'); + await sub.unsubscribe(); + await client1.publish('foo', ''); + expect(await sub.length, 1); + }); + + group('isolate_server', () { + test('reject unknown client id', () async { + try { + var client = new IsolateClient( + 'isolate_test::invalid', adapter.receivePort.sendPort); + await client.publish('foo', 'bar'); + throw 'Invalid client ID\'s should throw an error, but they do not.'; + } on PubSubException catch (e) { + print('Expected exception was thrown: ${e.message}'); + } + }); + + test('reject unprivileged publish', () async { + try { + var client = new IsolateClient( + 'isolate_test::no_publish', adapter.receivePort.sendPort); + await client.publish('foo', 'bar'); + throw 'Unprivileged publishes should throw an error, but they do not.'; + } on PubSubException catch (e) { + print('Expected exception was thrown: ${e.message}'); + } + }); + + test('reject unprivileged subscribe', () async { + try { + var client = new IsolateClient( + 'isolate_test::no_subscribe', adapter.receivePort.sendPort); + await client.subscribe('foo'); + throw 'Unprivileged subscribes should throw an error, but they do not.'; + } on PubSubException catch (e) { + print('Expected exception was thrown: ${e.message}'); + } + }); + }); +} diff --git a/packages/pub_sub/test/json_rpc_2_test.dart b/packages/pub_sub/test/json_rpc_2_test.dart new file mode 100644 index 00000000..2e959552 --- /dev/null +++ b/packages/pub_sub/test/json_rpc_2_test.dart @@ -0,0 +1,188 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'package:pub_sub/pub_sub.dart'; +import 'package:pub_sub/json_rpc_2.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +main() { + ServerSocket serverSocket; + Server server; + Client client1, client2, client3; + JsonRpc2Client trustedClient; + JsonRpc2Adapter adapter; + + setUp(() async { + serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0); + + adapter = new JsonRpc2Adapter( + serverSocket.map>(streamSocket), + isTrusted: true); + + var socket1 = + await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port); + var socket2 = + await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port); + var socket3 = + await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port); + var socket4 = + await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port); + + client1 = + new JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1)); + client2 = + new JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2)); + client3 = + new JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3)); + trustedClient = new JsonRpc2Client(null, streamSocket(socket4)); + + server = new Server([adapter]) + ..registerClient(const ClientInfo('json_rpc_2_test::secret')) + ..registerClient(const ClientInfo('json_rpc_2_test::secret2')) + ..registerClient(const ClientInfo('json_rpc_2_test::secret3')) + ..registerClient( + const ClientInfo('json_rpc_2_test::no_publish', canPublish: false)) + ..registerClient(const ClientInfo('json_rpc_2_test::no_subscribe', + canSubscribe: false)) + ..start(); + + var sub = await client3.subscribe('foo'); + sub.listen((data) { + print('Client3 caught foo: $data'); + }); + }); + + tearDown(() { + Future.wait( + [server.close(), client1.close(), client2.close(), client3.close()]); + }); + + group('trusted', () { + test('can publish', () async { + await trustedClient.publish('hey', 'bye'); + expect(trustedClient.clientId, isNotNull); + }); + test('can sub/unsub', () async { + String clientId; + await trustedClient.publish('heyaaa', 'byeaa'); + expect(clientId = trustedClient.clientId, isNotNull); + + var sub = await trustedClient.subscribe('yeppp'); + expect(trustedClient.clientId, clientId); + + await sub.unsubscribe(); + expect(trustedClient.clientId, clientId); + }); + }); + + test('subscribers receive published events', () async { + var sub = await client2.subscribe('foo'); + await client1.publish('foo', 'bar'); + expect(await sub.first, 'bar'); + }); + + test('subscribers are not sent their own events', () async { + var sub = await client1.subscribe('foo'); + await client1.publish('foo', + ''); + await sub.unsubscribe(); + expect(await sub.isEmpty, isTrue); + }); + + test('can unsubscribe', () async { + var sub = await client2.subscribe('foo'); + await client1.publish('foo', 'bar'); + await sub.unsubscribe(); + await client1.publish('foo', ''); + expect(await sub.length, 1); + }); + + group('json_rpc_2_server', () { + test('reject unknown client id', () async { + try { + var sock = await Socket.connect( + InternetAddress.loopbackIPv4, serverSocket.port); + var client = + new JsonRpc2Client('json_rpc_2_test::invalid', streamSocket(sock)); + await client.publish('foo', 'bar'); + throw 'Invalid client ID\'s should throw an error, but they do not.'; + } on PubSubException catch (e) { + print('Expected exception was thrown: ${e.message}'); + } + }); + + test('reject unprivileged publish', () async { + try { + var sock = await Socket.connect( + InternetAddress.loopbackIPv4, serverSocket.port); + var client = new JsonRpc2Client( + 'json_rpc_2_test::no_publish', streamSocket(sock)); + await client.publish('foo', 'bar'); + throw 'Unprivileged publishes should throw an error, but they do not.'; + } on PubSubException catch (e) { + print('Expected exception was thrown: ${e.message}'); + } + }); + + test('reject unprivileged subscribe', () async { + try { + var sock = await Socket.connect( + InternetAddress.loopbackIPv4, serverSocket.port); + var client = new JsonRpc2Client( + 'json_rpc_2_test::no_subscribe', streamSocket(sock)); + await client.subscribe('foo'); + throw 'Unprivileged subscribes should throw an error, but they do not.'; + } on PubSubException catch (e) { + print('Expected exception was thrown: ${e.message}'); + } + }); + }); +} + +StreamChannel streamSocket(Socket socket) { + var channel = new _SocketStreamChannel(socket); + return channel.transform(new StreamChannelTransformer.fromCodec(utf8)); +} + +class _SocketStreamChannel extends StreamChannelMixin> { + _SocketSink _sink; + final Socket socket; + + _SocketStreamChannel(this.socket); + + @override + StreamSink> get sink => _sink ??= new _SocketSink(socket); + + @override + Stream> get stream => socket; +} + +class _SocketSink extends StreamSink> { + final Socket socket; + + _SocketSink(this.socket); + + @override + void add(List event) { + socket.add(event); + } + + @override + void addError(Object error, [StackTrace stackTrace]) { + Zone.current.errorCallback(error, stackTrace); + } + + @override + Future addStream(Stream> stream) { + return socket.addStream(stream); + } + + @override + Future close() { + return socket.close(); + } + + @override + Future get done => socket.done; +}