Update: added id field to message made mqclient reachable from outside

This commit is contained in:
Patrick Stewart 2024-10-04 19:58:26 -07:00
parent dcddc2992a
commit ee9d512c1f
5 changed files with 60 additions and 15 deletions

View file

@ -1,4 +1,5 @@
import 'package:angel3_mq/src/message/message.base.dart'; import 'package:angel3_mq/src/message/message.base.dart';
import 'package:uuid/uuid.dart';
/// Represents a message with headers, payload, and an optional timestamp. /// Represents a message with headers, payload, and an optional timestamp.
/// ///
@ -14,8 +15,7 @@ import 'package:angel3_mq/src/message/message.base.dart';
/// ); /// );
/// ``` /// ```
class Message extends BaseMessage { class Message extends BaseMessage {
/// Creates a new [Message] with the specified headers, payload, and /// Creates a new [Message] with the specified headers, payload, timestamp, and id.
/// timestamp.
/// ///
/// The [headers] parameter is a map that can contain additional information /// The [headers] parameter is a map that can contain additional information
/// about the message. It is optional and defaults to an empty map if not /// about the message. It is optional and defaults to an empty map if not
@ -28,24 +28,33 @@ class Message extends BaseMessage {
/// indicating when the message was created. If not provided, the current /// indicating when the message was created. If not provided, the current
/// timestamp will be used. /// timestamp will be used.
/// ///
/// The [id] parameter is an optional unique identifier for the message.
/// If not provided, a new UUID will be generated.
///
/// Example: /// Example:
/// ```dart /// ```dart
/// final message = Message( /// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'}, /// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'}, /// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002', /// timestamp: '2023-09-07T12:00:002',
/// id: '123e4567-e89b-12d3-a456-426614174000',
/// ); /// );
/// ``` /// ```
Message({ Message({
required Object payload, required Object payload,
Map<String, dynamic>? headers, Map<String, dynamic>? headers,
String? timestamp, String? timestamp,
}) : super( String? id,
}) : id = id ?? Uuid().v4(),
super(
headers, headers,
payload, payload,
timestamp ?? DateTime.now().toUtc().toIso8601String(), timestamp ?? DateTime.now().toUtc().toIso8601String(),
); );
/// A unique identifier for the message.
final String id;
/// Returns a human-readable string representation of the message. /// Returns a human-readable string representation of the message.
/// ///
/// Example: /// Example:
@ -68,9 +77,10 @@ class Message extends BaseMessage {
String toString() { String toString() {
return ''' return '''
Message{ Message{
id: $id,
headers: $headers, headers: $headers,
payload: $payload, payload: $payload,
timestamp: $timestamp, timestamp: $timestamp,
}'''; }''';
} }
} }

View file

@ -11,4 +11,4 @@
/// // Custom implementation of the message queue client. /// // Custom implementation of the message queue client.
/// } /// }
/// ``` /// ```
abstract base class BaseMQClient {} abstract class BaseMQClient {}

View file

@ -55,7 +55,7 @@ import 'package:angel3_mq/src/queue/queue.dart';
/// print('Received message: $message'); /// print('Received message: $message');
/// }); /// });
/// ``` /// ```
final class MQClient extends BaseMQClient implements MQClientInterface { class MQClient extends BaseMQClient implements MQClientInterface {
/// Private constructor to create the `MQClient` instance. /// Private constructor to create the `MQClient` instance.
MQClient._internal() { MQClient._internal() {
_exchanges.register('', DefaultExchange('')); _exchanges.register('', DefaultExchange(''));
@ -137,6 +137,16 @@ final class MQClient extends BaseMQClient implements MQClientInterface {
) )
.toList(); .toList();
void deleteMessage(String queueId, Message message) {
try {
final queue = _fetchQueue(queueId);
queue.removeMessage(message);
} on QueueNotRegisteredException {
// Queue doesn't exist, so we can't delete the message
// We might want to log this or handle it in some way
}
}
@override @override
void sendMessage({ void sendMessage({
required Message message, required Message message,

View file

@ -1,3 +1,6 @@
import 'dart:async';
import 'package:angel3_mq/mq.dart';
import 'package:angel3_mq/src/queue/data_stream.base.dart'; import 'package:angel3_mq/src/queue/data_stream.base.dart';
import 'package:equatable/equatable.dart'; import 'package:equatable/equatable.dart';
@ -23,14 +26,35 @@ import 'package:equatable/equatable.dart';
/// final hasListeners = myQueue.hasListeners(); /// final hasListeners = myQueue.hasListeners();
/// ``` /// ```
class Queue extends BaseDataStream with EquatableMixin { class Queue extends BaseDataStream with EquatableMixin {
/// Creates a new queue with the specified [id].
///
/// The [id] parameter is a unique identifier for the queue.
Queue(this.id); Queue(this.id);
/// The unique identifier for the queue.
final String id; final String id;
final StreamController<Message> _controller =
StreamController<Message>.broadcast();
Message? _latestMessage;
void addMessage(Message message) {
_latestMessage = message;
_controller.add(message);
}
Stream<Message> get dataStream => _controller.stream;
Message? get latestMessage => _latestMessage;
bool hasListeners() => _controller.hasListener;
void dispose() {
_controller.close();
}
// New method to remove a message
void removeMessage(Message message) {
if (_latestMessage == message) {
_latestMessage = null;
}
// Note: We can't remove past messages from the stream,
// but we can prevent this message from being processed again in the future.
}
@override
List<Object?> get props => [id]; List<Object?> get props => [id];
} }

View file

@ -11,6 +11,7 @@ environment:
dependencies: dependencies:
equatable: ^2.0.5 equatable: ^2.0.5
uuid: ^4.5.1
dev_dependencies: dev_dependencies:
lints: ^3.0.0 lints: ^3.0.0