This commit is contained in:
Tobe O 2018-07-03 19:16:29 -04:00
commit 6a16a057fc
17 changed files with 1525 additions and 0 deletions

21
.gitignore vendored Normal file
View file

@ -0,0 +1,21 @@
# See https://www.dartlang.org/guides/libraries/private-files
# Files and directories created by pub
.dart_tool/
.packages
build/
# If you're building an application, you may want to check-in your pubspec.lock
pubspec.lock
# Directory created by dartdoc
# If you don't generate documentation locally you can remove this line.
doc/api/
# Avoid committing generated Javascript files:
*.dart.js
*.info.json # Produced by the --dump-info flag.
*.js # When generated by dart2js. Don't specify *.js if your
# project includes source files written in JavaScript.
*.js_
*.js.deps
*.js.map

3
analysis_options.yaml Normal file
View file

@ -0,0 +1,3 @@
analyzer:
strong-mode:
implicit-casts: false

28
example/main.dart Normal file
View file

@ -0,0 +1,28 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_wings/angel_wings.dart';
main() async {
for (int i = 1; i < Platform.numberOfProcessors; i++) {
var onError = new ReceivePort();
Isolate.spawn(isolateMain, i, onError: onError.sendPort);
onError.listen((e) => Zone.current
.handleUncaughtError(e[0], new StackTrace.fromString(e[1].toString())));
}
isolateMain(0);
}
void isolateMain(int id) {
var app = new Angel();
var wings = new AngelWings(app, shared: true);
app.get('/', 'Hello, native world!');
wings.startServer('127.0.0.1', 3000).then((_) {
print(
'Instance #$id listening at http://${wings.address.address}:${wings.port}');
});
}

21
lib/angel_wings.dart Normal file
View file

@ -0,0 +1,21 @@
library angel_wings;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import 'dart-ext:src/wings';
import 'package:angel_framework/angel_framework.dart';
import 'package:body_parser/body_parser.dart';
import 'package:combinator/combinator.dart';
import 'package:http_parser/http_parser.dart';
import 'package:mock_request/mock_request.dart';
import 'package:pool/pool.dart';
import 'package:pooled_map/pooled_map.dart';
import 'package:stack_trace/stack_trace.dart';
import 'package:tuple/tuple.dart';
import 'package:uuid/uuid.dart';
part 'src/wings_request.dart';
part 'src/wings_response.dart';
part 'src/wings.dart';

199
lib/src/bind_socket.cc Normal file
View file

@ -0,0 +1,199 @@
#include <cstring>
#include <mutex>
#include <vector>
#include "wings.h"
std::vector<WingsServerInfo *> serverInfoVector;
std::mutex serverInfoVectorMutex;
void wings_BindSocket(Dart_NativeArguments arguments)
{
// Uint8List address, String addressString, int port, int backlog, bool shared
Dart_Handle addressHandle = Dart_GetNativeArgument(arguments, 0);
Dart_Handle addressStringHandle = Dart_GetNativeArgument(arguments, 1);
Dart_Handle portHandle = Dart_GetNativeArgument(arguments, 2);
Dart_Handle backlogHandle = Dart_GetNativeArgument(arguments, 3);
Dart_Handle sharedHandle = Dart_GetNativeArgument(arguments, 4);
Dart_TypedData_Type addressType;
void *addressData;
intptr_t addressLength;
const char *addressString;
uint64_t port, backlog;
bool shared;
// Read the arguments...
HandleError(Dart_TypedDataAcquireData(addressHandle, &addressType, &addressData, &addressLength));
HandleError(Dart_TypedDataReleaseData(addressHandle));
HandleError(Dart_StringToCString(addressStringHandle, &addressString));
HandleError(Dart_IntegerToUint64(portHandle, &port));
HandleError(Dart_IntegerToUint64(backlogHandle, &backlog));
HandleError(Dart_BooleanValue(sharedHandle, &shared));
// See if there is already a server bound to the port.
long existingIndex = -1;
std::string addressStringInstance(addressString);
std::lock_guard<std::mutex> lock(serverInfoVectorMutex);
if (shared)
{
for (unsigned long i = 0; i < serverInfoVector.size(); i++)
{
WingsServerInfo *server_info = serverInfoVector.at(i);
if (server_info->addressString == addressStringInstance && server_info->port == port)
{
existingIndex = (long) i;
break;
}
}
}
if (existingIndex > -1)
{
// We found an existing socket, just return a reference to it.
Dart_SetReturnValue(arguments, Dart_NewIntegerFromUint64(existingIndex));
return;
}
else
{
// There's no existing server, so bind a new one, and add it to the serverInfoVector.
#ifndef WIN32
int sockfd;
#else
WSADATA wsaData;
SOCKET ConnectSocket = INVALID_SOCKET;
// Initialize Winsock
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0)
{
Dart_Handle errorHandle = Dart_NewList(2);
Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("WSAStartup failed."));
Dart_ListSetAt(errorHandle, 1, Dart_NewInteger(iResult));
Dart_ThrowException(errorHandle);
return 1;
}
// TODO: Rest of Windows config:
// https://docs.microsoft.com/en-us/windows/desktop/winsock/complete-client-code
#endif
if (addressLength == 4)
{
// IPv4
sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
else
{
// IPv6
sockfd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
}
if (sockfd < 0)
{
Dart_Handle errorHandle = Dart_NewList(3);
Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to create socket."));
Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno)));
Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno));
Dart_ThrowException(errorHandle);
return;
}
int i = 1;
int ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
if (ret < 0)
{
Dart_Handle errorHandle = Dart_NewList(3);
Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Cannot reuse address for socket."));
Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno)));
Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno));
Dart_ThrowException(errorHandle);
return;
}
/*
ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i));
if (ret < 0)
{
Dart_ThrowException(Dart_NewStringFromCString("Cannot reuse port for socket."));
return;
}
*/
if (addressLength > 4)
{
struct sockaddr_in6 v6
{
};
memset(&v6, 0, sizeof(v6));
v6.sin6_family = AF_INET6;
v6.sin6_port = htons((uint16_t)port);
ret = inet_pton(v6.sin6_family, addressString, &v6.sin6_addr.s6_addr);
if (ret >= 0)
ret = bind(sockfd, (const sockaddr *)&v6, sizeof(v6));
}
else
{
struct sockaddr_in v4
{
};
memset(&v4, 0, sizeof(v4));
v4.sin_family = AF_INET;
v4.sin_port = htons((uint16_t)port);
v4.sin_addr.s_addr = inet_addr(addressString);
if (ret >= 0)
ret = bind(sockfd, (const sockaddr *)&v4, sizeof(v4));
//ret = inet_pton(family, host, &v4.sin_addr);
}
/*if (ret < 1) {
Dart_ThrowException(Dart_NewStringFromCString("Cannot parse IP address."));
return;
}*/
//if (bind(sock, (const sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
if (ret < 0)
{
Dart_Handle errorHandle = Dart_NewList(3);
Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to bind socket."));
Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno)));
Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno));
Dart_ThrowException(errorHandle);
return;
}
if (listen(sockfd, SOMAXCONN) < 0)
{
Dart_Handle errorHandle = Dart_NewList(3);
Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to listen to bound socket."));
Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno)));
Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno));
Dart_ThrowException(errorHandle);
return;
}
if (listen(sockfd, (int)backlog) < 0)
{
Dart_Handle errorHandle = Dart_NewList(3);
Dart_ListSetAt(errorHandle, 0, Dart_NewStringFromCString("Failed to listen to bound socket."));
Dart_ListSetAt(errorHandle, 1, Dart_NewStringFromCString(strerror(errno)));
Dart_ListSetAt(errorHandle, 2, Dart_NewInteger(errno));
Dart_ThrowException(errorHandle);
return;
}
// Now that we've bound the socket, let's add it to the list.
auto *server_info = new WingsServerInfo;
server_info->sockfd = sockfd;
server_info->port = port;
server_info->ipv6 = addressLength > 4;
server_info->addressString += addressStringInstance;
Dart_SetReturnValue(arguments, Dart_NewIntegerFromUint64(serverInfoVector.size()));
serverInfoVector.push_back(server_info);
}
}

50
lib/src/http_listener.cc Normal file
View file

@ -0,0 +1,50 @@
#include <dart_native_api.h>
#include <thread>
#include "wings.h"
#include "wings_thread.h"
void handleMessage(Dart_Port destPortId, Dart_CObject *message);
void wings_StartHttpListener(Dart_NativeArguments arguments)
{
Dart_Port port = Dart_NewNativePort("angel_wings", handleMessage, true);
Dart_SetReturnValue(arguments, Dart_NewSendPort(port));
}
int64_t get_int(Dart_CObject *obj)
{
if (obj == nullptr)
return 0;
switch (obj->type)
{
case Dart_CObject_kInt32:
return (int64_t)obj->value.as_int32;
case Dart_CObject_kInt64:
return obj->value.as_int64;
default:
return 0;
}
}
void handleMessage(Dart_Port destPortId, Dart_CObject *message)
{
// We always expect an array to be sent.
Dart_CObject_Type firstType = message->value.as_array.values[0]->type;
// If it's a SendPort, then start a new thread that listens for incoming connections.
if (firstType == Dart_CObject_kSendPort)
{
std::lock_guard<std::mutex> lock(serverInfoVectorMutex);
auto *threadInfo = new wings_thread_info;
threadInfo->port = message->value.as_array.values[0]->value.as_send_port.id;
threadInfo->serverInfo = serverInfoVector.at((unsigned long)get_int(message->value.as_array.values[1]));
std::thread workerThread(wingsThreadMain, threadInfo);
workerThread.detach();
}
else if (firstType == Dart_CObject_kBool)
{
// The Dart world is trying to close this port.
Dart_Port port = message->value.as_array.values[1]->value.as_send_port.id;
Dart_CloseNativePort(port);
}
}

View file

@ -0,0 +1,18 @@
include:
- angel_wings|lib/src/wings.h
- angel_wings|lib/src/wings_thread.h
sources:
- angel_wings|lib/src/bind_socket.cc
- angel_wings|lib/src/http_listener.cc
- angel_wings|lib/src/send.cc
- angel_wings|lib/src/util.cc
- angel_wings|lib/src/wings.cc
- angel_wings|lib/src/worker_thread.cc
third_party:
http_parser:
git: https://github.com/nodejs/http-parser.git
commit: 5b76466
include:
- .
sources:
- http_parser.c

22
lib/src/send.cc Normal file
View file

@ -0,0 +1,22 @@
#include "wings.h"
void wings_CloseSocket(Dart_NativeArguments arguments)
{
Dart_Handle sockfdHandle = Dart_GetNativeArgument(arguments, 0);
uint64_t sockfd;
HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd));
close((int)sockfd);
}
void wings_Send(Dart_NativeArguments arguments)
{
Dart_Handle sockfdHandle = Dart_GetNativeArgument(arguments, 0);
Dart_Handle dataHandle = Dart_GetNativeArgument(arguments, 1);
uint64_t sockfd;
Dart_TypedData_Type dataType;
void *dataBytes;
intptr_t dataLength;
HandleError(Dart_IntegerToUint64(sockfdHandle, &sockfd));
HandleError(Dart_TypedDataAcquireData(dataHandle, &dataType, &dataBytes, &dataLength));
write((int)sockfd, dataBytes, (size_t)dataLength);
}

36
lib/src/util.cc Normal file
View file

@ -0,0 +1,36 @@
#include <dart_api.h>
#include "wings.h"
void wings_AddressToString(Dart_NativeArguments arguments) {
char *address;
void *data;
intptr_t length;
bool ipv6;
Dart_TypedData_Type type;
Dart_Handle address_handle = Dart_GetNativeArgument(arguments, 0);
Dart_Handle ipv6_handle = Dart_GetNativeArgument(arguments, 1);
HandleError(Dart_BooleanValue(ipv6_handle, &ipv6));
sa_family_t family;
if (ipv6) {
family = AF_INET6;
address = (char *) Dart_ScopeAllocate(INET6_ADDRSTRLEN);
} else {
family = AF_INET;
address = (char *) Dart_ScopeAllocate(INET_ADDRSTRLEN);
}
HandleError(Dart_TypedDataAcquireData(address_handle, &type, &data, &length));
auto *ptr = inet_ntop(family, data, address, INET_ADDRSTRLEN);
HandleError(Dart_TypedDataReleaseData(address_handle));
if (ptr == nullptr) {
if (ipv6)
Dart_ThrowException(Dart_NewStringFromCString("Invalid IPV6 address."));
else
Dart_ThrowException(Dart_NewStringFromCString("Invalid IPV4 address."));
} else {
Dart_SetReturnValue(arguments, Dart_NewStringFromCString(address));
}
}

63
lib/src/wings.cc Normal file
View file

@ -0,0 +1,63 @@
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <dart_api.h>
#include "wings.h"
// Forward declaration of ResolveName function.
Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope);
// The name of the initialization function is the extension name followed
// by _Init.
DART_EXPORT Dart_Handle wings_Init(Dart_Handle parent_library)
{
if (Dart_IsError(parent_library))
return parent_library;
Dart_Handle result_code =
Dart_SetNativeResolver(parent_library, ResolveName, NULL);
if (Dart_IsError(result_code))
return result_code;
return Dart_Null();
}
Dart_Handle HandleError(Dart_Handle handle)
{
if (Dart_IsError(handle))
Dart_PropagateError(handle);
return handle;
}
Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope)
{
// If we fail, we return NULL, and Dart throws an exception.
if (!Dart_IsString(name))
return NULL;
Dart_NativeFunction result = NULL;
const char *cname;
HandleError(Dart_StringToCString(name, &cname));
if (strcmp(cname, "AddressToString") == 0)
{
result = wings_AddressToString;
}
else if (strcmp(cname, "BindSocket") == 0)
{
result = wings_BindSocket;
}
else if (strcmp(cname, "CloseSocket") == 0)
{
result = wings_CloseSocket;
}
else if (strcmp(cname, "Send") == 0)
{
result = wings_Send;
}
else if (strcmp(cname, "StartHttpListener") == 0)
{
result = wings_StartHttpListener;
}
return result;
}

451
lib/src/wings.dart Normal file
View file

