1.0.0
This commit is contained in:
parent
d6d56f1f68
commit
bc079d3735
11 changed files with 91 additions and 151 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -62,3 +62,4 @@ pubspec.lock
|
||||||
# Directory created by dartdoc
|
# Directory created by dartdoc
|
||||||
# If you don't generate documentation locally you can remove this line.
|
# If you don't generate documentation locally you can remove this line.
|
||||||
doc/api/
|
doc/api/
|
||||||
|
.dart_tool
|
31
README.md
31
README.md
|
@ -1,2 +1,33 @@
|
||||||
# eventsource
|
# eventsource
|
||||||
Server-sent Events (SSE) plugin for Angel.
|
Server-sent Events (SSE) plugin for Angel.
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
In your `pubspec.yaml`:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
dependencies:
|
||||||
|
angel_eventsource: ^1.0.0
|
||||||
|
```
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
SSE and WebSockets are somewhat similar in that they allow pushing of events from server
|
||||||
|
to client. SSE is not bi-directional, but the same abstractions used for WebSockets can be
|
||||||
|
applied to SSE easily.
|
||||||
|
|
||||||
|
For this reason, the `AngelEventSourcePublisher` class is a simple adapter that
|
||||||
|
hands control of SSE requests to an existing `AngelWebSocket` driver.
|
||||||
|
|
||||||
|
So, using this is pretty straightforward. You can dispatch events
|
||||||
|
via WebSocket as per usual, and have them propagated to SSE clients
|
||||||
|
as well.
|
||||||
|
|
||||||
|
```dart
|
||||||
|
var app = new Angel();
|
||||||
|
var ws = new AngelWebSocket(app);
|
||||||
|
var events = new AngelEventSourcePublisher(ws);
|
||||||
|
|
||||||
|
await app.configure(ws.configureServer);
|
||||||
|
|
||||||
|
app.all('/ws', ws.handleRequest);
|
||||||
|
app.get('/events', events.handleRequest);
|
||||||
|
```
|
|
@ -1,2 +1,3 @@
|
||||||
analyzer:
|
analyzer:
|
||||||
strong-mode: true
|
strong-mode:
|
||||||
|
implicit-casts: false
|
|
@ -1,25 +1,26 @@
|
||||||
import 'dart:convert';
|
|
||||||
import 'dart:io';
|
|
||||||
import 'package:angel_eventsource/server.dart';
|
import 'package:angel_eventsource/server.dart';
|
||||||
import 'package:angel_framework/angel_framework.dart';
|
import 'package:angel_framework/angel_framework.dart';
|
||||||
|
import 'package:angel_framework/http.dart';
|
||||||
|
import 'package:angel_websocket/server.dart';
|
||||||
import 'package:eventsource/eventsource.dart';
|
import 'package:eventsource/eventsource.dart';
|
||||||
import 'package:eventsource/publisher.dart';
|
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'pretty_logging.dart';
|
import 'pretty_logging.dart';
|
||||||
|
|
||||||
main() async {
|
main() async {
|
||||||
var app = new Angel();
|
var app = new Angel();
|
||||||
|
var ws = new AngelWebSocket(app);
|
||||||
|
var events = new AngelEventSourcePublisher(ws);
|
||||||
|
|
||||||
|
await app.configure(ws.configureServer);
|
||||||
|
|
||||||
app.use('/api/todos', new MapService());
|
app.use('/api/todos', new MapService());
|
||||||
|
app.all('/ws', ws.handleRequest);
|
||||||
var publisher = new AngelEventSourcePublisher(new EventSourcePublisher());
|
app.get('/events', events.handleRequest);
|
||||||
await app.configure(publisher.configureServer);
|
|
||||||
|
|
||||||
app.get('/sse', publisher.handleRequest);
|
|
||||||
|
|
||||||
app.logger = new Logger('angel_eventsource')..onRecord.listen(prettyLog);
|
app.logger = new Logger('angel_eventsource')..onRecord.listen(prettyLog);
|
||||||
|
|
||||||
var server = await app.startServer('127.0.0.1', 3000);
|
var http = new AngelHttp(app);
|
||||||
|
var server = await http.startServer('127.0.0.1', 3000);
|
||||||
var url = Uri.parse('http://${server.address.address}:${server.port}');
|
var url = Uri.parse('http://${server.address.address}:${server.port}');
|
||||||
print('Listening at $url');
|
print('Listening at $url');
|
||||||
|
|
||||||
|
@ -41,11 +42,9 @@ main() async {
|
||||||
rs.transform(UTF8.decoder).transform(const LineSplitter()).listen(print);
|
rs.transform(UTF8.decoder).transform(const LineSplitter()).listen(print);
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
var eventSource = await EventSource.connect(url);
|
var eventSource = await EventSource.connect(url);
|
||||||
|
|
||||||
await for (var event in eventSource) {
|
await for (var event in eventSource) {
|
||||||
print(event.data);
|
print(event.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,15 +2,13 @@ import 'package:console/console.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
|
|
||||||
/// Prints the contents of a [LogRecord] with pretty colors.
|
/// Prints the contents of a [LogRecord] with pretty colors.
|
||||||
prettyLog(LogRecord record) async {
|
prettyLog(LogRecord record) async {
|
||||||
var pen = new TextPen();
|
var pen = new TextPen();
|
||||||
chooseLogColor(pen.reset(), record.level);
|
chooseLogColor(pen.reset(), record.level);
|
||||||
pen(record.toString());
|
pen(record.toString());
|
||||||
|
|
||||||
if (record.error != null)
|
if (record.error != null) pen(record.error.toString());
|
||||||
pen(record.error.toString());
|
if (record.stackTrace != null) pen(record.stackTrace.toString());
|
||||||
if (record.stackTrace != null)
|
|
||||||
pen(record.stackTrace.toString());
|
|
||||||
|
|
||||||
pen();
|
pen();
|
||||||
}
|
}
|
||||||
|
@ -27,6 +25,5 @@ void chooseLogColor(TextPen pen, Level level) {
|
||||||
pen.magenta();
|
pen.magenta();
|
||||||
else if (level == Level.FINER)
|
else if (level == Level.FINER)
|
||||||
pen.blue();
|
pen.blue();
|
||||||
else if (level == Level.FINEST)
|
else if (level == Level.FINEST) pen.darkBlue();
|
||||||
pen.darkBlue();
|
|
||||||
}
|
}
|
||||||
|
|
118
lib/server.dart
118
lib/server.dart
|
@ -1,119 +1,51 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'package:angel_framework/angel_framework.dart';
|
import 'package:angel_framework/angel_framework.dart';
|
||||||
import 'package:angel_framework/hooks.dart' as hooks;
|
|
||||||
import 'package:angel_websocket/server.dart';
|
import 'package:angel_websocket/server.dart';
|
||||||
import 'package:eventsource/eventsource.dart';
|
import 'package:eventsource/eventsource.dart';
|
||||||
import 'package:eventsource/src/encoder.dart';
|
import 'package:eventsource/src/encoder.dart';
|
||||||
import 'package:eventsource/publisher.dart';
|
import 'package:eventsource/publisher.dart';
|
||||||
import 'package:json_god/json_god.dart' as god;
|
import 'package:stream_channel/stream_channel.dart';
|
||||||
|
|
||||||
class AngelEventSourcePublisher {
|
class AngelEventSourcePublisher {
|
||||||
final EventSourcePublisher eventSourcePublisher;
|
final AngelWebSocket webSocketDriver;
|
||||||
|
|
||||||
/// Used to notify other nodes of an event's firing. Good for scaled applications.
|
final String channel;
|
||||||
final WebSocketSynchronizer synchronizer;
|
|
||||||
|
|
||||||
/// Serializes a [WebSocketEvent] to JSON.
|
|
||||||
///
|
|
||||||
/// Defaults to [god.serialize].
|
|
||||||
final String Function(WebSocketEvent) serializer;
|
|
||||||
int _count = 0;
|
int _count = 0;
|
||||||
|
|
||||||
AngelEventSourcePublisher(this.eventSourcePublisher,
|
AngelEventSourcePublisher(this.webSocketDriver, {this.channel: ''});
|
||||||
{this.synchronizer, this.serializer});
|
|
||||||
|
|
||||||
Future configureServer(Angel app) async {
|
Future handleRequest(RequestContext req, ResponseContext res) async {
|
||||||
await app.configure(hooks.hookAllServices((service) {
|
|
||||||
if (service is HookedService) {
|
|
||||||
var path =
|
|
||||||
app.services.keys.firstWhere((p) => app.services[p] == service);
|
|
||||||
|
|
||||||
service.after([
|
|
||||||
HookedServiceEvent.created,
|
|
||||||
HookedServiceEvent.modified,
|
|
||||||
HookedServiceEvent.updated,
|
|
||||||
HookedServiceEvent.removed,
|
|
||||||
], (e) {
|
|
||||||
var event = new WebSocketEvent(
|
|
||||||
eventName: '${path.toString()}::${e.eventName}',
|
|
||||||
data: e.result,
|
|
||||||
);
|
|
||||||
|
|
||||||
_filter(RequestContext req, ResponseContext res) {
|
|
||||||
if (e.service.configuration.containsKey('sse:filter'))
|
|
||||||
return e.service.configuration['sse:filter'](e, req, res);
|
|
||||||
else if (e.params != null && e.params.containsKey('sse:filter'))
|
|
||||||
return e.params['sse:filter'](e, req, res);
|
|
||||||
else
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
var canSend = _filter(e.request, e.response);
|
|
||||||
|
|
||||||
if (canSend) {
|
|
||||||
batchEvent(event, e.request.properties['channel'] ?? '');
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
if (synchronizer != null) {
|
|
||||||
var sub = synchronizer.stream.listen((e) => batchEvent(e, ''));
|
|
||||||
app.shutdownHooks.add((_) async {
|
|
||||||
sub.cancel();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Future batchEvent(WebSocketEvent event, String channel) async {
|
|
||||||
eventSourcePublisher.add(
|
|
||||||
new Event(
|
|
||||||
id: (_count++).toString(),
|
|
||||||
data: (serializer ?? god.serialize)(event),
|
|
||||||
),
|
|
||||||
channels: [channel],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<bool> handleRequest(RequestContext req, ResponseContext res) async {
|
|
||||||
if (!req.accepts('text/event-stream', strict: false))
|
if (!req.accepts('text/event-stream', strict: false))
|
||||||
throw new AngelHttpException.badRequest();
|
throw new AngelHttpException.badRequest();
|
||||||
|
|
||||||
res
|
res.headers.addAll({
|
||||||
..headers.addAll({
|
'cache-control': 'no-cache, no-store, must-revalidate',
|
||||||
'cache-control': 'no-cache, no-store, must-revalidate',
|
'content-type': 'text/event-stream',
|
||||||
'content-type': 'text/event-stream',
|
'connection': 'keep-alive',
|
||||||
'connection': 'keep-alive',
|
});
|
||||||
})
|
|
||||||
..willCloseItself = true
|
|
||||||
..end();
|
|
||||||
|
|
||||||
var acceptsGzip =
|
var acceptsGzip =
|
||||||
(req.headers['accept-encoding']?.contains('gzip') == true);
|
(req.headers['accept-encoding']?.contains('gzip') == true);
|
||||||
|
|
||||||
if (acceptsGzip) res.io.headers.set('content-encoding', 'gzip');
|
if (acceptsGzip) res.headers['content-encoding'] = 'gzip';
|
||||||
res.headers.forEach(res.io.headers.set);
|
|
||||||
|
|
||||||
var sock = res.io ?? await res.io.detachSocket();
|
|
||||||
sock.flush();
|
|
||||||
|
|
||||||
var eventSink = new EventSourceEncoder(compressed: acceptsGzip)
|
var eventSink = new EventSourceEncoder(compressed: acceptsGzip)
|
||||||
.startChunkedConversion(sock);
|
.startChunkedConversion(res);
|
||||||
|
|
||||||
eventSourcePublisher.newSubscription(
|
// Listen for events.
|
||||||
onEvent: (e) {
|
var ctrl = new StreamChannelController();
|
||||||
try {
|
|
||||||
eventSink.add(e);
|
|
||||||
sock.flush();
|
|
||||||
} catch (_) {
|
|
||||||
// Ignore disconnect
|
|
||||||
}
|
|
||||||
},
|
|
||||||
onClose: eventSink.close,
|
|
||||||
channel: req.properties['channel'] ?? '',
|
|
||||||
lastEventId: req.headers.value('last-event-id'),
|
|
||||||
);
|
|
||||||
|
|
||||||
return false;
|
// Incoming events are strings, and should be sent via the eventSink.
|
||||||
|
ctrl.local.stream.cast<String>().listen((data) {
|
||||||
|
eventSink.add(new Event(
|
||||||
|
id: (_count++).toString(),
|
||||||
|
data: data,
|
||||||
|
));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create a new WebSocketContext, and hand it off to the driver.
|
||||||
|
var socket = new WebSocketContext(ctrl.foreign, req, res);
|
||||||
|
return await webSocketDriver.handleClient(socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
26
pubspec.yaml
26
pubspec.yaml
|
@ -1,14 +1,16 @@
|
||||||
name: "angel_eventsource"
|
name: angel_eventsource
|
||||||
|
version: 1.0.0
|
||||||
|
description: Server-sent Events (SSE) plugin for Angel.
|
||||||
|
homepage: https://github.com/angel-dart/eventsource
|
||||||
|
author: Tobe O <thosakwe@gmail.com>
|
||||||
|
environment:
|
||||||
|
sdk: ">=2.0.0-dev <3.0.0"
|
||||||
dependencies:
|
dependencies:
|
||||||
angel_client: "^1.0.0"
|
angel_framework: ^2.0.0-alpha
|
||||||
angel_framework: "^1.0.0-dev"
|
angel_websocket: ^2.0.0-alpha
|
||||||
angel_model: "^1.0.0"
|
eventsource: ^0.2.0
|
||||||
angel_serialize: "^1.0.0-alpha"
|
stream_channel: ^1.0.0
|
||||||
angel_websocket: "^1.0.0"
|
|
||||||
eventsource: "^0.1.0"
|
|
||||||
json_god: ^2.0.0-beta
|
|
||||||
dev_dependencies:
|
dev_dependencies:
|
||||||
angel_serialize_generator: "^1.0.0-alpha"
|
console: ^3.0.0
|
||||||
build_runner: "^0.5.0"
|
logging:
|
||||||
console: "^2.2.4"
|
test: ^1.0.0
|
||||||
test: "^0.12.0"
|
|
||||||
|
|
|
@ -1,4 +0,0 @@
|
||||||
import 'package:build_runner/build_runner.dart';
|
|
||||||
import 'build_actions.dart';
|
|
||||||
|
|
||||||
main() => build(buildActions, deleteFilesByDefault: true);
|
|
|
@ -1,15 +0,0 @@
|
||||||
import 'package:angel_serialize_generator/angel_serialize_generator.dart';
|
|
||||||
import 'package:build_runner/build_runner.dart';
|
|
||||||
import 'package:source_gen/source_gen.dart';
|
|
||||||
|
|
||||||
final List<BuildAction> buildActions = [
|
|
||||||
new BuildAction(
|
|
||||||
new PartBuilder([
|
|
||||||
const JsonModelGenerator(autoIdAndDateFields: false),
|
|
||||||
]),
|
|
||||||
'angel_eventsource',
|
|
||||||
inputs: const [
|
|
||||||
'lib/angel_eventsource.dart',
|
|
||||||
],
|
|
||||||
),
|
|
||||||
];
|
|
|
@ -1,4 +0,0 @@
|
||||||
import 'package:build_runner/build_runner.dart';
|
|
||||||
import 'build_actions.dart';
|
|
||||||
|
|
||||||
main() => watch(buildActions, deleteFilesByDefault: true);
|
|
Loading…
Reference in a new issue