Attempting to diagnose NPE

This commit is contained in:
Tobe O 2018-07-08 18:59:51 -04:00
parent 6a16a057fc
commit 35e019dbef
13 changed files with 230 additions and 60 deletions

4
.gitignore vendored
View file

@ -19,3 +19,7 @@ doc/api/
*.js_ *.js_
*.js.deps *.js.deps
*.js.map *.js.map
*.o
*.dylib
*.a

46
Makefile Normal file
View file

@ -0,0 +1,46 @@
CXX=clang
HTTP_PARSER=.dart_tool/build_native/third_party/angel_wings.http_parser
CXX_INCLUDES=-I$(HTTP_PARSER) -I$(DART_SDK)/include
.PHONY: clean debug
clean:
find lib -name "*.a" -delete
find lib -name "*.o" -delete
find lib -name "*.dylib" -delete
debug:
$(MAKE) lib/src/libwings.dylib CXXFLAGS="-g -DDEBUG=1"
example: debug
lldb -o "target create dart" \
-o "process launch --stop-at-entry example/main.dart" \
-o "process handle SIGINT -p true" \
-o "continue" \
lib/src/bind_socket.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/bind_socket.o lib/src/bind_socket.cc
lib/src/http_listener.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/http_listener.o lib/src/http_listener.cc
lib/src/http_parser.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -c -o lib/src/http_parser.o $(HTTP_PARSER)/http_parser.c
lib/src/send.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/send.o lib/src/send.cc
lib/src/util.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/util.o lib/src/util.cc
lib/src/wings.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/wings.o lib/src/wings.cc
lib/src/worker_thread.o:
$(CXX) $(CXXFLAGS) $(CXX_INCLUDES) -std=c++11 -c -o lib/src/worker_thread.o lib/src/worker_thread.cc
lib/src/libwings.dylib: lib/src/bind_socket.o lib/src/http_listener.o lib/src/http_parser.o lib/src/send.o lib/src/util.o lib/src/wings.o lib/src/worker_thread.o
$(CXX) $(CXXFLAGS) -shared -o lib/src/libwings.dylib -undefined dynamic_lookup -DDART_SHARED_LIB -Wl -fPIC -m64 \
lib/src/bind_socket.o lib/src/http_listener.o \
lib/src/http_parser.o lib/src/send.o lib/src/util.o \
lib/src/wings.o lib/src/worker_thread.o

View file

