Compare commits

...

2 commits

Author SHA1 Message Date
Patrick Stewart
ee9d512c1f Update: added id field to message made mqclient reachable from outside 2024-10-04 19:58:26 -07:00
Patrick Stewart
dcddc2992a Add: adding events package not ready for production use passing test 2024-10-04 19:57:22 -07:00
14 changed files with 1004 additions and 15 deletions

7
core/events/.gitignore vendored Normal file
View file

@ -0,0 +1,7 @@
# https://dart.dev/guides/libraries/private-files
# Created by `dart pub`
.dart_tool/
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

3
core/events/CHANGELOG.md Normal file
View file

@ -0,0 +1,3 @@
## 1.0.0
- Initial version.

10
core/events/LICENSE.md Normal file
View file

@ -0,0 +1,10 @@
The MIT License (MIT)
The Laravel Framework is Copyright (c) Taylor Otwell
The Fabric Framework is Copyright (c) Vieo, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

1
core/events/README.md Normal file
View file

@ -0,0 +1 @@
<p align="center"><a href="https://protevus.com" target="_blank"><img src="https://git.protevus.com/protevus/branding/raw/branch/main/protevus-logo-bg.png"></a></p>

View file

@ -0,0 +1,30 @@
# This file configures the static analysis results for your project (errors,
# warnings, and lints).
#
# This enables the 'recommended' set of lints from `package:lints`.
# This set helps identify many issues that may lead to problems when running
# or consuming Dart code, and enforces writing Dart using a single, idiomatic
# style and format.
#
# If you want a smaller set of lints you can change this to specify
# 'package:lints/core.yaml'. These are just the most critical lints
# (the recommended set includes the core lints).
# The core lints are also what is used by pub.dev for scoring packages.
include: package:lints/recommended.yaml
# Uncomment the following section to specify additional rules.
# linter:
# rules:
# - camel_case_types
# analyzer:
# exclude:
# - path/to/excluded/files/**
# For more information about the core and recommended set of lints, see
# https://dart.dev/go/core-lints
# For additional information about configuring this file, see
# https://dart.dev/guides/language/analysis-options

View file

@ -0,0 +1,3 @@
library;
export 'src/dispatcher.dart';

View file

