diff --git a/core/mqueue/lib/src/message/message.dart b/core/mqueue/lib/src/message/message.dart index fadb80c2..053ab136 100644 --- a/core/mqueue/lib/src/message/message.dart +++ b/core/mqueue/lib/src/message/message.dart @@ -1,4 +1,5 @@ import 'package:angel3_mq/src/message/message.base.dart'; +import 'package:uuid/uuid.dart'; /// Represents a message with headers, payload, and an optional timestamp. /// @@ -14,8 +15,7 @@ import 'package:angel3_mq/src/message/message.base.dart'; /// ); /// ``` class Message extends BaseMessage { - /// Creates a new [Message] with the specified headers, payload, and - /// timestamp. + /// Creates a new [Message] with the specified headers, payload, timestamp, and id. /// /// The [headers] parameter is a map that can contain additional information /// about the message. It is optional and defaults to an empty map if not @@ -28,24 +28,33 @@ class Message extends BaseMessage { /// indicating when the message was created. If not provided, the current /// timestamp will be used. /// + /// The [id] parameter is an optional unique identifier for the message. + /// If not provided, a new UUID will be generated. + /// /// Example: /// ```dart /// final message = Message( /// headers: {'contentType': 'json', 'sender': 'Alice'}, /// payload: {'text': 'Hello, World!'}, /// timestamp: '2023-09-07T12:00:002', + /// id: '123e4567-e89b-12d3-a456-426614174000', /// ); /// ``` Message({ required Object payload, Map? headers, String? timestamp, - }) : super( + String? id, + }) : id = id ?? Uuid().v4(), + super( headers, payload, timestamp ?? DateTime.now().toUtc().toIso8601String(), ); + /// A unique identifier for the message. + final String id; + /// Returns a human-readable string representation of the message. /// /// Example: @@ -68,9 +77,10 @@ class Message extends BaseMessage { String toString() { return ''' Message{ - headers: $headers, - payload: $payload, - timestamp: $timestamp, - }'''; + id: $id, + headers: $headers, + payload: $payload, + timestamp: $timestamp, +}'''; } } diff --git a/core/mqueue/lib/src/mq/mq.base.dart b/core/mqueue/lib/src/mq/mq.base.dart index aba73789..2acafd7c 100644 --- a/core/mqueue/lib/src/mq/mq.base.dart +++ b/core/mqueue/lib/src/mq/mq.base.dart @@ -11,4 +11,4 @@ /// // Custom implementation of the message queue client. /// } /// ``` -abstract base class BaseMQClient {} +abstract class BaseMQClient {} diff --git a/core/mqueue/lib/src/mq/mq.dart b/core/mqueue/lib/src/mq/mq.dart index 10f4ca64..1f48e6b3 100644 --- a/core/mqueue/lib/src/mq/mq.dart +++ b/core/mqueue/lib/src/mq/mq.dart @@ -55,7 +55,7 @@ import 'package:angel3_mq/src/queue/queue.dart'; /// print('Received message: $message'); /// }); /// ``` -final class MQClient extends BaseMQClient implements MQClientInterface { +class MQClient extends BaseMQClient implements MQClientInterface { /// Private constructor to create the `MQClient` instance. MQClient._internal() { _exchanges.register('', DefaultExchange('')); @@ -137,6 +137,16 @@ final class MQClient extends BaseMQClient implements MQClientInterface { ) .toList(); + void deleteMessage(String queueId, Message message) { + try { + final queue = _fetchQueue(queueId); + queue.removeMessage(message); + } on QueueNotRegisteredException { + // Queue doesn't exist, so we can't delete the message + // We might want to log this or handle it in some way + } + } + @override void sendMessage({ required Message message, diff --git a/core/mqueue/lib/src/queue/queue.dart b/core/mqueue/lib/src/queue/queue.dart index 3b8fdb28..f03e00ae 100644 --- a/core/mqueue/lib/src/queue/queue.dart +++ b/core/mqueue/lib/src/queue/queue.dart @@ -1,3 +1,6 @@ +import 'dart:async'; + +import 'package:angel3_mq/mq.dart'; import 'package:angel3_mq/src/queue/data_stream.base.dart'; import 'package:equatable/equatable.dart'; @@ -23,14 +26,35 @@ import 'package:equatable/equatable.dart'; /// final hasListeners = myQueue.hasListeners(); /// ``` class Queue extends BaseDataStream with EquatableMixin { - /// Creates a new queue with the specified [id]. - /// - /// The [id] parameter is a unique identifier for the queue. Queue(this.id); - - /// The unique identifier for the queue. final String id; + final StreamController _controller = + StreamController.broadcast(); + Message? _latestMessage; + + void addMessage(Message message) { + _latestMessage = message; + _controller.add(message); + } + + Stream get dataStream => _controller.stream; + + Message? get latestMessage => _latestMessage; + + bool hasListeners() => _controller.hasListener; + + void dispose() { + _controller.close(); + } + + // New method to remove a message + void removeMessage(Message message) { + if (_latestMessage == message) { + _latestMessage = null; + } + // Note: We can't remove past messages from the stream, + // but we can prevent this message from being processed again in the future. + } - @override List get props => [id]; } diff --git a/core/mqueue/pubspec.yaml b/core/mqueue/pubspec.yaml index c78a4926..79853749 100644 --- a/core/mqueue/pubspec.yaml +++ b/core/mqueue/pubspec.yaml @@ -11,6 +11,7 @@ environment: dependencies: equatable: ^2.0.5 + uuid: ^4.5.1 dev_dependencies: lints: ^3.0.0