Merge pull request #120 from dukefirehawk/feature/upgrade_postgres

Feature/upgrade postgres
This commit is contained in:
Thomas 2024-02-18 12:11:13 +08:00 committed by GitHub
commit c2cef6c68e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 212 additions and 259 deletions

View file

@ -12,7 +12,7 @@
"js": [] "js": []
}, },
"editor.codeActionsOnSave": { "editor.codeActionsOnSave": {
"source.fixAll.markdownlint": true "source.fixAll.markdownlint": "explicit"
}, },
"cmake.configureOnOpen": false "cmake.configureOnOpen": false
} }

BIN
logo_icon_512.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

View file

@ -1,5 +1,9 @@
# Change Log # Change Log
## 8.2.0
* [BREAKING] Updated `postgres` to 3.0.0
## 8.1.1 ## 8.1.1
* Updated repository link * Updated repository link

View file

@ -19,10 +19,15 @@ Supported database:
* Use `MySqlMigrationRunner` to perform database migration for MySQL and MariaDB. This runner is using [`mysql_client`](https://pub.dev/packages?q=mysql_client) driver. * Use `MySqlMigrationRunner` to perform database migration for MySQL and MariaDB. This runner is using [`mysql_client`](https://pub.dev/packages?q=mysql_client) driver.
* Use `MariaDbMigrationRunner` to perform database migration for MariaDB. This runner is using[`mysql1`](https://pub.dev/packages?q=mysql1) driver. * Use `MariaDbMigrationRunner` to perform database migration for MariaDB. This runner is using [`mysql1`](https://pub.dev/packages?q=mysql1) driver.
## Supported Operations ## Supported Operations
* reset - Clear out all records in the `migrations` table and drop all the managed ORM tables. * reset - Clear out all records in the `migrations` table and drop all the ORM related tables.
* up - Generate all the managed ORM tables based on the ORM models. * up - Generate all the tables based on the ORM models.
* refresh - Run `reset` follow by `up` * refresh - Run `reset` follow by `up`
## To do
* Update existing tables
* Reverse engineer tables into models

View file

@ -1,17 +1,30 @@
import 'dart:io';
import 'package:angel3_migration/angel3_migration.dart'; import 'package:angel3_migration/angel3_migration.dart';
import 'package:angel3_migration_runner/angel3_migration_runner.dart'; import 'package:angel3_migration_runner/angel3_migration_runner.dart';
import 'package:angel3_migration_runner/postgres.dart'; import 'package:angel3_migration_runner/postgres.dart';
import 'package:angel3_migration_runner/mysql.dart';
import 'package:angel3_orm/angel3_orm.dart'; import 'package:angel3_orm/angel3_orm.dart';
import 'package:postgres/postgres.dart'; import 'package:postgres/postgres.dart';
import 'package:mysql_client/mysql_client.dart';
import 'todo.dart'; import 'todo.dart';
void main(List<String> args) async { void main(List<String> args) async {
var host = Platform.environment['DB_HOST'] ?? 'localhost';
var database = Platform.environment['DB_NAME'] ?? 'demo';
var username = Platform.environment['DB_USERNAME'] ?? 'demouser';
var password = Platform.environment['DB_PASSWORD'] ?? 'demo123';
print("$host $database $username $password");
Connection conn = await Connection.open(Endpoint(
host: host,
port: 5432,
database: database,
username: username,
password: password));
var postgresqlMigrationRunner = PostgresMigrationRunner( var postgresqlMigrationRunner = PostgresMigrationRunner(
PostgreSQLConnection('localhost', 5432, 'demo', conn,
username: 'demouser', password: 'demo123'),
migrations: [ migrations: [
UserMigration(), UserMigration(),
TodoMigration(), TodoMigration(),
@ -19,12 +32,13 @@ void main(List<String> args) async {
], ],
); );
/*
var mySQLConn = await MySQLConnection.createConnection( var mySQLConn = await MySQLConnection.createConnection(
host: "localhost", host: host,
port: 3306, port: 3306,
databaseName: "orm_test", databaseName: database,
userName: "test", userName: username,
password: "Test123*", password: password,
secure: false); secure: false);
// ignore: unused_local_variable // ignore: unused_local_variable
@ -36,6 +50,7 @@ void main(List<String> args) async {
FooMigration(), FooMigration(),
], ],
); );
*/
runMigrations(postgresqlMigrationRunner, args); runMigrations(postgresqlMigrationRunner, args);
} }

View file

@ -11,7 +11,7 @@ class PostgresMigrationRunner implements MigrationRunner {
final _log = Logger('PostgresMigrationRunner'); final _log = Logger('PostgresMigrationRunner');
final Map<String, Migration> migrations = {}; final Map<String, Migration> migrations = {};
final PostgreSQLConnection connection; final Connection connection;
final Queue<Migration> _migrationQueue = Queue(); final Queue<Migration> _migrationQueue = Queue();
bool _connected = false; bool _connected = false;
@ -34,7 +34,8 @@ class PostgresMigrationRunner implements MigrationRunner {
} }
if (!_connected) { if (!_connected) {
await connection.open(); //await connection.open();
//Connection.open(_endpoint!, settings: _settings);
_connected = true; _connected = true;
} }
@ -55,7 +56,7 @@ class PostgresMigrationRunner implements MigrationRunner {
@override @override
Future up() async { Future up() async {
await _init(); await _init();
var r = await connection.query('SELECT path from migrations;'); var r = await connection.execute('SELECT path from migrations;');
var existing = r.expand((x) => x).cast<String>(); var existing = r.expand((x) => x).cast<String>();
var toRun = <String>[]; var toRun = <String>[];
@ -64,7 +65,7 @@ class PostgresMigrationRunner implements MigrationRunner {
}); });
if (toRun.isNotEmpty) { if (toRun.isNotEmpty) {
var r = await connection.query('SELECT MAX(batch) from migrations;'); var r = await connection.execute('SELECT MAX(batch) from migrations;');
var curBatch = (r[0][0] ?? 0) as int; var curBatch = (r[0][0] ?? 0) as int;
var batch = curBatch + 1; var batch = curBatch + 1;
@ -74,16 +75,15 @@ class PostgresMigrationRunner implements MigrationRunner {
migration.up(schema); migration.up(schema);
_log.info('Added "$k" into "migrations" table.'); _log.info('Added "$k" into "migrations" table.');
await schema.run(connection).then((_) { await schema.run(connection).then((_) {
return connection.transaction((ctx) async { return connection.runTx((ctx) async {
var result = await ctx.query( var result = await ctx.execute(
"INSERT INTO MIGRATIONS (batch, path) VALUES ($batch, '$k')"); "INSERT INTO MIGRATIONS (batch, path) VALUES ($batch, '$k')");
return result.affectedRowCount; return result.affectedRows;
}); });
//return connection.execute(
// 'INSERT INTO MIGRATIONS (batch, path) VALUES ($batch, \'$k\');');
}).catchError((e) { }).catchError((e) {
_log.severe('Failed to insert into "migrations" table.'); _log.severe('Failed to insert into "migrations" table.');
return -1;
}); });
} }
} else { } else {
@ -95,11 +95,10 @@ class PostgresMigrationRunner implements MigrationRunner {
Future rollback() async { Future rollback() async {
await _init(); await _init();
PostgreSQLResult r = Result r = await connection.execute('SELECT MAX(batch) from migrations;');
await connection.query('SELECT MAX(batch) from migrations;');
var curBatch = (r[0][0] ?? 0) as int; var curBatch = (r[0][0] ?? 0) as int;
r = await connection r = await connection
.query('SELECT path from migrations WHERE batch = $curBatch;'); .execute('SELECT path from migrations WHERE batch = $curBatch;');
var existing = r.expand((x) => x).cast<String>(); var existing = r.expand((x) => x).cast<String>();
var toRun = <String>[]; var toRun = <String>[];
@ -127,7 +126,7 @@ class PostgresMigrationRunner implements MigrationRunner {
Future reset() async { Future reset() async {
await _init(); await _init();
var r = await connection var r = await connection
.query('SELECT path from migrations ORDER BY batch DESC;'); .execute('SELECT path from migrations ORDER BY batch DESC;');
var existing = r.expand((x) => x).cast<String>(); var existing = r.expand((x) => x).cast<String>();
var toRun = existing.where(migrations.containsKey).toList(); var toRun = existing.where(migrations.containsKey).toList();

View file

@ -14,18 +14,18 @@ class PostgresSchema extends Schema {
factory PostgresSchema() => PostgresSchema._(StringBuffer(), 0); factory PostgresSchema() => PostgresSchema._(StringBuffer(), 0);
Future<int> run(PostgreSQLConnection connection) async { Future<int> run(Connection connection) async {
//return connection.execute(compile()); //return connection.execute(compile());
var result = await connection.transaction((ctx) async { var result = await connection.runTx((ctx) async {
var sql = compile(); var sql = compile();
var result = await ctx.query(sql).catchError((e) { var result = await ctx.execute(sql).catchError((e) {
_log.severe('Failed to run query: [ $sql ]', e); _log.severe('Failed to run query: [ $sql ]', e);
throw Exception(e); throw Exception(e);
}); });
return result.affectedRowCount; return result.affectedRows;
}); });
return (result is int) ? result : 0; return result;
} }
String compile() => _buf.toString(); String compile() => _buf.toString();

View file

@ -1,5 +1,5 @@
name: angel3_migration_runner name: angel3_migration_runner
version: 8.1.1 version: 8.2.0
description: Command-line based database migration runner for Angel3's ORM. description: Command-line based database migration runner for Angel3's ORM.
homepage: https://angel3-framework.web.app/ homepage: https://angel3-framework.web.app/
repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_migration_runner repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_migration_runner
@ -11,7 +11,7 @@ dependencies:
angel3_orm: ^8.0.0 angel3_orm: ^8.0.0
args: ^2.4.0 args: ^2.4.0
charcode: ^1.3.0 charcode: ^1.3.0
postgres: ^2.6.0 postgres: ^3.0.0
mysql_client: ^0.0.27 mysql_client: ^0.0.27
mysql1: ^0.20.0 mysql1: ^0.20.0
logging: ^1.2.0 logging: ^1.2.0

View file

@ -392,7 +392,7 @@ abstract class Query<T, Where extends QueryWhere> extends QueryBase<T> {
throw ArgumentError("Unsupported database dialect."); throw ArgumentError("Unsupported database dialect.");
} }
//_log.warning("Insert Query = $sql"); _log.fine("Insert Query = $sql");
return executor return executor
.query(tableName, sql, substitutionValues, .query(tableName, sql, substitutionValues,

View file

@ -1,13 +1,11 @@
# Change Log # Change Log
## 8.1.1
* Updated repository link
## 8.1.0 ## 8.1.0
* Updated repository links
* Updated `lints` to 3.0.0 * Updated `lints` to 3.0.0
* Fixed linter warnings * Fixed linter warnings
* [BREAKING] Updated `postgres` to 3.0.0
## 8.0.0 ## 8.0.0

View file

@ -12,3 +12,7 @@ PostgreSQL support for Angel3 ORM.
* PostgreSQL version 10 or greater * PostgreSQL version 10 or greater
For documentation about the ORM, see [Developer Guide](https://angel3-docs.dukefirehawk.com/guides/orm) For documentation about the ORM, see [Developer Guide](https://angel3-docs.dukefirehawk.com/guides/orm)
## Migration
Posrgresql is upgraded from 2.x.x to 3.x.x in version 8.1.0.

View file

@ -1,20 +1,19 @@
import 'dart:io'; import 'dart:io';
import 'package:angel3_orm_postgres/angel3_orm_postgres.dart'; import 'package:angel3_orm_postgres/angel3_orm_postgres.dart';
import 'package:postgres_pool/postgres_pool.dart'; import 'package:postgres/postgres.dart';
void main() async { void main() async {
var executor = PostgreSqlPoolExecutor(PgPool( var executor = PostgreSqlPoolExecutor(Pool.withEndpoints([
PgEndpoint( Endpoint(
host: 'localhost', host: Platform.environment['POSTGRES_HOSTNAME'] ?? 'localhost',
port: 5432, port: 5432,
database: Platform.environment['POSTGRES_DB'] ?? 'orm_test', database: Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test', username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123', password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123',
), )
settings: PgPoolSettings() ],
..maxConnectionAge = Duration(hours: 1) settings: PoolSettings(
..concurrency = 5, maxConnectionAge: Duration(hours: 1), maxConnectionCount: 5)));
));
var rows = await executor.query('users', 'SELECT * FROM users', {}); var rows = await executor.query('users', 'SELECT * FROM users', {});
print(rows); print(rows);

View file

@ -2,40 +2,46 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'package:angel3_orm/angel3_orm.dart'; import 'package:angel3_orm/angel3_orm.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:pool/pool.dart';
import 'package:postgres/postgres.dart'; import 'package:postgres/postgres.dart';
/// A [QueryExecutor] that queries a PostgreSQL database. /// A [QueryExecutor] that queries a PostgreSQL database.
class PostgreSqlExecutor extends QueryExecutor { class PostgreSqlExecutor extends QueryExecutor {
PostgreSQLExecutionContext _connection; final Dialect _dialect = const PostgreSQLDialect();
ConnectionSettings? _settings;
Endpoint? _endpoint;
Session _session;
/// An optional [Logger] to print information to. A default logger will be used /// An optional [Logger] to print information to. A default logger will be used
/// if not set /// if not set
late Logger logger; late Logger logger;
PostgreSqlExecutor(this._connection, {Logger? logger}) {
this.logger = logger ?? Logger('PostgreSqlExecutor');
}
final Dialect _dialect = const PostgreSQLDialect();
@override @override
Dialect get dialect => _dialect; Dialect get dialect => _dialect;
/// The underlying connection. /// The underlying database session.
PostgreSQLExecutionContext get connection => _connection; Session get session => _session;
PostgreSqlExecutor(this._session,
{Endpoint? endpoint, ConnectionSettings? settings, Logger? logger}) {
this.logger = logger ?? Logger('PostgreSqlExecutor');
_settings = settings;
_endpoint = endpoint;
}
/// Closes the connection. /// Closes the connection.
Future close() { Future<void> close() async {
if (_connection is PostgreSQLConnection) { if (_session is Connection) {
return (_connection as PostgreSQLConnection).close(); await (_session as Connection).close();
} else {
return Future.value();
} }
return Future.value();
} }
@override @override
Future<PostgreSQLResult> query( Future<Result> query(
String tableName, String query, Map<String, dynamic> substitutionValues, String tableName, String query, Map<String, dynamic> substitutionValues,
{String returningQuery = '', List<String> returningFields = const []}) { {String returningQuery = '', List<String> returningFields = const []}) {
if (returningFields.isNotEmpty) { if (returningFields.isNotEmpty) {
@ -44,8 +50,8 @@ class PostgreSqlExecutor extends QueryExecutor {
query = '$query $returning'; query = '$query $returning';
} }
//logger.fine('Query: $query'); logger.fine('Query: $query');
//logger.fine('Values: $substitutionValues'); logger.fine('Values: $substitutionValues');
// Convert List into String // Convert List into String
var param = <String, dynamic>{}; var param = <String, dynamic>{};
@ -57,28 +63,24 @@ class PostgreSqlExecutor extends QueryExecutor {
} }
}); });
return _connection return _session
.query(query, substitutionValues: param) .execute(Sql.named(query), parameters: param)
.catchError((err) async { .catchError((err) async {
logger.warning(err); logger.warning(err);
if (err is PostgreSQLException) { if (err is PgException) {
// This is a hack to detect broken db connection // This is a hack to detect broken db connection
bool brokenConnection = bool brokenConnection = err.message.contains("connection is not open");
err.message?.contains("connection is not open") ?? false;
if (brokenConnection) { if (brokenConnection) {
if (_connection is PostgreSQLConnection) { // Open a new db session
// Open a new db connection if (_session is Connection) {
var currentConnection = _connection as PostgreSQLConnection; (_session as Connection).close();
currentConnection.close();
logger.warning( logger.warning(
"A broken database connection is detected. Creating a new database connection."); "A broken database connection is detected. Creating a new database connection.");
var conn = _createNewConnection(currentConnection); _session = await _createNewSession();
await conn.open();
_connection = conn;
// Retry the query with the new db connection // Retry the query with the new db connection
return _connection.query(query, substitutionValues: param); return _session.execute(Sql.named(query), parameters: param);
} }
} }
} }
@ -86,129 +88,36 @@ class PostgreSqlExecutor extends QueryExecutor {
}); });
} }
// Create a new database connection from an existing connection // Create a new database connection
PostgreSQLConnection _createNewConnection(PostgreSQLConnection conn) { Future<Session> _createNewSession() async {
return PostgreSQLConnection(conn.host, conn.port, conn.databaseName, if (_endpoint != null) {
username: conn.username, return await Connection.open(_endpoint!, settings: _settings);
password: conn.password, }
useSSL: conn.useSSL,
timeZone: conn.timeZone, throw PgException("Unable to create new connection");
timeoutInSeconds: conn.timeoutInSeconds);
} }
@override @override
Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async { Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async {
if (_connection is! PostgreSQLConnection) { //if (_connection is! PostgreSQLConnection) {
return await f(this); // return await f(this);
} //}
var conn = _connection as PostgreSQLConnection; var conn = _session as Connection;
T? returnValue;
var txResult = await conn.transaction((ctx) async { return await conn.runTx((session) async {
try { try {
//logger.fine('Entering transaction'); //logger.fine('Entering transaction');
var tx = PostgreSqlExecutor(ctx, logger: logger); var exec = PostgreSqlExecutor(session, logger: logger);
returnValue = await f(tx); return await f(exec);
return returnValue;
} catch (e) { } catch (e) {
ctx.cancelTransaction(reason: e.toString()); session.rollback();
//ctx.cancelTransaction(reason: e.toString());
logger.warning("The transation has failed due to ", e);
rethrow; rethrow;
} }
}); }).onError((error, stackTrace) {
throw StateError('The transaction was cancelled.');
if (txResult is PostgreSQLRollback) {
//if (txResult.reason == null) {
// throw StateError('The transaction was cancelled.');
//} else {
throw StateError(
'The transaction was cancelled with reason "${txResult.reason}".');
//}
} else {
return returnValue!;
}
}
}
/// A [QueryExecutor] that manages a pool of PostgreSQL connections.
class PostgreSqlExecutorPool extends QueryExecutor {
/// The maximum amount of concurrent connections.
final int size;
/// Creates a new [PostgreSQLConnection], on demand.
///
/// The created connection should **not** be open.
final PostgreSQLConnection Function() connectionFactory;
/// An optional [Logger] to print information to.
late Logger logger;
final List<PostgreSqlExecutor> _connections = [];
int _index = 0;
final Pool _pool, _connMutex = Pool(1);
PostgreSqlExecutorPool(this.size, this.connectionFactory, {Logger? logger})
: _pool = Pool(size) {
if (logger != null) {
this.logger = logger;
} else {
this.logger = Logger('PostgreSqlExecutorPool');
}
assert(size > 0, 'Connection pool cannot be empty.');
}
final Dialect _dialect = const PostgreSQLDialect();
@override
Dialect get dialect => _dialect;
/// Closes all connections.
Future close() async {
await _pool.close();
await _connMutex.close();
return Future.wait(_connections.map((c) => c.close()));
}
Future _open() async {
if (_connections.isEmpty) {
_connections.addAll(await Future.wait(List.generate(size, (_) async {
//logger.fine('Spawning connections...');
var conn = connectionFactory();
await conn.open();
//return conn
// .open()
// .then((_) => PostgreSqlExecutor(conn, logger: logger));
return PostgreSqlExecutor(conn, logger: logger);
})));
}
}
Future<PostgreSqlExecutor> _next() {
return _connMutex.withResource(() async {
await _open();
if (_index >= size) _index = 0;
return _connections[_index++];
});
}
@override
Future<PostgreSQLResult> query(
String tableName, String query, Map<String, dynamic> substitutionValues,
{String returningQuery = '', List<String> returningFields = const []}) {
return _pool.withResource(() async {
var executor = await _next();
return executor.query(tableName, query, substitutionValues,
returningFields: returningFields);
});
}
@override
Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) {
return _pool.withResource(() async {
var executor = await _next();
return executor.transaction(f);
}); });
} }
} }

View file

@ -2,13 +2,13 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'package:angel3_orm/angel3_orm.dart'; import 'package:angel3_orm/angel3_orm.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:postgres_pool/postgres_pool.dart'; import 'package:postgres/postgres.dart';
import '../angel3_orm_postgres.dart'; import '../angel3_orm_postgres.dart';
/// A [QueryExecutor] that uses `package:postgres_pool` for connetions pooling. /// A [QueryExecutor] that uses `package:postgres_pool` for connetions pooling.
class PostgreSqlPoolExecutor extends QueryExecutor { class PostgreSqlPoolExecutor extends QueryExecutor {
final PgPool _pool; final Pool _pool;
/// An optional [Logger] to print information to. /// An optional [Logger] to print information to.
late Logger logger; late Logger logger;
@ -23,7 +23,7 @@ class PostgreSqlPoolExecutor extends QueryExecutor {
Dialect get dialect => _dialect; Dialect get dialect => _dialect;
/// The underlying connection pooling. /// The underlying connection pooling.
PgPool get pool => _pool; Pool get pool => _pool;
/// Closes all the connections in the pool. /// Closes all the connections in the pool.
Future<dynamic> close() { Future<dynamic> close() {
@ -32,7 +32,7 @@ class PostgreSqlPoolExecutor extends QueryExecutor {
/// Run query. /// Run query.
@override @override
Future<PostgreSQLResult> query( Future<Result> query(
String tableName, String query, Map<String, dynamic> substitutionValues, String tableName, String query, Map<String, dynamic> substitutionValues,
{String returningQuery = '', List<String> returningFields = const []}) { {String returningQuery = '', List<String> returningFields = const []}) {
if (returningFields.isNotEmpty) { if (returningFields.isNotEmpty) {
@ -54,16 +54,16 @@ class PostgreSqlPoolExecutor extends QueryExecutor {
} }
}); });
return _pool.run<PostgreSQLResult>((pgContext) async { return _pool.run<Result>((session) async {
return await pgContext.query(query, substitutionValues: param); return await session.execute(Sql.named(query), parameters: param);
}); });
} }
/// Run query in a transaction. /// Run query in a transaction.
@override @override
Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async { Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async {
return _pool.runTx((pgContext) async { return _pool.runTx((session) async {
var exec = PostgreSqlExecutor(pgContext, logger: logger); var exec = PostgreSqlExecutor(session, logger: logger);
return await f(exec); return await f(exec);
}); });
} }

View file

@ -1,35 +1,33 @@
name: angel3_orm_postgres name: angel3_orm_postgres
version: 8.1.1 version: 8.1.0
description: PostgreSQL support for Angel3 ORM. Includes functionality for querying and transactions. description: PostgreSQL support for Angel3 ORM. Includes functionality for querying and transactions.
homepage: https://angel3-framework.web.app/ homepage: https://angel3-framework.web.app/
repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_orm_postgres repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_orm_postgres
publish_to: none
environment: environment:
sdk: '>=3.0.0 <4.0.0' sdk: '>=3.0.0 <4.0.0'
dependencies: dependencies:
angel3_orm: ^8.0.0 angel3_orm: ^8.0.0
logging: ^1.2.0 logging: ^1.2.0
pool: ^1.5.0 pool: ^1.5.0
postgres: ^2.6.1 postgres: ^3.0.0
postgres_pool: ^2.1.6
dev_dependencies: dev_dependencies:
belatuk_pretty_logging: ^6.1.0 belatuk_pretty_logging: ^6.1.0
angel3_orm_generator: ^8.0.0 angel3_orm_generator: ^8.0.0
angel3_orm_test: ^8.0.0 angel3_orm_test: ^8.0.0
test: ^1.24.0 test: ^1.24.0
lints: ^3.0.0 lints: ^3.0.0
# dependency_overrides: dependency_overrides:
# angel3_serialize: # angel3_serialize:
# path: ../../serialize/angel_serialize # path: ../../serialize/angel_serialize
# angel3_serialize_generator: # angel3_serialize_generator:
# path: ../../serialize/angel_serialize_generator # path: ../../serialize/angel_serialize_generator
# angel3_model: # angel3_model:
# path: ../../model # path: ../../model
# angel3_orm_test: angel3_orm_test:
# path: ../angel_orm_test path: ../angel_orm_test
# angel3_orm_generator: # angel3_orm_generator:
# path: ../angel_orm_generator # path: ../angel_orm_generator
# angel3_orm: angel3_orm:
# path: ../angel_orm path: ../angel_orm
# angel3_migration: angel3_migration:
# path: ../angel_migration path: ../angel_migration

View file

@ -3,15 +3,12 @@ import 'dart:io';
import 'package:angel3_orm/angel3_orm.dart'; import 'package:angel3_orm/angel3_orm.dart';
import 'package:angel3_orm_postgres/angel3_orm_postgres.dart'; import 'package:angel3_orm_postgres/angel3_orm_postgres.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:postgres_pool/postgres_pool.dart'; import 'package:postgres/postgres.dart';
FutureOr<QueryExecutor> Function() pg(Iterable<String> schemas) { FutureOr<QueryExecutor> Function() pg(Iterable<String> schemas) {
// Use single connection // Use single connection
return () => connectToPostgres(schemas); return () => connectToPostgres(schemas);
// Use connection pooling with 1 connection
//return () => connectToPostgresPool(schemas);
// Use PostgreSqlExecutorPool (Not working) // Use PostgreSqlExecutorPool (Not working)
//return () => connectToPostgresPool1(schemas); //return () => connectToPostgresPool1(schemas);
} }
@ -19,69 +16,87 @@ FutureOr<QueryExecutor> Function() pg(Iterable<String> schemas) {
Future<void> closePg(QueryExecutor executor) async { Future<void> closePg(QueryExecutor executor) async {
if (executor is PostgreSqlExecutor) { if (executor is PostgreSqlExecutor) {
await executor.close(); await executor.close();
//} else if (executor is PostgreSqlExecutorPool) {
// await executor.close();
} else if (executor is PostgreSqlPoolExecutor) {
await executor.close();
} }
} }
Future<PostgreSqlExecutor> connectToPostgres(Iterable<String> schemas) async { Future<PostgreSqlExecutor> connectToPostgres(Iterable<String> schemas) async {
var conn = PostgreSQLConnection( var host = Platform.environment['POSTGRES_HOST'] ?? 'localhost';
'localhost', 5432, Platform.environment['POSTGRES_DB'] ?? 'orm_test', var database = Platform.environment['POSTGRES_DB'] ?? 'orm_test';
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test', var username = Platform.environment['POSTGRES_USERNAME'] ?? 'test';
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123'); var password = Platform.environment['POSTGRES_PASSWORD'] ?? 'test123';
await conn.open();
var conn = await Connection.open(
Endpoint(
host: host,
port: 5432,
database: database,
username: username,
password: password),
settings: ConnectionSettings(sslMode: SslMode.disable));
// Run sql to create the tables // Run sql to create the tables
for (var s in schemas) { for (var s in schemas) {
await conn.execute(await File('test/migrations/$s.sql').readAsString()); var rawQueryString = await File('test/migrations/$s.sql').readAsString();
print("Raw SQL Query: $rawQueryString");
//await conn.execute(queryString);
// Split the queries and execute them
var stringLen = rawQueryString.length;
var index = 0;
while (index < stringLen) {
index = rawQueryString.indexOf(";");
if (index < 0) {
break;
}
var query = rawQueryString.substring(0, index);
print("SQL Query: $query;");
await conn.execute("$query;");
index++;
if (index < stringLen) {
var tempString = rawQueryString.substring(index).trim();
rawQueryString = tempString;
stringLen = rawQueryString.length;
index = 0;
}
}
/*
var queryString = rawQueryString.replaceAll("\n", " ");
print("Raw Query: $queryString");
var queries = queryString.split(';');
for (var rawQuery in queries) {
var query = rawQuery.trim();
if (query.isNotEmpty) {
print("SQL Query: $query;");
await conn.execute("$query;");
}
}
*/
} }
return PostgreSqlExecutor(conn, logger: Logger.root); return PostgreSqlExecutor(conn, logger: Logger.root);
} }
Future<PostgreSqlExecutorPool> connectToPostgresPool1(
Iterable<String> schemas) async {
PostgreSQLConnection connectionFactory() {
return PostgreSQLConnection(
'localhost', 5432, Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123');
}
PostgreSQLConnection conn = connectionFactory();
await conn.open();
// Run sql to create the tables
for (var s in schemas) {
await conn.execute(await File('test/migrations/$s.sql').readAsString());
}
return PostgreSqlExecutorPool(5, connectionFactory, logger: Logger.root);
}
Future<PostgreSqlPoolExecutor> connectToPostgresPool( Future<PostgreSqlPoolExecutor> connectToPostgresPool(
Iterable<String> schemas) async { Iterable<String> schemas) async {
var dbPool = PgPool( var dbPool = Pool.withEndpoints([
PgEndpoint( Endpoint(
host: 'localhost', host: 'localhost',
port: 5432, port: 5432,
database: Platform.environment['POSTGRES_DB'] ?? 'orm_test', database: Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test', username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123', password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123',
), )
settings: PgPoolSettings() ],
..maxConnectionAge = Duration(hours: 1) settings: PoolSettings(
..concurrency = 200, maxConnectionAge: Duration(hours: 1), maxConnectionCount: 5));
);
// Run sql to create the tables in a transaction // Run sql to create the tables in a transaction
//await _pool.runTx((conn) async { await dbPool.runTx((conn) async {
// for (var s in schemas) { for (var s in schemas) {
// await conn.execute(await File('test/migrations/$s.sql').readAsString()); await conn.execute(await File('test/migrations/$s.sql').readAsString());
// } }
//}); });
return PostgreSqlPoolExecutor(dbPool, logger: Logger.root); return PostgreSqlPoolExecutor(dbPool, logger: Logger.root);
} }

View file

@ -10,8 +10,11 @@ void customExprTests(FutureOr<QueryExecutor> Function() createExecutor,
close ??= (_) => null; close ??= (_) => null;
var hasExecutor = false;
setUp(() async { setUp(() async {
executor = await createExecutor(); executor = await createExecutor();
hasExecutor = true;
var now = DateTime.now(); var now = DateTime.now();
var nQuery = NumbersQuery(); var nQuery = NumbersQuery();
@ -24,7 +27,11 @@ void customExprTests(FutureOr<QueryExecutor> Function() createExecutor,
}); });
}); });
tearDown(() => close!(executor)); tearDown(() {
if (hasExecutor && close != null) {
close(executor);
}
});
test('fetches correct result', () async { test('fetches correct result', () async {
expect(numbersModel.two, 2); expect(numbersModel.two, 2);

View file

@ -19,7 +19,7 @@ dev_dependencies:
angel3_framework: ^8.0.0 angel3_framework: ^8.0.0
build_runner: ^2.4.0 build_runner: ^2.4.0
lints: ^3.0.0 lints: ^3.0.0
# dependency_overrides: dependency_overrides:
# angel3_container: # angel3_container:
# path: ../../container/angel_container # path: ../../container/angel_container
# angel3_framework: # angel3_framework:
@ -36,8 +36,8 @@ dev_dependencies:
# path: ../../serialize/angel_serialize # path: ../../serialize/angel_serialize
# angel3_serialize_generator: # angel3_serialize_generator:
# path: ../../serialize/angel_serialize_generator # path: ../../serialize/angel_serialize_generator
# angel3_orm: angel3_orm:
# path: ../angel_orm path: ../angel_orm
# angel3_migration: # angel3_migration:
# path: ../angel_migration # path: ../angel_migration
# angel3_orm_generator: # angel3_orm_generator: