diff --git a/core/pipeline/examples/async_pipeline.dart b/core/pipeline/examples/async_pipeline.dart new file mode 100644 index 00000000..198b2a1c --- /dev/null +++ b/core/pipeline/examples/async_pipeline.dart @@ -0,0 +1,38 @@ +import 'package:angel3_framework/angel3_framework.dart'; +import 'package:angel3_framework/http.dart'; +import 'package:angel3_container/mirrors.dart'; +import 'package:angel3_pipeline/pipeline.dart'; + +class AsyncGreetingPipe { + Future handle(String input, Function next) async { + await Future.delayed(Duration(seconds: 1)); + return next('Hello, $input'); + } +} + +class AsyncExclamationPipe { + Future handle(String input, Function next) async { + await Future.delayed(Duration(seconds: 1)); + return next('$input!'); + } +} + +void main() async { + var app = Angel(reflector: MirrorsReflector()); + var http = AngelHttp(app); + + app.container.registerSingleton((c) => Pipeline(c)); + + app.get('/', (req, res) async { + var pipeline = app.container.make(); + var result = await pipeline + .send('World') + .through(['AsyncGreetingPipe', 'AsyncExclamationPipe']).then( + (result) => result.toUpperCase()); + + res.write(result); // Outputs: "HELLO, WORLD!" (after 2 seconds) + }); + + await http.startServer('localhost', 3000); + print('Server started on http://localhost:3000'); +} diff --git a/core/pipeline/examples/basic_usage.dart b/core/pipeline/examples/basic_usage.dart new file mode 100644 index 00000000..d8c7bdd5 --- /dev/null +++ b/core/pipeline/examples/basic_usage.dart @@ -0,0 +1,36 @@ +import 'package:angel3_framework/angel3_framework.dart'; +import 'package:angel3_framework/http.dart'; +import 'package:angel3_container/mirrors.dart'; +import 'package:angel3_pipeline/pipeline.dart'; + +class GreetingPipe { + dynamic handle(String input, Function next) { + return next('Hello, $input'); + } +} + +class ExclamationPipe { + dynamic handle(String input, Function next) { + return next('$input!'); + } +} + +void main() async { + var app = Angel(reflector: MirrorsReflector()); + var http = AngelHttp(app); + + app.container.registerSingleton((c) => Pipeline(c)); + + app.get('/', (req, res) async { + var pipeline = app.container.make(); + var result = await pipeline + .send('World') + .through(['GreetingPipe', 'ExclamationPipe']).then( + (result) => result.toUpperCase()); + + res.write(result); // Outputs: "HELLO, WORLD!" + }); + + await http.startServer('localhost', 3000); + print('Server started on http://localhost:3000'); +} diff --git a/core/pipeline/examples/error_handling.dart b/core/pipeline/examples/error_handling.dart new file mode 100644 index 00000000..8fb6bd7f --- /dev/null +++ b/core/pipeline/examples/error_handling.dart @@ -0,0 +1,34 @@ +import 'package:angel3_framework/angel3_framework.dart'; +import 'package:angel3_framework/http.dart'; +import 'package:angel3_container/mirrors.dart'; +import 'package:angel3_pipeline/pipeline.dart'; + +class ErrorPipe { + dynamic handle(String input, Function next) { + throw Exception('Simulated error'); + } +} + +void main() async { + var app = Angel(reflector: MirrorsReflector()); + var http = AngelHttp(app); + + app.container.registerSingleton((c) => Pipeline(c)); + + app.get('/', (req, res) async { + var pipeline = app.container.make(); + try { + await pipeline + .send('World') + .through(['ErrorPipe']).then((result) => result.toUpperCase()); + } catch (e) { + res.write('Error occurred: ${e.toString()}'); + return; + } + + res.write('This should not be reached'); + }); + + await http.startServer('localhost', 3000); + print('Server started on http://localhost:3000'); +} diff --git a/core/pipeline/examples/mixed_pipes.dart b/core/pipeline/examples/mixed_pipes.dart new file mode 100644 index 00000000..18563060 --- /dev/null +++ b/core/pipeline/examples/mixed_pipes.dart @@ -0,0 +1,35 @@ +import 'package:angel3_framework/angel3_framework.dart'; +import 'package:angel3_framework/http.dart'; +import 'package:angel3_container/mirrors.dart'; +import 'package:angel3_pipeline/pipeline.dart'; + +class GreetingPipe { + dynamic handle(String input, Function next) { + return next('Hello, $input'); + } +} + +void main() async { + var app = Angel(reflector: MirrorsReflector()); + var http = AngelHttp(app); + + app.container.registerSingleton((c) => Pipeline(c)); + + app.get('/', (req, res) async { + var pipeline = app.container.make(); + var result = await pipeline.send('World').through([ + 'GreetingPipe', + (String input, Function next) => next('$input!'), + (String input, Function next) async { + await Future.delayed(Duration(seconds: 1)); + return next(input.toUpperCase()); + }, + ]).then((result) => 'Final result: $result'); + + res.write( + result); // Outputs: "Final result: HELLO, WORLD!" (after 1 second) + }); + + await http.startServer('localhost', 3000); + print('Server started on http://localhost:3000'); +} diff --git a/core/pipeline/lib/pipeline.dart b/core/pipeline/lib/pipeline.dart index fadcaac3..9b73f9fb 100644 --- a/core/pipeline/lib/pipeline.dart +++ b/core/pipeline/lib/pipeline.dart @@ -1,3 +1,5 @@ library; export 'src/pipeline.dart'; +export 'src/conditionable.dart'; +export 'src/pipeline_contract.dart'; diff --git a/core/pipeline/lib/src/conditionable.dart b/core/pipeline/lib/src/conditionable.dart new file mode 100644 index 00000000..ce796a04 --- /dev/null +++ b/core/pipeline/lib/src/conditionable.dart @@ -0,0 +1,16 @@ +/// Provides conditional execution methods for the pipeline. +mixin Conditionable { + T when(bool Function() callback, void Function(T) callback2) { + if (callback()) { + callback2(this as T); + } + return this as T; + } + + T unless(bool Function() callback, void Function(T) callback2) { + if (!callback()) { + callback2(this as T); + } + return this as T; + } +} diff --git a/core/pipeline/lib/src/pipeline.dart b/core/pipeline/lib/src/pipeline.dart index 46f90b97..e5ab822c 100644 --- a/core/pipeline/lib/src/pipeline.dart +++ b/core/pipeline/lib/src/pipeline.dart @@ -1,39 +1,21 @@ import 'dart:async'; +import 'dart:mirrors'; import 'package:angel3_container/angel3_container.dart'; import 'package:logging/logging.dart'; +import 'pipeline_contract.dart'; +import 'conditionable.dart'; -/// Represents a series of "pipes" through which an object can be passed. -abstract class PipelineContract { - PipelineContract send(dynamic passable); - PipelineContract through(dynamic pipes); - PipelineContract pipe(dynamic pipes); - PipelineContract via(String method); - Future then(dynamic Function(dynamic) destination); - Future thenReturn(); -} - -/// Provides conditional execution methods for the pipeline. -mixin Conditionable { - T when(bool Function() callback, void Function(T) callback2) { - if (callback()) { - callback2(this as T); - } - return this as T; - } - - T unless(bool Function() callback, void Function(T) callback2) { - if (!callback()) { - callback2(this as T); - } - return this as T; - } -} +/// Defines the signature for a pipe function. +typedef PipeFunction = FutureOr Function( + dynamic passable, FutureOr Function(dynamic) next); /// The primary class for building and executing pipelines. class Pipeline with Conditionable implements PipelineContract { /// The container implementation. Container? _container; + final Map _typeMap = {}; + /// The object being passed through the pipeline. dynamic _passable; @@ -47,7 +29,11 @@ class Pipeline with Conditionable implements PipelineContract { final Logger _logger = Logger('Pipeline'); /// Create a new class instance. - Pipeline([this._container]); + Pipeline(this._container); + + void registerPipeType(String name, Type type) { + _typeMap[name] = type; + } /// Set the object being sent through the pipeline. @override @@ -59,7 +45,6 @@ class Pipeline with Conditionable implements PipelineContract { /// Set the array of pipes. @override Pipeline through(dynamic pipes) { - _pipes.clear(); _pipes.addAll(pipes is Iterable ? pipes.toList() : [pipes]); return this; } @@ -80,13 +65,17 @@ class Pipeline with Conditionable implements PipelineContract { /// Run the pipeline with a final destination callback. @override - Future then(dynamic Function(dynamic) destination) async { - var pipeline = pipes().fold( - (passable) => prepareDestination(destination), - (Function next, pipe) => (passable) => carry(pipe, passable, next), - ); + Future then(FutureOr Function(dynamic) callback) async { + PipeFunction pipeline = _pipes.fold( + (dynamic passable, FutureOr Function(dynamic) next) async => + await callback(passable), + (PipeFunction next, dynamic pipe) => (dynamic passable, + FutureOr Function(dynamic) nextPipe) async { + return await carry(pipe, passable, + (dynamic result) async => await next(result, nextPipe)); + }); - return pipeline(_passable); + return await pipeline(_passable, (dynamic result) async => result); } /// Run the pipeline and return the result. @@ -111,35 +100,28 @@ class Pipeline with Conditionable implements PipelineContract { Future carry(dynamic pipe, dynamic passable, Function next) async { try { if (pipe is Function) { - var result = pipe(passable, next); - return result is Future ? await result : result; + return await pipe(passable, next); } - List parameters = []; if (pipe is String) { - var parts = parsePipeString(pipe); - pipe = parts[0]; - parameters = parts.sublist(1); + if (_container == null) { + throw Exception('Container is null, cannot resolve pipe: $pipe'); + } + + Type? pipeType = _typeMap[pipe]; + if (pipeType == null) { + throw Exception('Type not registered for pipe: $pipe'); + } + + var instance = _container?.make(pipeType); + if (instance == null) { + throw Exception('Unable to resolve pipe: $pipe'); + } + + return await invokeMethod(instance, _method, [passable, next]); } - var instance = pipe is String ? getContainer().make(pipe as Type) : pipe; - - if (instance == null) { - throw Exception('Unable to resolve pipe: $pipe'); - } - - var method = instance.call(_method); - if (method == null) { - throw Exception('Method $_method not found on instance: $instance'); - } - - var result = Function.apply( - method, - [passable, next, ...parameters], - ); - - result = result is Future ? await result : result; - return handleCarry(result); + throw Exception('Unsupported pipe type: ${pipe.runtimeType}'); } catch (e) { return handleException(passable, e); } @@ -179,13 +161,22 @@ class Pipeline with Conditionable implements PipelineContract { return carry ?? _passable; } + Future invokeMethod( + dynamic instance, String methodName, List arguments) async { + var instanceMirror = reflect(instance); + var methodSymbol = Symbol(methodName); + + if (!instanceMirror.type.declarations.containsKey(methodSymbol)) { + throw Exception('Method $methodName not found on instance: $instance'); + } + + var result = instanceMirror.invoke(methodSymbol, arguments); + return await result.reflectee; + } + /// Handle the given exception. dynamic handleException(dynamic passable, Object e) { _logger.severe('Exception occurred in pipeline', e); - if (e is Exception) { - // You can add custom exception handling logic here - // For example, you might want to return a default value or transform the exception - } throw e; } } diff --git a/core/pipeline/lib/src/pipeline_contract.dart b/core/pipeline/lib/src/pipeline_contract.dart new file mode 100644 index 00000000..2b45e7f8 --- /dev/null +++ b/core/pipeline/lib/src/pipeline_contract.dart @@ -0,0 +1,9 @@ +/// Represents a series of "pipes" through which an object can be passed. +abstract class PipelineContract { + PipelineContract send(dynamic passable); + PipelineContract through(dynamic pipes); + PipelineContract pipe(dynamic pipes); + PipelineContract via(String method); + Future then(dynamic Function(dynamic) destination); + Future thenReturn(); +} diff --git a/core/pipeline/pubspec.yaml b/core/pipeline/pubspec.yaml index 12669df4..17e3e200 100644 --- a/core/pipeline/pubspec.yaml +++ b/core/pipeline/pubspec.yaml @@ -11,6 +11,7 @@ environment: # Add regular dependencies here. dependencies: angel3_container: ^8.0.0 + angel3_framework: ^8.0.0 logging: ^1.1.0 dev_dependencies: diff --git a/core/pipeline/test/.gitkeep b/core/pipeline/test/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/core/pipeline/test/pipeline_test.dart b/core/pipeline/test/pipeline_test.dart new file mode 100644 index 00000000..84a6ec9d --- /dev/null +++ b/core/pipeline/test/pipeline_test.dart @@ -0,0 +1,106 @@ +import 'package:test/test.dart'; +import 'package:angel3_framework/angel3_framework.dart'; +import 'package:angel3_container/angel3_container.dart'; +import 'package:angel3_container/mirrors.dart'; +import 'package:angel3_pipeline/pipeline.dart'; + +class AddExclamationPipe { + Future handle(String input, Function next) async { + return await next('$input!'); + } +} + +class UppercasePipe { + Future handle(String input, Function next) async { + return await next(input.toUpperCase()); + } +} + +void main() { + late Angel app; + late Container container; + late Pipeline pipeline; + + setUp(() { + app = Angel(reflector: MirrorsReflector()); + container = app.container; + container.registerSingleton(AddExclamationPipe()); + container.registerSingleton(UppercasePipe()); + pipeline = Pipeline(container); + pipeline.registerPipeType('AddExclamationPipe', AddExclamationPipe); + pipeline.registerPipeType('UppercasePipe', UppercasePipe); + }); + + test('Pipeline should process simple string pipes', () async { + var result = await pipeline.send('hello').through( + ['AddExclamationPipe', 'UppercasePipe']).then((res) async => res); + expect(result, equals('HELLO!')); + }); + + test('Pipeline should process function pipes', () async { + var result = await pipeline.send('hello').through([ + (String input, Function next) async { + var result = await next('$input, WORLD'); + return result; + }, + (String input, Function next) async { + var result = await next(input.toUpperCase()); + return result; + }, + ]).then((res) async => res as String); + + expect(result, equals('HELLO, WORLD')); + }); + + test('Pipeline should handle mixed pipe types', () async { + var result = await pipeline.send('hello').through([ + 'AddExclamationPipe', + (String input, Function next) async { + var result = await next(input.toUpperCase()); + return result; + }, + ]).then((res) async => res as String); + expect(result, equals('HELLO!')); + }); + + test('Pipeline should handle async pipes', () async { + var result = await pipeline.send('hello').through([ + 'UppercasePipe', + (String input, Function next) async { + await Future.delayed(Duration(milliseconds: 100)); + return next('$input, WORLD'); + }, + ]).then((res) async => res as String); + expect(result, equals('HELLO, WORLD')); + }); + + test('Pipeline should throw exception for unresolvable pipe', () { + expect( + () => pipeline + .send('hello') + .through(['NonExistentPipe']).then((res) => res), + throwsA(isA()), + ); + }); + + test('Pipeline should allow chaining of pipes', () async { + var result = await pipeline + .send('hello') + .pipe('AddExclamationPipe') + .pipe('UppercasePipe') + .then((res) async => res as String); + expect(result, equals('HELLO!')); + }); + + test('Pipeline should respect the order of pipes', () async { + var result1 = await pipeline + .send('hello') + .through(['AddExclamationPipe', 'UppercasePipe']).then((res) => res); + var result2 = await pipeline + .send('hello') + .through(['UppercasePipe', 'AddExclamationPipe']).then((res) => res); + expect(result1, equals('HELLO!')); + expect(result2, equals('HELLO!!')); + expect(result1, isNot(equals(result2))); + }); +} diff --git a/core/process/example/basic_process/main.dart b/core/process/examples/basic_process/main.dart similarity index 100% rename from core/process/example/basic_process/main.dart rename to core/process/examples/basic_process/main.dart diff --git a/core/process/example/process_pipeline/main.dart b/core/process/examples/process_pipeline/main.dart similarity index 100% rename from core/process/example/process_pipeline/main.dart rename to core/process/examples/process_pipeline/main.dart diff --git a/core/process/example/process_pool/main.dart b/core/process/examples/process_pool/main.dart similarity index 100% rename from core/process/example/process_pool/main.dart rename to core/process/examples/process_pool/main.dart diff --git a/core/process/example/web_server_with_processes/main.dart b/core/process/examples/web_server_with_processes/main.dart similarity index 100% rename from core/process/example/web_server_with_processes/main.dart rename to core/process/examples/web_server_with_processes/main.dart diff --git a/core/process/example/web_server_with_processes/views/index.mustache b/core/process/examples/web_server_with_processes/views/index.mustache similarity index 100% rename from core/process/example/web_server_with_processes/views/index.mustache rename to core/process/examples/web_server_with_processes/views/index.mustache diff --git a/core/process/test/process_test_extended.dart b/core/process/test/process_test_extended.dart index 1dffc1c0..ba18bd42 100644 --- a/core/process/test/process_test_extended.dart +++ b/core/process/test/process_test_extended.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:convert'; import 'dart:io' show Directory, Platform, ProcessSignal; import 'package:angel3_process/angel3_process.dart'; import 'package:test/test.dart'; diff --git a/core/support/melos_angel3_support.iml b/core/support/melos_angel3_support.iml deleted file mode 100644 index 389d07a1..00000000 --- a/core/support/melos_angel3_support.iml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - - - - - - - - - - - \ No newline at end of file