platform/core/pipeline/lib/src/pipeline.dart
2024-10-03 07:58:46 -07:00

223 lines
6 KiB
Dart

import 'dart:async';
import 'package:angel3_container/angel3_container.dart';
import 'package:logging/logging.dart';
/// Represents a series of "pipes" through which an object can be passed.
///
/// The configuration methods ([send], [through], [pipe], [via]) each return a
/// [PipelineContract]; [then] and [thenReturn] return a [Future] holding the
/// pipeline's result.
abstract class PipelineContract {
/// Sets the object that will be sent through the pipeline.
PipelineContract send(dynamic passable);
/// Sets the pipes the object is passed through.
///
/// [pipes] is untyped; the contract does not constrain whether it is a single
/// pipe or a collection of pipes.
PipelineContract through(dynamic pipes);
/// Adds further pipes to the pipeline.
PipelineContract pipe(dynamic pipes);
/// Sets the name of the method to call on each pipe.
PipelineContract via(String method);
/// Runs the pipeline and hands the value to [destination] as the final step.
Future<dynamic> then(dynamic Function(dynamic) destination);
/// Runs the pipeline and returns its result.
Future<dynamic> thenReturn();
}
/// Lets a fluent object apply a configuration step only when a condition
/// holds.
///
/// [T] is expected to be the type of the class mixing this in: the receiver
/// is cast to [T] before it is handed to the action and before it is returned.
mixin Conditionable<T> {
  /// Invokes [callback2] with this object when [callback] returns `true`.
  ///
  /// Returns this object (cast to [T]) in either case.
  T when(bool Function() callback, void Function(T) callback2) {
    final shouldRun = callback();
    final self = this as T;
    if (shouldRun) {
      callback2(self);
    }
    return self;
  }

  /// Invokes [callback2] with this object when [callback] returns `false`.
  ///
  /// Returns this object (cast to [T]) in either case.
  T unless(bool Function() callback, void Function(T) callback2) {
    final shouldSkip = callback();
    final self = this as T;
    if (!shouldSkip) {
      callback2(self);
    }
    return self;
  }
}
/// The primary class for building and executing pipelines.
///
/// A pipeline passes a value (the "passable") through an ordered list of
/// pipes and finally into a destination callback. Each pipe receives the
/// current value and a `next` function; calling `next` hands control to the
/// following pipe (or to the destination after the last pipe).
///
/// A pipe may be:
///  * a `Function(passable, next)`;
///  * a `String` of the form `name` or `name:param1,param2`, where `name` is
///    looked up as a named singleton in the [Container];
///  * a `Type`, which is resolved through the [Container];
///  * any other object, used directly as the pipe instance.
///
/// Non-function pipe instances are invoked as `instance.call(methodName)`,
/// which must return the function to run with `(passable, next, ...params)`.
class Pipeline with Conditionable<Pipeline> implements PipelineContract {
  /// The container used to resolve pipes given by name or type.
  Container? _container;

  /// The object being passed through the pipeline.
  dynamic _passable;

  /// The configured pipes, in the order they should run.
  final List<dynamic> _pipes = [];

  /// The method to call on each pipe.
  String _method = 'handle';

  /// Logger for the pipeline.
  final Logger _logger = Logger('Pipeline');

  /// Creates a pipeline, optionally bound to a container.
  Pipeline([this._container]);

  /// Sets the object being sent through the pipeline.
  @override
  Pipeline send(dynamic passable) {
    _passable = passable;
    return this;
  }

  /// Replaces the configured pipes with [pipes].
  ///
  /// An [Iterable] is expanded into individual pipes; any other value is
  /// treated as a single pipe.
  @override
  Pipeline through(dynamic pipes) {
    _pipes
      ..clear()
      ..addAll(pipes is Iterable ? pipes.toList() : [pipes]);
    return this;
  }

  /// Appends [pipes] to the already configured pipes.
  ///
  /// An [Iterable] is expanded into individual pipes; any other value is
  /// treated as a single pipe.
  @override
  Pipeline pipe(dynamic pipes) {
    _pipes.addAll(pipes is Iterable ? pipes.toList() : [pipes]);
    return this;
  }

  /// Sets the method to call on the pipes.
  @override
  Pipeline via(String method) {
    _method = method;
    return this;
  }

  /// Runs the pipeline and passes the final value to [destination].
  ///
  /// Returns whatever the outermost pipe returns, which is normally the value
  /// produced by [destination] after travelling back out through the pipes.
  @override
  Future<dynamic> then(dynamic Function(dynamic) destination) async {
    // Build the onion from the inside out: the destination is the core and
    // every pipe wraps what has been built so far. Folding over the reversed
    // list makes the first configured pipe the outermost layer, so pipes run
    // in the order they were added.
    final pipeline = pipes().reversed.fold<Function>(
      prepareDestination(destination),
      (Function next, pipe) => (passable) => carry(pipe, passable, next),
    );
    return await pipeline(_passable);
  }

