diff --git a/Makefile b/Makefile index 6719871c..07bb2b86 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ -CXXFLAGS := $(CXXFLAGS) --std=c++14 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include -objects := lib/src/angel_wings.o lib/src/wings_socket.o\ -lib/src/bind.o lib/src/util.o +override CXXFLAGS := $(CXXFLAGS) --std=c++14 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include \ +-I .dart_tool +objects := lib/src/angel_wings.o lib/src/wings_socket.o \ +lib/src/bind.o lib/src/util.o lib/src/http.o \ +.dart_tool/http-parser/http_parser.o .PHONY: distclean clean diff --git a/example/main.dart b/example/main.dart index 9c64ef37..b14aeae2 100644 --- a/example/main.dart +++ b/example/main.dart @@ -1,7 +1,26 @@ import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_static/angel_static.dart'; import 'package:angel_wings/angel_wings.dart'; +import 'package:file/local.dart'; +import 'package:logging/logging.dart'; main() async { var app = Angel(); var wings = AngelWings(app); -} \ No newline at end of file + var fs = LocalFileSystem(); + var vDir = CachingVirtualDirectory(app, fs, + source: fs.currentDirectory, allowDirectoryListing: true); + app.logger = Logger('wings') + ..onRecord.listen((rec) { + print(rec); + if (rec.error != null) print(rec.error); + if (rec.stackTrace != null) print(rec.stackTrace); + }); + + app.get('/', (req, res) => 'WINGS!!!'); + app.fallback(vDir.handleRequest); + app.fallback((req, res) => throw AngelHttpException.notFound()); + + await wings.startServer('127.0.0.1', 3000); + print('Listening at http://localhost:3000'); +} diff --git a/example/socket.dart b/example/socket.dart index 1bc5be84..5c827772 100644 --- a/example/socket.dart +++ b/example/socket.dart @@ -1,10 +1,27 @@ +import 'dart:convert'; +import 'dart:typed_data'; +import 'package:angel_framework/angel_framework.dart'; import 'package:angel_wings/angel_wings.dart'; main() async { + var app = Angel(); var socket = await WingsSocket.bind('127.0.0.1', 3000); + print('Listening at http://localhost:3000'); await for (var fd in socket) { - print('FD: $fd'); - closeNativeSocketDescriptor(fd); + var response = ''' +HTTP/1.1 200 Not Found\r +Date: Fri, 31 Dec 1999 23:59:59 GMT\r +server: wings-test\r\n\r +Nope, nothing's here! +\r\n\r +'''; + var bytes = utf8.encode(response); + var data = Uint8List.fromList(bytes); + var rq = await WingsRequestContext.from(app, fd); + print('Yay: $rq'); + print(rq.headers); + writeToNativeSocket(fd.fileDescriptor, data); + closeNativeSocketDescriptor(fd.fileDescriptor); } } diff --git a/lib/src/angel_wings.cc b/lib/src/angel_wings.cc index 497553f6..2b19cd6b 100644 --- a/lib/src/angel_wings.cc +++ b/lib/src/angel_wings.cc @@ -49,5 +49,7 @@ Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_sco result = Dart_WingsSocket_close; if (strcmp("Dart_WingsSocket_listen", cname) == 0) result = Dart_WingsSocket_listen; + if (strcmp("Dart_WingsSocket_parseHttp", cname) == 0) + result = Dart_WingsSocket_parseHttp; return result; } \ No newline at end of file diff --git a/lib/src/angel_wings.h b/lib/src/angel_wings.h index 51ef882d..65397365 100644 --- a/lib/src/angel_wings.h +++ b/lib/src/angel_wings.h @@ -2,6 +2,7 @@ #define ANGEL_WINGS_WINGS_H #include +#include #include "angel_wings.h" Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope); @@ -14,5 +15,7 @@ void Dart_WingsSocket_write(Dart_NativeArguments arguments); void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments); void Dart_WingsSocket_close(Dart_NativeArguments arguments); void Dart_WingsSocket_listen(Dart_NativeArguments arguments); +void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments); +void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message); #endif \ No newline at end of file diff --git a/lib/src/http.cc b/lib/src/http.cc new file mode 100644 index 00000000..ecf478ba --- /dev/null +++ b/lib/src/http.cc @@ -0,0 +1,142 @@ +#include +#include +#include "angel_wings.h" +#include "wings_socket.h" +using namespace wings; + +void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments) +{ + Dart_Port service_port = + Dart_NewNativePort("WingsHttpCallback", &wingsHttpCallback, true); + Dart_Handle send_port = Dart_NewSendPort(service_port); + Dart_SetReturnValue(arguments, send_port); +} + +void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message) +{ + int64_t fd = -1; + Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id; + Dart_CObject *fdArg = message->value.as_array.values[1]; + +#define thePort (*((Dart_Port *)parser->data)) +#define sendInt(n) \ + Dart_CObject obj; \ + obj.type = Dart_CObject_kInt64; \ + obj.value.as_int64 = (n); \ + Dart_PostCObject(thePort, &obj); +#define sendString() \ + if (length > 0) \ + { \ + std::string str(at, length); \ + Dart_CObject obj; \ + obj.type = Dart_CObject_kString; \ + obj.value.as_string = (char *)str.c_str(); \ + Dart_PostCObject(thePort, &obj); \ + } + + if (fdArg->type == Dart_CObject_kInt32) + { + fd = (int64_t)fdArg->value.as_int32; + } + else + { + fd = fdArg->value.as_int64; + } + + if (fd != -1) + { + http_parser_settings settings; + + settings.on_message_begin = [](http_parser *parser) { + return 0; + }; + + settings.on_headers_complete = [](http_parser *parser) { + { + sendInt(0); + } + { + sendInt(parser->method); + } + return 0; + }; + + settings.on_message_complete = [](http_parser *parser) { + sendInt(1); + return 0; + }; + + settings.on_chunk_complete = [](http_parser *parser) { + return 0; + }; + + settings.on_chunk_header = [](http_parser *parser) { + return 0; + }; + + settings.on_url = [](http_parser *parser, const char *at, size_t length) { + sendString(); + return 0; + }; + + settings.on_header_field = [](http_parser *parser, const char *at, size_t length) { + sendString(); + return 0; + }; + + settings.on_header_value = [](http_parser *parser, const char *at, size_t length) { + sendString(); + return 0; + }; + + settings.on_body = [](http_parser *parser, const char *at, size_t length) { + sendString(); + return 0; + }; + + size_t len = 80 * 1024, nparsed = 0; + char buf[len]; + ssize_t recved = 0; + memset(buf, 0, sizeof(buf)); + // http_parser parser; + auto *parser = (http_parser *)malloc(sizeof(http_parser)); + http_parser_init(parser, HTTP_BOTH); + parser->data = &outPort; + + while ((recved = recv(fd, buf, len, 0)) >= 0) + { + if (false) // (isUpgrade) + { + // send_string(&parser, buf, (size_t)recved, 7, true); + } + else + { + /* Start up / continue the parser. + * Note we pass recved==0 to signal that EOF has been received. + */ + nparsed = http_parser_execute(parser, &settings, buf, recved); + + if (nparsed != recved) + { + // TODO: End it...! + } + else if (recved == 0) + { + break; + } + + // if ((isUpgrade = parser.upgrade) == 1) + // { + // send_notification(&parser, 6); + // } + // else if (nparsed != recved) + // { + // close(rq->sock); + // return; + // } + } + + // memset(buf, 0, len); + } + } +} \ No newline at end of file diff --git a/lib/src/util.cc b/lib/src/util.cc index e627b1c5..69b6b88c 100644 --- a/lib/src/util.cc +++ b/lib/src/util.cc @@ -1,3 +1,4 @@ +#include #include "angel_wings.h" #include "wings_socket.h" using namespace wings; @@ -25,7 +26,16 @@ void Dart_WingsSocket_getPort(Dart_NativeArguments arguments) void Dart_WingsSocket_write(Dart_NativeArguments arguments) { - // TODO: Actually do something. + int64_t fd; + void *data; + Dart_TypedData_Type type; + intptr_t len; + Dart_Handle fdHandle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle dataHandle = Dart_GetNativeArgument(arguments, 1); + HandleError(Dart_IntegerToInt64(fdHandle, &fd)); + HandleError(Dart_TypedDataAcquireData(dataHandle, &type, &data, &len)); + write(fd, data, len); + HandleError(Dart_TypedDataReleaseData(dataHandle)); } void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments) diff --git a/lib/src/wings_driver.dart b/lib/src/wings_driver.dart index dc87e2ad..bc4bff0e 100644 --- a/lib/src/wings_driver.dart +++ b/lib/src/wings_driver.dart @@ -7,8 +7,8 @@ import 'wings_request.dart'; import 'wings_response.dart'; import 'wings_socket.dart'; -class AngelWings extends Driver { +class AngelWings extends Driver { factory AngelWings(Angel app) { return AngelWings._(app, WingsSocket.bind); } @@ -31,21 +31,19 @@ class AngelWings extends Driver createRequestContext(int request, int response) { - // TODO: implement createRequestContext - return null; + Future createRequestContext(WingsClientSocket request, int response) { + return WingsRequestContext.from(app, request); } @override - Future createResponseContext(int request, int response, + Future createResponseContext(WingsClientSocket request, int response, [WingsRequestContext correspondingRequest]) { - // TODO: implement createResponseContext - return null; + return Future.value(WingsResponseContext(request.fileDescriptor, correspondingRequest)); } @override - Stream createResponseStreamFromRawRequest(int request) { - return Stream.fromIterable([request]); + Stream createResponseStreamFromRawRequest(WingsClientSocket request) { + return Stream.fromIterable([request.fileDescriptor]); } @override diff --git a/lib/src/wings_request.dart b/lib/src/wings_request.dart index e890bc40..6b97e8e6 100644 --- a/lib/src/wings_request.dart +++ b/lib/src/wings_request.dart @@ -1,53 +1,173 @@ +import 'dart:async'; import 'dart:io'; -import 'package:angel_container/src/container.dart'; +import 'dart:isolate'; +import 'package:angel_container/angel_container.dart'; import 'package:angel_framework/angel_framework.dart'; +import 'package:mock_request/mock_request.dart'; +import 'wings_socket.dart'; -class WingsRequestContext extends RequestContext { - @override - // TODO: implement body - Stream> get body => null; +enum _ParseState { method, url, headerField, headerValue, body } + +final RegExp _straySlashes = new RegExp(r'(^/+)|(/+$)'); + +class WingsRequestContext extends RequestContext { + final WingsClientSocket rawRequest; + final Container container; + + final StreamController> _body = StreamController(); + List _cookies, __cookies; + final LockableMockHttpHeaders _headers = LockableMockHttpHeaders(); + final RawReceivePort _recv; + InternetAddress _remoteAddress; + String _method, _override, _path; + Uri _uri; @override - // TODO: implement container - Container get container => null; + Angel app; + + WingsRequestContext._(this.app, this.rawRequest, this._recv) + : container = app.container.createChild(); + + static const int DELETE = 0, + GET = 1, + HEAD = 2, + POST = 3, + PUT = 4, + CONNECT = 5, + OPTIONS = 6, + TRACE = 7, + COPY = 8, + LOCK = 9, + MKCOL = 10, + MOVE = 11, + PROPFIND = 12, + PROPPATCH = 13, + SEARCH = 14, + UNLOCK = 15, + BIND = 16, + REBIND = 17, + UNBIND = 18, + ACL = 19, + REPORT = 20, + MKACTIVITY = 21, + CHECKOUT = 22, + MERGE = 23, + MSEARCH = 24, + NOTIFY = 25, + SUBSCRIBE = 26, + UNSUBSCRIBE = 27, + PATCH = 28, + PURGE = 29, + MKCALENDAR = 30, + LINK = 31, + UNLINK = 32, + SOURCE = 33; + + static String methodToString(int method) { + switch (method) { + case DELETE: + return 'DELETE'; + case GET: + return 'GET'; + case HEAD: + return 'HEAD'; + case POST: + return 'POST'; + case PUT: + return 'PUT'; + case CONNECT: + return 'CONNECT'; + case OPTIONS: + return 'OPTIONS'; + case PATCH: + return 'PATCH'; + case PURGE: + return 'PURGE'; + default: + throw new ArgumentError('Unknown method $method.'); + } + } + + static Future from(Angel app, WingsClientSocket socket) { + var state = _ParseState.url; + var c = Completer(); + var recv = RawReceivePort(); + var rq = WingsRequestContext._(app, socket, recv); + rq._remoteAddress = socket.remoteAddress; + String lastHeader; + recv.handler = (e) { + if (state == _ParseState.url) { + rq._uri = Uri.parse(e as String); + var path = rq._uri.path.replaceAll(_straySlashes, ''); + if (path.isEmpty) path = '/'; + rq._path = path; + state = _ParseState.headerField; + } else if (state == _ParseState.headerField) { + if (e == 0) { + state = _ParseState.method; + } else { + lastHeader = e as String; + state = _ParseState.headerValue; + } + } else if (state == _ParseState.headerValue) { + if (e == 0) { + state = _ParseState.method; + } else { + if (lastHeader != null) { + if (lastHeader == 'cookie') { + rq.__cookies.add(Cookie.fromSetCookieValue(e as String)); + } else { + rq._headers.add(lastHeader, e as String); + } + lastHeader = null; + } + } + state = _ParseState.headerField; + } else if (state == _ParseState.method) { + rq._method = methodToString(e as int); + state = _ParseState.body; + c.complete(rq); + } else if (state == _ParseState.body) { + if (e == 1) { + rq._body.close(); + } else { + rq._body.add(e as List); + } + } + }; + wingsParseHttp().send([recv.sendPort, socket.fileDescriptor]); + return c.future; + } @override - // TODO: implement cookies - List get cookies => null; + Stream> get body => _body.stream; @override - // TODO: implement headers - HttpHeaders get headers => null; + List get cookies => _cookies ??= List.unmodifiable(__cookies); @override - // TODO: implement hostname - String get hostname => null; + HttpHeaders get headers => _headers; @override - // TODO: implement method - String get method => null; + String get hostname => headers.value('host'); @override - // TODO: implement originalMethod - String get originalMethod => null; + String get method => _override ??= + (headers.value('x-http-method-override')?.toUpperCase() ?? _method); @override - // TODO: implement path - String get path => null; + String get originalMethod => _method; @override - // TODO: implement rawRequest - int get rawRequest => null; + String get path => _path; @override - // TODO: implement remoteAddress - InternetAddress get remoteAddress => null; + InternetAddress get remoteAddress => _remoteAddress; @override // TODO: implement session HttpSession get session => null; @override - // TODO: implement uri - Uri get uri => null; + Uri get uri => _uri; } diff --git a/lib/src/wings_response.dart b/lib/src/wings_response.dart index 2b9b63ab..cbc00794 100644 --- a/lib/src/wings_response.dart +++ b/lib/src/wings_response.dart @@ -1,50 +1,213 @@ import 'dart:async'; - +import 'dart:convert'; import 'dart:io'; - +import 'dart:typed_data'; import 'package:angel_framework/angel_framework.dart'; +import 'package:charcode/ascii.dart'; +import 'wings_request.dart'; +import 'wings_socket.dart'; -class WingsResponseContext extends ResponseContext { +class WingsResponseContext extends ResponseContext { @override - void add(List event) { - // TODO: implement add + final WingsRequestContext correspondingRequest; + + LockableBytesBuilder _buffer; + + @override + final int rawResponse; + + bool _isDetached = false, _isClosed = false, _streamInitialized = false; + + WingsResponseContext(this.rawResponse, [this.correspondingRequest]); + + Iterable __allowedEncodings; + + Iterable get _allowedEncodings { + return __allowedEncodings ??= correspondingRequest.headers + .value('accept-encoding') + ?.split(',') + ?.map((s) => s.trim()) + ?.where((s) => s.isNotEmpty) + ?.map((str) { + // Ignore quality specifications in accept-encoding + // ex. gzip;q=0.8 + if (!str.contains(';')) return str; + return str.split(';')[0]; + }); + } + + bool _openStream() { + if (!_streamInitialized) { + // If this is the first stream added to this response, + // then add headers, status code, etc. + var outHeaders = {}; + var statusLine = + utf8.encode('HTTP/1.1 $statusCode').followedBy([$cr, $lf]); + writeToNativeSocket(rawResponse, Uint8List.fromList(statusLine.toList())); + + headers.forEach((k, v) => outHeaders[k] = v); + + if (headers.containsKey('content-length')) { + var l = int.tryParse(headers['content-length']); + if (l != null) { + outHeaders['content-length'] = l.toString(); + } + } + + if (contentType != null) + outHeaders['content-type'] = contentType.toString(); + + if (encoders.isNotEmpty && correspondingRequest != null) { + if (_allowedEncodings != null) { + for (var encodingName in _allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (encoders.containsKey(encodingName)) + encoder = encoders[encodingName]; + else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + outHeaders['content-encoding'] = key; + break; + } + } + } + } + + void _wh(String k, String v) { + var headerLine = + utf8.encode('$k: ${Uri.encodeComponent(v)}').followedBy([$cr, $lf]); + writeToNativeSocket( + rawResponse, Uint8List.fromList(headerLine.toList())); + } + + outHeaders.forEach(_wh); + + for (var c in cookies) { + _wh('set-cookie', c.toString()); + } + + writeToNativeSocket(rawResponse, Uint8List.fromList([$cr, $lf])); + + //_isClosed = true; + return _streamInitialized = true; + } + + return false; } @override Future addStream(Stream> stream) { - // TODO: implement addStream - return null; + if (_isClosed && isBuffered) throw ResponseContext.closed(); + _openStream(); + + Stream> output = stream; + + if (encoders.isNotEmpty && correspondingRequest != null) { + if (_allowedEncodings != null) { + for (var encodingName in _allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (encoders.containsKey(encodingName)) + encoder = encoders[encodingName]; + else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + output = encoders[key].bind(output); + break; + } + } + } + } + + return output.forEach((buf) { + writeToNativeSocket( + rawResponse, buf is Uint8List ? buf : Uint8List.fromList(buf)); + }); } @override - // TODO: implement buffer - BytesBuilder get buffer => null; + void add(List data) { + if (_isClosed && isBuffered) + throw ResponseContext.closed(); + else if (!isBuffered) { + if (!_isClosed) { + _openStream(); - @override - // TODO: implement correspondingRequest - RequestContext get correspondingRequest => null; + if (encoders.isNotEmpty && correspondingRequest != null) { + if (_allowedEncodings != null) { + for (var encodingName in _allowedEncodings) { + Converter, List> encoder; + String key = encodingName; - @override - FutureOr detach() { - // TODO: implement detach - return null; + if (encoders.containsKey(encodingName)) + encoder = encoders[encodingName]; + else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + data = encoders[key].convert(data); + break; + } + } + } + } + + writeToNativeSocket( + rawResponse, data is Uint8List ? data : Uint8List.fromList(data)); + } + } else + buffer.add(data); } @override - // TODO: implement isBuffered - bool get isBuffered => null; + Future close() { + if (!_isDetached) { + if (!_isClosed) { + if (!isBuffered) { + try { + _openStream(); + closeNativeSocketDescriptor(rawResponse); + } catch (_) { + // This only seems to occur on `MockHttpRequest`, but + // this try/catch prevents a crash. + } + } else { + _buffer.lock(); + } + + _isClosed = true; + } + + super.close(); + } + return new Future.value(); + } @override - // TODO: implement isOpen - bool get isOpen => null; + BytesBuilder get buffer => _buffer; @override - // TODO: implement rawResponse - get rawResponse => null; + int detach() { + _isDetached = true; + return rawResponse; + } + + @override + bool get isBuffered => _buffer != null; + + @override + bool get isOpen => !_isClosed && !_isDetached; @override void useBuffer() { - // TODO: implement useBuffer + _buffer = LockableBytesBuilder(); } - -} \ No newline at end of file +} diff --git a/lib/src/wings_socket.cc b/lib/src/wings_socket.cc index bd532b1b..35de2883 100644 --- a/lib/src/wings_socket.cc +++ b/lib/src/wings_socket.cc @@ -46,7 +46,7 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id, WingsSocket *socket = nullptr; Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id; - Dart_CObject* ptrArg = message->value.as_array.values[1]; + Dart_CObject *ptrArg = message->value.as_array.values[1]; if (ptrArg->type == Dart_CObject_kInt32) { @@ -67,9 +67,34 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id, if ((sock = accept(socket->sockfd, &addr, &len)) != -1) { + char addrBuf[INET6_ADDRSTRLEN] = {0}; + + if (addr.sa_family == AF_INET6) + { + auto as6 = (sockaddr_in6*) &addr; + inet_ntop(addr.sa_family, &(as6->sin6_addr), addrBuf, len); + } + else + { + auto as4 = (sockaddr_in*) &addr; + inet_ntop(AF_INET, &(as4->sin_addr), addrBuf, len); + } + + Dart_CObject fdObj; + fdObj.type = Dart_CObject_kInt64; + fdObj.value.as_int64 = sock; + + Dart_CObject addrObj; + addrObj.type = Dart_CObject_kString; + addrObj.value.as_string = addrBuf; + + Dart_CObject *values[2] = {&fdObj, &addrObj}; + Dart_CObject obj; - obj.type = Dart_CObject_kInt64; - obj.value.as_int64 = sock; + obj.type = Dart_CObject_kArray; + obj.value.as_array.length = 2; + obj.value.as_array.values = values; + Dart_PostCObject(outPort, &obj); // Dispatch the fd to the next listener. // auto &ports = socket->sendPorts; diff --git a/lib/src/wings_socket.dart b/lib/src/wings_socket.dart index e5f29d93..8b875f43 100644 --- a/lib/src/wings_socket.dart +++ b/lib/src/wings_socket.dart @@ -32,8 +32,17 @@ SendPort wingsSocketListen(int pointer) native 'Dart_WingsSocket_listen'; void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close'; -class WingsSocket extends Stream { - final StreamController _ctrl = StreamController(); +SendPort wingsParseHttp() native 'Dart_WingsSocket_parseHttp'; + +class WingsClientSocket { + final int fileDescriptor; + final InternetAddress remoteAddress; + + WingsClientSocket(this.fileDescriptor, this.remoteAddress); +} + +class WingsSocket extends Stream { + final StreamController _ctrl = StreamController(); SendPort _acceptor; final int _pointer; final RawReceivePort _recv; @@ -44,7 +53,8 @@ class WingsSocket extends Stream { _acceptor = wingsSocketListen(_pointer); _recv.handler = (h) { if (!_ctrl.isClosed) { - _ctrl.add(h as int); + _ctrl.add( + WingsClientSocket(h[0] as int, InternetAddress(h[1] as String))); _acceptor.send([_recv.sendPort, _pointer]); } }; @@ -91,8 +101,11 @@ class WingsSocket extends Stream { int get port => _port ??= getWingsServerSocketPort(_pointer); @override - StreamSubscription listen(void Function(int event) onData, - {Function onError, void Function() onDone, bool cancelOnError}) { + StreamSubscription listen( + void Function(WingsClientSocket event) onData, + {Function onError, + void Function() onDone, + bool cancelOnError}) { return _ctrl.stream .listen(onData, onError: onError, cancelOnError: cancelOnError); }