Almost done

This commit is contained in:
Tobe O 2019-04-29 04:22:36 -04:00
parent 71d8a07513
commit 18cb8749ee
12 changed files with 589 additions and 75 deletions

View file

@ -1,6 +1,8 @@
CXXFLAGS := $(CXXFLAGS) --std=c++14 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include override CXXFLAGS := $(CXXFLAGS) --std=c++14 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include \
objects := lib/src/angel_wings.o lib/src/wings_socket.o\ -I .dart_tool
lib/src/bind.o lib/src/util.o objects := lib/src/angel_wings.o lib/src/wings_socket.o \
lib/src/bind.o lib/src/util.o lib/src/http.o \
.dart_tool/http-parser/http_parser.o
.PHONY: distclean clean .PHONY: distclean clean

View file

@ -1,7 +1,26 @@
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';
import 'package:angel_static/angel_static.dart';
import 'package:angel_wings/angel_wings.dart'; import 'package:angel_wings/angel_wings.dart';
import 'package:file/local.dart';
import 'package:logging/logging.dart';
main() async { main() async {
var app = Angel(); var app = Angel();
var wings = AngelWings(app); var wings = AngelWings(app);
var fs = LocalFileSystem();
var vDir = CachingVirtualDirectory(app, fs,
source: fs.currentDirectory, allowDirectoryListing: true);
app.logger = Logger('wings')
..onRecord.listen((rec) {
print(rec);
if (rec.error != null) print(rec.error);
if (rec.stackTrace != null) print(rec.stackTrace);
});
app.get('/', (req, res) => 'WINGS!!!');
app.fallback(vDir.handleRequest);
app.fallback((req, res) => throw AngelHttpException.notFound());
await wings.startServer('127.0.0.1', 3000);
print('Listening at http://localhost:3000');
} }

View file

@ -1,10 +1,27 @@
import 'dart:convert';
import 'dart:typed_data';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_wings/angel_wings.dart'; import 'package:angel_wings/angel_wings.dart';
main() async { main() async {
var app = Angel();
var socket = await WingsSocket.bind('127.0.0.1', 3000); var socket = await WingsSocket.bind('127.0.0.1', 3000);
print('Listening at http://localhost:3000');
await for (var fd in socket) { await for (var fd in socket) {
print('FD: $fd'); var response = '''
closeNativeSocketDescriptor(fd); HTTP/1.1 200 Not Found\r
Date: Fri, 31 Dec 1999 23:59:59 GMT\r
server: wings-test\r\n\r
Nope, nothing's here!
\r\n\r
''';
var bytes = utf8.encode(response);
var data = Uint8List.fromList(bytes);
var rq = await WingsRequestContext.from(app, fd);
print('Yay: $rq');
print(rq.headers);
writeToNativeSocket(fd.fileDescriptor, data);
closeNativeSocketDescriptor(fd.fileDescriptor);
} }
} }

View file

@ -49,5 +49,7 @@ Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_sco
result = Dart_WingsSocket_close; result = Dart_WingsSocket_close;
if (strcmp("Dart_WingsSocket_listen", cname) == 0) if (strcmp("Dart_WingsSocket_listen", cname) == 0)
result = Dart_WingsSocket_listen; result = Dart_WingsSocket_listen;
if (strcmp("Dart_WingsSocket_parseHttp", cname) == 0)
result = Dart_WingsSocket_parseHttp;
return result; return result;
} }

View file

@ -2,6 +2,7 @@
#define ANGEL_WINGS_WINGS_H #define ANGEL_WINGS_WINGS_H
#include <dart_api.h> #include <dart_api.h>
#include <dart_native_api.h>
#include "angel_wings.h" #include "angel_wings.h"
Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope); Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_scope);
@ -14,5 +15,7 @@ void Dart_WingsSocket_write(Dart_NativeArguments arguments);
void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments); void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments);
void Dart_WingsSocket_close(Dart_NativeArguments arguments); void Dart_WingsSocket_close(Dart_NativeArguments arguments);
void Dart_WingsSocket_listen(Dart_NativeArguments arguments); void Dart_WingsSocket_listen(Dart_NativeArguments arguments);
void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments);
void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message);
#endif #endif

