Add 'packages/websocket/' from commit '229b5e72058a644292d0f258bd0f7f06613b7b24'

git-subtree-dir: packages/websocket
git-subtree-mainline: e887b1d21f
git-subtree-split: 229b5e7205
This commit is contained in:
Tobe O 2020-02-15 18:28:28 -05:00
commit c97363d290
29 changed files with 2260 additions and 0 deletions

76
packages/websocket/.gitignore vendored Normal file
View file

@ -0,0 +1,76 @@
# See https://www.dartlang.org/tools/private-files.html
# Files and directories created by pub
.buildlog
.packages
.project
.pub/
build/
**/packages/
# Files created by dart2js
# (Most Dart developers will use pub build to compile Dart, use/modify these
# rules if you intend to use dart2js directly
# Convention is to use extension '.dart.js' for Dart compiled to Javascript to
# differentiate from explicit Javascript files)
*.dart.js
*.part.js
*.js.deps
*.js.map
*.info.json
# Directory created by dartdoc
doc/api/
# Don't commit pubspec lock file
# (Library packages only! Remove pattern if developing an application package)
pubspec.lock
.idea
log.txt
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/dictionaries
# Sensitive or high-churn files:
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.xml
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
.dart_tool

View file

@ -0,0 +1,4 @@
language: dart
dart:
- dev
- stable

View file

