Compare commits
2 commits
775bae4a61
...
ee9d512c1f
Author | SHA1 | Date | |
---|---|---|---|
|
ee9d512c1f | ||
|
dcddc2992a |
14 changed files with 1004 additions and 15 deletions
7
core/events/.gitignore
vendored
Normal file
7
core/events/.gitignore
vendored
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
# https://dart.dev/guides/libraries/private-files
|
||||||
|
# Created by `dart pub`
|
||||||
|
.dart_tool/
|
||||||
|
|
||||||
|
# Avoid committing pubspec.lock for library packages; see
|
||||||
|
# https://dart.dev/guides/libraries/private-files#pubspeclock.
|
||||||
|
pubspec.lock
|
3
core/events/CHANGELOG.md
Normal file
3
core/events/CHANGELOG.md
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
## 1.0.0
|
||||||
|
|
||||||
|
- Initial version.
|
10
core/events/LICENSE.md
Normal file
10
core/events/LICENSE.md
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
The MIT License (MIT)
|
||||||
|
|
||||||
|
The Laravel Framework is Copyright (c) Taylor Otwell
|
||||||
|
The Fabric Framework is Copyright (c) Vieo, Inc.
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
1
core/events/README.md
Normal file
1
core/events/README.md
Normal file
|
@ -0,0 +1 @@
|
||||||
|
<p align="center"><a href="https://protevus.com" target="_blank"><img src="https://git.protevus.com/protevus/branding/raw/branch/main/protevus-logo-bg.png"></a></p>
|
30
core/events/analysis_options.yaml
Normal file
30
core/events/analysis_options.yaml
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
# This file configures the static analysis results for your project (errors,
|
||||||
|
# warnings, and lints).
|
||||||
|
#
|
||||||
|
# This enables the 'recommended' set of lints from `package:lints`.
|
||||||
|
# This set helps identify many issues that may lead to problems when running
|
||||||
|
# or consuming Dart code, and enforces writing Dart using a single, idiomatic
|
||||||
|
# style and format.
|
||||||
|
#
|
||||||
|
# If you want a smaller set of lints you can change this to specify
|
||||||
|
# 'package:lints/core.yaml'. These are just the most critical lints
|
||||||
|
# (the recommended set includes the core lints).
|
||||||
|
# The core lints are also what is used by pub.dev for scoring packages.
|
||||||
|
|
||||||
|
include: package:lints/recommended.yaml
|
||||||
|
|
||||||
|
# Uncomment the following section to specify additional rules.
|
||||||
|
|
||||||
|
# linter:
|
||||||
|
# rules:
|
||||||
|
# - camel_case_types
|
||||||
|
|
||||||
|
# analyzer:
|
||||||
|
# exclude:
|
||||||
|
# - path/to/excluded/files/**
|
||||||
|
|
||||||
|
# For more information about the core and recommended set of lints, see
|
||||||
|
# https://dart.dev/go/core-lints
|
||||||
|
|
||||||
|
# For additional information about configuring this file, see
|
||||||
|
# https://dart.dev/guides/language/analysis-options
|
3
core/events/lib/dispatcher.dart
Normal file
3
core/events/lib/dispatcher.dart
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
library;
|
||||||
|
|
||||||
|
export 'src/dispatcher.dart';
|
490
core/events/lib/src/dispatcher.dart
Normal file
490
core/events/lib/src/dispatcher.dart
Normal file
|
@ -0,0 +1,490 @@
|
||||||
|
import 'dart:async';
|
||||||
|
import 'package:angel3_container/angel3_container.dart';
|
||||||
|
import 'package:angel3_reactivex/angel3_reactivex.dart';
|
||||||
|
import 'package:angel3_event_bus/event_bus.dart';
|
||||||
|
import 'package:angel3_mq/mq.dart';
|
||||||
|
|
||||||
|
// Simulating some of the Laravel interfaces/classes
|
||||||
|
abstract class ShouldBroadcast {}
|
||||||
|
|
||||||
|
abstract class ShouldQueue {}
|
||||||
|
|
||||||
|
abstract class ShouldBeEncrypted {}
|
||||||
|
|
||||||
|
abstract class ShouldDispatchAfterCommit {}
|
||||||
|
|
||||||
|
class Dispatcher implements DispatcherContract {
|
||||||
|
// Properties as specified in YAML
|
||||||
|
final Container container;
|
||||||
|
final Map<String, List<Function>> _listeners = {};
|
||||||
|
final Map<String, List<Function>> _wildcards = {};
|
||||||
|
final Map<String, List<Function>> _wildcardsCache = {};
|
||||||
|
late final Function _queueResolver;
|
||||||
|
late final Function _transactionManagerResolver;
|
||||||
|
final Map<String, List<Function>> _eventBusListeners = {};
|
||||||
|
final Map<String, Completer<dynamic>> _untilCompleters = {};
|
||||||
|
final Map<String, StreamSubscription> _eventBusSubscriptions = {};
|
||||||
|
final Set<String> _processedMessageIds = {};
|
||||||
|
|
||||||
|
// Properties for Angel3 packages
|
||||||
|
final EventBus _eventBus;
|
||||||
|
late final MQClient? _mqClient;
|
||||||
|
final Map<String, BehaviorSubject<dynamic>> _subjects = {};
|
||||||
|
|
||||||
|
// Queue and exchange names
|
||||||
|
static const String _eventsQueue = 'events_queue';
|
||||||
|
static const String _delayedEventsQueue = 'delayed_events_queue';
|
||||||
|
static const String _eventsExchange = 'events_exchange';
|
||||||
|
|
||||||
|
Dispatcher(this.container) : _eventBus = EventBus();
|
||||||
|
|
||||||
|
// Setter for _mqClient
|
||||||
|
set mqClient(MQClient client) {
|
||||||
|
_mqClient = client;
|
||||||
|
_setupQueuesAndExchanges();
|
||||||
|
_startProcessingQueuedEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
void _setupQueuesAndExchanges() {
|
||||||
|
_mqClient?.declareQueue(_eventsQueue);
|
||||||
|
_mqClient?.declareQueue(_delayedEventsQueue);
|
||||||
|
_mqClient?.declareExchange(
|
||||||
|
exchangeName: _eventsExchange,
|
||||||
|
exchangeType: ExchangeType.direct,
|
||||||
|
);
|
||||||
|
_mqClient?.bindQueue(
|
||||||
|
queueId: _eventsQueue,
|
||||||
|
exchangeName: _eventsExchange,
|
||||||
|
bindingKey: _eventsQueue,
|
||||||
|
);
|
||||||
|
_mqClient?.bindQueue(
|
||||||
|
queueId: _delayedEventsQueue,
|
||||||
|
exchangeName: _eventsExchange,
|
||||||
|
bindingKey: _delayedEventsQueue,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
void _startProcessingQueuedEvents() {
|
||||||
|
_mqClient?.fetchQueue(_eventsQueue).listen((Message message) async {
|
||||||
|
if (message.payload is Map) {
|
||||||
|
final eventData = message.payload as Map<String, dynamic>;
|
||||||
|
if (eventData.containsKey('event') &&
|
||||||
|
eventData.containsKey('payload')) {
|
||||||
|
await dispatch(eventData['event'], eventData['payload']);
|
||||||
|
} else {
|
||||||
|
print('Invalid message format: ${message.payload}');
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
print('Unexpected payload type: ${message.payload.runtimeType}');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void listen(dynamic events, dynamic listener) {
|
||||||
|
if (events is String) {
|
||||||
|
_addListener(events, listener);
|
||||||
|
} else if (events is List) {
|
||||||
|
for (var event in events) {
|
||||||
|
_addListener(event, listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (events is String && events.contains('*')) {
|
||||||
|
_setupWildcardListen(events, listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void _addListener(String event, dynamic listener) {
|
||||||
|
_listeners.putIfAbsent(event, () => []).add(listener);
|
||||||
|
|
||||||
|
// Create a subject for this event if it doesn't exist
|
||||||
|
_subjects.putIfAbsent(event, () => BehaviorSubject<dynamic>());
|
||||||
|
|
||||||
|
// Add EventBus listener and store the subscription
|
||||||
|
final subscription = _eventBus.on().listen((AppEvent busEvent) {
|
||||||
|
if (busEvent is CustomAppEvent && busEvent.eventName == event) {
|
||||||
|
listener(event, busEvent.payload);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
_eventBusSubscriptions[event] = subscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _setupWildcardListen(String event, Function listener) {
|
||||||
|
_wildcards.putIfAbsent(event, () => []).add(listener);
|
||||||
|
_wildcardsCache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool hasListeners(String eventName) {
|
||||||
|
return _listeners.containsKey(eventName) ||
|
||||||
|
_wildcards.containsKey(eventName) ||
|
||||||
|
hasWildcardListeners(eventName);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasWildcardListeners(String eventName) {
|
||||||
|
return _wildcards.keys
|
||||||
|
.any((pattern) => _isWildcardMatch(pattern, eventName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void push(String event, [dynamic payload]) {
|
||||||
|
final effectivePayload = payload ?? [];
|
||||||
|
_mqClient?.sendMessage(
|
||||||
|
exchangeName: _eventsExchange,
|
||||||
|
routingKey: _delayedEventsQueue,
|
||||||
|
message: Message(
|
||||||
|
headers: {'expiration': '5000'}, // 5 seconds delay
|
||||||
|
payload: {
|
||||||
|
'event': event,
|
||||||
|
'payload':
|
||||||
|
effectivePayload is List ? effectivePayload : [effectivePayload],
|
||||||
|
},
|
||||||
|
timestamp: DateTime.now().toIso8601String(),
|
||||||
|
id: 'msg_${DateTime.now().millisecondsSinceEpoch}', // Ensure unique ID
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> flush(String event) async {
|
||||||
|
final messageStream = _mqClient?.fetchQueue(_delayedEventsQueue);
|
||||||
|
if (messageStream == null) {
|
||||||
|
print('Warning: MQClient is not initialized');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final messagesToProcess = <Message>[];
|
||||||
|
|
||||||
|
// Collect messages to process
|
||||||
|
await for (final message in messageStream) {
|
||||||
|
print('Examining message: ${message.id}');
|
||||||
|
if (message.payload is Map<String, dynamic> &&
|
||||||
|
!_processedMessageIds.contains(message.id)) {
|
||||||
|
final eventData = message.payload as Map<String, dynamic>;
|
||||||
|
if (eventData['event'] == event) {
|
||||||
|
print('Adding message to process: ${message.id}');
|
||||||
|
messagesToProcess.add(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
print('Total messages to process: ${messagesToProcess.length}');
|
||||||
|
|
||||||
|
// Process collected messages
|
||||||
|
for (final message in messagesToProcess) {
|
||||||
|
final eventData = message.payload as Map<String, dynamic>;
|
||||||
|
print('Processing message: ${message.id}');
|
||||||
|
await dispatch(eventData['event'], eventData['payload']);
|
||||||
|
_mqClient?.deleteMessage(_delayedEventsQueue, message);
|
||||||
|
_processedMessageIds.add(message.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void subscribe(dynamic subscriber) {
|
||||||
|
if (subscriber is EventBusSubscriber) {
|
||||||
|
subscriber.subscribe(_eventBus);
|
||||||
|
} else {
|
||||||
|
// Handle other types of subscribers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<dynamic> until(dynamic event, [dynamic payload]) {
|
||||||
|
if (event is String) {
|
||||||
|
final completer = Completer<dynamic>();
|
||||||
|
_untilCompleters[event] = completer;
|
||||||
|
|
||||||
|
// Set up a one-time listener for this event
|
||||||
|
listen(event, (dynamic e, dynamic p) {
|
||||||
|
if (!completer.isCompleted) {
|
||||||
|
completer.complete(p);
|
||||||
|
_untilCompleters.remove(event);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// If payload is provided, dispatch the event immediately
|
||||||
|
if (payload != null) {
|
||||||
|
// Use dispatch instead of push to ensure immediate processing
|
||||||
|
dispatch(event, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
return completer.future;
|
||||||
|
}
|
||||||
|
throw ArgumentError('Event must be a String');
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<dynamic> dispatch(dynamic event, [dynamic payload, bool? halt]) async {
|
||||||
|
final eventName = event is String ? event : event.runtimeType.toString();
|
||||||
|
final eventPayload = payload ?? (event is AppEvent ? event : []);
|
||||||
|
|
||||||
|
if (event is ShouldBroadcast ||
|
||||||
|
(eventPayload is List &&
|
||||||
|
eventPayload.isNotEmpty &&
|
||||||
|
eventPayload[0] is ShouldBroadcast)) {
|
||||||
|
await _broadcastEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event is ShouldQueue ||
|
||||||
|
(eventPayload is List &&
|
||||||
|
eventPayload.isNotEmpty &&
|
||||||
|
eventPayload[0] is ShouldQueue)) {
|
||||||
|
return _queueEvent(eventName, eventPayload);
|
||||||
|
}
|
||||||
|
|
||||||
|
final listeners = getListeners(eventName);
|
||||||
|
for (var listener in listeners) {
|
||||||
|
final response =
|
||||||
|
await Function.apply(listener, [eventName, eventPayload]);
|
||||||
|
if (halt == true && response != null) {
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
if (response == false) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return halt == true ? null : listeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
// void _addToSubject(String eventName, dynamic payload) {
|
||||||
|
// if (_subjects.containsKey(eventName)) {
|
||||||
|
// _subjects[eventName]!.add(payload);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<Function> getListeners(String eventName) {
|
||||||
|
var listeners = <Function>[
|
||||||
|
...(_listeners[eventName] ?? []),
|
||||||
|
...(_wildcardsCache[eventName] ?? _getWildcardListeners(eventName)),
|
||||||
|
...(_eventBusListeners[eventName] ?? []),
|
||||||
|
];
|
||||||
|
|
||||||
|
return listeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Function> _getWildcardListeners(String eventName) {
|
||||||
|
final wildcardListeners = <Function>[];
|
||||||
|
for (var entry in _wildcards.entries) {
|
||||||
|
if (_isWildcardMatch(entry.key, eventName)) {
|
||||||
|
wildcardListeners.addAll(entry.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_wildcardsCache[eventName] = wildcardListeners;
|
||||||
|
return wildcardListeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void forget(String event) {
|
||||||
|
// Remove from _listeners
|
||||||
|
_listeners.remove(event);
|
||||||
|
|
||||||
|
// Remove from _subjects
|
||||||
|
if (_subjects.containsKey(event)) {
|
||||||
|
_subjects[event]?.close();
|
||||||
|
_subjects.remove(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel and remove EventBus subscription
|
||||||
|
_eventBusSubscriptions[event]?.cancel();
|
||||||
|
_eventBusSubscriptions.remove(event);
|
||||||
|
|
||||||
|
// Remove from wildcards if applicable
|
||||||
|
if (event.contains('*')) {
|
||||||
|
_wildcards.remove(event);
|
||||||
|
_wildcardsCache.clear();
|
||||||
|
} else {
|
||||||
|
// If it's not a wildcard, we need to remove it from any matching wildcard listeners
|
||||||
|
_wildcards.forEach((pattern, listeners) {
|
||||||
|
if (_isWildcardMatch(pattern, event)) {
|
||||||
|
_wildcards[pattern] = listeners
|
||||||
|
.where((listener) => listener != _listeners[event])
|
||||||
|
.toList();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
_wildcardsCache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove any 'until' completers for this event
|
||||||
|
_untilCompleters.remove(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void forgetPushed() {
|
||||||
|
_listeners.removeWhere((key, _) => key.endsWith('_pushed'));
|
||||||
|
_eventBusListeners.removeWhere((key, _) => key.endsWith('_pushed'));
|
||||||
|
// Note: We're not clearing all EventBus listeners here, as that might affect other parts of your application
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void setQueueResolver(Function resolver) {
|
||||||
|
_queueResolver = resolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void setTransactionManagerResolver(Function resolver) {
|
||||||
|
_transactionManagerResolver = resolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Map<String, List<Function>> getRawListeners() {
|
||||||
|
return Map.unmodifiable(_listeners);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _shouldBroadcast(List payload) {
|
||||||
|
return payload.isNotEmpty && payload[0] is ShouldBroadcast;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _broadcastEvent(dynamic event) async {
|
||||||
|
// Implement broadcasting logic here
|
||||||
|
// For now, we'll just print a message
|
||||||
|
print('Broadcasting event: ${event.runtimeType}');
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _isWildcardMatch(String pattern, String eventName) {
|
||||||
|
final regExp = RegExp('^${pattern.replaceAll('*', '.*')}\$');
|
||||||
|
return regExp.hasMatch(eventName);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _shouldQueue(List payload) {
|
||||||
|
return payload.isNotEmpty && payload[0] is ShouldQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _queueEvent(String eventName, dynamic payload) async {
|
||||||
|
_mqClient?.sendMessage(
|
||||||
|
exchangeName: _eventsExchange,
|
||||||
|
routingKey: _eventsQueue,
|
||||||
|
message: Message(
|
||||||
|
payload: {'event': eventName, 'payload': payload},
|
||||||
|
timestamp: DateTime.now().toIso8601String(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updated on<T> method
|
||||||
|
Stream<T> on<T>(String event) {
|
||||||
|
return (_subjects
|
||||||
|
.putIfAbsent(event, () => BehaviorSubject<dynamic>())
|
||||||
|
.stream as Stream<T>)
|
||||||
|
.where((event) => event is T)
|
||||||
|
.cast<T>();
|
||||||
|
}
|
||||||
|
|
||||||
|
// In your Dispatcher class
|
||||||
|
void setMQClient(MQClient client) {
|
||||||
|
_mqClient = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method to close the MQClient connection
|
||||||
|
Future<void> close() async {
|
||||||
|
_mqClient?.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't forget to close the subjects when they're no longer needed
|
||||||
|
void dispose() {
|
||||||
|
for (var subject in _subjects.values) {
|
||||||
|
subject.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// ... rest of the code (DispatcherContract, EventBusSubscriber, etc.) remains the same
|
||||||
|
|
||||||
|
abstract class DispatcherContract {
|
||||||
|
void listen(dynamic events, dynamic listener);
|
||||||
|
bool hasListeners(String eventName);
|
||||||
|
void push(String event, [dynamic payload]);
|
||||||
|
Future<void> flush(String event);
|
||||||
|
void subscribe(dynamic subscriber);
|
||||||
|
Future<dynamic> until(dynamic event, [dynamic payload]);
|
||||||
|
Future<dynamic> dispatch(dynamic event, [dynamic payload, bool halt]);
|
||||||
|
List<Function> getListeners(String eventName);
|
||||||
|
void forget(String event);
|
||||||
|
void forgetPushed();
|
||||||
|
void setQueueResolver(Function resolver);
|
||||||
|
void setTransactionManagerResolver(Function resolver);
|
||||||
|
Map<String, List<Function>> getRawListeners();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper class for EventBus subscribers
|
||||||
|
abstract class EventBusSubscriber {
|
||||||
|
void subscribe(EventBus eventBus);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mixin to simulate Macroable trait
|
||||||
|
mixin Macroable {
|
||||||
|
// Implementation of Macroable functionality
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mixin to simulate ReflectsClosures trait
|
||||||
|
mixin ReflectsClosures {
|
||||||
|
// Implementation of ReflectsClosures functionality
|
||||||
|
}
|
||||||
|
|
||||||
|
// If not already defined, you might need to create an Event class
|
||||||
|
class Event {
|
||||||
|
final String name;
|
||||||
|
final dynamic data;
|
||||||
|
|
||||||
|
Event(this.name, this.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Custom AppEvent subclasses for handling different event types
|
||||||
|
class StringBasedEvent extends AppEvent {
|
||||||
|
final String eventName;
|
||||||
|
final dynamic payload;
|
||||||
|
|
||||||
|
StringBasedEvent(this.eventName, this.payload);
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<Object?> get props => [eventName, payload];
|
||||||
|
}
|
||||||
|
|
||||||
|
class CustomAppEvent extends AppEvent {
|
||||||
|
final String eventName;
|
||||||
|
final dynamic payload;
|
||||||
|
|
||||||
|
CustomAppEvent(this.eventName, this.payload);
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<Object?> get props => [eventName, payload];
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a simple implementation of Reflector that does nothing
|
||||||
|
class EmptyReflector implements Reflector {
|
||||||
|
const EmptyReflector();
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedType reflectType(Type type) {
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedInstance reflectInstance(Object object) {
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedType reflectFutureOf(Type type) {
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
String? getName(Symbol symbol) {
|
||||||
|
// TODO: implement getName
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedClass? reflectClass(Type clazz) {
|
||||||
|
// TODO: implement reflectClass
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedFunction? reflectFunction(Function function) {
|
||||||
|
// TODO: implement reflectFunction
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
}
|
21
core/events/pubspec.yaml
Normal file
21
core/events/pubspec.yaml
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
name: angel3_events
|
||||||
|
description: The Events Package for the Protevus Platform
|
||||||
|
version: 0.0.1
|
||||||
|
homepage: https://protevus.com
|
||||||
|
documentation: https://docs.protevus.com
|
||||||
|
repository: https://github.com/protevus/platformo
|
||||||
|
environment:
|
||||||
|
sdk: ^3.4.2
|
||||||
|
|
||||||
|
# Add regular dependencies here.
|
||||||
|
dependencies:
|
||||||
|
angel3_container: ^9.0.0
|
||||||
|
angel3_mq: ^9.0.0
|
||||||
|
angel3_event_bus: ^9.0.0
|
||||||
|
angel3_framework: ^9.0.0
|
||||||
|
angel3_reactivex: ^0.27.5
|
||||||
|
# path: ^1.8.0
|
||||||
|
|
||||||
|
dev_dependencies:
|
||||||
|
lints: ^3.0.0
|
||||||
|
test: ^1.24.0
|
379
core/events/test/event_test.dart
Normal file
379
core/events/test/event_test.dart
Normal file
|
@ -0,0 +1,379 @@
|
||||||
|
import 'package:angel3_event_bus/res/app_event.dart';
|
||||||
|
import 'package:test/test.dart';
|
||||||
|
import 'package:angel3_container/angel3_container.dart';
|
||||||
|
import 'package:angel3_mq/mq.dart';
|
||||||
|
import 'package:angel3_events/dispatcher.dart'; // Replace with the actual import path
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
late Dispatcher dispatcher;
|
||||||
|
late MockMQClient mockMQClient;
|
||||||
|
|
||||||
|
setUp(() {
|
||||||
|
var container = Container(EmptyReflector());
|
||||||
|
dispatcher = Dispatcher(container);
|
||||||
|
mockMQClient = MockMQClient();
|
||||||
|
dispatcher.mqClient = mockMQClient; // Use the setter
|
||||||
|
|
||||||
|
// Clear the queue before each test
|
||||||
|
mockMQClient.queuedMessages.clear();
|
||||||
|
});
|
||||||
|
|
||||||
|
group('Dispatcher', () {
|
||||||
|
test('listen and dispatch', () async {
|
||||||
|
var callCount = 0;
|
||||||
|
dispatcher.listen('test_event', (dynamic event, dynamic payload) {
|
||||||
|
expect(event, equals('test_event'));
|
||||||
|
expect(payload, equals(['test_payload']));
|
||||||
|
callCount++;
|
||||||
|
});
|
||||||
|
await dispatcher.dispatch('test_event', ['test_payload']);
|
||||||
|
expect(callCount, equals(1));
|
||||||
|
});
|
||||||
|
|
||||||
|
test('wildcard listener', () async {
|
||||||
|
var callCount = 0;
|
||||||
|
dispatcher.listen('test.*', (dynamic event, dynamic payload) {
|
||||||
|
expect(event, matches(RegExp(r'^test\.')));
|
||||||
|
callCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
await dispatcher.dispatch('test.one', ['payload1']);
|
||||||
|
await dispatcher.dispatch('test.two', ['payload2']);
|
||||||
|
expect(callCount, equals(2));
|
||||||
|
});
|
||||||
|
|
||||||
|
test('hasListeners', () {
|
||||||
|
dispatcher.listen('test_event', (dynamic event, dynamic payload) {});
|
||||||
|
expect(dispatcher.hasListeners('test_event'), isTrue);
|
||||||
|
expect(dispatcher.hasListeners('non_existent_event'), isFalse);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('until', () async {
|
||||||
|
// Test without pushing the event immediately
|
||||||
|
var futureResult = dispatcher.until('test_event');
|
||||||
|
|
||||||
|
// Use a small delay to ensure the until listener is set up
|
||||||
|
await Future.delayed(Duration(milliseconds: 10));
|
||||||
|
|
||||||
|
await dispatcher.dispatch('test_event', ['test_payload']);
|
||||||
|
var result = await futureResult;
|
||||||
|
expect(result, equals(['test_payload']));
|
||||||
|
|
||||||
|
// Test with pushing the event immediately
|
||||||
|
result =
|
||||||
|
await dispatcher.until('another_test_event', ['another_payload']);
|
||||||
|
expect(result, equals(['another_payload']));
|
||||||
|
}, timeout: Timeout(Duration(seconds: 5))); // Add a reasonable timeout
|
||||||
|
|
||||||
|
test('forget', () async {
|
||||||
|
var callCount = 0;
|
||||||
|
dispatcher.listen('test_event', (dynamic event, dynamic payload) {
|
||||||
|
callCount++;
|
||||||
|
});
|
||||||
|
await dispatcher.dispatch('test_event');
|
||||||
|
expect(callCount, equals(1));
|
||||||
|
|
||||||
|
dispatcher.forget('test_event');
|
||||||
|
await dispatcher.dispatch('test_event');
|
||||||
|
expect(callCount, equals(1)); // Should not increase
|
||||||
|
});
|
||||||
|
|
||||||
|
test('push and flush', () async {
|
||||||
|
print('Starting push and flush test');
|
||||||
|
|
||||||
|
// Push 4 messages
|
||||||
|
for (var i = 0; i < 4; i++) {
|
||||||
|
dispatcher.push('delayed_event', ['delayed_payload_$i']);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that 4 messages were queued
|
||||||
|
expect(mockMQClient.queuedMessages['delayed_events_queue']?.length,
|
||||||
|
equals(4),
|
||||||
|
reason: 'Should have queued exactly 4 messages');
|
||||||
|
|
||||||
|
print(
|
||||||
|
'Queued messages: ${mockMQClient.queuedMessages['delayed_events_queue']?.length}');
|
||||||
|
|
||||||
|
var callCount = 0;
|
||||||
|
var processedPayloads = <String>[];
|
||||||
|
|
||||||
|
// Remove any existing listeners
|
||||||
|
dispatcher.forget('delayed_event');
|
||||||
|
|
||||||
|
dispatcher.listen('delayed_event', (dynamic event, dynamic payload) {
|
||||||
|
print('Listener called with payload: $payload');
|
||||||
|
expect(event, equals('delayed_event'));
|
||||||
|
expect(payload[0], startsWith('delayed_payload_'));
|
||||||
|
processedPayloads.add(payload[0]);
|
||||||
|
callCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
await dispatcher.flush('delayed_event');
|
||||||
|
|
||||||
|
print('After flush - Call count: $callCount');
|
||||||
|
print('Processed payloads: $processedPayloads');
|
||||||
|
|
||||||
|
expect(callCount, equals(4), reason: 'Should process exactly 4 messages');
|
||||||
|
expect(processedPayloads.toSet().length, equals(4),
|
||||||
|
reason: 'All payloads should be unique');
|
||||||
|
|
||||||
|
// Verify that all messages were removed from the queue
|
||||||
|
expect(mockMQClient.queuedMessages['delayed_events_queue']?.length,
|
||||||
|
equals(0),
|
||||||
|
reason: 'Queue should be empty after flush');
|
||||||
|
|
||||||
|
// Flush again to ensure no more messages are processed
|
||||||
|
await dispatcher.flush('delayed_event');
|
||||||
|
expect(callCount, equals(4),
|
||||||
|
reason: 'Should still be 4 after second flush');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('shouldBroadcast', () async {
|
||||||
|
var broadcastEvent = BroadcastTestEvent();
|
||||||
|
var callCount = 0;
|
||||||
|
|
||||||
|
dispatcher.listen('BroadcastTestEvent', (dynamic event, dynamic payload) {
|
||||||
|
callCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
await dispatcher.dispatch(broadcastEvent);
|
||||||
|
expect(callCount, equals(1));
|
||||||
|
});
|
||||||
|
|
||||||
|
test('shouldQueue', () async {
|
||||||
|
var queueEvent = QueueTestEvent();
|
||||||
|
await dispatcher.dispatch(queueEvent);
|
||||||
|
expect(mockMQClient.queuedMessages['events_queue'], isNotEmpty);
|
||||||
|
expect(mockMQClient.queuedMessages['events_queue']!.first.payload,
|
||||||
|
containsPair('event', 'QueueTestEvent'));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class MQClientWrapper {
|
||||||
|
Stream<Message> fetchQueue(String queueId);
|
||||||
|
void sendMessage({
|
||||||
|
required Message message,
|
||||||
|
String? exchangeName,
|
||||||
|
String? routingKey,
|
||||||
|
});
|
||||||
|
String declareQueue(String queueId);
|
||||||
|
void declareExchange({
|
||||||
|
required String exchangeName,
|
||||||
|
required ExchangeType exchangeType,
|
||||||
|
});
|
||||||
|
void bindQueue({
|
||||||
|
required String queueId,
|
||||||
|
required String exchangeName,
|
||||||
|
String? bindingKey,
|
||||||
|
});
|
||||||
|
void close();
|
||||||
|
}
|
||||||
|
|
||||||
|
class RealMQClientWrapper implements MQClientWrapper {
|
||||||
|
final MQClient _client;
|
||||||
|
|
||||||
|
RealMQClientWrapper(this._client);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<Message> fetchQueue(String queueId) => _client.fetchQueue(queueId);
|
||||||
|
|
||||||
|
@override
|
||||||
|
void sendMessage({
|
||||||
|
required Message message,
|
||||||
|
String? exchangeName,
|
||||||
|
String? routingKey,
|
||||||
|
}) =>
|
||||||
|
_client.sendMessage(
|
||||||
|
message: message,
|
||||||
|
exchangeName: exchangeName,
|
||||||
|
routingKey: routingKey,
|
||||||
|
);
|
||||||
|
|
||||||
|
@override
|
||||||
|
String declareQueue(String queueId) => _client.declareQueue(queueId);
|
||||||
|
|
||||||
|
@override
|
||||||
|
void declareExchange({
|
||||||
|
required String exchangeName,
|
||||||
|
required ExchangeType exchangeType,
|
||||||
|
}) =>
|
||||||
|
_client.declareExchange(
|
||||||
|
exchangeName: exchangeName,
|
||||||
|
exchangeType: exchangeType,
|
||||||
|
);
|
||||||
|
|
||||||
|
@override
|
||||||
|
void bindQueue({
|
||||||
|
required String queueId,
|
||||||
|
required String exchangeName,
|
||||||
|
String? bindingKey,
|
||||||
|
}) =>
|
||||||
|
_client.bindQueue(
|
||||||
|
queueId: queueId,
|
||||||
|
exchangeName: exchangeName,
|
||||||
|
bindingKey: bindingKey,
|
||||||
|
);
|
||||||
|
|
||||||
|
@override
|
||||||
|
void close() => _client.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
class MockMQClient implements MQClient {
|
||||||
|
Map<String, List<Message>> queuedMessages = {};
|
||||||
|
int _messageIdCounter = 0;
|
||||||
|
|
||||||
|
void queueMessage(String queueName, Message message) {
|
||||||
|
queuedMessages.putIfAbsent(queueName, () => []).add(message);
|
||||||
|
print(
|
||||||
|
'Queued message. Queue $queueName now has ${queuedMessages[queueName]?.length} messages');
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
String declareQueue(String queueId) {
|
||||||
|
queuedMessages[queueId] = [];
|
||||||
|
return queueId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void deleteQueue(String queueId) {
|
||||||
|
queuedMessages.remove(queueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<Message> fetchQueue(String queueId) {
|
||||||
|
print('Fetching queue: $queueId');
|
||||||
|
return Stream.fromIterable(queuedMessages[queueId] ?? []);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void sendMessage({
|
||||||
|
required Message message,
|
||||||
|
String? exchangeName,
|
||||||
|
String? routingKey,
|
||||||
|
}) {
|
||||||
|
print('Sending message to queue: $routingKey');
|
||||||
|
final newMessage = Message(
|
||||||
|
payload: message.payload,
|
||||||
|
headers: message.headers,
|
||||||
|
timestamp: message.timestamp,
|
||||||
|
id: 'msg_${_messageIdCounter++}',
|
||||||
|
);
|
||||||
|
queueMessage(routingKey ?? '', newMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Message? getLatestMessage(String queueId) {
|
||||||
|
final messages = queuedMessages[queueId];
|
||||||
|
return messages?.isNotEmpty == true ? messages!.last : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void bindQueue({
|
||||||
|
required String queueId,
|
||||||
|
required String exchangeName,
|
||||||
|
String? bindingKey,
|
||||||
|
}) {
|
||||||
|
// Implement if needed for your tests
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void unbindQueue({
|
||||||
|
required String queueId,
|
||||||
|
required String exchangeName,
|
||||||
|
String? bindingKey,
|
||||||
|
}) {
|
||||||
|
// Implement if needed for your tests
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void declareExchange({
|
||||||
|
required String exchangeName,
|
||||||
|
required ExchangeType exchangeType,
|
||||||
|
}) {
|
||||||
|
// Implement if needed for your tests
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void deleteExchange(String exchangeName) {
|
||||||
|
// Implement if needed for your tests
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<String> listQueues() {
|
||||||
|
return queuedMessages.keys.toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void close() {
|
||||||
|
queuedMessages.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void deleteMessage(String queueId, Message message) {
|
||||||
|
print('Deleting message from queue: $queueId');
|
||||||
|
queuedMessages[queueId]?.removeWhere((m) => m.id == message.id);
|
||||||
|
print(
|
||||||
|
'After deletion, queue $queueId has ${queuedMessages[queueId]?.length} messages');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class BroadcastTestEvent implements AppEvent, ShouldBroadcast {
|
||||||
|
@override
|
||||||
|
List<Object?> get props => [];
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool? get stringify => true;
|
||||||
|
|
||||||
|
@override
|
||||||
|
DateTime get timestamp => DateTime.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
class QueueTestEvent implements AppEvent, ShouldQueue {
|
||||||
|
@override
|
||||||
|
List<Object?> get props => [];
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool? get stringify => true;
|
||||||
|
|
||||||
|
@override
|
||||||
|
DateTime get timestamp => DateTime.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a simple implementation of Reflector that does nothing
|
||||||
|
class EmptyReflector implements Reflector {
|
||||||
|
const EmptyReflector();
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedType reflectType(Type type) {
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedInstance reflectInstance(Object object) {
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedType reflectFutureOf(Type type) {
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
String? getName(Symbol symbol) {
|
||||||
|
// TODO: implement getName
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedClass? reflectClass(Type clazz) {
|
||||||
|
// TODO: implement reflectClass
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
ReflectedFunction? reflectFunction(Function function) {
|
||||||
|
// TODO: implement reflectFunction
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
import 'package:angel3_mq/src/message/message.base.dart';
|
import 'package:angel3_mq/src/message/message.base.dart';
|
||||||
|
import 'package:uuid/uuid.dart';
|
||||||
|
|
||||||
/// Represents a message with headers, payload, and an optional timestamp.
|
/// Represents a message with headers, payload, and an optional timestamp.
|
||||||
///
|
///
|
||||||
|
@ -14,8 +15,7 @@ import 'package:angel3_mq/src/message/message.base.dart';
|
||||||
/// );
|
/// );
|
||||||
/// ```
|
/// ```
|
||||||
class Message extends BaseMessage {
|
class Message extends BaseMessage {
|
||||||
/// Creates a new [Message] with the specified headers, payload, and
|
/// Creates a new [Message] with the specified headers, payload, timestamp, and id.
|
||||||
/// timestamp.
|
|
||||||
///
|
///
|
||||||
/// The [headers] parameter is a map that can contain additional information
|
/// The [headers] parameter is a map that can contain additional information
|
||||||
/// about the message. It is optional and defaults to an empty map if not
|
/// about the message. It is optional and defaults to an empty map if not
|
||||||
|
@ -28,24 +28,33 @@ class Message extends BaseMessage {
|
||||||
/// indicating when the message was created. If not provided, the current
|
/// indicating when the message was created. If not provided, the current
|
||||||
/// timestamp will be used.
|
/// timestamp will be used.
|
||||||
///
|
///
|
||||||
|
/// The [id] parameter is an optional unique identifier for the message.
|
||||||
|
/// If not provided, a new UUID will be generated.
|
||||||
|
///
|
||||||
/// Example:
|
/// Example:
|
||||||
/// ```dart
|
/// ```dart
|
||||||
/// final message = Message(
|
/// final message = Message(
|
||||||
/// headers: {'contentType': 'json', 'sender': 'Alice'},
|
/// headers: {'contentType': 'json', 'sender': 'Alice'},
|
||||||
/// payload: {'text': 'Hello, World!'},
|
/// payload: {'text': 'Hello, World!'},
|
||||||
/// timestamp: '2023-09-07T12:00:002',
|
/// timestamp: '2023-09-07T12:00:002',
|
||||||
|
/// id: '123e4567-e89b-12d3-a456-426614174000',
|
||||||
/// );
|
/// );
|
||||||
/// ```
|
/// ```
|
||||||
Message({
|
Message({
|
||||||
required Object payload,
|
required Object payload,
|
||||||
Map<String, dynamic>? headers,
|
Map<String, dynamic>? headers,
|
||||||
String? timestamp,
|
String? timestamp,
|
||||||
}) : super(
|
String? id,
|
||||||
|
}) : id = id ?? Uuid().v4(),
|
||||||
|
super(
|
||||||
headers,
|
headers,
|
||||||
payload,
|
payload,
|
||||||
timestamp ?? DateTime.now().toUtc().toIso8601String(),
|
timestamp ?? DateTime.now().toUtc().toIso8601String(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/// A unique identifier for the message.
|
||||||
|
final String id;
|
||||||
|
|
||||||
/// Returns a human-readable string representation of the message.
|
/// Returns a human-readable string representation of the message.
|
||||||
///
|
///
|
||||||
/// Example:
|
/// Example:
|
||||||
|
@ -68,9 +77,10 @@ class Message extends BaseMessage {
|
||||||
String toString() {
|
String toString() {
|
||||||
return '''
|
return '''
|
||||||
Message{
|
Message{
|
||||||
headers: $headers,
|
id: $id,
|
||||||
payload: $payload,
|
headers: $headers,
|
||||||
timestamp: $timestamp,
|
payload: $payload,
|
||||||
}''';
|
timestamp: $timestamp,
|
||||||
|
}''';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,4 +11,4 @@
|
||||||
/// // Custom implementation of the message queue client.
|
/// // Custom implementation of the message queue client.
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
abstract base class BaseMQClient {}
|
abstract class BaseMQClient {}
|
||||||
|
|
|
@ -55,7 +55,7 @@ import 'package:angel3_mq/src/queue/queue.dart';
|
||||||
/// print('Received message: $message');
|
/// print('Received message: $message');
|
||||||
/// });
|
/// });
|
||||||
/// ```
|
/// ```
|
||||||
final class MQClient extends BaseMQClient implements MQClientInterface {
|
class MQClient extends BaseMQClient implements MQClientInterface {
|
||||||
/// Private constructor to create the `MQClient` instance.
|
/// Private constructor to create the `MQClient` instance.
|
||||||
MQClient._internal() {
|
MQClient._internal() {
|
||||||
_exchanges.register('', DefaultExchange(''));
|
_exchanges.register('', DefaultExchange(''));
|
||||||
|
@ -137,6 +137,16 @@ final class MQClient extends BaseMQClient implements MQClientInterface {
|
||||||
)
|
)
|
||||||
.toList();
|
.toList();
|
||||||
|
|
||||||
|
void deleteMessage(String queueId, Message message) {
|
||||||
|
try {
|
||||||
|
final queue = _fetchQueue(queueId);
|
||||||
|
queue.removeMessage(message);
|
||||||
|
} on QueueNotRegisteredException {
|
||||||
|
// Queue doesn't exist, so we can't delete the message
|
||||||
|
// We might want to log this or handle it in some way
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
void sendMessage({
|
void sendMessage({
|
||||||
required Message message,
|
required Message message,
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:angel3_mq/mq.dart';
|
||||||
import 'package:angel3_mq/src/queue/data_stream.base.dart';
|
import 'package:angel3_mq/src/queue/data_stream.base.dart';
|
||||||
import 'package:equatable/equatable.dart';
|
import 'package:equatable/equatable.dart';
|
||||||
|
|
||||||
|
@ -23,14 +26,35 @@ import 'package:equatable/equatable.dart';
|
||||||
/// final hasListeners = myQueue.hasListeners();
|
/// final hasListeners = myQueue.hasListeners();
|
||||||
/// ```
|
/// ```
|
||||||
class Queue extends BaseDataStream with EquatableMixin {
|
class Queue extends BaseDataStream with EquatableMixin {
|
||||||
/// Creates a new queue with the specified [id].
|
|
||||||
///
|
|
||||||
/// The [id] parameter is a unique identifier for the queue.
|
|
||||||
Queue(this.id);
|
Queue(this.id);
|
||||||
|
|
||||||
/// The unique identifier for the queue.
|
|
||||||
final String id;
|
final String id;
|
||||||
|
final StreamController<Message> _controller =
|
||||||
|
StreamController<Message>.broadcast();
|
||||||
|
Message? _latestMessage;
|
||||||
|
|
||||||
|
void addMessage(Message message) {
|
||||||
|
_latestMessage = message;
|
||||||
|
_controller.add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Stream<Message> get dataStream => _controller.stream;
|
||||||
|
|
||||||
|
Message? get latestMessage => _latestMessage;
|
||||||
|
|
||||||
|
bool hasListeners() => _controller.hasListener;
|
||||||
|
|
||||||
|
void dispose() {
|
||||||
|
_controller.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// New method to remove a message
|
||||||
|
void removeMessage(Message message) {
|
||||||
|
if (_latestMessage == message) {
|
||||||
|
_latestMessage = null;
|
||||||
|
}
|
||||||
|
// Note: We can't remove past messages from the stream,
|
||||||
|
// but we can prevent this message from being processed again in the future.
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
|
||||||
List<Object?> get props => [id];
|
List<Object?> get props => [id];
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ environment:
|
||||||
|
|
||||||
dependencies:
|
dependencies:
|
||||||
equatable: ^2.0.5
|
equatable: ^2.0.5
|
||||||
|
uuid: ^4.5.1
|
||||||
|
|
||||||
dev_dependencies:
|
dev_dependencies:
|
||||||
lints: ^3.0.0
|
lints: ^3.0.0
|
||||||
|
|
Loading…
Reference in a new issue