@ -5,6 +5,7 @@ import 'package:angel_framework/angel_framework.dart';
import 'package:angel_wings/angel_wings.dart'; import 'package:angel_wings/angel_wings.dart';
main() async { main() async {
if (false)
for (int i = 1; i < Platform.numberOfProcessors; i++) { for (int i = 1; i < Platform.numberOfProcessors; i++) {
var onError = new ReceivePort(); var onError = new ReceivePort();
Isolate.spawn(isolateMain, i, onError: onError.sendPort); Isolate.spawn(isolateMain, i, onError: onError.sendPort);
@ -17,9 +18,16 @@ main() async {
void isolateMain(int id) { void isolateMain(int id) {
var app = new Angel(); var app = new Angel();
var wings = new AngelWings(app, shared: true); var wings = new AngelWings(app, shared: id > 0, useZone: false);
app.get('/', 'Hello, native world!'); var old = app.errorHandler;
app.errorHandler = (e, req, res) {
print(e);
print(e.stackTrace);
return old(e, req, res);
};
app.get('/', 'Hello, native world! This is isolate #$id.');
wings.startServer('127.0.0.1', 3000).then((_) { wings.startServer('127.0.0.1', 3000).then((_) {
print( print(

View file

@ -10,6 +10,7 @@ import 'package:angel_framework/angel_framework.dart';
import 'package:body_parser/body_parser.dart'; import 'package:body_parser/body_parser.dart';
import 'package:combinator/combinator.dart'; import 'package:combinator/combinator.dart';
import 'package:http_parser/http_parser.dart'; import 'package:http_parser/http_parser.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:mock_request/mock_request.dart'; import 'package:mock_request/mock_request.dart';
import 'package:pool/pool.dart'; import 'package:pool/pool.dart';
import 'package:pooled_map/pooled_map.dart'; import 'package:pooled_map/pooled_map.dart';

View file

@ -34,7 +34,11 @@ void wings_BindSocket(Dart_NativeArguments arguments)
std::string addressStringInstance(addressString); std::string addressStringInstance(addressString);
std::lock_guard<std::mutex> lock(serverInfoVectorMutex); std::lock_guard<std::mutex> lock(serverInfoVectorMutex);
#if __APPLE__
if (false)
#else
if (shared) if (shared)
#endif
{ {
for (unsigned long i = 0; i < serverInfoVector.size(); i++) for (unsigned long i = 0; i < serverInfoVector.size(); i++)
{ {
@ -42,7 +46,7 @@ void wings_BindSocket(Dart_NativeArguments arguments)
if (server_info->addressString == addressStringInstance && server_info->port == port) if (server_info->addressString == addressStringInstance && server_info->port == port)
{ {
existingIndex = (long) i; existingIndex = (long)i;
break; break;
} }
} }
@ -113,7 +117,7 @@ void wings_BindSocket(Dart_NativeArguments arguments)
return; return;
} }
/* #if __APPLE__
ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i)); ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i));
if (ret < 0) if (ret < 0)
@ -121,7 +125,7 @@ void wings_BindSocket(Dart_NativeArguments arguments)
Dart_ThrowException(Dart_NewStringFromCString("Cannot reuse port for socket.")); Dart_ThrowException(Dart_NewStringFromCString("Cannot reuse port for socket."));
return; return;
} }
*/ #endif
if (addressLength > 4) if (addressLength > 4)
{ {

27
lib/src/dart_debug.h Normal file
View file

@ -0,0 +1,27 @@
#ifdef __cplusplus
#include <cstdio>
#else
#include <stdio.h>
#endif
#include <dart_api.h>
Dart_Handle ToCString(Dart_Handle obj, const char** out) {
Dart_Handle toStringMethod = Dart_NewStringFromCString("toString");
Dart_Handle string = Dart_Invoke(obj, toStringMethod, 0, nullptr);
return Dart_StringToCString(string, out);
}
Dart_Handle Dart_PrintToFile(Dart_Handle obj, FILE* stream) {
const char *toString;
Dart_Handle result = ToCString(obj, &toString);
if (Dart_IsError(result))
return result;
fprintf(stream, "%s\n", toString);
return Dart_Null();
}
Dart_Handle Dart_Print(Dart_Handle obj) {
return Dart_PrintToFile(obj, stdout);
}

View file

@ -1,5 +1,6 @@
#include <dart_native_api.h> #include <dart_native_api.h>
#include <thread> #include <thread>
#include <cstdio>
#include "wings.h" #include "wings.h"
#include "wings_thread.h" #include "wings_thread.h"
@ -47,4 +48,22 @@ void handleMessage(Dart_Port destPortId, Dart_CObject *message)
Dart_Port port = message->value.as_array.values[1]->value.as_send_port.id; Dart_Port port = message->value.as_array.values[1]->value.as_send_port.id;
Dart_CloseNativePort(port); Dart_CloseNativePort(port);
} }
else
{
// This is either a send or close message.
int sockfd = (int)get_int(message->value.as_array.values[0]);
printf("FD: %d\n", sockfd);
if (message->value.as_array.length == 2)
{
auto *msg = message->value.as_array.values[1];
printf("Length: %ld\n", msg->value.as_typed_data.length);
write(sockfd, msg->value.as_typed_data.values, (size_t)msg->value.as_typed_data.length);
}
else
{
printf("Close!\n");
close(sockfd);
}
}
} }

View file

@ -1,3 +1,6 @@
#ifdef DEBUG
#include "dart_debug.h"
#endif
#include "wings.h" #include "wings.h"
void wings_CloseSocket(Dart_NativeArguments arguments) void wings_CloseSocket(Dart_NativeArguments arguments)
@ -16,7 +19,12 @@ void wings_Send(Dart_NativeArguments arguments)
Dart_TypedData_Type dataType; Dart_TypedData_Type dataType;
void *dataBytes; void *dataBytes;
intptr_t dataLength; intptr_t dataLength;
#ifdef DEBUG
HandleError(Dart_Print(sockfdHandle));
#endif
HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd)); HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd));
HandleError(Dart_TypedDataAcquireData(dataHandle, &dataType, &dataBytes, &dataLength)); HandleError(Dart_TypedDataAcquireData(dataHandle, &dataType, &dataBytes, &dataLength));
write((int)sockfd, dataBytes, (size_t)dataLength); write((int)sockfd, dataBytes, (size_t)dataLength);
HandleError(Dart_TypedDataReleaseData(dataHandle));
} }

View file

@ -1,5 +1,5 @@
#include <cstdlib> #include <cstdlib>
#include <iostream> //#include <iostream>
#include <string.h> #include <string.h>
#include <dart_api.h> #include <dart_api.h>
#include "wings.h" #include "wings.h"
@ -25,7 +25,13 @@ DART_EXPORT Dart_Handle wings_Init(Dart_Handle parent_library)
Dart_Handle HandleError(Dart_Handle handle) Dart_Handle HandleError(Dart_Handle handle)
{ {
if (Dart_IsError(handle)) if (Dart_IsError(handle))
{
#ifdef DEBUG
Dart_DumpNativeStackTrace(NULL);
#endif
Dart_PropagateError(handle); Dart_PropagateError(handle);
}
return handle; return handle;
} }

View file

@ -76,8 +76,9 @@ class AngelWings {
final RawReceivePort _recv = new RawReceivePort(); final RawReceivePort _recv = new RawReceivePort();
final Map<String, MockHttpSession> _sessions = {}; final Map<String, MockHttpSession> _sessions = {};
final PooledMap<int, WingsRequestContext> _staging = final Map<int, WingsRequestContext> _staging = <int, WingsRequestContext>{};
new PooledMap<int, WingsRequestContext>(); //final PooledMap<int, WingsRequestContext> _staging =
// new PooledMap<int, WingsRequestContext>();
final Uuid _uuid = new Uuid(); final Uuid _uuid = new Uuid();
InternetAddress _address; InternetAddress _address;
int _port; int _port;
@ -88,6 +89,32 @@ class AngelWings {
static SendPort _startHttpListener() native "StartHttpListener"; static SendPort _startHttpListener() native "StartHttpListener";
final Pool _pool = new Pool(1);
static void __send(int sockfd, Uint8List data) native "Send";
static void __closeSocket(int sockfd) native "CloseSocket";
void _send(int sockfd, Uint8List data) {
// _pool.withResource(() {
print('Sending ${[sockfd, data]}');
_sendPort.send([sockfd, data]);
//});
//_pool.withResource(() => __send(sockfd, data));
}
void _closeSocket(WingsRequestContext req) {
//_pool.withResource(() {
if (!req._closed) {
req._closed = true;
var sockfd = req._sockfd;
print('Sending ${[sockfd]}');
_sendPort.send([sockfd]);
}
//});
//_pool.withResource(() => __closeSocket(sockfd));
}
AngelWings(this.app, {this.shared: false, this.useZone: true}) { AngelWings(this.app, {this.shared: false, this.useZone: true}) {
_recv.handler = _handleMessage; _recv.handler = _handleMessage;
} }
@ -137,34 +164,37 @@ class AngelWings {
} }
void _handleMessage(x) { void _handleMessage(x) {
print('INPUT: $x');
if (x is String) { if (x is String) {
close(); close();
throw new StateError(x); throw new StateError(x);
} else if (x is List && x.length >= 2) { } else if (x is List && x.length >= 2) {
int sockfd = x[0], command = x[1]; int sockfd = x[0], command = x[1];
WingsRequestContext _newRequest() => //WingsRequestContext _newRequest() =>
new WingsRequestContext._(this, sockfd, app); // new WingsRequestContext._(this, sockfd, app);
//print(x); //print(x);
switch (command) { switch (command) {
case messageBegin: case messageBegin:
_staging.putIfAbsent(sockfd, _newRequest); print('BEGIN $sockfd');
_staging[sockfd] = new WingsRequestContext._(this, sockfd, app);
break; break;
case messageComplete: case messageComplete:
// (sockfd, method, major, minor, addrBytes) print('$sockfd in $_staging???');
_staging.update(sockfd, (rq) { var rq = _staging.remove(sockfd);
if (rq != null) {
rq._method = methodToString(x[2] as int); rq._method = methodToString(x[2] as int);
rq._addressBytes = x[5] as Uint8List; rq._addressBytes = x[5] as Uint8List;
return rq; _handleRequest(rq);
}, defaultValue: _newRequest).then(_handleRequest); }
break; break;
case body: case body:
_staging.update(sockfd, (rq) { var rq = _staging[sockfd];
if (rq != null) {
(rq._body ??= new StreamController<Uint8List>()) (rq._body ??= new StreamController<Uint8List>())
.add(x[2] as Uint8List); .add(x[2] as Uint8List);
return rq; }
}, defaultValue: _newRequest);
break; break;
//case upgrade: //case upgrade:
// TODO: Handle WebSockets...? // TODO: Handle WebSockets...?
@ -175,27 +205,26 @@ class AngelWings {
// onUpgradedMessage(sockfd, x[2]); // onUpgradedMessage(sockfd, x[2]);
// break; // break;
case url: case url:
_staging.update(sockfd, (rq) => rq..__url = x[2] as String, _staging[sockfd]?.__url = x[2] as String;
defaultValue: _newRequest);
break; break;
case headerField: case headerField:
_staging.update(sockfd, (rq) => rq.._headerField = x[2] as String, _staging[sockfd]?._headerField = x[2] as String;
defaultValue: _newRequest);
break; break;
case headerValue: case headerValue:
_staging.update(sockfd, (rq) => rq.._headerValue = x[2] as String, _staging[sockfd]?._headerValue = x[2] as String;
defaultValue: _newRequest);
break; break;
} }
} }
} }
Future _handleRequest(WingsRequestContext req) { Future _handleRequest(WingsRequestContext req) {
print('req: $req');
if (req == null) return new Future.value(); if (req == null) return new Future.value();
var res = new WingsResponseContext._(req) var res = new WingsResponseContext._(req)
..app = app ..app = app
..serializer = app.serializer ..serializer = (app.serializer ?? god.serialize)
..encoders.addAll(app.encoders); ..encoders.addAll(app.encoders);
print('Handling fd: ${req._sockfd}');
handle() { handle() {
var path = req.path; var path = req.path;
@ -228,6 +257,7 @@ class AngelWings {
Future<bool> Function() runPipeline; Future<bool> Function() runPipeline;
print('Pipeline: $pipeline');
for (var handler in pipeline) { for (var handler in pipeline) {
if (handler == null) break; if (handler == null) break;
@ -286,7 +316,7 @@ class AngelWings {
return handleAngelHttpException(e, trace, req, res); return handleAngelHttpException(e, trace, req, res);
}).catchError((e, StackTrace st) { }).catchError((e, StackTrace st) {
var trace = new Trace.from(st ?? StackTrace.current).terse; var trace = new Trace.from(st ?? StackTrace.current).terse;
WingsResponseContext._closeSocket(req._sockfd); _closeSocket(req);
// Ideally, we won't be in a position where an absolutely fatal error occurs, // Ideally, we won't be in a position where an absolutely fatal error occurs,
// but if so, we'll need to log it. // but if so, we'll need to log it.
if (app.logger != null) { if (app.logger != null) {
@ -323,9 +353,8 @@ class AngelWings {
b.writeln('HTTP/1.1 500 Internal Server Error'); b.writeln('HTTP/1.1 500 Internal Server Error');
b.writeln(); b.writeln();
WingsResponseContext._send( _send(req._sockfd, _coerceUint8List(b.toString().codeUnits));
req._sockfd, _coerceUint8List(b.toString().codeUnits)); _closeSocket(req);
WingsResponseContext._closeSocket(req._sockfd);
} finally { } finally {
return null; return null;
} }
@ -350,7 +379,9 @@ class AngelWings {
/// Sends a response. /// Sends a response.
Future sendResponse(WingsRequestContext req, WingsResponseContext res, Future sendResponse(WingsRequestContext req, WingsResponseContext res,
{bool ignoreFinalizers: false}) { {bool ignoreFinalizers: false}) {
print('Closing: ${req._sockfd}');
if (res.willCloseItself) return new Future.value(); if (res.willCloseItself) return new Future.value();
print('Not self-closing: ${req._sockfd}');
Future finalizers = ignoreFinalizers == true Future finalizers = ignoreFinalizers == true
? new Future.value() ? new Future.value()
@ -368,6 +399,7 @@ class AngelWings {
//request.response.headers.chunkedTransferEncoding = res.chunked ?? true; //request.response.headers.chunkedTransferEncoding = res.chunked ?? true;
// TODO: Is there a need to support this? // TODO: Is there a need to support this?
print('Buffer: ${res.buffer}');
List<int> outputBuffer = res.buffer.toBytes(); List<int> outputBuffer = res.buffer.toBytes();
if (res.encoders.isNotEmpty) { if (res.encoders.isNotEmpty) {
@ -405,6 +437,7 @@ class AngelWings {
} }
} }
print('Create string buffer');
var b = new StringBuffer(); var b = new StringBuffer();
b.writeln('HTTP/1.1 ${res.statusCode}'); b.writeln('HTTP/1.1 ${res.statusCode}');
@ -425,13 +458,20 @@ class AngelWings {
} }
b.writeln(); b.writeln();
print(b);
var buf = new Uint8List.fromList( var bb = new BytesBuilder(copy: false)
new List<int>.from(b.toString().codeUnits)..addAll(outputBuffer)); ..add(b.toString().codeUnits)
..add(outputBuffer);
var buf = _coerceUint8List(bb.takeBytes());
print('Output: $buf');
return finalizers.then((_) { return finalizers.then((_) {
WingsResponseContext._send(req._sockfd, buf); print('A');
WingsResponseContext._closeSocket(req._sockfd); _send(req._sockfd, buf);
print('B');
_closeSocket(req);
print('C');
if (req.injections.containsKey(PoolResource)) { if (req.injections.containsKey(PoolResource)) {
req.injections[PoolResource].release(); req.injections[PoolResource].release();

View file

@ -3,6 +3,7 @@ part of angel_wings;
class WingsRequestContext extends RequestContext { class WingsRequestContext extends RequestContext {
final AngelWings _wings; final AngelWings _wings;
final int _sockfd; final int _sockfd;
bool _closed = false;
@override @override
Angel app; Angel app;
@ -14,7 +15,7 @@ class WingsRequestContext extends RequestContext {
final Map<String, String> _headers = {}; final Map<String, String> _headers = {};
String __contentTypeString; String __contentTypeString;
String __path; //String __path;
String __url; String __url;
Uint8List _addressBytes; Uint8List _addressBytes;
@ -86,9 +87,9 @@ class WingsRequestContext extends RequestContext {
@override @override
String get path { String get path {
if (_path != null) { if (_path != null) {
return __path; return _path;
} else { } else {
var path = __path.replaceAll(_straySlashes, ''); var path = __url?.replaceAll(_straySlashes, '') ?? '';
if (path.isEmpty) path = '/'; if (path.isEmpty) path = '/';
return _path = path; return _path = path;
} }

View file

@ -6,23 +6,21 @@ class WingsResponseContext extends ResponseContext {
WingsResponseContext._(this.correspondingRequest); WingsResponseContext._(this.correspondingRequest);
static void _send(int sockfd, Uint8List data) native "Send"; AngelWings get _wings => correspondingRequest._wings;
static void _closeSocket(int sockfd) native "CloseSocket";
@override @override
void add(List<int> data) { void add(List<int> data) {
if (_isClosed && !_useStream) if (_isClosed && !_useStream)
throw ResponseContext.closed(); throw ResponseContext.closed();
else if (_useStream) else if (_useStream)
_send(correspondingRequest._sockfd, _coerceUint8List(data)); _wings._send(correspondingRequest._sockfd, _coerceUint8List(data));
else else
buffer.add(data); buffer.add(data);
} }
@override @override
Future close() { Future close() {
_closeSocket(correspondingRequest._sockfd); _wings._closeSocket(correspondingRequest);
_isClosed = true; _isClosed = true;
_useStream = false; _useStream = false;
return super.close(); return super.close();
@ -79,7 +77,7 @@ class WingsResponseContext extends ResponseContext {
} }
return output.forEach(((data) => return output.forEach(((data) =>
_send(correspondingRequest._sockfd, _coerceUint8List(data)))); _wings._send(correspondingRequest._sockfd, _coerceUint8List(data))));
} }
@override @override
@ -149,7 +147,7 @@ class WingsResponseContext extends ResponseContext {
b.writeln(); b.writeln();
_send( _wings._send(
correspondingRequest._sockfd, _coerceUint8List(b.toString().codeUnits)); correspondingRequest._sockfd, _coerceUint8List(b.toString().codeUnits));
} }
} }

View file

@ -1,3 +1,4 @@
//#include <memory>
#include <utility> #include <utility>
#include <dart_native_api.h> #include <dart_native_api.h>
#include "wings_thread.h" #include "wings_thread.h"
@ -10,25 +11,31 @@ void wingsThreadMain(wings_thread_info *info)
while (true) while (true)
{ {
std::lock_guard<std::mutex> lock(serverInfo->mutex); std::unique_lock<std::mutex> lock(serverInfo->mutex, std::defer_lock);
sockaddr client_addr{}; sockaddr client_addr{};
socklen_t client_addr_len; socklen_t client_addr_len;
int client = accept(serverInfo->sockfd, &client_addr, &client_addr_len);
if (client < 0) if (lock.try_lock())
{ {
// send_error(info->port, "Failed to accept client socket."); int client = accept(serverInfo->sockfd, &client_addr, &client_addr_len);
return; lock.unlock();
}
requestInfo rq{}; if (client < 0)
rq.ipv6 = serverInfo->ipv6; {
rq.sock = client; // send_error(info->port, "Failed to accept client socket.");
rq.addr = client_addr; return;
rq.addr_len = client_addr_len; }
rq.port = port;
handleRequest(&rq); //auto rq = std::make_shared<requestInfo>();
auto *rq = new requestInfo;
rq->ipv6 = serverInfo->ipv6;
rq->sock = client;
rq->addr = client_addr;
rq->addr_len = client_addr_len;
rq->port = port;
handleRequest(rq);
}
} }
} }
@ -137,7 +144,8 @@ int send_oncomplete(http_parser *parser, int code)
return 0; return 0;
} }
void handleRequest(requestInfo *rq) //void handleRequest(const std::shared_ptr<requestInfo> &rq)
void handleRequest(requestInfo* rq)
{ {
size_t len = 80 * 1024, nparsed; size_t len = 80 * 1024, nparsed;
char buf[len]; char buf[len];
@ -158,7 +166,7 @@ void handleRequest(requestInfo *rq)
settings.on_message_complete = [](http_parser *parser) { settings.on_message_complete = [](http_parser *parser) {
//std::cout << "mc" << std::endl; //std::cout << "mc" << std::endl;
send_oncomplete(parser, 1); send_oncomplete(parser, 1);
//delete (requestInfo *) parser->data; delete (requestInfo *)parser->data;
//std::cout << "deleted rq!" << std::endl; //std::cout << "deleted rq!" << std::endl;
return 0; return 0;
}; };