From fe9ccd7d7c015f5a77d03ba261ef4ffb0d9d8937 Mon Sep 17 00:00:00 2001 From: Patrick Stewart Date: Tue, 3 Sep 2024 13:14:31 -0700 Subject: [PATCH] add(conduit): refactoring conduit core --- packages/app/.gitignore | 7 + packages/app/CHANGELOG.md | 3 + packages/app/LICENSE.md | 10 + packages/app/README.md | 1 + packages/app/analysis_options.yaml | 30 ++ packages/app/lib/application.dart | 18 ++ packages/app/lib/src/application.dart | 237 ++++++++++++++ packages/app/lib/src/application_server.dart | 123 ++++++++ packages/app/lib/src/channel.dart | 290 ++++++++++++++++++ .../lib/src/isolate_application_server.dart | 101 ++++++ packages/app/lib/src/isolate_supervisor.dart | 147 +++++++++ packages/app/lib/src/options.dart | 106 +++++++ packages/app/lib/src/starter.dart | 32 ++ packages/app/pubspec.yaml | 23 ++ packages/{auth/lib/src => app/test}/.gitkeep | 0 15 files changed, 1128 insertions(+) create mode 100644 packages/app/.gitignore create mode 100644 packages/app/CHANGELOG.md create mode 100644 packages/app/LICENSE.md create mode 100644 packages/app/README.md create mode 100644 packages/app/analysis_options.yaml create mode 100644 packages/app/lib/application.dart create mode 100644 packages/app/lib/src/application.dart create mode 100644 packages/app/lib/src/application_server.dart create mode 100644 packages/app/lib/src/channel.dart create mode 100644 packages/app/lib/src/isolate_application_server.dart create mode 100644 packages/app/lib/src/isolate_supervisor.dart create mode 100644 packages/app/lib/src/options.dart create mode 100644 packages/app/lib/src/starter.dart create mode 100644 packages/app/pubspec.yaml rename packages/{auth/lib/src => app/test}/.gitkeep (100%) diff --git a/packages/app/.gitignore b/packages/app/.gitignore new file mode 100644 index 0000000..3cceda5 --- /dev/null +++ b/packages/app/.gitignore @@ -0,0 +1,7 @@ +# https://dart.dev/guides/libraries/private-files +# Created by `dart pub` +.dart_tool/ + +# Avoid committing pubspec.lock for library packages; see +# https://dart.dev/guides/libraries/private-files#pubspeclock. +pubspec.lock diff --git a/packages/app/CHANGELOG.md b/packages/app/CHANGELOG.md new file mode 100644 index 0000000..effe43c --- /dev/null +++ b/packages/app/CHANGELOG.md @@ -0,0 +1,3 @@ +## 1.0.0 + +- Initial version. diff --git a/packages/app/LICENSE.md b/packages/app/LICENSE.md new file mode 100644 index 0000000..0fd0d03 --- /dev/null +++ b/packages/app/LICENSE.md @@ -0,0 +1,10 @@ +The MIT License (MIT) + +The Laravel Framework is Copyright (c) Taylor Otwell +The Fabric Framework is Copyright (c) Vieo, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/packages/app/README.md b/packages/app/README.md new file mode 100644 index 0000000..757f4c9 --- /dev/null +++ b/packages/app/README.md @@ -0,0 +1 @@ +

