From 7d1f0a9c65e50c8a80289df879f90ec64f8ccc9c Mon Sep 17 00:00:00 2001 From: thosakwe Date: Fri, 23 Dec 2016 15:57:46 -0500 Subject: [PATCH] Almost --- .anaylsis-options.yaml | 4 + .gitignore | 4 +- .travis.yml | 1 + lib/angel_websocket.dart | 58 ++++++++- lib/base_websocket_client.dart | 228 ++++++++++++++++++++++++++++----- lib/browser.dart | 228 +++++---------------------------- lib/cli.dart | 224 -------------------------------- lib/io.dart | 25 +++- lib/server.dart | 123 +++++++++++------- lib/websocket_context.dart | 40 ++++-- lib/websocket_controller.dart | 81 +++++------- pubspec.yaml | 1 + test/all_tests.dart | 6 - test/common.dart | 20 --- test/controller/common.dart | 17 +++ test/controller/io_test.dart | 62 +++++++++ test/server.dart | 117 ----------------- test/service/browser_test.dart | 5 + test/service/common.dart | 24 ++++ test/service/io_test.dart | 56 ++++++++ 20 files changed, 610 insertions(+), 714 deletions(-) create mode 100644 .anaylsis-options.yaml create mode 100644 .travis.yml delete mode 100644 lib/cli.dart delete mode 100644 test/all_tests.dart delete mode 100644 test/common.dart create mode 100644 test/controller/common.dart create mode 100644 test/controller/io_test.dart delete mode 100644 test/server.dart create mode 100644 test/service/browser_test.dart create mode 100644 test/service/common.dart create mode 100644 test/service/io_test.dart diff --git a/.anaylsis-options.yaml b/.anaylsis-options.yaml new file mode 100644 index 00000000..716de123 --- /dev/null +++ b/.anaylsis-options.yaml @@ -0,0 +1,4 @@ +analyzer: + strong-mode: true + exclude: + - .scripts-bin/**/*.dart \ No newline at end of file diff --git a/.gitignore b/.gitignore index ea89ccf0..4191ae6d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,6 @@ doc/api/ # Don't commit pubspec lock file # (Library packages only! Remove pattern if developing an application package) pubspec.lock -.idea \ No newline at end of file +.idea + +log.txt \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..de2210c9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1 @@ +language: dart \ No newline at end of file diff --git a/lib/angel_websocket.dart b/lib/angel_websocket.dart index fc058014..03b4bb4b 100644 --- a/lib/angel_websocket.dart +++ b/lib/angel_websocket.dart @@ -1,17 +1,73 @@ +/// WebSocket plugin for Angel. library angel_websocket; +const String ACTION_INDEX = 'index'; +const String ACTION_READ = 'read'; +const String ACTION_CREATE = 'create'; +const String ACTION_MODIFY = 'modify'; +const String ACTION_UPDATE = 'update'; +const String ACTION_REMOVE = 'remove'; + +const String EVENT_ERROR = 'error'; +const String EVENT_INDEXED = 'indexed'; +const String EVENT_READ = 'read'; +const String EVENT_CREATED = 'created'; +const String EVENT_MODIFIED = 'modified'; +const String EVENT_UPDATED = 'updated'; +const String EVENT_REMOVED = 'removed'; + +/// The standard Angel service actions. +const List ACTIONS = const [ + ACTION_INDEX, + ACTION_READ, + ACTION_CREATE, + ACTION_MODIFY, + ACTION_UPDATE, + ACTION_REMOVE +]; + +/// The standard Angel service events. +const List EVENTS = const [ + EVENT_INDEXED, + EVENT_READ, + EVENT_CREATED, + EVENT_MODIFIED, + EVENT_UPDATED, + EVENT_REMOVED +]; + +/// A notification from the server that something has occurred. class WebSocketEvent { String eventName; var data; WebSocketEvent({String this.eventName, this.data}); + + factory WebSocketEvent.fromJson(Map data) => + new WebSocketEvent(eventName: data['eventName'], data: data['data']); + + Map toJson() { + return {'eventName': eventName, 'data': data}; + } } +/// A command sent to the server, usually corresponding to a service method. class WebSocketAction { String id; String eventName; var data; var params; - WebSocketAction({String this.id, String this.eventName, this.data, this.params}); + WebSocketAction( + {String this.id, String this.eventName, this.data, this.params}); + + factory WebSocketAction.fromJson(Map data) => new WebSocketAction( + id: data['id'], + eventName: data['eventName'], + data: data['data'], + params: data['params']); + + Map toJson() { + return {'id': id, 'eventName': eventName, 'data': data, 'params': params}; + } } diff --git a/lib/base_websocket_client.dart b/lib/base_websocket_client.dart index 98f45a67..9cfed9d6 100644 --- a/lib/base_websocket_client.dart +++ b/lib/base_websocket_client.dart @@ -1,44 +1,140 @@ import 'dart:async'; +import 'dart:convert'; import 'package:angel_client/angel_client.dart'; import 'package:http/src/base_client.dart' as http; import 'package:web_socket_channel/web_socket_channel.dart'; -import 'package:web_socket_channel/status.dart'; +import 'package:web_socket_channel/status.dart' as status; import 'angel_websocket.dart'; export 'package:angel_client/angel_client.dart'; import 'package:angel_client/base_angel_client.dart'; final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); +/// An [Angel] client that operates across WebSockets. abstract class BaseWebSocketClient extends BaseAngelClient { WebSocketChannel _socket; + final StreamController _onData = new StreamController(); + final StreamController _onMessage = + new StreamController(); + final StreamController _onError = + new StreamController(); + final StreamController> _onServiceEvent = + new StreamController>.broadcast(); + final StreamController + _onWebSocketChannelException = + new StreamController(); + + /// A broadcast stream of data coming from the [socket]. + /// + /// Mostly just for internal use. + Stream get onData => _onData.stream; + + /// Fired on errors. + Stream get onError => _onError.stream; + + /// Fired on all events. + Stream get onMessage => _onMessage.stream; + + /// Fired whenever an event is fired by a service. + Stream> get onServiceEvent => + _onServiceEvent.stream; + + /// Fired on [WebSocketChannelException]s. + Stream get onWebSocketChannelException => + _onWebSocketChannelException.stream; + /// The [WebSocketChannel] underneath this instance. WebSocketChannel get socket => _socket; BaseWebSocketClient(http.BaseClient client, String basePath) - : super(client, basePath); + : super(client, basePath) {} - Future connect(); + @override + Future close() async => _socket.sink.close(status.goingAway); + + /// Connects the WebSocket. + Future connect() async { + _socket = await getConnectedWebSocket(); + listen(); + return _socket; + } + + /// Returns a new [WebSocketChannel], ready to be listened on. + /// + /// This should be overriden by child classes, **NOT** [connect]. + Future getConnectedWebSocket(); @override BaseWebSocketService service(String path, {Type type, AngelDeserializer deserializer}) { String uri = path.toString().replaceAll(_straySlashes, ''); return new BaseWebSocketService(socket, this, uri, - deserializer: deserializer)..listen(); + deserializer: deserializer); + } + + /// Starts listening for data. + void listen() { + _socket.stream.listen((data) { + _onData.add(data); + + if (data is WebSocketChannelException) { + _onWebSocketChannelException.add(data); + } else if (data is String) { + var json = JSON.decode(data); + + if (json is Map) { + var event = new WebSocketEvent.fromJson(json); + _onMessage.add(event); + + if (event.eventName == EVENT_ERROR) { + var error = new AngelHttpException.fromMap(event.data ?? {}); + _onError.add(error); + } else if (event.eventName?.isNotEmpty == true) { + var split = event.eventName + .split("::") + .where((str) => str.isNotEmpty) + .toList(); + + if (split.length >= 2) { + var serviceName = split[0], eventName = split[1]; + _onServiceEvent.add({serviceName: event..eventName = eventName}); + } + } + } + } + }); + } + + /// Serializes data to JSON. + serialize(x) => JSON.encode(x); + + /// Alternative form of [send]ing an action. + void send(String eventName, WebSocketAction action) => + sendAction(action..eventName = eventName); + + /// Sends the given [action] on the [socket]. + void sendAction(WebSocketAction action) { + socket.sink.add(serialize(action)); } } +/// A [Service] that asynchronously interacts with the server. class BaseWebSocketService extends Service { + /// The [BaseWebSocketClient] that spawned this service. @override - final Angel app; - final AngelDeserializer deserializer; - final WebSocketChannel socket; - final String uri; + final BaseWebSocketClient app; - final StreamController _onMessage = - new StreamController(); - final StreamController _onError = + /// Used to deserialize JSON into typed data. + final AngelDeserializer deserializer; + + /// The [WebSocketChannel] to listen to, and send data across. + final WebSocketChannel socket; + + /// The service path to listen to. + final String path; + + final StreamController _onAllEvents = new StreamController(); final StreamController _onIndexed = new StreamController(); @@ -52,17 +148,13 @@ class BaseWebSocketService extends Service { new StreamController(); final StreamController _onRemoved = new StreamController(); - final WebSocketExtraneousEventHandler _on = - new WebSocketExtraneousEventHandler(); /// Use this to handle events that are not standard. - WebSocketExtraneousEventHandler get on => _on; + final WebSocketExtraneousEventHandler on = + new WebSocketExtraneousEventHandler(); /// Fired on all events. - Stream get onMessage => _onMessage.stream; - - /// Fired on errors. - Stream get onError => _onError.stream; + Stream get onAllEvents => _onAllEvents.stream; /// Fired on `index` events. Stream get onIndexed => _onIndexed.stream; @@ -82,48 +174,116 @@ class BaseWebSocketService extends Service { /// Fired on `removed` events. Stream get onRemoved => _onRemoved.stream; - BaseWebSocketService(this.socket, this.app, this.uri, {this.deserializer}); + BaseWebSocketService(this.socket, this.app, this.path, {this.deserializer}) { + listen(); + } + /// Serializes an [action] to be sent over a WebSocket. + serialize(WebSocketAction action) => JSON.encode(action); + + /// Deserializes data from a [WebSocketEvent]. + deserialize(x) { + return deserializer != null ? deserializer(x) : x; + } + + /// Deserializes the contents of an [event]. + WebSocketEvent transformEvent(WebSocketEvent event) { + return event..data = deserialize(event.data); + } + + /// Starts listening for events. void listen() { - socket.stream.listen((message) { - print('Message: ${message.runtimeType}'); + app.onServiceEvent.listen((map) { + if (map.containsKey(path)) { + var event = map[path]; + var transformed = transformEvent(event); + + _onAllEvents.add(event); + on._getStream(event.eventName).add(event); + + switch (event.eventName) { + case EVENT_INDEXED: + _onIndexed.add(transformed); + break; + case EVENT_READ: + _onRead.add(transformed); + break; + case EVENT_CREATED: + _onCreated.add(transformed); + break; + case EVENT_MODIFIED: + _onModified.add(transformed); + break; + case EVENT_UPDATED: + _onUpdated.add(transformed); + break; + case EVENT_REMOVED: + _onRemoved.add(transformed); + break; + } + } }); } - @override - Future index([Map params]) { - // TODO: implement index + /// Sends the given [action] on the [socket]. + void send(WebSocketAction action) { + socket.sink.add(serialize(action)); } @override - Future read(id, [Map params]) { - // TODO: implement read + Future index([Map params]) async { + socket.sink.add(serialize(new WebSocketAction( + eventName: '$path::${ACTION_INDEX}', params: params ?? {}))); + return null; } @override - Future create(data, [Map params]) { - // TODO: implement create + Future read(id, [Map params]) async { + socket.sink + .add(serialize(new WebSocketAction(id: id, params: params ?? {}))); + return null; } @override - Future modify(id, data, [Map params]) { - // TODO: implement modify + Future create(data, [Map params]) async { + socket.sink + .add(serialize(new WebSocketAction(data: data, params: params ?? {}))); + return null; } @override - Future update(id, data, [Map params]) { - // TODO: implement update + Future modify(id, data, [Map params]) async { + socket.sink.add(serialize( + new WebSocketAction(id: id, data: data, params: params ?? {}))); + return null; } @override - Future remove(id, [Map params]) { - // TODO: implement remove + Future update(id, data, [Map params]) async { + socket.sink.add(serialize( + new WebSocketAction(id: id, data: data, params: params ?? {}))); + return null; + } + + @override + Future remove(id, [Map params]) async { + socket.sink + .add(serialize(new WebSocketAction(id: id, params: params ?? {}))); + return null; } } +/// Contains a dynamic Map of [WebSocketEvent] streams. class WebSocketExtraneousEventHandler { Map> _events = {}; + StreamController _getStream(String index) { + if (_events[index] == null) + _events[index] = new StreamController(); + + return _events[index]; + } + operator [](String index) { if (_events[index] == null) _events[index] = new StreamController(); diff --git a/lib/browser.dart b/lib/browser.dart index 69252e56..cbde5c23 100644 --- a/lib/browser.dart +++ b/lib/browser.dart @@ -1,212 +1,50 @@ +/// Browser WebSocket client library for the Angel framework. +library angel_websocket.browser; + import 'dart:async'; -import 'dart:convert'; import 'dart:html'; import 'package:angel_client/angel_client.dart'; -import 'package:angel_websocket/angel_websocket.dart'; +import 'package:http/http.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/html.dart'; +import 'base_websocket_client.dart'; export 'package:angel_client/angel_client.dart'; -export 'package:angel_websocket/angel_websocket.dart'; +export 'angel_websocket.dart'; -class WebSocketClient extends Angel { - WebSocket _socket; - Map> _services = {}; - WebSocket get _underlyingSocket => _socket; +final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); - WebSocketClient(String wsEndpoint) : super(wsEndpoint) { - _socket = new WebSocket(wsEndpoint); - _connect(); - } +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + WebSockets(String path) : super(new http.Client(), path); - onData(data) { - var fromJson = JSON.decode(data); - var e = new WebSocketEvent( - eventName: fromJson['eventName'], data: fromJson['data']); - var split = e.eventName.split("::"); - var serviceName = split[0]; - var services = _services[serviceName]; + @override + Future getConnectedWebSocket() { + var socket = new WebSocket(basePath); + var completer = new Completer(); - if (serviceName == "error") { - throw new Exception("Server-side error."); - } else if (services != null) { - e.eventName = split[1]; + socket + ..onOpen.listen((_) { + if (!completer.isCompleted) + return completer.complete(new HtmlWebSocketChannel(socket)); + }) + ..onError.listen((ErrorEvent e) { + if (!completer.isCompleted) return completer.completeError(e.error); + }); - for (WebSocketService service in services) { - service._onAllEvents.add(e); - switch (e.eventName) { - case "indexed": - service._onIndexed.add(e); - break; - case "read": - service._onRead.add(e); - break; - case "created": - service._onCreated.add(e); - break; - case "modified": - service._onModified.add(e); - break; - case "updated": - service._onUpdated.add(e); - break; - case "error": - service._onRemoved.add(e); - break; - case "error": - service._onError.add(e); - break; - default: - if (service._on._events.containsKey(e.eventName)) - service._on._events[e.eventName].add(e); - break; - } - } - } - } - - void _connect() { - _socket.onMessage.listen((MessageEvent event) { - onData(event.data); - }); - } - - void send(String eventName, data) { - _socket.send(JSON.encode({"eventName": eventName, "data": data})); + return completer.future; } @override - Service service(Pattern path, {Type type}) { - var service = - new WebSocketService._base(path.toString(), this, _socket, type); - if (_services[path.toString()] == null) _services[path.toString()] = []; - - _services[path.toString()].add(service); - return service; + WebSocketsService service(String path, + {Type type, AngelDeserializer deserializer}) { + String uri = path.replaceAll(_straySlashes, ''); + return new WebSocketsService(socket, this, uri, T != dynamic ? T : type); } } -class WebSocketExtraneousEventHandler { - Map> _events = {}; +class WebSocketsService extends BaseWebSocketService { + final Type type; - operator [](String index) { - if (_events[index] == null) - _events[index] = new StreamController(); - - return _events[index].stream; - } -} - -class _WebSocketServiceTransformer - implements StreamTransformer { - Type _outputType; - - _WebSocketServiceTransformer.base(this._outputType); - - @override - Stream bind(Stream stream) { - var _stream = new StreamController(); - - stream.listen((WebSocketEvent e) { - /* if (_outputType != null && e.eventName != "error") - e.data = god.deserialize(god.serialize(e.data), outputType: _outputType); - */ - _stream.add(e); - }); - - return _stream.stream; - } -} - -class WebSocketService extends Service { - Type _outputType; - String _path; - _WebSocketServiceTransformer _transformer; - WebSocket connection; - - WebSocketExtraneousEventHandler _on = new WebSocketExtraneousEventHandler(); - var _onAllEvents = new StreamController(); - var _onError = new StreamController(); - var _onIndexed = new StreamController(); - var _onRead = new StreamController(); - var _onCreated = new StreamController(); - var _onModified = new StreamController(); - var _onUpdated = new StreamController(); - var _onRemoved = new StreamController(); - - WebSocketExtraneousEventHandler get on => _on; - - Stream get onAllEvents => - _onAllEvents.stream.transform(_transformer); - - Stream get onError => _onError.stream; - - Stream get onIndexed => - _onIndexed.stream.transform(_transformer); - - Stream get onRead => _onRead.stream.transform(_transformer); - - Stream get onCreated => - _onCreated.stream.transform(_transformer); - - Stream get onModified => - _onModified.stream.transform(_transformer); - - Stream get onUpdated => - _onUpdated.stream.transform(_transformer); - - Stream get onRemoved => - _onRemoved.stream.transform(_transformer); - - WebSocketService._base( - String path, Angel app, WebSocket this.connection, Type _outputType) { - this._path = path; - this.app = app; - this._outputType = _outputType; - _transformer = new _WebSocketServiceTransformer.base(this._outputType); - } - - _serialize(WebSocketAction action) { - var data = {"id": action.id, "eventName": action.eventName}; - - if (action.data != null) data["data"] = action.data; - - if (action.params != null) data["params"] = action.params; - - return JSON.encode(data); - } - - @override - Future index([Map params]) async { - connection.send(_serialize( - new WebSocketAction(eventName: "$_path::index", params: params))); - return null; - } - - @override - Future read(id, [Map params]) async { - connection.send(_serialize(new WebSocketAction( - eventName: "$_path::read", id: id, params: params))); - } - - @override - Future create(data, [Map params]) async { - connection.send(_serialize(new WebSocketAction( - eventName: "$_path::create", data: data, params: params))); - } - - @override - Future modify(id, data, [Map params]) async { - connection.send(_serialize(new WebSocketAction( - eventName: "$_path::modify", id: id, data: data, params: params))); - } - - @override - Future update(id, data, [Map params]) async { - connection.send(_serialize(new WebSocketAction( - eventName: "$_path::update", id: id, data: data, params: params))); - } - - @override - Future remove(id, [Map params]) async { - connection.send(_serialize(new WebSocketAction( - eventName: "$_path::remove", id: id, params: params))); - } + WebSocketsService(WebSocketChannel socket, Angel app, String uri, this.type) + : super(socket, app, uri); } diff --git a/lib/cli.dart b/lib/cli.dart deleted file mode 100644 index ec48b16e..00000000 --- a/lib/cli.dart +++ /dev/null @@ -1,224 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; -import 'package:angel_client/angel_client.dart'; -import 'package:angel_framework/angel_framework.dart' as srv; -import 'package:angel_websocket/angel_websocket.dart'; -import 'package:json_god/json_god.dart' as god; -export 'package:angel_client/angel_client.dart'; -export 'package:angel_websocket/angel_websocket.dart'; - -class WebSocketClient extends Angel { - WebSocket _socket; - Map> _services = {}; - WebSocket get underlyingSocket => _socket; - _WebSocketEventTable on = new _WebSocketEventTable(); - - WebSocketClient(String wsEndpoint) : super(wsEndpoint); - - onData(data) async { - var fromJson = JSON.decode(data); - print("a: $fromJson"); - var e = new WebSocketEvent( - eventName: fromJson['eventName'], data: fromJson['data']); - print("b: $e"); - var split = e.eventName.split("::"); - var serviceName = split[0]; - var services = _services[serviceName]; - - if (serviceName == "error") { - var exc = new srv.AngelHttpException(new Exception("Server-side error.")); - exc.statusCode = e.data['statusCode']; - exc.message = e.data['message']; - exc.errors = exc.errors ?? []; - exc.errors.addAll(e.data['errors'] ?? []); - throw exc; - } else { - on._getStreamForEvent(serviceName).add(e.data); - - if (services != null) { - e.eventName = split[1]; - - for (WebSocketService service in services) { - service._onAllEvents.add(e); - switch (e.eventName) { - case srv.HookedServiceEvent.INDEXED: - service._onIndexed.add(e); - break; - case srv.HookedServiceEvent.READ: - service._onRead.add(e); - break; - case srv.HookedServiceEvent.CREATED: - service._onCreated.add(e); - break; - case srv.HookedServiceEvent.MODIFIED: - service._onModified.add(e); - break; - case srv.HookedServiceEvent.UPDATED: - service._onUpdated.add(e); - break; - case srv.HookedServiceEvent.REMOVED: - service._onRemoved.add(e); - break; - case "error": - service._onError.add(e); - break; - default: - if (service._on._events.containsKey(e.eventName)) - service._on._events[e.eventName].add(e); - break; - } - } - } - } - } - - Future connect() async { - _socket = await WebSocket.connect(basePath); - _socket.listen(onData); - } - - void send(String eventName, data) { - _socket.add(JSON.encode({"eventName": eventName, "data": data})); - } - - @override - Service service(Pattern path, {Type type}) { - var service = - new WebSocketService._base(path.toString(), this, _socket, type); - if (_services[path.toString()] == null) _services[path.toString()] = []; - - _services[path.toString()].add(service); - return service; - } -} - -class WebSocketExtraneousEventHandler { - Map> _events = {}; - - operator [](String index) { - if (_events[index] == null) - _events[index] = new StreamController(); - - return _events[index].stream; - } -} - -class _WebSocketServiceTransformer - implements StreamTransformer { - Type _outputType; - - _WebSocketServiceTransformer.base(this._outputType); - - @override - Stream bind(Stream stream) { - var _stream = new StreamController(); - - stream.listen((WebSocketEvent e) { - if (_outputType != null && e.eventName != "error") - e.data = - god.deserialize(god.serialize(e.data), outputType: _outputType); - _stream.add(e); - }); - - return _stream.stream; - } -} - -class WebSocketService extends Service { - Type _outputType; - String _path; - _WebSocketServiceTransformer _transformer; - WebSocket connection; - - WebSocketExtraneousEventHandler _on = new WebSocketExtraneousEventHandler(); - var _onAllEvents = new StreamController(); - var _onError = new StreamController(); - var _onIndexed = new StreamController(); - var _onRead = new StreamController(); - var _onCreated = new StreamController(); - var _onModified = new StreamController(); - var _onUpdated = new StreamController(); - var _onRemoved = new StreamController(); - - WebSocketExtraneousEventHandler get on => _on; - - Stream get onAllEvents => - _onAllEvents.stream.transform(_transformer); - - Stream get onError => _onError.stream; - - Stream get onIndexed => - _onIndexed.stream.transform(_transformer); - - Stream get onRead => _onRead.stream.transform(_transformer); - - Stream get onCreated => - _onCreated.stream.transform(_transformer); - - Stream get onModified => - _onModified.stream.transform(_transformer); - - Stream get onUpdated => - _onUpdated.stream.transform(_transformer); - - Stream get onRemoved => - _onRemoved.stream.transform(_transformer); - - WebSocketService._base( - String path, Angel app, WebSocket this.connection, Type _outputType) { - this._path = path; - this.app = app; - this._outputType = _outputType; - _transformer = new _WebSocketServiceTransformer.base(this._outputType); - } - - @override - Future index([Map params]) async { - connection.add(god.serialize( - new WebSocketAction(eventName: "$_path::index", params: params))); - return null; - } - - @override - Future read(id, [Map params]) async { - connection.add(god.serialize(new WebSocketAction( - eventName: "$_path::read", id: id, params: params))); - } - - @override - Future create(data, [Map params]) async { - connection.add(god.serialize(new WebSocketAction( - eventName: "$_path::create", data: data, params: params))); - } - - @override - Future modify(id, data, [Map params]) async { - connection.add(god.serialize(new WebSocketAction( - eventName: "$_path::modify", id: id, data: data, params: params))); - } - - @override - Future update(id, data, [Map params]) async { - connection.add(god.serialize(new WebSocketAction( - eventName: "$_path::update", id: id, data: data, params: params))); - } - - @override - Future remove(id, [Map params]) async { - connection.add(god.serialize(new WebSocketAction( - eventName: "$_path::remove", id: id, params: params))); - } -} - -class _WebSocketEventTable { - Map> _handlers = {}; - - StreamController _getStreamForEvent(eventName) { - if (!_handlers.containsKey(eventName)) - _handlers[eventName] = new StreamController.broadcast(); - return _handlers[eventName]; - } - - Stream operator [](String key) => _getStreamForEvent(key).stream; -} diff --git a/lib/io.dart b/lib/io.dart index f7becf8c..6c111124 100644 --- a/lib/io.dart +++ b/lib/io.dart @@ -1,12 +1,14 @@ /// Command-line WebSocket client library for the Angel framework. -library angel_client.cli; +library angel_websocket.io; import 'dart:async'; +import 'dart:io'; import 'package:angel_client/angel_client.dart'; import 'package:http/http.dart' as http; import 'package:json_god/json_god.dart' as god; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/io.dart'; +import 'angel_websocket.dart'; import 'base_websocket_client.dart'; export 'package:angel_client/angel_client.dart'; export 'angel_websocket.dart'; @@ -18,16 +20,20 @@ class WebSockets extends BaseWebSocketClient { WebSockets(String path) : super(new http.Client(), path); @override - Future connect() async { - return new IOWebSocketChannel.connect(basePath); + Future getConnectedWebSocket() async { + var socket = await WebSocket.connect(basePath); + return new IOWebSocketChannel(socket); } @override WebSocketsService service(String path, {Type type, AngelDeserializer deserializer}) { - String uri = path.replaceAll(_straySlashes, ""); + String uri = path.replaceAll(_straySlashes, ''); return new WebSocketsService(socket, this, uri, T != dynamic ? T : type); } + + @override + serialize(x) => god.serialize(x); } class WebSocketsService extends BaseWebSocketService { @@ -35,4 +41,15 @@ class WebSocketsService extends BaseWebSocketService { WebSocketsService(WebSocketChannel socket, Angel app, String uri, this.type) : super(socket, app, uri); + + @override + serialize(WebSocketAction action) => god.serialize(action); + + @override + deserialize(x) { + if (type != null && type != dynamic) { + return god.deserializeDatum(x, outputType: type); + } else + return super.deserialize(x); + } } diff --git a/lib/server.dart b/lib/server.dart index c1397e01..f63fcaf5 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -1,3 +1,4 @@ +/// Server-side support for WebSockets. library angel_websocket.server; import 'dart:async'; @@ -13,30 +14,46 @@ export 'angel_websocket.dart'; part 'websocket_context.dart'; part 'websocket_controller.dart'; +/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s. class AngelWebSocket extends AngelPlugin { Angel _app; List _clients = []; - StreamController _onConnection = - new StreamController(); - StreamController _onDisconnect = - new StreamController(); final List _servicesAlreadyWired = []; + final StreamController _onAction = + new StreamController(); + final StreamController _onData = new StreamController(); + final StreamController _onConnection = + new StreamController.broadcast(); + final StreamController _onDisconnect = + new StreamController.broadcast(); + + /// Include debug information, and send error information across WebSockets. + final bool debug; + /// A list of clients currently connected to this server via WebSockets. List get clients => new List.unmodifiable(_clients); /// Services that have already been hooked to fire socket events. - List get servicesAlreadyWired => new List.unmodifiable(_servicesAlreadyWired); + List get servicesAlreadyWired => + new List.unmodifiable(_servicesAlreadyWired); + /// The endpoint that users should connect a WebSocket to. final String endpoint; + /// Fired on any [WebSocketAction]. + Stream get onAction => _onAction.stream; + + /// Fired whenever a WebSocket sends data. + Stream get onData => _onData.stream; + /// Fired on incoming connections. Stream get onConnection => _onConnection.stream; /// Fired when a user disconnects. Stream get onDisconnection => _onDisconnect.stream; - AngelWebSocket(String this.endpoint); + AngelWebSocket({this.endpoint: '/ws', this.debug: false}); _batchEvent(String path) { return (HookedServiceEvent e) async { @@ -46,6 +63,7 @@ class AngelWebSocket extends AngelPlugin { }; } + /// Slates an event to be dispatched. Future batchEvent(WebSocketEvent event) async { // Default implementation will just immediately fire events _clients.forEach((client) { @@ -53,8 +71,10 @@ class AngelWebSocket extends AngelPlugin { }); } + /// Returns a list of events yet to be sent. Future> getBatchedEvents() async => []; + /// Responds to an incoming action on a WebSocket. Future handleAction(WebSocketAction action, WebSocketContext socket) async { var split = action.eventName.split("::"); @@ -67,7 +87,7 @@ class AngelWebSocket extends AngelPlugin { return socket.sendError(new AngelHttpException.NotFound( message: "No service \"${split[0]}\" exists.")); - var eventName = split[1]; + var actionName = split[1]; var params = mergeMap([ god.deserializeDatum(action.params), @@ -75,39 +95,44 @@ class AngelWebSocket extends AngelPlugin { ]); try { - if (eventName == "index") { - return socket.send("${split[0]}::" + HookedServiceEvent.INDEXED, - await service.index(params)); - } else if (eventName == "read") { - return socket.send("${split[0]}::" + HookedServiceEvent.READ, + if (actionName == ACTION_INDEX) { + return socket.send( + "${split[0]}::" + EVENT_INDEXED, await service.index(params)); + } else if (actionName == ACTION_READ) { + return socket.send("${split[0]}::" + EVENT_READ, await service.read(action.id, params)); - } else if (eventName == "create") { + } else if (actionName == ACTION_CREATE) { return new WebSocketEvent( - eventName: "${split[0]}::" + HookedServiceEvent.CREATED, + eventName: "${split[0]}::" + EVENT_CREATED, data: await service.create(action.data, params)); - } else if (eventName == "modify") { + } else if (actionName == ACTION_MODIFY) { return new WebSocketEvent( - eventName: "${split[0]}::" + HookedServiceEvent.MODIFIED, + eventName: "${split[0]}::" + EVENT_MODIFIED, data: await service.modify(action.id, action.data, params)); - } else if (eventName == "update") { + } else if (actionName == ACTION_UPDATE) { return new WebSocketEvent( - eventName: "${split[0]}::" + HookedServiceEvent.UPDATED, + eventName: "${split[0]}::" + EVENT_UPDATED, data: await service.update(action.id, action.data, params)); - } else if (eventName == "remove") { + } else if (actionName == ACTION_REMOVE) { return new WebSocketEvent( - eventName: "${split[0]}::" + HookedServiceEvent.REMOVED, + eventName: "${split[0]}::" + EVENT_REMOVED, data: await service.remove(action.id, params)); } else { return socket.sendError(new AngelHttpException.MethodNotAllowed( - message: "Method Not Allowed: \"$eventName\"")); + message: "Method Not Allowed: \"$actionName\"")); } - } catch (e) { - if (e is AngelHttpException) return socket.sendError(e); - - return socket.sendError(new AngelHttpException(e)); + } catch (e, st) { + if (e is AngelHttpException) + return socket.sendError(e); + else if (debug == true) + socket.sendError(new AngelHttpException(e, + message: e.toString(), stackTrace: st, errors: [st.toString()])); + else + socket.sendError(new AngelHttpException(e)); } } + /// Hooks a service up to have its events broadcasted. hookupService(Pattern _path, HookedService service) { String path = _path.toString(); var batch = _batchEvent(path); @@ -121,17 +146,16 @@ class AngelWebSocket extends AngelPlugin { _servicesAlreadyWired.add(path); } - Future onConnect(WebSocketContext socket) async {} + /// Runs before firing [onConnection]. + Future handleConnect(WebSocketContext socket) async {} - onData(WebSocketContext socket, data) async { + /// Handles incoming data from a WebSocket. + handleData(WebSocketContext socket, data) async { try { socket._onData.add(data); var fromJson = JSON.decode(data); - var action = new WebSocketAction( - id: fromJson['id'], - eventName: fromJson['eventName'], - data: fromJson['data'], - params: fromJson['params']); + var action = new WebSocketAction.fromJson(fromJson); + _onAction.add(action); if (action.eventName == null || action.eventName is! String || @@ -140,22 +164,17 @@ class AngelWebSocket extends AngelPlugin { } if (fromJson is Map && fromJson.containsKey("eventName")) { - socket._onAll.add(fromJson); - socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson["data"]); + socket._onAction.add(new WebSocketAction.fromJson(fromJson)); + socket.on + ._getStreamForEvent(fromJson["eventName"].toString()) + .add(fromJson["data"]); } if (action.eventName.contains("::")) { var split = action.eventName.split("::"); if (split.length >= 2) { - if ([ - "index", - "read", - "create", - "modify", - "update", - "remove" - ].contains(split[1])) { + if (ACTIONS.contains(split[1])) { var event = handleAction(action, socket); if (event is Future) event = await event; @@ -165,19 +184,24 @@ class AngelWebSocket extends AngelPlugin { } } } - } catch (e) { + } catch (e, st) { // Send an error if (e is AngelHttpException) socket.sendError(e); + else if (debug == true) + socket.sendError(new AngelHttpException(e, + message: e.toString(), stackTrace: st, errors: [st.toString()])); else socket.sendError(new AngelHttpException(e)); } } + /// Transforms a [HookedServiceEvent], so that it can be broadcasted. Future transformEvent(HookedServiceEvent event) async { return new WebSocketEvent(eventName: event.eventName, data: event.result); } + /// Hooks any [HookedService]s that are not being broadcasted yet. wireAllServices(Angel app) { for (Pattern key in app.services.keys.where((x) { return !_servicesAlreadyWired.contains(x) && @@ -189,7 +213,7 @@ class AngelWebSocket extends AngelPlugin { @override Future call(Angel app) async { - this._app = app..container.singleton(this); + _app = app..container.singleton(this); if (runtimeType != AngelWebSocket) app.container.singleton(this, as: AngelWebSocket); @@ -202,24 +226,25 @@ class AngelWebSocket extends AngelPlugin { }); app.get(endpoint, (RequestContext req, ResponseContext res) async { - if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest)) + if (!WebSocketTransformer.isUpgradeRequest(req.io)) throw new AngelHttpException.BadRequest(); res ..willCloseItself = true ..end(); - var ws = await WebSocketTransformer.upgrade(req.underlyingRequest); + var ws = await WebSocketTransformer.upgrade(req.io); _clients.add(ws); var socket = new WebSocketContext(ws, req, res); - await onConnect(socket); + await handleConnect(socket); _onConnection.add(socket); - req.params['socket'] = socket; + req.properties['socket'] = socket; ws.listen((data) { - onData(socket, data); + _onData.add(data); + handleData(socket, data); }, onDone: () { _onDisconnect.add(socket); _clients.remove(ws); diff --git a/lib/websocket_context.dart b/lib/websocket_context.dart index 0151eda9..98a6c78a 100644 --- a/lib/websocket_context.dart +++ b/lib/websocket_context.dart @@ -1,24 +1,40 @@ part of angel_websocket.server; +/// Represents a WebSocket session, with the original +/// [RequestContext] and [ResponseContext] attached. class WebSocketContext { - StreamController _onAll = new StreamController.broadcast(); - StreamController _onData = new StreamController.broadcast(); + /// Use this to listen for events. _WebSocketEventTable on = new _WebSocketEventTable(); - Stream get onAll => _onAll.stream; + + /// The underlying [WebSocket] instance. + final WebSocket io; + + /// The original [RequestContext]. + final RequestContext request; + + /// The original [ResponseContext]. + final ResponseContext response; + + StreamController _onAction = + new StreamController(); + StreamController _onData = new StreamController(); + + /// Fired on any [WebSocketAction]; + Stream get onAction => _onAction.stream; + + /// Fired when any data is sent through [io]. Stream get onData => _onData.stream; - WebSocket underlyingSocket; - RequestContext requestContext; - ResponseContext responseContext; - WebSocketContext(WebSocket this.underlyingSocket, - RequestContext this.requestContext, ResponseContext this.responseContext); + WebSocketContext(WebSocket this.io, RequestContext this.request, + ResponseContext this.response); - send(String eventName, data) { - underlyingSocket.add( - god.serialize(new WebSocketEvent(eventName: eventName, data: data))); + /// Sends an arbitrary [WebSocketEvent]; + void send(String eventName, data) { + io.add(god.serialize(new WebSocketEvent(eventName: eventName, data: data))); } - sendError(AngelHttpException error) => send("error", error.toJson()); + /// Sends an error event. + void sendError(AngelHttpException error) => send(EVENT_ERROR, error.toJson()); } class _WebSocketEventTable { diff --git a/lib/websocket_controller.dart b/lib/websocket_controller.dart index 4ea621c1..50ebc0e5 100644 --- a/lib/websocket_controller.dart +++ b/lib/websocket_controller.dart @@ -9,17 +9,27 @@ class ExposeWs { class WebSocketController extends Controller { Map _handlers = {}; Map _handlerSymbols = {}; - InstanceMirror _instanceMirror; AngelWebSocket ws; - WebSocketController():super() { - _instanceMirror = reflect(this); + WebSocketController() : super(); + + void broadcast(String eventName, data) { + ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data)); } + onConnect(WebSocketContext socket) {} + + onDisconnect(WebSocketContext socket) {} + + onAction(WebSocketAction action, WebSocketContext socket) async {} + + onData(data, WebSocketContext socket) {} + @override Future call(Angel app) async { await super.call(app); + InstanceMirror instanceMirror = reflect(this); ClassMirror classMirror = reflectClass(this.runtimeType); classMirror.instanceMembers.forEach((sym, mirror) { if (mirror.isRegularMethod) { @@ -38,51 +48,32 @@ class WebSocketController extends Controller { AngelWebSocket ws = app.container.make(AngelWebSocket); ws.onConnection.listen((socket) async { + socket.request + ..inject('socket', socket) + ..inject(WebSocketContext, socket); + await onConnect(socket); - socket.onData.listen(onData); + socket.onData.listen((data) => onData(data, socket)); - socket.onAll.listen((Map data) async { - await onAllEvents(data); + socket.onAction.listen((WebSocketAction action) async { + await onAction(action, socket); - if (_handlers.containsKey(data["eventName"])) { - var methodMirror = _handlers[data["eventName"]]; + if (_handlers.containsKey(action.eventName)) { try { - // Load parameters, and execute - List args = []; + var methodMirror = _handlers[action.eventName]; + var fn = instanceMirror.getField(methodMirror.simpleName).reflectee; - for (int i = 0; i < methodMirror.parameters.length; i++) { - ParameterMirror parameter = methodMirror.parameters[i]; - String name = MirrorSystem.getName(parameter.simpleName); - - if (parameter.type.reflectedType == RequestContext || - name == "req") - args.add(socket.requestContext); - else if (parameter.type.reflectedType == ResponseContext || - name == "res") - args.add(socket.responseContext); - else if (parameter.type == AngelWebSocket) - args.add(socket); - else { - if (socket.requestContext.params.containsKey(name)) { - args.add(socket.requestContext.params[name]); - } else { - try { - args.add(app.container.make(parameter.type.reflectedType)); - continue; - } catch (e) { - throw new AngelHttpException.BadRequest( - message: "Missing parameter '$name'"); - } - } - } - } - - await _instanceMirror.invoke(_handlerSymbols[data["eventName"]], args); - } catch (e) { + return app.runContained(fn, socket.request, socket.response); + } catch (e, st) { // Send an error if (e is AngelHttpException) socket.sendError(e); + else if (ws.debug == true) + socket.sendError(new AngelHttpException(e, + message: e.toString(), + stackTrace: st, + errors: [st.toString()])); else socket.sendError(new AngelHttpException(e)); } @@ -92,16 +83,4 @@ class WebSocketController extends Controller { ws.onDisconnection.listen(onDisconnect); } - - void broadcast(String eventName, data) { - ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data)); - } - - Future onConnect(WebSocketContext socket) async {} - - Future onDisconnect(WebSocketContext socket) async {} - - Future onAllEvents(Map data) async {} - - void onData(data) {} } diff --git a/pubspec.yaml b/pubspec.yaml index 45326092..f1330363 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -12,5 +12,6 @@ dependencies: uuid: "^0.5.3" web_socket_channel: "^1.0.0" dev_dependencies: + angel_diagnostics: "^1.0.0-dev" http: "^0.11.3" test: "^0.12.15" diff --git a/test/all_tests.dart b/test/all_tests.dart deleted file mode 100644 index b9d49b66..00000000 --- a/test/all_tests.dart +++ /dev/null @@ -1,6 +0,0 @@ -import 'package:test/test.dart'; -import 'server.dart' as server; - -main() async { - group("server", server.main); -} \ No newline at end of file diff --git a/test/common.dart b/test/common.dart deleted file mode 100644 index bf4492f1..00000000 --- a/test/common.dart +++ /dev/null @@ -1,20 +0,0 @@ -import 'dart:async'; -import 'dart:io'; -import 'package:angel_framework/angel_framework.dart'; -import 'package:angel_framework/src/defs.dart'; - -class Todo extends MemoryModel { - String text; - String when; - - Todo({String this.text, String this.when}); -} - -Future startTestServer(Angel app) async { - var host = InternetAddress.LOOPBACK_IP_V4; - var port = 3000; - - await app.startServer(host, port); - app.properties["ws_url"] = "ws://${host.address}:$port/ws"; - print("Test server listening on ${host.address}:$port"); -} diff --git a/test/controller/common.dart b/test/controller/common.dart new file mode 100644 index 00000000..4739f7d3 --- /dev/null +++ b/test/controller/common.dart @@ -0,0 +1,17 @@ +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_websocket/server.dart'; + +class Game { + final String playerOne, playerTwo; + + const Game({this.playerOne, this.playerTwo}); +} + +@Expose('/game') +class GameController extends WebSocketController { + @ExposeWs('search') + search(WebSocketContext socket) async { + print('OMG ok'); + socket.send('searched', 'poop'); + } +} \ No newline at end of file diff --git a/test/controller/io_test.dart b/test/controller/io_test.dart new file mode 100644 index 00000000..be211bbe --- /dev/null +++ b/test/controller/io_test.dart @@ -0,0 +1,62 @@ +import 'dart:io'; +import 'package:angel_diagnostics/angel_diagnostics.dart' as srv; +import 'package:angel_framework/angel_framework.dart' as srv; +import 'package:angel_websocket/io.dart' as ws; +import 'package:angel_websocket/server.dart' as srv; +import 'package:test/test.dart'; +import 'common.dart'; + +main() { + srv.Angel app; + ws.WebSockets client; + srv.AngelWebSocket websockets; + HttpServer server; + String url; + + setUp(() async { + app = new srv.Angel(); + + websockets = new srv.AngelWebSocket(debug: true) + ..onData.listen((data) { + print('Received by server: $data'); + }); + + await app.configure(websockets); + await app.configure(new GameController()); + + server = + await new srv.DiagnosticsServer(app, new File('log.txt')).startServer(); + url = 'ws://${server.address.address}:${server.port}/ws'; + + client = new ws.WebSockets(url); + await client.connect(); + + client + ..onData.listen((data) { + print('Received by client: $data'); + }) + ..onError.listen((error) { + // Auto-fail tests on errors ;) + stderr.writeln(error); + error.errors.forEach(stderr.writeln); + throw error; + }); + }); + + tearDown(() async { + await client.close(); + await server.close(force: true); + app = null; + client = null; + server = null; + url = null; + }); + + group('controller.io', () { + test('search', () async { + client.send('search', new ws.WebSocketAction()); + var search = await client.onData.first; + print('First: $search'); + }); + }); +} diff --git a/test/server.dart b/test/server.dart deleted file mode 100644 index 79abb9ec..00000000 --- a/test/server.dart +++ /dev/null @@ -1,117 +0,0 @@ -import 'dart:async'; -import 'dart:io'; -import 'package:angel_framework/angel_framework.dart' as server; -import 'package:angel_websocket/cli.dart' as client; -import 'package:angel_websocket/server.dart'; -import 'package:json_god/json_god.dart' as god; -import 'package:test/test.dart'; -import 'common.dart'; - -main() { - server.Angel app; - client.WebSocketClient clientApp; - client.WebSocketService clientTodos; - Stream customEventStream; - Stream customEventStream2; - WebSocket socket; - AngelWebSocket webSocket = new AngelWebSocket("/ws"); - - setUp(() async { - app = new server.Angel(); - - app.use("/real", new FakeService(), hooked: false); - app.use("/api/todos", new server.MemoryService()); - await app - .service("api/todos") - .create(new Todo(text: "Clean your room", when: "now")); - - await app.configure(webSocket); - await app.configure((server.Angel app) async { - AngelWebSocket ws = app.container.make(AngelWebSocket); - - ws.onConnection.listen((WebSocketContext socket) { - socket.onData.listen((data) { - print("Data: $data"); - }); - - customEventStream = socket.on["custom"]; - }); - }); - await app.configure(new Custom2Controller()); - await app.configure(startTestServer); - - socket = await WebSocket.connect(app.properties["ws_url"]); - clientApp = new client.WebSocketClient(app.properties["ws_url"]); - await clientApp.connect(); - customEventStream2 = clientApp.on["custom2"]; - - clientTodos = clientApp.service("api/todos", type: Todo); - }); - - tearDown(() async { - await app.httpServer.close(force: true); - }); - - test("find all real-time services", () { - print(webSocket.servicesAlreadyWired); - expect(webSocket.servicesAlreadyWired, equals(["api/todos"])); - }); - - test("index", () async { - var action = new WebSocketAction(eventName: "api/todos::index"); - socket.add(god.serialize(action)); - - String json = await socket.first; - print(json); - - WebSocketEvent e = god.deserialize(json, outputType: WebSocketEvent); - expect(e.eventName, equals("api/todos::indexed")); - expect(e.data[0]["when"], equals("now")); - }); - - test("create", () async { - var todo = new Todo(text: "Finish the Angel framework", when: "2016"); - clientTodos.create(todo); - - var all = await clientTodos.onAllEvents.first; - var e = await clientTodos.onCreated.first; - print(god.serialize(e)); - - expect(all, equals(e)); - expect(e.eventName, equals("created")); - expect(e.data is Todo, equals(true)); - expect(e.data.text, equals(todo.text)); - expect(e.data.when, equals(todo.when)); - }); - - test("custom event via controller", () async { - clientApp.send("custom", {"hello": "world"}); - - var data = await customEventStream.first; - - expect(data["hello"], equals("world")); - }); - - test("custom event via ws controller", () async { - clientApp.send("custom2", {"hello": "world"}); - - var data = customEventStream2.first; - print("Received data from server: $data"); - }); -} - -class FakeService extends server.Service {} - -@server.Expose("/custom2") -class Custom2Controller extends WebSocketController { - @override - Future onConnect(WebSocketContext socket) async { - print( - "Got a WS connection from session #${socket.requestContext.session.id}!"); - } - - @ExposeWs("custom2") - void sayFoo(WebSocketContext socket, server.RequestContext req, AngelWebSocket ws) { - socket.send("custom2", {"franken": "stein"}); - } -} diff --git a/test/service/browser_test.dart b/test/service/browser_test.dart new file mode 100644 index 00000000..2a91c878 --- /dev/null +++ b/test/service/browser_test.dart @@ -0,0 +1,5 @@ +import 'package:test/test.dart'; + +main() { + group('service.browser', () {}); +} \ No newline at end of file diff --git a/test/service/common.dart b/test/service/common.dart new file mode 100644 index 00000000..fcd4ff6b --- /dev/null +++ b/test/service/common.dart @@ -0,0 +1,24 @@ +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/src/defs.dart'; +import 'package:angel_websocket/base_websocket_client.dart'; +import 'package:test/test.dart'; + +class Todo extends MemoryModel { + String text; + String when; + + Todo({String this.text, String this.when}); +} + +class TodoService extends MemoryService {} + +testIndex(BaseWebSocketClient client) async { + var Todos = client.service('api/todos'); + Todos.index(); + + var indexed = await Todos.onIndexed.first; + print('indexed: ${indexed.toJson()}'); + + expect(indexed.data, isList); + expect(indexed.data, isEmpty); +} diff --git a/test/service/io_test.dart b/test/service/io_test.dart new file mode 100644 index 00000000..5b8eb12b --- /dev/null +++ b/test/service/io_test.dart @@ -0,0 +1,56 @@ +import 'dart:io'; +import 'package:angel_diagnostics/angel_diagnostics.dart' as srv; +import 'package:angel_framework/angel_framework.dart' as srv; +import 'package:angel_websocket/io.dart' as ws; +import 'package:angel_websocket/server.dart' as srv; +import 'package:test/test.dart'; +import 'common.dart'; + +main() { + srv.Angel app; + ws.WebSockets client; + srv.AngelWebSocket websockets; + HttpServer server; + String url; + + setUp(() async { + app = new srv.Angel()..use('/api/todos', new TodoService()); + + websockets = new srv.AngelWebSocket(debug: true) + ..onData.listen((data) { + print('Received by server: $data'); + }); + + await app.configure(websockets); + server = + await new srv.DiagnosticsServer(app, new File('log.txt')).startServer(); + url = 'ws://${server.address.address}:${server.port}/ws'; + + client = new ws.WebSockets(url); + await client.connect(); + + client + ..onData.listen((data) { + print('Received by client: $data'); + }) + ..onError.listen((error) { + // Auto-fail tests on errors ;) + stderr.writeln(error); + error.errors.forEach(stderr.writeln); + throw error; + }); + }); + + tearDown(() async { + await client.close(); + await server.close(force: true); + app = null; + client = null; + server = null; + url = null; + }); + + group('service.io', () { + test('index', () => testIndex(client)); + }); +}