Sudden crashes, yay!

This commit is contained in:
Tobe O 2019-05-01 00:29:21 -04:00
parent 8a1b95c352
commit e0ef7ba79a
17 changed files with 391 additions and 87 deletions

View file

@ -16,12 +16,9 @@ clean:
find . -type f -name '*.dylib' -delete
find . -type f -name '*.dill' -delete
%-run: % example/main.dart
%-run: % example/main.dill
dart example/main.dill
example/main.dill: ./**/*.dart
dart --snapshot="$@" example/main.dart
mac: libangel_wings.dylib
linux: lib/src/libangel_wings.so
@ -42,3 +39,6 @@ lib/src/libangel_wings.dylib: $(objects)
%.o: %.cc lib/src/angel_wings.h %.h
$(CXX) $(CXXFLAGS) -c -o $@ $<
%.dill: %.dart
dart --snapshot="$@" $<

23
benchmark/empty.dart Normal file
View file

@ -0,0 +1,23 @@
import 'dart:async';
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'util.dart';
const AngelBenchmark emptyBenchmark = _EmptyBenchmark();
main() => runBenchmarks([emptyBenchmark]);
class _EmptyBenchmark implements AngelBenchmark {
const _EmptyBenchmark();
@override
String get name => 'empty';
@override
FutureOr<void> rawHandler(HttpRequest req, HttpResponse res) {
return res.close();
}
@override
void setupAngel(Angel app) {}
}

110
benchmark/util.dart Normal file
View file

@ -0,0 +1,110 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/http.dart';
import 'package:angel_wings/angel_wings.dart';
import 'package:io/ansi.dart';
import 'package:pedantic/pedantic.dart';
import 'package:tuple/tuple.dart';
Future<Process> _runWrk(
{ProcessStartMode mode = ProcessStartMode.inheritStdio}) async {
return await Process.start('wrk', ['http://localhost:$testPort'], mode: mode);
}
Future<void> _warmUp() async {
var wrk = await _runWrk();
await wrk.exitCode;
// await wrk.stderr.drain();
// await wrk.stdout.drain();
}
Future _10s() => Future.delayed(Duration(seconds: 10));
const testPort = 8877;
Future<void> runBenchmarks(Iterable<AngelBenchmark> benchmarks,
{Iterable<String> factories = const [
// 'angel_http',
'angel_wings',
]}) async {
for (var benchmark in benchmarks) {
print(magenta.wrap('Entering benchmark: ${benchmark.name}'));
// // Run dart:io
// print(lightGray.wrap('Booting dart:io server (waiting 10s)...'));
// var isolates = <Isolate>[];
// for (int i = 0; i < Platform.numberOfProcessors; i++) {
// isolates.add(await Isolate.spawn(_httpIsolate, benchmark));
// }
// await _10s();
// print(lightGray.wrap('Warming up dart:io server...'));
// await _warmUp();
// stdout
// ..write(lightGray.wrap('Now running `wrk` for '))
// ..write(cyan.wrap(benchmark.name))
// ..writeln(lightGray.wrap(' (waiting 10s)...'));
// var wrk = await _runWrk(mode: ProcessStartMode.inheritStdio);
// await wrk.exitCode;
// isolates.forEach((i) => i.kill(priority: Isolate.immediate));
// Run Angel HTTP, Wings
for (var fac in factories) {
print(lightGray.wrap('Booting $fac server...'));
var isolates = <Isolate>[];
for (int i = 0; i < Platform.numberOfProcessors; i++) {
isolates
.add(await Isolate.spawn(_angelIsolate, Tuple2(benchmark, fac)));
}
await _10s();
print(lightGray.wrap('Warming up $fac server...'));
await _warmUp();
stdout
..write(lightGray.wrap('Now running `wrk` for '))
..write(cyan.wrap(benchmark.name))
..writeln(lightGray.wrap('...'));
var wrk = await _runWrk(mode: ProcessStartMode.inheritStdio);
await wrk.exitCode;
}
}
exit(0);
}
void _httpIsolate(AngelBenchmark benchmark) {
Future(() async {
var raw = await HttpServer.bind(InternetAddress.loopbackIPv4, testPort,
shared: true);
raw.listen((r) => benchmark.rawHandler(r, r.response));
});
}
void _angelIsolate(Tuple2<AngelBenchmark, String> args) {
Future(() async {
var app = Angel();
Driver driver;
if (args.item2 == 'angel_http')
driver = AngelHttp.custom(app, startShared);
else if (args.item2 == 'angel_wings')
driver = AngelWings.custom(app, startSharedWings);
await app.configure(args.item1.setupAngel);
await driver.startServer(InternetAddress.loopbackIPv4, testPort);
});
}
abstract class AngelBenchmark {
const AngelBenchmark();
String get name;
FutureOr<void> setupAngel(Angel app);
FutureOr<void> rawHandler(HttpRequest req, HttpResponse res);
}

