commit 6a16a057fcca5c39af0e444b43ecab2548d8d91d Author: Tobe O Date: Tue Jul 3 19:16:29 2018 -0400 :fire: diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..2e7f80df --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# See https://www.dartlang.org/guides/libraries/private-files + +# Files and directories created by pub +.dart_tool/ +.packages +build/ +# If you're building an application, you may want to check-in your pubspec.lock +pubspec.lock + +# Directory created by dartdoc +# If you don't generate documentation locally you can remove this line. +doc/api/ + +# Avoid committing generated Javascript files: +*.dart.js +*.info.json # Produced by the --dump-info flag. +*.js # When generated by dart2js. Don't specify *.js if your + # project includes source files written in JavaScript. +*.js_ +*.js.deps +*.js.map \ No newline at end of file diff --git a/analysis_options.yaml b/analysis_options.yaml new file mode 100644 index 00000000..eae1e42a --- /dev/null +++ b/analysis_options.yaml @@ -0,0 +1,3 @@ +analyzer: + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/example/main.dart b/example/main.dart new file mode 100644 index 00000000..8d0f950b --- /dev/null +++ b/example/main.dart @@ -0,0 +1,28 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:isolate'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_wings/angel_wings.dart'; + +main() async { + for (int i = 1; i < Platform.numberOfProcessors; i++) { + var onError = new ReceivePort(); + Isolate.spawn(isolateMain, i, onError: onError.sendPort); + onError.listen((e) => Zone.current + .handleUncaughtError(e[0], new StackTrace.fromString(e[1].toString()))); + } + + isolateMain(0); +} + +void isolateMain(int id) { + var app = new Angel(); + var wings = new AngelWings(app, shared: true); + + app.get('/', 'Hello, native world!'); + + wings.startServer('127.0.0.1', 3000).then((_) { + print( + 'Instance #$id listening at http://${wings.address.address}:${wings.port}'); + }); +} diff --git a/lib/angel_wings.dart b/lib/angel_wings.dart new file mode 100644 index 00000000..5fb2763a --- /dev/null +++ b/lib/angel_wings.dart @@ -0,0 +1,21 @@ +library angel_wings; + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:isolate'; +import 'dart:typed_data'; +import 'dart-ext:src/wings'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:body_parser/body_parser.dart'; +import 'package:combinator/combinator.dart'; +import 'package:http_parser/http_parser.dart'; +import 'package:mock_request/mock_request.dart'; +import 'package:pool/pool.dart'; +import 'package:pooled_map/pooled_map.dart'; +import 'package:stack_trace/stack_trace.dart'; +import 'package:tuple/tuple.dart'; +import 'package:uuid/uuid.dart'; +part 'src/wings_request.dart'; +part 'src/wings_response.dart'; +part 'src/wings.dart'; diff --git a/lib/src/bind_socket.cc b/lib/src/bind_socket.cc new file mode 100644 index 00000000..995ef857 --- /dev/null +++ b/lib/src/bind_socket.cc @@ -0,0 +1,199 @@ +#include +#include +#include +#include "wings.h" + +std::vector serverInfoVector; +std::mutex serverInfoVectorMutex; + +void wings_BindSocket(Dart_NativeArguments arguments) +{ + // Uint8List address, String addressString, int port, int backlog, bool shared + Dart_Handle addressHandle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle addressStringHandle = Dart_GetNativeArgument(arguments, 1); + Dart_Handle portHandle = Dart_GetNativeArgument(arguments, 2); + Dart_Handle backlogHandle = Dart_GetNativeArgument(arguments, 3); + Dart_Handle sharedHandle = Dart_GetNativeArgument(arguments, 4); + Dart_TypedData_Type addressType; + void *addressData; + intptr_t addressLength; + const char *addressString; + uint64_t port, backlog; + bool shared; + + // Read the arguments... + HandleError(Dart_TypedDataAcquireData(addressHandle, &addressType, &addressData, &addressLength)); + HandleError(Dart_TypedDataReleaseData(addressHandle)); + HandleError(Dart_StringToCString(addressStringHandle, &addressString)); + HandleError(Dart_IntegerToUint64(portHandle, &port)); + HandleError(Dart_IntegerToUint64(backlogHandle, &backlog)); + HandleError(Dart_BooleanValue(sharedHandle, &shared)); + + // See if there is already a server bound to the port. + long existingIndex = -1; + std::string addressStringInstance(addressString); + std::lock_guard lock(serverInfoVectorMutex); + + if (shared) + { + for (unsigned long i = 0; i < serverInfoVector.size(); i++) + { + WingsServerInfo *server_info = serverInfoVector.at(i); + + if (server_info->addressString == addressStringInstance && server_info->port == port) + { + existingIndex = (long) i; + break; + } + } + } + + if (existingIndex > -1) + { + // We found an existing socket, just return a reference to it. + Dart_SetReturnValue(arguments, Dart_NewIntegerFromUint64(existingIndex)); + return; + } + else + { + // There's no existing server, so bind a new one, and add it to the serverInfoVector. +#ifndef WIN32 + int sockfd; +#else + WSADATA wsaData; + SOCKET ConnectSocket = INVALID_SOCKET; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) + { + Dart_Handle errorHandle = Dart_NewList(2); + Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("WSAStartup failed.")); + Dart_ListSetAt(errorHandle, 1, Dart_NewInteger(iResult)); + Dart_ThrowException(errorHandle); + return 1; + } + + // TODO: Rest of Windows config: + // https://docs.microsoft.com/en-us/windows/desktop/winsock/complete-client-code +#endif + + if (addressLength == 4) + { + // IPv4 + sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + } + else + { + // IPv6 + sockfd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + } + + if (sockfd < 0) + { + Dart_Handle errorHandle = Dart_NewList(3); + Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to create socket.")); + Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno))); + Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno)); + Dart_ThrowException(errorHandle); + return; + } + + int i = 1; + int ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); + + if (ret < 0) + { + + Dart_Handle errorHandle = Dart_NewList(3); + Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Cannot reuse address for socket.")); + Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno))); + Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno)); + Dart_ThrowException(errorHandle); + return; + } + + /* + ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i)); + + if (ret < 0) + { + Dart_ThrowException(Dart_NewStringFromCString("Cannot reuse port for socket.")); + return; + } + */ + + if (addressLength > 4) + { + struct sockaddr_in6 v6 + { + }; + memset(&v6, 0, sizeof(v6)); + v6.sin6_family = AF_INET6; + v6.sin6_port = htons((uint16_t)port); + ret = inet_pton(v6.sin6_family, addressString, &v6.sin6_addr.s6_addr); + + if (ret >= 0) + ret = bind(sockfd, (const sockaddr *)&v6, sizeof(v6)); + } + else + { + struct sockaddr_in v4 + { + }; + memset(&v4, 0, sizeof(v4)); + v4.sin_family = AF_INET; + v4.sin_port = htons((uint16_t)port); + v4.sin_addr.s_addr = inet_addr(addressString); + + if (ret >= 0) + ret = bind(sockfd, (const sockaddr *)&v4, sizeof(v4)); + //ret = inet_pton(family, host, &v4.sin_addr); + } + + /*if (ret < 1) { + Dart_ThrowException(Dart_NewStringFromCString("Cannot parse IP address.")); + return; + }*/ + + //if (bind(sock, (const sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) { + if (ret < 0) + { + Dart_Handle errorHandle = Dart_NewList(3); + Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to bind socket.")); + Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno))); + Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno)); + Dart_ThrowException(errorHandle); + return; + } + + if (listen(sockfd, SOMAXCONN) < 0) + { + Dart_Handle errorHandle = Dart_NewList(3); + Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to listen to bound socket.")); + Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno))); + Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno)); + Dart_ThrowException(errorHandle); + return; + } + + if (listen(sockfd, (int)backlog) < 0) + { + Dart_Handle errorHandle = Dart_NewList(3); + Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to listen to bound socket.")); + Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno))); + Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno)); + Dart_ThrowException(errorHandle); + return; + } + + // Now that we've bound the socket, let's add it to the list. + auto *server_info = new WingsServerInfo; + server_info->sockfd = sockfd; + server_info->port = port; + server_info->ipv6 = addressLength > 4; + server_info->addressString += addressStringInstance; + Dart_SetReturnValue(arguments, Dart_NewIntegerFromUint64(serverInfoVector.size())); + serverInfoVector.push_back(server_info); + } +} \ No newline at end of file diff --git a/lib/src/http_listener.cc b/lib/src/http_listener.cc new file mode 100644 index 00000000..534466b6 --- /dev/null +++ b/lib/src/http_listener.cc @@ -0,0 +1,50 @@ +#include +#include +#include "wings.h" +#include "wings_thread.h" + +void handleMessage(Dart_Port destPortId, Dart_CObject *message); + +void wings_StartHttpListener(Dart_NativeArguments arguments) +{ + Dart_Port port = Dart_NewNativePort("angel_wings", handleMessage, true); + Dart_SetReturnValue(arguments, Dart_NewSendPort(port)); +} + +int64_t get_int(Dart_CObject *obj) +{ + if (obj == nullptr) + return 0; + switch (obj->type) + { + case Dart_CObject_kInt32: + return (int64_t)obj->value.as_int32; + case Dart_CObject_kInt64: + return obj->value.as_int64; + default: + return 0; + } +} + +void handleMessage(Dart_Port destPortId, Dart_CObject *message) +{ + // We always expect an array to be sent. + Dart_CObject_Type firstType = message->value.as_array.values[0]->type; + + // If it's a SendPort, then start a new thread that listens for incoming connections. + if (firstType == Dart_CObject_kSendPort) + { + std::lock_guard lock(serverInfoVectorMutex); + auto *threadInfo = new wings_thread_info; + threadInfo->port = message->value.as_array.values[0]->value.as_send_port.id; + threadInfo->serverInfo = serverInfoVector.at((unsigned long)get_int(message->value.as_array.values[1])); + std::thread workerThread(wingsThreadMain, threadInfo); + workerThread.detach(); + } + else if (firstType == Dart_CObject_kBool) + { + // The Dart world is trying to close this port. + Dart_Port port = message->value.as_array.values[1]->value.as_send_port.id; + Dart_CloseNativePort(port); + } +} \ No newline at end of file diff --git a/lib/src/libwings.build_native.yaml b/lib/src/libwings.build_native.yaml new file mode 100644 index 00000000..9fd5b606 --- /dev/null +++ b/lib/src/libwings.build_native.yaml @@ -0,0 +1,18 @@ +include: + - angel_wings|lib/src/wings.h + - angel_wings|lib/src/wings_thread.h +sources: + - angel_wings|lib/src/bind_socket.cc + - angel_wings|lib/src/http_listener.cc + - angel_wings|lib/src/send.cc + - angel_wings|lib/src/util.cc + - angel_wings|lib/src/wings.cc + - angel_wings|lib/src/worker_thread.cc +third_party: + http_parser: + git: https://github.com/nodejs/http-parser.git + commit: 5b76466 + include: + - . + sources: + - http_parser.c \ No newline at end of file diff --git a/lib/src/send.cc b/lib/src/send.cc new file mode 100644 index 00000000..45c0414d --- /dev/null +++ b/lib/src/send.cc @@ -0,0 +1,22 @@ +#include "wings.h" + +void wings_CloseSocket(Dart_NativeArguments arguments) +{ + Dart_Handle sockfdHandle = Dart_GetNativeArgument(arguments, 0); + uint64_t sockfd; + HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd)); + close((int)sockfd); +} + +void wings_Send(Dart_NativeArguments arguments) +{ + Dart_Handle sockfdHandle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle dataHandle = Dart_GetNativeArgument(arguments, 1); + uint64_t sockfd; + Dart_TypedData_Type dataType; + void *dataBytes; + intptr_t dataLength; + HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd)); + HandleError(Dart_TypedDataAcquireData(dataHandle, &dataType, &dataBytes, &dataLength)); + write((int)sockfd, dataBytes, (size_t)dataLength); +} \ No newline at end of file diff --git a/lib/src/util.cc b/lib/src/util.cc new file mode 100644 index 00000000..be5017d5 --- /dev/null +++ b/lib/src/util.cc @@ -0,0 +1,36 @@ +#include +#include "wings.h" + +void wings_AddressToString(Dart_NativeArguments arguments) { + char *address; + void *data; + intptr_t length; + bool ipv6; + Dart_TypedData_Type type; + + Dart_Handle address_handle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle ipv6_handle = Dart_GetNativeArgument(arguments, 1); + HandleError(Dart_BooleanValue(ipv6_handle, &ipv6)); + sa_family_t family; + + if (ipv6) { + family = AF_INET6; + address = (char *) Dart_ScopeAllocate(INET6_ADDRSTRLEN); + } else { + family = AF_INET; + address = (char *) Dart_ScopeAllocate(INET_ADDRSTRLEN); + } + + HandleError(Dart_TypedDataAcquireData(address_handle, &type, &data, &length)); + auto *ptr = inet_ntop(family, data, address, INET_ADDRSTRLEN); + HandleError(Dart_TypedDataReleaseData(address_handle)); + + if (ptr == nullptr) { + if (ipv6) + Dart_ThrowException(Dart_NewStringFromCString("Invalid IPV6 address.")); + else + Dart_ThrowException(Dart_NewStringFromCString("Invalid IPV4 address.")); + } else { + Dart_SetReturnValue(arguments, Dart_NewStringFromCString(address)); + } +} \ No newline at end of file diff --git a/lib/src/wings.cc b/lib/src/wings.cc new file mode 100644 index 00000000..41ab5b15 --- /dev/null +++ b/lib/src/wings.cc @@ -0,0 +1,63 @@ +#include +#include +#include +#include +#include "wings.h" + +// Forward declaration of ResolveName function. +Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope); + +// The name of the initialization function is the extension name followed +// by _Init. +DART_EXPORT Dart_Handle wings_Init(Dart_Handle parent_library) +{ + if (Dart_IsError(parent_library)) + return parent_library; + + Dart_Handle result_code = + Dart_SetNativeResolver(parent_library, ResolveName, NULL); + if (Dart_IsError(result_code)) + return result_code; + + return Dart_Null(); +} + +Dart_Handle HandleError(Dart_Handle handle) +{ + if (Dart_IsError(handle)) + Dart_PropagateError(handle); + return handle; +} + +Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope) +{ + // If we fail, we return NULL, and Dart throws an exception. + if (!Dart_IsString(name)) + return NULL; + Dart_NativeFunction result = NULL; + const char *cname; + HandleError(Dart_StringToCString(name, &cname)); + + if (strcmp(cname, "AddressToString") == 0) + { + result = wings_AddressToString; + } + else if (strcmp(cname, "BindSocket") == 0) + { + result = wings_BindSocket; + } + else if (strcmp(cname, "CloseSocket") == 0) + { + result = wings_CloseSocket; + } + else if (strcmp(cname, "Send") == 0) + { + result = wings_Send; + } + else if (strcmp(cname, "StartHttpListener") == 0) + { + result = wings_StartHttpListener; + } + + return result; +} \ No newline at end of file diff --git a/lib/src/wings.dart b/lib/src/wings.dart new file mode 100644 index 00000000..67faf25a --- /dev/null +++ b/lib/src/wings.dart @@ -0,0 +1,451 @@ +part of angel_wings; + +class AngelWings { + static const int messageBegin = 0, + messageComplete = 1, + url = 2, + headerField = 3, + headerValue = 4, + body = 5, + upgrade = 6, + upgradedMessage = 7; + + static const int DELETE = 0, + GET = 1, + HEAD = 2, + POST = 3, + PUT = 4, + CONNECT = 5, + OPTIONS = 6, + TRACE = 7, + COPY = 8, + LOCK = 9, + MKCOL = 10, + MOVE = 11, + PROPFIND = 12, + PROPPATCH = 13, + SEARCH = 14, + UNLOCK = 15, + BIND = 16, + REBIND = 17, + UNBIND = 18, + ACL = 19, + REPORT = 20, + MKACTIVITY = 21, + CHECKOUT = 22, + MERGE = 23, + MSEARCH = 24, + NOTIFY = 25, + SUBSCRIBE = 26, + UNSUBSCRIBE = 27, + PATCH = 28, + PURGE = 29, + MKCALENDAR = 30, + LINK = 31, + UNLINK = 32, + SOURCE = 33; + + static String methodToString(int method) { + switch (method) { + case DELETE: + return 'DELETE'; + case GET: + return 'GET'; + case HEAD: + return 'HEAD'; + case POST: + return 'POST'; + case PUT: + return 'PUT'; + case CONNECT: + return 'CONNECT'; + case OPTIONS: + return 'OPTIONS'; + case PATCH: + return 'PATCH'; + case PURGE: + return 'PURGE'; + default: + throw new ArgumentError('Unknown method $method.'); + } + } + + final Angel app; + final bool shared; + final bool useZone; + + final RawReceivePort _recv = new RawReceivePort(); + final Map _sessions = {}; + final PooledMap _staging = + new PooledMap(); + final Uuid _uuid = new Uuid(); + InternetAddress _address; + int _port; + SendPort _sendPort; + + static int _bindSocket(Uint8List address, String addressString, int port, + int backlog, bool shared) native "BindSocket"; + + static SendPort _startHttpListener() native "StartHttpListener"; + + AngelWings(this.app, {this.shared: false, this.useZone: true}) { + _recv.handler = _handleMessage; + } + + InternetAddress get address => _address; + + int get port => _port; + + Future startServer([host, int port, int backlog = 10]) { + Future _addr = host is InternetAddress + ? host + : InternetAddress.lookup(host?.toString() ?? '127.0.0.1').then((list) => + list.isNotEmpty + ? list.first + : throw new StateError('IP lookup failed.')); + + return _addr.then((address) { + try { + var serverInfoIndex = _bindSocket( + new Uint8List.fromList(address.rawAddress), + address.address, + port ?? 0, + backlog, + shared); + _sendPort = _startHttpListener(); + _sendPort.send([_recv.sendPort, serverInfoIndex]); + _address = address; + _port = port; + } on List catch (osError) { + if (osError.length == 3) { + throw new SocketException(osError[0] as String, + osError: new OSError(osError[1] as String, osError[2] as int)); + } else { + throw new SocketException('Could not start Wings server.', + osError: new OSError(osError[0] as String, osError[1] as int)); + } + } on String catch (message) { + throw new SocketException(message); + } + }); + } + + Future close() { + _sendPort.send([true, _sendPort]); + _recv.close(); + return new Future.value(); + } + + void _handleMessage(x) { + if (x is String) { + close(); + throw new StateError(x); + } else if (x is List && x.length >= 2) { + int sockfd = x[0], command = x[1]; + + WingsRequestContext _newRequest() => + new WingsRequestContext._(this, sockfd, app); + //print(x); + + switch (command) { + case messageBegin: + _staging.putIfAbsent(sockfd, _newRequest); + break; + case messageComplete: + // (sockfd, method, major, minor, addrBytes) + _staging.update(sockfd, (rq) { + rq._method = methodToString(x[2] as int); + rq._addressBytes = x[5] as Uint8List; + return rq; + }, defaultValue: _newRequest).then(_handleRequest); + break; + case body: + _staging.update(sockfd, (rq) { + (rq._body ??= new StreamController()) + .add(x[2] as Uint8List); + return rq; + }, defaultValue: _newRequest); + break; + //case upgrade: + // TODO: Handle WebSockets...? + // if (onUpgrade != null) onUpgrade(sockfd); + // break; + //case upgradedMessage: + // TODO: Handle upgrade + // onUpgradedMessage(sockfd, x[2]); + // break; + case url: + _staging.update(sockfd, (rq) => rq..__url = x[2] as String, + defaultValue: _newRequest); + break; + case headerField: + _staging.update(sockfd, (rq) => rq.._headerField = x[2] as String, + defaultValue: _newRequest); + break; + case headerValue: + _staging.update(sockfd, (rq) => rq.._headerValue = x[2] as String, + defaultValue: _newRequest); + break; + } + } + } + + Future _handleRequest(WingsRequestContext req) { + if (req == null) return new Future.value(); + var res = new WingsResponseContext._(req) + ..app = app + ..serializer = app.serializer + ..encoders.addAll(app.encoders); + + handle() { + var path = req.path; + if (path == '/') path = ''; + + Tuple3>> resolveTuple() { + Router r = app.optimizedRouter; + var resolved = + r.resolveAbsolute(path, method: req.method, strip: false); + + return new Tuple3( + new MiddlewarePipeline(resolved).handlers, + resolved.fold({}, (out, r) => out..addAll(r.allParams)), + resolved.isEmpty ? null : resolved.first.parseResult, + ); + } + + var cacheKey = req.method + path; + var tuple = app.isProduction + ? app.handlerCache.putIfAbsent(cacheKey, resolveTuple) + : resolveTuple(); + + req.params.addAll(tuple.item2); + req.inject(ParseResult, tuple.item3); + + if (!app.isProduction && app.logger != null) + req.inject(Stopwatch, new Stopwatch()..start()); + + var pipeline = tuple.item1; + + Future Function() runPipeline; + + for (var handler in pipeline) { + if (handler == null) break; + + if (runPipeline == null) + runPipeline = () => app.executeHandler(handler, req, res); + else { + var current = runPipeline; + runPipeline = () => current().then((result) => !result + ? new Future.value(result) + : app.executeHandler(handler, req, res)); + } + } + + return runPipeline == null + ? sendResponse(req, res) + : runPipeline().then((_) => sendResponse(req, res)); + } + + if (useZone == false) { + return handle().catchError((e, StackTrace st) { + if (e is FormatException) + throw new AngelHttpException.badRequest(message: e.message) + ..stackTrace = st; + throw new AngelHttpException(e, stackTrace: st, statusCode: 500); + }, test: (e) => e is! AngelHttpException).catchError( + (AngelHttpException e, StackTrace st) { + return handleAngelHttpException(e, e.stackTrace ?? st, req, res); + }).whenComplete(() => res.dispose()); + } else { + var zoneSpec = new ZoneSpecification( + print: (self, parent, zone, line) { + if (app.logger != null) + app.logger.info(line); + else + parent.print(zone, line); + }, + handleUncaughtError: (self, parent, zone, error, stackTrace) { + var trace = new Trace.from(stackTrace ?? StackTrace.current).terse; + + return new Future(() { + AngelHttpException e; + + if (error is FormatException) { + e = new AngelHttpException.badRequest(message: error.message); + } else if (error is AngelHttpException) { + e = error; + } else { + e = new AngelHttpException(error, + stackTrace: stackTrace, message: error?.toString()); + } + + if (app.logger != null) { + app.logger.severe(e.message ?? e.toString(), error, trace); + } + + return handleAngelHttpException(e, trace, req, res); + }).catchError((e, StackTrace st) { + var trace = new Trace.from(st ?? StackTrace.current).terse; + WingsResponseContext._closeSocket(req._sockfd); + // Ideally, we won't be in a position where an absolutely fatal error occurs, + // but if so, we'll need to log it. + if (app.logger != null) { + app.logger.severe( + 'Fatal error occurred when processing ${req.uri}.', e, trace); + } else { + stderr + ..writeln('Fatal error occurred when processing ' + '${req.uri}:') + ..writeln(e) + ..writeln(trace); + } + }); + }, + ); + + var zone = Zone.current.fork(specification: zoneSpec); + req.inject(Zone, zone); + req.inject(ZoneSpecification, zoneSpec); + return zone.run(handle).whenComplete(() { + res.dispose(); + }); + } + } + + /// Handles an [AngelHttpException]. + Future handleAngelHttpException(AngelHttpException e, StackTrace st, + WingsRequestContext req, WingsResponseContext res, + {bool ignoreFinalizers: false}) { + if (req == null || res == null) { + try { + app.logger?.severe(e, st); + var b = new StringBuffer(); + b.writeln('HTTP/1.1 500 Internal Server Error'); + b.writeln(); + + WingsResponseContext._send( + req._sockfd, _coerceUint8List(b.toString().codeUnits)); + WingsResponseContext._closeSocket(req._sockfd); + } finally { + return null; + } + } + + Future handleError; + + if (!res.isOpen) + handleError = new Future.value(); + else { + res.statusCode = e.statusCode; + handleError = + new Future.sync(() => app.errorHandler(e, req, res)).then((result) { + return app.executeHandler(result, req, res).then((_) => res.end()); + }); + } + + return handleError.then((_) => + sendResponse(req, res, ignoreFinalizers: ignoreFinalizers == true)); + } + + /// Sends a response. + Future sendResponse(WingsRequestContext req, WingsResponseContext res, + {bool ignoreFinalizers: false}) { + if (res.willCloseItself) return new Future.value(); + + Future finalizers = ignoreFinalizers == true + ? new Future.value() + : app.responseFinalizers.fold( + new Future.value(), (out, f) => out.then((_) => f(req, res))); + + if (res.isOpen) res.end(); + + var headers = {}; + headers.addAll(res.headers); + + headers['content-length'] = res.buffer.length.toString(); + + // Ignore chunked transfer encoding + //request.response.headers.chunkedTransferEncoding = res.chunked ?? true; + // TODO: Is there a need to support this? + + List outputBuffer = res.buffer.toBytes(); + + if (res.encoders.isNotEmpty) { + var allowedEncodings = req.headers + .value('accept-encoding') + ?.split(',') + ?.map((s) => s.trim()) + ?.where((s) => s.isNotEmpty) + ?.map((str) { + // Ignore quality specifications in accept-encoding + // ex. gzip;q=0.8 + if (!str.contains(';')) return str; + return str.split(';')[0]; + }); + + if (allowedEncodings != null) { + for (var encodingName in allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (res.encoders.containsKey(encodingName)) + encoder = res.encoders[encodingName]; + else if (encodingName == '*') { + encoder = res.encoders[key = res.encoders.keys.first]; + } + + if (encoder != null) { + headers['content-encoding'] = key; + outputBuffer = res.encoders[key].convert(outputBuffer); + + headers['content-length'] = outputBuffer.length.toString(); + break; + } + } + } + } + + var b = new StringBuffer(); + b.writeln('HTTP/1.1 ${res.statusCode}'); + + res.headers.forEach((k, v) { + b.writeln('$k: $v'); + }); + + // Persist session ID + if (res.correspondingRequest._session != null) { + res.cookies + .add(new Cookie('DARTSESSID', res.correspondingRequest._session.id)); + } + + // Send all cookies + for (var cookie in res.cookies) { + var value = cookie.toString(); + b.writeln('set-cookie: $value'); + } + + b.writeln(); + + var buf = new Uint8List.fromList( + new List.from(b.toString().codeUnits)..addAll(outputBuffer)); + + return finalizers.then((_) { + WingsResponseContext._send(req._sockfd, buf); + WingsResponseContext._closeSocket(req._sockfd); + + if (req.injections.containsKey(PoolResource)) { + req.injections[PoolResource].release(); + } + + if (!app.isProduction && app.logger != null) { + var sw = req.grab(Stopwatch); + + if (sw.isRunning) { + sw?.stop(); + app.logger.info("${res.statusCode} ${req.method} ${req.uri} (${sw + ?.elapsedMilliseconds ?? 'unknown'} ms)"); + } + } + }); + } +} diff --git a/lib/src/wings.h b/lib/src/wings.h new file mode 100644 index 00000000..da4f06c8 --- /dev/null +++ b/lib/src/wings.h @@ -0,0 +1,45 @@ +#ifndef ANGEL_WINGS_H +#define ANGEL_WINGS_H +#ifndef WIN32 +#include +#include +#include +#include +#else +#include +#include +#include +#include +// Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib +#pragma comment(lib, "Ws2_32.lib") +#pragma comment(lib, "Mswsock.lib") +#pragma comment(lib, "AdvApi32.lib") +#endif +#include +#include +#include +#include +#include + +class WingsServerInfo +{ +public: + std::mutex mutex; + std::string addressString; + uint64_t port; + int sockfd; + bool ipv6; +}; + +extern std::mutex serverInfoVectorMutex; +extern std::vector serverInfoVector; + +Dart_Handle HandleError(Dart_Handle handle); + +void wings_AddressToString(Dart_NativeArguments arguments); +void wings_BindSocket(Dart_NativeArguments arguments); +void wings_CloseSocket(Dart_NativeArguments arguments); +void wings_Send(Dart_NativeArguments arguments); +void wings_StartHttpListener(Dart_NativeArguments arguments); + +#endif \ No newline at end of file diff --git a/lib/src/wings_request.dart b/lib/src/wings_request.dart new file mode 100644 index 00000000..d58b9505 --- /dev/null +++ b/lib/src/wings_request.dart @@ -0,0 +1,157 @@ +part of angel_wings; + +class WingsRequestContext extends RequestContext { + final AngelWings _wings; + final int _sockfd; + + @override + Angel app; + + WingsRequestContext._(this._wings, this._sockfd, Angel app) : this.app = app; + + static final RegExp _straySlashes = new RegExp(r'(^/+)|(/+$)'); + + final Map _headers = {}; + + String __contentTypeString; + String __path; + String __url; + + Uint8List _addressBytes; + StreamController _body; + ContentType _contentType; + List _cookies; + String _headerField, _hostname, _originalMethod, _method, _path; + HttpHeaders _httpHeaders; + InternetAddress _remoteAddress; + HttpSession _session; + Uri _uri; + + static String _addressToString(Uint8List bytes, bool ipV6) + native "AddressToString"; + + String get _contentTypeString => + __contentTypeString ??= _headers['content-type']?.toString(); + + void set _headerValue(String value) { + if (_headerField != null) { + _headers[_headerField.toLowerCase()] = value; + _headerField = null; + } + } + + @override + ContentType get contentType => _contentType ??= (_contentTypeString == null + ? ContentType.binary + : ContentType.parse(_contentTypeString)); + + @override + List get cookies { + if (_cookies != null) { + return _cookies; + } + + var cookies = []; + + return _cookies = new List.unmodifiable(cookies); + } + + @override + HttpHeaders get headers => _httpHeaders ??= new _WingsIncomingHeaders(this); + + @override + String get hostname => _hostname ??= + (_headers['host'] ?? '${_wings.address.address}:${_wings.port}'); + + @override + HttpRequest get io => null; + + @override + String get method => + _method ??= (_headers['x-http-method-override'] ?? originalMethod); + + @override + String get originalMethod => _originalMethod; + + @override + Future parseOnce() { + return parseBodyFromStream( + _body?.stream ?? new Stream>.empty(), + contentType == null ? null : new MediaType.parse(contentType.toString()), + uri, + storeOriginalBuffer: app.storeOriginalBuffer, + ); + } + + @override + String get path { + if (_path != null) { + return __path; + } else { + var path = __path.replaceAll(_straySlashes, ''); + if (path.isEmpty) path = '/'; + return _path = path; + } + } + + @override + InternetAddress get remoteAddress => _remoteAddress ??= new InternetAddress( + _addressToString(_addressBytes, _addressBytes.length > 4)); + + @override + HttpSession get session { + if (_session != null) return _session; + var dartSessIdCookie = cookies.firstWhere((c) => c.name == 'DARTSESSID', + orElse: () => new Cookie('DARTSESSID', _wings._uuid.v4().toString())); + return _session = _wings._sessions.putIfAbsent(dartSessIdCookie.value, + () => new MockHttpSession(id: dartSessIdCookie.value)); + } + + @override + Uri get uri => _uri ??= Uri.parse(__url); + + @override + bool get xhr => + _headers['x-requested-with']?.trim()?.toLowerCase() == 'xmlhttprequest'; +} + +class _WingsIncomingHeaders extends HttpHeaders { + final WingsRequestContext request; + + _WingsIncomingHeaders(this.request); + + UnsupportedError _unsupported() => + new UnsupportedError('Cannot modify incoming HTTP headers.'); + + @override + List operator [](String name) { + return value(name)?.split(',')?.map((s) => s.trim())?.toList(); + } + + @override + void add(String name, Object value) => throw _unsupported(); + + @override + void clear() => throw _unsupported(); + + @override + void forEach(void Function(String name, List values) f) { + request._headers.forEach((name, value) => + f(name, value.split(',').map((s) => s.trim()).toList())); + } + + @override + void noFolding(String name) => throw _unsupported(); + + @override + void remove(String name, Object value) => throw _unsupported(); + + @override + void removeAll(String name) => throw _unsupported(); + + @override + void set(String name, Object value) => throw _unsupported(); + + @override + String value(String name) => request._headers[name.toLowerCase()]; +} diff --git a/lib/src/wings_response.dart b/lib/src/wings_response.dart new file mode 100644 index 00000000..47abec7b --- /dev/null +++ b/lib/src/wings_response.dart @@ -0,0 +1,158 @@ +part of angel_wings; + +class WingsResponseContext extends ResponseContext { + final WingsRequestContext correspondingRequest; + bool _isClosed = false, _useStream = false; + + WingsResponseContext._(this.correspondingRequest); + + static void _send(int sockfd, Uint8List data) native "Send"; + + static void _closeSocket(int sockfd) native "CloseSocket"; + + @override + void add(List data) { + if (_isClosed && !_useStream) + throw ResponseContext.closed(); + else if (_useStream) + _send(correspondingRequest._sockfd, _coerceUint8List(data)); + else + buffer.add(data); + } + + @override + Future close() { + _closeSocket(correspondingRequest._sockfd); + _isClosed = true; + _useStream = false; + return super.close(); + } + + @override + void end() { + _isClosed = true; + super.end(); + } + + @override + Future addStream(Stream> stream) { + if (_isClosed && !_useStream) throw ResponseContext.closed(); + var firstStream = useStream(); + + Stream> output = stream; + + if ((firstStream || !headers.containsKey('content-encoding')) && + encoders.isNotEmpty && + correspondingRequest != null) { + var allowedEncodings = + (correspondingRequest.headers['accept-encoding'] ?? []).map((str) { + // Ignore quality specifications in accept-encoding + // ex. gzip;q=0.8 + if (!str.contains(';')) return str; + return str.split(';')[0]; + }); + + for (var encodingName in allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (encoders.containsKey(encodingName)) + encoder = encoders[encodingName]; + else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + /* + if (firstStream) { + this.stream.sendHeaders([ + new Header.ascii( + 'content-encoding', headers['content-encoding'] = key) + ]); + } + */ + + output = encoders[key].bind(output); + break; + } + } + } + + return output.forEach(((data) => + _send(correspondingRequest._sockfd, _coerceUint8List(data)))); + } + + @override + HttpResponse get io => null; + + @override + bool get isOpen => !_isClosed; + + @override + bool get streaming => _useStream; + + @override + bool useStream() { + if (!_useStream) { + // If this is the first stream added to this response, + // then add headers, status code, etc. + _finalize(); + + willCloseItself = _useStream = _isClosed = true; + releaseCorrespondingRequest(); + return true; + } + + return false; + } + + /// Write headers, status, etc. to the underlying [stream]. + void _finalize() { + var b = new StringBuffer(); + b.writeln('HTTP/1.1 $statusCode'); + headers['date'] ??= HttpDate.format(new DateTime.now()); + + if (encoders.isNotEmpty && correspondingRequest != null) { + var allowedEncodings = + (correspondingRequest.headers['accept-encoding'] ?? []).map((str) { + // Ignore quality specifications in accept-encoding + // ex. gzip;q=0.8 + if (!str.contains(';')) return str; + return str.split(';')[0]; + }); + + for (var encodingName in allowedEncodings) { + String key = encodingName; + + if (encoders.containsKey(encodingName)) { + this.headers['content-encoding'] = key; + break; + } + } + } + + // Add all normal headers + this.headers.forEach((k, v) { + b.writeln('$k: $v'); + }); + + // Persist session ID + if (correspondingRequest._session != null) { + cookies.add(new Cookie('DARTSESSID', correspondingRequest._session.id)); + } + + // Send all cookies + for (var cookie in cookies) { + var value = cookie.toString(); + b.writeln('set-cookie: $value'); + } + + b.writeln(); + + _send( + correspondingRequest._sockfd, _coerceUint8List(b.toString().codeUnits)); + } +} + +Uint8List _coerceUint8List(List list) => + list is Uint8List ? list : new Uint8List.fromList(list); diff --git a/lib/src/wings_thread.h b/lib/src/wings_thread.h new file mode 100644 index 00000000..b86fd2f1 --- /dev/null +++ b/lib/src/wings_thread.h @@ -0,0 +1,25 @@ +#ifndef ANGEL_WINGS_THREAD_H +#define ANGEL_WINGS_THREAD_H +#include +#include +#include "wings.h" + +typedef struct +{ + Dart_Port port; + WingsServerInfo *serverInfo; +} wings_thread_info; + +typedef struct +{ + bool ipv6; + int sock; + sockaddr addr; + socklen_t addr_len; + Dart_Port port; +} requestInfo; + +void wingsThreadMain(wings_thread_info *info); +void handleRequest(requestInfo *rq); + +#endif \ No newline at end of file diff --git a/lib/src/worker_thread.cc b/lib/src/worker_thread.cc new file mode 100644 index 00000000..fbe0ee15 --- /dev/null +++ b/lib/src/worker_thread.cc @@ -0,0 +1,215 @@ +#include +#include +#include "wings_thread.h" + +void wingsThreadMain(wings_thread_info *info) +{ + auto *serverInfo = std::move(info->serverInfo); + Dart_Port port = std::move(info->port); + //delete info; + + while (true) + { + std::lock_guard lock(serverInfo->mutex); + + sockaddr client_addr{}; + socklen_t client_addr_len; + int client = accept(serverInfo->sockfd, &client_addr, &client_addr_len); + + if (client < 0) + { + // send_error(info->port, "Failed to accept client socket."); + return; + } + + requestInfo rq{}; + rq.ipv6 = serverInfo->ipv6; + rq.sock = client; + rq.addr = client_addr; + rq.addr_len = client_addr_len; + rq.port = port; + handleRequest(&rq); + } +} + +int send_notification(http_parser *parser, int code) +{ + //if (parser == nullptr) return 0; + auto *rq = (requestInfo *)parser->data; + //if (rq == nullptr) return 0; + + Dart_CObject first{}; + Dart_CObject second{}; + first.type = second.type = Dart_CObject_kInt64; + first.value.as_int64 = rq->sock; + second.value.as_int64 = code; + + Dart_CObject *list[2]{&first, &second}; + Dart_CObject obj{}; + obj.type = Dart_CObject_kArray; + obj.value.as_array.length = 2; + obj.value.as_array.values = list; + Dart_PostCObject(rq->port, &obj); + return 0; +} + +int send_string(http_parser *parser, char *str, size_t length, int code, bool as_typed_data = false) +{ + //if (parser == nullptr) return 0; + auto *rq = (requestInfo *)parser->data; + //if (rq == nullptr) return 0; + auto *s = new char[length + 1]; + memset(s, 0, length + 1); + + Dart_CObject first{}; + Dart_CObject second{}; + Dart_CObject third{}; + first.type = second.type = Dart_CObject_kInt32; + first.value.as_int32 = rq->sock; + second.value.as_int32 = code; + + if (!as_typed_data) + { + third.type = Dart_CObject_kString; + memcpy(s, str, length); + third.value.as_string = s; + } + else + { + third.type = Dart_CObject_kExternalTypedData; + third.type = Dart_CObject_kExternalTypedData; + third.value.as_external_typed_data.type = Dart_TypedData_kUint8; + third.value.as_external_typed_data.length = length; + third.value.as_external_typed_data.data = (uint8_t *)str; + } + + // Post the string back to Dart... + Dart_CObject *list[3]{&first, &second, &third}; + Dart_CObject obj{}; + obj.type = Dart_CObject_kArray; + obj.value.as_array.length = 3; + obj.value.as_array.values = list; + Dart_PostCObject(rq->port, &obj); + delete[] s; + return 0; +} + +int send_oncomplete(http_parser *parser, int code) +{ + //if (parser == nullptr) return 0; + auto *rq = (requestInfo *)parser->data; + //if (rq == nullptr) return 0; + + Dart_CObject sockfd{}; + Dart_CObject command{}; + Dart_CObject method{}; + Dart_CObject major{}; + Dart_CObject minor{}; + Dart_CObject addr{}; + sockfd.type = command.type = method.type = major.type = minor.type = Dart_CObject_kInt32; + addr.type = Dart_CObject_kExternalTypedData; + sockfd.value.as_int32 = rq->sock; + command.value.as_int32 = code; + method.value.as_int32 = parser->method; + major.value.as_int32 = parser->http_major; + minor.value.as_int32 = parser->http_minor; + addr.value.as_external_typed_data.type = Dart_TypedData_kUint8; + addr.value.as_external_typed_data.length = rq->addr_len; + + if (rq->ipv6) + { + auto *v6 = (sockaddr_in6 *)&rq->addr; + addr.value.as_external_typed_data.data = (uint8_t *)v6->sin6_addr.s6_addr; + } + else + { + auto *v4 = (sockaddr_in *)&rq->addr; + addr.value.as_external_typed_data.data = (uint8_t *)&v4->sin_addr.s_addr; + } + + Dart_CObject *list[6]{&sockfd, &command, &method, &major, &minor, &addr}; + Dart_CObject obj{}; + obj.type = Dart_CObject_kArray; + obj.value.as_array.length = 6; + obj.value.as_array.values = list; + Dart_PostCObject(rq->port, &obj); + //delete parser; + return 0; +} + +void handleRequest(requestInfo *rq) +{ + size_t len = 80 * 1024, nparsed; + char buf[len]; + ssize_t recved; + memset(buf, 0, len); + + http_parser parser{}; + http_parser_init(&parser, HTTP_REQUEST); + parser.data = rq; //rq.get(); + + http_parser_settings settings{}; + + settings.on_message_begin = [](http_parser *parser) { + // std::cout << "mb" << std::endl; + return send_notification(parser, 0); + }; + + settings.on_message_complete = [](http_parser *parser) { + //std::cout << "mc" << std::endl; + send_oncomplete(parser, 1); + //delete (requestInfo *) parser->data; + //std::cout << "deleted rq!" << std::endl; + return 0; + }; + + settings.on_url = [](http_parser *parser, const char *at, size_t length) { + // std::cout << "url" << std::endl; + return send_string(parser, (char *)at, length, 2); + }; + + settings.on_header_field = [](http_parser *parser, const char *at, size_t length) { + // std::cout << "hf" << std::endl; + return send_string(parser, (char *)at, length, 3); + }; + + settings.on_header_value = [](http_parser *parser, const char *at, size_t length) { + // std::cout << "hv" << std::endl; + return send_string(parser, (char *)at, length, 4); + }; + + settings.on_body = [](http_parser *parser, const char *at, size_t length) { + // std::cout << "body" << std::endl; + return send_string(parser, (char *)at, length, 5, true); + }; + + unsigned int isUpgrade = 0; + + // std::cout << "start" << std::endl; + while ((recved = recv(rq->sock, buf, len, 0)) > 0) + { + if (isUpgrade) + { + send_string(&parser, buf, (size_t)recved, 7, true); + } + else + { + /* Start up / continue the parser. + * Note we pass recved==0 to signal that EOF has been received. + */ + nparsed = http_parser_execute(&parser, &settings, buf, (size_t)recved); + + if ((isUpgrade = parser.upgrade) == 1) + { + send_notification(&parser, 6); + } + else if (nparsed != recved) + { + close(rq->sock); + return; + } + } + + memset(buf, 0, len); + } +} \ No newline at end of file diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 00000000..90b0a776 --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,13 @@ +name: angel_wings +dependencies: + angel_framework: ^1.0.0 + build_native: ^0.0.9 + mock_request: ^1.0.0 + pooled_map: ^1.0.0 + uuid: ^1.0.0 +dev_dependencies: + build_runner: + git: + url: https://github.com/thosakwe/build.git + path: build_runner + ref: experimental-hot-reloading \ No newline at end of file