From 71d8a07513aa39c7917b496671a70f6f396bb7cb Mon Sep 17 00:00:00 2001 From: Tobe O Date: Mon, 29 Apr 2019 01:55:02 -0400 Subject: [PATCH] Solid listening plan --- Makefile | 2 +- example/socket.dart | 6 ++++- lib/src/angel_wings.cc | 2 ++ lib/src/angel_wings.h | 1 + lib/src/bind.cc | 6 ----- lib/src/util.cc | 17 ++++++++++-- lib/src/wings_socket.cc | 56 +++++++++++++++++++++++++++++++++++++++ lib/src/wings_socket.dart | 7 +++++ lib/src/wings_socket.h | 46 ++++++++++++++++++++------------ 9 files changed, 116 insertions(+), 27 deletions(-) diff --git a/Makefile b/Makefile index c7628414..6719871c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -CXXFLAGS := $(CXXFLAGS) --std=c++11 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include +CXXFLAGS := $(CXXFLAGS) --std=c++14 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include objects := lib/src/angel_wings.o lib/src/wings_socket.o\ lib/src/bind.o lib/src/util.o diff --git a/example/socket.dart b/example/socket.dart index 79d957a4..1bc5be84 100644 --- a/example/socket.dart +++ b/example/socket.dart @@ -2,5 +2,9 @@ import 'package:angel_wings/angel_wings.dart'; main() async { var socket = await WingsSocket.bind('127.0.0.1', 3000); - print([socket.port]); + + await for (var fd in socket) { + print('FD: $fd'); + closeNativeSocketDescriptor(fd); + } } diff --git a/lib/src/angel_wings.cc b/lib/src/angel_wings.cc index 03731301..497553f6 100644 --- a/lib/src/angel_wings.cc +++ b/lib/src/angel_wings.cc @@ -47,5 +47,7 @@ Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_sco result = Dart_WingsSocket_closeDescriptor; if (strcmp("Dart_WingsSocket_close", cname) == 0) result = Dart_WingsSocket_close; + if (strcmp("Dart_WingsSocket_listen", cname) == 0) + result = Dart_WingsSocket_listen; return result; } \ No newline at end of file diff --git a/lib/src/angel_wings.h b/lib/src/angel_wings.h index 7da7dc20..51ef882d 100644 --- a/lib/src/angel_wings.h +++ b/lib/src/angel_wings.h @@ -13,5 +13,6 @@ void Dart_WingsSocket_getPort(Dart_NativeArguments arguments); void Dart_WingsSocket_write(Dart_NativeArguments arguments); void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments); void Dart_WingsSocket_close(Dart_NativeArguments arguments); +void Dart_WingsSocket_listen(Dart_NativeArguments arguments); #endif \ No newline at end of file diff --git a/lib/src/bind.cc b/lib/src/bind.cc index 757cdb63..dcd4c686 100644 --- a/lib/src/bind.cc +++ b/lib/src/bind.cc @@ -1,9 +1,3 @@ -#include -#include -#include -#include -#include -#include #include "angel_wings.h" #include "wings_socket.h" using namespace wings; diff --git a/lib/src/util.cc b/lib/src/util.cc index 854003cc..e627b1c5 100644 --- a/lib/src/util.cc +++ b/lib/src/util.cc @@ -2,13 +2,23 @@ #include "wings_socket.h" using namespace wings; +void Dart_WingsSocket_listen(Dart_NativeArguments arguments) +{ + uint64_t ptr; + Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0); + HandleError(Dart_IntegerToUint64(pointerHandle, &ptr)); + + auto *socket = (WingsSocket *)ptr; + socket->start(arguments); +} + void Dart_WingsSocket_getPort(Dart_NativeArguments arguments) { uint64_t ptr; Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0); HandleError(Dart_IntegerToUint64(pointerHandle, &ptr)); - auto* socket = (WingsSocket*) ptr; + auto *socket = (WingsSocket *)ptr; auto outHandle = Dart_NewIntegerFromUint64(socket->getInfo().port); Dart_SetReturnValue(arguments, outHandle); } @@ -20,7 +30,10 @@ void Dart_WingsSocket_write(Dart_NativeArguments arguments) void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments) { - // TODO: Actually do something. + int64_t fd; + Dart_Handle fdHandle = Dart_GetNativeArgument(arguments, 0); + HandleError(Dart_IntegerToInt64(fdHandle, &fd)); + close(fd); } void Dart_WingsSocket_close(Dart_NativeArguments arguments) diff --git a/lib/src/wings_socket.cc b/lib/src/wings_socket.cc index 73f04f2a..bd532b1b 100644 --- a/lib/src/wings_socket.cc +++ b/lib/src/wings_socket.cc @@ -1,4 +1,5 @@ #include +#include #include "wings_socket.h" using namespace wings; @@ -13,6 +14,7 @@ bool WingsSocketInfo::operator==(const WingsSocketInfo &other) const WingsSocket::WingsSocket(int sockfd, const WingsSocketInfo &info) : sockfd(sockfd), info(info) { refCount = 0; + workerThread = nullptr; } void WingsSocket::incrRef(Dart_Port port) @@ -24,4 +26,58 @@ void WingsSocket::incrRef(Dart_Port port) const WingsSocketInfo &WingsSocket::getInfo() const { return info; +} + +void WingsSocket::start(Dart_NativeArguments arguments) +{ + // if (workerThread == nullptr) + // { + // workerThread = std::make_unique(threadCallback, this); + // } + Dart_Port service_port = + Dart_NewNativePort("WingsThreadCallback", &threadCallback, true); + Dart_Handle send_port = Dart_NewSendPort(service_port); + Dart_SetReturnValue(arguments, send_port); +} + +void WingsSocket::threadCallback(Dart_Port dest_port_id, + Dart_CObject *message) +{ + + WingsSocket *socket = nullptr; + Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id; + Dart_CObject* ptrArg = message->value.as_array.values[1]; + + if (ptrArg->type == Dart_CObject_kInt32) + { + auto as64 = (int64_t)ptrArg->value.as_int32; + socket = (WingsSocket *)as64; + } + else + { + socket = (WingsSocket *)ptrArg->value.as_int64; + } + + if (socket != nullptr) + { + int sock; + unsigned long index = 0; + sockaddr addr; + socklen_t len; + + if ((sock = accept(socket->sockfd, &addr, &len)) != -1) + { + Dart_CObject obj; + obj.type = Dart_CObject_kInt64; + obj.value.as_int64 = sock; + Dart_PostCObject(outPort, &obj); + // Dispatch the fd to the next listener. + // auto &ports = socket->sendPorts; + // Dart_Port port = ports.at(index++); + // if (index >= ports.size()) + // index = 0; + // Dart_Handle intHandle = Dart_NewInteger(sock); + // Dart_Post(port, intHandle); + } + } } \ No newline at end of file diff --git a/lib/src/wings_socket.dart b/lib/src/wings_socket.dart index 193c0586..e5f29d93 100644 --- a/lib/src/wings_socket.dart +++ b/lib/src/wings_socket.dart @@ -28,21 +28,28 @@ void writeToNativeSocket(int fd, Uint8List data) void closeNativeSocketDescriptor(int fd) native 'Dart_WingsSocket_closeDescriptor'; +SendPort wingsSocketListen(int pointer) native 'Dart_WingsSocket_listen'; + void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close'; class WingsSocket extends Stream { final StreamController _ctrl = StreamController(); + SendPort _acceptor; final int _pointer; final RawReceivePort _recv; bool _open = true; int _port; WingsSocket._(this._pointer, this._recv) { + _acceptor = wingsSocketListen(_pointer); _recv.handler = (h) { if (!_ctrl.isClosed) { _ctrl.add(h as int); + _acceptor.send([_recv.sendPort, _pointer]); } }; + + _acceptor.send([_recv.sendPort, _pointer]); } static Future bind(address, int port, diff --git a/lib/src/wings_socket.h b/lib/src/wings_socket.h index 9a257763..a14cddf4 100644 --- a/lib/src/wings_socket.h +++ b/lib/src/wings_socket.h @@ -1,36 +1,48 @@ #ifndef WINGS_SOCKET_H #define WINGS_SOCKET_H +#include +#include +#include +#include +#include +#include #include +#include +#include +#include #include namespace wings { struct WingsSocketInfo { - const char *address; - uint64_t port; - bool shared; - uint64_t backlog; - bool v6Only; - Dart_Handle sendPortHandle; - bool operator==(const WingsSocketInfo& other) const; + const char *address; + uint64_t port; + bool shared; + uint64_t backlog; + bool v6Only; + Dart_Handle sendPortHandle; + bool operator==(const WingsSocketInfo &other) const; }; class WingsSocket { - public: - explicit WingsSocket(int sockfd, const WingsSocketInfo& info); - void incrRef(Dart_Port port); - const WingsSocketInfo& getInfo() const; +public: + explicit WingsSocket(int sockfd, const WingsSocketInfo &info); + void incrRef(Dart_Port port); + const WingsSocketInfo &getInfo() const; + void start(Dart_NativeArguments arguments); - private: - WingsSocketInfo info; - int sockfd; - int refCount; - std::vector sendPorts; +private: + static void threadCallback(Dart_Port dest_port_id, Dart_CObject *message); + WingsSocketInfo info; + int sockfd; + int refCount; + std::unique_ptr workerThread; + std::vector sendPorts; }; -extern std::vector globalSocketList; +extern std::vector globalSocketList; } // namespace wings #endif \ No newline at end of file