platform/packages/pub_sub/lib/src/json_rpc/client.dart

144 lines
4.1 KiB
Dart
Raw Normal View History

2021-03-08 12:56:39 +00:00
import 'dart:async';
import 'package:stream_channel/stream_channel.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc_2;
import 'package:uuid/uuid.dart';
import '../../pub_sub.dart';
/// A [Client] implementation that communicates via JSON RPC 2.0.
class JsonRpc2Client extends Client {
final Map<String, Completer<Map>> _requests = {};
final List<_JsonRpc2ClientSubscription> _subscriptions = [];
2021-05-01 01:54:59 +00:00
final Uuid _uuid = Uuid();
2021-03-08 12:56:39 +00:00
2021-05-01 01:54:59 +00:00
json_rpc_2.Peer? _peer;
2021-03-08 12:56:39 +00:00
/// The ID of the client we are authenticating as.
///
/// May be `null`, if and only if we are marked as a trusted source on
/// the server side.
2021-05-01 01:54:59 +00:00
String? get clientId => _clientId;
String? _clientId;
2021-03-08 12:56:39 +00:00
2021-05-01 01:54:59 +00:00
JsonRpc2Client(String? clientId, StreamChannel<String> channel) {
2021-03-08 12:56:39 +00:00
_clientId = clientId;
2021-05-01 01:54:59 +00:00
_peer = json_rpc_2.Peer(channel);
2021-03-08 12:56:39 +00:00
2021-05-01 01:54:59 +00:00
_peer!.registerMethod('event', (json_rpc_2.Parameters params) {
String? eventName = params['event_name'].asString;
var event = params['value'].value;
2021-03-08 12:56:39 +00:00
for (var s in _subscriptions.where((s) => s.eventName == eventName)) {
if (!s._stream.isClosed) s._stream.add(event);
}
});
2021-05-01 01:54:59 +00:00
_peer!.registerFallback((json_rpc_2.Parameters params) {
2021-03-08 12:56:39 +00:00
var c = _requests.remove(params.method);
2021-05-01 01:54:59 +00:00
if (c == null) {
throw json_rpc_2.RpcException.methodNotFound(params.method);
} else {
2021-03-08 12:56:39 +00:00
var data = params.asMap;
if (data['status'] is! bool) {
c.completeError(
2021-05-01 01:54:59 +00:00
FormatException('The server sent an invalid response.'));
2021-03-08 12:56:39 +00:00
} else if (!(data['status'] as bool)) {
2021-05-01 01:54:59 +00:00
c.completeError(PubSubException(data['error_message']?.toString() ??
2021-03-08 12:56:39 +00:00
'The server sent a failure response, but did not provide an error message.'));
} else {
c.complete(data);
}
}
});
2021-05-01 01:54:59 +00:00
_peer!.listen();
2021-03-08 12:56:39 +00:00
}
@override
Future publish(String eventName, value) {
2021-05-01 01:54:59 +00:00
var c = Completer<Map>();
2021-03-08 12:56:39 +00:00
var requestId = _uuid.v4();
_requests[requestId] = c;
2021-05-01 01:54:59 +00:00
_peer!.sendNotification('publish', {
2021-03-08 12:56:39 +00:00
'request_id': requestId,
'client_id': clientId,
'event_name': eventName,
'value': value
});
return c.future.then((data) {
2021-05-01 01:54:59 +00:00
_clientId = data['result']['client_id'] as String?;
2021-03-08 12:56:39 +00:00
});
}
@override
Future<ClientSubscription> subscribe(String eventName) {
2021-05-01 01:54:59 +00:00
var c = Completer<Map>();
2021-03-08 12:56:39 +00:00
var requestId = _uuid.v4();
_requests[requestId] = c;
2021-05-01 01:54:59 +00:00
_peer!.sendNotification('subscribe', {
2021-03-08 12:56:39 +00:00
'request_id': requestId,
'client_id': clientId,
'event_name': eventName
});
return c.future.then<ClientSubscription>((result) {
2021-05-01 01:54:59 +00:00
_clientId = result['client_id'] as String?;
var s = _JsonRpc2ClientSubscription(
eventName, result['subscription_id'] as String?, this);
2021-03-08 12:56:39 +00:00
_subscriptions.add(s);
return s;
});
}
@override
Future close() {
2021-05-01 01:54:59 +00:00
if (_peer?.isClosed != true) _peer!.close();
2021-03-08 12:56:39 +00:00
for (var c in _requests.values) {
if (!c.isCompleted) {
2021-05-01 01:54:59 +00:00
c.completeError(StateError(
2021-03-08 12:56:39 +00:00
'The client was closed before the server responded to this request.'));
}
}
for (var s in _subscriptions) s._close();
_requests.clear();
2021-05-01 01:54:59 +00:00
return Future.value();
2021-03-08 12:56:39 +00:00
}
}
class _JsonRpc2ClientSubscription extends ClientSubscription {
2021-05-01 01:54:59 +00:00
final StreamController _stream = StreamController();
final String? eventName, id;
2021-03-08 12:56:39 +00:00
final JsonRpc2Client client;
_JsonRpc2ClientSubscription(this.eventName, this.id, this.client);
void _close() {
if (!_stream.isClosed) _stream.close();
}
@override
2021-05-01 01:54:59 +00:00
StreamSubscription listen(void onData(event)?,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
2021-03-08 12:56:39 +00:00
return _stream.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
Future unsubscribe() {
2021-05-01 01:54:59 +00:00
var c = Completer<Map>();
2021-03-08 12:56:39 +00:00
var requestId = client._uuid.v4();
client._requests[requestId] = c;
2021-05-01 01:54:59 +00:00
client._peer!.sendNotification('unsubscribe', {
2021-03-08 12:56:39 +00:00
'request_id': requestId,
'client_id': client.clientId,
'subscription_id': id
});
return c.future.then((_) {
_close();
});
}
}