Added pub_sub package

This commit is contained in:
thomashii 2021-03-08 20:56:39 +08:00
parent 967a7c15b2
commit aab3772f15
26 changed files with 1758 additions and 0 deletions

13
packages/pub_sub/.gitignore vendored Normal file
View file

@ -0,0 +1,13 @@
# See https://www.dartlang.org/tools/private-files.html
# Files and directories created by pub
.packages
.pub/
build/
# If you're building an application, you may want to check-in your pubspec.lock
pubspec.lock
# Directory created by dartdoc
# If you don't generate documentation locally you can remove this line.
doc/api/
.dart_tool

View file

@ -0,0 +1 @@
language: dart

View file

@ -0,0 +1,13 @@
# 2.3.0
* Allow `2.x` versions of `stream_channel`.
* Apply `package:pedantic` lints.
# 2.2.0
* Upgrade `uuid`.
# 2.1.0
* Allow for "trusted clients," which are implicitly-registered clients.
This makes using `package:pub_sub` easier, as well making it easier to scale.
# 2.0.0
* Dart 2 updates.

21
packages/pub_sub/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

220
packages/pub_sub/README.md Normal file
View file

@ -0,0 +1,220 @@
# pub_sub
[![Pub](https://img.shields.io/pub/v/pub_sub.svg)](https://pub.dartlang.org/packages/pub_sub)
[![build status](https://travis-ci.org/thosakwe/pub_sub.svg)](https://travis-ci.org/thosakwe/pub_sub)
Keep application instances in sync with a simple pub/sub API.
# Installation
Add `pub_sub` as a dependency in your `pubspec.yaml` file:
```yaml
dependencies:
pub_sub: ^1.0.0
```
Then, be sure to run `pub get` in your terminal.
# Usage
`pub_sub` is your typical pub/sub API. However, `pub_sub` enforces authentication of every
request. It is very possible that `pub_sub` will run on both servers and in the browser,
or on a platform like Flutter. Thus, there are provisions available to limit
access.
**Be careful to not leak any `pub_sub` client ID's if operating over a network.**
If you do, you risk malicious users injecting events into your application, which
could ultimately spell *disaster*.
A `pub_sub` server can operate across multiple *adapters*, which take care of interfacing data over different
media. For example, a single server can handle pub/sub between multiple Isolates and TCP Sockets, as well as
WebSockets, simultaneously.
```dart
import 'package:pub_sub/pub_sub.dart' as pub_sub;
main() async {
var server = new pub_sub.Server([
new FooAdapter(...),
new BarAdapter(...)
]);
server.addAdapter(new BazAdapter(...));
// Call `start` to activate adapters, and begin handling requests.
server.start();
}
```
### Trusted Clients
You can use `package:pub_sub` without explicitly registering
clients, *if and only if* those clients come from trusted sources.
Clients via `Isolate` are always trusted.
Clients via `package:json_rpc_2` must be explicitly marked
as trusted (i.e. using an IP whitelist mechanism):
```dart
new JsonRpc2Adapter(..., isTrusted: false);
// Pass `null` as Client ID when trusted...
new pub_sub.IsolateClient(null);
```
### Access Control
The ID's of all *untrusted* clients who will connect to the server must be known at start-up time.
You may not register new clients after the server has started. This is mostly a security consideration;
if it is impossible to register new clients, then malicious users cannot grant themselves additional
privileges within the system.
```dart
import 'package:pub_sub/pub_sub.dart' as pub_sub;
main() async {
// ...
server.registerClient(const ClientInfo('<client-id>'));
// Create a user who can subscribe, but not publish.
server.registerClient(const ClientInfo('<client-id>', canPublish: false));
// Create a user who can publish, but not subscribe.
server.registerClient(const ClientInfo('<client-id>', canSubscribe: false));
// Create a user with no privileges whatsoever.
server.registerClient(const ClientInfo('<client-id>', canPublish: false, canSubscribe: false));
server.start();
}
```
## Isolates
If you are just running multiple instances of a server,
use `package:pub_sub/isolate.dart`.
You'll need one isolate to be the master. Typically this is the first isolate you create.
```dart
import 'dart:io';
import 'dart:isolate';
import 'package:pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub;
main() async {
// Easily bring up a server.
var adapter = new pub_sub.IsolateAdapter();
var server = new pub_sub.Server([adapter]);
// You then need to create a client that will connect to the adapter.
// Each isolate in your application should contain a client.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
server.registerClient(new pub_sub.ClientInfo('client$i'));
}
// Start the server.
server.start();
// Next, let's start isolates that interact with the server.
//
// Fortunately, we can send SendPorts over Isolates, so this is no hassle.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++)
Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]);
// It's possible that you're running your application in the server isolate as well:
isolateMain([0, adapter.receivePort.sendPort]);
}
void isolateMain(List args) {
var client =
new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);
// The client will connect automatically. In the meantime, we can start subscribing to events.
client.subscribe('user::logged_in').then((sub) {
// The `ClientSubscription` class extends `Stream`. Hooray for asynchrony!
sub.listen((msg) {
print('Logged in: $msg');
});
});
}
```
## JSON RPC 2.0
If you are not running on isolates, you need to import
`package:pub_sub/json_rpc_2.dart`. This library leverages `package:json_rpc_2` and
`package:stream_channel` to create clients and servers that can hypothetically run on any
medium, i.e. WebSockets, or TCP Sockets.
Check out `test/json_rpc_2_test.dart` for an example of serving `pub_sub` over TCP sockets.
# Protocol
`pub_sub` is built upon a simple RPC, and this package includes
an implementation that runs via `SendPort`s and `ReceivePort`s, as
well as one that runs on any `StreamChannel<String>`.
Data sent over the wire looks like the following:
```typescript
// Sent by a client to initiate an exchange.
interface Request {
// This is an arbitrary string, assigned by your client, but in every case,
// the client uses this to match your requests with asynchronous responses.
request_id: string,
// The ID of the client to authenticate as.
//
// As you can imagine, this should be kept secret, to prevent breaches.
client_id: string,
// Required for *every* request.
params: {
// A value to be `publish`ed.
value?: any,
// The name of an event to `publish`.
event_name?: string,
// The ID of a subscription to be cancelled.
subscription_id?: string
}
}
/// Sent by the server in response to a request.
interface Response {
// `true` for success, `false` for failures.
status: boolean,
// Only appears if `status` is `false`; explains why an operation failed.
error_message?: string,
// Matches the request_id sent by the client.
request_id: string,
result?: {
// The number of other clients to whom an event was `publish`ed.
listeners:? number,
// The ID of a created subscription.
subscription_id?: string
}
}
```
When sending via JSON_RPC 2.0, the `params` of a `Request` are simply folded into the object
itself, for simplicity's sake. In this case, a response will be sent as a notification whose
name is the `request_id`.
In the case of Isolate clients/servers, events will be simply sent as Lists:
```dart
['<event-name>', value]
```
Clients can send the following (3) methods:
* `subscribe` (`event_name`:string): Subscribe to an event.
* `unsubscribe` (`subscription_id`:string): Unsubscribe from an event you previously subscribed to.
* `publish` (`event_name`:string, `value`:any): Publish an event to all other clients who are subscribed.
The client and server in `package:pub_sub/isolate.dart` must make extra
provisions to keep track of client ID's. Since `SendPort`s and `ReceivePort`s
do not have any sort of guaranteed-unique ID's, new clients must send their
`SendPort` to the server before sending any requests. The server then responds
with an `id` that must be used to identify a `SendPort` to send a response to.

View file

@ -0,0 +1,4 @@
include: package:pedantic/analysis_options.yaml
analyzer:
strong-mode:
implicit-casts: false

View file

@ -0,0 +1,44 @@
import 'dart:io';
import 'dart:isolate';
import 'package:pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub;
main() async {
// Easily bring up a server.
var adapter = new pub_sub.IsolateAdapter();
var server = new pub_sub.Server([adapter]);
// You then need to create a client that will connect to the adapter.
// Every untrusted client in your application should be pre-registered.
//
// In the case of Isolates, however, those are always implicitly trusted.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
server.registerClient(new pub_sub.ClientInfo('client$i'));
}
// Start the server.
server.start();
// Next, let's start isolates that interact with the server.
//
// Fortunately, we can send SendPorts over Isolates, so this is no hassle.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++)
await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]);
// It's possible that you're running your application in the server isolate as well:
isolateMain([0, adapter.receivePort.sendPort]);
}
void isolateMain(List args) {
// Isolates are always trusted, so technically we don't need to pass a client iD.
var client =
new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);
// The client will connect automatically. In the meantime, we can start subscribing to events.
client.subscribe('user::logged_in').then((sub) {
// The `ClientSubscription` class extends `Stream`. Hooray for asynchrony!
sub.listen((msg) {
print('Logged in: $msg');
});
});
}

