Test: adding checkpoint for passing test

This commit is contained in:
Patrick Stewart 2024-10-05 19:35:50 -07:00
parent 3d04bbb561
commit 3ab3a4261c
9 changed files with 956 additions and 8 deletions

View file

@ -18,6 +18,10 @@ import 'should_queue_after_commit.dart';
abstract class Queue with InteractsWithTime {
/// The IoC container instance.
final Container container;
final EventBus eventBus;
final MQClient mq;
final Subject<dynamic> jobSubject;
final Uuid uuid = Uuid();
/// The connection name for the queue.
String _connectionName;
@ -28,9 +32,20 @@ abstract class Queue with InteractsWithTime {
/// The create payload callbacks.
static final List<Function> _createPayloadCallbacks = [];
Queue(this.container,
Queue(this.container, this.eventBus, this.mq,
{String connectionName = 'default', this.dispatchAfterCommit = false})
: _connectionName = connectionName;
: _connectionName = connectionName,
jobSubject = PublishSubject<dynamic>() {
_setupJobObservable();
}
void _setupJobObservable() {
jobSubject.stream.listen((job) {
// Process the job
print('Processing job: $job');
// Implement your job processing logic here
});
}
Future<dynamic> pushOn(String queue, dynamic job, [dynamic data = '']) {
return push(job, data, queue);
@ -197,19 +212,36 @@ abstract class Queue with InteractsWithTime {
Duration? delay,
Future<dynamic> Function(String, String?, Duration?) callback,
) async {
final String jobId = uuid.v4(); // Generate a unique job ID
if (shouldDispatchAfterCommit(job) && container.has<TransactionManager>()) {
return container.make<TransactionManager>().addCallback(() async {
await raiseJobQueueingEvent(queue, job, payload, delay);
final jobId = await callback(payload, queue, delay);
final result = await callback(payload, queue, delay);
await raiseJobQueuedEvent(queue, jobId, job, payload, delay);
return jobId;
return result;
});
}
await raiseJobQueueingEvent(queue, job, payload, delay);
final jobId = await callback(payload, queue, delay);
final result = await callback(payload, queue, delay);
await raiseJobQueuedEvent(queue, jobId, job, payload, delay);
return jobId;
// Use angel3_mq to publish the job
mq.sendMessage(
message: Message(
headers: {'jobId': jobId}, // Include jobId in headers
payload: payload,
timestamp: DateTime.now().toIso8601String(),
),
exchangeName: '', // Use default exchange
routingKey: queue ?? 'default',
);
// Use angel3_reactivex to add the job to the subject
jobSubject.add(job);
return result;
}
bool shouldDispatchAfterCommit(dynamic job) {
@ -249,9 +281,51 @@ abstract class Queue with InteractsWithTime {
Container getContainer() => container;
// Abstract methods to be implemented by subclasses
Future<dynamic> push(dynamic job, [dynamic data = '', String? queue]);
// Implement the push method
Future<dynamic> push(dynamic job, [dynamic data = '', String? queue]) async {
final payload = await createPayload(job, queue ?? 'default', data);
return enqueueUsing(job, payload, queue, null, (payload, queue, _) async {
final jobId = Uuid().v4();
mq.sendMessage(
message: Message(
id: jobId,
headers: {},
payload: payload,
timestamp: DateTime.now().toIso8601String(),
),
exchangeName: '',
routingKey: queue ?? 'default',
);
return jobId;
});
}
// Implement the later method
Future<dynamic> later(Duration delay, dynamic job,
[dynamic data = '', String? queue]);
[dynamic data = '', String? queue]) async {
final payload = await createPayload(job, queue ?? 'default', data);
return enqueueUsing(job, payload, queue, delay,
(payload, queue, delay) async {
final jobId = Uuid().v4();
await Future.delayed(delay!);
mq.sendMessage(
message: Message(
id: jobId,
headers: {},
payload: payload,
timestamp: DateTime.now().toIso8601String(),
),
exchangeName: '',
routingKey: queue ?? 'default',
);
return jobId;
});
}
// Cleanup method
void dispose() {
jobSubject.close();
}
}
// Additional interfaces and classes

View file

@ -18,5 +18,8 @@ dependencies:
crypto: ^3.0.5
dev_dependencies:
build_runner: ^2.3.3
build_test: ^2.1.0
lints: ^3.0.0
mockito: ^5.0.0
test: ^1.24.0

View file

@ -0,0 +1,17 @@
abstract class AppEvent {
String get timestamp;
}
class EmptyEvent extends AppEvent {
@override
String get timestamp => DateTime.now().toIso8601String();
}
class EventCompletionEvent extends AppEvent {
final AppEvent completedEvent;
EventCompletionEvent(this.completedEvent);
@override
String get timestamp => DateTime.now().toIso8601String();
}

View file

@ -0,0 +1,97 @@
import 'dart:async';
import 'package:angel3_event_bus/event_bus.dart';
import 'package:angel3_reactivex/subjects.dart';
import 'package:logger/logger.dart';
import 'app_event.dart' hide AppEvent, EmptyEvent;
import 'history_entry.dart' hide EventBusHistoryEntry;
import 'subscription.dart' hide Subscription, Responder;
class DummyEventBus implements IEventBus {
final _logger = Logger();
final _lastEventSubject = BehaviorSubject<AppEvent>();
final _inProgress = BehaviorSubject<List<AppEvent>>.seeded([]);
final List<EventBusHistoryEntry> _history = [];
@override
bool get isBusy => false;
@override
Stream<bool> get isBusy$ => Stream.value(false);
@override
AppEvent? get last => null;
@override
Stream<AppEvent?> get last$ => _lastEventSubject.distinct();
@override
Stream<List<AppEvent>> get inProgress$ => _inProgress.stream;
@override
List<EventBusHistoryEntry> get history => List.unmodifiable(_history);
@override
void fire(AppEvent event) {
_lastEventSubject.add(event);
_logger.d(' ⚡️ [${event.timestamp}] $event');
}
@override
void watch(AppEvent event) {
fire(event);
_inProgress.add([..._inProgress.value, event]);
}
@override
void complete(AppEvent event, {AppEvent? nextEvent}) {
final newArr = _inProgress.value.toList()..removeWhere((e) => e == event);
_inProgress.add(newArr);
if (nextEvent != null) {
fire(nextEvent);
}
}
@override
bool isInProgress<T>() {
return _inProgress.value.whereType<T>().isNotEmpty;
}
@override
Stream<T> on<T extends AppEvent>() {
if (T == dynamic) {
return _lastEventSubject.stream as Stream<T>;
} else {
return _lastEventSubject.stream.where((event) => event is T).cast<T>();
}
}
@override
Subscription respond<T>(Responder<T> responder) {
return Subscription(_lastEventSubject).respond<T>(responder);
}
@override
Stream<bool> whileInProgress<T extends AppEvent>() {
return _inProgress.map((events) {
return events.whereType<T>().isNotEmpty;
});
}
@override
void clearHistory() {
_history.clear();
}
@override
void reset() {
clearHistory();
_inProgress.add([]);
_lastEventSubject.add(EmptyEvent());
}
@override
void dispose() {
_inProgress.close();
_lastEventSubject.close();
}
}

View file

@ -0,0 +1,8 @@
import 'app_event.dart';
class EventBusHistoryEntry {
final AppEvent event;
final String timestamp;
EventBusHistoryEntry(this.event, this.timestamp);
}

View file

@ -0,0 +1,20 @@
import 'package:angel3_reactivex/subjects.dart';
import 'app_event.dart';
typedef Responder<T> = void Function(T event);
class Subscription {
final BehaviorSubject<AppEvent> _subject;
Subscription(this._subject);
void respond<T>(Responder<T> responder) {
_subject.stream.where((event) => event is T).listen((event) {
responder(event as T);
});
}
void dispose() {
// Implement disposal logic if needed
}
}

View file

@ -0,0 +1,65 @@
import 'package:test/test.dart';
import 'package:mockito/annotations.dart';
import 'package:mockito/mockito.dart';
import 'package:angel3_container/angel3_container.dart';
import 'package:angel3_event_bus/event_bus.dart';
import 'package:angel3_mq/mq.dart';
import 'package:angel3_queue/src/queue.dart';
import 'queue_test.mocks.dart';
@GenerateMocks([Container, MQClient, TransactionManager])
void main() {
late MockContainer container;
late EventBus eventBus;
late MockMQClient mq;
late TestQueue queue;
setUpAll(() {
// Provide a dummy EventBus for Mockito to use
provideDummy<EventBus>(DummyEventBus());
});
setUp(() {
container = MockContainer();
eventBus = DummyEventBus();
mq = MockMQClient();
queue = TestQueue(container, eventBus, mq);
// Basic setup
when(container.has<EventBus>()).thenReturn(true);
when(container.has<TransactionManager>()).thenReturn(false);
when(container.make<EventBus>()).thenReturn(eventBus);
});
test('pushOn calls push with correct arguments', () async {
final result = await queue.pushOn('test_queue', 'test_job', 'test_data');
expect(result, equals('pushed'));
});
test('laterOn calls later with correct arguments', () async {
final result = await queue.laterOn(
'test_queue', Duration(minutes: 5), 'test_job', 'test_data');
expect(result, equals('pushed later'));
});
}
class TestQueue extends Queue {
TestQueue(Container container, EventBus eventBus, MQClient mq)
: super(container, eventBus, mq);
@override
Future<dynamic> push(dynamic job, [dynamic data = '', String? queue]) async {
return 'pushed';
}
@override
Future<dynamic> later(Duration delay, dynamic job,
[dynamic data = '', String? queue]) async {
return 'pushed later';
}
}
class DummyEventBus implements EventBus {
@override
dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
}

View file

@ -0,0 +1,664 @@
// Mocks generated by Mockito 5.4.4 from annotations
// in angel3_queue/test/queue_test.dart.
// Do not manually edit this file.
// ignore_for_file: no_leading_underscores_for_library_prefixes
import 'dart:async' as _i4;
import 'package:angel3_container/angel3_container.dart' as _i3;
import 'package:angel3_container/src/reflector.dart' as _i2;
import 'package:angel3_event_bus/event_bus.dart' as _i7;
import 'package:angel3_event_bus/res/app_event.dart' as _i8;
import 'package:angel3_event_bus/res/history_entry.dart' as _i9;
import 'package:angel3_event_bus/res/subscription.dart' as _i5;
import 'package:angel3_mq/mq.dart' as _i10;
import 'package:angel3_mq/src/core/constants/enums.dart' as _i12;
import 'package:angel3_mq/src/message/message.dart' as _i11;
import 'package:angel3_queue/src/queue.dart' as _i13;
import 'package:mockito/mockito.dart' as _i1;
import 'package:mockito/src/dummies.dart' as _i6;
// ignore_for_file: type=lint
// ignore_for_file: avoid_redundant_argument_values
// ignore_for_file: avoid_setters_without_getters
// ignore_for_file: comment_references
// ignore_for_file: deprecated_member_use
// ignore_for_file: deprecated_member_use_from_same_package
// ignore_for_file: implementation_imports
// ignore_for_file: invalid_use_of_visible_for_testing_member
// ignore_for_file: prefer_const_constructors
// ignore_for_file: unnecessary_parenthesis
// ignore_for_file: camel_case_types
// ignore_for_file: subtype_of_sealed_class
class _FakeReflector_0 extends _i1.SmartFake implements _i2.Reflector {
_FakeReflector_0(
Object parent,
Invocation parentInvocation,
) : super(
parent,
parentInvocation,
);
}
class _FakeContainer_1 extends _i1.SmartFake implements _i3.Container {
_FakeContainer_1(
Object parent,
Invocation parentInvocation,
) : super(
parent,
parentInvocation,
);
}
class _FakeFuture_2<T1> extends _i1.SmartFake implements _i4.Future<T1> {
_FakeFuture_2(
Object parent,
Invocation parentInvocation,
) : super(
parent,
parentInvocation,
);
}
class _FakeSubscription_3 extends _i1.SmartFake implements _i5.Subscription {
_FakeSubscription_3(
Object parent,
Invocation parentInvocation,
) : super(
parent,
parentInvocation,
);
}
/// A class which mocks [Container].
///
/// See the documentation for Mockito's code generation for more information.
class MockContainer extends _i1.Mock implements _i3.Container {
MockContainer() {
_i1.throwOnMissingStub(this);
}
@override
_i2.Reflector get reflector => (super.noSuchMethod(
Invocation.getter(#reflector),
returnValue: _FakeReflector_0(
this,
Invocation.getter(#reflector),
),
) as _i2.Reflector);
@override
bool get isRoot => (super.noSuchMethod(
Invocation.getter(#isRoot),
returnValue: false,
) as bool);
@override
_i3.Container createChild() => (super.noSuchMethod(
Invocation.method(
#createChild,
[],
),
returnValue: _FakeContainer_1(
this,
Invocation.method(
#createChild,
[],
),
),
) as _i3.Container);
@override
bool has<T>([Type? t]) => (super.noSuchMethod(
Invocation.method(
#has,
[t],
),
returnValue: false,
) as bool);
@override
bool hasNamed(String? name) => (super.noSuchMethod(
Invocation.method(
#hasNamed,
[name],
),
returnValue: false,
) as bool);
@override
_i4.Future<T> makeAsync<T>([Type? type]) => (super.noSuchMethod(
Invocation.method(
#makeAsync,
[type],
),
returnValue: _i6.ifNotNull(
_i6.dummyValueOrNull<T>(
this,
Invocation.method(
#makeAsync,
[type],
),
),
(T v) => _i4.Future<T>.value(v),
) ??
_FakeFuture_2<T>(
this,
Invocation.method(
#makeAsync,
[type],
),
),
) as _i4.Future<T>);
@override
T make<T>([Type? type]) => (super.noSuchMethod(
Invocation.method(
#make,
[type],
),
returnValue: _i6.dummyValue<T>(
this,
Invocation.method(
#make,
[type],
),
),
) as T);
@override
T Function(_i3.Container) registerLazySingleton<T>(
T Function(_i3.Container)? f, {
Type? as,
}) =>
(super.noSuchMethod(
Invocation.method(
#registerLazySingleton,
[f],
{#as: as},
),
returnValue: (_i3.Container __p0) => _i6.dummyValue<T>(
this,
Invocation.method(
#registerLazySingleton,
[f],
{#as: as},
),
),
) as T Function(_i3.Container));
@override
T Function(_i3.Container) registerFactory<T>(
T Function(_i3.Container)? f, {
Type? as,
}) =>
(super.noSuchMethod(
Invocation.method(
#registerFactory,
[f],
{#as: as},
),
returnValue: (_i3.Container __p0) => _i6.dummyValue<T>(
this,
Invocation.method(
#registerFactory,
[f],
{#as: as},
),
),
) as T Function(_i3.Container));
@override
T registerSingleton<T>(
T? object, {
Type? as,
}) =>
(super.noSuchMethod(
Invocation.method(
#registerSingleton,
[object],
{#as: as},
),
returnValue: _i6.dummyValue<T>(
this,
Invocation.method(
#registerSingleton,
[object],
{#as: as},
),
),
) as T);
@override
T findByName<T>(String? name) => (super.noSuchMethod(
Invocation.method(
#findByName,
[name],
),
returnValue: _i6.dummyValue<T>(
this,
Invocation.method(
#findByName,
[name],
),
),
) as T);
@override
T registerNamedSingleton<T>(
String? name,
T? object,
) =>
(super.noSuchMethod(
Invocation.method(
#registerNamedSingleton,
[
name,
object,
],
),
returnValue: _i6.dummyValue<T>(
this,
Invocation.method(
#registerNamedSingleton,
[
name,
object,
],
),
),
) as T);
@override
void registerScoped<T>(T Function(_i3.Container)? factory) =>
super.noSuchMethod(
Invocation.method(
#registerScoped,
[factory],
),
returnValueForMissingStub: null,
);
@override
void registerTransient<T>(T Function(_i3.Container)? factory) =>
super.noSuchMethod(
Invocation.method(
#registerTransient,
[factory],
),
returnValueForMissingStub: null,
);
@override
void registerConstant<T>(T? value) => super.noSuchMethod(
Invocation.method(
#registerConstant,
[value],
),
returnValueForMissingStub: null,
);
}
/// A class which mocks [EventBus].
///
/// See the documentation for Mockito's code generation for more information.
class MockEventBus extends _i1.Mock implements _i7.EventBus {
MockEventBus() {
_i1.throwOnMissingStub(this);
}
@override
int get maxHistoryLength => (super.noSuchMethod(
Invocation.getter(#maxHistoryLength),
returnValue: 0,
) as int);
@override
bool get allowLogging => (super.noSuchMethod(
Invocation.getter(#allowLogging),
returnValue: false,
) as bool);
@override
Map<Type, List<_i8.AppEvent Function(_i8.AppEvent)>> get map =>
(super.noSuchMethod(
Invocation.getter(#map),
returnValue: <Type, List<_i8.AppEvent Function(_i8.AppEvent)>>{},
) as Map<Type, List<_i8.AppEvent Function(_i8.AppEvent)>>);
@override
bool get isBusy => (super.noSuchMethod(
Invocation.getter(#isBusy),
returnValue: false,
) as bool);
@override
_i4.Stream<bool> get isBusy$ => (super.noSuchMethod(
Invocation.getter(#isBusy$),
returnValue: _i4.Stream<bool>.empty(),
) as _i4.Stream<bool>);
@override
_i4.Stream<_i8.AppEvent?> get last$ => (super.noSuchMethod(
Invocation.getter(#last$),
returnValue: _i4.Stream<_i8.AppEvent?>.empty(),
) as _i4.Stream<_i8.AppEvent?>);
@override
_i4.Stream<List<_i8.AppEvent>> get inProgress$ => (super.noSuchMethod(
Invocation.getter(#inProgress$),
returnValue: _i4.Stream<List<_i8.AppEvent>>.empty(),
) as _i4.Stream<List<_i8.AppEvent>>);
@override
List<_i9.EventBusHistoryEntry> get history => (super.noSuchMethod(
Invocation.getter(#history),
returnValue: <_i9.EventBusHistoryEntry>[],
) as List<_i9.EventBusHistoryEntry>);
@override
void fire(_i8.AppEvent? event) => super.noSuchMethod(
Invocation.method(
#fire,
[event],
),
returnValueForMissingStub: null,
);
@override
void watch(_i8.AppEvent? event) => super.noSuchMethod(
Invocation.method(
#watch,
[event],
),
returnValueForMissingStub: null,
);
@override
void complete(
_i8.AppEvent? event, {
_i8.AppEvent? nextEvent,
}) =>
super.noSuchMethod(
Invocation.method(
#complete,
[event],
{#nextEvent: nextEvent},
),
returnValueForMissingStub: null,
);
@override
bool isInProgress<T>() => (super.noSuchMethod(
Invocation.method(
#isInProgress,
[],
),
returnValue: false,
) as bool);
@override
_i4.Stream<T> on<T extends _i8.AppEvent>() => (super.noSuchMethod(
Invocation.method(
#on,
[],
),
returnValue: _i4.Stream<T>.empty(),
) as _i4.Stream<T>);
@override
_i5.Subscription respond<T>(_i5.Responder<T>? responder) =>
(super.noSuchMethod(
Invocation.method(
#respond,
[responder],
),
returnValue: _FakeSubscription_3(
this,
Invocation.method(
#respond,
[responder],
),
),
) as _i5.Subscription);
@override
_i4.Stream<bool> whileInProgress<T extends _i8.AppEvent>() =>
(super.noSuchMethod(
Invocation.method(
#whileInProgress,
[],
),
returnValue: _i4.Stream<bool>.empty(),
) as _i4.Stream<bool>);
@override
void clearHistory() => super.noSuchMethod(
Invocation.method(
#clearHistory,
[],
),
returnValueForMissingStub: null,
);
@override
void reset() => super.noSuchMethod(
Invocation.method(
#reset,
[],
),
returnValueForMissingStub: null,
);
@override
void dispose() => super.noSuchMethod(
Invocation.method(
#dispose,
[],
),
returnValueForMissingStub: null,
);
}
/// A class which mocks [MQClient].
///
/// See the documentation for Mockito's code generation for more information.
class MockMQClient extends _i1.Mock implements _i10.MQClient {
MockMQClient() {
_i1.throwOnMissingStub(this);
}
@override
String declareQueue(String? queueId) => (super.noSuchMethod(
Invocation.method(
#declareQueue,
[queueId],
),
returnValue: _i6.dummyValue<String>(
this,
Invocation.method(
#declareQueue,
[queueId],
),
),
) as String);
@override
void deleteQueue(String? queueId) => super.noSuchMethod(
Invocation.method(
#deleteQueue,
[queueId],
),
returnValueForMissingStub: null,
);
@override
_i4.Stream<_i11.Message> fetchQueue(String? queueId) => (super.noSuchMethod(
Invocation.method(
#fetchQueue,
[queueId],
),
returnValue: _i4.Stream<_i11.Message>.empty(),
) as _i4.Stream<_i11.Message>);
@override
List<String> listQueues() => (super.noSuchMethod(
Invocation.method(
#listQueues,
[],
),
returnValue: <String>[],
) as List<String>);
@override
void deleteMessage(
String? queueId,
_i11.Message? message,
) =>
super.noSuchMethod(
Invocation.method(
#deleteMessage,
[
queueId,
message,
],
),
returnValueForMissingStub: null,
);
@override
void sendMessage({
required _i11.Message? message,
String? exchangeName,
String? routingKey,
}) =>
super.noSuchMethod(
Invocation.method(
#sendMessage,
[],
{
#message: message,
#exchangeName: exchangeName,
#routingKey: routingKey,
},
),
returnValueForMissingStub: null,
);
@override
_i11.Message? getLatestMessage(String? queueId) =>
(super.noSuchMethod(Invocation.method(
#getLatestMessage,
[queueId],
)) as _i11.Message?);
@override
void bindQueue({
required String? queueId,
required String? exchangeName,
String? bindingKey,
}) =>
super.noSuchMethod(
Invocation.method(
#bindQueue,
[],
{
#queueId: queueId,
#exchangeName: exchangeName,
#bindingKey: bindingKey,
},
),
returnValueForMissingStub: null,
);
@override
void unbindQueue({
required String? queueId,
required String? exchangeName,
String? bindingKey,
}) =>
super.noSuchMethod(
Invocation.method(
#unbindQueue,
[],
{
#queueId: queueId,
#exchangeName: exchangeName,
#bindingKey: bindingKey,
},
),
returnValueForMissingStub: null,
);
@override
void declareExchange({
required String? exchangeName,
required _i12.ExchangeType? exchangeType,
}) =>
super.noSuchMethod(
Invocation.method(
#declareExchange,
[],
{
#exchangeName: exchangeName,
#exchangeType: exchangeType,
},
),
returnValueForMissingStub: null,
);
@override
void deleteExchange(String? exchangeName) => super.noSuchMethod(
Invocation.method(
#deleteExchange,
[exchangeName],
),
returnValueForMissingStub: null,
);
@override
void close() => super.noSuchMethod(
Invocation.method(
#close,
[],
),
returnValueForMissingStub: null,
);
}
/// A class which mocks [TransactionManager].
///
/// See the documentation for Mockito's code generation for more information.
class MockTransactionManager extends _i1.Mock
implements _i13.TransactionManager {
MockTransactionManager() {
_i1.throwOnMissingStub(this);
}
@override
_i4.Future<T> addCallback<T>(_i4.Future<T> Function()? callback) =>
(super.noSuchMethod(
Invocation.method(
#addCallback,
[callback],
),
returnValue: _i6.ifNotNull(
_i6.dummyValueOrNull<T>(
this,
Invocation.method(
#addCallback,
[callback],
),
),
(T v) => _i4.Future<T>.value(v),
) ??
_FakeFuture_2<T>(
this,
Invocation.method(
#addCallback,
[callback],
),
),
) as _i4.Future<T>);
}