diff --git a/README.md b/README.md index d95ad8b6..50eba889 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,28 @@ It is injected into your application's `Container` as `pub_sub.Client`, so you can use it as follows: ```dart +// Use the injected `pub_sub.Client` to send messages. +var client = app.container.make(); + +// We can listen for an event to perform some behavior. +// +// Here, we use message passing to synchronize some common state. +var onGreetingChanged = await client.subscribe('user_upgraded'); +onGreetingChanged + .cast() + .listen((user) { + // Do something... + }); +``` + +## Run-time Metadata +At run-time, you may want to know information about the currently-running instance, +for example, which number instance. For this, the `InstanceInfo` class is injected +into each instance: + +```dart +var instanceInfo = app.container.make(); +print('This is instance #${instanceInfo.id}'); ``` ## Command-line Options diff --git a/example/main.dart b/example/main.dart index 11d13736..7fc28738 100644 --- a/example/main.dart +++ b/example/main.dart @@ -2,12 +2,37 @@ import 'dart:async'; import 'dart:isolate'; import 'package:angel_framework/angel_framework.dart'; import 'package:angel_production/angel_production.dart'; +import 'package:pub_sub/pub_sub.dart' as pub_sub; main(List args) => new Runner('example', configureServer).run(args); Future configureServer(Angel app) async { + // Use the injected `pub_sub.Client` to send messages. + var client = app.container.make(); + var greeting = 'Hello! This is the default greeting.'; + + // We can listen for an event to perform some behavior. + // + // Here, we use message passing to synchronize some common state. + var onGreetingChanged = await client.subscribe('greeting_changed'); + onGreetingChanged + .cast() + .listen((newGreeting) => greeting = newGreeting); + + // Add some routes... app.get('/', (req, res) => 'Hello, production world!'); + // Create some routes to demonstrate message passing. + app.get('/greeting', (req, res) => greeting); + + // This route will push a new value for `greeting`. + app.get('/change_greeting/:newGreeting', (req, res) { + greeting = req.params['newGreeting'] as String; + client.publish('greeting_changed', greeting); + return 'Changed greeting -> $greeting'; + }); + + // The `Runner` helps with fault tolerance. app.get('/crash', (req, res) { // We'll crash this instance deliberately, but the Runner will auto-respawn for us. new Timer(const Duration(seconds: 3), Isolate.current.kill); diff --git a/lib/angel_production.dart b/lib/angel_production.dart index 6488127f..1e376f25 100644 --- a/lib/angel_production.dart +++ b/lib/angel_production.dart @@ -1,2 +1,3 @@ +export 'src/instance_info.dart'; export 'src/options.dart'; export 'src/runner.dart'; \ No newline at end of file diff --git a/lib/src/instance_info.dart b/lib/src/instance_info.dart new file mode 100644 index 00000000..a19e9dfe --- /dev/null +++ b/lib/src/instance_info.dart @@ -0,0 +1,6 @@ +/// Information about the currently-running instance. +class InstanceInfo { + final int id; + + const InstanceInfo({this.id}); +} \ No newline at end of file diff --git a/lib/src/runner.dart b/lib/src/runner.dart index 3212d4ca..34c2faf8 100644 --- a/lib/src/runner.dart +++ b/lib/src/runner.dart @@ -4,9 +4,12 @@ import 'dart:isolate'; import 'package:angel_container/angel_container.dart'; import 'package:angel_framework/angel_framework.dart'; import 'package:args/args.dart'; -import 'package:logging/logging.dart'; import 'package:io/ansi.dart'; import 'package:io/io.dart'; +import 'package:logging/logging.dart'; +import 'package:pub_sub/isolate.dart' as pub_sub; +import 'package:pub_sub/pub_sub.dart' as pub_sub; +import 'instance_info.dart'; import 'options.dart'; /// A command-line utility for easier running of multiple instances of an Angel application. @@ -68,18 +71,20 @@ _ ___ | /| / / /_/ / _ /___ _ /___ /// The returned [Future] completes when the application instance exits. /// /// If respawning is enabled, the [Future] will *never* complete. - Future spawnIsolate(RunnerOptions options) { - return _spawnIsolate(new Completer(), options); + Future spawnIsolate(int id, RunnerOptions options, SendPort pubSubSendPort) { + return _spawnIsolate(id, new Completer(), options, pubSubSendPort); } - Future _spawnIsolate(Completer c, RunnerOptions options) { + Future _spawnIsolate( + int id, Completer c, RunnerOptions options, SendPort pubSubSendPort) { var onLogRecord = new ReceivePort(); var onExit = new ReceivePort(); var onError = new ReceivePort(); - var runnerArgs = new _RunnerArgs( - name, configureServer, options, reflector, onLogRecord.sendPort); + var runnerArgs = new _RunnerArgs(name, configureServer, options, reflector, + onLogRecord.sendPort, pubSubSendPort); + var argsWithId = new _RunnerArgsWithId(id, runnerArgs); - Isolate.spawn(isolateMain, runnerArgs, + Isolate.spawn(isolateMain, argsWithId, onExit: onExit.sendPort, onError: onError.sendPort, errorsAreFatal: true && false) @@ -103,9 +108,9 @@ _ ___ | /| / / /_/ / _ /___ _ /___ if (options.respawn) { handleLogRecord(new LogRecord( Level.WARNING, - 'Detected a crashed instance at ${new DateTime.now()}. Respawning immediately...', + 'Instance #$id at ${new DateTime.now()}. Respawning immediately...', runnerArgs.loggerName)); - _spawnIsolate(c, options); + _spawnIsolate(id, c, options, pubSubSendPort); } else { c.complete(); } @@ -119,6 +124,8 @@ _ ___ | /| / / /_/ / _ /___ _ /___ /// Starts a number of isolates, running identical instances of an Angel application. Future run(List args) async { + pub_sub.Server server; + try { var argResults = RunnerOptions.argParser.parse(args); var options = new RunnerOptions.fromArgResults(argResults); @@ -137,8 +144,18 @@ _ ___ | /| / / /_/ / _ /___ _ /___ print('Starting `${name}` application...'); print('Arguments: $args...\n'); - await Future.wait( - new List.generate(options.concurrency, (_) => spawnIsolate(options))); + var adapter = new pub_sub.IsolateAdapter(); + server = new pub_sub.Server([adapter]); + + // Register clients + for (int i = 0; i < Platform.numberOfProcessors; i++) { + server.registerClient(new pub_sub.ClientInfo('client$i')); + } + + server.start(); + + await Future.wait(new List.generate(options.concurrency, + (id) => spawnIsolate(id, options, adapter.receivePort.sendPort))); } on ArgParserException catch (e) { stderr ..writeln(e.message) @@ -149,10 +166,13 @@ _ ___ | /| / / /_/ / _ /___ _ /___ } catch (e) { stderr..writeln('fatal error: $e'); exitCode = 1; + } finally { + server?.close(); } } - static void isolateMain(_RunnerArgs args) { + static void isolateMain(_RunnerArgsWithId argsWithId) { + var args = argsWithId.args; hierarchicalLoggingEnabled = true; var zone = Zone.current.fork(specification: new ZoneSpecification( @@ -163,7 +183,15 @@ _ ___ | /| / / /_/ / _ /___ _ /___ )); zone.run(() async { - var app = new Angel(reflector: args.reflector); + var client = new pub_sub.IsolateClient( + 'client${argsWithId.id}', args.pubSubSendPort); + + var app = new Angel(reflector: args.reflector) + ..container.registerSingleton(client) + ..container.registerSingleton(new InstanceInfo(id: argsWithId.id)); + + app.shutdownHooks.add((_) => client.close()); + await app.configure(args.configureServer); if (app.logger == null) { @@ -177,11 +205,18 @@ _ ___ | /| / / /_/ / _ /___ _ /___ await http.startServer(args.options.hostname, args.options.port); var url = new Uri( scheme: 'http', host: server.address.address, port: server.port); - print('Listening at $url'); + print('Instance #${argsWithId.id} listening at $url'); }); } } +class _RunnerArgsWithId { + final int id; + final _RunnerArgs args; + + _RunnerArgsWithId(this.id, this.args); +} + class _RunnerArgs { final String name; @@ -191,10 +226,10 @@ class _RunnerArgs { final Reflector reflector; - final SendPort loggingSendPort; + final SendPort loggingSendPort, pubSubSendPort; _RunnerArgs(this.name, this.configureServer, this.options, this.reflector, - this.loggingSendPort); + this.loggingSendPort, this.pubSubSendPort); String get loggerName => name; }