diff --git a/core/events/.gitignore b/core/events/.gitignore new file mode 100644 index 00000000..3cceda55 --- /dev/null +++ b/core/events/.gitignore @@ -0,0 +1,7 @@ +# https://dart.dev/guides/libraries/private-files +# Created by `dart pub` +.dart_tool/ + +# Avoid committing pubspec.lock for library packages; see +# https://dart.dev/guides/libraries/private-files#pubspeclock. +pubspec.lock diff --git a/core/events/CHANGELOG.md b/core/events/CHANGELOG.md new file mode 100644 index 00000000..effe43c8 --- /dev/null +++ b/core/events/CHANGELOG.md @@ -0,0 +1,3 @@ +## 1.0.0 + +- Initial version. diff --git a/core/events/LICENSE.md b/core/events/LICENSE.md new file mode 100644 index 00000000..0fd0d03b --- /dev/null +++ b/core/events/LICENSE.md @@ -0,0 +1,10 @@ +The MIT License (MIT) + +The Laravel Framework is Copyright (c) Taylor Otwell +The Fabric Framework is Copyright (c) Vieo, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/core/events/README.md b/core/events/README.md new file mode 100644 index 00000000..757f4c9f --- /dev/null +++ b/core/events/README.md @@ -0,0 +1 @@ +