\ No newline at end of file diff --git a/packages/app/analysis_options.yaml b/packages/app/analysis_options.yaml new file mode 100644 index 0000000..dee8927 --- /dev/null +++ b/packages/app/analysis_options.yaml @@ -0,0 +1,30 @@ +# This file configures the static analysis results for your project (errors, +# warnings, and lints). +# +# This enables the 'recommended' set of lints from `package:lints`. +# This set helps identify many issues that may lead to problems when running +# or consuming Dart code, and enforces writing Dart using a single, idiomatic +# style and format. +# +# If you want a smaller set of lints you can change this to specify +# 'package:lints/core.yaml'. These are just the most critical lints +# (the recommended set includes the core lints). +# The core lints are also what is used by pub.dev for scoring packages. + +include: package:lints/recommended.yaml + +# Uncomment the following section to specify additional rules. + +# linter: +# rules: +# - camel_case_types + +# analyzer: +# exclude: +# - path/to/excluded/files/** + +# For more information about the core and recommended set of lints, see +# https://dart.dev/go/core-lints + +# For additional information about configuring this file, see +# https://dart.dev/guides/language/analysis-options diff --git a/packages/app/lib/application.dart b/packages/app/lib/application.dart new file mode 100644 index 0000000..15b974a --- /dev/null +++ b/packages/app/lib/application.dart @@ -0,0 +1,18 @@ +/* + * This file is part of the Protevus Platform. + * + * (C) Protevus + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +library; + +export 'src/application.dart'; +export 'src/application_server.dart'; +export 'src/channel.dart'; +export 'src/isolate_application_server.dart'; +export 'src/isolate_supervisor.dart'; +export 'src/options.dart'; +export 'src/starter.dart'; diff --git a/packages/app/lib/src/application.dart b/packages/app/lib/src/application.dart new file mode 100644 index 0000000..20737b1 --- /dev/null +++ b/packages/app/lib/src/application.dart @@ -0,0 +1,237 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:isolate'; + +import 'package:protevus_application/application.dart'; +import 'package:protevus_openapi/v3.dart'; +import 'package:protevus_runtime/runtime.dart'; +import 'package:logging/logging.dart'; + +export 'application_server.dart'; +export 'options.dart'; +export 'starter.dart'; + +/// This object starts and stops instances of your [ApplicationChannel]. +/// +/// An application object opens HTTP listeners that forward requests to instances of your [ApplicationChannel]. +/// It is unlikely that you need to use this class directly - the `conduit serve` command creates an application object +/// on your behalf. +class Application { + /// A list of isolates that this application supervises. + List supervisors = []; + + /// The [ApplicationServer] listening for HTTP requests while under test. + /// + /// This property is only valid when an application is started via [startOnCurrentIsolate]. + late ApplicationServer server; + + /// The [ApplicationChannel] handling requests while under test. + /// + /// This property is only valid when an application is started via [startOnCurrentIsolate]. You use + /// this value to access elements of your application channel during testing. + T get channel => server.channel as T; + + /// The logger that this application will write messages to. + /// + /// This logger's name will appear as 'conduit'. + Logger logger = Logger("conduit"); + + /// The options used to configure this application. + /// + /// Changing these values once the application has started will have no effect. + ApplicationOptions options = ApplicationOptions(); + + /// The duration to wait for each isolate during startup before failing. + /// + /// A [TimeoutException] is thrown if an isolate fails to startup in this time period. + /// + /// Defaults to 30 seconds. + Duration isolateStartupTimeout = const Duration(seconds: 30); + + /// Whether or not this application is running. + /// + /// This will return true if [start]/[startOnCurrentIsolate] have been invoked and completed; i.e. this is the synchronous version of the [Future] returned by [start]/[startOnCurrentIsolate]. + /// + /// This value will return to false after [stop] has completed. + bool get isRunning => _hasFinishedLaunching; + bool _hasFinishedLaunching = false; + ChannelRuntime get _runtime => RuntimeContext.current[T] as ChannelRuntime; + + /// Starts this application, allowing it to handle HTTP requests. + /// + /// This method spawns [numberOfInstances] isolates, instantiates your application channel + /// for each of these isolates, and opens an HTTP listener that sends requests to these instances. + /// + /// The [Future] returned from this method will complete once all isolates have successfully started + /// and are available to handle requests. + /// + /// If your application channel implements [ApplicationChannel.initializeApplication], + /// it will be invoked prior to any isolate being spawned. + /// + /// See also [startOnCurrentIsolate] for starting an application when running automated tests. + Future start({int numberOfInstances = 1, bool consoleLogging = false}) async { + if (supervisors.isNotEmpty) { + throw StateError( + "Application error. Cannot invoke 'start' on already running Conduit application.", + ); + } + + if (options.address == null) { + if (options.isIpv6Only) { + options.address = InternetAddress.anyIPv6; + } else { + options.address = InternetAddress.anyIPv4; + } + } + + try { + await _runtime.runGlobalInitialization(options); + + for (var i = 0; i < numberOfInstances; i++) { + final supervisor = await _spawn( + this, + options, + i + 1, + logger, + isolateStartupTimeout, + logToConsole: consoleLogging, + ); + supervisors.add(supervisor); + await supervisor.resume(); + } + } catch (e, st) { + logger.severe("$e", this, st); + await stop().timeout(const Duration(seconds: 5)); + rethrow; + } + for (final sup in supervisors) { + sup.sendPendingMessages(); + } + _hasFinishedLaunching = true; + } + + /// Starts the application on the current isolate, and does not spawn additional isolates. + /// + /// An application started in this way will run on the same isolate this method is invoked on. + /// Performance is limited when running the application with this method; prefer to use [start]. + Future startOnCurrentIsolate() async { + if (supervisors.isNotEmpty) { + throw StateError( + "Application error. Cannot invoke 'test' on already running Conduit application.", + ); + } + + options.address ??= InternetAddress.loopbackIPv4; + + try { + await _runtime.runGlobalInitialization(options); + + server = ApplicationServer(_runtime.channelType, options, 1); + + await server.start(); + _hasFinishedLaunching = true; + } catch (e, st) { + logger.severe("$e", this, st); + await stop().timeout(const Duration(seconds: 5)); + rethrow; + } + } + + /// Stops the application from running. + /// + /// Closes every isolate and their channel and stops listening for HTTP requests. + /// The [ServiceRegistry] will close any of its resources. + Future stop() async { + _hasFinishedLaunching = false; + await Future.wait(supervisors.map((s) => s.stop())) + .onError((error, stackTrace) { + if (error.runtimeType.toString() == 'LateError') { + throw StateError( + 'Channel type $T was not loaded in the current isolate. Check that the class was declared and public.', + ); + } + throw error! as Error; + }); + + try { + await server.server.close(force: true); + } catch (e) { + logger.severe(e); + } + + _hasFinishedLaunching = false; + supervisors = []; + + logger.clearListeners(); + } + + /// Creates an [APIDocument] from an [ApplicationChannel]. + /// + /// This method is called by the `conduit document` CLI. + static Future document( + Type type, + ApplicationOptions config, + Map projectSpec, + ) async { + final runtime = RuntimeContext.current[type] as ChannelRuntime; + + await runtime.runGlobalInitialization(config); + + final server = ApplicationServer(runtime.channelType, config, 1); + + await server.channel.prepare(); + + final doc = await server.channel.documentAPI(projectSpec); + + await server.channel.close(); + + return doc; + } + + Future _spawn( + Application application, + ApplicationOptions config, + int identifier, + Logger logger, + Duration startupTimeout, { + bool logToConsole = false, + }) async { + final receivePort = ReceivePort(); + + final libraryUri = _runtime.libraryUri; + final typeName = _runtime.name; + final entryPoint = _runtime.isolateEntryPoint; + + final initialMessage = ApplicationInitialServerMessage( + typeName, + libraryUri, + config, + identifier, + receivePort.sendPort, + logToConsole: logToConsole, + ); + final isolate = + await Isolate.spawn(entryPoint, initialMessage, paused: true); + + return ApplicationIsolateSupervisor( + application, + isolate, + receivePort, + identifier, + logger, + startupTimeout: startupTimeout, + ); + } +} + +/// Thrown when an application encounters an exception during startup. +/// +/// Contains the original exception that halted startup. +class ApplicationStartupException implements Exception { + ApplicationStartupException(this.originalException); + + dynamic originalException; + + @override + String toString() => originalException.toString(); +} diff --git a/packages/app/lib/src/application_server.dart b/packages/app/lib/src/application_server.dart new file mode 100644 index 0000000..ed6279d --- /dev/null +++ b/packages/app/lib/src/application_server.dart @@ -0,0 +1,123 @@ +import 'dart:async'; +import 'dart:io'; +import 'package:protevus_application/application.dart'; +import 'package:protevus_http/http.dart'; +import 'package:protevus_runtime/runtime.dart'; +import 'package:logging/logging.dart'; + +/// Listens for HTTP requests and delivers them to its [ApplicationChannel] instance. +/// +/// A Conduit application creates instances of this type to pair an HTTP server and an +/// instance of an [ApplicationChannel] subclass. Instances are created by [Application] +/// and shouldn't be created otherwise. +class ApplicationServer { + /// Creates a new server. + /// + /// You should not need to invoke this method directly. + ApplicationServer(this.channelType, this.options, this.identifier) { + channel = (RuntimeContext.current[channelType] as ChannelRuntime) + .instantiateChannel() + ..server = this + ..options = options; + } + + /// The configuration this instance used to start its [channel]. + ApplicationOptions options; + + /// The underlying [HttpServer]. + late final HttpServer server; + + /// The instance of [ApplicationChannel] serving requests. + late ApplicationChannel channel; + + /// The cached entrypoint of [channel]. + late Controller entryPoint; + + final Type channelType; + + /// Target for sending messages to other [ApplicationChannel.messageHub]s. + /// + /// Events are added to this property by instances of [ApplicationMessageHub] and should not otherwise be used. + EventSink? hubSink; + + /// Whether or not this server requires an HTTPS listener. + bool get requiresHTTPS => _requiresHTTPS; + bool _requiresHTTPS = false; + + /// The unique identifier of this instance. + /// + /// Each instance has its own identifier, a numeric value starting at 1, to identify it + /// among other instances. + int identifier; + + /// The logger of this instance + Logger get logger => Logger("conduit"); + + /// Starts this instance, allowing it to receive HTTP requests. + /// + /// Do not invoke this method directly. + Future start({bool shareHttpServer = false}) async { + logger.fine("ApplicationServer($identifier).start entry"); + + await channel.prepare(); + + entryPoint = channel.entryPoint; + entryPoint.didAddToChannel(); + + logger.fine("ApplicationServer($identifier).start binding HTTP"); + final securityContext = channel.securityContext; + if (securityContext != null) { + _requiresHTTPS = true; + + server = await HttpServer.bindSecure( + options.address, + options.port, + securityContext, + requestClientCertificate: options.isUsingClientCertificate, + v6Only: options.isIpv6Only, + shared: shareHttpServer, + ); + } else { + _requiresHTTPS = false; + + server = await HttpServer.bind( + options.address, + options.port, + v6Only: options.isIpv6Only, + shared: shareHttpServer, + ); + } + + logger.fine("ApplicationServer($identifier).start bound HTTP"); + return didOpen(); + } + + /// Closes this HTTP server and channel. + Future close() async { + logger.fine("ApplicationServer($identifier).close Closing HTTP listener"); + await server.close(force: true); + logger.fine("ApplicationServer($identifier).close Closing channel"); + await channel.close(); + + // This is actually closed by channel.messageHub.close, but this shuts up the analyzer. + hubSink?.close(); + logger.fine("ApplicationServer($identifier).close Closing complete"); + } + + /// Invoked when this server becomes ready receive requests. + /// + /// [ApplicationChannel.willStartReceivingRequests] is invoked after this opening has completed. + Future didOpen() async { + server.serverHeader = "conduit/$identifier"; + + logger.fine("ApplicationServer($identifier).didOpen start listening"); + server.map((baseReq) => Request(baseReq)).listen(entryPoint.receive); + + channel.willStartReceivingRequests(); + logger.info("Server conduit/$identifier started."); + } + + void sendApplicationEvent(dynamic event) { + // By default, do nothing + } +} diff --git a/packages/app/lib/src/channel.dart b/packages/app/lib/src/channel.dart new file mode 100644 index 0000000..8f120f8 --- /dev/null +++ b/packages/app/lib/src/channel.dart @@ -0,0 +1,290 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:protevus_openapi/documentable.dart'; +import 'package:protevus_application/application.dart'; +import 'package:protevus_http/http.dart'; +import 'package:protevus_openapi/v3.dart'; +import 'package:protevus_runtime/runtime.dart'; +import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; + +/// An object that defines the behavior specific to your application. +/// +/// You create a subclass of [ApplicationChannel] to initialize your application's services and define how HTTP requests are handled by your application. +/// There *must* only be one subclass in an application and it must be visible to your application library file, e.g., 'package:my_app/my_app.dart'. +/// +/// You must implement [entryPoint] to define the controllers that comprise your application channel. Most applications will +/// also override [prepare] to read configuration values and initialize services. Some applications will provide an [initializeApplication] +/// method to do global startup tasks. +/// +/// When your application is started, an instance of your application channel is created for each isolate (see [Application.start]). Each instance +/// is a replica of your application that runs in its own memory isolated thread. +abstract class ApplicationChannel implements APIComponentDocumenter { + /// You implement this method to provide global initialization for your application. + /// + /// Most of your application initialization code is written in [prepare], which is invoked for each isolate. For initialization that + /// needs to occur once per application start, you must provide an implementation for this method. This method is invoked prior + /// to any isolates being spawned. + /// + /// You may alter [options] in this method and those changes will be available in each instance's [options]. To pass arbitrary data + /// to each of your isolates at startup, add that data to [ApplicationOptions.context]. + /// + /// Example: + /// + /// class MyChannel extends ApplicationChannel { + /// static Future initializeApplication(ApplicationOptions options) async { + /// options.context["runtimeOption"] = "foo"; + /// } + /// + /// Future prepare() async { + /// if (options.context["runtimeOption"] == "foo") { + /// // do something + /// } + /// } + /// } + /// + /// + /// Do not configure objects like [CodecRegistry], [CORSPolicy.defaultPolicy] or any other value that isn't explicitly passed through [options]. + /// + /// * Note that static methods are not inherited in Dart and therefore you are not overriding this method. The declaration of this method in the base [ApplicationChannel] class + /// is for documentation purposes. + static Future initializeApplication(ApplicationOptions options) async {} + + /// The logger that this object will write messages to. + /// + /// This logger's name appears as 'conduit'. + Logger get logger => Logger("conduit"); + + /// The [ApplicationServer] that sends HTTP requests to this object. + ApplicationServer get server => _server; + + set server(ApplicationServer server) { + _server = server; + messageHub._outboundController.stream.listen(server.sendApplicationEvent); + server.hubSink = messageHub._inboundController.sink; + } + + /// Use this object to send data to channels running on other isolates. + /// + /// You use this object to synchronize state across the isolates of an application. Any data sent + /// through this object will be received by every other channel in your application (except the one that sent it). + final ApplicationMessageHub messageHub = ApplicationMessageHub(); + + /// The context used for setting up HTTPS in an application. + /// + /// If this value is non-null, the [server] receiving HTTP requests will only accept requests over HTTPS. + /// + /// By default, this value is null. If the [ApplicationOptions] provided to the application are configured to + /// reference a private key and certificate file, this value is derived from that information. You may override + /// this method to provide an alternative means to creating a [SecurityContext]. + SecurityContext? get securityContext { + if (options?.certificateFilePath == null || + options?.privateKeyFilePath == null) { + return null; + } + + return SecurityContext() + ..useCertificateChain(options!.certificateFilePath!) + ..usePrivateKey(options!.privateKeyFilePath!); + } + + /// The configuration options used to start the application this channel belongs to. + /// + /// These options are set when starting the application. Changes to this object have no effect + /// on other isolates. + ApplicationOptions? options; + + /// You implement this accessor to define how HTTP requests are handled by your application. + /// + /// You must implement this method to return the first controller that will handle an HTTP request. Additional controllers + /// are linked to the first controller to create the entire flow of your application's request handling logic. This method + /// is invoked during startup and controllers cannot be changed after it is invoked. This method is always invoked after + /// [prepare]. + /// + /// In most applications, the first controller is a [Router]. Example: + /// + /// @override + /// Controller get entryPoint { + /// final router = Router(); + /// router.route("/path").link(() => PathController()); + /// return router; + /// } + Controller get entryPoint; + + late ApplicationServer _server; + + /// You override this method to perform initialization tasks. + /// + /// This method allows this instance to perform any initialization (other than setting up the [entryPoint]). This method + /// is often used to set up services that [Controller]s use to fulfill their duties. This method is invoked + /// prior to [entryPoint], so that the services it creates can be injected into [Controller]s. + /// + /// By default, this method does nothing. + Future prepare() async {} + + /// You override this method to perform initialization tasks that occur after [entryPoint] has been established. + /// + /// Override this method to take action just before [entryPoint] starts receiving requests. By default, does nothing. + void willStartReceivingRequests() {} + + /// You override this method to release any resources created in [prepare]. + /// + /// This method is invoked when the owning [Application] is stopped. It closes open ports + /// that this channel was using so that the application can be properly shut down. + /// + /// Prefer to use [ServiceRegistry] instead of overriding this method. + /// + /// If you do override this method, you must call the super implementation. + @mustCallSuper + Future close() async { + logger.fine( + "ApplicationChannel(${server.identifier}).close: closing messageHub", + ); + await messageHub.close(); + } + + /// Creates an OpenAPI document for the components and paths in this channel. + /// + /// This method invokes [entryPoint] and [prepare] before starting the documentation process. + /// + /// The documentation process first invokes [documentComponents] on this channel. Every controller in the channel will have its + /// [documentComponents] methods invoked. Any declared property + /// of this channel that implements [APIComponentDocumenter] will have its [documentComponents] + /// method invoked. If there services that are part of the application, but not stored as properties of this channel, you may override + /// [documentComponents] in your subclass to add them. You must call the superclass' implementation of [documentComponents]. + /// + /// After components have been documented, [APIOperationDocumenter.documentPaths] is invoked on [entryPoint]. The controllers + /// of the channel will add paths and operations to the document during this process. + /// + /// This method should not be overridden. + /// + /// [projectSpec] should contain the keys `name`, `version` and `description`. + Future documentAPI(Map projectSpec) async { + final doc = APIDocument()..components = APIComponents(); + final root = entryPoint; + root.didAddToChannel(); + + final context = APIDocumentContext(doc); + documentComponents(context); + + doc.paths = root.documentPaths(context); + + doc.info = APIInfo( + projectSpec["name"] as String?, + projectSpec["version"] as String?, + description: projectSpec["description"] as String?, + ); + + await context.finalize(); + + return doc; + } + + @mustCallSuper + @override + void documentComponents(APIDocumentContext registry) { + entryPoint.documentComponents(registry); + + (RuntimeContext.current[runtimeType] as ChannelRuntime) + .getDocumentableChannelComponents(this) + .forEach((component) { + component.documentComponents(registry); + }); + } +} + +/// An object that sends and receives messages between [ApplicationChannel]s. +/// +/// You use this object to share information between isolates. Each [ApplicationChannel] has a property of this type. A message sent through this object +/// is received by every other channel through its hub. +/// +/// To receive messages in a hub, add a listener via [listen]. To send messages, use [add]. +/// +/// For example, an application may want to send data to every connected websocket. A reference to each websocket +/// is only known to the isolate it established a connection on. This data must be sent to each isolate so that each websocket +/// connected to that isolate can send the data: +/// +/// router.route("/broadcast").linkFunction((req) async { +/// var message = await req.body.decodeAsString(); +/// websocketsOnThisIsolate.forEach((s) => s.add(message); +/// messageHub.add({"event": "broadcastMessage", "data": message}); +/// return Response.accepted(); +/// }); +/// +/// messageHub.listen((event) { +/// if (event is Map && event["event"] == "broadcastMessage") { +/// websocketsOnThisIsolate.forEach((s) => s.add(event["data"]); +/// } +/// }); +class ApplicationMessageHub extends Stream implements Sink { + final Logger _logger = Logger("conduit"); + final StreamController _outboundController = + StreamController(); + final StreamController _inboundController = + StreamController.broadcast(); + + /// Adds a listener for messages from other hubs. + /// + /// You use this method to add listeners for messages from other hubs. + /// When another hub [add]s a message, this hub will receive it on [onData]. + /// + /// [onError], if provided, will be invoked when this isolate tries to [add] invalid data. Only the isolate + /// that failed to send the data will receive [onError] events. + @override + StreamSubscription listen( + void Function(dynamic event)? onData, { + Function? onError, + void Function()? onDone, + bool? cancelOnError = false, + }) => + _inboundController.stream.listen( + onData, + onError: onError ?? + ((err, StackTrace st) => + _logger.severe("ApplicationMessageHub error", err, st)), + onDone: onDone, + cancelOnError: cancelOnError, + ); + + /// Sends a message to all other hubs. + /// + /// [event] will be delivered to all other isolates that have set up a callback for [listen]. + /// + /// [event] must be isolate-safe data - in general, this means it may not be or contain a closure. Consult the API reference `dart:isolate` for more details. If [event] + /// is not isolate-safe data, an error is delivered to [listen] on this isolate. + @override + void add(dynamic event) { + _outboundController.sink.add(event); + } + + @override + Future close() async { + if (!_outboundController.hasListener) { + _outboundController.stream.listen(null); + } + + if (!_inboundController.hasListener) { + _inboundController.stream.listen(null); + } + + await _outboundController.close(); + await _inboundController.close(); + } +} + +abstract class ChannelRuntime { + Iterable getDocumentableChannelComponents( + ApplicationChannel channel, + ); + + Type get channelType; + + String get name; + Uri get libraryUri; + IsolateEntryFunction get isolateEntryPoint; + + ApplicationChannel instantiateChannel(); + + Future runGlobalInitialization(ApplicationOptions config); +} diff --git a/packages/app/lib/src/isolate_application_server.dart b/packages/app/lib/src/isolate_application_server.dart new file mode 100644 index 0000000..2972d24 --- /dev/null +++ b/packages/app/lib/src/isolate_application_server.dart @@ -0,0 +1,101 @@ +import 'dart:async'; +import 'dart:isolate'; + +import 'package:protevus_application/application.dart'; +import 'package:logging/logging.dart'; + +class ApplicationIsolateServer extends ApplicationServer { + ApplicationIsolateServer( + Type channelType, + ApplicationOptions configuration, + int identifier, + this.supervisingApplicationPort, { + bool logToConsole = false, + }) : super(channelType, configuration, identifier) { + if (logToConsole) { + hierarchicalLoggingEnabled = true; + logger.level = Level.ALL; + // ignore: avoid_print + logger.onRecord.listen(print); + } + supervisingReceivePort = ReceivePort(); + supervisingReceivePort.listen(listener); + + logger + .fine("ApplicationIsolateServer($identifier) listening, sending port"); + supervisingApplicationPort.send(supervisingReceivePort.sendPort); + } + + SendPort supervisingApplicationPort; + late ReceivePort supervisingReceivePort; + + @override + Future start({bool shareHttpServer = false}) async { + final result = await super.start(shareHttpServer: shareHttpServer); + logger.fine( + "ApplicationIsolateServer($identifier) started, sending listen message", + ); + supervisingApplicationPort + .send(ApplicationIsolateSupervisor.messageKeyListening); + + return result; + } + + @override + void sendApplicationEvent(dynamic event) { + try { + supervisingApplicationPort.send(MessageHubMessage(event)); + } catch (e, st) { + hubSink?.addError(e, st); + } + } + + void listener(dynamic message) { + if (message == ApplicationIsolateSupervisor.messageKeyStop) { + stop(); + } else if (message is MessageHubMessage) { + hubSink?.add(message.payload); + } + } + + Future stop() async { + supervisingReceivePort.close(); + logger.fine("ApplicationIsolateServer($identifier) closing server"); + await close(); + logger.fine("ApplicationIsolateServer($identifier) did close server"); + logger.clearListeners(); + logger.fine( + "ApplicationIsolateServer($identifier) sending stop acknowledgement", + ); + supervisingApplicationPort + .send(ApplicationIsolateSupervisor.messageKeyStop); + } +} + +typedef IsolateEntryFunction = void Function( + ApplicationInitialServerMessage message, +); + +class ApplicationInitialServerMessage { + ApplicationInitialServerMessage( + this.streamTypeName, + this.streamLibraryURI, + this.configuration, + this.identifier, + this.parentMessagePort, { + this.logToConsole = false, + }); + + String streamTypeName; + Uri streamLibraryURI; + ApplicationOptions configuration; + SendPort parentMessagePort; + int identifier; + bool logToConsole = false; +} + +class MessageHubMessage { + MessageHubMessage(this.payload); + + dynamic payload; +} diff --git a/packages/app/lib/src/isolate_supervisor.dart b/packages/app/lib/src/isolate_supervisor.dart new file mode 100644 index 0000000..27a0343 --- /dev/null +++ b/packages/app/lib/src/isolate_supervisor.dart @@ -0,0 +1,147 @@ +import 'dart:async'; +import 'dart:isolate'; + +import 'package:protevus_application/application.dart'; +import 'package:logging/logging.dart'; + +/// Represents the supervision of a [ApplicationIsolateServer]. +/// +/// You should not use this class directly. +class ApplicationIsolateSupervisor { + /// Create an instance of [ApplicationIsolateSupervisor]. + ApplicationIsolateSupervisor( + this.supervisingApplication, + this.isolate, + this.receivePort, + this.identifier, + this.logger, { + this.startupTimeout = const Duration(seconds: 30), + }); + + /// The [Isolate] being supervised. + final Isolate isolate; + + /// The [ReceivePort] for which messages coming from [isolate] will be received. + final ReceivePort receivePort; + + /// A numeric identifier for the isolate relative to the [Application]. + final int identifier; + + final Duration startupTimeout; + + /// A reference to the owning [Application] + Application supervisingApplication; + + /// A reference to the [Logger] used by the [supervisingApplication]. + Logger logger; + + final List _pendingMessageQueue = []; + + bool get _isLaunching => !_launchCompleter.isCompleted; + late SendPort _serverSendPort; + late Completer _launchCompleter; + Completer? _stopCompleter; + + static const String messageKeyStop = "_MessageStop"; + static const String messageKeyListening = "_MessageListening"; + + /// Resumes the [Isolate] being supervised. + Future resume() { + _launchCompleter = Completer(); + receivePort.listen(listener); + + isolate.setErrorsFatal(false); + isolate.addErrorListener(receivePort.sendPort); + logger.fine( + "ApplicationIsolateSupervisor($identifier).resume will resume isolate", + ); + isolate.resume(isolate.pauseCapability!); + + return _launchCompleter.future.timeout( + startupTimeout, + onTimeout: () { + logger.fine( + "ApplicationIsolateSupervisor($identifier).resume timed out waiting for isolate start", + ); + throw TimeoutException( + "Isolate ($identifier) failed to launch in $startupTimeout seconds. " + "There may be an error with your application or Application.isolateStartupTimeout needs to be increased."); + }, + ); + } + + /// Stops the [Isolate] being supervised. + Future stop() async { + _stopCompleter = Completer(); + logger.fine( + "ApplicationIsolateSupervisor($identifier).stop sending stop to supervised isolate", + ); + _serverSendPort.send(messageKeyStop); + + try { + await _stopCompleter!.future.timeout(const Duration(seconds: 5)); + } on TimeoutException { + logger.severe( + "Isolate ($identifier) not responding to stop message, terminating.", + ); + } finally { + isolate.kill(); + } + + receivePort.close(); + } + + void listener(dynamic message) { + if (message is SendPort) { + _serverSendPort = message; + } else if (message == messageKeyListening) { + _launchCompleter.complete(); + logger.fine( + "ApplicationIsolateSupervisor($identifier) isolate listening acknowledged", + ); + } else if (message == messageKeyStop) { + logger.fine( + "ApplicationIsolateSupervisor($identifier) stop message acknowledged", + ); + receivePort.close(); + + _stopCompleter!.complete(); + _stopCompleter = null; + } else if (message is List) { + logger.fine( + "ApplicationIsolateSupervisor($identifier) received isolate error ${message.first}", + ); + final stacktrace = StackTrace.fromString(message.last as String); + _handleIsolateException(message.first, stacktrace); + } else if (message is MessageHubMessage) { + if (!supervisingApplication.isRunning) { + _pendingMessageQueue.add(message); + } else { + _sendMessageToOtherSupervisors(message); + } + } + } + + void sendPendingMessages() { + final list = List.from(_pendingMessageQueue); + _pendingMessageQueue.clear(); + list.forEach(_sendMessageToOtherSupervisors); + } + + void _sendMessageToOtherSupervisors(MessageHubMessage message) { + supervisingApplication.supervisors + .where((sup) => sup != this) + .forEach((supervisor) { + supervisor._serverSendPort.send(message); + }); + } + + void _handleIsolateException(dynamic error, StackTrace stacktrace) { + if (_isLaunching) { + final appException = ApplicationStartupException(error); + _launchCompleter.completeError(appException, stacktrace); + } else { + logger.severe("Uncaught exception in isolate.", error, stacktrace); + } + } +} diff --git a/packages/app/lib/src/options.dart b/packages/app/lib/src/options.dart new file mode 100644 index 0000000..6520e25 --- /dev/null +++ b/packages/app/lib/src/options.dart @@ -0,0 +1,106 @@ +import 'dart:io'; + +import 'package:args/args.dart'; +import 'package:protevus_application/application.dart'; + +/// An object that contains configuration values for an [Application]. +/// +/// You use this object in an [ApplicationChannel] to manage external configuration data for your application. +class ApplicationOptions { + /// The absolute path of the configuration file for this application. + /// + /// This path is provided when an application is started by the `--config-path` option to `conduit serve`. + /// You may load the file at this path in [ApplicationChannel] to use configuration values. + String? configurationFilePath; + + /// The address to listen for HTTP requests on. + /// + /// By default, this address will default to 'any' address (0.0.0.0). If [isIpv6Only] is true, + /// 'any' will be any IPv6 address, otherwise, it will be any IPv4 or IPv6 address. + /// + /// This value may be an [InternetAddress] or a [String]. + dynamic address; + + /// The port to listen for HTTP requests on. + /// + /// Defaults to 8888. + int port = 8888; + + /// Whether or not the application should only receive connections over IPv6. + /// + /// Defaults to false. This flag impacts the default value of the [address] property. + bool isIpv6Only = false; + + /// Whether or not the application's request controllers should use client-side HTTPS certificates. + /// + /// Defaults to false. + bool isUsingClientCertificate = false; + + /// The path to a SSL certificate. + /// + /// If specified - along with [privateKeyFilePath] - an [Application] will only allow secure connections over HTTPS. + /// This value is often set through the `--ssl-certificate-path` command line option of `conduit serve`. For finer control + /// over how HTTPS is configured for an application, see [ApplicationChannel.securityContext]. + String? certificateFilePath; + + /// The path to a private key. + /// + /// If specified - along with [certificateFilePath] - an [Application] will only allow secure connections over HTTPS. + /// This value is often set through the `--ssl-key-path` command line option of `conduit serve`. For finer control + /// over how HTTPS is configured for an application, see [ApplicationChannel.securityContext]. + String? privateKeyFilePath; + + /// Contextual configuration values for each [ApplicationChannel]. + /// + /// This is a user-specific set of configuration options provided by [ApplicationChannel.initializeApplication]. + /// Each instance of [ApplicationChannel] has access to these values if set. + final Map context = {}; + + static final parser = ArgParser() + ..addOption( + "address", + abbr: "a", + help: "The address to listen on. See HttpServer.bind for more details." + " Using the default will listen on any address.", + ) + ..addOption( + "config-path", + abbr: "c", + help: + "The path to a configuration file. This File is available in the ApplicationOptions " + "for a ApplicationChannel to use to read application-specific configuration values. Relative paths are relative to [directory].", + defaultsTo: "config.yaml", + ) + ..addOption( + "isolates", + abbr: "n", + help: "Number of isolates handling requests.", + ) + ..addOption( + "port", + abbr: "p", + help: "The port number to listen for HTTP requests on.", + defaultsTo: "8888", + ) + ..addFlag( + "ipv6-only", + help: "Limits listening to IPv6 connections only.", + negatable: false, + ) + ..addOption( + "ssl-certificate-path", + help: + "The path to an SSL certicate file. If provided along with --ssl-certificate-path, the application will be HTTPS-enabled.", + ) + ..addOption( + "ssl-key-path", + help: + "The path to an SSL private key file. If provided along with --ssl-certificate-path, the application will be HTTPS-enabled.", + ) + ..addOption( + "timeout", + help: "Number of seconds to wait to ensure startup succeeded.", + defaultsTo: "45", + ) + ..addFlag("help"); +} diff --git a/packages/app/lib/src/starter.dart b/packages/app/lib/src/starter.dart new file mode 100644 index 0000000..6cb23df --- /dev/null +++ b/packages/app/lib/src/starter.dart @@ -0,0 +1,32 @@ +import 'dart:async'; +import 'dart:isolate'; + +import 'package:protevus_application/application.dart'; + +/* + Warning: do not remove. This method is invoked by a generated script. + + */ +Future startApplication( + Application app, + int isolateCount, + SendPort parentPort, +) async { + final port = ReceivePort(); + + port.listen((msg) { + if (msg["command"] == "stop") { + port.close(); + app.stop().then((_) { + parentPort.send({"status": "stopped"}); + }); + } + }); + + if (isolateCount == 0) { + await app.startOnCurrentIsolate(); + } else { + await app.start(numberOfInstances: isolateCount); + } + parentPort.send({"status": "ok", "port": port.sendPort}); +} diff --git a/packages/app/pubspec.yaml b/packages/app/pubspec.yaml new file mode 100644 index 0000000..d588edc --- /dev/null +++ b/packages/app/pubspec.yaml @@ -0,0 +1,23 @@ +name: protevus_application +description: The Application Package for the Protevus Platform +version: 0.0.1 +homepage: https://protevus.com +documentation: https://docs.protevus.com +repository: https://git.protevus.com/protevus/platform + +environment: + sdk: ^3.4.2 + +# Add regular dependencies here. +dependencies: + args: ^2.4.2 + logging: ^1.2.0 + meta: ^1.12.0 + protevus_runtime: ^0.0.1 + protevus_openapi: ^0.0.1 + protevus_http: ^0.0.1 + # path: ^1.8.0 + +dev_dependencies: + lints: ^3.0.0 + test: ^1.24.0 diff --git a/packages/auth/lib/src/.gitkeep b/packages/app/test/.gitkeep similarity index 100% rename from packages/auth/lib/src/.gitkeep rename to packages/app/test/.gitkeep