@ -0,0 +1,490 @@
import 'dart:async';
import 'package:angel3_container/angel3_container.dart';
import 'package:angel3_reactivex/angel3_reactivex.dart';
import 'package:angel3_event_bus/event_bus.dart';
import 'package:angel3_mq/mq.dart';
// Simulating some of the Laravel interfaces/classes
abstract class ShouldBroadcast {}
abstract class ShouldQueue {}
abstract class ShouldBeEncrypted {}
abstract class ShouldDispatchAfterCommit {}
class Dispatcher implements DispatcherContract {
// Properties as specified in YAML
final Container container;
final Map<String, List<Function>> _listeners = {};
final Map<String, List<Function>> _wildcards = {};
final Map<String, List<Function>> _wildcardsCache = {};
late final Function _queueResolver;
late final Function _transactionManagerResolver;
final Map<String, List<Function>> _eventBusListeners = {};
final Map<String, Completer<dynamic>> _untilCompleters = {};
final Map<String, StreamSubscription> _eventBusSubscriptions = {};
final Set<String> _processedMessageIds = {};
// Properties for Angel3 packages
final EventBus _eventBus;
late final MQClient? _mqClient;
final Map<String, BehaviorSubject<dynamic>> _subjects = {};
// Queue and exchange names
static const String _eventsQueue = 'events_queue';
static const String _delayedEventsQueue = 'delayed_events_queue';
static const String _eventsExchange = 'events_exchange';
Dispatcher(this.container) : _eventBus = EventBus();
// Setter for _mqClient
set mqClient(MQClient client) {
_mqClient = client;
_setupQueuesAndExchanges();
_startProcessingQueuedEvents();
}
void _setupQueuesAndExchanges() {
_mqClient?.declareQueue(_eventsQueue);
_mqClient?.declareQueue(_delayedEventsQueue);
_mqClient?.declareExchange(
exchangeName: _eventsExchange,
exchangeType: ExchangeType.direct,
);
_mqClient?.bindQueue(
queueId: _eventsQueue,
exchangeName: _eventsExchange,
bindingKey: _eventsQueue,
);
_mqClient?.bindQueue(
queueId: _delayedEventsQueue,
exchangeName: _eventsExchange,
bindingKey: _delayedEventsQueue,
);
}
void _startProcessingQueuedEvents() {
_mqClient?.fetchQueue(_eventsQueue).listen((Message message) async {
if (message.payload is Map) {
final eventData = message.payload as Map<String, dynamic>;
if (eventData.containsKey('event') &&
eventData.containsKey('payload')) {
await dispatch(eventData['event'], eventData['payload']);
} else {
print('Invalid message format: ${message.payload}');
}
} else {
print('Unexpected payload type: ${message.payload.runtimeType}');
}
});
}
@override
void listen(dynamic events, dynamic listener) {
if (events is String) {
_addListener(events, listener);
} else if (events is List) {
for (var event in events) {
_addListener(event, listener);
}
}
if (events is String && events.contains('*')) {
_setupWildcardListen(events, listener);
}
}
void _addListener(String event, dynamic listener) {
_listeners.putIfAbsent(event, () => []).add(listener);
// Create a subject for this event if it doesn't exist
_subjects.putIfAbsent(event, () => BehaviorSubject<dynamic>());
// Add EventBus listener and store the subscription
final subscription = _eventBus.on().listen((AppEvent busEvent) {
if (busEvent is CustomAppEvent && busEvent.eventName == event) {
listener(event, busEvent.payload);
}
});
_eventBusSubscriptions[event] = subscription;
}
void _setupWildcardListen(String event, Function listener) {
_wildcards.putIfAbsent(event, () => []).add(listener);
_wildcardsCache.clear();
}
@override
bool hasListeners(String eventName) {
return _listeners.containsKey(eventName) ||
_wildcards.containsKey(eventName) ||
hasWildcardListeners(eventName);
}
bool hasWildcardListeners(String eventName) {
return _wildcards.keys
.any((pattern) => _isWildcardMatch(pattern, eventName));
}
@override
void push(String event, [dynamic payload]) {
final effectivePayload = payload ?? [];
_mqClient?.sendMessage(
exchangeName: _eventsExchange,
routingKey: _delayedEventsQueue,
message: Message(
headers: {'expiration': '5000'}, // 5 seconds delay
payload: {
'event': event,
'payload':
effectivePayload is List ? effectivePayload : [effectivePayload],
},
timestamp: DateTime.now().toIso8601String(),
id: 'msg_${DateTime.now().millisecondsSinceEpoch}', // Ensure unique ID
),
);
}
@override
Future<void> flush(String event) async {
final messageStream = _mqClient?.fetchQueue(_delayedEventsQueue);
if (messageStream == null) {
print('Warning: MQClient is not initialized');
return;
}
final messagesToProcess = <Message>[];
// Collect messages to process
await for (final message in messageStream) {
print('Examining message: ${message.id}');
if (message.payload is Map<String, dynamic> &&
!_processedMessageIds.contains(message.id)) {
final eventData = message.payload as Map<String, dynamic>;
if (eventData['event'] == event) {
print('Adding message to process: ${message.id}');
messagesToProcess.add(message);
}
}
}
print('Total messages to process: ${messagesToProcess.length}');
// Process collected messages
for (final message in messagesToProcess) {
final eventData = message.payload as Map<String, dynamic>;
print('Processing message: ${message.id}');
await dispatch(eventData['event'], eventData['payload']);
_mqClient?.deleteMessage(_delayedEventsQueue, message);
_processedMessageIds.add(message.id);
}
}
@override
void subscribe(dynamic subscriber) {
if (subscriber is EventBusSubscriber) {
subscriber.subscribe(_eventBus);
} else {
// Handle other types of subscribers
}
}
@override
Future<dynamic> until(dynamic event, [dynamic payload]) {
if (event is String) {
final completer = Completer<dynamic>();
_untilCompleters[event] = completer;
// Set up a one-time listener for this event
listen(event, (dynamic e, dynamic p) {
if (!completer.isCompleted) {
completer.complete(p);
_untilCompleters.remove(event);
}
});
// If payload is provided, dispatch the event immediately
if (payload != null) {
// Use dispatch instead of push to ensure immediate processing
dispatch(event, payload);
}
return completer.future;
}
throw ArgumentError('Event must be a String');
}
@override
Future<dynamic> dispatch(dynamic event, [dynamic payload, bool? halt]) async {
final eventName = event is String ? event : event.runtimeType.toString();
final eventPayload = payload ?? (event is AppEvent ? event : []);
if (event is ShouldBroadcast ||
(eventPayload is List &&
eventPayload.isNotEmpty &&
eventPayload[0] is ShouldBroadcast)) {
await _broadcastEvent(event);
}
if (event is ShouldQueue ||
(eventPayload is List &&
eventPayload.isNotEmpty &&
eventPayload[0] is ShouldQueue)) {
return _queueEvent(eventName, eventPayload);
}
final listeners = getListeners(eventName);
for (var listener in listeners) {
final response =
await Function.apply(listener, [eventName, eventPayload]);
if (halt == true && response != null) {
return response;
}
if (response == false) {
break;
}
}
return halt == true ? null : listeners;
}
// void _addToSubject(String eventName, dynamic payload) {
// if (_subjects.containsKey(eventName)) {
// _subjects[eventName]!.add(payload);
// }
// }
@override
List<Function> getListeners(String eventName) {
var listeners = <Function>[
...(_listeners[eventName] ?? []),
...(_wildcardsCache[eventName] ?? _getWildcardListeners(eventName)),
...(_eventBusListeners[eventName] ?? []),
];
return listeners;
}
List<Function> _getWildcardListeners(String eventName) {
final wildcardListeners = <Function>[];
for (var entry in _wildcards.entries) {
if (_isWildcardMatch(entry.key, eventName)) {
wildcardListeners.addAll(entry.value);
}
}
_wildcardsCache[eventName] = wildcardListeners;
return wildcardListeners;
}
@override
void forget(String event) {
// Remove from _listeners
_listeners.remove(event);
// Remove from _subjects
if (_subjects.containsKey(event)) {
_subjects[event]?.close();
_subjects.remove(event);
}
// Cancel and remove EventBus subscription
_eventBusSubscriptions[event]?.cancel();
_eventBusSubscriptions.remove(event);
// Remove from wildcards if applicable
if (event.contains('*')) {
_wildcards.remove(event);
_wildcardsCache.clear();
} else {
// If it's not a wildcard, we need to remove it from any matching wildcard listeners
_wildcards.forEach((pattern, listeners) {
if (_isWildcardMatch(pattern, event)) {
_wildcards[pattern] = listeners
.where((listener) => listener != _listeners[event])
.toList();
}
});
_wildcardsCache.clear();
}
// Remove any 'until' completers for this event
_untilCompleters.remove(event);
}
@override
void forgetPushed() {
_listeners.removeWhere((key, _) => key.endsWith('_pushed'));
_eventBusListeners.removeWhere((key, _) => key.endsWith('_pushed'));
// Note: We're not clearing all EventBus listeners here, as that might affect other parts of your application
}
@override
void setQueueResolver(Function resolver) {
_queueResolver = resolver;
}
@override
void setTransactionManagerResolver(Function resolver) {
_transactionManagerResolver = resolver;
}
@override
Map<String, List<Function>> getRawListeners() {
return Map.unmodifiable(_listeners);
}
bool _shouldBroadcast(List payload) {
return payload.isNotEmpty && payload[0] is ShouldBroadcast;
}
Future<void> _broadcastEvent(dynamic event) async {
// Implement broadcasting logic here
// For now, we'll just print a message
print('Broadcasting event: ${event.runtimeType}');
}
bool _isWildcardMatch(String pattern, String eventName) {
final regExp = RegExp('^${pattern.replaceAll('*', '.*')}\$');
return regExp.hasMatch(eventName);
}
bool _shouldQueue(List payload) {
return payload.isNotEmpty && payload[0] is ShouldQueue;
}
Future<void> _queueEvent(String eventName, dynamic payload) async {
_mqClient?.sendMessage(
exchangeName: _eventsExchange,
routingKey: _eventsQueue,
message: Message(
payload: {'event': eventName, 'payload': payload},
timestamp: DateTime.now().toIso8601String(),
),
);
}
// Updated on<T> method
Stream<T> on<T>(String event) {
return (_subjects
.putIfAbsent(event, () => BehaviorSubject<dynamic>())
.stream as Stream<T>)
.where((event) => event is T)
.cast<T>();
}
// In your Dispatcher class
void setMQClient(MQClient client) {
_mqClient = client;
}
// Method to close the MQClient connection
Future<void> close() async {
_mqClient?.close();
}
// Don't forget to close the subjects when they're no longer needed
void dispose() {
for (var subject in _subjects.values) {
subject.close();
}
}
}
// ... rest of the code (DispatcherContract, EventBusSubscriber, etc.) remains the same
abstract class DispatcherContract {
void listen(dynamic events, dynamic listener);
bool hasListeners(String eventName);
void push(String event, [dynamic payload]);
Future<void> flush(String event);
void subscribe(dynamic subscriber);
Future<dynamic> until(dynamic event, [dynamic payload]);
Future<dynamic> dispatch(dynamic event, [dynamic payload, bool halt]);
List<Function> getListeners(String eventName);
void forget(String event);
void forgetPushed();
void setQueueResolver(Function resolver);
void setTransactionManagerResolver(Function resolver);
Map<String, List<Function>> getRawListeners();
}
// Helper class for EventBus subscribers
abstract class EventBusSubscriber {
void subscribe(EventBus eventBus);
}
// Mixin to simulate Macroable trait
mixin Macroable {
// Implementation of Macroable functionality
}
// Mixin to simulate ReflectsClosures trait
mixin ReflectsClosures {
// Implementation of ReflectsClosures functionality
}
// If not already defined, you might need to create an Event class
class Event {
final String name;
final dynamic data;
Event(this.name, this.data);
}
// Custom AppEvent subclasses for handling different event types
class StringBasedEvent extends AppEvent {
final String eventName;
final dynamic payload;
StringBasedEvent(this.eventName, this.payload);
@override
List<Object?> get props => [eventName, payload];
}
class CustomAppEvent extends AppEvent {
final String eventName;
final dynamic payload;
CustomAppEvent(this.eventName, this.payload);
@override
List<Object?> get props => [eventName, payload];
}
// This is a simple implementation of Reflector that does nothing
class EmptyReflector implements Reflector {
const EmptyReflector();
@override
ReflectedType reflectType(Type type) {
throw UnimplementedError();
}
@override
ReflectedInstance reflectInstance(Object object) {
throw UnimplementedError();
}
@override
ReflectedType reflectFutureOf(Type type) {
throw UnimplementedError();
}
@override
String? getName(Symbol symbol) {
// TODO: implement getName
throw UnimplementedError();
}
@override
ReflectedClass? reflectClass(Type clazz) {
// TODO: implement reflectClass
throw UnimplementedError();
}
@override
ReflectedFunction? reflectFunction(Function function) {
// TODO: implement reflectFunction
throw UnimplementedError();
}
}

