platform/packages/framework/lib/src/safe_stream_controller.dart

124 lines
2.5 KiB
Dart
Raw Normal View History

import 'dart:async';
2021-04-10 15:12:43 +00:00
typedef _InitCallback = void Function();
/// A [StreamController] boilerplate that prevents memory leaks.
abstract class SafeCtrl<T> {
2019-05-02 22:48:31 +00:00
factory SafeCtrl() => _SingleSafeCtrl();
2019-05-02 22:48:31 +00:00
factory SafeCtrl.broadcast() => _BroadcastSafeCtrl();
Stream<T> get stream;
void add(T event);
2021-03-20 08:11:18 +00:00
void addError(error, [StackTrace? stackTrace]);
Future close();
2021-05-15 17:58:51 +00:00
void whenInitialized(void Function() callback);
}
class _SingleSafeCtrl<T> implements SafeCtrl<T> {
2021-03-20 08:11:18 +00:00
late StreamController<T> _stream;
bool _hasListener = false, _initialized = false;
2021-03-20 08:11:18 +00:00
_InitCallback? _initializer;
_SingleSafeCtrl() {
2019-05-02 22:48:31 +00:00
_stream = StreamController<T>(onListen: () {
_hasListener = true;
if (!_initialized && _initializer != null) {
2021-03-20 08:11:18 +00:00
_initializer!();
_initialized = true;
}
}, onPause: () {
_hasListener = false;
}, onResume: () {
_hasListener = true;
}, onCancel: () {
_hasListener = false;
});
}
@override
Stream<T> get stream => _stream.stream;
@override
void add(T event) {
if (_hasListener) _stream.add(event);
}
@override
2021-03-20 08:11:18 +00:00
void addError(error, [StackTrace? stackTrace]) {
if (_hasListener) _stream.addError(error as Object, stackTrace);
}
@override
Future close() {
return _stream.close();
}
@override
2021-05-15 17:58:51 +00:00
void whenInitialized(void Function() callback) {
if (!_initialized) {
2019-05-31 03:49:00 +00:00
if (!_hasListener) {
_initializer = callback;
2019-05-31 03:49:00 +00:00
} else {
2021-03-20 08:11:18 +00:00
_initializer!();
_initialized = true;
}
}
}
}
class _BroadcastSafeCtrl<T> implements SafeCtrl<T> {
2021-03-20 08:11:18 +00:00
late StreamController<T> _stream;
int _listeners = 0;
bool _initialized = false;
2021-03-20 08:11:18 +00:00
_InitCallback? _initializer;
_BroadcastSafeCtrl() {
2019-05-02 22:48:31 +00:00
_stream = StreamController<T>.broadcast(onListen: () {
_listeners++;
if (!_initialized && _initializer != null) {
2021-03-20 08:11:18 +00:00
_initializer!();
_initialized = true;
}
}, onCancel: () {
_listeners--;
});
}
@override
Stream<T> get stream => _stream.stream;
@override
void add(T event) {
if (_listeners > 0) _stream.add(event);
}
@override
2021-03-20 08:11:18 +00:00
void addError(error, [StackTrace? stackTrace]) {
if (_listeners > 0) _stream.addError(error as Object, stackTrace);
}
@override
Future close() {
return _stream.close();
}
@override
2021-05-15 17:58:51 +00:00
void whenInitialized(void Function() callback) {
if (!_initialized) {
2019-05-31 03:49:00 +00:00
if (_listeners <= 0) {
_initializer = callback;
2019-05-31 03:49:00 +00:00
} else {
2021-03-20 08:11:18 +00:00
_initializer!();
_initialized = true;
}
}
}
}