Publish pub_sub

This commit is contained in:
thomashii 2021-05-15 19:20:33 +08:00
parent 048acafa31
commit 0b2dd96a75
10 changed files with 55 additions and 53 deletions

View file

@ -1,50 +1,53 @@
# pub_sub # angel3_pub_sub
[![Pub](https://img.shields.io/pub/v/pub_sub.svg)](https://pub.dartlang.org/packages/pub_sub) [![version](https://img.shields.io/badge/pub-v4.0.1-brightgreen)](https://pub.dartlang.org/packages/angel3_pub_sub)
[![build status](https://travis-ci.org/thosakwe/pub_sub.svg)](https://travis-ci.org/thosakwe/pub_sub) [![Null Safety](https://img.shields.io/badge/null-safety-brightgreen)](https://dart.dev/null-safety)
[![Gitter](https://img.shields.io/gitter/room/angel_dart/discussion)](https://gitter.im/angel_dart/discussion)
[![License](https://img.shields.io/github/license/dukefirehawk/angel)](https://github.com/dukefirehawk/angel/tree/angel3/packages/pub_sub/LICENSE)
Keep application instances in sync with a simple pub/sub API. Keep application instances in sync with a simple pub/sub API.
# Installation # Installation
Add `pub_sub` as a dependency in your `pubspec.yaml` file: Add `angel3_pub_sub` as a dependency in your `pubspec.yaml` file:
```yaml ```yaml
dependencies: dependencies:
pub_sub: ^1.0.0 angel3_pub_sub: ^3.0.0
``` ```
Then, be sure to run `pub get` in your terminal. Then, be sure to run `pub get` in your terminal.
# Usage # Usage
`pub_sub` is your typical pub/sub API. However, `pub_sub` enforces authentication of every `pub_sub` is your typical pub/sub API. However, `angel3_pub_sub` enforces authentication of every
request. It is very possible that `pub_sub` will run on both servers and in the browser, request. It is very possible that `angel3_pub_sub` will run on both servers and in the browser,
or on a platform like Flutter. Thus, there are provisions available to limit or on a platform angel3_pub_sublike Flutter. Thus, there are provisions available to limit
access. access.
**Be careful to not leak any `pub_sub` client ID's if operating over a network.** **Be careful to not leak any `angel3_pub_sub` client ID's if operating over a network.**
If you do, you risk malicious users injecting events into your application, which If you do, you risk malicious users injecting events into your application, which
could ultimately spell *disaster*. could ultimately spell *disaster*.
A `pub_sub` server can operate across multiple *adapters*, which take care of interfacing data over different A `angel3_pub_sub` server can operate across multiple *adapters*, which take care of interfacing data over different
media. For example, a single server can handle pub/sub between multiple Isolates and TCP Sockets, as well as media. For example, a single server can handle pub/sub between multiple Isolates and TCP Sockets, as well as
WebSockets, simultaneously. WebSockets, simultaneously.
```dart ```dart
import 'package:pub_sub/pub_sub.dart' as pub_sub; import 'package:angel3_pub_sub/angel3_pub_sub.dart' as pub_sub;
main() async { main() async {
var server = new pub_sub.Server([ var server = pub_sub.Server([
new FooAdapter(...), FooAdapter(...),
new BarAdapter(...) BarAdapter(...)
]); ]);
server.addAdapter(new BazAdapter(...)); server.addAdapter( BazAdapter(...));
// Call `start` to activate adapters, and begin handling requests. // Call `start` to activate adapters, and begin handling requests.
server.start(); server.start();
} }
``` ```
### Trusted Clients ### Trusted Clients
You can use `package:pub_sub` without explicitly registering You can use `package:angel3_pub_sub` without explicitly registering
clients, *if and only if* those clients come from trusted sources. clients, *if and only if* those clients come from trusted sources.
Clients via `Isolate` are always trusted. Clients via `Isolate` are always trusted.
@ -53,10 +56,10 @@ Clients via `package:json_rpc_2` must be explicitly marked
as trusted (i.e. using an IP whitelist mechanism): as trusted (i.e. using an IP whitelist mechanism):
```dart ```dart
new JsonRpc2Adapter(..., isTrusted: false); JsonRpc2Adapter(..., isTrusted: false);
// Pass `null` as Client ID when trusted... // Pass `null` as Client ID when trusted...
new pub_sub.IsolateClient(null); pub_sub.IsolateClient(null);
``` ```
### Access Control ### Access Control
@ -66,7 +69,7 @@ if it is impossible to register new clients, then malicious users cannot grant t
privileges within the system. privileges within the system.
```dart ```dart
import 'package:pub_sub/pub_sub.dart' as pub_sub; import 'package:angel3_pub_sub/angel3_pub_sub.dart' as pub_sub;
main() async { main() async {
// ... // ...
@ -87,25 +90,25 @@ main() async {
## Isolates ## Isolates
If you are just running multiple instances of a server, If you are just running multiple instances of a server,
use `package:pub_sub/isolate.dart`. use `package:angel3_pub_sub/isolate.dart`.
You'll need one isolate to be the master. Typically this is the first isolate you create. You'll need one isolate to be the master. Typically this is the first isolate you create.
```dart ```dart
import 'dart:io'; import 'dart:io';
import 'dart:isolate'; import 'dart:isolate';
import 'package:pub_sub/isolate.dart' as pub_sub; import 'package:angel3_pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub; import 'package:angel3_pub_sub/angel3_pub_sub.dart' as pub_sub;
main() async { void main() async {
// Easily bring up a server. // Easily bring up a server.
var adapter = new pub_sub.IsolateAdapter(); var adapter = pub_sub.IsolateAdapter();
var server = new pub_sub.Server([adapter]); var server = pub_sub.Server([adapter]);
// You then need to create a client that will connect to the adapter. // You then need to create a client that will connect to the adapter.
// Each isolate in your application should contain a client. // Each isolate in your application should contain a client.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++) { for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
server.registerClient(new pub_sub.ClientInfo('client$i')); server.registerClient(pub_sub.ClientInfo('client$i'));
} }
// Start the server. // Start the server.
@ -123,7 +126,7 @@ main() async {
void isolateMain(List args) { void isolateMain(List args) {
var client = var client =
new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort); pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);
// The client will connect automatically. In the meantime, we can start subscribing to events. // The client will connect automatically. In the meantime, we can start subscribing to events.
client.subscribe('user::logged_in').then((sub) { client.subscribe('user::logged_in').then((sub) {
@ -138,14 +141,14 @@ void isolateMain(List args) {
## JSON RPC 2.0 ## JSON RPC 2.0
If you are not running on isolates, you need to import If you are not running on isolates, you need to import
`package:pub_sub/json_rpc_2.dart`. This library leverages `package:json_rpc_2` and `package:angel3_pub_sub/json_rpc_2.dart`. This library leverages `package:json_rpc_2` and
`package:stream_channel` to create clients and servers that can hypothetically run on any `package:stream_channel` to create clients and servers that can hypothetically run on any
medium, i.e. WebSockets, or TCP Sockets. medium, i.e. WebSockets, or TCP Sockets.
Check out `test/json_rpc_2_test.dart` for an example of serving `pub_sub` over TCP sockets. Check out `test/json_rpc_2_test.dart` for an example of serving `angel3_pub_sub` over TCP sockets.
# Protocol # Protocol
`pub_sub` is built upon a simple RPC, and this package includes `angel3_pub_sub` is built upon a simple RPC, and this package includes
an implementation that runs via `SendPort`s and `ReceivePort`s, as an implementation that runs via `SendPort`s and `ReceivePort`s, as
well as one that runs on any `StreamChannel<String>`. well as one that runs on any `StreamChannel<String>`.
@ -213,7 +216,7 @@ Clients can send the following (3) methods:
* `unsubscribe` (`subscription_id`:string): Unsubscribe from an event you previously subscribed to. * `unsubscribe` (`subscription_id`:string): Unsubscribe from an event you previously subscribed to.
* `publish` (`event_name`:string, `value`:any): Publish an event to all other clients who are subscribed. * `publish` (`event_name`:string, `value`:any): Publish an event to all other clients who are subscribed.
The client and server in `package:pub_sub/isolate.dart` must make extra The client and server in `package:angel3_pub_sub/isolate.dart` must make extra
provisions to keep track of client ID's. Since `SendPort`s and `ReceivePort`s provisions to keep track of client ID's. Since `SendPort`s and `ReceivePort`s
do not have any sort of guaranteed-unique ID's, new clients must send their do not have any sort of guaranteed-unique ID's, new clients must send their
`SendPort` to the server before sending any requests. The server then responds `SendPort` to the server before sending any requests. The server then responds

View file

@ -1,19 +1,19 @@
import 'dart:io'; import 'dart:io';
import 'dart:isolate'; import 'dart:isolate';
import 'package:pub_sub/isolate.dart' as pub_sub; import 'package:angel3_pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub; import 'package:angel3_pub_sub/angel3_pub_sub.dart' as pub_sub;
main() async { void main() async {
// Easily bring up a server. // Easily bring up a server.
var adapter = new pub_sub.IsolateAdapter(); var adapter = pub_sub.IsolateAdapter();
var server = new pub_sub.Server([adapter]); var server = pub_sub.Server([adapter]);
// You then need to create a client that will connect to the adapter. // You then need to create a client that will connect to the adapter.
// Every untrusted client in your application should be pre-registered. // Every untrusted client in your application should be pre-registered.
// //
// In the case of Isolates, however, those are always implicitly trusted. // In the case of Isolates, however, those are always implicitly trusted.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++) { for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
server.registerClient(new pub_sub.ClientInfo('client$i')); server.registerClient(pub_sub.ClientInfo('client$i'));
} }
// Start the server. // Start the server.
@ -22,8 +22,9 @@ main() async {
// Next, let's start isolates that interact with the server. // Next, let's start isolates that interact with the server.
// //
// Fortunately, we can send SendPorts over Isolates, so this is no hassle. // Fortunately, we can send SendPorts over Isolates, so this is no hassle.
for (int i = 0; i < Platform.numberOfProcessors - 1; i++) for (var i = 0; i < Platform.numberOfProcessors - 1; i++) {
await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]); await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]);
}
// It's possible that you're running your application in the server isolate as well: // It's possible that you're running your application in the server isolate as well:
isolateMain([0, adapter.receivePort.sendPort]); isolateMain([0, adapter.receivePort.sendPort]);
@ -31,8 +32,7 @@ main() async {
void isolateMain(List args) { void isolateMain(List args) {
// Isolates are always trusted, so technically we don't need to pass a client iD. // Isolates are always trusted, so technically we don't need to pass a client iD.
var client = var client = pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);
new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);
// The client will connect automatically. In the meantime, we can start subscribing to events. // The client will connect automatically. In the meantime, we can start subscribing to events.
client.subscribe('user::logged_in').then((sub) { client.subscribe('user::logged_in').then((sub) {

View file

@ -2,7 +2,7 @@ import 'dart:async';
import 'dart:collection'; import 'dart:collection';
import 'dart:isolate'; import 'dart:isolate';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../pub_sub.dart'; import '../../angel3_pub_sub.dart';
/// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s. /// A [Client] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateClient extends Client { class IsolateClient extends Client {

View file

@ -1,7 +1,7 @@
import 'dart:async'; import 'dart:async';
import 'dart:isolate'; import 'dart:isolate';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../pub_sub.dart'; import '../../angel3_pub_sub.dart';
/// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s. /// A [Adapter] implementation that communicates via [SendPort]s and [ReceivePort]s.
class IsolateAdapter extends Adapter { class IsolateAdapter extends Adapter {

View file

@ -2,7 +2,7 @@ import 'dart:async';
import 'package:stream_channel/stream_channel.dart'; import 'package:stream_channel/stream_channel.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2; import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2;
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../pub_sub.dart'; import '../../angel3_pub_sub.dart';
/// A [Client] implementation that communicates via JSON RPC 2.0. /// A [Client] implementation that communicates via JSON RPC 2.0.
class JsonRpc2Client extends Client { class JsonRpc2Client extends Client {

View file

@ -2,7 +2,7 @@ import 'dart:async';
import 'package:stream_channel/stream_channel.dart'; import 'package:stream_channel/stream_channel.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2; import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2;
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../pub_sub.dart'; import '../../angel3_pub_sub.dart';
/// A [Adapter] implementation that communicates via JSON RPC 2.0. /// A [Adapter] implementation that communicates via JSON RPC 2.0.
class JsonRpc2Adapter extends Adapter { class JsonRpc2Adapter extends Adapter {

View file

@ -1,15 +1,14 @@
name: pub_sub name: angel3_pub_sub
version: 3.0.0 version: 3.0.0
description: Keep application instances in sync with a simple pub/sub API. description: Keep application instances in sync with a simple pub/sub API.
homepage: https://github.com/thosakwe/pub_sub homepage: https://github.com/dukefirehawk/angel/tree/angel3/packages/pub_sub
publish_to: none
environment: environment:
sdk: '>=2.12.0 <3.0.0' sdk: '>=2.12.0 <3.0.0'
dependencies: dependencies:
json_rpc_2: ^3.0.0 json_rpc_2: ^3.0.0
stream_channel: ^2.1.0 stream_channel: ^2.1.0
uuid: ^3.0.4 uuid: ^3.0.4
collection: ^1.15.0-nullsafety.4 collection: ^1.15.0
dev_dependencies: dev_dependencies:
pedantic: ^1.11.0 pedantic: ^1.11.0
test: ^1.17.3 test: ^1.17.4

View file

@ -1,6 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'package:pub_sub/pub_sub.dart'; import 'package:angel3_pub_sub/angel3_pub_sub.dart';
import 'package:pub_sub/isolate.dart'; import 'package:angel3_pub_sub/isolate.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';
void main() { void main() {

View file

@ -1,8 +1,8 @@
import 'dart:async'; import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'package:pub_sub/pub_sub.dart'; import 'package:angel3_pub_sub/angel3_pub_sub.dart';
import 'package:pub_sub/json_rpc_2.dart'; import 'package:angel3_pub_sub/json_rpc_2.dart';
import 'package:stream_channel/stream_channel.dart'; import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart'; import 'package:test/test.dart';