part of angel3_websocket.server;

/// Represents a WebSocket session, with the original
/// [RequestContext] and [ResponseContext] attached.
class WebSocketContext {
  /// Use this to listen for events.
  ///
  /// Index it with an event name (`on['some_event']`) to obtain the stream
  /// associated with that name.
  _WebSocketEventTable on = _WebSocketEventTable();

  /// The underlying [StreamChannel].
  final StreamChannel channel;

  /// The original [RequestContext].
  final RequestContext request;

  /// The original [ResponseContext].
  final ResponseContext response;

  // Backing controllers for the public streams below. All of them are
  // single-subscription controllers, so each stream supports one listener.
  final StreamController _onAction = StreamController();
  final StreamController _onAuthenticated = StreamController();
  final StreamController _onClose = StreamController();
  final StreamController _onData = StreamController();

  /// Fired on any [WebSocketAction].
  Stream get onAction => _onAction.stream;

  /// Fired when the user authenticates.
  Stream get onAuthenticated => _onAuthenticated.stream;

  /// Fired once the underlying [WebSocket] closes.
  Stream get onClose => _onClose.stream;

  /// Fired when any data is sent through [channel].
  Stream get onData => _onData.stream;

  /// Creates a context wrapping [channel], keeping the originating
  /// [request] and [response] available to handlers.
  WebSocketContext(this.channel, this.request, this.response);

  /// Closes the underlying [StreamChannel] and the event streams of this
  /// context, emitting a single `null` event on [onClose].
  ///
  /// The work is deferred to a microtask, so the returned [Future] completes
  /// before the channel has actually been closed.
  Future close() async {
    scheduleMicrotask(() async {
      await channel.sink.close();

      // The futures returned by StreamController.close() are deliberately not
      // awaited: for a single-subscription controller that future completes
      // only once a listener has received the done event (or cancelled).
      // Awaiting it on a stream nobody listens to would stall this sequence
      // and [onClose] would never fire.
      _onAction.close();
      _onAuthenticated.close();
      _onData.close();

      // Adding to an already-closed controller throws a StateError, so a
      // repeated close() must not try to emit the close event again.
      if (!_onClose.isClosed) {
        _onClose.add(null);
      }
      _onClose.close();
    });
  }

  /// Sends an arbitrary [WebSocketEvent].
  ///
  /// The event built from [eventName] and [data] is converted with `toJson()`,
  /// JSON-encoded, and added to the sink of [channel].
  void send(String eventName, data) {
    final event = WebSocketEvent(eventName: eventName, data: data);
    channel.sink.add(json.encode(event.toJson()));
  }

  /// Sends an error event: the JSON form of [error] under the `errorEvent`
  /// event name.
  void sendError(AngelHttpException error) => send(errorEvent, error.toJson());
}

/// Lazily-populated table mapping event names to stream controllers.
class _WebSocketEventTable {
  // NOTE(review): the type arguments of this map were unreadable in the
  // original text; `StreamController` is left raw here — confirm the intended
  // event payload type.
  final Map<String, StreamController> _handlers = {};

  /// Returns the controller registered for [eventName], creating it on
  /// first use.
  ///
  /// The return type stays nullable for compatibility with existing callers
  /// in this library, but the result is never `null`.
  StreamController? _getStreamForEvent(String eventName) {
    return _handlers.putIfAbsent(eventName, () => StreamController());
  }

  /// The stream of events registered under [key].
  Stream operator [](String key) => _getStreamForEvent(key)!.stream;
}