21
core/events/pubspec.yaml Normal file
View file

@ -0,0 +1,21 @@
name: angel3_events
description: The Events Package for the Protevus Platform
version: 0.0.1
homepage: https://protevus.com
documentation: https://docs.protevus.com
repository: https://github.com/protevus/platformo
environment:
sdk: ^3.4.2
# Add regular dependencies here.
dependencies:
angel3_container: ^9.0.0
angel3_mq: ^9.0.0
angel3_event_bus: ^9.0.0
angel3_framework: ^9.0.0
angel3_reactivex: ^0.27.5
# path: ^1.8.0
dev_dependencies:
lints: ^3.0.0
test: ^1.24.0

View file

@ -0,0 +1,379 @@
import 'package:angel3_event_bus/res/app_event.dart';
import 'package:test/test.dart';
import 'package:angel3_container/angel3_container.dart';
import 'package:angel3_mq/mq.dart';
import 'package:angel3_events/dispatcher.dart'; // Replace with the actual import path
void main() {
late Dispatcher dispatcher;
late MockMQClient mockMQClient;
setUp(() {
var container = Container(EmptyReflector());
dispatcher = Dispatcher(container);
mockMQClient = MockMQClient();
dispatcher.mqClient = mockMQClient; // Use the setter
// Clear the queue before each test
mockMQClient.queuedMessages.clear();
});
group('Dispatcher', () {
test('listen and dispatch', () async {
var callCount = 0;
dispatcher.listen('test_event', (dynamic event, dynamic payload) {
expect(event, equals('test_event'));
expect(payload, equals(['test_payload']));
callCount++;
});
await dispatcher.dispatch('test_event', ['test_payload']);
expect(callCount, equals(1));
});
test('wildcard listener', () async {
var callCount = 0;
dispatcher.listen('test.*', (dynamic event, dynamic payload) {
expect(event, matches(RegExp(r'^test\.')));
callCount++;
});
await dispatcher.dispatch('test.one', ['payload1']);
await dispatcher.dispatch('test.two', ['payload2']);
expect(callCount, equals(2));
});
test('hasListeners', () {
dispatcher.listen('test_event', (dynamic event, dynamic payload) {});
expect(dispatcher.hasListeners('test_event'), isTrue);
expect(dispatcher.hasListeners('non_existent_event'), isFalse);
});
test('until', () async {
// Test without pushing the event immediately
var futureResult = dispatcher.until('test_event');
// Use a small delay to ensure the until listener is set up
await Future.delayed(Duration(milliseconds: 10));
await dispatcher.dispatch('test_event', ['test_payload']);
var result = await futureResult;
expect(result, equals(['test_payload']));
// Test with pushing the event immediately
result =
await dispatcher.until('another_test_event', ['another_payload']);
expect(result, equals(['another_payload']));
}, timeout: Timeout(Duration(seconds: 5))); // Add a reasonable timeout
test('forget', () async {
var callCount = 0;
dispatcher.listen('test_event', (dynamic event, dynamic payload) {
callCount++;
});
await dispatcher.dispatch('test_event');
expect(callCount, equals(1));
dispatcher.forget('test_event');
await dispatcher.dispatch('test_event');
expect(callCount, equals(1)); // Should not increase
});
test('push and flush', () async {
print('Starting push and flush test');
// Push 4 messages
for (var i = 0; i < 4; i++) {
dispatcher.push('delayed_event', ['delayed_payload_$i']);
}
// Verify that 4 messages were queued
expect(mockMQClient.queuedMessages['delayed_events_queue']?.length,
equals(4),
reason: 'Should have queued exactly 4 messages');
print(
'Queued messages: ${mockMQClient.queuedMessages['delayed_events_queue']?.length}');
var callCount = 0;
var processedPayloads = <String>[];
// Remove any existing listeners
dispatcher.forget('delayed_event');
dispatcher.listen('delayed_event', (dynamic event, dynamic payload) {
print('Listener called with payload: $payload');
expect(event, equals('delayed_event'));
expect(payload[0], startsWith('delayed_payload_'));
processedPayloads.add(payload[0]);
callCount++;
});
await dispatcher.flush('delayed_event');
print('After flush - Call count: $callCount');
print('Processed payloads: $processedPayloads');
expect(callCount, equals(4), reason: 'Should process exactly 4 messages');
expect(processedPayloads.toSet().length, equals(4),
reason: 'All payloads should be unique');
// Verify that all messages were removed from the queue
expect(mockMQClient.queuedMessages['delayed_events_queue']?.length,
equals(0),
reason: 'Queue should be empty after flush');
// Flush again to ensure no more messages are processed
await dispatcher.flush('delayed_event');
expect(callCount, equals(4),
reason: 'Should still be 4 after second flush');
});
test('shouldBroadcast', () async {
var broadcastEvent = BroadcastTestEvent();
var callCount = 0;
dispatcher.listen('BroadcastTestEvent', (dynamic event, dynamic payload) {
callCount++;
});
await dispatcher.dispatch(broadcastEvent);
expect(callCount, equals(1));
});
test('shouldQueue', () async {
var queueEvent = QueueTestEvent();
await dispatcher.dispatch(queueEvent);
expect(mockMQClient.queuedMessages['events_queue'], isNotEmpty);
expect(mockMQClient.queuedMessages['events_queue']!.first.payload,
containsPair('event', 'QueueTestEvent'));
});
});
}
abstract class MQClientWrapper {
Stream<Message> fetchQueue(String queueId);
void sendMessage({
required Message message,
String? exchangeName,
String? routingKey,
});
String declareQueue(String queueId);
void declareExchange({
required String exchangeName,
required ExchangeType exchangeType,
});
void bindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
});
void close();
}
class RealMQClientWrapper implements MQClientWrapper {
final MQClient _client;
RealMQClientWrapper(this._client);
@override
Stream<Message> fetchQueue(String queueId) => _client.fetchQueue(queueId);
@override
void sendMessage({
required Message message,
String? exchangeName,
String? routingKey,
}) =>
_client.sendMessage(
message: message,
exchangeName: exchangeName,
routingKey: routingKey,
);
@override
String declareQueue(String queueId) => _client.declareQueue(queueId);
@override
void declareExchange({
required String exchangeName,
required ExchangeType exchangeType,
}) =>
_client.declareExchange(
exchangeName: exchangeName,
exchangeType: exchangeType,
);
@override
void bindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
}) =>
_client.bindQueue(
queueId: queueId,
exchangeName: exchangeName,
bindingKey: bindingKey,
);
@override
void close() => _client.close();
}
class MockMQClient implements MQClient {
Map<String, List<Message>> queuedMessages = {};
int _messageIdCounter = 0;
void queueMessage(String queueName, Message message) {
queuedMessages.putIfAbsent(queueName, () => []).add(message);
print(
'Queued message. Queue $queueName now has ${queuedMessages[queueName]?.length} messages');
}
@override
String declareQueue(String queueId) {
queuedMessages[queueId] = [];
return queueId;
}
@override
void deleteQueue(String queueId) {
queuedMessages.remove(queueId);
}
@override
Stream<Message> fetchQueue(String queueId) {
print('Fetching queue: $queueId');
return Stream.fromIterable(queuedMessages[queueId] ?? []);
}
@override
void sendMessage({
required Message message,
String? exchangeName,
String? routingKey,
}) {
print('Sending message to queue: $routingKey');
final newMessage = Message(
payload: message.payload,
headers: message.headers,
timestamp: message.timestamp,
id: 'msg_${_messageIdCounter++}',
);
queueMessage(routingKey ?? '', newMessage);
}
@override
Message? getLatestMessage(String queueId) {
final messages = queuedMessages[queueId];
return messages?.isNotEmpty == true ? messages!.last : null;
}
@override
void bindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
}) {
// Implement if needed for your tests
}
@override
void unbindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
}) {
// Implement if needed for your tests
}
@override
void declareExchange({
required String exchangeName,
required ExchangeType exchangeType,
}) {
// Implement if needed for your tests
}
@override
void deleteExchange(String exchangeName) {
// Implement if needed for your tests
}
@override
List<String> listQueues() {
return queuedMessages.keys.toList();
}
@override
void close() {
queuedMessages.clear();
}
@override
void deleteMessage(String queueId, Message message) {
print('Deleting message from queue: $queueId');
queuedMessages[queueId]?.removeWhere((m) => m.id == message.id);
print(
'After deletion, queue $queueId has ${queuedMessages[queueId]?.length} messages');
}
}
class BroadcastTestEvent implements AppEvent, ShouldBroadcast {
@override
List<Object?> get props => [];
@override
bool? get stringify => true;
@override
DateTime get timestamp => DateTime.now();
}
class QueueTestEvent implements AppEvent, ShouldQueue {
@override
List<Object?> get props => [];
@override
bool? get stringify => true;
@override
DateTime get timestamp => DateTime.now();
}
// This is a simple implementation of Reflector that does nothing
class EmptyReflector implements Reflector {
const EmptyReflector();
@override
ReflectedType reflectType(Type type) {
throw UnimplementedError();
}
@override
ReflectedInstance reflectInstance(Object object) {
throw UnimplementedError();
}
@override
ReflectedType reflectFutureOf(Type type) {
throw UnimplementedError();
}
@override
String? getName(Symbol symbol) {
// TODO: implement getName
throw UnimplementedError();
}
@override
ReflectedClass? reflectClass(Type clazz) {
// TODO: implement reflectClass
throw UnimplementedError();
}
@override
ReflectedFunction? reflectFunction(Function function) {
// TODO: implement reflectFunction
throw UnimplementedError();
}
}