142
lib/src/http.cc Normal file
View file

@ -0,0 +1,142 @@
#include <iostream>
#include <http-parser/http_parser.h>
#include "angel_wings.h"
#include "wings_socket.h"
using namespace wings;
void Dart_WingsSocket_parseHttp(Dart_NativeArguments arguments)
{
Dart_Port service_port =
Dart_NewNativePort("WingsHttpCallback", &wingsHttpCallback, true);
Dart_Handle send_port = Dart_NewSendPort(service_port);
Dart_SetReturnValue(arguments, send_port);
}
void wingsHttpCallback(Dart_Port dest_port_id, Dart_CObject *message)
{
int64_t fd = -1;
Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id;
Dart_CObject *fdArg = message->value.as_array.values[1];
#define thePort (*((Dart_Port *)parser->data))
#define sendInt(n) \
Dart_CObject obj; \
obj.type = Dart_CObject_kInt64; \
obj.value.as_int64 = (n); \
Dart_PostCObject(thePort, &obj);
#define sendString() \
if (length > 0) \
{ \
std::string str(at, length); \
Dart_CObject obj; \
obj.type = Dart_CObject_kString; \
obj.value.as_string = (char *)str.c_str(); \
Dart_PostCObject(thePort, &obj); \
}
if (fdArg->type == Dart_CObject_kInt32)
{
fd = (int64_t)fdArg->value.as_int32;
}
else
{
fd = fdArg->value.as_int64;
}
if (fd != -1)
{
http_parser_settings settings;
settings.on_message_begin = [](http_parser *parser) {
return 0;
};
settings.on_headers_complete = [](http_parser *parser) {
{
sendInt(0);
}
{
sendInt(parser->method);
}
return 0;
};
settings.on_message_complete = [](http_parser *parser) {
sendInt(1);
return 0;
};
settings.on_chunk_complete = [](http_parser *parser) {
return 0;
};
settings.on_chunk_header = [](http_parser *parser) {
return 0;
};
settings.on_url = [](http_parser *parser, const char *at, size_t length) {
sendString();
return 0;
};
settings.on_header_field = [](http_parser *parser, const char *at, size_t length) {
sendString();
return 0;
};
settings.on_header_value = [](http_parser *parser, const char *at, size_t length) {
sendString();
return 0;
};
settings.on_body = [](http_parser *parser, const char *at, size_t length) {
sendString();
return 0;
};
size_t len = 80 * 1024, nparsed = 0;
char buf[len];
ssize_t recved = 0;
memset(buf, 0, sizeof(buf));
// http_parser parser;
auto *parser = (http_parser *)malloc(sizeof(http_parser));
http_parser_init(parser, HTTP_BOTH);
parser->data = &outPort;
while ((recved = recv(fd, buf, len, 0)) >= 0)
{
if (false) // (isUpgrade)
{
// send_string(&parser, buf, (size_t)recved, 7, true);
}
else
{
/* Start up / continue the parser.
* Note we pass recved==0 to signal that EOF has been received.
*/
nparsed = http_parser_execute(parser, &settings, buf, recved);
if (nparsed != recved)
{
// TODO: End it...!
}
else if (recved == 0)
{
break;
}
// if ((isUpgrade = parser.upgrade) == 1)
// {
// send_notification(&parser, 6);
// }
// else if (nparsed != recved)
// {
// close(rq->sock);
// return;
// }
}
// memset(buf, 0, len);
}
}
}

View file

@ -1,3 +1,4 @@
#include <dart_native_api.h>
#include "angel_wings.h" #include "angel_wings.h"
#include "wings_socket.h" #include "wings_socket.h"
using namespace wings; using namespace wings;
@ -25,7 +26,16 @@ void Dart_WingsSocket_getPort(Dart_NativeArguments arguments)
void Dart_WingsSocket_write(Dart_NativeArguments arguments) void Dart_WingsSocket_write(Dart_NativeArguments arguments)
{ {
// TODO: Actually do something. int64_t fd;
void *data;
Dart_TypedData_Type type;
intptr_t len;
Dart_Handle fdHandle = Dart_GetNativeArgument(arguments, 0);
Dart_Handle dataHandle = Dart_GetNativeArgument(arguments, 1);
HandleError(Dart_IntegerToInt64(fdHandle, &fd));
HandleError(Dart_TypedDataAcquireData(dataHandle, &type, &data, &len));
write(fd, data, len);
HandleError(Dart_TypedDataReleaseData(dataHandle));
} }
void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments) void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments)