View file

@ -0,0 +1,2 @@
export 'src/isolate/client.dart';
export 'src/isolate/server.dart';

View file

@ -0,0 +1,2 @@
export 'src/json_rpc/client.dart';
export 'src/json_rpc/server.dart';

View file

@ -0,0 +1 @@
export 'src/protocol/protocol.dart';

View file

@ -0,0 +1,184 @@
import 'dart:async';
import 'dart:collection';
import 'dart:isolate';
import 'package:uuid/uuid.dart';
import '../../pub_sub.dart';
/// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateClient extends Client {
final Queue<Completer<String>> _onConnect = new Queue<Completer<String>>();
final Map<String, Completer<Map>> _requests = {};
final List<_IsolateClientSubscription> _subscriptions = [];
final Uuid _uuid = new Uuid();
String _id;
/// The ID of the client we are authenticating as.
///
/// May be `null`, if and only if we are marked as a trusted source on
/// the server side.
String get clientId => _clientId;
String _clientId;
/// A server's [SendPort] that messages should be sent to.
final SendPort serverSendPort;
/// A [ReceivePort] that receives messages from the server.
final ReceivePort receivePort = new ReceivePort();
IsolateClient(String clientId, this.serverSendPort) {
_clientId = clientId;
receivePort.listen((data) {
if (data is Map && data['request_id'] is String) {
var requestId = data['request_id'] as String;
var c = _requests.remove(requestId);
if (c != null && !c.isCompleted) {
if (data['status'] is! bool) {
c.completeError(
new FormatException('The server sent an invalid response.'));
} else if (!(data['status'] as bool)) {
c.completeError(new PubSubException(data['error_message']
?.toString() ??
'The server sent a failure response, but did not provide an error message.'));
} else if (data['result'] is! Map) {
c.completeError(new FormatException(
'The server sent a success response, but did not include a result.'));
} else {
c.complete(data['result'] as Map);
}
}
} else if (data is Map && data['id'] is String && _id == null) {
_id = data['id'] as String;
for (var c in _onConnect) {
if (!c.isCompleted) c.complete(_id);
}
_onConnect.clear();
} else if (data is List && data.length == 2 && data[0] is String) {
var eventName = data[0] as String, event = data[1];
for (var s in _subscriptions.where((s) => s.eventName == eventName)) {
if (!s._stream.isClosed) s._stream.add(event);
}
}
});
serverSendPort.send(receivePort.sendPort);
}
Future<T> _whenConnected<T>(FutureOr<T> callback()) {
if (_id != null)
return new Future<T>.sync(callback);
else {
var c = new Completer<String>();
_onConnect.add(c);
return c.future.then<T>((_) => callback());
}
}
@override
Future publish(String eventName, value) {
return _whenConnected(() {
var c = new Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
serverSendPort.send({
'id': _id,
'request_id': requestId,
'method': 'publish',
'params': {
'client_id': clientId,
'event_name': eventName,
'value': value
}
});
return c.future.then((result) {
_clientId = result['client_id'] as String;
});
});
}
@override
Future<ClientSubscription> subscribe(String eventName) {
return _whenConnected<ClientSubscription>(() {
var c = new Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
serverSendPort.send({
'id': _id,
'request_id': requestId,
'method': 'subscribe',
'params': {'client_id': clientId, 'event_name': eventName}
});
return c.future.then<ClientSubscription>((result) {
_clientId = result['client_id'] as String;
var s = new _IsolateClientSubscription(
eventName, result['subscription_id'] as String, this);
_subscriptions.add(s);
return s;
});
});
}
@override
Future close() {
receivePort.close();
for (var c in _onConnect) {
if (!c.isCompleted) {
c.completeError(new StateError(
'The client was closed before the server ever accepted the connection.'));
}
}
for (var c in _requests.values) {
if (!c.isCompleted) {
c.completeError(new StateError(
'The client was closed before the server responded to this request.'));
}
}
for (var s in _subscriptions) s._close();
_requests.clear();
return new Future.value();
}
}
class _IsolateClientSubscription extends ClientSubscription {
final StreamController _stream = new StreamController();
final String eventName, id;
final IsolateClient client;
_IsolateClientSubscription(this.eventName, this.id, this.client);
void _close() {
if (!_stream.isClosed) _stream.close();
}
@override
StreamSubscription listen(void onData(event),
{Function onError, void onDone(), bool cancelOnError}) {
return _stream.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
Future unsubscribe() {
return client._whenConnected(() {
var c = new Completer<Map>();
var requestId = client._uuid.v4();
client._requests[requestId] = c;
client.serverSendPort.send({
'id': client._id,
'request_id': requestId,
'method': 'unsubscribe',
'params': {'client_id': client.clientId, 'subscription_id': id}
});
return c.future.then((_) {
_close();
});
});
}
}

View file

@ -0,0 +1,253 @@
import 'dart:async';
import 'dart:isolate';
import 'package:uuid/uuid.dart';
import '../../pub_sub.dart';
/// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateAdapter extends Adapter {
final Map<String, SendPort> _clients = {};
final StreamController<PublishRequest> _onPublish =
new StreamController<PublishRequest>();
final StreamController<SubscriptionRequest> _onSubscribe =
new StreamController<SubscriptionRequest>();
final StreamController<UnsubscriptionRequest> _onUnsubscribe =
new StreamController<UnsubscriptionRequest>();
final Uuid _uuid = new Uuid();
/// A [ReceivePort] on which to listen for incoming data.
final ReceivePort receivePort = new ReceivePort();
@override
Stream<PublishRequest> get onPublish => _onPublish.stream;
@override
Stream<SubscriptionRequest> get onSubscribe => _onSubscribe.stream;
@override
Stream<UnsubscriptionRequest> get onUnsubscribe => _onUnsubscribe.stream;
@override
Future close() {
receivePort.close();
_clients.clear();
_onPublish.close();
_onSubscribe.close();
_onUnsubscribe.close();
return new Future.value();
}
@override
void start() {
receivePort.listen((data) {
if (data is SendPort) {
var id = _uuid.v4();
_clients[id] = data;
data.send({'status': true, 'id': id});
} else if (data is Map &&
data['id'] is String &&
data['request_id'] is String &&
data['method'] is String &&
data['params'] is Map) {
var id = data['id'] as String,
requestId = data['request_id'] as String,
method = data['method'] as String;
var params = data['params'] as Map;
var sp = _clients[id];
if (sp == null) {
// There's nobody to respond to, so don't send anything to anyone. Oops.
} else if (method == 'publish') {
if (_isValidClientId(params['client_id']) &&
params['event_name'] is String &&
params.containsKey('value')) {
var clientId = params['client_id'] as String,
eventName = params['event_name'] as String;
var value = params['value'];
var rq = new _IsolatePublishRequestImpl(
requestId, clientId, eventName, value, sp);
_onPublish.add(rq);
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message': 'Expected client_id, event_name, and value.'
});
}
} else if (method == 'subscribe') {
if (_isValidClientId(params['client_id']) &&
params['event_name'] is String) {
var clientId = params['client_id'] as String,
eventName = params['event_name'] as String;
var rq = new _IsolateSubscriptionRequestImpl(
clientId, eventName, sp, requestId, _uuid);
_onSubscribe.add(rq);
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message': 'Expected client_id, and event_name.'
});
}
} else if (method == 'unsubscribe') {
if (_isValidClientId(params['client_id']) &&
params['subscription_id'] is String) {
var clientId = params['client_id'] as String,
subscriptionId = params['subscription_id'] as String;
var rq = new _IsolateUnsubscriptionRequestImpl(
clientId, subscriptionId, sp, requestId);
_onUnsubscribe.add(rq);
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message': 'Expected client_id, and subscription_id.'
});
}
} else {
sp.send({
'status': false,
'request_id': requestId,
'error_message':
'Unrecognized method "$method". Or, you omitted id, request_id, method, or params.'
});
}
}
});
}
bool _isValidClientId(id) => id == null || id is String;
@override
bool isTrustedPublishRequest(PublishRequest request) {
// Isolate clients are considered trusted, because they are
// running in the same process as the central server.
return true;
}
@override
bool isTrustedSubscriptionRequest(SubscriptionRequest request) {
return true;
}
}
class _IsolatePublishRequestImpl extends PublishRequest {
@override
final String clientId;
@override
final String eventName;
@override
final value;
final SendPort sendPort;
final String requestId;
_IsolatePublishRequestImpl(
this.requestId, this.clientId, this.eventName, this.value, this.sendPort);
@override
void accept(PublishResponse response) {
sendPort.send({
'status': true,
'request_id': requestId,
'result': {
'listeners': response.listeners,
'client_id': response.clientId
}
});
}
@override
void reject(String errorMessage) {
sendPort.send({
'status': false,
'request_id': requestId,
'error_message': errorMessage
});
}
}
class _IsolateSubscriptionRequestImpl extends SubscriptionRequest {
@override
final String clientId;
@override
final String eventName;
final SendPort sendPort;
final String requestId;
final Uuid _uuid;
_IsolateSubscriptionRequestImpl(
this.clientId, this.eventName, this.sendPort, this.requestId, this._uuid);
@override
void reject(String errorMessage) {
sendPort.send({
'status': false,
'request_id': requestId,
'error_message': errorMessage
});
}
@override
FutureOr<Subscription> accept(String clientId) {
var id = _uuid.v4();
sendPort.send({
'status': true,
'request_id': requestId,
'result': {'subscription_id': id, 'client_id': clientId}
});
return new _IsolateSubscriptionImpl(clientId, id, eventName, sendPort);
}
}
class _IsolateSubscriptionImpl extends Subscription {
@override
final String clientId, id;
final String eventName;
final SendPort sendPort;
_IsolateSubscriptionImpl(
this.clientId, this.id, this.eventName, this.sendPort);
@override
void dispatch(event) {
sendPort.send([eventName, event]);
}
}
class _IsolateUnsubscriptionRequestImpl extends UnsubscriptionRequest {
@override
final String clientId;
@override
final String subscriptionId;
final SendPort sendPort;
final String requestId;
_IsolateUnsubscriptionRequestImpl(
this.clientId, this.subscriptionId, this.sendPort, this.requestId);
@override
void reject(String errorMessage) {
sendPort.send({
'status': false,
'request_id': requestId,
'error_message': errorMessage
});
}
@override
void accept() {
sendPort.send({'status': true, 'request_id': requestId, 'result': {}});
}
}

