update: updating structure adding forked rxdart, event_bus_plus, dart_mq

This commit is contained in:
Patrick Stewart 2024-10-02 18:47:02 -07:00
parent 1bd7ecd19b
commit a9172c2d3a
319 changed files with 30982 additions and 0 deletions

1
core/eventbus Submodule

@ -0,0 +1 @@
Subproject commit a50543d9add747666cd7bfe131939a35de5b3859

View file

@ -0,0 +1,43 @@
name: build
on:
push:
branches:
- master
- main
pull_request:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: 📚 Git Checkout
uses: actions/checkout@v4
- name: 🎯 Setup Dart
uses: dart-lang/setup-dart@v1
- name: 📦 Install Dependencies
run: dart pub get
- name: ✨ Check Formatting
run: dart format --line-length 80 --set-exit-if-changed .
- name: 🕵️ Analyze
run: dart analyze --fatal-infos --fatal-warnings .
- name: 🧪 Run Tests
run: |
dart pub global activate coverage 1.2.0
dart test --coverage=coverage && dart pub global run coverage:format_coverage --lcov --in=coverage --out=coverage/lcov.info
- name: 📊 Check Code Coverage
uses: VeryGoodOpenSource/very_good_coverage@v2
with:
path: ./coverage/lcov.info
min_coverage: 100
- name: 📈 Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

7
core/mqueue/.gitignore vendored Normal file
View file

@ -0,0 +1,7 @@
# https://dart.dev/guides/libraries/private-files
# Created by `dart pub`
.dart_tool/
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

19
core/mqueue/CHANGELOG.md Normal file
View file

@ -0,0 +1,19 @@
## 1.0.0
- Initial version of the package.
## 1.0.1
- Fix documentation. (Image path)
- Update package description.
- Fix linter rules. (Type matching)
- Update `example` files.
- Remove `mocktail` dependency.
## 1.0.2
- Update documentation.
## 1.1.0
- `Deprecate` the `Consumer` mixin in favor of `ConsumerMixin`. ([#1](https://github.com/N-Razzouk/dart_mq/issues/1))

21
core/mqueue/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 Naif Razzouk
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

165
core/mqueue/README.md Normal file
View file

@ -0,0 +1,165 @@
# DartMQ: A Message Queue System for Dart and Flutter
<!-- TODO: fix pub version badge -->
[![Pub](https://img.shields.io/pub/v/dart_mq.svg)](https://pub.dev/packages/dart_mq)
[![coverage](https://codecov.io/gh/N-Razzouk/dart_mq/graph/badge.svg)](https://app.codecov.io/gh/N-Razzouk/dart_mq)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
DartMQ is a Dart package that provides message queue functionality for sending messages between different components in your Dart and Flutter applications. It offers a simple and efficient way to implement message queues, making it easier to build robust and scalable applications.
## Table of Contents
1. [Introduction](#introduction)
2. [Exchanges](#exchanges)
3. [Usage](#usage)
4. [Examples](#examples)
5. [Acknowledgment](#acknowledgment)
###
## Introduction
In the development of complex applications, dependencies among components are almost inevitable. Often, different components within your application need to communicate with each other, leading to tight coupling between these elements.
![Components](https://github.com/N-Razzouk/dart_mq/blob/master/assets/components.png?raw=true)
###
Message queues provide an effective means to decouple these components by enabling communication through messages. This decoupling strategy enhances the development of robust applications.
![Components with MQ](https://github.com/N-Razzouk/dart_mq/blob/master/assets/components-mq.png?raw=true)
###
DartMQ employs the publish-subscribe pattern. **Producers** send messages, **Consumers** receive them, and **Queues** and **Exchanges** facilitate this communication.
![Simple View](https://github.com/N-Razzouk/dart_mq/blob/master/assets/simple-view.png?raw=true)
###
Communication channels are called Exchanges. Exchanges receive messages from Producers, efficiently routing them to Queues for Consumer consumption.
![Detailed View](https://github.com/N-Razzouk/dart_mq/blob/master/assets/detailed-view.png?raw=true)
## Exchanges
### DartMQ provides different types of Exchanges for different use cases.
###
- **Default Exchange**: Routes messages based on Queue names.
![Default Exchange](https://github.com/N-Razzouk/dart_mq/blob/master/assets/default-exchange.png?raw=true)
###
- **Fanout Exchange**: Sends messages to all bound Queues.
![Fanout Exchange](https://github.com/N-Razzouk/dart_mq/blob/master/assets/fanout-exchange.png?raw=true)
###
- **Direct Exchange**: Routes messages to Queues based on routing keys.
![Direct Exchange](https://github.com/N-Razzouk/dart_mq/blob/master/assets/direct-exchange.png?raw=true)
## Usage
### Initialize an MQClient:
<!-- TODO: change import in code snippet. -->
```dart
import 'package:dart_mq/dart_mq.dart';
void main() {
// Initialize DartMQ
MQClient.initialize();
// Your application code here
}
```
### Declare a Queue:
```dart
MQClient.declareQueue('my_queue');
```
> Note: Queues are idempotent, which means that if you declare a Queue multiple times, it will not create multiple Queues. Instead, it will return the existing Queue.
### Create a Producer:
```dart
class MyProducer with ProducerMixin {
void greet(String message) {
// Send a message to the queue
sendMessage(
routingKey: 'my_queue',
payload: message,
);
}
}
```
> Note: `exchangeName` is optional. If you don't specify an exchange name, the message is sent to the default exchange.
### Create a Consumer:
```dart
class MyConsumer with ConsumerMixin {
void listenToQueue() {
// Subscribe to the queue and process incoming messages
subscribe(
queueId: 'my_queue',
callback: (message) {
// Handle incoming message
print('Received message: $message');
},
)
}
}
```
### Putting it all together:
```dart
void main() {
// Initialize DartMQ
MQClient.initialize();
// Declare a Queue
MQClient.declareQueue('my_queue');
// Create a Producer
final producer = MyProducer();
// Create a Consumer
final consumer = MyConsumer();
// Start listening
consumer.listenToQueue();
// Send a message
producer.greet('Hello World!');
// Your application code here
...
}
```
## Examples
- [Hello World](example/hello_world): A simple example that demonstrates how to send and receive messages using DartMQ.
- [Message Filtering](example/message_filtering): An example that demonstrates how to multiple consumers can listen to the same queue and filter messages accordingly.
- [Routing](example/routing): An example that demonstrates how to use Direct Exchanges to route messages to different queues based on the routing key.
- [RPC (Remote Procedure Call)](example/rpc): An example that demonstrates how to send RPC requests and receive responses using DartMQ.
## Acknowledgment
- [RabbitMQ](https://www.rabbitmq.com/): This package is inspired by RabbitMQ, an open-source message-broker software that implements the Advanced Message Queuing Protocol (AMQP).

View file

@ -0,0 +1,211 @@
# This file configures the static analysis results for your project (errors,
# warnings, and lints).
#
# This enables the 'recommended' set of lints from `package:lints`.
# This set helps identify many issues that may lead to problems when running
# or consuming Dart code, and enforces writing Dart using a single, idiomatic
# style and format.
#
# If you want a smaller set of lints you can change this to specify
# 'package:lints/core.yaml'. These are just the most critical lints
# (the recommended set includes the core lints).
# The core lints are also what is used by pub.dev for scoring packages.
include: package:lints/recommended.yaml
linter:
rules:
- prefer_const_constructors
- prefer_const_literals_to_create_immutables
- prefer_final_fields
- always_put_required_named_parameters_first
- avoid_init_to_null
- lines_longer_than_80_chars
- use_function_type_syntax_for_parameters
- avoid_relative_lib_imports
- avoid_shadowing_type_parameters
- avoid_equals_and_hash_code_on_mutable_classes
- unnecessary_brace_in_string_interps
- always_declare_return_types
- always_use_package_imports
- annotate_overrides
- avoid_bool_literals_in_conditional_expressions
- avoid_catching_errors
- avoid_double_and_int_checks
- avoid_dynamic_calls
- avoid_empty_else
- avoid_escaping_inner_quotes
- avoid_field_initializers_in_const_classes
- avoid_final_parameters
- avoid_function_literals_in_foreach_calls
- avoid_js_rounded_ints
- avoid_multiple_declarations_per_line
- avoid_null_checks_in_equality_operators
- avoid_positional_boolean_parameters
- avoid_print
- avoid_private_typedef_functions
- avoid_redundant_argument_values
- avoid_renaming_method_parameters
- avoid_return_types_on_setters
- avoid_returning_null_for_void
- avoid_returning_this
- avoid_setters_without_getters
- avoid_single_cascade_in_expression_statements
- avoid_slow_async_io
- avoid_type_to_string
- avoid_types_as_parameter_names
- avoid_unnecessary_containers
- avoid_unused_constructor_parameters
- avoid_void_async
- avoid_web_libraries_in_flutter
- await_only_futures
- camel_case_extensions
- camel_case_types
- cancel_subscriptions
- cascade_invocations
- cast_nullable_to_non_nullable
- collection_methods_unrelated_type
- combinators_ordering
- comment_references
- conditional_uri_does_not_exist
- constant_identifier_names
- control_flow_in_finally
- curly_braces_in_flow_control_structures
- dangling_library_doc_comments
- depend_on_referenced_packages
- deprecated_consistency
- directives_ordering
- empty_catches
- empty_constructor_bodies
- empty_statements
- eol_at_end_of_file
- exhaustive_cases
- file_names
- flutter_style_todos
- hash_and_equals
- implementation_imports
- implicit_call_tearoffs
- implicit_reopen
- invalid_case_patterns
- join_return_with_assignment
- leading_newlines_in_multiline_strings
- library_annotations
- library_names
- library_prefixes
- library_private_types_in_public_api
- literal_only_boolean_expressions
- missing_whitespace_between_adjacent_strings
- no_adjacent_strings_in_list
- no_default_cases
- no_duplicate_case_values
- no_leading_underscores_for_library_prefixes
- no_leading_underscores_for_local_identifiers
- no_logic_in_create_state
- no_runtimeType_toString
- non_constant_identifier_names
- noop_primitive_operations
- null_check_on_nullable_type_parameter
- null_closures
- omit_local_variable_types
- one_member_abstracts
- only_throw_errors
- overridden_fields
- package_api_docs
- package_names
- package_prefixed_library_names
- parameter_assignments
- prefer_adjacent_string_concatenation
- prefer_asserts_in_initializer_lists
- prefer_asserts_with_message
- prefer_collection_literals
- prefer_conditional_assignment
- prefer_const_constructors_in_immutables
- prefer_const_declarations
- prefer_constructors_over_static_methods
- prefer_contains
- prefer_final_in_for_each
- prefer_final_locals
- prefer_for_elements_to_map_fromIterable
- prefer_function_declarations_over_variables
- prefer_generic_function_type_aliases
- prefer_if_elements_to_conditional_expressions
- prefer_if_null_operators
- prefer_initializing_formals
- prefer_inlined_adds
- prefer_int_literals
- prefer_interpolation_to_compose_strings
- prefer_is_empty
- prefer_is_not_empty
- prefer_is_not_operator
- prefer_iterable_whereType
- prefer_null_aware_method_calls
- prefer_null_aware_operators
- prefer_single_quotes
- prefer_spread_collections
- prefer_typing_uninitialized_variables
- prefer_void_to_null
- provide_deprecation_message
- public_member_api_docs
- recursive_getters
- require_trailing_commas
- secure_pubspec_urls
- sized_box_for_whitespace
- sized_box_shrink_expand
- slash_for_doc_comments
- sort_child_properties_last
- sort_constructors_first
- sort_pub_dependencies
- sort_unnamed_constructors_first
- test_types_in_equals
- throw_in_finally
- tighten_type_of_initializing_formals
- type_annotate_public_apis
- type_init_formals
- unawaited_futures
- unnecessary_await_in_return
- unnecessary_breaks
- unnecessary_const
- unnecessary_constructor_name
- unnecessary_getters_setters
- unnecessary_lambdas
- unnecessary_late
- unnecessary_library_directive
- unnecessary_new
- unnecessary_null_aware_assignments
- unnecessary_null_checks
- unnecessary_null_in_if_null_operators
- unnecessary_nullable_for_final_variable_declarations
- unnecessary_overrides
- unnecessary_parenthesis
- unnecessary_raw_strings
- unnecessary_statements
- unnecessary_string_escapes
- unnecessary_string_interpolations
- unnecessary_this
- unnecessary_to_list_in_spreads
- unrelated_type_equality_checks
- use_build_context_synchronously
- use_colored_box
- use_enums
- use_full_hex_values_for_flutter_colors
- use_if_null_to_convert_nulls_to_bools
- use_is_even_rather_than_modulo
- use_key_in_widget_constructors
- use_late_for_private_fields_and_variables
- use_named_constants
- use_raw_strings
- use_rethrow_when_possible
- use_setters_to_change_properties
- use_string_buffers
- use_string_in_part_of_directives
- use_super_parameters
- use_test_throws_matchers
- use_to_and_as_if_applicable
- valid_regexps
- void_checks
# For more information about the core and recommended set of lints, see
# https://dart.dev/go/core-lints
# For additional information about configuring this file, see
# https://dart.dev/guides/language/analysis-options

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

View file

@ -0,0 +1,16 @@
import 'package:angel3_mq/mq.dart';
import 'receiver.dart';
import 'sender.dart';
void main() async {
MQClient.initialize();
final sender = Sender();
final receiver = Receiver()..listenToGreeting();
await sender.sendGreeting(greeting: 'Hello, World!');
receiver.stopListening();
}

View file

@ -0,0 +1,27 @@
import 'package:angel3_mq/mq.dart';
import 'task_manager.dart';
import 'worker_one.dart';
import 'worker_two.dart';
void main() async {
MQClient.initialize();
final workerOne = WorkerOne();
final workerTwo = WorkerTwo();
final taskManager = TaskManager();
workerOne.startListening();
workerTwo.startListening();
taskManager
..sendTask(task: 'Hello..')
..sendTask(task: 'Hello...')
..sendTask(task: 'Hello....')
..sendTask(task: 'Hello.')
..sendTask(task: 'Hello.......')
..sendTask(task: 'Hello..');
}

View file

@ -0,0 +1,12 @@
import 'package:angel3_mq/mq.dart';
final class TaskManager with ProducerMixin {
TaskManager() {
MQClient.instance.declareQueue('task_queue');
}
void sendTask({required String task}) => sendMessage(
payload: task,
routingKey: 'task_queue',
);
}

View file

@ -0,0 +1,22 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class WorkerOne with ConsumerMixin {
WorkerOne() {
MQClient.instance.declareQueue('task_queue');
}
void startListening() => subscribe(
queueId: 'task_queue',
filter: (Object messagePayload) => messagePayload
.toString()
.split('')
.where((String char) => char == '.')
.length
.isEven,
callback: (Message message) {
log('WorkerOne reacting to ${message.payload}');
},
);
}

View file

@ -0,0 +1,24 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class WorkerTwo with ConsumerMixin {
WorkerTwo() {
MQClient.instance.declareQueue('task_queue');
}
void startListening() => subscribe(
queueId: 'task_queue',
filter: (Object messagePayload) =>
messagePayload
.toString()
.split('')
.where((String char) => char == '.')
.length %
2 !=
0,
callback: (Message message) {
log('WorkerTwo reacting to ${message.payload}');
},
);
}

View file

@ -0,0 +1,18 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class Receiver with ConsumerMixin {
Receiver() {
MQClient.instance.declareQueue('hello');
}
void listenToGreeting() => subscribe(
queueId: 'hello',
callback: (Message message) {
log('Received: ${message.payload}');
},
);
void stopListening() => unsubscribe(queueId: 'hello');
}

View file

@ -0,0 +1,39 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class DebugLogger with ConsumerMixin {
DebugLogger() {
MQClient.instance.declareExchange(
exchangeName: 'logs',
exchangeType: ExchangeType.direct,
);
_queueName = MQClient.instance.declareQueue('debug');
}
late final String _queueName;
void startListening() {
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'info',
);
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'warning',
);
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'error',
);
subscribe(
queueId: _queueName,
callback: (Message message) {
log('Debug Logger recieved: ${message.payload}');
},
);
}
}

View file

@ -0,0 +1,21 @@
import 'package:angel3_mq/mq.dart';
final class Logger with ProducerMixin {
Logger() {
MQClient.instance.declareExchange(
exchangeName: 'logs',
exchangeType: ExchangeType.direct,
);
}
Future<void> log({
required String level,
required String message,
}) async {
sendMessage(
payload: message,
exchangeName: 'logs',
routingKey: level,
);
}
}

View file

@ -0,0 +1,30 @@
import 'package:angel3_mq/mq.dart';
import 'debug_logger.dart';
import 'logger.dart';
import 'production_logger.dart';
void main() async {
MQClient.initialize();
DebugLogger().startListening();
ProductionLogger().startListening();
final logger = Logger();
await logger.log(
level: 'info',
message: 'This is an info message',
);
await logger.log(
level: 'warning',
message: 'This is a warning message',
);
await logger.log(
level: 'error',
message: 'This is an error message',
);
}

View file

@ -0,0 +1,29 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class ProductionLogger with ConsumerMixin {
ProductionLogger() {
MQClient.instance.declareExchange(
exchangeName: 'logs',
exchangeType: ExchangeType.direct,
);
_queueName = MQClient.instance.declareQueue('production');
}
late final String _queueName;
void startListening() {
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'error',
);
subscribe(
queueId: _queueName,
callback: (Message message) {
log('Production Logger recieved: ${message.payload}');
},
);
}
}

View file

@ -0,0 +1,19 @@
import 'package:angel3_mq/mq.dart';
import 'service_one.dart';
import 'service_two.dart';
void main() {
MQClient.initialize();
MQClient.instance.declareExchange(
exchangeName: 'ServiceRPC',
exchangeType: ExchangeType.direct,
);
final serviceOne = ServiceOne();
ServiceTwo().startListening();
serviceOne.requestFoo();
}

View file

@ -0,0 +1,19 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
class ServiceOne with ProducerMixin {
Future<void> requestFoo() async {
final res = await sendRPCMessage<String>(
exchangeName: 'ServiceRPC',
routingKey: 'rpcBinding',
processId: 'foo',
args: {},
);
_handleFuture(res);
}
void _handleFuture(String data) {
log('Service One received: $data\n');
}
}

View file

@ -0,0 +1,46 @@
import 'dart:async';
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
class ServiceTwo with ConsumerMixin {
ServiceTwo() {
MQClient.instance.declareExchange(
exchangeName: 'ServiceRPC',
exchangeType: ExchangeType.direct,
);
_queueName = MQClient.instance.declareQueue('two');
}
late final String _queueName;
Future<void> startListening() async {
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'ServiceRPC',
bindingKey: 'rpcBinding',
);
subscribe(
queueId: _queueName,
callback: (Message message) async {
log('Service Two got message $message\n');
if (message.headers['type'] == 'RPC') {
switch (message.headers['processId']) {
case 'foo':
final data = await foo();
final Completer completer =
message.headers['completer'] ?? (throw Exception());
completer.complete(data);
default:
}
}
},
);
}
Future<String> foo() async {
// log('Service Two bar\n');
await Future.delayed(const Duration(seconds: 2));
return 'Hello, world!';
}
}

View file

@ -0,0 +1,12 @@
import 'package:angel3_mq/mq.dart';
final class Sender with ProducerMixin {
Sender() {
MQClient.instance.declareQueue('hello');
}
Future<void> sendGreeting({required String greeting}) async => sendMessage(
routingKey: 'hello',
payload: greeting,
);
}

11
core/mqueue/lib/mq.dart Normal file
View file

@ -0,0 +1,11 @@
/// Library definition.
library angel3_mq;
/// Export files.
export 'src/consumer/consumer.dart';
export 'src/consumer/consumer.mixin.dart';
export 'src/core/constants/enums.dart';
export 'src/message/message.dart';
export 'src/mq/mq.dart';
export 'src/producer/producer.dart';
export 'src/producer/producer.mixin.dart';

View file

@ -0,0 +1,75 @@
import 'package:angel3_mq/src/binding/binding.interface.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// A class representing a binding between a topic and its associated queues.
///
/// The `Binding` class implements the [BindingInterface] interface and is
/// responsible for managing the association between a topic and its associated
/// queues. It allows the addition and removal of queues to the binding and the
/// publication of messages to all associated queues.
///
/// Example:
/// ```dart
/// final binding = Binding('my_binding');
/// final queue1 = Queue('queue_1');
/// final queue2 = Queue('queue_2');
///
/// // Add queues to the binding.
/// binding.addQueue(queue1);
/// binding.addQueue(queue2);
///
/// // Publish a message to all associated queues.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// binding.publishMessage(message);
///
/// // Check if the binding has associated queues.
/// final hasQueues = binding.hasQueues(); // Returns true
/// ```
final class Binding implements BindingInterface {
/// Creates a new binding with the specified [id].
///
/// The [id] parameter represents the unique identifier for the binding.
Binding(this.id);
/// The unique identifier for the binding.
final String id;
/// A list of associated queues.
final List<Queue> _queues = [];
@override
bool hasQueues() => _queues.isNotEmpty;
@override
void addQueue(Queue queue) => _queues.add(queue);
@override
void removeQueue(String queueId) => _queues.removeWhere(
(Queue queue) => queue.id == queueId && queue.hasListeners()
? throw QueueHasSubscribersException(queueId)
: queue.id == queueId,
);
@override
void publishMessage(Message message) {
for (final queue in _queues) {
queue.enqueue(message);
}
}
@override
void clear() {
for (final queue in _queues) {
if (queue.hasListeners()) {
throw QueueHasSubscribersException(queue.id);
}
}
_queues.clear();
}
}

View file

@ -0,0 +1,44 @@
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// An abstract interface class defining the contract for managing bindings.
///
/// The `BindingInterface` abstract interface class defines a contract for
/// classes that are responsible for managing bindings between topics and
/// queues. Implementing classes must provide functionality for adding and
/// removing queues from the binding, publishing messages to the associated
/// queues, and checking if the binding has queues.
///
/// Example:
/// ```dart
/// class MyBinding implements BindingInterface {
/// // Custom implementation of the binding interface methods.
/// }
/// ```
abstract interface class BindingInterface {
/// Checks if the binding has associated queues.
///
/// Returns `true` if the binding has one or more associated queues;
/// otherwise, `false`.
bool hasQueues();
/// Adds a queue to the binding.
///
/// The [queue] parameter represents the queue to be associated with the
/// binding.
void addQueue(Queue queue);
/// Removes a queue from the binding based on its ID.
///
/// The [queueId] parameter represents the ID of the queue to be removed.
void removeQueue(String queueId);
/// Publishes a message to all associated queues in the binding.
///
/// The [message] parameter represents the message to be published to the
/// queues.
void publishMessage(Message message);
/// Removes all queues from the binding.
void clear();
}

View file

@ -0,0 +1,95 @@
import 'dart:async';
import 'package:angel3_mq/src/consumer/consumer.interface.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.dart';
/// A mixin implementing the `ConsumerInterface` for message consumption.
///
/// The `Consumer` mixin provides a concrete implementation of the
/// `ConsumerInterface`for message consumption. It allows classes to easily
/// consume messages from specific queues by subscribing to them, handling
/// received messages, and managing subscriptions.
///
/// Example:
/// ```dart
/// class MyMessageConsumer with Consumer {
/// // Custom implementation of the message consumer.
/// }
/// ```
@Deprecated('Please use `ConsumerMixin` instead. '
'This will be removed in v2.0.0')
mixin Consumer implements ConsumerInterface {
/// A registry of active message subscriptions.
final Registrar<StreamSubscription<Message>> _subscriptions =
Registrar<StreamSubscription<Message>>();
@override
Message? getLatestMessage(String queueId) =>
MQClient.instance.getLatestMessage(queueId);
@override
void subscribe({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
try {
final messageStream = MQClient.instance.fetchQueue(queueId);
final sub = filter != null
? messageStream.listen((Message message) {
if (filter(message.payload)) {
callback(message);
}
})
: messageStream.listen(callback);
_subscriptions.register(queueId, sub);
} on IdAlreadyRegisteredException catch (_) {
throw ConsumerAlreadySubscribedException(
consumer: runtimeType.toString(),
queue: queueId,
);
}
}
@override
void unsubscribe({required String queueId}) {
_subscriptions.get(queueId).cancel();
_subscriptions.unregister(queueId);
}
@override
void pauseSubscription(String queueId) => _subscriptions.get(queueId).pause();
@override
void resumeSubscription(String queueId) =>
_subscriptions.get(queueId).resume();
@override
void updateSubscription({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
_subscriptions.get(queueId).cancel();
_subscriptions.unregister(queueId);
subscribe(
queueId: queueId,
callback: callback,
filter: filter,
);
}
@override
void clearSubscriptions() {
for (final StreamSubscription sub in _subscriptions.getAll()) {
sub.cancel();
}
_subscriptions.clear();
}
}

View file

@ -0,0 +1,74 @@
import 'package:angel3_mq/src/message/message.dart';
/// An abstract interface class defining the contract for a message consumer.
///
/// The `ConsumerInterface` abstract interface class defines a contract for
/// classes that implement a message consumer. Implementing classes must
/// provide methods for subscribing and unsubscribing from queues, pausing and
/// resuming subscriptions, updating subscriptions, retrieving the
/// latest message from a queue, and clearing all subscriptions.
///
/// Example:
/// ```dart
/// class MyConsumer implements ConsumerInterface {
/// // Custom implementation of the message consumer.
/// }
/// ```
abstract interface class ConsumerInterface {
/// Subscribes to a queue to receive messages.
///
/// The [queueId] parameter represents the ID of the queue to subscribe to.
/// The [callback] parameter is a function that will be invoked for each
/// received message.
/// The [filter] parameter is an optional function that can be used to filter
/// messages based on custom criteria.
void subscribe({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
});
/// Unsubscribes from a previously subscribed queue.
///
/// The [queueId] parameter represents the ID of the queue to unsubscribe
/// from.
void unsubscribe({required String queueId});
/// Pauses message subscription for a specified queue.
///
/// The [queueId] parameter represents the ID of the queue to pause the
/// subscription.
void pauseSubscription(String queueId);
/// Resumes a paused subscription for a specified queue.
///
/// The [queueId] parameter represents the ID of the queue to resume the
/// subscription.
void resumeSubscription(String queueId);
/// Updates an existing subscription with a new callback and/or filter.
///
/// The [queueId] parameter represents the ID of the queue to update the
/// subscription.
/// The [callback] parameter is a new function that will be invoked for each
/// received message.
/// The [filter] parameter is an optional new filter function for message
/// filtering.
void updateSubscription({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
});
/// Retrieves the latest message from a queue.
///
/// The [queueId] parameter represents the ID of the queue to fetch the latest
/// message from.
///
/// Returns the latest message from the specified queue or `null` if the queue
/// is empty.
Message? getLatestMessage(String queueId);
/// Clears all active subscriptions, unsubscribing from all queues.
void clearSubscriptions();
}

View file

@ -0,0 +1,92 @@
import 'dart:async';
import 'package:angel3_mq/src/consumer/consumer.interface.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.dart';
/// A mixin implementing the `ConsumerInterface` for message consumption.
///
/// The `ConsumerMixin` mixin provides a concrete implementation of the
/// `ConsumerInterface`for message consumption. It allows classes to easily
/// consume messages from specific queues by subscribing to them, handling
/// received messages, and managing subscriptions.
///
/// Example:
/// ```dart
/// class MyMessageConsumer with ConsumerMixin {
/// // Custom implementation of the message consumer.
/// }
/// ```
mixin ConsumerMixin implements ConsumerInterface {
/// A registry of active message subscriptions.
final Registrar<StreamSubscription<Message>> _subscriptions =
Registrar<StreamSubscription<Message>>();
@override
Message? getLatestMessage(String queueId) =>
MQClient.instance.getLatestMessage(queueId);
@override
void subscribe({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
try {
final messageStream = MQClient.instance.fetchQueue(queueId);
final sub = filter != null
? messageStream.listen((Message message) {
if (filter(message.payload)) {
callback(message);
}
})
: messageStream.listen(callback);
_subscriptions.register(queueId, sub);
} on IdAlreadyRegisteredException catch (_) {
throw ConsumerAlreadySubscribedException(
consumer: runtimeType.toString(),
queue: queueId,
);
}
}
@override
void unsubscribe({required String queueId}) => _subscriptions
..get(queueId).cancel()
..unregister(queueId);
@override
void pauseSubscription(String queueId) => _subscriptions.get(queueId).pause();
@override
void resumeSubscription(String queueId) =>
_subscriptions.get(queueId).resume();
@override
void updateSubscription({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
_subscriptions.get(queueId).cancel();
_subscriptions.unregister(queueId);
subscribe(
queueId: queueId,
callback: callback,
filter: filter,
);
}
@override
void clearSubscriptions() {
for (final StreamSubscription sub in _subscriptions.getAll()) {
sub.cancel();
}
_subscriptions.clear();
}
}

View file

@ -0,0 +1,22 @@
/// An enumeration representing different types of message exchanges.
///
/// The [ExchangeType] enum defines various types of message exchanges that are
/// commonly used in messaging systems. Each type represents a specific behavior
/// for distributing messages to multiple queues or consumers.
///
/// - `direct`: A direct exchange routes messages to queues based on a specified
/// routing key.
/// - `base`: The default exchange (unnamed) routes messages to queues using
/// their names.
/// - `fanout`: A fanout exchange routes messages to all connected queues,
/// ignoring routing keys.
enum ExchangeType {
/// Represents a direct message exchange.
direct,
/// Represents the default exchange (unnamed).
base,
/// Represents a fanout message exchange.
fanout,
}

View file

@ -0,0 +1,99 @@
/// A utility class providing exception-related error messages.
///
/// The `ExceptionStrings` class defines static methods that generate error
/// messages for various exception scenarios. These messages can be used to
/// provide descriptive error information in exception handling and debugging.
class ExceptionStrings {
/// Generates an error message when MQClient is not initialized.
///
/// This message is used when attempting to use the MQClient before it has
/// been properly initialized using the `MQClient.initialize()` method.
static String mqClientNotInitialized() =>
'MQClient is not initialized. Please make sure to call '
'MQClient.initialize() first.';
/// Generates an error message for a Queue that is not registered.
///
/// The [queueId] parameter represents the name of the unregistered queue.
static String queueNotRegistered(String queueId) =>
'Queue: $queueId is not registered.';
/// Generates an error message for a queue with active subscribers.
///
/// The [queueId] parameter represents the ID of the queue with active
/// subscribers.
static String queueHasSubscribers(String queueId) =>
'Queue: $queueId has subscribers.';
/// Generates an error message for a queue with no name.
///
/// This message is used when the name of the queue is not provided and is
/// null.
static String queueIdNull() => "Queue name can't be null.";
/// Generates an error message for a required routing key.
///
/// This message is used when a routing key is required for a specific
/// operation but is not provided.
static String routingKeyRequired() => 'Routing key is required.';
/// Generates an error message for a non-existent binding key.
///
/// The [bindingKey] parameter represents the non-existent binding key.
static String bindingKeyNotFound(String bindingKey) =>
'The binding key "$bindingKey" was not found.';
/// Generates an error message for a missing binding key.
///
/// This message is used when a binding operation expects a binding key to
static String bindingKeyRequired() => 'Binding key is required.';
/// Generates an error message for an exchange that is not registered.
///
/// The [exchangeName] parameter represents the name of the unregistered
/// exchange.
static String exchangeNotRegistered(String exchangeName) =>
'Exchange: $exchangeName is not registered.';
/// Generates an error message for invalid exchange type.
static String invalidExchangeType() => 'Exchange type is invalid.';
/// Generates an error message for a consumer that is not subscribed to a
/// queue.
///
/// The [consumerId] parameter represents the ID of the consumer.
/// The [queue] parameter represents the name of the queue.
static String consumerNotSubscribed(String consumerId, String queue) =>
'The consumer "$consumerId" is not subscribed to the queue "$queue".';
/// Generates an error message for a consumer that is already subscribed to
/// a queue.
///
/// The [consumerId] parameter represents the ID of the consumer.
/// The [queue] parameter represents the name of the queue.
static String consumerAlreadySubscribed(String consumerId, String queue) =>
'The consumer "$consumerId" is already subscribed to the queue "$queue".';
/// Generates an error message for a consumer that is not registered.
///
/// The [consumerId] parameter represents the ID of the consumer.
static String consumerNotRegistered(String consumerId) =>
'The consumer "$consumerId" is not registered.';
/// Generates an error message for a consumer that has active subscriptions.
///
/// The [consumerId] parameter represents the ID of the consumer.
static String consumerHasSubscriptions(String consumerId) =>
'The consumer "$consumerId" has active subscriptions.';
/// Generates an error message for an ID that is already registered.
///
/// The [id] parameter represents the ID that is already registered.
static String idAlreadyRegistered(String id) =>
'Id "$id" already registered.';
/// Generates an error message for an ID that is not registered.
///
/// The [id] parameter represents the ID that is not registered.
static String idNotRegistered(String id) => 'Id "$id" not registered.';
}

Some files were not shown because too many files have changed in this diff Show more