Updated postgresql pool

This commit is contained in:
thomashii 2024-01-13 17:50:20 +08:00
parent ea528abc3d
commit cc362fb704
6 changed files with 35 additions and 55 deletions

View file

@ -3,9 +3,9 @@
## 8.1.0 ## 8.1.0
* Updated repository links * Updated repository links
* Updated `postgres` to 3.0.0
* Updated `lints` to 3.0.0 * Updated `lints` to 3.0.0
* Fixed linter warnings * Fixed linter warnings
* [BREAKING] Updated `postgres` to 3.0.0
## 8.0.0 ## 8.0.0

View file

@ -12,3 +12,7 @@ PostgreSQL support for Angel3 ORM.
* PostgreSQL version 10 or greater * PostgreSQL version 10 or greater
For documentation about the ORM, see [Developer Guide](https://angel3-docs.dukefirehawk.com/guides/orm) For documentation about the ORM, see [Developer Guide](https://angel3-docs.dukefirehawk.com/guides/orm)
## Migration
Posrgresql is upgraded from 2.x.x to 3.x.x in version 8.1.0.

View file

@ -1,19 +1,19 @@
import 'dart:io'; import 'dart:io';
import 'package:angel3_orm_postgres/angel3_orm_postgres.dart'; import 'package:angel3_orm_postgres/angel3_orm_postgres.dart';
import 'package:postgres/postgres.dart';
void main() async { void main() async {
var executor = PostgreSqlPoolExecutor(PgPool( var executor = PostgreSqlPoolExecutor(Pool.withEndpoints([
PgEndpoint( Endpoint(
host: 'localhost', host: 'localhost',
port: 5432, port: 5432,
database: Platform.environment['POSTGRES_DB'] ?? 'orm_test', database: Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test', username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123', password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123',
), )
settings: PgPoolSettings() ],
..maxConnectionAge = Duration(hours: 1) settings: PoolSettings(
..concurrency = 5, maxConnectionAge: Duration(hours: 1), maxConnectionCount: 5)));
));
var rows = await executor.query('users', 'SELECT * FROM users', {}); var rows = await executor.query('users', 'SELECT * FROM users', {});
print(rows); print(rows);

View file

@ -105,13 +105,13 @@ class PostgreSqlExecutor extends QueryExecutor {
var conn = _session as Connection; var conn = _session as Connection;
return await conn.runTx((ctx) async { return await conn.runTx((session) async {
try { try {
//logger.fine('Entering transaction'); //logger.fine('Entering transaction');
var tx = PostgreSqlExecutor(ctx, logger: logger); var exec = PostgreSqlExecutor(session, logger: logger);
return await f(tx); return await f(exec);
} catch (e) { } catch (e) {
ctx.rollback(); session.rollback();
//ctx.cancelTransaction(reason: e.toString()); //ctx.cancelTransaction(reason: e.toString());
logger.warning("The transation has failed due to ", e); logger.warning("The transation has failed due to ", e);
rethrow; rethrow;

View file

@ -2,13 +2,13 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'package:angel3_orm/angel3_orm.dart'; import 'package:angel3_orm/angel3_orm.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:postgres_pool/postgres_pool.dart'; import 'package:postgres/postgres.dart';
import '../angel3_orm_postgres.dart'; import '../angel3_orm_postgres.dart';
/// A [QueryExecutor] that uses `package:postgres_pool` for connetions pooling. /// A [QueryExecutor] that uses `package:postgres_pool` for connetions pooling.
class PostgreSqlPoolExecutor extends QueryExecutor { class PostgreSqlPoolExecutor extends QueryExecutor {
final PgPool _pool; final Pool _pool;
/// An optional [Logger] to print information to. /// An optional [Logger] to print information to.
late Logger logger; late Logger logger;
@ -23,7 +23,7 @@ class PostgreSqlPoolExecutor extends QueryExecutor {
Dialect get dialect => _dialect; Dialect get dialect => _dialect;
/// The underlying connection pooling. /// The underlying connection pooling.
PgPool get pool => _pool; Pool get pool => _pool;
/// Closes all the connections in the pool. /// Closes all the connections in the pool.
Future<dynamic> close() { Future<dynamic> close() {
@ -32,7 +32,7 @@ class PostgreSqlPoolExecutor extends QueryExecutor {
/// Run query. /// Run query.
@override @override
Future<PostgreSQLResult> query( Future<Result> query(
String tableName, String query, Map<String, dynamic> substitutionValues, String tableName, String query, Map<String, dynamic> substitutionValues,
{String returningQuery = '', List<String> returningFields = const []}) { {String returningQuery = '', List<String> returningFields = const []}) {
if (returningFields.isNotEmpty) { if (returningFields.isNotEmpty) {
@ -54,16 +54,16 @@ class PostgreSqlPoolExecutor extends QueryExecutor {
} }
}); });
return _pool.run<PostgreSQLResult>((pgContext) async { return _pool.run<Result>((session) async {
return await pgContext.query(query, substitutionValues: param); return await session.execute(Sql.named(query), parameters: param);
}); });
} }
/// Run query in a transaction. /// Run query in a transaction.
@override @override
Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async { Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async {
return _pool.runTx((pgContext) async { return _pool.runTx((session) async {
var exec = PostgreSqlExecutor(pgContext, logger: logger); var exec = PostgreSqlExecutor(session, logger: logger);
return await f(exec); return await f(exec);
}); });
} }

View file

@ -9,9 +9,6 @@ FutureOr<QueryExecutor> Function() pg(Iterable<String> schemas) {
// Use single connection // Use single connection
return () => connectToPostgres(schemas); return () => connectToPostgres(schemas);
// Use connection pooling with 1 connection
//return () => connectToPostgresPool(schemas);
// Use PostgreSqlExecutorPool (Not working) // Use PostgreSqlExecutorPool (Not working)
//return () => connectToPostgresPool1(schemas); //return () => connectToPostgresPool1(schemas);
} }
@ -38,47 +35,26 @@ Future<PostgreSqlExecutor> connectToPostgres(Iterable<String> schemas) async {
return PostgreSqlExecutor(conn, logger: Logger.root); return PostgreSqlExecutor(conn, logger: Logger.root);
} }
Future<PostgreSqlExecutorPool> connectToPostgresPool1(
Iterable<String> schemas) async {
PostgreSQLConnection connectionFactory() {
return PostgreSQLConnection(
'localhost', 5432, Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123');
}
PostgreSQLConnection conn = connectionFactory();
await conn.open();
// Run sql to create the tables
for (var s in schemas) {
await conn.execute(await File('test/migrations/$s.sql').readAsString());
}
return PostgreSqlExecutorPool(5, connectionFactory, logger: Logger.root);
}
Future<PostgreSqlPoolExecutor> connectToPostgresPool( Future<PostgreSqlPoolExecutor> connectToPostgresPool(
Iterable<String> schemas) async { Iterable<String> schemas) async {
var dbPool = PgPool( var dbPool = Pool.withEndpoints([
PgEndpoint( Endpoint(
host: 'localhost', host: 'localhost',
port: 5432, port: 5432,
database: Platform.environment['POSTGRES_DB'] ?? 'orm_test', database: Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test', username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123', password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123',
), )
settings: PgPoolSettings() ],
..maxConnectionAge = Duration(hours: 1) settings: PoolSettings(
..concurrency = 200, maxConnectionAge: Duration(hours: 1), maxConnectionCount: 5));
);
// Run sql to create the tables in a transaction // Run sql to create the tables in a transaction
//await _pool.runTx((conn) async { await dbPool.runTx((conn) async {
// for (var s in schemas) { for (var s in schemas) {
// await conn.execute(await File('test/migrations/$s.sql').readAsString()); await conn.execute(await File('test/migrations/$s.sql').readAsString());
// } }
//}); });
return PostgreSqlPoolExecutor(dbPool, logger: Logger.root); return PostgreSqlPoolExecutor(dbPool, logger: Logger.root);
} }