platform/packages/websocket/lib/base_websocket_client.dart

467 lines
14 KiB
Dart
Raw Normal View History

2016-12-23 10:47:21 +00:00
import 'dart:async';
2017-04-17 11:03:42 +00:00
import 'dart:collection';
2018-10-02 15:32:06 +00:00
import 'dart:convert';
2016-12-23 10:47:21 +00:00
import 'package:angel_client/angel_client.dart';
2017-09-24 04:37:58 +00:00
import 'package:angel_client/base_angel_client.dart';
import 'package:angel_http_exception/angel_http_exception.dart';
2016-12-23 10:47:21 +00:00
import 'package:http/src/base_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
2016-12-23 20:57:46 +00:00
import 'package:web_socket_channel/status.dart' as status;
2016-12-23 10:47:21 +00:00
import 'angel_websocket.dart';
2019-01-06 02:41:46 +00:00
import 'constants.dart';
2016-12-23 10:47:21 +00:00
2021-04-10 15:12:43 +00:00
final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
2016-12-23 10:47:21 +00:00
2016-12-23 20:57:46 +00:00
/// An [Angel] client that operates across WebSockets.
2016-12-23 10:47:21 +00:00
abstract class BaseWebSocketClient extends BaseAngelClient {
2021-04-26 00:47:32 +00:00
Duration? _reconnectInterval;
WebSocketChannel? _socket;
2021-02-21 02:47:23 +00:00
final Queue<WebSocketAction> _queue = Queue<WebSocketAction>();
2016-12-23 10:47:21 +00:00
2021-02-21 02:47:23 +00:00
final StreamController _onData = StreamController();
2016-12-24 01:45:52 +00:00
final StreamController<WebSocketEvent> _onAllEvents =
2021-02-21 02:47:23 +00:00
StreamController<WebSocketEvent>();
2017-02-28 14:15:34 +00:00
final StreamController<AngelAuthResult> _onAuthenticated =
2021-02-21 02:47:23 +00:00
StreamController<AngelAuthResult>();
2016-12-23 20:57:46 +00:00
final StreamController<AngelHttpException> _onError =
2021-02-21 02:47:23 +00:00
StreamController<AngelHttpException>();
2016-12-23 20:57:46 +00:00
final StreamController<Map<String, WebSocketEvent>> _onServiceEvent =
2021-02-21 02:47:23 +00:00
StreamController<Map<String, WebSocketEvent>>.broadcast();
2016-12-23 20:57:46 +00:00
final StreamController<WebSocketChannelException>
_onWebSocketChannelException =
2021-02-21 02:47:23 +00:00
StreamController<WebSocketChannelException>();
2016-12-23 20:57:46 +00:00
2016-12-24 01:45:52 +00:00
/// Use this to handle events that are not standard.
2021-02-21 02:47:23 +00:00
final WebSocketExtraneousEventHandler on = WebSocketExtraneousEventHandler();
2016-12-24 01:45:52 +00:00
/// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
2017-02-28 14:15:34 +00:00
/// Fired whenever a WebSocket is successfully authenticated.
2021-04-10 15:12:43 +00:00
@override
2017-02-28 14:15:34 +00:00
Stream<AngelAuthResult> get onAuthenticated => _onAuthenticated.stream;
2016-12-23 20:57:46 +00:00
/// A broadcast stream of data coming from the [socket].
///
/// Mostly just for internal use.
Stream get onData => _onData.stream;
/// Fired on errors.
Stream<AngelHttpException> get onError => _onError.stream;
/// Fired whenever an event is fired by a service.
Stream<Map<String, WebSocketEvent>> get onServiceEvent =>
_onServiceEvent.stream;
/// Fired on [WebSocketChannelException]s.
Stream<WebSocketChannelException> get onWebSocketChannelException =>
_onWebSocketChannelException.stream;
2016-12-23 10:47:21 +00:00
/// The [WebSocketChannel] underneath this instance.
2021-04-26 00:47:32 +00:00
WebSocketChannel? get socket => _socket;
2016-12-23 10:47:21 +00:00
2017-02-28 14:15:34 +00:00
/// If `true` (default), then the client will automatically try to reconnect to the server
/// if the socket closes.
final bool reconnectOnClose;
/// The amount of time to wait between reconnect attempts. Default: 10 seconds.
2021-04-26 00:47:32 +00:00
Duration? get reconnectInterval => _reconnectInterval;
2017-02-28 14:15:34 +00:00
2021-04-26 00:47:32 +00:00
Uri? _wsUri;
2019-01-06 02:41:46 +00:00
/// The [Uri] to which a websocket should point.
Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl);
static Uri _toWsUri(Uri u) {
if (u.hasScheme) {
if (u.scheme == 'http') {
return u.replace(scheme: 'ws');
} else if (u.scheme == 'https') {
return u.replace(scheme: 'wss');
} else {
return u;
}
} else {
return _toWsUri(u.replace(scheme: Uri.base.scheme));
}
}
BaseWebSocketClient(http.BaseClient client, baseUrl,
2021-04-26 00:47:32 +00:00
{this.reconnectOnClose = true, Duration? reconnectInterval})
2019-01-06 02:41:46 +00:00
: super(client, baseUrl) {
2021-02-21 02:47:23 +00:00
_reconnectInterval = reconnectInterval ?? Duration(seconds: 10);
2017-02-28 14:15:34 +00:00
}
2016-12-23 20:57:46 +00:00
@override
2017-02-28 14:15:34 +00:00
Future close() async {
on._close();
2019-05-01 22:58:47 +00:00
scheduleMicrotask(() async {
2021-04-26 00:47:32 +00:00
await _socket!.sink.close(status.goingAway);
2019-05-01 22:58:47 +00:00
await _onData.close();
await _onAllEvents.close();
await _onAuthenticated.close();
await _onError.close();
await _onServiceEvent.close();
await _onWebSocketChannelException.close();
});
2017-02-28 14:15:34 +00:00
}
2016-12-23 10:47:21 +00:00
2017-02-28 14:15:34 +00:00
/// Connects the WebSocket. [timeout] is optional.
2021-04-26 00:47:32 +00:00
Future<WebSocketChannel?> connect({Duration? timeout}) async {
2017-02-28 14:15:34 +00:00
if (timeout != null) {
2021-02-21 02:47:23 +00:00
var c = Completer<WebSocketChannel>();
2021-04-26 00:47:32 +00:00
late Timer timer;
2017-02-28 14:15:34 +00:00
2021-02-21 02:47:23 +00:00
timer = Timer(timeout, () {
2017-02-28 14:15:34 +00:00
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
2021-02-21 02:47:23 +00:00
c.completeError(TimeoutException(
2017-02-28 14:15:34 +00:00
'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms',
timeout));
}
});
2019-05-01 22:58:47 +00:00
scheduleMicrotask(() {
2021-04-26 00:47:32 +00:00
getConnectedWebSocket().then((socket) {
2019-05-01 22:58:47 +00:00
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
2017-04-17 11:03:42 +00:00
2019-05-01 22:58:47 +00:00
while (_queue.isNotEmpty) {
var action = _queue.removeFirst();
socket.sink.add(serialize(action));
}
2017-04-17 11:03:42 +00:00
2019-05-01 22:58:47 +00:00
c.complete(socket);
}
}).catchError((e, StackTrace st) {
if (!c.isCompleted) {
2021-04-26 00:47:32 +00:00
if (timer.isActive) {
timer.cancel();
}
// TODO: Re-evaluate this error
var obj = 'Error';
c.completeError(obj, st);
2019-05-01 22:58:47 +00:00
}
});
2017-02-28 14:15:34 +00:00
});
return await c.future.then((socket) {
_socket = socket;
listen();
2021-04-10 15:12:43 +00:00
return _socket;
2017-02-28 14:15:34 +00:00
});
} else {
_socket = await getConnectedWebSocket();
listen();
return _socket;
}
2016-12-23 20:57:46 +00:00
}
/// Returns a new [WebSocketChannel], ready to be listened on.
///
/// This should be overriden by child classes, **NOT** [connect].
Future<WebSocketChannel> getConnectedWebSocket();
2016-12-23 10:47:21 +00:00
@override
2021-04-26 00:47:32 +00:00
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
2021-04-10 15:12:43 +00:00
var uri = path.toString().replaceAll(_straySlashes, '');
2021-04-26 00:47:32 +00:00
var wsService = WebSocketsService<Id, Data>(socket, this, uri,
2018-10-21 08:15:51 +00:00
deserializer: deserializer);
2021-04-26 00:47:32 +00:00
return wsService as Service<Id, Data>;
2016-12-23 20:57:46 +00:00
}
/// Starts listening for data.
void listen() {
2021-04-26 00:47:32 +00:00
_socket?.stream.listen(
2017-02-28 14:15:34 +00:00
(data) {
_onData.add(data);
if (data is WebSocketChannelException) {
_onWebSocketChannelException.add(data);
} else if (data is String) {
2018-07-10 16:54:55 +00:00
var jsons = json.decode(data);
2017-02-28 14:15:34 +00:00
2018-07-10 16:54:55 +00:00
if (jsons is Map) {
2021-02-21 02:47:23 +00:00
var event = WebSocketEvent.fromJson(jsons);
2017-02-28 14:15:34 +00:00
if (event.eventName?.isNotEmpty == true) {
_onAllEvents.add(event);
2021-04-26 00:47:32 +00:00
on._getStream(event.eventName)!.add(event);
2017-02-28 14:15:34 +00:00
}
2019-01-06 02:41:46 +00:00
if (event.eventName == errorEvent) {
2018-10-02 15:32:06 +00:00
var error =
2021-02-21 02:47:23 +00:00
AngelHttpException.fromMap((event.data ?? {}) as Map);
2017-02-28 14:15:34 +00:00
_onError.add(error);
2019-01-06 02:41:46 +00:00
} else if (event.eventName == authenticatedEvent) {
2021-04-26 00:47:32 +00:00
var authResult = AngelAuthResult.fromMap(event.data as Map?);
2017-02-28 14:15:34 +00:00
_onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) {
2021-04-26 00:47:32 +00:00
var split = event.eventName!
2021-04-10 15:12:43 +00:00
.split('::')
2017-02-28 14:15:34 +00:00
.where((str) => str.isNotEmpty)
.toList();
if (split.length >= 2) {
var serviceName = split[0], eventName = split[1];
_onServiceEvent
.add({serviceName: event..eventName = eventName});
}
}
2016-12-23 20:57:46 +00:00
}
}
2017-02-28 14:15:34 +00:00
},
cancelOnError: true,
onDone: () {
2017-04-17 11:03:42 +00:00
_socket = null;
2017-02-28 14:15:34 +00:00
if (reconnectOnClose == true) {
2021-04-26 00:47:32 +00:00
Timer.periodic(reconnectInterval!, (Timer timer) async {
2017-02-28 14:15:34 +00:00
var result;
try {
result = await connect(timeout: reconnectInterval);
} catch (e) {
//
}
if (result != null) timer.cancel();
});
}
});
2016-12-23 20:57:46 +00:00
}
/// Serializes data to JSON.
2021-04-10 15:12:43 +00:00
dynamic serialize(x) => json.encode(x);
2016-12-23 20:57:46 +00:00
/// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) {
2021-04-10 15:12:43 +00:00
if (_socket == null) {
2017-04-17 11:03:42 +00:00
_queue.addLast(action);
2021-04-10 15:12:43 +00:00
} else {
2021-04-26 00:47:32 +00:00
socket!.sink.add(serialize(action));
2021-04-10 15:12:43 +00:00
}
2016-12-23 10:47:21 +00:00
}
2017-02-28 14:15:34 +00:00
/// Attempts to authenticate a WebSocket, using a valid JWT.
2021-04-26 00:47:32 +00:00
void authenticateViaJwt(String? jwt) {
2021-02-21 02:47:23 +00:00
sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: authenticateAction,
params: {
'query': {'jwt': jwt}
},
));
2017-02-28 14:15:34 +00:00
}
2016-12-23 10:47:21 +00:00
}
2016-12-23 20:57:46 +00:00
/// A [Service] that asynchronously interacts with the server.
2021-04-26 00:47:32 +00:00
class WebSocketsService<Id, Data> extends Service<Id, Data?> {
2016-12-23 20:57:46 +00:00
/// The [BaseWebSocketClient] that spawned this service.
2016-12-23 10:47:21 +00:00
@override
2016-12-23 20:57:46 +00:00
final BaseWebSocketClient app;
/// Used to deserialize JSON into typed data.
2021-04-26 00:47:32 +00:00
final AngelDeserializer<Data>? deserializer;
2016-12-23 20:57:46 +00:00
/// The [WebSocketChannel] to listen to, and send data across.
2021-04-26 00:47:32 +00:00
final WebSocketChannel? socket;
2016-12-23 10:47:21 +00:00
2016-12-23 20:57:46 +00:00
/// The service path to listen to.
final String path;
final StreamController<WebSocketEvent> _onAllEvents =
2021-02-21 02:47:23 +00:00
StreamController<WebSocketEvent>();
2021-04-26 00:47:32 +00:00
final StreamController<List<Data?>> _onIndexed = StreamController();
final StreamController<Data?> _onRead = StreamController<Data>();
final StreamController<Data?> _onCreated = StreamController<Data>();
final StreamController<Data?> _onModified = StreamController<Data>();
final StreamController<Data?> _onUpdated = StreamController<Data>();
final StreamController<Data?> _onRemoved = StreamController<Data>();
2016-12-23 10:47:21 +00:00
/// Fired on all events.
2016-12-23 20:57:46 +00:00
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
2016-12-23 10:47:21 +00:00
/// Fired on `index` events.
2021-04-10 15:12:43 +00:00
@override
2021-04-26 00:47:32 +00:00
Stream<List<Data?>> get onIndexed => _onIndexed.stream;
2016-12-23 10:47:21 +00:00
/// Fired on `read` events.
2021-04-10 15:12:43 +00:00
@override
2021-04-26 00:47:32 +00:00
Stream<Data?> get onRead => _onRead.stream;
2016-12-23 10:47:21 +00:00
/// Fired on `created` events.
2021-04-10 15:12:43 +00:00
@override
2021-04-26 00:47:32 +00:00
Stream<Data?> get onCreated => _onCreated.stream;
2016-12-23 10:47:21 +00:00
/// Fired on `modified` events.
2021-04-10 15:12:43 +00:00
@override
2021-04-26 00:47:32 +00:00
Stream<Data?> get onModified => _onModified.stream;
2016-12-23 10:47:21 +00:00
/// Fired on `updated` events.
2021-04-10 15:12:43 +00:00
@override
2021-04-26 00:47:32 +00:00
Stream<Data?> get onUpdated => _onUpdated.stream;
2016-12-23 10:47:21 +00:00
/// Fired on `removed` events.
2021-04-10 15:12:43 +00:00
@override
2021-04-26 00:47:32 +00:00
Stream<Data?> get onRemoved => _onRemoved.stream;
2016-12-23 10:47:21 +00:00
2017-06-30 23:09:03 +00:00
WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) {
2016-12-23 20:57:46 +00:00
listen();
}
2021-04-10 15:12:43 +00:00
@override
2017-03-02 03:54:58 +00:00
Future close() async {
2019-05-01 22:58:47 +00:00
await _onAllEvents.close();
await _onCreated.close();
await _onIndexed.close();
await _onModified.close();
await _onRead.close();
await _onRemoved.close();
await _onUpdated.close();
2017-03-02 03:54:58 +00:00
}
2016-12-23 20:57:46 +00:00
/// Serializes an [action] to be sent over a WebSocket.
2021-04-10 15:12:43 +00:00
dynamic serialize(WebSocketAction action) => json.encode(action);
2016-12-23 10:47:21 +00:00
2016-12-23 20:57:46 +00:00
/// Deserializes data from a [WebSocketEvent].
2021-04-26 00:47:32 +00:00
Data? deserialize(x) {
return deserializer != null ? deserializer!(x) : x as Data?;
2016-12-23 20:57:46 +00:00
}
/// Deserializes the contents of an [event].
2018-11-04 02:04:50 +00:00
WebSocketEvent<Data> transformEvent(WebSocketEvent event) {
2021-02-21 02:47:23 +00:00
return WebSocketEvent(
2018-11-04 02:04:50 +00:00
eventName: event.eventName, data: deserialize(event.data));
2016-12-23 20:57:46 +00:00
}
/// Starts listening for events.
2016-12-23 10:47:21 +00:00
void listen() {
2016-12-23 20:57:46 +00:00
app.onServiceEvent.listen((map) {
if (map.containsKey(path)) {
2021-04-26 00:47:32 +00:00
var event = map[path]!;
2016-12-23 20:57:46 +00:00
_onAllEvents.add(event);
2019-01-06 02:41:46 +00:00
if (event.eventName == indexedEvent) {
2018-11-04 02:04:50 +00:00
var d = event.data;
2021-02-21 02:47:23 +00:00
var transformed = WebSocketEvent(
2018-11-04 02:04:50 +00:00
eventName: event.eventName,
data: d is Iterable ? d.map(deserialize).toList() : null);
2021-04-26 00:47:32 +00:00
if (transformed.data != null) {
_onIndexed.add(transformed.data!);
}
2018-11-04 02:04:50 +00:00
return;
}
var transformed = transformEvent(event).data;
2016-12-23 20:57:46 +00:00
switch (event.eventName) {
2019-01-06 02:41:46 +00:00
case readEvent:
2016-12-23 20:57:46 +00:00
_onRead.add(transformed);
break;
2019-01-06 02:41:46 +00:00
case createdEvent:
2016-12-23 20:57:46 +00:00
_onCreated.add(transformed);
break;
2019-01-06 02:41:46 +00:00
case modifiedEvent:
2016-12-23 20:57:46 +00:00
_onModified.add(transformed);
break;
2019-01-06 02:41:46 +00:00
case updatedEvent:
2016-12-23 20:57:46 +00:00
_onUpdated.add(transformed);
break;
2019-01-06 02:41:46 +00:00
case removedEvent:
2016-12-23 20:57:46 +00:00
_onRemoved.add(transformed);
break;
}
}
2016-12-23 10:47:21 +00:00
});
}
2016-12-23 20:57:46 +00:00
/// Sends the given [action] on the [socket].
void send(WebSocketAction action) {
2017-04-17 11:03:42 +00:00
app.sendAction(action);
2016-12-23 20:57:46 +00:00
}
2016-12-23 10:47:21 +00:00
@override
2021-04-26 00:47:32 +00:00
Future<List<Data>?> index([Map<String, dynamic>? params]) async {
2021-02-21 02:47:23 +00:00
app.sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: '$path::$indexAction', params: params ?? {}));
2016-12-23 20:57:46 +00:00
return null;
2016-12-23 10:47:21 +00:00
}
@override
2021-04-26 00:47:32 +00:00
Future<Data?> read(id, [Map<String, dynamic>? params]) async {
2021-02-21 02:47:23 +00:00
app.sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: '$path::$readAction',
2018-10-02 15:32:06 +00:00
id: id.toString(),
params: params ?? {}));
2016-12-23 20:57:46 +00:00
return null;
2016-12-23 10:47:21 +00:00
}
@override
2021-04-26 00:47:32 +00:00
Future<Data?> create(data, [Map<String, dynamic>? params]) async {
2021-02-21 02:47:23 +00:00
app.sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: '$path::$createAction', data: data, params: params ?? {}));
2016-12-23 20:57:46 +00:00
return null;
2016-12-23 10:47:21 +00:00
}
@override
2021-04-26 00:47:32 +00:00
Future<Data?> modify(id, data, [Map<String, dynamic>? params]) async {
2021-02-21 02:47:23 +00:00
app.sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: '$path::$modifyAction',
2018-07-10 16:54:55 +00:00
id: id.toString(),
2017-02-12 03:12:20 +00:00
data: data,
2017-04-17 11:03:42 +00:00
params: params ?? {}));
2016-12-23 20:57:46 +00:00
return null;
2016-12-23 10:47:21 +00:00
}
@override
2021-04-26 00:47:32 +00:00
Future<Data?> update(id, data, [Map<String, dynamic>? params]) async {
2021-02-21 02:47:23 +00:00
app.sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: '$path::$updateAction',
2018-07-10 16:54:55 +00:00
id: id.toString(),
2017-02-12 03:12:20 +00:00
data: data,
2017-04-17 11:03:42 +00:00
params: params ?? {}));
2016-12-23 20:57:46 +00:00
return null;
2016-12-23 10:47:21 +00:00
}
@override
2021-04-26 00:47:32 +00:00
Future<Data?> remove(id, [Map<String, dynamic>? params]) async {
2021-02-21 02:47:23 +00:00
app.sendAction(WebSocketAction(
2019-01-06 02:41:46 +00:00
eventName: '$path::$removeAction',
2018-10-02 15:32:06 +00:00
id: id.toString(),
params: params ?? {}));
2016-12-23 20:57:46 +00:00
return null;
2016-12-23 10:47:21 +00:00
}
2017-12-10 05:31:34 +00:00
2017-12-21 20:15:47 +00:00
/// No longer necessary.
@deprecated
Service unwrap() => this;
2016-12-23 10:47:21 +00:00
}
2016-12-23 20:57:46 +00:00
/// Contains a dynamic Map of [WebSocketEvent] streams.
2016-12-23 10:47:21 +00:00
class WebSocketExtraneousEventHandler {
2021-04-26 00:47:32 +00:00
final Map<String?, StreamController<WebSocketEvent>> _events = {};
2016-12-23 10:47:21 +00:00
2021-04-26 00:47:32 +00:00
StreamController<WebSocketEvent>? _getStream(String? index) {
2021-04-10 15:12:43 +00:00
if (_events[index] == null) {
2021-02-21 02:47:23 +00:00
_events[index] = StreamController<WebSocketEvent>();
2021-04-10 15:12:43 +00:00
}
2016-12-23 20:57:46 +00:00
return _events[index];
}
2016-12-24 01:45:52 +00:00
Stream<WebSocketEvent> operator [](String index) {
2021-04-10 15:12:43 +00:00
if (_events[index] == null) {
2021-02-21 02:47:23 +00:00
_events[index] = StreamController<WebSocketEvent>();
2021-04-10 15:12:43 +00:00
}
2016-12-23 10:47:21 +00:00
2021-04-26 00:47:32 +00:00
return _events[index]!.stream;
2016-12-23 10:47:21 +00:00
}
void _close() {
_events.values.forEach((s) => s.close());
}
2018-10-02 15:32:06 +00:00
}