This commit is contained in:
Tobe O 2017-12-21 14:24:49 -05:00
parent 990a129c02
commit bb368721cf
10 changed files with 381 additions and 22 deletions

View file

@ -0,0 +1,8 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="fires modified in all_test.dart" type="DartTestRunConfigurationType" factoryName="Dart Test" singleton="true" nameIsGenerated="true">
<option name="filePath" value="$PROJECT_DIR$/test/all_test.dart" />
<option name="scope" value="GROUP_OR_TEST_BY_NAME" />
<option name="testName" value="fires modified" />
<method />
</configuration>
</component>

View file

@ -0,0 +1,8 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="fires removed in all_test.dart" type="DartTestRunConfigurationType" factoryName="Dart Test" singleton="true" nameIsGenerated="true">
<option name="filePath" value="$PROJECT_DIR$/test/all_test.dart" />
<option name="scope" value="GROUP_OR_TEST_BY_NAME" />
<option name="testName" value="fires removed" />
<method />
</configuration>
</component>

View file

@ -0,0 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tests in all_test.dart" type="DartTestRunConfigurationType" factoryName="Dart Test" singleton="true" nameIsGenerated="true">
<option name="filePath" value="$PROJECT_DIR$/test/all_test.dart" />
<method />
</configuration>
</component>

1
.travis.yml Normal file
View file

@ -0,0 +1 @@
language: dart

2
CHANGELOG.md Normal file
View file

@ -0,0 +1,2 @@
# 1.0.0
* Created package + tests

View file

