diff --git a/packages/sync/.gitignore b/packages/sync/.gitignore new file mode 100644 index 00000000..321543c9 --- /dev/null +++ b/packages/sync/.gitignore @@ -0,0 +1,13 @@ +# See https://www.dartlang.org/tools/private-files.html + +# Files and directories created by pub +.packages +.pub/ +build/ +# If you're building an application, you may want to check-in your pubspec.lock +pubspec.lock + +# Directory created by dartdoc +# If you don't generate documentation locally you can remove this line. +doc/api/ +.dart_tool \ No newline at end of file diff --git a/packages/sync/.travis.yml b/packages/sync/.travis.yml new file mode 100644 index 00000000..de2210c9 --- /dev/null +++ b/packages/sync/.travis.yml @@ -0,0 +1 @@ +language: dart \ No newline at end of file diff --git a/packages/sync/CHANGELOG.md b/packages/sync/CHANGELOG.md new file mode 100644 index 00000000..45cde10a --- /dev/null +++ b/packages/sync/CHANGELOG.md @@ -0,0 +1,3 @@ +# 2.0.0 +* Dart 2 + Angel 2 updates. +* Extend `StreamChannel`, instead of the defunct `WebSocketSynchronizer`. \ No newline at end of file diff --git a/packages/sync/LICENSE b/packages/sync/LICENSE new file mode 100644 index 00000000..89074fd3 --- /dev/null +++ b/packages/sync/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 The Angel Framework + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/sync/README.md b/packages/sync/README.md new file mode 100644 index 00000000..83ad914b --- /dev/null +++ b/packages/sync/README.md @@ -0,0 +1,21 @@ +# sync +[![Pub](https://img.shields.io/pub/v/angel_sync.svg)](https://pub.dartlang.org/packages/angel_sync) +[![build status](https://travis-ci.org/angel-dart/sync.svg)](https://travis-ci.org/angel-dart/sync) + +Easily synchronize and scale WebSockets using package:pub_sub. + +# Usage +This package exposes `PubSubSynchronizationChannel`, which +can simply be dropped into any `AngelWebSocket` constructor. + +Once you've set that up, instances of your application will +automatically fire events in-sync. That's all you have to do +to scale a real-time application with Angel! + +```dart +await app.configure(new AngelWebSocket( + synchronizationChannel: new PubSubSynchronizationChannel( + new pub_sub.IsolateClient('', adapter.receivePort.sendPort), + ), +)); +``` \ No newline at end of file diff --git a/packages/sync/analysis_options.yaml b/packages/sync/analysis_options.yaml new file mode 100644 index 00000000..eae1e42a --- /dev/null +++ b/packages/sync/analysis_options.yaml @@ -0,0 +1,3 @@ +analyzer: + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/packages/sync/example/main.dart b/packages/sync/example/main.dart new file mode 100644 index 00000000..dcc6e964 --- /dev/null +++ b/packages/sync/example/main.dart @@ -0,0 +1,99 @@ +import 'dart:isolate'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; +import 'package:angel_sync/angel_sync.dart'; +import 'package:angel_test/angel_test.dart'; +import 'package:angel_websocket/io.dart' as client; +import 'package:angel_websocket/server.dart'; +import 'package:pub_sub/isolate.dart' as pub_sub; +import 'package:pub_sub/pub_sub.dart' as pub_sub; +import 'package:test/test.dart'; + +main() { + Angel app1, app2; + TestClient app1Client; + client.WebSockets app2Client; + pub_sub.Server server; + ReceivePort app1Port, app2Port; + + setUp(() async { + var adapter = new pub_sub.IsolateAdapter(); + + server = new pub_sub.Server([ + adapter, + ]) + ..registerClient(const pub_sub.ClientInfo('angel_sync1')) + ..registerClient(const pub_sub.ClientInfo('angel_sync2')) + ..start(); + + app1 = new Angel(); + app2 = new Angel(); + + app1.post('/message', (req, res) async { + // Manually broadcast. Even though app1 has no clients, it *should* + // propagate to app2. + var ws = req.container.make(); + var body = await req.parseBody(); + ws.batchEvent(new WebSocketEvent( + eventName: 'message', + data: body['message'], + )); + return 'Sent: ${body['message']}'; + }); + + app1Port = new ReceivePort(); + var ws1 = new AngelWebSocket( + app1, + synchronizationChannel: new PubSubSynchronizationChannel( + new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort), + ), + ); + await app1.configure(ws1.configureServer); + app1.get('/ws', ws1.handleRequest); + app1Client = await connectTo(app1); + + app2Port = new ReceivePort(); + var ws2 = new AngelWebSocket( + app2, + synchronizationChannel: new PubSubSynchronizationChannel( + new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort), + ), + ); + await app2.configure(ws2.configureServer); + app2.get('/ws', ws2.handleRequest); + + var http = new AngelHttp(app2); + await http.startServer(); + var wsPath = + http.uri.replace(scheme: 'ws', path: '/ws').removeFragment().toString(); + print(wsPath); + app2Client = new client.WebSockets(wsPath); + await app2Client.connect(); + }); + + tearDown(() { + server.close(); + app1Port.close(); + app2Port.close(); + app1.close(); + app2.close(); + app1Client.close(); + app2Client.close(); + }); + + test('events propagate', () async { + // The point of this test is that neither app1 nor app2 + // is aware that the other even exists. + // + // Regardless, a WebSocket event broadcast in app1 will be + // broadcast by app2 as well. + + var stream = app2Client.on['message']; + var response = + await app1Client.post('/message', body: {'message': 'Hello, world!'}); + print('app1 response: ${response.body}'); + + var msg = await stream.first.timeout(const Duration(seconds: 5)); + print('app2 got message: ${msg.data}'); + }); +} diff --git a/packages/sync/lib/angel_sync.dart b/packages/sync/lib/angel_sync.dart new file mode 100644 index 00000000..2774ccb8 --- /dev/null +++ b/packages/sync/lib/angel_sync.dart @@ -0,0 +1,49 @@ +import 'dart:async'; +import 'package:angel_websocket/angel_websocket.dart'; +import 'package:pub_sub/pub_sub.dart' as pub_sub; +import 'package:stream_channel/stream_channel.dart'; + +/// Synchronizes WebSockets using `package:pub_sub`. +class PubSubSynchronizationChannel extends StreamChannelMixin { + /// The event name used to synchronize events on the server. + static const String eventName = 'angel_sync::event'; + + final StreamChannelController _ctrl = + new StreamChannelController(); + + pub_sub.ClientSubscription _subscription; + + final pub_sub.Client client; + + PubSubSynchronizationChannel(this.client) { + _ctrl.local.stream.listen((e) { + return client + .publish(eventName, e.toJson()) + .catchError(_ctrl.local.sink.addError); + }); + + client.subscribe(eventName).then((sub) { + _subscription = sub + ..listen((data) { + // Incoming is a Map + if (data is Map) { + var e = new WebSocketEvent.fromJson(data); + _ctrl.local.sink.add(e); + } + }, onError: _ctrl.local.sink.addError); + }).catchError(_ctrl.local.sink.addError); + } + + @override + Stream get stream => _ctrl.foreign.stream; + + StreamSink get sink => _ctrl.foreign.sink; + + Future close() { + if (_subscription != null) { + _subscription.unsubscribe().then((_) => client.close()); + } else + client.close(); + return _ctrl.local.sink.close(); + } +} diff --git a/packages/sync/pubspec.yaml b/packages/sync/pubspec.yaml new file mode 100644 index 00000000..8a23d1cf --- /dev/null +++ b/packages/sync/pubspec.yaml @@ -0,0 +1,15 @@ +name: angel_sync +version: 2.0.0 +description: Easily synchronize and scale WebSockets using package:pub_sub. +author: Tobe O +homepage: https://github.com/angel-dart/sync +environment: + sdk: ">=2.0.0-dev <3.0.0" +dependencies: + angel_framework: ^2.0.0-alpha + angel_websocket: ^2.0.0-alpha + pub_sub: ^2.0.0 + stream_channel: ^1.0.0 +dev_dependencies: + angel_test: ^2.0.0-alpha + test: ^1.0.0 \ No newline at end of file diff --git a/packages/sync/test/all_test.dart b/packages/sync/test/all_test.dart new file mode 100644 index 00000000..dcc6e964 --- /dev/null +++ b/packages/sync/test/all_test.dart @@ -0,0 +1,99 @@ +import 'dart:isolate'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; +import 'package:angel_sync/angel_sync.dart'; +import 'package:angel_test/angel_test.dart'; +import 'package:angel_websocket/io.dart' as client; +import 'package:angel_websocket/server.dart'; +import 'package:pub_sub/isolate.dart' as pub_sub; +import 'package:pub_sub/pub_sub.dart' as pub_sub; +import 'package:test/test.dart'; + +main() { + Angel app1, app2; + TestClient app1Client; + client.WebSockets app2Client; + pub_sub.Server server; + ReceivePort app1Port, app2Port; + + setUp(() async { + var adapter = new pub_sub.IsolateAdapter(); + + server = new pub_sub.Server([ + adapter, + ]) + ..registerClient(const pub_sub.ClientInfo('angel_sync1')) + ..registerClient(const pub_sub.ClientInfo('angel_sync2')) + ..start(); + + app1 = new Angel(); + app2 = new Angel(); + + app1.post('/message', (req, res) async { + // Manually broadcast. Even though app1 has no clients, it *should* + // propagate to app2. + var ws = req.container.make(); + var body = await req.parseBody(); + ws.batchEvent(new WebSocketEvent( + eventName: 'message', + data: body['message'], + )); + return 'Sent: ${body['message']}'; + }); + + app1Port = new ReceivePort(); + var ws1 = new AngelWebSocket( + app1, + synchronizationChannel: new PubSubSynchronizationChannel( + new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort), + ), + ); + await app1.configure(ws1.configureServer); + app1.get('/ws', ws1.handleRequest); + app1Client = await connectTo(app1); + + app2Port = new ReceivePort(); + var ws2 = new AngelWebSocket( + app2, + synchronizationChannel: new PubSubSynchronizationChannel( + new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort), + ), + ); + await app2.configure(ws2.configureServer); + app2.get('/ws', ws2.handleRequest); + + var http = new AngelHttp(app2); + await http.startServer(); + var wsPath = + http.uri.replace(scheme: 'ws', path: '/ws').removeFragment().toString(); + print(wsPath); + app2Client = new client.WebSockets(wsPath); + await app2Client.connect(); + }); + + tearDown(() { + server.close(); + app1Port.close(); + app2Port.close(); + app1.close(); + app2.close(); + app1Client.close(); + app2Client.close(); + }); + + test('events propagate', () async { + // The point of this test is that neither app1 nor app2 + // is aware that the other even exists. + // + // Regardless, a WebSocket event broadcast in app1 will be + // broadcast by app2 as well. + + var stream = app2Client.on['message']; + var response = + await app1Client.post('/message', body: {'message': 'Hello, world!'}); + print('app1 response: ${response.body}'); + + var msg = await stream.first.timeout(const Duration(seconds: 5)); + print('app2 got message: ${msg.data}'); + }); +}