refactor: working on process pool 8 pass 2 fail

This commit is contained in:
Patrick Stewart 2024-12-30 07:30:38 -07:00
parent e2be3ebd54
commit 23782ce8e5
5 changed files with 41 additions and 21 deletions

View file

@ -44,11 +44,9 @@ class Factory with Macroable {
List<PendingProcess> processes, { List<PendingProcess> processes, {
void Function(String)? onOutput, void Function(String)? onOutput,
}) async { }) async {
final results = <ProcessResult>[]; // Run all processes concurrently and wait for all to complete
for (final process in processes) { final futures = processes.map((process) => process.run(null, onOutput));
results.add(await process.run(null, onOutput)); return Future.wait(futures);
}
return results;
} }
/// Run a series of processes in sequence. /// Run a series of processes in sequence.

View file

@ -34,8 +34,9 @@ class InvokedProcess {
late final StreamSubscription<List<int>> _stderrSubscription; late final StreamSubscription<List<int>> _stderrSubscription;
/// Create a new invoked process instance. /// Create a new invoked process instance.
InvokedProcess(this._process, this._command, [this._outputHandler]) InvokedProcess(Process process, this._command, [this._outputHandler])
: _stdoutController = StreamController<List<int>>.broadcast(), : _process = process,
_stdoutController = StreamController<List<int>>.broadcast(),
_stderrController = StreamController<List<int>>.broadcast() { _stderrController = StreamController<List<int>>.broadcast() {
// Set up output handling // Set up output handling
_stdoutSubscription = _process.stdout.listen( _stdoutSubscription = _process.stdout.listen(
@ -78,11 +79,9 @@ class InvokedProcess {
/// Kill the process. /// Kill the process.
bool kill([ProcessSignal signal = ProcessSignal.sigterm]) { bool kill([ProcessSignal signal = ProcessSignal.sigterm]) {
try { closeStdin();
return _process.kill(signal); _process.kill(signal);
} catch (e) { return true;
return false;
}
} }
/// Get the process exit code. /// Get the process exit code.

View file

@ -133,6 +133,9 @@ class PendingProcess with Macroable {
if (commandStr == 'test -t 0') { if (commandStr == 'test -t 0') {
// Special handling for TTY test command // Special handling for TTY test command
return ('sh', ['-c', 'exit 0'], true); return ('sh', ['-c', 'exit 0'], true);
} else if (commandStr == 'pwd') {
// Special handling for pwd command
return ('pwd', [], false);
} }
// All other commands need sh shell // All other commands need sh shell
return ('sh', ['-c', commandStr], true); return ('sh', ['-c', commandStr], true);
@ -216,11 +219,16 @@ class PendingProcess with Macroable {
} }
} }
final stdoutSubscription = final stdoutCompleter = Completer<void>();
process.stdout.transform(utf8.decoder).listen(handleOutput); final stderrCompleter = Completer<void>();
final stderrSubscription = final stdoutSubscription = process.stdout
process.stderr.transform(utf8.decoder).listen(handleError); .transform(utf8.decoder)
.listen(handleOutput, onDone: stdoutCompleter.complete);
final stderrSubscription = process.stderr
.transform(utf8.decoder)
.listen(handleError, onDone: stderrCompleter.complete);
if (_input != null) { if (_input != null) {
if (_input is String) { if (_input is String) {
@ -251,6 +259,12 @@ class PendingProcess with Macroable {
exitCode = await process.exitCode; exitCode = await process.exitCode;
} }
// Wait for output streams to complete
await Future.wait([
stdoutCompleter.future,
stderrCompleter.future,
]);
await stdoutSubscription.cancel(); await stdoutSubscription.cancel();
await stderrSubscription.cancel(); await stderrSubscription.cancel();

View file

@ -15,17 +15,26 @@ class Pool {
final List<PendingProcess> _processes = []; final List<PendingProcess> _processes = [];
/// Create a new process pool instance. /// Create a new process pool instance.
Pool(this._factory, this._callback); Pool(this._factory, this._callback) {
// Call the callback immediately to configure the pool
_callback(this);
}
/// Add a process to the pool. /// Add a process to the pool.
Pool command(dynamic command) { Pool command(dynamic command) {
_processes.add(_factory.command(command)); if (command is PendingProcess) {
_processes.add(command);
} else {
_processes.add(_factory.command(command));
}
return this; return this;
} }
/// Start the processes in the pool. /// Start the processes in the pool.
Future<List<ProcessResult>> start([void Function(String)? output]) async { Future<List<ProcessResult>> start([void Function(String)? output]) async {
_callback(this); if (_processes.isEmpty) {
return [];
}
return _factory.concurrently(_processes, onOutput: output); return _factory.concurrently(_processes, onOutput: output);
} }
} }

View file

@ -119,11 +119,11 @@ void main() {
invokedProcess = InvokedProcess(process, 'sleep 10'); invokedProcess = InvokedProcess(process, 'sleep 10');
// Kill process and ensure resources are cleaned up // Kill process and ensure resources are cleaned up
invokedProcess.kill(); expect(invokedProcess.kill(), isTrue);
await invokedProcess.wait(); await invokedProcess.wait();
// Verify process is terminated // Verify process is terminated
expect(() => process.kill(), throwsA(anything)); expect(await invokedProcess.exitCode, isNot(0));
}); });
}); });
} }