feat: Initial commit
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/target
|
||||
2236
Cargo.lock
generated
Normal file
2236
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
16
Cargo.toml
Normal file
16
Cargo.toml
Normal file
@@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "pg-instance-handler"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
regex = "1.12.2"
|
||||
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio-rustls"] }
|
||||
tokio = { version = "1.49.0", features = ["rt-multi-thread"] }
|
||||
tracing = "0.1.44"
|
||||
tracing-subscriber = "0.3.22"
|
||||
|
||||
[dev-dependencies]
|
||||
mockall = "0.14.0"
|
||||
rstest = "0.26.1"
|
||||
tempfile = "3"
|
||||
1
database.txt
Normal file
1
database.txt
Normal file
@@ -0,0 +1 @@
|
||||
hello
|
||||
13
docker-compose.yml
Normal file
13
docker-compose.yml
Normal file
@@ -0,0 +1,13 @@
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:15
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
ports:
|
||||
- "5432:5432"
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "pg_isready -U postgres"]
|
||||
interval: 5s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
69
src/actors/driven/database_file_retriever.rs
Normal file
69
src/actors/driven/database_file_retriever.rs
Normal file
@@ -0,0 +1,69 @@
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{BufRead, BufReader},
|
||||
};
|
||||
|
||||
use crate::app::database_port::ForGettingDatabasesWantedState;
|
||||
pub struct DatabaseFileRetriever {
|
||||
file_path: String,
|
||||
}
|
||||
|
||||
impl DatabaseFileRetriever {
|
||||
pub fn new(file_path: String) -> Self {
|
||||
DatabaseFileRetriever { file_path }
|
||||
}
|
||||
}
|
||||
|
||||
impl ForGettingDatabasesWantedState for DatabaseFileRetriever {
|
||||
fn get_wanted_databases(&self) -> Result<Vec<String>, String> {
|
||||
let file = File::open(&self.file_path)
|
||||
.map_err(|e| format!("failed to open '{}': {}", self.file_path, e))?;
|
||||
let reader = BufReader::new(file);
|
||||
let mut databases = Vec::new();
|
||||
for line in reader.lines() {
|
||||
let line = line.map_err(|e| format!("failed to read line: {}", e))?;
|
||||
databases.push(line);
|
||||
}
|
||||
Ok(databases)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rstest::rstest;
|
||||
use std::io::Write;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[rstest]
|
||||
#[case::two_lines(Some(vec!["db1", "db2"]), Some(vec!["db1".to_string(), "db2".to_string()]))]
|
||||
#[case::empty(Some(vec![]), Some(vec![]))]
|
||||
#[case::missing(None, None)]
|
||||
fn get_wanted_databases_parametrized(
|
||||
#[case] lines: Option<Vec<&str>>,
|
||||
#[case] expected: Option<Vec<String>>,
|
||||
) {
|
||||
let dir = tempdir().unwrap();
|
||||
let file_path = dir.path().join("dbs.txt");
|
||||
|
||||
if let Some(lines) = &lines {
|
||||
let mut file = File::create(&file_path).unwrap();
|
||||
for l in lines {
|
||||
writeln!(file, "{}", l).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let retriever = DatabaseFileRetriever::new(file_path.to_string_lossy().into_owned());
|
||||
let result = retriever.get_wanted_databases();
|
||||
|
||||
match expected {
|
||||
Some(expected_vec) => {
|
||||
let got = result.unwrap();
|
||||
assert_eq!(got, expected_vec);
|
||||
}
|
||||
None => {
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
156
src/actors/driven/sqlx_handler.rs
Normal file
156
src/actors/driven/sqlx_handler.rs
Normal file
@@ -0,0 +1,156 @@
|
||||
use crate::app::database_port::ForManagingDatabases;
|
||||
use regex::Regex;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use tracing::{debug, info};
|
||||
|
||||
pub struct SQLXHandler {
|
||||
database_url: String,
|
||||
}
|
||||
|
||||
impl SQLXHandler {
|
||||
pub fn new(database_url: String) -> Self {
|
||||
SQLXHandler { database_url }
|
||||
}
|
||||
|
||||
fn make_runtime() -> Result<tokio::runtime::Runtime, String> {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.map_err(|e| format!("failed to build runtime: {}", e))
|
||||
}
|
||||
|
||||
fn sanitize_identifier(name: &str) -> Result<String, String> {
|
||||
// Restrict to simple PostgreSQL identifiers for safety.
|
||||
// Allows letters, numbers, and underscore; must start with a letter or underscore.
|
||||
let re = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").map_err(|e| e.to_string())?;
|
||||
if re.is_match(name) {
|
||||
Ok(name.to_string())
|
||||
} else {
|
||||
Err(format!("invalid database name: '{}'", name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ForManagingDatabases for SQLXHandler {
|
||||
fn list_databases(&self) -> Result<Vec<String>, String> {
|
||||
info!("Listing PostgreSQL databases");
|
||||
let rt = Self::make_runtime()?;
|
||||
rt.block_on(async {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&self.database_url)
|
||||
.await
|
||||
.map_err(|e| format!("failed to connect: {}", e))?;
|
||||
|
||||
let dbs: Vec<String> = sqlx::query_scalar(
|
||||
r#"SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname"#,
|
||||
)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|e| format!("list databases failed: {}", e))?;
|
||||
|
||||
debug!(?dbs, "Got databases");
|
||||
Ok(dbs)
|
||||
})
|
||||
}
|
||||
|
||||
fn create_database(&self, name: &str) -> Result<(), String> {
|
||||
let ident = Self::sanitize_identifier(name)?;
|
||||
info!(database = %ident, "Creating database");
|
||||
let rt = Self::make_runtime()?;
|
||||
rt.block_on(async {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&self.database_url)
|
||||
.await
|
||||
.map_err(|e| format!("failed to connect: {}", e))?;
|
||||
|
||||
let sql = format!("CREATE DATABASE {}", ident);
|
||||
sqlx::query(&sql)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.map_err(|e| format!("create database failed: {}", e))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn delete_database(&self, name: &str) -> Result<(), String> {
|
||||
let ident = Self::sanitize_identifier(name)?;
|
||||
info!(database = %ident, "Deleting database");
|
||||
let rt = Self::make_runtime()?;
|
||||
rt.block_on(async {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&self.database_url)
|
||||
.await
|
||||
.map_err(|e| format!("failed to connect: {}", e))?;
|
||||
|
||||
// Terminate connections before dropping (best effort)
|
||||
let terminate_sql = r#"
|
||||
SELECT pg_terminate_backend(pid)
|
||||
FROM pg_stat_activity
|
||||
WHERE datname = $1 AND pid <> pg_backend_pid()
|
||||
"#;
|
||||
let _ = sqlx::query(terminate_sql).bind(&ident).execute(&pool).await;
|
||||
|
||||
let sql = format!("DROP DATABASE IF EXISTS {}", ident);
|
||||
sqlx::query(&sql)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.map_err(|e| format!("drop database failed: {}", e))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn sanitize_identifier_allows_valid() {
|
||||
for ok in ["db", "_db", "db_123", "A_b1"] {
|
||||
assert!(
|
||||
SQLXHandler::sanitize_identifier(ok).is_ok(),
|
||||
"{} should be valid",
|
||||
ok
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sanitize_identifier_rejects_invalid() {
|
||||
for bad in ["", "1db", "db-1", "db name", "db;drop", "db$", "db."] {
|
||||
assert!(
|
||||
SQLXHandler::sanitize_identifier(bad).is_err(),
|
||||
"{} should be invalid",
|
||||
bad
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Optional smoke test against a real Postgres instance.
|
||||
// Set TEST_ADMIN_DATABASE_URL to something like:
|
||||
// postgres://postgres:postgres@localhost/postgres
|
||||
// Run with: cargo test -- --ignored
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn postgres_smoke_create_list_delete() {
|
||||
let url = std::env::var("TEST_ADMIN_DATABASE_URL")
|
||||
.expect("set TEST_ADMIN_DATABASE_URL to an admin database URL");
|
||||
let handler = SQLXHandler::new(url);
|
||||
|
||||
let db_name = format!("testdb_{}", std::process::id());
|
||||
|
||||
// Ensure clean slate (ignore errors)
|
||||
let _ = handler.delete_database(&db_name);
|
||||
|
||||
handler.create_database(&db_name).expect("create ok");
|
||||
let list = handler.list_databases().expect("list ok");
|
||||
assert!(list.contains(&db_name));
|
||||
|
||||
handler.delete_database(&db_name).expect("drop ok");
|
||||
let list2 = handler.list_databases().expect("list ok");
|
||||
assert!(!list2.contains(&db_name));
|
||||
}
|
||||
}
|
||||
11
src/app/database_port.rs
Normal file
11
src/app/database_port.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
pub trait ForGettingDatabasesWantedState {
|
||||
fn get_wanted_databases(&self) -> Result<Vec<String>, String>;
|
||||
}
|
||||
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
pub trait ForManagingDatabases {
|
||||
fn list_databases(&self) -> Result<Vec<String>, String>;
|
||||
fn create_database(&self, name: &str) -> Result<(), String>;
|
||||
fn delete_database(&self, name: &str) -> Result<(), String>;
|
||||
}
|
||||
126
src/app/database_service.rs
Normal file
126
src/app/database_service.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
use tracing::info;
|
||||
|
||||
use crate::app::database_port::{ForGettingDatabasesWantedState, ForManagingDatabases};
|
||||
|
||||
pub struct DatabaseService<X, Y>
|
||||
where
|
||||
X: ForManagingDatabases,
|
||||
Y: ForGettingDatabasesWantedState,
|
||||
{
|
||||
database_handler: X,
|
||||
wanted_state_handler: Y,
|
||||
}
|
||||
|
||||
impl<X, Y> DatabaseService<X, Y>
|
||||
where
|
||||
X: ForManagingDatabases,
|
||||
Y: ForGettingDatabasesWantedState,
|
||||
{
|
||||
pub fn new(database_handler: X, wanted_state_handler: Y) -> Self {
|
||||
DatabaseService {
|
||||
database_handler,
|
||||
wanted_state_handler,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reconcile_databases(&self) -> Result<(), String> {
|
||||
let wanted_databases = self.wanted_state_handler.get_wanted_databases()?;
|
||||
let existing_databases = self.database_handler.list_databases()?;
|
||||
|
||||
info!(
|
||||
"Reconciling databases. Wanted: {:?}, Existing: {:?}",
|
||||
wanted_databases, existing_databases
|
||||
);
|
||||
|
||||
for db in &wanted_databases {
|
||||
if !existing_databases.contains(db) {
|
||||
self.database_handler.create_database(db)?;
|
||||
}
|
||||
}
|
||||
|
||||
for db in &existing_databases {
|
||||
if !wanted_databases.contains(db) && db != "postgres" {
|
||||
self.database_handler.delete_database(db)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::app::database_port::{MockForGettingDatabasesWantedState, MockForManagingDatabases};
|
||||
use mockall::predicate::*;
|
||||
use rstest::rstest;
|
||||
|
||||
#[rstest]
|
||||
// No databases exist and none are wanted
|
||||
#[case::no_existing_no_wanted(Ok(vec![]), Ok(vec![]), vec![], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// No databases exist but one is wanted
|
||||
#[case::no_existing_one_wanted(Ok(vec![]), Ok(vec!["db1".to_string()]), vec!["db1".to_string()], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// A database exists and is wanted
|
||||
#[case::exists_and_wanted(Ok(vec!["db1".to_string()]), Ok(vec!["db1".to_string()]), vec![], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// One database exists and another is wanted
|
||||
#[case::different_existing_vs_wanted(Ok(vec!["db1".to_string()]), Ok(vec!["db2".to_string()]), vec!["db2".to_string()], Ok(()), vec!["db1".to_string()], Ok(()), Ok(()))]
|
||||
// A database exists but is not wanted
|
||||
#[case::existing_not_wanted(Ok(vec!["db1".to_string()]), Ok(vec![]), vec![], Ok(()), vec!["db1".to_string()], Ok(()), Ok(()))]
|
||||
// Multiple databases exist and none are wanted
|
||||
#[case::multiple_existing_none_wanted(Ok(vec!["db1".to_string(), "db2".to_string()]), Ok(vec![]), vec![], Ok(()), vec!["db1".to_string(), "db2".to_string()], Ok(()), Ok(()))]
|
||||
// No databases exist but multiple are wanted
|
||||
#[case::no_existing_multiple_wanted(Ok(vec![]), Ok(vec!["db1".to_string(), "db2".to_string()]), vec!["db1".to_string(), "db2".to_string()], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// Multiple databases exist and some are wanted
|
||||
#[case::multiple_existing_some_wanted(Ok(vec!["db1".to_string(), "db2".to_string(), "db3".to_string()]), Ok(vec!["db2".to_string(), "db4".to_string()]), vec!["db4".to_string()], Ok(()), vec!["db1".to_string(), "db3".to_string()], Ok(()), Ok(()))]
|
||||
// Error retrieving existing databases
|
||||
#[case::error_list_databases(Err("DB Error".to_string()), Ok(vec!["db1".to_string()]), vec![], Ok(()), vec![], Ok(()), Err("DB Error".to_string()))]
|
||||
// Error retrieving wanted databases
|
||||
#[case::error_wanted_databases(Ok(vec!["db1".to_string()]), Err("Wanted State Error".to_string()), vec![], Ok(()), vec![], Ok(()), Err("Wanted State Error".to_string()))]
|
||||
// Error creating a database
|
||||
#[case::error_create_database(Ok(vec![]), Ok(vec!["db1".to_string()]), vec!["db1".to_string()], Err("Creation Error".to_string()), vec![], Ok(()), Err("Creation Error".to_string()))]
|
||||
// Error deleting a database
|
||||
#[case::error_delete_database(Ok(vec!["db1".to_string()]), Ok(vec![]), vec![], Ok(()), vec!["db1".to_string()], Err("Deletion Error".to_string()), Err("Deletion Error".to_string()))]
|
||||
// If database name is "postgres", delete should not be called
|
||||
#[case::dont_delete_postgres(Ok(vec!["postgres".to_string(), "db1".to_string()]), Ok(vec!["db1".to_string()]), vec![], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
fn test_reconcile_databases(
|
||||
#[case] existing_dbs_result: Result<Vec<String>, String>,
|
||||
#[case] wanted_dbs_result: Result<Vec<String>, String>,
|
||||
#[case] expected_database_creation_param: Vec<String>,
|
||||
#[case] expected_database_creation_result: Result<(), String>,
|
||||
#[case] expected_database_deletion_param: Vec<String>,
|
||||
#[case] expected_database_deletion_result: Result<(), String>,
|
||||
#[case] expected_result: Result<(), String>,
|
||||
) {
|
||||
let mut mock_db_handler = MockForManagingDatabases::new();
|
||||
let mut mock_wanted_state_handler = MockForGettingDatabasesWantedState::new();
|
||||
|
||||
mock_wanted_state_handler
|
||||
.expect_get_wanted_databases()
|
||||
.returning(move || wanted_dbs_result.clone());
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_databases()
|
||||
.returning(move || existing_dbs_result.clone());
|
||||
|
||||
for db_name in expected_database_creation_param.clone() {
|
||||
let db_name_clone = db_name.clone();
|
||||
let expected_database_creation_result = expected_database_creation_result.clone();
|
||||
mock_db_handler
|
||||
.expect_create_database()
|
||||
.with(eq(db_name_clone))
|
||||
.returning(move |_| expected_database_creation_result.clone());
|
||||
}
|
||||
|
||||
for db_name in expected_database_deletion_param.clone() {
|
||||
let db_name_clone = db_name.clone();
|
||||
let expected_database_deletion_result = expected_database_deletion_result.clone();
|
||||
mock_db_handler
|
||||
.expect_delete_database()
|
||||
.with(eq(db_name_clone))
|
||||
.returning(move |_| expected_database_deletion_result.clone());
|
||||
}
|
||||
|
||||
let service = DatabaseService::new(mock_db_handler, mock_wanted_state_handler);
|
||||
let result = service.reconcile_databases();
|
||||
assert_eq!(result, expected_result);
|
||||
}
|
||||
}
|
||||
25
src/main.rs
Normal file
25
src/main.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
mod app {
|
||||
pub mod database_port;
|
||||
pub mod database_service;
|
||||
}
|
||||
|
||||
mod actors {
|
||||
pub mod driven {
|
||||
pub mod database_file_retriever;
|
||||
pub mod sqlx_handler;
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL")
|
||||
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/postgres".to_string());
|
||||
let database_handler = actors::driven::sqlx_handler::SQLXHandler::new(database_url);
|
||||
let file_retriever = actors::driven::database_file_retriever::DatabaseFileRetriever::new(
|
||||
"database.txt".to_string(),
|
||||
);
|
||||
|
||||
let service = app::database_service::DatabaseService::new(database_handler, file_retriever);
|
||||
service.reconcile_databases().unwrap();
|
||||
}
|
||||
Reference in New Issue
Block a user