View file

@ -1,3 +1,4 @@
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_static/angel_static.dart';
import 'package:angel_wings/angel_wings.dart';
@ -16,9 +17,13 @@ main() async {
app.mimeTypeResolver.addExtension('yaml', 'text/x-yaml');
app.get('/', (req, res) => 'WINGS!!!');
app.post('/', (req, res) async {
await req.parseBody();
return req.bodyAsMap;
});
app.fallback(vDir.handleRequest);
app.fallback((req, res) => throw AngelHttpException.notFound());
await wings.startServer('127.0.0.1', 3000);
await wings.startServer(InternetAddress.loopbackIPv6, 3000);
print('Listening at ${wings.uri}');
}

Binary file not shown.

22
example/shared.dart Normal file
View file

@ -0,0 +1,22 @@
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_wings/angel_wings.dart';
main() async {
var app = Angel();
var wings1 = AngelWings.custom(app, startSharedWings);
var wings2 = AngelWings.custom(app, startSharedWings);
var wings3 = AngelWings.custom(app, startSharedWings);
var wings4 = AngelWings.custom(app, startSharedWings);
await wings1.startServer('127.0.0.1', 3000);
await wings2.startServer('127.0.0.1', 3000);
await wings3.startServer('127.0.0.1', 3000);
await wings4.startServer('127.0.0.1', 3000);
print(wings1.uri);
print(wings2.uri);
print(wings3.uri);
print(wings4.uri);
await wings1.close();
await wings2.close();
await wings3.close();
await wings4.close();
}

View file

