add: adding websocket package

This commit is contained in:
Patrick Stewart 2024-12-15 11:11:54 -07:00
parent 4a42ce4bed
commit b0fee2ca94
28 changed files with 2334 additions and 0 deletions

71
packages/websocket/.gitignore vendored Normal file
View file

@ -0,0 +1,71 @@
# See https://www.dartlang.org/tools/private-files.html
# Files and directories created by pub
.dart_tool
.packages
.pub/
build/
# If you're building an application, you may want to check-in your pubspec.lock
pubspec.lock
# Directory created by dartdoc
# If you don't generate documentation locally you can remove this line.
doc/api/
### Dart template
# See https://www.dartlang.org/tools/private-files.html
# Files and directories created by pub
# SDK 1.20 and later (no longer creates packages directories)
# Older SDK versions
# (Include if the minimum SDK version specified in pubsepc.yaml is earlier than 1.20)
.project
.buildlog
**/packages/
# Files created by dart2js
# (Most Dart developers will use pub build to compile Dart, use/modify these
# rules if you intend to use dart2js directly
# Convention is to use extension '.dart.js' for Dart compiled to Javascript to
# differentiate from explicit Javascript files)
*.dart.js
*.part.js
*.js.deps
*.js.map
*.info.json
# Directory created by dartdoc
# Don't commit pubspec lock file
# (Library packages only! Remove pattern if developing an application package)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
## VsCode
.vscode/
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
.idea/
/out/
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties

View file

@ -0,0 +1,12 @@
Primary Authors
===============
* __[Thomas Hii](dukefirehawk.apps@gmail.com)__
Thomas is the current maintainer of the code base. He has refactored and migrated the
code base to support NNBD.
* __[Tobe O](thosakwe@gmail.com)__
Tobe has written much of the original code prior to NNBD migration. He has moved on and
is no longer involved with the project.

View file

