Updated to postgres 3
This commit is contained in:
7 changed files with 63 additions and 162 deletions
@ -12,7 +12,7 @@
"js": []
"editor.codeActionsOnSave": {
"source.fixAll.markdownlint": true
"source.fixAll.markdownlint": "explicit"
"cmake.configureOnOpen": false
Normal file
Normal file
Binary file not shown.
After ![]() (image error) Size: 23 KiB |
@ -1,11 +1,9 @@
# Change Log
## 8.1.1
* Updated repository link
## 8.1.0
* Updated repository links
* Updated `postgres` to 3.0.0
* Updated `lints` to 3.0.0
* Fixed linter warnings
@ -1,6 +1,5 @@
import 'dart:io';
import 'package:angel3_orm_postgres/angel3_orm_postgres.dart';
import 'package:postgres_pool/postgres_pool.dart';
void main() async {
var executor = PostgreSqlPoolExecutor(PgPool(
@ -2,40 +2,46 @@ import 'dart:async';
import 'dart:convert';
import 'package:angel3_orm/angel3_orm.dart';
import 'package:logging/logging.dart';
import 'package:pool/pool.dart';
import 'package:postgres/postgres.dart';
/// A [QueryExecutor] that queries a PostgreSQL database.
class PostgreSqlExecutor extends QueryExecutor {
PostgreSQLExecutionContext _connection;
final Dialect _dialect = const PostgreSQLDialect();
ConnectionSettings? _settings;
Endpoint? _endpoint;
Session _session;
/// An optional [Logger] to print information to. A default logger will be used
/// if not set
late Logger logger;
PostgreSqlExecutor(this._connection, {Logger? logger}) {
this.logger = logger ?? Logger('PostgreSqlExecutor');
final Dialect _dialect = const PostgreSQLDialect();
Dialect get dialect => _dialect;
/// The underlying connection.
PostgreSQLExecutionContext get connection => _connection;
/// The underlying database session.
Session get session => _session;
{Endpoint? endpoint, ConnectionSettings? settings, Logger? logger}) {
this.logger = logger ?? Logger('PostgreSqlExecutor');
_settings = settings;
_endpoint = endpoint;
/// Closes the connection.
Future close() {
if (_connection is PostgreSQLConnection) {
return (_connection as PostgreSQLConnection).close();
} else {
return Future.value();
Future<void> close() async {
if (_session is Connection) {
await (_session as Connection).close();
return Future.value();
Future<PostgreSQLResult> query(
Future<Result> query(
String tableName, String query, Map<String, dynamic> substitutionValues,
{String returningQuery = '', List<String> returningFields = const []}) {
if (returningFields.isNotEmpty) {
@ -44,8 +50,8 @@ class PostgreSqlExecutor extends QueryExecutor {
query = '$query $returning';
//logger.fine('Query: $query');
//logger.fine('Values: $substitutionValues');
logger.fine('Query: $query');
logger.fine('Values: $substitutionValues');
// Convert List into String
var param = <String, dynamic>{};
@ -57,28 +63,24 @@ class PostgreSqlExecutor extends QueryExecutor {
return _connection
.query(query, substitutionValues: param)
return _session
.execute(Sql.named(query), parameters: param)
.catchError((err) async {
if (err is PostgreSQLException) {
if (err is PgException) {
// This is a hack to detect broken db connection
bool brokenConnection =
err.message?.contains("connection is not open") ?? false;
bool brokenConnection = err.message.contains("connection is not open");
if (brokenConnection) {
if (_connection is PostgreSQLConnection) {
// Open a new db connection
var currentConnection = _connection as PostgreSQLConnection;
// Open a new db session
if (_session is Connection) {
(_session as Connection).close();
"A broken database connection is detected. Creating a new database connection.");
var conn = _createNewConnection(currentConnection);
await conn.open();
_connection = conn;
_session = await _createNewSession();
// Retry the query with the new db connection
return _connection.query(query, substitutionValues: param);
return _session.execute(Sql.named(query), parameters: param);
@ -86,129 +88,36 @@ class PostgreSqlExecutor extends QueryExecutor {
// Create a new database connection from an existing connection
PostgreSQLConnection _createNewConnection(PostgreSQLConnection conn) {
return PostgreSQLConnection(conn.host, conn.port, conn.databaseName,
username: conn.username,
password: conn.password,
useSSL: conn.useSSL,
timeZone: conn.timeZone,
timeoutInSeconds: conn.timeoutInSeconds);
// Create a new database connection
Future<Session> _createNewSession() async {
if (_endpoint != null) {
return await Connection.open(_endpoint!, settings: _settings);
throw PgException("Unable to create new connection");
Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) async {
if (_connection is! PostgreSQLConnection) {
return await f(this);
//if (_connection is! PostgreSQLConnection) {
// return await f(this);
var conn = _connection as PostgreSQLConnection;
T? returnValue;
var conn = _session as Connection;
var txResult = await conn.transaction((ctx) async {
return await conn.runTx((ctx) async {
try {
//logger.fine('Entering transaction');
var tx = PostgreSqlExecutor(ctx, logger: logger);
returnValue = await f(tx);
return returnValue;
return await f(tx);
} catch (e) {
ctx.cancelTransaction(reason: e.toString());
//ctx.cancelTransaction(reason: e.toString());
logger.warning("The transation has failed due to ", e);
if (txResult is PostgreSQLRollback) {
//if (txResult.reason == null) {
// throw StateError('The transaction was cancelled.');
//} else {
throw StateError(
'The transaction was cancelled with reason "${txResult.reason}".');
} else {
return returnValue!;
/// A [QueryExecutor] that manages a pool of PostgreSQL connections.
class PostgreSqlExecutorPool extends QueryExecutor {
/// The maximum amount of concurrent connections.
final int size;
/// Creates a new [PostgreSQLConnection], on demand.
/// The created connection should **not** be open.
final PostgreSQLConnection Function() connectionFactory;
/// An optional [Logger] to print information to.
late Logger logger;
final List<PostgreSqlExecutor> _connections = [];
int _index = 0;
final Pool _pool, _connMutex = Pool(1);
PostgreSqlExecutorPool(this.size, this.connectionFactory, {Logger? logger})
: _pool = Pool(size) {
if (logger != null) {
this.logger = logger;
} else {
this.logger = Logger('PostgreSqlExecutorPool');
assert(size > 0, 'Connection pool cannot be empty.');
final Dialect _dialect = const PostgreSQLDialect();
Dialect get dialect => _dialect;
/// Closes all connections.
Future close() async {
await _pool.close();
await _connMutex.close();
return Future.wait(_connections.map((c) => c.close()));
Future _open() async {
if (_connections.isEmpty) {
_connections.addAll(await Future.wait(List.generate(size, (_) async {
//logger.fine('Spawning connections...');
var conn = connectionFactory();
await conn.open();
//return conn
// .open()
// .then((_) => PostgreSqlExecutor(conn, logger: logger));
return PostgreSqlExecutor(conn, logger: logger);
Future<PostgreSqlExecutor> _next() {
return _connMutex.withResource(() async {
await _open();
if (_index >= size) _index = 0;
return _connections[_index++];
Future<PostgreSQLResult> query(
String tableName, String query, Map<String, dynamic> substitutionValues,
{String returningQuery = '', List<String> returningFields = const []}) {
return _pool.withResource(() async {
var executor = await _next();
return executor.query(tableName, query, substitutionValues,
returningFields: returningFields);
Future<T> transaction<T>(FutureOr<T> Function(QueryExecutor) f) {
return _pool.withResource(() async {
var executor = await _next();
return executor.transaction(f);
}).onError((error, stackTrace) {
throw StateError('The transaction was cancelled.');
@ -1,17 +1,15 @@
name: angel3_orm_postgres
version: 8.1.1
version: 8.1.0
description: PostgreSQL support for Angel3 ORM. Includes functionality for querying and transactions.
homepage: https://angel3-framework.web.app/
repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_orm_postgres
publish_to: none
sdk: '>=3.0.0 <4.0.0'
angel3_orm: ^8.0.0
logging: ^1.2.0
pool: ^1.5.0
postgres: ^2.6.1
postgres_pool: ^2.1.6
postgres: ^3.0.0
belatuk_pretty_logging: ^6.1.0
angel3_orm_generator: ^8.0.0
@ -3,7 +3,7 @@ import 'dart:io';
import 'package:angel3_orm/angel3_orm.dart';
import 'package:angel3_orm_postgres/angel3_orm_postgres.dart';
import 'package:logging/logging.dart';
import 'package:postgres_pool/postgres_pool.dart';
import 'package:postgres/postgres.dart';
FutureOr<QueryExecutor> Function() pg(Iterable<String> schemas) {
// Use single connection
@ -19,19 +19,16 @@ FutureOr<QueryExecutor> Function() pg(Iterable<String> schemas) {
Future<void> closePg(QueryExecutor executor) async {
if (executor is PostgreSqlExecutor) {
await executor.close();
//} else if (executor is PostgreSqlExecutorPool) {
// await executor.close();
} else if (executor is PostgreSqlPoolExecutor) {
await executor.close();
Future<PostgreSqlExecutor> connectToPostgres(Iterable<String> schemas) async {
var conn = PostgreSQLConnection(
'localhost', 5432, Platform.environment['POSTGRES_DB'] ?? 'orm_test',
var conn = await Connection.open(Endpoint(
host: 'localhost',
port: 5432,
database: Platform.environment['POSTGRES_DB'] ?? 'orm_test',
username: Platform.environment['POSTGRES_USERNAME'] ?? 'test',
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123');
await conn.open();
password: Platform.environment['POSTGRES_PASSWORD'] ?? 'test123'));
// Run sql to create the tables
for (var s in schemas) {
Reference in a new issue