2017-02-22 03:13:08 +00:00
|
|
|
import 'dart:async';
|
|
|
|
import 'package:angel_framework/angel_framework.dart';
|
|
|
|
import 'package:json_god/json_god.dart' as god;
|
2017-03-02 04:29:29 +00:00
|
|
|
import 'package:rethinkdb_driver2/rethinkdb_driver2.dart';
|
2017-02-22 03:13:08 +00:00
|
|
|
|
|
|
|
// Extends a RethinkDB query.
|
|
|
|
typedef RqlQuery QueryCallback(RqlQuery query);
|
|
|
|
|
|
|
|
/// Queries a single RethinkDB table or query.
|
|
|
|
class RethinkService extends Service {
|
|
|
|
/// If set to `true`, clients can remove all items by passing a `null` `id` to `remove`.
|
|
|
|
///
|
|
|
|
/// `false` by default.
|
|
|
|
final bool allowRemoveAll;
|
|
|
|
|
|
|
|
/// If set to `true`, parameters in `req.query` are applied to the database query.
|
|
|
|
final bool allowQuery;
|
|
|
|
|
|
|
|
final bool debug;
|
|
|
|
|
|
|
|
/// If set to `true`, then a HookedService mounted over this instance
|
|
|
|
/// will fire events when RethinkDB pushes events.
|
|
|
|
///
|
|
|
|
/// Good for scaling. ;)
|
|
|
|
final bool listenForChanges;
|
|
|
|
|
|
|
|
final Connection connection;
|
|
|
|
|
|
|
|
/// Doesn't actually have to be a table, just a RethinkDB query.
|
|
|
|
///
|
|
|
|
/// However, a table is the most common usecase.
|
|
|
|
final RqlQuery table;
|
|
|
|
|
|
|
|
RethinkService(this.connection, this.table,
|
|
|
|
{this.allowRemoveAll: false,
|
|
|
|
this.allowQuery: true,
|
|
|
|
this.debug: false,
|
|
|
|
this.listenForChanges: true})
|
|
|
|
: super() {}
|
|
|
|
|
2017-02-22 22:02:17 +00:00
|
|
|
RqlQuery buildQuery(RqlQuery initialQuery, Map params) {
|
2017-02-22 03:13:08 +00:00
|
|
|
if (params != null)
|
2017-02-22 22:02:17 +00:00
|
|
|
params['broadcast'] = params.containsKey('broadcast')
|
|
|
|
? params['broadcast']
|
|
|
|
: (listenForChanges != true);
|
2017-02-22 03:13:08 +00:00
|
|
|
|
2017-02-22 22:02:17 +00:00
|
|
|
var q = _getQueryInner(initialQuery, params);
|
2017-02-22 03:13:08 +00:00
|
|
|
|
|
|
|
if (params?.containsKey('reql') == true && params['reql'] is QueryCallback)
|
|
|
|
q = params['reql'](q);
|
|
|
|
|
2017-02-22 22:02:17 +00:00
|
|
|
return q ?? initialQuery;
|
2017-02-22 03:13:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
RqlQuery _getQueryInner(RqlQuery query, Map params) {
|
|
|
|
if (params == null || !params.containsKey('query'))
|
|
|
|
return null;
|
|
|
|
else {
|
|
|
|
if (params['query'] is RqlQuery)
|
|
|
|
return params['query'];
|
|
|
|
else if (params['query'] is QueryCallback)
|
|
|
|
return params['query'](table);
|
|
|
|
else if (params['query'] is! Map || allowQuery != true)
|
|
|
|
return query;
|
|
|
|
else {
|
|
|
|
Map q = params['query'];
|
2017-03-07 21:23:09 +00:00
|
|
|
return q.keys.fold<RqlQuery>(query, (out, key) {
|
2017-02-22 03:13:08 +00:00
|
|
|
var val = q[key];
|
|
|
|
|
|
|
|
if (val is RequestContext ||
|
|
|
|
val is ResponseContext ||
|
|
|
|
key == 'provider' ||
|
|
|
|
val is Providers)
|
|
|
|
return out;
|
|
|
|
else {
|
2017-03-07 21:23:09 +00:00
|
|
|
return out.filter({key.toString(): val});
|
2017-02-22 03:13:08 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Future _sendQuery(RqlQuery query) async {
|
|
|
|
var result = await query.run(connection);
|
|
|
|
|
|
|
|
if (result is Cursor)
|
|
|
|
return await result.toList();
|
|
|
|
else if (result is Map && result['generated_keys'] is List) {
|
|
|
|
if (result['generated_keys'].length == 1)
|
|
|
|
return await read(result['generated_keys'].first);
|
|
|
|
return await Future.wait(result['generated_keys'].map(read));
|
|
|
|
} else
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2017-02-22 22:02:17 +00:00
|
|
|
_serialize(data) {
|
2017-02-22 03:13:08 +00:00
|
|
|
if (data is Map)
|
|
|
|
return data;
|
2017-02-22 22:02:17 +00:00
|
|
|
else if (data is Iterable)
|
|
|
|
return data.map(_serialize).toList();
|
2017-02-22 03:13:08 +00:00
|
|
|
else
|
|
|
|
return god.serializeObject(data);
|
|
|
|
}
|
|
|
|
|
2017-02-22 22:02:17 +00:00
|
|
|
_squeeze(data) {
|
|
|
|
if (data is Map)
|
|
|
|
return data.keys.fold<Map>({}, (map, k) => map..[k.toString()] = data[k]);
|
|
|
|
else if (data is Iterable)
|
|
|
|
return data.map(_squeeze).toList();
|
|
|
|
else
|
|
|
|
return data;
|
2017-02-22 03:13:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void onHooked(HookedService hookedService) {
|
|
|
|
if (listenForChanges == true) {
|
|
|
|
listenToQuery(table, hookedService);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Future listenToQuery(RqlQuery query, HookedService hookedService) async {
|
|
|
|
Feed feed = await query.changes({'include_types': true}).run(connection);
|
|
|
|
|
|
|
|
feed.listen((Map event) {
|
|
|
|
String type = event['type']?.toString();
|
|
|
|
var newVal = event['new_val'], oldVal = event['old_val'];
|
|
|
|
|
|
|
|
if (type == 'add') {
|
|
|
|
// Create
|
2017-02-23 01:08:02 +00:00
|
|
|
hookedService.fireEvent(
|
|
|
|
hookedService.afterCreated,
|
|
|
|
new HookedServiceEvent(
|
|
|
|
true, null, null, this, HookedServiceEvent.CREATED,
|
|
|
|
result: newVal));
|
2017-02-22 03:13:08 +00:00
|
|
|
} else if (type == 'change') {
|
|
|
|
// Update
|
2017-02-23 01:08:02 +00:00
|
|
|
hookedService.fireEvent(
|
|
|
|
hookedService.afterCreated,
|
|
|
|
new HookedServiceEvent(
|
|
|
|
true, null, null, this, HookedServiceEvent.UPDATED,
|
2017-03-04 21:08:15 +00:00
|
|
|
result: newVal, id: oldVal['id'], data: newVal));
|
2017-02-22 03:13:08 +00:00
|
|
|
} else if (type == 'remove') {
|
|
|
|
// Remove
|
2017-02-23 01:08:02 +00:00
|
|
|
hookedService.fireEvent(
|
|
|
|
hookedService.afterCreated,
|
|
|
|
new HookedServiceEvent(
|
|
|
|
true, null, null, this, HookedServiceEventREMOVED,
|
|
|
|
result: oldVal, id: oldVal['id']));
|
2017-02-22 03:13:08 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
@override
|
|
|
|
Future index([Map params]) async {
|
2017-02-22 22:02:17 +00:00
|
|
|
var query = buildQuery(table, params);
|
2017-02-22 03:13:08 +00:00
|
|
|
return await _sendQuery(query);
|
|
|
|
}
|
|
|
|
|
|
|
|
@override
|
|
|
|
Future read(id, [Map params]) async {
|
2017-02-22 22:02:17 +00:00
|
|
|
var query = buildQuery(table.get(id?.toString()), params);
|
2017-02-22 03:13:08 +00:00
|
|
|
var found = await _sendQuery(query);
|
2017-03-07 22:06:26 +00:00
|
|
|
//print('Found for $id: $found');
|
2017-02-22 03:13:08 +00:00
|
|
|
|
|
|
|
if (found == null) {
|
|
|
|
throw new AngelHttpException.notFound(
|
|
|
|
message: 'No record found for ID $id');
|
|
|
|
} else
|
|
|
|
return found;
|
|
|
|
}
|
|
|
|
|
|
|
|
@override
|
|
|
|
Future create(data, [Map params]) async {
|
2017-02-22 22:02:17 +00:00
|
|
|
if (table is! Table) throw new AngelHttpException.methodNotAllowed();
|
2017-02-22 03:13:08 +00:00
|
|
|
|
|
|
|
var d = _serialize(data);
|
|
|
|
var q = table as Table;
|
2017-02-22 22:02:17 +00:00
|
|
|
var query = buildQuery(q.insert(_squeeze(d)), params);
|
2017-02-22 03:13:08 +00:00
|
|
|
return await _sendQuery(query);
|
|
|
|
}
|
|
|
|
|
|
|
|
@override
|
|
|
|
Future modify(id, data, [Map params]) => update(id, data, params);
|
|
|
|
|
|
|
|
@override
|
|
|
|
Future update(id, data, [Map params]) async {
|
2017-02-22 22:02:17 +00:00
|
|
|
var query = buildQuery(table.get(id?.toString()), params).update(data);
|
2017-02-22 03:13:08 +00:00
|
|
|
await _sendQuery(query);
|
|
|
|
return await read(id, params);
|
|
|
|
}
|
|
|
|
|
|
|
|
@override
|
|
|
|
Future remove(id, [Map params]) async {
|
|
|
|
if (id == null ||
|
|
|
|
id == 'null' &&
|
|
|
|
(allowRemoveAll == true ||
|
|
|
|
params?.containsKey('provider') != true)) {
|
|
|
|
return await _sendQuery(table.delete());
|
|
|
|
} else {
|
|
|
|
var prior = await read(id, params);
|
2017-02-22 22:02:17 +00:00
|
|
|
var query = buildQuery(table.get(id), params).delete();
|
2017-02-22 03:13:08 +00:00
|
|
|
await _sendQuery(query);
|
|
|
|
return prior;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|