From bc079d37353d94d827290914523b2757fb392ff9 Mon Sep 17 00:00:00 2001 From: Tobe O Date: Thu, 8 Nov 2018 10:09:32 -0500 Subject: [PATCH] 1.0.0 --- .gitignore | 1 + README.md | 31 ++++++++++ analysis_options.yaml | 3 +- example/main.dart | 21 +++---- example/pretty_logging.dart | 15 ++--- lib/angel_eventsource.dart | 2 +- lib/server.dart | 118 ++++++++---------------------------- pubspec.yaml | 28 +++++---- tool/build.dart | 4 -- tool/build_actions.dart | 15 ----- tool/watch.dart | 4 -- 11 files changed, 91 insertions(+), 151 deletions(-) delete mode 100644 tool/build.dart delete mode 100644 tool/build_actions.dart delete mode 100644 tool/watch.dart diff --git a/.gitignore b/.gitignore index b4d6e266..dbce896b 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,4 @@ pubspec.lock # Directory created by dartdoc # If you don't generate documentation locally you can remove this line. doc/api/ +.dart_tool \ No newline at end of file diff --git a/README.md b/README.md index 517ec5d3..ea437ace 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,33 @@ # eventsource Server-sent Events (SSE) plugin for Angel. + +## Installation +In your `pubspec.yaml`: + +```yaml +dependencies: + angel_eventsource: ^1.0.0 +``` + +## Usage +SSE and WebSockets are somewhat similar in that they allow pushing of events from server +to client. SSE is not bi-directional, but the same abstractions used for WebSockets can be +applied to SSE easily. + +For this reason, the `AngelEventSourcePublisher` class is a simple adapter that +hands control of SSE requests to an existing `AngelWebSocket` driver. + +So, using this is pretty straightforward. You can dispatch events +via WebSocket as per usual, and have them propagated to SSE clients +as well. + +```dart +var app = new Angel(); +var ws = new AngelWebSocket(app); +var events = new AngelEventSourcePublisher(ws); + +await app.configure(ws.configureServer); + +app.all('/ws', ws.handleRequest); +app.get('/events', events.handleRequest); +``` \ No newline at end of file diff --git a/analysis_options.yaml b/analysis_options.yaml index 518eb901..eae1e42a 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -1,2 +1,3 @@ analyzer: - strong-mode: true \ No newline at end of file + strong-mode: + implicit-casts: false \ No newline at end of file diff --git a/example/main.dart b/example/main.dart index e935acb6..f208544f 100644 --- a/example/main.dart +++ b/example/main.dart @@ -1,25 +1,26 @@ -import 'dart:convert'; -import 'dart:io'; import 'package:angel_eventsource/server.dart'; import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/http.dart'; +import 'package:angel_websocket/server.dart'; import 'package:eventsource/eventsource.dart'; -import 'package:eventsource/publisher.dart'; import 'package:logging/logging.dart'; import 'pretty_logging.dart'; main() async { var app = new Angel(); + var ws = new AngelWebSocket(app); + var events = new AngelEventSourcePublisher(ws); + + await app.configure(ws.configureServer); app.use('/api/todos', new MapService()); - - var publisher = new AngelEventSourcePublisher(new EventSourcePublisher()); - await app.configure(publisher.configureServer); - - app.get('/sse', publisher.handleRequest); + app.all('/ws', ws.handleRequest); + app.get('/events', events.handleRequest); app.logger = new Logger('angel_eventsource')..onRecord.listen(prettyLog); - var server = await app.startServer('127.0.0.1', 3000); + var http = new AngelHttp(app); + var server = await http.startServer('127.0.0.1', 3000); var url = Uri.parse('http://${server.address.address}:${server.port}'); print('Listening at $url'); @@ -41,11 +42,9 @@ main() async { rs.transform(UTF8.decoder).transform(const LineSplitter()).listen(print); */ - var eventSource = await EventSource.connect(url); await for (var event in eventSource) { print(event.data); } - } diff --git a/example/pretty_logging.dart b/example/pretty_logging.dart index 50b25765..883b979b 100644 --- a/example/pretty_logging.dart +++ b/example/pretty_logging.dart @@ -2,16 +2,14 @@ import 'package:console/console.dart'; import 'package:logging/logging.dart'; /// Prints the contents of a [LogRecord] with pretty colors. - prettyLog(LogRecord record) async { +prettyLog(LogRecord record) async { var pen = new TextPen(); chooseLogColor(pen.reset(), record.level); pen(record.toString()); - - if (record.error != null) - pen(record.error.toString()); - if (record.stackTrace != null) - pen(record.stackTrace.toString()); - + + if (record.error != null) pen(record.error.toString()); + if (record.stackTrace != null) pen(record.stackTrace.toString()); + pen(); } @@ -27,6 +25,5 @@ void chooseLogColor(TextPen pen, Level level) { pen.magenta(); else if (level == Level.FINER) pen.blue(); - else if (level == Level.FINEST) - pen.darkBlue(); + else if (level == Level.FINEST) pen.darkBlue(); } diff --git a/lib/angel_eventsource.dart b/lib/angel_eventsource.dart index 1507ea3d..1071f8fc 100644 --- a/lib/angel_eventsource.dart +++ b/lib/angel_eventsource.dart @@ -1 +1 @@ -export 'package:angel_websocket/angel_websocket.dart'; \ No newline at end of file +export 'package:angel_websocket/angel_websocket.dart'; diff --git a/lib/server.dart b/lib/server.dart index b8fd9c3a..ea6c4e19 100644 --- a/lib/server.dart +++ b/lib/server.dart @@ -1,119 +1,51 @@ import 'dart:async'; import 'package:angel_framework/angel_framework.dart'; -import 'package:angel_framework/hooks.dart' as hooks; import 'package:angel_websocket/server.dart'; import 'package:eventsource/eventsource.dart'; import 'package:eventsource/src/encoder.dart'; import 'package:eventsource/publisher.dart'; -import 'package:json_god/json_god.dart' as god; +import 'package:stream_channel/stream_channel.dart'; class AngelEventSourcePublisher { - final EventSourcePublisher eventSourcePublisher; + final AngelWebSocket webSocketDriver; - /// Used to notify other nodes of an event's firing. Good for scaled applications. - final WebSocketSynchronizer synchronizer; + final String channel; - /// Serializes a [WebSocketEvent] to JSON. - /// - /// Defaults to [god.serialize]. - final String Function(WebSocketEvent) serializer; int _count = 0; - AngelEventSourcePublisher(this.eventSourcePublisher, - {this.synchronizer, this.serializer}); + AngelEventSourcePublisher(this.webSocketDriver, {this.channel: ''}); - Future configureServer(Angel app) async { - await app.configure(hooks.hookAllServices((service) { - if (service is HookedService) { - var path = - app.services.keys.firstWhere((p) => app.services[p] == service); - - service.after([ - HookedServiceEvent.created, - HookedServiceEvent.modified, - HookedServiceEvent.updated, - HookedServiceEvent.removed, - ], (e) { - var event = new WebSocketEvent( - eventName: '${path.toString()}::${e.eventName}', - data: e.result, - ); - - _filter(RequestContext req, ResponseContext res) { - if (e.service.configuration.containsKey('sse:filter')) - return e.service.configuration['sse:filter'](e, req, res); - else if (e.params != null && e.params.containsKey('sse:filter')) - return e.params['sse:filter'](e, req, res); - else - return true; - } - - var canSend = _filter(e.request, e.response); - - if (canSend) { - batchEvent(event, e.request.properties['channel'] ?? ''); - } - }); - } - })); - - if (synchronizer != null) { - var sub = synchronizer.stream.listen((e) => batchEvent(e, '')); - app.shutdownHooks.add((_) async { - sub.cancel(); - }); - } - } - - Future batchEvent(WebSocketEvent event, String channel) async { - eventSourcePublisher.add( - new Event( - id: (_count++).toString(), - data: (serializer ?? god.serialize)(event), - ), - channels: [channel], - ); - } - - Future handleRequest(RequestContext req, ResponseContext res) async { + Future handleRequest(RequestContext req, ResponseContext res) async { if (!req.accepts('text/event-stream', strict: false)) throw new AngelHttpException.badRequest(); - res - ..headers.addAll({ - 'cache-control': 'no-cache, no-store, must-revalidate', - 'content-type': 'text/event-stream', - 'connection': 'keep-alive', - }) - ..willCloseItself = true - ..end(); + res.headers.addAll({ + 'cache-control': 'no-cache, no-store, must-revalidate', + 'content-type': 'text/event-stream', + 'connection': 'keep-alive', + }); var acceptsGzip = (req.headers['accept-encoding']?.contains('gzip') == true); - if (acceptsGzip) res.io.headers.set('content-encoding', 'gzip'); - res.headers.forEach(res.io.headers.set); - - var sock = res.io ?? await res.io.detachSocket(); - sock.flush(); + if (acceptsGzip) res.headers['content-encoding'] = 'gzip'; var eventSink = new EventSourceEncoder(compressed: acceptsGzip) - .startChunkedConversion(sock); + .startChunkedConversion(res); - eventSourcePublisher.newSubscription( - onEvent: (e) { - try { - eventSink.add(e); - sock.flush(); - } catch (_) { - // Ignore disconnect - } - }, - onClose: eventSink.close, - channel: req.properties['channel'] ?? '', - lastEventId: req.headers.value('last-event-id'), - ); + // Listen for events. + var ctrl = new StreamChannelController(); - return false; + // Incoming events are strings, and should be sent via the eventSink. + ctrl.local.stream.cast().listen((data) { + eventSink.add(new Event( + id: (_count++).toString(), + data: data, + )); + }); + + // Create a new WebSocketContext, and hand it off to the driver. + var socket = new WebSocketContext(ctrl.foreign, req, res); + return await webSocketDriver.handleClient(socket); } } diff --git a/pubspec.yaml b/pubspec.yaml index d2427746..69fecedd 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,14 +1,16 @@ -name: "angel_eventsource" -dependencies: - angel_client: "^1.0.0" - angel_framework: "^1.0.0-dev" - angel_model: "^1.0.0" - angel_serialize: "^1.0.0-alpha" - angel_websocket: "^1.0.0" - eventsource: "^0.1.0" - json_god: ^2.0.0-beta +name: angel_eventsource +version: 1.0.0 +description: Server-sent Events (SSE) plugin for Angel. +homepage: https://github.com/angel-dart/eventsource +author: Tobe O +environment: + sdk: ">=2.0.0-dev <3.0.0" +dependencies: + angel_framework: ^2.0.0-alpha + angel_websocket: ^2.0.0-alpha + eventsource: ^0.2.0 + stream_channel: ^1.0.0 dev_dependencies: - angel_serialize_generator: "^1.0.0-alpha" - build_runner: "^0.5.0" - console: "^2.2.4" - test: "^0.12.0" + console: ^3.0.0 + logging: + test: ^1.0.0 diff --git a/tool/build.dart b/tool/build.dart deleted file mode 100644 index c1f9fd61..00000000 --- a/tool/build.dart +++ /dev/null @@ -1,4 +0,0 @@ -import 'package:build_runner/build_runner.dart'; -import 'build_actions.dart'; - -main() => build(buildActions, deleteFilesByDefault: true); diff --git a/tool/build_actions.dart b/tool/build_actions.dart deleted file mode 100644 index 0df8b123..00000000 --- a/tool/build_actions.dart +++ /dev/null @@ -1,15 +0,0 @@ -import 'package:angel_serialize_generator/angel_serialize_generator.dart'; -import 'package:build_runner/build_runner.dart'; -import 'package:source_gen/source_gen.dart'; - -final List buildActions = [ - new BuildAction( - new PartBuilder([ - const JsonModelGenerator(autoIdAndDateFields: false), - ]), - 'angel_eventsource', - inputs: const [ - 'lib/angel_eventsource.dart', - ], - ), -]; diff --git a/tool/watch.dart b/tool/watch.dart deleted file mode 100644 index 1435a703..00000000 --- a/tool/watch.dart +++ /dev/null @@ -1,4 +0,0 @@ -import 'package:build_runner/build_runner.dart'; -import 'build_actions.dart'; - -main() => watch(buildActions, deleteFilesByDefault: true);