diff --git a/packages/websocket/.gitignore b/packages/websocket/.gitignore new file mode 100644 index 00000000..b37504a1 --- /dev/null +++ b/packages/websocket/.gitignore @@ -0,0 +1,76 @@ +# See https://www.dartlang.org/tools/private-files.html + +# Files and directories created by pub +.buildlog +.packages +.project +.pub/ +build/ +**/packages/ + +# Files created by dart2js +# (Most Dart developers will use pub build to compile Dart, use/modify these +# rules if you intend to use dart2js directly +# Convention is to use extension '.dart.js' for Dart compiled to Javascript to +# differentiate from explicit Javascript files) +*.dart.js +*.part.js +*.js.deps +*.js.map +*.info.json + +# Directory created by dartdoc +doc/api/ + +# Don't commit pubspec lock file +# (Library packages only! Remove pattern if developing an application package) +pubspec.lock +.idea + +log.txt +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +.dart_tool \ No newline at end of file diff --git a/packages/websocket/.travis.yml b/packages/websocket/.travis.yml new file mode 100644 index 00000000..0eb6fac6 --- /dev/null +++ b/packages/websocket/.travis.yml @@ -0,0 +1,4 @@ +language: dart +dart: + - dev + - stable diff --git a/packages/websocket/CHANGELOG.md b/packages/websocket/CHANGELOG.md new file mode 100644 index 00000000..9bd7428c --- /dev/null +++ b/packages/websocket/CHANGELOG.md @@ -0,0 +1,57 @@ +# 2.0.3 +* Remove `WebSocketController.plugin`. +* Remove any unawaited futures. + +# 2.0.2 +* Update `stream_channel` to `2.0.0`. +* Use `angel_framework^@2.0.0-rc.0`. + +# 2.0.1 +* Add `reconnectOnClose` and `reconnectinterval` parameters in top-level `WebSockets` constructors. +* Close `WebSocketExtraneousEventHandler`. +* Add onAuthenticated to server-side. + +# 2.0.0 +* Update to work with `client@2.0.0`. + +# 2.0.0-alpha.8 +* Support for WebSockets over HTTP/2 (though in practice this doesn't often happen, if ever). + +# 2.0.0-alpha.7 +* Replace `WebSocketSynchronizer` with `StreamChannel`. + +# 2.0.0-alpha.6 +* Explicit import of `import 'package:http/io_client.dart' as http;` + +# 2.0.0-alpha.5 +* Update `http` dependency. + +# 2.0.0-alpha.4 +* Remove `package:json_god`. +* Make `WebSocketContext` take any `StreamChannel`. +* Strong typing updates. + +# 2.0.0-alpha.3 +* Directly import Angel HTTP. + +# 2.0.0-alpha.2 +* Updated for the next version of `angel_client`. + +# 2.0.0-alpha.1 +* Refactorings for updated Angel 2 versions. +* Remove `package:dart2_constant`. + +# 2.0.0-alpha +* Depend on Dart 2 and Angel 2. + +# 1.1.2 +* Dart 2 updates. +* Added `handleClient`, which is nice for external implementations +that plug into `AngelWebSocket`. + +# 1.1.1 +* Deprecated `unwrap`. +* Service streams now pump out `e.data`, rather than the actual event. + +# 1.1.0+1 +* Added `unwrap`. \ No newline at end of file diff --git a/packages/websocket/LICENSE b/packages/websocket/LICENSE new file mode 100644 index 00000000..eb4ce33e --- /dev/null +++ b/packages/websocket/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 angel-dart + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/websocket/README.md b/packages/websocket/README.md new file mode 100644 index 00000000..3f09a41e --- /dev/null +++ b/packages/websocket/README.md @@ -0,0 +1,163 @@ +# angel_websocket +[![Pub](https://img.shields.io/pub/v/angel_websocket.svg)](https://pub.dartlang.org/packages/angel_websocket) +[![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket) + +WebSocket plugin for Angel. + +This plugin broadcasts events from hooked services via WebSockets. + +In addition, it adds itself to the app's IoC container as `AngelWebSocket`, so that it can be used +in controllers as well. + +WebSocket contexts are add to `req.properties` as `'socket'`. + + +# Usage + +**Server-side** + +```dart +import "package:angel_framework/angel_framework.dart"; +import "package:angel_websocket/server.dart"; + +main() async { + var app = new Angel(); + + var ws = new AngelWebSocket(); + + // This is a plug-in. It hooks all your services, + // to automatically broadcast events. + await app.configure(ws.configureServer); + + // Listen for requests at `/ws`. + app.all('/ws', ws.handleRequest); +} + +``` + +Filtering events is easy with hooked services. Just return a `bool`, whether +synchronously or asynchronously. + +```dart +myService.properties['ws:filter'] = (HookedServiceEvent e, WebSocketContext socket) async { + return true; +} + +myService.index({ + 'ws:filter': (e, socket) => ...; +}); +``` + +**Adding Handlers within a Controller** + +`WebSocketController` extends a normal `Controller`, but also listens to WebSockets. + +```dart +import 'dart:async'; +import "package:angel_framework/angel_framework.dart"; +import "package:angel_websocket/server.dart"; + +@Expose("/") +class MyController extends WebSocketController { + // A reference to the WebSocket plug-in is required. + MyController(AngelWebSocket ws):super(ws); + + @override + void onConnect(WebSocketContext socket) { + // On connect... + } + + // Dependency injection works, too.. + @ExposeWs("read_message") + void sendMessage(WebSocketContext socket, WebSocketAction action, Db db) async { + socket.send( + "found_message", + db.collection("messages").findOne(where.id(action.data['message_id']))); + } + + // Event filtering + @ExposeWs("foo") + void foo() { + broadcast(new WebSocketEvent(...), filter: (socket) async => ...); + } +} +``` + +**Client Use** + +This repo also provides two client libraries `browser` and `io` that extend the base +`angel_client` interface, and allow you to use a very similar API on the client to that of +the server. + +The provided clients also automatically try to reconnect their WebSockets when disconnected, +which means you can restart your development server without having to reload browser windows. + +They also provide streams of data that pump out filtered data as it comes in from the server. + +Clients can even perform authentication over WebSockets. + +**In the Browser** + +```dart +import "package:angel_websocket/browser.dart"; + +main() async { + Angel app = new WebSockets("/ws"); + await app.connect(); + + var Cars = app.service("api/cars"); + + Cars.onCreated.listen((car) => print("New car: $car")); + + // Happens asynchronously + Cars.create({"brand": "Toyota"}); + + // Authenticate a WebSocket, if you were not already authenticated... + app.authenticateViaJwt(''); + + // Listen for arbitrary events + app.on['custom_event'].listen((event) { + // For example, this might be sent by a + // WebSocketController. + print('Hi!'); + }); +} +``` + +**CLI Client** + +```dart +import "package:angel_framework/common.dart"; +import "package:angel_websocket/io.dart"; + +// You can include these in a shared file and access on both client and server +class Car extends Model { + int year; + String brand, make; + + Car({this.year, this.brand, this.make}); + + @override String toString() => "$year $brand $make"; +} + +main() async { + Angel app = new WebSockets("/ws"); + + // Wait for WebSocket connection... + await app.connect(); + + var Cars = app.service("api/cars", type: Car); + + Cars.onCreated.listen((Car car) { + // Automatically deserialized into a car :) + // + // I just bought a new 2016 Toyota Camry! + print("I just bought a new $car!"); + }); + + // Happens asynchronously + Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"}); + + // Authenticate a WebSocket, if you were not already authenticated... + app.authenticateViaJwt(''); +} \ No newline at end of file diff --git a/packages/websocket/analysis_options.yaml b/packages/websocket/analysis_options.yaml new file mode 100644 index 00000000..380eebc8 --- /dev/null +++ b/packages/websocket/analysis_options.yaml @@ -0,0 +1,4 @@ +include: package:pedantic/analysis_options.yaml +analyzer: + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/packages/websocket/dev.key b/packages/websocket/dev.key new file mode 100644 index 00000000..5d49ae7e --- /dev/null +++ b/packages/websocket/dev.key @@ -0,0 +1,29 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIE5DAcBgoqhkiG9w0BDAEBMA4ECL7L6rj6uEHGAgIIAASCBMLbucyfqAkgCbhP +xNSHYllPMAv/dsIjtnsBwepCXPGkCBCuOAw/2FaCHjN9hBqL5V7fkrKeaemhm2YE +ycPtlHJYPDf3kEkyMjdZ9rIY6kePGfQizs2uJPcXj4YPyQ4HsfVXpOicKfQrouf5 +Mze9bGzeMN065q3iP4dYUMwHAyZYteXCsanQNHlqvsWli0W+H8St8fdsXefZhnv1 +qVatKWdNdWQ9t5MuljgNU2Vv56sHKEYXI0yLxk2QUMk8KlJfnmt8foYUsnPUXHmc +gIjLKwwVkpdololnEHSNu0cEOUPowjgJru+uMpn7vdNl7TPEQ9jbEgdNg4JwoYzU +0nao8WzjaSp7kzvZz0VFwKnk5AjstGvvuAWckADdq23QElbn/mF7AG1m/TBpYxzF +gTt37UdndS/AcvVznWVVrRP5iTSIawdIwvqI4s7rqsoE0GCcak+RhchgAz2gWKkS +oODUo0JL6pPVbJ3l4ebbaO6c99nDVc8dViPtc1EkStJEJ2O4kI4xgLSCr4Y9ahKn +oAaoSkX7Xxq3aQm+BzqSpLjdGL8atsqR/YVOIHYIl3gThvP0NfZGx1xHyvO5mCdZ +kHxSA7tKWxauZ3eQ2clbnzeRsl4El0WMHy/5K1ovene4v7sunmoXVtghBC8hK6eh +zMO9orex2PNQ/VQC7HCvtytunOVx1lkSBoNo7hR70igg6rW9H7UyoAoBOwMpT1xa +J6V62nqruTKOqFNfur7aHJGpHGtDb5/ickHeYCyPTvmGp67u4wChzKReeg02oECe +d1E5FKAcIa8s9TVOB6Z+HvTRNQZu2PsI6TJnjQRowvY9DAHiWTlJZBBY/pko3hxX +TsIeybpvRdEHpDWv86/iqtw1hv9CUxS/8ZTWUgBo+osShHW79FeDASr9FC4/Zn76 +ZDERTgV4YWlW/klVWcG2lFo7jix+OPXAB+ZQavLhlN1xdWBcIz1AUWjAM4hdPylW +HCX4PB9CQIPl2E7F+Y2p6nMcMWSJVBi5UIH7E9LfaBguXSzMmTk2Fw5p1aOQ6wfN +goVAMVwi8ppAVs741PfHdZ295xMmK/1LCxz5DeAdD/tsA/SYfT753GotioDuC7im +EyJ5JyvTr5I6RFFBuqt3NlUb3Hp16wP3B2x9DZiB6jxr0l341/NHgsyeBXkuIy9j +ON2mvpBPCJhS8kgWo3G0UyyKnx64tcgpGuSvZhGwPz843B6AbYyE6pMRfSWRMkMS +YZYa+VNKhR4ixdj07ocFZEWLVjCH7kxkE8JZXKt8jKYmkWd0lS1QVjgaKlO6lRa3 +q6SPJkhW6pvqobvcqVNXwi1XuzpZeEbuh0B7OTekFTTxx5g9XeDl56M8SVQ1KEhT +Q1t7H2Nba18WCB7cf+6PN0F0K0Jz1Kq7ZWaqEI/grX1m4RQuvNF5807sB/QKMO/Z +Gz3NXvHg5xTJRd/567lxPGkor0cE7qD1EZfmJ2HrBYXQ91bhgA7LToBuMZo6ZRXH +QfsanjbP4FPLMiGdQigLjj3A35L/f4sQOOVac/sRaFnm7pzcxsMvyVU/YtvGcjYE +xaOOVnamg661Wo0wksXoDjeSz/JIyyKO3Gwp1FSm2wGLjjy/Ehmqcqy8rvHuf07w +AUukhVtTNn4= +-----END ENCRYPTED PRIVATE KEY----- \ No newline at end of file diff --git a/packages/websocket/dev.pem b/packages/websocket/dev.pem new file mode 100644 index 00000000..01756b25 --- /dev/null +++ b/packages/websocket/dev.pem @@ -0,0 +1,57 @@ +-----BEGIN CERTIFICATE----- +MIIDKTCCAhGgAwIBAgIJAOWmjTS+OnTEMA0GCSqGSIb3DQEBCwUAMBcxFTATBgNV +BAMMDGludGVybWVkaWF0ZTAeFw0xNTA1MTgwOTAwNDBaFw0yMzA4MDQwOTAwNDBa +MBQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC +AQoCggEBALlcwQJuzd+xH8QFgfJSn5tRlvhkldSX98cE7NiA602NBbnAVyUrkRXq +Ni75lgt0kwjYfA9z674m8WSVbgpLPintPCla9CYky1TH0keIs8Rz6cGWHryWEHiu +EDuljQynu2b3sAFuHu9nfWurbJwZnFakBKpdQ9m4EyOZCHC/jHYY7HacKSXg1Cki +we2ca0BWDrcqy8kLy0dZ5oC6IZG8O8drAK8f3f44CRYw59D3sOKBrKXaabpvyEcb +N7Wk2HDBVwHpUJo1reVwtbM8dhqQayYSD8oXnGpP3RQNu/e2rzlXRyq/BfcDY1JI +7TbC4t/7/N4EcPSpGsTcSOC9A7FpzvECAwEAAaN7MHkwCQYDVR0TBAIwADAsBglg +hkgBhvhCAQ0EHxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0O +BBYEFCnwiEMMFZh7NhCr+qA8K0w4Q+AOMB8GA1UdIwQYMBaAFB0h1Evsaw2vfrmS +YuoCTmC4EE6ZMA0GCSqGSIb3DQEBCwUAA4IBAQAcFmHMaXRxyoNaeOowQ6iQWoZd +AUbvG7SHr7I6Pi2aqdqofsKWts7Ytm5WsS0M2nN+sW504houu0iCPeJJX8RQw2q4 +CCcNOs9IXk+2uMzlpocHpv+yYoUiD5DxgWh7eghQMLyMpf8FX3Gy4VazeuXznHOM +4gE4L417xkDzYOzqVTp0FTyAPUv6G2euhNCD6TMru9REcRhYul+K9kocjA5tt2KG +MH6y28LXbLyq4YJUxSUU9gY/xlnbbZS48KDqEcdYC9zjW9nQ0qS+XQuQuFIcwjJ5 +V4kAUYxDu6FoTpyQjgsrmBbZlKNxH7Nj4NDlcdJhp/zeSKHqWa5hSWjjKIxp +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIDAjCCAeqgAwIBAgIJAOWmjTS+OnTDMA0GCSqGSIb3DQEBCwUAMBgxFjAUBgNV +BAMMDXJvb3RhdXRob3JpdHkwHhcNMTUwNTE4MDkwMDQwWhcNMjMwODA0MDkwMDQw +WjAXMRUwEwYDVQQDDAxpbnRlcm1lZGlhdGUwggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQDSrAO1CoPvUllgLOzDm5nG0skDF7vh1DUgAIDVGz0ecD0JFbQx +EF79pju/6MbtpTW2FYvRp11t/G7rGtX923ybOHY/1MNFQrdIvPlO1VV7IGKjoMwP +DNeb0fIGjHoE9QxaDxR8NX8xQbItpsw+TUtRfc9SLkR+jaYJfVRoM21BOncZbSHE +YKiZlEbpecB/+EtwVpgvl+8mPD5U07Fi4fp/lza3WXInXQPyiTVllIEJCt4PKmlu +MocNaJOW38bysL7i0PzDpVZtOxLHOTaW68yF3FckIHNCaA7k1ABEEEegjFMmIao7 +B9w7A0jvr4jZVvNmui5Djjn+oJxwEVVgyf8LAgMBAAGjUDBOMB0GA1UdDgQWBBQd +IdRL7GsNr365kmLqAk5guBBOmTAfBgNVHSMEGDAWgBRk81s9d0ZbiZhh44KckwPb +oTc0XzAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBZQTK0plfdB5PC +cC5icut4EmrByJa1RbU7ayuEE70e7hla6KVmVjVdCBGltI4jBYwfhKbRItHiAJ/8 +x+XZKBG8DLPFuDb7lAa1ObhAYF7YThUFPQYaBhfzKcWrdmWDBFpvNv6E0Mm364dZ +e7Yxmbe5S4agkYPoxEzgEYmcUk9jbjdR6eTbs8laG169ljrECXfEU9RiAcqz5iSX +NLSewqB47hn3B9qgKcQn+PsgO2j7M+rfklhNgeGJeWmy7j6clSOuCsIjWHU0RLQ4 +0W3SB/rpEAJ7fgQbYUPTIUNALSOWi/o1tDX2mXPRjBoxqAv7I+vYk1lZPmSzkyRh +FKvRDxsW +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIDAzCCAeugAwIBAgIJAJ0MomS4Ck+8MA0GCSqGSIb3DQEBCwUAMBgxFjAUBgNV +BAMMDXJvb3RhdXRob3JpdHkwHhcNMTUwNTE4MDkwMDQwWhcNMjMwODA0MDkwMDQw +WjAYMRYwFAYDVQQDDA1yb290YXV0aG9yaXR5MIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAts1ijtBV92S2cOvpUMOSTp9c6A34nIGr0T5Nhz6XiqRVT+gv +dQgmkdKJQjbvR60y6jzltYFsI2MpGVXY8h/oAL81D/k7PDB2aREgyBfTPAhBHyGw +siR+2xYt5b/Zs99q5RdRqQNzNpLPJriIKvUsRyQWy1UiG2s7pRXQeA8qB0XtJdCj +kFIi+G2bDsaffspGeDOCqt7t+yqvRXfSES0c/l7DIHaiMbbp4//ZNML3RNgAjPz2 +hCezZ+wOYajOIyoSPK8IgICrhYFYxvgWxwbLDBEfC5B3jOQsySe10GoRAKZz1gBV +DmgReu81tYJmdgkc9zknnQtIFdA0ex+GvZlfWQIDAQABo1AwTjAdBgNVHQ4EFgQU +ZPNbPXdGW4mYYeOCnJMD26E3NF8wHwYDVR0jBBgwFoAUZPNbPXdGW4mYYeOCnJMD +26E3NF8wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEATzkZ97K777uZ +lQcduNX3ey4IbCiEzFA2zO5Blj+ilfIwNbZXNOgm/lqNvVGDYs6J1apJJe30vL3X +J+t2zsZWzzQzb9uIU37zYemt6m0fHrSrx/iy5lGNqt3HMfqEcOqSCOIK3PCTMz2/ +uyGe1iw33PVeWsm1JUybQ9IrU/huJjbgOHU4wab+8SJCM49ipArp68Fr6j4lcEaE +4rfRg1ZsvxiOyUB3qPn6wyL/JB8kOJ+QCBe498376eaem8AEFk0kQRh6hDaWtq/k +t6IIXQLjx+EBDVP/veK0UnVhKRP8YTOoV8ZiG1NcdlJmX/Uk7iAfevP7CkBfSN8W +r6AL284qtw== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/packages/websocket/example/index.html b/packages/websocket/example/index.html new file mode 100644 index 00000000..d2f4a4ca --- /dev/null +++ b/packages/websocket/example/index.html @@ -0,0 +1,30 @@ + + + + + + + Angel WS + + + + + diff --git a/packages/websocket/example/main.dart b/packages/websocket/example/main.dart new file mode 100644 index 00000000..cf3746dc --- /dev/null +++ b/packages/websocket/example/main.dart @@ -0,0 +1,56 @@ +import 'dart:io'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; +import 'package:angel_framework/http2.dart'; +import 'package:angel_websocket/server.dart'; +import 'package:file/local.dart'; +import 'package:logging/logging.dart'; + +main(List args) async { + var app = new Angel(); + var http = new AngelHttp(app); + var ws = new AngelWebSocket(app, sendErrors: !app.environment.isProduction); + var fs = const LocalFileSystem(); + app.logger = new Logger('angel_websocket'); + + // This is a plug-in. It hooks all your services, + // to automatically broadcast events. + await app.configure(ws.configureServer); + + app.get('/', (req, res) => res.streamFile(fs.file('example/index.html'))); + + // Listen for requests at `/ws`. + app.get('/ws', ws.handleRequest); + + app.fallback((req, res) => throw AngelHttpException.notFound()); + + ws.onConnection.listen((socket) { + socket.onData.listen((x) { + socket.send('pong', x); + }); + }); + + if (args.contains('http2')) { + var ctx = new SecurityContext() + ..useCertificateChain('dev.pem') + ..usePrivateKey('dev.key', password: 'dartdart'); + + try { + ctx.setAlpnProtocols(['h2'], true); + } catch (e, st) { + app.logger.severe( + 'Cannot set ALPN protocol on server to `h2`. The server will only serve HTTP/1.x.', + e, + st, + ); + } + + var http2 = new AngelHttp2(app, ctx); + http2.onHttp1.listen(http.handleRequest); + await http2.startServer('127.0.0.1', 3000); + print('Listening at ${http2.uri}'); + } else { + await http.startServer('127.0.0.1', 3000); + print('Listening at ${http.uri}'); + } +} diff --git a/packages/websocket/lib/angel_websocket.dart b/packages/websocket/lib/angel_websocket.dart new file mode 100644 index 00000000..e2a460c3 --- /dev/null +++ b/packages/websocket/lib/angel_websocket.dart @@ -0,0 +1,46 @@ +/// WebSocket plugin for Angel. +library angel_websocket; + +/// A notification from the server that something has occurred. +class WebSocketEvent { + String eventName; + Data data; + + WebSocketEvent({String this.eventName, this.data}); + + factory WebSocketEvent.fromJson(Map data) => new WebSocketEvent( + eventName: data['eventName'].toString(), data: data['data'] as Data); + + WebSocketEvent cast() { + if (T == Data) { + return this as WebSocketEvent; + } else { + return new WebSocketEvent(eventName: eventName, data: data as T); + } + } + + Map toJson() { + return {'eventName': eventName, 'data': data}; + } +} + +/// A command sent to the server, usually corresponding to a service method. +class WebSocketAction { + String id; + String eventName; + var data; + Map params; + + WebSocketAction( + {String this.id, String this.eventName, this.data, this.params}); + + factory WebSocketAction.fromJson(Map data) => new WebSocketAction( + id: data['id'].toString(), + eventName: data['eventName'].toString(), + data: data['data'], + params: data['params'] as Map); + + Map toJson() { + return {'id': id, 'eventName': eventName, 'data': data, 'params': params}; + } +} diff --git a/packages/websocket/lib/base_websocket_client.dart b/packages/websocket/lib/base_websocket_client.dart new file mode 100644 index 00000000..143ce69e --- /dev/null +++ b/packages/websocket/lib/base_websocket_client.dart @@ -0,0 +1,446 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; +import 'package:angel_client/angel_client.dart'; +import 'package:angel_client/base_angel_client.dart'; +import 'package:angel_http_exception/angel_http_exception.dart'; +import 'package:http/src/base_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/status.dart' as status; +import 'angel_websocket.dart'; +import 'constants.dart'; + +final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); + +/// An [Angel] client that operates across WebSockets. +abstract class BaseWebSocketClient extends BaseAngelClient { + Duration _reconnectInterval; + WebSocketChannel _socket; + final Queue _queue = new Queue(); + + final StreamController _onData = new StreamController(); + final StreamController _onAllEvents = + new StreamController(); + final StreamController _onAuthenticated = + new StreamController(); + final StreamController _onError = + new StreamController(); + final StreamController> _onServiceEvent = + new StreamController>.broadcast(); + final StreamController + _onWebSocketChannelException = + new StreamController(); + + /// Use this to handle events that are not standard. + final WebSocketExtraneousEventHandler on = + new WebSocketExtraneousEventHandler(); + + /// Fired on all events. + Stream get onAllEvents => _onAllEvents.stream; + + /// Fired whenever a WebSocket is successfully authenticated. + Stream get onAuthenticated => _onAuthenticated.stream; + + /// A broadcast stream of data coming from the [socket]. + /// + /// Mostly just for internal use. + Stream get onData => _onData.stream; + + /// Fired on errors. + Stream get onError => _onError.stream; + + /// Fired whenever an event is fired by a service. + Stream> get onServiceEvent => + _onServiceEvent.stream; + + /// Fired on [WebSocketChannelException]s. + Stream get onWebSocketChannelException => + _onWebSocketChannelException.stream; + + /// The [WebSocketChannel] underneath this instance. + WebSocketChannel get socket => _socket; + + /// If `true` (default), then the client will automatically try to reconnect to the server + /// if the socket closes. + final bool reconnectOnClose; + + /// The amount of time to wait between reconnect attempts. Default: 10 seconds. + Duration get reconnectInterval => _reconnectInterval; + + Uri _wsUri; + + /// The [Uri] to which a websocket should point. + Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl); + + static Uri _toWsUri(Uri u) { + if (u.hasScheme) { + if (u.scheme == 'http') { + return u.replace(scheme: 'ws'); + } else if (u.scheme == 'https') { + return u.replace(scheme: 'wss'); + } else { + return u; + } + } else { + return _toWsUri(u.replace(scheme: Uri.base.scheme)); + } + } + + BaseWebSocketClient(http.BaseClient client, baseUrl, + {this.reconnectOnClose = true, Duration reconnectInterval}) + : super(client, baseUrl) { + _reconnectInterval = reconnectInterval ?? new Duration(seconds: 10); + } + + @override + Future close() async { + on._close(); + scheduleMicrotask(() async { + await _socket.sink.close(status.goingAway); + await _onData.close(); + await _onAllEvents.close(); + await _onAuthenticated.close(); + await _onError.close(); + await _onServiceEvent.close(); + await _onWebSocketChannelException.close(); + }); + } + + /// Connects the WebSocket. [timeout] is optional. + Future connect({Duration timeout}) async { + if (timeout != null) { + var c = new Completer(); + Timer timer; + + timer = new Timer(timeout, () { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + c.completeError(new TimeoutException( + 'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms', + timeout)); + } + }); + + scheduleMicrotask(() { + return getConnectedWebSocket().then((socket) { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + + while (_queue.isNotEmpty) { + var action = _queue.removeFirst(); + socket.sink.add(serialize(action)); + } + + c.complete(socket); + } + }).catchError((e, StackTrace st) { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + c.completeError(e, st); + } + }); + }); + + return await c.future.then((socket) { + _socket = socket; + listen(); + }); + } else { + _socket = await getConnectedWebSocket(); + listen(); + return _socket; + } + } + + /// Returns a new [WebSocketChannel], ready to be listened on. + /// + /// This should be overriden by child classes, **NOT** [connect]. + Future getConnectedWebSocket(); + + @override + WebSocketsService service(String path, + {Type type, AngelDeserializer deserializer}) { + String uri = path.toString().replaceAll(_straySlashes, ''); + return new WebSocketsService(socket, this, uri, + deserializer: deserializer); + } + + /// Starts listening for data. + void listen() { + _socket?.stream?.listen( + (data) { + _onData.add(data); + + if (data is WebSocketChannelException) { + _onWebSocketChannelException.add(data); + } else if (data is String) { + var jsons = json.decode(data); + + if (jsons is Map) { + var event = new WebSocketEvent.fromJson(jsons); + + if (event.eventName?.isNotEmpty == true) { + _onAllEvents.add(event); + on._getStream(event.eventName).add(event); + } + + if (event.eventName == errorEvent) { + var error = + new AngelHttpException.fromMap((event.data ?? {}) as Map); + _onError.add(error); + } else if (event.eventName == authenticatedEvent) { + var authResult = new AngelAuthResult.fromMap(event.data as Map); + _onAuthenticated.add(authResult); + } else if (event.eventName?.isNotEmpty == true) { + var split = event.eventName + .split("::") + .where((str) => str.isNotEmpty) + .toList(); + + if (split.length >= 2) { + var serviceName = split[0], eventName = split[1]; + _onServiceEvent + .add({serviceName: event..eventName = eventName}); + } + } + } + } + }, + cancelOnError: true, + onDone: () { + _socket = null; + if (reconnectOnClose == true) { + new Timer.periodic(reconnectInterval, (Timer timer) async { + var result; + + try { + result = await connect(timeout: reconnectInterval); + } catch (e) { + // + } + + if (result != null) timer.cancel(); + }); + } + }); + } + + /// Serializes data to JSON. + serialize(x) => json.encode(x); + + /// Sends the given [action] on the [socket]. + void sendAction(WebSocketAction action) { + if (_socket == null) + _queue.addLast(action); + else + socket.sink.add(serialize(action)); + } + + /// Attempts to authenticate a WebSocket, using a valid JWT. + void authenticateViaJwt(String jwt) { + sendAction(new WebSocketAction( + eventName: authenticateAction, + params: { + 'query': {'jwt': jwt} + }, + )); + } +} + +/// A [Service] that asynchronously interacts with the server. +class WebSocketsService extends Service { + /// The [BaseWebSocketClient] that spawned this service. + @override + final BaseWebSocketClient app; + + /// Used to deserialize JSON into typed data. + final AngelDeserializer deserializer; + + /// The [WebSocketChannel] to listen to, and send data across. + final WebSocketChannel socket; + + /// The service path to listen to. + final String path; + + final StreamController _onAllEvents = + new StreamController(); + final StreamController> _onIndexed = new StreamController(); + final StreamController _onRead = new StreamController(); + final StreamController _onCreated = new StreamController(); + final StreamController _onModified = new StreamController(); + final StreamController _onUpdated = new StreamController(); + final StreamController _onRemoved = new StreamController(); + + /// Fired on all events. + Stream get onAllEvents => _onAllEvents.stream; + + /// Fired on `index` events. + Stream> get onIndexed => _onIndexed.stream; + + /// Fired on `read` events. + Stream get onRead => _onRead.stream; + + /// Fired on `created` events. + Stream get onCreated => _onCreated.stream; + + /// Fired on `modified` events. + Stream get onModified => _onModified.stream; + + /// Fired on `updated` events. + Stream get onUpdated => _onUpdated.stream; + + /// Fired on `removed` events. + Stream get onRemoved => _onRemoved.stream; + + WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) { + listen(); + } + + Future close() async { + await _onAllEvents.close(); + await _onCreated.close(); + await _onIndexed.close(); + await _onModified.close(); + await _onRead.close(); + await _onRemoved.close(); + await _onUpdated.close(); + } + + /// Serializes an [action] to be sent over a WebSocket. + serialize(WebSocketAction action) => json.encode(action); + + /// Deserializes data from a [WebSocketEvent]. + Data deserialize(x) { + return deserializer != null ? deserializer(x) : x as Data; + } + + /// Deserializes the contents of an [event]. + WebSocketEvent transformEvent(WebSocketEvent event) { + return new WebSocketEvent( + eventName: event.eventName, data: deserialize(event.data)); + } + + /// Starts listening for events. + void listen() { + app.onServiceEvent.listen((map) { + if (map.containsKey(path)) { + var event = map[path]; + + _onAllEvents.add(event); + + if (event.eventName == indexedEvent) { + var d = event.data; + var transformed = new WebSocketEvent( + eventName: event.eventName, + data: d is Iterable ? d.map(deserialize).toList() : null); + if (transformed.data != null) _onIndexed.add(transformed.data); + return; + } + + var transformed = transformEvent(event).data; + + switch (event.eventName) { + case readEvent: + _onRead.add(transformed); + break; + case createdEvent: + _onCreated.add(transformed); + break; + case modifiedEvent: + _onModified.add(transformed); + break; + case updatedEvent: + _onUpdated.add(transformed); + break; + case removedEvent: + _onRemoved.add(transformed); + break; + } + } + }); + } + + /// Sends the given [action] on the [socket]. + void send(WebSocketAction action) { + app.sendAction(action); + } + + @override + Future> index([Map params]) async { + app.sendAction(new WebSocketAction( + eventName: '$path::$indexAction', params: params ?? {})); + return null; + } + + @override + Future read(id, [Map params]) async { + app.sendAction(new WebSocketAction( + eventName: '$path::$readAction', + id: id.toString(), + params: params ?? {})); + return null; + } + + @override + Future create(data, [Map params]) async { + app.sendAction(new WebSocketAction( + eventName: '$path::$createAction', data: data, params: params ?? {})); + return null; + } + + @override + Future modify(id, data, [Map params]) async { + app.sendAction(new WebSocketAction( + eventName: '$path::$modifyAction', + id: id.toString(), + data: data, + params: params ?? {})); + return null; + } + + @override + Future update(id, data, [Map params]) async { + app.sendAction(new WebSocketAction( + eventName: '$path::$updateAction', + id: id.toString(), + data: data, + params: params ?? {})); + return null; + } + + @override + Future remove(id, [Map params]) async { + app.sendAction(new WebSocketAction( + eventName: '$path::$removeAction', + id: id.toString(), + params: params ?? {})); + return null; + } + + /// No longer necessary. + @deprecated + Service unwrap() => this; +} + +/// Contains a dynamic Map of [WebSocketEvent] streams. +class WebSocketExtraneousEventHandler { + Map> _events = {}; + + StreamController _getStream(String index) { + if (_events[index] == null) + _events[index] = new StreamController(); + + return _events[index]; + } + + Stream operator [](String index) { + if (_events[index] == null) + _events[index] = new StreamController(); + + return _events[index].stream; + } + + void _close() { + _events.values.forEach((s) => s.close()); + } +} diff --git a/packages/websocket/lib/browser.dart b/packages/websocket/lib/browser.dart new file mode 100644 index 00000000..3f84ebc3 --- /dev/null +++ b/packages/websocket/lib/browser.dart @@ -0,0 +1,110 @@ +/// Browser WebSocket client library for the Angel framework. +library angel_websocket.browser; + +import 'dart:async'; +import 'dart:html'; +import 'package:angel_client/angel_client.dart'; +import 'package:angel_http_exception/angel_http_exception.dart'; +import 'package:http/browser_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/html.dart'; +import 'base_websocket_client.dart'; +export 'angel_websocket.dart'; + +final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + final List _services = []; + + WebSockets(baseUrl, + {bool reconnectOnClose = true, Duration reconnectInterval}) + : super(new http.BrowserClient(), baseUrl, + reconnectOnClose: reconnectOnClose, + reconnectInterval: reconnectInterval); + + @override + Future close() { + for (var service in _services) { + service.close(); + } + + return super.close(); + } + + @override + Stream authenticateViaPopup(String url, + {String eventName = 'token', String errorMessage}) { + var ctrl = new StreamController(); + var wnd = window.open(url, 'angel_client_auth_popup'); + + Timer t; + StreamSubscription sub; + t = new Timer.periodic(new Duration(milliseconds: 500), (timer) { + if (!ctrl.isClosed) { + if (wnd.closed) { + ctrl.addError(new AngelHttpException.notAuthenticated( + message: + errorMessage ?? 'Authentication via popup window failed.')); + ctrl.close(); + timer.cancel(); + sub?.cancel(); + } + } else + timer.cancel(); + }); + + sub = window.on[eventName ?? 'token'].listen((e) { + if (!ctrl.isClosed) { + ctrl.add((e as CustomEvent).detail.toString()); + t.cancel(); + ctrl.close(); + sub.cancel(); + } + }); + + return ctrl.stream; + } + + @override + Future getConnectedWebSocket() { + var url = websocketUri; + + if (authToken?.isNotEmpty == true) { + url = url.replace( + queryParameters: new Map.from(url.queryParameters) + ..['token'] = authToken); + } + + var socket = new WebSocket(url.toString()); + var completer = new Completer(); + + socket + ..onOpen.listen((_) { + if (!completer.isCompleted) + return completer.complete(new HtmlWebSocketChannel(socket)); + }) + ..onError.listen((e) { + if (!completer.isCompleted) + return completer.completeError(e is ErrorEvent ? e.error : e); + }); + + return completer.future; + } + + @override + BrowserWebSocketsService service(String path, + {Type type, AngelDeserializer deserializer}) { + String uri = path.replaceAll(_straySlashes, ''); + return new BrowserWebSocketsService(socket, this, uri, + deserializer: deserializer); + } +} + +class BrowserWebSocketsService extends WebSocketsService { + final Type type; + + BrowserWebSocketsService(WebSocketChannel socket, WebSockets app, String uri, + {this.type, AngelDeserializer deserializer}) + : super(socket, app, uri, deserializer: deserializer); +} diff --git a/packages/websocket/lib/constants.dart b/packages/websocket/lib/constants.dart new file mode 100644 index 00000000..25f75e28 --- /dev/null +++ b/packages/websocket/lib/constants.dart @@ -0,0 +1,87 @@ +const String authenticateAction = 'authenticate'; +const String indexAction = 'index'; +const String readAction = 'read'; +const String createAction = 'create'; +const String modifyAction = 'modify'; +const String updateAction = 'update'; +const String removeAction = 'remove'; + +@deprecated +const String ACTION_AUTHENTICATE = authenticateAction; + +@deprecated +const String ACTION_INDEX = indexAction; + +@deprecated +const String ACTION_READ = readAction; + +@deprecated +const String ACTION_CREATE = createAction; + +@deprecated +const String ACTION_MODIFY = modifyAction; + +@deprecated +const String ACTION_UPDATE = updateAction; + +@deprecated +const String ACTION_REMOVE = removeAction; + +const String authenticatedEvent = 'authenticated'; +const String errorEvent = 'error'; +const String indexedEvent = 'indexed'; +const String readEvent = 'read'; +const String createdEvent = 'created'; +const String modifiedEvent = 'modified'; +const String updatedEvent = 'updated'; +const String removedEvent = 'removed'; + +@deprecated +const String EVENT_AUTHENTICATED = authenticatedEvent; + +@deprecated +const String EVENT_ERROR = errorEvent; + +@deprecated +const String EVENT_INDEXED = indexedEvent; + +@deprecated +const String EVENT_READ = readEvent; + +@deprecated +const String EVENT_CREATED = createdEvent; + +@deprecated +const String EVENT_MODIFIED = modifiedEvent; + +@deprecated +const String EVENT_UPDATED = updatedEvent; + +@deprecated +const String EVENT_REMOVED = removedEvent; + +/// The standard Angel service actions. +const List actions = const [ + indexAction, + readAction, + createAction, + modifyAction, + updateAction, + removeAction +]; + +@deprecated +const List ACTIONS = actions; + +/// The standard Angel service events. +const List events = const [ + indexedEvent, + readEvent, + createdEvent, + modifiedEvent, + updatedEvent, + removedEvent +]; + +@deprecated +const List EVENTS = events; diff --git a/packages/websocket/lib/flutter.dart b/packages/websocket/lib/flutter.dart new file mode 100644 index 00000000..e88b94be --- /dev/null +++ b/packages/websocket/lib/flutter.dart @@ -0,0 +1,50 @@ +/// Flutter-compatible WebSocket client library for the Angel framework. +library angel_websocket.flutter; + +import 'dart:async'; +import 'dart:io'; +import 'package:http/http.dart' as http; +import 'package:http/io_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'base_websocket_client.dart'; +export 'package:angel_client/angel_client.dart'; +export 'angel_websocket.dart'; + +// final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + final List _services = []; + + WebSockets(baseUrl, + {bool reconnectOnClose = true, Duration reconnectInterval}) + : super(new http.IOClient(), baseUrl, + reconnectOnClose: reconnectOnClose, + reconnectInterval: reconnectInterval); + + @override + Stream authenticateViaPopup(String url, + {String eventName = 'token'}) { + throw new UnimplementedError( + 'Opening popup windows is not supported in the `dart:io` client.'); + } + + @override + Future close() { + for (var service in _services) { + service.close(); + } + + return super.close(); + } + + @override + Future getConnectedWebSocket() async { + var socket = await WebSocket.connect(websocketUri.toString(), + headers: authToken?.isNotEmpty == true + ? {'Authorization': 'Bearer $authToken'} + : {}); + return new IOWebSocketChannel(socket); + } +} diff --git a/packages/websocket/lib/hooks.dart b/packages/websocket/lib/hooks.dart new file mode 100644 index 00000000..362e5d62 --- /dev/null +++ b/packages/websocket/lib/hooks.dart @@ -0,0 +1,30 @@ +import 'package:angel_framework/angel_framework.dart'; + +/// Prevents a WebSocket event from being broadcasted, to any client from the given [provider]. +/// +/// [provider] can be a String, a [Provider], or an Iterable. +/// If [provider] is `null`, any provider will be blocked. +HookedServiceEventListener doNotBroadcast([provider]) { + return (HookedServiceEvent e) { + if (e.params != null && e.params.containsKey('provider')) { + bool deny = false; + Iterable providers = provider is Iterable ? provider : [provider]; + + for (var p in providers) { + if (deny) break; + + if (p is Providers) { + deny = deny || + p == e.params['provider'] || + e.params['provider'] == p.via; + } else if (p == null) { + deny = true; + } else + deny = + deny || (e.params['provider'] as Providers).via == p.toString(); + } + + e.params['broadcast'] = false; + } + }; +} diff --git a/packages/websocket/lib/io.dart b/packages/websocket/lib/io.dart new file mode 100644 index 00000000..6bcfc0ed --- /dev/null +++ b/packages/websocket/lib/io.dart @@ -0,0 +1,66 @@ +/// Command-line WebSocket client library for the Angel framework. +library angel_websocket.io; + +import 'dart:async'; +import 'dart:io'; +import 'package:angel_client/angel_client.dart'; +import 'package:http/http.dart' as http; +import 'package:http/io_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'base_websocket_client.dart'; +export 'package:angel_client/angel_client.dart'; +export 'angel_websocket.dart'; + +final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)"); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + final List _services = []; + + WebSockets(baseUrl, + {bool reconnectOnClose = true, Duration reconnectInterval}) + : super(new http.IOClient(), baseUrl, + reconnectOnClose: reconnectOnClose, + reconnectInterval: reconnectInterval); + + @override + Stream authenticateViaPopup(String url, + {String eventName = 'token'}) { + throw new UnimplementedError( + 'Opening popup windows is not supported in the `dart:io` client.'); + } + + @override + Future close() { + for (var service in _services) { + service.close(); + } + + return super.close(); + } + + @override + Future getConnectedWebSocket() async { + var socket = await WebSocket.connect(websocketUri.toString(), + headers: authToken?.isNotEmpty == true + ? {'Authorization': 'Bearer $authToken'} + : {}); + return new IOWebSocketChannel(socket); + } + + @override + IoWebSocketsService service(String path, + {Type type, AngelDeserializer deserializer}) { + String uri = path.replaceAll(_straySlashes, ''); + return new IoWebSocketsService(socket, this, uri, type); + } +} + +class IoWebSocketsService extends WebSocketsService { + final Type type; + + IoWebSocketsService( + WebSocketChannel socket, WebSockets app, String uri, this.type) + : super(socket, app, uri); +} diff --git a/packages/websocket/lib/server.dart b/packages/websocket/lib/server.dart new file mode 100644 index 00000000..734c2c35 --- /dev/null +++ b/packages/websocket/lib/server.dart @@ -0,0 +1,454 @@ +/// Server-side support for WebSockets. +library angel_websocket.server; + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:mirrors'; +import 'package:angel_auth/angel_auth.dart'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; +import 'package:angel_framework/http2.dart'; +import 'package:merge_map/merge_map.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'angel_websocket.dart'; +import 'constants.dart'; +export 'angel_websocket.dart'; + +part 'websocket_context.dart'; + +part 'websocket_controller.dart'; + +typedef String WebSocketResponseSerializer(data); + +/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s. +class AngelWebSocket { + List _clients = []; + final List _servicesAlreadyWired = []; + + final StreamController _onAction = + new StreamController(); + final StreamController _onData = new StreamController(); + final StreamController _onConnection = + new StreamController.broadcast(); + final StreamController _onDisconnect = + new StreamController.broadcast(); + + final Angel app; + + /// If this is not `true`, then all client-side service parameters will be + /// discarded, other than `params['query']`. + final bool allowClientParams; + + /// An optional whitelist of allowed client origins, or [:null:]. + final List allowedOrigins; + + /// An optional whitelist of allowed client protocols, or [:null:]. + final List allowedProtocols; + + /// If `true`, then clients can authenticate their WebSockets by sending a valid JWT. + final bool allowAuth; + + /// Send error information across WebSockets, without including debug information.. + final bool sendErrors; + + /// A list of clients currently connected to this server via WebSockets. + List get clients => new List.unmodifiable(_clients); + + /// Services that have already been hooked to fire socket events. + List get servicesAlreadyWired => + new List.unmodifiable(_servicesAlreadyWired); + + /// Used to notify other nodes of an event's firing. Good for scaled applications. + final StreamChannel synchronizationChannel; + + /// Fired on any [WebSocketAction]. + Stream get onAction => _onAction.stream; + + /// Fired whenever a WebSocket sends data. + Stream get onData => _onData.stream; + + /// Fired on incoming connections. + Stream get onConnection => _onConnection.stream; + + /// Fired when a user disconnects. + Stream get onDisconnection => _onDisconnect.stream; + + /// Serializes data to WebSockets. + WebSocketResponseSerializer serializer; + + /// Deserializes data from WebSockets. + Function deserializer; + + AngelWebSocket(this.app, + {this.sendErrors = false, + this.allowClientParams = false, + this.allowAuth = true, + this.synchronizationChannel, + this.serializer, + this.deserializer, + this.allowedOrigins, + this.allowedProtocols}) { + if (serializer == null) serializer = json.encode; + if (deserializer == null) deserializer = (params) => params; + } + + HookedServiceEventListener serviceHook(String path) { + return (HookedServiceEvent e) async { + if (e.params != null && e.params['broadcast'] == false) return; + + var event = await transformEvent(e); + event.eventName = "$path::${event.eventName}"; + + _filter(WebSocketContext socket) { + if (e.service.configuration.containsKey('ws:filter')) + return e.service.configuration['ws:filter'](e, socket); + else if (e.params != null && e.params.containsKey('ws:filter')) + return e.params['ws:filter'](e, socket); + else + return true; + } + + await batchEvent(event, filter: _filter); + }; + } + + /// Slates an event to be dispatched. + Future batchEvent(WebSocketEvent event, + {filter(WebSocketContext socket), bool notify = true}) async { + // Default implementation will just immediately fire events + _clients.forEach((client) async { + dynamic result = true; + if (filter != null) result = await filter(client); + if (result == true) { + client.channel.sink.add((serializer ?? json.encode)(event.toJson())); + } + }); + + if (synchronizationChannel != null && notify != false) + synchronizationChannel.sink.add(event); + } + + /// Returns a list of events yet to be sent. + Future> getBatchedEvents() async => []; + + /// Responds to an incoming action on a WebSocket. + Future handleAction(WebSocketAction action, WebSocketContext socket) async { + var split = action.eventName.split("::"); + + if (split.length < 2) { + socket.sendError(new AngelHttpException.badRequest()); + return null; + } + + var service = app.findService(split[0]); + + if (service == null) { + socket.sendError(new AngelHttpException.notFound( + message: "No service \"${split[0]}\" exists.")); + return null; + } + + var actionName = split[1]; + + if (action.params is! Map) action.params = {}; + + if (allowClientParams != true) { + if (action.params['query'] is Map) + action.params = {'query': action.params['query']}; + else + action.params = {}; + } + + var params = mergeMap([ + ((deserializer ?? (params) => params)(action.params)) + as Map, + { + "provider": Providers.websocket, + '__requestctx': socket.request, + '__responsectx': socket.response + } + ]); + + try { + if (actionName == indexAction) { + socket.send( + "${split[0]}::" + indexedEvent, await service.index(params)); + return null; + } else if (actionName == readAction) { + socket.send( + "${split[0]}::" + readEvent, await service.read(action.id, params)); + return null; + } else if (actionName == createAction) { + return new WebSocketEvent( + eventName: "${split[0]}::" + createdEvent, + data: await service.create(action.data, params)); + } else if (actionName == modifyAction) { + return new WebSocketEvent( + eventName: "${split[0]}::" + modifiedEvent, + data: await service.modify(action.id, action.data, params)); + } else if (actionName == updateAction) { + return new WebSocketEvent( + eventName: "${split[0]}::" + updatedEvent, + data: await service.update(action.id, action.data, params)); + } else if (actionName == removeAction) { + return new WebSocketEvent( + eventName: "${split[0]}::" + removedEvent, + data: await service.remove(action.id, params)); + } else { + socket.sendError(new AngelHttpException.methodNotAllowed( + message: "Method Not Allowed: \"$actionName\"")); + return null; + } + } catch (e, st) { + catchError(e, st, socket); + } + } + + /// Authenticates a [WebSocketContext]. + Future handleAuth(WebSocketAction action, WebSocketContext socket) async { + if (allowAuth != false && + action.eventName == authenticateAction && + action.params['query'] is Map && + action.params['query']['jwt'] is String) { + try { + var auth = socket.request.container.make(); + var jwt = action.params['query']['jwt'] as String; + AuthToken token; + + token = new AuthToken.validate(jwt, auth.hmac); + var user = await auth.deserializer(token.userId); + socket.request + ..container.registerSingleton(token) + ..container.registerSingleton(user, as: user.runtimeType as Type); + socket._onAuthenticated.add(null); + socket.send(authenticatedEvent, + {'token': token.serialize(auth.hmac), 'data': user}); + } catch (e, st) { + catchError(e, st, socket); + } + } else { + socket.sendError(new AngelHttpException.badRequest( + message: 'No JWT provided for authentication.')); + } + } + + /// Hooks a service up to have its events broadcasted. + hookupService(Pattern _path, HookedService service) { + String path = _path.toString(); + service.after( + [ + HookedServiceEvent.created, + HookedServiceEvent.modified, + HookedServiceEvent.updated, + HookedServiceEvent.removed + ], + serviceHook(path), + ); + _servicesAlreadyWired.add(path); + } + + /// Runs before firing [onConnection]. + Future handleConnect(WebSocketContext socket) async {} + + /// Handles incoming data from a WebSocket. + handleData(WebSocketContext socket, data) async { + try { + socket._onData.add(data); + var fromJson = json.decode(data.toString()); + var action = new WebSocketAction.fromJson(fromJson as Map); + _onAction.add(action); + + if (action.eventName == null || + action.eventName is! String || + action.eventName.isEmpty) { + throw new AngelHttpException.badRequest(); + } + + if (fromJson is Map && fromJson.containsKey("eventName")) { + socket._onAction.add(new WebSocketAction.fromJson(fromJson)); + socket.on + ._getStreamForEvent(fromJson["eventName"].toString()) + .add(fromJson["data"] as Map); + } + + if (action.eventName == authenticateAction) + await handleAuth(action, socket); + + if (action.eventName.contains("::")) { + var split = action.eventName.split("::"); + + if (split.length >= 2) { + if (actions.contains(split[1])) { + var event = await handleAction(action, socket); + if (event is Future) event = await event; + } + } + } + } catch (e, st) { + catchError(e, st, socket); + } + } + + void catchError(e, StackTrace st, WebSocketContext socket) { + // Send an error + if (e is AngelHttpException) { + socket.sendError(e); + app.logger?.severe(e.message, e.error ?? e, e.stackTrace); + } else if (sendErrors) { + var err = new AngelHttpException(e, + message: e.toString(), stackTrace: st, errors: [st.toString()]); + socket.sendError(err); + app.logger?.severe(err.message, e, st); + } else { + var err = new AngelHttpException(e); + socket.sendError(err); + app.logger?.severe(e.toString(), e, st); + } + } + + /// Transforms a [HookedServiceEvent], so that it can be broadcasted. + Future transformEvent(HookedServiceEvent event) async { + return new WebSocketEvent(eventName: event.eventName, data: event.result); + } + + /// Hooks any [HookedService]s that are not being broadcasted yet. + wireAllServices(Angel app) { + for (Pattern key in app.services.keys.where((x) { + return !_servicesAlreadyWired.contains(x) && + app.services[x] is HookedService; + })) { + hookupService(key, app.services[key] as HookedService); + } + } + + /// Configures an [Angel] instance to listen for WebSocket connections. + Future configureServer(Angel app) async { + app..container.registerSingleton(this); + + if (runtimeType != AngelWebSocket) + app..container.registerSingleton(this); + + // Set up services + wireAllServices(app); + + app.onService.listen((_) { + wireAllServices(app); + }); + + if (synchronizationChannel != null) { + synchronizationChannel.stream.listen((e) => batchEvent(e, notify: false)); + } + + app.shutdownHooks.add((_) => synchronizationChannel?.sink?.close()); + } + + /// Handles an incoming [WebSocketContext]. + Future handleClient(WebSocketContext socket) async { + var origin = socket.request.headers.value('origin'); + if (allowedOrigins != null && !allowedOrigins.contains(origin)) { + throw new AngelHttpException.forbidden( + message: + 'WebSocket connections are not allowed from the origin "$origin".'); + } + + _clients.add(socket); + await handleConnect(socket); + + _onConnection.add(socket); + + socket.request.container.registerSingleton(socket); + + socket.channel.stream.listen( + (data) { + _onData.add(data); + handleData(socket, data); + }, + onDone: () { + _onDisconnect.add(socket); + _clients.remove(socket); + }, + onError: (e) { + _onDisconnect.add(socket); + _clients.remove(socket); + }, + cancelOnError: true, + ); + } + + /// Handles an incoming HTTP request. + Future handleRequest(RequestContext req, ResponseContext res) async { + if (req is HttpRequestContext && res is HttpResponseContext) { + if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest)) + throw new AngelHttpException.badRequest(); + await res.detach(); + var ws = await WebSocketTransformer.upgrade(req.rawRequest); + var channel = new IOWebSocketChannel(ws); + var socket = new WebSocketContext(channel, req, res); + scheduleMicrotask(() => handleClient(socket)); + return false; + } else if (req is Http2RequestContext && res is Http2ResponseContext) { + var connection = + req.headers['connection']?.map((s) => s.toLowerCase().trim()); + var upgrade = req.headers.value('upgrade')?.toLowerCase(); + var version = req.headers.value('sec-websocket-version'); + var key = req.headers.value('sec-websocket-key'); + var protocol = req.headers.value('sec-websocket-protocol'); + + if (connection == null) { + throw new AngelHttpException.badRequest( + message: 'Missing `connection` header.'); + } else if (!connection.contains('upgrade')) { + throw new AngelHttpException.badRequest( + message: 'Missing "upgrade" in `connection` header.'); + } else if (upgrade != 'websocket') { + throw new AngelHttpException.badRequest( + message: 'The `upgrade` header must equal "websocket".'); + } else if (version != '13') { + throw new AngelHttpException.badRequest( + message: 'The `sec-websocket-version` header must equal "13".'); + } else if (key == null) { + throw new AngelHttpException.badRequest( + message: 'Missing `sec-websocket-key` header.'); + } else if (protocol != null && + allowedProtocols != null && + !allowedProtocols.contains(protocol)) { + throw new AngelHttpException.badRequest( + message: 'Disallowed `sec-websocket-protocol` header "$protocol".'); + } else { + var stream = res.detach(); + var ctrl = new StreamChannelController>(); + + ctrl.local.stream.listen((buf) { + stream.sendData(buf); + }, onDone: () { + stream.outgoingMessages.close(); + }); + + if (req.hasParsedBody) { + await ctrl.local.sink.close(); + } else { + await req.body.pipe(ctrl.local.sink); + } + + var sink = utf8.encoder.startChunkedConversion(ctrl.foreign.sink); + sink.add("HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n"); + if (protocol != null) sink.add("Sec-WebSocket-Protocol: $protocol\r\n"); + sink.add("\r\n"); + + var ws = new WebSocketChannel(ctrl.foreign); + var socket = new WebSocketContext(ws, req, res); + scheduleMicrotask(() => handleClient(socket)); + return false; + } + } else { + throw new ArgumentError( + 'Not an HTTP/1.1 or HTTP/2 RequestContext+ResponseContext pair: $req, $res'); + } + } +} diff --git a/packages/websocket/lib/websocket_context.dart b/packages/websocket/lib/websocket_context.dart new file mode 100644 index 00000000..1876f8bd --- /dev/null +++ b/packages/websocket/lib/websocket_context.dart @@ -0,0 +1,73 @@ +part of angel_websocket.server; + +/// Represents a WebSocket session, with the original +/// [RequestContext] and [ResponseContext] attached. +class WebSocketContext { + /// Use this to listen for events. + _WebSocketEventTable on = new _WebSocketEventTable(); + + /// The underlying [StreamChannel]. + final StreamChannel channel; + + /// The original [RequestContext]. + final RequestContext request; + + /// The original [ResponseContext]. + final ResponseContext response; + + StreamController _onAction = + new StreamController(); + + StreamController _onAuthenticated = StreamController(); + + StreamController _onClose = new StreamController(); + + StreamController _onData = new StreamController(); + + /// Fired on any [WebSocketAction]; + Stream get onAction => _onAction.stream; + + /// Fired when the user authenticates. + Stream get onAuthenticated => _onAuthenticated.stream; + + /// Fired once the underlying [WebSocket] closes. + Stream get onClose => _onClose.stream; + + /// Fired when any data is sent through [channel]. + Stream get onData => _onData.stream; + + WebSocketContext(this.channel, this.request, this.response); + + /// Closes the underlying [StreamChannel]. + Future close() async { + scheduleMicrotask(() async { + await channel.sink.close(); + await _onAction.close(); + await _onAuthenticated.close(); + await _onData.close(); + await _onClose.add(null); + await _onClose.close(); + }); + } + + /// Sends an arbitrary [WebSocketEvent]; + void send(String eventName, data) { + channel.sink.add(json + .encode(new WebSocketEvent(eventName: eventName, data: data).toJson())); + } + + /// Sends an error event. + void sendError(AngelHttpException error) => send(errorEvent, error.toJson()); +} + +class _WebSocketEventTable { + Map> _handlers = {}; + + StreamController _getStreamForEvent(String eventName) { + if (!_handlers.containsKey(eventName)) + _handlers[eventName] = new StreamController(); + return _handlers[eventName]; + } + + Stream operator [](String key) => _getStreamForEvent(key).stream; +} diff --git a/packages/websocket/lib/websocket_controller.dart b/packages/websocket/lib/websocket_controller.dart new file mode 100644 index 00000000..ad577484 --- /dev/null +++ b/packages/websocket/lib/websocket_controller.dart @@ -0,0 +1,89 @@ +part of angel_websocket.server; + +/// Marks a method as available to WebSockets. +class ExposeWs { + final String eventName; + + const ExposeWs(this.eventName); +} + +/// A special controller that also supports WebSockets. +class WebSocketController extends Controller { + /// The plug-in instance powering this controller. + final AngelWebSocket ws; + + Map _handlers = {}; + Map _handlerSymbols = {}; + + WebSocketController(this.ws) : super(); + + /// Sends an event to all clients. + void broadcast(String eventName, data, {filter(WebSocketContext socket)}) { + ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data), + filter: filter); + } + + /// Fired on new connections. + onConnect(WebSocketContext socket) {} + + /// Fired on disconnections. + onDisconnect(WebSocketContext socket) {} + + /// Fired on all incoming actions. + onAction(WebSocketAction action, WebSocketContext socket) async {} + + /// Fired on arbitrary incoming data. + onData(data, WebSocketContext socket) {} + + @override + Future configureServer(Angel app) async { + if (findExpose(app.container.reflector) != null) + await super.configureServer(app); + + InstanceMirror instanceMirror = reflect(this); + ClassMirror classMirror = reflectClass(this.runtimeType); + classMirror.instanceMembers.forEach((sym, mirror) { + if (mirror.isRegularMethod) { + InstanceMirror exposeMirror = mirror.metadata.firstWhere( + (mirror) => mirror.reflectee is ExposeWs, + orElse: () => null); + + if (exposeMirror != null) { + ExposeWs exposeWs = exposeMirror.reflectee as ExposeWs; + _handlers[exposeWs.eventName] = mirror; + _handlerSymbols[exposeWs.eventName] = sym; + } + } + }); + + ws.onConnection.listen((socket) async { + if (!socket.request.container.has()) { + socket.request.container.registerSingleton(socket); + } + + await onConnect(socket); + + socket.onData.listen((data) => onData(data, socket)); + + socket.onAction.listen((WebSocketAction action) async { + var container = socket.request.container.createChild(); + container.registerSingleton(action); + + try { + await onAction(action, socket); + + if (_handlers.containsKey(action.eventName)) { + var methodMirror = _handlers[action.eventName]; + var fn = instanceMirror.getField(methodMirror.simpleName).reflectee; + return app.runContained( + fn as Function, socket.request, socket.response, container); + } + } catch (e, st) { + ws.catchError(e, st, socket); + } + }); + }); + + ws.onDisconnection.listen(onDisconnect); + } +} diff --git a/packages/websocket/pubspec.yaml b/packages/websocket/pubspec.yaml new file mode 100644 index 00000000..91d16906 --- /dev/null +++ b/packages/websocket/pubspec.yaml @@ -0,0 +1,23 @@ +name: angel_websocket +description: Support for using pkg:angel_client with WebSockets. Designed for Angel. +environment: + sdk: ">=2.0.0-dev <3.0.0" +version: 2.0.3 +author: Tobe O +homepage: https://github.com/angel-dart/angel_websocket +dependencies: + angel_auth: ^2.0.0-alpha + angel_client: ^2.0.0-alpha + angel_framework: ^2.0.0-rc.0 + angel_http_exception: ^1.0.0 + http: ">=0.11.0 <0.13.0" + merge_map: ^1.0.0 + meta: ^1.0.0 + stream_channel: ^2.0.0 + web_socket_channel: ^1.0.0 +dev_dependencies: + angel_container: ^1.0.0-alpha + angel_model: ^1.0.0 + logging: ^0.11.0 + pedantic: ^1.0.0 + test: ^1.0.0 diff --git a/packages/websocket/test/auth_test.dart b/packages/websocket/test/auth_test.dart new file mode 100644 index 00000000..8a8567ca --- /dev/null +++ b/packages/websocket/test/auth_test.dart @@ -0,0 +1,63 @@ +import 'dart:async'; +import 'package:angel_auth/angel_auth.dart'; +import 'package:angel_client/io.dart' as c; +import 'package:angel_framework/angel_framework.dart'; +import "package:angel_framework/http.dart"; +import 'package:angel_websocket/io.dart' as c; +import 'package:angel_websocket/server.dart'; +import 'package:logging/logging.dart'; +import 'package:test/test.dart'; + +const Map USER = const {'username': 'foo', 'password': 'bar'}; + +main() { + Angel app; + AngelHttp http; + c.Angel client; + c.WebSockets ws; + + setUp(() async { + app = new Angel(); + http = new AngelHttp(app, useZone: false); + var auth = new AngelAuth(); + + auth.serializer = (_) async => 'baz'; + auth.deserializer = (_) async => USER; + + auth.strategies['local'] = new LocalAuthStrategy( + (username, password) async { + if (username == 'foo' && password == 'bar') return USER; + }, + ); + + app.post('/auth/local', auth.authenticate('local')); + + await app.configure(auth.configureServer); + var sock = new AngelWebSocket(app); + await app.configure(sock.configureServer); + app.all('/ws', sock.handleRequest); + app.logger = new Logger('angel_auth')..onRecord.listen(print); + + var server = await http.startServer(); + client = new c.Rest('http://${server.address.address}:${server.port}'); + ws = new c.WebSockets('ws://${server.address.address}:${server.port}/ws'); + await ws.connect(); + }); + + tearDown(() { + return Future.wait([ + http.close(), + client.close(), + ws.close(), + ]); + }); + + test('auth event fires', () async { + var localAuth = await client.authenticate(type: 'local', credentials: USER); + print('JWT: ${localAuth.token}'); + + ws.authenticateViaJwt(localAuth.token); + var auth = await ws.onAuthenticated.first; + expect(auth.token, localAuth.token); + }); +} diff --git a/packages/websocket/test/controller/common.dart b/packages/websocket/test/controller/common.dart new file mode 100644 index 00000000..8e624d73 --- /dev/null +++ b/packages/websocket/test/controller/common.dart @@ -0,0 +1,35 @@ +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_websocket/server.dart'; + +class Game { + final String playerOne, playerTwo; + + const Game({this.playerOne, this.playerTwo}); + + factory Game.fromJson(Map data) => new Game( + playerOne: data['playerOne'].toString(), + playerTwo: data['playerTwo'].toString()); + + Map toJson() { + return {'playerOne': playerOne, 'playerTwo': playerTwo}; + } + + @override + bool operator ==(other) => + other is Game && + other.playerOne == playerOne && + other.playerTwo == playerTwo; +} + +const Game johnVsBob = const Game(playerOne: 'John', playerTwo: 'Bob'); + +@Expose('/game') +class GameController extends WebSocketController { + GameController(AngelWebSocket ws) : super(ws); + + @ExposeWs('search') + search(WebSocketContext socket) async { + print('User is searching for a game...'); + socket.send('searched', johnVsBob); + } +} diff --git a/packages/websocket/test/controller/io_test.dart b/packages/websocket/test/controller/io_test.dart new file mode 100644 index 00000000..929e3eb8 --- /dev/null +++ b/packages/websocket/test/controller/io_test.dart @@ -0,0 +1,70 @@ +import 'dart:io'; +import 'package:angel_container/mirrors.dart'; +import 'package:angel_framework/angel_framework.dart' as srv; +import "package:angel_framework/http.dart" as srv; +import 'package:angel_websocket/io.dart' as ws; +import 'package:angel_websocket/server.dart' as srv; +import 'package:logging/logging.dart'; +import 'package:test/test.dart'; +import 'common.dart'; + +main() { + srv.Angel app; + srv.AngelHttp http; + ws.WebSockets client; + srv.AngelWebSocket websockets; + HttpServer server; + String url; + + setUp(() async { + app = new srv.Angel(reflector: const MirrorsReflector()); + http = new srv.AngelHttp(app, useZone: false); + + websockets = new srv.AngelWebSocket(app) + ..onData.listen((data) { + print('Received by server: $data'); + }); + + await app.configure(websockets.configureServer); + app.all('/ws', websockets.handleRequest); + await app.configure(new GameController(websockets).configureServer); + app.logger = new Logger('angel_auth')..onRecord.listen(print); + + server = await http.startServer(); + url = 'ws://${server.address.address}:${server.port}/ws'; + + client = new ws.WebSockets(url); + await client.connect(timeout: new Duration(seconds: 3)); + + print('Connected'); + + client + ..onData.listen((data) { + print('Received by client: $data'); + }) + ..onError.listen((error) { + // Auto-fail tests on errors ;) + stderr.writeln(error); + error.errors.forEach(stderr.writeln); + throw error; + }); + }); + + tearDown(() async { + await client.close(); + await http.close(); + app = null; + client = null; + server = null; + url = null; + }); + + group('controller.io', () { + test('search', () async { + client.sendAction(new ws.WebSocketAction(eventName: 'search')); + var search = await client.on['searched'].first; + print('Searched: ${search.data}'); + expect(new Game.fromJson(search.data as Map), equals(johnVsBob)); + }); + }); +} diff --git a/packages/websocket/test/service/browser_test.dart b/packages/websocket/test/service/browser_test.dart new file mode 100644 index 00000000..b11d0664 --- /dev/null +++ b/packages/websocket/test/service/browser_test.dart @@ -0,0 +1,5 @@ +import 'package:test/test.dart'; + +main() { + group('service.browser', () {}); +} diff --git a/packages/websocket/test/service/common.dart b/packages/websocket/test/service/common.dart new file mode 100644 index 00000000..9d327860 --- /dev/null +++ b/packages/websocket/test/service/common.dart @@ -0,0 +1,35 @@ +import 'dart:async'; + +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_model/angel_model.dart'; +import 'package:angel_websocket/base_websocket_client.dart'; +import 'package:angel_websocket/server.dart'; +import 'package:test/test.dart'; + +class Todo extends Model { + String text; + String when; + + Todo({String this.text, String this.when}); +} + +class TodoService extends MapService { + TodoService() : super() { + configuration['ws:filter'] = + (HookedServiceEvent e, WebSocketContext socket) { + print('Hello, service filter world!'); + return true; + }; + } +} + +testIndex(BaseWebSocketClient client) async { + var todoService = client.service('api/todos'); + scheduleMicrotask(() => todoService.index()); + + var indexed = await todoService.onIndexed.first; + print('indexed: $indexed'); + + expect(indexed, isList); + expect(indexed, isEmpty); +} diff --git a/packages/websocket/test/service/io_test.dart b/packages/websocket/test/service/io_test.dart new file mode 100644 index 00000000..318f1417 --- /dev/null +++ b/packages/websocket/test/service/io_test.dart @@ -0,0 +1,60 @@ +import 'dart:io'; +import 'package:angel_framework/angel_framework.dart' as srv; +import "package:angel_framework/http.dart" as srv; +import 'package:angel_websocket/io.dart' as ws; +import 'package:angel_websocket/server.dart' as srv; +import 'package:logging/logging.dart'; +import 'package:test/test.dart'; +import 'common.dart'; + +main() { + srv.Angel app; + srv.AngelHttp http; + ws.WebSockets client; + srv.AngelWebSocket websockets; + HttpServer server; + String url; + + setUp(() async { + app = new srv.Angel()..use('/api/todos', new TodoService()); + http = new srv.AngelHttp(app, useZone: false); + + websockets = new srv.AngelWebSocket(app) + ..onData.listen((data) { + print('Received by server: $data'); + }); + + await app.configure(websockets.configureServer); + app.all('/ws', websockets.handleRequest); + app.logger = new Logger('angel_auth')..onRecord.listen(print); + server = await http.startServer(); + url = 'ws://${server.address.address}:${server.port}/ws'; + + client = new ws.WebSockets(url); + await client.connect(); + + client + ..onData.listen((data) { + print('Received by client: $data'); + }) + ..onError.listen((error) { + // Auto-fail tests on errors ;) + stderr.writeln(error); + error.errors.forEach(stderr.writeln); + throw error; + }); + }); + + tearDown(() async { + await client.close(); + await http.close(); + app = null; + client = null; + server = null; + url = null; + }); + + group('service.io', () { + test('index', () => testIndex(client)); + }); +} diff --git a/packages/websocket/web/index.html b/packages/websocket/web/index.html new file mode 100644 index 00000000..7728ad7e --- /dev/null +++ b/packages/websocket/web/index.html @@ -0,0 +1,9 @@ + + + + Client + + + + + \ No newline at end of file diff --git a/packages/websocket/web/main.dart b/packages/websocket/web/main.dart new file mode 100644 index 00000000..ffc04ed9 --- /dev/null +++ b/packages/websocket/web/main.dart @@ -0,0 +1,12 @@ +import 'dart:html'; +import 'package:angel_websocket/browser.dart'; + +/// Dummy app to ensure client works with DDC. +main() { + var app = new WebSockets(window.location.origin); + window.alert(app.baseUrl.toString()); + + app.connect().catchError((_) { + window.alert('no websocket'); + }); +}