This is great - Dart's standard +library comes with an HTTP server, which saves a lot of difficult in implementation. + +However, abstraction tends to come with a cost. Wings seeks to minimize abstraction entirely. Rather than +using the built-in Dart network stack, Wings' HTTP server is implemented in C++ as a Dart native extension, +and the `AngelWings` driver listens to events from the extension and converts them directly into +`RequestContext` objects, without any additional abstraction within. This reduces the amount of computation +performed on each request, and helps to minimize response latency. Sending data from the response buffer in plain +Dart surprisingly is the most expensive operation, as is revealed by the Observatory. + +By combining Dart's powerful VM with a native code server based on +[the same one used in Node.js](https://github.com/nodejs/http-parser), +`AngelWings` trims several milliseconds off every request, both saving resources and reducing +load times for high-traffic applications. + +## How can I use it? +The intended way to use `AngelWings` is via +[`package:build_native`](https://github.com/thosakwe/build_native); +however, the situation surrounding distributing native extensions is yet far from ideal, +so this package includes pre-built binaries out-of-the-box. + +Thanks to this, you can use it like any other Dart package, by installing it via Pub. + +## Brief example +Using `AngelWings` is almost identical to using `AngelHttp`; however, it does +not support SSL, and therefore should be placed behind a reverse proxy like `nginx` in production. + +```dart +main() async { + var app = new Angel(); + var wings = new AngelWings(app, shared: true, useZone: false); + + app.injectEncoders({'gzip': gzip.encoder, 'deflate': zlib.encoder}); + + app.get('/hello', 'Hello, native world! This is Angel WINGS.'); + + var fs = const LocalFileSystem(); + var vDir = new VirtualDirectory(app, fs, source: fs.directory('web')); + app.use(vDir.handleRequest); + + await wings.startServer('', 3000); + print('Listening at http://${wings.address.address}:${wings.port}'); +} +``` \ No newline at end of file diff --git a/packages/wings/Vagrantfile b/packages/wings/Vagrantfile new file mode 100644 index 00000000..995ff4a3 --- /dev/null +++ b/packages/wings/Vagrantfile @@ -0,0 +1,6 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : +Vagrant.configure("2") do |config| + config.vm.box = "ubuntu/bionic64" + config.vm.provision "shell", path: "provision.sh" +end diff --git a/packages/wings/analysis_options.yaml b/packages/wings/analysis_options.yaml new file mode 100644 index 00000000..1009a0a1 --- /dev/null +++ b/packages/wings/analysis_options.yaml @@ -0,0 +1,6 @@ +include: package:pedantic/analysis_options.yaml +analyzer: + exclude: + - cmake_dart_utils/example/* + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/packages/wings/benchmark/empty.dart b/packages/wings/benchmark/empty.dart new file mode 100644 index 00000000..dcd5bc0e --- /dev/null +++ b/packages/wings/benchmark/empty.dart @@ -0,0 +1,23 @@ +import 'dart:async'; +import 'dart:io'; +import 'package:angel_framework/angel_framework.dart'; +import 'util.dart'; + +const AngelBenchmark emptyBenchmark = _EmptyBenchmark(); + +main() => runBenchmarks([emptyBenchmark]); + +class _EmptyBenchmark implements AngelBenchmark { + const _EmptyBenchmark(); + + @override + String get name => 'empty'; + + @override + FutureOr rawHandler(HttpRequest req, HttpResponse res) { + return res.close(); + } + + @override + void setupAngel(Angel app) {} +} diff --git a/packages/wings/benchmark/util.dart b/packages/wings/benchmark/util.dart new file mode 100644 index 00000000..0da16c2d --- /dev/null +++ b/packages/wings/benchmark/util.dart @@ -0,0 +1,110 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:isolate'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; +import 'package:angel_wings/angel_wings.dart'; +import 'package:io/ansi.dart'; +import 'package:tuple/tuple.dart'; + +Future _runWrk( + {ProcessStartMode mode = ProcessStartMode.inheritStdio}) async { + return await Process.start('wrk', ['http://localhost:$testPort'], mode: mode); +} + +Future _warmUp() async { + var wrk = await _runWrk(); + await wrk.exitCode; + // await wrk.stderr.drain(); + // await wrk.stdout.drain(); +} + +Future _10s() => Future.delayed(Duration(seconds: 10)); + +const testPort = 8877; + +Future runBenchmarks(Iterable benchmarks, + {Iterable factories = const [ + // 'angel_http', + 'angel_wings', + ]}) async { + for (var benchmark in benchmarks) { + print(magenta.wrap('Entering benchmark: ${benchmark.name}')); + + // // Run dart:io + // print(lightGray.wrap('Booting dart:io server (waiting 10s)...')); + // var isolates = []; + // for (int i = 0; i < Platform.numberOfProcessors; i++) { + // isolates.add(await Isolate.spawn(_httpIsolate, benchmark)); + // } + + // await _10s(); + // print(lightGray.wrap('Warming up dart:io server...')); + // await _warmUp(); + + // stdout + // ..write(lightGray.wrap('Now running `wrk` for ')) + // ..write(cyan.wrap(benchmark.name)) + // ..writeln(lightGray.wrap(' (waiting 10s)...')); + // var wrk = await _runWrk(mode: ProcessStartMode.inheritStdio); + // await wrk.exitCode; + // isolates.forEach((i) => i.kill(priority: Isolate.immediate)); + + // Run Angel HTTP, Wings + for (var fac in factories) { + print(lightGray.wrap('Booting $fac server...')); + + var isolates = []; + for (int i = 0; i < Platform.numberOfProcessors; i++) { + isolates + .add(await Isolate.spawn(_angelIsolate, Tuple2(benchmark, fac))); + } + + await _10s(); + print(lightGray.wrap('Warming up $fac server...')); + await _warmUp(); + stdout + ..write(lightGray.wrap('Now running `wrk` for ')) + ..write(cyan.wrap(benchmark.name)) + ..writeln(lightGray.wrap('...')); + var wrk = await _runWrk(mode: ProcessStartMode.inheritStdio); + await wrk.exitCode; + } + } + + exit(0); +} + +void _httpIsolate(AngelBenchmark benchmark) { + Future(() async { + var raw = await HttpServer.bind(InternetAddress.loopbackIPv4, testPort, + shared: true); + raw.listen((r) => benchmark.rawHandler(r, r.response)); + }); +} + +void _angelIsolate(Tuple2 args) { + Future(() async { + var app = Angel(); + Driver driver; + + if (args.item2 == 'angel_http') { + driver = AngelHttp.custom(app, startShared); + } else if (args.item2 == 'angel_wings') { + driver = AngelWings.custom(app, startSharedWings); + } + + await app.configure(args.item1.setupAngel); + await driver.startServer(InternetAddress.loopbackIPv4, testPort); + }); +} + +abstract class AngelBenchmark { + const AngelBenchmark(); + + String get name; + + FutureOr setupAngel(Angel app); + + FutureOr rawHandler(HttpRequest req, HttpResponse res); +} diff vDir = CachingVirtualDirectory(app, fs, + source: fs.currentDirectory, allowDirectoryListing: true); + + app.mimeTypeResolver.addExtension('yaml', 'text/x-yaml'); + + app.get('/', (req, res) => 'WINGS!!!'); + app.post('/', (req, res) async { + await req.parseBody(); + return req.bodyAsMap; + }); + + app.fallback(vDir.handleRequest); + app.fallback((req, res) => throw AngelHttpException.notFound()); + + await wings.startServer(InternetAddress.loopbackIPv4, 3000); + print('Listening at ${wings.uri}'); +} diff --git a/packages/wings/example/shared.dart b/packages/wings/example/shared.dart new file mode 100644 index 00000000..2b9f6ee5 --- /dev/null +++ b/packages/wings/example/shared.dart @@ -0,0 +1,22 @@ +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_wings/angel_wings.dart'; + +main() async { + var app = Angel(); + var wings1 = AngelWings.custom(app, startSharedWings); + var wings2 = AngelWings.custom(app, startSharedWings); + var wings3 = AngelWings.custom(app, startSharedWings); + var wings4 = AngelWings.custom(app, startSharedWings); + await wings1.startServer('', 3000); + await wings2.startServer('', 3000); + await wings3.startServer('', 3000); + await wings4.startServer('', 3000); + print(wings1.uri); + print(wings2.uri); + print(wings3.uri); + print(wings4.uri); + await wings1.close(); + await wings2.close(); + await wings3.close(); + await wings4.close(); +} diff --git a/packages/wings/example/socket.dart b/packages/wings/example/socket.dart new file mode 100644 index 00000000..5c827772 --- /dev/null +++ b/packages/wings/example/socket.dart @@ -0,0 +1,27 @@ +import 'dart:convert'; +import 'dart:typed_data'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_wings/angel_wings.dart'; + +main() async { + var app = Angel(); + var socket = await WingsSocket.bind('', 3000); + print('Listening at http://localhost:3000'); + + await for (var fd in socket) { + var response = ''' +HTTP/1.1 200 Not Found\r +Date: Fri, 31 Dec 1999 23:59:59 GMT\r +server: wings-test\r\n\r +Nope, nothing's here! +\r\n\r +'''; + var bytes = utf8.encode(response); + var data = Uint8List.fromList(bytes); + var rq = await WingsRequestContext.from(app, fd); + print('Yay: $rq'); + print(rq.headers); + writeToNativeSocket(fd.fileDescriptor, data); + closeNativeSocketDescriptor(fd.fileDescriptor); + } +} diff --git a/packages/wings/lib/angel_wings.dart b/packages/wings/lib/angel_wings.dart new file mode 100644 index 00000000..c1cda8f7 --- /dev/null +++ b/packages/wings/lib/angel_wings.dart @@ -0,0 +1,4 @@ +export 'src/wings_driver.dart'; +export 'src/wings_request.dart'; +export 'src/wings_response.dart'; +export 'src/wings_socket.dart'; diff --git a/packages/wings/lib/src/CMakeLists.txt b/packages/wings/lib/src/CMakeLists.txt new file mode 100644 index 00000000..758fce7d --- /dev/null +++ b/packages/wings/lib/src/CMakeLists.txt @@ -0,0 +1,8 @@ +include_directories("${CMAKE_CURRENT_LIST_DIR}") +add_dart_native_extension(angel_wings + http-parser/http_parser.c + angel_wings.h angel_wings.cc + bind.cc http.cc wings_socket.cc + util.cc) +install(TARGETS angel_wings LIBRARY DESTINATION "${CMAKE_CURRENT_LIST_DIR}") +install(TARGETS angel_wings LIBRARY DESTINATION "${CMAKE_CURRENT_LIST_DIR}/../..") diff --git a/packages/wings/lib/src/angel_wings.cc b/packages/wings/lib/src/angel_wings.cc new file mode 100644 index 00000000..74d3a8ef --- /dev/null +++ b/packages/wings/lib/src/angel_wings.cc @@ -0,0 +1,55 @@ +#include "angel_wings.h" +#include +#include +#include +#include + +// The name of the initialization function is the extension name followed +// by _Init. +DART_EXPORT Dart_Handle angel_wings_Init(Dart_Handle parent_library) { + if (Dart_IsError(parent_library)) + return parent_library; + + Dart_Handle result_code = + Dart_SetNativeResolver(parent_library, ResolveName, NULL); + if (Dart_IsError(result_code)) + return result_code; + + return Dart_Null(); +} + +Dart_Handle HandleError(Dart_Handle handle) { + if (Dart_IsError(handle)) + Dart_PropagateError(handle); + return handle; +} + +Dart_NativeFunction ResolveName(Dart_Handle name, int argc, + bool *auto_setup_scope) { + // If we fail, we return NULL, and Dart throws an exception. + if (!Dart_IsString(name)) + return NULL; + Dart_NativeFunction result = NULL; + const char *cname; + HandleError(Dart_StringToCString(name, &cname)); + + if (strcmp("Dart_WingsSocket_bindIPv4", cname) == 0) + result = Dart_WingsSocket_bindIPv4; + if (strcmp("Dart_WingsSocket_bindIPv6", cname) == 0) + result = Dart_WingsSocket_bindIPv6; + if (strcmp("Dart_WingsSocket_getAddress", cname) == 0) + result = Dart_WingsSocket_getAddress; + if (strcmp("Dart_WingsSocket_getPort", cname) == 0) + result = Dart_WingsSocket_getPort; + if (strcmp("Dart_WingsSocket_write", cname) == 0) + result = Dart_WingsSocket_write; + if (strcmp("Dart_WingsSocket_closeDescriptor", cname) == 0) + result = Dart_WingsSocket_closeDescriptor; + if (strcmp("Dart_WingsSocket_close", cname) == 0) + result = Dart_WingsSocket_close; + if (strcmp("Dart_WingsSocket_listen", cname) == 0) + result = Dart_WingsSocket_listen; + if (strcmp("Dart_WingsSocket_parseHttp", cname) == 0) + result = Dart_WingsSocket_parseHttp; + return result; +} \ No newline at end of file diff --git a/packages/wings/lib/src/angel_wings.h b/packages/wings/lib/src/angel_wings.h new file mode 100644 index 00000000..3aeda0b7 --- /dev/null +++ b/packages/wings/lib/src/angel_wings.h @@ -0,0 +1,25 @@ +#ifndef ANGEL_WINGS_WINGS_H +#define ANGEL_WINGS_WINGS_H + +#include "angel_wings.h" +#include +#include + +Dart_NativeFunction ResolveName(Dart_Handle name, int argc, + bool *auto_setup_scope); +Dart_Handle HandleError(Dart_Handle handle); + +void wingsThrowError(const char *msg, const char *lib = "dart:core", + const char *name = "StateError", int n = -1); +void Dart_WingsSocket_bindIPv4(Dart_NativeArguments arguments); +void Dart_WingsSocket_bindIPv6(Dart_NativeArguments arguments); +void Dart_WingsSocket_getAddress(Dart_NativeArguments arguments); +void Dart_WingsSocket_getPort(Dart_NativeArguments arguments); +void Dart_WingsSocket_write(Dart_NativeArguments arguments); +void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments); +void Dart_WingsSocket_close(Dart_NativeArguments arguments); +void Dart_WingsSocket_listen(Dart_NativeArguments arguments); +void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments); +void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message); + +#endif \ No newline at end of file diff --git a/packages/wings/lib/src/bind.cc b/packages/wings/lib/src/bind.cc new file mode 100644 index 00000000..26d90781 --- /dev/null +++ b/packages/wings/lib/src/bind.cc @@ -0,0 +1,161 @@ +#include "angel_wings.h" +#include "wings_socket.h" +#include +#include +using namespace wings; + +void getWingsSocketInfo(Dart_NativeArguments arguments, WingsSocketInfo *info); + +WingsSocket *wingsFindSocket(Dart_NativeArguments arguments, + const WingsSocketInfo &info, int af); + +WingsSocket *wingsBindNewSocket(Dart_NativeArguments arguments, + const WingsSocketInfo &info, int af); + +void wingsReturnBound(Dart_NativeArguments arguments, WingsSocket *socket); + +void Dart_WingsSocket_bind(sa_family_t af, Dart_NativeArguments arguments) { + WingsSocketInfo info; + getWingsSocketInfo(arguments, &info); + WingsSocket *socket = wingsFindSocket(arguments, info, af); + wingsReturnBound(arguments, socket); +} + +void Dart_WingsSocket_bindIPv4(Dart_NativeArguments arguments) { + Dart_WingsSocket_bind(AF_INET, arguments); +} + +void Dart_WingsSocket_bindIPv6(Dart_NativeArguments arguments) { + Dart_WingsSocket_bind(AF_INET6, arguments); +} + +void wingsReturnBound(Dart_NativeArguments arguments, WingsSocket *socket) { + Dart_Port sendPort; + HandleError( + Dart_SendPortGetId(Dart_GetNativeArgument(arguments, 5), &sendPort)); + socket->incrRef(sendPort); + auto ptr = (uint64_t)socket; + Dart_Handle ptrHandle = Dart_NewIntegerFromUint64(ptr); + Dart_SetReturnValue(arguments, ptrHandle); +} + +WingsSocket *wingsFindSocket(Dart_NativeArguments arguments, + const WingsSocketInfo &info, int af) { + // Find an existing server, if any. + if (info.shared) { + // std::cout << info.address << std::endl; + // std::cout << globalSocketList.size() << std::endl; + for (auto *socket : globalSocketList) { + if (info.equals(socket->getInfo())) { + return socket; + } + } + } + + return wingsBindNewSocket(arguments, info, af); +} + +WingsSocket *wingsBindNewSocket(Dart_NativeArguments arguments, + const WingsSocketInfo &info, int af) { + sockaddr *addr; + sockaddr_in v4; + sockaddr_in6 v6; + int ret; + + int sock = socket(af, SOCK_STREAM, IPPROTO_TCP); + + if (sock < 0) { + wingsThrowError("Failed to create socket."); + return nullptr; + } + + int i = 1; + ret = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); + + if (ret < 0) { + wingsThrowError("Cannot reuse address for socket."); + return nullptr; + } + + // TODO: Only on Mac??? + // ret = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i)); + + // if (ret < 0) + // { + // wingsThrowStateError("Cannot reuse port for socket."); + // return; + // } + + if (af == AF_INET6) { + v6.sin6_family = AF_INET6; + v6.sin6_port = htons((uint16_t)info.port); + ret = inet_pton(AF_INET6, info.address, &v6.sin6_addr.s6_addr); + if (ret >= 0) + ret = bind(sock, (const sockaddr *)&v6, sizeof(v6)); + } else { + v4.sin_family = AF_INET; + v4.sin_port = htons((uint16_t)info.port); + v4.sin_addr.s_addr = inet_addr(info.address); + bind(sock, (const sockaddr *)&v4, sizeof(v4)); + } + + if (ret < 0) { + wingsThrowError("Failed to bind socket."); + return nullptr; + } + + if (listen(sock, SOMAXCONN) < 0) { + wingsThrowError("Failed to set SOMAXCONN on bound socket."); + return nullptr; + } + + if (listen(sock, (int)info.backlog) < 0) { + wingsThrowError("Failed to set backlog on bound socket."); + return nullptr; + } + + if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) { + wingsThrowError("Failed to make socket non-blocking."); + return nullptr; + } + + auto *out = new WingsSocket(af, sock, info); + globalSocketList.push_back(out); + return out; +} + +void getWingsSocketInfo(Dart_NativeArguments arguments, WingsSocketInfo *info) { + Dart_Handle addressHandle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle portHandle = Dart_GetNativeArgument(arguments, 1); + Dart_Handle sharedHandle = Dart_GetNativeArgument(arguments, 2); + Dart_Handle backlogHandle = Dart_GetNativeArgument(arguments, 3); + Dart_Handle v6OnlyHandle = Dart_GetNativeArgument(arguments, 4); + info->sendPortHandle = Dart_GetNativeArgument(arguments, 5); + + HandleError(Dart_StringToCString(addressHandle, &info->address)); + HandleError(Dart_IntegerToUint64(portHandle, &info->port)); + HandleError(Dart_BooleanValue(sharedHandle, &info->shared)); + HandleError(Dart_IntegerToUint64(backlogHandle, &info->backlog)); + HandleError(Dart_BooleanValue(v6OnlyHandle, &info->v6Only)); +} + +void wingsThrowError(const char *msg, const char *lib, const char *name, + int n) { + Dart_Handle msgHandle = Dart_NewStringFromCString(msg); + Dart_Handle emptyHandle = Dart_NewStringFromCString(""); + Dart_Handle stateErrorHandle = Dart_NewStringFromCString(name); + Dart_Handle dartCoreHandle = Dart_NewStringFromCString(lib); + Dart_Handle dartCore = Dart_LookupLibrary(dartCoreHandle); + Dart_Handle stateError = Dart_GetType(dartCore, stateErrorHandle, 0, nullptr); + + std::vector args; + args.push_back(msgHandle); + + if (n > -1) { + args.push_back(Dart_NewInteger(n)); + } + + Dart_Handle errHandle = + Dart_New(stateError, emptyHandle, args.size(), args.data()); + Dart_ThrowException(errHandle); +} \ No newline at end of file diff --git a/packages/wings/lib/src/http-parser b/packages/wings/lib/src/http-parser new file mode 160000 index 00000000..28f3c35c --- /dev/null +++ b/packages/wings/lib/src/http-parser @@ -0,0 +1 @@ +Subproject commit 28f3c35c215ffbe0241685901338fad484660454 diff --git a/packages/wings/lib/src/http.cc b/packages/wings/lib/src/http.cc new file mode 100644 index 00000000..a9fe7183 --- /dev/null +++ b/packages/wings/lib/src/http.cc @@ -0,0 +1,175 @@ +#include + +#include "angel_wings.h" +#include "wings_socket.h" +#include +#include +using namespace wings; + +void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments) { + Dart_Port service_port = + Dart_NewNativePort("WingsHttpCallback", &wingsHttpCallback, true); + Dart_Handle send_port = Dart_NewSendPort(service_port); + Dart_SetReturnValue(arguments, send_port); +} + +struct wingsHttp { + Dart_Port port; + std::string lastHeader; +}; + +void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message) { + int64_t fd = -1; + Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id; + Dart_CObject *fdArg = message->value.as_array.values[1]; + + wingsHttp httpData = {outPort}; + +#define theStruct (*((wingsHttp *)parser->data)) +#define thePort theStruct.port +#define sendInt(n) \ + { \ + Dart_CObject obj; \ + obj.type = Dart_CObject_kInt64; \ + obj.value.as_int64 = (n); \ + Dart_PostCObject(thePort, &obj); \ + } +#define sendString(n) \ + if (length > 0) { \ + Dart_CObject typeObj; \ + typeObj.type = Dart_CObject_kInt32; \ + typeObj.value.as_int32 = (n); \ + std::string str(at, length); \ + Dart_CObject strObj; \ + strObj.type = Dart_CObject_kString; \ + strObj.value.as_string = (char *)str.c_str(); \ + Dart_CObject *values[2] = {&typeObj, &strObj}; \ + Dart_CObject out; \ + out.type = Dart_CObject_kArray; \ + out.value.as_array.length = 2; \ + out.value.as_array.values = values; \ + Dart_PostCObject(thePort, &out); \ + } + + if (fdArg->type == Dart_CObject_kInt32) { + fd = (int64_t)fdArg->value.as_int32; + } else { + fd = fdArg->value.as_int64; + } + + if (fd != -1) { + http_parser_settings settings; + + settings.on_message_begin = [](http_parser *parser) { return 0; }; + + settings.on_headers_complete = [](http_parser *parser) { + Dart_CObject type; + type.type = Dart_CObject_kInt32; + type.value.as_int32 = 2; + Dart_CObject value; + value.type = Dart_CObject_kInt32; + value.value.as_int32 = parser->method; + Dart_CObject *values[2] = {&type, &value}; + Dart_CObject out; + out.type = Dart_CObject_kArray; + out.value.as_array.length = 2; + out.value.as_array.values = values; + Dart_PostCObject(thePort, &out); + sendInt(100); + return 0; + }; + + settings.on_message_complete = [](http_parser *parser) { + sendInt(200); + return 0; + }; + + settings.on_chunk_complete = [](http_parser *parser) { return 0; }; + + settings.on_chunk_header = [](http_parser *parser) { return 0; }; + + settings.on_url = [](http_parser *parser, const char *at, size_t length) { + sendString(0); + return 0; + }; + + settings.on_header_field = [](http_parser *parser, const char *at, + size_t length) { + theStruct.lastHeader = std::string(at, length); + return 0; + }; + + settings.on_header_value = [](http_parser *parser, const char *at, + size_t length) { + if (!theStruct.lastHeader.empty()) { + std::string vStr(at, length); + Dart_CObject type; + type.type = Dart_CObject_kInt32; + type.value.as_int32 = 1; + Dart_CObject name; + name.type = Dart_CObject_kString; + name.value.as_string = (char *)theStruct.lastHeader.c_str(); + Dart_CObject value; + value.type = Dart_CObject_kString; + value.value.as_string = (char *)vStr.c_str(); + Dart_CObject *values[3] = {&type, &name, &value}; + Dart_CObject out; + out.type = Dart_CObject_kArray; + out.value.as_array.length = 3; + out.value.as_array.values = values; + Dart_PostCObject(thePort, &out); + theStruct.lastHeader.clear(); + } + return 0; + }; + + settings.on_body = [](http_parser *parser, const char *at, size_t length) { + Dart_CObject obj; + obj.type = Dart_CObject_kTypedData; + obj.value.as_typed_data.type = Dart_TypedData_kUint8; + obj.value.as_typed_data.length = length; + obj.value.as_typed_data.values = (uint8_t *)at; + Dart_PostCObject(thePort, &obj); + return 0; + }; + + size_t len = 80 * 1024, nparsed = 0; + char buf[len]; + ssize_t recved = 0; + memset(buf, 0, sizeof(buf)); + // http_parser parser; + auto *parser = (http_parser *)malloc(sizeof(http_parser)); + http_parser_init(parser, HTTP_BOTH); + parser->data = &httpData; + + while ((recved = recv(fd, buf, len, 0)) >= 0) { + if (false) // (isUpgrade) + { + // send_string(&parser, buf, (size_t)recved, 7, true); + } else { + /* Start up / continue the parser. + * Note we pass recved==0 to signal that EOF has been received. + */ + nparsed = http_parser_execute(parser, &settings, buf, recved); + + if (nparsed != recved) { + // TODO: End it...! + } else if (recved == 0) { + break; + } + + // if ((isUpgrade = parser.upgrade) == 1) + // { + // send_notification(&parser, 6); + // } + // else if (nparsed != recved) + // { + // close(rq->sock); + // return; + // } + } + + // memset(buf, 0, len); + } + } +} *socket = (WingsSocket *)ptr; + socket->start(arguments); +} + +struct wingsSockName { + sa_family_t family; + sockaddr_in v4; + sockaddr_in6 v6; + + struct sockaddr *ptr() const { + if (family == AF_INET6) { + return (sockaddr *)&v6; + } else { + return (sockaddr *)&v4; + } + } + + void *addrPtr() const { + if (family == AF_INET6) { + return (void *)&v6.sin6_addr; + } else { + return (void *)&v4.sin_addr; + } + } + + socklen_t length() const { + if (family == AF_INET6) { + return sizeof(v6); + } else { + return sizeof(v4); + } + } +}; + +void wingsThrowOSError() { + wingsThrowError(strerror(errno), "dart:io", "OSError", errno); +} + +bool wingsReadSocket(Dart_NativeArguments arguments, wingsSockName *out) { + uint64_t ptr; + Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0); + HandleError(Dart_IntegerToUint64(pointerHandle, &ptr)); + + auto *socket = (WingsSocket *)ptr; + int fd = socket->getFD(); + + socklen_t len; + out->family = socket->getFamily(); + len = out->length(); + + int result; + + // result = connect(fd, out->ptr(), len); + + // if (result < 0) + // { + // wingsThrowOSError(); + // return false; + // } + + result = getsockname(fd, out->ptr(), &len); + + if (result == -1) { + wingsThrowOSError(); + return false; + } + + return true; +} + +void Dart_WingsSocket_getAddress(Dart_NativeArguments arguments) { + wingsSockName sock; + if (wingsReadSocket(arguments, &sock)) { + char addrBuf[INET6_ADDRSTRLEN + 1] = {0}; + + auto *result = + inet_ntop(sock.family, sock.addrPtr(), addrBuf, sock.length()); + + if (result == NULL) { + wingsThrowOSError(); + } + + Dart_Handle outHandle = Dart_NewStringFromCString(addrBuf); + Dart_SetReturnValue(arguments, outHandle); + } +} + +void Dart_WingsSocket_getPort(Dart_NativeArguments arguments) { + wingsSockName sock; + if (wingsReadSocket(arguments, &sock)) { + Dart_Handle outHandle; + + if (sock.family == AF_INET6) { + outHandle = Dart_NewIntegerFromUint64(ntohs(sock.v6.sin6_port)); + } else { + outHandle = Dart_NewIntegerFromUint64(ntohs(sock.v4.sin_port)); + } + + Dart_SetReturnValue(arguments, outHandle); + } +} + +void Dart_WingsSocket_write(Dart_NativeArguments arguments) { + int64_t fd; + void *data; + Dart_TypedData_Type type; + intptr_t len; + Dart_Handle fdHandle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle dataHandle = Dart_GetNativeArgument(arguments, 1); + HandleError(Dart_IntegerToInt64(fdHandle, &fd)); + HandleError(Dart_TypedDataAcquireData(dataHandle, &type, &data, &len)); + write(fd, data, len); + HandleError(Dart_TypedDataReleaseData(dataHandle)); +} + +void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments) { + int64_t fd; + Dart_Handle fdHandle = Dart_GetNativeArgument(arguments, 0); + HandleError(Dart_IntegerToInt64(fdHandle, &fd)); + close(fd); +} + +void Dart_WingsSocket_close(Dart_NativeArguments arguments) { + Dart_Port port; + uint64_t ptr; + Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0); + Dart_Handle sendPortHandle = Dart_GetNativeArgument(arguments, 1); + HandleError(Dart_IntegerToUint64(pointerHandle, &ptr)); + HandleError(Dart_SendPortGetId(sendPortHandle, &port)); + + auto *socket = (WingsSocket *)ptr; + socket->decrRef(port); +} \ No newline at end of file diff --git a/packages/wings/lib/src/wings_driver.dart b/packages/wings/lib/src/wings_driver.dart new file mode 100644 index 00000000..20a2eb5b --- /dev/null +++ b/packages/wings/lib/src/wings_driver.dart @@ -0,0 +1,98 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io' show Cookie; +import 'dart:typed_data'; +import 'package:angel_framework/angel_framework.dart'; +import 'wings_request.dart'; +import 'wings_response.dart'; +import 'wings_socket.dart'; + +Future startSharedWings(dynamic addr, int port) { + return WingsSocket.bind(addr, port, shared: true); +} + +class AngelWings extends Driver { + factory AngelWings(Angel app) { + return AngelWings.custom(app, WingsSocket.bind); + } + + AngelWings.custom( + Angel app, Future Function(dynamic, int) serverGenerator) + : super(app, serverGenerator); + + @override + void addCookies(int response, Iterable cookies) { + for (var cookie in cookies) { + setHeader(response, 'set-cookie', cookie.toString()); + } + } + + @override + Future close() async { + await server?.close(); + return super.close(); + } + + @override + Future closeResponse(int response) { + closeNativeSocketDescriptor(response); + return Future.value(); + } + + @override + Future createRequestContext( + WingsClientSocket request, int response) { + return WingsRequestContext.from(app, request); + } + + @override + Future createResponseContext( + WingsClientSocket request, int response, + [WingsRequestContext correspondingRequest]) { + return Future.value(WingsResponseContext( + app, request.fileDescriptor, correspondingRequest)); + } + + @override + Stream createResponseStreamFromRawRequest(WingsClientSocket request) { + return Stream.fromIterable([request.fileDescriptor]); + } + + @override + void setChunkedEncoding(int response, bool value) { + // TODO: implement setChunkedEncoding + } + + @override + void setContentLength(int response, int length) { + writeStringToResponse(response, 'content-length: $length\r\n'); + } + + @override + void setHeader(int response, String key, String value) { + writeStringToResponse(response, '$key: $value\r\n'); + } + + @override + void setStatusCode(int response, int value) { + // HTTP-Version SP Status-Code SP Reason-Phrase CRLF + writeStringToResponse(response, 'HTTP/1.1 $value\r\n'); + } + + @override + Uri get uri { + return Uri(scheme: 'http', host: server.address.address, port: server.port); + } + + @override + void writeStringToResponse(int response, String value) { + writeToResponse(response, utf8.encode(value)); + } + + @override + void writeToResponse(int response, List data) { + var buf = data is Uint8List ? data : Uint8List.fromList(data); + writeToNativeSocket(response, buf); + } +} diff --git a/packages/wings/lib/src/wings_request.dart b/packages/wings/lib/src/wings_request.dart new file mode 100644 index 00000000..bb066028 --- /dev/null +++ b/packages/wings/lib/src/wings_request.dart @@ -0,0 +1,215 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:isolate'; +import 'dart:typed_data'; +import 'package:angel_container/angel_container.dart'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:mock_request/mock_request.dart'; +import 'wings_socket.dart'; + +enum _ParseState { method, url, headerField, headerValue, body } + +final RegExp _straySlashes = RegExp(r'(^/+)|(/+$)'); + +class WingsRequestContext extends RequestContext { + final WingsClientSocket rawRequest; + final Container container; + + final StreamController> _body = StreamController(); + List _cookies, __cookies; + final LockableMockHttpHeaders _headers = LockableMockHttpHeaders(); + final RawReceivePort _recv; + InternetAddress _remoteAddress; + String _method, _override, _path; + Uri _uri; + + @override + Angel app; + + WingsRequestContext._(this.app, this.rawRequest, this._recv) + : container = app.container.createChild(); + + Future close() async { + await _body.close(); + _recv.close(); + await super.close(); + } + + static const int DELETE = 0, + GET = 1, + HEAD = 2, + POST = 3, + PUT = 4, + CONNECT = 5, + OPTIONS = 6, + TRACE = 7, + COPY = 8, + LOCK = 9, + MKCOL = 10, + MOVE = 11, + PROPFIND = 12, + PROPPATCH = 13, + SEARCH = 14, + UNLOCK = 15, + BIND = 16, + REBIND = 17, + UNBIND = 18, + ACL = 19, + REPORT = 20, + MKACTIVITY = 21, + CHECKOUT = 22, + MERGE = 23, + MSEARCH = 24, + NOTIFY = 25, + SUBSCRIBE = 26, + UNSUBSCRIBE = 27, + PATCH = 28, + PURGE = 29, + MKCALENDAR = 30, + LINK = 31, + UNLINK = 32, + SOURCE = 33; + + static String methodToString(int method) { + switch (method) { + case DELETE: + return 'DELETE'; + case GET: + return 'GET'; + case HEAD: + return 'HEAD'; + case POST: + return 'POST'; + case PUT: + return 'PUT'; + case CONNECT: + return 'CONNECT'; + case OPTIONS: + return 'OPTIONS'; + case PATCH: + return 'PATCH'; + case PURGE: + return 'PURGE'; + default: + throw ArgumentError('Unknown method $method.'); + } + } + + static Future from(Angel app, WingsClientSocket socket) { + // var state = _ParseState.url; + var c = Completer(); + var recv = RawReceivePort(); + var rq = WingsRequestContext._(app, socket, recv); + rq._remoteAddress = socket.remoteAddress; + var ct = StreamController(); + recv.handler = ct.add; + recv.handler = (ee) { + if (ee is Uint8List) { + if (!rq._body.isClosed) rq._body.add(ee); + } else if (ee is List) { + var type = ee[0] as int; + + if (type == 2) { + rq._method = methodToString(ee[1] as int); + } else { + var value = ee[1] as String; + + if (type == 0) { + rq._uri = Uri.parse(value); + var path = rq._uri.path.replaceAll(_straySlashes, ''); + if (path.isEmpty) path = '/'; + rq._path = path; + } else if (type == 1) { + var k = value, v = ee[2] as String; + if (k == 'cookie') { + rq.__cookies.add(Cookie.fromSetCookieValue(v)); + } else { + rq._headers.add(k, v); + } + } else { + // print("h: $ee');"); + } + } + } else if (ee == 100) { + // Headers done, just listen for body. + c.complete(rq); + } else if (ee == 200) { + // Message complete. + rq._body.close(); + } + // if (state == _ParseState.url) { + // rq._uri = Uri.parse(e as String); + // var path = rq._uri.path.replaceAll(_straySlashes, ''); + // if (path.isEmpty) path = '/'; + // rq._path = path; + // state = _ParseState.headerField; + // } else if (state == _ParseState.headerField) { + // if (e == 0) { + // state = _ParseState.method; + // } else { + // lastHeader = e as String; //Uri.decodeFull(e as String); + // state = _ParseState.headerValue; + // } + // } else if (state == _ParseState.headerValue) { + // if (e == 0) { + // state = _ParseState.method; + // } else { + // var value = e as String; //Uri.decodeFull(e as String); + // if (lastHeader != null) { + // if (lastHeader == 'cookie') { + // rq.__cookies.add(Cookie.fromSetCookieValue(value)); + // } else { + // rq._headers.add(lastHeader, value); + // } + // lastHeader = null; + // } + // } + // state = _ParseState.headerField; + // } else if (state == _ParseState.method) { + // rq._method = methodToString(e as int); + // state = _ParseState.body; + // c.complete(rq); + // } else if (state == _ParseState.body) { + // if (e == 1) { + // rq._body.close(); + // } else { + // rq._body.add(e as List); + // } + // } + }; + wingsParseHttp().send([recv.sendPort, socket.fileDescriptor]); + return c.future; + } + + @override + Stream> get body => _body.stream; + + @override + List get cookies => _cookies ??= List.unmodifiable(__cookies); + + @override + HttpHeaders get headers => _headers; + + @override + String get hostname => headers.value('host'); + + @override + String get method => _override ??= + (headers.value('x-http-method-override')?.toUpperCase() ?? _method); + + @override + String get originalMethod => _method; + + @override + String get path => _path; + + @override + InternetAddress get remoteAddress => _remoteAddress; + + @override + // TODO: implement session + HttpSession get session => null; + + @override + Uri get uri => _uri; +} diff --git a/packages/wings/lib/src/wings_response.dart b/packages/wings/lib/src/wings_response.dart new file mode 100644 index 00000000..cdb22099 --- /dev/null +++ b/packages/wings/lib/src/wings_response.dart @@ -0,0 +1,215 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:charcode/ascii.dart'; +import 'wings_request.dart'; +import 'wings_socket.dart'; + +class WingsResponseContext extends ResponseContext { + @override + final Angel app; + + @override + final WingsRequestContext correspondingRequest; + + LockableBytesBuilder _buffer; + + @override + final int rawResponse; + + bool _isDetached = false, _isClosed = false, _streamInitialized = false; + + WingsResponseContext(this.app, this.rawResponse, [this.correspondingRequest]); + + Iterable __allowedEncodings; + + Iterable get _allowedEncodings { + return __allowedEncodings ??= correspondingRequest.headers + .value('accept-encoding') + ?.split(',') + ?.map((s) => s.trim()) + ?.where((s) => s.isNotEmpty) + ?.map((str) { + // Ignore quality specifications in accept-encoding + // ex. gzip;q=0.8 + if (!str.contains(';')) return str; + return str.split(';')[0]; + }); + } + + bool _openStream() { + if (!_streamInitialized) { + // If this is the first stream added to this response, + // then add headers, status code, etc. + var outHeaders = {}; + var statusLine = + utf8.encode('HTTP/1.1 $statusCode').followedBy([$cr, $lf]); + writeToNativeSocket(rawResponse, Uint8List.fromList(statusLine.toList())); + + headers.forEach((k, v) => outHeaders[k] = v); + + if (headers.containsKey('content-length')) { + var l = int.tryParse(headers['content-length']); + if (l != null) { + outHeaders['content-length'] = l.toString(); + } + } + + if (contentType != null) { + outHeaders['content-type'] = contentType.toString(); + } + + if (encoders.isNotEmpty && correspondingRequest != null) { + if (_allowedEncodings != null) { + for (var encodingName in _allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (encoders.containsKey(encodingName)) { + encoder = encoders[encodingName]; + } else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + outHeaders['content-encoding'] = key; + break; + } + } + } + } + + void _wh(String k, String v) { + // var vv =Uri.encodeComponent(v); + var vv = v; + var headerLine = utf8.encode('$k: $vv').followedBy([$cr, $lf]); + writeToNativeSocket( + rawResponse, Uint8List.fromList(headerLine.toList())); + } + + outHeaders.forEach(_wh); + + for (var c in cookies) { + _wh('set-cookie', c.toString()); + } + + writeToNativeSocket(rawResponse, Uint8List.fromList([$cr, $lf])); + + //_isClosed = true; + return _streamInitialized = true; + } + + return false; + } + + @override + Future addStream(Stream> stream) { + if (_isClosed && isBuffered) throw ResponseContext.closed(); + _openStream(); + + Stream> output = stream; + + if (encoders.isNotEmpty && correspondingRequest != null) { + if (_allowedEncodings != null) { + for (var encodingName in _allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (encoders.containsKey(encodingName)) { + encoder = encoders[encodingName]; + } else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + output = encoders[key].bind(output); + break; + } + } + } + } + + return output.forEach((buf) { + if (!_isClosed) { + writeToNativeSocket( + rawResponse, buf is Uint8List ? buf : Uint8List.fromList(buf)); + } + }); + } + + @override + void add(List data) { + if (_isClosed && isBuffered) { + throw ResponseContext.closed(); + } else if (!isBuffered) { + if (!_isClosed) { + _openStream(); + + if (encoders.isNotEmpty && correspondingRequest != null) { + if (_allowedEncodings != null) { + for (var encodingName in _allowedEncodings) { + Converter, List> encoder; + String key = encodingName; + + if (encoders.containsKey(encodingName)) { + encoder = encoders[encodingName]; + } else if (encodingName == '*') { + encoder = encoders[key = encoders.keys.first]; + } + + if (encoder != null) { + data = encoders[key].convert(data); + break; + } + } + } + } + + writeToNativeSocket( + rawResponse, data is Uint8List ? data : Uint8List.fromList(data)); + } + } else { + buffer.add(data); + } + } + + @override + Future close() async { + if (!_isDetached) { + if (!_isClosed) { + _isClosed = true; + if (!isBuffered) { + _openStream(); + closeNativeSocketDescriptor(rawResponse); + } else { + _buffer.lock(); + } + } + + await correspondingRequest?.close(); + await super.close(); + } + } + + @override + BytesBuilder get buffer => _buffer; + + @override + int detach() { + _isDetached = true; + return rawResponse; + } + + @override + bool get isBuffered => _buffer != null; + + @override + bool get isOpen => !_isClosed && !_isDetached; + + @override + void useBuffer() { + _buffer = LockableBytesBuilder(); + } +} diff --git a/packages/wings/lib/src/wings_socket.cc b/packages/wings/lib/src/wings_socket.cc new file mode 100644 index 00000000..7105b2ee --- /dev/null +++ b/packages/wings/lib/src/wings_socket.cc @@ -0,0 +1,129 @@ +#include "wings_socket.h" +#include +#include +#include +using namespace wings; + +std::vector wings::globalSocketList; + +bool WingsSocketInfo::equals(const WingsSocketInfo &right) const { + // std::cout << address << " vs " << right.address << std::endl; + // std::cout << port << " vs " << right.port << std::endl; + return (strcmp(address, right.address) == 0) && port == right.port; +} + +WingsSocket::WingsSocket(sa_family_t family, int sockfd, + const WingsSocketInfo &info) + : sockfd(sockfd), info(info), family(family) { + portIterator = sendPorts.begin(); + open = true; + refCount = 0; + workerThread = nullptr; + this->info.address = strdup(info.address); +} + +void WingsSocket::incrRef(Dart_Port port) { + refCount++; + sendPorts.push_back(port); +} + +void WingsSocket::decrRef(Dart_Port port) { + auto it = std::find(sendPorts.begin(), sendPorts.end(), port); + + if (it != sendPorts.end()) { + sendPorts.erase(it); + } + + refCount--; + + if (refCount <= 0 && open) { + close(sockfd); + open = false; + } +} + +Dart_Port WingsSocket::nextPort() { + portIterator++; + if (portIterator == sendPorts.end()) + portIterator = sendPorts.begin(); + return *portIterator; +} + +const WingsSocketInfo &WingsSocket::getInfo() const { return info; } + +int WingsSocket::getFD() const { return sockfd; } + +sa_family_t WingsSocket::getFamily() const { return family; } + +bool WingsSocket::isClosed() const { return !open; } + +void WingsSocket::start(Dart_NativeArguments arguments) { + // if (workerThread == nullptr) + // { + // workerThread = std::make_unique(threadCallback, this); + // } + Dart_Port service_port = + Dart_NewNativePort("WingsThreadCallback", &threadCallback, true); + Dart_Handle send_port = Dart_NewSendPort(service_port); + Dart_SetReturnValue(arguments, send_port); +} + +void WingsSocket::threadCallback(Dart_Port dest_port_id, + Dart_CObject *message) { + + WingsSocket *socket = nullptr; + Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id; + Dart_CObject *ptrArg = message->value.as_array.values[1]; + + // If there are no listeners, quit. + if (ptrArg->type == Dart_CObject_kInt32) { + auto as64 = (int64_t)ptrArg->value.as_int32; + socket = (WingsSocket *)as64; + } else { + socket = (WingsSocket *)ptrArg->value.as_int64; + } + + if (socket != nullptr) { + if (socket->sendPorts.empty() || socket->isClosed()) { + return; + } + + int sock; + unsigned long index = 0; + sockaddr addr; + socklen_t len; + + if ((sock = accept(socket->sockfd, &addr, &len)) != -1) { + char addrBuf[INET6_ADDRSTRLEN] = {0}; + + if (addr.sa_family == AF_INET6) { + auto as6 = (sockaddr_in6 *)&addr; + inet_ntop(addr.sa_family, &(as6->sin6_addr), addrBuf, len); + } else { + auto as4 = (sockaddr_in *)&addr; + inet_ntop(AF_INET, &(as4->sin_addr), addrBuf, len); + } + + Dart_CObject fdObj; + fdObj.type = Dart_CObject_kInt64; + fdObj.value.as_int64 = sock; + + Dart_CObject addrObj; + addrObj.type = Dart_CObject_kString; + addrObj.value.as_string = addrBuf; + + Dart_CObject *values[2] = {&fdObj, &addrObj}; + + Dart_CObject obj; + obj.type = Dart_CObject_kArray; + obj.value.as_array.length = 2; + obj.value.as_array.values = values; + + // Dart_PostCObject(outPort, &obj); + // Dispatch the fd to the next listener. + auto port = socket->nextPort(); + Dart_PostCObject(port, &obj); + // Dart_PostCObject(outPort, &obj); + } + } +} \ No newline at end of file diff --git a/packages/wings/lib/src/wings_socket.dart b/packages/wings/lib/src/wings_socket.dart new file mode 100644 index 00000000..f2222e2a --- /dev/null +++ b/packages/wings/lib/src/wings_socket.dart @@ -0,0 +1,130 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:isolate'; +import 'dart:typed_data'; +import 'dart-ext:angel_wings'; + +int bindWingsIPv4ServerSocket( + String address, + int port, + bool shared, + int backlog, + bool v6Only, + SendPort sendPort) native 'Dart_WingsSocket_bindIPv4'; + +int bindWingsIPv6ServerSocket( + String address, + int port, + bool shared, + int backlog, + bool v6Only, + SendPort sendPort) native 'Dart_WingsSocket_bindIPv6'; + +String getWingsServerSocketAddress(int pointer) + native 'Dart_WingsSocket_getAddress'; + +int getWingsServerSocketPort(int pointer) native 'Dart_WingsSocket_getPort'; + +void writeToNativeSocket(int fd, Uint8List data) + native 'Dart_WingsSocket_write'; + +void closeNativeSocketDescriptor(int fd) + native 'Dart_WingsSocket_closeDescriptor'; + +SendPort wingsSocketListen(int pointer) native 'Dart_WingsSocket_listen'; + +void closeWingsSocket(int pointer, SendPort sendPort) + native 'Dart_WingsSocket_close'; + +SendPort wingsParseHttp() native 'Dart_WingsSocket_parseHttp'; + +class WingsClientSocket { + final int fileDescriptor; + final InternetAddress remoteAddress; + + WingsClientSocket(this.fileDescriptor, this.remoteAddress); +} + +class WingsSocket extends Stream { + final StreamController _ctrl = StreamController(); + SendPort _acceptor; + InternetAddress _address; + final int _pointer; + final RawReceivePort _recv; + bool _open = true; + int _port; + + WingsSocket._(this._pointer, this._recv) { + _acceptor = wingsSocketListen(_pointer); + _recv.handler = (h) { + if (!_ctrl.isClosed) { + _ctrl.add( + WingsClientSocket(h[0] as int, InternetAddress(h[1] as String))); + _acceptor.send([_recv.sendPort, _pointer]); + } + }; + + _acceptor.send([_recv.sendPort, _pointer]); + } + + static Future bind(address, int port, + {bool shared = false, int backlog = 0, bool v6Only = false}) async { + var recv = RawReceivePort(); + int ptr; + InternetAddress addr; + + if (address is InternetAddress) { + addr = address; + } else if (address is String) { + var addrs = await InternetAddress.lookup(address); + if (addrs.isNotEmpty) { + addr = addrs[0]; + } else { + throw StateError('Internet address lookup failed: $address'); + } + } else { + throw ArgumentError.value( + address, 'address', 'must be an InternetAddress or String'); + } + + try { + if (addr.type == InternetAddressType.IPv6) { + ptr = bindWingsIPv6ServerSocket( + addr.address, port, shared, backlog, v6Only, recv.sendPort); + } else { + ptr = bindWingsIPv4ServerSocket( + addr.address, port, shared, backlog, v6Only, recv.sendPort); + } + + return WingsSocket._(ptr, recv); //.._address = addr; + } catch (e) { + recv.close(); + rethrow; + } + } + + InternetAddress get address => + _address ??= InternetAddress(getWingsServerSocketAddress(_pointer)); + + int get port => _port ??= getWingsServerSocketPort(_pointer); + + @override + StreamSubscription listen( + void Function(WingsClientSocket event) onData, + {Function onError, + void Function() onDone, + bool cancelOnError}) { + return _ctrl.stream + .listen(onData, onError: onError, cancelOnError: cancelOnError); + } + + Future close(){ + if (_open) { + _open = false; + closeWingsSocket(_pointer, _recv.sendPort); + _recv.close(); + _ctrl.close(); + } + return Future.value(); + } +} diff --git a/packages/wings/lib/src/wings_socket.h b/packages/wings/lib/src/wings_socket.h new file mode 100644 index 00000000..37336917 --- /dev/null +++ b/packages/wings/lib/src/wings_socket.h @@ -0,0 +1,54 @@ +#ifndef WINGS_SOCKET_H +#define WINGS_SOCKET_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace wings { +struct WingsSocketInfo { + const char *address; + uint64_t port; + bool shared; + uint64_t backlog; + bool v6Only; + Dart_Handle sendPortHandle; + bool equals(const WingsSocketInfo &right) const; +}; + +class WingsSocket { +public: + WingsSocket(sa_family_t family, int sockfd, const WingsSocketInfo &info); + void incrRef(Dart_Port port); + void decrRef(Dart_Port port); + const WingsSocketInfo &getInfo() const; + void start(Dart_NativeArguments arguments); + int getFD() const; + bool isClosed() const; + sa_family_t getFamily() const; + Dart_Port nextPort(); + +private: + static void threadCallback(Dart_Port dest_port_id, Dart_CObject *message); + WingsSocketInfo info; + std::list::iterator portIterator; + int sockfd; + int refCount; + bool open; + sa_family_t family; + std::unique_ptr workerThread; + std::list sendPorts; +}; + +extern std::vector globalSocketList; +} // namespace wings + +#endif \ No newline at end of file diff --git a/packages/wings/libangel_wings.dylib b/packages/wings/libangel_wings.dylib new file mode 100755 index + + \ No newline at end of file diff --git a/packages/wings/web/site.css b/packages/wings/web/site.css new file mode 100644 index 00000000..c2140b8b --- /dev/null +++ b/packages/wings/web/site.css @@ -0,0 +1,3 @@ +h1 { + color: blue; +} \ No newline at end of file