This commit is contained in:
thosakwe 2016-12-23 15:57:46 -05:00
parent 7957a4beee
commit 7d1f0a9c65
20 changed files with 610 additions and 714 deletions

4
.anaylsis-options.yaml Normal file
View file

@ -0,0 +1,4 @@
analyzer:
strong-mode: true
exclude:
- .scripts-bin/**/*.dart

4
.gitignore vendored
View file

@ -25,4 +25,6 @@ doc/api/
# Don't commit pubspec lock file
# (Library packages only! Remove pattern if developing an application package)
pubspec.lock
.idea
.idea
log.txt

1
.travis.yml Normal file
View file

@ -0,0 +1 @@
language: dart

View file

@ -1,17 +1,73 @@
/// WebSocket plugin for Angel.
library angel_websocket;
const String ACTION_INDEX = 'index';
const String ACTION_READ = 'read';
const String ACTION_CREATE = 'create';
const String ACTION_MODIFY = 'modify';
const String ACTION_UPDATE = 'update';
const String ACTION_REMOVE = 'remove';
const String EVENT_ERROR = 'error';
const String EVENT_INDEXED = 'indexed';
const String EVENT_READ = 'read';
const String EVENT_CREATED = 'created';
const String EVENT_MODIFIED = 'modified';
const String EVENT_UPDATED = 'updated';
const String EVENT_REMOVED = 'removed';
/// The standard Angel service actions.
const List<String> ACTIONS = const [
ACTION_INDEX,
ACTION_READ,
ACTION_CREATE,
ACTION_MODIFY,
ACTION_UPDATE,
ACTION_REMOVE
];
/// The standard Angel service events.
const List<String> EVENTS = const [
EVENT_INDEXED,
EVENT_READ,
EVENT_CREATED,
EVENT_MODIFIED,
EVENT_UPDATED,
EVENT_REMOVED
];
/// A notification from the server that something has occurred.
class WebSocketEvent {
String eventName;
var data;
WebSocketEvent({String this.eventName, this.data});
factory WebSocketEvent.fromJson(Map data) =>
new WebSocketEvent(eventName: data['eventName'], data: data['data']);
Map toJson() {
return {'eventName': eventName, 'data': data};
}
}
/// A command sent to the server, usually corresponding to a service method.
class WebSocketAction {
String id;
String eventName;
var data;
var params;
WebSocketAction({String this.id, String this.eventName, this.data, this.params});
WebSocketAction(
{String this.id, String this.eventName, this.data, this.params});
factory WebSocketAction.fromJson(Map data) => new WebSocketAction(
id: data['id'],
eventName: data['eventName'],
data: data['data'],
params: data['params']);
Map toJson() {
return {'id': id, 'eventName': eventName, 'data': data, 'params': params};
}
}

View file