@ -0,0 +1,136 @@
# Change Log
## 8.2.0
* Require Dart >= 3.3
* Updated `lints` to 4.0.0
* Updated `web_socket_channel` to 3.0.0
## 8.1.1
* Updated repository link
## 8.1.0
* Updated `lints` to 3.0.0
* Fixed linter warnings
* Updated `web_socket_channel` to versions below 2.4.1 temporarily. Starting with 2.4.1, its dependency on `dart:html` has been changed to `package:web` which requires a code refactoring to resolve.
## 8.0.0
* Require Dart >= 3.0
## 7.0.0
* Require Dart >= 2.17
## 6.0.0
* Require Dart >= 2.16
## 5.0.0
* Skipped release
## 4.1.2
* Updated `package:angel3_container`
## 4.1.1
* Fixed issue with type casting
* Changed `app` parameter of `AngelWebSocket` to non-nullable
## 4.1.0
* Updated `package:belatuk_merge_map`
* Updated linter to `package:lints`
## 4.0.1
* Updated README
* Fixed authentication unit test
* Fixed NNBD issues
* All 3 unit tests passed
## 4.0.0
* Migrated to support Dart >= 2.12 NNBD
## 3.0.0
* Migrated to work with Dart >= 2.12 Non NNBD
## 2.0.3
* Remove `WebSocketController.plugin`.
* Remove any unawaited futures.
## 2.0.2
* Update `stream_channel` to `2.0.0`.
* Use `angel_framework^@2.0.0-rc.0`.
## 2.0.1
* Add `reconnectOnClose` and `reconnectinterval` parameters in top-level `WebSockets` constructors.
* Close `WebSocketExtraneousEventHandler`.
* Add onAuthenticated to server-side.
## 2.0.0
* Update to work with `client@2.0.0`.
## 2.0.0-alpha.8
* Support for WebSockets over HTTP/2 (though in practice this doesn't often happen, if ever).
## 2.0.0-alpha.7
* Replace `WebSocketSynchronizer` with `StreamChannel<WebSocketEvent>`.
## 2.0.0-alpha.6
* Explicit import of `import 'package:http/io_client.dart' as http;`
## 2.0.0-alpha.5
* Update `http` dependency.
## 2.0.0-alpha.4
* Remove `package:json_god`.
* Make `WebSocketContext` take any `StreamChannel`.
* Strong typing updates.
## 2.0.0-alpha.3
* Directly import Angel HTTP.
## 2.0.0-alpha.2
* Updated for the next version of `angel_client`.
## 2.0.0-alpha.1
* Refactorings for updated Angel 2 versions.
* Remove `package:dart2_constant`.
## 2.0.0-alpha
* Depend on Dart 2 and Angel 2.
## 1.1.2
* Dart 2 updates.
* Added `handleClient`, which is nice for external implementations
that plug into `AngelWebSocket`.
## 1.1.1
* Deprecated `unwrap`.
* Service streams now pump out `e.data`, rather than the actual event.
## 1.1.0+1
* Added `unwrap`.

View file

@ -0,0 +1,29 @@
BSD 3-Clause License
Copyright (c) 2021, dukefirehawk.com
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -0,0 +1,157 @@
# Angel3 Websocket
![Pub Version (including pre-releases)](https://img.shields.io/pub/v/platform_websocket?include_prereleases)
[![Null Safety](https://img.shields.io/badge/null-safety-brightgreen)](https://dart.dev/null-safety)
[![Discord](https://img.shields.io/discord/1060322353214660698)](https://discord.gg/3X6bxTUdCM)
[![License](https://img.shields.io/github/license/dart-backend/angel)](https://github.com/dart-backend/angel/tree/master/packages/websocket/LICENSE)
WebSocket plugin for Angel3 framework. This plugin broadcasts events from hooked services via WebSockets. In addition, it adds itself to the app's IoC container as `AngelWebSocket`, so that it can be used in controllers as well.
WebSocket contexts are add to `req.properties` as `'socket'`.
## Usage
### Server-side
```dart
import "package:angel3_framework/angel3_framework.dart";
import "package:platform_websocket/server.dart";
void main() async {
var app = Angel();
var ws = AngelWebSocket();
// This is a plug-in. It hooks all your services,
// to automatically broadcast events.
await app.configure(ws.configureServer);
// Listen for requests at `/ws`.
app.all('/ws', ws.handleRequest);
}
```
Filtering events is easy with hooked services. Just return a `bool`, whether synchronously or asynchronously.
```dart
myService.properties['ws:filter'] = (HookedServiceEvent e, WebSocketContext socket) async {
return true;
}
myService.index({
'ws:filter': (e, socket) => ...;
});
```
#### Adding Handlers within a Controller
`WebSocketController` extends a normal `Controller`, but also listens to WebSockets.
```dart
import 'dart:async';
import "package:angel3_framework/angel3_framework.dart";
import "package:platform_websocket/server.dart";
@Expose("/")
class MyController extends WebSocketController {
// A reference to the WebSocket plug-in is required.
MyController(AngelWebSocket ws):super(ws);
@override
void onConnect(WebSocketContext socket) {
// On connect...
}
// Dependency injection works, too..
@ExposeWs("read_message")
void sendMessage(WebSocketContext socket, WebSocketAction action, Db db) async {
socket.send(
"found_message",
db.collection("messages").findOne(where.id(action.data['message_id'])));
}
// Event filtering
@ExposeWs("foo")
void foo() {
broadcast( WebSocketEvent(...), filter: (socket) async => ...);
}
}
```
### Client Use
This repo also provides two client libraries `browser` and `io` that extend the base `angel3_client` interface, and allow you to use a very similar API on the client to that of the server.
The provided clients also automatically try to reconnect their WebSockets when disconnected, which means you can restart your development server without having to reload browser windows.
They also provide streams of data that pump out filtered data as it comes in from the server.
Clients can even perform authentication over WebSockets.
#### In the Browser
```dart
import "package:platform_websocket/browser.dart";
void main() async {
Angel app = WebSockets("/ws");
await app.connect();
var Cars = app.service("api/cars");
Cars.onCreated.listen((car) => print("New car: $car"));
// Happens asynchronously
Cars.create({"brand": "Toyota"});
// Authenticate a WebSocket, if you were not already authenticated...
app.authenticateViaJwt('<some-jwt>');
// Listen for arbitrary events
app.on['custom_event'].listen((event) {
// For example, this might be sent by a
// WebSocketController.
print('Hi!');
});
}
```
#### CLI Client
```dart
import "package:angel3_framework/common.dart";
import "package:platform_websocket/io.dart";
// You can include these in a shared file and access on both client and server
class Car extends Model {
int year;
String brand, make;
Car({this.year, this.brand, this.make});
@override String toString() => "$year $brand $make";
}
void main() async {
Angel app = WebSockets("/ws");
// Wait for WebSocket connection...
await app.connect();
var Cars = app.service("api/cars", type: Car);
Cars.onCreated.listen((Car car) {
// Automatically deserialized into a car :)
//
// I just bought a new 2016 Toyota Camry!
print("I just bought a new $car!");
});
// Happens asynchronously
Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"});
// Authenticate a WebSocket, if you were not already authenticated...
app.authenticateViaJwt('<some-jwt>');
}
```

View file

@ -0,0 +1 @@
include: package:lints/recommended.yaml

View file

@ -0,0 +1,30 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Angel WS</title>
</head>
<body>
<script>
var url = location.protocol === "https:" ? "wss://" : "ws://";
url += location.hostname;
if (location.port) url += ":" + location.port;
url += "/ws";
var ws = new WebSocket(url);
window.ws = ws;
ws.onmessage = function(msg) {
console.info(JSON.parse(JSON.parse(msg.data).data));
};
window.sendWs = function(msg) {
var data = { type: "ping", data: msg };
ws.send(JSON.stringify(data));
};
console.info('Connected! Type sendWs("Hey!") to play around.');
</script>
</body>
</html>

View file

@ -0,0 +1,59 @@
import 'dart:io';
import 'package:platform_foundation/core.dart';
import 'package:platform_foundation/http.dart';
import 'package:platform_foundation/http2.dart';
import 'package:platform_websocket/server.dart';
import 'package:file/local.dart';
import 'package:logging/logging.dart';
void main(List<String> args) async {
var app = Application();
var http = PlatformHttp(app);
var ws = AngelWebSocket(app, sendErrors: !app.environment.isProduction);
var fs = const LocalFileSystem();
app.logger = Logger('platform_websocket');
// This is a plug-in. It hooks all your services,
// to automatically broadcast events.
await app.configure(ws.configureServer);
app.get('/', (req, res) => res.streamFile(fs.file('example/index.html')));
// Listen for requests at `/ws`.
app.get('/ws', ws.handleRequest);
app.fallback((req, res) => throw PlatformHttpException.notFound());
ws.onConnection.listen((socket) {
var h = socket.request.headers;
print('WebSocket onConnection $h');
socket.onData.listen((x) {
socket.send('pong', x);
});
});
if (args.contains('http2')) {
var ctx = SecurityContext()
..useCertificateChain('dev.pem')
..usePrivateKey('dev.key', password: 'dartdart');
try {
ctx.setAlpnProtocols(['h2'], true);
} catch (e, st) {
app.logger.severe(
'Cannot set ALPN protocol on server to `h2`. The server will only serve HTTP/1.x.',
e,
st,
);
}
var http2 = PlatformHttp2(app, ctx);
http2.onHttp1.listen(http.handleRequest);
await http2.startServer('127.0.0.1', 3000);
print('Listening at ${http2.uri}');
} else {
await http.startServer('127.0.0.1', 3000);
print('Listening at ${http.uri}');
}
}

View file

@ -0,0 +1,461 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'package:platform_client/platform_client.dart';
import 'package:platform_client/base_platform_client.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'platform_websocket.dart';
import 'constants.dart';
final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// An [Angel] client that operates across WebSockets.
abstract class BaseWebSocketClient extends BaseAngelClient {
Duration? _reconnectInterval;
WebSocketChannel? _socket;
final Queue<WebSocketAction> _queue = Queue<WebSocketAction>();
final StreamController _onData = StreamController();
final StreamController<WebSocketEvent> _onAllEvents =
StreamController<WebSocketEvent>();
final StreamController<AngelAuthResult> _onAuthenticated =
StreamController<AngelAuthResult>();
final StreamController<PlatformHttpException> _onError =
StreamController<PlatformHttpException>();
final StreamController<Map<String, WebSocketEvent>> _onServiceEvent =
StreamController<Map<String, WebSocketEvent>>.broadcast();
final StreamController<WebSocketChannelException>
_onWebSocketChannelException =
StreamController<WebSocketChannelException>();
/// Use this to handle events that are not standard.
final WebSocketExtraneousEventHandler on = WebSocketExtraneousEventHandler();
/// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired whenever a WebSocket is successfully authenticated.
@override
Stream<AngelAuthResult> get onAuthenticated => _onAuthenticated.stream;
/// A broadcast stream of data coming from the [socket].
///
/// Mostly just for internal use.
Stream get onData => _onData.stream;
/// Fired on errors.
Stream<PlatformHttpException> get onError => _onError.stream;
/// Fired whenever an event is fired by a service.
Stream<Map<String, WebSocketEvent>> get onServiceEvent =>
_onServiceEvent.stream;
/// Fired on [WebSocketChannelException]s.
Stream<WebSocketChannelException> get onWebSocketChannelException =>
_onWebSocketChannelException.stream;
/// The [WebSocketChannel] underneath this instance.
WebSocketChannel? get socket => _socket;
/// If `true` (default), then the client will automatically try to reconnect to the server
/// if the socket closes.
final bool reconnectOnClose;
/// The amount of time to wait between reconnect attempts. Default: 10 seconds.
Duration? get reconnectInterval => _reconnectInterval;
Uri? _wsUri;
/// The [Uri] to which a websocket should point.
Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl);
static Uri _toWsUri(Uri u) {
if (u.hasScheme) {
if (u.scheme == 'http') {
return u.replace(scheme: 'ws');
} else if (u.scheme == 'https') {
return u.replace(scheme: 'wss');
} else {
return u;
}
} else {
return _toWsUri(u.replace(scheme: Uri.base.scheme));
}
}
BaseWebSocketClient(super.client, super.baseUrl,
{this.reconnectOnClose = true, Duration? reconnectInterval}) {
_reconnectInterval = reconnectInterval ?? Duration(seconds: 10);
}
@override
Future close() async {
on._close();
scheduleMicrotask(() async {
await _socket!.sink.close(status.normalClosure);
await _onData.close();
await _onAllEvents.close();
await _onAuthenticated.close();
await _onError.close();
await _onServiceEvent.close();
await _onWebSocketChannelException.close();
});
}
/// Connects the WebSocket. [timeout] is optional.
Future<WebSocketChannel?> connect({Duration? timeout}) async {
if (timeout != null) {
var c = Completer<WebSocketChannel>();
late Timer timer;
timer = Timer(timeout, () {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(TimeoutException(
'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms',
timeout));
}
});
scheduleMicrotask(() {
getConnectedWebSocket().then((socket) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
while (_queue.isNotEmpty) {
var action = _queue.removeFirst();
socket.sink.add(serialize(action));
}
c.complete(socket);
}
}).catchError((e, StackTrace st) {
if (!c.isCompleted) {
if (timer.isActive) {
timer.cancel();
}
// TODO: Re-evaluate this error
var obj = 'Error';
c.completeError(obj, st);
}
});
});
return await c.future.then((socket) {
_socket = socket;
listen();
return _socket;
});
} else {
_socket = await getConnectedWebSocket();
listen();
return _socket;
}
}
/// Returns a new [WebSocketChannel], ready to be listened on.
///
/// This should be overriden by child classes, **NOT** [connect].
Future<WebSocketChannel> getConnectedWebSocket();
@override
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
var uri = path.toString().replaceAll(_straySlashes, '');
var wsService = WebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer);
return wsService as Service<Id, Data>;
}
/// Starts listening for data.
void listen() {
_socket?.stream.listen(
(data) {
_onData.add(data);
if (data is WebSocketChannelException) {
_onWebSocketChannelException.add(data);
} else if (data is String) {
var jsons = json.decode(data);
if (jsons is Map) {
var event = WebSocketEvent.fromJson(jsons);
if (event.eventName?.isNotEmpty == true) {
_onAllEvents.add(event);
on._getStream(event.eventName)!.add(event);
}
if (event.eventName == errorEvent) {
var error =
PlatformHttpException.fromMap((event.data ?? {}) as Map);
_onError.add(error);
} else if (event.eventName == authenticatedEvent) {
var authResult = AngelAuthResult.fromMap(event.data as Map?);
_onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName!
.split('::')
.where((str) => str.isNotEmpty)
.toList();
if (split.length >= 2) {
var serviceName = split[0], eventName = split[1];
_onServiceEvent
.add({serviceName: event..eventName = eventName});
}
}
}
}
},
cancelOnError: true,
onDone: () {
_socket = null;
if (reconnectOnClose == true) {
Timer.periodic(reconnectInterval!, (Timer timer) async {
WebSocketChannel? result;
try {
result = await connect(timeout: reconnectInterval);
} catch (e) {
//
}
if (result != null) timer.cancel();
});
}
});
}
/// Serializes data to JSON.
dynamic serialize(x) => json.encode(x);
/// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) {
if (_socket == null) {
_queue.addLast(action);
} else {
socket?.sink.add(serialize(action));
}
}
/// Attempts to authenticate a WebSocket, using a valid JWT.
void authenticateViaJwt(String? jwt) {
sendAction(WebSocketAction(
eventName: authenticateAction,
params: {
'query': {'jwt': jwt}
},
));
}
}
/// A [Service] that asynchronously interacts with the server.
class WebSocketsService<Id, Data> extends Service<Id, Data?> {
/// The [BaseWebSocketClient] that spawned this service.
@override
final BaseWebSocketClient app;
/// Used to deserialize JSON into typed data.
final AngelDeserializer<Data>? deserializer;
/// The [WebSocketChannel] to listen to, and send data across.
final WebSocketChannel? socket;
/// The service path to listen to.
final String path;
final StreamController<WebSocketEvent> _onAllEvents =
StreamController<WebSocketEvent>();
final StreamController<List<Data?>> _onIndexed = StreamController();
final StreamController<Data?> _onRead = StreamController<Data>();
final StreamController<Data?> _onCreated = StreamController<Data>();
final StreamController<Data?> _onModified = StreamController<Data>();
final StreamController<Data?> _onUpdated = StreamController<Data>();
final StreamController<Data?> _onRemoved = StreamController<Data>();
/// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired on `index` events.
@override
Stream<List<Data?>> get onIndexed => _onIndexed.stream;
/// Fired on `read` events.
@override
Stream<Data?> get onRead => _onRead.stream;
/// Fired on `created` events.
@override
Stream<Data?> get onCreated => _onCreated.stream;
/// Fired on `modified` events.
@override
Stream<Data?> get onModified => _onModified.stream;
/// Fired on `updated` events.
@override
Stream<Data?> get onUpdated => _onUpdated.stream;
/// Fired on `removed` events.
@override
Stream<Data?> get onRemoved => _onRemoved.stream;
WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) {
listen();
}
@override
Future close() async {
await _onAllEvents.close();
await _onCreated.close();
await _onIndexed.close();
await _onModified.close();
await _onRead.close();
await _onRemoved.close();
await _onUpdated.close();
}
/// Serializes an [action] to be sent over a WebSocket.
dynamic serialize(WebSocketAction action) => json.encode(action);
/// Deserializes data from a [WebSocketEvent].
Data? deserialize(x) {
return deserializer != null ? deserializer!(x) : x as Data?;
}
/// Deserializes the contents of an [event].
WebSocketEvent<Data> transformEvent(WebSocketEvent event) {
return WebSocketEvent(
eventName: event.eventName, data: deserialize(event.data));
}
/// Starts listening for events.
void listen() {
app.onServiceEvent.listen((map) {
if (map.containsKey(path)) {
var event = map[path]!;
_onAllEvents.add(event);
if (event.eventName == indexedEvent) {
var d = event.data;
var transformed = WebSocketEvent(
eventName: event.eventName,
data: d is Iterable ? d.map(deserialize).toList() : null);
if (transformed.data != null) {
_onIndexed.add(transformed.data!);
}
return;
}
var transformed = transformEvent(event).data;
switch (event.eventName) {
case readEvent:
_onRead.add(transformed);
break;
case createdEvent:
_onCreated.add(transformed);
break;
case modifiedEvent:
_onModified.add(transformed);
break;
case updatedEvent:
_onUpdated.add(transformed);
break;
case removedEvent:
_onRemoved.add(transformed);
break;
}
}
});
}
/// Sends the given [action] on the [socket].
void send(WebSocketAction action) {
app.sendAction(action);
}
@override
Future<List<Data>?> index([Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$indexAction', params: params ?? {}));
return null;
}
@override
Future<Data?> read(id, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$readAction',
id: id.toString(),
params: params ?? {}));
return null;
}
@override
Future<Data?> create(data, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$createAction', data: data, params: params ?? {}));
return null;
}
@override
Future<Data?> modify(id, data, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$modifyAction',
id: id.toString(),
data: data,
params: params ?? {}));
return null;
}
@override
Future<Data?> update(id, data, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$updateAction',
id: id.toString(),
data: data,
params: params ?? {}));
return null;
}
@override
Future<Data?> remove(id, [Map<String, dynamic>? params]) async {
app.sendAction(WebSocketAction(
eventName: '$path::$removeAction',
id: id.toString(),
params: params ?? {}));
return null;
}
}
/// Contains a dynamic Map of [WebSocketEvent] streams.
class WebSocketExtraneousEventHandler {
final Map<String?, StreamController<WebSocketEvent>> _events = {};
StreamController<WebSocketEvent>? _getStream(String? index) {
if (_events[index] == null) {
_events[index] = StreamController<WebSocketEvent>();
}
return _events[index];
}
Stream<WebSocketEvent> operator [](String index) {
if (_events[index] == null) {
_events[index] = StreamController<WebSocketEvent>();
}
return _events[index]!.stream;
}
void _close() {
for (var s in _events.values) {
s.close();
}
}
}

