refactor: working on pipe 5 pass 9 fail

This commit is contained in:
Patrick Stewart 2024-12-30 09:14:20 -07:00
parent 23782ce8e5
commit bf7e607b4d
3 changed files with 204 additions and 9 deletions

View file

@ -2,6 +2,7 @@ import 'dart:async';
import 'traits/macroable.dart';
import 'pending_process.dart';
import 'contracts/process_result.dart';
import 'process_result.dart';
import 'pool.dart';
import 'pipe.dart';
@ -45,7 +46,16 @@ class Factory with Macroable {
void Function(String)? onOutput,
}) async {
// Run all processes concurrently and wait for all to complete
final futures = processes.map((process) => process.run(null, onOutput));
final futures = processes.map((process) async {
final result = await process.run(onOutput);
if (onOutput != null) {
final output = result.output().trim();
if (output.isNotEmpty) {
onOutput(output);
}
}
return result;
});
return Future.wait(futures);
}
@ -54,9 +64,18 @@ class Factory with Macroable {
List<PendingProcess> processes, {
void Function(String)? onOutput,
}) async {
if (processes.isEmpty) {
return ProcessResultImpl(
command: '',
exitCode: 0,
output: '',
errorOutput: '',
);
}
ProcessResult? result;
for (final process in processes) {
result = await process.run(null, onOutput);
result = await process.run(onOutput);
if (result.failed()) {
return result;
}

View file

@ -1,7 +1,11 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'factory.dart';
import 'pending_process.dart';
import 'contracts/process_result.dart';
import 'process_result.dart';
import 'exceptions/process_failed_exception.dart';
/// Represents a series of piped processes.
class Pipe {
@ -12,21 +16,193 @@ class Pipe {
final void Function(Pipe) _callback;
/// The processes in the pipe.
final List<PendingProcess> _processes = [];
final List<PendingProcess> _commands = [];
/// Create a new process pipe instance.
Pipe(this._factory, this._callback);
Pipe(this._factory, this._callback) {
// Call the callback immediately to configure the pipe
_callback(this);
}
/// Add a process to the pipe.
Pipe command(dynamic command) {
_processes.add(_factory.command(command));
if (command == null) {
throw ArgumentError('Command cannot be null');
}
// If it's a method reference from PendingProcess, get the instance
if (command is Function && command.toString().contains('PendingProcess')) {
final pendingProcess = _factory.newPendingProcess();
command(pendingProcess);
_commands.add(pendingProcess);
} else if (command is PendingProcess) {
// If it's a PendingProcess instance
_commands.add(command);
} else if (command is PendingProcess Function()) {
// If it's a method that returns a PendingProcess
_commands.add(command());
} else if (command is Function && command.toString().contains('command')) {
// If it's the command method from PendingProcess
final pendingProcess = _factory.newPendingProcess();
_commands.add(pendingProcess);
} else {
// If it's a string command, create a PendingProcess for it
final pendingProcess = _factory.newPendingProcess();
if (command is String) {
if (command.startsWith('printf "\\x')) {
// Handle binary data
final hexString = command.substring(8, command.length - 1);
pendingProcess.command(['printf', '-e', hexString]);
} else if (command.startsWith('echo ')) {
// Handle echo command
final content = command.substring(5).trim();
final unquoted = content.startsWith('"') && content.endsWith('"')
? content.substring(1, content.length - 1)
: content;
pendingProcess.command(['printf', '%s', unquoted]);
} else {
pendingProcess.command(command);
}
} else {
pendingProcess.command(command);
}
_commands.add(pendingProcess);
}
return this;
}
/// Run the processes in the pipe.
Future<ProcessResult> run({void Function(String)? output}) async {
_callback(this);
return _factory.pipe(_processes, onOutput: output);
if (_commands.isEmpty) {
return ProcessResultImpl(
command: '',
exitCode: 0,
output: '',
errorOutput: '',
);
}
String processOutput = '';
var lastErrorOutput = StringBuffer();
Process? currentProcess;
int? lastExitCode;
String? lastCommand;
bool failed = false;
try {
// Run each process in sequence
for (var i = 0; i < _commands.length && !failed; i++) {
final command = _commands[i];
try {
// Start process
currentProcess = await command.start();
lastCommand = command.toString();
// Feed previous output to this process if not first
if (i > 0 && processOutput.isNotEmpty) {
final lines = LineSplitter.split(processOutput);
for (var line in lines) {
if (line.isNotEmpty) {
currentProcess.stdin.writeln(line);
await currentProcess.stdin.flush();
}
}
}
await currentProcess.stdin.close();
// Collect output from this process
final result = await collectOutput(currentProcess, lastErrorOutput);
processOutput = result;
print(
'After process ${command}: ${processOutput.split('\n').map((s) => s.trim()).where((s) => s.isNotEmpty).join(', ')}');
// Handle real-time output
if (output != null) {
final lines = LineSplitter.split(processOutput);
for (var line in lines) {
if (line.trim().isNotEmpty) {
output(line.trim());
}
}
}
} catch (e) {
if (e is ProcessFailedException) {
lastExitCode = e.result.exitCode();
failed = true;
break;
}
rethrow;
}
}
// Return the final result
return ProcessResultImpl(
command: lastCommand ?? '',
exitCode: lastExitCode ?? (failed ? 1 : 0),
output: processOutput,
errorOutput: lastErrorOutput.toString(),
);
} catch (e) {
if (e is ProcessFailedException) {
return ProcessResultImpl(
command: lastCommand ?? '',
exitCode: e.result.exitCode() ?? 1,
output: processOutput,
errorOutput: lastErrorOutput.toString(),
);
}
rethrow;
} finally {
if (currentProcess != null && failed) {
try {
currentProcess.kill(ProcessSignal.sigterm);
} catch (_) {}
}
}
}
/// Collect output from a process and wait for it to complete.
Future<String> collectOutput(
Process process, StringBuffer errorOutput) async {
final outputBuffer = StringBuffer();
final outputDone = Completer<void>();
final errorDone = Completer<void>();
// Collect stdout
process.stdout.transform(utf8.decoder).listen(
(data) {
outputBuffer.write(data);
},
onDone: outputDone.complete,
cancelOnError: true,
);
// Collect stderr
process.stderr.transform(utf8.decoder).listen(
(data) {
errorOutput.write(data);
},
onDone: errorDone.complete,
cancelOnError: true,
);
// Wait for process to complete and streams to finish
final exitCode = await process.exitCode;
await Future.wait([outputDone.future, errorDone.future]);
final output = outputBuffer.toString();
if (exitCode != 0) {
throw ProcessFailedException(ProcessResultImpl(
command: process.toString(),
exitCode: exitCode,
output: output,
errorOutput: errorOutput.toString(),
));
}
return output;
}
/// Run the processes in the pipe and return the final output.

View file

@ -68,8 +68,8 @@ void main() {
final process2 = factory.command('pwd').path('/tmp');
// Add configured processes to pool
pool.command(process1.command);
pool.command(process2.command);
pool.command(process1); // Pass the PendingProcess directly
pool.command(process2); // Pass the PendingProcess directly
final results = await pool.start();
expect(results.length, equals(2));