diff --git a/packages/websocket/.gitignore b/packages/websocket/.gitignore new file mode 100644 index 0000000..24d6831 --- /dev/null +++ b/packages/websocket/.gitignore @@ -0,0 +1,71 @@ +# See https://www.dartlang.org/tools/private-files.html + +# Files and directories created by pub +.dart_tool +.packages +.pub/ +build/ + +# If you're building an application, you may want to check-in your pubspec.lock +pubspec.lock + +# Directory created by dartdoc +# If you don't generate documentation locally you can remove this line. +doc/api/ + +### Dart template +# See https://www.dartlang.org/tools/private-files.html + +# Files and directories created by pub + +# SDK 1.20 and later (no longer creates packages directories) + +# Older SDK versions +# (Include if the minimum SDK version specified in pubsepc.yaml is earlier than 1.20) +.project +.buildlog +**/packages/ + + +# Files created by dart2js +# (Most Dart developers will use pub build to compile Dart, use/modify these +# rules if you intend to use dart2js directly +# Convention is to use extension '.dart.js' for Dart compiled to Javascript to +# differentiate from explicit Javascript files) +*.dart.js +*.part.js +*.js.deps +*.js.map +*.info.json + +# Directory created by dartdoc + +# Don't commit pubspec lock file +# (Library packages only! Remove pattern if developing an application package) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: + +## VsCode +.vscode/ + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +.idea/ +/out/ +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties diff --git a/packages/websocket/AUTHORS.md b/packages/websocket/AUTHORS.md new file mode 100644 index 0000000..ac95ab5 --- /dev/null +++ b/packages/websocket/AUTHORS.md @@ -0,0 +1,12 @@ +Primary Authors +=============== + +* __[Thomas Hii](dukefirehawk.apps@gmail.com)__ + + Thomas is the current maintainer of the code base. He has refactored and migrated the + code base to support NNBD. + +* __[Tobe O](thosakwe@gmail.com)__ + + Tobe has written much of the original code prior to NNBD migration. He has moved on and + is no longer involved with the project. diff --git a/packages/websocket/CHANGELOG.md b/packages/websocket/CHANGELOG.md new file mode 100644 index 0000000..a60c661 --- /dev/null +++ b/packages/websocket/CHANGELOG.md @@ -0,0 +1,136 @@ +# Change Log + +## 8.2.0 + +* Require Dart >= 3.3 +* Updated `lints` to 4.0.0 +* Updated `web_socket_channel` to 3.0.0 + +## 8.1.1 + +* Updated repository link + +## 8.1.0 + +* Updated `lints` to 3.0.0 +* Fixed linter warnings +* Updated `web_socket_channel` to versions below 2.4.1 temporarily. Starting with 2.4.1, its dependency on `dart:html` has been changed to `package:web` which requires a code refactoring to resolve. + +## 8.0.0 + +* Require Dart >= 3.0 + +## 7.0.0 + +* Require Dart >= 2.17 + +## 6.0.0 + +* Require Dart >= 2.16 + +## 5.0.0 + +* Skipped release + +## 4.1.2 + +* Updated `package:angel3_container` + +## 4.1.1 + +* Fixed issue with type casting +* Changed `app` parameter of `AngelWebSocket` to non-nullable + +## 4.1.0 + +* Updated `package:belatuk_merge_map` +* Updated linter to `package:lints` + +## 4.0.1 + +* Updated README +* Fixed authentication unit test +* Fixed NNBD issues +* All 3 unit tests passed + +## 4.0.0 + +* Migrated to support Dart >= 2.12 NNBD + +## 3.0.0 + +* Migrated to work with Dart >= 2.12 Non NNBD + +## 2.0.3 + +* Remove `WebSocketController.plugin`. +* Remove any unawaited futures. + +## 2.0.2 + +* Update `stream_channel` to `2.0.0`. +* Use `angel_framework^@2.0.0-rc.0`. + +## 2.0.1 + +* Add `reconnectOnClose` and `reconnectinterval` parameters in top-level `WebSockets` constructors. +* Close `WebSocketExtraneousEventHandler`. +* Add onAuthenticated to server-side. + +## 2.0.0 + +* Update to work with `client@2.0.0`. + +## 2.0.0-alpha.8 + +* Support for WebSockets over HTTP/2 (though in practice this doesn't often happen, if ever). + +## 2.0.0-alpha.7 + +* Replace `WebSocketSynchronizer` with `StreamChannel`. + +## 2.0.0-alpha.6 + +* Explicit import of `import 'package:http/io_client.dart' as http;` + +## 2.0.0-alpha.5 + +* Update `http` dependency. + +## 2.0.0-alpha.4 + +* Remove `package:json_god`. +* Make `WebSocketContext` take any `StreamChannel`. +* Strong typing updates. + +## 2.0.0-alpha.3 + +* Directly import Angel HTTP. + +## 2.0.0-alpha.2 + +* Updated for the next version of `angel_client`. + +## 2.0.0-alpha.1 + +* Refactorings for updated Angel 2 versions. +* Remove `package:dart2_constant`. + +## 2.0.0-alpha + +* Depend on Dart 2 and Angel 2. + +## 1.1.2 + +* Dart 2 updates. +* Added `handleClient`, which is nice for external implementations +that plug into `AngelWebSocket`. + +## 1.1.1 + +* Deprecated `unwrap`. +* Service streams now pump out `e.data`, rather than the actual event. + +## 1.1.0+1 + +* Added `unwrap`. diff --git a/packages/websocket/LICENSE b/packages/websocket/LICENSE new file mode 100644 index 0000000..df5e063 --- /dev/null +++ b/packages/websocket/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2021, dukefirehawk.com +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/packages/websocket/README.md b/packages/websocket/README.md new file mode 100644 index 0000000..2f627d5 --- /dev/null +++ b/packages/websocket/README.md @@ -0,0 +1,157 @@ +# Angel3 Websocket + +![Pub Version (including pre-releases)](https://img.shields.io/pub/v/platform_websocket?include_prereleases) +[![Null Safety](https://img.shields.io/badge/null-safety-brightgreen)](https://dart.dev/null-safety) +[![Discord](https://img.shields.io/discord/1060322353214660698)](https://discord.gg/3X6bxTUdCM) +[![License](https://img.shields.io/github/license/dart-backend/angel)](https://github.com/dart-backend/angel/tree/master/packages/websocket/LICENSE) + +WebSocket plugin for Angel3 framework. This plugin broadcasts events from hooked services via WebSockets. In addition, it adds itself to the app's IoC container as `AngelWebSocket`, so that it can be used in controllers as well. + +WebSocket contexts are add to `req.properties` as `'socket'`. + +## Usage + +### Server-side + +```dart +import "package:angel3_framework/angel3_framework.dart"; +import "package:platform_websocket/server.dart"; + +void main() async { + var app = Angel(); + + var ws = AngelWebSocket(); + + // This is a plug-in. It hooks all your services, + // to automatically broadcast events. + await app.configure(ws.configureServer); + + // Listen for requests at `/ws`. + app.all('/ws', ws.handleRequest); +} + +``` + +Filtering events is easy with hooked services. Just return a `bool`, whether synchronously or asynchronously. + +```dart +myService.properties['ws:filter'] = (HookedServiceEvent e, WebSocketContext socket) async { + return true; +} + +myService.index({ + 'ws:filter': (e, socket) => ...; +}); +``` + +#### Adding Handlers within a Controller + +`WebSocketController` extends a normal `Controller`, but also listens to WebSockets. + +```dart +import 'dart:async'; +import "package:angel3_framework/angel3_framework.dart"; +import "package:platform_websocket/server.dart"; + +@Expose("/") +class MyController extends WebSocketController { + // A reference to the WebSocket plug-in is required. + MyController(AngelWebSocket ws):super(ws); + + @override + void onConnect(WebSocketContext socket) { + // On connect... + } + + // Dependency injection works, too.. + @ExposeWs("read_message") + void sendMessage(WebSocketContext socket, WebSocketAction action, Db db) async { + socket.send( + "found_message", + db.collection("messages").findOne(where.id(action.data['message_id']))); + } + + // Event filtering + @ExposeWs("foo") + void foo() { + broadcast( WebSocketEvent(...), filter: (socket) async => ...); + } +} +``` + +### Client Use + +This repo also provides two client libraries `browser` and `io` that extend the base `angel3_client` interface, and allow you to use a very similar API on the client to that of the server. + +The provided clients also automatically try to reconnect their WebSockets when disconnected, which means you can restart your development server without having to reload browser windows. + +They also provide streams of data that pump out filtered data as it comes in from the server. + +Clients can even perform authentication over WebSockets. + +#### In the Browser + +```dart +import "package:platform_websocket/browser.dart"; + +void main() async { + Angel app = WebSockets("/ws"); + await app.connect(); + + var Cars = app.service("api/cars"); + + Cars.onCreated.listen((car) => print("New car: $car")); + + // Happens asynchronously + Cars.create({"brand": "Toyota"}); + + // Authenticate a WebSocket, if you were not already authenticated... + app.authenticateViaJwt(''); + + // Listen for arbitrary events + app.on['custom_event'].listen((event) { + // For example, this might be sent by a + // WebSocketController. + print('Hi!'); + }); +} +``` + +#### CLI Client + +```dart +import "package:angel3_framework/common.dart"; +import "package:platform_websocket/io.dart"; + +// You can include these in a shared file and access on both client and server +class Car extends Model { + int year; + String brand, make; + + Car({this.year, this.brand, this.make}); + + @override String toString() => "$year $brand $make"; +} + +void main() async { + Angel app = WebSockets("/ws"); + + // Wait for WebSocket connection... + await app.connect(); + + var Cars = app.service("api/cars", type: Car); + + Cars.onCreated.listen((Car car) { + // Automatically deserialized into a car :) + // + // I just bought a new 2016 Toyota Camry! + print("I just bought a new $car!"); + }); + + // Happens asynchronously + Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"}); + + // Authenticate a WebSocket, if you were not already authenticated... + app.authenticateViaJwt(''); +} +``` diff --git a/packages/websocket/analysis_options.yaml b/packages/websocket/analysis_options.yaml new file mode 100644 index 0000000..ea2c9e9 --- /dev/null +++ b/packages/websocket/analysis_options.yaml @@ -0,0 +1 @@ +include: package:lints/recommended.yaml \ No newline at end of file diff --git a/packages/websocket/example/index.html b/packages/websocket/example/index.html new file mode 100644 index 0000000..d2f4a4c --- /dev/null +++ b/packages/websocket/example/index.html @@ -0,0 +1,30 @@ + + + + + + + Angel WS + + + + + diff --git a/packages/websocket/example/main.dart b/packages/websocket/example/main.dart new file mode 100644 index 0000000..5bb29aa --- /dev/null +++ b/packages/websocket/example/main.dart @@ -0,0 +1,59 @@ +import 'dart:io'; +import 'package:platform_foundation/core.dart'; +import 'package:platform_foundation/http.dart'; +import 'package:platform_foundation/http2.dart'; +import 'package:platform_websocket/server.dart'; +import 'package:file/local.dart'; +import 'package:logging/logging.dart'; + +void main(List args) async { + var app = Application(); + var http = PlatformHttp(app); + var ws = AngelWebSocket(app, sendErrors: !app.environment.isProduction); + var fs = const LocalFileSystem(); + app.logger = Logger('platform_websocket'); + + // This is a plug-in. It hooks all your services, + // to automatically broadcast events. + await app.configure(ws.configureServer); + + app.get('/', (req, res) => res.streamFile(fs.file('example/index.html'))); + + // Listen for requests at `/ws`. + app.get('/ws', ws.handleRequest); + + app.fallback((req, res) => throw PlatformHttpException.notFound()); + + ws.onConnection.listen((socket) { + var h = socket.request.headers; + print('WebSocket onConnection $h'); + + socket.onData.listen((x) { + socket.send('pong', x); + }); + }); + + if (args.contains('http2')) { + var ctx = SecurityContext() + ..useCertificateChain('dev.pem') + ..usePrivateKey('dev.key', password: 'dartdart'); + + try { + ctx.setAlpnProtocols(['h2'], true); + } catch (e, st) { + app.logger.severe( + 'Cannot set ALPN protocol on server to `h2`. The server will only serve HTTP/1.x.', + e, + st, + ); + } + + var http2 = PlatformHttp2(app, ctx); + http2.onHttp1.listen(http.handleRequest); + await http2.startServer('127.0.0.1', 3000); + print('Listening at ${http2.uri}'); + } else { + await http.startServer('127.0.0.1', 3000); + print('Listening at ${http.uri}'); + } +} diff --git a/packages/websocket/lib/base_websocket_client.dart b/packages/websocket/lib/base_websocket_client.dart new file mode 100644 index 0000000..03f3970 --- /dev/null +++ b/packages/websocket/lib/base_websocket_client.dart @@ -0,0 +1,461 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; +import 'package:platform_client/platform_client.dart'; +import 'package:platform_client/base_platform_client.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/status.dart' as status; +import 'platform_websocket.dart'; +import 'constants.dart'; + +final RegExp _straySlashes = RegExp(r'(^/)|(/+$)'); + +/// An [Angel] client that operates across WebSockets. +abstract class BaseWebSocketClient extends BaseAngelClient { + Duration? _reconnectInterval; + WebSocketChannel? _socket; + final Queue _queue = Queue(); + + final StreamController _onData = StreamController(); + final StreamController _onAllEvents = + StreamController(); + final StreamController _onAuthenticated = + StreamController(); + final StreamController _onError = + StreamController(); + final StreamController> _onServiceEvent = + StreamController>.broadcast(); + final StreamController + _onWebSocketChannelException = + StreamController(); + + /// Use this to handle events that are not standard. + final WebSocketExtraneousEventHandler on = WebSocketExtraneousEventHandler(); + + /// Fired on all events. + Stream get onAllEvents => _onAllEvents.stream; + + /// Fired whenever a WebSocket is successfully authenticated. + @override + Stream get onAuthenticated => _onAuthenticated.stream; + + /// A broadcast stream of data coming from the [socket]. + /// + /// Mostly just for internal use. + Stream get onData => _onData.stream; + + /// Fired on errors. + Stream get onError => _onError.stream; + + /// Fired whenever an event is fired by a service. + Stream> get onServiceEvent => + _onServiceEvent.stream; + + /// Fired on [WebSocketChannelException]s. + Stream get onWebSocketChannelException => + _onWebSocketChannelException.stream; + + /// The [WebSocketChannel] underneath this instance. + WebSocketChannel? get socket => _socket; + + /// If `true` (default), then the client will automatically try to reconnect to the server + /// if the socket closes. + final bool reconnectOnClose; + + /// The amount of time to wait between reconnect attempts. Default: 10 seconds. + Duration? get reconnectInterval => _reconnectInterval; + + Uri? _wsUri; + + /// The [Uri] to which a websocket should point. + Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl); + + static Uri _toWsUri(Uri u) { + if (u.hasScheme) { + if (u.scheme == 'http') { + return u.replace(scheme: 'ws'); + } else if (u.scheme == 'https') { + return u.replace(scheme: 'wss'); + } else { + return u; + } + } else { + return _toWsUri(u.replace(scheme: Uri.base.scheme)); + } + } + + BaseWebSocketClient(super.client, super.baseUrl, + {this.reconnectOnClose = true, Duration? reconnectInterval}) { + _reconnectInterval = reconnectInterval ?? Duration(seconds: 10); + } + + @override + Future close() async { + on._close(); + scheduleMicrotask(() async { + await _socket!.sink.close(status.normalClosure); + await _onData.close(); + await _onAllEvents.close(); + await _onAuthenticated.close(); + await _onError.close(); + await _onServiceEvent.close(); + await _onWebSocketChannelException.close(); + }); + } + + /// Connects the WebSocket. [timeout] is optional. + Future connect({Duration? timeout}) async { + if (timeout != null) { + var c = Completer(); + late Timer timer; + + timer = Timer(timeout, () { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + c.completeError(TimeoutException( + 'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms', + timeout)); + } + }); + + scheduleMicrotask(() { + getConnectedWebSocket().then((socket) { + if (!c.isCompleted) { + if (timer.isActive) timer.cancel(); + + while (_queue.isNotEmpty) { + var action = _queue.removeFirst(); + socket.sink.add(serialize(action)); + } + + c.complete(socket); + } + }).catchError((e, StackTrace st) { + if (!c.isCompleted) { + if (timer.isActive) { + timer.cancel(); + } + + // TODO: Re-evaluate this error + var obj = 'Error'; + c.completeError(obj, st); + } + }); + }); + + return await c.future.then((socket) { + _socket = socket; + listen(); + + return _socket; + }); + } else { + _socket = await getConnectedWebSocket(); + listen(); + return _socket; + } + } + + /// Returns a new [WebSocketChannel], ready to be listened on. + /// + /// This should be overriden by child classes, **NOT** [connect]. + Future getConnectedWebSocket(); + + @override + Service service(String path, + {Type? type, AngelDeserializer? deserializer}) { + var uri = path.toString().replaceAll(_straySlashes, ''); + var wsService = WebSocketsService(socket, this, uri, + deserializer: deserializer); + return wsService as Service; + } + + /// Starts listening for data. + void listen() { + _socket?.stream.listen( + (data) { + _onData.add(data); + + if (data is WebSocketChannelException) { + _onWebSocketChannelException.add(data); + } else if (data is String) { + var jsons = json.decode(data); + + if (jsons is Map) { + var event = WebSocketEvent.fromJson(jsons); + + if (event.eventName?.isNotEmpty == true) { + _onAllEvents.add(event); + on._getStream(event.eventName)!.add(event); + } + + if (event.eventName == errorEvent) { + var error = + PlatformHttpException.fromMap((event.data ?? {}) as Map); + _onError.add(error); + } else if (event.eventName == authenticatedEvent) { + var authResult = AngelAuthResult.fromMap(event.data as Map?); + _onAuthenticated.add(authResult); + } else if (event.eventName?.isNotEmpty == true) { + var split = event.eventName! + .split('::') + .where((str) => str.isNotEmpty) + .toList(); + + if (split.length >= 2) { + var serviceName = split[0], eventName = split[1]; + _onServiceEvent + .add({serviceName: event..eventName = eventName}); + } + } + } + } + }, + cancelOnError: true, + onDone: () { + _socket = null; + if (reconnectOnClose == true) { + Timer.periodic(reconnectInterval!, (Timer timer) async { + WebSocketChannel? result; + + try { + result = await connect(timeout: reconnectInterval); + } catch (e) { + // + } + + if (result != null) timer.cancel(); + }); + } + }); + } + + /// Serializes data to JSON. + dynamic serialize(x) => json.encode(x); + + /// Sends the given [action] on the [socket]. + void sendAction(WebSocketAction action) { + if (_socket == null) { + _queue.addLast(action); + } else { + socket?.sink.add(serialize(action)); + } + } + + /// Attempts to authenticate a WebSocket, using a valid JWT. + void authenticateViaJwt(String? jwt) { + sendAction(WebSocketAction( + eventName: authenticateAction, + params: { + 'query': {'jwt': jwt} + }, + )); + } +} + +/// A [Service] that asynchronously interacts with the server. +class WebSocketsService extends Service { + /// The [BaseWebSocketClient] that spawned this service. + @override + final BaseWebSocketClient app; + + /// Used to deserialize JSON into typed data. + final AngelDeserializer? deserializer; + + /// The [WebSocketChannel] to listen to, and send data across. + final WebSocketChannel? socket; + + /// The service path to listen to. + final String path; + + final StreamController _onAllEvents = + StreamController(); + final StreamController> _onIndexed = StreamController(); + final StreamController _onRead = StreamController(); + final StreamController _onCreated = StreamController(); + final StreamController _onModified = StreamController(); + final StreamController _onUpdated = StreamController(); + final StreamController _onRemoved = StreamController(); + + /// Fired on all events. + Stream get onAllEvents => _onAllEvents.stream; + + /// Fired on `index` events. + @override + Stream> get onIndexed => _onIndexed.stream; + + /// Fired on `read` events. + @override + Stream get onRead => _onRead.stream; + + /// Fired on `created` events. + @override + Stream get onCreated => _onCreated.stream; + + /// Fired on `modified` events. + @override + Stream get onModified => _onModified.stream; + + /// Fired on `updated` events. + @override + Stream get onUpdated => _onUpdated.stream; + + /// Fired on `removed` events. + @override + Stream get onRemoved => _onRemoved.stream; + + WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) { + listen(); + } + + @override + Future close() async { + await _onAllEvents.close(); + await _onCreated.close(); + await _onIndexed.close(); + await _onModified.close(); + await _onRead.close(); + await _onRemoved.close(); + await _onUpdated.close(); + } + + /// Serializes an [action] to be sent over a WebSocket. + dynamic serialize(WebSocketAction action) => json.encode(action); + + /// Deserializes data from a [WebSocketEvent]. + Data? deserialize(x) { + return deserializer != null ? deserializer!(x) : x as Data?; + } + + /// Deserializes the contents of an [event]. + WebSocketEvent transformEvent(WebSocketEvent event) { + return WebSocketEvent( + eventName: event.eventName, data: deserialize(event.data)); + } + + /// Starts listening for events. + void listen() { + app.onServiceEvent.listen((map) { + if (map.containsKey(path)) { + var event = map[path]!; + + _onAllEvents.add(event); + + if (event.eventName == indexedEvent) { + var d = event.data; + var transformed = WebSocketEvent( + eventName: event.eventName, + data: d is Iterable ? d.map(deserialize).toList() : null); + if (transformed.data != null) { + _onIndexed.add(transformed.data!); + } + return; + } + + var transformed = transformEvent(event).data; + + switch (event.eventName) { + case readEvent: + _onRead.add(transformed); + break; + case createdEvent: + _onCreated.add(transformed); + break; + case modifiedEvent: + _onModified.add(transformed); + break; + case updatedEvent: + _onUpdated.add(transformed); + break; + case removedEvent: + _onRemoved.add(transformed); + break; + } + } + }); + } + + /// Sends the given [action] on the [socket]. + void send(WebSocketAction action) { + app.sendAction(action); + } + + @override + Future?> index([Map? params]) async { + app.sendAction(WebSocketAction( + eventName: '$path::$indexAction', params: params ?? {})); + return null; + } + + @override + Future read(id, [Map? params]) async { + app.sendAction(WebSocketAction( + eventName: '$path::$readAction', + id: id.toString(), + params: params ?? {})); + return null; + } + + @override + Future create(data, [Map? params]) async { + app.sendAction(WebSocketAction( + eventName: '$path::$createAction', data: data, params: params ?? {})); + return null; + } + + @override + Future modify(id, data, [Map? params]) async { + app.sendAction(WebSocketAction( + eventName: '$path::$modifyAction', + id: id.toString(), + data: data, + params: params ?? {})); + return null; + } + + @override + Future update(id, data, [Map? params]) async { + app.sendAction(WebSocketAction( + eventName: '$path::$updateAction', + id: id.toString(), + data: data, + params: params ?? {})); + return null; + } + + @override + Future remove(id, [Map? params]) async { + app.sendAction(WebSocketAction( + eventName: '$path::$removeAction', + id: id.toString(), + params: params ?? {})); + return null; + } +} + +/// Contains a dynamic Map of [WebSocketEvent] streams. +class WebSocketExtraneousEventHandler { + final Map> _events = {}; + + StreamController? _getStream(String? index) { + if (_events[index] == null) { + _events[index] = StreamController(); + } + + return _events[index]; + } + + Stream operator [](String index) { + if (_events[index] == null) { + _events[index] = StreamController(); + } + + return _events[index]!.stream; + } + + void _close() { + for (var s in _events.values) { + s.close(); + } + } +} diff --git a/packages/websocket/lib/browser.dart b/packages/websocket/lib/browser.dart new file mode 100644 index 0000000..25a1677 --- /dev/null +++ b/packages/websocket/lib/browser.dart @@ -0,0 +1,111 @@ +/// Browser WebSocket client library for the Angel framework. +library platform_websocket.browser; + +import 'dart:async'; +import 'dart:html'; +import 'package:platform_client/platform_client.dart'; +import 'package:http/browser_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/html.dart'; +import 'base_websocket_client.dart'; +export 'platform_websocket.dart'; + +final RegExp _straySlashes = RegExp(r'(^/)|(/+$)'); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + final List _services = []; + + WebSockets(baseUrl, + {bool reconnectOnClose = true, Duration? reconnectInterval}) + : super(http.BrowserClient(), baseUrl, + reconnectOnClose: reconnectOnClose, + reconnectInterval: reconnectInterval); + + @override + Future close() { + for (var service in _services) { + service.close(); + } + + return super.close(); + } + + @override + Stream authenticateViaPopup(String url, + {String eventName = 'token', String? errorMessage}) { + var ctrl = StreamController(); + var wnd = window.open(url, 'angel_client_auth_popup'); + + Timer t; + StreamSubscription? sub; + t = Timer.periodic(Duration(milliseconds: 500), (timer) { + if (!ctrl.isClosed) { + if (wnd.closed == true) { + ctrl.addError(PlatformHttpException.notAuthenticated( + message: + errorMessage ?? 'Authentication via popup window failed.')); + ctrl.close(); + timer.cancel(); + sub?.cancel(); + } + } else { + timer.cancel(); + } + }); + + sub = window.on[eventName].listen((e) { + if (!ctrl.isClosed) { + ctrl.add((e as CustomEvent).detail.toString()); + t.cancel(); + ctrl.close(); + sub?.cancel(); + } + }); + + return ctrl.stream; + } + + @override + Future getConnectedWebSocket() { + var url = websocketUri; + + if (authToken?.isNotEmpty == true) { + url = url.replace( + queryParameters: Map.from(url.queryParameters) + ..['token'] = authToken); + } + + var socket = WebSocket(url.toString()); + var completer = Completer(); + + socket + ..onOpen.listen((_) { + if (!completer.isCompleted) { + return completer.complete(HtmlWebSocketChannel(socket)); + } + }) + ..onError.listen((e) { + if (!completer.isCompleted) { + return completer.completeError(e is ErrorEvent ? e.error! : e); + } + }); + + return completer.future; + } + + @override + Service service(String path, + {Type? type, AngelDeserializer? deserializer}) { + var uri = path.replaceAll(_straySlashes, ''); + return BrowserWebSocketsService(socket, this, uri, + deserializer: deserializer) as Service; + } +} + +class BrowserWebSocketsService extends WebSocketsService { + final Type? type; + + BrowserWebSocketsService(super.socket, WebSockets super.app, super.uri, + {this.type, super.deserializer}); +} diff --git a/packages/websocket/lib/constants.dart b/packages/websocket/lib/constants.dart new file mode 100644 index 0000000..71d935f --- /dev/null +++ b/packages/websocket/lib/constants.dart @@ -0,0 +1,36 @@ +const String authenticateAction = 'authenticate'; +const String indexAction = 'index'; +const String readAction = 'read'; +const String createAction = 'create'; +const String modifyAction = 'modify'; +const String updateAction = 'update'; +const String removeAction = 'remove'; + +const String authenticatedEvent = 'authenticated'; +const String errorEvent = 'error'; +const String indexedEvent = 'indexed'; +const String readEvent = 'read'; +const String createdEvent = 'created'; +const String modifiedEvent = 'modified'; +const String updatedEvent = 'updated'; +const String removedEvent = 'removed'; + +/// The standard Angel service actions. +const List actions = [ + indexAction, + readAction, + createAction, + modifyAction, + updateAction, + removeAction +]; + +/// The standard Angel service events. +const List events = [ + indexedEvent, + readEvent, + createdEvent, + modifiedEvent, + updatedEvent, + removedEvent +]; diff --git a/packages/websocket/lib/flutter.dart b/packages/websocket/lib/flutter.dart new file mode 100644 index 0000000..6520cdf --- /dev/null +++ b/packages/websocket/lib/flutter.dart @@ -0,0 +1,49 @@ +/// Flutter-compatible WebSocket client library for the Angel framework. +library angel_websocket.flutter; + +import 'dart:async'; +import 'dart:io'; +import 'package:http/io_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'base_websocket_client.dart'; +export 'package:platform_client/platform_client.dart'; +export 'platform_websocket.dart'; + +// final RegExp _straySlashes = RegExp(r"(^/)|(/+$)"); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + final List _services = []; + + WebSockets(baseUrl, + {bool reconnectOnClose = true, Duration? reconnectInterval}) + : super(http.IOClient(), baseUrl, + reconnectOnClose: reconnectOnClose, + reconnectInterval: reconnectInterval); + + @override + Stream authenticateViaPopup(String url, + {String eventName = 'token'}) { + throw UnimplementedError( + 'Opening popup windows is not supported in the `dart:io` client.'); + } + + @override + Future close() { + for (var service in _services) { + service.close(); + } + + return super.close(); + } + + @override + Future getConnectedWebSocket() async { + var socket = await WebSocket.connect(websocketUri.toString(), + headers: authToken?.isNotEmpty == true + ? {'Authorization': 'Bearer $authToken'} + : {}); + return IOWebSocketChannel(socket); + } +} diff --git a/packages/websocket/lib/hooks.dart b/packages/websocket/lib/hooks.dart new file mode 100644 index 0000000..2e036f5 --- /dev/null +++ b/packages/websocket/lib/hooks.dart @@ -0,0 +1,29 @@ +import 'package:platform_foundation/core.dart'; + +/// Prevents a WebSocket event from being broadcasted, to any client from the given [provider]. +/// +/// [provider] can be a String, a [Provider], or an Iterable. +/// If [provider] is `null`, any provider will be blocked. +HookedServiceEventListener doNotBroadcast([provider]) { + return (HookedServiceEvent e) { + if (e.params.containsKey('provider')) { + var eParam = e.params; + var deny = false; + var providers = provider is Iterable ? provider : [provider]; + + for (var p in providers) { + if (deny) break; + + if (p is Providers) { + deny = deny || p == eParam['provider'] || eParam['provider'] == p.via; + } else if (p == null) { + deny = true; + } else { + deny = deny || (eParam['provider'] as Providers).via == p.toString(); + } + } + + eParam['broadcast'] = false; + } + }; +} diff --git a/packages/websocket/lib/io.dart b/packages/websocket/lib/io.dart new file mode 100644 index 0000000..f57fff0 --- /dev/null +++ b/packages/websocket/lib/io.dart @@ -0,0 +1,64 @@ +/// Command-line WebSocket client library for the Angel framework. +library platform_websocket.io; + +import 'dart:async'; +import 'dart:io'; +import 'package:platform_client/platform_client.dart'; +import 'package:http/io_client.dart' as http; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'base_websocket_client.dart'; +export 'package:platform_client/platform_client.dart'; +export 'platform_websocket.dart'; + +final RegExp _straySlashes = RegExp(r'(^/)|(/+$)'); + +/// Queries an Angel server via WebSockets. +class WebSockets extends BaseWebSocketClient { + final List _services = []; + + WebSockets(baseUrl, + {bool reconnectOnClose = true, Duration? reconnectInterval}) + : super(http.IOClient(), baseUrl, + reconnectOnClose: reconnectOnClose, + reconnectInterval: reconnectInterval); + + @override + Stream authenticateViaPopup(String url, + {String eventName = 'token'}) { + throw UnimplementedError( + 'Opening popup windows is not supported in the `dart:io` client.'); + } + + @override + Future close() { + for (var service in _services) { + service.close(); + } + + return super.close(); + } + + @override + Future getConnectedWebSocket() async { + var socket = await WebSocket.connect(websocketUri.toString(), + headers: authToken?.isNotEmpty == true + ? {'Authorization': 'Bearer $authToken'} + : {}); + return IOWebSocketChannel(socket); + } + + @override + Service service(String path, + {Type? type, AngelDeserializer? deserializer}) { + var uri = path.replaceAll(_straySlashes, ''); + return IoWebSocketsService(socket, this, uri, type) + as Service; + } +} + +class IoWebSocketsService extends WebSocketsService { + final Type? type; + + IoWebSocketsService(super.socket, WebSockets super.app, super.uri, this.type); +} diff --git a/packages/websocket/lib/platform_websocket.dart b/packages/websocket/lib/platform_websocket.dart new file mode 100644 index 0000000..1514437 --- /dev/null +++ b/packages/websocket/lib/platform_websocket.dart @@ -0,0 +1,45 @@ +/// WebSocket plugin for Angel. +library platform_websocket; + +/// A notification from the server that something has occurred. +class WebSocketEvent { + String? eventName; + Data? data; + + WebSocketEvent({this.eventName, this.data}); + + factory WebSocketEvent.fromJson(Map data) => WebSocketEvent( + eventName: data['eventName'].toString(), data: data['data'] as Data?); + + WebSocketEvent cast() { + if (T == Data) { + return this as WebSocketEvent; + } else { + return WebSocketEvent(eventName: eventName, data: data as T?); + } + } + + Map toJson() { + return {'eventName': eventName, 'data': data}; + } +} + +/// A command sent to the server, usually corresponding to a service method. +class WebSocketAction { + String? id; + String? eventName; + dynamic data; + Map params; + + WebSocketAction({this.id, this.eventName, this.data, this.params = const {}}); + + factory WebSocketAction.fromJson(Map data) => WebSocketAction( + id: data['id'].toString(), + eventName: data['eventName'].toString(), + data: data['data'], + params: data['params'] as Map? ?? {}); + + Map toJson() { + return {'id': id, 'eventName': eventName, 'data': data, 'params': params}; + } +} diff --git a/packages/websocket/lib/server.dart b/packages/websocket/lib/server.dart new file mode 100644 index 0000000..4733d05 --- /dev/null +++ b/packages/websocket/lib/server.dart @@ -0,0 +1,521 @@ +/// Server-side support for WebSockets. +library platform_websocket.server; + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:mirrors'; +import 'package:platform_auth/auth.dart'; +import 'package:platform_foundation/core.dart'; +import 'package:platform_foundation/http.dart'; +import 'package:platform_foundation/http2.dart'; +import 'package:platform_merge_map/merge_map.dart'; +import 'package:logging/logging.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:web_socket_channel/io.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:collection/collection.dart' show IterableExtension; +import 'platform_websocket.dart'; +import 'constants.dart'; +export 'platform_websocket.dart'; + +part 'websocket_context.dart'; + +part 'websocket_controller.dart'; + +typedef WebSocketResponseSerializer = String Function(dynamic data); + +/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s. +class AngelWebSocket { + final List _clients = []; + final List _servicesAlreadyWired = []; + + final StreamController _onAction = + StreamController(); + final StreamController _onData = StreamController(); + final StreamController _onConnection = + StreamController.broadcast(); + final StreamController _onDisconnect = + StreamController.broadcast(); + + final Application app; + + /// If this is not `true`, then all client-side service parameters will be + /// discarded, other than `params['query']`. + final bool allowClientParams; + + /// An optional whitelist of allowed client origins, or [:null:]. + final List allowedOrigins; + + /// An optional whitelist of allowed client protocols, or [:null:]. + final List allowedProtocols; + + /// If `true`, then clients can authenticate their WebSockets by sending a valid JWT. + final bool allowAuth; + + /// Send error information across WebSockets, without including debug information.. + final bool sendErrors; + + /// A list of clients currently connected to this server via WebSockets. + List get clients => List.unmodifiable(_clients); + + /// Services that have already been hooked to fire socket events. + List get servicesAlreadyWired => + List.unmodifiable(_servicesAlreadyWired); + + Logger get _log => app.logger; + + /// Used to notify other nodes of an event's firing. Good for scaled applications. + final StreamChannel? synchronizationChannel; + + /// Fired on any [WebSocketAction]. + Stream get onAction => _onAction.stream; + + /// Fired whenever a WebSocket sends data. + Stream get onData => _onData.stream; + + /// Fired on incoming connections. + Stream get onConnection => _onConnection.stream; + + /// Fired when a user disconnects. + Stream get onDisconnection => _onDisconnect.stream; + + /// Serializes data to WebSockets. + WebSocketResponseSerializer? serializer; + + /// Deserializes data from WebSockets. + Function? deserializer; + + AngelWebSocket(this.app, + {this.sendErrors = false, + this.allowClientParams = false, + this.allowAuth = true, + this.synchronizationChannel, + this.serializer, + this.deserializer, + this.allowedOrigins = const [], + this.allowedProtocols = const []}) { + serializer ??= json.encode; + deserializer ??= (params) => params; + } + + /* + * Deprecated. Original code that failed to compile after upgrading + */ + /* + HookedServiceEventListener serviceHookOriginal(String path) { + return (HookedServiceEvent e) async { + if (e.params != null && e.params['broadcast'] == false) { + return; + } + var event = await transformEvent(e); + event.eventName = '$path::${event.eventName}'; + + dynamic _filter(WebSocketContext socket) { + if (e.service.configuration.containsKey('ws:filter')) { + return e.service.configuration['ws:filter'](e, socket); + } else if (e.params != null && e.params.containsKey('ws:filter')) { + return e.params['ws:filter'](e, socket); + } else { + return true; + } + } + + await batchEvent(event, filter: _filter); + }; + } + + FutureOr Function(HookedServiceEvent e) + serviceHook(String path) { + return (HookedServiceEvent e) async { + if (e.params != null && e.params['broadcast'] == false) { + return; + } + var event = await transformEvent(e); + event.eventName = '$path::${event.eventName}'; + + dynamic _filter(WebSocketContext socket) { + if (e.service.configuration.containsKey('ws:filter')) { + return e.service.configuration['ws:filter'](e, socket); + } else if (e.params != null && e.params.containsKey('ws:filter')) { + return e.params['ws:filter'](e, socket); + } else { + return true; + } + } + + await batchEvent(event, filter: _filter); + }; + } + */ + + FutureOr Function(HookedServiceEvent e) + serviceHook(String path) { + return (HookedServiceEvent e) async { + if (e.params['broadcast'] == false) return; + + var event = await transformEvent(e); + event.eventName = '$path::${event.eventName}'; + + dynamic filter(WebSocketContext socket) { + if (e.service.configuration.containsKey('ws:filter')) { + return e.service.configuration['ws:filter'](e, socket); + } else if (e.params.containsKey('ws:filter')) { + return e.params['ws:filter'](e, socket); + } else { + return true; + } + } + + await batchEvent(event, filter: filter); + }; + } + + /// Slates an event to be dispatched. + Future batchEvent(WebSocketEvent event, + {Function(WebSocketContext socket)? filter, bool notify = true}) async { + // Default implementation will just immediately fire events + for (var client in _clients) { + dynamic result = true; + if (filter != null) { + result = await filter(client); + } + if (result == true) { + client.channel.sink.add((serializer ?? json.encode)(event.toJson())); + } + } + + if (synchronizationChannel != null && notify != false) { + synchronizationChannel!.sink.add(event); + } + } + + /// Returns a list of events yet to be sent. + Future> getBatchedEvents() async => []; + + /// Responds to an incoming action on a WebSocket. + Future handleAction(WebSocketAction action, WebSocketContext socket) async { + var split = action.eventName!.split('::'); + + if (split.length < 2) { + socket.sendError(PlatformHttpException.badRequest()); + return null; + } + + var service = app.findService(split[0]); + + if (service == null) { + socket.sendError(PlatformHttpException.notFound( + message: 'No service "${split[0]}" exists.')); + return null; + } + + var actionName = split[1]; + + //if (action.params is! Map) action.params = {}; + + if (allowClientParams != true) { + if (action.params['query'] is Map) { + action.params = {'query': action.params['query']}; + } else { + action.params = {}; + } + } + + var params = mergeMap([ + (((deserializer ?? (params) => params)(action.params)) + as Map), + { + 'provider': Providers.websocket, + '__requestctx': socket.request, + '__responsectx': socket.response + } + ]); + + try { + if (actionName == indexAction) { + socket.send('${split[0]}::$indexedEvent', await service.index(params)); + return null; + } else if (actionName == readAction) { + socket.send( + '${split[0]}::$readEvent', await service.read(action.id, params)); + return null; + } else if (actionName == createAction) { + return WebSocketEvent( + eventName: '${split[0]}::$createdEvent', + data: await service.create(action.data, params)); + } else if (actionName == modifyAction) { + return WebSocketEvent( + eventName: '${split[0]}::$modifiedEvent', + data: await service.modify(action.id, action.data, params)); + } else if (actionName == updateAction) { + return WebSocketEvent( + eventName: '${split[0]}::$updatedEvent', + data: await service.update(action.id, action.data, params)); + } else if (actionName == removeAction) { + return WebSocketEvent( + eventName: '${split[0]}::$removedEvent', + data: await service.remove(action.id, params)); + } else { + socket.sendError(PlatformHttpException.methodNotAllowed( + message: 'Method Not Allowed: $actionName')); + return null; + } + } catch (e, st) { + _log.severe('Unable to handle unknown action'); + catchError(e, st, socket); + } + } + + /// Authenticates a [WebSocketContext]. + Future handleAuth(WebSocketAction action, WebSocketContext socket) async { + if (allowAuth != false && + action.eventName == authenticateAction && + action.params['query'] is Map && + action.params['query']['jwt'] is String) { + try { + var auth = socket.request.container!.make(); + var jwt = action.params['query']['jwt'] as String; + AuthToken token; + + token = AuthToken.validate(jwt, auth.hmac); + var user = await auth.deserializer(token.userId); + socket.request + ..container!.registerSingleton(token) + ..container!.registerSingleton(user, as: user.runtimeType); + socket._onAuthenticated.add(null); + socket.send(authenticatedEvent, + {'token': token.serialize(auth.hmac), 'data': user}); + } catch (e, st) { + _log.severe('Authentication failed'); + catchError(e, st, socket); + } + } else { + socket.sendError(PlatformHttpException.badRequest( + message: 'No JWT provided for authentication.')); + } + } + + /// Hooks a service up to have its events broadcasted. + dynamic hookupService(Pattern path, HookedService service) { + var localPath = path.toString(); + + service.after( + [ + HookedServiceEvent.created, + HookedServiceEvent.modified, + HookedServiceEvent.updated, + HookedServiceEvent.removed + ], + serviceHook(localPath), + ); + _servicesAlreadyWired.add(localPath); + } + + /// Runs before firing [onConnection]. + Future handleConnect(WebSocketContext socket) async {} + + /// Handles incoming data from a WebSocket. + dynamic handleData(WebSocketContext socket, data) async { + try { + socket._onData.add(data); + var fromJson = json.decode(data.toString()); + var action = WebSocketAction.fromJson(fromJson as Map); + _onAction.add(action); + + if (action.eventName == null || + action.eventName is! String || + action.eventName!.isEmpty) { + throw PlatformHttpException.badRequest(); + } + + if (fromJson.containsKey('eventName')) { + socket._onAction.add(WebSocketAction.fromJson(fromJson)); + socket.on + ._getStreamForEvent(fromJson['eventName'].toString())! + .add(fromJson['data'] as Map?); + } + + if (action.eventName == authenticateAction) { + await handleAuth(action, socket); + } + + if (action.eventName!.contains('::')) { + var split = action.eventName!.split('::'); + + if (split.length >= 2) { + if (actions.contains(split[1])) { + var event = await handleAction(action, socket); + if (event is Future) event = await event; + } + } + } + } catch (e, st) { + _log.severe('Invalid data'); + catchError(e, st, socket); + } + } + + void catchError(e, StackTrace st, WebSocketContext socket) { + // Send an error + if (e is PlatformHttpException) { + socket.sendError(e); + app.logger.severe(e.message, e.error ?? e, e.stackTrace); + } else if (sendErrors) { + var err = PlatformHttpException( + message: e.toString(), stackTrace: st, errors: [st.toString()]); + socket.sendError(err); + app.logger.severe(err.message, e, st); + } else { + var err = PlatformHttpException(); + socket.sendError(err); + app.logger.severe(e.toString(), e, st); + } + } + + /// Transforms a [HookedServiceEvent], so that it can be broadcasted. + Future transformEvent(HookedServiceEvent event) async { + return WebSocketEvent(eventName: event.eventName, data: event.result); + } + + /// Hooks any [HookedService]s that are not being broadcasted yet. + void wireAllServices(Application app) { + for (var key in app.services.keys.where((x) { + return !_servicesAlreadyWired.contains(x) && + app.services[x] is HookedService; + })) { + hookupService(key, app.services[key] as HookedService); + } + } + + /// Configures an [Application] instance to listen for WebSocket connections. + Future configureServer(Application app) async { + app.container.registerSingleton(this); + + if (runtimeType != AngelWebSocket) { + app.container.registerSingleton(this); + } + + // Set up services + wireAllServices(app); + + app.onService.listen((_) { + wireAllServices(app); + }); + + if (synchronizationChannel != null) { + synchronizationChannel?.stream + .listen((e) => batchEvent(e, notify: false)); + } + + app.shutdownHooks.add((_) => synchronizationChannel?.sink.close()); + } + + /// Handles an incoming [WebSocketContext]. + Future handleClient(WebSocketContext socket) async { + var origin = socket.request.headers?.value('origin'); + if (allowedOrigins.isNotEmpty && !allowedOrigins.contains(origin)) { + throw PlatformHttpException.forbidden( + message: + 'WebSocket connections are not allowed from the origin "$origin".'); + } + + _clients.add(socket); + await handleConnect(socket); + + _onConnection.add(socket); + + socket.request.container?.registerSingleton(socket); + + socket.channel.stream.listen( + (data) { + _onData.add(data); + handleData(socket, data); + }, + onDone: () { + _onDisconnect.add(socket); + _clients.remove(socket); + }, + onError: (e) { + _onDisconnect.add(socket); + _clients.remove(socket); + }, + cancelOnError: true, + ); + } + + /// Handles an incoming HTTP request. + Future handleRequest(RequestContext req, ResponseContext res) async { + if (req is HttpRequestContext && res is HttpResponseContext) { + if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest!)) { + throw PlatformHttpException.badRequest(); + } + res.detach(); + var ws = await WebSocketTransformer.upgrade(req.rawRequest!); + var channel = IOWebSocketChannel(ws); + var socket = WebSocketContext(channel, req, res); + scheduleMicrotask(() => handleClient(socket)); + return false; + } else if (req is Http2RequestContext && res is Http2ResponseContext) { + var connection = + req.headers?['connection']?.map((s) => s.toLowerCase().trim()); + var upgrade = req.headers?.value('upgrade')?.toLowerCase(); + var version = req.headers?.value('sec-websocket-version'); + var key = req.headers?.value('sec-websocket-key'); + var protocol = req.headers?.value('sec-websocket-protocol'); + + if (connection == null) { + throw PlatformHttpException.badRequest( + message: 'Missing `connection` header.'); + } else if (!connection.contains('upgrade')) { + throw PlatformHttpException.badRequest( + message: 'Missing "upgrade" in `connection` header.'); + } else if (upgrade != 'websocket') { + throw PlatformHttpException.badRequest( + message: 'The `upgrade` header must equal "websocket".'); + } else if (version != '13') { + throw PlatformHttpException.badRequest( + message: 'The `sec-websocket-version` header must equal "13".'); + } else if (key == null) { + throw PlatformHttpException.badRequest( + message: 'Missing `sec-websocket-key` header.'); + } else if (protocol != null && + allowedProtocols.isNotEmpty && + !allowedProtocols.contains(protocol)) { + throw PlatformHttpException.badRequest( + message: 'Disallowed `sec-websocket-protocol` header "$protocol".'); + } else { + var stream = res.detach(); + var ctrl = StreamChannelController>(); + + ctrl.local.stream.listen((buf) { + stream.sendData(buf); + }, onDone: () { + stream.outgoingMessages.close(); + }); + + if (req.hasParsedBody) { + await ctrl.local.sink.close(); + } else { + await req.body.pipe(ctrl.local.sink); + } + + var sink = utf8.encoder.startChunkedConversion(ctrl.foreign.sink); + sink.add('HTTP/1.1 101 Switching Protocols\r\n' + 'Upgrade: websocket\r\n' + 'Connection: Upgrade\r\n' + 'Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n'); + if (protocol != null) sink.add('Sec-WebSocket-Protocol: $protocol\r\n'); + sink.add('\r\n'); + + //var ws = IOWebSocketChannel.connect(ctrl.foreign); + var socket = WebSocketContext(ctrl.foreign, req, res); + scheduleMicrotask(() => handleClient(socket)); + return false; + } + } else { + throw ArgumentError( + 'Not an HTTP/1.1 or HTTP/2 RequestContext+ResponseContext pair: $req, $res'); + } + } +} diff --git a/packages/websocket/lib/websocket_context.dart b/packages/websocket/lib/websocket_context.dart new file mode 100644 index 0000000..05cc8ac --- /dev/null +++ b/packages/websocket/lib/websocket_context.dart @@ -0,0 +1,75 @@ +part of 'server.dart'; + +/// Represents a WebSocket session, with the original +/// [RequestContext] and [ResponseContext] attached. +class WebSocketContext { + /// Use this to listen for events. + WebSocketEventTable on = WebSocketEventTable(); + + /// The underlying [StreamChannel]. + final StreamChannel channel; + + /// The original [RequestContext]. + final RequestContext request; + + /// The original [ResponseContext]. + final ResponseContext response; + + final StreamController _onAction = + StreamController(); + + final StreamController _onAuthenticated = StreamController(); + + final StreamController _onClose = StreamController(); + + final StreamController _onData = StreamController(); + + /// Fired on any [WebSocketAction]; + Stream get onAction => _onAction.stream; + + /// Fired when the user authenticates. + Stream get onAuthenticated => _onAuthenticated.stream; + + /// Fired once the underlying [WebSocket] closes. + Stream get onClose => _onClose.stream; + + /// Fired when any data is sent through [channel]. + Stream get onData => _onData.stream; + + WebSocketContext(this.channel, this.request, this.response); + + /// Closes the underlying [StreamChannel]. + Future close() async { + scheduleMicrotask(() async { + await channel.sink.close(); + await _onAction.close(); + await _onAuthenticated.close(); + await _onData.close(); + //_onClose.add(null); + await _onClose.close(); + }); + } + + /// Sends an arbitrary [WebSocketEvent]; + void send(String eventName, data) { + channel.sink.add( + json.encode(WebSocketEvent(eventName: eventName, data: data).toJson())); + } + + /// Sends an error event. + void sendError(PlatformHttpException error) => + send(errorEvent, error.toJson()); +} + +class WebSocketEventTable { + final Map> _handlers = {}; + + StreamController? _getStreamForEvent(String eventName) { + if (!_handlers.containsKey(eventName)) { + _handlers[eventName] = StreamController(); + } + return _handlers[eventName]; + } + + Stream operator [](String key) => _getStreamForEvent(key)!.stream; +} diff --git a/packages/websocket/lib/websocket_controller.dart b/packages/websocket/lib/websocket_controller.dart new file mode 100644 index 0000000..2be7108 --- /dev/null +++ b/packages/websocket/lib/websocket_controller.dart @@ -0,0 +1,90 @@ +part of 'server.dart'; + +/// Marks a method as available to WebSockets. +class ExposeWs { + final String eventName; + + const ExposeWs(this.eventName); +} + +/// A special controller that also supports WebSockets. +class WebSocketController extends Controller { + /// The plug-in instance powering this controller. + final AngelWebSocket ws; + + final Map _handlers = {}; + final Map _handlerSymbols = {}; + + WebSocketController(this.ws) : super(); + + /// Sends an event to all clients. + void broadcast(String eventName, data, + {Function(WebSocketContext socket)? filter}) { + ws.batchEvent(WebSocketEvent(eventName: eventName, data: data), + filter: filter); + } + + /// Fired on new connections. + dynamic onConnect(WebSocketContext socket) {} + + /// Fired on disconnections. + dynamic onDisconnect(WebSocketContext socket) {} + + /// Fired on all incoming actions. + dynamic onAction(WebSocketAction action, WebSocketContext socket) async {} + + /// Fired on arbitrary incoming data. + dynamic onData(data, WebSocketContext socket) {} + + @override + Future configureServer(Application app) async { + if (findExpose(app.container.reflector) != null) { + await super.configureServer(app); + } + + var instanceMirror = reflect(this); + var classMirror = reflectClass(runtimeType); + classMirror.instanceMembers.forEach((sym, mirror) { + if (mirror.isRegularMethod) { + var exposeMirror = mirror.metadata + .firstWhereOrNull((mirror) => mirror.reflectee is ExposeWs); + + if (exposeMirror != null) { + var exposeWs = exposeMirror.reflectee as ExposeWs; + _handlers[exposeWs.eventName] = mirror; + _handlerSymbols[exposeWs.eventName] = sym; + } + } + }); + + ws.onConnection.listen((socket) async { + if (!socket.request.container!.has()) { + socket.request.container!.registerSingleton(socket); + } + + await onConnect(socket); + + socket.onData.listen((data) => onData(data, socket)); + + socket.onAction.listen((WebSocketAction action) async { + var container = socket.request.container!.createChild(); + container.registerSingleton(action); + + try { + await onAction(action, socket); + + if (_handlers.containsKey(action.eventName)) { + var methodMirror = _handlers[action.eventName!]!; + var fn = instanceMirror.getField(methodMirror.simpleName).reflectee; + return app.runContained( + fn as Function, socket.request, socket.response, container); + } + } catch (e, st) { + ws.catchError(e, st, socket); + } + }); + }); + + ws.onDisconnection.listen(onDisconnect); + } +} diff --git a/packages/websocket/melos_angel3_websocket.iml b/packages/websocket/melos_angel3_websocket.iml new file mode 100644 index 0000000..389d07a --- /dev/null +++ b/packages/websocket/melos_angel3_websocket.iml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/packages/websocket/pubspec.yaml b/packages/websocket/pubspec.yaml new file mode 100644 index 0000000..12b1771 --- /dev/null +++ b/packages/websocket/pubspec.yaml @@ -0,0 +1,43 @@ +name: platform_websocket +version: 8.2.0 +description: This library provides WebSockets support for Angel3 framework. +homepage: https://angel3-framework.web.app/ +repository: https://github.com/dart-backend/angel/tree/master/packages/websocket +environment: + sdk: '>=3.3.0 <4.0.0' +dependencies: + platform_auth: ^8.0.0 + platform_client: ^8.0.0 + platform_foundation: ^8.0.0 + platform_support: ^8.0.0 + platform_merge_map: ^5.1.0 + http: ^1.0.0 + meta: ^1.8.0 + stream_channel: ^2.1.0 + web_socket_channel: ^3.0.0 + collection: ^1.17.0 + logging: ^1.1.0 +dev_dependencies: + platform_container: ^8.0.0 + platform_model: ^8.0.0 + quiver: ^3.2.0 + test: ^1.25.0 + lints: ^4.0.0 + file: ^7.0.0 +# dependency_overrides: +# angel3_container: +# path: ../container/angel_container +# angel3_framework: +# path: ../framework +# angel3_http_exception: +# path: ../http_exception +# angel3_model: +# path: ../model +# angel3_route: +# path: ../route +# angel3_mock_request: +# path: ../mock_request +# angel3_auth: +# path: ../auth +# angel3_client: +# path: ../client \ No newline at end of file diff --git a/packages/websocket/test/auth_test.dart b/packages/websocket/test/auth_test.dart new file mode 100644 index 0000000..a1be48a --- /dev/null +++ b/packages/websocket/test/auth_test.dart @@ -0,0 +1,66 @@ +import 'package:platform_auth/auth.dart'; +import 'package:platform_client/io.dart' as c; +import 'package:platform_foundation/core.dart'; +import 'package:platform_foundation/http.dart'; +import 'package:platform_websocket/io.dart' as c; +import 'package:platform_websocket/server.dart'; +import 'package:logging/logging.dart'; +import 'package:test/test.dart'; + +const Map user = {'username': 'foo', 'password': 'bar'}; + +void main() { + Application app; + late PlatformHttp http; + late c.Angel client; + late c.WebSockets ws; + + setUp(() async { + app = Application(); + http = PlatformHttp(app, useZone: false); + var auth = PlatformAuth( + serializer: (_) async => 'baz', deserializer: (_) async => user); + + auth.strategies['local'] = LocalAuthStrategy( + (username, password) async { + if (username == 'foo' && password == 'bar') { + return user; + } + + return {}; + }, + ); + + app.post('/auth/local', auth.authenticate('local')); + + await app.configure(auth.configureServer); + var sock = AngelWebSocket(app); + + await app.configure(sock.configureServer); + + app.all('/ws', sock.handleRequest); + app.logger = Logger('angel_auth')..onRecord.listen(print); + + var server = await http.startServer(); + + client = c.Rest('http://${server.address.address}:${server.port}'); + + ws = c.WebSockets('ws://${server.address.address}:${server.port}/ws'); + await ws.connect(); + }); + + tearDown(() { + http.close(); + client.close(); + ws.close(); + }); + + test('auth event fires', () async { + var localAuth = await client.authenticate(type: 'local', credentials: user); + print('JWT: ${localAuth.token}'); + + ws.authenticateViaJwt(localAuth.token); + var auth = await ws.onAuthenticated.first; + expect(auth.token, localAuth.token); + }); +} diff --git a/packages/websocket/test/controller/common.dart b/packages/websocket/test/controller/common.dart new file mode 100644 index 0000000..c015c97 --- /dev/null +++ b/packages/websocket/test/controller/common.dart @@ -0,0 +1,39 @@ +import 'package:platform_foundation/core.dart'; +import 'package:platform_websocket/server.dart'; +import 'package:quiver/core.dart'; + +class Game { + final String? playerOne, playerTwo; + + const Game({this.playerOne, this.playerTwo}); + + factory Game.fromJson(Map data) => Game( + playerOne: data['playerOne'].toString(), + playerTwo: data['playerTwo'].toString()); + + Map toJson() { + return {'playerOne': playerOne, 'playerTwo': playerTwo}; + } + + @override + bool operator ==(other) => + other is Game && + other.playerOne == playerOne && + other.playerTwo == playerTwo; + + @override + int get hashCode => hash2(playerOne, playerTwo); +} + +const Game johnVsBob = Game(playerOne: 'John', playerTwo: 'Bob'); + +@Expose('/game') +class GameController extends WebSocketController { + GameController(super.ws); + + @ExposeWs('search') + dynamic search(WebSocketContext socket) async { + print('User is searching for a game...'); + socket.send('searched', johnVsBob); + } +} diff --git a/packages/websocket/test/controller/io_test.dart b/packages/websocket/test/controller/io_test.dart new file mode 100644 index 0000000..b45b0f0 --- /dev/null +++ b/packages/websocket/test/controller/io_test.dart @@ -0,0 +1,69 @@ +import 'dart:io'; +import 'package:platform_container/mirrors.dart'; +import 'package:platform_foundation/core.dart' as srv; +import 'package:platform_foundation/http.dart' as srv; +import 'package:platform_websocket/io.dart' as ws; +import 'package:platform_websocket/server.dart' as srv; +import 'package:logging/logging.dart'; +import 'package:test/test.dart'; +import 'common.dart'; + +void main() { + srv.Application app; + late srv.PlatformHttp http; + late ws.WebSockets client; + srv.AngelWebSocket websockets; + HttpServer? server; + String? url; + + setUp(() async { + app = srv.Application(reflector: const MirrorsReflector()); + http = srv.PlatformHttp(app, useZone: false); + + websockets = srv.AngelWebSocket(app) + ..onData.listen((data) { + print('Received by server: $data'); + }); + + await app.configure(websockets.configureServer); + app.all('/ws', websockets.handleRequest); + await app.configure(GameController(websockets).configureServer); + app.logger = Logger('angel_auth')..onRecord.listen(print); + + server = await http.startServer(); + url = 'ws://${server!.address.address}:${server!.port}/ws'; + + client = ws.WebSockets(url); + await client.connect(timeout: Duration(seconds: 3)); + + print('Connected'); + + client + ..onData.listen((data) { + print('Received by client: $data'); + }) + ..onError.listen((error) { + // Auto-fail tests on errors ;) + stderr.writeln(error); + error.errors.forEach(stderr.writeln); + throw error; + }); + }); + + tearDown(() async { + await client.close(); + await http.close(); + //app = null; + server = null; + url = null; + }); + + group('controller.io', () { + test('search', () async { + client.sendAction(ws.WebSocketAction(eventName: 'search')); + var search = await client.on['searched'].first; + print('Searched: ${search.data}'); + expect(Game.fromJson(search.data as Map), equals(johnVsBob)); + }); + }); +} diff --git a/packages/websocket/test/service/browser_test.dart b/packages/websocket/test/service/browser_test.dart new file mode 100644 index 0000000..9bd9f3e --- /dev/null +++ b/packages/websocket/test/service/browser_test.dart @@ -0,0 +1,5 @@ +import 'package:test/test.dart'; + +void main() { + group('service.browser', () {}); +} diff --git a/packages/websocket/test/service/common.dart b/packages/websocket/test/service/common.dart new file mode 100644 index 0000000..d97e168 --- /dev/null +++ b/packages/websocket/test/service/common.dart @@ -0,0 +1,34 @@ +import 'dart:async'; + +import 'package:platform_foundation/core.dart'; +import 'package:platform_websocket/base_websocket_client.dart'; +import 'package:platform_websocket/server.dart'; +import 'package:test/test.dart'; + +class Todo extends Model { + String? text; + String? when; + + Todo({this.text, this.when}); +} + +class TodoService extends MapService { + TodoService() : super() { + configuration['ws:filter'] = + (HookedServiceEvent e, WebSocketContext socket) { + print('Hello, service filter world!'); + return true; + }; + } +} + +dynamic testIndex(BaseWebSocketClient client) async { + var todoService = client.service('api/todos'); + scheduleMicrotask(() => todoService.index()); + + var indexed = await todoService.onIndexed.first; + print('indexed: $indexed'); + + expect(indexed, isList); + expect(indexed, isEmpty); +} diff --git a/packages/websocket/test/service/io_test.dart b/packages/websocket/test/service/io_test.dart new file mode 100644 index 0000000..95496e9 --- /dev/null +++ b/packages/websocket/test/service/io_test.dart @@ -0,0 +1,64 @@ +import 'dart:io'; +import 'package:platform_container/mirrors.dart'; +import 'package:platform_foundation/core.dart' as srv; +import 'package:platform_foundation/http.dart' as srv; +import 'package:platform_websocket/io.dart' as ws; +import 'package:platform_websocket/server.dart' as srv; +import 'package:logging/logging.dart'; +import 'package:test/test.dart'; +import 'common.dart'; + +void main() { + srv.Application app; + late srv.PlatformHttp http; + ws.WebSockets? client; + srv.AngelWebSocket websockets; + HttpServer? server; + String? url; + + setUp(() async { + app = srv.Application(reflector: MirrorsReflector()) + ..use('/api/todos', TodoService()); + http = srv.PlatformHttp(app, useZone: false); + + websockets = srv.AngelWebSocket(app) + ..onData.listen((data) { + print('Received by server: $data'); + }); + + await app.configure(websockets.configureServer); + app.all('/ws', websockets.handleRequest); + app.logger = Logger('angel_auth')..onRecord.listen(print); + server = await http.startServer(); + url = 'ws://${server!.address.address}:${server!.port}/ws'; + + client = ws.WebSockets(url); + await client!.connect(); + + client + ?..onData.listen((data) { + print('Received by client: $data'); + }) + ..onError.listen((error) { + // Auto-fail tests on errors ;) + stderr.writeln(error); + error.errors.forEach(stderr.writeln); + throw error; + }); + }); + + tearDown(() async { + await client!.close(); + await http.server!.close(force: true); + + //app = null; + client = null; + server = null; + url = null; + //exit(0); + }); + + group('service.io', () { + test('index', () => testIndex(client!)); + }); +} diff --git a/packages/websocket/web/index.html b/packages/websocket/web/index.html new file mode 100644 index 0000000..7728ad7 --- /dev/null +++ b/packages/websocket/web/index.html @@ -0,0 +1,9 @@ + + + + Client + + + + + \ No newline at end of file diff --git a/packages/websocket/web/main.dart b/packages/websocket/web/main.dart new file mode 100644 index 0000000..1d46ef7 --- /dev/null +++ b/packages/websocket/web/main.dart @@ -0,0 +1,13 @@ +import 'dart:html'; +import 'package:platform_websocket/browser.dart'; + +/// Dummy app to ensure client works with DDC. +void main() { + var app = WebSockets(window.location.origin); + window.alert(app.baseUrl.toString()); + + // ignore: body_might_complete_normally_catch_error + app.connect().catchError((_) { + window.alert('no websocket'); + }); +}