diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..de2210c9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1 @@ +language: dart \ No newline at end of file diff --git a/README.md b/README.md index 2479c35f..d9302337 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,21 @@ # sync +[![Pub](https://img.shields.io/pub/v/angel_sync.svg)](https://pub.dartlang.org/packages/angel_sync) +[![build status](https://travis-ci.org/angel-dart/sync.svg)](https://travis-ci.org/angel-dart/sync) + Easily synchronize and scale WebSockets using package:pub_sub. + +# Usage +This package exposes `PubSubWebSocketSynchronizer`, which +can simply be dropped into any `AngelWebSocket` constructor. + +Once you've set that up, instances of your application will +automatically fire events in-sync. That's all you have to do +to scale a real-time application with Angel! + +```dart +await app.configure(new AngelWebSocket( + synchronizer: new PubSubWebSocketSynchronizer( + new pub_sub.IsolateClient('', adapter.receivePort.sendPort), + ), +)); +``` \ No newline at end of file diff --git a/analysis_options.yaml b/analysis_options.yaml new file mode 100644 index 00000000..518eb901 --- /dev/null +++ b/analysis_options.yaml @@ -0,0 +1,2 @@ +analyzer: + strong-mode: true \ No newline at end of file diff --git a/lib/angel_sync.dart b/lib/angel_sync.dart new file mode 100644 index 00000000..e0640bc4 --- /dev/null +++ b/lib/angel_sync.dart @@ -0,0 +1,41 @@ +import 'dart:async'; +import 'package:angel_websocket/server.dart'; +import 'package:pub_sub/pub_sub.dart' as pub_sub; + +/// Synchronizes WebSockets using `package:pub_sub`. +class PubSubWebSocketSynchronizer extends WebSocketSynchronizer { + /// The event name used to synchronize events on the server. + static const String eventName = 'angel_sync::event'; + + final StreamController _stream = + new StreamController(); + + pub_sub.ClientSubscription _subscription; + + final pub_sub.Client client; + + PubSubWebSocketSynchronizer(this.client) { + client.subscribe(eventName).then((sub) { + _subscription = sub + ..listen((Map data) { + if (!_stream.isClosed) _stream.add(new WebSocketEvent.fromJson(data)); + }, onError: _stream.addError); + }).catchError(_stream.addError); + } + + @override + Stream get stream => _stream.stream; + + Future close() { + if (_subscription != null) { + _subscription.unsubscribe().then((_) => client.close()); + } else + client.close(); + return new Future.value(); + } + + @override + void notifyOthers(WebSocketEvent e) { + client.publish(eventName, e.toJson()).catchError(_stream.addError); + } +} diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 00000000..e305fb00 --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,14 @@ +name: angel_sync +version: 1.0.0 +description: Easily synchronize and scale WebSockets using package:pub_sub. +author: Tobe O +homepage: https://github.com/angel-dart/sync +environment: + sdk: ">=1.19.0" +dependencies: + angel_framework: ^1.0.0 + angel_websocket: ^1.0.0 + pub_sub: ^1.0.0 +dev_dependencies: + angel_test: ^1.0.0 + test: ^0.12.0 \ No newline at end of file diff --git a/test/all_test.dart b/test/all_test.dart new file mode 100644 index 00000000..02648a74 --- /dev/null +++ b/test/all_test.dart @@ -0,0 +1,87 @@ +import 'dart:isolate'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_sync/angel_sync.dart'; +import 'package:angel_test/angel_test.dart'; +import 'package:angel_websocket/io.dart' as client; +import 'package:angel_websocket/server.dart'; +import 'package:pub_sub/isolate.dart' as pub_sub; +import 'package:pub_sub/pub_sub.dart' as pub_sub; +import 'package:test/test.dart'; + +main() { + Angel app1, app2; + TestClient app1Client; + client.WebSockets app2Client; + pub_sub.Server server; + ReceivePort app1Port, app2Port; + + setUp(() async { + var adapter = new pub_sub.IsolateAdapter(); + + server = new pub_sub.Server([ + adapter, + ]) + ..registerClient(const pub_sub.ClientInfo('angel_sync1')) + ..registerClient(const pub_sub.ClientInfo('angel_sync2')) + ..start(); + + app1 = new Angel(); + app2 = new Angel(); + + app1.post('/message', (RequestContext req, AngelWebSocket ws) async { + // Manually broadcast. Even though app1 has no clients, it *should* + // propagate to app2. + ws.batchEvent(new WebSocketEvent( + eventName: 'message', + data: req.body['message'], + )); + return 'Sent: ${req.body['message']}'; + }); + + app1Port = new ReceivePort(); + await app1.configure(new AngelWebSocket( + synchronizer: new PubSubWebSocketSynchronizer( + new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort), + ), + )); + app1Client = await connectTo(app1); + + app2Port = new ReceivePort(); + await app2.configure(new AngelWebSocket( + synchronizer: new PubSubWebSocketSynchronizer( + new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort), + ), + )); + + var http = await app2.startServer(); + app2Client = + new client.WebSockets('ws://${http.address.address}:${http.port}/ws'); + await app2Client.connect(); + }); + + tearDown(() { + server.close(); + app1Port.close(); + app2Port.close(); + app1.close(); + app2.close(); + app1Client.close(); + app2Client.close(); + }); + + test('events propagate', () async { + // The point of this test is that neither app1 nor app2 + // is aware that the other even exists. + // + // Regardless, a WebSocket event broadcast in app1 will be + // broadcast by app2 as well. + + var stream = app2Client.on['message']; + var response = + await app1Client.post('/message', body: {'message': 'Hello, world!'}); + print('app1 response: ${response.body}'); + + var msg = await stream.first.timeout(const Duration(seconds: 5)); + print('app2 got message: ${msg.data}'); + }); +}