diff --git a/.gitignore b/.gitignore index 2e7f80df..b87ee36c 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,8 @@ doc/api/ # project includes source files written in JavaScript. *.js_ *.js.deps -*.js.map \ No newline at end of file +*.js.map + +*.o +*.dylib +*.a \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..33a6dc20 --- /dev/null +++ b/Makefile @@ -0,0 +1,46 @@ +CXX=clang +HTTP_PARSER=.dart_tool/build_native/third_party/angel_wings.http_parser +CXX_INCLUDES=-I$(HTTP_PARSER) -I$(DART_SDK)/include + +.PHONY: clean debug + +clean: + find lib -name "*.a" -delete + find lib -name "*.o" -delete + find lib -name "*.dylib" -delete + +debug: + $(MAKE) lib/src/libwings.dylib CXXFLAGS="-g -DDEBUG=1" + +example: debug + lldb -o "target create dart" \ + -o "process launch --stop-at-entry example/main.dart" \ + -o "process handle SIGINT -p true" \ + -o "continue" \ + +lib/src/bind_socket.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/bind_socket.o lib/src/bind_socket.cc + +lib/src/http_listener.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/http_listener.o lib/src/http_listener.cc + +lib/src/http_parser.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -c -o lib/src/http_parser.o $(HTTP_PARSER)/http_parser.c + +lib/src/send.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/send.o lib/src/send.cc + +lib/src/util.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/util.o lib/src/util.cc + +lib/src/wings.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/wings.o lib/src/wings.cc + +lib/src/worker_thread.o: + $(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/worker_thread.o lib/src/worker_thread.cc + +lib/src/libwings.dylib: lib/src/bind_socket.o lib/src/http_listener.o lib/src/http_parser.o lib/src/send.o lib/src/util.o lib/src/wings.o lib/src/worker_thread.o + $(CXX) $(CXXFLAGS) -shared -o lib/src/libwings.dylib -undefined dynamic_lookup -DDART_SHARED_LIB -Wl -fPIC -m64 \ + lib/src/bind_socket.o lib/src/http_listener.o \ + lib/src/http_parser.o lib/src/send.o lib/src/util.o \ + lib/src/wings.o lib/src/worker_thread.o \ No newline at end of file diff --git a/example/main.dart b/example/main.dart index 8d0f950b..204795ae 100644 --- a/example/main.dart +++ b/example/main.dart @@ -5,6 +5,7 @@ import 'package:angel_framework/angel_framework.dart'; import 'package:angel_wings/angel_wings.dart'; main() async { + if (false) for (int i = 1; i < Platform.numberOfProcessors; i++) { var onError = new ReceivePort(); Isolate.spawn(isolateMain, i, onError: onError.sendPort); @@ -17,9 +18,16 @@ main() async { void isolateMain(int id) { var app = new Angel(); - var wings = new AngelWings(app, shared: true); + var wings = new AngelWings(app, shared: id > 0, useZone: false); - app.get('/', 'Hello, native world!'); + var old = app.errorHandler; + app.errorHandler = (e, req, res) { + print(e); + print(e.stackTrace); + return old(e, req, res); + }; + + app.get('/', 'Hello, native world! This is isolate #$id.'); wings.startServer('127.0.0.1', 3000).then((_) { print( diff --git a/lib/angel_wings.dart b/lib/angel_wings.dart index 5fb2763a..7029acd0 100644 --- a/lib/angel_wings.dart +++ b/lib/angel_wings.dart @@ -10,6 +10,7 @@ import 'package:angel_framework/angel_framework.dart'; import 'package:body_parser/body_parser.dart'; import 'package:combinator/combinator.dart'; import 'package:http_parser/http_parser.dart'; +import 'package:json_god/json_god.dart' as god; import 'package:mock_request/mock_request.dart'; import 'package:pool/pool.dart'; import 'package:pooled_map/pooled_map.dart'; diff --git a/lib/src/bind_socket.cc b/lib/src/bind_socket.cc index 995ef857..fa077c90 100644 --- a/lib/src/bind_socket.cc +++ b/lib/src/bind_socket.cc @@ -34,7 +34,11 @@ void wings_BindSocket(Dart_NativeArguments arguments) std::string addressStringInstance(addressString); std::lock_guard lock(serverInfoVectorMutex); +#if __APPLE__ + if (false) +#else if (shared) +#endif { for (unsigned long i = 0; i < serverInfoVector.size(); i++) { @@ -42,7 +46,7 @@ void wings_BindSocket(Dart_NativeArguments arguments) if (server_info->addressString == addressStringInstance && server_info->port == port) { - existingIndex = (long) i; + existingIndex = (long)i; break; } } @@ -113,7 +117,7 @@ void wings_BindSocket(Dart_NativeArguments arguments) return; } - /* +#if __APPLE__ ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i)); if (ret < 0) @@ -121,7 +125,7 @@ void wings_BindSocket(Dart_NativeArguments arguments) Dart_ThrowException(Dart_NewStringFromCString("Cannot reuse port for socket.")); return; } - */ +#endif if (addressLength > 4) { diff --git a/lib/src/dart_debug.h b/lib/src/dart_debug.h new file mode 100644 index 00000000..90789d40 --- /dev/null +++ b/lib/src/dart_debug.h @@ -0,0 +1,27 @@ +#ifdef __cplusplus +#include +#else +#include +#endif +#include + +Dart_Handle ToCString(Dart_Handle obj, const char** out) { + Dart_Handle toStringMethod = Dart_NewStringFromCString("toString"); + Dart_Handle string = Dart_Invoke(obj, toStringMethod, 0, nullptr); + return Dart_StringToCString(string, out); +} + +Dart_Handle Dart_PrintToFile(Dart_Handle obj, FILE* stream) { + const char *toString; + Dart_Handle result = ToCString(obj, &toString); + + if (Dart_IsError(result)) + return result; + + fprintf(stream, "%s\n", toString); + return Dart_Null(); +} + +Dart_Handle Dart_Print(Dart_Handle obj) { + return Dart_PrintToFile(obj, stdout); +} \ No newline at end of file diff --git a/lib/src/http_listener.cc b/lib/src/http_listener.cc index 534466b6..c5a74d4c 100644 --- a/lib/src/http_listener.cc +++ b/lib/src/http_listener.cc @@ -1,5 +1,6 @@ #include #include +#include #include "wings.h" #include "wings_thread.h" @@ -47,4 +48,22 @@ void handleMessage(Dart_Port destPortId, Dart_CObject *message) Dart_Port port = message->value.as_array.values[1]->value.as_send_port.id; Dart_CloseNativePort(port); } + else + { + // This is either a send or close message. + int sockfd = (int)get_int(message->value.as_array.values[0]); + printf("FD: %d\n", sockfd); + + if (message->value.as_array.length == 2) + { + auto *msg = message->value.as_array.values[1]; + printf("Length: %ld\n", msg->value.as_typed_data.length); + write(sockfd, msg->value.as_typed_data.values, (size_t)msg->value.as_typed_data.length); + } + else + { + printf("Close!\n"); + close(sockfd); + } + } } \ No newline at end of file diff --git a/lib/src/send.cc b/lib/src/send.cc index 45c0414d..cfcd222e 100644 --- a/lib/src/send.cc +++ b/lib/src/send.cc @@ -1,3 +1,6 @@ +#ifdef DEBUG +#include "dart_debug.h" +#endif #include "wings.h" void wings_CloseSocket(Dart_NativeArguments arguments) @@ -16,7 +19,12 @@ void wings_Send(Dart_NativeArguments arguments) Dart_TypedData_Type dataType; void *dataBytes; intptr_t dataLength; + +#ifdef DEBUG + HandleError(Dart_Print(sockfdHandle)); +#endif HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd)); HandleError(Dart_TypedDataAcquireData(dataHandle, &dataType, &dataBytes, &dataLength)); write((int)sockfd, dataBytes, (size_t)dataLength); + HandleError(Dart_TypedDataReleaseData(dataHandle)); } \ No newline at end of file diff --git a/lib/src/wings.cc b/lib/src/wings.cc index 41ab5b15..f884e82b 100644 --- a/lib/src/wings.cc +++ b/lib/src/wings.cc @@ -1,5 +1,5 @@ #include -#include +//#include #include #include #include "wings.h" @@ -25,7 +25,13 @@ DART_EXPORT Dart_Handle wings_Init(Dart_Handle parent_library) Dart_Handle HandleError(Dart_Handle handle) { if (Dart_IsError(handle)) + { +#ifdef DEBUG + Dart_DumpNativeStackTrace(NULL); +#endif Dart_PropagateError(handle); + } + return handle; } diff --git a/lib/src/wings.dart b/lib/src/wings.dart index 67faf25a..d1d90ab2 100644 --- a/lib/src/wings.dart +++ b/lib/src/wings.dart @@ -76,8 +76,9 @@ class AngelWings { final RawReceivePort _recv = new RawReceivePort(); final Map _sessions = {}; - final PooledMap _staging = - new PooledMap(); + final Map _staging = {}; + //final PooledMap _staging = + // new PooledMap(); final Uuid _uuid = new Uuid(); InternetAddress _address; int _port; @@ -88,6 +89,32 @@ class AngelWings { static SendPort _startHttpListener() native "StartHttpListener"; + final Pool _pool = new Pool(1); + + static void __send(int sockfd, Uint8List data) native "Send"; + + static void __closeSocket(int sockfd) native "CloseSocket"; + + void _send(int sockfd, Uint8List data) { + // _pool.withResource(() { + print('Sending ${[sockfd, data]}'); + _sendPort.send([sockfd, data]); + //}); + //_pool.withResource(() => __send(sockfd, data)); + } + + void _closeSocket(WingsRequestContext req) { + //_pool.withResource(() { + if (!req._closed) { + req._closed = true; + var sockfd = req._sockfd; + print('Sending ${[sockfd]}'); + _sendPort.send([sockfd]); + } + //}); + //_pool.withResource(() => __closeSocket(sockfd)); + } + AngelWings(this.app, {this.shared: false, this.useZone: true}) { _recv.handler = _handleMessage; } @@ -137,34 +164,37 @@ class AngelWings { } void _handleMessage(x) { + print('INPUT: $x'); if (x is String) { close(); throw new StateError(x); } else if (x is List && x.length >= 2) { int sockfd = x[0], command = x[1]; - - WingsRequestContext _newRequest() => - new WingsRequestContext._(this, sockfd, app); + + //WingsRequestContext _newRequest() => + // new WingsRequestContext._(this, sockfd, app); //print(x); switch (command) { case messageBegin: - _staging.putIfAbsent(sockfd, _newRequest); + print('BEGIN $sockfd'); + _staging[sockfd] = new WingsRequestContext._(this, sockfd, app); break; case messageComplete: - // (sockfd, method, major, minor, addrBytes) - _staging.update(sockfd, (rq) { + print('$sockfd in $_staging???'); + var rq = _staging.remove(sockfd); + if (rq != null) { rq._method = methodToString(x[2] as int); rq._addressBytes = x[5] as Uint8List; - return rq; - }, defaultValue: _newRequest).then(_handleRequest); + _handleRequest(rq); + } break; case body: - _staging.update(sockfd, (rq) { + var rq = _staging[sockfd]; + if (rq != null) { (rq._body ??= new StreamController()) .add(x[2] as Uint8List); - return rq; - }, defaultValue: _newRequest); + } break; //case upgrade: // TODO: Handle WebSockets...? @@ -175,27 +205,26 @@ class AngelWings { // onUpgradedMessage(sockfd, x[2]); // break; case url: - _staging.update(sockfd, (rq) => rq..__url = x[2] as String, - defaultValue: _newRequest); + _staging[sockfd]?.__url = x[2] as String; break; case headerField: - _staging.update(sockfd, (rq) => rq.._headerField = x[2] as String, - defaultValue: _newRequest); + _staging[sockfd]?._headerField = x[2] as String; break; case headerValue: - _staging.update(sockfd, (rq) => rq.._headerValue = x[2] as String, - defaultValue: _newRequest); + _staging[sockfd]?._headerValue = x[2] as String; break; } } } Future _handleRequest(WingsRequestContext req) { + print('req: $req'); if (req == null) return new Future.value(); var res = new WingsResponseContext._(req) ..app = app - ..serializer = app.serializer + ..serializer = (app.serializer ?? god.serialize) ..encoders.addAll(app.encoders); + print('Handling fd: ${req._sockfd}'); handle() { var path = req.path; @@ -228,6 +257,7 @@ class AngelWings { Future Function() runPipeline; + print('Pipeline: $pipeline'); for (var handler in pipeline) { if (handler == null) break; @@ -286,7 +316,7 @@ class AngelWings { return handleAngelHttpException(e, trace, req, res); }).catchError((e, StackTrace st) { var trace = new Trace.from(st ?? StackTrace.current).terse; - WingsResponseContext._closeSocket(req._sockfd); + _closeSocket(req); // Ideally, we won't be in a position where an absolutely fatal error occurs, // but if so, we'll need to log it. if (app.logger != null) { @@ -323,9 +353,8 @@ class AngelWings { b.writeln('HTTP/1.1 500 Internal Server Error'); b.writeln(); - WingsResponseContext._send( - req._sockfd, _coerceUint8List(b.toString().codeUnits)); - WingsResponseContext._closeSocket(req._sockfd); + _send(req._sockfd, _coerceUint8List(b.toString().codeUnits)); + _closeSocket(req); } finally { return null; } @@ -350,7 +379,9 @@ class AngelWings { /// Sends a response. Future sendResponse(WingsRequestContext req, WingsResponseContext res, {bool ignoreFinalizers: false}) { + print('Closing: ${req._sockfd}'); if (res.willCloseItself) return new Future.value(); + print('Not self-closing: ${req._sockfd}'); Future finalizers = ignoreFinalizers == true ? new Future.value() @@ -368,6 +399,7 @@ class AngelWings { //request.response.headers.chunkedTransferEncoding = res.chunked ?? true; // TODO: Is there a need to support this? + print('Buffer: ${res.buffer}'); List outputBuffer = res.buffer.toBytes(); if (res.encoders.isNotEmpty) { @@ -405,6 +437,7 @@ class AngelWings { } } + print('Create string buffer'); var b = new StringBuffer(); b.writeln('HTTP/1.1 ${res.statusCode}'); @@ -425,13 +458,20 @@ class AngelWings { } b.writeln(); + print(b); - var buf = new Uint8List.fromList( - new List.from(b.toString().codeUnits)..addAll(outputBuffer)); + var bb = new BytesBuilder(copy: false) + ..add(b.toString().codeUnits) + ..add(outputBuffer); + var buf = _coerceUint8List(bb.takeBytes()); + print('Output: $buf'); return finalizers.then((_) { - WingsResponseContext._send(req._sockfd, buf); - WingsResponseContext._closeSocket(req._sockfd); + print('A'); + _send(req._sockfd, buf); + print('B'); + _closeSocket(req); + print('C'); if (req.injections.containsKey(PoolResource)) { req.injections[PoolResource].release(); diff --git a/lib/src/wings_request.dart b/lib/src/wings_request.dart index d58b9505..dc56400b 100644 --- a/lib/src/wings_request.dart +++ b/lib/src/wings_request.dart @@ -3,6 +3,7 @@ part of angel_wings; class WingsRequestContext extends RequestContext { final AngelWings _wings; final int _sockfd; + bool _closed = false; @override Angel app; @@ -14,7 +15,7 @@ class WingsRequestContext extends RequestContext { final Map _headers = {}; String __contentTypeString; - String __path; + //String __path; String __url; Uint8List _addressBytes; @@ -86,9 +87,9 @@ class WingsRequestContext extends RequestContext { @override String get path { if (_path != null) { - return __path; + return _path; } else { - var path = __path.replaceAll(_straySlashes, ''); + var path = __url?.replaceAll(_straySlashes, '') ?? ''; if (path.isEmpty) path = '/'; return _path = path; } diff --git a/lib/src/wings_response.dart b/lib/src/wings_response.dart index 47abec7b..2cc4a258 100644 --- a/lib/src/wings_response.dart +++ b/lib/src/wings_response.dart @@ -6,23 +6,21 @@ class WingsResponseContext extends ResponseContext { WingsResponseContext._(this.correspondingRequest); - static void _send(int sockfd, Uint8List data) native "Send"; - - static void _closeSocket(int sockfd) native "CloseSocket"; + AngelWings get _wings => correspondingRequest._wings; @override void add(List data) { if (_isClosed && !_useStream) throw ResponseContext.closed(); else if (_useStream) - _send(correspondingRequest._sockfd, _coerceUint8List(data)); + _wings._send(correspondingRequest._sockfd, _coerceUint8List(data)); else buffer.add(data); } @override Future close() { - _closeSocket(correspondingRequest._sockfd); + _wings._closeSocket(correspondingRequest); _isClosed = true; _useStream = false; return super.close(); @@ -79,7 +77,7 @@ class WingsResponseContext extends ResponseContext { } return output.forEach(((data) => - _send(correspondingRequest._sockfd, _coerceUint8List(data)))); + _wings._send(correspondingRequest._sockfd, _coerceUint8List(data)))); } @override @@ -149,7 +147,7 @@ class WingsResponseContext extends ResponseContext { b.writeln(); - _send( + _wings._send( correspondingRequest._sockfd, _coerceUint8List(b.toString().codeUnits)); } } diff --git a/lib/src/worker_thread.cc b/lib/src/worker_thread.cc index fbe0ee15..d98790ce 100644 --- a/lib/src/worker_thread.cc +++ b/lib/src/worker_thread.cc @@ -1,3 +1,4 @@ +//#include #include #include #include "wings_thread.h" @@ -10,25 +11,31 @@ void wingsThreadMain(wings_thread_info *info) while (true) { - std::lock_guard lock(serverInfo->mutex); + std::unique_lock lock(serverInfo->mutex, std::defer_lock); sockaddr client_addr{}; socklen_t client_addr_len; - int client = accept(serverInfo->sockfd, &client_addr, &client_addr_len); - if (client < 0) + if (lock.try_lock()) { - // send_error(info->port, "Failed to accept client socket."); - return; - } + int client = accept(serverInfo->sockfd, &client_addr, &client_addr_len); + lock.unlock(); - requestInfo rq{}; - rq.ipv6 = serverInfo->ipv6; - rq.sock = client; - rq.addr = client_addr; - rq.addr_len = client_addr_len; - rq.port = port; - handleRequest(&rq); + if (client < 0) + { + // send_error(info->port, "Failed to accept client socket."); + return; + } + + //auto rq = std::make_shared(); + auto *rq = new requestInfo; + rq->ipv6 = serverInfo->ipv6; + rq->sock = client; + rq->addr = client_addr; + rq->addr_len = client_addr_len; + rq->port = port; + handleRequest(rq); + } } } @@ -137,7 +144,8 @@ int send_oncomplete(http_parser *parser, int code) return 0; } -void handleRequest(requestInfo *rq) +//void handleRequest(const std::shared_ptr &rq) +void handleRequest(requestInfo* rq) { size_t len = 80 * 1024, nparsed; char buf[len]; @@ -158,7 +166,7 @@ void handleRequest(requestInfo *rq) settings.on_message_complete = [](http_parser *parser) { //std::cout << "mc" << std::endl; send_oncomplete(parser, 1); - //delete (requestInfo *) parser->data; + delete (requestInfo *)parser->data; //std::cout << "deleted rq!" << std::endl; return 0; };