platform/packages/range_header/lib/src/converter.dart
2021-05-18 19:58:51 +08:00

171 lines
5 KiB
Dart

import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' show BytesBuilder;
import 'dart:math';
import 'package:async/async.dart';
import 'package:charcode/ascii.dart';
import 'range_header.dart';
/// A [StreamTransformer] that uses a parsed [RangeHeader] and transforms an input stream
/// into one compatible with the `multipart/byte-ranges` specification.
class RangeHeaderTransformer
extends StreamTransformerBase<List<int>, List<int>> {
final RangeHeader header;
final String boundary, mimeType;
final int totalLength;
RangeHeaderTransformer(this.header, this.mimeType, this.totalLength,
{String? boundary})
: boundary = boundary ?? _randomString() {
if (header.items.isEmpty) {
throw ArgumentError('`header` cannot be null or empty.');
}
}
/// Computes the content length that will be written to a response, given a stream of the given [totalFileSize].
int computeContentLength(int totalFileSize) {
var len = 0;
for (var item in header.items) {
if (item.start == -1) {
if (item.end == -1) {
len += totalFileSize;
} else {
//len += item.end + 1;
len += item.end + 1;
}
} else if (item.end == -1) {
len += totalFileSize - item.start;
//len += totalFileSize - item.start - 1;
} else {
len += item.end - item.start;
}
// Take into consideration the fact that delimiters are written.
len += utf8.encode('--$boundary\r\n').length;
len += utf8.encode('Content-Type: $mimeType\r\n').length;
len += utf8
.encode(
'Content-Range: ${header.rangeUnit} ${item.toContentRange(totalLength)}/$totalLength\r\n\r\n')
.length;
len += 2; // CRLF
}
len += utf8.encode('--$boundary--\r\n').length;
return len;
}
@override
Stream<List<int>> bind(Stream<List<int>> stream) {
var ctrl = StreamController<List<int>>();
Future(() async {
var index = 0;
var enqueued = Queue<List<int>>();
var q = StreamQueue(stream);
Future<List<int>> absorb(int length) async {
var out = BytesBuilder();
while (out.length < length) {
var remaining = length - out.length;
while (out.length < length && enqueued.isNotEmpty) {
remaining = length - out.length;
var blob = enqueued.removeFirst();
if (blob.length > remaining) {
enqueued.addFirst(blob.skip(remaining).toList());
blob = blob.take(remaining).toList();
}
out.add(blob);
index += blob.length;
}
if (out.length < length && await q.hasNext) {
var blob = await q.next;
remaining = length - out.length;
if (blob.length > remaining) {
enqueued.addFirst(blob.skip(remaining).toList());
blob = blob.take(remaining).toList();
}
out.add(blob);
index += blob.length;
}
// If we get this far, and the stream is EMPTY, the user requested
// too many bytes.
if (out.length < length && enqueued.isEmpty && !(await q.hasNext)) {
throw StateError(
'The range denoted is bigger than the size of the input stream.');
}
}
return out.takeBytes();
}
for (var item in header.items) {
var chunk = BytesBuilder();
// Skip until we reach the start index.
while (index < item.start) {
var remaining = item.start - index;
await absorb(remaining);
}
// Next, absorb until we reach the end.
if (item.end == -1) {
while (enqueued.isNotEmpty) {
chunk.add(enqueued.removeFirst());
}
while (await q.hasNext) {
chunk.add(await q.next);
}
} else {
var remaining = item.end - index;
chunk.add(await absorb(remaining));
}
// Next, write the boundary and data.
ctrl.add(utf8.encode('--$boundary\r\n'));
ctrl.add(utf8.encode('Content-Type: $mimeType\r\n'));
ctrl.add(utf8.encode(
'Content-Range: ${header.rangeUnit} ${item.toContentRange(totalLength)}/$totalLength\r\n\r\n'));
ctrl.add(chunk.takeBytes());
ctrl.add(const [$cr, $lf]);
// If this range was unbounded, don't bother looping any further.
if (item.end == -1) break;
}
ctrl.add(utf8.encode('--$boundary--\r\n'));
await ctrl.close();
}).catchError((e) {
ctrl.addError(e as Object);
return null;
});
return ctrl.stream;
}
}
var _rnd = Random();
String _randomString(
{int length = 32,
String validChars =
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'}) {
var len = _rnd.nextInt((length - 10)) + 10;
var buf = StringBuffer();
while (buf.length < len) {
buf.writeCharCode(validChars.codeUnitAt(_rnd.nextInt(validChars.length)));
}
return buf.toString();
}