diff --git a/.gitignore b/.gitignore
index 7c280441..ea89ccf0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,4 @@ doc/api/
# Don't commit pubspec lock file
# (Library packages only! Remove pattern if developing an application package)
pubspec.lock
+.idea
\ No newline at end of file
diff --git a/.idea/libraries/Dart_Packages.xml b/.idea/libraries/Dart_Packages.xml
index edbf2248..acaad618 100644
--- a/.idea/libraries/Dart_Packages.xml
+++ b/.idea/libraries/Dart_Packages.xml
@@ -5,374 +5,374 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
new file mode 100644
index 00000000..7b1a505f
--- /dev/null
+++ b/.idea/workspace.xml
@@ -0,0 +1,508 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ project
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ project
+
+
+ true
+
+ bdd
+
+ DIRECTORY
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 1467772535835
+
+
+ 1467772535835
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/lib/client.dart b/lib/client.dart
new file mode 100644
index 00000000..a5b74af9
--- /dev/null
+++ b/lib/client.dart
@@ -0,0 +1,197 @@
+import 'dart:async';
+import 'dart:convert';
+import 'dart:io';
+import 'package:angel_client/angel_client.dart';
+import 'package:angel_framework/angel_framework.dart' as srv;
+import 'package:angel_websocket/angel_websocket.dart';
+import 'package:json_god/json_god.dart' as god;
+
+class WebSocketClient extends Angel {
+ WebSocket _socket;
+ Map> _services = {};
+
+ WebSocketClient(String wsEndpoint) : super(wsEndpoint);
+
+ onData(data) {
+ var fromJson = JSON.decode(data);
+ var e = new WebSocketEvent(
+ eventName: fromJson['eventName'], data: fromJson['data']);
+ var split = e.eventName.split("::");
+ var serviceName = split[0];
+ var services = _services[serviceName];
+
+ if (serviceName == "error") {
+ var exc = new srv.AngelHttpException(new Exception("Server-side error."));
+ exc.statusCode = e.data['statusCode'];
+ exc.message = e.data['message'];
+ exc.errors = exc.errors ?? [];
+ exc.errors.addAll(e.data['errors'] ?? []);
+ throw exc;
+ } else if (services != null) {
+ e.eventName = split[1];
+
+ for (WebSocketService service in services) {
+ service._onAllEvents.add(e);
+ switch (e.eventName) {
+ case srv.HookedServiceEvent.INDEXED:
+ service._onIndexed.add(e);
+ break;
+ case srv.HookedServiceEvent.READ:
+ service._onRead.add(e);
+ break;
+ case srv.HookedServiceEvent.CREATED:
+ service._onCreated.add(e);
+ break;
+ case srv.HookedServiceEvent.MODIFIED:
+ service._onModified.add(e);
+ break;
+ case srv.HookedServiceEvent.UPDATED:
+ service._onUpdated.add(e);
+ break;
+ case srv.HookedServiceEvent.REMOVED:
+ service._onRemoved.add(e);
+ break;
+ case "error":
+ service._onError.add(e);
+ break;
+ default:
+ if (service._on._events.containsKey(e.eventName))
+ service._on._events[e.eventName].add(e);
+ break;
+ }
+ }
+ }
+ }
+
+ Future connect() async {
+ _socket = await WebSocket.connect(basePath);
+ _socket.listen(onData);
+ }
+
+ @override
+ Service service(Pattern path, {Type type}) {
+ var service =
+ new WebSocketService._base(path.toString(), this, _socket, type);
+ if (_services[path.toString()] == null) _services[path.toString()] = [];
+
+ _services[path.toString()].add(service);
+ return service;
+ }
+}
+
+class WebSocketExtraneousEventHandler {
+ Map> _events = {};
+
+ operator [](String index) {
+ if (_events[index] == null)
+ _events[index] = new StreamController();
+
+ return _events[index].stream;
+ }
+}
+
+class _WebSocketServiceTransformer
+ implements StreamTransformer {
+ Type _outputType;
+
+ _WebSocketServiceTransformer.base(this._outputType);
+
+ @override
+ Stream bind(Stream stream) {
+ var _stream = new StreamController();
+
+ stream.listen((WebSocketEvent e) {
+ if (_outputType != null && e.eventName != "error")
+ e.data = god.deserialize(god.serialize(e.data), outputType: _outputType);
+ _stream.add(e);
+ });
+
+ return _stream.stream;
+ }
+}
+
+class WebSocketService extends Service {
+ Type _outputType;
+ String _path;
+ _WebSocketServiceTransformer _transformer;
+ WebSocket connection;
+
+ WebSocketExtraneousEventHandler _on = new WebSocketExtraneousEventHandler();
+ var _onAllEvents = new StreamController();
+ var _onError = new StreamController();
+ var _onIndexed = new StreamController();
+ var _onRead = new StreamController();
+ var _onCreated = new StreamController();
+ var _onModified = new StreamController();
+ var _onUpdated = new StreamController();
+ var _onRemoved = new StreamController();
+
+ WebSocketExtraneousEventHandler get on => _on;
+
+ Stream get onAllEvents =>
+ _onAllEvents.stream.transform(_transformer);
+
+ Stream get onError => _onError.stream;
+
+ Stream get onIndexed =>
+ _onIndexed.stream.transform(_transformer);
+
+ Stream get onRead => _onRead.stream.transform(_transformer);
+
+ Stream get onCreated =>
+ _onCreated.stream.transform(_transformer);
+
+ Stream get onModified =>
+ _onModified.stream.transform(_transformer);
+
+ Stream get onUpdated =>
+ _onUpdated.stream.transform(_transformer);
+
+ Stream get onRemoved =>
+ _onRemoved.stream.transform(_transformer);
+
+ WebSocketService._base(
+ String path, Angel app, WebSocket this.connection, Type _outputType) {
+ this._path = path;
+ this.app = app;
+ this._outputType = _outputType;
+ _transformer = new _WebSocketServiceTransformer.base(this._outputType);
+ }
+
+ @override
+ Future index([Map params]) async {
+ connection.add(god.serialize(
+ new WebSocketAction(eventName: "$_path::index", params: params)));
+ return null;
+ }
+
+ @override
+ Future read(id, [Map params]) async {
+ connection.add(god.serialize(new WebSocketAction(
+ eventName: "$_path::read", id: id, params: params)));
+ }
+
+ @override
+ Future create(data, [Map params]) async {
+ connection.add(god.serialize(new WebSocketAction(
+ eventName: "$_path::create", data: data, params: params)));
+ }
+
+ @override
+ Future modify(id, data, [Map params]) async {
+ connection.add(god.serialize(new WebSocketAction(
+ eventName: "$_path::modify", id: id, data: data, params: params)));
+ }
+
+ @override
+ Future update(id, data, [Map params]) async {
+ connection.add(god.serialize(new WebSocketAction(
+ eventName: "$_path::update", id: id, data: data, params: params)));
+ }
+
+ @override
+ Future remove(id, [Map params]) async {
+ connection.add(god.serialize(new WebSocketAction(
+ eventName: "$_path::remove", id: id, params: params)));
+ }
+}
diff --git a/lib/server.dart b/lib/server.dart
index d0dbbaaf..43c306e9 100644
--- a/lib/server.dart
+++ b/lib/server.dart
@@ -1,6 +1,7 @@
library angel_websocket.server;
import 'dart:async';
+import 'dart:convert';
import 'dart:io';
import 'dart:mirrors';
import 'package:angel_framework/angel_framework.dart';
@@ -60,6 +61,7 @@ class AngelWebSocket {
god.deserializeDatum(action.params),
{"provider": Providers.WEBSOCKET}
]);
+
try {
if (eventName == "index") {
return socket.send("${split[0]}::" + HookedServiceEvent.INDEXED,
@@ -107,22 +109,37 @@ class AngelWebSocket {
servicesAlreadyWired.add(path);
}
- onData(WebSocketContext socket, data) {
+ Future onConnect(WebSocketContext socket) async {}
+
+ onData(WebSocketContext socket, data) async {
try {
- WebSocketAction action =
- god.deserialize(data, outputType: WebSocketAction);
+ var fromJson = JSON.decode(data);
+ var action = new WebSocketAction(
+ id: fromJson['id'],
+ eventName: fromJson['eventName'],
+ data: fromJson['data'],
+ params: fromJson['params']);
if (action.eventName == null ||
action.eventName is! String ||
- action.eventName.isEmpty) throw new AngelHttpException.BadRequest();
+ action.eventName.isEmpty) {
+ throw new AngelHttpException.BadRequest();
+ }
var event = handleAction(action, socket);
+ if (event is Future)
+ event = await event;
+
+
if (event is WebSocketEvent) {
batchEvent(event);
}
} catch (e) {
// Send an error
- socket.sendError(new AngelHttpException(e));
+ if (e is AngelHttpException)
+ socket.sendError(e);
+ else
+ socket.sendError(new AngelHttpException(e));
}
}
@@ -154,7 +171,10 @@ class AngelWebSocket {
throw new AngelHttpException.BadRequest();
var ws = await WebSocketTransformer.upgrade(req.underlyingRequest);
+ _clients.add(ws);
+
var socket = new WebSocketContext(ws, req, res);
+ await onConnect(socket);
ws.listen((data) {
onData(socket, data);
diff --git a/test/packages b/test/packages
new file mode 120000
index 00000000..a16c4050
--- /dev/null
+++ b/test/packages
@@ -0,0 +1 @@
+../packages
\ No newline at end of file
diff --git a/test/server.dart b/test/server.dart
index e51fb462..1ff313bd 100644
--- a/test/server.dart
+++ b/test/server.dart
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:io';
-import 'package:angel_framework/angel_framework.dart';
+import 'package:angel_framework/angel_framework.dart' as server;
+import 'package:angel_websocket/client.dart' as client;
import 'package:angel_websocket/angel_websocket.dart';
import 'package:angel_websocket/server.dart';
import 'package:json_god/json_god.dart' as god;
@@ -8,19 +9,28 @@ import 'package:test/test.dart';
import 'common.dart';
main() {
- Angel app;
+ server.Angel app;
+ client.WebSocketClient clientApp;
+ client.WebSocketService clientTodos;
WebSocket socket;
setUp(() async {
- app = new Angel();
+ app = new server.Angel();
app.use("/real", new FakeService(), hooked: false);
- app.use("/api/todos", new MemoryService());
+ app.use("/api/todos", new server.MemoryService());
+ await app
+ .service("api/todos")
+ .create(new Todo(text: "Clean your room", when: "now"));
await app.configure(websocket);
await app.configure(startTestServer);
socket = await WebSocket.connect(app.properties["ws_url"]);
+ clientApp = new client.WebSocketClient(app.properties["ws_url"]);
+ await clientApp.connect();
+
+ clientTodos = clientApp.service("api/todos", type: Todo);
});
tearDown(() async {
@@ -36,9 +46,30 @@ main() {
var action = new WebSocketAction(eventName: "api/todos::index");
socket.add(god.serialize(action));
- print(await socket.first);
+ String json = await socket.first;
+ print(json);
+
+ WebSocketEvent e =
+ god.deserialize(json, outputType: WebSocketEvent);
+ expect(e.eventName, equals("api/todos::indexed"));
+ expect(e.data[0]["when"], equals("now"));
+ });
+
+ test("create", () async {
+ var todo = new Todo(text: "Finish the Angel framework", when: "2016");
+ clientTodos.create(todo);
+
+ var all = await clientTodos.onAllEvents.first;
+ var e = await clientTodos.onCreated.first;
+ print(god.serialize(e));
+
+ expect(all, equals(e));
+ expect(e.eventName, equals("created"));
+ expect(e.data is Todo, equals(true));
+ expect(e.data.text, equals(todo.text));
+ expect(e.data.when, equals(todo.when));
});
}
@Realtime()
-class FakeService extends Service {}
\ No newline at end of file
+class FakeService extends server.Service {}