\ No newline at end of file diff --git a/core/events/analysis_options.yaml b/core/events/analysis_options.yaml new file mode 100644 index 00000000..dee8927a --- /dev/null +++ b/core/events/analysis_options.yaml @@ -0,0 +1,30 @@ +# This file configures the static analysis results for your project (errors, +# warnings, and lints). +# +# This enables the 'recommended' set of lints from `package:lints`. +# This set helps identify many issues that may lead to problems when running +# or consuming Dart code, and enforces writing Dart using a single, idiomatic +# style and format. +# +# If you want a smaller set of lints you can change this to specify +# 'package:lints/core.yaml'. These are just the most critical lints +# (the recommended set includes the core lints). +# The core lints are also what is used by pub.dev for scoring packages. + +include: package:lints/recommended.yaml + +# Uncomment the following section to specify additional rules. + +# linter: +# rules: +# - camel_case_types + +# analyzer: +# exclude: +# - path/to/excluded/files/** + +# For more information about the core and recommended set of lints, see +# https://dart.dev/go/core-lints + +# For additional information about configuring this file, see +# https://dart.dev/guides/language/analysis-options diff --git a/core/events/lib/dispatcher.dart b/core/events/lib/dispatcher.dart new file mode 100644 index 00000000..bbc26fce --- /dev/null +++ b/core/events/lib/dispatcher.dart @@ -0,0 +1,3 @@ +library; + +export 'src/dispatcher.dart'; diff --git a/core/events/lib/src/dispatcher.dart b/core/events/lib/src/dispatcher.dart new file mode 100644 index 00000000..9a692a41 --- /dev/null +++ b/core/events/lib/src/dispatcher.dart @@ -0,0 +1,490 @@ +import 'dart:async'; +import 'package:angel3_container/angel3_container.dart'; +import 'package:angel3_reactivex/angel3_reactivex.dart'; +import 'package:angel3_event_bus/event_bus.dart'; +import 'package:angel3_mq/mq.dart'; + +// Simulating some of the Laravel interfaces/classes +abstract class ShouldBroadcast {} + +abstract class ShouldQueue {} + +abstract class ShouldBeEncrypted {} + +abstract class ShouldDispatchAfterCommit {} + +class Dispatcher implements DispatcherContract { + // Properties as specified in YAML + final Container container; + final Map> _listeners = {}; + final Map> _wildcards = {}; + final Map> _wildcardsCache = {}; + late final Function _queueResolver; + late final Function _transactionManagerResolver; + final Map> _eventBusListeners = {}; + final Map> _untilCompleters = {}; + final Map _eventBusSubscriptions = {}; + final Set _processedMessageIds = {}; + + // Properties for Angel3 packages + final EventBus _eventBus; + late final MQClient? _mqClient; + final Map> _subjects = {}; + + // Queue and exchange names + static const String _eventsQueue = 'events_queue'; + static const String _delayedEventsQueue = 'delayed_events_queue'; + static const String _eventsExchange = 'events_exchange'; + + Dispatcher(this.container) : _eventBus = EventBus(); + + // Setter for _mqClient + set mqClient(MQClient client) { + _mqClient = client; + _setupQueuesAndExchanges(); + _startProcessingQueuedEvents(); + } + + void _setupQueuesAndExchanges() { + _mqClient?.declareQueue(_eventsQueue); + _mqClient?.declareQueue(_delayedEventsQueue); + _mqClient?.declareExchange( + exchangeName: _eventsExchange, + exchangeType: ExchangeType.direct, + ); + _mqClient?.bindQueue( + queueId: _eventsQueue, + exchangeName: _eventsExchange, + bindingKey: _eventsQueue, + ); + _mqClient?.bindQueue( + queueId: _delayedEventsQueue, + exchangeName: _eventsExchange, + bindingKey: _delayedEventsQueue, + ); + } + + void _startProcessingQueuedEvents() { + _mqClient?.fetchQueue(_eventsQueue).listen((Message message) async { + if (message.payload is Map) { + final eventData = message.payload as Map; + if (eventData.containsKey('event') && + eventData.containsKey('payload')) { + await dispatch(eventData['event'], eventData['payload']); + } else { + print('Invalid message format: ${message.payload}'); + } + } else { + print('Unexpected payload type: ${message.payload.runtimeType}'); + } + }); + } + + @override + void listen(dynamic events, dynamic listener) { + if (events is String) { + _addListener(events, listener); + } else if (events is List) { + for (var event in events) { + _addListener(event, listener); + } + } + if (events is String && events.contains('*')) { + _setupWildcardListen(events, listener); + } + } + + void _addListener(String event, dynamic listener) { + _listeners.putIfAbsent(event, () => []).add(listener); + + // Create a subject for this event if it doesn't exist + _subjects.putIfAbsent(event, () => BehaviorSubject()); + + // Add EventBus listener and store the subscription + final subscription = _eventBus.on().listen((AppEvent busEvent) { + if (busEvent is CustomAppEvent && busEvent.eventName == event) { + listener(event, busEvent.payload); + } + }); + _eventBusSubscriptions[event] = subscription; + } + + void _setupWildcardListen(String event, Function listener) { + _wildcards.putIfAbsent(event, () => []).add(listener); + _wildcardsCache.clear(); + } + + @override + bool hasListeners(String eventName) { + return _listeners.containsKey(eventName) || + _wildcards.containsKey(eventName) || + hasWildcardListeners(eventName); + } + + bool hasWildcardListeners(String eventName) { + return _wildcards.keys + .any((pattern) => _isWildcardMatch(pattern, eventName)); + } + + @override + void push(String event, [dynamic payload]) { + final effectivePayload = payload ?? []; + _mqClient?.sendMessage( + exchangeName: _eventsExchange, + routingKey: _delayedEventsQueue, + message: Message( + headers: {'expiration': '5000'}, // 5 seconds delay + payload: { + 'event': event, + 'payload': + effectivePayload is List ? effectivePayload : [effectivePayload], + }, + timestamp: DateTime.now().toIso8601String(), + id: 'msg_${DateTime.now().millisecondsSinceEpoch}', // Ensure unique ID + ), + ); + } + + @override + Future flush(String event) async { + final messageStream = _mqClient?.fetchQueue(_delayedEventsQueue); + if (messageStream == null) { + print('Warning: MQClient is not initialized'); + return; + } + + final messagesToProcess = []; + + // Collect messages to process + await for (final message in messageStream) { + print('Examining message: ${message.id}'); + if (message.payload is Map && + !_processedMessageIds.contains(message.id)) { + final eventData = message.payload as Map; + if (eventData['event'] == event) { + print('Adding message to process: ${message.id}'); + messagesToProcess.add(message); + } + } + } + + print('Total messages to process: ${messagesToProcess.length}'); + + // Process collected messages + for (final message in messagesToProcess) { + final eventData = message.payload as Map; + print('Processing message: ${message.id}'); + await dispatch(eventData['event'], eventData['payload']); + _mqClient?.deleteMessage(_delayedEventsQueue, message); + _processedMessageIds.add(message.id); + } + } + + @override + void subscribe(dynamic subscriber) { + if (subscriber is EventBusSubscriber) { + subscriber.subscribe(_eventBus); + } else { + // Handle other types of subscribers + } + } + + @override + Future until(dynamic event, [dynamic payload]) { + if (event is String) { + final completer = Completer(); + _untilCompleters[event] = completer; + + // Set up a one-time listener for this event + listen(event, (dynamic e, dynamic p) { + if (!completer.isCompleted) { + completer.complete(p); + _untilCompleters.remove(event); + } + }); + + // If payload is provided, dispatch the event immediately + if (payload != null) { + // Use dispatch instead of push to ensure immediate processing + dispatch(event, payload); + } + + return completer.future; + } + throw ArgumentError('Event must be a String'); + } + + @override + Future dispatch(dynamic event, [dynamic payload, bool? halt]) async { + final eventName = event is String ? event : event.runtimeType.toString(); + final eventPayload = payload ?? (event is AppEvent ? event : []); + + if (event is ShouldBroadcast || + (eventPayload is List && + eventPayload.isNotEmpty && + eventPayload[0] is ShouldBroadcast)) { + await _broadcastEvent(event); + } + + if (event is ShouldQueue || + (eventPayload is List && + eventPayload.isNotEmpty && + eventPayload[0] is ShouldQueue)) { + return _queueEvent(eventName, eventPayload); + } + + final listeners = getListeners(eventName); + for (var listener in listeners) { + final response = + await Function.apply(listener, [eventName, eventPayload]); + if (halt == true && response != null) { + return response; + } + if (response == false) { + break; + } + } + + return halt == true ? null : listeners; + } + + // void _addToSubject(String eventName, dynamic payload) { + // if (_subjects.containsKey(eventName)) { + // _subjects[eventName]!.add(payload); + // } + // } + + @override + List getListeners(String eventName) { + var listeners = [ + ...(_listeners[eventName] ?? []), + ...(_wildcardsCache[eventName] ?? _getWildcardListeners(eventName)), + ...(_eventBusListeners[eventName] ?? []), + ]; + + return listeners; + } + + List _getWildcardListeners(String eventName) { + final wildcardListeners = []; + for (var entry in _wildcards.entries) { + if (_isWildcardMatch(entry.key, eventName)) { + wildcardListeners.addAll(entry.value); + } + } + _wildcardsCache[eventName] = wildcardListeners; + return wildcardListeners; + } + + @override + void forget(String event) { + // Remove from _listeners + _listeners.remove(event); + + // Remove from _subjects + if (_subjects.containsKey(event)) { + _subjects[event]?.close(); + _subjects.remove(event); + } + + // Cancel and remove EventBus subscription + _eventBusSubscriptions[event]?.cancel(); + _eventBusSubscriptions.remove(event); + + // Remove from wildcards if applicable + if (event.contains('*')) { + _wildcards.remove(event); + _wildcardsCache.clear(); + } else { + // If it's not a wildcard, we need to remove it from any matching wildcard listeners + _wildcards.forEach((pattern, listeners) { + if (_isWildcardMatch(pattern, event)) { + _wildcards[pattern] = listeners + .where((listener) => listener != _listeners[event]) + .toList(); + } + }); + _wildcardsCache.clear(); + } + + // Remove any 'until' completers for this event + _untilCompleters.remove(event); + } + + @override + void forgetPushed() { + _listeners.removeWhere((key, _) => key.endsWith('_pushed')); + _eventBusListeners.removeWhere((key, _) => key.endsWith('_pushed')); + // Note: We're not clearing all EventBus listeners here, as that might affect other parts of your application + } + + @override + void setQueueResolver(Function resolver) { + _queueResolver = resolver; + } + + @override + void setTransactionManagerResolver(Function resolver) { + _transactionManagerResolver = resolver; + } + + @override + Map> getRawListeners() { + return Map.unmodifiable(_listeners); + } + + bool _shouldBroadcast(List payload) { + return payload.isNotEmpty && payload[0] is ShouldBroadcast; + } + + Future _broadcastEvent(dynamic event) async { + // Implement broadcasting logic here + // For now, we'll just print a message + print('Broadcasting event: ${event.runtimeType}'); + } + + bool _isWildcardMatch(String pattern, String eventName) { + final regExp = RegExp('^${pattern.replaceAll('*', '.*')}\$'); + return regExp.hasMatch(eventName); + } + + bool _shouldQueue(List payload) { + return payload.isNotEmpty && payload[0] is ShouldQueue; + } + + Future _queueEvent(String eventName, dynamic payload) async { + _mqClient?.sendMessage( + exchangeName: _eventsExchange, + routingKey: _eventsQueue, + message: Message( + payload: {'event': eventName, 'payload': payload}, + timestamp: DateTime.now().toIso8601String(), + ), + ); + } + + // Updated on method + Stream on(String event) { + return (_subjects + .putIfAbsent(event, () => BehaviorSubject()) + .stream as Stream) + .where((event) => event is T) + .cast(); + } + + // In your Dispatcher class + void setMQClient(MQClient client) { + _mqClient = client; + } + + // Method to close the MQClient connection + Future close() async { + _mqClient?.close(); + } + + // Don't forget to close the subjects when they're no longer needed + void dispose() { + for (var subject in _subjects.values) { + subject.close(); + } + } +} +// ... rest of the code (DispatcherContract, EventBusSubscriber, etc.) remains the same + +abstract class DispatcherContract { + void listen(dynamic events, dynamic listener); + bool hasListeners(String eventName); + void push(String event, [dynamic payload]); + Future flush(String event); + void subscribe(dynamic subscriber); + Future until(dynamic event, [dynamic payload]); + Future dispatch(dynamic event, [dynamic payload, bool halt]); + List getListeners(String eventName); + void forget(String event); + void forgetPushed(); + void setQueueResolver(Function resolver); + void setTransactionManagerResolver(Function resolver); + Map> getRawListeners(); +} + +// Helper class for EventBus subscribers +abstract class EventBusSubscriber { + void subscribe(EventBus eventBus); +} + +// Mixin to simulate Macroable trait +mixin Macroable { + // Implementation of Macroable functionality +} + +// Mixin to simulate ReflectsClosures trait +mixin ReflectsClosures { + // Implementation of ReflectsClosures functionality +} + +// If not already defined, you might need to create an Event class +class Event { + final String name; + final dynamic data; + + Event(this.name, this.data); +} + +// Custom AppEvent subclasses for handling different event types +class StringBasedEvent extends AppEvent { + final String eventName; + final dynamic payload; + + StringBasedEvent(this.eventName, this.payload); + + @override + List get props => [eventName, payload]; +} + +class CustomAppEvent extends AppEvent { + final String eventName; + final dynamic payload; + + CustomAppEvent(this.eventName, this.payload); + + @override + List get props => [eventName, payload]; +} + +// This is a simple implementation of Reflector that does nothing +class EmptyReflector implements Reflector { + const EmptyReflector(); + + @override + ReflectedType reflectType(Type type) { + throw UnimplementedError(); + } + + @override + ReflectedInstance reflectInstance(Object object) { + throw UnimplementedError(); + } + + @override + ReflectedType reflectFutureOf(Type type) { + throw UnimplementedError(); + } + + @override + String? getName(Symbol symbol) { + // TODO: implement getName + throw UnimplementedError(); + } + + @override + ReflectedClass? reflectClass(Type clazz) { + // TODO: implement reflectClass + throw UnimplementedError(); + } + + @override + ReflectedFunction? reflectFunction(Function function) { + // TODO: implement reflectFunction + throw UnimplementedError(); + } +} diff --git a/core/events/pubspec.yaml b/core/events/pubspec.yaml new file mode 100644 index 00000000..1e90a639 --- /dev/null +++ b/core/events/pubspec.yaml @@ -0,0 +1,21 @@ +name: angel3_events +description: The Events Package for the Protevus Platform +version: 0.0.1 +homepage: https://protevus.com +documentation: https://docs.protevus.com +repository: https://github.com/protevus/platformo +environment: + sdk: ^3.4.2 + +# Add regular dependencies here. +dependencies: + angel3_container: ^9.0.0 + angel3_mq: ^9.0.0 + angel3_event_bus: ^9.0.0 + angel3_framework: ^9.0.0 + angel3_reactivex: ^0.27.5 + # path: ^1.8.0 + +dev_dependencies: + lints: ^3.0.0 + test: ^1.24.0 diff --git a/core/events/test/event_test.dart b/core/events/test/event_test.dart new file mode 100644 index 00000000..1c7ff64a --- /dev/null +++ b/core/events/test/event_test.dart @@ -0,0 +1,379 @@ +import 'package:angel3_event_bus/res/app_event.dart'; +import 'package:test/test.dart'; +import 'package:angel3_container/angel3_container.dart'; +import 'package:angel3_mq/mq.dart'; +import 'package:angel3_events/dispatcher.dart'; // Replace with the actual import path + +void main() { + late Dispatcher dispatcher; + late MockMQClient mockMQClient; + + setUp(() { + var container = Container(EmptyReflector()); + dispatcher = Dispatcher(container); + mockMQClient = MockMQClient(); + dispatcher.mqClient = mockMQClient; // Use the setter + + // Clear the queue before each test + mockMQClient.queuedMessages.clear(); + }); + + group('Dispatcher', () { + test('listen and dispatch', () async { + var callCount = 0; + dispatcher.listen('test_event', (dynamic event, dynamic payload) { + expect(event, equals('test_event')); + expect(payload, equals(['test_payload'])); + callCount++; + }); + await dispatcher.dispatch('test_event', ['test_payload']); + expect(callCount, equals(1)); + }); + + test('wildcard listener', () async { + var callCount = 0; + dispatcher.listen('test.*', (dynamic event, dynamic payload) { + expect(event, matches(RegExp(r'^test\.'))); + callCount++; + }); + + await dispatcher.dispatch('test.one', ['payload1']); + await dispatcher.dispatch('test.two', ['payload2']); + expect(callCount, equals(2)); + }); + + test('hasListeners', () { + dispatcher.listen('test_event', (dynamic event, dynamic payload) {}); + expect(dispatcher.hasListeners('test_event'), isTrue); + expect(dispatcher.hasListeners('non_existent_event'), isFalse); + }); + + test('until', () async { + // Test without pushing the event immediately + var futureResult = dispatcher.until('test_event'); + + // Use a small delay to ensure the until listener is set up + await Future.delayed(Duration(milliseconds: 10)); + + await dispatcher.dispatch('test_event', ['test_payload']); + var result = await futureResult; + expect(result, equals(['test_payload'])); + + // Test with pushing the event immediately + result = + await dispatcher.until('another_test_event', ['another_payload']); + expect(result, equals(['another_payload'])); + }, timeout: Timeout(Duration(seconds: 5))); // Add a reasonable timeout + + test('forget', () async { + var callCount = 0; + dispatcher.listen('test_event', (dynamic event, dynamic payload) { + callCount++; + }); + await dispatcher.dispatch('test_event'); + expect(callCount, equals(1)); + + dispatcher.forget('test_event'); + await dispatcher.dispatch('test_event'); + expect(callCount, equals(1)); // Should not increase + }); + + test('push and flush', () async { + print('Starting push and flush test'); + + // Push 4 messages + for (var i = 0; i < 4; i++) { + dispatcher.push('delayed_event', ['delayed_payload_$i']); + } + + // Verify that 4 messages were queued + expect(mockMQClient.queuedMessages['delayed_events_queue']?.length, + equals(4), + reason: 'Should have queued exactly 4 messages'); + + print( + 'Queued messages: ${mockMQClient.queuedMessages['delayed_events_queue']?.length}'); + + var callCount = 0; + var processedPayloads = []; + + // Remove any existing listeners + dispatcher.forget('delayed_event'); + + dispatcher.listen('delayed_event', (dynamic event, dynamic payload) { + print('Listener called with payload: $payload'); + expect(event, equals('delayed_event')); + expect(payload[0], startsWith('delayed_payload_')); + processedPayloads.add(payload[0]); + callCount++; + }); + + await dispatcher.flush('delayed_event'); + + print('After flush - Call count: $callCount'); + print('Processed payloads: $processedPayloads'); + + expect(callCount, equals(4), reason: 'Should process exactly 4 messages'); + expect(processedPayloads.toSet().length, equals(4), + reason: 'All payloads should be unique'); + + // Verify that all messages were removed from the queue + expect(mockMQClient.queuedMessages['delayed_events_queue']?.length, + equals(0), + reason: 'Queue should be empty after flush'); + + // Flush again to ensure no more messages are processed + await dispatcher.flush('delayed_event'); + expect(callCount, equals(4), + reason: 'Should still be 4 after second flush'); + }); + + test('shouldBroadcast', () async { + var broadcastEvent = BroadcastTestEvent(); + var callCount = 0; + + dispatcher.listen('BroadcastTestEvent', (dynamic event, dynamic payload) { + callCount++; + }); + + await dispatcher.dispatch(broadcastEvent); + expect(callCount, equals(1)); + }); + + test('shouldQueue', () async { + var queueEvent = QueueTestEvent(); + await dispatcher.dispatch(queueEvent); + expect(mockMQClient.queuedMessages['events_queue'], isNotEmpty); + expect(mockMQClient.queuedMessages['events_queue']!.first.payload, + containsPair('event', 'QueueTestEvent')); + }); + }); +} + +abstract class MQClientWrapper { + Stream fetchQueue(String queueId); + void sendMessage({ + required Message message, + String? exchangeName, + String? routingKey, + }); + String declareQueue(String queueId); + void declareExchange({ + required String exchangeName, + required ExchangeType exchangeType, + }); + void bindQueue({ + required String queueId, + required String exchangeName, + String? bindingKey, + }); + void close(); +} + +class RealMQClientWrapper implements MQClientWrapper { + final MQClient _client; + + RealMQClientWrapper(this._client); + + @override + Stream fetchQueue(String queueId) => _client.fetchQueue(queueId); + + @override + void sendMessage({ + required Message message, + String? exchangeName, + String? routingKey, + }) => + _client.sendMessage( + message: message, + exchangeName: exchangeName, + routingKey: routingKey, + ); + + @override + String declareQueue(String queueId) => _client.declareQueue(queueId); + + @override + void declareExchange({ + required String exchangeName, + required ExchangeType exchangeType, + }) => + _client.declareExchange( + exchangeName: exchangeName, + exchangeType: exchangeType, + ); + + @override + void bindQueue({ + required String queueId, + required String exchangeName, + String? bindingKey, + }) => + _client.bindQueue( + queueId: queueId, + exchangeName: exchangeName, + bindingKey: bindingKey, + ); + + @override + void close() => _client.close(); +} + +class MockMQClient implements MQClient { + Map> queuedMessages = {}; + int _messageIdCounter = 0; + + void queueMessage(String queueName, Message message) { + queuedMessages.putIfAbsent(queueName, () => []).add(message); + print( + 'Queued message. Queue $queueName now has ${queuedMessages[queueName]?.length} messages'); + } + + @override + String declareQueue(String queueId) { + queuedMessages[queueId] = []; + return queueId; + } + + @override + void deleteQueue(String queueId) { + queuedMessages.remove(queueId); + } + + @override + Stream fetchQueue(String queueId) { + print('Fetching queue: $queueId'); + return Stream.fromIterable(queuedMessages[queueId] ?? []); + } + + @override + void sendMessage({ + required Message message, + String? exchangeName, + String? routingKey, + }) { + print('Sending message to queue: $routingKey'); + final newMessage = Message( + payload: message.payload, + headers: message.headers, + timestamp: message.timestamp, + id: 'msg_${_messageIdCounter++}', + ); + queueMessage(routingKey ?? '', newMessage); + } + + @override + Message? getLatestMessage(String queueId) { + final messages = queuedMessages[queueId]; + return messages?.isNotEmpty == true ? messages!.last : null; + } + + @override + void bindQueue({ + required String queueId, + required String exchangeName, + String? bindingKey, + }) { + // Implement if needed for your tests + } + + @override + void unbindQueue({ + required String queueId, + required String exchangeName, + String? bindingKey, + }) { + // Implement if needed for your tests + } + + @override + void declareExchange({ + required String exchangeName, + required ExchangeType exchangeType, + }) { + // Implement if needed for your tests + } + + @override + void deleteExchange(String exchangeName) { + // Implement if needed for your tests + } + + @override + List listQueues() { + return queuedMessages.keys.toList(); + } + + @override + void close() { + queuedMessages.clear(); + } + + @override + void deleteMessage(String queueId, Message message) { + print('Deleting message from queue: $queueId'); + queuedMessages[queueId]?.removeWhere((m) => m.id == message.id); + print( + 'After deletion, queue $queueId has ${queuedMessages[queueId]?.length} messages'); + } +} + +class BroadcastTestEvent implements AppEvent, ShouldBroadcast { + @override + List get props => []; + + @override + bool? get stringify => true; + + @override + DateTime get timestamp => DateTime.now(); +} + +class QueueTestEvent implements AppEvent, ShouldQueue { + @override + List get props => []; + + @override + bool? get stringify => true; + + @override + DateTime get timestamp => DateTime.now(); +} + +// This is a simple implementation of Reflector that does nothing +class EmptyReflector implements Reflector { + const EmptyReflector(); + + @override + ReflectedType reflectType(Type type) { + throw UnimplementedError(); + } + + @override + ReflectedInstance reflectInstance(Object object) { + throw UnimplementedError(); + } + + @override + ReflectedType reflectFutureOf(Type type) { + throw UnimplementedError(); + } + + @override + String? getName(Symbol symbol) { + // TODO: implement getName + throw UnimplementedError(); + } + + @override + ReflectedClass? reflectClass(Type clazz) { + // TODO: implement reflectClass + throw UnimplementedError(); + } + + @override + ReflectedFunction? reflectFunction(Function function) { + // TODO: implement reflectFunction + throw UnimplementedError(); + } +}