View file

@ -0,0 +1,111 @@
/// Browser WebSocket client library for the Angel framework.
library platform_websocket.browser;
import 'dart:async';
import 'dart:html';
import 'package:platform_client/platform_client.dart';
import 'package:http/browser_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/html.dart';
import 'base_websocket_client.dart';
export 'platform_websocket.dart';
final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<BrowserWebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration? reconnectInterval})
: super(http.BrowserClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@override
Future close() {
for (var service in _services) {
service.close();
}
return super.close();
}
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token', String? errorMessage}) {
var ctrl = StreamController<String>();
var wnd = window.open(url, 'angel_client_auth_popup');
Timer t;
StreamSubscription<Event>? sub;
t = Timer.periodic(Duration(milliseconds: 500), (timer) {
if (!ctrl.isClosed) {
if (wnd.closed == true) {
ctrl.addError(PlatformHttpException.notAuthenticated(
message:
errorMessage ?? 'Authentication via popup window failed.'));
ctrl.close();
timer.cancel();
sub?.cancel();
}
} else {
timer.cancel();
}
});
sub = window.on[eventName].listen((e) {
if (!ctrl.isClosed) {
ctrl.add((e as CustomEvent).detail.toString());
t.cancel();
ctrl.close();
sub?.cancel();
}
});
return ctrl.stream;
}
@override
Future<WebSocketChannel> getConnectedWebSocket() {
var url = websocketUri;
if (authToken?.isNotEmpty == true) {
url = url.replace(
queryParameters: Map<String, String?>.from(url.queryParameters)
..['token'] = authToken);
}
var socket = WebSocket(url.toString());
var completer = Completer<WebSocketChannel>();
socket
..onOpen.listen((_) {
if (!completer.isCompleted) {
return completer.complete(HtmlWebSocketChannel(socket));
}
})
..onError.listen((e) {
if (!completer.isCompleted) {
return completer.completeError(e is ErrorEvent ? e.error! : e);
}
});
return completer.future;
}
@override
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
var uri = path.replaceAll(_straySlashes, '');
return BrowserWebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer) as Service<Id, Data>;
}
}
class BrowserWebSocketsService<Id, Data> extends WebSocketsService<Id, Data> {
final Type? type;
BrowserWebSocketsService(super.socket, WebSockets super.app, super.uri,
{this.type, super.deserializer});
}

