Clean: cleaning up workspace from testing

This commit is contained in:
Patrick Stewart 2024-10-11 18:52:19 -07:00
parent 562bb5a433
commit 28e355510b
283 changed files with 5 additions and 32481 deletions

View file

@ -1,16 +1,15 @@
@Codebase
The name of our company is Protevus
The name of our project is Platform
Protevus = Laravel and Platform = Illuminate
Our Development Langauge is Dart
Our cross-platform UI Kit is Flutter
Our Implementation is based on Angel3
Our Inspiration is based on Laravel
Our Deployment Target are Windows, MacOS, Linux, iOS, Android, Web, IoT and Edge Devices
Protevus Aims to bring a complete Laravel experience to Dart, and Flutter with a truely Unified Full Stack Experience. We aim to mature the platform to support cross-platform development of complex government, military, and enterprise applications.
Protevus Aims to bring a complete Laravel experience to Dart, and Flutter with a truely Unified Full Stack Experience. We aim to mature the platform to support cross-platform development of complex government, military, financial, medical, supply-chain, enterprise and idustrial applications.
Our 10 Step Development Lifecycle is as follows: We use a hybrid mix of Interface Driven Development, Test Driven Development and AI Software Engineer Agents to assist in generating efficient and reliable code while also providing help with other task like code reviews, documentation, and testing. We call this approach IDDA or Integrated Development Design and AI.
Our 10 Step Development Lifecycle is as follows: We use a hybrid mix of Interface Driven Development, Test Driven Development and AI Software Engineer Agents to assist in generating efficient and reliable code while also providing help with other task like code reviews, documentation, and testing. We call this approach IDD-AI or Individual Driven Developemt AI.
1. Research - Research the requirements and specifications of the project.
2. Identify - Identify the key components and technologies that are needed to build the project. (use opensource)
@ -23,7 +22,7 @@ Our 10 Step Development Lifecycle is as follows: We use a hybrid mix of Interfa
9. Review - Review the code to ensure it meets the requirements and specifications.
10. Release - Release the code to the public.
Or RITIGITIRR pronounced Rite-Ghee-Tee-Ree
Or RITIGITIRR pronounced Rih-tih-gih-tirr
The we wash rinse and repeat for each project. When we are done we have a fully functional and robust implementation of the requirements and specifications we will still follow this pattern for enhancements, bug fixes, and new features.
@ -220,10 +219,6 @@ Here's the content formatted in Markdown:
- Always consider the implications of Dart's strong typing when implementing dynamic features from Laravel.
- Look for opportunities to leverage Dart's unique features, such as isolates for concurrency, where appropriate.
This is the project roadmap for the Dart version of Laravel, which aims to bring the power and flexibility of Laravel to the Dart ecosystem. The roadmap is divided into several phases, each focusing on implementing specific Laravel features in Dart. Please use it to your advantage and contribute to the project's success.
Certainly! I'll combine everything into a comprehensive roadmap, including descriptions for each package in Markdown format. This will serve as a complete guide for the AI engineer.
The Project Road Map is in @Roadmap
The Project Road Map is in @Roadmap Book
You now have access to all the information that all team members need to start working on the project.

View file

@ -1,54 +0,0 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: Dart
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
# Note: This workflow uses the latest stable version of the Dart SDK.
# You can specify other versions if desired, see documentation here:
# https://github.com/dart-lang/setup-dart/blob/main/README.md
# - uses: dart-lang/setup-dart@v1
- uses: dart-lang/setup-dart@9a04e6d73cca37bd455e0608d7e5092f881fd603
- name: Flutter action
# You may pin to the exact commit or the version.
# uses: subosito/flutter-action@4389e6cbc6cb8a4b18c628ff96ff90be0e926aa8
uses: subosito/flutter-action@v1.5.3
- name: Install dependencies
run: dart pub get
# Uncomment this step to verify the use of 'dart format' on each commit.
# - name: Verify formatting
# run: dart format --output=none --set-exit-if-changed .
# Consider passing '--fatal-infos' for slightly stricter analysis.
- name: Analyze project source
run: dart analyze
# Your project will need to have tests in test/ and a dependency on
# package:test for this step to succeed. Note that Flutter projects will
# want to change this to 'flutter test'.
- name: Run tests
run: flutter test --coverage
- name: Install lcov
run: sudo apt-get install -y lcov
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
file: coverage/lcov.info

View file

@ -1,30 +0,0 @@
# Miscellaneous
*.class
*.log
*.pyc
*.swp
.DS_Store
.atom/
.buildlog/
.history
.svn/
# IntelliJ related
*.iml
*.ipr
*.iws
.idea/
# The .vscode folder contains launch configuration and tasks you configure in
# VS Code which you may wish to be included in version control, so this line
# is commented out by default.
#.vscode/
# Flutter/Dart/Pub related
# Libraries should not include pubspec.lock, per https://dart.dev/guides/libraries/private-files#pubspeclock.
/pubspec.lock
**/doc/api/
.dart_tool/
.packages
build/
coverage

View file

@ -1,10 +0,0 @@
# This file tracks properties of this Flutter project.
# Used by Flutter tool to assess capabilities and perform upgrades etc.
#
# This file should be version controlled and should not be manually edited.
version:
revision: 7e9793dee1b85a243edd0e06cb1658e98b077561
channel: stable
project_type: package

View file

