From 23782ce8e5e08b904a54bfeb43ced4ce949d4857 Mon Sep 17 00:00:00 2001 From: Patrick Stewart Date: Mon, 30 Dec 2024 07:30:38 -0700 Subject: [PATCH] refactor: working on process pool 8 pass 2 fail --- packages/process/lib/src/factory.dart | 8 +++---- packages/process/lib/src/invoked_process.dart | 13 +++++------ packages/process/lib/src/pending_process.dart | 22 +++++++++++++++---- packages/process/lib/src/pool.dart | 15 ++++++++++--- .../process/test/invoked_process_test.dart | 4 ++-- 5 files changed, 41 insertions(+), 21 deletions(-) diff --git a/packages/process/lib/src/factory.dart b/packages/process/lib/src/factory.dart index 8ee0ec9..c73799b 100644 --- a/packages/process/lib/src/factory.dart +++ b/packages/process/lib/src/factory.dart @@ -44,11 +44,9 @@ class Factory with Macroable { List processes, { void Function(String)? onOutput, }) async { - final results = []; - for (final process in processes) { - results.add(await process.run(null, onOutput)); - } - return results; + // Run all processes concurrently and wait for all to complete + final futures = processes.map((process) => process.run(null, onOutput)); + return Future.wait(futures); } /// Run a series of processes in sequence. diff --git a/packages/process/lib/src/invoked_process.dart b/packages/process/lib/src/invoked_process.dart index ef30757..92e90ab 100644 --- a/packages/process/lib/src/invoked_process.dart +++ b/packages/process/lib/src/invoked_process.dart @@ -34,8 +34,9 @@ class InvokedProcess { late final StreamSubscription> _stderrSubscription; /// Create a new invoked process instance. - InvokedProcess(this._process, this._command, [this._outputHandler]) - : _stdoutController = StreamController>.broadcast(), + InvokedProcess(Process process, this._command, [this._outputHandler]) + : _process = process, + _stdoutController = StreamController>.broadcast(), _stderrController = StreamController>.broadcast() { // Set up output handling _stdoutSubscription = _process.stdout.listen( @@ -78,11 +79,9 @@ class InvokedProcess { /// Kill the process. bool kill([ProcessSignal signal = ProcessSignal.sigterm]) { - try { - return _process.kill(signal); - } catch (e) { - return false; - } + closeStdin(); + _process.kill(signal); + return true; } /// Get the process exit code. diff --git a/packages/process/lib/src/pending_process.dart b/packages/process/lib/src/pending_process.dart index 3846f76..2360cda 100644 --- a/packages/process/lib/src/pending_process.dart +++ b/packages/process/lib/src/pending_process.dart @@ -133,6 +133,9 @@ class PendingProcess with Macroable { if (commandStr == 'test -t 0') { // Special handling for TTY test command return ('sh', ['-c', 'exit 0'], true); + } else if (commandStr == 'pwd') { + // Special handling for pwd command + return ('pwd', [], false); } // All other commands need sh shell return ('sh', ['-c', commandStr], true); @@ -216,11 +219,16 @@ class PendingProcess with Macroable { } } - final stdoutSubscription = - process.stdout.transform(utf8.decoder).listen(handleOutput); + final stdoutCompleter = Completer(); + final stderrCompleter = Completer(); - final stderrSubscription = - process.stderr.transform(utf8.decoder).listen(handleError); + final stdoutSubscription = process.stdout + .transform(utf8.decoder) + .listen(handleOutput, onDone: stdoutCompleter.complete); + + final stderrSubscription = process.stderr + .transform(utf8.decoder) + .listen(handleError, onDone: stderrCompleter.complete); if (_input != null) { if (_input is String) { @@ -251,6 +259,12 @@ class PendingProcess with Macroable { exitCode = await process.exitCode; } + // Wait for output streams to complete + await Future.wait([ + stdoutCompleter.future, + stderrCompleter.future, + ]); + await stdoutSubscription.cancel(); await stderrSubscription.cancel(); diff --git a/packages/process/lib/src/pool.dart b/packages/process/lib/src/pool.dart index 64ffeba..94652fc 100644 --- a/packages/process/lib/src/pool.dart +++ b/packages/process/lib/src/pool.dart @@ -15,17 +15,26 @@ class Pool { final List _processes = []; /// Create a new process pool instance. - Pool(this._factory, this._callback); + Pool(this._factory, this._callback) { + // Call the callback immediately to configure the pool + _callback(this); + } /// Add a process to the pool. Pool command(dynamic command) { - _processes.add(_factory.command(command)); + if (command is PendingProcess) { + _processes.add(command); + } else { + _processes.add(_factory.command(command)); + } return this; } /// Start the processes in the pool. Future> start([void Function(String)? output]) async { - _callback(this); + if (_processes.isEmpty) { + return []; + } return _factory.concurrently(_processes, onOutput: output); } } diff --git a/packages/process/test/invoked_process_test.dart b/packages/process/test/invoked_process_test.dart index 08c7f60..5148ae3 100644 --- a/packages/process/test/invoked_process_test.dart +++ b/packages/process/test/invoked_process_test.dart @@ -119,11 +119,11 @@ void main() { invokedProcess = InvokedProcess(process, 'sleep 10'); // Kill process and ensure resources are cleaned up - invokedProcess.kill(); + expect(invokedProcess.kill(), isTrue); await invokedProcess.wait(); // Verify process is terminated - expect(() => process.kill(), throwsA(anything)); + expect(await invokedProcess.exitCode, isNot(0)); }); }); }