@ -0,0 +1,451 @@
part of angel_wings;
class AngelWings {
static const int messageBegin = 0,
messageComplete = 1,
url = 2,
headerField = 3,
headerValue = 4,
body = 5,
upgrade = 6,
upgradedMessage = 7;
static const int DELETE = 0,
GET = 1,
HEAD = 2,
POST = 3,
PUT = 4,
CONNECT = 5,
OPTIONS = 6,
TRACE = 7,
COPY = 8,
LOCK = 9,
MKCOL = 10,
MOVE = 11,
PROPFIND = 12,
PROPPATCH = 13,
SEARCH = 14,
UNLOCK = 15,
BIND = 16,
REBIND = 17,
UNBIND = 18,
ACL = 19,
REPORT = 20,
MKACTIVITY = 21,
CHECKOUT = 22,
MERGE = 23,
MSEARCH = 24,
NOTIFY = 25,
SUBSCRIBE = 26,
UNSUBSCRIBE = 27,
PATCH = 28,
PURGE = 29,
MKCALENDAR = 30,
LINK = 31,
UNLINK = 32,
SOURCE = 33;
static String methodToString(int method) {
switch (method) {
case DELETE:
return 'DELETE';
case GET:
return 'GET';
case HEAD:
return 'HEAD';
case POST:
return 'POST';
case PUT:
return 'PUT';
case CONNECT:
return 'CONNECT';
case OPTIONS:
return 'OPTIONS';
case PATCH:
return 'PATCH';
case PURGE:
return 'PURGE';
default:
throw new ArgumentError('Unknown method $method.');
}
}
final Angel app;
final bool shared;
final bool useZone;
final RawReceivePort _recv = new RawReceivePort();
final Map<String, MockHttpSession> _sessions = {};
final PooledMap<int, WingsRequestContext> _staging =
new PooledMap<int, WingsRequestContext>();
final Uuid _uuid = new Uuid();
InternetAddress _address;
int _port;
SendPort _sendPort;
static int _bindSocket(Uint8List address, String addressString, int port,
int backlog, bool shared) native "BindSocket";
static SendPort _startHttpListener() native "StartHttpListener";
AngelWings(this.app, {this.shared: false, this.useZone: true}) {
_recv.handler = _handleMessage;
}
InternetAddress get address => _address;
int get port => _port;
Future startServer([host, int port, int backlog = 10]) {
Future<InternetAddress> _addr = host is InternetAddress
? host
: InternetAddress.lookup(host?.toString() ?? '127.0.0.1').then((list) =>
list.isNotEmpty
? list.first
: throw new StateError('IP lookup failed.'));
return _addr.then((address) {
try {
var serverInfoIndex = _bindSocket(
new Uint8List.fromList(address.rawAddress),
address.address,
port ?? 0,
backlog,
shared);
_sendPort = _startHttpListener();
_sendPort.send([_recv.sendPort, serverInfoIndex]);
_address = address;
_port = port;
} on List catch (osError) {
if (osError.length == 3) {
throw new SocketException(osError[0] as String,
osError: new OSError(osError[1] as String, osError[2] as int));
} else {
throw new SocketException('Could not start Wings server.',
osError: new OSError(osError[0] as String, osError[1] as int));
}
} on String catch (message) {
throw new SocketException(message);
}
});
}
Future close() {
_sendPort.send([true, _sendPort]);
_recv.close();
return new Future.value();
}
void _handleMessage(x) {
if (x is String) {
close();
throw new StateError(x);
} else if (x is List && x.length >= 2) {
int sockfd = x[0], command = x[1];
WingsRequestContext _newRequest() =>
new WingsRequestContext._(this, sockfd, app);
//print(x);
switch (command) {
case messageBegin:
_staging.putIfAbsent(sockfd, _newRequest);
break;
case messageComplete:
// (sockfd, method, major, minor, addrBytes)
_staging.update(sockfd, (rq) {
rq._method = methodToString(x[2] as int);
rq._addressBytes = x[5] as Uint8List;
return rq;
}, defaultValue: _newRequest).then(_handleRequest);
break;
case body:
_staging.update(sockfd, (rq) {
(rq._body ??= new StreamController<Uint8List>())
.add(x[2] as Uint8List);
return rq;
}, defaultValue: _newRequest);
break;
//case upgrade:
// TODO: Handle WebSockets...?
// if (onUpgrade != null) onUpgrade(sockfd);
// break;
//case upgradedMessage:
// TODO: Handle upgrade
// onUpgradedMessage(sockfd, x[2]);
// break;
case url:
_staging.update(sockfd, (rq) => rq..__url = x[2] as String,
defaultValue: _newRequest);
break;
case headerField:
_staging.update(sockfd, (rq) => rq.._headerField = x[2] as String,
defaultValue: _newRequest);
break;
case headerValue:
_staging.update(sockfd, (rq) => rq.._headerValue = x[2] as String,
defaultValue: _newRequest);
break;
}
}
}
Future _handleRequest(WingsRequestContext req) {
if (req == null) return new Future.value();
var res = new WingsResponseContext._(req)
..app = app
..serializer = app.serializer
..encoders.addAll(app.encoders);
handle() {
var path = req.path;
if (path == '/') path = '';
Tuple3<List, Map, ParseResult<Map<String, dynamic>>> resolveTuple() {
Router r = app.optimizedRouter;
var resolved =
r.resolveAbsolute(path, method: req.method, strip: false);
return new Tuple3(
new MiddlewarePipeline(resolved).handlers,
resolved.fold<Map>({}, (out, r) => out..addAll(r.allParams)),
resolved.isEmpty ? null : resolved.first.parseResult,
);
}
var cacheKey = req.method + path;
var tuple = app.isProduction
? app.handlerCache.putIfAbsent(cacheKey, resolveTuple)
: resolveTuple();
req.params.addAll(tuple.item2);
req.inject(ParseResult, tuple.item3);
if (!app.isProduction && app.logger != null)
req.inject(Stopwatch, new Stopwatch()..start());
var pipeline = tuple.item1;
Future<bool> Function() runPipeline;
for (var handler in pipeline) {
if (handler == null) break;
if (runPipeline == null)
runPipeline = () => app.executeHandler(handler, req, res);
else {
var current = runPipeline;
runPipeline = () => current().then((result) => !result
? new Future.value(result)
: app.executeHandler(handler, req, res));
}
}
return runPipeline == null
? sendResponse(req, res)
: runPipeline().then((_) => sendResponse(req, res));
}
if (useZone == false) {
return handle().catchError((e, StackTrace st) {
if (e is FormatException)
throw new AngelHttpException.badRequest(message: e.message)
..stackTrace = st;
throw new AngelHttpException(e, stackTrace: st, statusCode: 500);
}, test: (e) => e is! AngelHttpException).catchError(
(AngelHttpException e, StackTrace st) {
return handleAngelHttpException(e, e.stackTrace ?? st, req, res);
}).whenComplete(() => res.dispose());
} else {
var zoneSpec = new ZoneSpecification(
print: (self, parent, zone, line) {
if (app.logger != null)
app.logger.info(line);
else
parent.print(zone, line);
},
handleUncaughtError: (self, parent, zone, error, stackTrace) {
var trace = new Trace.from(stackTrace ?? StackTrace.current).terse;
return new Future(() {
AngelHttpException e;
if (error is FormatException) {
e = new AngelHttpException.badRequest(message: error.message);
} else if (error is AngelHttpException) {
e = error;
} else {
e = new AngelHttpException(error,
stackTrace: stackTrace, message: error?.toString());
}
if (app.logger != null) {
app.logger.severe(e.message ?? e.toString(), error, trace);
}
return handleAngelHttpException(e, trace, req, res);
}).catchError((e, StackTrace st) {
var trace = new Trace.from(st ?? StackTrace.current).terse;
WingsResponseContext._closeSocket(req._sockfd);
// Ideally, we won't be in a position where an absolutely fatal error occurs,
// but if so, we'll need to log it.
if (app.logger != null) {
app.logger.severe(
'Fatal error occurred when processing ${req.uri}.', e, trace);
} else {
stderr
..writeln('Fatal error occurred when processing '
'${req.uri}:')
..writeln(e)
..writeln(trace);
}
});
},
);
var zone = Zone.current.fork(specification: zoneSpec);
req.inject(Zone, zone);
req.inject(ZoneSpecification, zoneSpec);
return zone.run(handle).whenComplete(() {
res.dispose();
});
}
}
/// Handles an [AngelHttpException].
Future handleAngelHttpException(AngelHttpException e, StackTrace st,
WingsRequestContext req, WingsResponseContext res,
{bool ignoreFinalizers: false}) {
if (req == null || res == null) {
try {
app.logger?.severe(e, st);
var b = new StringBuffer();
b.writeln('HTTP/1.1 500 Internal Server Error');
b.writeln();
WingsResponseContext._send(
req._sockfd, _coerceUint8List(b.toString().codeUnits));
WingsResponseContext._closeSocket(req._sockfd);
} finally {
return null;
}
}
Future handleError;
if (!res.isOpen)
handleError = new Future.value();
else {
res.statusCode = e.statusCode;
handleError =
new Future.sync(() => app.errorHandler(e, req, res)).then((result) {
return app.executeHandler(result, req, res).then((_) => res.end());
});
}
return handleError.then((_) =>
sendResponse(req, res, ignoreFinalizers: ignoreFinalizers == true));
}
/// Sends a response.
Future sendResponse(WingsRequestContext req, WingsResponseContext res,
{bool ignoreFinalizers: false}) {
if (res.willCloseItself) return new Future.value();
Future finalizers = ignoreFinalizers == true
? new Future.value()
: app.responseFinalizers.fold<Future>(
new Future.value(), (out, f) => out.then((_) => f(req, res)));
if (res.isOpen) res.end();
var headers = <String, String>{};
headers.addAll(res.headers);
headers['content-length'] = res.buffer.length.toString();
// Ignore chunked transfer encoding
//request.response.headers.chunkedTransferEncoding = res.chunked ?? true;
// TODO: Is there a need to support this?
List<int> outputBuffer = res.buffer.toBytes();
if (res.encoders.isNotEmpty) {
var allowedEncodings = req.headers
.value('accept-encoding')
?.split(',')
?.map((s) => s.trim())
?.where((s) => s.isNotEmpty)
?.map((str) {
// Ignore quality specifications in accept-encoding
// ex. gzip;q=0.8
if (!str.contains(';')) return str;
return str.split(';')[0];
});
if (allowedEncodings != null) {
for (var encodingName in allowedEncodings) {
Converter<List<int>, List<int>> encoder;
String key = encodingName;
if (res.encoders.containsKey(encodingName))
encoder = res.encoders[encodingName];
else if (encodingName == '*') {
encoder = res.encoders[key = res.encoders.keys.first];
}
if (encoder != null) {
headers['content-encoding'] = key;
outputBuffer = res.encoders[key].convert(outputBuffer);
headers['content-length'] = outputBuffer.length.toString();
break;
}
}
}
}
var b = new StringBuffer();
b.writeln('HTTP/1.1 ${res.statusCode}');
res.headers.forEach((k, v) {
b.writeln('$k: $v');
});
// Persist session ID
if (res.correspondingRequest._session != null) {
res.cookies
.add(new Cookie('DARTSESSID', res.correspondingRequest._session.id));
}
// Send all cookies
for (var cookie in res.cookies) {
var value = cookie.toString();
b.writeln('set-cookie: $value');
}
b.writeln();
var buf = new Uint8List.fromList(
new List<int>.from(b.toString().codeUnits)..addAll(outputBuffer));
return finalizers.then((_) {
WingsResponseContext._send(req._sockfd, buf);
WingsResponseContext._closeSocket(req._sockfd);
if (req.injections.containsKey(PoolResource)) {
req.injections[PoolResource].release();
}
if (!app.isProduction && app.logger != null) {
var sw = req.grab<Stopwatch>(Stopwatch);
if (sw.isRunning) {
sw?.stop();
app.logger.info("${res.statusCode} ${req.method} ${req.uri} (${sw
?.elapsedMilliseconds ?? 'unknown'} ms)");
}
}
});
}
}

45
lib/src/wings.h Normal file
View file

@ -0,0 +1,45 @@
#ifndef ANGEL_WINGS_H
#define ANGEL_WINGS_H
#ifndef WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#else
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <sstream>
// Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
#pragma comment(lib, "AdvApi32.lib")
#endif
#include <cstdint>
#include <mutex>
#include <string>
#include <vector>
#include <dart_api.h>
class WingsServerInfo
{
public:
std::mutex mutex;
std::string addressString;
uint64_t port;
int sockfd;
bool ipv6;
};
extern std::mutex serverInfoVectorMutex;
extern std::vector<WingsServerInfo *> serverInfoVector;
Dart_Handle HandleError(Dart_Handle handle);
void wings_AddressToString(Dart_NativeArguments arguments);
void wings_BindSocket(Dart_NativeArguments arguments);
void wings_CloseSocket(Dart_NativeArguments arguments);
void wings_Send(Dart_NativeArguments arguments);
void wings_StartHttpListener(Dart_NativeArguments arguments);
#endif

157
lib/src/wings_request.dart Normal file
View file

@ -0,0 +1,157 @@
part of angel_wings;
class WingsRequestContext extends RequestContext {
final AngelWings _wings;
final int _sockfd;
@override
Angel app;
WingsRequestContext._(this._wings, this._sockfd, Angel app) : this.app = app;
static final RegExp _straySlashes = new RegExp(r'(^/+)|(/+$)');
final Map<String, String> _headers = {};
String __contentTypeString;
String __path;
String __url;
Uint8List _addressBytes;
StreamController<Uint8List> _body;
ContentType _contentType;
List<Cookie> _cookies;
String _headerField, _hostname, _originalMethod, _method, _path;
HttpHeaders _httpHeaders;
InternetAddress _remoteAddress;
HttpSession _session;
Uri _uri;
static String _addressToString(Uint8List bytes, bool ipV6)
native "AddressToString";
String get _contentTypeString =>
__contentTypeString ??= _headers['content-type']?.toString();
void set _headerValue(String value) {
if (_headerField != null) {
_headers[_headerField.toLowerCase()] = value;
_headerField = null;
}
}
@override
ContentType get contentType => _contentType ??= (_contentTypeString == null
? ContentType.binary
: ContentType.parse(_contentTypeString));
@override
List<Cookie> get cookies {
if (_cookies != null) {
return _cookies;
}
var cookies = <Cookie>[];
return _cookies = new List<Cookie>.unmodifiable(cookies);
}
@override
HttpHeaders get headers => _httpHeaders ??= new _WingsIncomingHeaders(this);
@override
String get hostname => _hostname ??=
(_headers['host'] ?? '${_wings.address.address}:${_wings.port}');
@override
HttpRequest get io => null;
@override
String get method =>
_method ??= (_headers['x-http-method-override'] ?? originalMethod);
@override
String get originalMethod => _originalMethod;
@override
Future<BodyParseResult> parseOnce() {
return parseBodyFromStream(
_body?.stream ?? new Stream<List<int>>.empty(),
contentType == null ? null : new MediaType.parse(contentType.toString()),
uri,
storeOriginalBuffer: app.storeOriginalBuffer,
);
}
@override
String get path {
if (_path != null) {
return __path;
} else {
var path = __path.replaceAll(_straySlashes, '');
if (path.isEmpty) path = '/';
return _path = path;
}
}
@override
InternetAddress get remoteAddress => _remoteAddress ??= new InternetAddress(
_addressToString(_addressBytes, _addressBytes.length > 4));
@override
HttpSession get session {
if (_session != null) return _session;
var dartSessIdCookie = cookies.firstWhere((c) => c.name == 'DARTSESSID',
orElse: () => new Cookie('DARTSESSID', _wings._uuid.v4().toString()));
return _session = _wings._sessions.putIfAbsent(dartSessIdCookie.value,
() => new MockHttpSession(id: dartSessIdCookie.value));
}
@override
Uri get uri => _uri ??= Uri.parse(__url);
@override
bool get xhr =>
_headers['x-requested-with']?.trim()?.toLowerCase() == 'xmlhttprequest';
}
class _WingsIncomingHeaders extends HttpHeaders {
final WingsRequestContext request;
_WingsIncomingHeaders(this.request);
UnsupportedError _unsupported() =>
new UnsupportedError('Cannot modify incoming HTTP headers.');
@override
List<String> operator [](String name) {
return value(name)?.split(',')?.map((s) => s.trim())?.toList();
}
@override
void add(String name, Object value) => throw _unsupported();
@override
void clear() => throw _unsupported();
@override
void forEach(void Function(String name, List<String> values) f) {
request._headers.forEach((name, value) =>
f(name, value.split(',').map((s) => s.trim()).toList()));
}
@override
void noFolding(String name) => throw _unsupported();
@override
void remove(String name, Object value) => throw _unsupported();
@override
void removeAll(String name) => throw _unsupported();
@override
void set(String name, Object value) => throw _unsupported();
@override
String value(String name) => request._headers[name.toLowerCase()];
}

