Updated migration runner

This commit is contained in:
thomashii 2024-01-25 12:14:23 +08:00
parent cc362fb704
commit b9b59523ef
5 changed files with 28 additions and 22 deletions

View file

@ -9,9 +9,15 @@ import 'package:mysql_client/mysql_client.dart';
import 'todo.dart'; import 'todo.dart';
void main(List<String> args) async { void main(List<String> args) async {
Connection conn = await Connection.open(Endpoint(
host: 'localhost',
port: 5432,
database: 'demo',
username: 'demouser',
password: 'demo123'));
var postgresqlMigrationRunner = PostgresMigrationRunner( var postgresqlMigrationRunner = PostgresMigrationRunner(
PostgreSQLConnection('localhost', 5432, 'demo', conn,
username: 'demouser', password: 'demo123'),
migrations: [ migrations: [
UserMigration(), UserMigration(),
TodoMigration(), TodoMigration(),

View file

@ -11,7 +11,7 @@ class PostgresMigrationRunner implements MigrationRunner {
final _log = Logger('PostgresMigrationRunner'); final _log = Logger('PostgresMigrationRunner');
final Map<String, Migration> migrations = {}; final Map<String, Migration> migrations = {};
final PostgreSQLConnection connection; final Connection connection;
final Queue<Migration> _migrationQueue = Queue(); final Queue<Migration> _migrationQueue = Queue();
bool _connected = false; bool _connected = false;
@ -34,7 +34,8 @@ class PostgresMigrationRunner implements MigrationRunner {
} }
if (!_connected) { if (!_connected) {
await connection.open(); //await connection.open();
//Connection.open(_endpoint!, settings: _settings);
_connected = true; _connected = true;
} }
@ -55,7 +56,7 @@ class PostgresMigrationRunner implements MigrationRunner {
@override @override
Future up() async { Future up() async {
await _init(); await _init();
var r = await connection.query('SELECT path from migrations;'); var r = await connection.execute('SELECT path from migrations;');
var existing = r.expand((x) => x).cast<String>(); var existing = r.expand((x) => x).cast<String>();
var toRun = <String>[]; var toRun = <String>[];
@ -64,7 +65,7 @@ class PostgresMigrationRunner implements MigrationRunner {
}); });
if (toRun.isNotEmpty) { if (toRun.isNotEmpty) {
var r = await connection.query('SELECT MAX(batch) from migrations;'); var r = await connection.execute('SELECT MAX(batch) from migrations;');
var curBatch = (r[0][0] ?? 0) as int; var curBatch = (r[0][0] ?? 0) as int;
var batch = curBatch + 1; var batch = curBatch + 1;
@ -74,16 +75,15 @@ class PostgresMigrationRunner implements MigrationRunner {
migration.up(schema); migration.up(schema);
_log.info('Added "$k" into "migrations" table.'); _log.info('Added "$k" into "migrations" table.');
await schema.run(connection).then((_) { await schema.run(connection).then((_) {
return connection.transaction((ctx) async { return connection.runTx((ctx) async {
var result = await ctx.query( var result = await ctx.execute(
"INSERT INTO MIGRATIONS (batch, path) VALUES ($batch, '$k')"); "INSERT INTO MIGRATIONS (batch, path) VALUES ($batch, '$k')");
return result.affectedRowCount; return result.affectedRows;
}); });
//return connection.execute(
// 'INSERT INTO MIGRATIONS (batch, path) VALUES ($batch, \'$k\');');
}).catchError((e) { }).catchError((e) {
_log.severe('Failed to insert into "migrations" table.'); _log.severe('Failed to insert into "migrations" table.');
return -1;
}); });
} }
} else { } else {
@ -95,11 +95,10 @@ class PostgresMigrationRunner implements MigrationRunner {
Future rollback() async { Future rollback() async {
await _init(); await _init();
PostgreSQLResult r = Result r = await connection.execute('SELECT MAX(batch) from migrations;');
await connection.query('SELECT MAX(batch) from migrations;');
var curBatch = (r[0][0] ?? 0) as int; var curBatch = (r[0][0] ?? 0) as int;
r = await connection r = await connection
.query('SELECT path from migrations WHERE batch = $curBatch;'); .execute('SELECT path from migrations WHERE batch = $curBatch;');
var existing = r.expand((x) => x).cast<String>(); var existing = r.expand((x) => x).cast<String>();
var toRun = <String>[]; var toRun = <String>[];
@ -127,7 +126,7 @@ class PostgresMigrationRunner implements MigrationRunner {
Future reset() async { Future reset() async {
await _init(); await _init();
var r = await connection var r = await connection
.query('SELECT path from migrations ORDER BY batch DESC;'); .execute('SELECT path from migrations ORDER BY batch DESC;');
var existing = r.expand((x) => x).cast<String>(); var existing = r.expand((x) => x).cast<String>();
var toRun = existing.where(migrations.containsKey).toList(); var toRun = existing.where(migrations.containsKey).toList();

View file

@ -14,18 +14,18 @@ class PostgresSchema extends Schema {
factory PostgresSchema() => PostgresSchema._(StringBuffer(), 0); factory PostgresSchema() => PostgresSchema._(StringBuffer(), 0);
Future<int> run(PostgreSQLConnection connection) async { Future<int> run(Connection connection) async {
//return connection.execute(compile()); //return connection.execute(compile());
var result = await connection.transaction((ctx) async { var result = await connection.runTx((ctx) async {
var sql = compile(); var sql = compile();
var result = await ctx.query(sql).catchError((e) { var result = await ctx.execute(sql).catchError((e) {
_log.severe('Failed to run query: [ $sql ]', e); _log.severe('Failed to run query: [ $sql ]', e);
throw Exception(e); throw Exception(e);
}); });
return result.affectedRowCount; return result.affectedRows;
}); });
return (result is int) ? result : 0; return result;
} }
String compile() => _buf.toString(); String compile() => _buf.toString();

View file

@ -1,5 +1,5 @@
name: angel3_migration_runner name: angel3_migration_runner
version: 8.1.1 version: 8.2.0
description: Command-line based database migration runner for Angel3's ORM. description: Command-line based database migration runner for Angel3's ORM.
homepage: https://angel3-framework.web.app/ homepage: https://angel3-framework.web.app/
repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_migration_runner repository: https://github.com/dart-backend/angel/tree/master/packages/orm/angel_migration_runner
@ -11,7 +11,7 @@ dependencies:
angel3_orm: ^8.0.0 angel3_orm: ^8.0.0
args: ^2.4.0 args: ^2.4.0
charcode: ^1.3.0 charcode: ^1.3.0
postgres: ^2.6.0 postgres: ^3.0.0
mysql_client: ^0.0.27 mysql_client: ^0.0.27
mysql1: ^0.20.0 mysql1: ^0.20.0
logging: ^1.2.0 logging: ^1.2.0

View file

@ -20,6 +20,7 @@ Future<void> closePg(QueryExecutor executor) async {
} }
Future<PostgreSqlExecutor> connectToPostgres(Iterable<String> schemas) async { Future<PostgreSqlExecutor> connectToPostgres(Iterable<String> schemas) async {
// postgres://kfayrlbi:OAaEE39zOMLEPfH4DDgHbGNVsQtNdHu7@heffalump.db.elephantsql.com/kfayrlbi
var conn = await Connection.open(Endpoint( var conn = await Connection.open(Endpoint(
host: 'localhost', host: 'localhost',
port: 5432, port: 5432,