  /// Runs the pipeline and returns the resulting value unchanged.
  @override
  Future<dynamic> thenReturn() => then((passable) => passable);

  /// Wraps [destination] as the innermost stage of the pipeline.
  ///
  /// The returned function awaits the destination's result and routes any
  /// thrown object through [handleException].
  Function prepareDestination(Function destination) {
    return (passable) async {
      try {
        return await destination(passable);
      } catch (e) {
        return handleException(passable, e);
      }
    };
  }

  /// Executes a single [pipe] with [passable], giving it [next] to continue.
  ///
  /// Function pipes are called directly and their result is returned as is.
  /// Other pipes are resolved to an instance, asked for the configured method
  /// via `instance.call(methodName)`, and the result is passed through
  /// [handleCarry]. Anything thrown is routed through [handleException].
  Future<dynamic> carry(dynamic pipe, dynamic passable, Function next) async {
    try {
      if (pipe is Function) {
        return await pipe(passable, next);
      }

      var parameters = <String>[];
      dynamic instance = pipe;
      if (pipe is String) {
        // "name:a,b" -> named singleton "name" invoked with extra args a, b.
        final parts = parsePipeString(pipe);
        parameters = parts.sublist(1);
        instance = getContainer().findByName<dynamic>(parts[0]);
      } else if (pipe is Type) {
        instance = getContainer().make<dynamic>(pipe);
      }
      if (instance == null) {
        throw Exception('Unable to resolve pipe: $pipe');
      }

      final method = instance.call(_method);
      if (method == null) {
        throw Exception('Method $_method not found on instance: $instance');
      }

      final result = await Function.apply(
        method,
        [passable, next, ...parameters],
      );
      return handleCarry(result);
    } catch (e) {
      return handleException(passable, e);
    }
  }

  /// Splits a pipe string into its name followed by its parameters.
  ///
  /// Only the first `:` separates the name from the parameters, so parameter
  /// values may themselves contain colons. Parameters are comma separated.
  /// A string without `:` yields a single-element list containing the name.
  List<String> parsePipeString(String pipe) {
    final separator = pipe.indexOf(':');
    if (separator < 0) {
      return [pipe];
    }
    return [
      pipe.substring(0, separator),
      ...pipe.substring(separator + 1).split(','),
    ];
  }

  /// Returns an unmodifiable snapshot of the configured pipes.
  List<dynamic> pipes() {
    return List.unmodifiable(_pipes);
  }

  /// Returns the container instance.
  ///
  /// Throws an [Exception] if no container has been supplied.
  Container getContainer() {
    final container = _container;
    if (container == null) {
      throw Exception(
          'A container instance has not been passed to the Pipeline.');
    }
    return container;
  }

  /// Sets the container instance.
  Pipeline setContainer(Container container) {
    _container = container;
    return this;
  }

  /// Post-processes the value returned from a non-function pipe.
  ///
  /// A `null` result (or a [Future] completing with `null`) is replaced by the
  /// object originally given to [send].
  dynamic handleCarry(dynamic carry) {
    if (carry is Future) {
      return carry.then((value) => value ?? _passable);
    }
    return carry ?? _passable;
  }

  /// Logs [e] and throws it again.
  ///
  /// The return value, if an override chooses to return instead of throwing,
  /// becomes the result of the stage that failed.
  dynamic handleException(dynamic passable, Object e) {
    _logger.severe('Exception occurred in pipeline', e);
    throw e;
  }
}
/// Convenience methods that append commonly needed pipes to a [Pipeline].
extension PipelineExtensions on Pipeline {
  /// Appends a pipe that logs the value it receives and the value produced by
  /// the rest of the pipeline.
  Pipeline addLoggingPipe() {
    return pipe((passable, next) async {
      _logger.info('Pipe input: $passable');
      // `next` may return a Future; await it so the resolved value is logged
      // rather than the Future object itself.
      final result = await next(passable);
      _logger.info('Pipe output: $result');
      return result;
    });
  }

  /// Appends a pipe that awaits [asyncOperation] on the incoming value and
  /// forwards its result to the next stage.
  Pipeline addAsyncPipe(Future<dynamic> Function(dynamic) asyncOperation) {
    return pipe((passable, next) async {
      final transformed = await asyncOperation(passable);
      return next(transformed);
    });
  }

  /// Appends a pipe that only lets values accepted by [validator] continue.
  ///
  /// When [validator] returns `false` the pipe throws an [Exception] carrying
  /// [errorMessage], or `'Validation failed'` if none was given.
  Pipeline addValidationPipe(bool Function(dynamic) validator,
      {String? errorMessage}) {
    return pipe((passable, next) {
      if (!validator(passable)) {
        throw Exception(errorMessage ?? 'Validation failed');
      }
      return next(passable);
    });
  }
}