@ -1,3 +1,4 @@
#include <iostream>
#include <vector>
#include "angel_wings.h"
#include "wings_socket.h"
@ -32,7 +33,7 @@ void Dart_WingsSocket_bindIPv6(Dart_NativeArguments arguments)
void wingsReturnBound(Dart_NativeArguments arguments, WingsSocket *socket)
{
Dart_Port sendPort;
HandleError(Dart_SendPortGetId(socket->getInfo().sendPortHandle, &sendPort));
HandleError(Dart_SendPortGetId(Dart_GetNativeArgument(arguments, 5), &sendPort));
socket->incrRef(sendPort);
auto ptr = (uint64_t)socket;
Dart_Handle ptrHandle = Dart_NewIntegerFromUint64(ptr);
@ -44,9 +45,11 @@ WingsSocket *wingsFindSocket(Dart_NativeArguments arguments, const WingsSocketIn
// Find an existing server, if any.
if (info.shared)
{
// std::cout << info.address << std::endl;
// std::cout << globalSocketList.size() << std::endl;
for (auto *socket : globalSocketList)
{
if (socket->getInfo() == info)
if (info.equals(socket->getInfo()))
{
return socket;
}

View file

@ -12,26 +12,45 @@ void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments)
Dart_SetReturnValue(arguments, send_port);
}
struct wingsHttp
{
Dart_Port port;
std::string lastHeader;
};
void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message)
{
int64_t fd = -1;
Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id;
Dart_CObject *fdArg = message->value.as_array.values[1];
#define thePort (*((Dart_Port *)parser->data))
wingsHttp httpData = {outPort};
#define theStruct (*((wingsHttp *)parser->data))
#define thePort theStruct.port
#define sendInt(n) \
{ \
Dart_CObject obj; \
obj.type = Dart_CObject_kInt64; \
obj.value.as_int64 = (n); \
Dart_PostCObject(thePort, &obj);
#define sendString() \
Dart_PostCObject(thePort, &obj); \
}
#define sendString(n) \
if (length > 0) \
{ \
Dart_CObject typeObj; \
typeObj.type = Dart_CObject_kInt32; \
typeObj.value.as_int32 = (n); \
std::string str(at, length); \
Dart_CObject obj; \
obj.type = Dart_CObject_kString; \
obj.value.as_string = (char *)str.c_str(); \
Dart_PostCObject(thePort, &obj); \
Dart_CObject strObj; \
strObj.type = Dart_CObject_kString; \
strObj.value.as_string = (char *)str.c_str(); \
Dart_CObject *values[2] = {&typeObj, &strObj}; \
Dart_CObject out; \
out.type = Dart_CObject_kArray; \
out.value.as_array.length = 2; \
out.value.as_array.values = values; \
Dart_PostCObject(thePort, &out); \
}
if (fdArg->type == Dart_CObject_kInt32)
@ -52,17 +71,24 @@ void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message)
};
settings.on_headers_complete = [](http_parser *parser) {
{
sendInt(0);
}
{
sendInt(parser->method);
}
Dart_CObject type;
type.type = Dart_CObject_kInt32;
type.value.as_int32 = 2;
Dart_CObject value;
value.type = Dart_CObject_kInt32;
value.value.as_int32 = parser->method;
Dart_CObject *values[2] = {&type, &value};
Dart_CObject out;
out.type = Dart_CObject_kArray;
out.value.as_array.length = 2;
out.value.as_array.values = values;
Dart_PostCObject(thePort, &out);
sendInt(100);
return 0;
};
settings.on_message_complete = [](http_parser *parser) {
sendInt(1);
sendInt(200);
return 0;
};
@ -75,22 +101,46 @@ void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message)
};
settings.on_url = [](http_parser *parser, const char *at, size_t length) {
sendString();
sendString(0);
return 0;
};
settings.on_header_field = [](http_parser *parser, const char *at, size_t length) {
sendString();
theStruct.lastHeader = std::string(at, length);
return 0;
};
settings.on_header_value = [](http_parser *parser, const char *at, size_t length) {
sendString();
if (!theStruct.lastHeader.empty())
{
std::string vStr(at, length);
Dart_CObject type;
type.type = Dart_CObject_kInt32;
type.value.as_int32 = 1;
Dart_CObject name;
name.type = Dart_CObject_kString;
name.value.as_string = (char *)theStruct.lastHeader.c_str();
Dart_CObject value;
value.type = Dart_CObject_kString;
value.value.as_string = (char *)vStr.c_str();
Dart_CObject *values[3] = {&type, &name, &value};
Dart_CObject out;
out.type = Dart_CObject_kArray;
out.value.as_array.length = 3;
out.value.as_array.values = values;
Dart_PostCObject(thePort, &out);
theStruct.lastHeader.clear();
}
return 0;
};
settings.on_body = [](http_parser *parser, const char *at, size_t length) {
sendString();
Dart_CObject obj;
obj.type = Dart_CObject_kTypedData;
obj.value.as_typed_data.type = Dart_TypedData_kUint8;
obj.value.as_typed_data.length = length;
obj.value.as_typed_data.values = (uint8_t *)at;
Dart_PostCObject(thePort, &obj);
return 0;
};
@ -101,7 +151,7 @@ void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message)
// http_parser parser;
auto *parser = (http_parser *)malloc(sizeof(http_parser));
http_parser_init(parser, HTTP_BOTH);
parser->data = &outPort;
parser->data = &httpData;
while ((recved = recv(fd, buf, len, 0)) >= 0)
{

Binary file not shown.

View file

@ -159,5 +159,13 @@ void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments)
void Dart_WingsSocket_close(Dart_NativeArguments arguments)
{
// TODO: Actually do something.
Dart_Port port;
uint64_t ptr;
Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0);
Dart_Handle sendPortHandle = Dart_GetNativeArgument(arguments, 1);
HandleError(Dart_IntegerToUint64(pointerHandle, &ptr));
HandleError(Dart_SendPortGetId(sendPortHandle, &port));
auto *socket = (WingsSocket *)ptr;
socket->decrRef(port);
}

View file