@ -1,2 +1,42 @@
# poll
package:angel_client upport for "realtime" interactions with Angel via long polling.
[![Pub](https://img.shields.io/pub/v/angel_poll.svg)](https://pub.dartlang.org/packages/angel_poll)
[![build status](https://travis-ci.org/angel-dart/poll.svg?branch=master)](https://travis-ci.org/angel-dart/poll)
`package:angel_client` support for "realtime" interactions with Angel via long polling.
Angel supports [WebSockets](https://github.com/angel-dart/websocket) on the server and client, which
makes it very straightforward to implement realtime collections. However, not every user's browser
supports WebSockets. In such a case, applications might *gracefully degrade* to long-polling
the server for changes.
A `PollingService` wraps a client-side `Service` (typically a REST-based one), and calls its
`index` method at a regular interval. After indexing, the `PollingService` performs a diff
and identifies whether items have been created, modified, or removed. The updates are sent out
through `onCreated`, `onModified`, etc., effectively managing a real-time collection of data.
A common use-case would be passing this service to `ServiceList`, a class that manages the state
of a collection managed in real-time.
```dart
import 'package:angel_client/io.dart';
import 'package:angel_poll/angel_poll.dart';
main() {
var app = new Rest('http://localhost:3000');
var todos = new ServiceList(
new PollingService(
// Typically, you'll pass a REST-based service instance here.
app.service('api/todos'),
// `index` called every 5 seconds
const Duration(seconds: 5),
),
);
todos.onChange.listen((_) {
// Something happened here.
// Maybe an item was created, modified, etc.
});
}
```

21
example/main.dart Normal file
View file

@ -0,0 +1,21 @@
import 'package:angel_client/io.dart';
import 'package:angel_poll/angel_poll.dart';
main() {
var app = new Rest('http://localhost:3000');
var todos = new ServiceList(
new PollingService(
// Typically, you'll pass a REST-based service instance here.
app.service('api/todos'),
// `index` called every 5 seconds
const Duration(seconds: 5),
),
);
todos.onChange.listen((_) {
// Something happened here.
// Maybe an item was created, modified, etc.
});
}

View file

@ -1,13 +1,46 @@
import 'dart:async';
import 'package:collection/collection.dart';
import 'package:angel_client/angel_client.dart';
class Poll extends Service {
/// A [Service] that facilitates real-time updates via the long polling of an [inner] service.
///
/// Works well with [ServiceList].
class PollingService extends Service {
/// The underlying [Service] that does the actual communication with the server.
final Service inner;
/// Perform computations after polling to discern whether new items were created.
final bool checkForCreated;
/// Perform computations after polling to discern whether items were modified.
final bool checkForModified;
/// Perform computations after polling to discern whether items were removed.
final bool checkForRemoved;
/// An [EqualityBy] used to compare the ID's of two items.
///
/// Defaults to comparing the [idField] of two `Map` instances.
final EqualityBy compareId;
/// An [Equality] used to discern whether two items, with the same [idField], are the same item.
///
/// Defaults to [MapEquality], which deep-compares `Map` instances.
final Equality compareItems;
/// A [String] used as an index through which to compare `Map` instances.
///
/// Defaults to `id`.
final String idField;
/// If `true` (default: `false`), then `index` events will be handled as a [Map] containing a `data` field.
///
/// See https://github.com/angel-dart/paginate.
final bool asPaginated;
final List _items = [];
final List<StreamSubscription> _subs = [];
final StreamController _onIndexed = new StreamController(),
_onRead = new StreamController(),
_onCreated = new StreamController(),
@ -15,7 +48,6 @@ class Poll extends Service {
_onUpdated = new StreamController(),
_onRemoved = new StreamController();
bool Function(dynamic, dynamic) _compare;
Timer _timer;
@override
@ -39,15 +71,43 @@ class Poll extends Service {
@override
Stream get onRemoved => _onRemoved.stream;
Poll(this.inner, Duration interval,
{this.idField: 'id', this.asPaginated: false, bool compare(a, b)}) {
_timer = new Timer.periodic(interval, _timerCallback);
_compare = compare ?? (a, b) => a[idField ?? 'id'] == b[idField ?? 'id'];
PollingService(this.inner, Duration interval,
{this.checkForCreated: true,
this.checkForModified: true,
this.checkForRemoved: true,
this.idField: 'id',
this.asPaginated: false,
EqualityBy compareId,
this.compareItems: const MapEquality()})
: compareId = compareId ?? new EqualityBy((map) => map[idField ?? 'id']) {
_timer = new Timer.periodic(interval, (_) {
index().catchError(_onIndexed.addError);
});
var streams = <Stream, StreamController>{
inner.onRead: _onRead,
inner.onCreated: _onCreated,
inner.onModified: _onModified,
inner.onUpdated: _onUpdated,
inner.onRemoved: _onRemoved,
};
streams.forEach((stream, ctrl) {
_subs.add(stream.listen(ctrl.add, onError: ctrl.addError));
});
_subs.add(
inner.onIndexed.listen(
_handleIndexed,
onError: _onIndexed.addError,
),
);
}
@override
Future close() async {
_timer.cancel();
_subs.forEach((s) => s.cancel());
_onIndexed.close();
_onRead.close();
_onCreated.close();
@ -59,35 +119,134 @@ class Poll extends Service {
@override
Future index([Map params]) {
return inner.index().then((data) {
var items = asPaginated == true ? data['data'] : data;
_items
..clear()
..addAll(items);
_onIndexed.add(items);
return asPaginated == true ? data['data'] : data;
});
}
@override
Future remove(id, [Map params]) {}
Future remove(id, [Map params]) {
return inner.remove(id, params).then((result) {
_items.remove(result);
return result;
}).catchError(_onRemoved.addError);
}
_handleUpdate(result) {
int index = -1;
for (int i = 0; i < _items.length; i++) {
if (compareId.equals(_items[i], result)) {
index = i;
break;
}
}
if (index > -1) {
_items[index] = result;
}
return result;
}
@override
Future update(id, data, [Map params]) {}
Future update(id, data, [Map params]) {
return inner
.update(id, data, params)
.then(_handleUpdate)
.catchError(_onUpdated.addError);
}
@override
Future modify(id, data, [Map params]) {}
Future modify(id, data, [Map params]) {
return inner
.modify(id, data, params)
.then(_handleUpdate)
.catchError(_onModified.addError);
}
@override
Future create(data, [Map params]) {}
Future create(data, [Map params]) {
return inner.create(data, params).then((result) {
_items.add(result);
return result;
}).catchError(_onCreated.addError);
}
@override
Future read(id, [Map params]) {}
Future read(id, [Map params]) {
return inner.read(id, params);
}
void _timerCallback(Timer timer) {
index().then((data) {
var items = asPaginated == true ? data['data'] : data;
void _handleIndexed(data) {
var items = asPaginated == true ? data['data'] : data;
bool changesComputed = false;
// TODO: Check create, modify, remove
if (checkForCreated != false) {
var newItems = <int, dynamic>{};
}).catchError(_onIndexed.addError);
for (int i = 0; i < items.length; i++) {
var item = items[i];
if (!_items.any((i) => compareId.equals(i, item))) {
newItems[i] = item;
}
}
newItems.forEach((index, item) {
_items.insert(index, item);
_onCreated.add(item);
});
changesComputed = newItems.isNotEmpty;
}
if (checkForRemoved != false) {
var removedItems = <int, dynamic>{};
for (int i = 0; i < _items.length; i++) {
var item = _items[i];
if (!items.any((i) => compareId.equals(i, item))) {
removedItems[i] = item;
}
}
removedItems.forEach((index, item) {
_items.removeAt(index);
_onRemoved.add(item);
});
changesComputed = changesComputed || removedItems.isNotEmpty;
}
if (checkForModified != false) {
var modifiedItems = <int, dynamic>{};
for (var item in items) {
for (int i = 0; i < _items.length; i++) {
var localItem = _items[i];
if (compareId.equals(item, localItem)) {
if (!compareItems.equals(item, localItem)) {
modifiedItems[i] = item;
}
break;
}
}
}
modifiedItems.forEach((index, item) {
_onModified.add(_items[index] = item);
});
changesComputed = changesComputed || modifiedItems.isNotEmpty;
}
if (!changesComputed) {
_items
..clear()
..addAll(items);
_onIndexed.add(items);
}
}
}

View file

@ -1,6 +1,14 @@
name: angel_poll
version: 1.0.0
description: package:angel_client support for "realtime" interactions with Angel via long polling.
author: Tobe O <thosakwe@gmail.com>
environment:
sdk: ">=1.19.0"
homepage: https://github.com/angel-dart/poll
dependencies:
angel_client: ^1.0.0
async: ">=1.10.0 <3.0.0"
collection: ^1.0.0
dev_dependencies:
angel_test: ^1.1.0
test: ^0.12.0

106
test/all_test.dart Normal file
View file

@ -0,0 +1,106 @@
import 'dart:async';
import 'package:angel_framework/angel_framework.dart' as srv;
import 'package:angel_poll/angel_poll.dart';
import 'package:angel_test/angel_test.dart';
import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:test/test.dart';
void main() {
srv.Service store;
TestClient client;
PollingService pollingService;
setUp(() async {
var app = new srv.Angel();
app.logger = new Logger.detached('angel_poll')
..onRecord.listen((rec) {
print(rec);
if (rec.error != null) {
print(rec.error);
print(rec.stackTrace);
}
});
store = app.use(
'/api/todos',
new srv.MapService(
autoIdAndDateFields: false,
),
);
client = await connectTo(app);
pollingService = new PollingService(
client.service('api/todos'),
const Duration(milliseconds: 100),
);
});
tearDown(() => client.close());
group('events', () {
var created;
StreamQueue onCreated, onModified, onRemoved;
setUp(() async {
onCreated = new StreamQueue(pollingService.onCreated);
onModified = new StreamQueue(pollingService.onModified);
onRemoved = new StreamQueue(pollingService.onRemoved);
created = await store.create({
'id': '0',
'text': 'Clean your room',
'completed': false,
});
});
tearDown(() {
onCreated.cancel();
onModified.cancel();
onRemoved.cancel();
});
test('fires indexed', () async {
var indexed = await pollingService.index();
print(indexed);
expect(await pollingService.onIndexed.first, indexed);
});
test('fires created', () async {
var result = await onCreated.next;
print(result);
expect(created, result);
});
test('fires modified', () async {
await pollingService.index();
await store.modify('0', {
'text': 'go to school',
});
var result = await onModified.next;
print(result);
expect(result, new Map.from(created)..['text'] = 'go to school');
});
test('manual modify', () async {
await pollingService.index();
await pollingService.modify('0', {
'text': 'eat',
});
var result = await onModified.next;
print(result);
expect(result, new Map.from(created)..['text'] = 'eat');
});
test('fires removed', () async {
await pollingService.index();
var removed = await store.remove('0');
var result = await onRemoved.next;
print(result);
expect(result, removed);
});
});
}