# Belatuk Pub Sub ![Pub Version (including pre-releases)](https://img.shields.io/pub/v/platform_pub_sub?include_prereleases) [![Null Safety](https://img.shields.io/badge/null-safety-brightgreen)](https://dart.dev/null-safety) [![License](https://img.shields.io/github/license/dart-backend/belatuk-common-utilities)](https://github.com/dart-backend/belatuk-common-utilities/blob/main/packages/pub_sub/LICENSE) **Replacement of `package:pub_sub` with breaking changes to support NNBD.** Keep application instances in sync with a simple pub/sub API. ## Installation Add `platform_pub_sub` as a dependency in your `pubspec.yaml` file: ```yaml dependencies: platform_pub_sub: ^6.2.0 ``` Then, be sure to run `dart pub get` in your terminal. ## Usage `platform_pub_sub` is your typical pub/sub API. However, `platform_pub_sub` enforces authentication of every request. It is very possible that `platform_pub_sub` will run on both server and in the browser, or on a platform like Flutter. **Be careful to not leak any `platform_pub_sub` client ID's if operating over a network.** If you do, you run the risk of malicious users injecting events into your application. A `platform_pub_sub` server can operate across multiple *adapters*, which take care of interfacing data over different media. For example, a single server can handle pub/sub between multiple Isolates and TCP Sockets, as well as WebSockets, simultaneously. ```dart import 'package:platform_pub_sub/platform_pub_sub.dart' as pub_sub; main() async { var server = pub_sub.Server([ FooAdapter(...), BarAdapter(...) ]); server.addAdapter( BazAdapter(...)); // Call `start` to activate adapters, and begin handling requests. server.start(); } ``` ### Trusted Clients You can use `package:platform_pub_sub` without explicitly registering clients, *if and only if* those clients come from trusted sources. Clients via `Isolate` are always trusted. Clients via `package:json_rpc_2` must be explicitly marked as trusted (i.e. using an IP whitelist mechanism): ```dart JsonRpc2Adapter(..., isTrusted: false); // Pass `null` as Client ID when trusted... pub_sub.IsolateClient(null); ``` ### Access Control The ID's of all *untrusted* clients who will connect to the server must be known at start-up time. You may not register new clients after the server has started. This is mostly a security consideration, to make it impossible to register new clients, thus preventing malicious users from granting themselves additional privileges within the system. ```dart import 'package:platform_pub_sub/platform_pub_sub.dart' as pub_sub; void main() async { // ... server.registerClient(const ClientInfo('')); // Create a user who can subscribe, but not publish. server.registerClient(const ClientInfo('', canPublish: false)); // Create a user who can publish, but not subscribe. server.registerClient(const ClientInfo('', canSubscribe: false)); // Create a user with no privileges whatsoever. server.registerClient(const ClientInfo('', canPublish: false, canSubscribe: false)); server.start(); } ``` ### Isolates If you are just running multiple instances of a server, use `package:platform_pub_sub/isolate.dart`. You'll need one isolate to be the master. Typically this is the first isolate you create. ```dart import 'dart:io'; import 'dart:isolate'; import 'package:platform_pub_sub/isolate.dart' as pub_sub; import 'package:platform_pub_sub/platform_pub_sub.dart' as pub_sub; void main() async { // Easily bring up a server. var adapter = pub_sub.IsolateAdapter(); var server = pub_sub.Server([adapter]); // You then need to create a client that will connect to the adapter. // Each isolate in your application should contain a client. for (int i = 0; i < Platform.numberOfProcessors - 1; i++) { server.registerClient(pub_sub.ClientInfo('client$i')); } // Start the server. server.start(); // Next, let's start isolates that interact with the server. // // Fortunately, we can send SendPorts over Isolates, so this is no hassle. for (int i = 0; i < Platform.numberOfProcessors - 1; i++) Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]); // It's possible that you're running your application in the server isolate as well: isolateMain([0, adapter.receivePort.sendPort]); } void isolateMain(List args) { var client = pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort); // The client will connect automatically. In the meantime, we can start subscribing to events. client.subscribe('user::logged_in').then((sub) { // The `ClientSubscription` class extends `Stream`. Hooray for asynchrony! sub.listen((msg) { print('Logged in: $msg'); }); }); } ``` ### JSON RPC 2.0 If you are not running on isolates, you need to import `package:platform_pub_sub/json_rpc_2.dart`. This library leverages `package:json_rpc_2` and `package:stream_channel` to create clients and servers that can hypothetically run on any medium, i.e. WebSockets, or TCP Sockets. Check out `test/json_rpc_2_test.dart` for an example of serving `platform_pub_sub` over TCP sockets. ## Protocol `platform_pub_sub` is built upon a simple RPC, and this package includes an implementation that runs via `SendPort`s and `ReceivePort`s, as well as one that runs on any `StreamChannel`. Data sent over the wire looks like the following: ```typescript // Sent by a client to initiate an exchange. interface Request { // This is an arbitrary string, assigned by your client, but in every case, // the client uses this to match your requests with asynchronous responses. request_id: string, // The ID of the client to authenticate as. // // As you can imagine, this should be kept secret, to prevent breaches. client_id: string, // Required for *every* request. params: { // A value to be `publish`ed. value?: any, // The name of an event to `publish`. event_name?: string, // The ID of a subscription to be cancelled. subscription_id?: string } } /// Sent by the server in response to a request. interface Response { // `true` for success, `false` for failures. status: boolean, // Only appears if `status` is `false`; explains why an operation failed. error_message?: string, // Matches the request_id sent by the client. request_id: string, result?: { // The number of other clients to whom an event was `publish`ed. listeners:? number, // The ID of a created subscription. subscription_id?: string } } ``` When sending via JSON_RPC 2.0, the `params` of a `Request` are simply folded into the object itself, for simplicity's sake. In this case, a response will be sent as a notification whose name is the `request_id`. In the case of Isolate clients/servers, events will be simply sent as Lists: ```dart ['', value] ``` Clients can send with the following 3 methods: * `subscribe` (`event_name`:string): Subscribe to an event. * `unsubscribe` (`subscription_id`:string): Unsubscribe from an event you previously subscribed to. * `publish` (`event_name`:string, `value`:any): Publish an event to all other clients who are subscribed. The client and server in `package:platform_pub_sub/isolate.dart` must make extra provisions to keep track of client ID's. Since `SendPort`s and `ReceivePort`s do not have any sort of guaranteed-unique ID's, new clients must send their `SendPort` to the server before sending any requests. The server then responds with an `id` that must be used to identify a `SendPort` to send a response to.