@ -1,44 +1,140 @@
import 'dart:async';
import 'dart:convert';
import 'package:angel_client/angel_client.dart';
import 'package:http/src/base_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'angel_websocket.dart';
export 'package:angel_client/angel_client.dart';
import 'package:angel_client/base_angel_client.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// An [Angel] client that operates across WebSockets.
abstract class BaseWebSocketClient extends BaseAngelClient {
WebSocketChannel _socket;
final StreamController _onData = new StreamController();
final StreamController<WebSocketEvent> _onMessage =
new StreamController<WebSocketEvent>();
final StreamController<AngelHttpException> _onError =
new StreamController<AngelHttpException>();
final StreamController<Map<String, WebSocketEvent>> _onServiceEvent =
new StreamController<Map<String, WebSocketEvent>>.broadcast();
final StreamController<WebSocketChannelException>
_onWebSocketChannelException =
new StreamController<WebSocketChannelException>();
/// A broadcast stream of data coming from the [socket].
///
/// Mostly just for internal use.
Stream get onData => _onData.stream;
/// Fired on errors.
Stream<AngelHttpException> get onError => _onError.stream;
/// Fired on all events.
Stream<WebSocketEvent> get onMessage => _onMessage.stream;
/// Fired whenever an event is fired by a service.
Stream<Map<String, WebSocketEvent>> get onServiceEvent =>
_onServiceEvent.stream;
/// Fired on [WebSocketChannelException]s.
Stream<WebSocketChannelException> get onWebSocketChannelException =>
_onWebSocketChannelException.stream;
/// The [WebSocketChannel] underneath this instance.
WebSocketChannel get socket => _socket;
BaseWebSocketClient(http.BaseClient client, String basePath)
: super(client, basePath);
: super(client, basePath) {}
Future<WebSocketChannel> connect();
@override
Future close() async => _socket.sink.close(status.goingAway);
/// Connects the WebSocket.
Future<WebSocketChannel> connect() async {
_socket = await getConnectedWebSocket();
listen();
return _socket;
}
/// Returns a new [WebSocketChannel], ready to be listened on.
///
/// This should be overriden by child classes, **NOT** [connect].
Future<WebSocketChannel> getConnectedWebSocket();
@override
BaseWebSocketService service<T>(String path,
{Type type, AngelDeserializer deserializer}) {
String uri = path.toString().replaceAll(_straySlashes, '');
return new BaseWebSocketService(socket, this, uri,
deserializer: deserializer)..listen();
deserializer: deserializer);
}
/// Starts listening for data.
void listen() {
_socket.stream.listen((data) {
_onData.add(data);
if (data is WebSocketChannelException) {
_onWebSocketChannelException.add(data);
} else if (data is String) {
var json = JSON.decode(data);
if (json is Map) {
var event = new WebSocketEvent.fromJson(json);
_onMessage.add(event);
if (event.eventName == EVENT_ERROR) {
var error = new AngelHttpException.fromMap(event.data ?? {});
_onError.add(error);
} else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName
.split("::")
.where((str) => str.isNotEmpty)
.toList();
if (split.length >= 2) {
var serviceName = split[0], eventName = split[1];
_onServiceEvent.add({serviceName: event..eventName = eventName});
}
}
}
}
});
}
/// Serializes data to JSON.
serialize(x) => JSON.encode(x);
/// Alternative form of [send]ing an action.
void send(String eventName, WebSocketAction action) =>
sendAction(action..eventName = eventName);
/// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) {
socket.sink.add(serialize(action));
}
}
/// A [Service] that asynchronously interacts with the server.
class BaseWebSocketService extends Service {
/// The [BaseWebSocketClient] that spawned this service.
@override
final Angel app;
final AngelDeserializer deserializer;
final WebSocketChannel socket;
final String uri;
final BaseWebSocketClient app;
final StreamController<WebSocketEvent> _onMessage =
new StreamController<WebSocketEvent>();
final StreamController<WebSocketEvent> _onError =
/// Used to deserialize JSON into typed data.
final AngelDeserializer deserializer;
/// The [WebSocketChannel] to listen to, and send data across.
final WebSocketChannel socket;
/// The service path to listen to.
final String path;
final StreamController<WebSocketEvent> _onAllEvents =
new StreamController<WebSocketEvent>();
final StreamController<WebSocketEvent> _onIndexed =
new StreamController<WebSocketEvent>();
@ -52,17 +148,13 @@ class BaseWebSocketService extends Service {
new StreamController<WebSocketEvent>();
final StreamController<WebSocketEvent> _onRemoved =
new StreamController<WebSocketEvent>();
final WebSocketExtraneousEventHandler _on =
new WebSocketExtraneousEventHandler();
/// Use this to handle events that are not standard.
WebSocketExtraneousEventHandler get on => _on;
final WebSocketExtraneousEventHandler on =
new WebSocketExtraneousEventHandler();
/// Fired on all events.
Stream<WebSocketEvent> get onMessage => _onMessage.stream;
/// Fired on errors.
Stream<WebSocketEvent> get onError => _onError.stream;
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired on `index` events.
Stream<WebSocketEvent> get onIndexed => _onIndexed.stream;
@ -82,48 +174,116 @@ class BaseWebSocketService extends Service {
/// Fired on `removed` events.
Stream<WebSocketEvent> get onRemoved => _onRemoved.stream;
BaseWebSocketService(this.socket, this.app, this.uri, {this.deserializer});
BaseWebSocketService(this.socket, this.app, this.path, {this.deserializer}) {
listen();
}
/// Serializes an [action] to be sent over a WebSocket.
serialize(WebSocketAction action) => JSON.encode(action);
/// Deserializes data from a [WebSocketEvent].
deserialize(x) {
return deserializer != null ? deserializer(x) : x;
}
/// Deserializes the contents of an [event].
WebSocketEvent transformEvent(WebSocketEvent event) {
return event..data = deserialize(event.data);
}
/// Starts listening for events.
void listen() {
socket.stream.listen((message) {
print('Message: ${message.runtimeType}');
app.onServiceEvent.listen((map) {
if (map.containsKey(path)) {
var event = map[path];
var transformed = transformEvent(event);
_onAllEvents.add(event);
on._getStream(event.eventName).add(event);
switch (event.eventName) {
case EVENT_INDEXED:
_onIndexed.add(transformed);
break;
case EVENT_READ:
_onRead.add(transformed);
break;
case EVENT_CREATED:
_onCreated.add(transformed);
break;
case EVENT_MODIFIED:
_onModified.add(transformed);
break;
case EVENT_UPDATED:
_onUpdated.add(transformed);
break;
case EVENT_REMOVED:
_onRemoved.add(transformed);
break;
}
}
});
}
@override
Future<List> index([Map params]) {
// TODO: implement index
/// Sends the given [action] on the [socket].
void send(WebSocketAction action) {
socket.sink.add(serialize(action));
}
@override
Future read(id, [Map params]) {
// TODO: implement read
Future<List> index([Map params]) async {
socket.sink.add(serialize(new WebSocketAction(
eventName: '$path::${ACTION_INDEX}', params: params ?? {})));
return null;
}
@override
Future create(data, [Map params]) {
// TODO: implement create
Future read(id, [Map params]) async {
socket.sink
.add(serialize(new WebSocketAction(id: id, params: params ?? {})));
return null;
}
@override
Future modify(id, data, [Map params]) {
// TODO: implement modify
Future create(data, [Map params]) async {
socket.sink
.add(serialize(new WebSocketAction(data: data, params: params ?? {})));
return null;
}
@override
Future update(id, data, [Map params]) {
// TODO: implement update
Future modify(id, data, [Map params]) async {
socket.sink.add(serialize(
new WebSocketAction(id: id, data: data, params: params ?? {})));
return null;
}
@override
Future remove(id, [Map params]) {
// TODO: implement remove
Future update(id, data, [Map params]) async {
socket.sink.add(serialize(
new WebSocketAction(id: id, data: data, params: params ?? {})));
return null;
}
@override
Future remove(id, [Map params]) async {
socket.sink
.add(serialize(new WebSocketAction(id: id, params: params ?? {})));
return null;
}
}
/// Contains a dynamic Map of [WebSocketEvent] streams.
class WebSocketExtraneousEventHandler {
Map<String, StreamController<WebSocketEvent>> _events = {};
StreamController<WebSocketEvent> _getStream(String index) {
if (_events[index] == null)
_events[index] = new StreamController<WebSocketEvent>();
return _events[index];
}
operator [](String index) {
if (_events[index] == null)
_events[index] = new StreamController<WebSocketEvent>();

View file

@ -1,212 +1,50 @@
/// Browser WebSocket client library for the Angel framework.
library angel_websocket.browser;
import 'dart:async';
import 'dart:convert';
import 'dart:html';
import 'package:angel_client/angel_client.dart';
import 'package:angel_websocket/angel_websocket.dart';
import 'package:http/http.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/html.dart';
import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'package:angel_websocket/angel_websocket.dart';
export 'angel_websocket.dart';
class WebSocketClient extends Angel {
WebSocket _socket;
Map<Pattern, List<WebSocketService>> _services = {};
WebSocket get _underlyingSocket => _socket;
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
WebSocketClient(String wsEndpoint) : super(wsEndpoint) {
_socket = new WebSocket(wsEndpoint);
_connect();
}
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
WebSockets(String path) : super(new http.Client(), path);
onData(data) {
var fromJson = JSON.decode(data);
var e = new WebSocketEvent(
eventName: fromJson['eventName'], data: fromJson['data']);
var split = e.eventName.split("::");
var serviceName = split[0];
var services = _services[serviceName];
@override
Future<WebSocketChannel> getConnectedWebSocket() {
var socket = new WebSocket(basePath);
var completer = new Completer<WebSocketChannel>();
if (serviceName == "error") {
throw new Exception("Server-side error.");
} else if (services != null) {
e.eventName = split[1];
socket
..onOpen.listen((_) {
if (!completer.isCompleted)
return completer.complete(new HtmlWebSocketChannel(socket));
})
..onError.listen((ErrorEvent e) {
if (!completer.isCompleted) return completer.completeError(e.error);
});
for (WebSocketService service in services) {
service._onAllEvents.add(e);
switch (e.eventName) {
case "indexed":
service._onIndexed.add(e);
break;
case "read":
service._onRead.add(e);
break;
case "created":
service._onCreated.add(e);
break;
case "modified":
service._onModified.add(e);
break;
case "updated":
service._onUpdated.add(e);
break;
case "error":
service._onRemoved.add(e);
break;
case "error":
service._onError.add(e);
break;
default:
if (service._on._events.containsKey(e.eventName))
service._on._events[e.eventName].add(e);
break;
}
}
}
}
void _connect() {
_socket.onMessage.listen((MessageEvent event) {
onData(event.data);
});
}
void send(String eventName, data) {
_socket.send(JSON.encode({"eventName": eventName, "data": data}));
return completer.future;
}
@override
Service service(Pattern path, {Type type}) {
var service =
new WebSocketService._base(path.toString(), this, _socket, type);
if (_services[path.toString()] == null) _services[path.toString()] = [];
_services[path.toString()].add(service);
return service;
WebSocketsService service<T>(String path,
{Type type, AngelDeserializer deserializer}) {
String uri = path.replaceAll(_straySlashes, '');
return new WebSocketsService(socket, this, uri, T != dynamic ? T : type);
}
}
class WebSocketExtraneousEventHandler {
Map<String, StreamController<WebSocketEvent>> _events = {};
class WebSocketsService extends BaseWebSocketService {
final Type type;
operator [](String index) {
if (_events[index] == null)
_events[index] = new StreamController<WebSocketEvent>();
return _events[index].stream;
}
}
class _WebSocketServiceTransformer
implements StreamTransformer<WebSocketEvent, WebSocketEvent> {
Type _outputType;
_WebSocketServiceTransformer.base(this._outputType);
@override
Stream<WebSocketEvent> bind(Stream<WebSocketEvent> stream) {
var _stream = new StreamController<WebSocketEvent>();
stream.listen((WebSocketEvent e) {
/* if (_outputType != null && e.eventName != "error")
e.data = god.deserialize(god.serialize(e.data), outputType: _outputType);
*/
_stream.add(e);
});
return _stream.stream;
}
}
class WebSocketService extends Service {
Type _outputType;
String _path;
_WebSocketServiceTransformer _transformer;
WebSocket connection;
WebSocketExtraneousEventHandler _on = new WebSocketExtraneousEventHandler();
var _onAllEvents = new StreamController<WebSocketEvent>();
var _onError = new StreamController<WebSocketEvent>();
var _onIndexed = new StreamController<WebSocketEvent>();
var _onRead = new StreamController<WebSocketEvent>();
var _onCreated = new StreamController<WebSocketEvent>();
var _onModified = new StreamController<WebSocketEvent>();
var _onUpdated = new StreamController<WebSocketEvent>();
var _onRemoved = new StreamController<WebSocketEvent>();
WebSocketExtraneousEventHandler get on => _on;
Stream<WebSocketEvent> get onAllEvents =>
_onAllEvents.stream.transform(_transformer);
Stream<WebSocketEvent> get onError => _onError.stream;
Stream<WebSocketEvent> get onIndexed =>
_onIndexed.stream.transform(_transformer);
Stream<WebSocketEvent> get onRead => _onRead.stream.transform(_transformer);
Stream<WebSocketEvent> get onCreated =>
_onCreated.stream.transform(_transformer);
Stream<WebSocketEvent> get onModified =>
_onModified.stream.transform(_transformer);
Stream<WebSocketEvent> get onUpdated =>
_onUpdated.stream.transform(_transformer);
Stream<WebSocketEvent> get onRemoved =>
_onRemoved.stream.transform(_transformer);
WebSocketService._base(
String path, Angel app, WebSocket this.connection, Type _outputType) {
this._path = path;
this.app = app;
this._outputType = _outputType;
_transformer = new _WebSocketServiceTransformer.base(this._outputType);
}
_serialize(WebSocketAction action) {
var data = {"id": action.id, "eventName": action.eventName};
if (action.data != null) data["data"] = action.data;
if (action.params != null) data["params"] = action.params;
return JSON.encode(data);
}
@override
Future<List> index([Map params]) async {
connection.send(_serialize(
new WebSocketAction(eventName: "$_path::index", params: params)));
return null;
}
@override
Future read(id, [Map params]) async {
connection.send(_serialize(new WebSocketAction(
eventName: "$_path::read", id: id, params: params)));
}
@override
Future create(data, [Map params]) async {
connection.send(_serialize(new WebSocketAction(
eventName: "$_path::create", data: data, params: params)));
}
@override
Future modify(id, data, [Map params]) async {
connection.send(_serialize(new WebSocketAction(
eventName: "$_path::modify", id: id, data: data, params: params)));
}
@override
Future update(id, data, [Map params]) async {
connection.send(_serialize(new WebSocketAction(
eventName: "$_path::update", id: id, data: data, params: params)));
}
@override
Future remove(id, [Map params]) async {
connection.send(_serialize(new WebSocketAction(
eventName: "$_path::remove", id: id, params: params)));
}
WebSocketsService(WebSocketChannel socket, Angel app, String uri, this.type)
: super(socket, app, uri);
}

View file

@ -1,224 +0,0 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:angel_client/angel_client.dart';
import 'package:angel_framework/angel_framework.dart' as srv;
import 'package:angel_websocket/angel_websocket.dart';
import 'package:json_god/json_god.dart' as god;
export 'package:angel_client/angel_client.dart';
export 'package:angel_websocket/angel_websocket.dart';
class WebSocketClient extends Angel {
WebSocket _socket;
Map<Pattern, List<WebSocketService>> _services = {};
WebSocket get underlyingSocket => _socket;
_WebSocketEventTable on = new _WebSocketEventTable();
WebSocketClient(String wsEndpoint) : super(wsEndpoint);
onData(data) async {
var fromJson = JSON.decode(data);
print("a: $fromJson");
var e = new WebSocketEvent(
eventName: fromJson['eventName'], data: fromJson['data']);
print("b: $e");
var split = e.eventName.split("::");
var serviceName = split[0];
var services = _services[serviceName];
if (serviceName == "error") {
var exc = new srv.AngelHttpException(new Exception("Server-side error."));
exc.statusCode = e.data['statusCode'];
exc.message = e.data['message'];
exc.errors = exc.errors ?? [];
exc.errors.addAll(e.data['errors'] ?? []);
throw exc;
} else {
on._getStreamForEvent(serviceName).add(e.data);
if (services != null) {
e.eventName = split[1];
for (WebSocketService service in services) {
service._onAllEvents.add(e);
switch (e.eventName) {
case srv.HookedServiceEvent.INDEXED:
service._onIndexed.add(e);
break;
case srv.HookedServiceEvent.READ:
service._onRead.add(e);
break;
case srv.HookedServiceEvent.CREATED:
service._onCreated.add(e);
break;
case srv.HookedServiceEvent.MODIFIED:
service._onModified.add(e);
break;
case srv.HookedServiceEvent.UPDATED:
service._onUpdated.add(e);
break;
case srv.HookedServiceEvent.REMOVED:
service._onRemoved.add(e);
break;
case "error":
service._onError.add(e);
break;
default:
if (service._on._events.containsKey(e.eventName))
service._on._events[e.eventName].add(e);
break;
}
}
}
}
}
Future connect() async {
_socket = await WebSocket.connect(basePath);
_socket.listen(onData);
}
void send(String eventName, data) {
_socket.add(JSON.encode({"eventName": eventName, "data": data}));
}
@override
Service service(Pattern path, {Type type}) {
var service =
new WebSocketService._base(path.toString(), this, _socket, type);
if (_services[path.toString()] == null) _services[path.toString()] = [];
_services[path.toString()].add(service);
return service;
}
}
class WebSocketExtraneousEventHandler {
Map<String, StreamController<WebSocketEvent>> _events = {};
operator [](String index) {
if (_events[index] == null)
_events[index] = new StreamController<WebSocketEvent>();
return _events[index].stream;
}
}
class _WebSocketServiceTransformer
implements StreamTransformer<WebSocketEvent, WebSocketEvent> {
Type _outputType;
_WebSocketServiceTransformer.base(this._outputType);
@override
Stream<WebSocketEvent> bind(Stream<WebSocketEvent> stream) {
var _stream = new StreamController<WebSocketEvent>();
stream.listen((WebSocketEvent e) {
if (_outputType != null && e.eventName != "error")
e.data =
god.deserialize(god.serialize(e.data), outputType: _outputType);
_stream.add(e);
});
return _stream.stream;
}
}
class WebSocketService extends Service {
Type _outputType;
String _path;
_WebSocketServiceTransformer _transformer;
WebSocket connection;
WebSocketExtraneousEventHandler _on = new WebSocketExtraneousEventHandler();
var _onAllEvents = new StreamController<WebSocketEvent>();
var _onError = new StreamController<WebSocketEvent>();
var _onIndexed = new StreamController<WebSocketEvent>();
var _onRead = new StreamController<WebSocketEvent>();
var _onCreated = new StreamController<WebSocketEvent>();
var _onModified = new StreamController<WebSocketEvent>();
var _onUpdated = new StreamController<WebSocketEvent>();
var _onRemoved = new StreamController<WebSocketEvent>();
WebSocketExtraneousEventHandler get on => _on;
Stream<WebSocketEvent> get onAllEvents =>
_onAllEvents.stream.transform(_transformer);
Stream<WebSocketEvent> get onError => _onError.stream;
Stream<WebSocketEvent> get onIndexed =>
_onIndexed.stream.transform(_transformer);
Stream<WebSocketEvent> get onRead => _onRead.stream.transform(_transformer);
Stream<WebSocketEvent> get onCreated =>
_onCreated.stream.transform(_transformer);
Stream<WebSocketEvent> get onModified =>
_onModified.stream.transform(_transformer);
Stream<WebSocketEvent> get onUpdated =>
_onUpdated.stream.transform(_transformer);
Stream<WebSocketEvent> get onRemoved =>
_onRemoved.stream.transform(_transformer);
WebSocketService._base(
String path, Angel app, WebSocket this.connection, Type _outputType) {
this._path = path;
this.app = app;
this._outputType = _outputType;
_transformer = new _WebSocketServiceTransformer.base(this._outputType);
}
@override
Future<List> index([Map params]) async {
connection.add(god.serialize(
new WebSocketAction(eventName: "$_path::index", params: params)));
return null;
}
@override
Future read(id, [Map params]) async {
connection.add(god.serialize(new WebSocketAction(
eventName: "$_path::read", id: id, params: params)));
}
@override
Future create(data, [Map params]) async {
connection.add(god.serialize(new WebSocketAction(
eventName: "$_path::create", data: data, params: params)));
}
@override
Future modify(id, data, [Map params]) async {
connection.add(god.serialize(new WebSocketAction(
eventName: "$_path::modify", id: id, data: data, params: params)));
}
@override
Future update(id, data, [Map params]) async {
connection.add(god.serialize(new WebSocketAction(
eventName: "$_path::update", id: id, data: data, params: params)));
}
@override
Future remove(id, [Map params]) async {
connection.add(god.serialize(new WebSocketAction(
eventName: "$_path::remove", id: id, params: params)));
}
}
class _WebSocketEventTable {
Map<String, StreamController<Map>> _handlers = {};
StreamController<Map> _getStreamForEvent(eventName) {
if (!_handlers.containsKey(eventName))
_handlers[eventName] = new StreamController<Map>.broadcast();
return _handlers[eventName];
}
Stream<Map> operator [](String key) => _getStreamForEvent(key).stream;
}

View file

@ -1,12 +1,14 @@
/// Command-line WebSocket client library for the Angel framework.
library angel_client.cli;
library angel_websocket.io;
import 'dart:async';
import 'dart:io';
import 'package:angel_client/angel_client.dart';
import 'package:http/http.dart' as http;
import 'package:json_god/json_god.dart' as god;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'angel_websocket.dart';
import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart';
@ -18,16 +20,20 @@ class WebSockets extends BaseWebSocketClient {
WebSockets(String path) : super(new http.Client(), path);
@override
Future<WebSocketChannel> connect() async {
return new IOWebSocketChannel.connect(basePath);
Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(basePath);
return new IOWebSocketChannel(socket);
}
@override
WebSocketsService service<T>(String path,
{Type type, AngelDeserializer deserializer}) {
String uri = path.replaceAll(_straySlashes, "");
String uri = path.replaceAll(_straySlashes, '');
return new WebSocketsService(socket, this, uri, T != dynamic ? T : type);
}
@override
serialize(x) => god.serialize(x);
}
class WebSocketsService extends BaseWebSocketService {
@ -35,4 +41,15 @@ class WebSocketsService extends BaseWebSocketService {
WebSocketsService(WebSocketChannel socket, Angel app, String uri, this.type)
: super(socket, app, uri);
@override
serialize(WebSocketAction action) => god.serialize(action);
@override
deserialize(x) {
if (type != null && type != dynamic) {
return god.deserializeDatum(x, outputType: type);
} else
return super.deserialize(x);
}
}

View file

@ -1,3 +1,4 @@
/// Server-side support for WebSockets.
library angel_websocket.server;
import 'dart:async';
@ -13,30 +14,46 @@ export 'angel_websocket.dart';
part 'websocket_context.dart';
part 'websocket_controller.dart';
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
class AngelWebSocket extends AngelPlugin {
Angel _app;
List<WebSocket> _clients = [];
StreamController<WebSocketContext> _onConnection =
new StreamController<WebSocketContext>();
StreamController<WebSocketContext> _onDisconnect =
new StreamController<WebSocketContext>();
final List<String> _servicesAlreadyWired = [];
final StreamController<WebSocketAction> _onAction =
new StreamController<WebSocketAction>();
final StreamController _onData = new StreamController();
final StreamController<WebSocketContext> _onConnection =
new StreamController<WebSocketContext>.broadcast();
final StreamController<WebSocketContext> _onDisconnect =
new StreamController<WebSocketContext>.broadcast();
/// Include debug information, and send error information across WebSockets.
final bool debug;
/// A list of clients currently connected to this server via WebSockets.
List<WebSocket> get clients => new List.unmodifiable(_clients);
/// Services that have already been hooked to fire socket events.
List<String> get servicesAlreadyWired => new List.unmodifiable(_servicesAlreadyWired);
List<String> get servicesAlreadyWired =>
new List.unmodifiable(_servicesAlreadyWired);
/// The endpoint that users should connect a WebSocket to.
final String endpoint;
/// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired whenever a WebSocket sends data.
Stream get onData => _onData.stream;
/// Fired on incoming connections.
Stream<WebSocketContext> get onConnection => _onConnection.stream;
/// Fired when a user disconnects.
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;
AngelWebSocket(String this.endpoint);
AngelWebSocket({this.endpoint: '/ws', this.debug: false});
_batchEvent(String path) {
return (HookedServiceEvent e) async {
@ -46,6 +63,7 @@ class AngelWebSocket extends AngelPlugin {
};
}
/// Slates an event to be dispatched.
Future batchEvent(WebSocketEvent event) async {
// Default implementation will just immediately fire events
_clients.forEach((client) {
@ -53,8 +71,10 @@ class AngelWebSocket extends AngelPlugin {
});
}
/// Returns a list of events yet to be sent.
Future<List<WebSocketEvent>> getBatchedEvents() async => [];
/// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split("::");
@ -67,7 +87,7 @@ class AngelWebSocket extends AngelPlugin {
return socket.sendError(new AngelHttpException.NotFound(
message: "No service \"${split[0]}\" exists."));
var eventName = split[1];
var actionName = split[1];
var params = mergeMap([
god.deserializeDatum(action.params),
@ -75,39 +95,44 @@ class AngelWebSocket extends AngelPlugin {
]);
try {
if (eventName == "index") {
return socket.send("${split[0]}::" + HookedServiceEvent.INDEXED,
await service.index(params));
} else if (eventName == "read") {
return socket.send("${split[0]}::" + HookedServiceEvent.READ,
if (actionName == ACTION_INDEX) {
return socket.send(
"${split[0]}::" + EVENT_INDEXED, await service.index(params));
} else if (actionName == ACTION_READ) {
return socket.send("${split[0]}::" + EVENT_READ,
await service.read(action.id, params));
} else if (eventName == "create") {
} else if (actionName == ACTION_CREATE) {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.CREATED,
eventName: "${split[0]}::" + EVENT_CREATED,
data: await service.create(action.data, params));
} else if (eventName == "modify") {
} else if (actionName == ACTION_MODIFY) {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.MODIFIED,
eventName: "${split[0]}::" + EVENT_MODIFIED,
data: await service.modify(action.id, action.data, params));
} else if (eventName == "update") {
} else if (actionName == ACTION_UPDATE) {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.UPDATED,
eventName: "${split[0]}::" + EVENT_UPDATED,
data: await service.update(action.id, action.data, params));
} else if (eventName == "remove") {
} else if (actionName == ACTION_REMOVE) {
return new WebSocketEvent(
eventName: "${split[0]}::" + HookedServiceEvent.REMOVED,
eventName: "${split[0]}::" + EVENT_REMOVED,
data: await service.remove(action.id, params));
} else {
return socket.sendError(new AngelHttpException.MethodNotAllowed(
message: "Method Not Allowed: \"$eventName\""));
message: "Method Not Allowed: \"$actionName\""));
}
} catch (e) {
if (e is AngelHttpException) return socket.sendError(e);
return socket.sendError(new AngelHttpException(e));
} catch (e, st) {
if (e is AngelHttpException)
return socket.sendError(e);
else if (debug == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
}
}
/// Hooks a service up to have its events broadcasted.
hookupService(Pattern _path, HookedService service) {
String path = _path.toString();
var batch = _batchEvent(path);
@ -121,17 +146,16 @@ class AngelWebSocket extends AngelPlugin {
_servicesAlreadyWired.add(path);
}
Future onConnect(WebSocketContext socket) async {}
/// Runs before firing [onConnection].
Future handleConnect(WebSocketContext socket) async {}
onData(WebSocketContext socket, data) async {
/// Handles incoming data from a WebSocket.
handleData(WebSocketContext socket, data) async {
try {
socket._onData.add(data);
var fromJson = JSON.decode(data);
var action = new WebSocketAction(
id: fromJson['id'],
eventName: fromJson['eventName'],
data: fromJson['data'],
params: fromJson['params']);
var action = new WebSocketAction.fromJson(fromJson);
_onAction.add(action);
if (action.eventName == null ||
action.eventName is! String ||
@ -140,22 +164,17 @@ class AngelWebSocket extends AngelPlugin {
}
if (fromJson is Map && fromJson.containsKey("eventName")) {
socket._onAll.add(fromJson);
socket.on._getStreamForEvent(fromJson["eventName"].toString()).add(fromJson["data"]);
socket._onAction.add(new WebSocketAction.fromJson(fromJson));
socket.on
._getStreamForEvent(fromJson["eventName"].toString())
.add(fromJson["data"]);
}
if (action.eventName.contains("::")) {
var split = action.eventName.split("::");
if (split.length >= 2) {
if ([
"index",
"read",
"create",
"modify",
"update",
"remove"
].contains(split[1])) {
if (ACTIONS.contains(split[1])) {
var event = handleAction(action, socket);
if (event is Future) event = await event;
@ -165,19 +184,24 @@ class AngelWebSocket extends AngelPlugin {
}
}
}
} catch (e) {
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else if (debug == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
}
}
/// Transforms a [HookedServiceEvent], so that it can be broadcasted.
Future<WebSocketEvent> transformEvent(HookedServiceEvent event) async {
return new WebSocketEvent(eventName: event.eventName, data: event.result);
}
/// Hooks any [HookedService]s that are not being broadcasted yet.
wireAllServices(Angel app) {
for (Pattern key in app.services.keys.where((x) {
return !_servicesAlreadyWired.contains(x) &&
@ -189,7 +213,7 @@ class AngelWebSocket extends AngelPlugin {
@override
Future call(Angel app) async {
this._app = app..container.singleton(this);
_app = app..container.singleton(this);
if (runtimeType != AngelWebSocket)
app.container.singleton(this, as: AngelWebSocket);
@ -202,24 +226,25 @@ class AngelWebSocket extends AngelPlugin {
});
app.get(endpoint, (RequestContext req, ResponseContext res) async {
if (!WebSocketTransformer.isUpgradeRequest(req.underlyingRequest))
if (!WebSocketTransformer.isUpgradeRequest(req.io))
throw new AngelHttpException.BadRequest();
res
..willCloseItself = true
..end();
var ws = await WebSocketTransformer.upgrade(req.underlyingRequest);
var ws = await WebSocketTransformer.upgrade(req.io);
_clients.add(ws);
var socket = new WebSocketContext(ws, req, res);
await onConnect(socket);
await handleConnect(socket);
_onConnection.add(socket);
req.params['socket'] = socket;
req.properties['socket'] = socket;
ws.listen((data) {
onData(socket, data);
_onData.add(data);
handleData(socket, data);
}, onDone: () {
_onDisconnect.add(socket);
_clients.remove(ws);

View file

@ -1,24 +1,40 @@
part of angel_websocket.server;
/// Represents a WebSocket session, with the original
/// [RequestContext] and [ResponseContext] attached.
class WebSocketContext {
StreamController<Map> _onAll = new StreamController<Map>.broadcast();
StreamController _onData = new StreamController.broadcast();
/// Use this to listen for events.
_WebSocketEventTable on = new _WebSocketEventTable();
Stream<Map> get onAll => _onAll.stream;
/// The underlying [WebSocket] instance.
final WebSocket io;
/// The original [RequestContext].
final RequestContext request;
/// The original [ResponseContext].
final ResponseContext response;
StreamController<WebSocketAction> _onAction =
new StreamController<WebSocketAction>();
StreamController _onData = new StreamController();
/// Fired on any [WebSocketAction];
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired when any data is sent through [io].
Stream get onData => _onData.stream;
WebSocket underlyingSocket;
RequestContext requestContext;
ResponseContext responseContext;
WebSocketContext(WebSocket this.underlyingSocket,
RequestContext this.requestContext, ResponseContext this.responseContext);
WebSocketContext(WebSocket this.io, RequestContext this.request,
ResponseContext this.response);
send(String eventName, data) {
underlyingSocket.add(
god.serialize(new WebSocketEvent(eventName: eventName, data: data)));
/// Sends an arbitrary [WebSocketEvent];
void send(String eventName, data) {
io.add(god.serialize(new WebSocketEvent(eventName: eventName, data: data)));
}
sendError(AngelHttpException error) => send("error", error.toJson());
/// Sends an error event.
void sendError(AngelHttpException error) => send(EVENT_ERROR, error.toJson());
}
class _WebSocketEventTable {

View file

@ -9,17 +9,27 @@ class ExposeWs {
class WebSocketController extends Controller {
Map<String, MethodMirror> _handlers = {};
Map<String, Symbol> _handlerSymbols = {};
InstanceMirror _instanceMirror;
AngelWebSocket ws;
WebSocketController():super() {
_instanceMirror = reflect(this);
WebSocketController() : super();
void broadcast(String eventName, data) {
ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data));
}
onConnect(WebSocketContext socket) {}
onDisconnect(WebSocketContext socket) {}
onAction(WebSocketAction action, WebSocketContext socket) async {}
onData(data, WebSocketContext socket) {}
@override
Future call(Angel app) async {
await super.call(app);
InstanceMirror instanceMirror = reflect(this);
ClassMirror classMirror = reflectClass(this.runtimeType);
classMirror.instanceMembers.forEach((sym, mirror) {
if (mirror.isRegularMethod) {
@ -38,51 +48,32 @@ class WebSocketController extends Controller {
AngelWebSocket ws = app.container.make(AngelWebSocket);
ws.onConnection.listen((socket) async {
socket.request
..inject('socket', socket)
..inject(WebSocketContext, socket);
await onConnect(socket);
socket.onData.listen(onData);
socket.onData.listen((data) => onData(data, socket));
socket.onAll.listen((Map data) async {
await onAllEvents(data);
socket.onAction.listen((WebSocketAction action) async {
await onAction(action, socket);
if (_handlers.containsKey(data["eventName"])) {
var methodMirror = _handlers[data["eventName"]];
if (_handlers.containsKey(action.eventName)) {
try {
// Load parameters, and execute
List args = [];
var methodMirror = _handlers[action.eventName];
var fn = instanceMirror.getField(methodMirror.simpleName).reflectee;
for (int i = 0; i < methodMirror.parameters.length; i++) {
ParameterMirror parameter = methodMirror.parameters[i];
String name = MirrorSystem.getName(parameter.simpleName);
if (parameter.type.reflectedType == RequestContext ||
name == "req")
args.add(socket.requestContext);
else if (parameter.type.reflectedType == ResponseContext ||
name == "res")
args.add(socket.responseContext);
else if (parameter.type == AngelWebSocket)
args.add(socket);
else {
if (socket.requestContext.params.containsKey(name)) {
args.add(socket.requestContext.params[name]);
} else {
try {
args.add(app.container.make(parameter.type.reflectedType));
continue;
} catch (e) {
throw new AngelHttpException.BadRequest(
message: "Missing parameter '$name'");
}
}
}
}
await _instanceMirror.invoke(_handlerSymbols[data["eventName"]], args);
} catch (e) {
return app.runContained(fn, socket.request, socket.response);
} catch (e, st) {
// Send an error
if (e is AngelHttpException)
socket.sendError(e);
else if (ws.debug == true)
socket.sendError(new AngelHttpException(e,
message: e.toString(),
stackTrace: st,
errors: [st.toString()]));
else
socket.sendError(new AngelHttpException(e));
}
@ -92,16 +83,4 @@ class WebSocketController extends Controller {
ws.onDisconnection.listen(onDisconnect);
}
void broadcast(String eventName, data) {
ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data));
}
Future onConnect(WebSocketContext socket) async {}
Future onDisconnect(WebSocketContext socket) async {}
Future onAllEvents(Map data) async {}
void onData(data) {}
}

View file

@ -12,5 +12,6 @@ dependencies:
uuid: "^0.5.3"
web_socket_channel: "^1.0.0"
dev_dependencies:
angel_diagnostics: "^1.0.0-dev"
http: "^0.11.3"
test: "^0.12.15"

View file

@ -1,6 +0,0 @@
import 'package:test/test.dart';
import 'server.dart' as server;
main() async {
group("server", server.main);
}

View file

@ -1,20 +0,0 @@
import 'dart:async';
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/src/defs.dart';
class Todo extends MemoryModel {
String text;
String when;
Todo({String this.text, String this.when});
}
Future startTestServer(Angel app) async {
var host = InternetAddress.LOOPBACK_IP_V4;
var port = 3000;
await app.startServer(host, port);
app.properties["ws_url"] = "ws://${host.address}:$port/ws";
print("Test server listening on ${host.address}:$port");
}

View file

@ -0,0 +1,17 @@
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_websocket/server.dart';
class Game {
final String playerOne, playerTwo;
const Game({this.playerOne, this.playerTwo});
}
@Expose('/game')
class GameController extends WebSocketController {
@ExposeWs('search')
search(WebSocketContext socket) async {
print('OMG ok');
socket.send('searched', 'poop');
}
}

View file

@ -0,0 +1,62 @@
import 'dart:io';
import 'package:angel_diagnostics/angel_diagnostics.dart' as srv;
import 'package:angel_framework/angel_framework.dart' as srv;
import 'package:angel_websocket/io.dart' as ws;
import 'package:angel_websocket/server.dart' as srv;
import 'package:test/test.dart';
import 'common.dart';
main() {
srv.Angel app;
ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer server;
String url;
setUp(() async {
app = new srv.Angel();
websockets = new srv.AngelWebSocket(debug: true)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets);
await app.configure(new GameController());
server =
await new srv.DiagnosticsServer(app, new File('log.txt')).startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url);
await client.connect();
client
..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
// Auto-fail tests on errors ;)
stderr.writeln(error);
error.errors.forEach(stderr.writeln);
throw error;
});
});
tearDown(() async {
await client.close();
await server.close(force: true);
app = null;
client = null;
server = null;
url = null;
});
group('controller.io', () {
test('search', () async {
client.send('search', new ws.WebSocketAction());
var search = await client.onData.first;
print('First: $search');
});
});
}

View file

@ -1,117 +0,0 @@
import 'dart:async';
import 'dart:io';
import 'package:angel_framework/angel_framework.dart' as server;
import 'package:angel_websocket/cli.dart' as client;
import 'package:angel_websocket/server.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:test/test.dart';
import 'common.dart';
main() {
server.Angel app;
client.WebSocketClient clientApp;
client.WebSocketService clientTodos;
Stream<Map> customEventStream;
Stream<Map> customEventStream2;
WebSocket socket;
AngelWebSocket webSocket = new AngelWebSocket("/ws");
setUp(() async {
app = new server.Angel();
app.use("/real", new FakeService(), hooked: false);
app.use("/api/todos", new server.MemoryService<Todo>());
await app
.service("api/todos")
.create(new Todo(text: "Clean your room", when: "now"));
await app.configure(webSocket);
await app.configure((server.Angel app) async {
AngelWebSocket ws = app.container.make(AngelWebSocket);
ws.onConnection.listen((WebSocketContext socket) {
socket.onData.listen((data) {
print("Data: $data");
});
customEventStream = socket.on["custom"];
});
});
await app.configure(new Custom2Controller());
await app.configure(startTestServer);
socket = await WebSocket.connect(app.properties["ws_url"]);
clientApp = new client.WebSocketClient(app.properties["ws_url"]);
await clientApp.connect();
customEventStream2 = clientApp.on["custom2"];
clientTodos = clientApp.service("api/todos", type: Todo);
});
tearDown(() async {
await app.httpServer.close(force: true);
});
test("find all real-time services", () {
print(webSocket.servicesAlreadyWired);
expect(webSocket.servicesAlreadyWired, equals(["api/todos"]));
});
test("index", () async {
var action = new WebSocketAction(eventName: "api/todos::index");
socket.add(god.serialize(action));
String json = await socket.first;
print(json);
WebSocketEvent e = god.deserialize(json, outputType: WebSocketEvent);
expect(e.eventName, equals("api/todos::indexed"));
expect(e.data[0]["when"], equals("now"));
});
test("create", () async {
var todo = new Todo(text: "Finish the Angel framework", when: "2016");
clientTodos.create(todo);
var all = await clientTodos.onAllEvents.first;
var e = await clientTodos.onCreated.first;
print(god.serialize(e));
expect(all, equals(e));
expect(e.eventName, equals("created"));
expect(e.data is Todo, equals(true));
expect(e.data.text, equals(todo.text));
expect(e.data.when, equals(todo.when));
});
test("custom event via controller", () async {
clientApp.send("custom", {"hello": "world"});
var data = await customEventStream.first;
expect(data["hello"], equals("world"));
});
test("custom event via ws controller", () async {
clientApp.send("custom2", {"hello": "world"});
var data = customEventStream2.first;
print("Received data from server: $data");
});
}
class FakeService extends server.Service {}
@server.Expose("/custom2")
class Custom2Controller extends WebSocketController {
@override
Future onConnect(WebSocketContext socket) async {
print(
"Got a WS connection from session #${socket.requestContext.session.id}!");
}
@ExposeWs("custom2")
void sayFoo(WebSocketContext socket, server.RequestContext req, AngelWebSocket ws) {
socket.send("custom2", {"franken": "stein"});
}
}

View file

@ -0,0 +1,5 @@
import 'package:test/test.dart';
main() {
group('service.browser', () {});
}

24
test/service/common.dart Normal file
View file

@ -0,0 +1,24 @@
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/src/defs.dart';
import 'package:angel_websocket/base_websocket_client.dart';
import 'package:test/test.dart';
class Todo extends MemoryModel {
String text;
String when;
Todo({String this.text, String this.when});
}
class TodoService extends MemoryService<Todo> {}
testIndex(BaseWebSocketClient client) async {
var Todos = client.service('api/todos');
Todos.index();
var indexed = await Todos.onIndexed.first;
print('indexed: ${indexed.toJson()}');
expect(indexed.data, isList);
expect(indexed.data, isEmpty);
}

56
test/service/io_test.dart Normal file
View file

@ -0,0 +1,56 @@
import 'dart:io';
import 'package:angel_diagnostics/angel_diagnostics.dart' as srv;
import 'package:angel_framework/angel_framework.dart' as srv;
import 'package:angel_websocket/io.dart' as ws;
import 'package:angel_websocket/server.dart' as srv;
import 'package:test/test.dart';
import 'common.dart';
main() {
srv.Angel app;
ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer server;
String url;
setUp(() async {
app = new srv.Angel()..use('/api/todos', new TodoService());
websockets = new srv.AngelWebSocket(debug: true)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets);
server =
await new srv.DiagnosticsServer(app, new File('log.txt')).startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url);
await client.connect();
client
..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
// Auto-fail tests on errors ;)
stderr.writeln(error);
error.errors.forEach(stderr.writeln);
throw error;
});
});
tearDown(() async {
await client.close();
await server.close(force: true);
app = null;
client = null;
server = null;
url = null;
});
group('service.io', () {
test('index', () => testIndex(client));
});
}