Compare commits
No commits in common. "6c8ca004bac9d71d2bb11babbe4a5a91af7c0fb7" and "42596c1026040841862c623882ab9aea01ae4eb2" have entirely different histories.
6c8ca004ba
...
42596c1026
5 changed files with 160 additions and 47 deletions
17
core/queue/test/helpers/app_event.dart
Normal file
17
core/queue/test/helpers/app_event.dart
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
abstract class AppEvent {
|
||||||
|
String get timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
class EmptyEvent extends AppEvent {
|
||||||
|
@override
|
||||||
|
String get timestamp => DateTime.now().toIso8601String();
|
||||||
|
}
|
||||||
|
|
||||||
|
class EventCompletionEvent extends AppEvent {
|
||||||
|
final AppEvent completedEvent;
|
||||||
|
|
||||||
|
EventCompletionEvent(this.completedEvent);
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get timestamp => DateTime.now().toIso8601String();
|
||||||
|
}
|
97
core/queue/test/helpers/dummy_event_bus.dart
Normal file
97
core/queue/test/helpers/dummy_event_bus.dart
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
import 'dart:async';
|
||||||
|
import 'package:angel3_event_bus/event_bus.dart';
|
||||||
|
import 'package:angel3_reactivex/subjects.dart';
|
||||||
|
import 'package:logger/logger.dart';
|
||||||
|
import 'app_event.dart' hide AppEvent, EmptyEvent;
|
||||||
|
import 'history_entry.dart' hide EventBusHistoryEntry;
|
||||||
|
import 'subscription.dart' hide Subscription, Responder;
|
||||||
|
|
||||||
|
class DummyEventBus implements IEventBus {
|
||||||
|
final _logger = Logger();
|
||||||
|
final _lastEventSubject = BehaviorSubject<AppEvent>();
|
||||||
|
final _inProgress = BehaviorSubject<List<AppEvent>>.seeded([]);
|
||||||
|
final List<EventBusHistoryEntry> _history = [];
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get isBusy => false;
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<bool> get isBusy$ => Stream.value(false);
|
||||||
|
|
||||||
|
@override
|
||||||
|
AppEvent? get last => null;
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<AppEvent?> get last$ => _lastEventSubject.distinct();
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<List<AppEvent>> get inProgress$ => _inProgress.stream;
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<EventBusHistoryEntry> get history => List.unmodifiable(_history);
|
||||||
|
|
||||||
|
@override
|
||||||
|
void fire(AppEvent event) {
|
||||||
|
_lastEventSubject.add(event);
|
||||||
|
_logger.d(' ⚡️ [${event.timestamp}] $event');
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void watch(AppEvent event) {
|
||||||
|
fire(event);
|
||||||
|
_inProgress.add([..._inProgress.value, event]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void complete(AppEvent event, {AppEvent? nextEvent}) {
|
||||||
|
final newArr = _inProgress.value.toList()..removeWhere((e) => e == event);
|
||||||
|
_inProgress.add(newArr);
|
||||||
|
if (nextEvent != null) {
|
||||||
|
fire(nextEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool isInProgress<T>() {
|
||||||
|
return _inProgress.value.whereType<T>().isNotEmpty;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<T> on<T extends AppEvent>() {
|
||||||
|
if (T == dynamic) {
|
||||||
|
return _lastEventSubject.stream as Stream<T>;
|
||||||
|
} else {
|
||||||
|
return _lastEventSubject.stream.where((event) => event is T).cast<T>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Subscription respond<T>(Responder<T> responder) {
|
||||||
|
return Subscription(_lastEventSubject).respond<T>(responder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<bool> whileInProgress<T extends AppEvent>() {
|
||||||
|
return _inProgress.map((events) {
|
||||||
|
return events.whereType<T>().isNotEmpty;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void clearHistory() {
|
||||||
|
_history.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void reset() {
|
||||||
|
clearHistory();
|
||||||
|
_inProgress.add([]);
|
||||||
|
_lastEventSubject.add(EmptyEvent());
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void dispose() {
|
||||||
|
_inProgress.close();
|
||||||
|
_lastEventSubject.close();
|
||||||
|
}
|
||||||
|
}
|
8
core/queue/test/helpers/history_entry.dart
Normal file
8
core/queue/test/helpers/history_entry.dart
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
import 'app_event.dart';
|
||||||
|
|
||||||
|
class EventBusHistoryEntry {
|
||||||
|
final AppEvent event;
|
||||||
|
final String timestamp;
|
||||||
|
|
||||||
|
EventBusHistoryEntry(this.event, this.timestamp);
|
||||||
|
}
|
20
core/queue/test/helpers/subscription.dart
Normal file
20
core/queue/test/helpers/subscription.dart
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
import 'package:angel3_reactivex/subjects.dart';
|
||||||
|
import 'app_event.dart';
|
||||||
|
|
||||||
|
typedef Responder<T> = void Function(T event);
|
||||||
|
|
||||||
|
class Subscription {
|
||||||
|
final BehaviorSubject<AppEvent> _subject;
|
||||||
|
|
||||||
|
Subscription(this._subject);
|
||||||
|
|
||||||
|
void respond<T>(Responder<T> responder) {
|
||||||
|
_subject.stream.where((event) => event is T).listen((event) {
|
||||||
|
responder(event as T);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void dispose() {
|
||||||
|
// Implement disposal logic if needed
|
||||||
|
}
|
||||||
|
}
|
|
@ -40,15 +40,12 @@ void main() {
|
||||||
// Setup for EventBus mock
|
// Setup for EventBus mock
|
||||||
when(eventBus.fire(any)).thenAnswer((_) => Future<void>.value());
|
when(eventBus.fire(any)).thenAnswer((_) => Future<void>.value());
|
||||||
|
|
||||||
// Setup for MQClient mock
|
// // Setup for MQClient mock
|
||||||
when(mq.sendMessage(
|
// when(mq.sendMessage(
|
||||||
message: anyNamed('message'),
|
// message: any,
|
||||||
exchangeName: anyNamed('exchangeName'),
|
// exchangeName: any,
|
||||||
routingKey: anyNamed('routingKey'),
|
// routingKey: any,
|
||||||
)).thenAnswer((_) {
|
// )).thenAnswer((_) => Future<void>.value());
|
||||||
print("Debug: Mock sendMessage called");
|
|
||||||
// Notice we're not returning anything here
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
test('pushOn calls push with correct arguments', () async {
|
test('pushOn calls push with correct arguments', () async {
|
||||||
|
@ -84,7 +81,6 @@ void main() {
|
||||||
test('enqueueUsing publishes message and fires events', () async {
|
test('enqueueUsing publishes message and fires events', () async {
|
||||||
when(container.has<TransactionManager>()).thenReturn(false);
|
when(container.has<TransactionManager>()).thenReturn(false);
|
||||||
|
|
||||||
print("Debug: Before enqueueUsing");
|
|
||||||
await queue.enqueueUsing(
|
await queue.enqueueUsing(
|
||||||
'test_job',
|
'test_job',
|
||||||
'test_payload',
|
'test_payload',
|
||||||
|
@ -92,41 +88,19 @@ void main() {
|
||||||
null,
|
null,
|
||||||
(payload, queue, delay) async => 'job_id',
|
(payload, queue, delay) async => 'job_id',
|
||||||
);
|
);
|
||||||
print("Debug: After enqueueUsing");
|
|
||||||
|
|
||||||
// Verify all method calls in order
|
// Verify that events were fired
|
||||||
verifyInOrder([
|
|
||||||
eventBus.fire(argThat(isA<JobQueueingEvent>())),
|
|
||||||
eventBus.fire(argThat(isA<JobQueuedEvent>())),
|
|
||||||
mq.sendMessage(
|
|
||||||
message: anyNamed('message'),
|
|
||||||
exchangeName: anyNamed('exchangeName'),
|
|
||||||
routingKey: anyNamed('routingKey'),
|
|
||||||
),
|
|
||||||
]);
|
|
||||||
|
|
||||||
// Print captured arguments for sendMessage
|
|
||||||
final sendMessageCall = verify(mq.sendMessage(
|
|
||||||
message: captureAnyNamed('message'),
|
|
||||||
exchangeName: captureAnyNamed('exchangeName'),
|
|
||||||
routingKey: captureAnyNamed('routingKey'),
|
|
||||||
));
|
|
||||||
|
|
||||||
if (sendMessageCall.captured.isNotEmpty) {
|
|
||||||
print("sendMessage was called with:");
|
|
||||||
print("message: ${sendMessageCall.captured[0]}");
|
|
||||||
print("exchangeName: ${sendMessageCall.captured[1]}");
|
|
||||||
print("routingKey: ${sendMessageCall.captured[2]}");
|
|
||||||
} else {
|
|
||||||
print("sendMessage was not called");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Additional verifications
|
|
||||||
verify(eventBus.fire(any)).called(2);
|
verify(eventBus.fire(any)).called(2);
|
||||||
|
|
||||||
|
// More specific verification
|
||||||
|
verify(eventBus.fire(argThat(isA<JobQueueingEvent>()))).called(1);
|
||||||
|
verify(eventBus.fire(argThat(isA<JobQueuedEvent>()))).called(1);
|
||||||
|
|
||||||
|
// Verify that message was sent
|
||||||
verify(mq.sendMessage(
|
verify(mq.sendMessage(
|
||||||
message: anyNamed('message'),
|
message: argThat(isA<Message>()),
|
||||||
exchangeName: anyNamed('exchangeName'),
|
exchangeName: '',
|
||||||
routingKey: anyNamed('routingKey'),
|
routingKey: 'test_queue',
|
||||||
)).called(1);
|
)).called(1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -174,12 +148,10 @@ class TestQueue extends Queue {
|
||||||
Duration? delay,
|
Duration? delay,
|
||||||
Future<dynamic> Function(String, String?, Duration?) callback,
|
Future<dynamic> Function(String, String?, Duration?) callback,
|
||||||
) async {
|
) async {
|
||||||
eventBus.fire(JobQueueingEvent(connectionName, queue, job, payload, delay));
|
await raiseJobQueueingEvent(queue, job, payload, delay);
|
||||||
final result = await callback(payload, queue, delay);
|
final result = await callback(payload, queue, delay);
|
||||||
eventBus.fire(
|
await raiseJobQueuedEvent(queue, result, job, payload, delay);
|
||||||
JobQueuedEvent(connectionName, queue, result, job, payload, delay));
|
|
||||||
|
|
||||||
print("Attempting to send message..."); // Debug print
|
|
||||||
mq.sendMessage(
|
mq.sendMessage(
|
||||||
message: Message(
|
message: Message(
|
||||||
id: 'test-id',
|
id: 'test-id',
|
||||||
|
@ -190,7 +162,6 @@ class TestQueue extends Queue {
|
||||||
exchangeName: '',
|
exchangeName: '',
|
||||||
routingKey: queue ?? 'default',
|
routingKey: queue ?? 'default',
|
||||||
);
|
);
|
||||||
print("Message sent."); // Debug print
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue