platform/packages/range_header/lib/src/converter.dart

172 lines
5 KiB
Dart
Raw Normal View History

2021-05-01 02:48:36 +00:00
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' show BytesBuilder;
import 'dart:math';
import 'package:async/async.dart';
import 'package:charcode/ascii.dart';
import 'range_header.dart';
/// A [StreamTransformer] that uses a parsed [RangeHeader] and transforms an input stream
/// into one compatible with the `multipart/byte-ranges` specification.
class RangeHeaderTransformer
extends StreamTransformerBase<List<int>, List<int>> {
final RangeHeader header;
final String boundary, mimeType;
final int totalLength;
RangeHeaderTransformer(this.header, this.mimeType, this.totalLength,
2021-05-01 03:39:09 +00:00
{String? boundary})
2021-05-18 11:58:51 +00:00
: boundary = boundary ?? _randomString() {
2021-05-01 03:39:09 +00:00
if (header.items.isEmpty) {
2021-05-18 11:58:51 +00:00
throw ArgumentError('`header` cannot be null or empty.');
2021-05-01 02:48:36 +00:00
}
}
/// Computes the content length that will be written to a response, given a stream of the given [totalFileSize].
int computeContentLength(int totalFileSize) {
2021-05-18 11:58:51 +00:00
var len = 0;
2021-05-01 02:48:36 +00:00
for (var item in header.items) {
if (item.start == -1) {
if (item.end == -1) {
len += totalFileSize;
} else {
//len += item.end + 1;
len += item.end + 1;
}
} else if (item.end == -1) {
len += totalFileSize - item.start;
//len += totalFileSize - item.start - 1;
} else {
len += item.end - item.start;
}
// Take into consideration the fact that delimiters are written.
len += utf8.encode('--$boundary\r\n').length;
len += utf8.encode('Content-Type: $mimeType\r\n').length;
len += utf8
.encode(
'Content-Range: ${header.rangeUnit} ${item.toContentRange(totalLength)}/$totalLength\r\n\r\n')
.length;
len += 2; // CRLF
}
len += utf8.encode('--$boundary--\r\n').length;
return len;
}
@override
Stream<List<int>> bind(Stream<List<int>> stream) {
2021-05-18 11:58:51 +00:00
var ctrl = StreamController<List<int>>();
2021-05-01 02:48:36 +00:00
2021-05-18 11:58:51 +00:00
Future(() async {
2021-05-01 02:48:36 +00:00
var index = 0;
2021-05-18 11:58:51 +00:00
var enqueued = Queue<List<int>>();
var q = StreamQueue(stream);
2021-05-01 02:48:36 +00:00
Future<List<int>> absorb(int length) async {
2021-05-18 11:58:51 +00:00
var out = BytesBuilder();
2021-05-01 02:48:36 +00:00
while (out.length < length) {
var remaining = length - out.length;
while (out.length < length && enqueued.isNotEmpty) {
remaining = length - out.length;
var blob = enqueued.removeFirst();
if (blob.length > remaining) {
enqueued.addFirst(blob.skip(remaining).toList());
blob = blob.take(remaining).toList();
}
out.add(blob);
index += blob.length;
}
if (out.length < length && await q.hasNext) {
var blob = await q.next;
remaining = length - out.length;
if (blob.length > remaining) {
enqueued.addFirst(blob.skip(remaining).toList());
blob = blob.take(remaining).toList();
}
out.add(blob);
index += blob.length;
}
// If we get this far, and the stream is EMPTY, the user requested
// too many bytes.
if (out.length < length && enqueued.isEmpty && !(await q.hasNext)) {
2021-05-18 11:58:51 +00:00
throw StateError(
2021-05-01 02:48:36 +00:00
'The range denoted is bigger than the size of the input stream.');
}
}
return out.takeBytes();
}
for (var item in header.items) {
2021-05-18 11:58:51 +00:00
var chunk = BytesBuilder();
2021-05-01 02:48:36 +00:00
// Skip until we reach the start index.
while (index < item.start) {
var remaining = item.start - index;
await absorb(remaining);
}
// Next, absorb until we reach the end.
if (item.end == -1) {
2021-05-18 11:58:51 +00:00
while (enqueued.isNotEmpty) {
chunk.add(enqueued.removeFirst());
}
while (await q.hasNext) {
chunk.add(await q.next);
}
2021-05-01 02:48:36 +00:00
} else {
var remaining = item.end - index;
chunk.add(await absorb(remaining));
}
// Next, write the boundary and data.
ctrl.add(utf8.encode('--$boundary\r\n'));
ctrl.add(utf8.encode('Content-Type: $mimeType\r\n'));
ctrl.add(utf8.encode(
'Content-Range: ${header.rangeUnit} ${item.toContentRange(totalLength)}/$totalLength\r\n\r\n'));
ctrl.add(chunk.takeBytes());
ctrl.add(const [$cr, $lf]);
// If this range was unbounded, don't bother looping any further.
if (item.end == -1) break;
}
ctrl.add(utf8.encode('--$boundary--\r\n'));
2021-05-18 11:58:51 +00:00
await ctrl.close();
}).catchError((e) {
ctrl.addError(e as Object);
return null;
});
2021-05-01 02:48:36 +00:00
return ctrl.stream;
}
}
2021-05-18 11:58:51 +00:00
var _rnd = Random();
2021-05-01 02:48:36 +00:00
String _randomString(
2021-05-18 11:58:51 +00:00
{int length = 32,
String validChars =
2021-05-01 02:48:36 +00:00
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'}) {
var len = _rnd.nextInt((length - 10)) + 10;
2021-05-18 11:58:51 +00:00
var buf = StringBuffer();
2021-05-01 02:48:36 +00:00
2021-05-18 11:58:51 +00:00
while (buf.length < len) {
2021-05-01 02:48:36 +00:00
buf.writeCharCode(validChars.codeUnitAt(_rnd.nextInt(validChars.length)));
2021-05-18 11:58:51 +00:00
}
2021-05-01 02:48:36 +00:00
return buf.toString();
}