View file

@ -0,0 +1,36 @@
const String authenticateAction = 'authenticate';
const String indexAction = 'index';
const String readAction = 'read';
const String createAction = 'create';
const String modifyAction = 'modify';
const String updateAction = 'update';
const String removeAction = 'remove';
const String authenticatedEvent = 'authenticated';
const String errorEvent = 'error';
const String indexedEvent = 'indexed';
const String readEvent = 'read';
const String createdEvent = 'created';
const String modifiedEvent = 'modified';
const String updatedEvent = 'updated';
const String removedEvent = 'removed';
/// The standard Angel service actions.
const List<String> actions = <String>[
indexAction,
readAction,
createAction,
modifyAction,
updateAction,
removeAction
];
/// The standard Angel service events.
const List<String> events = <String>[
indexedEvent,
readEvent,
createdEvent,
modifiedEvent,
updatedEvent,
removedEvent
];

View file

@ -0,0 +1,49 @@
/// Flutter-compatible WebSocket client library for the Angel framework.
library angel_websocket.flutter;
import 'dart:async';
import 'dart:io';
import 'package:http/io_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'base_websocket_client.dart';
export 'package:platform_client/platform_client.dart';
export 'platform_websocket.dart';
// final RegExp _straySlashes = RegExp(r"(^/)|(/+$)");
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<WebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration? reconnectInterval})
: super(http.IOClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token'}) {
throw UnimplementedError(
'Opening popup windows is not supported in the `dart:io` client.');
}
@override
Future close() {
for (var service in _services) {
service.close();
}
return super.close();
}
@override
Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(websocketUri.toString(),
headers: authToken?.isNotEmpty == true
? {'Authorization': 'Bearer $authToken'}
: {});
return IOWebSocketChannel(socket);
}
}

View file

@ -0,0 +1,29 @@
import 'package:platform_foundation/core.dart';
/// Prevents a WebSocket event from being broadcasted, to any client from the given [provider].
///
/// [provider] can be a String, a [Provider], or an Iterable.
/// If [provider] is `null`, any provider will be blocked.
HookedServiceEventListener doNotBroadcast([provider]) {
return (HookedServiceEvent e) {
if (e.params.containsKey('provider')) {
var eParam = e.params;
var deny = false;
var providers = provider is Iterable ? provider : [provider];
for (var p in providers) {
if (deny) break;
if (p is Providers) {
deny = deny || p == eParam['provider'] || eParam['provider'] == p.via;
} else if (p == null) {
deny = true;
} else {
deny = deny || (eParam['provider'] as Providers).via == p.toString();
}
}
eParam['broadcast'] = false;
}
};
}

View file

@ -0,0 +1,64 @@
/// Command-line WebSocket client library for the Angel framework.
library platform_websocket.io;
import 'dart:async';
import 'dart:io';
import 'package:platform_client/platform_client.dart';
import 'package:http/io_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'base_websocket_client.dart';
export 'package:platform_client/platform_client.dart';
export 'platform_websocket.dart';
final RegExp _straySlashes = RegExp(r'(^/)|(/+$)');
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<IoWebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration? reconnectInterval})
: super(http.IOClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token'}) {
throw UnimplementedError(
'Opening popup windows is not supported in the `dart:io` client.');
}
@override
Future close() {
for (var service in _services) {
service.close();
}
return super.close();
}
@override
Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(websocketUri.toString(),
headers: authToken?.isNotEmpty == true
? {'Authorization': 'Bearer $authToken'}
: {});
return IOWebSocketChannel(socket);
}
@override
Service<Id, Data> service<Id, Data>(String path,
{Type? type, AngelDeserializer<Data>? deserializer}) {
var uri = path.replaceAll(_straySlashes, '');
return IoWebSocketsService<Id, Data>(socket, this, uri, type)
as Service<Id, Data>;
}
}
class IoWebSocketsService<Id, Data> extends WebSocketsService<Id, Data> {
final Type? type;
IoWebSocketsService(super.socket, WebSockets super.app, super.uri, this.type);
}

View file

