README + add pub_sub
This commit is contained in:
parent
2d61492fff
commit
221dda89a1
5 changed files with 105 additions and 16 deletions
22
README.md
22
README.md
|
@ -50,6 +50,28 @@ It is injected into your application's `Container` as
|
||||||
`pub_sub.Client`, so you can use it as follows:
|
`pub_sub.Client`, so you can use it as follows:
|
||||||
|
|
||||||
```dart
|
```dart
|
||||||
|
// Use the injected `pub_sub.Client` to send messages.
|
||||||
|
var client = app.container.make<pub_sub.Client>();
|
||||||
|
|
||||||
|
// We can listen for an event to perform some behavior.
|
||||||
|
//
|
||||||
|
// Here, we use message passing to synchronize some common state.
|
||||||
|
var onGreetingChanged = await client.subscribe('user_upgraded');
|
||||||
|
onGreetingChanged
|
||||||
|
.cast<User>()
|
||||||
|
.listen((user) {
|
||||||
|
// Do something...
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
## Run-time Metadata
|
||||||
|
At run-time, you may want to know information about the currently-running instance,
|
||||||
|
for example, which number instance. For this, the `InstanceInfo` class is injected
|
||||||
|
into each instance:
|
||||||
|
|
||||||
|
```dart
|
||||||
|
var instanceInfo = app.container.make<InstanceInfo>();
|
||||||
|
print('This is instance #${instanceInfo.id}');
|
||||||
```
|
```
|
||||||
|
|
||||||
## Command-line Options
|
## Command-line Options
|
||||||
|
|
|
@ -2,12 +2,37 @@ import 'dart:async';
|
||||||
import 'dart:isolate';
|
import 'dart:isolate';
|
||||||
import 'package:angel_framework/angel_framework.dart';
|
import 'package:angel_framework/angel_framework.dart';
|
||||||
import 'package:angel_production/angel_production.dart';
|
import 'package:angel_production/angel_production.dart';
|
||||||
|
import 'package:pub_sub/pub_sub.dart' as pub_sub;
|
||||||
|
|
||||||
main(List<String> args) => new Runner('example', configureServer).run(args);
|
main(List<String> args) => new Runner('example', configureServer).run(args);
|
||||||
|
|
||||||
Future configureServer(Angel app) async {
|
Future configureServer(Angel app) async {
|
||||||
|
// Use the injected `pub_sub.Client` to send messages.
|
||||||
|
var client = app.container.make<pub_sub.Client>();
|
||||||
|
var greeting = 'Hello! This is the default greeting.';
|
||||||
|
|
||||||
|
// We can listen for an event to perform some behavior.
|
||||||
|
//
|
||||||
|
// Here, we use message passing to synchronize some common state.
|
||||||
|
var onGreetingChanged = await client.subscribe('greeting_changed');
|
||||||
|
onGreetingChanged
|
||||||
|
.cast<String>()
|
||||||
|
.listen((newGreeting) => greeting = newGreeting);
|
||||||
|
|
||||||
|
// Add some routes...
|
||||||
app.get('/', (req, res) => 'Hello, production world!');
|
app.get('/', (req, res) => 'Hello, production world!');
|
||||||
|
|
||||||
|
// Create some routes to demonstrate message passing.
|
||||||
|
app.get('/greeting', (req, res) => greeting);
|
||||||
|
|
||||||
|
// This route will push a new value for `greeting`.
|
||||||
|
app.get('/change_greeting/:newGreeting', (req, res) {
|
||||||
|
greeting = req.params['newGreeting'] as String;
|
||||||
|
client.publish('greeting_changed', greeting);
|
||||||
|
return 'Changed greeting -> $greeting';
|
||||||
|
});
|
||||||
|
|
||||||
|
// The `Runner` helps with fault tolerance.
|
||||||
app.get('/crash', (req, res) {
|
app.get('/crash', (req, res) {
|
||||||
// We'll crash this instance deliberately, but the Runner will auto-respawn for us.
|
// We'll crash this instance deliberately, but the Runner will auto-respawn for us.
|
||||||
new Timer(const Duration(seconds: 3), Isolate.current.kill);
|
new Timer(const Duration(seconds: 3), Isolate.current.kill);
|
||||||
|
|
|
@ -1,2 +1,3 @@
|
||||||
|
export 'src/instance_info.dart';
|
||||||
export 'src/options.dart';
|
export 'src/options.dart';
|
||||||
export 'src/runner.dart';
|
export 'src/runner.dart';
|
6
lib/src/instance_info.dart
Normal file
6
lib/src/instance_info.dart
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
/// Information about the currently-running instance.
|
||||||
|
class InstanceInfo {
|
||||||
|
final int id;
|
||||||
|
|
||||||
|
const InstanceInfo({this.id});
|
||||||
|
}
|
|
@ -4,9 +4,12 @@ import 'dart:isolate';
|
||||||
import 'package:angel_container/angel_container.dart';
|
import 'package:angel_container/angel_container.dart';
|
||||||
import 'package:angel_framework/angel_framework.dart';
|
import 'package:angel_framework/angel_framework.dart';
|
||||||
import 'package:args/args.dart';
|
import 'package:args/args.dart';
|
||||||
import 'package:logging/logging.dart';
|
|
||||||
import 'package:io/ansi.dart';
|
import 'package:io/ansi.dart';
|
||||||
import 'package:io/io.dart';
|
import 'package:io/io.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
import 'package:pub_sub/isolate.dart' as pub_sub;
|
||||||
|
import 'package:pub_sub/pub_sub.dart' as pub_sub;
|
||||||
|
import 'instance_info.dart';
|
||||||
import 'options.dart';
|
import 'options.dart';
|
||||||
|
|
||||||
/// A command-line utility for easier running of multiple instances of an Angel application.
|
/// A command-line utility for easier running of multiple instances of an Angel application.
|
||||||
|
@ -68,18 +71,20 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
/// The returned [Future] completes when the application instance exits.
|
/// The returned [Future] completes when the application instance exits.
|
||||||
///
|
///
|
||||||
/// If respawning is enabled, the [Future] will *never* complete.
|
/// If respawning is enabled, the [Future] will *never* complete.
|
||||||
Future spawnIsolate(RunnerOptions options) {
|
Future spawnIsolate(int id, RunnerOptions options, SendPort pubSubSendPort) {
|
||||||
return _spawnIsolate(new Completer(), options);
|
return _spawnIsolate(id, new Completer(), options, pubSubSendPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future _spawnIsolate(Completer c, RunnerOptions options) {
|
Future _spawnIsolate(
|
||||||
|
int id, Completer c, RunnerOptions options, SendPort pubSubSendPort) {
|
||||||
var onLogRecord = new ReceivePort();
|
var onLogRecord = new ReceivePort();
|
||||||
var onExit = new ReceivePort();
|
var onExit = new ReceivePort();
|
||||||
var onError = new ReceivePort();
|
var onError = new ReceivePort();
|
||||||
var runnerArgs = new _RunnerArgs(
|
var runnerArgs = new _RunnerArgs(name, configureServer, options, reflector,
|
||||||
name, configureServer, options, reflector, onLogRecord.sendPort);
|
onLogRecord.sendPort, pubSubSendPort);
|
||||||
|
var argsWithId = new _RunnerArgsWithId(id, runnerArgs);
|
||||||
|
|
||||||
Isolate.spawn(isolateMain, runnerArgs,
|
Isolate.spawn(isolateMain, argsWithId,
|
||||||
onExit: onExit.sendPort,
|
onExit: onExit.sendPort,
|
||||||
onError: onError.sendPort,
|
onError: onError.sendPort,
|
||||||
errorsAreFatal: true && false)
|
errorsAreFatal: true && false)
|
||||||
|
@ -103,9 +108,9 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
if (options.respawn) {
|
if (options.respawn) {
|
||||||
handleLogRecord(new LogRecord(
|
handleLogRecord(new LogRecord(
|
||||||
Level.WARNING,
|
Level.WARNING,
|
||||||
'Detected a crashed instance at ${new DateTime.now()}. Respawning immediately...',
|
'Instance #$id at ${new DateTime.now()}. Respawning immediately...',
|
||||||
runnerArgs.loggerName));
|
runnerArgs.loggerName));
|
||||||
_spawnIsolate(c, options);
|
_spawnIsolate(id, c, options, pubSubSendPort);
|
||||||
} else {
|
} else {
|
||||||
c.complete();
|
c.complete();
|
||||||
}
|
}
|
||||||
|
@ -119,6 +124,8 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
|
|
||||||
/// Starts a number of isolates, running identical instances of an Angel application.
|
/// Starts a number of isolates, running identical instances of an Angel application.
|
||||||
Future run(List<String> args) async {
|
Future run(List<String> args) async {
|
||||||
|
pub_sub.Server server;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
var argResults = RunnerOptions.argParser.parse(args);
|
var argResults = RunnerOptions.argParser.parse(args);
|
||||||
var options = new RunnerOptions.fromArgResults(argResults);
|
var options = new RunnerOptions.fromArgResults(argResults);
|
||||||
|
@ -137,8 +144,18 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
print('Starting `${name}` application...');
|
print('Starting `${name}` application...');
|
||||||
print('Arguments: $args...\n');
|
print('Arguments: $args...\n');
|
||||||
|
|
||||||
await Future.wait(
|
var adapter = new pub_sub.IsolateAdapter();
|
||||||
new List.generate(options.concurrency, (_) => spawnIsolate(options)));
|
server = new pub_sub.Server([adapter]);
|
||||||
|
|
||||||
|
// Register clients
|
||||||
|
for (int i = 0; i < Platform.numberOfProcessors; i++) {
|
||||||
|
server.registerClient(new pub_sub.ClientInfo('client$i'));
|
||||||
|
}
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
await Future.wait(new List.generate(options.concurrency,
|
||||||
|
(id) => spawnIsolate(id, options, adapter.receivePort.sendPort)));
|
||||||
} on ArgParserException catch (e) {
|
} on ArgParserException catch (e) {
|
||||||
stderr
|
stderr
|
||||||
..writeln(e.message)
|
..writeln(e.message)
|
||||||
|
@ -149,10 +166,13 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
stderr..writeln('fatal error: $e');
|
stderr..writeln('fatal error: $e');
|
||||||
exitCode = 1;
|
exitCode = 1;
|
||||||
|
} finally {
|
||||||
|
server?.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void isolateMain(_RunnerArgs args) {
|
static void isolateMain(_RunnerArgsWithId argsWithId) {
|
||||||
|
var args = argsWithId.args;
|
||||||
hierarchicalLoggingEnabled = true;
|
hierarchicalLoggingEnabled = true;
|
||||||
|
|
||||||
var zone = Zone.current.fork(specification: new ZoneSpecification(
|
var zone = Zone.current.fork(specification: new ZoneSpecification(
|
||||||
|
@ -163,7 +183,15 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
));
|
));
|
||||||
|
|
||||||
zone.run(() async {
|
zone.run(() async {
|
||||||
var app = new Angel(reflector: args.reflector);
|
var client = new pub_sub.IsolateClient(
|
||||||
|
'client${argsWithId.id}', args.pubSubSendPort);
|
||||||
|
|
||||||
|
var app = new Angel(reflector: args.reflector)
|
||||||
|
..container.registerSingleton<pub_sub.Client>(client)
|
||||||
|
..container.registerSingleton(new InstanceInfo(id: argsWithId.id));
|
||||||
|
|
||||||
|
app.shutdownHooks.add((_) => client.close());
|
||||||
|
|
||||||
await app.configure(args.configureServer);
|
await app.configure(args.configureServer);
|
||||||
|
|
||||||
if (app.logger == null) {
|
if (app.logger == null) {
|
||||||
|
@ -177,11 +205,18 @@ _ ___ | /| / / /_/ / _ /___ _ /___
|
||||||
await http.startServer(args.options.hostname, args.options.port);
|
await http.startServer(args.options.hostname, args.options.port);
|
||||||
var url = new Uri(
|
var url = new Uri(
|
||||||
scheme: 'http', host: server.address.address, port: server.port);
|
scheme: 'http', host: server.address.address, port: server.port);
|
||||||
print('Listening at $url');
|
print('Instance #${argsWithId.id} listening at $url');
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class _RunnerArgsWithId {
|
||||||
|
final int id;
|
||||||
|
final _RunnerArgs args;
|
||||||
|
|
||||||
|
_RunnerArgsWithId(this.id, this.args);
|
||||||
|
}
|
||||||
|
|
||||||
class _RunnerArgs {
|
class _RunnerArgs {
|
||||||
final String name;
|
final String name;
|
||||||
|
|
||||||
|
@ -191,10 +226,10 @@ class _RunnerArgs {
|
||||||
|
|
||||||
final Reflector reflector;
|
final Reflector reflector;
|
||||||
|
|
||||||
final SendPort loggingSendPort;
|
final SendPort loggingSendPort, pubSubSendPort;
|
||||||
|
|
||||||
_RunnerArgs(this.name, this.configureServer, this.options, this.reflector,
|
_RunnerArgs(this.name, this.configureServer, this.options, this.reflector,
|
||||||
this.loggingSendPort);
|
this.loggingSendPort, this.pubSubSendPort);
|
||||||
|
|
||||||
String get loggerName => name;
|
String get loggerName => name;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue