platform/packages/websocket/lib/server.dart

523 lines
17 KiB
Dart
Raw Normal View History

2016-12-23 20:57:46 +00:00
/// Server-side support for WebSockets.
2021-05-15 07:19:35 +00:00
library angel3_websocket.server;
2016-04-29 01:19:09 +00:00
2016-06-27 00:42:21 +00:00
import 'dart:async';
2018-10-02 15:32:06 +00:00
import 'dart:convert';
2016-04-29 01:19:09 +00:00
import 'dart:io';
2016-09-18 02:53:58 +00:00
import 'dart:mirrors';
2021-05-15 07:19:35 +00:00
import 'package:angel3_auth/angel3_auth.dart';
import 'package:angel3_framework/angel3_framework.dart';
import 'package:angel3_framework/http.dart';
import 'package:angel3_framework/http2.dart';
2021-09-25 06:32:32 +00:00
import 'package:belatuk_merge_map/belatuk_merge_map.dart';
2022-02-22 00:07:01 +00:00
import 'package:logging/logging.dart';
2018-11-04 02:11:52 +00:00
import 'package:stream_channel/stream_channel.dart';
2017-09-24 16:19:16 +00:00
import 'package:web_socket_channel/io.dart';
2018-12-09 16:40:09 +00:00
import 'package:web_socket_channel/web_socket_channel.dart';
2021-04-26 00:47:32 +00:00
import 'package:collection/collection.dart' show IterableExtension;
2021-05-15 07:19:35 +00:00
import 'angel3_websocket.dart';
2019-01-06 02:41:46 +00:00
import 'constants.dart';
2021-05-15 07:19:35 +00:00
export 'angel3_websocket.dart';
2016-04-29 01:19:09 +00:00
part 'websocket_context.dart';
2017-06-03 18:13:38 +00:00
2016-09-18 02:53:58 +00:00
part 'websocket_controller.dart';
2016-04-29 01:19:09 +00:00
2021-04-10 15:12:43 +00:00
/// Signature of a function that turns outgoing [data] into the `String`
/// written to a WebSocket.
typedef WebSocketResponseSerializer = String Function(dynamic data);
2018-07-10 16:54:55 +00:00
2016-12-23 20:57:46 +00:00
/// Broadcasts events from [HookedService]s, and handles incoming [WebSocketAction]s.
2017-09-24 04:37:58 +00:00
class AngelWebSocket {
2021-04-10 15:12:43 +00:00
/// Every socket currently being served; exposed read-only through [clients].
final List<WebSocketContext> _clients = <WebSocketContext>[];

/// Paths of the services whose events are already being broadcast.
final List<String> _servicesAlreadyWired = [];

// All four controllers are broadcast controllers. A single-subscription
// controller buffers every event until somebody listens, so an application
// that never subscribes to `onAction`/`onData` would otherwise accumulate
// every incoming message in memory for the lifetime of the server.
final StreamController<WebSocketAction> _onAction =
    StreamController<WebSocketAction>.broadcast();
final StreamController _onData = StreamController.broadcast();
final StreamController<WebSocketContext> _onConnection =
    StreamController<WebSocketContext>.broadcast();
final StreamController<WebSocketContext> _onDisconnect =
    StreamController<WebSocketContext>.broadcast();

/// The application whose services and logger this instance uses.
final Angel app;

/// If this is not `true`, then all client-side service parameters will be
/// discarded, other than `params['query']`.
final bool allowClientParams;

/// An optional whitelist of allowed client origins.
///
/// An empty list (the default) accepts connections from every origin.
final List<String> allowedOrigins;

/// An optional whitelist of allowed client protocols.
///
/// An empty list (the default) accepts every protocol. The list is consulted
/// when upgrading HTTP/2 requests that carry a `sec-websocket-protocol`
/// header.
final List<String> allowedProtocols;

/// If `true`, then clients can authenticate their WebSockets by sending a
/// valid JWT.
final bool allowAuth;

/// Whether the details of unexpected errors (message and stack trace) are
/// sent to the client.
///
/// When `false`, a default [AngelHttpException] is sent instead. Errors that
/// already are [AngelHttpException]s are always forwarded as-is.
final bool sendErrors;

/// A list of clients currently connected to this server via WebSockets.
List<WebSocketContext> get clients => List.unmodifiable(_clients);

/// Services that have already been hooked to fire socket events.
List<String> get servicesAlreadyWired =>
    List.unmodifiable(_servicesAlreadyWired);

/// Shorthand for the application's logger.
Logger get _log => app.logger;

/// Used to notify other nodes of an event's firing. Good for scaled
/// applications.
final StreamChannel<WebSocketEvent>? synchronizationChannel;

/// Fired on any [WebSocketAction].
Stream<WebSocketAction> get onAction => _onAction.stream;

/// Fired whenever a WebSocket sends data.
Stream get onData => _onData.stream;

/// Fired on incoming connections.
Stream<WebSocketContext> get onConnection => _onConnection.stream;

/// Fired when a user disconnects.
Stream<WebSocketContext> get onDisconnection => _onDisconnect.stream;

/// Serializes data to WebSockets. Defaults to [json.encode].
WebSocketResponseSerializer? serializer;

/// Deserializes data from WebSockets. Defaults to returning its argument
/// unchanged.
Function? deserializer;
2017-04-23 18:40:30 +00:00
2017-09-24 04:37:58 +00:00
/// Creates a WebSocket server bound to [app].
///
/// When [serializer] is omitted, [json.encode] is used; when [deserializer]
/// is omitted, incoming parameters are passed through untouched.
AngelWebSocket(this.app,
    {this.sendErrors = false,
    this.allowClientParams = false,
    this.allowAuth = true,
    this.synchronizationChannel,
    this.serializer,
    this.deserializer,
    this.allowedOrigins = const [],
    this.allowedProtocols = const []}) {
  // Fill in whichever codec the caller left out.
  deserializer ??= (params) => params;
  serializer ??= json.encode;
}
2021-04-25 04:38:24 +00:00
/*
2021-04-26 00:47:32 +00:00
* Deprecated. Original code that failed to compile after upgrading
2021-04-25 04:38:24 +00:00
*/
2021-04-26 00:47:32 +00:00
/*
HookedServiceEventListener serviceHookOriginal(String path) {
return (HookedServiceEvent e) async {
2021-04-25 04:38:24 +00:00
if (e.params != null && e.params['broadcast'] == false) {
return;
}
var event = await transformEvent(e);
event.eventName = '$path::${event.eventName}';
2017-02-22 22:34:35 +00:00
2021-04-25 04:38:24 +00:00
dynamic _filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter')) {
return e.service.configuration['ws:filter'](e, socket);
} else if (e.params != null && e.params.containsKey('ws:filter')) {
return e.params['ws:filter'](e, socket);
} else {
return true;
}
}
await batchEvent(event, filter: _filter);
};
}
FutureOr<dynamic> Function(HookedServiceEvent<dynamic, dynamic, Service> e)
serviceHook(String path) {
return (HookedServiceEvent e) async {
if (e.params != null && e.params['broadcast'] == false) {
return;
}
var event = await transformEvent(e);
2021-03-07 16:18:07 +00:00
event.eventName = '$path::${event.eventName}';
2017-01-29 20:02:19 +00:00
2021-03-07 16:18:07 +00:00
dynamic _filter(WebSocketContext socket) {
if (e.service.configuration.containsKey('ws:filter')) {
2017-10-19 22:26:59 +00:00
return e.service.configuration['ws:filter'](e, socket);
2021-03-07 16:18:07 +00:00
} else if (e.params != null && e.params.containsKey('ws:filter')) {
2017-06-30 23:09:03 +00:00
return e.params['ws:filter'](e, socket);
2021-03-07 16:18:07 +00:00
} else {
2017-01-29 20:02:19 +00:00
return true;
2021-03-07 16:18:07 +00:00
}
2017-01-29 20:02:19 +00:00
}
await batchEvent(event, filter: _filter);
};
}
2021-04-26 00:47:32 +00:00
*/
/// Builds the after-hook that broadcasts a service's events to clients.
///
/// The returned hook skips events whose params contain `broadcast: false`,
/// prefixes the event name with `path::`, and passes the result to
/// [batchEvent]. A `ws:filter` callback, looked up first in the service
/// configuration and then in the event params, decides per socket whether
/// the event is delivered.
FutureOr<dynamic> Function(HookedServiceEvent<dynamic, dynamic, Service> e)
    serviceHook(String path) {
  return (HookedServiceEvent e) async {
    if (e.params['broadcast'] == false) {
      return;
    }

    var outgoing = await transformEvent(e);
    outgoing.eventName = '$path::${outgoing.eventName}';

    // The service-level filter takes precedence over a per-call filter.
    dynamic socketFilter(WebSocketContext socket) {
      if (e.service.configuration.containsKey('ws:filter')) {
        return e.service.configuration['ws:filter'](e, socket);
      }
      if (e.params.containsKey('ws:filter')) {
        return e.params['ws:filter'](e, socket);
      }
      return true;
    }

    await batchEvent(outgoing, filter: socketFilter);
  };
}
2016-12-23 20:57:46 +00:00
/// Slates an event to be dispatched.
///
/// The default implementation immediately sends [event] to every connected
/// client. When [filter] is given it is called (and awaited) once per client,
/// and the event is only sent to clients for which it yields `true`.
///
/// Unless [notify] is `false`, the event is also added to
/// [synchronizationChannel] (if one was configured) so other nodes can relay
/// it.
Future<void> batchEvent(WebSocketEvent event,
    {Function(WebSocketContext socket)? filter, bool notify = true}) async {
  final encode = serializer ?? json.encode;

  // Iterate over a snapshot: awaiting `filter` yields to the event loop, and a
  // client disconnecting in the meantime removes itself from `_clients`,
  // which would otherwise raise a ConcurrentModificationError here.
  for (final client in List.of(_clients)) {
    dynamic result = true;

    if (filter != null) {
      result = await filter(client);
    }

    if (result == true) {
      client.channel.sink.add(encode(event.toJson()));
    }
  }

  if (notify) {
    synchronizationChannel?.sink.add(event);
  }
}
2016-12-23 20:57:46 +00:00
/// Returns a list of events yet to be sent.
///
/// This implementation always completes with an empty list.
Future<List<WebSocketEvent>> getBatchedEvents() async {
  return <WebSocketEvent>[];
}
2016-06-27 00:42:21 +00:00
2016-12-23 20:57:46 +00:00
/// Responds to an incoming action on a WebSocket.
///
/// The action's event name must look like `service::action`. The named
/// service is looked up on [app] and the matching service method is invoked.
///
/// * `index` and `read` send their result straight back to [socket] and
///   complete with `null`.
/// * `create`, `modify`, `update` and `remove` complete with a
///   [WebSocketEvent] wrapping the service result.
/// * A malformed name, an unknown service or an unknown action sends an
///   error to [socket] and completes with `null`.
///
/// Errors thrown while running the service method are logged and reported
/// through [catchError].
Future handleAction(WebSocketAction action, WebSocketContext socket) async {
  final parts = action.eventName!.split('::');

  if (parts.length < 2) {
    socket.sendError(AngelHttpException.badRequest());
    return null;
  }

  final serviceName = parts[0];
  final service = app.findService(serviceName);

  if (service == null) {
    socket.sendError(AngelHttpException.notFound(
        message: 'No service "$serviceName" exists.'));
    return null;
  }

  final actionName = parts[1];

  // Unless explicitly allowed, only the client's `query` survives.
  if (!allowClientParams) {
    final query = action.params['query'];
    if (query is Map) {
      action.params = {'query': query};
    } else {
      action.params = {};
    }
  }

  final decode = deserializer ?? (params) => params;
  final clientParams = decode(action.params) as Map<String, dynamic>;
  final params = mergeMap<String, dynamic>([
    clientParams,
    {
      'provider': Providers.websocket,
      '__requestctx': socket.request,
      '__responsectx': socket.response
    }
  ]);

  try {
    if (actionName == indexAction) {
      socket.send('$serviceName::$indexedEvent', await service.index(params));
      return null;
    }

    if (actionName == readAction) {
      socket.send(
          '$serviceName::$readEvent', await service.read(action.id, params));
      return null;
    }

    if (actionName == createAction) {
      return WebSocketEvent(
          eventName: '$serviceName::$createdEvent',
          data: await service.create(action.data, params));
    }

    if (actionName == modifyAction) {
      return WebSocketEvent(
          eventName: '$serviceName::$modifiedEvent',
          data: await service.modify(action.id, action.data, params));
    }

    if (actionName == updateAction) {
      return WebSocketEvent(
          eventName: '$serviceName::$updatedEvent',
          data: await service.update(action.id, action.data, params));
    }

    if (actionName == removeAction) {
      return WebSocketEvent(
          eventName: '$serviceName::$removedEvent',
          data: await service.remove(action.id, params));
    }

    socket.sendError(AngelHttpException.methodNotAllowed(
        message: 'Method Not Allowed: $actionName'));
    return null;
  } catch (e, st) {
    _log.severe('Unable to handle unknown action');
    catchError(e, st, socket);
  }
}
2016-04-29 01:19:09 +00:00
2017-02-28 14:15:34 +00:00
/// Authenticates a [WebSocketContext].
///
/// Expects [action] to be the authenticate action carrying a string JWT in
/// `params['query']['jwt']`, and [allowAuth] to be enabled; otherwise a
/// "bad request" error is sent to [socket].
///
/// On success the validated [AuthToken] and the deserialized user are
/// registered in the request's container, the socket's authenticated stream
/// is notified, and the token and user are sent back to the client. Any
/// failure along the way is logged and reported through [catchError].
Future handleAuth(WebSocketAction action, WebSocketContext socket) async {
  final carriesJwt = allowAuth != false &&
      action.eventName == authenticateAction &&
      action.params['query'] is Map &&
      action.params['query']['jwt'] is String;

  if (!carriesJwt) {
    socket.sendError(AngelHttpException.badRequest(
        message: 'No JWT provided for authentication.'));
    return;
  }

  try {
    var auth = socket.request.container!.make<AngelAuth>();
    var jwt = action.params['query']['jwt'] as String;
    AuthToken token = AuthToken.validate(jwt, auth.hmac);
    var user = await auth.deserializer(token.userId);

    socket.request.container!.registerSingleton<AuthToken>(token);
    socket.request.container!.registerSingleton(user, as: user.runtimeType);

    socket._onAuthenticated.add(null);
    socket.send(authenticatedEvent,
        {'token': token.serialize(auth.hmac), 'data': user});
  } catch (e, st) {
    _log.severe('Authentication failed');
    catchError(e, st, socket);
  }
}
2016-12-23 20:57:46 +00:00
/// Hooks a service up to have its events broadcasted.
///
/// Registers [serviceHook] as an after-hook for the created, modified,
/// updated and removed events of [service], then records the path as wired.
dynamic hookupService(Pattern _path, HookedService service) {
  final servicePath = _path.toString();
  final broadcastEvents = [
    HookedServiceEvent.created,
    HookedServiceEvent.modified,
    HookedServiceEvent.updated,
    HookedServiceEvent.removed
  ];

  service.after(broadcastEvents, serviceHook(servicePath));
  _servicesAlreadyWired.add(servicePath);
}
2016-12-23 20:57:46 +00:00
/// Runs before firing [onConnection].
///
/// [handleClient] awaits this after adding [socket] to the client list and
/// before emitting it on [onConnection]. This implementation does nothing;
/// presumably it is an extension point for subclasses.
Future handleConnect(WebSocketContext socket) async {}
2016-07-06 13:33:40 +00:00
2016-12-23 20:57:46 +00:00
/// Handles incoming data from a WebSocket.
///
/// The raw [data] is forwarded to the socket's data stream, decoded as JSON
/// and parsed into a [WebSocketAction], which is published on [onAction].
/// An action without a non-empty event name is rejected as a bad request.
///
/// The action is then delivered to the socket's own listeners, passed to
/// [handleAuth] if it is the authenticate action, and passed to
/// [handleAction] if its name has the form `service::action` with a known
/// action. Any error is logged and reported through [catchError].
dynamic handleData(WebSocketContext socket, data) async {
  try {
    socket._onData.add(data);

    final payload = json.decode(data.toString()) as Map;
    final action = WebSocketAction.fromJson(payload);
    _onAction.add(action);

    final requestedEvent = action.eventName;
    if (requestedEvent == null || requestedEvent.isEmpty) {
      throw AngelHttpException.badRequest();
    }

    // Let listeners registered on the socket itself see the event.
    if (payload.containsKey('eventName')) {
      socket._onAction.add(WebSocketAction.fromJson(payload));
      final eventStream =
          socket.on._getStreamForEvent(payload['eventName'].toString())!;
      eventStream.add(payload['data'] as Map?);
    }

    if (action.eventName == authenticateAction) {
      await handleAuth(action, socket);
    }

    if (action.eventName!.contains('::')) {
      final parts = action.eventName!.split('::');

      if (parts.length >= 2 && actions.contains(parts[1])) {
        var event = await handleAction(action, socket);
        if (event is Future) event = await event;
      }
    }
  } catch (e, st) {
    _log.severe('Invalid data');
    catchError(e, st, socket);
  }
}
/// Reports the error [e] to [socket] and logs it as severe.
///
/// * An [AngelHttpException] is sent to the client unchanged.
/// * Any other error is wrapped, message and stack trace included, when
///   [sendErrors] is `true`.
/// * Otherwise the client only receives a default [AngelHttpException].
void catchError(e, StackTrace st, WebSocketContext socket) {
  if (e is AngelHttpException) {
    socket.sendError(e);
    _log.severe(e.message, e.error ?? e, e.stackTrace);
    return;
  }

  if (sendErrors) {
    final detailed = AngelHttpException(
        message: e.toString(), stackTrace: st, errors: [st.toString()]);
    socket.sendError(detailed);
    _log.severe(detailed.message, e, st);
    return;
  }

  // Do not leak internals to the client; the log still gets the details.
  socket.sendError(AngelHttpException());
  _log.severe(e.toString(), e, st);
}
2016-12-23 20:57:46 +00:00
/// Transforms a [HookedServiceEvent], so that it can be broadcasted.
///
/// The resulting [WebSocketEvent] carries the hook's event name and result.
Future<WebSocketEvent> transformEvent(HookedServiceEvent event) async =>
    WebSocketEvent(eventName: event.eventName, data: event.result);
2016-12-23 20:57:46 +00:00
/// Hooks any [HookedService]s that are not being broadcasted yet.
///
/// Every entry of `app.services` that is a [HookedService] and whose path is
/// not yet listed in [servicesAlreadyWired] is passed to [hookupService].
void wireAllServices(Angel app) {
  // `hookupService` records paths as strings (`path.toString()`), so the
  // lookup must use the same representation. Comparing the raw key would
  // never match for non-String patterns, and the service would be hooked
  // again on every call.
  bool needsWiring(key) =>
      !_servicesAlreadyWired.contains(key.toString()) &&
      app.services[key] is HookedService;

  for (var key in app.services.keys.where(needsWiring)) {
    hookupService(key, app.services[key] as HookedService);
  }
}
2018-07-10 16:54:55 +00:00
/// Configures an [Angel] instance to listen for WebSocket connections.
///
/// Registers this object in the application's container, wires every
/// existing service and any service added later, relays events arriving on
/// [synchronizationChannel] to local clients, and closes that channel's sink
/// when the application shuts down.
Future configureServer(Angel app) async {
  app.container.registerSingleton(this);

  // Subclasses are additionally registered under the base type.
  if (runtimeType != AngelWebSocket) {
    app.container.registerSingleton<AngelWebSocket>(this);
  }

  // Set up services, now and whenever a new one is mounted.
  wireAllServices(app);
  app.onService.listen((_) => wireAllServices(app));

  final syncChannel = synchronizationChannel;
  if (syncChannel != null) {
    // `notify: false` keeps relayed events from being echoed back.
    syncChannel.stream.listen((e) => batchEvent(e, notify: false));
  }

  app.shutdownHooks.add((_) => synchronizationChannel?.sink.close());
}
2018-07-10 16:54:55 +00:00
/// Handles an incoming [WebSocketContext].
///
/// Throws a "forbidden" [AngelHttpException] when [allowedOrigins] is not
/// empty and does not contain the request's `origin` header. Otherwise the
/// socket is added to [clients], [handleConnect] is awaited, the socket is
/// announced on [onConnection], and its incoming messages are routed to
/// [handleData] until the channel finishes or fails.
Future<void> handleClient(WebSocketContext socket) async {
  final origin = socket.request.headers?.value('origin');
  final originRejected =
      allowedOrigins.isNotEmpty && !allowedOrigins.contains(origin);

  if (originRejected) {
    throw AngelHttpException.forbidden(
        message:
            'WebSocket connections are not allowed from the origin "$origin".');
  }

  _clients.add(socket);
  await handleConnect(socket);

  _onConnection.add(socket);

  socket.request.container?.registerSingleton<WebSocketContext>(socket);

  // Shared by the done and error paths: announce, then forget the client.
  void dropClient() {
    _onDisconnect.add(socket);
    _clients.remove(socket);
  }

  socket.channel.stream.listen(
    (data) {
      _onData.add(data);
      handleData(socket, data);
    },
    onDone: dropClient,
    onError: (e) {
      dropClient();
    },
    cancelOnError: true,
  );
}
/// Handles an incoming HTTP request.
///
/// For an HTTP/1.1 context pair the request must be a WebSocket upgrade
/// request; the response is detached and the raw request upgraded.
///
/// For an HTTP/2 context pair the upgrade headers are validated by hand, the
/// response stream is detached, and the `101 Switching Protocols` reply is
/// written manually before a [WebSocketChannel] is layered on top.
///
/// In both cases the new socket is handed to [handleClient] in a microtask
/// and `false` is returned.
///
/// Throws an [AngelHttpException] for an invalid upgrade request and an
/// [ArgumentError] for any other kind of context pair.
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
  if (req is HttpRequestContext && res is HttpResponseContext) {
    if (!WebSocketTransformer.isUpgradeRequest(req.rawRequest!)) {
      throw AngelHttpException.badRequest();
    }

    res.detach();
    final upgraded = await WebSocketTransformer.upgrade(req.rawRequest!);
    final socket = WebSocketContext(IOWebSocketChannel(upgraded), req, res);
    scheduleMicrotask(() => handleClient(socket));
    return false;
  } else if (req is Http2RequestContext && res is Http2ResponseContext) {
    final connection =
        req.headers?['connection']?.map((s) => s.toLowerCase().trim());
    final upgrade = req.headers?.value('upgrade')?.toLowerCase();
    final version = req.headers?.value('sec-websocket-version');
    final key = req.headers?.value('sec-websocket-key');
    final protocol = req.headers?.value('sec-websocket-protocol');

    // Validate the handshake headers one by one; the first failure wins.
    if (connection == null) {
      throw AngelHttpException.badRequest(
          message: 'Missing `connection` header.');
    }
    if (!connection.contains('upgrade')) {
      throw AngelHttpException.badRequest(
          message: 'Missing "upgrade" in `connection` header.');
    }
    if (upgrade != 'websocket') {
      throw AngelHttpException.badRequest(
          message: 'The `upgrade` header must equal "websocket".');
    }
    if (version != '13') {
      throw AngelHttpException.badRequest(
          message: 'The `sec-websocket-version` header must equal "13".');
    }
    if (key == null) {
      throw AngelHttpException.badRequest(
          message: 'Missing `sec-websocket-key` header.');
    }
    if (protocol != null &&
        allowedProtocols.isNotEmpty &&
        !allowedProtocols.contains(protocol)) {
      throw AngelHttpException.badRequest(
          message: 'Disallowed `sec-websocket-protocol` header "$protocol".');
    }

    final transport = res.detach();
    final ctrl = StreamChannelController<List<int>>();

    // Everything written to the local side goes out on the HTTP/2 stream.
    ctrl.local.stream.listen((buf) {
      transport.sendData(buf);
    }, onDone: () {
      transport.outgoingMessages.close();
    });

    if (req.hasParsedBody) {
      await ctrl.local.sink.close();
    } else {
      await req.body.pipe(ctrl.local.sink);
    }

    final handshake = utf8.encoder.startChunkedConversion(ctrl.foreign.sink);
    handshake.add('HTTP/1.1 101 Switching Protocols\r\n'
        'Upgrade: websocket\r\n'
        'Connection: Upgrade\r\n'
        'Sec-WebSocket-Accept: ${WebSocketChannel.signKey(key)}\r\n');
    if (protocol != null) {
      handshake.add('Sec-WebSocket-Protocol: $protocol\r\n');
    }
    handshake.add('\r\n');

    final socket = WebSocketContext(WebSocketChannel(ctrl.foreign), req, res);
    scheduleMicrotask(() => handleClient(socket));
    return false;
  } else {
    throw ArgumentError(
        'Not an HTTP/1.1 or HTTP/2 RequestContext+ResponseContext pair: $req, $res');
  }
}
}