@ -0,0 +1,45 @@
/// WebSocket plugin for Angel.
library platform_websocket;
/// A notification from the server that something has occurred.
class WebSocketEvent<Data> {
String? eventName;
Data? data;
WebSocketEvent({this.eventName, this.data});
factory WebSocketEvent.fromJson(Map data) => WebSocketEvent(
eventName: data['eventName'].toString(), data: data['data'] as Data?);
WebSocketEvent<T> cast<T>() {
if (T == Data) {
return this as WebSocketEvent<T>;
} else {
return WebSocketEvent<T>(eventName: eventName, data: data as T?);
}
}
Map<String, dynamic> toJson() {
return {'eventName': eventName, 'data': data};
}
}
/// A command sent to the server, usually corresponding to a service method.
class WebSocketAction {
String? id;
String? eventName;
dynamic data;
Map<String, dynamic> params;
WebSocketAction({this.id, this.eventName, this.data, this.params = const {}});
factory WebSocketAction.fromJson(Map data) => WebSocketAction(
id: data['id'].toString(),
eventName: data['eventName'].toString(),
data: data['data'],
params: data['params'] as Map<String, dynamic>? ?? {});
Map<String, dynamic> toJson() {
return {'id': id, 'eventName': eventName, 'data': data, 'params': params};
}
}

View file

@ -0,0 +1,521 @@
/// Server-side support for WebSockets.
library platform_websocket.server;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:mirrors';
import 'package:platform_auth/auth.dart';
import 'package:platform_foundation/core.dart';
import 'package:platform_foundation/http.dart';
import 'package:platform_foundation/http2.dart';
import 'package:platform_merge_map/merge_map.dart';
import 'package:logging/logging.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:collection/collection.dart' show IterableExtension;
import 'platform_websocket.dart';
import 'constants.dart';
export 'platform_websocket.dart';
part 'websocket_context.dart';
part 'websocket_controller.dart';
typedef WebSocketResponseSerializer = String Function(dynamic data);
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
class AngelWebSocket {
final List<WebSocketContext> _clients = <WebSocketContext>[];
final List<String> _servicesAlreadyWired = [];
final StreamController<WebSocketAction> _onAction =
StreamController<WebSocketAction>();
final StreamController _onData = StreamController();
final StreamController<WebSocketContext> _onConnection =
StreamController<WebSocketContext>.broadcast();
final StreamController<WebSocketContext> _onDisconnect =
StreamController<WebSocketContext>.broadcast();
final Application app;
/// If this is not `true`, then all client-side service parameters will be
/// discarded, other than `params['query']`.
final bool allowClientParams;
/// An optional whitelist of allowed client origins, or [:null:].
final List<String> allowedOrigins;
/// An optional whitelist of allowed client protocols, or [:null:].
final List<String> allowedProtocols;
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
/// Send error information across WebSockets, without including debug information..
final bool sendErrors;
/// A list of clients currently connected to this server via WebSockets.
List<WebSocketContext> get clients => List.unmodifiable(_clients);
/// Services that have already been hooked to fire socket events.
List<String> get servicesAlreadyWired =>
List.unmodifiable(_servicesAlreadyWired);
Logger get _log => app.logger;
/// Used to notify other nodes of an event's firing. Good for scaled applications.
final StreamChannel<WebSocketEvent>? synchronizationChannel;
/// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired whenever a WebSocket sends data.
Stream get onData => _onData.stream;
/// Fired on incoming connections.
Stream<WebSocketContext> get onConnection => _onConnection.stream;
/// Fired when a user disconnects.
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;
/// Serializes data to WebSockets.
WebSocketResponseSerializer? serializer;
/// Deserializes data from WebSockets.
Function? deserializer;
AngelWebSocket(this.app,
{this.sendErrors = false,
this.allowClientParams = false,
this.allowAuth = true,
this.synchronizationChannel,
this.serializer,
this.deserializer,
this.allowedOrigins = const [],
this.allowedProtocols = const []}) {
serializer ??= json.encode;
deserializer ??= (params) => params;
}
/*
* Deprecated. Original code that failed to compile after upgrading
*/
/*
HookedServiceEventListener serviceHookOriginal(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params['broadcast'] == false) {
return;
}
var event = await transformEvent(e);
event.eventName = '$path::${event.eventName}';
dynamic _filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter')) {
return e.service.configuration['ws:filter'](e, socket);
} else if (e.params != null && e.params.containsKey('ws:filter')) {
return e.params['ws:filter'](e, socket);
} else {
return true;
}
}
await batchEvent(event, filter: _filter);
};
}
FutureOr<dynamic> Function(HookedServiceEvent<dynamic, dynamic, Service> e)
serviceHook(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params['broadcast'] == false) {
return;
}
var event = await transformEvent(e);
event.eventName = '$path::${event.eventName}';
dynamic _filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter')) {
return e.service.configuration['ws:filter'](e, socket);
} else if (e.params != null && e.params.containsKey('ws:filter')) {
return e.params['ws:filter'](e, socket);
} else {
return true;
}
}
await batchEvent(event, filter: _filter);
};
}
*/
FutureOr<dynamic> Function(HookedServiceEvent<dynamic, dynamic, Service> e)
serviceHook(String path) {
return (HookedServiceEvent e) async {
if (e.params['broadcast'] == false) return;
var event = await transformEvent(e);
event.eventName = '$path::${event.eventName}';
dynamic filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter')) {
return e.service.configuration['ws:filter'](e, socket);
} else if (e.params.containsKey('ws:filter')) {
return e.params['ws:filter'](e, socket);
} else {
return true;
}
}
await batchEvent(event, filter: filter);
};
}
/// Slates an event to be dispatched.
Future<void> batchEvent(WebSocketEvent event,
{Function(WebSocketContext socket)? filter, bool notify = true}) async {
// Default implementation will just immediately fire events
for (var client in _clients) {
dynamic result = true;
if (filter != null) {
result = await filter(client);
}
if (result == true) {
client.channel.sink.add((serializer ?? json.encode)(event.toJson()));
}
}
if (synchronizationChannel != null && notify != false) {
synchronizationChannel!.sink.add(event);
}
}
/// Returns a list of events yet to be sent.
Future<List<WebSocketEvent>> getBatchedEvents() async => [];
/// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName!.split('::');
if (split.length < 2) {
socket.sendError(PlatformHttpException.badRequest());
return null;
}
var service = app.findService(split[0]);
if (service == null) {
socket.sendError(PlatformHttpException.notFound(
message: 'No service "${split[0]}" exists.'));
return null;
}
var actionName = split[1];
//if (action.params is! Map) action.params = <String, dynamic>{};
if (allowClientParams != true) {
if (action.params['query'] is Map) {
action.params = {'query': action.params['query']};
} else {
action.params = {};
}
}
var params = mergeMap<String, dynamic>([
(((deserializer ?? (params) => params)(action.params))
as Map<String, dynamic>),
{
'provider': Providers.websocket,
'__requestctx': socket.request,
'__responsectx': socket.response
}
]);
try {
if (actionName == indexAction) {
socket.send('${split[0]}::$indexedEvent', await service.index(params));
return null;
} else if (actionName == readAction) {
socket.send(
'${split[0]}::$readEvent', await service.read(action.id, params));
return null;
} else if (actionName == createAction) {
return WebSocketEvent(
eventName: '${split[0]}::$createdEvent',
data: await service.create(action.data, params));
} else if (actionName == modifyAction) {
return WebSocketEvent(
eventName: '${split[0]}::$modifiedEvent',
data: await service.modify(action.id, action.data, params));
} else if (actionName == updateAction) {
return WebSocketEvent(
eventName: '${split[0]}::$updatedEvent',
data: await service.update(action.id, action.data, params));
} else if (actionName == removeAction) {
return WebSocketEvent(
eventName: '${split[0]}::$removedEvent',
data: await service.remove(action.id, params));
} else {
socket.sendError(PlatformHttpException.methodNotAllowed(
message: 'Method Not Allowed: $actionName'));
return null;
}
} catch (e, st) {
_log.severe('Unable to handle unknown action');
catchError(e, st, socket);
}
}
/// Authenticates a [WebSocketContext].
Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
if (allowAuth != false &&
action.eventName == authenticateAction &&
action.params['query'] is Map &&
action.params['query']['jwt'] is String) {
try {
var auth = socket.request.container!.make<PlatformAuth>();
var jwt = action.params['query']['jwt'] as String;
AuthToken token;
token = AuthToken.validate(jwt, auth.hmac);
var user = await auth.deserializer(token.userId);
socket.request
..container!.registerSingleton<AuthToken>(token)
..container!.registerSingleton(user, as: user.runtimeType);
socket._onAuthenticated.add(null);
socket.send(authenticatedEvent,
{'token': token.serialize(auth.hmac), 'data': user});
} catch (e, st) {
_log.severe('Authentication failed');
catchError(e, st, socket);
}
} else {
socket.sendError(PlatformHttpException.badRequest(
message: 'No JWT provided for authentication.'));
}
}
/// Hooks a service up to have its events broadcasted.
dynamic hookupService(Pattern path, HookedService service) {
var localPath = path.toString();
service.after(
[
HookedServiceEvent.created,
HookedServiceEvent.modified,
HookedServiceEvent.updated,
HookedServiceEvent.removed
],
serviceHook(localPath),
);
_servicesAlreadyWired.add(localPath);
}
/// Runs before firing [onConnection].
Future handleConnect(WebSocketContext socket) async {}
/// Handles incoming data from a WebSocket.
dynamic handleData(WebSocketContext socket, data) async {
try {
socket._onData.add(data);
var fromJson = json.decode(data.toString());
var action = WebSocketAction.fromJson(fromJson as Map);
_onAction.add(action);
if (action.eventName == null ||
action.eventName is! String ||
action.eventName!.isEmpty) {
throw PlatformHttpException.badRequest();
}
if (fromJson.containsKey('eventName')) {
socket._onAction.add(WebSocketAction.fromJson(fromJson));
socket.on
._getStreamForEvent(fromJson['eventName'].toString())!
.add(fromJson['data'] as Map?);
}
if (action.eventName == authenticateAction) {
await handleAuth(action, socket);
}
if (action.eventName!.contains('::')) {
var split = action.eventName!.split('::');
if (split.length >= 2) {
if (actions.contains(split[1])) {
var event = await handleAction(action, socket);
if (event is Future) event = await event;
}
}
}
} catch (e, st) {
_log.severe('Invalid data');
catchError(e, st, socket);
}
}
void catchError(e, StackTrace st, WebSocketContext socket) {
// Send an error
if (e is PlatformHttpException) {
socket.sendError(e);
app.logger.severe(e.message, e.error ?? e, e.stackTrace);
} else if (sendErrors) {
var err = PlatformHttpException(
message: e.toString(), stackTrace: st, errors: [st.toString()]);
socket.sendError(err);
app.logger.severe(err.message, e, st);
} else {
var err = PlatformHttpException();
socket.sendError(err);
app.logger.severe(e.toString(), e, st);
}
}
/// Transforms a [HookedServiceEvent], so that it can be broadcasted.
Future<WebSocketEvent> transformEvent(HookedServiceEvent event) async {
return WebSocketEvent(eventName: event.eventName, data: event.result);
}
/// Hooks any [HookedService]s that are not being broadcasted yet.
void wireAllServices(Application app) {
for (var key in app.services.keys.where((x) {
return !_servicesAlreadyWired.contains(x) &&
app.services[x] is HookedService;
})) {
hookupService(key, app.services[key] as HookedService);
}
}
/// Configures an [Application] instance to listen for WebSocket connections.
Future configureServer(Application app) async {
app.container.registerSingleton(this);
if (runtimeType != AngelWebSocket) {
app.container.registerSingleton<AngelWebSocket>(this);
}
// Set up services
wireAllServices(app);
app.onService.listen((_) {
wireAllServices(app);
});
if (synchronizationChannel != null) {
synchronizationChannel?.stream
.listen((e) => batchEvent(e, notify: false));
}
app.shutdownHooks.add((_) => synchronizationChannel?.sink.close());
}
/// Handles an incoming [WebSocketContext].
Future<void> handleClient(WebSocketContext socket) async {
var origin = socket.request.headers?.value('origin');
if (allowedOrigins.isNotEmpty && !allowedOrigins.contains(origin)) {
throw PlatformHttpException.forbidden(
message:
'WebSocket connections are not allowed from the origin "$origin".');
}
_clients.add(socket);
await handleConnect(socket);
_onConnection.add(socket);
socket.request.container?.registerSingleton<WebSocketContext>(socket);
socket.channel.stream.listen(
(data) {
_onData.add(data);
handleData(socket, data);
},
onDone: () {
_onDisconnect.add(socket);
_clients.remove(socket);
},
onError: (e) {
_onDisconnect.add(socket);
_clients.remove(socket);
},
cancelOnError: true,
);
}
/// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (req is HttpRequestContext && res is HttpResponseContext) {
if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest!)) {
throw PlatformHttpException.badRequest();
}
res.detach();
var ws = await WebSocketTransformer.upgrade(req.rawRequest!);
var channel = IOWebSocketChannel(ws);
var socket = WebSocketContext(channel, req, res);
scheduleMicrotask(() => handleClient(socket));
return false;
} else if (req is Http2RequestContext && res is Http2ResponseContext) {
var connection =
req.headers?['connection']?.map((s) => s.toLowerCase().trim());
var upgrade = req.headers?.value('upgrade')?.toLowerCase();
var version = req.headers?.value('sec-websocket-version');
var key = req.headers?.value('sec-websocket-key');
var protocol = req.headers?.value('sec-websocket-protocol');
if (connection == null) {
throw PlatformHttpException.badRequest(
message: 'Missing `connection` header.');
} else if (!connection.contains('upgrade')) {
throw PlatformHttpException.badRequest(
message: 'Missing "upgrade" in `connection` header.');
} else if (upgrade != 'websocket') {
throw PlatformHttpException.badRequest(
message: 'The `upgrade` header must equal "websocket".');
} else if (version != '13') {
throw PlatformHttpException.badRequest(
message: 'The `sec-websocket-version` header must equal "13".');
} else if (key == null) {
throw PlatformHttpException.badRequest(
message: 'Missing `sec-websocket-key` header.');
} else if (protocol != null &&
allowedProtocols.isNotEmpty &&
!allowedProtocols.contains(protocol)) {
throw PlatformHttpException.badRequest(
message: 'Disallowed `sec-websocket-protocol` header "$protocol".');
} else {
var stream = res.detach();
var ctrl = StreamChannelController<List<int>>();
ctrl.local.stream.listen((buf) {
stream.sendData(buf);
}, onDone: () {
stream.outgoingMessages.close();
});
if (req.hasParsedBody) {
await ctrl.local.sink.close();
} else {
await req.body.pipe(ctrl.local.sink);
}
var sink = utf8.encoder.startChunkedConversion(ctrl.foreign.sink);
sink.add('HTTP/1.1 101 Switching Protocols\r\n'
'Upgrade: websocket\r\n'
'Connection: Upgrade\r\n'
'Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n');
if (protocol != null) sink.add('Sec-WebSocket-Protocol: $protocol\r\n');
sink.add('\r\n');
//var ws = IOWebSocketChannel.connect(ctrl.foreign);
var socket = WebSocketContext(ctrl.foreign, req, res);
scheduleMicrotask(() => handleClient(socket));
return false;
}
} else {
throw ArgumentError(
'Not an HTTP/1.1 or HTTP/2 RequestContext+ResponseContext pair: $req, $res');
}
}
}

View file

@ -0,0 +1,75 @@
part of 'server.dart';
/// Represents a WebSocket session, with the original
/// [RequestContext] and [ResponseContext] attached.
class WebSocketContext {
/// Use this to listen for events.
WebSocketEventTable on = WebSocketEventTable();
/// The underlying [StreamChannel].
final StreamChannel channel;
/// The original [RequestContext].
final RequestContext request;
/// The original [ResponseContext].
final ResponseContext response;
final StreamController<WebSocketAction> _onAction =
StreamController<WebSocketAction>();
final StreamController<void> _onAuthenticated = StreamController();
final StreamController<void> _onClose = StreamController<void>();
final StreamController _onData = StreamController();
/// Fired on any [WebSocketAction];
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired when the user authenticates.
Stream<void> get onAuthenticated => _onAuthenticated.stream;
/// Fired once the underlying [WebSocket] closes.
Stream<void> get onClose => _onClose.stream;
/// Fired when any data is sent through [channel].
Stream get onData => _onData.stream;
WebSocketContext(this.channel, this.request, this.response);
/// Closes the underlying [StreamChannel].
Future close() async {
scheduleMicrotask(() async {
await channel.sink.close();
await _onAction.close();
await _onAuthenticated.close();
await _onData.close();
//_onClose.add(null);
await _onClose.close();
});
}
/// Sends an arbitrary [WebSocketEvent];
void send(String eventName, data) {
channel.sink.add(
json.encode(WebSocketEvent(eventName: eventName, data: data).toJson()));
}
/// Sends an error event.
void sendError(PlatformHttpException error) =>
send(errorEvent, error.toJson());
}
class WebSocketEventTable {
final Map<String, StreamController<Map?>> _handlers = {};
StreamController<Map?>? _getStreamForEvent(String eventName) {
if (!_handlers.containsKey(eventName)) {
_handlers[eventName] = StreamController<Map?>();
}
return _handlers[eventName];
}
Stream<Map?> operator [](String key) => _getStreamForEvent(key)!.stream;
}

View file

@ -0,0 +1,90 @@
part of 'server.dart';
/// Marks a method as available to WebSockets.
class ExposeWs {
final String eventName;
const ExposeWs(this.eventName);
}
/// A special controller that also supports WebSockets.
class WebSocketController extends Controller {
/// The plug-in instance powering this controller.
final AngelWebSocket ws;
final Map<String, MethodMirror> _handlers = {};
final Map<String, Symbol> _handlerSymbols = {};
WebSocketController(this.ws) : super();
/// Sends an event to all clients.
void broadcast(String eventName, data,
{Function(WebSocketContext socket)? filter}) {
ws.batchEvent(WebSocketEvent(eventName: eventName, data: data),
filter: filter);
}
/// Fired on new connections.
dynamic onConnect(WebSocketContext socket) {}
/// Fired on disconnections.
dynamic onDisconnect(WebSocketContext socket) {}
/// Fired on all incoming actions.
dynamic onAction(WebSocketAction action, WebSocketContext socket) async {}
/// Fired on arbitrary incoming data.
dynamic onData(data, WebSocketContext socket) {}
@override
Future configureServer(Application app) async {
if (findExpose(app.container.reflector) != null) {
await super.configureServer(app);
}
var instanceMirror = reflect(this);
var classMirror = reflectClass(runtimeType);
classMirror.instanceMembers.forEach((sym, mirror) {
if (mirror.isRegularMethod) {
var exposeMirror = mirror.metadata
.firstWhereOrNull((mirror) => mirror.reflectee is ExposeWs);
if (exposeMirror != null) {
var exposeWs = exposeMirror.reflectee as ExposeWs;
_handlers[exposeWs.eventName] = mirror;
_handlerSymbols[exposeWs.eventName] = sym;
}
}
});
ws.onConnection.listen((socket) async {
if (!socket.request.container!.has<WebSocketContext>()) {
socket.request.container!.registerSingleton<WebSocketContext>(socket);
}
await onConnect(socket);
socket.onData.listen((data) => onData(data, socket));
socket.onAction.listen((WebSocketAction action) async {
var container = socket.request.container!.createChild();
container.registerSingleton<WebSocketAction>(action);
try {
await onAction(action, socket);
if (_handlers.containsKey(action.eventName)) {
var methodMirror = _handlers[action.eventName!]!;
var fn = instanceMirror.getField(methodMirror.simpleName).reflectee;
return app.runContained(
fn as Function, socket.request, socket.response, container);
}
} catch (e, st) {
ws.catchError(e, st, socket);
}
});
});
ws.onDisconnection.listen(onDisconnect);
}
}

View file

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/test" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/.dart_tool" />
<excludeFolder url="file://$MODULE_DIR$/.pub" />
<excludeFolder url="file://$MODULE_DIR$/build" />
</content>
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Dart SDK" level="project" />
<orderEntry type="library" name="Dart Packages" level="project" />
</component>
</module>

View file

@ -0,0 +1,43 @@
name: platform_websocket
version: 8.2.0
description: This library provides WebSockets support for Angel3 framework.
homepage: https://angel3-framework.web.app/
repository: https://github.com/dart-backend/angel/tree/master/packages/websocket
environment:
sdk: '>=3.3.0 <4.0.0'
dependencies:
platform_auth: ^8.0.0
platform_client: ^8.0.0
platform_foundation: ^8.0.0
platform_support: ^8.0.0
platform_merge_map: ^5.1.0
http: ^1.0.0
meta: ^1.8.0
stream_channel: ^2.1.0
web_socket_channel: ^3.0.0
collection: ^1.17.0
logging: ^1.1.0
dev_dependencies:
platform_container: ^8.0.0
platform_model: ^8.0.0
quiver: ^3.2.0
test: ^1.25.0
lints: ^4.0.0
file: ^7.0.0
# dependency_overrides:
# angel3_container:
# path: ../container/angel_container
# angel3_framework:
# path: ../framework
# angel3_http_exception:
# path: ../http_exception
# angel3_model:
# path: ../model
# angel3_route:
# path: ../route
# angel3_mock_request:
# path: ../mock_request
# angel3_auth:
# path: ../auth
# angel3_client:
# path: ../client

