diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..b4d6e266 --- /dev/null +++ b/.gitignore @@ -0,0 +1,64 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# CMake +cmake-build-debug/ + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Dart template +# See https://www.dartlang.org/tools/private-files.html + +# Files and directories created by pub +.packages +.pub/ +build/ +# If you're building an application, you may want to check-in your pubspec.lock +pubspec.lock + +# Directory created by dartdoc +# If you don't generate documentation locally you can remove this line. +doc/api/ diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 00000000..639900d1 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..2e3efb43 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/build_dart.xml b/.idea/runConfigurations/build_dart.xml new file mode 100644 index 00000000..7bbffc56 --- /dev/null +++ b/.idea/runConfigurations/build_dart.xml @@ -0,0 +1,7 @@ + + + + diff --git a/.idea/runConfigurations/main_dart.xml b/.idea/runConfigurations/main_dart.xml new file mode 100644 index 00000000..750f7262 --- /dev/null +++ b/.idea/runConfigurations/main_dart.xml @@ -0,0 +1,7 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/watch_dart.xml b/.idea/runConfigurations/watch_dart.xml new file mode 100644 index 00000000..7cd7173c --- /dev/null +++ b/.idea/runConfigurations/watch_dart.xml @@ -0,0 +1,7 @@ + + + + diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..35eb1ddf --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/analysis_options.yaml b/analysis_options.yaml new file mode 100644 index 00000000..518eb901 --- /dev/null +++ b/analysis_options.yaml @@ -0,0 +1,2 @@ +analyzer: + strong-mode: true \ No newline at end of file diff --git a/eventsource.iml b/eventsource.iml new file mode 100644 index 00000000..5a5ced28 --- /dev/null +++ b/eventsource.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/example/main.dart b/example/main.dart new file mode 100644 index 00000000..e935acb6 --- /dev/null +++ b/example/main.dart @@ -0,0 +1,51 @@ +import 'dart:convert'; +import 'dart:io'; +import 'package:angel_eventsource/server.dart'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:eventsource/eventsource.dart'; +import 'package:eventsource/publisher.dart'; +import 'package:logging/logging.dart'; +import 'pretty_logging.dart'; + +main() async { + var app = new Angel(); + + app.use('/api/todos', new MapService()); + + var publisher = new AngelEventSourcePublisher(new EventSourcePublisher()); + await app.configure(publisher.configureServer); + + app.get('/sse', publisher.handleRequest); + + app.logger = new Logger('angel_eventsource')..onRecord.listen(prettyLog); + + var server = await app.startServer('127.0.0.1', 3000); + var url = Uri.parse('http://${server.address.address}:${server.port}'); + print('Listening at $url'); + + /* + var sock = await Socket.connect(server.address, server.port); + sock + ..writeln('GET /sse HTTP/1.1') + ..writeln('Accept: text/event-stream') + ..writeln('Host: 127.0.0.1') + ..writeln() + ..flush(); + sock.transform(UTF8.decoder).transform(const LineSplitter()).listen(print); + */ + + /* + var client = new HttpClient(); + var rq = await client.openUrl('GET', url); + var rs = await rq.close(); + rs.transform(UTF8.decoder).transform(const LineSplitter()).listen(print); + */ + + + var eventSource = await EventSource.connect(url); + + await for (var event in eventSource) { + print(event.data); + } + +} diff --git a/example/pretty_logging.dart b/example/pretty_logging.dart new file mode 100644 index 00000000..50b25765 --- /dev/null +++ b/example/pretty_logging.dart @@ -0,0 +1,32 @@ +import 'package:console/console.dart'; +import 'package:logging/logging.dart'; + +/// Prints the contents of a [LogRecord] with pretty colors. + prettyLog(LogRecord record) async { + var pen = new TextPen(); + chooseLogColor(pen.reset(), record.level); + pen(record.toString()); + + if (record.error != null) + pen(record.error.toString()); + if (record.stackTrace != null) + pen(record.stackTrace.toString()); + + pen(); +} + +/// Chooses a color based on the logger [level]. +void chooseLogColor(TextPen pen, Level level) { + if (level == Level.SHOUT) + pen.darkRed(); + else if (level == Level.SEVERE) + pen.red(); + else if (level == Level.WARNING) + pen.yellow(); + else if (level == Level.INFO) + pen.magenta(); + else if (level == Level.FINER) + pen.blue(); + else if (level == Level.FINEST) + pen.darkBlue(); +} diff --git a/lib/angel_eventsource.dart b/lib/angel_eventsource.dart new file mode 100644 index 00000000..1507ea3d --- /dev/null +++ b/lib/angel_eventsource.dart @@ -0,0 +1 @@ +export 'package:angel_websocket/angel_websocket.dart'; \ No newline at end of file diff --git a/lib/server.dart b/lib/server.dart new file mode 100644 index 00000000..b8fd9c3a --- /dev/null +++ b/lib/server.dart @@ -0,0 +1,119 @@ +import 'dart:async'; +import 'package:angel_framework/angel_framework.dart'; +import 'package:angel_framework/hooks.dart' as hooks; +import 'package:angel_websocket/server.dart'; +import 'package:eventsource/eventsource.dart'; +import 'package:eventsource/src/encoder.dart'; +import 'package:eventsource/publisher.dart'; +import 'package:json_god/json_god.dart' as god; + +class AngelEventSourcePublisher { + final EventSourcePublisher eventSourcePublisher; + + /// Used to notify other nodes of an event's firing. Good for scaled applications. + final WebSocketSynchronizer synchronizer; + + /// Serializes a [WebSocketEvent] to JSON. + /// + /// Defaults to [god.serialize]. + final String Function(WebSocketEvent) serializer; + int _count = 0; + + AngelEventSourcePublisher(this.eventSourcePublisher, + {this.synchronizer, this.serializer}); + + Future configureServer(Angel app) async { + await app.configure(hooks.hookAllServices((service) { + if (service is HookedService) { + var path = + app.services.keys.firstWhere((p) => app.services[p] == service); + + service.after([ + HookedServiceEvent.created, + HookedServiceEvent.modified, + HookedServiceEvent.updated, + HookedServiceEvent.removed, + ], (e) { + var event = new WebSocketEvent( + eventName: '${path.toString()}::${e.eventName}', + data: e.result, + ); + + _filter(RequestContext req, ResponseContext res) { + if (e.service.configuration.containsKey('sse:filter')) + return e.service.configuration['sse:filter'](e, req, res); + else if (e.params != null && e.params.containsKey('sse:filter')) + return e.params['sse:filter'](e, req, res); + else + return true; + } + + var canSend = _filter(e.request, e.response); + + if (canSend) { + batchEvent(event, e.request.properties['channel'] ?? ''); + } + }); + } + })); + + if (synchronizer != null) { + var sub = synchronizer.stream.listen((e) => batchEvent(e, '')); + app.shutdownHooks.add((_) async { + sub.cancel(); + }); + } + } + + Future batchEvent(WebSocketEvent event, String channel) async { + eventSourcePublisher.add( + new Event( + id: (_count++).toString(), + data: (serializer ?? god.serialize)(event), + ), + channels: [channel], + ); + } + + Future handleRequest(RequestContext req, ResponseContext res) async { + if (!req.accepts('text/event-stream', strict: false)) + throw new AngelHttpException.badRequest(); + + res + ..headers.addAll({ + 'cache-control': 'no-cache, no-store, must-revalidate', + 'content-type': 'text/event-stream', + 'connection': 'keep-alive', + }) + ..willCloseItself = true + ..end(); + + var acceptsGzip = + (req.headers['accept-encoding']?.contains('gzip') == true); + + if (acceptsGzip) res.io.headers.set('content-encoding', 'gzip'); + res.headers.forEach(res.io.headers.set); + + var sock = res.io ?? await res.io.detachSocket(); + sock.flush(); + + var eventSink = new EventSourceEncoder(compressed: acceptsGzip) + .startChunkedConversion(sock); + + eventSourcePublisher.newSubscription( + onEvent: (e) { + try { + eventSink.add(e); + sock.flush(); + } catch (_) { + // Ignore disconnect + } + }, + onClose: eventSink.close, + channel: req.properties['channel'] ?? '', + lastEventId: req.headers.value('last-event-id'), + ); + + return false; + } +} diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 00000000..d2427746 --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,14 @@ +name: "angel_eventsource" +dependencies: + angel_client: "^1.0.0" + angel_framework: "^1.0.0-dev" + angel_model: "^1.0.0" + angel_serialize: "^1.0.0-alpha" + angel_websocket: "^1.0.0" + eventsource: "^0.1.0" + json_god: ^2.0.0-beta +dev_dependencies: + angel_serialize_generator: "^1.0.0-alpha" + build_runner: "^0.5.0" + console: "^2.2.4" + test: "^0.12.0" diff --git a/tool/build.dart b/tool/build.dart new file mode 100644 index 00000000..c1f9fd61 --- /dev/null +++ b/tool/build.dart @@ -0,0 +1,4 @@ +import 'package:build_runner/build_runner.dart'; +import 'build_actions.dart'; + +main() => build(buildActions, deleteFilesByDefault: true); diff --git a/tool/build_actions.dart b/tool/build_actions.dart new file mode 100644 index 00000000..0df8b123 --- /dev/null +++ b/tool/build_actions.dart @@ -0,0 +1,15 @@ +import 'package:angel_serialize_generator/angel_serialize_generator.dart'; +import 'package:build_runner/build_runner.dart'; +import 'package:source_gen/source_gen.dart'; + +final List buildActions = [ + new BuildAction( + new PartBuilder([ + const JsonModelGenerator(autoIdAndDateFields: false), + ]), + 'angel_eventsource', + inputs: const [ + 'lib/angel_eventsource.dart', + ], + ), +]; diff --git a/tool/watch.dart b/tool/watch.dart new file mode 100644 index 00000000..1435a703 --- /dev/null +++ b/tool/watch.dart @@ -0,0 +1,4 @@ +import 'package:build_runner/build_runner.dart'; +import 'build_actions.dart'; + +main() => watch(buildActions, deleteFilesByDefault: true);