@ -1,67 +0,0 @@
## 0.6.2
* update `logger` dependency
## 0.6.1
* [Add] `allowLogging` to control logging (thanks [@hatemragab](https://github.com/hatemragab))
## 0.6.0
* **[Change]** fire `EmptyEvent` after each event to keep stream empty.
* [Add] [Logger](https://pub.dev/packages/logger)
## 0.5.0
* [Add] timestamp for each `event`.
## 0.4.0
* [Add] mapper
## 0.3.0+3
* Downgrade clock version
## 0.3.0+2
* Improve documentation
## 0.3.0+1
* Improve documentation
## 0.3.0
* [Add] `EventCompletionEvent`
* Improve documentation
## 0.2.2
* [Fix] `clock` dependency
## 0.2.1
* [Add] debug logging
## 0.2.0
* [Add] history
## 0.1.0
* [Add] distinct
## 0.0.2+1
* [Fix] GitHub repository link
## 0.0.2
* [Remove] `id` of `AppEvent`
## 0.0.1
* Initial release.
* [Add] `EventBus`
* [Add] `AppEvent` superclass

View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2021 Andrew
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1,98 +0,0 @@
# EventBus: Events for Dart/Flutter
[![pub package](https://img.shields.io/pub/v/event_bus_plus.svg?label=event_bus_plus&color=blue)](https://pub.dev/packages/event_bus_plus)
[![codecov](https://codecov.io/gh/AndrewPiterov/event_bus_plus/branch/main/graph/badge.svg?token=VM9LTJXGQS)](https://codecov.io/gh/AndrewPiterov/event_bus_plus)
[![Dart](https://github.com/AndrewPiterov/event_bus_plus/actions/workflows/dart.yml/badge.svg)](https://github.com/AndrewPiterov/event_bus_plus/actions/workflows/dart.yml)
**EventBus** is an open-source library for Dart and Flutter using the **publisher/subscriber** pattern for loose coupling. **EventBus** enables central communication to decoupled classes with just a few lines of code simplifying the code, removing dependencies, and speeding up app development.
<img src="https://raw.githubusercontent.com/andrewpiterov/event_bus_plus/main/doc/pub_sub.webp" alt="event bus publish subscribe" style="width: 100%; height: auto; "/>
## Your benefits using EventBus: It…
- simplifies the communication between components;
- decouples event senders and receivers;
- performs well with UI artifacts (e.g. Widgets, Controllers);
- avoids complex and error-prone dependencies and life cycle issues.
<img src="https://raw.githubusercontent.com/andrewpiterov/event_bus_plus/main/doc/video_presentation.gif" alt="event bus plus" style="width: 100%; height: auto; "/>
### Define the app's events
```dart
// Initialize the Service Bus
IAppEventBus eventBus = AppEventBus();
// Define your app events
final event = FollowEvent('@devcraft.ninja');
final event = CommentEvent('Awesome package 😎');
```
### Subscribe
```dart
// listen the latest event
final sub = eventBus.last$
.listen((AppEvent event) { /*do something*/ });
// Listen particular event
final sub2 = eventBus.on<FollowAppEvent>()
.listen((e) { /*do something*/ });
```
### Publish
```dart
// fire the event
eventBus.fire(event);
```
### Watch events in progress
```dart
// start watch the event till its completion
eventBus.watch(event);
// and check the progress
eventBus.isInProgress<FollowAppEvent>();
// or listen stream to check the processing
eventBus.inProgress$.map((List<AppEvent> events) =>
events.whereType<FollowAppEvent>().isNotEmpty);
// complete
_eventBus.complete(event);
// or complete with completion event
_eventBus.complete(event, nextEvent: SomeAnotherEvent);
```
## History
```dart
final events = eventBus.history;
```
## Mapping
```dart
final eventBus = bus = EventBus(
map: {
SomeEvent: [
(e) => SomeAnotherEvent(),
],
},
);
```
## Contributing
We accept the following contributions:
* Improving the documentation
* [Reporting issues](https://github.com/AndrewPiterov/event_bus_plus/issues/new)
* Fixing bugs
## Maintainers
* [Andrew Piterov](mailto:contact@andrewpiterov.com?subject=[GitHub]%20Source%20Dart%event_bus_plus)

View file

@ -1,224 +0,0 @@
include: package:flutter_lints/flutter.yaml
analyzer:
strong-mode:
implicit-casts: false
implicit-dynamic: true
errors:
# Close instances of dart.core.Sink.
close_sinks: error
# Cancel instances of dart.async.StreamSubscription.
cancel_subscriptions: error
# treat missing required parameters as a error (not a hint)
missing_required_param: error
# treat missing returns as a error (not a hint)
missing_return: error
# allow having TODOs in the code and treat as warning
todo: warning
# allow self-reference to deprecated members (we do this because otherwise we have
# to annotate every member in every test, assert, etc, when we deprecate something)
deprecated_member_use_from_same_package: ignore
# Ignore analyzer hints for updating pubspecs when using Future or
# Stream and not importing dart:async
# Please see https://github.com/flutter/flutter/pull/24528 for details.
sdk_version_async_exported_from_core: ignore
# await
unawaited_futures: error
avoid_print: warning
always_declare_return_types: error
unrelated_type_equality_checks: error
implementation_imports: error
require_trailing_commas: warning
avoid_slow_async_io: error
use_build_context_synchronously: error
sized_box_for_whitespace: error
dead_code: warning
invalid_assignment: error
exclude:
- "bin/cache/**"
# the following two are relative to the stocks example and the flutter package respectively
# see https://github.com/dart-lang/sdk/issues/28463
- "lib/i18n/messages_*.dart"
- "lib/src/http/**"
- "build/**"
- "lib/**.freezed.dart"
- "lib/**.g.dart"
- "test/**"
- "example/**"
linter:
rules:
# these rules are documented on and in the same order as
# the Dart Lint rules page to make maintenance easier
# https://github.com/dart-lang/linter/blob/master/example/all.yaml
- always_declare_return_types
- always_put_control_body_on_new_line
# - always_put_required_named_parameters_first # we prefer having parameters in the same order as fields https://github.com/flutter/flutter/issues/10219
- always_require_non_null_named_parameters
# - always_specify_types
- annotate_overrides
# - avoid_annotating_with_dynamic # conflicts with always_specify_types
# - avoid_as # required for implicit-casts: true
- avoid_bool_literals_in_conditional_expressions
# - avoid_catches_without_on_clauses # we do this commonly
# - avoid_catching_errors # we do this commonly
- avoid_classes_with_only_static_members
# - avoid_double_and_int_checks # only useful when targeting JS runtime
- avoid_empty_else
- avoid_equals_and_hash_code_on_mutable_classes
- avoid_field_initializers_in_const_classes
- avoid_function_literals_in_foreach_calls
# - avoid_implementing_value_types # not yet tested
- avoid_init_to_null
# - avoid_js_rounded_ints # only useful when targeting JS runtime
- avoid_null_checks_in_equality_operators
# - avoid_positional_boolean_parameters # not yet tested
- avoid_print # not yet tested
# - avoid_private_typedef_functions # we prefer having typedef (discussion in https://github.com/flutter/flutter/pull/16356)
# - avoid_redundant_argument_values # not yet tested
- avoid_relative_lib_imports
- avoid_renaming_method_parameters
- avoid_return_types_on_setters
# - avoid_returning_null # there are plenty of valid reasons to return null
# - avoid_returning_null_for_future # not yet tested
- avoid_returning_null_for_void
# - avoid_returning_this # there are plenty of valid reasons to return this
# - avoid_setters_without_getters # not yet tested
- avoid_shadowing_type_parameters # not yet tested
- avoid_single_cascade_in_expression_statements
- avoid_slow_async_io
- avoid_types_as_parameter_names
# - avoid_types_on_closure_parameters # conflicts with always_specify_types
# - avoid_unnecessary_containers # not yet tested
- avoid_unused_constructor_parameters
- avoid_void_async
# - avoid_web_libraries_in_flutter # not yet tested
- await_only_futures
- camel_case_extensions
- camel_case_types
- cancel_subscriptions
# - cascade_invocations # not yet tested
# - close_sinks # not reliable enough
# - comment_references # blocked on https://github.com/flutter/flutter/issues/20765
# - constant_identifier_names # needs an opt-out https://github.com/dart-lang/linter/issues/204
- control_flow_in_finally
# - curly_braces_in_flow_control_structures # not yet tested
# - diagnostic_describe_all_properties # not yet tested
- directives_ordering
- empty_catches
- empty_constructor_bodies
- empty_statements
# - file_names # not yet tested
- flutter_style_todos
- hash_and_equals
- implementation_imports
# - invariant_booleans # too many false positives: https://github.com/dart-lang/linter/issues/811
- iterable_contains_unrelated_type
# - join_return_with_assignment # not yet tested
- library_names
- library_prefixes
# - lines_longer_than_80_chars # not yet tested
- list_remove_unrelated_type
# - literal_only_boolean_expressions # too many false positives: https://github.com/dart-lang/sdk/issues/34181
# - missing_whitespace_between_adjacent_strings # not yet tested
- no_adjacent_strings_in_list
- no_duplicate_case_values
# - no_logic_in_create_state # not yet tested
# - no_runtimeType_toString # not yet tested
- non_constant_identifier_names
# - null_closures # not yet tested
- omit_local_variable_types # opposite of always_specify_types
# - one_member_abstracts # too many false positives
# - only_throw_errors # https://github.com/flutter/flutter/issues/5792
- overridden_fields
- package_api_docs
- package_names
- package_prefixed_library_names
# - parameter_assignments # we do this commonly
- prefer_adjacent_string_concatenation
- prefer_asserts_in_initializer_lists
# - prefer_asserts_with_message # not yet tested
- prefer_collection_literals
- prefer_conditional_assignment
- prefer_const_constructors
- prefer_const_constructors_in_immutables
- prefer_const_declarations
- prefer_const_literals_to_create_immutables
# - prefer_constructors_over_static_methods # not yet tested
- prefer_contains
# - prefer_double_quotes # opposite of prefer_single_quotes
- prefer_equal_for_default_values
# - prefer_expression_function_bodies # conflicts with https://github.com/flutter/flutter/wiki/Style-guide-for-Flutter-repo#consider-using--for-short-functions-and-methods
- prefer_final_fields
- prefer_final_in_for_each
- prefer_final_locals
- prefer_for_elements_to_map_fromIterable
- prefer_foreach
# - prefer_function_declarations_over_variables # not yet tested
- prefer_generic_function_type_aliases
- prefer_if_elements_to_conditional_expressions
- prefer_if_null_operators
- prefer_initializing_formals
- prefer_inlined_adds
# - prefer_int_literals # not yet tested
# - prefer_interpolation_to_compose_strings # not yet tested
- prefer_is_empty
- prefer_is_not_empty
- prefer_is_not_operator
- prefer_iterable_whereType
# - prefer_mixin # https://github.com/dart-lang/language/issues/32
# - prefer_null_aware_operators # disable until NNBD, see https://github.com/flutter/flutter/pull/32711#issuecomment-492930932
# - prefer_relative_imports # not yet tested
- prefer_single_quotes
- prefer_spread_collections
- prefer_typing_uninitialized_variables
- prefer_void_to_null
# - provide_deprecation_message # not yet tested
- public_member_api_docs # enabled on a case-by-case basis; see e.g. packages/analysis_options.yaml
- recursive_getters
- slash_for_doc_comments
# - sort_child_properties_last # not yet tested
- sort_constructors_first
- sort_pub_dependencies
- sort_unnamed_constructors_first
- test_types_in_equals
- throw_in_finally
# - type_annotate_public_apis # subset of always_specify_types
- type_init_formals
- unawaited_futures # too many false positives
- unnecessary_await_in_return # not yet tested
- unnecessary_brace_in_string_interps
- unnecessary_const
# - unnecessary_final # conflicts with prefer_final_locals
- unnecessary_getters_setters
# - unnecessary_lambdas # has false positives: https://github.com/dart-lang/linter/issues/498
- unnecessary_new
- unnecessary_null_aware_assignments
- unnecessary_null_in_if_null_operators
- unnecessary_overrides
- unnecessary_parenthesis
- unnecessary_statements
- unnecessary_string_interpolations
- unnecessary_this
- unrelated_type_equality_checks
# - unsafe_html # not yet tested
- use_full_hex_values_for_flutter_colors
# - use_function_type_syntax_for_parameters # not yet tested
# - use_key_in_widget_constructors # not yet tested
- use_late_for_private_fields_and_variables
- use_rethrow_when_possible
# - use_setters_to_change_properties # not yet tested
# - use_string_buffers # has false positives: https://github.com/dart-lang/sdk/issues/34182
# - use_to_and_as_if_applicable # has false positives, so we prefer to catch this by code-review
- valid_regexps
- void_checks
dart_code_metrics:
rules:
- newline-before-return:
severity: style
- no-boolean-literal-compare:
severity: error
anti-patterns:
- long-method:
severity: warning

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.8 MiB

View file

@ -1,3 +0,0 @@
library event_bus;
export 'res/res.dart';

View file

@ -1,29 +0,0 @@
import 'package:clock/clock.dart';
import 'package:equatable/equatable.dart';
/// The base class for all events
abstract class AppEvent extends Equatable {
/// Create the event
const AppEvent();
/// The event time
DateTime get timestamp => clock.now();
}
/// The event completion event
class EventCompletionEvent extends AppEvent {
/// Create the event
const EventCompletionEvent(this.event);
/// The event that is completed
final AppEvent event;
@override
List<Object> get props => [event];
}
/// The empty event
class EmptyEvent extends AppEvent {
@override
List<Object?> get props => [];
}

View file

@ -1,212 +0,0 @@
import 'package:angel3_reactivex/subjects.dart';
import 'package:logger/logger.dart';
import 'app_event.dart';
import 'history_entry.dart';
import 'subscription.dart';
/// The event bus interface
abstract class IEventBus {
/// Whether the event bus is busy
bool get isBusy;
/// Whether the event bus is busy
Stream<bool> get isBusy$;
/// The last event
AppEvent? get last;
/// The last event
Stream<AppEvent?> get last$;
/// The list of events that are in progress
Stream<List<AppEvent>> get inProgress$;
/// Subscribe `EventBus` on a specific type of event, and register responder to it.
Stream<T> on<T extends AppEvent>();
/// Subscribe `EventBus` on a specific type of event, and register responder to it.
Stream<bool> whileInProgress<T extends AppEvent>();
/// Subscribe `EventBus` on a specific type of event, and register responder to it.
Subscription respond<T>(Responder<T> responder);
/// The history of events
List<EventBusHistoryEntry> get history;
/// Fire a event
void fire(AppEvent event);
/// Fire a event and wait for it to be completed
void watch(AppEvent event);
/// Complete a event
void complete(AppEvent event, {AppEvent? nextEvent});
///
bool isInProgress<T>();
/// Reset the event bus
void reset();
/// Dispose the event bus
void dispose();
/// Clear the history
void clearHistory();
}
/// The event bus implementation
class EventBus implements IEventBus {
/// Create the event bus
EventBus({
this.maxHistoryLength = 100,
this.map = const {},
this.allowLogging = false,
});
final _logger = Logger();
/// The maximum length of history
final int maxHistoryLength;
/// allow to log all events this when you call [fire]
/// the event will be in console log
final bool allowLogging;
/// The map of events
final Map<Type, List<AppEvent Function(AppEvent event)>> map;
@override
bool get isBusy => _inProgress.value.isNotEmpty;
@override
Stream<bool> get isBusy$ => _inProgress.map((event) => event.isNotEmpty);
final _lastEventSubject = BehaviorSubject<AppEvent>();
@override
AppEvent? get last => _lastEventSubject.valueOrNull;
@override
Stream<AppEvent?> get last$ => _lastEventSubject.distinct();
final _inProgress = BehaviorSubject<List<AppEvent>>.seeded([]);
List<AppEvent> get _isInProgressEvents => _inProgress.value;
@override
Stream<List<AppEvent>> get inProgress$ => _inProgress;
@override
List<EventBusHistoryEntry> get history => List.unmodifiable(_history);
final List<EventBusHistoryEntry> _history = [];
@override
void fire(AppEvent event) {
if (_history.length >= maxHistoryLength) {
_history.removeAt(0);
}
_history.add(EventBusHistoryEntry(event, event.timestamp));
// 1. Fire the event
_lastEventSubject.add(event);
// 2. Map if needed
_map(event);
// 3. Reset stream
_lastEventSubject.add(EmptyEvent());
if (allowLogging) {
_logger.d(' ⚡️ [${event.timestamp}] $event');
}
}
@override
void watch(AppEvent event) {
fire(event);
_inProgress.add([
..._isInProgressEvents,
event,
]);
}
@override
void complete(AppEvent event, {AppEvent? nextEvent}) {
// complete the event
if (_isInProgressEvents.any((e) => e == event)) {
final newArr = _isInProgressEvents.toList()
..removeWhere((e) => e == event);
_inProgress.add(newArr);
fire(EventCompletionEvent(event));
}
// fire next event if any
if (nextEvent != null) {
fire(nextEvent);
}
}
@override
bool isInProgress<T>() {
return _isInProgressEvents.whereType<T>().isNotEmpty;
}
@override
Stream<T> on<T extends AppEvent>() {
if (T == dynamic) {
return _lastEventSubject.stream as Stream<T>;
} else {
return _lastEventSubject.stream.where((event) => event is T).cast<T>();
}
}
/// Subscribe `EventBus` on a specific type of event, and register responder to it.
///
/// When [T] is not given or given as `dynamic`, it listens to all events regardless of the type.
/// Returns [Subscription], which can be disposed to cancel all the subscription registered to itself.
@override
Subscription respond<T>(Responder<T> responder) =>
Subscription(_lastEventSubject).respond<T>(responder);
@override
Stream<bool> whileInProgress<T extends AppEvent>() {
return _inProgress.map((events) {
return events.whereType<T>().isNotEmpty;
});
}
void _map(AppEvent? event) {
if (event == null) {
return;
}
final functions = map[event.runtimeType] ?? [];
if (functions.isEmpty) {
return;
}
for (final func in functions) {
final newEvent = func(event);
if (newEvent.runtimeType == event.runtimeType) {
if (allowLogging) {
_logger.d(
' 🟠 SKIP EVENT: ${newEvent.runtimeType} => ${event.runtimeType}',
);
}
continue;
}
fire(newEvent);
}
}
@override
void clearHistory() {
_history.clear();
}
@override
void reset() {
clearHistory();
_inProgress.add([]);
_lastEventSubject.add(EmptyEvent());
}
@override
void dispose() {
_inProgress.close();
_lastEventSubject.close();
}
}

View file

@ -1,18 +0,0 @@
import 'package:equatable/equatable.dart';
import 'app_event.dart';
/// The history entry
class EventBusHistoryEntry extends Equatable {
/// The history entry
const EventBusHistoryEntry(this.event, this.timestamp);
/// The event
final AppEvent event;
/// The timestamp
final DateTime timestamp;
@override
List<Object?> get props => [event, timestamp];
}

View file

@ -1,4 +0,0 @@
export 'app_event.dart';
export 'event_bus.dart';
export 'history_entry.dart';
export 'subscription.dart';

View file

@ -1,78 +0,0 @@
import 'dart:async';
/// The function/method signature for the event handler
typedef Responder<T> = void Function(T event);
/// The class manages the subscription to event bus
class Subscription {
/// Create the subscription
///
/// Should barely used directly, to subscribe to event bus, use `EventBus.respond`.
Subscription(this._stream);
/// Returns an instance that indicates there is no subscription
factory Subscription.empty() => const _EmptySubscription();
final Stream _stream;
/// Subscriptions that registered to event bus
final List<StreamSubscription> subscriptions = [];
Stream<T> _cast<T>() {
if (T == dynamic) {
return _stream as Stream<T>;
} else {
return _stream.where((event) => event is T).cast<T>();
}
}
/// Register a [responder] to event bus for the event type [T].
/// If [T] is omitted or given as `dynamic`, it listens to all events that published on [EventBus].
///
/// Method call can be safely chained, and the order doesn't matter.
///
/// ```
/// eventBus
/// .respond<EventA>(responderA)
/// .respond<EventB>(responderB);
/// ```
Subscription respond<T>(Responder<T> responder) {
subscriptions.add(_cast<T>().listen(responder));
return this;
}
/// Cancel all the registered subscriptions.
/// After calling this method, all the events published won't be delivered to the cleared responders any more.
///
/// No harm to call more than once.
void dispose() {
if (subscriptions.isEmpty) {
return;
}
for (final s in subscriptions) {
s.cancel();
}
subscriptions.clear();
}
}
class _EmptySubscription implements Subscription {
const _EmptySubscription();
static final List<StreamSubscription> emptyList =
List.unmodifiable(<StreamSubscription>[]);
@override
void dispose() {}
@override
Subscription respond<T>(responder) => throw Exception('Not supported');
@override
List<StreamSubscription> get subscriptions => emptyList;
@override
Stream<T> _cast<T>() => throw Exception('Not supported');
@override
Stream get _stream => throw Exception('Not supported');
}

View file

@ -1,20 +0,0 @@
name: angel3_event_bus
description: Event Bus for Dart.
version: 0.6.2
homepage: https://github.com/AndrewPiterov/event_bus_plus
environment:
sdk: '>=2.17.1 <4.0.0'
dependencies:
clock: ^1.1.0
equatable: ^2.0.5
logger: ^2.0.2+1
angel3_reactivex: ^0.27.5
dev_dependencies:
flutter_lints: ^3.0.0
given_when_then_unit_test: ^0.2.1
shouldly: ^0.5.0+1
test: ^1.22.0

View file

@ -1,70 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'models.dart';
import 'package:test/test.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus();
});
test('emit Follower Events', () {
expect(
bus.on(),
emitsInOrder([
const FollowAppEvent('username'),
EmptyEvent(),
const FollowAppEvent('username3'),
EmptyEvent(),
const FollowAppEvent('username2'),
EmptyEvent(),
]));
bus.fire(const FollowAppEvent('username'));
bus.fire(const FollowAppEvent('username3'));
bus.fire(const FollowAppEvent('username2'));
});
test('start watch but not complete', () {
expect(
bus.on(),
emitsInOrder([
const FollowAppEvent('username'),
]));
bus.watch(const FollowAppEvent('username'));
});
test('start watch and complete', () {
const watchable = FollowAppEvent('username3');
expect(
bus.on(),
emitsInOrder([
watchable,
EmptyEvent(),
const EventCompletionEvent(watchable),
EmptyEvent(),
const FollowSuccessfullyEvent(watchable),
EmptyEvent(),
]));
bus.watch(watchable);
bus.complete(watchable,
nextEvent: const FollowSuccessfullyEvent(watchable));
});
// test('emit Follower Events', () {
// final watchable = FollowAppEvent('username3', id: '3');
// expect(
// _bus.on(),
// emitsInAnyOrder([
// FollowAppEvent('username3', id: '3'),
// FollowSuccessfullyAppEvent(watchable),
// ]));
// _bus.watch(watchable);
// _bus.complete(watchable, withh: FollowSuccessfullyAppEvent(watchable));
// });
}

View file

@ -1,58 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:test/expect.dart';
import 'package:test/scaffolding.dart';
import 'models.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus();
});
test('Call once', () {
expectLater(
bus.last$,
emitsInOrder([
const SomeEvent(),
EmptyEvent(),
const SomeAnotherEvent(),
EmptyEvent(),
]));
bus.fire(const SomeEvent());
bus.fire(const SomeAnotherEvent());
});
// test('Call twice', () {
// const event = SomeEvent();
// expectLater(
// bus.last$,
// emitsInOrder([
// const SomeEvent(),
// EmptyEvent(),
// const SomeAnotherEvent(),
// EmptyEvent(),
// ]));
// bus.fire(event);
// bus.fire(event);
// bus.fire(const SomeAnotherEvent());
// });
// test('Call three times', () {
// const event = SomeEvent();
// expectLater(
// bus.last$,
// emitsInOrder([
// const SomeEvent(),
// EmptyEvent(),
// const SomeAnotherEvent(),
// EmptyEvent(),
// ]));
// bus.fire(event);
// bus.fire(event);
// bus.fire(event);
// bus.fire(const SomeAnotherEvent());
// });
}

View file

@ -1,20 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:test/test.dart';
import 'models.dart';
void main() {
final IEventBus _bus = EventBus();
test('Should fire Empty event', () {
expect(
_bus.last$,
emitsInOrder(
[
const SomeEvent(),
EmptyEvent(),
],
),
);
_bus.fire(const SomeEvent());
}, timeout: const Timeout(Duration(seconds: 1)));
}

View file

@ -1,63 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:shouldly/shouldly.dart';
import 'package:test/scaffolding.dart';
import 'models.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus();
});
test('Empty event bus', () {
bus.isBusy.should.beFalse();
});
when('start some event', () {
const event = FollowAppEvent('username');
// const eventId = '1';
before(() {
bus.watch(event);
});
then('should be busy', () {
bus.isBusy.should.beTrue();
});
then('should be in progress', () {
bus.isInProgress<FollowAppEvent>().should.beTrue();
});
and('complete the event', () {
before(() {
bus.complete(event);
});
then('should not be busy', () {
bus.isBusy.should.beFalse();
});
then('should not be in progress', () {
bus.isInProgress<FollowAppEvent>().should.not.beTrue();
});
});
});
group('compare equality', () {
// test('compare two equal events - should not be equal', () {
// final event = FollowAppEvent('username');
// final event2 = FollowAppEvent('username');
// event.should.not.be(event2);
// });
test('compare two equal events - should be equal', () {
const event = FollowAppEvent('username');
const event2 = FollowAppEvent('username');
event.should.be(event2);
});
});
}

View file

@ -1,75 +0,0 @@
import 'package:clock/clock.dart';
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:shouldly/shouldly.dart';
import 'package:test/test.dart';
import 'models.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus(maxHistoryLength: 3);
});
test('Keep history', () {
final date = DateTime(2022, 9, 1);
withClock(Clock.fixed(date), () {
bus.fire(const SomeEvent());
bus.fire(const SomeAnotherEvent());
bus.fire(const FollowAppEvent('@username'));
bus.history.should.be([
EventBusHistoryEntry(const SomeEvent(), date),
EventBusHistoryEntry(const SomeAnotherEvent(), date),
EventBusHistoryEntry(const FollowAppEvent('@username'), date),
]);
});
});
test('Keep full history without debounce', () {
final date = DateTime(2022, 9, 1, 4, 59, 55);
withClock(Clock.fixed(date), () {
bus.fire(const SomeEvent());
bus.fire(const SomeEvent());
bus.fire(const SomeAnotherEvent());
bus.history.should.be([
EventBusHistoryEntry(const SomeEvent(), date),
EventBusHistoryEntry(const SomeEvent(), date),
EventBusHistoryEntry(const SomeAnotherEvent(), date),
]);
});
});
test('Clear history', () {
bus.fire(const SomeEvent());
bus.fire(const SomeAnotherEvent());
bus.clearHistory();
bus.history.should.beEmpty();
});
group('History length', () {
test('Add more than max', () {
final date = DateTime(2022, 9, 1, 4, 59, 55);
withClock(Clock.fixed(date), () {
bus.fire(const SomeEvent());
bus.fire(const SomeAnotherEvent());
bus.fire(const FollowAppEvent('@username'));
bus.fire(const NewCommentEvent('text'));
bus.history.length.should.be(3);
bus.history.should.be([
EventBusHistoryEntry(const SomeAnotherEvent(), date),
EventBusHistoryEntry(const FollowAppEvent('@username'), date),
EventBusHistoryEntry(const NewCommentEvent('text'), date),
]);
});
});
});
}

View file

@ -1,36 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:test/test.dart';
import '../models.dart';
void main() {
late IEventBus bus;
before(
() {
bus = EventBus(
map: {
SomeEvent: [
(e) => e,
(e) => const SomeAnotherEvent(),
],
},
);
},
);
test('does not emit the same event', () {
expect(
bus.last$,
emitsInOrder(
[
const SomeEvent(),
const SomeAnotherEvent(),
EmptyEvent(),
],
),
);
bus.fire(const SomeEvent());
}, timeout: const Timeout(Duration(seconds: 1)));
}

