working on metrics

This commit is contained in:
Tobe O 2017-10-28 04:50:16 -04:00
parent 1e8cb87e18
commit 77a5cec9c0
15 changed files with 777 additions and 495 deletions

View file

@ -26,7 +26,7 @@
<entry key="angel_route">
<value>
<list>
<option value="$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/angel_route-1.0.6/lib" />
<option value="$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/angel_route-1.0.7/lib" />
</list>
</value>
</entry>
@ -54,7 +54,7 @@
<entry key="body_parser">
<value>
<list>
<option value="$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/body_parser-1.0.2/lib" />
<option value="$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/body_parser-1.0.3/lib" />
</list>
</value>
</entry>
@ -435,11 +435,11 @@
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/analyzer-0.30.0+4/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/angel_http_exception-1.0.0/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/angel_model-1.0.0/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/angel_route-1.0.6/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/angel_route-1.0.7/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/args-1.0.1/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/async-1.13.3/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/barback-0.15.2+13/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/body_parser-1.0.2/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/body_parser-1.0.3/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/boolean_selector-1.0.2/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/charcode-1.1.1/lib" />
<root url="file://$USER_HOME$/.pub-cache/hosted/pub.dartlang.org/cli_util-0.1.2+1/lib" />

View file

@ -1,6 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="performance::hello" type="DartCommandLineRunConfigurationType" factoryName="Dart Command Line Application" singleton="true">
<option name="checkedMode" value="false" />
<configuration default="false" name="performance::hello (DEV)" type="DartCommandLineRunConfigurationType" factoryName="Dart Command Line Application" singleton="true">
<option name="filePath" value="$PROJECT_DIR$/performance/hello/main.dart" />
<option name="workingDirectory" value="$PROJECT_DIR$" />
<method />

View file

@ -0,0 +1,11 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="performance::hello (PRODUCTION)" type="DartCommandLineRunConfigurationType" factoryName="Dart Command Line Application" singleton="true">
<option name="checkedMode" value="false" />
<option name="envs">
<entry key="ANGEL_ENV" value="production" />
</option>
<option name="filePath" value="$PROJECT_DIR$/performance/hello/main.dart" />
<option name="workingDirectory" value="$PROJECT_DIR$" />
<method />
</configuration>
</component>

File diff suppressed because it is too large Load diff

2
lib/metrics.dart Normal file
View file

@ -0,0 +1,2 @@
export 'src/stats/metric_server.dart';
export 'src/stats/stats.dart';

View file

