import 'dart:async';

import 'package:angel3_reactivex/angel3_reactivex.dart';
import 'package:test/test.dart';

import '../utils.dart';

/// Test suite for `DoStreamTransformer` and the `doOnX` stream extensions
/// (`doOnData`, `doOnError`, `doOnDone`, `doOnCancel`, `doOnListen`,
/// `doOnPause`, `doOnResume`, `doOnEach`).
///
/// Each test attaches a side-effect callback to a stream and then asserts
/// that the callback ran (or ran the expected number of times).
void main() {
  group('DoStreamTranformer', () {
    test('calls onDone when the stream is finished', () async {
      var onDoneCalled = false;
      final stream = Stream.empty().doOnDone(() => onDoneCalled = true);

      await expectLater(stream, emitsDone);
      await expectLater(onDoneCalled, isTrue);
    });

    test('calls onError when an error is emitted', () async {
      var onErrorCalled = false;
      final stream = Stream.error(Exception())
          .doOnError((e, s) => onErrorCalled = true);

      await expectLater(stream, emitsError(isException));
      await expectLater(onErrorCalled, isTrue);
    });

    test(
        'onError only called once when an error is emitted on a broadcast stream',
        () async {
      var count = 0;
      final subject = BehaviorSubject(sync: true);
      final stream = subject.stream.doOnError((e, s) => count++);

      // Two listeners, two errors: the expected count is 2 (one call per
      // error), not 4 (one call per error per listener).
      stream.listen(null, onError: (dynamic e, dynamic s) {});
      stream.listen(null, onError: (dynamic e, dynamic s) {});

      subject.addError(Exception());
      subject.addError(Exception());

      await expectLater(count, 2);
      await subject.close();
    });

    test('calls onCancel when the subscription is cancelled', () async {
      var onCancelCalled = false;
      final stream = Stream.value(1);

      await stream
          .doOnCancel(() => onCancelCalled = true)
          .listen(null)
          .cancel();

      await expectLater(onCancelCalled, isTrue);
    });

    test('awaits onCancel when the subscription is cancelled', () async {
      // Both markers start at the same value; each is overwritten with the
      // next sequence number when its step finishes, so the later step ends
      // up with the larger number.
      var onCancelCompleted = 10, onCancelHandled = 10, eventSequenceCount = 0;
      final stream = Stream.value(1);

      await stream
          .doOnCancel(() => Future.delayed(const Duration(milliseconds: 100))
              .whenComplete(() => onCancelHandled = ++eventSequenceCount))
          .listen(null)
          .cancel()
          .whenComplete(() => onCancelCompleted = ++eventSequenceCount);

      await expectLater(onCancelCompleted > onCancelHandled, isTrue);
    });

    test(
        'onCancel called only once when the subscription is multiple listeners',
        () async {
      var count = 0;
      final subject = BehaviorSubject(sync: true);
      final stream = subject.doOnCancel(() => count++);

      await stream.listen(null).cancel();
      await stream.listen(null).cancel();

      await expectLater(count, 2);
      await subject.close();
    });

    test('calls onData when the stream emits an item', () async {
      var onDataCalled = false;
      final stream = Stream.value(1).doOnData((_) => onDataCalled = true);

      await expectLater(stream, emits(1));
      await expectLater(onDataCalled, isTrue);
    });

    test('onData only emits once for broadcast streams with multiple listeners',
        () async {
      final actual = [];
      final controller = StreamController.broadcast(sync: true);
      final stream =
          controller.stream.transform(DoStreamTransformer(onData: actual.add));

      stream.listen(null);
      stream.listen(null);

      controller.add(1);
      controller.add(2);

      // One entry per event, even though two listeners are attached.
      await expectLater(actual, const [1, 2]);
      await controller.close();
    });

    test('onData only emits once for subjects with multiple listeners',
        () async {
      final actual = [];
      final controller = BehaviorSubject(sync: true);
      final stream =
          controller.stream.transform(DoStreamTransformer(onData: actual.add));

      stream.listen(null);
      stream.listen(null);

      controller.add(1);
      controller.add(2);

      await expectLater(actual, const [1, 2]);
      await controller.close();
    });

    test('onData only emits correctly with ReplaySubject', () async {
      final controller = ReplaySubject(sync: true)
        ..add(1)
        ..add(2);
      final actual = [];

      await controller.close();

      // `drain(actual)` completes with `actual` itself once the stream is
      // done, so the list is inspected after every onData call has run.
      expect(await controller.stream.doOnData(actual.add).drain(actual),
          const [1, 2]);

      actual.clear();

      expect(await controller.stream.doOnData(actual.add).drain(actual),
          const [1, 2]);
    });

    test('emits onEach Notifications for Data, Error, and Done', () async {
      StackTrace? stacktrace;
      final actual = <StreamNotification<int>>[];
      final exception = Exception();
      final stream = Stream.value(1)
          .concatWith([Stream.error(exception)]).doOnEach((notification) {
        actual.add(notification);

        // Capture the stack trace so the expected error notification below
        // can be built with the same one.
        if (notification.isError) {
          stacktrace = notification.errorAndStackTraceOrNull?.stackTrace;
        }
      });

      await expectLater(
          stream, emitsInOrder([1, emitsError(isException), emitsDone]));

      await expectLater(actual, [
        StreamNotification.data(1),
        StreamNotification.error(exception, stacktrace),
        StreamNotification.done()
      ]);
    });

    test('onEach only emits once for broadcast streams with multiple listeners',
        () async {
      var count = 0;
      final controller = StreamController.broadcast(sync: true);
      final stream = controller.stream.transform(DoStreamTransformer(onEach: (_) {
        count++;
      }));

      stream.listen(null);
      stream.listen(null);

      controller.add(1);
      controller.add(2);

      await expectLater(count, 2);
      await controller.close();
    });

    test('calls onListen when a consumer listens', () async {
      var onListenCalled = false;
      final stream = Stream.empty().doOnListen(() {
        onListenCalled = true;
      });

      await expectLater(stream, emitsDone);
      await expectLater(onListenCalled, isTrue);
    });

    test(
        'calls onListen once when multiple subscribers open, without cancelling',
        () async {
      var onListenCallCount = 0;
      final sc = StreamController.broadcast()
        ..add(1)
        ..add(2)
        ..add(3);

      final stream = sc.stream.doOnListen(() => onListenCallCount++);

      stream.listen(null);
      stream.listen(null);

      await expectLater(onListenCallCount, 1);
      await sc.close();
    });

    test(
        'calls onListen every time after all previous subscribers have cancelled',
        () async {
      var onListenCallCount = 0;
      final sc = StreamController.broadcast()
        ..add(1)
        ..add(2)
        ..add(3);

      final stream = sc.stream.doOnListen(() => onListenCallCount++);

      await stream.listen(null).cancel();
      await stream.listen(null).cancel();

      await expectLater(onListenCallCount, 2);
      await sc.close();
    });

    test('calls onPause and onResume when the subscription is', () async {
      var onPauseCalled = false, onResumeCalled = false;
      final stream = Stream.value(1).doOnPause(() {
        onPauseCalled = true;
      }).doOnResume(() {
        onResumeCalled = true;
      });

      stream.listen(null, onDone: expectAsync0(() {
        expect(onPauseCalled, isTrue);
        expect(onResumeCalled, isTrue);
      }))
        ..pause()
        ..resume();
    });

    test('should be reusable', () async {
      var callCount = 0;
      // The explicit <int> is required: without it the transformer is
      // inferred as DoStreamTransformer<dynamic>, which cannot be applied to
      // the Stream<int> sources below.
      final transformer = DoStreamTransformer<int>(onData: (_) {
        callCount++;
      });

      final streamA = Stream.value(1).transform(transformer),
          streamB = Stream.value(1).transform(transformer);

      await expectLater(streamA, emitsInOrder([1, emitsDone]));
      await expectLater(streamB, emitsInOrder([1, emitsDone]));

      expect(callCount, 2);
    });

    test('throws an error when no arguments are provided', () {
      expect(() => DoStreamTransformer(), throwsArgumentError);
    });

    test('should propagate errors', () {
      Stream.value(1)
          .doOnListen(() => throw Exception('catch me if you can! doOnListen'))
          .listen(
            null,
            onError: expectAsync2(
              (Exception e, StackTrace s) => expect(e, isException),
            ),
          );

      Stream.value(1)
          .doOnData((_) => throw Exception('catch me if you can! doOnData'))
          .listen(
            null,
            onError: expectAsync2(
              (Exception e, StackTrace s) => expect(e, isException),
            ),
          );

      Stream.error(Exception('oh noes!'))
          .doOnError(
              (_, __) => throw Exception('catch me if you can! doOnError'))
          .listen(
            null,
            onError: expectAsync2(
              (Exception e, StackTrace s) => expect(e, isException),
              count: 2,
            ),
          );

      // a cancel() call may occur after the controller is already closed
      // in that case, the error is forwarded to the current [Zone]
      runZonedGuarded(
        () {
          Stream.value(1)
              .doOnCancel(() =>
                  throw Exception('catch me if you can! doOnCancel-zoned'))
              .listen(null);

          Stream.value(1)
              .doOnCancel(
                  () => throw Exception('catch me if you can! doOnCancel'))
              .listen(null)
              .cancel();
        },
        expectAsync2(
          (Object e, StackTrace s) => expect(e, isException),
          count: 2,
        ),
      );

      Stream.value(1)
          .doOnDone(() => throw Exception('catch me if you can! doOnDone'))
          .listen(
            null,
            onError: expectAsync2(
              (Exception e, StackTrace s) => expect(e, isException),
            ),
          );

      Stream.value(1)
          .doOnEach((_) => throw Exception('catch me if you can! doOnEach'))
          .listen(
            null,
            onError: expectAsync2(
              (Exception e, StackTrace s) => expect(e, isException),
              count: 2,
            ),
          );

      Stream.value(1)
          .doOnPause(() => throw Exception('catch me if you can! doOnPause'))
          .listen(null,
              onError: expectAsync2(
                (Exception e, StackTrace s) => expect(e, isException),
              ))
        ..pause()
        ..resume();

      Stream.value(1)
          .doOnResume(() => throw Exception('catch me if you can! doOnResume'))
          .listen(null,
              onError: expectAsync2(
                  (Exception e, StackTrace s) => expect(e, isException)))
        ..pause()
        ..resume();
    });

    test(
        'doOnListen correctly allows subscribing multiple times on a broadcast stream',
        () {
      final controller = StreamController.broadcast();
      final stream = controller.stream.doOnListen(() {
        // do nothing
      });

      controller.close();

      expectLater(stream, emitsDone);
      expectLater(stream, emitsDone);
    });

    test('issue/389/1', () {
      final controller = StreamController.broadcast();
      final stream = controller.stream.doOnListen(() {
        // do nothing
      });

      expectLater(stream, emitsDone);
      expectLater(stream, emitsDone);
      // #issue/389 : is being ignored/hangs up

      controller.close();
    });

    test('issue/389/2', () {
      final controller = StreamController();
      var isListening = false;
      final stream = controller.stream.doOnListen(() {
        isListening = true;
      });

      controller.close();

      // should be done
      expectLater(stream, emitsDone);
      // should have called onX
      expect(isListening, true);
      // should not be converted to a broadcast Stream
      expect(() => stream.listen(null), throwsStateError);
    });

    test('Rx.do accidental broadcast', () async {
      final controller = StreamController();

      final stream = controller.stream.doOnEach((_) {});

      stream.listen(null);
      // A second listen must throw: the transformed stream is expected to
      // remain single-subscription.
      expect(() => stream.listen(null), throwsStateError);

      controller.add(1);
    });

    test('nested doOnX', () async {
      final completer = Completer();
      final stream =
          Rx.range(0, 30).interval(const Duration(milliseconds: 100));
      final result = [];
      const expectedOutput = [
        'A: 0',
        'B: 0',
        'pause',
        'A: 1',
        'B: 1',
        'A: 2',
        'B: 2',
        'A: 3',
        'B: 3',
        'A: 4',
        'B: 4',
        'A: 5',
        'B: 5',
        'pause',
        'A: 6',
        'B: 6',
        'A: 7',
        'B: 7',
        'A: 8',
        'B: 8',
        'A: 9',
        'B: 9',
        'A: 10',
        'B: 10',
        'pause',
        'A: 11',
        'B: 11',
        'A: 12',
        'B: 12',
        'A: 13',
        'B: 13',
        'A: 14',
        'B: 14',
        'A: 15',
        'B: 15',
        'pause',
        'A: 16',
        'B: 16',
        'A: 17',
      ];
      late StreamSubscription subscription;

      // Records one log entry and ends the test as soon as the log reaches
      // the expected length.
      void addToResult(String value) {
        result.add(value);

        if (result.length == expectedOutput.length) {
          subscription.cancel();
          completer.complete();
        }
      }

      subscription = Stream.value(1)
          .exhaustMap((_) => stream.doOnData((data) => addToResult('A: $data')))
          .doOnPause(() => addToResult('pause'))
          .doOnData((data) => addToResult('B: $data'))
          .take(expectedOutput.length)
          .listen((value) {
        // Pause on every multiple of 5; the subscription resumes when the
        // delayed future completes.
        if (value % 5 == 0) {
          subscription.pause(Future.delayed(const Duration(seconds: 2)));
        }
      });

      await completer.future;

      expect(result, expectedOutput);
    });

    test('doOnData nullable', () {
      nullableTest(
        (s) => s.doOnData((d) {}),
      );
    });
  });
}