View file

@ -1,34 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:test/test.dart';
import '../models.dart';
void main() {
late IEventBus bus;
before(
() {
bus = EventBus(
map: {
SomeEvent: [
(e) => const SomeAnotherEvent(),
],
},
);
},
);
test('emits another', () {
expect(
bus.last$,
emitsInOrder(
[
const SomeEvent(),
const SomeAnotherEvent(),
],
),
);
bus.fire(const SomeEvent());
}, timeout: const Timeout(Duration(seconds: 1)));
}

View file

@ -1,42 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
class FollowAppEvent extends AppEvent {
const FollowAppEvent(this.username);
final String username;
@override
List<Object?> get props => [username];
}
class FollowSuccessfullyEvent extends AppEvent {
const FollowSuccessfullyEvent(this.starting);
final FollowAppEvent starting;
@override
List<Object?> get props => [starting];
}
class NewCommentEvent extends AppEvent {
const NewCommentEvent(this.text);
final String text;
@override
List<Object?> get props => [text];
}
class SomeEvent extends AppEvent {
const SomeEvent();
@override
List<Object?> get props => [];
}
class SomeAnotherEvent extends AppEvent {
const SomeAnotherEvent();
@override
List<Object?> get props => [];
}

View file

@ -1,74 +0,0 @@
// ignore_for_file: prefer_const_constructors
import 'dart:async';
import 'dart:developer';
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:test/expect.dart';
import 'package:test/scaffolding.dart';
import 'models.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus();
});
test('emit Some Event', () {
final ctrl = StreamController();
final sub = bus.respond<NewCommentEvent>((event) {
log('new comment');
ctrl.add(2);
}).respond<FollowAppEvent>((event) {
log('new follower');
ctrl.add(1);
});
expect(ctrl.stream, emitsInOrder([1, 2]));
bus.fire(FollowAppEvent('username'));
bus.fire(NewCommentEvent('comment #1'));
});
test('emit Some Events', () {
final ctrl = StreamController();
final sub = bus.respond<NewCommentEvent>((event) {
log('new comment');
ctrl.add(2);
}).respond<FollowAppEvent>((event) {
log('new follower');
ctrl.add(1);
});
expect(ctrl.stream, emitsInOrder([1, 2, 1]));
bus.fire(FollowAppEvent('username'));
bus.fire(NewCommentEvent('comment #1'));
bus.fire(FollowAppEvent('username2'));
});
test('emit all Event', () {
final ctrl = StreamController();
final sub = bus.respond<NewCommentEvent>((event) {
log('new comment');
ctrl.add(2);
}).respond<FollowAppEvent>((event) {
log('new follower');
ctrl.add(1);
}).respond((event) {
log('event $event');
ctrl.add(3);
});
expect(ctrl.stream, emitsInOrder([1, 3, 3, 2, 3, 3]));
bus.fire(FollowAppEvent('username'));
bus.fire(NewCommentEvent('comment #1'));
});
}

View file

@ -1,56 +0,0 @@
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:test/test.dart';
import 'models.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus();
});
test('description', () {
bus.inProgress$.map((List<AppEvent> events) =>
events.whereType<FollowAppEvent>().isNotEmpty);
}, skip: 'should skip');
test('emit Follower Event', () {
const id = 'id';
expect(bus.on(), emitsInAnyOrder([const FollowAppEvent('username')]));
bus.fire(const FollowAppEvent('username'));
});
test('emit Follower Events', () {
expect(
bus.on(),
emitsInOrder([
const FollowAppEvent('username'),
EmptyEvent(),
const FollowAppEvent('username3'),
EmptyEvent(),
const FollowAppEvent('username2'),
EmptyEvent(),
]));
bus.fire(const FollowAppEvent('username'));
bus.fire(const FollowAppEvent('username3'));
bus.fire(const FollowAppEvent('username2'));
});
test('emit New Comment Event', () {
expect(
bus.on<NewCommentEvent>(),
emitsInOrder([
const NewCommentEvent('comment #1'),
const NewCommentEvent('comment #2'),
]));
bus.fire(const FollowAppEvent('username3'));
bus.fire(const FollowAppEvent('username3'));
bus.fire(const NewCommentEvent('comment #1'));
bus.fire(const FollowAppEvent('username3'));
bus.fire(const NewCommentEvent('comment #2'));
bus.fire(const FollowAppEvent('username3'));
});
}

View file

@ -1,36 +0,0 @@
import 'package:clock/clock.dart';
import 'package:event_bus_plus/event_bus_plus.dart';
import 'package:given_when_then_unit_test/given_when_then_unit_test.dart';
import 'package:shouldly/shouldly.dart';
import 'package:test/test.dart';
import 'dart:developer' as dev;
import 'models.dart';
void main() {
late IEventBus bus;
before(() {
bus = EventBus(maxHistoryLength: 3);
});
test('right timestamp', () {
final date = DateTime(2022, 9, 1);
withClock(Clock.fixed(date), () {
bus.fire(const SomeEvent());
bus.fire(const SomeAnotherEvent());
bus.fire(const FollowAppEvent('@username'));
for (final e in bus.history) {
e.timestamp.should.be(date);
dev.log(e.toString());
}
bus.history.should.be([
EventBusHistoryEntry(const SomeEvent(), date),
EventBusHistoryEntry(const SomeAnotherEvent(), date),
EventBusHistoryEntry(const FollowAppEvent('@username'), date),
]);
});
});
}

View file

@ -1,13 +0,0 @@
#!/bin/sh
cd ..
# Generate `coverage/lcov.info` file
flutter test --coverage
# Generate HTML report
# Note: on macOS you need to have lcov installed on your system (`brew install lcov`) to use this:
genhtml coverage/lcov.info -o coverage/html
# Open the report
open coverage/html/index.html

View file

@ -1,43 +0,0 @@
name: build
on:
push:
branches:
- master
- main
pull_request:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: 📚 Git Checkout
uses: actions/checkout@v4
- name: 🎯 Setup Dart
uses: dart-lang/setup-dart@v1
- name: 📦 Install Dependencies
run: dart pub get
- name: ✨ Check Formatting
run: dart format --line-length 80 --set-exit-if-changed .
- name: 🕵️ Analyze
run: dart analyze --fatal-infos --fatal-warnings .
- name: 🧪 Run Tests
run: |
dart pub global activate coverage 1.2.0
dart test --coverage=coverage && dart pub global run coverage:format_coverage --lcov --in=coverage --out=coverage/lcov.info
- name: 📊 Check Code Coverage
uses: VeryGoodOpenSource/very_good_coverage@v2
with:
path: ./coverage/lcov.info
min_coverage: 100
- name: 📈 Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View file

@ -1,7 +0,0 @@
# https://dart.dev/guides/libraries/private-files
# Created by `dart pub`
.dart_tool/
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

View file

@ -1,19 +0,0 @@
## 1.0.0
- Initial version of the package.
## 1.0.1
- Fix documentation. (Image path)
- Update package description.
- Fix linter rules. (Type matching)
- Update `example` files.
- Remove `mocktail` dependency.
## 1.0.2
- Update documentation.
## 1.1.0
- `Deprecate` the `Consumer` mixin in favor of `ConsumerMixin`. ([#1](https://github.com/N-Razzouk/dart_mq/issues/1))

View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2023 Naif Razzouk
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1,165 +0,0 @@
# DartMQ: A Message Queue System for Dart and Flutter
<!-- TODO: fix pub version badge -->
[![Pub](https://img.shields.io/pub/v/dart_mq.svg)](https://pub.dev/packages/dart_mq)
[![coverage](https://codecov.io/gh/N-Razzouk/dart_mq/graph/badge.svg)](https://app.codecov.io/gh/N-Razzouk/dart_mq)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
DartMQ is a Dart package that provides message queue functionality for sending messages between different components in your Dart and Flutter applications. It offers a simple and efficient way to implement message queues, making it easier to build robust and scalable applications.
## Table of Contents
1. [Introduction](#introduction)
2. [Exchanges](#exchanges)
3. [Usage](#usage)
4. [Examples](#examples)
5. [Acknowledgment](#acknowledgment)
###
## Introduction
In the development of complex applications, dependencies among components are almost inevitable. Often, different components within your application need to communicate with each other, leading to tight coupling between these elements.
![Components](https://github.com/N-Razzouk/dart_mq/blob/master/assets/components.png?raw=true)
###
Message queues provide an effective means to decouple these components by enabling communication through messages. This decoupling strategy enhances the development of robust applications.
![Components with MQ](https://github.com/N-Razzouk/dart_mq/blob/master/assets/components-mq.png?raw=true)
###
DartMQ employs the publish-subscribe pattern. **Producers** send messages, **Consumers** receive them, and **Queues** and **Exchanges** facilitate this communication.
![Simple View](https://github.com/N-Razzouk/dart_mq/blob/master/assets/simple-view.png?raw=true)
###
Communication channels are called Exchanges. Exchanges receive messages from Producers, efficiently routing them to Queues for Consumer consumption.
![Detailed View](https://github.com/N-Razzouk/dart_mq/blob/master/assets/detailed-view.png?raw=true)
## Exchanges
### DartMQ provides different types of Exchanges for different use cases.
###
- **Default Exchange**: Routes messages based on Queue names.
![Default Exchange](https://github.com/N-Razzouk/dart_mq/blob/master/assets/default-exchange.png?raw=true)
###
- **Fanout Exchange**: Sends messages to all bound Queues.
![Fanout Exchange](https://github.com/N-Razzouk/dart_mq/blob/master/assets/fanout-exchange.png?raw=true)
###
- **Direct Exchange**: Routes messages to Queues based on routing keys.
![Direct Exchange](https://github.com/N-Razzouk/dart_mq/blob/master/assets/direct-exchange.png?raw=true)
## Usage
### Initialize an MQClient:
<!-- TODO: change import in code snippet. -->
```dart
import 'package:dart_mq/dart_mq.dart';
void main() {
// Initialize DartMQ
MQClient.initialize();
// Your application code here
}
```
### Declare a Queue:
```dart
MQClient.declareQueue('my_queue');
```
> Note: Queues are idempotent, which means that if you declare a Queue multiple times, it will not create multiple Queues. Instead, it will return the existing Queue.
### Create a Producer:
```dart
class MyProducer with ProducerMixin {
void greet(String message) {
// Send a message to the queue
sendMessage(
routingKey: 'my_queue',
payload: message,
);
}
}
```
> Note: `exchangeName` is optional. If you don't specify an exchange name, the message is sent to the default exchange.
### Create a Consumer:
```dart
class MyConsumer with ConsumerMixin {
void listenToQueue() {
// Subscribe to the queue and process incoming messages
subscribe(
queueId: 'my_queue',
callback: (message) {
// Handle incoming message
print('Received message: $message');
},
)
}
}
```
### Putting it all together:
```dart
void main() {
// Initialize DartMQ
MQClient.initialize();
// Declare a Queue
MQClient.declareQueue('my_queue');
// Create a Producer
final producer = MyProducer();
// Create a Consumer
final consumer = MyConsumer();
// Start listening
consumer.listenToQueue();
// Send a message
producer.greet('Hello World!');
// Your application code here
...
}
```
## Examples
- [Hello World](example/hello_world): A simple example that demonstrates how to send and receive messages using DartMQ.
- [Message Filtering](example/message_filtering): An example that demonstrates how to multiple consumers can listen to the same queue and filter messages accordingly.
- [Routing](example/routing): An example that demonstrates how to use Direct Exchanges to route messages to different queues based on the routing key.
- [RPC (Remote Procedure Call)](example/rpc): An example that demonstrates how to send RPC requests and receive responses using DartMQ.
## Acknowledgment
- [RabbitMQ](https://www.rabbitmq.com/): This package is inspired by RabbitMQ, an open-source message-broker software that implements the Advanced Message Queuing Protocol (AMQP).

View file

@ -1,211 +0,0 @@
# This file configures the static analysis results for your project (errors,
# warnings, and lints).
#
# This enables the 'recommended' set of lints from `package:lints`.
# This set helps identify many issues that may lead to problems when running
# or consuming Dart code, and enforces writing Dart using a single, idiomatic
# style and format.
#
# If you want a smaller set of lints you can change this to specify
# 'package:lints/core.yaml'. These are just the most critical lints
# (the recommended set includes the core lints).
# The core lints are also what is used by pub.dev for scoring packages.
include: package:lints/recommended.yaml
linter:
rules:
- prefer_const_constructors
- prefer_const_literals_to_create_immutables
- prefer_final_fields
- always_put_required_named_parameters_first
- avoid_init_to_null
- lines_longer_than_80_chars
- use_function_type_syntax_for_parameters
- avoid_relative_lib_imports
- avoid_shadowing_type_parameters
- avoid_equals_and_hash_code_on_mutable_classes
- unnecessary_brace_in_string_interps
- always_declare_return_types
- always_use_package_imports
- annotate_overrides
- avoid_bool_literals_in_conditional_expressions
- avoid_catching_errors
- avoid_double_and_int_checks
- avoid_dynamic_calls
- avoid_empty_else
- avoid_escaping_inner_quotes
- avoid_field_initializers_in_const_classes
- avoid_final_parameters
- avoid_function_literals_in_foreach_calls
- avoid_js_rounded_ints
- avoid_multiple_declarations_per_line
- avoid_null_checks_in_equality_operators
- avoid_positional_boolean_parameters
- avoid_print
- avoid_private_typedef_functions
- avoid_redundant_argument_values
- avoid_renaming_method_parameters
- avoid_return_types_on_setters
- avoid_returning_null_for_void
- avoid_returning_this
- avoid_setters_without_getters
- avoid_single_cascade_in_expression_statements
- avoid_slow_async_io
- avoid_type_to_string
- avoid_types_as_parameter_names
- avoid_unnecessary_containers
- avoid_unused_constructor_parameters
- avoid_void_async
- avoid_web_libraries_in_flutter
- await_only_futures
- camel_case_extensions
- camel_case_types
- cancel_subscriptions
- cascade_invocations
- cast_nullable_to_non_nullable
- collection_methods_unrelated_type
- combinators_ordering
- comment_references
- conditional_uri_does_not_exist
- constant_identifier_names
- control_flow_in_finally
- curly_braces_in_flow_control_structures
- dangling_library_doc_comments
- depend_on_referenced_packages
- deprecated_consistency
- directives_ordering
- empty_catches
- empty_constructor_bodies
- empty_statements
- eol_at_end_of_file
- exhaustive_cases
- file_names
- flutter_style_todos
- hash_and_equals
- implementation_imports
- implicit_call_tearoffs
- implicit_reopen
- invalid_case_patterns
- join_return_with_assignment
- leading_newlines_in_multiline_strings
- library_annotations
- library_names
- library_prefixes
- library_private_types_in_public_api
- literal_only_boolean_expressions
- missing_whitespace_between_adjacent_strings
- no_adjacent_strings_in_list
- no_default_cases
- no_duplicate_case_values
- no_leading_underscores_for_library_prefixes
- no_leading_underscores_for_local_identifiers
- no_logic_in_create_state
- no_runtimeType_toString
- non_constant_identifier_names
- noop_primitive_operations
- null_check_on_nullable_type_parameter
- null_closures
- omit_local_variable_types
- one_member_abstracts
- only_throw_errors
- overridden_fields
- package_api_docs
- package_names
- package_prefixed_library_names
- parameter_assignments
- prefer_adjacent_string_concatenation
- prefer_asserts_in_initializer_lists
- prefer_asserts_with_message
- prefer_collection_literals
- prefer_conditional_assignment
- prefer_const_constructors_in_immutables
- prefer_const_declarations
- prefer_constructors_over_static_methods
- prefer_contains
- prefer_final_in_for_each
- prefer_final_locals
- prefer_for_elements_to_map_fromIterable
- prefer_function_declarations_over_variables
- prefer_generic_function_type_aliases
- prefer_if_elements_to_conditional_expressions
- prefer_if_null_operators
- prefer_initializing_formals
- prefer_inlined_adds
- prefer_int_literals
- prefer_interpolation_to_compose_strings
- prefer_is_empty
- prefer_is_not_empty
- prefer_is_not_operator
- prefer_iterable_whereType
- prefer_null_aware_method_calls
- prefer_null_aware_operators
- prefer_single_quotes
- prefer_spread_collections
- prefer_typing_uninitialized_variables
- prefer_void_to_null
- provide_deprecation_message
- public_member_api_docs
- recursive_getters
- require_trailing_commas
- secure_pubspec_urls
- sized_box_for_whitespace
- sized_box_shrink_expand
- slash_for_doc_comments
- sort_child_properties_last
- sort_constructors_first
- sort_pub_dependencies
- sort_unnamed_constructors_first
- test_types_in_equals
- throw_in_finally
- tighten_type_of_initializing_formals
- type_annotate_public_apis
- type_init_formals
- unawaited_futures
- unnecessary_await_in_return
- unnecessary_breaks
- unnecessary_const
- unnecessary_constructor_name
- unnecessary_getters_setters
- unnecessary_lambdas
- unnecessary_late
- unnecessary_library_directive
- unnecessary_new
- unnecessary_null_aware_assignments
- unnecessary_null_checks
- unnecessary_null_in_if_null_operators
- unnecessary_nullable_for_final_variable_declarations
- unnecessary_overrides
- unnecessary_parenthesis
- unnecessary_raw_strings
- unnecessary_statements
- unnecessary_string_escapes
- unnecessary_string_interpolations
- unnecessary_this
- unnecessary_to_list_in_spreads
- unrelated_type_equality_checks
- use_build_context_synchronously
- use_colored_box
- use_enums
- use_full_hex_values_for_flutter_colors
- use_if_null_to_convert_nulls_to_bools
- use_is_even_rather_than_modulo
- use_key_in_widget_constructors
- use_late_for_private_fields_and_variables
- use_named_constants
- use_raw_strings
- use_rethrow_when_possible
- use_setters_to_change_properties
- use_string_buffers
- use_string_in_part_of_directives
- use_super_parameters
- use_test_throws_matchers
- use_to_and_as_if_applicable
- valid_regexps
- void_checks
# For more information about the core and recommended set of lints, see
# https://dart.dev/go/core-lints
# For additional information about configuring this file, see
# https://dart.dev/guides/language/analysis-options

Binary file not shown.

Before

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

View file

@ -1,16 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'receiver.dart';
import 'sender.dart';
void main() async {
MQClient.initialize();
final sender = Sender();
final receiver = Receiver()..listenToGreeting();
await sender.sendGreeting(greeting: 'Hello, World!');
receiver.stopListening();
}

View file

@ -1,27 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'task_manager.dart';
import 'worker_one.dart';
import 'worker_two.dart';
void main() async {
MQClient.initialize();
final workerOne = WorkerOne();
final workerTwo = WorkerTwo();
final taskManager = TaskManager();
workerOne.startListening();
workerTwo.startListening();
taskManager
..sendTask(task: 'Hello..')
..sendTask(task: 'Hello...')
..sendTask(task: 'Hello....')
..sendTask(task: 'Hello.')
..sendTask(task: 'Hello.......')
..sendTask(task: 'Hello..');
}

View file

@ -1,12 +0,0 @@
import 'package:angel3_mq/mq.dart';
final class TaskManager with ProducerMixin {
TaskManager() {
MQClient.instance.declareQueue('task_queue');
}
void sendTask({required String task}) => sendMessage(
payload: task,
routingKey: 'task_queue',
);
}

View file

@ -1,22 +0,0 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class WorkerOne with ConsumerMixin {
WorkerOne() {
MQClient.instance.declareQueue('task_queue');
}
void startListening() => subscribe(
queueId: 'task_queue',
filter: (Object messagePayload) => messagePayload
.toString()
.split('')
.where((String char) => char == '.')
.length
.isEven,
callback: (Message message) {
log('WorkerOne reacting to ${message.payload}');
},
);
}

View file

@ -1,24 +0,0 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class WorkerTwo with ConsumerMixin {
WorkerTwo() {
MQClient.instance.declareQueue('task_queue');
}
void startListening() => subscribe(
queueId: 'task_queue',
filter: (Object messagePayload) =>
messagePayload
.toString()
.split('')
.where((String char) => char == '.')
.length %
2 !=
0,
callback: (Message message) {
log('WorkerTwo reacting to ${message.payload}');
},
);
}

View file

@ -1,18 +0,0 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class Receiver with ConsumerMixin {
Receiver() {
MQClient.instance.declareQueue('hello');
}
void listenToGreeting() => subscribe(
queueId: 'hello',
callback: (Message message) {
log('Received: ${message.payload}');
},
);
void stopListening() => unsubscribe(queueId: 'hello');
}

View file

@ -1,39 +0,0 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class DebugLogger with ConsumerMixin {
DebugLogger() {
MQClient.instance.declareExchange(
exchangeName: 'logs',
exchangeType: ExchangeType.direct,
);
_queueName = MQClient.instance.declareQueue('debug');
}
late final String _queueName;
void startListening() {
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'info',
);
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'warning',
);
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'error',
);
subscribe(
queueId: _queueName,
callback: (Message message) {
log('Debug Logger recieved: ${message.payload}');
},
);
}
}

View file

@ -1,21 +0,0 @@
import 'package:angel3_mq/mq.dart';
final class Logger with ProducerMixin {
Logger() {
MQClient.instance.declareExchange(
exchangeName: 'logs',
exchangeType: ExchangeType.direct,
);
}
Future<void> log({
required String level,
required String message,
}) async {
sendMessage(
payload: message,
exchangeName: 'logs',
routingKey: level,
);
}
}

View file

@ -1,30 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'debug_logger.dart';
import 'logger.dart';
import 'production_logger.dart';
void main() async {
MQClient.initialize();
DebugLogger().startListening();
ProductionLogger().startListening();
final logger = Logger();
await logger.log(
level: 'info',
message: 'This is an info message',
);
await logger.log(
level: 'warning',
message: 'This is a warning message',
);
await logger.log(
level: 'error',
message: 'This is an error message',
);
}

View file

@ -1,29 +0,0 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
final class ProductionLogger with ConsumerMixin {
ProductionLogger() {
MQClient.instance.declareExchange(
exchangeName: 'logs',
exchangeType: ExchangeType.direct,
);
_queueName = MQClient.instance.declareQueue('production');
}
late final String _queueName;
void startListening() {
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'logs',
bindingKey: 'error',
);
subscribe(
queueId: _queueName,
callback: (Message message) {
log('Production Logger recieved: ${message.payload}');
},
);
}
}

View file

@ -1,19 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'service_one.dart';
import 'service_two.dart';
void main() {
MQClient.initialize();
MQClient.instance.declareExchange(
exchangeName: 'ServiceRPC',
exchangeType: ExchangeType.direct,
);
final serviceOne = ServiceOne();
ServiceTwo().startListening();
serviceOne.requestFoo();
}

View file

@ -1,19 +0,0 @@
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
class ServiceOne with ProducerMixin {
Future<void> requestFoo() async {
final res = await sendRPCMessage<String>(
exchangeName: 'ServiceRPC',
routingKey: 'rpcBinding',
processId: 'foo',
args: {},
);
_handleFuture(res);
}
void _handleFuture(String data) {
log('Service One received: $data\n');
}
}

View file

@ -1,46 +0,0 @@
import 'dart:async';
import 'dart:developer';
import 'package:angel3_mq/mq.dart';
class ServiceTwo with ConsumerMixin {
ServiceTwo() {
MQClient.instance.declareExchange(
exchangeName: 'ServiceRPC',
exchangeType: ExchangeType.direct,
);
_queueName = MQClient.instance.declareQueue('two');
}
late final String _queueName;
Future<void> startListening() async {
MQClient.instance.bindQueue(
queueId: _queueName,
exchangeName: 'ServiceRPC',
bindingKey: 'rpcBinding',
);
subscribe(
queueId: _queueName,
callback: (Message message) async {
log('Service Two got message $message\n');
if (message.headers['type'] == 'RPC') {
switch (message.headers['processId']) {
case 'foo':
final data = await foo();
final Completer completer =
message.headers['completer'] ?? (throw Exception());
completer.complete(data);
default:
}
}
},
);
}
Future<String> foo() async {
// log('Service Two bar\n');
await Future.delayed(const Duration(seconds: 2));
return 'Hello, world!';
}
}

View file

@ -1,12 +0,0 @@
import 'package:angel3_mq/mq.dart';
final class Sender with ProducerMixin {
Sender() {
MQClient.instance.declareQueue('hello');
}
Future<void> sendGreeting({required String greeting}) async => sendMessage(
routingKey: 'hello',
payload: greeting,
);
}

View file

@ -1,11 +0,0 @@
/// Library definition.
library angel3_mq;
/// Export files.
export 'src/consumer/consumer.dart';
export 'src/consumer/consumer.mixin.dart';
export 'src/core/constants/enums.dart';
export 'src/message/message.dart';
export 'src/mq/mq.dart';
export 'src/producer/producer.dart';
export 'src/producer/producer.mixin.dart';

View file

@ -1,75 +0,0 @@
import 'package:angel3_mq/src/binding/binding.interface.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// A class representing a binding between a topic and its associated queues.
///
/// The `Binding` class implements the [BindingInterface] interface and is
/// responsible for managing the association between a topic and its associated
/// queues. It allows the addition and removal of queues to the binding and the
/// publication of messages to all associated queues.
///
/// Example:
/// ```dart
/// final binding = Binding('my_binding');
/// final queue1 = Queue('queue_1');
/// final queue2 = Queue('queue_2');
///
/// // Add queues to the binding.
/// binding.addQueue(queue1);
/// binding.addQueue(queue2);
///
/// // Publish a message to all associated queues.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// binding.publishMessage(message);
///
/// // Check if the binding has associated queues.
/// final hasQueues = binding.hasQueues(); // Returns true
/// ```
final class Binding implements BindingInterface {
/// Creates a new binding with the specified [id].
///
/// The [id] parameter represents the unique identifier for the binding.
Binding(this.id);
/// The unique identifier for the binding.
final String id;
/// A list of associated queues.
final List<Queue> _queues = [];
@override
bool hasQueues() => _queues.isNotEmpty;
@override
void addQueue(Queue queue) => _queues.add(queue);
@override
void removeQueue(String queueId) => _queues.removeWhere(
(Queue queue) => queue.id == queueId && queue.hasListeners()
? throw QueueHasSubscribersException(queueId)
: queue.id == queueId,
);
@override
void publishMessage(Message message) {
for (final queue in _queues) {
queue.enqueue(message);
}
}
@override
void clear() {
for (final queue in _queues) {
if (queue.hasListeners()) {
throw QueueHasSubscribersException(queue.id);
}
}
_queues.clear();
}
}

View file

@ -1,44 +0,0 @@
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// An abstract interface class defining the contract for managing bindings.
///
/// The `BindingInterface` abstract interface class defines a contract for
/// classes that are responsible for managing bindings between topics and
/// queues. Implementing classes must provide functionality for adding and
/// removing queues from the binding, publishing messages to the associated
/// queues, and checking if the binding has queues.
///
/// Example:
/// ```dart
/// class MyBinding implements BindingInterface {
/// // Custom implementation of the binding interface methods.
/// }
/// ```
abstract interface class BindingInterface {
/// Checks if the binding has associated queues.
///
/// Returns `true` if the binding has one or more associated queues;
/// otherwise, `false`.
bool hasQueues();
/// Adds a queue to the binding.
///
/// The [queue] parameter represents the queue to be associated with the
/// binding.
void addQueue(Queue queue);
/// Removes a queue from the binding based on its ID.
///
/// The [queueId] parameter represents the ID of the queue to be removed.
void removeQueue(String queueId);
/// Publishes a message to all associated queues in the binding.
///
/// The [message] parameter represents the message to be published to the
/// queues.
void publishMessage(Message message);
/// Removes all queues from the binding.
void clear();
}

View file

@ -1,95 +0,0 @@
import 'dart:async';
import 'package:angel3_mq/src/consumer/consumer.interface.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.dart';
/// A mixin implementing the `ConsumerInterface` for message consumption.
///
/// The `Consumer` mixin provides a concrete implementation of the
/// `ConsumerInterface`for message consumption. It allows classes to easily
/// consume messages from specific queues by subscribing to them, handling
/// received messages, and managing subscriptions.
///
/// Example:
/// ```dart
/// class MyMessageConsumer with Consumer {
/// // Custom implementation of the message consumer.
/// }
/// ```
@Deprecated('Please use `ConsumerMixin` instead. '
'This will be removed in v2.0.0')
mixin Consumer implements ConsumerInterface {
/// A registry of active message subscriptions.
final Registrar<StreamSubscription<Message>> _subscriptions =
Registrar<StreamSubscription<Message>>();
@override
Message? getLatestMessage(String queueId) =>
MQClient.instance.getLatestMessage(queueId);
@override
void subscribe({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
try {
final messageStream = MQClient.instance.fetchQueue(queueId);
final sub = filter != null
? messageStream.listen((Message message) {
if (filter(message.payload)) {
callback(message);
}
})
: messageStream.listen(callback);
_subscriptions.register(queueId, sub);
} on IdAlreadyRegisteredException catch (_) {
throw ConsumerAlreadySubscribedException(
consumer: runtimeType.toString(),
queue: queueId,
);
}
}
@override
void unsubscribe({required String queueId}) {
_subscriptions.get(queueId).cancel();
_subscriptions.unregister(queueId);
}
@override
void pauseSubscription(String queueId) => _subscriptions.get(queueId).pause();
@override
void resumeSubscription(String queueId) =>
_subscriptions.get(queueId).resume();
@override
void updateSubscription({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
_subscriptions.get(queueId).cancel();
_subscriptions.unregister(queueId);
subscribe(
queueId: queueId,
callback: callback,
filter: filter,
);
}
@override
void clearSubscriptions() {
for (final StreamSubscription sub in _subscriptions.getAll()) {
sub.cancel();
}
_subscriptions.clear();
}
}

View file

@ -1,74 +0,0 @@
import 'package:angel3_mq/src/message/message.dart';
/// An abstract interface class defining the contract for a message consumer.
///
/// The `ConsumerInterface` abstract interface class defines a contract for
/// classes that implement a message consumer. Implementing classes must
/// provide methods for subscribing and unsubscribing from queues, pausing and
/// resuming subscriptions, updating subscriptions, retrieving the
/// latest message from a queue, and clearing all subscriptions.
///
/// Example:
/// ```dart
/// class MyConsumer implements ConsumerInterface {
/// // Custom implementation of the message consumer.
/// }
/// ```
abstract interface class ConsumerInterface {
/// Subscribes to a queue to receive messages.
///
/// The [queueId] parameter represents the ID of the queue to subscribe to.
/// The [callback] parameter is a function that will be invoked for each
/// received message.
/// The [filter] parameter is an optional function that can be used to filter
/// messages based on custom criteria.
void subscribe({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
});
/// Unsubscribes from a previously subscribed queue.
///
/// The [queueId] parameter represents the ID of the queue to unsubscribe
/// from.
void unsubscribe({required String queueId});
/// Pauses message subscription for a specified queue.
///
/// The [queueId] parameter represents the ID of the queue to pause the
/// subscription.
void pauseSubscription(String queueId);
/// Resumes a paused subscription for a specified queue.
///
/// The [queueId] parameter represents the ID of the queue to resume the
/// subscription.
void resumeSubscription(String queueId);
/// Updates an existing subscription with a new callback and/or filter.
///
/// The [queueId] parameter represents the ID of the queue to update the
/// subscription.
/// The [callback] parameter is a new function that will be invoked for each
/// received message.
/// The [filter] parameter is an optional new filter function for message
/// filtering.
void updateSubscription({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
});
/// Retrieves the latest message from a queue.
///
/// The [queueId] parameter represents the ID of the queue to fetch the latest
/// message from.
///
/// Returns the latest message from the specified queue or `null` if the queue
/// is empty.
Message? getLatestMessage(String queueId);
/// Clears all active subscriptions, unsubscribing from all queues.
void clearSubscriptions();
}

View file

@ -1,92 +0,0 @@
import 'dart:async';
import 'package:angel3_mq/src/consumer/consumer.interface.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.dart';
/// A mixin implementing the `ConsumerInterface` for message consumption.
///
/// The `ConsumerMixin` mixin provides a concrete implementation of the
/// `ConsumerInterface`for message consumption. It allows classes to easily
/// consume messages from specific queues by subscribing to them, handling
/// received messages, and managing subscriptions.
///
/// Example:
/// ```dart
/// class MyMessageConsumer with ConsumerMixin {
/// // Custom implementation of the message consumer.
/// }
/// ```
mixin ConsumerMixin implements ConsumerInterface {
/// A registry of active message subscriptions.
final Registrar<StreamSubscription<Message>> _subscriptions =
Registrar<StreamSubscription<Message>>();
@override
Message? getLatestMessage(String queueId) =>
MQClient.instance.getLatestMessage(queueId);
@override
void subscribe({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
try {
final messageStream = MQClient.instance.fetchQueue(queueId);
final sub = filter != null
? messageStream.listen((Message message) {
if (filter(message.payload)) {
callback(message);
}
})
: messageStream.listen(callback);
_subscriptions.register(queueId, sub);
} on IdAlreadyRegisteredException catch (_) {
throw ConsumerAlreadySubscribedException(
consumer: runtimeType.toString(),
queue: queueId,
);
}
}
@override
void unsubscribe({required String queueId}) => _subscriptions
..get(queueId).cancel()
..unregister(queueId);
@override
void pauseSubscription(String queueId) => _subscriptions.get(queueId).pause();
@override
void resumeSubscription(String queueId) =>
_subscriptions.get(queueId).resume();
@override
void updateSubscription({
required String queueId,
required Function(Message) callback,
bool Function(Object)? filter,
}) {
_subscriptions.get(queueId).cancel();
_subscriptions.unregister(queueId);
subscribe(
queueId: queueId,
callback: callback,
filter: filter,
);
}
@override
void clearSubscriptions() {
for (final StreamSubscription sub in _subscriptions.getAll()) {
sub.cancel();
}
_subscriptions.clear();
}
}

View file

@ -1,22 +0,0 @@
/// An enumeration representing different types of message exchanges.
///
/// The [ExchangeType] enum defines various types of message exchanges that are
/// commonly used in messaging systems. Each type represents a specific behavior
/// for distributing messages to multiple queues or consumers.
///
/// - `direct`: A direct exchange routes messages to queues based on a specified
/// routing key.
/// - `base`: The default exchange (unnamed) routes messages to queues using
/// their names.
/// - `fanout`: A fanout exchange routes messages to all connected queues,
/// ignoring routing keys.
enum ExchangeType {
/// Represents a direct message exchange.
direct,
/// Represents the default exchange (unnamed).
base,
/// Represents a fanout message exchange.
fanout,
}

View file

@ -1,99 +0,0 @@
/// A utility class providing exception-related error messages.
///
/// The `ExceptionStrings` class defines static methods that generate error
/// messages for various exception scenarios. These messages can be used to
/// provide descriptive error information in exception handling and debugging.
class ExceptionStrings {
/// Generates an error message when MQClient is not initialized.
///
/// This message is used when attempting to use the MQClient before it has
/// been properly initialized using the `MQClient.initialize()` method.
static String mqClientNotInitialized() =>
'MQClient is not initialized. Please make sure to call '
'MQClient.initialize() first.';
/// Generates an error message for a Queue that is not registered.
///
/// The [queueId] parameter represents the name of the unregistered queue.
static String queueNotRegistered(String queueId) =>
'Queue: $queueId is not registered.';
/// Generates an error message for a queue with active subscribers.
///
/// The [queueId] parameter represents the ID of the queue with active
/// subscribers.
static String queueHasSubscribers(String queueId) =>
'Queue: $queueId has subscribers.';
/// Generates an error message for a queue with no name.
///
/// This message is used when the name of the queue is not provided and is
/// null.
static String queueIdNull() => "Queue name can't be null.";
/// Generates an error message for a required routing key.
///
/// This message is used when a routing key is required for a specific
/// operation but is not provided.
static String routingKeyRequired() => 'Routing key is required.';
/// Generates an error message for a non-existent binding key.
///
/// The [bindingKey] parameter represents the non-existent binding key.
static String bindingKeyNotFound(String bindingKey) =>
'The binding key "$bindingKey" was not found.';
/// Generates an error message for a missing binding key.
///
/// This message is used when a binding operation expects a binding key to
static String bindingKeyRequired() => 'Binding key is required.';
/// Generates an error message for an exchange that is not registered.
///
/// The [exchangeName] parameter represents the name of the unregistered
/// exchange.
static String exchangeNotRegistered(String exchangeName) =>
'Exchange: $exchangeName is not registered.';
/// Generates an error message for invalid exchange type.
static String invalidExchangeType() => 'Exchange type is invalid.';
/// Generates an error message for a consumer that is not subscribed to a
/// queue.
///
/// The [consumerId] parameter represents the ID of the consumer.
/// The [queue] parameter represents the name of the queue.
static String consumerNotSubscribed(String consumerId, String queue) =>
'The consumer "$consumerId" is not subscribed to the queue "$queue".';
/// Generates an error message for a consumer that is already subscribed to
/// a queue.
///
/// The [consumerId] parameter represents the ID of the consumer.
/// The [queue] parameter represents the name of the queue.
static String consumerAlreadySubscribed(String consumerId, String queue) =>
'The consumer "$consumerId" is already subscribed to the queue "$queue".';
/// Generates an error message for a consumer that is not registered.
///
/// The [consumerId] parameter represents the ID of the consumer.
static String consumerNotRegistered(String consumerId) =>
'The consumer "$consumerId" is not registered.';
/// Generates an error message for a consumer that has active subscriptions.
///
/// The [consumerId] parameter represents the ID of the consumer.
static String consumerHasSubscriptions(String consumerId) =>
'The consumer "$consumerId" has active subscriptions.';
/// Generates an error message for an ID that is already registered.
///
/// The [id] parameter represents the ID that is already registered.
static String idAlreadyRegistered(String id) =>
'Id "$id" already registered.';
/// Generates an error message for an ID that is not registered.
///
/// The [id] parameter represents the ID that is not registered.
static String idNotRegistered(String id) => 'Id "$id" not registered.';
}

View file

@ -1,42 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [BindingException] class represents a base exception related to
/// bindings.
///
/// It is used to handle exceptions that may occur when working with bindings,
/// such as when a binding key is not found or when a binding key is required
/// but not provided.
///
/// Subclasses of [BindingException] can provide more specific information about
/// the nature of the exception.
abstract base class BindingException implements Exception {
/// Creates a new [BindingException] with the specified error [message].
BindingException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [BindingKeyNotFoundException] class represents an exception that occurs
/// when a binding key is not found.
///
/// This exception is thrown when attempting to access a binding key that does
/// not exist in the context of bindings.
final class BindingKeyNotFoundException extends BindingException {
/// Creates a new [BindingKeyNotFoundException] instance.
BindingKeyNotFoundException(String key)
: super(ExceptionStrings.bindingKeyNotFound(key));
}
/// The [BindingKeyRequiredException] class represents an exception that occurs
/// when a binding key is required but not provided.
///
/// This exception is thrown when a binding operation expects a binding key to
/// be provided, but it is missing or empty.
final class BindingKeyRequiredException extends BindingException {
/// Creates a new [BindingKeyRequiredException] instance.
BindingKeyRequiredException() : super(ExceptionStrings.bindingKeyRequired());
}

View file

@ -1,73 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [ConsumerException] class represents a base exception related to
/// consumers.
///
/// It is used to handle exceptions that may occur when working with consumers,
/// such as when a consumer is not registered, is already subscribed to a queue,
/// is not subscribed to a queue when expected, or has active subscriptions.
///
/// Subclasses of [ConsumerException] can provide more specific information
/// about the nature of the exception.
abstract base class ConsumerException implements Exception {
/// Creates a new [ConsumerException] with the specified error [message].
ConsumerException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [ConsumerNotRegisteredException] class represents an exception that
/// occurs when a consumer is not registered.
///
/// This exception is thrown when attempting to perform operations on a consumer
/// that has not been registered.
final class ConsumerNotRegisteredException extends ConsumerException {
/// Creates a new [ConsumerNotRegisteredException] instance with the
/// specified [consumer].
ConsumerNotRegisteredException(String consumer)
: super(ExceptionStrings.consumerNotRegistered(consumer));
}
/// The [ConsumerAlreadySubscribedException] class represents an exception that
/// occurs when a consumer is already subscribed to a queue.
///
/// This exception is thrown when attempting to subscribe a consumer to a queue
/// that it is already subscribed to.
final class ConsumerAlreadySubscribedException extends ConsumerException {
/// Creates a new [ConsumerAlreadySubscribedException] instance with the
/// specified [queue].
ConsumerAlreadySubscribedException({
required String consumer,
required String queue,
}) : super(ExceptionStrings.consumerAlreadySubscribed(consumer, queue));
}
/// The [ConsumerNotSubscribedException] class represents an exception that
/// occurs when a consumer is not subscribed to a queue when expected.
///
/// This exception is thrown when an operation expects a consumer to be
/// subscribed to a queue, but the consumer is not.
final class ConsumerNotSubscribedException extends ConsumerException {
/// Creates a new [ConsumerNotSubscribedException] instance with the
/// specified [queue].
ConsumerNotSubscribedException({
required String consumer,
required String queue,
}) : super(ExceptionStrings.consumerNotSubscribed(consumer, queue));
}
/// The [ConsumerHasSubscriptionsException] class represents an exception that
/// occurs when a consumer has active subscriptions.
///
/// This exception is thrown when an operation expects a consumer to have no
/// active subscriptions, but the consumer has active subscriptions.
final class ConsumerHasSubscriptionsException extends ConsumerException {
/// Creates a new [ConsumerHasSubscriptionsException] instance with the
/// specified [consumer].
ConsumerHasSubscriptionsException(String consumer)
: super(ExceptionStrings.consumerHasSubscriptions(consumer));
}

View file

@ -1,7 +0,0 @@
export 'binding_exceptions.dart';
export 'consumer_exceptions.dart';
export 'exchange_exceptions.dart';
export 'mq_client_exceptions.dart';
export 'queue_exceptions.dart';
export 'registrar_exceptions.dart';
export 'routing_key_exceptions.dart';

View file

@ -1,44 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [ExchangeException] class represents a base exception related to
/// exchanges.
///
/// It is used to handle exceptions that may occur when working with exchanges,
/// such as when an exchange is not registered or when an invalid exchange type
/// is encountered.
///
/// Subclasses of [ExchangeException] can provide more specific information
/// about the nature of the exception.
abstract base class ExchangeException implements Exception {
/// Creates a new [ExchangeException] with the specified error [message].
ExchangeException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [ExchangeNotRegisteredException] class represents an exception that
/// occurs when an exchange is not registered.
///
/// This exception is thrown when attempting to perform operations on an
/// exchange that has not been registered.
final class ExchangeNotRegisteredException extends ExchangeException {
/// Creates a new [ExchangeNotRegisteredException] instance with the
/// specified [exchangeName].
ExchangeNotRegisteredException(String exchangeName)
: super(ExceptionStrings.exchangeNotRegistered(exchangeName));
}
/// The [InvalidExchangeTypeException] class represents an exception that occurs
/// when an invalid exchange type is encountered.
///
/// This exception is thrown when an operation encounters an exchange type that
/// is not recognized or supported.
final class InvalidExchangeTypeException extends ExchangeException {
/// Creates a new [InvalidExchangeTypeException] instance.
InvalidExchangeTypeException()
: super(ExceptionStrings.invalidExchangeType());
}

View file

@ -1,31 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [MQClientException] class represents a base exception related to the
/// MQClient.
///
/// It is used to handle exceptions that may occur when working with the
/// MQClient, such as when the MQClient is not initialized.
///
/// Subclasses of [MQClientException] can provide more specific information
/// about the nature of the exception.
abstract base class MQClientException implements Exception {
/// Creates a new [MQClientException] with the specified error [message].
MQClientException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [MQClientNotInitializedException] class represents an exception that
/// occurs when the MQClient is not initialized.
///
/// This exception is thrown when attempting to use the MQClient before it has
/// been properly initialized using the `MQClient.initialize()` method.
final class MQClientNotInitializedException extends MQClientException {
/// Creates a new [MQClientNotInitializedException] instance.
MQClientNotInitializedException()
: super(ExceptionStrings.mqClientNotInitialized());
}

View file

@ -1,54 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [QueueException] class represents a base exception related to queues.
///
/// It is used to handle exceptions that may occur when working with queues,
/// such as when a queue is not registered or when there are subscribers to a
/// queue.
///
/// Subclasses of [QueueException] can provide more specific information about
/// the nature of the exception.
abstract class QueueException implements Exception {
/// Creates a new [QueueException] with the specified error [message].
QueueException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [QueueNotRegisteredException] class represents an exception that occurs
/// when a queue with a specific ID is not registered.
///
/// This exception is thrown when attempting to perform an operation on an
/// unregistered queue.
final class QueueNotRegisteredException extends QueueException {
/// Creates a new [QueueNotRegisteredException] instance with the specified
/// [queueId].
QueueNotRegisteredException(String queueId)
: super(ExceptionStrings.queueNotRegistered(queueId));
}
/// The [QueueHasSubscribersException] class represents an exception that occurs
/// when there are active subscribers to a queue.
///
/// This exception is thrown when attempting to delete a queue that still has
/// subscribers listening to it.
final class QueueHasSubscribersException extends QueueException {
/// Creates a new [QueueHasSubscribersException] instance with the specified
/// [queueId].
QueueHasSubscribersException(String queueId)
: super(ExceptionStrings.queueHasSubscribers(queueId));
}
/// The [QueueIdNullException] class represents an exception that occurs when
/// attempting to create a queue with a null name.
///
/// This exception is thrown when the name of the queue is not provided and is
/// null.
final class QueueIdNullException extends QueueException {
/// Creates a new [QueueIdNullException] instance.
QueueIdNullException() : super(ExceptionStrings.queueIdNull());
}

View file

@ -1,43 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [RegistrarException] class represents a base exception related to
/// registrar operations.
///
/// It is used to handle exceptions that may occur when working with registrar
/// objects, which are responsible for managing and registering items.
///
/// Subclasses of [RegistrarException] can provide more specific information
/// about the nature of the exception.
abstract class RegistrarException implements Exception {
/// Creates a new [RegistrarException] with the specified error [message].
RegistrarException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [IdAlreadyRegisteredException] class represents an exception that occurs
/// when attempting to register an ID that is already registered in a registrar.
///
/// This exception is thrown when a duplicate ID is detected during the
/// registration process.
final class IdAlreadyRegisteredException extends RegistrarException {
/// Creates a new [IdAlreadyRegisteredException] instance with the specified
/// [id].
IdAlreadyRegisteredException(String id)
: super(ExceptionStrings.idAlreadyRegistered(id));
}
/// The [IdNotRegisteredException] class represents an exception that occurs
/// when attempting to access an ID that is not registered in a registrar.
///
/// This exception is thrown when an operation is performed on an unregistered
/// ID.
final class IdNotRegisteredException extends RegistrarException {
/// Creates a new [IdNotRegisteredException] instance with the specified [id].
IdNotRegisteredException(String id)
: super(ExceptionStrings.idNotRegistered(id));
}

View file

@ -1,30 +0,0 @@
import 'package:angel3_mq/src/core/constants/error_strings.dart';
/// The [RoutingKeyException] class represents a base exception related to
/// routing key operations.
///
/// It is used to handle exceptions that may occur when working with routing
/// keys, which are used for message routing in message broker systems.
///
/// Subclasses of [RoutingKeyException] can provide more specific information
/// about the nature of the exception.
abstract class RoutingKeyException implements Exception {
/// Creates a new [RoutingKeyException] with the specified error [message].
RoutingKeyException(this.message);
/// The error message associated with the exception.
final String message;
@override
String toString() => '$runtimeType: $message';
}
/// The [RoutingKeyRequiredException] class represents an exception that occurs
/// when a routing key is required for a specific operation but is not provided.
///
/// This exception is thrown when an operation expects a routing key to be
/// provided, but it is missing.
final class RoutingKeyRequiredException extends RoutingKeyException {
/// Creates a new [RoutingKeyRequiredException] instance.
RoutingKeyRequiredException() : super(ExceptionStrings.routingKeyRequired());
}

View file

@ -1,100 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
/// A generic registrar for managing and storing objects by their unique
/// identifiers.
///
/// The [Registrar] class allows you to register, get, unregister, and manage
/// objects associated with unique identifiers (IDs). It provides a way to store
/// and access objects in a key-value fashion.
///
/// Example:
/// ```dart
/// final registrar = Registrar<String>();
///
/// // Register objects with unique IDs.
/// registrar.register('user_1', 'Alice');
/// registrar.register('user_2', 'Bob');
///
/// // Get an object by its ID.
/// final user1 = registrar.get('user_1'); // Returns 'Alice'
///
/// // Check if an object with a specific ID exists.
/// final hasUser2 = registrar.has('user_2'); // Returns true
///
/// // Unregister an object by its ID.
/// registrar.unregister('user_1');
///
/// // Check the number of registered objects.
/// final count = registrar.count; // Returns 1
/// ```
final class Registrar<T> {
/// A map to store objects with their associated IDs.
final Map<String, T> _registry = {};
/// Registers an object with a unique ID.
///
/// The [id] parameter represents the unique identifier for the object.
/// The [value] parameter represents the object to be registered.
///
/// If an object with the same ID already exists, an
/// [IdAlreadyRegisteredException] is thrown.
void register(String id, T value) {
if (_registry.containsKey(id)) {
throw IdAlreadyRegisteredException(id);
}
_registry[id] = value;
}
/// Gets an object by its unique ID.
///
/// The [id] parameter represents the unique identifier of the object to
/// retrieve.
///
/// If no object with the specified ID is found, an [IdNotRegisteredException]
/// is thrown.
T get(String id) {
if (!_registry.containsKey(id)) {
throw IdNotRegisteredException(id);
}
return _registry[id]!;
}
/// Retrieves a list of all registered objects.
List<T> getAll() => _registry.values.toList();
/// Unregisters an object by its unique ID.
///
/// The [id] parameter represents the unique identifier of the object to
/// unregister.
///
/// If no object with the specified ID is found, an [IdNotRegisteredException]
/// is thrown.
void unregister(String id) {
if (!_registry.containsKey(id)) {
throw IdNotRegisteredException(id);
}
_registry.remove(id);
}
/// Clears the registrar, removing all registered objects.
void clear() => _registry.clear();
/// Checks if an object with a specific ID is registered.
///
/// The [id] parameter represents the unique identifier to check.
///
/// Returns `true` if an object with the specified ID is registered;
/// otherwise, `false`.
bool has(String id) => _registry.containsKey(id);
/// Returns the count of registered objects.
int get count => _registry.length;
@override
String toString() {
return '''
Registrar(
\t${_registry.entries.map((e) => '${e.key}: ${e.value}').join(',\n\t')}
)''';
}
}

View file

@ -1,86 +0,0 @@
import 'package:angel3_mq/src/binding/binding.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/exchange/exchange.base.dart';
import 'package:angel3_mq/src/exchange/exchange_interface.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// A class representing the default message exchange for message routing.
///
/// The `DefaultExchange` class is a specific implementation of the
/// `BaseExchange` abstract base class, representing the default exchange.
/// It provides functionality for binding queues, forwarding messages based on
/// routing keys, and preventing unbinding from the default exchange.
///
/// Example:
/// ```dart
/// final defaultExchange = DefaultExchange('default_exchange');
///
/// // Bind a queue to the default exchange.
/// final queue = Queue('my_queue');
/// defaultExchange.bindQueue(queue: queue, bindingKey: 'my_routing_key');
///
/// // Forward a message to the default exchange using a routing key.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// defaultExchange.forwardMessage(message, routingKey: 'my_routing_key');
/// ```
final class DefaultExchange extends BaseExchange implements ExchangeInterface {
/// Creates a new instance of the default exchange with the specified [id].
///
/// The [id] parameter represents the unique identifier for the default
/// exchange.
DefaultExchange(super.id);
@override
void bindQueue({
required Queue queue,
required String bindingKey,
}) =>
(bindings.has(bindingKey)
? bindings.get(bindingKey)
: _registerAndGetBinding(bindingKey))
..addQueue(queue);
Binding _registerAndGetBinding(String bindingKey) {
bindings.register(bindingKey, Binding(bindingKey));
return bindings.get(bindingKey);
}
@override
void unbindQueue({
required String queueId,
required String bindingKey,
}) {
(bindings.has(bindingKey)
? bindings.get(bindingKey)
: throw BindingKeyNotFoundException(bindingKey))
.removeQueue(queueId);
if (!bindings.get(bindingKey).hasQueues()) {
bindings.unregister(bindingKey);
}
}
@override
void forwardMessage({
required Message message,
String? routingKey,
}) =>
(bindings.has(
routingKey ?? (throw RoutingKeyRequiredException()),
)
? bindings.get(routingKey)
: throw BindingKeyNotFoundException(routingKey))
.publishMessage(message);
@override
void deleteQueue(String queueId) {
for (final binding in bindings.getAll()) {
binding.removeQueue(queueId);
}
}
}

View file

@ -1,89 +0,0 @@
import 'package:angel3_mq/src/binding/binding.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/exchange/exchange.base.dart';
import 'package:angel3_mq/src/exchange/exchange_interface.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// A class representing a direct message exchange for message routing.
///
/// The `DirectExchange` class is a specific implementation of the
/// `BaseExchange` abstract base class, representing a direct exchange. A
/// direct exchange routes messages to queues based on matching routing keys.
/// It provides functionality for binding queues, forwarding messages based on
/// routing keys, and unbinding queues from the direct exchange.
///
/// Example:
/// ```dart
/// final directExchange = DirectExchange('my_direct_exchange');
///
/// // Bind queues to the direct exchange with different routing keys.
/// final queue1 = Queue('queue_1');
/// final queue2 = Queue('queue_2');
/// directExchange.bindQueue(queue: queue1, bindingKey: 'routing_key_1');
/// directExchange.bindQueue(queue: queue2, bindingKey: 'routing_key_2');
///
/// // Forward a message with a matching routing key to the appropriate queue.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// directExchange.forwardMessage(message, routingKey: 'routing_key_1');
/// ```
final class DirectExchange extends BaseExchange implements ExchangeInterface {
/// Creates a new instance of the direct exchange with the specified [id].
///
/// The [id] parameter represents the unique identifier for the direct
/// exchange.
DirectExchange(super.id);
@override
void bindQueue({
required Queue queue,
required String bindingKey,
}) =>
(bindings.has(bindingKey)
? bindings.get(bindingKey)
: _registerAndGetBinding(bindingKey))
.addQueue(queue);
Binding _registerAndGetBinding(String bindingKey) {
bindings.register(bindingKey, Binding(bindingKey));
return bindings.get(bindingKey);
}
@override
void unbindQueue({
required String queueId,
required String bindingKey,
}) {
(bindings.has(bindingKey)
? bindings.get(bindingKey)
: throw BindingKeyNotFoundException(bindingKey))
.removeQueue(queueId);
if (!bindings.get(bindingKey).hasQueues()) {
bindings.unregister(bindingKey);
}
}
@override
void forwardMessage({
required Message message,
String? routingKey,
}) =>
(bindings.has(
routingKey ?? (throw RoutingKeyRequiredException()),
)
? bindings.get(routingKey)
: throw BindingKeyNotFoundException(routingKey))
.publishMessage(message);
@override
void deleteQueue(String queueId) {
for (final binding in bindings.getAll()) {
binding.removeQueue(queueId);
}
}
}

View file

@ -1,27 +0,0 @@
import 'package:angel3_mq/src/binding/binding.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:angel3_mq/src/exchange/exchange_interface.dart';
/// An abstract base class representing an exchange for message routing.
///
/// The `BaseExchange` abstract base class defines the core functionality of a
/// message exchange for routing messages to specific queues or bindings.
///
/// Example:
/// ```dart
/// class MyExchange extends BaseExchange {
/// // Custom implementation of the exchange.
/// }
/// ```
abstract base class BaseExchange implements ExchangeInterface {
/// Creates a new exchange with the specified [id].
///
/// The [id] parameter represents the unique identifier for the exchange.
BaseExchange(this.id);
/// The unique identifier for the exchange.
final String id;
/// A registrar for managing bindings associated with the exchange.
Registrar<Binding> bindings = Registrar<Binding>();
}

View file

@ -1,51 +0,0 @@
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// An abstract interface class defining the contract for managing exchanges.
///
/// The `ExchangeInterface` defines a contract for classes that are responsible
/// for managing exchanges. Implementing classes must provide functionality for
/// binding queues to the exchange, unbinding queues from the exchange,
/// forwarding messages to queues or bindings, and removing queues from all
/// associated bindings.
///
/// Example:
/// ```dart
/// class MyExchange implements ExchangeInterface {
/// // Custom implementation of the exchange.
/// }
/// ```
abstract interface class ExchangeInterface {
/// Binds a queue to the exchange with a specific binding key.
///
/// The [queue] parameter represents the queue to be bound to the exchange.
/// The [bindingKey] parameter represents the binding key for the queue.
void bindQueue({
required Queue queue,
required String bindingKey,
});
/// Unbinds a queue from the exchange based on its ID and binding key.
///
/// The [queueId] parameter represents the ID of the queue to be unbound.
/// The [bindingKey] parameter represents the binding key for the queue.
void unbindQueue({
required String queueId,
required String bindingKey,
});
/// Forwards a message to queues or bindings based on the routing key.
///
/// The [message] parameter represents the message to be forwarded.
/// The [routingKey] parameter represents the optional routing key to
/// determine the destination queues or bindings.
void forwardMessage({
required Message message,
String? routingKey,
});
/// Removes a queue from all associated bindings.
///
/// The [queueId] parameter represents the ID of the queue to be removed.
void deleteQueue(String queueId);
}

View file

@ -1,70 +0,0 @@
import 'package:angel3_mq/src/binding/binding.dart';
import 'package:angel3_mq/src/exchange/exchange.base.dart';
import 'package:angel3_mq/src/exchange/exchange_interface.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// A class representing a fanout message exchange for message routing.
///
/// The `FanoutExchange` class is a specific implementation of the
/// `BaseExchange` abstract base class, representing a fanout exchange.
/// A fanout exchange routes messages to all associated queues without
/// considering routing keys. It provides functionality for binding queues,
/// forwarding messages to all associated queues, and unbinding queues
/// from the fanout exchange.
///
/// Example:
/// ```dart
/// final fanoutExchange = FanoutExchange('my_fanout_exchange');
///
/// // Bind multiple queues to the fanout exchange.
/// final queue1 = Queue('queue_1');
/// final queue2 = Queue('queue_2');
/// fanoutExchange.bindQueue(queue: queue1, bindingKey: 'binding_key_1');
/// fanoutExchange.bindQueue(queue: queue2, bindingKey: 'binding_key_2');
///
/// // Forward a message to all associated queues in the fanout exchange.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// fanoutExchange.forwardMessage(message);
/// ```
final class FanoutExchange extends BaseExchange implements ExchangeInterface {
/// Creates a new instance of the fanout exchange with the specified [id].
///
/// The [id] parameter represents the unique identifier for the fanout
/// exchange.
FanoutExchange(super.id) {
bindings.register('', Binding(''));
}
@override
void bindQueue({
required Queue queue,
required String bindingKey,
}) =>
bindings.get('').addQueue(queue);
@override
void unbindQueue({
required String queueId,
required String bindingKey,
}) =>
bindings.get('').removeQueue(queueId);
@override
void forwardMessage({
required Message message,
String? routingKey,
}) =>
bindings.get('').publishMessage(message);
@override
void deleteQueue(String queueId) {
for (final binding in bindings.getAll()) {
binding.removeQueue(queueId);
}
}
}

View file

@ -1,43 +0,0 @@
/// Represents a base message with headers, payload, and an optional timestamp.
///
/// A [BaseMessage] is a fundamental unit of data used in various messaging
/// systems. It typically contains metadata in the form of headers, the actual
/// payload, and an optional timestamp indicating when the message was created.
///
/// The `headers` property is a map that can contain additional information
/// about the message, such as content type, sender, or any custom metadata.
///
/// The `payload` property stores the main content of the message. It can be
/// of any type, allowing flexibility in the data that can be transmitted.
///
/// The `timestamp` property, if provided, represents the time when the message
/// was created. It is formatted as an ISO 8601 string.
abstract class BaseMessage {
/// Creates a new `BaseMessage` with the specified headers, payload, and
/// timestamp.
///
/// The [headers] parameter is a map that can contain additional information
/// about the message. It is optional and defaults to an empty map if not
/// provided.
///
/// The [payload] parameter represents the main content of the message and is
/// required.
///
/// The [timestamp] parameter is an optional ISO 8601 formatted timestamp
/// indicating when the message was created. If not provided, it will be
/// `null`.
BaseMessage(
Map<String, dynamic>? headers,
this.payload,
this.timestamp,
) : headers = headers ?? {};
/// A map containing headers or metadata associated with the message.
final Map<String, dynamic> headers;
/// The main content of the message.
final Object payload;
/// An optional timestamp indicating when the message was created.
final String? timestamp;
}

View file

@ -1,86 +0,0 @@
import 'package:angel3_mq/src/message/message.base.dart';
import 'package:uuid/uuid.dart';
/// Represents a message with headers, payload, and an optional timestamp.
///
/// A [Message] is a specific type of message that extends the [BaseMessage]
/// class.
///
/// Example:
/// ```dart
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// ```
class Message extends BaseMessage {
/// Creates a new [Message] with the specified headers, payload, timestamp, and id.
///
/// The [headers] parameter is a map that can contain additional information
/// about the message. It is optional and defaults to an empty map if not
/// provided.
///
/// The [payload] parameter represents the main content of the message and is
/// required.
///
/// The [timestamp] parameter is an optional ISO 8601 formatted timestamp
/// indicating when the message was created. If not provided, the current
/// timestamp will be used.
///
/// The [id] parameter is an optional unique identifier for the message.
/// If not provided, a new UUID will be generated.
///
/// Example:
/// ```dart
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// id: '123e4567-e89b-12d3-a456-426614174000',
/// );
/// ```
Message({
required Object payload,
Map<String, dynamic>? headers,
String? timestamp,
String? id,
}) : id = id ?? Uuid().v4(),
super(
headers,
payload,
timestamp ?? DateTime.now().toUtc().toIso8601String(),
);
/// A unique identifier for the message.
final String id;
/// Returns a human-readable string representation of the message.
///
/// Example:
/// ```dart
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
///
/// print(message.toString());
/// // Output:
/// // Message{
/// // headers: {contentType: json, sender: Alice},
/// // payload: {text: Hello, World!},
/// // timestamp: 2023-09-07T12:00:002,
/// // }
/// ```
@override
String toString() {
return '''
Message{
id: $id,
headers: $headers,
payload: $payload,
timestamp: $timestamp,
}''';
}
}

View file

@ -1,14 +0,0 @@
/// An abstract base class representing a message queue client.
///
/// The `BaseMQClient` abstract base class defines the core functionality and
/// contract for implementing message queue clients. It serves as a foundation
/// for creating client implementations that interact with message queues for
/// sending and receiving messages.
///
/// Example:
/// ```dart
/// class MyMQClient extends BaseMQClient {
/// // Custom implementation of the message queue client.
/// }
/// ```
abstract class BaseMQClient {}

View file

@ -1,256 +0,0 @@
import 'package:angel3_mq/src/core/constants/enums.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:angel3_mq/src/exchange/default_exchange.dart';
import 'package:angel3_mq/src/exchange/direct_exchange.dart';
import 'package:angel3_mq/src/exchange/exchange.base.dart';
import 'package:angel3_mq/src/exchange/fanout_exchange.dart';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.base.dart';
import 'package:angel3_mq/src/mq/mq.interface.dart';
import 'package:angel3_mq/src/queue/queue.dart';
/// A class representing a message queue client with various messaging
/// functionalities.
///
/// The `MQClient` class is an implementation of both the `BaseMQClient` class
/// and the `MQClientInterface` interface. It provides features for interacting
/// with message queues, including declaring and managing queues and exchanges,
/// sending and receiving messages, and binding/unbinding queues to/from exchanges.
///
/// Example:
/// ```dart
/// // Initialize the message queue client.
/// MQClient.initialize();
///
/// // Declare a queue and an exchange.
/// final queueId = MQClient.instance.declareQueue();
/// final exchangeName = 'my_direct_exchange';
/// MQClient.instance.declareExchange(
/// exchangeName: exchangeName,
/// exchangeType: ExchangeType.direct,
/// );
///
/// // Bind the queue to the exchange.
/// MQClient.instance.bindQueue(
/// queueId: queueId,
/// exchangeName: exchangeName,
/// );
///
/// // Send a message to the exchange.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// MQClient.instance.sendMessage(
/// exchangeName: exchangeName,
/// message: message,
/// routingKey: queueId,
/// );
///
/// // Fetch messages from the queue.
/// final messageStream = MQClient.instance.fetchQueue(queueId);
/// messageStream.listen((message) {
/// print('Received message: $message');
/// });
/// ```
class MQClient extends BaseMQClient implements MQClientInterface {
/// Private constructor to create the `MQClient` instance.
MQClient._internal() {
_exchanges.register('', DefaultExchange(''));
}
/// Initializes the `MQClient` and creates a singleton instance.
///
/// This method should be called before using the `MQClient`.
factory MQClient.initialize() => _instance ??= MQClient._internal();
/// Singleton instance of the `MQClient`.
static MQClient? _instance;
/// Gets the singleton instance of the `MQClient`.
///
/// Throws a [MQClientNotInitializedException] if the client has not been
/// initialized.
static MQClient get instance =>
_instance ?? (throw MQClientNotInitializedException());
final Registrar<BaseExchange> _exchanges = Registrar<BaseExchange>();
final Registrar<Queue> _queues = Registrar<Queue>();
@override
String declareQueue(String queueId) {
try {
_queues.register(queueId, Queue(queueId));
_exchanges.get('').bindQueue(
queue: _queues.get(queueId),
bindingKey: queueId,
);
return queueId;
} on IdAlreadyRegisteredException catch (_) {
return queueId;
}
}
@override
void deleteQueue(String queueId) {
try {
final queue = _queues.get(queueId);
if (queue.hasListeners()) {
throw QueueHasSubscribersException(queueId);
} else {
_deleteQueue(queueId);
}
} on IdNotRegisteredException catch (_) {
throw QueueNotRegisteredException(queueId);
}
}
void _deleteQueue(String queueId) {
_queues.get(queueId).dispose();
_exchanges.getAll().forEach(
(BaseExchange exchange) => exchange.deleteQueue(queueId),
);
_queues.unregister(queueId);
}
@override
Stream<Message> fetchQueue(String queueId) => _fetchQueue(queueId).dataStream;
Queue _fetchQueue(String queueId) {
try {
return _queues.get(queueId);
} on IdNotRegisteredException catch (_) {
throw QueueNotRegisteredException(queueId);
}
}
@override
List<String> listQueues() => _queues
.getAll()
.map(
(Queue queue) => queue.id,
)
.toList();
void deleteMessage(String queueId, Message message) {
try {
final queue = _fetchQueue(queueId);
queue.removeMessage(message);
} on QueueNotRegisteredException {
// Queue doesn't exist, so we can't delete the message
// We might want to log this or handle it in some way
}
}
@override
void sendMessage({
required Message message,
String? exchangeName,
String? routingKey,
}) {
try {
_exchanges
.get(exchangeName ?? '')
.forwardMessage(routingKey: routingKey, message: message);
} on IdNotRegisteredException catch (_) {
throw ExchangeNotRegisteredException(exchangeName ?? '');
}
}
@override
Message? getLatestMessage(String queueId) =>
_fetchQueue(queueId).latestMessage;
@override
void bindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
}) {
try {
final exchange = _exchanges.get(exchangeName);
switch (exchange) {
case DirectExchange _:
if (bindingKey == null) {
throw BindingKeyRequiredException();
}
exchange.bindQueue(
queue: _fetchQueue(queueId),
bindingKey: bindingKey,
);
case FanoutExchange _:
exchange.bindQueue(
queue: _fetchQueue(queueId),
bindingKey: '',
);
default:
return;
}
} on IdNotRegisteredException catch (_) {
throw ExchangeNotRegisteredException(exchangeName);
}
}
@override
void unbindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
}) {
try {
final exchange = _exchanges.get(exchangeName);
if (exchange.runtimeType == DirectExchange && bindingKey == null) {
throw BindingKeyRequiredException();
}
exchange.unbindQueue(
queueId: queueId,
bindingKey: bindingKey ?? '',
);
} on IdNotRegisteredException catch (_) {
throw ExchangeNotRegisteredException(exchangeName);
}
}
@override
void declareExchange({
required String exchangeName,
required ExchangeType exchangeType,
}) {
try {
switch (exchangeType) {
case ExchangeType.direct:
_exchanges.register(exchangeName, DirectExchange(exchangeName));
case ExchangeType.fanout:
_exchanges.register(exchangeName, FanoutExchange(exchangeName));
case ExchangeType.base:
throw InvalidExchangeTypeException();
}
} on IdAlreadyRegisteredException catch (_) {
return;
}
}
@override
void deleteExchange(String exchangeName) {
try {
_exchanges.unregister(exchangeName);
} catch (_) {
return;
}
}
@override
void close() {
_queues.getAll().forEach(
(Queue queue) => queue.dispose(),
);
_queues.clear();
_exchanges.clear();
_instance = null;
}
}

View file

@ -1,115 +0,0 @@
import 'package:angel3_mq/src/core/constants/enums.dart';
import 'package:angel3_mq/src/message/message.dart';
/// An abstract interface class defining the contract for a message queue
/// client.
///
/// The `MQClientInterface` abstract interface class defines a contract for
/// classes that implement a message queue client. Implementing classes must
/// provide methods for fetching messages from a queue, sending messages to an
/// exchange, declaring queues and exchanges, deleting queues and exchanges,
/// binding and unbinding queues from exchanges, and more.
///
/// Example:
/// ```dart
/// class MyMQClient implements MQClientInterface {
/// // Custom implementation of the message queue client.
/// }
/// ```
abstract interface class MQClientInterface {
/// Declares a queue in the message queue system.
///
/// The [queueId] parameter represents the optional ID for the queue.
///
/// Returns the ID of the declared queue.
String declareQueue(String queueId);
/// Deletes a queue from the message queue system.
///
/// The [queueId] parameter represents the ID of the queue to be deleted.
void deleteQueue(String queueId);
/// Fetches messages from a queue.
///
/// The [queueId] parameter represents the ID of the queue to fetch messages
/// from.
///
/// Returns a stream of messages from the specified queue.
Stream<Message> fetchQueue(String queueId);
/// Retrieves the list of queues.
///
/// Returns a list of queue IDs.
List<String> listQueues();
/// Sends a message to an exchange for routing to queues.
///
/// The [exchangeName] parameter represents the name of the exchange to send
/// the message to.
/// The [message] parameter represents the message to be sent.
/// The [routingKey] parameter represents the optional routing key for message
/// routing within the exchange.
void sendMessage({
required Message message,
String? exchangeName,
String? routingKey,
});
/// Retrieves the latest message from a queue.
///
/// The [queueId] parameter represents the ID of the queue to fetch the latest
/// message from.
///
/// Returns the latest message from the specified queue or `null` if the queue
/// is empty.
Message? getLatestMessage(String queueId);
/// Binds a queue to an exchange for message routing.
///
/// The [queueId] parameter represents the ID of the queue to be bound.
/// The [exchangeName] parameter represents the name of the exchange to bind
/// to.
/// The [bindingKey] parameter represents the optional binding key for routing
/// messages to the queue within the exchange.
void bindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
});
/// Unbinds a queue from an exchange to stop message routing.
///
/// The [queueId] parameter represents the ID of the queue to be unbound.
/// The [exchangeName] parameter represents the name of the exchange to unbind
/// from.
/// The [bindingKey] parameter represents the optional binding key previously
/// used for binding.
void unbindQueue({
required String queueId,
required String exchangeName,
String? bindingKey,
});
/// Declares an exchange in the message queue system.
///
/// The [exchangeName] parameter represents the name of the exchange to be
/// declared.
/// The [exchangeType] parameter represents the type of exchange (e.g.,
/// direct, fanout).
void declareExchange({
required String exchangeName,
required ExchangeType exchangeType,
});
/// Deletes an exchange from the message queue system.
///
/// The [exchangeName] parameter represents the name of the exchange to be
/// deleted.
void deleteExchange(String exchangeName);
/// Closes the connection to the message queue system.
///
/// This method should be called when the message queue client is no longer
/// needed.
void close();
}

View file

@ -1,91 +0,0 @@
import 'dart:async';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.dart';
import 'package:angel3_mq/src/producer/producer.interface.dart';
/// A mixin implementing the `ProducerInterface` for message production.
///
/// The `Producer` mixin provides a concrete implementation of the
/// `ProducerInterface` for message production. It allows classes to easily send
/// messages to exchanges, send RPC (Remote Procedure Call) messages, and set a
/// callback for handling push notifications.
///
/// Example:
/// ```dart
/// class MyMessageProducer with Producer {
/// // Custom implementation of the message producer.
/// }
/// ```
@Deprecated('Please use `ProducerMixin` instead. '
'This will be removed in v2.0.0')
mixin Producer implements ProducerInterface {
/// A callback function for handling push notifications (received messages).
Function(Message message)? _callback;
@override
void sendMessage({
required Object payload,
String? exchangeName,
Map<String, dynamic>? headers,
String? routingKey,
String? timestamp,
}) {
final newMessage = Message(
payload: payload,
headers: headers,
timestamp: timestamp,
);
MQClient.instance.sendMessage(
exchangeName: exchangeName,
routingKey: routingKey,
message: newMessage,
);
_callback?.call(newMessage);
}
@override
Future<T> sendRPCMessage<T>({
required String processId,
required String exchangeName,
Map<String, dynamic>? args,
String? routingKey,
T Function(Object)? mapper,
String? timestamp,
}) async {
final Completer completer =
mapper == null ? Completer<T>() : Completer<Object>();
final newMessage = Message(
payload: 'RPC',
headers: {
'type': 'RPC',
'processId': processId,
'args': args,
'completer': completer,
},
timestamp: timestamp,
);
MQClient.instance.sendMessage(
exchangeName: exchangeName,
routingKey: routingKey,
message: newMessage,
);
if (mapper == null) {
_callback?.call(newMessage);
final res = await completer.future.then((value) => value);
return res;
} else {
_callback?.call(newMessage);
final rawData = await completer.future.then((value) => value);
final data = mapper(rawData);
return data;
}
}
@override
void setPushCallback(Function(Message message) callback) =>
_callback = callback;
}

View file

@ -1,56 +0,0 @@
import 'package:angel3_mq/src/message/message.dart';
/// An abstract interface class defining the contract for a message producer.
///
/// The `ProducerInterface` abstract interface class defines a contract for
/// classes that implement a message producer. Implementing classes must provide
/// methods for sending messages to exchanges, sending RPC (Remote Procedure
/// Call) messages, and setting a callback for push notifications.
///
/// Example:
/// ```dart
/// class MyProducer implements ProducerInterface {
/// // Custom implementation of the message producer.
/// }
/// ```
abstract interface class ProducerInterface {
/// Sends a message to an exchange.
///
/// The [payload] parameter represents the message payload to send.
/// The [exchangeName] parameter is the name of the exchange to send the
/// message to.
/// The [headers] parameter is an optional map of headers for the message.
/// The [routingKey] parameter is an optional routing key for the message.
void sendMessage({
required Object payload,
required String exchangeName,
Map<String, dynamic>? headers,
String? routingKey,
});
/// Sends an RPC (Remote Procedure Call) message and awaits a response.
///
/// The [processId] parameter is a unique identifier for the RPC request.
/// The [args] parameter is an optional map of arguments for the RPC request.
/// The [exchangeName] parameter is the name of the exchange for RPC
/// communication.
/// The [routingKey] parameter is an optional routing key for the RPC message.
/// The [mapper] parameter is an optional function to map the response
/// payload.
///
/// Returns a future that completes with the response payload.
Future<T> sendRPCMessage<T>({
required String processId,
required String exchangeName,
Map<String, dynamic>? args,
String? routingKey,
T Function(Object)? mapper,
});
/// Sets a callback function to be called after every 'sendMessage` or
/// `sendRPCMessage`.
///
/// The [callback] parameter is a function that will be invoked when a push
/// notification (message) is received.
void setPushCallback(Function(Message message) callback);
}

View file

@ -1,90 +0,0 @@
import 'dart:async';
import 'package:angel3_mq/src/message/message.dart';
import 'package:angel3_mq/src/mq/mq.dart';
import 'package:angel3_mq/src/producer/producer.interface.dart';
/// A mixin implementing the `ProducerInterface` for message production.
///
/// The `ProducerMixin` mixin provides a concrete implementation of the
/// `ProducerInterface` for message production. It allows classes to easily send
/// messages to exchanges, send RPC (Remote Procedure Call) messages, and set a
/// callback for handling push notifications.
///
/// Example:
/// ```dart
/// class MyMessageProducer with ProducerMixin {
/// // Custom implementation of the message producer.
/// }
/// ```
mixin ProducerMixin implements ProducerInterface {
/// A callback function for handling push notifications (received messages).
Function(Message message)? _callback;
@override
void sendMessage({
required Object payload,
String? exchangeName,
Map<String, dynamic>? headers,
String? routingKey,
String? timestamp,
}) {
final newMessage = Message(
payload: payload,
headers: headers,
timestamp: timestamp,
);
MQClient.instance.sendMessage(
exchangeName: exchangeName,
routingKey: routingKey,
message: newMessage,
);
_callback?.call(newMessage);
}
@override
Future<T> sendRPCMessage<T>({
required String processId,
required String exchangeName,
Map<String, dynamic>? args,
String? routingKey,
T Function(Object)? mapper,
String? timestamp,
}) async {
final Completer completer =
mapper == null ? Completer<T>() : Completer<Object>();
final newMessage = Message(
payload: 'RPC',
headers: {
'type': 'RPC',
'processId': processId,
'args': args,
'completer': completer,
},
timestamp: timestamp,
);
MQClient.instance.sendMessage(
exchangeName: exchangeName,
routingKey: routingKey,
message: newMessage,
);
if (mapper == null) {
_callback?.call(newMessage);
final res = await completer.future.then((value) => value);
return res;
} else {
_callback?.call(newMessage);
final rawData = await completer.future.then((value) => value);
final data = mapper(rawData);
return data;
}
}
@override
void setPushCallback(Function(Message message) callback) =>
_callback = callback;
}

View file

@ -1,54 +0,0 @@
import 'dart:async';
import 'package:angel3_mq/src/message/message.dart';
/// An abstract base class for data streams that produce [Message] objects.
///
/// The `BaseDataStream` class provides the foundation for creating data
/// streams that emit [Message] objects to their listeners. It includes a
/// [StreamController] to manage the stream of messages and methods to enqueue
/// messages and dispose of the stream when it's no longer needed.
///
/// Example:
/// ```dart
/// class MyDataStream extends BaseDataStream {
/// // Custom methods and logic specific to your data stream can be added here.
/// }
/// ```
abstract class BaseDataStream {
/// A [StreamController] for broadcasting [Message] objects to listeners.
final StreamController<Message> _data = StreamController.broadcast();
/// Returns a [Stream] of [Message] objects from this data stream.
Stream<Message> get dataStream => _data.stream;
/// The latest [Message] enqueued in the data stream.
///
/// This property keeps track of the most recently enqueued message.
Message? _latestMessage;
/// Exposes the [_latestMessage] property.
///
/// This getter returns the most recently enqueued message.
Message? get latestMessage => _latestMessage;
/// Enqueues a [Message] to be emitted by the data stream.
///
/// The [message] parameter represents the [Message] to enqueue, and it
/// becomes the latest message in the stream.
void enqueue(Message message) {
_latestMessage = message;
_data.add(message);
}
/// Closes the data stream, freeing up resources.
///
/// This method should be called when the data stream is no longer needed
/// to prevent resource leaks.
void dispose() => _data.close();
/// Checks if there are any active listeners on the data stream.
///
/// Returns `true` if there are active listeners, and `false` otherwise.
bool hasListeners() => _data.hasListener;
}

View file

@ -1,60 +0,0 @@
import 'dart:async';
import 'package:angel3_mq/mq.dart';
import 'package:angel3_mq/src/queue/data_stream.base.dart';
import 'package:equatable/equatable.dart';
/// A class representing a queue for message streaming.
///
/// The `Queue` class extends the [BaseDataStream] class and adds an
/// identifier, making it suitable for managing and streaming messages in a
/// queue-like fashion.
///
/// Example:
/// ```dart
/// final myQueue = Queue('my_queue_id');
///
/// // Enqueue a message to the queue.
/// final message = Message(
/// headers: {'contentType': 'json', 'sender': 'Alice'},
/// payload: {'text': 'Hello, World!'},
/// timestamp: '2023-09-07T12:00:002',
/// );
/// myQueue.enqueue(message);
///
/// // Check if the queue has active listeners.
/// final hasListeners = myQueue.hasListeners();
/// ```
class Queue extends BaseDataStream with EquatableMixin {
Queue(this.id);
final String id;
final StreamController<Message> _controller =
StreamController<Message>.broadcast();
Message? _latestMessage;
void addMessage(Message message) {
_latestMessage = message;
_controller.add(message);
}
Stream<Message> get dataStream => _controller.stream;
Message? get latestMessage => _latestMessage;
bool hasListeners() => _controller.hasListener;
void dispose() {
_controller.close();
}
// New method to remove a message
void removeMessage(Message message) {
if (_latestMessage == message) {
_latestMessage = null;
}
// Note: We can't remove past messages from the stream,
// but we can prevent this message from being processed again in the future.
}
List<Object?> get props => [id];
}

View file

@ -1,18 +0,0 @@
name: angel3_mq
description: DartMQ is a message-queue system that facilitates communication between different components in the application.
repository: https://github.com/N-Razzouk/dart_mq
issue_tracker: https://github.com/N-Razzouk/dart_mq/issues
homepage: https://github.com/N-Razzouk/dart_mq
documentation: https://github.com/N-Razzouk/dart_mq
version: 1.1.0
environment:
sdk: ">=3.0.0 <4.0.0"
dependencies:
equatable: ^2.0.5
uuid: ^4.5.1
dev_dependencies:
lints: ^3.0.0
test: ^1.21.0

View file

@ -1,97 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'package:angel3_mq/src/binding/binding.dart';
import 'package:angel3_mq/src/core/exceptions/queue_exceptions.dart';
import 'package:angel3_mq/src/queue/queue.dart';
import 'package:test/test.dart';
void main() {
late Binding binding;
late Queue queue1;
late Queue queue2;
setUp(() {
binding = Binding('my_binding');
queue1 = Queue('queue_1');
queue2 = Queue('queue_2');
});
test('addQueue adds a queue to the binding', () {
binding.addQueue(queue1);
expect(binding.hasQueues(), isTrue);
});
test('removeQueue removes a queue from the binding', () {
binding.addQueue(queue1);
expect(binding.hasQueues(), isTrue);
binding.removeQueue('queue_1');
expect(binding.hasQueues(), isFalse);
});
test(
'removeQueue throws QueueHasSubscribersException if queue has '
'subscribers', () {
final sub = queue1.dataStream.listen((_) {});
binding.addQueue(queue1);
expect(
() => binding.removeQueue('queue_1'),
throwsA(isA<QueueHasSubscribersException>()),
);
sub.cancel();
});
test('publishMessage publishes a message to all associated queues', () {
binding
..addQueue(queue1)
..addQueue(queue2);
final message = Message(
headers: {'contentType': 'json', 'sender': 'Alice'},
payload: {'text': 'Hello, World!'},
timestamp: '2023-09-07T12:00:002',
);
binding.publishMessage(message);
expect(queue1.latestMessage, equals(message));
expect(queue2.latestMessage, equals(message));
});
test('hasQueues returns true if the binding has associated queues', () {
expect(binding.hasQueues(), isFalse);
binding.addQueue(queue1);
expect(binding.hasQueues(), isTrue);
});
test('clear clears all queues from the binding', () {
binding
..addQueue(queue1)
..addQueue(queue2);
expect(binding.hasQueues(), isTrue);
binding.clear();
expect(binding.hasQueues(), isFalse);
});
test('clear throws QueueHasSubscribersException if a queue has subscribers',
() {
final sub = queue1.dataStream.listen((_) {});
binding
..addQueue(queue1)
..addQueue(queue2);
expect(binding.hasQueues(), isTrue);
expect(() => binding.clear(), throwsA(isA<QueueHasSubscribersException>()));
expect(binding.hasQueues(), isTrue);
sub.cancel();
});
}

View file

@ -1,333 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
class MyMessageConsumer with ConsumerMixin {
// Custom implementation of the message consumer.
}
void main() {
group('Consumer', () {
final consumer = MyMessageConsumer();
setUpAll(() {
MQClient.initialize();
MQClient.instance.declareQueue('test-queue');
});
test('subscribe should register a subscription and receive messages',
() async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final callbackMessages = <Message>[];
consumer.subscribe(
queueId: queueId,
callback: (message) {
callbackMessages.add(message);
},
);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Ensure that the callback was called with the expected messages
expect(callbackMessages, contains(message1));
expect(callbackMessages, contains(message2));
});
test('unsubscribe should cancel a subscription', () async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final callbackMessages = <Message>[];
consumer
..clearSubscriptions()
..subscribe(
queueId: queueId,
callback: (message) {
callbackMessages.add(message);
},
);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Unsubscribe and ensure that the callback is not called
consumer.unsubscribe(queueId: queueId);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
expect(callbackMessages, contains(message1));
expect(callbackMessages.length, equals(1));
});
test('pauseSubscription should pause a subscription', () async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final callbackMessages = <Message>[];
consumer
..clearSubscriptions()
..subscribe(
queueId: queueId,
callback: (message) {
callbackMessages.add(message);
},
);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Pause the subscription and ensure that the callback is not called
consumer.pauseSubscription(queueId);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
expect(callbackMessages, contains(message1));
expect(callbackMessages.length, equals(1));
});
test('resumeSubscription should resume a paused subscription', () async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final callbackMessages = <Message>[];
consumer
..clearSubscriptions()
..subscribe(
queueId: queueId,
callback: (message) {
callbackMessages.add(message);
},
);
// Publish a message to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
// Pause and then resume the subscription and ensure that the callback is
// called.
consumer
..pauseSubscription(queueId)
..resumeSubscription(queueId);
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
expect(callbackMessages, contains(message1));
expect(callbackMessages, contains(message2));
expect(callbackMessages.length, equals(2));
});
test(
'updateSubscription should update a subscription with a new callback '
'and filter', () async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final message3 = Message(payload: 'Message 3');
final callbackMessages = <Message>[];
consumer
..clearSubscriptions()
..subscribe(
queueId: queueId,
callback: (message) {
callbackMessages.add(message);
},
);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Update the subscription with a new callback and filter
consumer.updateSubscription(
queueId: queueId,
callback: (message) {
if (message.payload == 'Message 2') {
callbackMessages.add(message);
}
},
filter: (payload) => payload == 'Message 2',
);
// Publish another message to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message3,
routingKey: queueId,
);
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Ensure that the callback is only called with 'Message 2'
expect(callbackMessages, contains(message1));
expect(callbackMessages, contains(message2));
expect(callbackMessages.contains(message3), isFalse);
expect(callbackMessages.length, equals(3));
});
test('clearSubscriptions should clear all subscriptions', () async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final message3 = Message(payload: 'Message 3');
final callbackMessages = <Message>[];
consumer
..clearSubscriptions()
..subscribe(
queueId: queueId,
callback: (message) {
callbackMessages.add(message);
},
);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Update the subscription with a new callback and filter
consumer.clearSubscriptions();
// Publish another message to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message3,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Ensure that the callback is only called on the first two messages.
expect(callbackMessages, contains(message1));
expect(callbackMessages, contains(message2));
expect(callbackMessages.contains(message3), isFalse);
expect(callbackMessages.length, equals(2));
});
test('getLatestMessage should return the latest message from a queue',
() async {
const queueId = 'test-queue';
final message1 = Message(payload: 'Message 1');
final message2 = Message(payload: 'Message 2');
final message3 = Message(payload: 'Message 3');
consumer
..clearSubscriptions()
..subscribe(
queueId: queueId,
callback: (_) {},
);
// Publish messages to the queue
MQClient.instance.sendMessage(
exchangeName: '',
message: message1,
routingKey: queueId,
);
MQClient.instance.sendMessage(
exchangeName: '',
message: message2,
routingKey: queueId,
);
MQClient.instance.sendMessage(
exchangeName: '',
message: message3,
routingKey: queueId,
);
await Future.delayed(Duration.zero);
// Get the latest message
final latestMessage = consumer.getLatestMessage(queueId);
// Ensure that the latest message is 'Message 3'
expect(latestMessage, equals(message3));
});
test(
'subscribing to a queue that has already been subscribed to throws an '
'error.', () {
const queueId = 'test-queue';
consumer
..clearSubscriptions()
..subscribe(queueId: queueId, callback: (_) {});
expect(
() => consumer.subscribe(queueId: queueId, callback: (_) {}),
throwsA(isA<ConsumerAlreadySubscribedException>()),
);
});
});
}

View file

@ -1,24 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('BindingException', () {
test('BindingKeyNotFoundException', () {
final exception = BindingKeyNotFoundException('test-key');
expect(exception.toString(), contains('BindingKeyNotFoundException'));
expect(
exception.toString(),
contains(
'BindingKeyNotFoundException:'
' The binding key "test-key" was not found.',
),
);
});
test('BindingKeyRequiredException', () {
final exception = BindingKeyRequiredException();
expect(exception.toString(), contains('BindingKeyRequiredException'));
expect(exception.toString(), contains('Binding key is required'));
});
});
}

View file

@ -1,60 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('ConsumerException', () {
test('ConsumerNotRegisteredException', () {
final exception = ConsumerNotRegisteredException('Alice');
expect(exception.toString(), contains('ConsumerNotRegisteredException'));
expect(
exception.toString(),
contains('ConsumerNotRegisteredException: The consumer "Alice" is not '
'registered.'),
);
});
test('ConsumerAlreadySubscribedException', () {
final exception = ConsumerAlreadySubscribedException(
consumer: 'NewsConsumer',
queue: 'NewsQueue',
);
expect(
exception.toString(),
contains('ConsumerAlreadySubscribedException'),
);
expect(
exception.toString(),
contains(
'ConsumerAlreadySubscribedException: The consumer "NewsConsumer" '
'is already subscribed to the queue "NewsQueue".'),
);
});
test('ConsumerNotSubscribedException', () {
final exception = ConsumerNotSubscribedException(
consumer: 'WeatherConsumer',
queue: 'WeatherQueue',
);
expect(exception.toString(), contains('ConsumerNotSubscribedException'));
expect(
exception.toString(),
contains(
'ConsumerNotSubscribedException: The consumer "WeatherConsumer" '
'is not subscribed to the queue "WeatherQueue".'),
);
});
test('ConsumerHasSubscriptionsException', () {
final exception = ConsumerHasSubscriptionsException('Bob');
expect(
exception.toString(),
contains('ConsumerHasSubscriptionsException'),
);
expect(
exception.toString(),
contains('ConsumerHasSubscriptionsException: The consumer "Bob" has '
'active subscriptions.'),
);
});
});
}

View file

@ -1,21 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('ExchangeException', () {
test('ExchangeNotRegisteredException', () {
final exception = ExchangeNotRegisteredException('NewsExchange');
expect(exception.toString(), contains('ExchangeNotRegisteredException'));
expect(
exception.toString(),
contains('Exchange: NewsExchange is not registered'),
);
});
test('InvalidExchangeTypeException', () {
final exception = InvalidExchangeTypeException();
expect(exception.toString(), contains('InvalidExchangeTypeException'));
expect(exception.toString(), contains('Exchange type is invalid.'));
});
});
}

View file

@ -1,17 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('MQClientException', () {
test('MQClientNotInitializedException', () {
final exception = MQClientNotInitializedException();
expect(exception.toString(), contains('MQClientNotInitializedException'));
expect(
exception.toString(),
contains('MQClientNotInitializedException: MQClient is not '
'initialized. Please make sure to call MQClient.initialize() '
'first.'),
);
});
});
}

View file

@ -1,30 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('QueueException', () {
test('QueueNotRegisteredException', () {
final exception = QueueNotRegisteredException('my_queue_id');
expect(exception.toString(), contains('QueueNotRegisteredException'));
expect(
exception.toString(),
contains('Queue: my_queue_id is not registered'),
);
});
test('QueueHasSubscribersException', () {
final exception = QueueHasSubscribersException('my_queue_id');
expect(exception.toString(), contains('QueueHasSubscribersException'));
expect(
exception.toString(),
contains('Queue: my_queue_id has subscribers'),
);
});
test('QueueIdNullException', () {
final exception = QueueIdNullException();
expect(exception.toString(), contains('QueueIdNullException'));
expect(exception.toString(), contains("Queue name can't be null"));
});
});
}

View file

@ -1,25 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('RegistrarException', () {
test('IdAlreadyRegisteredException', () {
final exception = IdAlreadyRegisteredException('my_id');
expect(exception.toString(), contains('IdAlreadyRegisteredException'));
expect(
exception.toString(),
contains('IdAlreadyRegisteredException: Id '
'"my_id" already registered'),
);
});
test('IdNotRegisteredException', () {
final exception = IdNotRegisteredException('my_id');
expect(exception.toString(), contains('IdNotRegisteredException'));
expect(
exception.toString(),
contains('IdNotRegisteredException: Id "my_id" not registered.'),
);
});
});
}

View file

@ -1,12 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:test/test.dart';
void main() {
group('RoutingKeyException', () {
test('RoutingKeyRequiredException', () {
final exception = RoutingKeyRequiredException();
expect(exception.toString(), contains('RoutingKeyRequiredException'));
expect(exception.toString(), contains('Routing key is required'));
});
});
}

View file

@ -1,105 +0,0 @@
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/core/registrar/simple_registrar.dart';
import 'package:test/test.dart';
void main() {
late Registrar<String> registrar;
setUp(() {
registrar = Registrar<String>();
});
test('register and get objects', () {
registrar
..register('user_1', 'Alice')
..register('user_2', 'Bob');
expect(registrar.get('user_1'), equals('Alice'));
expect(registrar.get('user_2'), equals('Bob'));
});
test('register throws IdAlreadyRegisteredException for duplicate IDs', () {
registrar.register('user_1', 'Alice');
expect(
() => registrar.register('user_1', 'Another Alice'),
throwsA(const TypeMatcher<IdAlreadyRegisteredException>()),
);
});
test('get throws IdNotRegisteredException for unknown IDs', () {
expect(
() => registrar.get('unknown_id'),
throwsA(const TypeMatcher<IdNotRegisteredException>()),
);
});
test('getAll returns a list of all registered objects', () {
registrar
..register('user_1', 'Alice')
..register('user_2', 'Bob');
final allObjects = registrar.getAll();
expect(allObjects, contains('Alice'));
expect(allObjects, contains('Bob'));
});
test('unregister removes objects', () {
registrar
..register('user_1', 'Alice')
..register('user_2', 'Bob')
..unregister('user_1');
expect(
() => registrar.get('user_1'),
throwsA(const TypeMatcher<IdNotRegisteredException>()),
);
expect(registrar.get('user_2'), equals('Bob'));
});
test('unregister throws IdNotRegisteredException for unknown IDs', () {
expect(
() => registrar.unregister('unknown_id'),
throwsA(const TypeMatcher<IdNotRegisteredException>()),
);
});
test('clear removes all registered objects', () {
registrar
..register('user_1', 'Alice')
..register('user_2', 'Bob')
..clear();
expect(registrar.count, equals(0));
});
test('has checks if an object is registered', () {
registrar.register('user_1', 'Alice');
expect(registrar.has('user_1'), isTrue);
expect(registrar.has('user_2'), isFalse);
});
test('count returns the number of registered objects', () {
registrar
..register('user_1', 'Alice')
..register('user_2', 'Bob');
expect(registrar.count, equals(2));
});
test('toString returns a formatted string representation of the registrar',
() {
registrar
..register('user_1', 'Alice')
..register('user_2', 'Bob');
const expectedString = '''
Registrar(
\tuser_1: Alice,
\tuser_2: Bob
)''';
expect(registrar.toString(), equals(expectedString));
});
}

View file

@ -1,79 +0,0 @@
import 'package:angel3_mq/mq.dart';
import 'package:angel3_mq/src/core/exceptions/exceptions.dart';
import 'package:angel3_mq/src/exchange/default_exchange.dart';
import 'package:angel3_mq/src/queue/queue.dart';
import 'package:test/test.dart';
void main() {
late DefaultExchange defaultExchange;
late Queue queue;
late Message message;
setUp(() {
defaultExchange = DefaultExchange('default_exchange');
queue = Queue('my_queue');
message = Message(
headers: {'contentType': 'json', 'sender': 'Alice'},
payload: {'text': 'Hello, World!'},
timestamp: '2023-09-07T12:00:002',
);
});
test('bindQueue binds a queue to the default exchange with a binding key',
() {
defaultExchange.bindQueue(queue: queue, bindingKey: 'my_routing_key');
expect(defaultExchange.bindings.has('my_routing_key'), isTrue);
});
test(
'unbindQueue throws an exception when attempting to unbind from the '
'default exchange', () {
expect(
() => defaultExchange.unbindQueue(
queueId: 'my_queue_id',
bindingKey: 'my_routing_key',
),
throwsA(isA<BindingKeyNotFoundException>()),
);
});
test('unbindQueue unbinds a queue from the default exchange', () {
defaultExchange
..bindQueue(queue: queue, bindingKey: 'my_routing_key')
..unbindQueue(
queueId: queue.id,
bindingKey: 'my_routing_key',
);
expect(defaultExchange.bindings.has('my_routing_key'), isFalse);
});
test(
'forwardMessage forwards a message to the default exchange using a '
'routing key', () {
defaultExchange
..bindQueue(queue: queue, bindingKey: 'my_routing_key')
..forwardMessage(message: message, routingKey: 'my_routing_key');
expect(queue.latestMessage, equals(message));
});
test(
'forwardMessage throws BindingKeyNotFoundException when routing key is '
'not found', () {
expect(
() => defaultExchange.forwardMessage(
message: message,
routingKey: 'non_existent_routing_key',
),
throwsA(isA<BindingKeyNotFoundException>()),
);
});
test(
'forwardMessage throws RoutingKeyRequiredException when routing key is '
'null', () {
expect(
() => defaultExchange.forwardMessage(message: message),
throwsA(isA<RoutingKeyRequiredException>()),
);
});
}

Some files were not shown because too many files have changed in this diff Show more