import 'dart:async'; import 'package:angel_websocket/angel_websocket.dart'; import 'package:pub_sub/pub_sub.dart' as pub_sub; import 'package:stream_channel/stream_channel.dart'; /// Synchronizes WebSockets using `package:pub_sub`. class PubSubSynchronizationChannel extends StreamChannelMixin { /// The event name used to synchronize events on the server. static const String eventName = 'angel_sync::event'; final StreamChannelController _ctrl = new StreamChannelController(); pub_sub.ClientSubscription _subscription; final pub_sub.Client client; PubSubSynchronizationChannel(this.client) { _ctrl.local.stream.listen((e) { return client .publish(eventName, e.toJson()) .catchError(_ctrl.local.sink.addError); }); client.subscribe(eventName).then((sub) { _subscription = sub ..listen((data) { // Incoming is a Map if (data is Map) { var e = new WebSocketEvent.fromJson(data); _ctrl.local.sink.add(e); } }, onError: _ctrl.local.sink.addError); }).catchError(_ctrl.local.sink.addError); } @override Stream get stream => _ctrl.foreign.stream; StreamSink get sink => _ctrl.foreign.sink; Future close() { if (_subscription != null) { _subscription.unsubscribe().then((_) => client.close()); } else client.close(); return _ctrl.local.sink.close(); } }