Solid listening plan

This commit is contained in:
Tobe O 2019-04-29 01:55:02 -04:00
parent 7f70420991
commit 71d8a07513
9 changed files with 116 additions and 27 deletions

View file

@ -1,4 +1,4 @@
CXXFLAGS := $(CXXFLAGS) --std=c++11 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include CXXFLAGS := $(CXXFLAGS) --std=c++14 -fPIC -DDART_SHARED_LIB=1 -I $(DART_SDK)/include
objects := lib/src/angel_wings.o lib/src/wings_socket.o\ objects := lib/src/angel_wings.o lib/src/wings_socket.o\
lib/src/bind.o lib/src/util.o lib/src/bind.o lib/src/util.o

View file

@ -2,5 +2,9 @@ import 'package:angel_wings/angel_wings.dart';
main() async { main() async {
var socket = await WingsSocket.bind('127.0.0.1', 3000); var socket = await WingsSocket.bind('127.0.0.1', 3000);
print([socket.port]);
await for (var fd in socket) {
print('FD: $fd');
closeNativeSocketDescriptor(fd);
}
} }

View file

@ -47,5 +47,7 @@ Dart_NativeFunction ResolveName(Dart_Handle name, int argc, bool *auto_setup_sco
result = Dart_WingsSocket_closeDescriptor; result = Dart_WingsSocket_closeDescriptor;
if (strcmp("Dart_WingsSocket_close", cname) == 0) if (strcmp("Dart_WingsSocket_close", cname) == 0)
result = Dart_WingsSocket_close; result = Dart_WingsSocket_close;
if (strcmp("Dart_WingsSocket_listen", cname) == 0)
result = Dart_WingsSocket_listen;
return result; return result;
} }

View file

@ -13,5 +13,6 @@ void Dart_WingsSocket_getPort(Dart_NativeArguments arguments);
void Dart_WingsSocket_write(Dart_NativeArguments arguments); void Dart_WingsSocket_write(Dart_NativeArguments arguments);
void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments); void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments);
void Dart_WingsSocket_close(Dart_NativeArguments arguments); void Dart_WingsSocket_close(Dart_NativeArguments arguments);
void Dart_WingsSocket_listen(Dart_NativeArguments arguments);
#endif #endif

View file

@ -1,9 +1,3 @@
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include "angel_wings.h" #include "angel_wings.h"
#include "wings_socket.h" #include "wings_socket.h"
using namespace wings; using namespace wings;

View file

@ -2,13 +2,23 @@
#include "wings_socket.h" #include "wings_socket.h"
using namespace wings; using namespace wings;
void Dart_WingsSocket_listen(Dart_NativeArguments arguments)
{
uint64_t ptr;
Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0);
HandleError(Dart_IntegerToUint64(pointerHandle, &ptr));
auto *socket = (WingsSocket *)ptr;
socket->start(arguments);
}
void Dart_WingsSocket_getPort(Dart_NativeArguments arguments) void Dart_WingsSocket_getPort(Dart_NativeArguments arguments)
{ {
uint64_t ptr; uint64_t ptr;
Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0); Dart_Handle pointerHandle = Dart_GetNativeArgument(arguments, 0);
HandleError(Dart_IntegerToUint64(pointerHandle, &ptr)); HandleError(Dart_IntegerToUint64(pointerHandle, &ptr));
auto* socket = (WingsSocket*) ptr; auto *socket = (WingsSocket *)ptr;
auto outHandle = Dart_NewIntegerFromUint64(socket->getInfo().port); auto outHandle = Dart_NewIntegerFromUint64(socket->getInfo().port);
Dart_SetReturnValue(arguments, outHandle); Dart_SetReturnValue(arguments, outHandle);
} }
@ -20,7 +30,10 @@ void Dart_WingsSocket_write(Dart_NativeArguments arguments)
void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments) void Dart_WingsSocket_closeDescriptor(Dart_NativeArguments arguments)
{ {
// TODO: Actually do something. int64_t fd;
Dart_Handle fdHandle = Dart_GetNativeArgument(arguments, 0);
HandleError(Dart_IntegerToInt64(fdHandle, &fd));
close(fd);
} }
void Dart_WingsSocket_close(Dart_NativeArguments arguments) void Dart_WingsSocket_close(Dart_NativeArguments arguments)

View file