158
lib/src/wings_response.dart Normal file
View file

@ -0,0 +1,158 @@
part of angel_wings;
class WingsResponseContext extends ResponseContext {
final WingsRequestContext correspondingRequest;
bool _isClosed = false, _useStream = false;
WingsResponseContext._(this.correspondingRequest);
static void _send(int sockfd, Uint8List data) native "Send";
static void _closeSocket(int sockfd) native "CloseSocket";
@override
void add(List<int> data) {
if (_isClosed && !_useStream)
throw ResponseContext.closed();
else if (_useStream)
_send(correspondingRequest._sockfd, _coerceUint8List(data));
else
buffer.add(data);
}
@override
Future close() {
_closeSocket(correspondingRequest._sockfd);
_isClosed = true;
_useStream = false;
return super.close();
}
@override
void end() {
_isClosed = true;
super.end();
}
@override
Future addStream(Stream<List<int>> stream) {
if (_isClosed && !_useStream) throw ResponseContext.closed();
var firstStream = useStream();
Stream<List<int>> output = stream;
if ((firstStream || !headers.containsKey('content-encoding')) &&
encoders.isNotEmpty &&
correspondingRequest != null) {
var allowedEncodings =
(correspondingRequest.headers['accept-encoding'] ?? []).map((str) {
// Ignore quality specifications in accept-encoding
// ex. gzip;q=0.8
if (!str.contains(';')) return str;
return str.split(';')[0];
});
for (var encodingName in allowedEncodings) {
Converter<List<int>, List<int>> encoder;
String key = encodingName;
if (encoders.containsKey(encodingName))
encoder = encoders[encodingName];
else if (encodingName == '*') {
encoder = encoders[key = encoders.keys.first];
}
if (encoder != null) {
/*
if (firstStream) {
this.stream.sendHeaders([
new Header.ascii(
'content-encoding', headers['content-encoding'] = key)
]);
}
*/
output = encoders[key].bind(output);
break;
}
}
}
return output.forEach(((data) =>
_send(correspondingRequest._sockfd, _coerceUint8List(data))));
}
@override
HttpResponse get io => null;
@override
bool get isOpen => !_isClosed;
@override
bool get streaming => _useStream;
@override
bool useStream() {
if (!_useStream) {
// If this is the first stream added to this response,
// then add headers, status code, etc.
_finalize();
willCloseItself = _useStream = _isClosed = true;
releaseCorrespondingRequest();
return true;
}
return false;
}
/// Write headers, status, etc. to the underlying [stream].
void _finalize() {
var b = new StringBuffer();
b.writeln('HTTP/1.1 $statusCode');
headers['date'] ??= HttpDate.format(new DateTime.now());
if (encoders.isNotEmpty && correspondingRequest != null) {
var allowedEncodings =
(correspondingRequest.headers['accept-encoding'] ?? []).map((str) {
// Ignore quality specifications in accept-encoding
// ex. gzip;q=0.8
if (!str.contains(';')) return str;
return str.split(';')[0];
});
for (var encodingName in allowedEncodings) {
String key = encodingName;
if (encoders.containsKey(encodingName)) {
this.headers['content-encoding'] = key;
break;
}
}
}
// Add all normal headers
this.headers.forEach((k, v) {
b.writeln('$k: $v');
});
// Persist session ID
if (correspondingRequest._session != null) {
cookies.add(new Cookie('DARTSESSID', correspondingRequest._session.id));
}
// Send all cookies
for (var cookie in cookies) {
var value = cookie.toString();
b.writeln('set-cookie: $value');
}
b.writeln();
_send(
correspondingRequest._sockfd, _coerceUint8List(b.toString().codeUnits));
}
}
Uint8List _coerceUint8List(List<int> list) =>
list is Uint8List ? list : new Uint8List.fromList(list);

25
lib/src/wings_thread.h Normal file
View file

@ -0,0 +1,25 @@
#ifndef ANGEL_WINGS_THREAD_H
#define ANGEL_WINGS_THREAD_H
#include <dart_api.h>
#include <http_parser.h>
#include "wings.h"
typedef struct
{
Dart_Port port;
WingsServerInfo *serverInfo;
} wings_thread_info;
typedef struct
{
bool ipv6;
int sock;
sockaddr addr;
socklen_t addr_len;
Dart_Port port;
} requestInfo;
void wingsThreadMain(wings_thread_info *info);
void handleRequest(requestInfo *rq);
#endif

215
lib/src/worker_thread.cc Normal file
View file

@ -0,0 +1,215 @@
#include <utility>
#include <dart_native_api.h>
#include "wings_thread.h"
void wingsThreadMain(wings_thread_info *info)
{
auto *serverInfo = std::move(info->serverInfo);
Dart_Port port = std::move(info->port);
//delete info;
while (true)
{
std::lock_guard<std::mutex> lock(serverInfo->mutex);
sockaddr client_addr{};
socklen_t client_addr_len;
int client = accept(serverInfo->sockfd, &client_addr, &client_addr_len);
if (client < 0)
{
// send_error(info->port, "Failed to accept client socket.");
return;
}
requestInfo rq{};
rq.ipv6 = serverInfo->ipv6;
rq.sock = client;
rq.addr = client_addr;
rq.addr_len = client_addr_len;
rq.port = port;
handleRequest(&rq);
}
}
int send_notification(http_parser *parser, int code)
{
//if (parser == nullptr) return 0;
auto *rq = (requestInfo *)parser->data;
//if (rq == nullptr) return 0;
Dart_CObject first{};
Dart_CObject second{};
first.type = second.type = Dart_CObject_kInt64;
first.value.as_int64 = rq->sock;
second.value.as_int64 = code;
Dart_CObject *list[2]{&first, &second};
Dart_CObject obj{};
obj.type = Dart_CObject_kArray;
obj.value.as_array.length = 2;
obj.value.as_array.values = list;
Dart_PostCObject(rq->port, &obj);
return 0;
}
int send_string(http_parser *parser, char *str, size_t length, int code, bool as_typed_data = false)
{
//if (parser == nullptr) return 0;
auto *rq = (requestInfo *)parser->data;
//if (rq == nullptr) return 0;
auto *s = new char[length + 1];
memset(s, 0, length + 1);
Dart_CObject first{};
Dart_CObject second{};
Dart_CObject third{};
first.type = second.type = Dart_CObject_kInt32;
first.value.as_int32 = rq->sock;
second.value.as_int32 = code;
if (!as_typed_data)
{
third.type = Dart_CObject_kString;
memcpy(s, str, length);
third.value.as_string = s;
}
else
{
third.type = Dart_CObject_kExternalTypedData;
third.type = Dart_CObject_kExternalTypedData;
third.value.as_external_typed_data.type = Dart_TypedData_kUint8;
third.value.as_external_typed_data.length = length;
third.value.as_external_typed_data.data = (uint8_t *)str;
}
// Post the string back to Dart...
Dart_CObject *list[3]{&first, &second, &third};
Dart_CObject obj{};
obj.type = Dart_CObject_kArray;
obj.value.as_array.length = 3;
obj.value.as_array.values = list;
Dart_PostCObject(rq->port, &obj);
delete[] s;
return 0;
}
int send_oncomplete(http_parser *parser, int code)
{
//if (parser == nullptr) return 0;
auto *rq = (requestInfo *)parser->data;
//if (rq == nullptr) return 0;
Dart_CObject sockfd{};
Dart_CObject command{};
Dart_CObject method{};
Dart_CObject major{};
Dart_CObject minor{};
Dart_CObject addr{};
sockfd.type = command.type = method.type = major.type = minor.type = Dart_CObject_kInt32;
addr.type = Dart_CObject_kExternalTypedData;
sockfd.value.as_int32 = rq->sock;
command.value.as_int32 = code;
method.value.as_int32 = parser->method;
major.value.as_int32 = parser->http_major;
minor.value.as_int32 = parser->http_minor;
addr.value.as_external_typed_data.type = Dart_TypedData_kUint8;
addr.value.as_external_typed_data.length = rq->addr_len;
if (rq->ipv6)
{
auto *v6 = (sockaddr_in6 *)&rq->addr;
addr.value.as_external_typed_data.data = (uint8_t *)v6->sin6_addr.s6_addr;
}
else
{
auto *v4 = (sockaddr_in *)&rq->addr;
addr.value.as_external_typed_data.data = (uint8_t *)&v4->sin_addr.s_addr;
}
Dart_CObject *list[6]{&sockfd, &command, &method, &major, &minor, &addr};
Dart_CObject obj{};
obj.type = Dart_CObject_kArray;
obj.value.as_array.length = 6;
obj.value.as_array.values = list;
Dart_PostCObject(rq->port, &obj);
//delete parser;
return 0;
}
void handleRequest(requestInfo *rq)
{
size_t len = 80 * 1024, nparsed;
char buf[len];
ssize_t recved;
memset(buf, 0, len);
http_parser parser{};
http_parser_init(&parser, HTTP_REQUEST);
parser.data = rq; //rq.get();
http_parser_settings settings{};
settings.on_message_begin = [](http_parser *parser) {
// std::cout << "mb" << std::endl;
return send_notification(parser, 0);
};
settings.on_message_complete = [](http_parser *parser) {
//std::cout << "mc" << std::endl;
send_oncomplete(parser, 1);
//delete (requestInfo *) parser->data;
//std::cout << "deleted rq!" << std::endl;
return 0;
};
settings.on_url = [](http_parser *parser, const char *at, size_t length) {
// std::cout << "url" << std::endl;
return send_string(parser, (char *)at, length, 2);
};
settings.on_header_field = [](http_parser *parser, const char *at, size_t length) {
// std::cout << "hf" << std::endl;
return send_string(parser, (char *)at, length, 3);
};
settings.on_header_value = [](http_parser *parser, const char *at, size_t length) {
// std::cout << "hv" << std::endl;
return send_string(parser, (char *)at, length, 4);
};
settings.on_body = [](http_parser *parser, const char *at, size_t length) {
// std::cout << "body" << std::endl;
return send_string(parser, (char *)at, length, 5, true);
};
unsigned int isUpgrade = 0;
// std::cout << "start" << std::endl;
while ((recved = recv(rq->sock, buf, len, 0)) > 0)
{
if (isUpgrade)
{
send_string(&parser, buf, (size_t)recved, 7, true);
}
else
{
/* Start up / continue the parser.
* Note we pass recved==0 to signal that EOF has been received.
*/
nparsed = http_parser_execute(&parser, &settings, buf, (size_t)recved);
if ((isUpgrade = parser.upgrade) == 1)
{
send_notification(&parser, 6);
}
else if (nparsed != recved)
{
close(rq->sock);
return;
}
}
memset(buf, 0, len);
}
}

13
pubspec.yaml Normal file
View file

@ -0,0 +1,13 @@
name: angel_wings
dependencies:
angel_framework: ^1.0.0
build_native: ^0.0.9
mock_request: ^1.0.0
pooled_map: ^1.0.0
uuid: ^1.0.0
dev_dependencies:
build_runner:
git:
url: https://github.com/thosakwe/build.git
path: build_runner
ref: experimental-hot-reloading