platform/packages/poll/lib/angel_poll.dart

261 lines
6.4 KiB
Dart
Raw Normal View History

2017-12-21 17:38:30 +00:00
import 'dart:async';
2017-12-21 19:24:49 +00:00
import 'package:collection/collection.dart';
2017-12-21 17:38:30 +00:00
import 'package:angel_client/angel_client.dart';
2017-12-21 19:24:49 +00:00
/// A [Service] that facilitates real-time updates via the long polling of an [inner] service.
///
/// Works well with [ServiceList].
class PollingService extends Service {
/// The underlying [Service] that does the actual communication with the server.
2017-12-21 17:38:30 +00:00
final Service inner;
2017-12-21 19:24:49 +00:00
/// Perform computations after polling to discern whether new items were created.
final bool checkForCreated;
/// Perform computations after polling to discern whether items were modified.
final bool checkForModified;
/// Perform computations after polling to discern whether items were removed.
final bool checkForRemoved;
/// An [EqualityBy] used to compare the ID's of two items.
///
/// Defaults to comparing the [idField] of two `Map` instances.
final EqualityBy compareId;
/// An [Equality] used to discern whether two items, with the same [idField], are the same item.
///
/// Defaults to [MapEquality], which deep-compares `Map` instances.
final Equality compareItems;
/// A [String] used as an index through which to compare `Map` instances.
///
/// Defaults to `id`.
2017-12-21 17:38:30 +00:00
final String idField;
2017-12-21 19:24:49 +00:00
/// If `true` (default: `false`), then `index` events will be handled as a [Map] containing a `data` field.
///
/// See https://github.com/angel-dart/paginate.
2017-12-21 17:38:30 +00:00
final bool asPaginated;
final List _items = [];
2017-12-21 19:24:49 +00:00
final List<StreamSubscription> _subs = [];
2017-12-21 17:38:30 +00:00
final StreamController _onIndexed = new StreamController(),
_onRead = new StreamController(),
_onCreated = new StreamController(),
_onModified = new StreamController(),
_onUpdated = new StreamController(),
_onRemoved = new StreamController();
Timer _timer;
@override
Angel get app => inner.app;
2021-02-14 05:22:25 +00:00
// TODO: To revisit this logic
2017-12-21 17:38:30 +00:00
@override
2021-02-14 05:22:25 +00:00
Stream<List> get onIndexed => _onIndexed.stream;
2017-12-21 17:38:30 +00:00
@override
Stream get onRead => _onRead.stream;
@override
Stream get onCreated => _onCreated.stream;
@override
Stream get onModified => _onModified.stream;
@override
Stream get onUpdated => _onUpdated.stream;
@override
Stream get onRemoved => _onRemoved.stream;
2017-12-21 19:24:49 +00:00
PollingService(this.inner, Duration interval,
{this.checkForCreated: true,
this.checkForModified: true,
this.checkForRemoved: true,
this.idField: 'id',
this.asPaginated: false,
EqualityBy compareId,
this.compareItems: const MapEquality()})
: compareId = compareId ?? new EqualityBy((map) => map[idField ?? 'id']) {
_timer = new Timer.periodic(interval, (_) {
index().catchError(_onIndexed.addError);
});
var streams = <Stream, StreamController>{
inner.onRead: _onRead,
inner.onCreated: _onCreated,
inner.onModified: _onModified,
inner.onUpdated: _onUpdated,
inner.onRemoved: _onRemoved,
};
streams.forEach((stream, ctrl) {
_subs.add(stream.listen(ctrl.add, onError: ctrl.addError));
});
_subs.add(
inner.onIndexed.listen(
_handleIndexed,
onError: _onIndexed.addError,
),
);
2017-12-21 17:38:30 +00:00
}
@override
Future close() async {
_timer.cancel();
2017-12-21 19:24:49 +00:00
_subs.forEach((s) => s.cancel());
2017-12-21 17:38:30 +00:00
_onIndexed.close();
_onRead.close();
_onCreated.close();
_onModified.close();
_onUpdated.close();
_onRemoved.close();
}
2021-02-14 05:22:25 +00:00
// TODO: To revisit this logic
2017-12-21 17:38:30 +00:00
@override
2021-02-14 05:22:25 +00:00
Future<List> index([Map params]) {
2017-12-21 17:38:30 +00:00
return inner.index().then((data) {
2021-02-14 05:22:25 +00:00
//return asPaginated == true ? data['data'] : data;
return asPaginated == true ? data[0] : data;
2017-12-21 17:38:30 +00:00
});
}
2021-02-14 05:22:25 +00:00
/*
@override
Future index([Map params]) {
}
*/
2017-12-21 17:38:30 +00:00
@override
2017-12-21 19:24:49 +00:00
Future remove(id, [Map params]) {
return inner.remove(id, params).then((result) {
_items.remove(result);
return result;
}).catchError(_onRemoved.addError);
}
_handleUpdate(result) {
int index = -1;
for (int i = 0; i < _items.length; i++) {
if (compareId.equals(_items[i], result)) {
index = i;
break;
}
}
if (index > -1) {
_items[index] = result;
}
return result;
}
2017-12-21 17:38:30 +00:00
@override
2017-12-21 19:24:49 +00:00
Future update(id, data, [Map params]) {
return inner
.update(id, data, params)
.then(_handleUpdate)
.catchError(_onUpdated.addError);
}
2017-12-21 17:38:30 +00:00
@override
2017-12-21 19:24:49 +00:00
Future modify(id, data, [Map params]) {
return inner
.modify(id, data, params)
.then(_handleUpdate)
.catchError(_onModified.addError);
}
2017-12-21 17:38:30 +00:00
@override
2017-12-21 19:24:49 +00:00
Future create(data, [Map params]) {
return inner.create(data, params).then((result) {
_items.add(result);
return result;
}).catchError(_onCreated.addError);
}
2017-12-21 17:38:30 +00:00
@override
2017-12-21 19:24:49 +00:00
Future read(id, [Map params]) {
return inner.read(id, params);
}
void _handleIndexed(data) {
var items = asPaginated == true ? data['data'] : data;
bool changesComputed = false;
if (checkForCreated != false) {
var newItems = <int, dynamic>{};
for (int i = 0; i < items.length; i++) {
var item = items[i];
if (!_items.any((i) => compareId.equals(i, item))) {
newItems[i] = item;
}
}
newItems.forEach((index, item) {
_items.insert(index, item);
_onCreated.add(item);
});
changesComputed = newItems.isNotEmpty;
}
if (checkForRemoved != false) {
var removedItems = <int, dynamic>{};
2017-12-21 17:38:30 +00:00
2017-12-21 19:24:49 +00:00
for (int i = 0; i < _items.length; i++) {
var item = _items[i];
2017-12-21 17:38:30 +00:00
2017-12-21 19:24:49 +00:00
if (!items.any((i) => compareId.equals(i, item))) {
removedItems[i] = item;
}
}
2017-12-21 17:38:30 +00:00
2017-12-21 19:24:49 +00:00
removedItems.forEach((index, item) {
_items.removeAt(index);
_onRemoved.add(item);
});
changesComputed = changesComputed || removedItems.isNotEmpty;
}
if (checkForModified != false) {
var modifiedItems = <int, dynamic>{};
for (var item in items) {
for (int i = 0; i < _items.length; i++) {
var localItem = _items[i];
if (compareId.equals(item, localItem)) {
if (!compareItems.equals(item, localItem)) {
modifiedItems[i] = item;
}
break;
}
}
}
modifiedItems.forEach((index, item) {
_onModified.add(_items[index] = item);
});
changesComputed = changesComputed || modifiedItems.isNotEmpty;
}
if (!changesComputed) {
_items
..clear()
..addAll(items);
_onIndexed.add(items);
}
2017-12-21 17:38:30 +00:00
}
}