Redesigned it. Still needs a LOT of work, but now there is a clear direction.

This commit is contained in:
regiostech 2016-07-05 21:28:00 -04:00
parent 098411def7
commit 5be1b8b208
11 changed files with 384 additions and 109 deletions

View file

@ -9,10 +9,17 @@
</list> </list>
</value> </value>
</entry> </entry>
<entry key="angel_client">
<value>
<list>
<option value="$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/angel_client-1.0.0-dev+3/lib" />
</list>
</value>
</entry>
<entry key="angel_framework"> <entry key="angel_framework">
<value> <value>
<list> <list>
<option value="$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/angel_framework-1.0.0-dev+4pub/lib" /> <option value="$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/angel_framework-1.0.0-dev.14/lib" />
</list> </list>
</value> </value>
</entry> </entry>
@ -321,7 +328,8 @@
</properties> </properties>
<CLASSES> <CLASSES>
<root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/analyzer-0.27.2/lib" /> <root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/analyzer-0.27.2/lib" />
<root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/angel_framework-1.0.0-dev+4pub/lib" /> <root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/angel_client-1.0.0-dev+3/lib" />
<root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/angel_framework-1.0.0-dev.14/lib" />
<root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/args-0.13.4+2/lib" /> <root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/args-0.13.4+2/lib" />
<root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/async-1.11.0/lib" /> <root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/async-1.11.0/lib" />
<root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/barback-0.15.2+8/lib" /> <root url="file://$USER_HOME$/AppData/Roaming/Pub/Cache/hosted/pub.dartlang.org/barback-0.15.2+8/lib" />

View file

@ -0,0 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="All Tests" type="DartTestRunConfigurationType" factoryName="Dart Test" singleton="true">
<option name="filePath" value="$PROJECT_DIR$/test/all_tests.dart" />
<method />
</configuration>
</component>

View file

@ -0,0 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Server Tests" type="DartTestRunConfigurationType" factoryName="Dart Test" singleton="true">
<option name="filePath" value="$PROJECT_DIR$/test/server.dart" />
<method />
</configuration>
</component>

View file