View file

@ -1,4 +1,5 @@
import 'package:angel3_mq/src/message/message.base.dart';
import 'package:uuid/uuid.dart';
/// Represents a message with headers, payload, and an optional timestamp.
///
@ -14,8 +15,7 @@ import 'package:angel3_mq/src/message/message.base.dart';
/// );
/// ```
class Message extends BaseMessage {
/// Creates a new [Message] with the specified headers, payload, and
/// timestamp.
/// Creates a new [Message] with the specified headers, payload, timestamp, and id.
///
/// The [headers] parameter is a map that can contain additional information
/// about the message. It is optional and defaults to an empty map if not
@ -28,24 +28,33 @@ class Message extends BaseMessage {
/// indicating when the message was created. If not provided, the current
/// timestamp will be used.
///
/// The [id] parameter is an optional unique identifier for the message.
/// If not provided, a new UUID will be generated.
///
/// Example:
/// ```dart
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// id: '123e4567-e89b-12d3-a456-426614174000',
/// );
/// ```
Message({
required Object payload,
Map<String, dynamic>? headers,
String? timestamp,
}) : super(
String? id,
}) : id = id ?? Uuid().v4(),
super(
headers,
payload,
timestamp ?? DateTime.now().toUtc().toIso8601String(),
);
/// A unique identifier for the message.
final String id;
/// Returns a human-readable string representation of the message.
///
/// Example:
@ -68,9 +77,10 @@ class Message extends BaseMessage {
String toString() {
return '''
Message{
headers: $headers,
payload: $payload,
timestamp: $timestamp,
}''';
id: $id,
headers: $headers,
payload: $payload,
timestamp: $timestamp,
}''';
}
}

View file

@ -11,4 +11,4 @@
/// // Custom implementation of the message queue client.
/// }
/// ```
abstract base class BaseMQClient {}
abstract class BaseMQClient {}

View file

@ -55,7 +55,7 @@ import 'package:angel3_mq/src/queue/queue.dart';
/// print('Received message: $message');
/// });
/// ```
final class MQClient extends BaseMQClient implements MQClientInterface {
class MQClient extends BaseMQClient implements MQClientInterface {
/// Private constructor to create the `MQClient` instance.
MQClient._internal() {
_exchanges.register('', DefaultExchange(''));
@ -137,6 +137,16 @@ final class MQClient extends BaseMQClient implements MQClientInterface {
)
.toList();
void deleteMessage(String queueId, Message message) {
try {
final queue = _fetchQueue(queueId);
queue.removeMessage(message);
} on QueueNotRegisteredException {
// Queue doesn't exist, so we can't delete the message
// We might want to log this or handle it in some way
}
}
@override
void sendMessage({
required Message message,

View file

@ -1,3 +1,6 @@
import 'dart:async';
import 'package:angel3_mq/mq.dart';
import 'package:angel3_mq/src/queue/data_stream.base.dart';
import 'package:equatable/equatable.dart';
@ -23,14 +26,35 @@ import 'package:equatable/equatable.dart';
/// final hasListeners = myQueue.hasListeners();
/// ```
class Queue extends BaseDataStream with EquatableMixin {
/// Creates a new queue with the specified [id].
///
/// The [id] parameter is a unique identifier for the queue.
Queue(this.id);
/// The unique identifier for the queue.
final String id;
final StreamController<Message> _controller =
StreamController<Message>.broadcast();
Message? _latestMessage;
void addMessage(Message message) {
_latestMessage = message;
_controller.add(message);
}
Stream<Message> get dataStream => _controller.stream;
Message? get latestMessage => _latestMessage;
bool hasListeners() => _controller.hasListener;
void dispose() {
_controller.close();
}
// New method to remove a message
void removeMessage(Message message) {
if (_latestMessage == message) {
_latestMessage = null;
}
// Note: We can't remove past messages from the stream,
// but we can prevent this message from being processed again in the future.
}
@override
List<Object?> get props => [id];
}

View file

@ -11,6 +11,7 @@ environment:
dependencies:
equatable: ^2.0.5
uuid: ^4.5.1
dev_dependencies:
lints: ^3.0.0