Client-side 'on' not working :(

This commit is contained in:
thosakwe 2016-09-17 22:53:58 -04:00
parent 876e998445
commit 00b681f04b
5 changed files with 196 additions and 54 deletions

View file

@ -27,21 +27,24 @@ main() async {
**Adding Handlers within a Controller**
`WebSocketController` extends a normal `Controller`, but also listens to WebSockets.
```dart
import 'dart:async';
import "package:angel_framework/angel_framework.dart";
import "package:angel_websocket/server.dart";
@Expose("/")
class MyController extends Controller {
class MyController extends WebSocketController {
@override
Future call(AngelBase app) async {
var ws = app.container.make(AngelWebSocket);
ws.onConnection.listen((WebSocketContext socket) {
socket.on["message"].listen((WebSocketEvent e) {
socket.send("new_message", { "text": e.data["text"] });
});
});
void onConnect(WebSocketContext socket) {
// On connect...
}
// Dependency injection works, too..
@ExposeWs("read_message")
void sendMessage(WebSocketContext socket, Db db) async {
socket.send("found_message", db.collection("messages").findOne(where.id("...")));
}
}
```

View file

@ -12,13 +12,16 @@ class WebSocketClient extends Angel {
WebSocket _socket;
Map<Pattern, List<WebSocketService>> _services = {};
WebSocket get underlyingSocket => _socket;
_WebSocketEventTable on = new _WebSocketEventTable();
WebSocketClient(String wsEndpoint) : super(wsEndpoint);
onData(data) {
onData(data) async {
var fromJson = JSON.decode(data);
print("a: $fromJson");
var e = new WebSocketEvent(
eventName: fromJson['eventName'], data: fromJson['data']);
print("b: $e");
var split = e.eventName.split("::");
var serviceName = split[0];
var services = _services[serviceName];
@ -30,37 +33,41 @@ class WebSocketClient extends Angel {
exc.errors = exc.errors ?? [];
exc.errors.addAll(e.data['errors'] ?? []);
throw exc;
} else if (services != null) {
e.eventName = split[1];
} else {
on._getStreamForEvent(serviceName).add(e.data);
for (WebSocketService service in services) {
service._onAllEvents.add(e);
switch (e.eventName) {
case srv.HookedServiceEvent.INDEXED:
service._onIndexed.add(e);
break;
case srv.HookedServiceEvent.READ:
service._onRead.add(e);
break;
case srv.HookedServiceEvent.CREATED:
service._onCreated.add(e);
break;
case srv.HookedServiceEvent.MODIFIED:
service._onModified.add(e);
break;
case srv.HookedServiceEvent.UPDATED:
service._onUpdated.add(e);
break;
case srv.HookedServiceEvent.REMOVED:
service._onRemoved.add(e);
break;
case "error":
service._onError.add(e);
break;
default:
if (service._on._events.containsKey(e.eventName))
service._on._events[e.eventName].add(e);
break;
if (services != null) {
e.eventName = split[1];
for (WebSocketService service in services) {
service._onAllEvents.add(e);
switch (e.eventName) {
case srv.HookedServiceEvent.INDEXED:
service._onIndexed.add(e);
break;
case srv.HookedServiceEvent.READ:
service._onRead.add(e);
break;
case srv.HookedServiceEvent.CREATED:
service._onCreated.add(e);
break;
case srv.HookedServiceEvent.MODIFIED:
service._onModified.add(e);
break;
case srv.HookedServiceEvent.UPDATED:
service._onUpdated.add(e);
break;
case srv.HookedServiceEvent.REMOVED:
service._onRemoved.add(e);
break;
case "error":
service._onError.add(e);
break;
default:
if (service._on._events.containsKey(e.eventName))
service._on._events[e.eventName].add(e);
break;
}
}
}
}
@ -72,10 +79,7 @@ class WebSocketClient extends Angel {
}
void send(String eventName, data) {
_socket.add(JSON.encode({
"eventName": eventName,
"data": data
}));
_socket.add(JSON.encode({"eventName": eventName, "data": data}));
}
@override
@ -112,7 +116,8 @@ class _WebSocketServiceTransformer
stream.listen((WebSocketEvent e) {
if (_outputType != null && e.eventName != "error")
e.data = god.deserialize(god.serialize(e.data), outputType: _outputType);
e.data =
god.deserialize(god.serialize(e.data), outputType: _outputType);
_stream.add(e);
});
@ -205,3 +210,15 @@ class WebSocketService extends Service {
eventName: "$_path::remove", id: id, params: params)));
}
}
class _WebSocketEventTable {
Map<String, StreamController<Map>> _handlers = {};
StreamController<Map> _getStreamForEvent(eventName) {
if (!_handlers.containsKey(eventName))
_handlers[eventName] = new StreamController<Map>.broadcast();
return _handlers[eventName];
}
Stream<Map> operator [](String key) => _getStreamForEvent(key).stream;
}

View file

@ -3,6 +3,7 @@ library angel_websocket.server;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:mirrors';
import 'package:angel_framework/angel_framework.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:merge_map/merge_map.dart';
@ -10,10 +11,7 @@ import 'angel_websocket.dart';
export 'angel_websocket.dart';
part 'websocket_context.dart';
class Realtime {
const Realtime();
}
part 'websocket_controller.dart';
class AngelWebSocket extends AngelPlugin {
Angel _app;
@ -130,7 +128,7 @@ class AngelWebSocket extends AngelPlugin {
if (fromJson is Map && fromJson.containsKey("eventName")) {
socket._onAll.add(fromJson);
socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson);
socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson["data"]);
}
if (action.eventName.contains("::")) {

View file

@ -0,0 +1,103 @@
part of angel_websocket.server;
class ExposeWs {
final String eventName;
const ExposeWs(this.eventName);
}
class WebSocketController extends Controller {
Map<String, MethodMirror> _handlers = {};
Map<String, Symbol> _handlerSymbols = {};
InstanceMirror _instanceMirror;
AngelWebSocket ws;
WebSocketController():super() {
_instanceMirror = reflect(this);
}
@override
Future call(Angel app) async {
await super.call(app);
ClassMirror classMirror = reflectClass(this.runtimeType);
classMirror.instanceMembers.forEach((sym, mirror) {
if (mirror.isRegularMethod) {
InstanceMirror exposeMirror = mirror.metadata.firstWhere(
(mirror) => mirror.reflectee is ExposeWs,
orElse: () => null);
if (exposeMirror != null) {
ExposeWs exposeWs = exposeMirror.reflectee;
_handlers[exposeWs.eventName] = mirror;
_handlerSymbols[exposeWs.eventName] = sym;
}
}
});
AngelWebSocket ws = app.container.make(AngelWebSocket);
ws.onConnection.listen((socket) async {
await onConnect(socket);
socket.onData.listen(onData);
socket.onAll.listen((Map data) async {
await onAllEvents(data);
if (_handlers.containsKey(data["eventName"])) {
var methodMirror = _handlers[data["eventName"]];
try {
// Load parameters, and execute
List args = [];
for (int i = 0; i < methodMirror.parameters.length; i++) {
ParameterMirror parameter = methodMirror.parameters[i];
String name = MirrorSystem.getName(parameter.simpleName);
if (parameter.type.reflectedType == RequestContext ||
name == "req")
args.add(socket.requestContext);
else if (parameter.type.reflectedType == ResponseContext ||
name == "res")
args.add(socket.responseContext);
else if (parameter.type == AngelWebSocket)
args.add(socket);
else {
if (socket.requestContext.params.containsKey(name)) {
args.add(socket.requestContext.params[name]);
} else {
try {
args.add(app.container.make(parameter.type.reflectedType));
continue;
} catch (e) {
throw new AngelHttpException.BadRequest(
message: "Missing parameter '$name'");
}
}
}
}
await _instanceMirror.invoke(_handlerSymbols[data["eventName"]], args);
} catch (e) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else
socket.sendError(new AngelHttpException(e));
}
}
});
});
}
void broadcast(String eventName, data) {
ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data));
}
Future onConnect(WebSocketContext socket) async {}
Future onAllEvents(Map data) async {}
void onData(data) {}
}

View file

@ -12,6 +12,7 @@ main() {
client.WebSocketClient clientApp;
client.WebSocketService clientTodos;
Stream<Map> customEventStream;
Stream<Map> customEventStream2;
WebSocket socket;
AngelWebSocket webSocket = new AngelWebSocket("/ws");
@ -36,11 +37,13 @@ main() {
customEventStream = socket.on["custom"];
});
});
await app.configure(new Custom2Controller());
await app.configure(startTestServer);
socket = await WebSocket.connect(app.properties["ws_url"]);
clientApp = new client.WebSocketClient(app.properties["ws_url"]);
await clientApp.connect();
customEventStream2 = clientApp.on["custom2"];
clientTodos = clientApp.service("api/todos", type: Todo);
});
@ -61,8 +64,7 @@ main() {
String json = await socket.first;
print(json);
WebSocketEvent e =
god.deserialize(json, outputType: WebSocketEvent);
WebSocketEvent e = god.deserialize(json, outputType: WebSocketEvent);
expect(e.eventName, equals("api/todos::indexed"));
expect(e.data[0]["when"], equals("now"));
});
@ -87,10 +89,29 @@ main() {
var data = await customEventStream.first;
expect(data["eventName"], equals("custom"));
expect(data["data"]["hello"], equals("world"));
expect(data["hello"], equals("world"));
});
test("custom event via ws controller", () async {
clientApp.send("custom2", {"hello": "world"});
var data = customEventStream2.first;
print("Received data from server: $data");
});
}
@Realtime()
class FakeService extends server.Service {}
@server.Expose("/custom2")
class Custom2Controller extends WebSocketController {
@override
Future onConnect(WebSocketContext socket) async {
print(
"Got a WS connection from session #${socket.requestContext.session.id}!");
}
@ExposeWs("custom2")
void sayFoo(WebSocketContext socket, server.RequestContext req, AngelWebSocket ws) {
socket.send("custom2", {"franken": "stein"});
}
}