@ -0,0 +1,57 @@
# 2.0.3
* Remove `WebSocketController.plugin`.
* Remove any unawaited futures.
# 2.0.2
* Update `stream_channel` to `2.0.0`.
* Use `angel_framework^@2.0.0-rc.0`.
# 2.0.1
* Add `reconnectOnClose` and `reconnectinterval` parameters in top-level `WebSockets` constructors.
* Close `WebSocketExtraneousEventHandler`.
* Add onAuthenticated to server-side.
# 2.0.0
* Update to work with `client@2.0.0`.
# 2.0.0-alpha.8
* Support for WebSockets over HTTP/2 (though in practice this doesn't often happen, if ever).
# 2.0.0-alpha.7
* Replace `WebSocketSynchronizer` with `StreamChannel<WebSocketEvent>`.
# 2.0.0-alpha.6
* Explicit import of `import 'package:http/io_client.dart' as http;`
# 2.0.0-alpha.5
* Update `http` dependency.
# 2.0.0-alpha.4
* Remove `package:json_god`.
* Make `WebSocketContext` take any `StreamChannel`.
* Strong typing updates.
# 2.0.0-alpha.3
* Directly import Angel HTTP.
# 2.0.0-alpha.2
* Updated for the next version of `angel_client`.
# 2.0.0-alpha.1
* Refactorings for updated Angel 2 versions.
* Remove `package:dart2_constant`.
# 2.0.0-alpha
* Depend on Dart 2 and Angel 2.
# 1.1.2
* Dart 2 updates.
* Added `handleClient`, which is nice for external implementations
that plug into `AngelWebSocket`.
# 1.1.1
* Deprecated `unwrap`.
* Service streams now pump out `e.data`, rather than the actual event.
# 1.1.0+1
* Added `unwrap`.

View file

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2016 angel-dart
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,163 @@
# angel_websocket
[![Pub](https://img.shields.io/pub/v/angel_websocket.svg)](https://pub.dartlang.org/packages/angel_websocket)
[![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket)
WebSocket plugin for Angel.
This plugin broadcasts events from hooked services via WebSockets.
In addition, it adds itself to the app's IoC container as `AngelWebSocket`, so that it can be used
in controllers as well.
WebSocket contexts are add to `req.properties` as `'socket'`.
# Usage
**Server-side**
```dart
import "package:angel_framework/angel_framework.dart";
import "package:angel_websocket/server.dart";
main() async {
var app = new Angel();
var ws = new AngelWebSocket();
// This is a plug-in. It hooks all your services,
// to automatically broadcast events.
await app.configure(ws.configureServer);
// Listen for requests at `/ws`.
app.all('/ws', ws.handleRequest);
}
```
Filtering events is easy with hooked services. Just return a `bool`, whether
synchronously or asynchronously.
```dart
myService.properties['ws:filter'] = (HookedServiceEvent e, WebSocketContext socket) async {
return true;
}
myService.index({
'ws:filter': (e, socket) => ...;
});
```
**Adding Handlers within a Controller**
`WebSocketController` extends a normal `Controller`, but also listens to WebSockets.
```dart
import 'dart:async';
import "package:angel_framework/angel_framework.dart";
import "package:angel_websocket/server.dart";
@Expose("/")
class MyController extends WebSocketController {
// A reference to the WebSocket plug-in is required.
MyController(AngelWebSocket ws):super(ws);
@override
void onConnect(WebSocketContext socket) {
// On connect...
}
// Dependency injection works, too..
@ExposeWs("read_message")
void sendMessage(WebSocketContext socket, WebSocketAction action, Db db) async {
socket.send(
"found_message",
db.collection("messages").findOne(where.id(action.data['message_id'])));
}
// Event filtering
@ExposeWs("foo")
void foo() {
broadcast(new WebSocketEvent(...), filter: (socket) async => ...);
}
}
```
**Client Use**
This repo also provides two client libraries `browser` and `io` that extend the base
`angel_client` interface, and allow you to use a very similar API on the client to that of
the server.
The provided clients also automatically try to reconnect their WebSockets when disconnected,
which means you can restart your development server without having to reload browser windows.
They also provide streams of data that pump out filtered data as it comes in from the server.
Clients can even perform authentication over WebSockets.
**In the Browser**
```dart
import "package:angel_websocket/browser.dart";
main() async {
Angel app = new WebSockets("/ws");
await app.connect();
var Cars = app.service("api/cars");
Cars.onCreated.listen((car) => print("New car: $car"));
// Happens asynchronously
Cars.create({"brand": "Toyota"});
// Authenticate a WebSocket, if you were not already authenticated...
app.authenticateViaJwt('<some-jwt>');
// Listen for arbitrary events
app.on['custom_event'].listen((event) {
// For example, this might be sent by a
// WebSocketController.
print('Hi!');
});
}
```
**CLI Client**
```dart
import "package:angel_framework/common.dart";
import "package:angel_websocket/io.dart";
// You can include these in a shared file and access on both client and server
class Car extends Model {
int year;
String brand, make;
Car({this.year, this.brand, this.make});
@override String toString() => "$year $brand $make";
}
main() async {
Angel app = new WebSockets("/ws");
// Wait for WebSocket connection...
await app.connect();
var Cars = app.service("api/cars", type: Car);
Cars.onCreated.listen((Car car) {
// Automatically deserialized into a car :)
//
// I just bought a new 2016 Toyota Camry!
print("I just bought a new $car!");
});
// Happens asynchronously
Cars.create({"year": 2016, "brand": "Toyota", "make": "Camry"});
// Authenticate a WebSocket, if you were not already authenticated...
app.authenticateViaJwt('<some-jwt>');
}

View file

@ -0,0 +1,4 @@
include: package:pedantic/analysis_options.yaml
analyzer:
strong-mode:
implicit-casts: false

View file

@ -0,0 +1,29 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE5DAcBgoqhkiG9w0BDAEBMA4ECL7L6rj6uEHGAgIIAASCBMLbucyfqAkgCbhP
xNSHYllPMAv/dsIjtnsBwepCXPGkCBCuOAw/2FaCHjN9hBqL5V7fkrKeaemhm2YE
ycPtlHJYPDf3kEkyMjdZ9rIY6kePGfQizs2uJPcXj4YPyQ4HsfVXpOicKfQrouf5
Mze9bGzeMN065q3iP4dYUMwHAyZYteXCsanQNHlqvsWli0W+H8St8fdsXefZhnv1
qVatKWdNdWQ9t5MuljgNU2Vv56sHKEYXI0yLxk2QUMk8KlJfnmt8foYUsnPUXHmc
gIjLKwwVkpdololnEHSNu0cEOUPowjgJru+uMpn7vdNl7TPEQ9jbEgdNg4JwoYzU
0nao8WzjaSp7kzvZz0VFwKnk5AjstGvvuAWckADdq23QElbn/mF7AG1m/TBpYxzF
gTt37UdndS/AcvVznWVVrRP5iTSIawdIwvqI4s7rqsoE0GCcak+RhchgAz2gWKkS
oODUo0JL6pPVbJ3l4ebbaO6c99nDVc8dViPtc1EkStJEJ2O4kI4xgLSCr4Y9ahKn
oAaoSkX7Xxq3aQm+BzqSpLjdGL8atsqR/YVOIHYIl3gThvP0NfZGx1xHyvO5mCdZ
kHxSA7tKWxauZ3eQ2clbnzeRsl4El0WMHy/5K1ovene4v7sunmoXVtghBC8hK6eh
zMO9orex2PNQ/VQC7HCvtytunOVx1lkSBoNo7hR70igg6rW9H7UyoAoBOwMpT1xa
J6V62nqruTKOqFNfur7aHJGpHGtDb5/ickHeYCyPTvmGp67u4wChzKReeg02oECe
d1E5FKAcIa8s9TVOB6Z+HvTRNQZu2PsI6TJnjQRowvY9DAHiWTlJZBBY/pko3hxX
TsIeybpvRdEHpDWv86/iqtw1hv9CUxS/8ZTWUgBo+osShHW79FeDASr9FC4/Zn76
ZDERTgV4YWlW/klVWcG2lFo7jix+OPXAB+ZQavLhlN1xdWBcIz1AUWjAM4hdPylW
HCX4PB9CQIPl2E7F+Y2p6nMcMWSJVBi5UIH7E9LfaBguXSzMmTk2Fw5p1aOQ6wfN
goVAMVwi8ppAVs741PfHdZ295xMmK/1LCxz5DeAdD/tsA/SYfT753GotioDuC7im
EyJ5JyvTr5I6RFFBuqt3NlUb3Hp16wP3B2x9DZiB6jxr0l341/NHgsyeBXkuIy9j
ON2mvpBPCJhS8kgWo3G0UyyKnx64tcgpGuSvZhGwPz843B6AbYyE6pMRfSWRMkMS
YZYa+VNKhR4ixdj07ocFZEWLVjCH7kxkE8JZXKt8jKYmkWd0lS1QVjgaKlO6lRa3
q6SPJkhW6pvqobvcqVNXwi1XuzpZeEbuh0B7OTekFTTxx5g9XeDl56M8SVQ1KEhT
Q1t7H2Nba18WCB7cf+6PN0F0K0Jz1Kq7ZWaqEI/grX1m4RQuvNF5807sB/QKMO/Z
Gz3NXvHg5xTJRd/567lxPGkor0cE7qD1EZfmJ2HrBYXQ91bhgA7LToBuMZo6ZRXH
QfsanjbP4FPLMiGdQigLjj3A35L/f4sQOOVac/sRaFnm7pzcxsMvyVU/YtvGcjYE
xaOOVnamg661Wo0wksXoDjeSz/JIyyKO3Gwp1FSm2wGLjjy/Ehmqcqy8rvHuf07w
AUukhVtTNn4=
-----END ENCRYPTED PRIVATE KEY-----

View file

@ -0,0 +1,57 @@
-----BEGIN CERTIFICATE-----
MIIDKTCCAhGgAwIBAgIJAOWmjTS+OnTEMA0GCSqGSIb3DQEBCwUAMBcxFTATBgNV
BAMMDGludGVybWVkaWF0ZTAeFw0xNTA1MTgwOTAwNDBaFw0yMzA4MDQwOTAwNDBa
MBQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
AQoCggEBALlcwQJuzd+xH8QFgfJSn5tRlvhkldSX98cE7NiA602NBbnAVyUrkRXq
Ni75lgt0kwjYfA9z674m8WSVbgpLPintPCla9CYky1TH0keIs8Rz6cGWHryWEHiu
EDuljQynu2b3sAFuHu9nfWurbJwZnFakBKpdQ9m4EyOZCHC/jHYY7HacKSXg1Cki
we2ca0BWDrcqy8kLy0dZ5oC6IZG8O8drAK8f3f44CRYw59D3sOKBrKXaabpvyEcb
N7Wk2HDBVwHpUJo1reVwtbM8dhqQayYSD8oXnGpP3RQNu/e2rzlXRyq/BfcDY1JI
7TbC4t/7/N4EcPSpGsTcSOC9A7FpzvECAwEAAaN7MHkwCQYDVR0TBAIwADAsBglg
hkgBhvhCAQ0EHxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0O
BBYEFCnwiEMMFZh7NhCr+qA8K0w4Q+AOMB8GA1UdIwQYMBaAFB0h1Evsaw2vfrmS
YuoCTmC4EE6ZMA0GCSqGSIb3DQEBCwUAA4IBAQAcFmHMaXRxyoNaeOowQ6iQWoZd
AUbvG7SHr7I6Pi2aqdqofsKWts7Ytm5WsS0M2nN+sW504houu0iCPeJJX8RQw2q4
CCcNOs9IXk+2uMzlpocHpv+yYoUiD5DxgWh7eghQMLyMpf8FX3Gy4VazeuXznHOM
4gE4L417xkDzYOzqVTp0FTyAPUv6G2euhNCD6TMru9REcRhYul+K9kocjA5tt2KG
MH6y28LXbLyq4YJUxSUU9gY/xlnbbZS48KDqEcdYC9zjW9nQ0qS+XQuQuFIcwjJ5
V4kAUYxDu6FoTpyQjgsrmBbZlKNxH7Nj4NDlcdJhp/zeSKHqWa5hSWjjKIxp
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDAjCCAeqgAwIBAgIJAOWmjTS+OnTDMA0GCSqGSIb3DQEBCwUAMBgxFjAUBgNV
BAMMDXJvb3RhdXRob3JpdHkwHhcNMTUwNTE4MDkwMDQwWhcNMjMwODA0MDkwMDQw
WjAXMRUwEwYDVQQDDAxpbnRlcm1lZGlhdGUwggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQDSrAO1CoPvUllgLOzDm5nG0skDF7vh1DUgAIDVGz0ecD0JFbQx
EF79pju/6MbtpTW2FYvRp11t/G7rGtX923ybOHY/1MNFQrdIvPlO1VV7IGKjoMwP
DNeb0fIGjHoE9QxaDxR8NX8xQbItpsw+TUtRfc9SLkR+jaYJfVRoM21BOncZbSHE
YKiZlEbpecB/+EtwVpgvl+8mPD5U07Fi4fp/lza3WXInXQPyiTVllIEJCt4PKmlu
MocNaJOW38bysL7i0PzDpVZtOxLHOTaW68yF3FckIHNCaA7k1ABEEEegjFMmIao7
B9w7A0jvr4jZVvNmui5Djjn+oJxwEVVgyf8LAgMBAAGjUDBOMB0GA1UdDgQWBBQd
IdRL7GsNr365kmLqAk5guBBOmTAfBgNVHSMEGDAWgBRk81s9d0ZbiZhh44KckwPb
oTc0XzAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBZQTK0plfdB5PC
cC5icut4EmrByJa1RbU7ayuEE70e7hla6KVmVjVdCBGltI4jBYwfhKbRItHiAJ/8
x+XZKBG8DLPFuDb7lAa1ObhAYF7YThUFPQYaBhfzKcWrdmWDBFpvNv6E0Mm364dZ
e7Yxmbe5S4agkYPoxEzgEYmcUk9jbjdR6eTbs8laG169ljrECXfEU9RiAcqz5iSX
NLSewqB47hn3B9qgKcQn+PsgO2j7M+rfklhNgeGJeWmy7j6clSOuCsIjWHU0RLQ4
0W3SB/rpEAJ7fgQbYUPTIUNALSOWi/o1tDX2mXPRjBoxqAv7I+vYk1lZPmSzkyRh
FKvRDxsW
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDAzCCAeugAwIBAgIJAJ0MomS4Ck+8MA0GCSqGSIb3DQEBCwUAMBgxFjAUBgNV
BAMMDXJvb3RhdXRob3JpdHkwHhcNMTUwNTE4MDkwMDQwWhcNMjMwODA0MDkwMDQw
WjAYMRYwFAYDVQQDDA1yb290YXV0aG9yaXR5MIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAts1ijtBV92S2cOvpUMOSTp9c6A34nIGr0T5Nhz6XiqRVT+gv
dQgmkdKJQjbvR60y6jzltYFsI2MpGVXY8h/oAL81D/k7PDB2aREgyBfTPAhBHyGw
siR+2xYt5b/Zs99q5RdRqQNzNpLPJriIKvUsRyQWy1UiG2s7pRXQeA8qB0XtJdCj
kFIi+G2bDsaffspGeDOCqt7t+yqvRXfSES0c/l7DIHaiMbbp4//ZNML3RNgAjPz2
hCezZ+wOYajOIyoSPK8IgICrhYFYxvgWxwbLDBEfC5B3jOQsySe10GoRAKZz1gBV
DmgReu81tYJmdgkc9zknnQtIFdA0ex+GvZlfWQIDAQABo1AwTjAdBgNVHQ4EFgQU
ZPNbPXdGW4mYYeOCnJMD26E3NF8wHwYDVR0jBBgwFoAUZPNbPXdGW4mYYeOCnJMD
26E3NF8wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEATzkZ97K777uZ
lQcduNX3ey4IbCiEzFA2zO5Blj+ilfIwNbZXNOgm/lqNvVGDYs6J1apJJe30vL3X
J+t2zsZWzzQzb9uIU37zYemt6m0fHrSrx/iy5lGNqt3HMfqEcOqSCOIK3PCTMz2/
uyGe1iw33PVeWsm1JUybQ9IrU/huJjbgOHU4wab+8SJCM49ipArp68Fr6j4lcEaE
4rfRg1ZsvxiOyUB3qPn6wyL/JB8kOJ+QCBe498376eaem8AEFk0kQRh6hDaWtq/k
t6IIXQLjx+EBDVP/veK0UnVhKRP8YTOoV8ZiG1NcdlJmX/Uk7iAfevP7CkBfSN8W
r6AL284qtw==
-----END CERTIFICATE-----

View file

@ -0,0 +1,30 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Angel WS</title>
</head>
<body>
<script>
var url = location.protocol === "https:" ? "wss://" : "ws://";
url += location.hostname;
if (location.port) url += ":" + location.port;
url += "/ws";
var ws = new WebSocket(url);
window.ws = ws;
ws.onmessage = function(msg) {
console.info(JSON.parse(JSON.parse(msg.data).data));
};
window.sendWs = function(msg) {
var data = { type: "ping", data: msg };
ws.send(JSON.stringify(data));
};
console.info('Connected! Type sendWs("Hey!") to play around.');
</script>
</body>
</html>

View file

@ -0,0 +1,56 @@
import 'dart:io';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/http.dart';
import 'package:angel_framework/http2.dart';
import 'package:angel_websocket/server.dart';
import 'package:file/local.dart';
import 'package:logging/logging.dart';
main(List<String> args) async {
var app = new Angel();
var http = new AngelHttp(app);
var ws = new AngelWebSocket(app, sendErrors: !app.environment.isProduction);
var fs = const LocalFileSystem();
app.logger = new Logger('angel_websocket');
// This is a plug-in. It hooks all your services,
// to automatically broadcast events.
await app.configure(ws.configureServer);
app.get('/', (req, res) => res.streamFile(fs.file('example/index.html')));
// Listen for requests at `/ws`.
app.get('/ws', ws.handleRequest);
app.fallback((req, res) => throw AngelHttpException.notFound());
ws.onConnection.listen((socket) {
socket.onData.listen((x) {
socket.send('pong', x);
});
});
if (args.contains('http2')) {
var ctx = new SecurityContext()
..useCertificateChain('dev.pem')
..usePrivateKey('dev.key', password: 'dartdart');
try {
ctx.setAlpnProtocols(['h2'], true);
} catch (e, st) {
app.logger.severe(
'Cannot set ALPN protocol on server to `h2`. The server will only serve HTTP/1.x.',
e,
st,
);
}
var http2 = new AngelHttp2(app, ctx);
http2.onHttp1.listen(http.handleRequest);
await http2.startServer('127.0.0.1', 3000);
print('Listening at ${http2.uri}');
} else {
await http.startServer('127.0.0.1', 3000);
print('Listening at ${http.uri}');
}
}

View file

@ -0,0 +1,46 @@
/// WebSocket plugin for Angel.
library angel_websocket;
/// A notification from the server that something has occurred.
class WebSocketEvent<Data> {
String eventName;
Data data;
WebSocketEvent({String this.eventName, this.data});
factory WebSocketEvent.fromJson(Map data) => new WebSocketEvent(
eventName: data['eventName'].toString(), data: data['data'] as Data);
WebSocketEvent<T> cast<T>() {
if (T == Data) {
return this as WebSocketEvent<T>;
} else {
return new WebSocketEvent<T>(eventName: eventName, data: data as T);
}
}
Map<String, dynamic> toJson() {
return {'eventName': eventName, 'data': data};
}
}
/// A command sent to the server, usually corresponding to a service method.
class WebSocketAction {
String id;
String eventName;
var data;
Map<String, dynamic> params;
WebSocketAction(
{String this.id, String this.eventName, this.data, this.params});
factory WebSocketAction.fromJson(Map data) => new WebSocketAction(
id: data['id'].toString(),
eventName: data['eventName'].toString(),
data: data['data'],
params: data['params'] as Map<String, dynamic>);
Map<String, dynamic> toJson() {
return {'id': id, 'eventName': eventName, 'data': data, 'params': params};
}
}

View file

@ -0,0 +1,446 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'package:angel_client/angel_client.dart';
import 'package:angel_client/base_angel_client.dart';
import 'package:angel_http_exception/angel_http_exception.dart';
import 'package:http/src/base_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'angel_websocket.dart';
import 'constants.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// An [Angel] client that operates across WebSockets.
abstract class BaseWebSocketClient extends BaseAngelClient {
Duration _reconnectInterval;
WebSocketChannel _socket;
final Queue<WebSocketAction> _queue = new Queue<WebSocketAction>();
final StreamController _onData = new StreamController();
final StreamController<WebSocketEvent> _onAllEvents =
new StreamController<WebSocketEvent>();
final StreamController<AngelAuthResult> _onAuthenticated =
new StreamController<AngelAuthResult>();
final StreamController<AngelHttpException> _onError =
new StreamController<AngelHttpException>();
final StreamController<Map<String, WebSocketEvent>> _onServiceEvent =
new StreamController<Map<String, WebSocketEvent>>.broadcast();
final StreamController<WebSocketChannelException>
_onWebSocketChannelException =
new StreamController<WebSocketChannelException>();
/// Use this to handle events that are not standard.
final WebSocketExtraneousEventHandler on =
new WebSocketExtraneousEventHandler();
/// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired whenever a WebSocket is successfully authenticated.
Stream<AngelAuthResult> get onAuthenticated => _onAuthenticated.stream;
/// A broadcast stream of data coming from the [socket].
///
/// Mostly just for internal use.
Stream get onData => _onData.stream;
/// Fired on errors.
Stream<AngelHttpException> get onError => _onError.stream;
/// Fired whenever an event is fired by a service.
Stream<Map<String, WebSocketEvent>> get onServiceEvent =>
_onServiceEvent.stream;
/// Fired on [WebSocketChannelException]s.
Stream<WebSocketChannelException> get onWebSocketChannelException =>
_onWebSocketChannelException.stream;
/// The [WebSocketChannel] underneath this instance.
WebSocketChannel get socket => _socket;
/// If `true` (default), then the client will automatically try to reconnect to the server
/// if the socket closes.
final bool reconnectOnClose;
/// The amount of time to wait between reconnect attempts. Default: 10 seconds.
Duration get reconnectInterval => _reconnectInterval;
Uri _wsUri;
/// The [Uri] to which a websocket should point.
Uri get websocketUri => _wsUri ??= _toWsUri(baseUrl);
static Uri _toWsUri(Uri u) {
if (u.hasScheme) {
if (u.scheme == 'http') {
return u.replace(scheme: 'ws');
} else if (u.scheme == 'https') {
return u.replace(scheme: 'wss');
} else {
return u;
}
} else {
return _toWsUri(u.replace(scheme: Uri.base.scheme));
}
}
BaseWebSocketClient(http.BaseClient client, baseUrl,
{this.reconnectOnClose = true, Duration reconnectInterval})
: super(client, baseUrl) {
_reconnectInterval = reconnectInterval ?? new Duration(seconds: 10);
}
@override
Future close() async {
on._close();
scheduleMicrotask(() async {
await _socket.sink.close(status.goingAway);
await _onData.close();
await _onAllEvents.close();
await _onAuthenticated.close();
await _onError.close();
await _onServiceEvent.close();
await _onWebSocketChannelException.close();
});
}
/// Connects the WebSocket. [timeout] is optional.
Future<WebSocketChannel> connect({Duration timeout}) async {
if (timeout != null) {
var c = new Completer<WebSocketChannel>();
Timer timer;
timer = new Timer(timeout, () {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(new TimeoutException(
'WebSocket connection exceeded timeout of ${timeout.inMilliseconds} ms',
timeout));
}
});
scheduleMicrotask(() {
return getConnectedWebSocket().then((socket) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
while (_queue.isNotEmpty) {
var action = _queue.removeFirst();
socket.sink.add(serialize(action));
}
c.complete(socket);
}
}).catchError((e, StackTrace st) {
if (!c.isCompleted) {
if (timer.isActive) timer.cancel();
c.completeError(e, st);
}
});
});
return await c.future.then((socket) {
_socket = socket;
listen();
});
} else {
_socket = await getConnectedWebSocket();
listen();
return _socket;
}
}
/// Returns a new [WebSocketChannel], ready to be listened on.
///
/// This should be overriden by child classes, **NOT** [connect].
Future<WebSocketChannel> getConnectedWebSocket();
@override
WebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) {
String uri = path.toString().replaceAll(_straySlashes, '');
return new WebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer);
}
/// Starts listening for data.
void listen() {
_socket?.stream?.listen(
(data) {
_onData.add(data);
if (data is WebSocketChannelException) {
_onWebSocketChannelException.add(data);
} else if (data is String) {
var jsons = json.decode(data);
if (jsons is Map) {
var event = new WebSocketEvent.fromJson(jsons);
if (event.eventName?.isNotEmpty == true) {
_onAllEvents.add(event);
on._getStream(event.eventName).add(event);
}
if (event.eventName == errorEvent) {
var error =
new AngelHttpException.fromMap((event.data ?? {}) as Map);
_onError.add(error);
} else if (event.eventName == authenticatedEvent) {
var authResult = new AngelAuthResult.fromMap(event.data as Map);
_onAuthenticated.add(authResult);
} else if (event.eventName?.isNotEmpty == true) {
var split = event.eventName
.split("::")
.where((str) => str.isNotEmpty)
.toList();
if (split.length >= 2) {
var serviceName = split[0], eventName = split[1];
_onServiceEvent
.add({serviceName: event..eventName = eventName});
}
}
}
}
},
cancelOnError: true,
onDone: () {
_socket = null;
if (reconnectOnClose == true) {
new Timer.periodic(reconnectInterval, (Timer timer) async {
var result;
try {
result = await connect(timeout: reconnectInterval);
} catch (e) {
//
}
if (result != null) timer.cancel();
});
}
});
}
/// Serializes data to JSON.
serialize(x) => json.encode(x);
/// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) {
if (_socket == null)
_queue.addLast(action);
else
socket.sink.add(serialize(action));
}
/// Attempts to authenticate a WebSocket, using a valid JWT.
void authenticateViaJwt(String jwt) {
sendAction(new WebSocketAction(
eventName: authenticateAction,
params: {
'query': {'jwt': jwt}
},
));
}
}
/// A [Service] that asynchronously interacts with the server.
class WebSocketsService<Id, Data> extends Service<Id, Data> {
/// The [BaseWebSocketClient] that spawned this service.
@override
final BaseWebSocketClient app;
/// Used to deserialize JSON into typed data.
final AngelDeserializer<Data> deserializer;
/// The [WebSocketChannel] to listen to, and send data across.
final WebSocketChannel socket;
/// The service path to listen to.
final String path;
final StreamController<WebSocketEvent> _onAllEvents =
new StreamController<WebSocketEvent>();
final StreamController<List<Data>> _onIndexed = new StreamController();
final StreamController<Data> _onRead = new StreamController<Data>();
final StreamController<Data> _onCreated = new StreamController<Data>();
final StreamController<Data> _onModified = new StreamController<Data>();
final StreamController<Data> _onUpdated = new StreamController<Data>();
final StreamController<Data> _onRemoved = new StreamController<Data>();
/// Fired on all events.
Stream<WebSocketEvent> get onAllEvents => _onAllEvents.stream;
/// Fired on `index` events.
Stream<List<Data>> get onIndexed => _onIndexed.stream;
/// Fired on `read` events.
Stream<Data> get onRead => _onRead.stream;
/// Fired on `created` events.
Stream<Data> get onCreated => _onCreated.stream;
/// Fired on `modified` events.
Stream<Data> get onModified => _onModified.stream;
/// Fired on `updated` events.
Stream<Data> get onUpdated => _onUpdated.stream;
/// Fired on `removed` events.
Stream<Data> get onRemoved => _onRemoved.stream;
WebSocketsService(this.socket, this.app, this.path, {this.deserializer}) {
listen();
}
Future close() async {
await _onAllEvents.close();
await _onCreated.close();
await _onIndexed.close();
await _onModified.close();
await _onRead.close();
await _onRemoved.close();
await _onUpdated.close();
}
/// Serializes an [action] to be sent over a WebSocket.
serialize(WebSocketAction action) => json.encode(action);
/// Deserializes data from a [WebSocketEvent].
Data deserialize(x) {
return deserializer != null ? deserializer(x) : x as Data;
}
/// Deserializes the contents of an [event].
WebSocketEvent<Data> transformEvent(WebSocketEvent event) {
return new WebSocketEvent(
eventName: event.eventName, data: deserialize(event.data));
}
/// Starts listening for events.
void listen() {
app.onServiceEvent.listen((map) {
if (map.containsKey(path)) {
var event = map[path];
_onAllEvents.add(event);
if (event.eventName == indexedEvent) {
var d = event.data;
var transformed = new WebSocketEvent(
eventName: event.eventName,
data: d is Iterable ? d.map(deserialize).toList() : null);
if (transformed.data != null) _onIndexed.add(transformed.data);
return;
}
var transformed = transformEvent(event).data;
switch (event.eventName) {
case readEvent:
_onRead.add(transformed);
break;
case createdEvent:
_onCreated.add(transformed);
break;
case modifiedEvent:
_onModified.add(transformed);
break;
case updatedEvent:
_onUpdated.add(transformed);
break;
case removedEvent:
_onRemoved.add(transformed);
break;
}
}
});
}
/// Sends the given [action] on the [socket].
void send(WebSocketAction action) {
app.sendAction(action);
}
@override
Future<List<Data>> index([Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::$indexAction', params: params ?? {}));
return null;
}
@override
Future<Data> read(id, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::$readAction',
id: id.toString(),
params: params ?? {}));
return null;
}
@override
Future<Data> create(data, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::$createAction', data: data, params: params ?? {}));
return null;
}
@override
Future<Data> modify(id, data, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::$modifyAction',
id: id.toString(),
data: data,
params: params ?? {}));
return null;
}
@override
Future<Data> update(id, data, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::$updateAction',
id: id.toString(),
data: data,
params: params ?? {}));
return null;
}
@override
Future<Data> remove(id, [Map<String, dynamic> params]) async {
app.sendAction(new WebSocketAction(
eventName: '$path::$removeAction',
id: id.toString(),
params: params ?? {}));
return null;
}
/// No longer necessary.
@deprecated
Service unwrap() => this;
}
/// Contains a dynamic Map of [WebSocketEvent] streams.
class WebSocketExtraneousEventHandler {
Map<String, StreamController<WebSocketEvent>> _events = {};
StreamController<WebSocketEvent> _getStream(String index) {
if (_events[index] == null)
_events[index] = new StreamController<WebSocketEvent>();
return _events[index];
}
Stream<WebSocketEvent> operator [](String index) {
if (_events[index] == null)
_events[index] = new StreamController<WebSocketEvent>();
return _events[index].stream;
}
void _close() {
_events.values.forEach((s) => s.close());
}
}

View file

@ -0,0 +1,110 @@
/// Browser WebSocket client library for the Angel framework.
library angel_websocket.browser;
import 'dart:async';
import 'dart:html';
import 'package:angel_client/angel_client.dart';
import 'package:angel_http_exception/angel_http_exception.dart';
import 'package:http/browser_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/html.dart';
import 'base_websocket_client.dart';
export 'angel_websocket.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<BrowserWebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration reconnectInterval})
: super(new http.BrowserClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@override
Future close() {
for (var service in _services) {
service.close();
}
return super.close();
}
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token', String errorMessage}) {
var ctrl = new StreamController<String>();
var wnd = window.open(url, 'angel_client_auth_popup');
Timer t;
StreamSubscription<Event> sub;
t = new Timer.periodic(new Duration(milliseconds: 500), (timer) {
if (!ctrl.isClosed) {
if (wnd.closed) {
ctrl.addError(new AngelHttpException.notAuthenticated(
message:
errorMessage ?? 'Authentication via popup window failed.'));
ctrl.close();
timer.cancel();
sub?.cancel();
}
} else
timer.cancel();
});
sub = window.on[eventName ?? 'token'].listen((e) {
if (!ctrl.isClosed) {
ctrl.add((e as CustomEvent).detail.toString());
t.cancel();
ctrl.close();
sub.cancel();
}
});
return ctrl.stream;
}
@override
Future<WebSocketChannel> getConnectedWebSocket() {
var url = websocketUri;
if (authToken?.isNotEmpty == true) {
url = url.replace(
queryParameters: new Map<String, String>.from(url.queryParameters)
..['token'] = authToken);
}
var socket = new WebSocket(url.toString());
var completer = new Completer<WebSocketChannel>();
socket
..onOpen.listen((_) {
if (!completer.isCompleted)
return completer.complete(new HtmlWebSocketChannel(socket));
})
..onError.listen((e) {
if (!completer.isCompleted)
return completer.completeError(e is ErrorEvent ? e.error : e);
});
return completer.future;
}
@override
BrowserWebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) {
String uri = path.replaceAll(_straySlashes, '');
return new BrowserWebSocketsService<Id, Data>(socket, this, uri,
deserializer: deserializer);
}
}
class BrowserWebSocketsService<Id, Data> extends WebSocketsService<Id, Data> {
final Type type;
BrowserWebSocketsService(WebSocketChannel socket, WebSockets app, String uri,
{this.type, AngelDeserializer<Data> deserializer})
: super(socket, app, uri, deserializer: deserializer);
}

View file

@ -0,0 +1,87 @@
const String authenticateAction = 'authenticate';
const String indexAction = 'index';
const String readAction = 'read';
const String createAction = 'create';
const String modifyAction = 'modify';
const String updateAction = 'update';
const String removeAction = 'remove';
@deprecated
const String ACTION_AUTHENTICATE = authenticateAction;
@deprecated
const String ACTION_INDEX = indexAction;
@deprecated
const String ACTION_READ = readAction;
@deprecated
const String ACTION_CREATE = createAction;
@deprecated
const String ACTION_MODIFY = modifyAction;
@deprecated
const String ACTION_UPDATE = updateAction;
@deprecated
const String ACTION_REMOVE = removeAction;
const String authenticatedEvent = 'authenticated';
const String errorEvent = 'error';
const String indexedEvent = 'indexed';
const String readEvent = 'read';
const String createdEvent = 'created';
const String modifiedEvent = 'modified';
const String updatedEvent = 'updated';
const String removedEvent = 'removed';
@deprecated
const String EVENT_AUTHENTICATED = authenticatedEvent;
@deprecated
const String EVENT_ERROR = errorEvent;
@deprecated
const String EVENT_INDEXED = indexedEvent;
@deprecated
const String EVENT_READ = readEvent;
@deprecated
const String EVENT_CREATED = createdEvent;
@deprecated
const String EVENT_MODIFIED = modifiedEvent;
@deprecated
const String EVENT_UPDATED = updatedEvent;
@deprecated
const String EVENT_REMOVED = removedEvent;
/// The standard Angel service actions.
const List<String> actions = const <String>[
indexAction,
readAction,
createAction,
modifyAction,
updateAction,
removeAction
];
@deprecated
const List<String> ACTIONS = actions;
/// The standard Angel service events.
const List<String> events = const <String>[
indexedEvent,
readEvent,
createdEvent,
modifiedEvent,
updatedEvent,
removedEvent
];
@deprecated
const List<String> EVENTS = events;

View file

@ -0,0 +1,50 @@
/// Flutter-compatible WebSocket client library for the Angel framework.
library angel_websocket.flutter;
import 'dart:async';
import 'dart:io';
import 'package:http/http.dart' as http;
import 'package:http/io_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart';
// final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<WebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration reconnectInterval})
: super(new http.IOClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token'}) {
throw new UnimplementedError(
'Opening popup windows is not supported in the `dart:io` client.');
}
@override
Future close() {
for (var service in _services) {
service.close();
}
return super.close();
}
@override
Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(websocketUri.toString(),
headers: authToken?.isNotEmpty == true
? {'Authorization': 'Bearer $authToken'}
: {});
return new IOWebSocketChannel(socket);
}
}

View file

@ -0,0 +1,30 @@
import 'package:angel_framework/angel_framework.dart';
/// Prevents a WebSocket event from being broadcasted, to any client from the given [provider].
///
/// [provider] can be a String, a [Provider], or an Iterable.
/// If [provider] is `null`, any provider will be blocked.
HookedServiceEventListener doNotBroadcast([provider]) {
return (HookedServiceEvent e) {
if (e.params != null && e.params.containsKey('provider')) {
bool deny = false;
Iterable providers = provider is Iterable ? provider : [provider];
for (var p in providers) {
if (deny) break;
if (p is Providers) {
deny = deny ||
p == e.params['provider'] ||
e.params['provider'] == p.via;
} else if (p == null) {
deny = true;
} else
deny =
deny || (e.params['provider'] as Providers).via == p.toString();
}
e.params['broadcast'] = false;
}
};
}

View file

@ -0,0 +1,66 @@
/// Command-line WebSocket client library for the Angel framework.
library angel_websocket.io;
import 'dart:async';
import 'dart:io';
import 'package:angel_client/angel_client.dart';
import 'package:http/http.dart' as http;
import 'package:http/io_client.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'base_websocket_client.dart';
export 'package:angel_client/angel_client.dart';
export 'angel_websocket.dart';
final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
/// Queries an Angel server via WebSockets.
class WebSockets extends BaseWebSocketClient {
final List<IoWebSocketsService> _services = [];
WebSockets(baseUrl,
{bool reconnectOnClose = true, Duration reconnectInterval})
: super(new http.IOClient(), baseUrl,
reconnectOnClose: reconnectOnClose,
reconnectInterval: reconnectInterval);
@override
Stream<String> authenticateViaPopup(String url,
{String eventName = 'token'}) {
throw new UnimplementedError(
'Opening popup windows is not supported in the `dart:io` client.');
}
@override
Future close() {
for (var service in _services) {
service.close();
}
return super.close();
}
@override
Future<WebSocketChannel> getConnectedWebSocket() async {
var socket = await WebSocket.connect(websocketUri.toString(),
headers: authToken?.isNotEmpty == true
? {'Authorization': 'Bearer $authToken'}
: {});
return new IOWebSocketChannel(socket);
}
@override
IoWebSocketsService<Id, Data> service<Id, Data>(String path,
{Type type, AngelDeserializer<Data> deserializer}) {
String uri = path.replaceAll(_straySlashes, '');
return new IoWebSocketsService<Id, Data>(socket, this, uri, type);
}
}
class IoWebSocketsService<Id, Data> extends WebSocketsService<Id, Data> {
final Type type;
IoWebSocketsService(
WebSocketChannel socket, WebSockets app, String uri, this.type)
: super(socket, app, uri);
}

View file

@ -0,0 +1,454 @@
/// Server-side support for WebSockets.
library angel_websocket.server;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:mirrors';
import 'package:angel_auth/angel_auth.dart';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_framework/http.dart';
import 'package:angel_framework/http2.dart';
import 'package:merge_map/merge_map.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'angel_websocket.dart';
import 'constants.dart';
export 'angel_websocket.dart';
part 'websocket_context.dart';
part 'websocket_controller.dart';
typedef String WebSocketResponseSerializer(data);
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
class AngelWebSocket {
List<WebSocketContext> _clients = <WebSocketContext>[];
final List<String> _servicesAlreadyWired = [];
final StreamController<WebSocketAction> _onAction =
new StreamController<WebSocketAction>();
final StreamController _onData = new StreamController();
final StreamController<WebSocketContext> _onConnection =
new StreamController<WebSocketContext>.broadcast();
final StreamController<WebSocketContext> _onDisconnect =
new StreamController<WebSocketContext>.broadcast();
final Angel app;
/// If this is not `true`, then all client-side service parameters will be
/// discarded, other than `params['query']`.
final bool allowClientParams;
/// An optional whitelist of allowed client origins, or [:null:].
final List<String> allowedOrigins;
/// An optional whitelist of allowed client protocols, or [:null:].
final List<String> allowedProtocols;
/// If `true`, then clients can authenticate their WebSockets by sending a valid JWT.
final bool allowAuth;
/// Send error information across WebSockets, without including debug information..
final bool sendErrors;
/// A list of clients currently connected to this server via WebSockets.
List<WebSocketContext> get clients => new List.unmodifiable(_clients);
/// Services that have already been hooked to fire socket events.
List<String> get servicesAlreadyWired =>
new List.unmodifiable(_servicesAlreadyWired);
/// Used to notify other nodes of an event's firing. Good for scaled applications.
final StreamChannel<WebSocketEvent> synchronizationChannel;
/// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired whenever a WebSocket sends data.
Stream get onData => _onData.stream;
/// Fired on incoming connections.
Stream<WebSocketContext> get onConnection => _onConnection.stream;
/// Fired when a user disconnects.
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;
/// Serializes data to WebSockets.
WebSocketResponseSerializer serializer;
/// Deserializes data from WebSockets.
Function deserializer;
AngelWebSocket(this.app,
{this.sendErrors = false,
this.allowClientParams = false,
this.allowAuth = true,
this.synchronizationChannel,
this.serializer,
this.deserializer,
this.allowedOrigins,
this.allowedProtocols}) {
if (serializer == null) serializer = json.encode;
if (deserializer == null) deserializer = (params) => params;
}
HookedServiceEventListener serviceHook(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params['broadcast'] == false) return;
var event = await transformEvent(e);
event.eventName = "$path::${event.eventName}";
_filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter'))
return e.service.configuration['ws:filter'](e, socket);
else if (e.params != null && e.params.containsKey('ws:filter'))
return e.params['ws:filter'](e, socket);
else
return true;
}
await batchEvent(event, filter: _filter);
};
}
/// Slates an event to be dispatched.
Future<void> batchEvent(WebSocketEvent event,
{filter(WebSocketContext socket), bool notify = true}) async {
// Default implementation will just immediately fire events
_clients.forEach((client) async {
dynamic result = true;
if (filter != null) result = await filter(client);
if (result == true) {
client.channel.sink.add((serializer ?? json.encode)(event.toJson()));
}
});
if (synchronizationChannel != null && notify != false)
synchronizationChannel.sink.add(event);
}
/// Returns a list of events yet to be sent.
Future<List<WebSocketEvent>> getBatchedEvents() async => [];
/// Responds to an incoming action on a WebSocket.
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
var split = action.eventName.split("::");
if (split.length < 2) {
socket.sendError(new AngelHttpException.badRequest());
return null;
}
var service = app.findService(split[0]);
if (service == null) {
socket.sendError(new AngelHttpException.notFound(
message: "No service \"${split[0]}\" exists."));
return null;
}
var actionName = split[1];
if (action.params is! Map) action.params = <String, dynamic>{};
if (allowClientParams != true) {
if (action.params['query'] is Map)
action.params = {'query': action.params['query']};
else
action.params = {};
}
var params = mergeMap<String, dynamic>([
((deserializer ?? (params) => params)(action.params))
as Map<String, dynamic>,
{
"provider": Providers.websocket,
'__requestctx': socket.request,
'__responsectx': socket.response
}
]);
try {
if (actionName == indexAction) {
socket.send(
"${split[0]}::" + indexedEvent, await service.index(params));
return null;
} else if (actionName == readAction) {
socket.send(
"${split[0]}::" + readEvent, await service.read(action.id, params));
return null;
} else if (actionName == createAction) {
return new WebSocketEvent(
eventName: "${split[0]}::" + createdEvent,
data: await service.create(action.data, params));
} else if (actionName == modifyAction) {
return new WebSocketEvent(
eventName: "${split[0]}::" + modifiedEvent,
data: await service.modify(action.id, action.data, params));
} else if (actionName == updateAction) {
return new WebSocketEvent(
eventName: "${split[0]}::" + updatedEvent,
data: await service.update(action.id, action.data, params));
} else if (actionName == removeAction) {
return new WebSocketEvent(
eventName: "${split[0]}::" + removedEvent,
data: await service.remove(action.id, params));
} else {
socket.sendError(new AngelHttpException.methodNotAllowed(
message: "Method Not Allowed: \"$actionName\""));
return null;
}
} catch (e, st) {
catchError(e, st, socket);
}
}
/// Authenticates a [WebSocketContext].
Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
if (allowAuth != false &&
action.eventName == authenticateAction &&
action.params['query'] is Map &&
action.params['query']['jwt'] is String) {
try {
var auth = socket.request.container.make<AngelAuth>();
var jwt = action.params['query']['jwt'] as String;
AuthToken token;
token = new AuthToken.validate(jwt, auth.hmac);
var user = await auth.deserializer(token.userId);
socket.request
..container.registerSingleton<AuthToken>(token)
..container.registerSingleton(user, as: user.runtimeType as Type);
socket._onAuthenticated.add(null);
socket.send(authenticatedEvent,
{'token': token.serialize(auth.hmac), 'data': user});
} catch (e, st) {
catchError(e, st, socket);
}
} else {
socket.sendError(new AngelHttpException.badRequest(
message: 'No JWT provided for authentication.'));
}
}
/// Hooks a service up to have its events broadcasted.
hookupService(Pattern _path, HookedService service) {
String path = _path.toString();
service.after(
[
HookedServiceEvent.created,
HookedServiceEvent.modified,
HookedServiceEvent.updated,
HookedServiceEvent.removed
],
serviceHook(path),
);
_servicesAlreadyWired.add(path);
}
/// Runs before firing [onConnection].
Future handleConnect(WebSocketContext socket) async {}
/// Handles incoming data from a WebSocket.
handleData(WebSocketContext socket, data) async {
try {
socket._onData.add(data);
var fromJson = json.decode(data.toString());
var action = new WebSocketAction.fromJson(fromJson as Map);
_onAction.add(action);
if (action.eventName == null ||
action.eventName is! String ||
action.eventName.isEmpty) {
throw new AngelHttpException.badRequest();
}
if (fromJson is Map && fromJson.containsKey("eventName")) {
socket._onAction.add(new WebSocketAction.fromJson(fromJson));
socket.on
._getStreamForEvent(fromJson["eventName"].toString())
.add(fromJson["data"] as Map);
}
if (action.eventName == authenticateAction)
await handleAuth(action, socket);
if (action.eventName.contains("::")) {
var split = action.eventName.split("::");
if (split.length >= 2) {
if (actions.contains(split[1])) {
var event = await handleAction(action, socket);
if (event is Future) event = await event;
}
}
}
} catch (e, st) {
catchError(e, st, socket);
}
}
void catchError(e, StackTrace st, WebSocketContext socket) {
// Send an error
if (e is AngelHttpException) {
socket.sendError(e);
app.logger?.severe(e.message, e.error ?? e, e.stackTrace);
} else if (sendErrors) {
var err = new AngelHttpException(e,
message: e.toString(), stackTrace: st, errors: [st.toString()]);
socket.sendError(err);
app.logger?.severe(err.message, e, st);
} else {
var err = new AngelHttpException(e);
socket.sendError(err);
app.logger?.severe(e.toString(), e, st);
}
}
/// Transforms a [HookedServiceEvent], so that it can be broadcasted.
Future<WebSocketEvent> transformEvent(HookedServiceEvent event) async {
return new WebSocketEvent(eventName: event.eventName, data: event.result);
}
/// Hooks any [HookedService]s that are not being broadcasted yet.
wireAllServices(Angel app) {
for (Pattern key in app.services.keys.where((x) {
return !_servicesAlreadyWired.contains(x) &&
app.services[x] is HookedService;
})) {
hookupService(key, app.services[key] as HookedService);
}
}
/// Configures an [Angel] instance to listen for WebSocket connections.
Future configureServer(Angel app) async {
app..container.registerSingleton(this);
if (runtimeType != AngelWebSocket)
app..container.registerSingleton<AngelWebSocket>(this);
// Set up services
wireAllServices(app);
app.onService.listen((_) {
wireAllServices(app);
});
if (synchronizationChannel != null) {
synchronizationChannel.stream.listen((e) => batchEvent(e, notify: false));
}
app.shutdownHooks.add((_) => synchronizationChannel?.sink?.close());
}
/// Handles an incoming [WebSocketContext].
Future<void> handleClient(WebSocketContext socket) async {
var origin = socket.request.headers.value('origin');
if (allowedOrigins != null && !allowedOrigins.contains(origin)) {
throw new AngelHttpException.forbidden(
message:
'WebSocket connections are not allowed from the origin "$origin".');
}
_clients.add(socket);
await handleConnect(socket);
_onConnection.add(socket);
socket.request.container.registerSingleton<WebSocketContext>(socket);
socket.channel.stream.listen(
(data) {
_onData.add(data);
handleData(socket, data);
},
onDone: () {
_onDisconnect.add(socket);
_clients.remove(socket);
},
onError: (e) {
_onDisconnect.add(socket);
_clients.remove(socket);
},
cancelOnError: true,
);
}
/// Handles an incoming HTTP request.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
if (req is HttpRequestContext && res is HttpResponseContext) {
if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest))
throw new AngelHttpException.badRequest();
await res.detach();
var ws = await WebSocketTransformer.upgrade(req.rawRequest);
var channel = new IOWebSocketChannel(ws);
var socket = new WebSocketContext(channel, req, res);
scheduleMicrotask(() => handleClient(socket));
return false;
} else if (req is Http2RequestContext && res is Http2ResponseContext) {
var connection =
req.headers['connection']?.map((s) => s.toLowerCase().trim());
var upgrade = req.headers.value('upgrade')?.toLowerCase();
var version = req.headers.value('sec-websocket-version');
var key = req.headers.value('sec-websocket-key');
var protocol = req.headers.value('sec-websocket-protocol');
if (connection == null) {
throw new AngelHttpException.badRequest(
message: 'Missing `connection` header.');
} else if (!connection.contains('upgrade')) {
throw new AngelHttpException.badRequest(
message: 'Missing "upgrade" in `connection` header.');
} else if (upgrade != 'websocket') {
throw new AngelHttpException.badRequest(
message: 'The `upgrade` header must equal "websocket".');
} else if (version != '13') {
throw new AngelHttpException.badRequest(
message: 'The `sec-websocket-version` header must equal "13".');
} else if (key == null) {
throw new AngelHttpException.badRequest(
message: 'Missing `sec-websocket-key` header.');
} else if (protocol != null &&
allowedProtocols != null &&
!allowedProtocols.contains(protocol)) {
throw new AngelHttpException.badRequest(
message: 'Disallowed `sec-websocket-protocol` header "$protocol".');
} else {
var stream = res.detach();
var ctrl = new StreamChannelController<List<int>>();
ctrl.local.stream.listen((buf) {
stream.sendData(buf);
}, onDone: () {
stream.outgoingMessages.close();
});
if (req.hasParsedBody) {
await ctrl.local.sink.close();
} else {
await req.body.pipe(ctrl.local.sink);
}
var sink = utf8.encoder.startChunkedConversion(ctrl.foreign.sink);
sink.add("HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n");
if (protocol != null) sink.add("Sec-WebSocket-Protocol: $protocol\r\n");
sink.add("\r\n");
var ws = new WebSocketChannel(ctrl.foreign);
var socket = new WebSocketContext(ws, req, res);
scheduleMicrotask(() => handleClient(socket));
return false;
}
} else {
throw new ArgumentError(
'Not an HTTP/1.1 or HTTP/2 RequestContext+ResponseContext pair: $req, $res');
}
}
}

View file

@ -0,0 +1,73 @@
part of angel_websocket.server;
/// Represents a WebSocket session, with the original
/// [RequestContext] and [ResponseContext] attached.
class WebSocketContext {
/// Use this to listen for events.
_WebSocketEventTable on = new _WebSocketEventTable();
/// The underlying [StreamChannel].
final StreamChannel channel;
/// The original [RequestContext].
final RequestContext request;
/// The original [ResponseContext].
final ResponseContext response;
StreamController<WebSocketAction> _onAction =
new StreamController<WebSocketAction>();
StreamController<void> _onAuthenticated = StreamController();
StreamController<Null> _onClose = new StreamController<Null>();
StreamController _onData = new StreamController();
/// Fired on any [WebSocketAction];
Stream<WebSocketAction> get onAction => _onAction.stream;
/// Fired when the user authenticates.
Stream<void> get onAuthenticated => _onAuthenticated.stream;
/// Fired once the underlying [WebSocket] closes.
Stream<Null> get onClose => _onClose.stream;
/// Fired when any data is sent through [channel].
Stream get onData => _onData.stream;
WebSocketContext(this.channel, this.request, this.response);
/// Closes the underlying [StreamChannel].
Future close() async {
scheduleMicrotask(() async {
await channel.sink.close();
await _onAction.close();
await _onAuthenticated.close();
await _onData.close();
await _onClose.add(null);
await _onClose.close();
});
}
/// Sends an arbitrary [WebSocketEvent];
void send(String eventName, data) {
channel.sink.add(json
.encode(new WebSocketEvent(eventName: eventName, data: data).toJson()));
}
/// Sends an error event.
void sendError(AngelHttpException error) => send(errorEvent, error.toJson());
}
class _WebSocketEventTable {
Map<String, StreamController<Map>> _handlers = {};
StreamController<Map> _getStreamForEvent(String eventName) {
if (!_handlers.containsKey(eventName))
_handlers[eventName] = new StreamController<Map>();
return _handlers[eventName];
}
Stream<Map> operator [](String key) => _getStreamForEvent(key).stream;
}

View file

@ -0,0 +1,89 @@
part of angel_websocket.server;
/// Marks a method as available to WebSockets.
class ExposeWs {
final String eventName;
const ExposeWs(this.eventName);
}
/// A special controller that also supports WebSockets.
class WebSocketController extends Controller {
/// The plug-in instance powering this controller.
final AngelWebSocket ws;
Map<String, MethodMirror> _handlers = {};
Map<String, Symbol> _handlerSymbols = {};
WebSocketController(this.ws) : super();
/// Sends an event to all clients.
void broadcast(String eventName, data, {filter(WebSocketContext socket)}) {
ws.batchEvent(new WebSocketEvent(eventName: eventName, data: data),
filter: filter);
}
/// Fired on new connections.
onConnect(WebSocketContext socket) {}
/// Fired on disconnections.
onDisconnect(WebSocketContext socket) {}
/// Fired on all incoming actions.
onAction(WebSocketAction action, WebSocketContext socket) async {}
/// Fired on arbitrary incoming data.
onData(data, WebSocketContext socket) {}
@override
Future configureServer(Angel app) async {
if (findExpose(app.container.reflector) != null)
await super.configureServer(app);
InstanceMirror instanceMirror = reflect(this);
ClassMirror classMirror = reflectClass(this.runtimeType);
classMirror.instanceMembers.forEach((sym, mirror) {
if (mirror.isRegularMethod) {
InstanceMirror exposeMirror = mirror.metadata.firstWhere(
(mirror) => mirror.reflectee is ExposeWs,
orElse: () => null);
if (exposeMirror != null) {
ExposeWs exposeWs = exposeMirror.reflectee as ExposeWs;
_handlers[exposeWs.eventName] = mirror;
_handlerSymbols[exposeWs.eventName] = sym;
}
}
});
ws.onConnection.listen((socket) async {
if (!socket.request.container.has<WebSocketContext>()) {
socket.request.container.registerSingleton<WebSocketContext>(socket);
}
await onConnect(socket);
socket.onData.listen((data) => onData(data, socket));
socket.onAction.listen((WebSocketAction action) async {
var container = socket.request.container.createChild();
container.registerSingleton<WebSocketAction>(action);
try {
await onAction(action, socket);
if (_handlers.containsKey(action.eventName)) {
var methodMirror = _handlers[action.eventName];
var fn = instanceMirror.getField(methodMirror.simpleName).reflectee;
return app.runContained(
fn as Function, socket.request, socket.response, container);
}
} catch (e, st) {
ws.catchError(e, st, socket);
}
});
});
ws.onDisconnection.listen(onDisconnect);
}
}

View file

@ -0,0 +1,23 @@
name: angel_websocket
description: Support for using pkg:angel_client with WebSockets. Designed for Angel.
environment:
sdk: ">=2.0.0-dev <3.0.0"
version: 2.0.3
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket
dependencies:
angel_auth: ^2.0.0-alpha
angel_client: ^2.0.0-alpha
angel_framework: ^2.0.0-rc.0
angel_http_exception: ^1.0.0
http: ">=0.11.0 <0.13.0"
merge_map: ^1.0.0
meta: ^1.0.0
stream_channel: ^2.0.0
web_socket_channel: ^1.0.0
dev_dependencies:
angel_container: ^1.0.0-alpha
angel_model: ^1.0.0
logging: ^0.11.0
pedantic: ^1.0.0
test: ^1.0.0

View file

@ -0,0 +1,63 @@
import 'dart:async';
import 'package:angel_auth/angel_auth.dart';
import 'package:angel_client/io.dart' as c;
import 'package:angel_framework/angel_framework.dart';
import "package:angel_framework/http.dart";
import 'package:angel_websocket/io.dart' as c;
import 'package:angel_websocket/server.dart';
import 'package:logging/logging.dart';
import 'package:test/test.dart';
const Map<String, String> USER = const {'username': 'foo', 'password': 'bar'};
main() {
Angel app;
AngelHttp http;
c.Angel client;
c.WebSockets ws;
setUp(() async {
app = new Angel();
http = new AngelHttp(app, useZone: false);
var auth = new AngelAuth();
auth.serializer = (_) async => 'baz';
auth.deserializer = (_) async => USER;
auth.strategies['local'] = new LocalAuthStrategy(
(username, password) async {
if (username == 'foo' && password == 'bar') return USER;
},
);
app.post('/auth/local', auth.authenticate('local'));
await app.configure(auth.configureServer);
var sock = new AngelWebSocket(app);
await app.configure(sock.configureServer);
app.all('/ws', sock.handleRequest);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
var server = await http.startServer();
client = new c.Rest('http://${server.address.address}:${server.port}');
ws = new c.WebSockets('ws://${server.address.address}:${server.port}/ws');
await ws.connect();
});
tearDown(() {
return Future.wait([
http.close(),
client.close(),
ws.close(),
]);
});
test('auth event fires', () async {
var localAuth = await client.authenticate(type: 'local', credentials: USER);
print('JWT: ${localAuth.token}');
ws.authenticateViaJwt(localAuth.token);
var auth = await ws.onAuthenticated.first;
expect(auth.token, localAuth.token);
});
}

View file

@ -0,0 +1,35 @@
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_websocket/server.dart';
class Game {
final String playerOne, playerTwo;
const Game({this.playerOne, this.playerTwo});
factory Game.fromJson(Map data) => new Game(
playerOne: data['playerOne'].toString(),
playerTwo: data['playerTwo'].toString());
Map<String, dynamic> toJson() {
return {'playerOne': playerOne, 'playerTwo': playerTwo};
}
@override
bool operator ==(other) =>
other is Game &&
other.playerOne == playerOne &&
other.playerTwo == playerTwo;
}
const Game johnVsBob = const Game(playerOne: 'John', playerTwo: 'Bob');
@Expose('/game')
class GameController extends WebSocketController {
GameController(AngelWebSocket ws) : super(ws);
@ExposeWs('search')
search(WebSocketContext socket) async {
print('User is searching for a game...');
socket.send('searched', johnVsBob);
}
}

View file

@ -0,0 +1,70 @@
import 'dart:io';
import 'package:angel_container/mirrors.dart';
import 'package:angel_framework/angel_framework.dart' as srv;
import "package:angel_framework/http.dart" as srv;
import 'package:angel_websocket/io.dart' as ws;
import 'package:angel_websocket/server.dart' as srv;
import 'package:logging/logging.dart';
import 'package:test/test.dart';
import 'common.dart';
main() {
srv.Angel app;
srv.AngelHttp http;
ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer server;
String url;
setUp(() async {
app = new srv.Angel(reflector: const MirrorsReflector());
http = new srv.AngelHttp(app, useZone: false);
websockets = new srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
await app.configure(new GameController(websockets).configureServer);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
server = await http.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url);
await client.connect(timeout: new Duration(seconds: 3));
print('Connected');
client
..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
// Auto-fail tests on errors ;)
stderr.writeln(error);
error.errors.forEach(stderr.writeln);
throw error;
});
});
tearDown(() async {
await client.close();
await http.close();
app = null;
client = null;
server = null;
url = null;
});
group('controller.io', () {
test('search', () async {
client.sendAction(new ws.WebSocketAction(eventName: 'search'));
var search = await client.on['searched'].first;
print('Searched: ${search.data}');
expect(new Game.fromJson(search.data as Map), equals(johnVsBob));
});
});
}

View file

@ -0,0 +1,5 @@
import 'package:test/test.dart';
main() {
group('service.browser', () {});
}

View file

@ -0,0 +1,35 @@
import 'dart:async';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_model/angel_model.dart';
import 'package:angel_websocket/base_websocket_client.dart';
import 'package:angel_websocket/server.dart';
import 'package:test/test.dart';
class Todo extends Model {
String text;
String when;
Todo({String this.text, String this.when});
}
class TodoService extends MapService {
TodoService() : super() {
configuration['ws:filter'] =
(HookedServiceEvent e, WebSocketContext socket) {
print('Hello, service filter world!');
return true;
};
}
}
testIndex(BaseWebSocketClient client) async {
var todoService = client.service('api/todos');
scheduleMicrotask(() => todoService.index());
var indexed = await todoService.onIndexed.first;
print('indexed: $indexed');
expect(indexed, isList);
expect(indexed, isEmpty);
}

View file

@ -0,0 +1,60 @@
import 'dart:io';
import 'package:angel_framework/angel_framework.dart' as srv;
import "package:angel_framework/http.dart" as srv;
import 'package:angel_websocket/io.dart' as ws;
import 'package:angel_websocket/server.dart' as srv;
import 'package:logging/logging.dart';
import 'package:test/test.dart';
import 'common.dart';
main() {
srv.Angel app;
srv.AngelHttp http;
ws.WebSockets client;
srv.AngelWebSocket websockets;
HttpServer server;
String url;
setUp(() async {
app = new srv.Angel()..use('/api/todos', new TodoService());
http = new srv.AngelHttp(app, useZone: false);
websockets = new srv.AngelWebSocket(app)
..onData.listen((data) {
print('Received by server: $data');
});
await app.configure(websockets.configureServer);
app.all('/ws', websockets.handleRequest);
app.logger = new Logger('angel_auth')..onRecord.listen(print);
server = await http.startServer();
url = 'ws://${server.address.address}:${server.port}/ws';
client = new ws.WebSockets(url);
await client.connect();
client
..onData.listen((data) {
print('Received by client: $data');
})
..onError.listen((error) {
// Auto-fail tests on errors ;)
stderr.writeln(error);
error.errors.forEach(stderr.writeln);
throw error;
});
});
tearDown(() async {
await client.close();
await http.close();
app = null;
client = null;
server = null;
url = null;
});
group('service.io', () {
test('index', () => testIndex(client));
});
}

View file

@ -0,0 +1,9 @@
<!DOCTYPE html>
<html>
<head>
<title>Client</title>
</head>
<body>
<script src="main.dart.js"></script>
</body>
</html>

View file

@ -0,0 +1,12 @@
import 'dart:html';
import 'package:angel_websocket/browser.dart';
/// Dummy app to ensure client works with DDC.
main() {
var app = new WebSockets(window.location.origin);
window.alert(app.baseUrl.toString());
app.connect().catchError((_) {
window.alert('no websocket');
});
}