platform/packages/sync/lib/angel_sync.dart

54 lines
1.5 KiB
Dart
Raw Normal View History

2017-08-08 21:04:36 +00:00
import 'dart:async';
2018-11-15 17:03:03 +00:00
import 'package:angel_websocket/angel_websocket.dart';
2017-08-08 21:04:36 +00:00
import 'package:pub_sub/pub_sub.dart' as pub_sub;
2018-11-15 17:03:03 +00:00
import 'package:stream_channel/stream_channel.dart';
2017-08-08 21:04:36 +00:00
/// Synchronizes WebSockets using `package:pub_sub`.
2018-11-15 17:03:03 +00:00
class PubSubSynchronizationChannel extends StreamChannelMixin<WebSocketEvent> {
2017-08-08 21:04:36 +00:00
/// The event name used to synchronize events on the server.
static const String eventName = 'angel_sync::event';
2018-11-15 17:03:03 +00:00
final StreamChannelController<WebSocketEvent> _ctrl =
2021-06-20 12:37:20 +00:00
StreamChannelController<WebSocketEvent>();
2017-08-08 21:04:36 +00:00
2021-06-20 12:37:20 +00:00
pub_sub.ClientSubscription? _subscription;
2017-08-08 21:04:36 +00:00
final pub_sub.Client client;
2018-11-15 17:03:03 +00:00
PubSubSynchronizationChannel(this.client) {
_ctrl.local.stream.listen((e) {
2021-06-20 12:37:20 +00:00
client
2018-11-15 17:03:03 +00:00
.publish(eventName, e.toJson())
.catchError(_ctrl.local.sink.addError);
});
2017-08-08 21:04:36 +00:00
client.subscribe(eventName).then((sub) {
_subscription = sub
2018-11-15 17:03:03 +00:00
..listen((data) {
// Incoming is a Map
if (data is Map) {
2021-06-20 12:37:20 +00:00
var e = WebSocketEvent.fromJson(data);
2018-11-15 17:03:03 +00:00
_ctrl.local.sink.add(e);
}
}, onError: _ctrl.local.sink.addError);
2021-06-20 12:37:20 +00:00
}).catchError((error) {
_ctrl.local.sink.addError(error as Object);
});
2017-08-08 21:04:36 +00:00
}
@override
2018-11-15 17:03:03 +00:00
Stream<WebSocketEvent> get stream => _ctrl.foreign.stream;
2021-06-20 12:37:20 +00:00
@override
2018-11-15 17:03:03 +00:00
StreamSink<WebSocketEvent> get sink => _ctrl.foreign.sink;
2017-08-08 21:04:36 +00:00
Future close() {
if (_subscription != null) {
2021-06-20 12:37:20 +00:00
_subscription!.unsubscribe().then((_) => client.close());
} else {
2017-08-08 21:04:36 +00:00
client.close();
2021-06-20 12:37:20 +00:00
}
2018-11-15 17:03:03 +00:00
return _ctrl.local.sink.close();
2017-08-08 21:04:36 +00:00
}
}