This commit is contained in:
Tobe O 2018-11-15 12:03:03 -05:00
parent b2a7af039d
commit 6fae2e706c
6 changed files with 62 additions and 36 deletions

1
.gitignore vendored
View file

@ -10,3 +10,4 @@ pubspec.lock
# Directory created by dartdoc # Directory created by dartdoc
# If you don't generate documentation locally you can remove this line. # If you don't generate documentation locally you can remove this line.
doc/api/ doc/api/
.dart_tool

3
CHANGELOG.md Normal file
View file

@ -0,0 +1,3 @@
# 2.0.0
* Dart 2 + Angel 2 updates.
* Extend `StreamChannel`, instead of the defunct `WebSocketSynchronizer`.

View file

@ -1,2 +1,3 @@
analyzer: analyzer:
strong-mode: true strong-mode:
implicit-casts: false

View file

@ -1,41 +1,49 @@
import 'dart:async'; import 'dart:async';
import 'package:angel_websocket/server.dart'; import 'package:angel_websocket/angel_websocket.dart';
import 'package:pub_sub/pub_sub.dart' as pub_sub; import 'package:pub_sub/pub_sub.dart' as pub_sub;
import 'package:stream_channel/stream_channel.dart';
/// Synchronizes WebSockets using `package:pub_sub`. /// Synchronizes WebSockets using `package:pub_sub`.
class PubSubWebSocketSynchronizer extends WebSocketSynchronizer { class PubSubSynchronizationChannel extends StreamChannelMixin<WebSocketEvent> {
/// The event name used to synchronize events on the server. /// The event name used to synchronize events on the server.
static const String eventName = 'angel_sync::event'; static const String eventName = 'angel_sync::event';
final StreamController<WebSocketEvent> _stream = final StreamChannelController<WebSocketEvent> _ctrl =
new StreamController<WebSocketEvent>(); new StreamChannelController<WebSocketEvent>();
pub_sub.ClientSubscription _subscription; pub_sub.ClientSubscription _subscription;
final pub_sub.Client client; final pub_sub.Client client;
PubSubWebSocketSynchronizer(this.client) { PubSubSynchronizationChannel(this.client) {
_ctrl.local.stream.listen((e) {
return client
.publish(eventName, e.toJson())
.catchError(_ctrl.local.sink.addError);
});
client.subscribe(eventName).then((sub) { client.subscribe(eventName).then((sub) {
_subscription = sub _subscription = sub
..listen((Map data) { ..listen((data) {
if (!_stream.isClosed) _stream.add(new WebSocketEvent.fromJson(data)); // Incoming is a Map
}, onError: _stream.addError); if (data is Map) {
}).catchError(_stream.addError); var e = new WebSocketEvent.fromJson(data);
_ctrl.local.sink.add(e);
}
}, onError: _ctrl.local.sink.addError);
}).catchError(_ctrl.local.sink.addError);
} }
@override @override
Stream<WebSocketEvent> get stream => _stream.stream; Stream<WebSocketEvent> get stream => _ctrl.foreign.stream;
StreamSink<WebSocketEvent> get sink => _ctrl.foreign.sink;
Future close() { Future close() {
if (_subscription != null) { if (_subscription != null) {
_subscription.unsubscribe().then((_) => client.close()); _subscription.unsubscribe().then((_) => client.close());
} else } else
client.close(); client.close();
return new Future.value(); return _ctrl.local.sink.close();
}
@override
void notifyOthers(WebSocketEvent e) {
client.publish(eventName, e.toJson()).catchError(_stream.addError);
} }
} }

View file

@ -1,14 +1,15 @@
name: angel_sync name: angel_sync
version: 1.0.0 version: 2.0.0
description: Easily synchronize and scale WebSockets using package:pub_sub. description: Easily synchronize and scale WebSockets using package:pub_sub.
author: Tobe O <thosakwe@gmail.com> author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/sync homepage: https://github.com/angel-dart/sync
environment: environment:
sdk: ">=1.19.0" sdk: ">=2.0.0-dev <3.0.0"
dependencies: dependencies:
angel_framework: ^1.0.0 angel_framework: ^2.0.0-alpha
angel_websocket: ^1.0.0 angel_websocket: ^2.0.0-alpha
pub_sub: ^1.0.0 pub_sub: ^2.0.0
stream_channel: ^1.0.0
dev_dependencies: dev_dependencies:
angel_test: ^1.0.0 angel_test: ^2.0.0-alpha
test: ^0.12.0 test: ^1.0.0

View file

@ -1,5 +1,6 @@
import 'dart:isolate'; import 'dart:isolate';
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/http.dart';
import 'package:angel_sync/angel_sync.dart'; import 'package:angel_sync/angel_sync.dart';
import 'package:angel_test/angel_test.dart'; import 'package:angel_test/angel_test.dart';
import 'package:angel_websocket/io.dart' as client; import 'package:angel_websocket/io.dart' as client;
@ -28,34 +29,45 @@ main() {
app1 = new Angel(); app1 = new Angel();
app2 = new Angel(); app2 = new Angel();
app1.post('/message', (RequestContext req, AngelWebSocket ws) async { app1.post('/message', (req, res) async {
// Manually broadcast. Even though app1 has no clients, it *should* // Manually broadcast. Even though app1 has no clients, it *should*
// propagate to app2. // propagate to app2.
var ws = req.container.make<AngelWebSocket>();
var body = await req.parseBody();
ws.batchEvent(new WebSocketEvent( ws.batchEvent(new WebSocketEvent(
eventName: 'message', eventName: 'message',
data: req.body['message'], data: body['message'],
)); ));
return 'Sent: ${req.body['message']}'; return 'Sent: ${body['message']}';
}); });
app1Port = new ReceivePort(); app1Port = new ReceivePort();
await app1.configure(new AngelWebSocket( var ws1 = new AngelWebSocket(
synchronizer: new PubSubWebSocketSynchronizer( app1,
synchronizationChannel: new PubSubSynchronizationChannel(
new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort), new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort),
), ),
)); );
await app1.configure(ws1.configureServer);
app1.get('/ws', ws1.handleRequest);
app1Client = await connectTo(app1); app1Client = await connectTo(app1);
app2Port = new ReceivePort(); app2Port = new ReceivePort();
await app2.configure(new AngelWebSocket( var ws2 = new AngelWebSocket(
synchronizer: new PubSubWebSocketSynchronizer( app2,
synchronizationChannel: new PubSubSynchronizationChannel(
new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort), new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort),
), ),
)); );
await app2.configure(ws2.configureServer);
app2.get('/ws', ws2.handleRequest);
var http = await app2.startServer(); var http = new AngelHttp(app2);
app2Client = await http.startServer();
new client.WebSockets('ws://${http.address.address}:${http.port}/ws'); var wsPath =
http.uri.replace(scheme: 'ws', path: '/ws').removeFragment().toString();
print(wsPath);
app2Client = new client.WebSockets(wsPath);
await app2Client.connect(); await app2Client.connect();
}); });