import 'dart:async'; import 'package:angel3_reactivex/angel3_reactivex.dart'; import 'package:test/test.dart'; List> _getStreams() { var a = Stream.fromIterable(const [0, 1, 2]), b = Stream.fromIterable(const [3, 4, 5]); return [a, b]; } List> _getStreamsIncludingEmpty() { var a = Stream.fromIterable(const [0, 1, 2]), b = Stream.fromIterable(const [3, 4, 5]), c = Stream.empty(); return [c, a, b]; } void main() { test('Rx.concat', () async { const expectedOutput = [0, 1, 2, 3, 4, 5]; var count = 0; final stream = Rx.concat(_getStreams()); stream.listen(expectAsync1((result) { // test to see if the combined output matches expect(result, expectedOutput[count++]); }, count: expectedOutput.length)); }); test('Rx.concatEager.single.subscription', () async { final stream = Rx.concat(_getStreams()); stream.listen(null); await expectLater(() => stream.listen(null), throwsA(isStateError)); }); test('Rx.concat.withEmptyStream', () async { const expectedOutput = [0, 1, 2, 3, 4, 5]; var count = 0; final stream = Rx.concat(_getStreamsIncludingEmpty()); stream.listen(expectAsync1((result) { // test to see if the combined output matches expect(result, expectedOutput[count++]); }, count: expectedOutput.length)); }); test('Rx.concat.withBroadcastStreams', () async { const expectedOutput = [1, 2, 3, 4]; final ctrlA = StreamController.broadcast(), ctrlB = StreamController.broadcast(), ctrlC = StreamController.broadcast(); var x = 0, y = 100, z = 1000, count = 0; Timer.periodic(const Duration(milliseconds: 1), (_) { ctrlA.add(++x); ctrlB.add(--y); if (x <= 3) ctrlC.add(--z); if (x == 3) ctrlC.close(); if (x == 4) { _.cancel(); ctrlA.close(); ctrlB.close(); } }); final stream = Rx.concat([ctrlA.stream, ctrlB.stream, ctrlC.stream]); stream.listen(expectAsync1((result) { // test to see if the combined output matches expect(result, expectedOutput[count++]); }, count: expectedOutput.length)); }); test('Rx.concat.asBroadcastStream', () async { final stream = Rx.concat(_getStreams()).asBroadcastStream(); // listen twice on same stream stream.listen(null); stream.listen(null); // code should reach here await expectLater(stream.isBroadcast, isTrue); }); test('Rx.concat.error.shouldThrowA', () async { final streamWithError = Rx.concat(_getStreams()..add(Stream.error(Exception()))); streamWithError.listen(null, onError: expectAsync2((Exception e, StackTrace s) { expect(e, isException); })); }); test('Rx.concat.empty', () { expect(Rx.concat(const []), emitsDone); }); test('Rx.concat.single', () { expect( Rx.concat([Stream.value(1)]), emitsInOrder([1, emitsDone]), ); }); test('Rx.concat.iterate.once', () async { var iterationCount = 0; final stream = Rx.concat(() sync* { ++iterationCount; yield Stream.value(1); yield Stream.value(2); yield Stream.value(3); }()); await expectLater( stream, emitsInOrder([ 1, 2, 3, emitsDone, ]), ); expect(iterationCount, 1); }); }