@ -7,13 +7,17 @@ import 'wings_request.dart';
import 'wings_response.dart';
import 'wings_socket.dart';
Future<WingsSocket> startSharedWings(dynamic addr, int port) {
return WingsSocket.bind(addr, port, shared: true);
}
class AngelWings extends Driver<WingsClientSocket, int, WingsSocket,
WingsRequestContext, WingsResponseContext> {
factory AngelWings(Angel app) {
return AngelWings._(app, WingsSocket.bind);
return AngelWings.custom(app, WingsSocket.bind);
}
AngelWings._(
AngelWings.custom(
Angel app, Future<WingsSocket> Function(dynamic, int) serverGenerator)
: super(app, serverGenerator);
@ -24,6 +28,12 @@ class AngelWings extends Driver<WingsClientSocket, int, WingsSocket,
}
}
@override
Future<WingsSocket> close() async {
await server?.close();
return super.close();
}
@override
Future closeResponse(int response) {
closeNativeSocketDescriptor(response);

View file

@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import 'package:angel_container/angel_container.dart';
import 'package:angel_framework/angel_framework.dart';
import 'package:mock_request/mock_request.dart';
@ -94,52 +95,86 @@ class WingsRequestContext extends RequestContext<WingsClientSocket> {
}
static Future<WingsRequestContext> from(Angel app, WingsClientSocket socket) {
var state = _ParseState.url;
// var state = _ParseState.url;
var c = Completer<WingsRequestContext>();
var recv = RawReceivePort();
var rq = WingsRequestContext._(app, socket, recv);
rq._remoteAddress = socket.remoteAddress;
String lastHeader;
recv.handler = (e) {
if (state == _ParseState.url) {
rq._uri = Uri.parse(e as String);
var ct = StreamController();
recv.handler = ct.add;
recv.handler = (ee) {
if (ee is Uint8List) {
if (!rq._body.isClosed) rq._body.add(ee);
} else if (ee is List) {
var type = ee[0] as int;
if (type == 2) {
rq._method = methodToString(ee[1] as int);
} else {
var value = ee[1] as String;
if (type == 0) {
rq._uri = Uri.parse(value);
var path = rq._uri.path.replaceAll(_straySlashes, '');
if (path.isEmpty) path = '/';
rq._path = path;
state = _ParseState.headerField;
} else if (state == _ParseState.headerField) {
if (e == 0) {
state = _ParseState.method;
} else if (type == 1) {
var k = value, v = ee[2] as String;
if (k == 'cookie') {
rq.__cookies.add(Cookie.fromSetCookieValue(v));
} else {
lastHeader = e as String; //Uri.decodeFull(e as String);
state = _ParseState.headerValue;
rq._headers.add(k, v);
}
} else if (state == _ParseState.headerValue) {
if (e == 0) {
state = _ParseState.method;
} else {
var value = e as String; //Uri.decodeFull(e as String);
if (lastHeader != null) {
if (lastHeader == 'cookie') {
rq.__cookies.add(Cookie.fromSetCookieValue(value));
} else {
rq._headers.add(lastHeader, value);
}
lastHeader = null;
// print("h: $ee');");
}
}
state = _ParseState.headerField;
} else if (state == _ParseState.method) {
rq._method = methodToString(e as int);
state = _ParseState.body;
} else if (ee == 100) {
// Headers done, just listen for body.
c.complete(rq);
} else if (state == _ParseState.body) {
if (e == 1) {
} else if (ee == 200) {
// Message complete.
rq._body.close();
} else {
rq._body.add(e as List<int>);
}
}
// if (state == _ParseState.url) {
// rq._uri = Uri.parse(e as String);
// var path = rq._uri.path.replaceAll(_straySlashes, '');
// if (path.isEmpty) path = '/';
// rq._path = path;
// state = _ParseState.headerField;
// } else if (state == _ParseState.headerField) {
// if (e == 0) {
// state = _ParseState.method;
// } else {
// lastHeader = e as String; //Uri.decodeFull(e as String);
// state = _ParseState.headerValue;
// }
// } else if (state == _ParseState.headerValue) {
// if (e == 0) {
// state = _ParseState.method;
// } else {
// var value = e as String; //Uri.decodeFull(e as String);
// if (lastHeader != null) {
// if (lastHeader == 'cookie') {
// rq.__cookies.add(Cookie.fromSetCookieValue(value));
// } else {
// rq._headers.add(lastHeader, value);
// }
// lastHeader = null;
// }
// }
// state = _ParseState.headerField;
// } else if (state == _ParseState.method) {
// rq._method = methodToString(e as int);
// state = _ParseState.body;
// c.complete(rq);
// } else if (state == _ParseState.body) {
// if (e == 1) {
// rq._body.close();
// } else {
// rq._body.add(e as List<int>);
// }
// }
};
wingsParseHttp().send([recv.sendPort, socket.fileDescriptor]);
return c.future;

View file

@ -5,17 +5,22 @@ using namespace wings;
std::vector<WingsSocket *> wings::globalSocketList;
bool WingsSocketInfo::operator==(const WingsSocketInfo &other) const
bool WingsSocketInfo::equals(const WingsSocketInfo &right) const
{
return (strcmp(address, other.address) == 0) &&
port == other.port;
// std::cout << address << " vs " << right.address << std::endl;
// std::cout << port << " vs " << right.port << std::endl;
return (strcmp(address, right.address) == 0) &&
port == right.port;
}
WingsSocket::WingsSocket(sa_family_t family, int sockfd, const WingsSocketInfo &info)
: sockfd(sockfd), info(info), family(family)
{
index = 0;
open = true;
refCount = 0;
workerThread = nullptr;
this->info.address = strdup(info.address);
}
void WingsSocket::incrRef(Dart_Port port)
@ -24,6 +29,31 @@ void WingsSocket::incrRef(Dart_Port port)
sendPorts.push_back(port);
}
void WingsSocket::decrRef(Dart_Port port)
{
auto it = std::find(sendPorts.begin(), sendPorts.end(), port);
if (it != sendPorts.end())
{
sendPorts.erase(it);
}
refCount--;
if (refCount <= 0 && open)
{
close(sockfd);
}
}
Dart_Port WingsSocket::nextPort()
{
if (index >= sendPorts.size())
index = 0;
Dart_Port port = sendPorts.at(index++);
return port;
}
const WingsSocketInfo &WingsSocket::getInfo() const
{
return info;
@ -59,6 +89,7 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id,
Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id;
Dart_CObject *ptrArg = message->value.as_array.values[1];
// If there are no listeners, quit.
if (ptrArg->type == Dart_CObject_kInt32)
{
auto as64 = (int64_t)ptrArg->value.as_int32;
@ -71,6 +102,11 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id,
if (socket != nullptr)
{
if (socket->sendPorts.empty())
{
return;
}
int sock;
unsigned long index = 0;
sockaddr addr;
@ -106,14 +142,11 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id,
obj.value.as_array.length = 2;
obj.value.as_array.values = values;
Dart_PostCObject(outPort, &obj);
// Dart_PostCObject(outPort, &obj);
// Dispatch the fd to the next listener.
// auto &ports = socket->sendPorts;
// Dart_Port port = ports.at(index++);
// if (index >= ports.size())
// index = 0;
// Dart_Handle intHandle = Dart_NewInteger(sock);
// Dart_Post(port, intHandle);
auto port = socket->nextPort();
// Dart_PostCObject(port, &obj);
Dart_PostCObject(outPort, &obj);
}
}
}

View file

@ -18,7 +18,7 @@ int bindWingsIPv6ServerSocket(
bool shared,
int backlog,
bool v6Only,
SendPort sendPort) native 'Dart_WingsSocket_bindIPV6';
SendPort sendPort) native 'Dart_WingsSocket_bindIPv6';
String getWingsServerSocketAddress(int pointer)
native 'Dart_WingsSocket_getAddress';
@ -33,7 +33,8 @@ void closeNativeSocketDescriptor(int fd)
SendPort wingsSocketListen(int pointer) native 'Dart_WingsSocket_listen';
void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close';
void closeWingsSocket(int pointer, SendPort sendPort)
native 'Dart_WingsSocket_close';
SendPort wingsParseHttp() native 'Dart_WingsSocket_parseHttp';
@ -120,7 +121,7 @@ class WingsSocket extends Stream<WingsClientSocket> {
Future<void> close() async {
if (_open) {
_open = false;
closeWingsSocket(_pointer);
closeWingsSocket(_pointer, _recv.sendPort);
_recv.close();
await _ctrl.close();
}

View file

@ -22,7 +22,7 @@ struct WingsSocketInfo
uint64_t backlog;
bool v6Only;
Dart_Handle sendPortHandle;
bool operator==(const WingsSocketInfo &other) const;
bool equals(const WingsSocketInfo &right) const;
};
class WingsSocket
@ -30,16 +30,20 @@ class WingsSocket
public:
WingsSocket(sa_family_t family, int sockfd, const WingsSocketInfo &info);
void incrRef(Dart_Port port);
void decrRef(Dart_Port port);
const WingsSocketInfo &getInfo() const;
void start(Dart_NativeArguments arguments);
int getFD() const;
sa_family_t getFamily() const;
Dart_Port nextPort();
private:
static void threadCallback(Dart_Port dest_port_id, Dart_CObject *message);
WingsSocketInfo info;
unsigned long index;
int sockfd;
int refCount;
bool open;
sa_family_t family;
std::unique_ptr<std::thread> workerThread;
std::vector<Dart_Port> sendPorts;

Binary file not shown.