View file

@ -7,8 +7,8 @@ import 'wings_request.dart';
import 'wings_response.dart'; import 'wings_response.dart';
import 'wings_socket.dart'; import 'wings_socket.dart';
class AngelWings extends Driver<int, int, WingsSocket, WingsRequestContext, class AngelWings extends Driver<WingsClientSocket, int, WingsSocket,
WingsResponseContext> { WingsRequestContext, WingsResponseContext> {
factory AngelWings(Angel app) { factory AngelWings(Angel app) {
return AngelWings._(app, WingsSocket.bind); return AngelWings._(app, WingsSocket.bind);
} }
@ -31,21 +31,19 @@ class AngelWings extends Driver<int, int, WingsSocket, WingsRequestContext,
} }
@override @override
Future<WingsRequestContext> createRequestContext(int request, int response) { Future<WingsRequestContext> createRequestContext(WingsClientSocket request, int response) {
// TODO: implement createRequestContext return WingsRequestContext.from(app, request);
return null;
} }
@override @override
Future<WingsResponseContext> createResponseContext(int request, int response, Future<WingsResponseContext> createResponseContext(WingsClientSocket request, int response,
[WingsRequestContext correspondingRequest]) { [WingsRequestContext correspondingRequest]) {
// TODO: implement createResponseContext return Future.value(WingsResponseContext(request.fileDescriptor, correspondingRequest));
return null;
} }
@override @override
Stream<int> createResponseStreamFromRawRequest(int request) { Stream<int> createResponseStreamFromRawRequest(WingsClientSocket request) {
return Stream.fromIterable([request]); return Stream.fromIterable([request.fileDescriptor]);
} }
@override @override

View file