View file

@ -0,0 +1,66 @@
import 'package:platform_auth/auth.dart';
import 'package:platform_client/io.dart' as c;
import 'package:platform_foundation/core.dart';
import 'package:platform_foundation/http.dart';
import 'package:platform_websocket/io.dart' as c;
import 'package:platform_websocket/server.dart';
import 'package:logging/logging.dart';
import 'package:test/test.dart';
const Map<String, String> user = {'username': 'foo', 'password': 'bar'};
void main() {
Application app;
late PlatformHttp http;
late c.Angel client;
late c.WebSockets ws;
setUp(() async {
app = Application();
http = PlatformHttp(app, useZone: false);
var auth = PlatformAuth(
serializer: (_) async => 'baz', deserializer: (_) async => user);
auth.strategies['local'] = LocalAuthStrategy(
(username, password) async {
if (username == 'foo' && password == 'bar') {
return user;
}
return {};
},
);
app.post('/auth/local', auth.authenticate('local'));
await app.configure(auth.configureServer);
var sock = AngelWebSocket(app);
await app.configure(sock.configureServer);
app.all('/ws', sock.handleRequest);
app.logger = Logger('angel_auth')..onRecord.listen(print);
var server = await http.startServer();
client = c.Rest('http://${server.address.address}:${server.port}');
ws = c.WebSockets('ws://${server.address.address}:${server.port}/ws');
await ws.connect();
});
tearDown(() {
http.close();
client.close();
ws.close();
});
test('auth event fires', () async {
var localAuth = await client.authenticate(type: 'local', credentials: user);
print('JWT: ${localAuth.token}');
ws.authenticateViaJwt(localAuth.token);
var auth = await ws.onAuthenticated.first;
expect(auth.token, localAuth.token);
});
}

View file

@ -0,0 +1,39 @@
import 'package:platform_foundation/core.dart';
import 'package:platform_websocket/server.dart';
import 'package:quiver/core.dart';
class Game {
final String? playerOne, playerTwo;
const Game({this.playerOne, this.playerTwo});
factory Game.fromJson(Map data) => Game(
playerOne: data['playerOne'].toString(),
playerTwo: data['playerTwo'].toString());
Map<String, dynamic> toJson() {
return {'playerOne': playerOne, 'playerTwo': playerTwo};
}
@override
bool operator ==(other) =>
other is Game &&
other.playerOne == playerOne &&
other.playerTwo == playerTwo;
@override
int get hashCode => hash2(playerOne, playerTwo);
}
const Game johnVsBob = Game(playerOne: 'John', playerTwo: 'Bob');
@Expose('/game')
class GameController extends WebSocketController {
GameController(super.ws);
@ExposeWs('search')
dynamic search(WebSocketContext socket) async {
print('User is searching for a game...');
socket.send('searched', johnVsBob);
}
}

View file

@ -0,0 +1,69 @@
import 'dart:io';
import 'package:platform_container/mirrors.dart';
import 'package:platform_foundation/core.dart' as srv;
import 'package:platform_foundation/http.dart' as srv;
import 'package:platform_websocket/io.dart' as ws;
import 'package:platform_websocket/server.dart' as srv;
import 'package:logging/logging.dart';
import 'package:test/test.dart';
import 'common.dart';
void main() {
srv.Application app;
late srv.PlatformHttp http;
late ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer? server;
String? url;
setUp(() async {
app = srv.Application(reflector: const MirrorsReflector());
http = srv.PlatformHttp(app, useZone: false);
websockets = srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
await app.configure(GameController(websockets).configureServer);
app.logger = Logger('angel_auth')..onRecord.listen(print);
server = await http.startServer();
url = 'ws://${server!.address.address}:${server!.port}/ws';
client = ws.WebSockets(url);
await client.connect(timeout: Duration(seconds: 3));
print('Connected');
client
..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
// Auto-fail tests on errors ;)
stderr.writeln(error);
error.errors.forEach(stderr.writeln);
throw error;
});
});
tearDown(() async {
await client.close();
await http.close();
//app = null;
server = null;
url = null;
});
group('controller.io', () {
test('search', () async {
client.sendAction(ws.WebSocketAction(eventName: 'search'));
var search = await client.on['searched'].first;
print('Searched: ${search.data}');
expect(Game.fromJson(search.data as Map), equals(johnVsBob));
});
});
}

View file

@ -0,0 +1,5 @@
import 'package:test/test.dart';
void main() {
group('service.browser', () {});
}

View file

@ -0,0 +1,34 @@
import 'dart:async';
import 'package:platform_foundation/core.dart';
import 'package:platform_websocket/base_websocket_client.dart';
import 'package:platform_websocket/server.dart';
import 'package:test/test.dart';
class Todo extends Model {
String? text;
String? when;
Todo({this.text, this.when});
}
class TodoService extends MapService {
TodoService() : super() {
configuration['ws:filter'] =
(HookedServiceEvent e, WebSocketContext socket) {
print('Hello, service filter world!');
return true;
};
}
}
dynamic testIndex(BaseWebSocketClient client) async {
var todoService = client.service('api/todos');
scheduleMicrotask(() => todoService.index());
var indexed = await todoService.onIndexed.first;
print('indexed: $indexed');
expect(indexed, isList);
expect(indexed, isEmpty);
}

View file

@ -0,0 +1,64 @@
import 'dart:io';
import 'package:platform_container/mirrors.dart';
import 'package:platform_foundation/core.dart' as srv;
import 'package:platform_foundation/http.dart' as srv;
import 'package:platform_websocket/io.dart' as ws;
import 'package:platform_websocket/server.dart' as srv;
import 'package:logging/logging.dart';
import 'package:test/test.dart';
import 'common.dart';
void main() {
srv.Application app;
late srv.PlatformHttp http;
ws.WebSockets? client;
srv.AngelWebSocket websockets;
HttpServer? server;
String? url;
setUp(() async {
app = srv.Application(reflector: MirrorsReflector())
..use('/api/todos', TodoService());
http = srv.PlatformHttp(app, useZone: false);
websockets = srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
app.logger = Logger('angel_auth')..onRecord.listen(print);
server = await http.startServer();
url = 'ws://${server!.address.address}:${server!.port}/ws';
client = ws.WebSockets(url);
await client!.connect();
client
?..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
// Auto-fail tests on errors ;)
stderr.writeln(error);
error.errors.forEach(stderr.writeln);
throw error;
});
});
tearDown(() async {
await client!.close();
await http.server!.close(force: true);
//app = null;
client = null;
server = null;
url = null;
//exit(0);
});
group('service.io', () {
test('index', () => testIndex(client!));
});
}

View file

@ -0,0 +1,9 @@
<!DOCTYPE html>
<html>
<head>
<title>Client</title>
</head>
<body>
<script src="main.dart.js"></script>
</body>
</html>

View file

@ -0,0 +1,13 @@
import 'dart:html';
import 'package:platform_websocket/browser.dart';
/// Dummy app to ensure client works with DDC.
void main() {
var app = WebSockets(window.location.origin);
window.alert(app.baseUrl.toString());
// ignore: body_might_complete_normally_catch_error
app.connect().catchError((_) {
window.alert('no websocket');
});
}