From 7957a4beeeb2696e41b30d629a944c14f1871cd2 Mon Sep 17 00:00:00 2001 From: thosakwe Date: Fri, 23 Dec 2016 05:47:21 -0500 Subject: [PATCH] :) --- lib/base_websocket_client.dart | 133 +++++++++++++++++++++++++++++++++ lib/io.dart | 38 ++++++++++ lib/server.dart | 24 ++++-- lib/websocket_context.dart | 2 +- pubspec.yaml | 16 ++-- test/packages | 1 - 6 files changed, 198 insertions(+), 16 deletions(-) create mode 100644 lib/base_websocket_client.dart create mode 100644 lib/io.dart delete mode 120000 test/packages diff --git a/lib/base_websocket_client.dart b/lib/base_websocket_client.dart new file mode 100644 index 00000000..98f45a67 --- /dev/null +++ b/lib/base_websocket_client.dart @@ -0,0 +1,133 @@ +import 'dart:async'; +import 'package:angel_client/angel_client.dart'; +import 'package:http/src/base_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/status.dart'; +import 'angel_websocket.dart'; +export 'package:angel_client/angel_client.dart'; +import 'package:angel_client/base_angel_client.dart'; + +final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); + +abstract class BaseWebSocketClient extends BaseAngelClient { + WebSocketChannel _socket; + + /// The [WebSocketChannel] underneath this instance. + WebSocketChannel get socket => _socket; + + BaseWebSocketClient(http.BaseClient client, String basePath) + : super(client, basePath); + + Future connect(); + + @override + BaseWebSocketService service(String path, + {Type type, AngelDeserializer deserializer}) { + String uri = path.toString().replaceAll(_straySlashes, ''); + return new BaseWebSocketService(socket, this, uri, + deserializer: deserializer)..listen(); + } +} + +class BaseWebSocketService extends Service { + @override + final Angel app; + final AngelDeserializer deserializer; + final WebSocketChannel socket; + final String uri; + + final StreamController _onMessage = + new StreamController(); + final StreamController _onError = + new StreamController(); + final StreamController _onIndexed = + new StreamController(); + final StreamController _onRead = + new StreamController(); + final StreamController _onCreated = + new StreamController(); + final StreamController _onModified = + new StreamController(); + final StreamController _onUpdated = + new StreamController(); + final StreamController _onRemoved = + new StreamController(); + final WebSocketExtraneousEventHandler _on = + new WebSocketExtraneousEventHandler(); + + /// Use this to handle events that are not standard. + WebSocketExtraneousEventHandler get on => _on; + + /// Fired on all events. + Stream get onMessage => _onMessage.stream; + + /// Fired on errors. + Stream get onError => _onError.stream; + + /// Fired on `index` events. + Stream get onIndexed => _onIndexed.stream; + + /// Fired on `read` events. + Stream get onRead => _onRead.stream; + + /// Fired on `created` events. + Stream get onCreated => _onCreated.stream; + + /// Fired on `modified` events. + Stream get onModified => _onModified.stream; + + /// Fired on `updated` events. + Stream get onUpdated => _onUpdated.stream; + + /// Fired on `removed` events. + Stream get onRemoved => _onRemoved.stream; + + BaseWebSocketService(this.socket, this.app, this.uri, {this.deserializer}); + + void listen() { + socket.stream.listen((message) { + print('Message: ${message.runtimeType}'); + }); + } + + @override + Future index([Map params]) { + // TODO: implement index + } + + @override + Future read(id, [Map params]) { + // TODO: implement read + } + + @override + Future create(data, [Map params]) { + // TODO: implement create + } + + @override + Future modify(id, data, [Map params]) { + // TODO: implement modify + } + + @override + Future update(id, data, [Map params]) { + // TODO: implement update + } + + @override + Future remove(id, [Map params]) { + // TODO: implement remove + } +} + +class WebSocketExtraneousEventHandler { + Map> _events = {}; + + operator [](String index) { + if (_events[index] == null) + _events[index] = new StreamController(); + + return _events[index].stream; + } +} diff --git a/lib/io.dart b/lib/io.dart new file mode 100644 index 00000000..f7becf8c --- /dev/null +++ b/lib/io.dart @@ -0,0 +1,38 @@ +/// Command-line WebSocket client library for the Angel framework. +library angel_client.cli; + +import 'dart:async'; +import 'package:angel_client/angel_client.dart'; +import 'package:http/http.dart' as http; +import 'package:json_god/json_god.dart' as god; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'base_websocket_client.dart'; +export 'package:angel_client/angel_client.dart'; +export 'angel_websocket.dart'; + +final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + WebSockets(String path) : super(new http.Client(), path); + + @override + Future connect() async { + return new IOWebSocketChannel.connect(basePath); + } + + @override + WebSocketsService service(String path, + {Type type, AngelDeserializer deserializer}) { + String uri = path.replaceAll(_straySlashes, ""); + return new WebSocketsService(socket, this, uri, T != dynamic ? T : type); + } +} + +class WebSocketsService extends BaseWebSocketService { + final Type type; + + WebSocketsService(WebSocketChannel socket, Angel app, String uri, this.type) + : super(socket, app, uri); +} diff --git a/lib/server.dart b/lib/server.dart index a9a2b45a..c1397e01 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -17,13 +17,23 @@ class AngelWebSocket extends AngelPlugin { Angel _app; List _clients = []; StreamController _onConnection = - new StreamController.broadcast(); + new StreamController(); StreamController _onDisconnect = - new StreamController.broadcast(); - List get clients => new List.from(_clients, growable: false); - List servicesAlreadyWired = []; - String endpoint; + new StreamController(); + final List _servicesAlreadyWired = []; + + /// A list of clients currently connected to this server via WebSockets. + List get clients => new List.unmodifiable(_clients); + + /// Services that have already been hooked to fire socket events. + List get servicesAlreadyWired => new List.unmodifiable(_servicesAlreadyWired); + + final String endpoint; + + /// Fired on incoming connections. Stream get onConnection => _onConnection.stream; + + /// Fired when a user disconnects. Stream get onDisconnection => _onDisconnect.stream; AngelWebSocket(String this.endpoint); @@ -108,7 +118,7 @@ class AngelWebSocket extends AngelPlugin { ..afterUpdated.listen(batch) ..afterRemoved.listen(batch); - servicesAlreadyWired.add(path); + _servicesAlreadyWired.add(path); } Future onConnect(WebSocketContext socket) async {} @@ -170,7 +180,7 @@ class AngelWebSocket extends AngelPlugin { wireAllServices(Angel app) { for (Pattern key in app.services.keys.where((x) { - return !servicesAlreadyWired.contains(x) && + return !_servicesAlreadyWired.contains(x) && app.services[x] is HookedService; })) { hookupService(key, app.services[key]); diff --git a/lib/websocket_context.dart b/lib/websocket_context.dart index 8cdb2302..0151eda9 100644 --- a/lib/websocket_context.dart +++ b/lib/websocket_context.dart @@ -18,7 +18,7 @@ class WebSocketContext { god.serialize(new WebSocketEvent(eventName: eventName, data: data))); } - sendError(AngelHttpException error) => send("error", error); + sendError(AngelHttpException error) => send("error", error.toJson()); } class _WebSocketEventTable { diff --git a/pubspec.yaml b/pubspec.yaml index 7969b5d5..45326092 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,14 +1,16 @@ name: angel_websocket description: WebSocket plugin for Angel +environment: + sdk: ">=1.19.0" version: 1.0.0-dev+5 author: Tobe O homepage: https://github.com/angel-dart/angel_websocket dependencies: - angel_client: ">=1.0.0-dev <2.0.0" - angel_framework: ">=1.0.0-dev < 2.0.0" - json_god: ">=2.0.0-beta <3.0.0" - jwt: ">=0.1.4 <1.0.0" - uuid: ">=0.5.3 <1.0.0" + angel_auth: "^1.0.0-dev" + angel_client: "^1.0.0-dev" + angel_framework: "^1.0.0-dev" + uuid: "^0.5.3" + web_socket_channel: "^1.0.0" dev_dependencies: - http: ">= 0.11.3 < 0.12.0" - test: ">= 0.12.13 < 0.13.0" + http: "^0.11.3" + test: "^0.12.15" diff --git a/test/packages b/test/packages deleted file mode 120000 index a16c4050..00000000 --- a/test/packages +++ /dev/null @@ -1 +0,0 @@ -../packages \ No newline at end of file