@ -1,53 +1,173 @@
import 'dart:async';
import 'dart:io'; import 'dart:io';
import 'package:angel_container/src/container.dart'; import 'dart:isolate';
import 'package:angel_container/angel_container.dart';
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';
import 'package:mock_request/mock_request.dart';
import 'wings_socket.dart';
class WingsRequestContext extends RequestContext<int> { enum _ParseState { method, url, headerField, headerValue, body }
@override
// TODO: implement body final RegExp _straySlashes = new RegExp(r'(^/+)|(/+$)');
Stream<List<int>> get body => null;
class WingsRequestContext extends RequestContext<WingsClientSocket> {
final WingsClientSocket rawRequest;
final Container container;
final StreamController<List<int>> _body = StreamController();
List<Cookie> _cookies, __cookies;
final LockableMockHttpHeaders _headers = LockableMockHttpHeaders();
final RawReceivePort _recv;
InternetAddress _remoteAddress;
String _method, _override, _path;
Uri _uri;
@override @override
// TODO: implement container Angel app;
Container get container => null;
WingsRequestContext._(this.app, this.rawRequest, this._recv)
: container = app.container.createChild();
static const int DELETE = 0,
GET = 1,
HEAD = 2,
POST = 3,
PUT = 4,
CONNECT = 5,
OPTIONS = 6,
TRACE = 7,
COPY = 8,
LOCK = 9,
MKCOL = 10,
MOVE = 11,
PROPFIND = 12,
PROPPATCH = 13,
SEARCH = 14,
UNLOCK = 15,
BIND = 16,
REBIND = 17,
UNBIND = 18,
ACL = 19,
REPORT = 20,
MKACTIVITY = 21,
CHECKOUT = 22,
MERGE = 23,
MSEARCH = 24,
NOTIFY = 25,
SUBSCRIBE = 26,
UNSUBSCRIBE = 27,
PATCH = 28,
PURGE = 29,
MKCALENDAR = 30,
LINK = 31,
UNLINK = 32,
SOURCE = 33;
static String methodToString(int method) {
switch (method) {
case DELETE:
return 'DELETE';
case GET:
return 'GET';
case HEAD:
return 'HEAD';
case POST:
return 'POST';
case PUT:
return 'PUT';
case CONNECT:
return 'CONNECT';
case OPTIONS:
return 'OPTIONS';
case PATCH:
return 'PATCH';
case PURGE:
return 'PURGE';
default:
throw new ArgumentError('Unknown method $method.');
}
}
static Future<WingsRequestContext> from(Angel app, WingsClientSocket socket) {
var state = _ParseState.url;
var c = Completer<WingsRequestContext>();
var recv = RawReceivePort();
var rq = WingsRequestContext._(app, socket, recv);
rq._remoteAddress = socket.remoteAddress;
String lastHeader;
recv.handler = (e) {
if (state == _ParseState.url) {
rq._uri = Uri.parse(e as String);
var path = rq._uri.path.replaceAll(_straySlashes, '');
if (path.isEmpty) path = '/';
rq._path = path;
state = _ParseState.headerField;
} else if (state == _ParseState.headerField) {
if (e == 0) {
state = _ParseState.method;
} else {
lastHeader = e as String;
state = _ParseState.headerValue;
}
} else if (state == _ParseState.headerValue) {
if (e == 0) {
state = _ParseState.method;
} else {
if (lastHeader != null) {
if (lastHeader == 'cookie') {
rq.__cookies.add(Cookie.fromSetCookieValue(e as String));
} else {
rq._headers.add(lastHeader, e as String);
}
lastHeader = null;
}
}
state = _ParseState.headerField;
} else if (state == _ParseState.method) {
rq._method = methodToString(e as int);
state = _ParseState.body;
c.complete(rq);
} else if (state == _ParseState.body) {
if (e == 1) {
rq._body.close();
} else {
rq._body.add(e as List<int>);
}
}
};
wingsParseHttp().send([recv.sendPort, socket.fileDescriptor]);
return c.future;
}
@override @override
// TODO: implement cookies Stream<List<int>> get body => _body.stream;
List<Cookie> get cookies => null;
@override @override
// TODO: implement headers List<Cookie> get cookies => _cookies ??= List.unmodifiable(__cookies);
HttpHeaders get headers => null;
@override @override
// TODO: implement hostname HttpHeaders get headers => _headers;
String get hostname => null;
@override @override
// TODO: implement method String get hostname => headers.value('host');
String get method => null;
@override @override
// TODO: implement originalMethod String get method => _override ??=
String get originalMethod => null; (headers.value('x-http-method-override')?.toUpperCase() ?? _method);
@override @override
// TODO: implement path String get originalMethod => _method;
String get path => null;
@override @override
// TODO: implement rawRequest String get path => _path;
int get rawRequest => null;
@override @override
// TODO: implement remoteAddress InternetAddress get remoteAddress => _remoteAddress;
InternetAddress get remoteAddress => null;
@override @override
// TODO: implement session // TODO: implement session
HttpSession get session => null; HttpSession get session => null;
@override @override
// TODO: implement uri Uri get uri => _uri;
Uri get uri => null;
} }

View file