@ -1,4 +1,5 @@
#include <cstring> #include <cstring>
#include <iostream>
#include "wings_socket.h" #include "wings_socket.h"
using namespace wings; using namespace wings;
@ -13,6 +14,7 @@ bool WingsSocketInfo::operator==(const WingsSocketInfo &other) const
WingsSocket::WingsSocket(int sockfd, const WingsSocketInfo &info) : sockfd(sockfd), info(info) WingsSocket::WingsSocket(int sockfd, const WingsSocketInfo &info) : sockfd(sockfd), info(info)
{ {
refCount = 0; refCount = 0;
workerThread = nullptr;
} }
void WingsSocket::incrRef(Dart_Port port) void WingsSocket::incrRef(Dart_Port port)
@ -25,3 +27,57 @@ const WingsSocketInfo &WingsSocket::getInfo() const
{ {
return info; return info;
} }
void WingsSocket::start(Dart_NativeArguments arguments)
{
// if (workerThread == nullptr)
// {
// workerThread = std::make_unique<std::thread>(threadCallback, this);
// }
Dart_Port service_port =
Dart_NewNativePort("WingsThreadCallback", &threadCallback, true);
Dart_Handle send_port = Dart_NewSendPort(service_port);
Dart_SetReturnValue(arguments, send_port);
}
void WingsSocket::threadCallback(Dart_Port dest_port_id,
Dart_CObject *message)
{
WingsSocket *socket = nullptr;
Dart_Port outPort = message->value.as_array.values[0]->value.as_send_port.id;
Dart_CObject* ptrArg = message->value.as_array.values[1];
if (ptrArg->type == Dart_CObject_kInt32)
{
auto as64 = (int64_t)ptrArg->value.as_int32;
socket = (WingsSocket *)as64;
}
else
{
socket = (WingsSocket *)ptrArg->value.as_int64;
}
if (socket != nullptr)
{
int sock;
unsigned long index = 0;
sockaddr addr;
socklen_t len;
if ((sock = accept(socket->sockfd, &addr, &len)) != -1)
{
Dart_CObject obj;
obj.type = Dart_CObject_kInt64;
obj.value.as_int64 = sock;
Dart_PostCObject(outPort, &obj);
// Dispatch the fd to the next listener.
// auto &ports = socket->sendPorts;
// Dart_Port port = ports.at(index++);
// if (index >= ports.size())
// index = 0;
// Dart_Handle intHandle = Dart_NewInteger(sock);
// Dart_Post(port, intHandle);
}
}
}

View file

@ -28,21 +28,28 @@ void writeToNativeSocket(int fd, Uint8List data)
void closeNativeSocketDescriptor(int fd) void closeNativeSocketDescriptor(int fd)
native 'Dart_WingsSocket_closeDescriptor'; native 'Dart_WingsSocket_closeDescriptor';
SendPort wingsSocketListen(int pointer) native 'Dart_WingsSocket_listen';
void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close'; void closeWingsSocket(int pointer) native 'Dart_WingsSocket_close';
class WingsSocket extends Stream<int> { class WingsSocket extends Stream<int> {
final StreamController<int> _ctrl = StreamController(); final StreamController<int> _ctrl = StreamController();
SendPort _acceptor;
final int _pointer; final int _pointer;
final RawReceivePort _recv; final RawReceivePort _recv;
bool _open = true; bool _open = true;
int _port; int _port;
WingsSocket._(this._pointer, this._recv) { WingsSocket._(this._pointer, this._recv) {
_acceptor = wingsSocketListen(_pointer);
_recv.handler = (h) { _recv.handler = (h) {
if (!_ctrl.isClosed) { if (!_ctrl.isClosed) {
_ctrl.add(h as int); _ctrl.add(h as int);
_acceptor.send([_recv.sendPort, _pointer]);
} }
}; };
_acceptor.send([_recv.sendPort, _pointer]);
} }
static Future<WingsSocket> bind(address, int port, static Future<WingsSocket> bind(address, int port,

View file

@ -1,6 +1,15 @@
#ifndef WINGS_SOCKET_H #ifndef WINGS_SOCKET_H
#define WINGS_SOCKET_H #define WINGS_SOCKET_H
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <dart_api.h> #include <dart_api.h>
#include <dart_native_api.h>
#include <memory>
#include <thread>
#include <vector> #include <vector>
namespace wings namespace wings
@ -13,24 +22,27 @@ struct WingsSocketInfo
uint64_t backlog; uint64_t backlog;
bool v6Only; bool v6Only;
Dart_Handle sendPortHandle; Dart_Handle sendPortHandle;
bool operator==(const WingsSocketInfo& other) const; bool operator==(const WingsSocketInfo &other) const;
}; };
class WingsSocket class WingsSocket
{ {
public: public:
explicit WingsSocket(int sockfd, const WingsSocketInfo& info); explicit WingsSocket(int sockfd, const WingsSocketInfo &info);
void incrRef(Dart_Port port); void incrRef(Dart_Port port);
const WingsSocketInfo& getInfo() const; const WingsSocketInfo &getInfo() const;
void start(Dart_NativeArguments arguments);
private: private:
static void threadCallback(Dart_Port dest_port_id, Dart_CObject *message);
WingsSocketInfo info; WingsSocketInfo info;
int sockfd; int sockfd;
int refCount; int refCount;
std::unique_ptr<std::thread> workerThread;
std::vector<Dart_Port> sendPorts; std::vector<Dart_Port> sendPorts;
}; };
extern std::vector<WingsSocket*> globalSocketList; extern std::vector<WingsSocket *> globalSocketList;
} // namespace wings } // namespace wings
#endif #endif