@ -1,11 +1,10 @@
library angel_websocket; library angel_websocket;
class WebSocketEvent { class WebSocketEvent {
String id;
String eventName; String eventName;
var data; var data;
WebSocketEvent({String this.id, String this.eventName, this.data}); WebSocketEvent({String this.eventName, this.data});
} }
class WebSocketAction { class WebSocketAction {
@ -13,4 +12,6 @@ class WebSocketAction {
String eventName; String eventName;
var data; var data;
var params; var params;
WebSocketAction({String this.id, String this.eventName, this.data, this.params});
} }

View file

@ -2,121 +2,167 @@ library angel_websocket.server;
import 'dart:async'; import 'dart:async';
import 'dart:io'; import 'dart:io';
import 'dart:mirrors';
import 'package:angel_framework/angel_framework.dart'; import 'package:angel_framework/angel_framework.dart';
import 'package:json_god/json_god.dart' as god; import 'package:json_god/json_god.dart' as god;
import 'package:merge_map/merge_map.dart';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import 'angel_websocket.dart'; import 'angel_websocket.dart';
typedef Future<bool> WebSocketFilter(WebsocketContext context); part 'websocket_context.dart';
List<WebsocketContext> _clients = []; final AngelWebSocket websocket = new AngelWebSocket("/ws");
Uuid _uuid = new Uuid();
class WebsocketContext { class Realtime {
WebSocket socket; const Realtime();
RequestContext request;
ResponseContext response;
WebsocketContext(WebSocket this.socket, RequestContext this.request,
ResponseContext this.response);
} }
_broadcast(WebSocketEvent event) { class AngelWebSocket {
String json = god.serialize(event); Angel _app;
_clients.forEach((WebsocketContext client) { List<WebSocket> _clients = [];
client.socket.add(json); List<String> servicesAlreadyWired = [];
}); String endpoint;
}
_onData(Angel app) { AngelWebSocket(String this.endpoint);
return (data) {
try {
WebSocketAction action = god.deserialize(
data, outputType: WebSocketAction);
List<String> split = action.eventName.split("::"); _batchEvent(String path) {
return (HookedServiceEvent e) async {
if (split.length >= 2) { var event = await transformEvent(e);
Service service = app.service(split[0]); event.eventName = "$path::${event.eventName}";
await batchEvent(event);
if (service != null) {
String event = split[1];
if (event == "index") {
}
}
}
} catch (e) {
}
};
}
_onError(e) {
}
class websocket {
static Map<String, WebSocketFilter> filters = {};
call({List<Pattern> endPoints: const['/ws']}) {
return (Angel app) async {
for (Pattern endPoint in endPoints) {
app.all(endPoint, (RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest)) {
res.write("This endpoint is only accessible via WebSockets.");
res.end();
} else {
res
..willCloseItself = true
..end();
WebSocket socket = await WebSocketTransformer.upgrade(
req.underlyingRequest);
WebsocketContext context = new WebsocketContext(socket, req, res);
_clients.add(context);
socket.listen(_onData(app), onError: _onError, onDone: () {
_clients.remove(context);
});
}
});
app.services.forEach((Pattern path, Service service) {
if (service is HookedService) {
String pathName = (path is RegExp) ? path.pattern : path;
List<HookedServiceEventDispatcher> dispatchers = [
service.afterIndexed,
service.afterCreated,
service.afterRead,
service.afterModified,
service.afterUpdated,
service.afterRemoved
];
for (HookedServiceEventDispatcher dispatcher in dispatchers) {
dispatcher.listen((HookedServiceEvent event) async {
bool canContinue = true;
String filterName = "$pathName::${event.eventName}";
WebSocketFilter filter = filters[filterName];
for (WebsocketContext client in _clients) {
if (filter != null)
canContinue = await filter(client);
}
if (canContinue) {
WebSocketEvent socketEvent = new WebSocketEvent(
id: _uuid.v4(),
eventName: filterName,
data: event.result);
_broadcast(socketEvent);
}
});
}
}
});
}
}; };
} }
Future batchEvent(WebSocketEvent event) async {
// Default implementation will just immediately fire events
_clients.forEach((client) {
client.add(god.serialize(event));
});
}
Future<List<WebSocketEvent>> getBatchedEvents() async => [];
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split("::");
if (split.length < 2)
return socket.sendError(new AngelHttpException.BadRequest());
var service = _app.service(split[0]);
if (service == null)
return socket.sendError(new AngelHttpException.NotFound(
message: "No service \"${split[0]}\" exists."));
var eventName = split[1];
var params = mergeMap([
god.deserializeDatum(action.params),
{"provider": Providers.WEBSOCKET}
]);
try {
if (eventName == "index") {
return socket.send("${split[0]}::" + HookedServiceEvent.INDEXED,
await service.index(params));
} else if (eventName == "read") {
return socket.send("${split[0]}::" + HookedServiceEvent.READ,
await service.read(action.id, params));
} else if (eventName == "create") {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.CREATED,
data: await service.create(action.data, params));
} else if (eventName == "modify") {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.MODIFIED,
data: await service.modify(action.id, action.data, params));
} else if (eventName == "update") {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.UPDATED,
data: await service.update(action.id, action.data, params));
} else if (eventName == "remove") {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.REMOVED,
data: await service.remove(action.id, params));
} else {
return socket.sendError(new AngelHttpException.MethodNotAllowed(
message: "Method Not Allowed: \"$eventName\""));
}
} catch (e) {
if (e is AngelHttpException) return socket.sendError(e);
return socket.sendError(new AngelHttpException(e));
}
}
hookupService(Pattern _path, HookedService service) {
String path = _path.toString();
var batch = _batchEvent(path);
service
..afterCreated.listen(batch)
..afterModified.listen(batch)
..afterUpdated.listen(batch)
..afterRemoved.listen(batch);
servicesAlreadyWired.add(path);
}
onData(WebSocketContext socket, data) {
try {
WebSocketAction action =
god.deserialize(data, outputType: WebSocketAction);
if (action.eventName == null ||
action.eventName is! String ||
action.eventName.isEmpty) throw new AngelHttpException.BadRequest();
var event = handleAction(action, socket);
if (event is WebSocketEvent) {
batchEvent(event);
}
} catch (e) {
// Send an error
socket.sendError(new AngelHttpException(e));
}
}
Future<WebSocketEvent> transformEvent(HookedServiceEvent event) async {
return new WebSocketEvent(eventName: event.eventName, data: event.result);
}
wireAllServices(Angel app) {
for (Pattern key in app.services.keys.where((x) {
return !servicesAlreadyWired.contains(x) &&
app.services[x] is HookedService;
})) {
hookupService(key, app.services[key]);
}
}
Future call(Angel app) async {
this._app = app;
// Set up services
wireAllServices(app);
app.onService.listen((_) {
wireAllServices(app);
});
app.get(endpoint, (RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest))
throw new AngelHttpException.BadRequest();
var ws = await WebSocketTransformer.upgrade(req.underlyingRequest);
var socket = new WebSocketContext(ws, req, res);
ws.listen((data) {
onData(socket, data);
}, onDone: () {
_clients.remove(ws);
}, onError: (e) {
_clients.remove(ws);
}, cancelOnError: true);
});
}
} }

120
lib/server_old.dart Normal file
View file