@ -9,6 +9,9 @@ typedef Future<String> ViewGenerator(String path, [Map data]);
/// Base class for Angel servers. Do not bother extending this.
class AngelBase extends Routable {
static ViewGenerator noViewEngineConfigured =(String view, [Map data]) async =>
"No view engine has been configured yet.";
Container _container = new Container();
final Map configuration = {};
@ -28,6 +31,12 @@ class AngelBase extends Routable {
/// A function that renders views.
///
/// Called by [ResponseContext]@`render`.
ViewGenerator viewGenerator = (String view, [Map data]) async =>
"No view engine has been configured yet.";
ViewGenerator viewGenerator = noViewEngineConfigured;
/// Closes this instance, rendering it **COMPLETELY DEFUNCT**.
Future close() async {
await super.close();
_container = null;
viewGenerator = noViewEngineConfigured;
}
}

View file

@ -686,6 +686,7 @@ class HookedServiceEventDispatcher {
void _close() {
_ctrl.forEach((c) => c.close());
listeners.clear();
}
/// Fires an event, and returns it once it is either canceled, or all listeners have run.

View file

@ -3,7 +3,6 @@ library angel_framework.http.request_context;
import 'dart:async';
import 'dart:io';
import 'dart:mirrors';
import 'package:angel_http_exception/angel_http_exception.dart';
import 'package:body_parser/body_parser.dart';
import 'package:charcode/charcode.dart';
import 'metadata.dart';
@ -214,6 +213,11 @@ class RequestContext {
/// Shorthand to add to [_injections].
void inject(type, value) {
if (!app.isProduction && type is Type) {
if (!reflect(value).type.isAssignableTo(reflectType(type)))
throw new ArgumentError('Cannot inject $value (${value.runtimeType}) as an instance of $type.');
}
_injections[type] = value;
}
@ -278,4 +282,19 @@ class RequestContext {
return _body = await parseBody(io,
storeOriginalBuffer: app.storeOriginalBuffer == true);
}
/// Disposes of all resources.
Future close() async {
_body = null;
_acceptsAllCache = null;
_acceptHeaderCache = null;
_io = null;
_override = _path = null;
_contentType = null;
_provisionalQuery?.clear();
properties.clear();
_injections.clear();
serviceParams.clear();
params.clear();
}
}

View file

@ -3,7 +3,6 @@ library angel_framework.http.response_context;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:angel_route/angel_route.dart';
import 'package:json_god/json_god.dart' as god;
import 'package:mime/mime.dart';
@ -158,11 +157,24 @@ class ResponseContext implements StreamSink<List<int>>, StringSink {
return f;
}
/// Disposes of all resources.
Future dispose() async {
await close();
properties.clear();
encoders.clear();
_buffer.clear();
cookies.clear();
app = null;
_headers.clear();
serializer = null;
}
/// Prevents further request handlers from running on the response, except for response finalizers.
///
/// To disable response finalizers, see [willCloseItself].
void end() {
_isOpen = false;
if (_done?.isCompleted == false) _done.complete();
}
/// Serializes JSON to the response.
@ -434,14 +446,16 @@ class ResponseContext implements StreamSink<List<int>>, StringSink {
}
abstract class _LockableBytesBuilder extends BytesBuilder {
factory _LockableBytesBuilder() => new _LockableBytesBuilderImpl();
factory _LockableBytesBuilder() {
return new _LockableBytesBuilderImpl();
}
void _lock();
}
class _LockableBytesBuilderImpl implements _LockableBytesBuilder {
final BytesBuilder _buf = new BytesBuilder(copy: false);
bool _closed = false;
Uint8List _data = new Uint8List(0);
StateError _deny() =>
new StateError('Cannot modified a closed response\'s buffer.');
@ -455,70 +469,37 @@ class _LockableBytesBuilderImpl implements _LockableBytesBuilder {
void add(List<int> bytes) {
if (_closed)
throw _deny();
else if (bytes.isNotEmpty) {
int len = _data.length + bytes.length;
var d = new Uint8List(len);
for (int i = 0; i < _data.length; i++) {
d[i] = _data[i];
}
for (int i = 0; i < bytes.length; i++) {
d[i + _data.length] = bytes[i];
}
_data = d;
}
else _buf.add(bytes);
}
@override
void addByte(int byte) {
if (_closed)
throw _deny();
else {
int len = _data.length + 1;
var d = new Uint8List(len);
for (int i = 0; i < _data.length; i++) {
d[i] = _data[i];
}
d[_data.length] = byte;
_data = d;
}
else _buf.addByte(byte);
}
@override
void clear() {
if (_closed)
throw _deny();
else {
for (int i = 0; i < _data.length; i++) _data[i] = 0;
}
_buf.clear();
}
@override
bool get isEmpty => _data.isEmpty;
bool get isEmpty => _buf.isEmpty;
@override
bool get isNotEmpty => _data.isNotEmpty;
bool get isNotEmpty => _buf.isNotEmpty;
@override
int get length => _data.length;
int get length => _buf.length;
@override
List<int> takeBytes() {
if (_closed)
return toBytes();
else {
var r = new Uint8List.fromList(_data);
clear();
return r;
}
return _buf.takeBytes();
}
@override
List<int> toBytes() {
return _data;
return _buf.toBytes();
}
}

View file

@ -39,6 +39,13 @@ class Routable extends Router {
Routable() : super();
Future close() async {
_services.clear();
configuration.clear();
requestMiddleware.clear();
_onService.close();
}
/// Additional filters to be run on designated requests.
@override
final Map<String, RequestHandler> requestMiddleware = {};

View file

@ -30,11 +30,12 @@ typedef Future AngelConfigurer(Angel app);
/// A powerful real-time/REST/MVC server class.
class Angel extends AngelBase {
final List<Angel> _children = [];
final Map<String, Tuple3<MiddlewarePipeline, Map, Match>> _handlerCache = {};
final Map<String, Tuple3<List, Map, Match>> _handlerCache = {};
Router _flattened;
bool _isProduction;
bool _isProduction = false;
Angel _parent;
StreamSubscription<HttpRequest> _sub;
ServerGenerator _serverGenerator = HttpServer.bind;
/// A global Map of converters that can transform responses bodies.
@ -74,10 +75,7 @@ class Angel extends AngelBase {
/// This value is memoized the first time you call it, so do not change environment
/// configuration at runtime!
bool get isProduction {
if (_isProduction != null)
return _isProduction;
else
return _isProduction = Platform.environment['ANGEL_ENV'] == 'production';
return _isProduction ??= (Platform.environment['ANGEL_ENV'] == 'production');
}
/// The function used to bind this instance to an HTTP server.
@ -158,7 +156,8 @@ class Angel extends AngelBase {
}
optimizeForProduction();
return httpServer..listen(handleRequest);
_sub = httpServer.listen(handleRequest);
return httpServer;
}
@override
@ -196,9 +195,13 @@ class Angel extends AngelBase {
}
/// Shuts down the server, and closes any open [StreamController]s.
///
/// The server will be **COMPLETE DEFUNCT** after this operation!
Future<HttpServer> close() async {
HttpServer server;
_sub?.cancel();
if (httpServer != null) {
server = httpServer;
await httpServer.close(force: true);
@ -210,6 +213,20 @@ class Angel extends AngelBase {
for (var plugin in shutdownHooks) await plugin(this);
await super.close();
_preContained.clear();
_handlerCache.clear();
_injections.clear();
encoders.clear();
_serializer = god.serialize;
_children.clear();
_parent = null;
logger = null;
startupHooks.clear();
shutdownHooks.clear();
responseFinalizers.clear();
_flattened = null;
return server;
}
@ -291,9 +308,12 @@ class Angel extends AngelBase {
/// Runs some [handler]. Returns `true` if request execution should continue.
Future<bool> executeHandler(
handler, RequestContext req, ResponseContext res) async {
if (handler == null) return false;
var result = await getHandlerResult(handler, req, res);
if (result is bool) {
if (result == null)
return false;
else if (result is bool) {
return result;
} else if (result != null) {
res.serialize(result,
@ -384,30 +404,36 @@ class Angel extends AngelBase {
if (requestedUrl.isEmpty) requestedUrl = '/';
var tuple = _handlerCache.putIfAbsent('${req.method}:$requestedUrl', () {
Tuple3<List, Map, Match> resolveTuple() {
Router r = isProduction ? (_flattened ??= flatten(this)) : this;
var resolved =
r.resolveAll(requestedUrl, requestedUrl, method: req.method);
return new Tuple3(
new MiddlewarePipeline(resolved),
new MiddlewarePipeline(resolved).handlers,
resolved.fold<Map>({}, (out, r) => out..addAll(r.allParams)),
resolved.isEmpty ? null : resolved.first.route.match(requestedUrl),
);
});
}
var tuple = isProduction
? _handlerCache.putIfAbsent(
'${req.method}:$requestedUrl', resolveTuple)
: resolveTuple();
req.inject(Zone, zone);
req.inject(ZoneSpecification, zoneSpec);
req.inject(MiddlewarePipeline, tuple.item1);
req.params.addAll(tuple.item2);
req.inject(Match, tuple.item3);
req.inject(Stopwatch, new Stopwatch()..start());
var pipeline = tuple.item1.handlers;
if (logger != null) req.inject(Stopwatch, new Stopwatch()..start());
var pipeline = tuple.item1;
for (var handler in pipeline) {
try {
if (!await executeHandler(handler, req, res)) break;
if (handler == null || !await executeHandler(handler, req, res))
break;
} on AngelHttpException catch (e, st) {
e.stackTrace ??= st;
return await handleAngelHttpException(e, st, req, res, request);
@ -436,6 +462,11 @@ class Angel extends AngelBase {
}
return handleAngelHttpException(e, stackTrace, req, res, request);
}).whenComplete(() {
scheduleMicrotask(() {
req.close();
res.dispose();
});
});
}

View file

@ -0,0 +1,155 @@
import 'dart:async';
import 'dart:io';
import 'package:logging/logging.dart';
import '../http/http.dart';
import 'stats.dart';
/// A variant of an [Angel] server that records performance metrics.
class AngelMetrics extends Angel {
Angel _inner;
HttpServer _server;
StreamSubscription<HttpRequest> _sub;
AngelMetrics() : super() {
var zoneBuilder = createZoneForRequest;
createZoneForRequest = (request, req, res) async {
var spec = await zoneBuilder(request, req, res);
return new ZoneSpecification.from(
spec,
run: (Zone self, ZoneDelegate parent, Zone zone, f()) {
var sw = new Stopwatch();
//print('--- ${req.method} ${req.uri}: $f');
sw.start();
void whenDone() {
sw.stop();
var ms = sw.elapsedMilliseconds;
parent.print(
zone, '--- ${req.method} ${req.uri} DONE after ${ms}ms: $f');
}
var r = parent.run(zone, f);
if (r is Future) {
return r.then((x) {
whenDone();
return x;
});
}
whenDone();
return r;
},
);
};
logger = new Logger('angel_metrics')
..onRecord.listen((rec) {
print(rec);
if (rec.error != null) {
print(rec.error);
print(rec.stackTrace);
}
});
}
factory AngelMetrics.custom(ServerGenerator serverGenerator) {
return new AngelMetrics().._inner = new Angel.custom(serverGenerator);
}
@override
HttpServer get httpServer => _server ?? super.httpServer;
final AngelMetricsStats stats = new AngelMetricsStats._();
@override
Future<HttpServer> startServer([address, int port]) async {
if (_inner == null) return await super.startServer(address, port);
var host = address ?? InternetAddress.LOOPBACK_IP_V4;
_server = await _inner.serverGenerator(host, port ?? 0);
for (var configurer in startupHooks) {
await configure(configurer);
}
optimizeForProduction();
_sub = _server.listen(handleRequest);
return _server;
}
@override
Future<HttpServer> close() async {
_sub?.cancel();
await _inner.close();
return await super.close();
}
@override
Future<RequestContext> createRequestContext(HttpRequest request) {
return stats.createRequestContext
.run<RequestContext>(() => super.createRequestContext(request));
}
@override
Future<ResponseContext> createResponseContext(HttpResponse response,
[RequestContext correspondingRequest]) {
return stats.createResponseContext.run<ResponseContext>(
() => super.createResponseContext(response, correspondingRequest));
}
@override
Future handleRequest(HttpRequest request) {
return stats.handleRequest.run(() => super.handleRequest(request));
}
@override
Future<bool> executeHandler(
handler, RequestContext req, ResponseContext res) {
return stats.executeHandler
.run<bool>(() => super.executeHandler(handler, req, res));
}
@override
Future getHandlerResult(handler, RequestContext req, ResponseContext res) {
return stats.getHandlerResult
.run(() => super.getHandlerResult(handler, req, res));
}
@override
Future runContained(
Function handler, RequestContext req, ResponseContext res) {
return stats.runContained.run(() => super.runContained(handler, req, res));
}
@override
Future sendResponse(
HttpRequest request, RequestContext req, ResponseContext res,
{bool ignoreFinalizers: false}) {
return stats.sendResponse.run(() => super.sendResponse(request, req, res));
}
}
class AngelMetricsStats {
AngelMetricsStats._() {
all = [
createRequestContext,
createResponseContext,
];
}
final Stats createRequestContext = new Stats('createRequestContext');
final Stats createResponseContext = new Stats('createResponseContext');
final Stats handleRequest = new Stats('handleRequest');
final Stats executeHandler = new Stats('executeHandler');
final Stats getHandlerResult = new Stats('getHandlerResult');
final Stats runContained = new Stats('runContained');
final Stats sendResponse = new Stats('sendResponse');
List<Stats> all;
void log() {
all.forEach((s) => s.log());
}
}

49
lib/src/stats/stats.dart Normal file
View file

@ -0,0 +1,49 @@
/// Computes averages progressively.
import 'dart:async';
class Stats {
final String name;
int _total = 0, _count = 0;
double _average = 0.0;
Stats(this.name);
double get average => _average ?? (_total / _count);
void log() {
print('$name: $average avg.');
}
void add(int value) {
_average = null;
_total += value;
_count++;
}
FutureOr<T> run<T>(FutureOr<T> f()) {
var sw = new Stopwatch();
//print('--- $name START');
sw.start();
void whenDone() {
sw.stop();
var ms = sw.elapsedMilliseconds;
add(ms);
print('--- $name DONE after ${ms}ms');
}
var r = f();
if (r is Future) {
return (r as Future).then((x) {
whenDone();
return x;
});
}
whenDone();
return r;
}
}

View file

@ -4,24 +4,20 @@ library performance.hello;
import 'dart:io';
import 'dart:isolate';
import 'package:angel_framework/angel_framework.dart';
import 'package:logging/logging.dart';
import 'package:angel_framework/metrics.dart';
main() {
for (int i = 0; i < Platform.numberOfProcessors - 1; i++)
for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
Isolate.spawn(start, i + 1);
}
start(0);
}
void start(int id) {
var app = new Angel.custom(startShared)
var app = new AngelMetrics.custom(startShared)
..lazyParseBodies = true
..get('/', (req, res) => res.write('Hello, world!'))
..optimizeForProduction(force: true)
..logger = (new Logger('streaming_test')
..onRecord.listen((rec) {
print(rec);
if (rec.stackTrace != null) print(rec.stackTrace);
}));
..get('/', (req, res) => res.write('Hello, world!'));
var oldHandler = app.errorHandler;
app.errorHandler = (e, req, res) {

View file

@ -4,7 +4,7 @@ description: A high-powered HTTP server with DI, routing and more.
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_framework
environment:
sdk: ">=1.19.0"
sdk: ">=1.19.0 <2.0.0"
dependencies:
angel_http_exception: ^1.0.0
angel_model: ^1.0.0