diff --git a/core/queue/lib/queue.dart b/core/queue/lib/queue.dart new file mode 100644 index 00000000..e69de29b diff --git a/core/queue/lib/src/job_queued_event.dart b/core/queue/lib/src/job_queued_event.dart new file mode 100644 index 00000000..5a55256b --- /dev/null +++ b/core/queue/lib/src/job_queued_event.dart @@ -0,0 +1,32 @@ +import 'package:angel3_event_bus/event_bus.dart'; +import 'package:equatable/equatable.dart'; + +class JobQueuedEvent extends AppEvent { + final String connectionName; + final String? queue; + final dynamic jobId; + final dynamic job; + final String payload; + final Duration? delay; + + JobQueuedEvent(this.connectionName, this.queue, this.jobId, this.job, + this.payload, this.delay); + + @override + List get props => + [connectionName, queue, jobId, job, payload, delay]; + + @override + Map toJson() { + return { + 'connectionName': connectionName, + 'queue': queue, + 'jobId': jobId, + 'job': job.toString(), // or a more appropriate serialization of the job + 'payload': payload, + 'delay': delay?.inMilliseconds, + }; + } + + String get name => 'job.queued'; +} diff --git a/core/queue/lib/src/job_queueing_event.dart b/core/queue/lib/src/job_queueing_event.dart new file mode 100644 index 00000000..44e2f626 --- /dev/null +++ b/core/queue/lib/src/job_queueing_event.dart @@ -0,0 +1,29 @@ +import 'package:angel3_event_bus/event_bus.dart'; +import 'package:equatable/equatable.dart'; + +class JobQueueingEvent extends AppEvent { + final String connectionName; + final String? queue; + final dynamic job; + final String payload; + final Duration? delay; + + JobQueueingEvent( + this.connectionName, this.queue, this.job, this.payload, this.delay); + + @override + List get props => [connectionName, queue, job, payload, delay]; + + @override + Map toJson() { + return { + 'connectionName': connectionName, + 'queue': queue, + 'job': job.toString(), // or a more appropriate serialization of the job + 'payload': payload, + 'delay': delay?.inMilliseconds, + }; + } + + String get name => 'job.queueing'; +} diff --git a/core/queue/lib/src/queue.dart b/core/queue/lib/src/queue.dart new file mode 100644 index 00000000..987ac169 --- /dev/null +++ b/core/queue/lib/src/queue.dart @@ -0,0 +1,322 @@ +// lib/src/queue.dart + +import 'dart:async'; +import 'dart:convert'; + +import 'package:angel3_container/angel3_container.dart'; +import 'package:angel3_event_bus/event_bus.dart'; +import 'package:angel3_mq/mq.dart'; +import 'package:angel3_reactivex/angel3_reactivex.dart'; +import 'package:crypto/crypto.dart'; +import 'package:uuid/uuid.dart'; + +import 'job_queueing_event.dart'; +import 'job_queued_event.dart'; +import 'should_be_encrypted.dart'; +import 'should_queue_after_commit.dart'; + +abstract class Queue with InteractsWithTime { + /// The IoC container instance. + final Container container; + + /// The connection name for the queue. + String _connectionName; + + /// Indicates that jobs should be dispatched after all database transactions have committed. + bool dispatchAfterCommit; + + /// The create payload callbacks. + static final List _createPayloadCallbacks = []; + + Queue(this.container, + {String connectionName = 'default', this.dispatchAfterCommit = false}) + : _connectionName = connectionName; + + Future pushOn(String queue, dynamic job, [dynamic data = '']) { + return push(job, data, queue); + } + + Future laterOn(String queue, Duration delay, dynamic job, + [dynamic data = '']) { + return later(delay, job, data, queue); + } + + Future bulk(List jobs, + [dynamic data = '', String? queue]) async { + for (var job in jobs) { + await push(job, data, queue); + } + } + + // Add this method + void setContainer(Container container) { + // This method might not be necessary in Dart, as we're using final for container + // But we can implement it for API compatibility + throw UnsupportedError( + 'Container is final and cannot be changed after initialization'); + } + + // Update createPayload method to include exception handling + Future createPayload(dynamic job, String queue, + [dynamic data = '']) async { + if (job is Function) { + // TODO: Implement CallQueuedClosure equivalent + throw UnimplementedError('Closure jobs are not yet supported'); + } + + try { + final payload = jsonEncode(await createPayloadMap(job, queue, data)); + return payload; + } catch (e) { + throw InvalidPayloadException('Unable to JSON encode payload: $e'); + } + } + + Future> createPayloadMap(dynamic job, String queue, + [dynamic data = '']) async { + if (job is Object) { + return createObjectPayload(job, queue); + } else { + return createStringPayload(job.toString(), queue, data); + } + } + + Future> createObjectPayload( + Object job, String queue) async { + final payload = await withCreatePayloadHooks(queue, { + 'uuid': const Uuid().v4(), + 'displayName': getDisplayName(job), + 'job': 'CallQueuedHandler@call', // TODO: Implement CallQueuedHandler + 'maxTries': getJobTries(job), + 'maxExceptions': job is HasMaxExceptions ? job.maxExceptions : null, + 'failOnTimeout': job is HasFailOnTimeout ? job.failOnTimeout : false, + 'backoff': getJobBackoff(job), + 'timeout': job is HasTimeout ? job.timeout : null, + 'retryUntil': getJobExpiration(job), + 'data': { + 'commandName': job.runtimeType.toString(), + 'command': job, + }, + }); + + final command = jobShouldBeEncrypted(job) && container.has() + ? container.make().encrypt(jsonEncode(job)) + : jsonEncode(job); + + payload['data'] = { + ...payload['data'] as Map, + 'commandName': job.runtimeType.toString(), + 'command': command, + }; + + return payload; + } + + String getDisplayName(Object job) { + if (job is HasDisplayName) { + return job.displayName(); + } + return job.runtimeType.toString(); + } + + int? getJobTries(dynamic job) { + if (job is HasTries) { + return job.tries; + } + return null; + } + + String? getJobBackoff(dynamic job) { + if (job is HasBackoff) { + final backoff = job.backoff; + if (backoff == null) return null; + if (backoff is Duration) { + return backoff.inSeconds.toString(); + } + if (backoff is List) { + return backoff.map((d) => d.inSeconds).join(','); + } + } + return null; + } + + int? getJobExpiration(dynamic job) { + if (job is HasRetryUntil) { + final retryUntil = job.retryUntil; + if (retryUntil == null) return null; + return retryUntil.millisecondsSinceEpoch ~/ 1000; + } + return null; + } + + bool jobShouldBeEncrypted(Object job) { + return job is ShouldBeEncrypted || + (job is HasShouldBeEncrypted && job.shouldBeEncrypted); + } + + Future> createStringPayload( + String job, String queue, dynamic data) async { + return withCreatePayloadHooks(queue, { + 'uuid': const Uuid().v4(), + 'displayName': job.split('@')[0], + 'job': job, + 'maxTries': null, + 'maxExceptions': null, + 'failOnTimeout': false, + 'backoff': null, + 'timeout': null, + 'data': data, + }); + } + + static void createPayloadUsing(Function? callback) { + if (callback == null) { + _createPayloadCallbacks.clear(); + } else { + _createPayloadCallbacks.add(callback); + } + } + + Future> withCreatePayloadHooks( + String queue, Map payload) async { + if (_createPayloadCallbacks.isNotEmpty) { + for (var callback in _createPayloadCallbacks) { + final result = await callback(_connectionName, queue, payload); + if (result is Map) { + payload = {...payload, ...result}; + } + } + } + return payload; + } + + Future enqueueUsing( + dynamic job, + String payload, + String? queue, + Duration? delay, + Future Function(String, String?, Duration?) callback, + ) async { + if (shouldDispatchAfterCommit(job) && container.has()) { + return container.make().addCallback(() async { + await raiseJobQueueingEvent(queue, job, payload, delay); + final jobId = await callback(payload, queue, delay); + await raiseJobQueuedEvent(queue, jobId, job, payload, delay); + return jobId; + }); + } + + await raiseJobQueueingEvent(queue, job, payload, delay); + final jobId = await callback(payload, queue, delay); + await raiseJobQueuedEvent(queue, jobId, job, payload, delay); + return jobId; + } + + bool shouldDispatchAfterCommit(dynamic job) { + if (job is ShouldQueueAfterCommit) { + return true; + } + if (job is HasAfterCommit) { + return job.afterCommit; + } + return dispatchAfterCommit; + } + + Future raiseJobQueueingEvent( + String? queue, dynamic job, String payload, Duration? delay) async { + if (container.has()) { + final eventBus = container.make(); + eventBus + .fire(JobQueueingEvent(_connectionName, queue, job, payload, delay)); + } + } + + Future raiseJobQueuedEvent(String? queue, dynamic jobId, dynamic job, + String payload, Duration? delay) async { + if (container.has()) { + final eventBus = container.make(); + eventBus.fire( + JobQueuedEvent(_connectionName, queue, jobId, job, payload, delay)); + } + } + + String get connectionName => _connectionName; + + set connectionName(String name) { + _connectionName = name; + } + + Container getContainer() => container; + + // Abstract methods to be implemented by subclasses + Future push(dynamic job, [dynamic data = '', String? queue]); + Future later(Duration delay, dynamic job, + [dynamic data = '', String? queue]); +} + +// Additional interfaces and classes + +abstract class HasMaxExceptions { + int? get maxExceptions; +} + +abstract class HasFailOnTimeout { + bool get failOnTimeout; +} + +abstract class HasTimeout { + Duration? get timeout; +} + +abstract class HasDisplayName { + String displayName(); +} + +abstract class HasTries { + int? get tries; +} + +abstract class HasBackoff { + dynamic get backoff; +} + +abstract class HasRetryUntil { + DateTime? get retryUntil; +} + +abstract class HasAfterCommit { + bool get afterCommit; +} + +abstract class HasShouldBeEncrypted { + bool get shouldBeEncrypted; +} + +abstract class Encrypter { + String encrypt(String data); +} + +abstract class TransactionManager { + Future addCallback(Future Function() callback); +} + +// Add this mixin to the Queue class +mixin InteractsWithTime { + int secondsUntil(DateTime dateTime) { + return dateTime.difference(DateTime.now()).inSeconds; + } + + int availableAt(Duration delay) { + return DateTime.now().add(delay).millisecondsSinceEpoch ~/ 1000; + } +} + +// First, define the InvalidPayloadException class +class InvalidPayloadException implements Exception { + final String message; + + InvalidPayloadException(this.message); + + @override + String toString() => 'InvalidPayloadException: $message'; +} diff --git a/core/queue/lib/src/should_be_encrypted.dart b/core/queue/lib/src/should_be_encrypted.dart new file mode 100644 index 00000000..5db3ae8d --- /dev/null +++ b/core/queue/lib/src/should_be_encrypted.dart @@ -0,0 +1 @@ +abstract class ShouldBeEncrypted {} diff --git a/core/queue/lib/src/should_queue_after_commit.dart b/core/queue/lib/src/should_queue_after_commit.dart new file mode 100644 index 00000000..d00e3190 --- /dev/null +++ b/core/queue/lib/src/should_queue_after_commit.dart @@ -0,0 +1 @@ +abstract class ShouldQueueAfterCommit {} diff --git a/core/queue/pubspec.yaml b/core/queue/pubspec.yaml index dd22ec8a..398b0f8f 100644 --- a/core/queue/pubspec.yaml +++ b/core/queue/pubspec.yaml @@ -10,8 +10,13 @@ environment: # Add regular dependencies here. dependencies: - # path: ^1.8.0 - + angel3_container: ^8.0.0 + angel3_mq: ^8.0.0 + angel3_event_bus: ^8.0.0 + angel3_reactivex: ^8.0.0 + uuid: ^4.5.1 + crypto: ^3.0.5 + dev_dependencies: lints: ^3.0.0 test: ^1.24.0 diff --git a/sandbox/eventbus/lib/res/event_bus.dart b/sandbox/eventbus/lib/res/event_bus.dart index 178a5518..d23bb2ca 100644 --- a/sandbox/eventbus/lib/res/event_bus.dart +++ b/sandbox/eventbus/lib/res/event_bus.dart @@ -1,5 +1,5 @@ -import 'package:logger/logger.dart'; import 'package:angel3_reactivex/subjects.dart'; +import 'package:logger/logger.dart'; import 'app_event.dart'; import 'history_entry.dart';