diff --git a/.gitignore b/.gitignore index 4d2a4d6d..321543c9 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ pubspec.lock # Directory created by dartdoc # If you don't generate documentation locally you can remove this line. doc/api/ +.dart_tool \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..45cde10a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +# 2.0.0 +* Dart 2 + Angel 2 updates. +* Extend `StreamChannel`, instead of the defunct `WebSocketSynchronizer`. \ No newline at end of file diff --git a/analysis_options.yaml b/analysis_options.yaml index 518eb901..eae1e42a 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -1,2 +1,3 @@ analyzer: - strong-mode: true \ No newline at end of file + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/lib/angel_sync.dart b/lib/angel_sync.dart index e0640bc4..2774ccb8 100644 --- a/lib/angel_sync.dart +++ b/lib/angel_sync.dart @@ -1,41 +1,49 @@ import 'dart:async'; -import 'package:angel_websocket/server.dart'; +import 'package:angel_websocket/angel_websocket.dart'; import 'package:pub_sub/pub_sub.dart' as pub_sub; +import 'package:stream_channel/stream_channel.dart'; /// Synchronizes WebSockets using `package:pub_sub`. -class PubSubWebSocketSynchronizer extends WebSocketSynchronizer { +class PubSubSynchronizationChannel extends StreamChannelMixin { /// The event name used to synchronize events on the server. static const String eventName = 'angel_sync::event'; - final StreamController _stream = - new StreamController(); + final StreamChannelController _ctrl = + new StreamChannelController(); pub_sub.ClientSubscription _subscription; final pub_sub.Client client; - PubSubWebSocketSynchronizer(this.client) { + PubSubSynchronizationChannel(this.client) { + _ctrl.local.stream.listen((e) { + return client + .publish(eventName, e.toJson()) + .catchError(_ctrl.local.sink.addError); + }); + client.subscribe(eventName).then((sub) { _subscription = sub - ..listen((Map data) { - if (!_stream.isClosed) _stream.add(new WebSocketEvent.fromJson(data)); - }, onError: _stream.addError); - }).catchError(_stream.addError); + ..listen((data) { + // Incoming is a Map + if (data is Map) { + var e = new WebSocketEvent.fromJson(data); + _ctrl.local.sink.add(e); + } + }, onError: _ctrl.local.sink.addError); + }).catchError(_ctrl.local.sink.addError); } @override - Stream get stream => _stream.stream; + Stream get stream => _ctrl.foreign.stream; + + StreamSink get sink => _ctrl.foreign.sink; Future close() { if (_subscription != null) { _subscription.unsubscribe().then((_) => client.close()); } else client.close(); - return new Future.value(); - } - - @override - void notifyOthers(WebSocketEvent e) { - client.publish(eventName, e.toJson()).catchError(_stream.addError); + return _ctrl.local.sink.close(); } } diff --git a/pubspec.yaml b/pubspec.yaml index e305fb00..8a23d1cf 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,14 +1,15 @@ name: angel_sync -version: 1.0.0 +version: 2.0.0 description: Easily synchronize and scale WebSockets using package:pub_sub. author: Tobe O homepage: https://github.com/angel-dart/sync environment: - sdk: ">=1.19.0" + sdk: ">=2.0.0-dev <3.0.0" dependencies: - angel_framework: ^1.0.0 - angel_websocket: ^1.0.0 - pub_sub: ^1.0.0 + angel_framework: ^2.0.0-alpha + angel_websocket: ^2.0.0-alpha + pub_sub: ^2.0.0 + stream_channel: ^1.0.0 dev_dependencies: - angel_test: ^1.0.0 - test: ^0.12.0 \ No newline at end of file + angel_test: ^2.0.0-alpha + test: ^1.0.0 \ No newline at end of file diff --git a/test/all_test.dart b/test/all_test.dart index 02648a74..dcc6e964 100644 --- a/test/all_test.dart +++ b/test/all_test.dart @@ -1,5 +1,6 @@ import 'dart:isolate'; import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; import 'package:angel_sync/angel_sync.dart'; import 'package:angel_test/angel_test.dart'; import 'package:angel_websocket/io.dart' as client; @@ -28,34 +29,45 @@ main() { app1 = new Angel(); app2 = new Angel(); - app1.post('/message', (RequestContext req, AngelWebSocket ws) async { + app1.post('/message', (req, res) async { // Manually broadcast. Even though app1 has no clients, it *should* // propagate to app2. + var ws = req.container.make(); + var body = await req.parseBody(); ws.batchEvent(new WebSocketEvent( eventName: 'message', - data: req.body['message'], + data: body['message'], )); - return 'Sent: ${req.body['message']}'; + return 'Sent: ${body['message']}'; }); app1Port = new ReceivePort(); - await app1.configure(new AngelWebSocket( - synchronizer: new PubSubWebSocketSynchronizer( + var ws1 = new AngelWebSocket( + app1, + synchronizationChannel: new PubSubSynchronizationChannel( new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort), ), - )); + ); + await app1.configure(ws1.configureServer); + app1.get('/ws', ws1.handleRequest); app1Client = await connectTo(app1); app2Port = new ReceivePort(); - await app2.configure(new AngelWebSocket( - synchronizer: new PubSubWebSocketSynchronizer( + var ws2 = new AngelWebSocket( + app2, + synchronizationChannel: new PubSubSynchronizationChannel( new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort), ), - )); + ); + await app2.configure(ws2.configureServer); + app2.get('/ws', ws2.handleRequest); - var http = await app2.startServer(); - app2Client = - new client.WebSockets('ws://${http.address.address}:${http.port}/ws'); + var http = new AngelHttp(app2); + await http.startServer(); + var wsPath = + http.uri.replace(scheme: 'ws', path: '/ws').removeFragment().toString(); + print(wsPath); + app2Client = new client.WebSockets(wsPath); await app2Client.connect(); });