@ -0,0 +1,120 @@
library angel_websocket.server;
import 'dart:async';
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:uuid/uuid.dart';
import 'angel_websocket.dart';
typedef Future<bool> WebSocketFilter(WebsocketContext context);
List<WebsocketContext> _clients = [];
Uuid _uuid = new Uuid();
class WebsocketContext {
WebSocket socket;
RequestContext request;
ResponseContext response;
WebsocketContext(WebSocket this.socket, RequestContext this.request,
ResponseContext this.response);
}
_broadcast(WebSocketEvent event) {
String json = god.serialize(event);
_clients.forEach((WebsocketContext client) {
client.socket.add(json);
});
}
_onData(Angel app) {
return (data) {
try {
WebSocketAction action = god.deserialize(
data, outputType: WebSocketAction);
List<String> split = action.eventName.split("::");
if (split.length >= 2) {
Service service = app.service(split[0]);
if (service != null) {
String event = split[1];
if (event == "index") {
}
}
}
} catch (e) {
}
};
}
_onError(e) {
}
class websocket {
static Map<String, WebSocketFilter> filters = {};
call({List<Pattern> endPoints: const['/ws']}) {
return (Angel app) async {
for (Pattern endPoint in endPoints) {
app.all(endPoint, (RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest)) {
res.write("This endpoint is only accessible via WebSockets.");
res.end();
} else {
res
..willCloseItself = true
..end();
WebSocket socket = await WebSocketTransformer.upgrade(
req.underlyingRequest);
WebsocketContext context = new WebsocketContext(socket, req, res);
_clients.add(context);
socket.listen(_onData(app), onError: _onError, onDone: () {
_clients.remove(context);
});
}
});
app.services.forEach((Pattern path, Service service) {
if (service is HookedService) {
String pathName = (path is RegExp) ? path.pattern : path;
List<HookedServiceEventDispatcher> dispatchers = [
service.afterIndexed,
service.afterCreated,
service.afterRead,
service.afterModified,
service.afterUpdated,
service.afterRemoved
];
for (HookedServiceEventDispatcher dispatcher in dispatchers) {
dispatcher.listen((HookedServiceEvent event) async {
bool canContinue = true;
String filterName = "$pathName::${event.eventName}";
WebSocketFilter filter = filters[filterName];
for (WebsocketContext client in _clients) {
if (filter != null)
canContinue = await filter(client);
}
if (canContinue) {
WebSocketEvent socketEvent = new WebSocketEvent(eventName: filterName,
data: event.result);
_broadcast(socketEvent);
}
});
}
}
});
}
};
}
}

View file

@ -0,0 +1,17 @@
part of angel_websocket.server;
class WebSocketContext {
WebSocket underlyingSocket;
RequestContext requestContext;
ResponseContext responseContext;
WebSocketContext(WebSocket this.underlyingSocket,
RequestContext this.requestContext, ResponseContext this.responseContext);
send(String eventName, data) {
underlyingSocket.add(
god.serialize(new WebSocketEvent(eventName: eventName, data: data)));
}
sendError(AngelHttpException error) => send("error", error);
}

View file

@ -4,6 +4,7 @@ version: 1.0.0-dev
author: thosakwe <thosakwe@gmail.com> author: thosakwe <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket homepage: https://github.com/angel-dart/angel_websocket
dependencies: dependencies:
angel_client: ">=1.0.0-dev <2.0.0"
angel_framework: ">=1.0.0-dev < 2.0.0" angel_framework: ">=1.0.0-dev < 2.0.0"
json_god: ">=2.0.0-beta <3.0.0" json_god: ">=2.0.0-beta <3.0.0"
jwt: ">=0.1.4 <1.0.0" jwt: ">=0.1.4 <1.0.0"

6
test/all_tests.dart Normal file
View file

@ -0,0 +1,6 @@
import 'package:test/test.dart';
import 'server.dart' as server;
main() async {
group("server", server.main);
}

20
test/common.dart Normal file
View file

@ -0,0 +1,20 @@
import 'dart:async';
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/defs.dart';
class Todo extends MemoryModel {
String text;
String when;
Todo({String this.text, String this.when});
}
Future startTestServer(Angel app) async {
var host = InternetAddress.LOOPBACK_IP_V4;
var port = 3000;
await app.startServer(host, port);
app.properties["ws_url"] = "ws://${host.address}:$port/ws";
print("Test server listening on ${host.address}:$port");
}

44
test/server.dart Normal file
View file

@ -0,0 +1,44 @@
import 'dart:async';
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_websocket/angel_websocket.dart';
import 'package:angel_websocket/server.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:test/test.dart';
import 'common.dart';
main() {
Angel app;
WebSocket socket;
setUp(() async {
app = new Angel();
app.use("/real", new FakeService(), hooked: false);
app.use("/api/todos", new MemoryService<Todo>());
await app.configure(websocket);
await app.configure(startTestServer);
socket = await WebSocket.connect(app.properties["ws_url"]);
});
tearDown(() async {
await app.httpServer.close(force: true);
});
test("find all real-time services", () {
print(websocket.servicesAlreadyWired);
expect(websocket.servicesAlreadyWired, equals(["api/todos"]));
});
test("index", () async {
var action = new WebSocketAction(eventName: "api/todos::index");
socket.add(god.serialize(action));
print(await socket.first);
});
}
@Realtime()
class FakeService extends Service {}