View file

@ -0,0 +1,144 @@
import 'dart:async';
import 'package:stream_channel/stream_channel.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2;
import 'package:uuid/uuid.dart';
import '../../pub_sub.dart';
/// A [Client] implementation that communicates via JSON RPC 2.0.
class JsonRpc2Client extends Client {
final Map<String, Completer<Map>> _requests = {};
final List<_JsonRpc2ClientSubscription> _subscriptions = [];
final Uuid _uuid = new Uuid();
json_rpc_2.Peer _peer;
/// The ID of the client we are authenticating as.
///
/// May be `null`, if and only if we are marked as a trusted source on
/// the server side.
String get clientId => _clientId;
String _clientId;
JsonRpc2Client(String clientId, StreamChannel<String> channel) {
_clientId = clientId;
_peer = new json_rpc_2.Peer(channel);
_peer.registerMethod('event', (json_rpc_2.Parameters params) {
var eventName = params['event_name'].asString,
event = params['value'].value;
for (var s in _subscriptions.where((s) => s.eventName == eventName)) {
if (!s._stream.isClosed) s._stream.add(event);
}
});
_peer.registerFallback((json_rpc_2.Parameters params) {
var c = _requests.remove(params.method);
if (c == null)
throw new json_rpc_2.RpcException.methodNotFound(params.method);
else {
var data = params.asMap;
if (data['status'] is! bool) {
c.completeError(
new FormatException('The server sent an invalid response.'));
} else if (!(data['status'] as bool)) {
c.completeError(new PubSubException(data['error_message']
?.toString() ??
'The server sent a failure response, but did not provide an error message.'));
} else {
c.complete(data);
}
}
});
_peer.listen();
}
@override
Future publish(String eventName, value) {
var c = new Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
_peer.sendNotification('publish', {
'request_id': requestId,
'client_id': clientId,
'event_name': eventName,
'value': value
});
return c.future.then((data) {
_clientId = data['result']['client_id'] as String;
});
}
@override
Future<ClientSubscription> subscribe(String eventName) {
var c = new Completer<Map>();
var requestId = _uuid.v4();
_requests[requestId] = c;
_peer.sendNotification('subscribe', {
'request_id': requestId,
'client_id': clientId,
'event_name': eventName
});
return c.future.then<ClientSubscription>((result) {
_clientId = result['client_id'] as String;
var s = new _JsonRpc2ClientSubscription(
eventName, result['subscription_id'] as String, this);
_subscriptions.add(s);
return s;
});
}
@override
Future close() {
if (_peer?.isClosed != true) _peer.close();
for (var c in _requests.values) {
if (!c.isCompleted) {
c.completeError(new StateError(
'The client was closed before the server responded to this request.'));
}
}
for (var s in _subscriptions) s._close();
_requests.clear();
return new Future.value();
}
}
class _JsonRpc2ClientSubscription extends ClientSubscription {
final StreamController _stream = new StreamController();
final String eventName, id;
final JsonRpc2Client client;
_JsonRpc2ClientSubscription(this.eventName, this.id, this.client);
void _close() {
if (!_stream.isClosed) _stream.close();
}
@override
StreamSubscription listen(void onData(event),
{Function onError, void onDone(), bool cancelOnError}) {
return _stream.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
Future unsubscribe() {
var c = new Completer<Map>();
var requestId = client._uuid.v4();
client._requests[requestId] = c;
client._peer.sendNotification('unsubscribe', {
'request_id': requestId,
'client_id': client.clientId,
'subscription_id': id
});
return c.future.then((_) {
_close();
});
}
}

View file

@ -0,0 +1,214 @@
import 'dart:async';
import 'package:stream_channel/stream_channel.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2;
import 'package:uuid/uuid.dart';
import '../../pub_sub.dart';
/// A [Adapter] implementation that communicates via JSON RPC 2.0.
class JsonRpc2Adapter extends Adapter {
final StreamController<PublishRequest> _onPublish =
new StreamController<PublishRequest>();
final StreamController<SubscriptionRequest> _onSubscribe =
new StreamController<SubscriptionRequest>();
final StreamController<UnsubscriptionRequest> _onUnsubscribe =
new StreamController<UnsubscriptionRequest>();
final List<json_rpc_2.Peer> _peers = [];
final Uuid _uuid = new Uuid();
json_rpc_2.Peer _peer;
/// A [Stream] of incoming clients, who can both send and receive string data.
final Stream<StreamChannel<String>> clientStream;
/// If `true`, clients can connect through this endpoint, *without* providing a client ID.
///
/// This can be a security vulnerability if you don't know what you're doing.
/// If you *must* use this over the Internet, use an IP whitelist.
final bool isTrusted;
JsonRpc2Adapter(this.clientStream, {this.isTrusted = false});
@override
Stream<PublishRequest> get onPublish => _onPublish.stream;
@override
Stream<SubscriptionRequest> get onSubscribe => _onSubscribe.stream;
@override
Stream<UnsubscriptionRequest> get onUnsubscribe => _onUnsubscribe.stream;
@override
Future close() {
if (_peer?.isClosed != true) _peer?.close();
Future.wait(_peers.where((s) => !s.isClosed).map((s) => s.close()))
.then((_) => _peers.clear());
return new Future.value();
}
String _getClientId(json_rpc_2.Parameters params) {
try {
return params['client_id'].asString;
} catch (_) {
return null;
}
}
@override
void start() {
clientStream.listen((client) {
var peer = _peer = new json_rpc_2.Peer(client);
peer.registerMethod('publish', (json_rpc_2.Parameters params) async {
var requestId = params['request_id'].asString;
var clientId = _getClientId(params);
var eventName = params['event_name'].asString;
var value = params['value'].value;
var rq = new _JsonRpc2PublishRequestImpl(
requestId, clientId, eventName, value, peer);
_onPublish.add(rq);
});
peer.registerMethod('subscribe', (json_rpc_2.Parameters params) async {
var requestId = params['request_id'].asString;
var clientId = _getClientId(params);
var eventName = params['event_name'].asString;
var rq = new _JsonRpc2SubscriptionRequestImpl(
clientId, eventName, requestId, peer, _uuid);
_onSubscribe.add(rq);
});
peer.registerMethod('unsubscribe', (json_rpc_2.Parameters params) async {
var requestId = params['request_id'].asString;
var clientId = _getClientId(params);
var subscriptionId = params['subscription_id'].asString;
var rq = new _JsonRpc2UnsubscriptionRequestImpl(
clientId, subscriptionId, peer, requestId);
_onUnsubscribe.add(rq);
});
peer.listen();
});
}
@override
bool isTrustedPublishRequest(PublishRequest request) {
return isTrusted;
}
@override
bool isTrustedSubscriptionRequest(SubscriptionRequest request) {
return isTrusted;
}
}
class _JsonRpc2PublishRequestImpl extends PublishRequest {
final String requestId, clientId, eventName;
final value;
final json_rpc_2.Peer peer;
_JsonRpc2PublishRequestImpl(
this.requestId, this.clientId, this.eventName, this.value, this.peer);
@override
void accept(PublishResponse response) {
peer.sendNotification(requestId, {
'status': true,
'request_id': requestId,
'result': {
'listeners': response.listeners,
'client_id': response.clientId
}
});
}
@override
void reject(String errorMessage) {
peer.sendNotification(requestId, {
'status': false,
'request_id': requestId,
'error_message': errorMessage
});
}
}
class _JsonRpc2SubscriptionRequestImpl extends SubscriptionRequest {
@override
final String clientId, eventName;
final String requestId;
final json_rpc_2.Peer peer;
final Uuid _uuid;
_JsonRpc2SubscriptionRequestImpl(
this.clientId, this.eventName, this.requestId, this.peer, this._uuid);
@override
FutureOr<Subscription> accept(String clientId) {
var id = _uuid.v4();
peer.sendNotification(requestId, {
'status': true,
'request_id': requestId,
'subscription_id': id,
'client_id': clientId
});
return new _JsonRpc2SubscriptionImpl(clientId, id, eventName, peer);
}
@override
void reject(String errorMessage) {
peer.sendNotification(requestId, {
'status': false,
'request_id': requestId,
'error_message': errorMessage
});
}
}
class _JsonRpc2SubscriptionImpl extends Subscription {
@override
final String clientId, id;
final String eventName;
final json_rpc_2.Peer peer;
_JsonRpc2SubscriptionImpl(this.clientId, this.id, this.eventName, this.peer);
@override
void dispatch(event) {
peer.sendNotification('event', {'event_name': eventName, 'value': event});
}
}
class _JsonRpc2UnsubscriptionRequestImpl extends UnsubscriptionRequest {
@override
final String clientId;
@override
final String subscriptionId;
final json_rpc_2.Peer peer;
final String requestId;
_JsonRpc2UnsubscriptionRequestImpl(
this.clientId, this.subscriptionId, this.peer, this.requestId);
@override
void accept() {
peer.sendNotification(requestId, {'status': true, 'result': {}});
}
@override
void reject(String errorMessage) {
peer.sendNotification(requestId, {
'status': false,
'request_id': requestId,
'error_message': errorMessage
});
}
}

View file

@ -0,0 +1,30 @@
import 'dart:async';
/// Queries a `pub_sub` server.
abstract class Client {
/// Publishes an event to the server.
Future publish(String eventName, value);
/// Request a [ClientSubscription] to the desired [eventName] from the server.
Future<ClientSubscription> subscribe(String eventName);
/// Disposes of this client.
Future close();
}
/// A client-side implementation of a subscription, which acts as a [Stream], and can be cancelled easily.
abstract class ClientSubscription extends Stream {
/// Stops listening for new events, and instructs the server to cancel the subscription.
Future unsubscribe();
}
/// Thrown as the result of an invalid request, or an attempt to perform an action without the correct privileges.
class PubSubException implements Exception {
/// The error message sent by the server.
final String message;
const PubSubException(this.message);
@override
String toString() => '`pub_sub` exception: $message';
}

View file

@ -0,0 +1 @@
export 'client.dart';

View file

@ -0,0 +1,2 @@
export 'client/sync_client.dart';
export 'server/sync_server.dart';

View file

@ -0,0 +1,29 @@
import 'dart:async';
import 'publish.dart';
import 'subscription.dart';
/// Adapts an abstract medium to serve the `pub_sub` RPC protocol.
abstract class Adapter {
/// Determines if a given [request] comes from a trusted source.
///
/// If so, the request does not have to provide a pre-established ID,
/// and instead will be assigned one.
bool isTrustedPublishRequest(PublishRequest request);
bool isTrustedSubscriptionRequest(SubscriptionRequest request);
/// Fires an event whenever a client tries to publish data.
Stream<PublishRequest> get onPublish;
/// Fires whenever a client tries to subscribe to an event.
Stream<SubscriptionRequest> get onSubscribe;
/// Fires whenever a client cancels a subscription.
Stream<UnsubscriptionRequest> get onUnsubscribe;
/// Disposes of this adapter.
Future close();
/// Start listening for incoming clients.
void start();
}

View file

@ -0,0 +1,14 @@
/// Represents information about a client that will be accessing
/// this `angel_sync` server.
class ClientInfo {
/// A unique identifier for this client.
final String id;
/// If `true` (default), then the client is allowed to publish events.
final bool canPublish;
/// If `true` (default), then the client can subscribe to events.
final bool canSubscribe;
const ClientInfo(this.id, {this.canPublish = true, this.canSubscribe = true});
}

View file

@ -0,0 +1,28 @@
/// Represents a request to publish information to other clients.
abstract class PublishRequest {
/// The ID of the client sending this request.
String get clientId;
/// The name of the event to be sent.
String get eventName;
/// The value to be published as an event.
dynamic get value;
/// Accept the request, with a response.
void accept(PublishResponse response);
/// Deny the request with an error message.
void reject(String errorMessage);
}
/// A response to a publish request. Informs the caller of how much clients received the event.
class PublishResponse {
/// The number of unique listeners to whom this event was propogated.
final int listeners;
/// The client ID returned the server. Significant in cases where an ad-hoc client was registered.
final String clientId;
const PublishResponse(this.listeners, this.clientId);
}

View file

@ -0,0 +1,161 @@
import 'dart:async';
import 'dart:math';
import 'adapter.dart';
import 'client.dart';
import 'publish.dart';
import 'subscription.dart';
/// A server that implements the `pub_sub` protocol.
///
/// It can work using multiple [Adapter]s, to simultaneously
/// serve local and remote clients alike.
class Server {
final List<Adapter> _adapters = [];
final List<ClientInfo> _clients = [];
final _rnd = new Random.secure();
final Map<String, List<Subscription>> _subscriptions = {};
bool _started = false;
int _adHocIds = 0;
/// Initialize a server, optionally with a number of [adapters].
Server([Iterable<Adapter> adapters = const []]) {
_adapters.addAll(adapters ?? []);
}
/// Adds a new [Adapter] to adapt incoming clients from a new interface.
void addAdapter(Adapter adapter) {
if (_started)
throw new StateError(
'You cannot add new adapters after the server has started listening.');
else {
_adapters.add(adapter);
}
}
/// Registers a new client with the server.
void registerClient(ClientInfo client) {
if (_started)
throw new StateError(
'You cannot register new clients after the server has started listening.');
else {
_clients.add(client);
}
}
/// Disposes of this server, and closes all of its adapters.
Future close() {
Future.wait(_adapters.map((a) => a.close()));
_adapters.clear();
_clients.clear();
_subscriptions.clear();
return new Future.value();
}
String _newClientId() {
// Create an unpredictable-enough ID. The harder it is for an attacker to guess, the better.
var id =
'pub_sub::adhoc_client${_rnd.nextDouble()}::${_adHocIds++}:${new DateTime.now().millisecondsSinceEpoch * _rnd.nextDouble()}';
// This client is coming from a trusted source, and can therefore both publish and subscribe.
_clients.add(new ClientInfo(id));
return id;
}
void start() {
if (_adapters.isEmpty)
throw new StateError(
'Cannot start a SyncServer that has no adapters attached.');
else if (_started)
throw new StateError('A SyncServer may only be started once.');
_started = true;
for (var adapter in _adapters) {
adapter.start();
}
for (var adapter in _adapters) {
// Handle publishes
adapter.onPublish.listen((rq) {
ClientInfo client;
String clientId;
if (rq.clientId?.isNotEmpty == true ||
adapter.isTrustedPublishRequest(rq)) {
clientId =
rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
client =
_clients.firstWhere((c) => c.id == clientId, orElse: () => null);
}
if (client == null) {
rq.reject('Unrecognized client ID "${clientId ?? '<missing>'}".');
} else if (!client.canPublish) {
rq.reject('You are not allowed to publish events.');
} else {
var listeners = _subscriptions[rq.eventName]
?.where((s) => s.clientId != clientId) ??
[];
if (listeners.isEmpty) {
rq.accept(new PublishResponse(0, clientId));
} else {
for (var listener in listeners) {
listener.dispatch(rq.value);
}
rq.accept(new PublishResponse(listeners.length, clientId));
}
}
});
// Listen for incoming subscriptions
adapter.onSubscribe.listen((rq) async {
ClientInfo client;
String clientId;
if (rq.clientId?.isNotEmpty == true ||
adapter.isTrustedSubscriptionRequest(rq)) {
clientId =
rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
client =
_clients.firstWhere((c) => c.id == clientId, orElse: () => null);
}
if (client == null) {
rq.reject('Unrecognized client ID "${clientId ?? '<missing>'}".');
} else if (!client.canSubscribe) {
rq.reject('You are not allowed to subscribe to events.');
} else {
var sub = await rq.accept(clientId);
var list = _subscriptions.putIfAbsent(rq.eventName, () => []);
list.add(sub);
}
});
// Unregister subscriptions on unsubscribe
adapter.onUnsubscribe.listen((rq) {
Subscription toRemove;
List<Subscription> sourceList;
for (var list in _subscriptions.values) {
toRemove = list.firstWhere((s) => s.id == rq.subscriptionId,
orElse: () => null);
if (toRemove != null) {
sourceList = list;
break;
}
}
if (toRemove == null) {
rq.reject('The specified subscription does not exist.');
} else if (toRemove.clientId != rq.clientId) {
rq.reject('That is not your subscription to cancel.');
} else {
sourceList.remove(toRemove);
rq.accept();
}
});
}
}
}

View file

@ -0,0 +1,47 @@
import 'dart:async';
/// Represents a request to subscribe to an event.
abstract class SubscriptionRequest {
/// The ID of the client requesting to subscribe.
String get clientId;
/// The name of the event the client wants to subscribe to.
String get eventName;
/// Accept the request, and grant the client access to subscribe to the event.
///
/// Includes the client's ID, which is necessary for ad-hoc clients.
FutureOr<Subscription> accept(String clientId);
/// Deny the request with an error message.
void reject(String errorMessage);
}
/// Represents a request to unsubscribe to an event.
abstract class UnsubscriptionRequest {
/// The ID of the client requesting to unsubscribe.
String get clientId;
/// The name of the event the client wants to unsubscribe from.
String get subscriptionId;
/// Accept the request.
FutureOr<void> accept();
/// Deny the request with an error message.
void reject(String errorMessage);
}
/// Represents a client's subscription to an event.
///
/// Also provides a means to fire an event.
abstract class Subscription {
/// A unique identifier for this subscription.
String get id;
/// The ID of the client who requested this subscription.
String get clientId;
/// Alerts a client of an event.
void dispatch(event);
}

View file

@ -0,0 +1,5 @@
export 'adapter.dart';
export 'client.dart';
export 'publish.dart';
export 'server.dart';
export 'subscription.dart';

View file

@ -0,0 +1,15 @@
name: pub_sub
version: 3.0.0
description: Keep application instances in sync with a simple pub/sub API.
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/thosakwe/pub_sub
publish_to: none
environment:
sdk: ">=2.10.0 <3.0.0"
dependencies:
json_rpc_2: ^2.0.0
stream_channel: ">=1.0.0 <3.0.0"
uuid: ^3.0.1
dev_dependencies:
pedantic: ^1.0.0
test: ^1.0.0

View file

@ -0,0 +1,122 @@
import 'dart:async';
import 'package:pub_sub/pub_sub.dart';
import 'package:pub_sub/isolate.dart';
import 'package:test/test.dart';
main() {
Server server;
Client client1, client2, client3;
IsolateClient trustedClient;
IsolateAdapter adapter;
setUp(() async {
adapter = new IsolateAdapter();
client1 =
new IsolateClient('isolate_test::secret', adapter.receivePort.sendPort);
client2 = new IsolateClient(
'isolate_test::secret2', adapter.receivePort.sendPort);
client3 = new IsolateClient(
'isolate_test::secret3', adapter.receivePort.sendPort);
trustedClient = new IsolateClient(null, adapter.receivePort.sendPort);
server = new Server([adapter])
..registerClient(const ClientInfo('isolate_test::secret'))
..registerClient(const ClientInfo('isolate_test::secret2'))
..registerClient(const ClientInfo('isolate_test::secret3'))
..registerClient(
const ClientInfo('isolate_test::no_publish', canPublish: false))
..registerClient(
const ClientInfo('isolate_test::no_subscribe', canSubscribe: false))
..start();
var sub = await client3.subscribe('foo');
sub.listen((data) {
print('Client3 caught foo: $data');
});
});
tearDown(() {
Future.wait([
server.close(),
client1.close(),
client2.close(),
client3.close(),
trustedClient.close()
]);
});
group('trusted', () {
test('can publish', () async {
await trustedClient.publish('hey', 'bye');
expect(trustedClient.clientId, isNotNull);
});
test('can sub/unsub', () async {
String clientId;
await trustedClient.publish('heyaaa', 'byeaa');
expect(clientId = trustedClient.clientId, isNotNull);
var sub = await trustedClient.subscribe('yeppp');
expect(trustedClient.clientId, clientId);
await sub.unsubscribe();
expect(trustedClient.clientId, clientId);
});
});
test('subscribers receive published events', () async {
var sub = await client2.subscribe('foo');
await client1.publish('foo', 'bar');
expect(await sub.first, 'bar');
});
test('subscribers are not sent their own events', () async {
var sub = await client1.subscribe('foo');
await client1.publish('foo',
'<this should never be sent to client1, because client1 sent it.>');
await sub.unsubscribe();
expect(await sub.isEmpty, isTrue);
});
test('can unsubscribe', () async {
var sub = await client2.subscribe('foo');
await client1.publish('foo', 'bar');
await sub.unsubscribe();
await client1.publish('foo', '<client2 will not catch this!>');
expect(await sub.length, 1);
});
group('isolate_server', () {
test('reject unknown client id', () async {
try {
var client = new IsolateClient(
'isolate_test::invalid', adapter.receivePort.sendPort);
await client.publish('foo', 'bar');
throw 'Invalid client ID\'s should throw an error, but they do not.';
} on PubSubException catch (e) {
print('Expected exception was thrown: ${e.message}');
}
});
test('reject unprivileged publish', () async {
try {
var client = new IsolateClient(
'isolate_test::no_publish', adapter.receivePort.sendPort);
await client.publish('foo', 'bar');
throw 'Unprivileged publishes should throw an error, but they do not.';
} on PubSubException catch (e) {
print('Expected exception was thrown: ${e.message}');
}
});
test('reject unprivileged subscribe', () async {
try {
var client = new IsolateClient(
'isolate_test::no_subscribe', adapter.receivePort.sendPort);
await client.subscribe('foo');
throw 'Unprivileged subscribes should throw an error, but they do not.';
} on PubSubException catch (e) {
print('Expected exception was thrown: ${e.message}');
}
});
});
}

View file

@ -0,0 +1,188 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pub_sub/pub_sub.dart';
import 'package:pub_sub/json_rpc_2.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
main() {
ServerSocket serverSocket;
Server server;
Client client1, client2, client3;
JsonRpc2Client trustedClient;
JsonRpc2Adapter adapter;
setUp(() async {
serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);
adapter = new JsonRpc2Adapter(
serverSocket.map<StreamChannel<String>>(streamSocket),
isTrusted: true);
var socket1 =
await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
var socket2 =
await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
var socket3 =
await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
var socket4 =
await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
client1 =
new JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1));
client2 =
new JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2));
client3 =
new JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3));
trustedClient = new JsonRpc2Client(null, streamSocket(socket4));
server = new Server([adapter])
..registerClient(const ClientInfo('json_rpc_2_test::secret'))
..registerClient(const ClientInfo('json_rpc_2_test::secret2'))
..registerClient(const ClientInfo('json_rpc_2_test::secret3'))
..registerClient(
const ClientInfo('json_rpc_2_test::no_publish', canPublish: false))
..registerClient(const ClientInfo('json_rpc_2_test::no_subscribe',
canSubscribe: false))
..start();
var sub = await client3.subscribe('foo');
sub.listen((data) {
print('Client3 caught foo: $data');
});
});
tearDown(() {
Future.wait(
[server.close(), client1.close(), client2.close(), client3.close()]);
});
group('trusted', () {
test('can publish', () async {
await trustedClient.publish('hey', 'bye');
expect(trustedClient.clientId, isNotNull);
});
test('can sub/unsub', () async {
String clientId;
await trustedClient.publish('heyaaa', 'byeaa');
expect(clientId = trustedClient.clientId, isNotNull);
var sub = await trustedClient.subscribe('yeppp');
expect(trustedClient.clientId, clientId);
await sub.unsubscribe();
expect(trustedClient.clientId, clientId);
});
});
test('subscribers receive published events', () async {
var sub = await client2.subscribe('foo');
await client1.publish('foo', 'bar');
expect(await sub.first, 'bar');
});
test('subscribers are not sent their own events', () async {
var sub = await client1.subscribe('foo');
await client1.publish('foo',
'<this should never be sent to client1, because client1 sent it.>');
await sub.unsubscribe();
expect(await sub.isEmpty, isTrue);
});
test('can unsubscribe', () async {
var sub = await client2.subscribe('foo');
await client1.publish('foo', 'bar');
await sub.unsubscribe();
await client1.publish('foo', '<client2 will not catch this!>');
expect(await sub.length, 1);
});
group('json_rpc_2_server', () {
test('reject unknown client id', () async {
try {
var sock = await Socket.connect(
InternetAddress.loopbackIPv4, serverSocket.port);
var client =
new JsonRpc2Client('json_rpc_2_test::invalid', streamSocket(sock));
await client.publish('foo', 'bar');
throw 'Invalid client ID\'s should throw an error, but they do not.';
} on PubSubException catch (e) {
print('Expected exception was thrown: ${e.message}');
}
});
test('reject unprivileged publish', () async {
try {
var sock = await Socket.connect(
InternetAddress.loopbackIPv4, serverSocket.port);
var client = new JsonRpc2Client(
'json_rpc_2_test::no_publish', streamSocket(sock));
await client.publish('foo', 'bar');
throw 'Unprivileged publishes should throw an error, but they do not.';
} on PubSubException catch (e) {
print('Expected exception was thrown: ${e.message}');
}
});
test('reject unprivileged subscribe', () async {
try {
var sock = await Socket.connect(
InternetAddress.loopbackIPv4, serverSocket.port);
var client = new JsonRpc2Client(
'json_rpc_2_test::no_subscribe', streamSocket(sock));
await client.subscribe('foo');
throw 'Unprivileged subscribes should throw an error, but they do not.';
} on PubSubException catch (e) {
print('Expected exception was thrown: ${e.message}');
}
});
});
}
StreamChannel<String> streamSocket(Socket socket) {
var channel = new _SocketStreamChannel(socket);
return channel.transform(new StreamChannelTransformer.fromCodec(utf8));
}
class _SocketStreamChannel extends StreamChannelMixin<List<int>> {
_SocketSink _sink;
final Socket socket;
_SocketStreamChannel(this.socket);
@override
StreamSink<List<int>> get sink => _sink ??= new _SocketSink(socket);
@override
Stream<List<int>> get stream => socket;
}
class _SocketSink extends StreamSink<List<int>> {
final Socket socket;
_SocketSink(this.socket);
@override
void add(List<int> event) {
socket.add(event);
}
@override
void addError(Object error, [StackTrace stackTrace]) {
Zone.current.errorCallback(error, stackTrace);
}
@override
Future addStream(Stream<List<int>> stream) {
return socket.addStream(stream);
}
@override
Future close() {
return socket.close();
}
@override
Future get done => socket.done;
}