@ -1,50 +1,213 @@
import 'dart:async'; import 'dart:async';
import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'dart:typed_data';
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';
import 'package:charcode/ascii.dart';
import 'wings_request.dart';
import 'wings_socket.dart';
class WingsResponseContext extends ResponseContext { class WingsResponseContext extends ResponseContext<int> {
@override @override
void add(List<int> event) { final WingsRequestContext correspondingRequest;
// TODO: implement add
LockableBytesBuilder _buffer;
@override
final int rawResponse;
bool _isDetached = false, _isClosed = false, _streamInitialized = false;
WingsResponseContext(this.rawResponse, [this.correspondingRequest]);
Iterable<String> __allowedEncodings;
Iterable<String> get _allowedEncodings {
return __allowedEncodings ??= correspondingRequest.headers
.value('accept-encoding')
?.split(',')
?.map((s) => s.trim())
?.where((s) => s.isNotEmpty)
?.map((str) {
// Ignore quality specifications in accept-encoding
// ex. gzip;q=0.8
if (!str.contains(';')) return str;
return str.split(';')[0];
});
}
bool _openStream() {
if (!_streamInitialized) {
// If this is the first stream added to this response,
// then add headers, status code, etc.
var outHeaders = <String, String>{};
var statusLine =
utf8.encode('HTTP/1.1 $statusCode').followedBy([$cr, $lf]);
writeToNativeSocket(rawResponse, Uint8List.fromList(statusLine.toList()));
headers.forEach((k, v) => outHeaders[k] = v);
if (headers.containsKey('content-length')) {
var l = int.tryParse(headers['content-length']);
if (l != null) {
outHeaders['content-length'] = l.toString();
}
}
if (contentType != null)
outHeaders['content-type'] = contentType.toString();
if (encoders.isNotEmpty && correspondingRequest != null) {
if (_allowedEncodings != null) {
for (var encodingName in _allowedEncodings) {
Converter<List<int>, List<int>> encoder;
String key = encodingName;
if (encoders.containsKey(encodingName))
encoder = encoders[encodingName];
else if (encodingName == '*') {
encoder = encoders[key = encoders.keys.first];
}
if (encoder != null) {
outHeaders['content-encoding'] = key;
break;
}
}
}
}
void _wh(String k, String v) {
var headerLine =
utf8.encode('$k: ${Uri.encodeComponent(v)}').followedBy([$cr, $lf]);
writeToNativeSocket(
rawResponse, Uint8List.fromList(headerLine.toList()));
}
outHeaders.forEach(_wh);
for (var c in cookies) {
_wh('set-cookie', c.toString());
}
writeToNativeSocket(rawResponse, Uint8List.fromList([$cr, $lf]));
//_isClosed = true;
return _streamInitialized = true;
}
return false;
} }
@override @override
Future addStream(Stream<List<int>> stream) { Future addStream(Stream<List<int>> stream) {
// TODO: implement addStream if (_isClosed && isBuffered) throw ResponseContext.closed();
return null; _openStream();
Stream<List<int>> output = stream;
if (encoders.isNotEmpty && correspondingRequest != null) {
if (_allowedEncodings != null) {
for (var encodingName in _allowedEncodings) {
Converter<List<int>, List<int>> encoder;
String key = encodingName;
if (encoders.containsKey(encodingName))
encoder = encoders[encodingName];
else if (encodingName == '*') {
encoder = encoders[key = encoders.keys.first];
}
if (encoder != null) {
output = encoders[key].bind(output);
break;
}
}
}
}
return output.forEach((buf) {
writeToNativeSocket(
rawResponse, buf is Uint8List ? buf : Uint8List.fromList(buf));
});
} }
@override @override
// TODO: implement buffer void add(List<int> data) {
BytesBuilder get buffer => null; if (_isClosed && isBuffered)
throw ResponseContext.closed();
else if (!isBuffered) {
if (!_isClosed) {
_openStream();
@override if (encoders.isNotEmpty && correspondingRequest != null) {
// TODO: implement correspondingRequest if (_allowedEncodings != null) {
RequestContext get correspondingRequest => null; for (var encodingName in _allowedEncodings) {
Converter<List<int>, List<int>> encoder;
String key = encodingName;
@override if (encoders.containsKey(encodingName))
FutureOr detach() { encoder = encoders[encodingName];
// TODO: implement detach else if (encodingName == '*') {
return null; encoder = encoders[key = encoders.keys.first];
}
if (encoder != null) {
data = encoders[key].convert(data);
break;
}
}
}
}
writeToNativeSocket(
rawResponse, data is Uint8List ? data : Uint8List.fromList(data));
}
} else
buffer.add(data);
} }
@override @override
// TODO: implement isBuffered Future close() {
bool get isBuffered => null; if (!_isDetached) {
if (!_isClosed) {
if (!isBuffered) {
try {
_openStream();
closeNativeSocketDescriptor(rawResponse);
} catch (_) {
// This only seems to occur on `MockHttpRequest`, but
// this try/catch prevents a crash.
}
} else {
_buffer.lock();
}
_isClosed = true;
}
super.close();
}
return new Future.value();
}
@override @override
// TODO: implement isOpen BytesBuilder get buffer => _buffer;
bool get isOpen => null;
@override @override
// TODO: implement rawResponse int detach() {
get rawResponse => null; _isDetached = true;
return rawResponse;
}
@override
bool get isBuffered => _buffer != null;
@override
bool get isOpen => !_isClosed && !_isDetached;
@override @override
void useBuffer() { void useBuffer() {
// TODO: implement useBuffer _buffer = LockableBytesBuilder();
} }
} }

View file

@ -46,7 +46,7 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id,
WingsSocket *socket = nullptr; WingsSocket *socket = nullptr;
Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id; Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id;
Dart_CObject* ptrArg = message->value.as_array.values[1]; Dart_CObject *ptrArg = message->value.as_array.values[1];
if (ptrArg->type == Dart_CObject_kInt32) if (ptrArg->type == Dart_CObject_kInt32)
{ {
@ -67,9 +67,34 @@ void WingsSocket::threadCallback(Dart_Port dest_port_id,
if ((sock = accept(socket->sockfd, &addr, &len)) != -1) if ((sock = accept(socket->sockfd, &addr, &len)) != -1)
{ {
char addrBuf[INET6_ADDRSTRLEN] = {0};
if (addr.sa_family == AF_INET6)
{
auto as6 = (sockaddr_in6*) &addr;
inet_ntop(addr.sa_family, &(as6->sin6_addr), addrBuf, len);
}
else
{
auto as4 = (sockaddr_in*) &addr;
inet_ntop(AF_INET, &(as4->sin_addr), addrBuf, len);
}
Dart_CObject fdObj;
fdObj.type = Dart_CObject_kInt64;
fdObj.value.as_int64 = sock;
Dart_CObject addrObj;
addrObj.type = Dart_CObject_kString;
addrObj.value.as_string = addrBuf;
Dart_CObject *values[2] = {&fdObj, &addrObj};
Dart_CObject obj; Dart_CObject obj;
obj.type = Dart_CObject_kInt64; obj.type = Dart_CObject_kArray;
obj.value.as_int64 = sock; obj.value.as_array.length = 2;
obj.value.as_array.values = values;
Dart_PostCObject(outPort, &obj); Dart_PostCObject(outPort, &obj);
// Dispatch the fd to the next listener. // Dispatch the fd to the next listener.
// auto &ports = socket->sendPorts; // auto &ports = socket->sendPorts;

View file

@ -32,8 +32,17 @@ SendPort wingsSocketListen(int pointer) native 'Dart_WingsSocket_listen';
void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close'; void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close';
class WingsSocket extends Stream<int> { SendPort wingsParseHttp() native 'Dart_WingsSocket_parseHttp';
final StreamController<int> _ctrl = StreamController();
class WingsClientSocket {
final int fileDescriptor;
final InternetAddress remoteAddress;
WingsClientSocket(this.fileDescriptor, this.remoteAddress);
}
class WingsSocket extends Stream<WingsClientSocket> {
final StreamController<WingsClientSocket> _ctrl = StreamController();
SendPort _acceptor; SendPort _acceptor;
final int _pointer; final int _pointer;
final RawReceivePort _recv; final RawReceivePort _recv;
@ -44,7 +53,8 @@ class WingsSocket extends Stream<int> {
_acceptor = wingsSocketListen(_pointer); _acceptor = wingsSocketListen(_pointer);
_recv.handler = (h) { _recv.handler = (h) {
if (!_ctrl.isClosed) { if (!_ctrl.isClosed) {
_ctrl.add(h as int); _ctrl.add(
WingsClientSocket(h[0] as int, InternetAddress(h[1] as String)));
_acceptor.send([_recv.sendPort, _pointer]); _acceptor.send([_recv.sendPort, _pointer]);
} }
}; };
@ -91,8 +101,11 @@ class WingsSocket extends Stream<int> {
int get port => _port ??= getWingsServerSocketPort(_pointer); int get port => _port ??= getWingsServerSocketPort(_pointer);
@override @override
StreamSubscription<int> listen(void Function(int event) onData, StreamSubscription<WingsClientSocket> listen(
{Function onError, void Function() onDone, bool cancelOnError}) { void Function(WingsClientSocket event) onData,
{Function onError,
void Function() onDone,
bool cancelOnError}) {
return _ctrl.stream return _ctrl.stream
.listen(onData, onError: onError, cancelOnError: cancelOnError); .listen(onData, onError: onError, cancelOnError: cancelOnError);
} }