feat: Initial commit
This commit is contained in:
172
src/actors/driven/database_file_retriever.rs
Normal file
172
src/actors/driven/database_file_retriever.rs
Normal file
@@ -0,0 +1,172 @@
|
||||
use crate::app::database::ports::driven::database_port::{
|
||||
ForGettingDatabasesWantedState, GrantDbAccess, WantedState,
|
||||
};
|
||||
use regex::Regex;
|
||||
use serde::Deserialize;
|
||||
|
||||
pub struct DatabaseFileRetriever {
|
||||
file_path: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
||||
pub struct DbSpec {
|
||||
pub name: String,
|
||||
pub user: String,
|
||||
pub password: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct DesiredState {
|
||||
pub databases: Vec<DbSpec>,
|
||||
}
|
||||
|
||||
impl DatabaseFileRetriever {
|
||||
pub fn new(file_path: String) -> Self {
|
||||
DatabaseFileRetriever { file_path }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ForGettingDatabasesWantedState for DatabaseFileRetriever {
|
||||
async fn get_wanted_state(&self) -> Result<WantedState, String> {
|
||||
let state = load_desired_state(&self.file_path)?;
|
||||
|
||||
let mut databases: Vec<String> = Vec::new();
|
||||
let mut users: Vec<String> = Vec::new();
|
||||
let mut user_passwords: std::collections::BTreeMap<String, String> =
|
||||
std::collections::BTreeMap::new();
|
||||
let mut database_owners: std::collections::BTreeMap<String, String> =
|
||||
std::collections::BTreeMap::new();
|
||||
let mut grants: Vec<GrantDbAccess> = Vec::new();
|
||||
|
||||
let ident_re = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").map_err(|e| e.to_string())?;
|
||||
|
||||
for db in state.databases {
|
||||
if !ident_re.is_match(&db.name) {
|
||||
return Err(format!("invalid database name: '{}'", db.name));
|
||||
}
|
||||
if !ident_re.is_match(&db.user) {
|
||||
return Err(format!("invalid user name: '{}'", db.user));
|
||||
}
|
||||
|
||||
if !databases.contains(&db.name) {
|
||||
databases.push(db.name.clone());
|
||||
}
|
||||
if !users.contains(&db.user) {
|
||||
users.push(db.user.clone());
|
||||
}
|
||||
|
||||
match user_passwords.get(&db.user) {
|
||||
Some(existing) if existing != &db.password => {
|
||||
return Err(format!(
|
||||
"conflicting passwords provided for user '{}'",
|
||||
db.user
|
||||
));
|
||||
}
|
||||
Some(_) => {}
|
||||
None => {
|
||||
user_passwords.insert(db.user.clone(), db.password.clone());
|
||||
}
|
||||
}
|
||||
|
||||
database_owners.insert(db.name.clone(), db.user.clone());
|
||||
|
||||
grants.push(GrantDbAccess {
|
||||
user: db.user,
|
||||
database: db.name,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(WantedState {
|
||||
databases,
|
||||
users,
|
||||
user_passwords,
|
||||
database_owners,
|
||||
grants,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load_desired_state(path: &str) -> Result<DesiredState, String> {
|
||||
let content =
|
||||
std::fs::read_to_string(path).map_err(|e| format!("failed to read '{}': {}", path, e))?;
|
||||
let state: DesiredState = serde_yaml::from_str(&content)
|
||||
.map_err(|e| format!("failed to parse YAML '{}': {}", path, e))?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[test]
|
||||
fn wanted_state_keeps_passwords_and_owners() {
|
||||
let tmp = tempfile::NamedTempFile::new().unwrap();
|
||||
std::fs::write(
|
||||
tmp.path(),
|
||||
r#"databases:
|
||||
- name: hello
|
||||
user: hello_user
|
||||
password: secret
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let retriever = DatabaseFileRetriever::new(tmp.path().to_string_lossy().to_string());
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let wanted = rt.block_on(retriever.get_wanted_state()).unwrap();
|
||||
|
||||
assert_eq!(wanted.databases, vec!["hello".to_string()]);
|
||||
assert_eq!(wanted.users, vec!["hello_user".to_string()]);
|
||||
|
||||
let mut expected_pw = BTreeMap::new();
|
||||
expected_pw.insert("hello_user".to_string(), "secret".to_string());
|
||||
assert_eq!(wanted.user_passwords, expected_pw);
|
||||
|
||||
let mut expected_owners = BTreeMap::new();
|
||||
expected_owners.insert("hello".to_string(), "hello_user".to_string());
|
||||
assert_eq!(wanted.database_owners, expected_owners);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn conflicting_passwords_for_same_user_fails() {
|
||||
let tmp = tempfile::NamedTempFile::new().unwrap();
|
||||
std::fs::write(
|
||||
tmp.path(),
|
||||
r#"databases:
|
||||
- name: db1
|
||||
user: u1
|
||||
password: p1
|
||||
- name: db2
|
||||
user: u1
|
||||
password: p2
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let retriever = DatabaseFileRetriever::new(tmp.path().to_string_lossy().to_string());
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let err = rt.block_on(retriever.get_wanted_state()).unwrap_err();
|
||||
assert!(err.contains("conflicting passwords"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_identifiers_fail_fast() {
|
||||
let tmp = tempfile::NamedTempFile::new().unwrap();
|
||||
std::fs::write(
|
||||
tmp.path(),
|
||||
r#"databases:
|
||||
- name: "bad-name"
|
||||
user: ok_user
|
||||
password: secret
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let retriever = DatabaseFileRetriever::new(tmp.path().to_string_lossy().to_string());
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let err = rt.block_on(retriever.get_wanted_state()).unwrap_err();
|
||||
assert!(err.contains("invalid database name"));
|
||||
}
|
||||
}
|
||||
251
src/actors/driven/sqlx_handler.rs
Normal file
251
src/actors/driven/sqlx_handler.rs
Normal file
@@ -0,0 +1,251 @@
|
||||
use regex::Regex;
|
||||
use sqlx::PgPool;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::app::database::ports::driven::database_port::{ForManagingDatabases, GrantDbAccess};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SQLXHandler {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl SQLXHandler {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
SQLXHandler { pool }
|
||||
}
|
||||
|
||||
fn sanitize_identifier(name: &str) -> Result<String, String> {
|
||||
// Restrict to simple PostgreSQL identifiers for safety.
|
||||
// Allows letters, numbers, and underscore; must start with a letter or underscore.
|
||||
let re = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").map_err(|e| e.to_string())?;
|
||||
if re.is_match(name) {
|
||||
Ok(name.to_string())
|
||||
} else {
|
||||
Err(format!("invalid database name: '{}'", name))
|
||||
}
|
||||
}
|
||||
|
||||
async fn user_exists(&self, user_ident: &str) -> Result<bool, String> {
|
||||
let exists: Option<bool> =
|
||||
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname = $1)")
|
||||
.bind(user_ident)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("role exists check failed: {}", e))?;
|
||||
Ok(exists.unwrap_or(false))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ForManagingDatabases for SQLXHandler {
|
||||
async fn list_databases(&self) -> Result<Vec<String>, String> {
|
||||
info!("Listing PostgreSQL databases");
|
||||
let dbs: Vec<String> = sqlx::query_scalar(
|
||||
r#"SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname"#,
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("list databases failed: {}", e))?;
|
||||
|
||||
debug!(?dbs, "Got databases");
|
||||
Ok(dbs)
|
||||
}
|
||||
|
||||
async fn create_database(&self, name: &str) -> Result<(), String> {
|
||||
let ident = Self::sanitize_identifier(name)?;
|
||||
info!(database = %ident, "Creating database");
|
||||
let sql = format!("CREATE DATABASE {}", ident);
|
||||
sqlx::query(&sql)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("create database failed: {}", e))?;
|
||||
|
||||
info!(database = %ident, "Database created");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_database(&self, name: &str) -> Result<(), String> {
|
||||
let ident = Self::sanitize_identifier(name)?;
|
||||
info!(database = %ident, "Deleting database");
|
||||
// Terminate connections before dropping (best effort)
|
||||
let terminate_sql = r#"
|
||||
SELECT pg_terminate_backend(pid)
|
||||
FROM pg_stat_activity
|
||||
WHERE datname = $1 AND pid <> pg_backend_pid()
|
||||
"#;
|
||||
let _ = sqlx::query(terminate_sql)
|
||||
.bind(&ident)
|
||||
.execute(&self.pool)
|
||||
.await;
|
||||
|
||||
let sql = format!("DROP DATABASE IF EXISTS {}", ident);
|
||||
sqlx::query(&sql)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("drop database failed: {}", e))?;
|
||||
|
||||
info!(database = %ident, "Database deleted");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_users(&self) -> Result<Vec<String>, String> {
|
||||
info!("Listing PostgreSQL users");
|
||||
let users: Vec<String> = sqlx::query_scalar(
|
||||
r#"SELECT rolname FROM pg_roles WHERE rolcanlogin = true ORDER BY rolname"#,
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("list users failed: {}", e))?;
|
||||
|
||||
debug!(?users, "Got users");
|
||||
Ok(users)
|
||||
}
|
||||
|
||||
async fn list_grants(&self) -> Result<Vec<GrantDbAccess>, String> {
|
||||
info!("Listing explicit database grants");
|
||||
// We only return explicit CONNECT grants from database ACLs (datacl).
|
||||
// This avoids exploding results when PUBLIC has CONNECT.
|
||||
let rows: Vec<(String, Option<String>)> = sqlx::query_as(
|
||||
r#"SELECT datname, array_to_string(datacl, ',') AS acl
|
||||
FROM pg_database
|
||||
WHERE datistemplate = false
|
||||
ORDER BY datname"#,
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("list grants failed: {}", e))?;
|
||||
|
||||
let mut grants: Vec<GrantDbAccess> = Vec::new();
|
||||
for (db_name, acl_opt) in rows {
|
||||
let Some(acl) = acl_opt else {
|
||||
continue;
|
||||
};
|
||||
for entry in acl.split(',') {
|
||||
// Entry format: grantee=privs/grantor
|
||||
// Example: hello_user=CTc/postgres
|
||||
let (lhs, rhs) = match entry.split_once('=') {
|
||||
Some(v) => v,
|
||||
None => continue,
|
||||
};
|
||||
let grantee = lhs.trim();
|
||||
if grantee.is_empty() {
|
||||
// PUBLIC
|
||||
continue;
|
||||
}
|
||||
let privs = rhs.split('/').next().unwrap_or("");
|
||||
if privs.contains('c') {
|
||||
grants.push(GrantDbAccess {
|
||||
user: grantee.to_string(),
|
||||
database: db_name.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(?grants, "Got grants");
|
||||
Ok(grants)
|
||||
}
|
||||
|
||||
async fn ensure_user(&self, user: &str, password: &str) -> Result<(), String> {
|
||||
let user_ident = Self::sanitize_identifier(user)?;
|
||||
info!(user = %user_ident, "Ensuring user (password + login)");
|
||||
|
||||
let quoted_password: String = sqlx::query_scalar("SELECT quote_literal($1)")
|
||||
.bind(password)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("failed to quote password: {}", e))?;
|
||||
|
||||
let exists = self.user_exists(&user_ident).await?;
|
||||
if exists {
|
||||
let sql = format!(
|
||||
"ALTER ROLE {} WITH LOGIN PASSWORD {} NOSUPERUSER NOCREATEDB NOCREATEROLE NOREPLICATION",
|
||||
user_ident, quoted_password
|
||||
);
|
||||
sqlx::query(&sql)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("alter role failed: {}", e))?;
|
||||
} else {
|
||||
let sql = format!(
|
||||
"CREATE ROLE {} WITH LOGIN PASSWORD {} NOSUPERUSER NOCREATEDB NOCREATEROLE NOREPLICATION",
|
||||
user_ident, quoted_password
|
||||
);
|
||||
sqlx::query(&sql)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("create role failed: {}", e))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_user(&self, name: &str) -> Result<(), String> {
|
||||
let user_ident = Self::sanitize_identifier(name)?;
|
||||
info!(user = %user_ident, "Deleting user");
|
||||
// Best-effort cleanup to avoid DROP ROLE failures.
|
||||
let reassign = format!("REASSIGN OWNED BY {} TO postgres", user_ident);
|
||||
let _ = sqlx::query(&reassign).execute(&self.pool).await;
|
||||
|
||||
let drop_owned = format!("DROP OWNED BY {}", user_ident);
|
||||
let _ = sqlx::query(&drop_owned).execute(&self.pool).await;
|
||||
|
||||
let sql = format!("DROP ROLE IF EXISTS {}", user_ident);
|
||||
sqlx::query(&sql)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("delete user failed: {}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn revoke_public_connect(&self, database: &str) -> Result<(), String> {
|
||||
let db_ident = Self::sanitize_identifier(database)?;
|
||||
info!(database = %db_ident, "Revoking CONNECT from PUBLIC");
|
||||
let revoke = format!("REVOKE CONNECT ON DATABASE {} FROM PUBLIC", db_ident);
|
||||
sqlx::query(&revoke)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("revoke public connect failed: {}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ensure_db_owner(&self, database: &str, owner: &str) -> Result<(), String> {
|
||||
let db_ident = Self::sanitize_identifier(database)?;
|
||||
let owner_ident = Self::sanitize_identifier(owner)?;
|
||||
info!(database = %db_ident, owner = %owner_ident, "Ensuring database owner");
|
||||
let sql = format!("ALTER DATABASE {} OWNER TO {}", db_ident, owner_ident);
|
||||
sqlx::query(&sql)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("alter owner failed: {}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn grant_access(&self, user: &str, database: &str) -> Result<(), String> {
|
||||
let db_ident = Self::sanitize_identifier(database)?;
|
||||
let user_ident = Self::sanitize_identifier(user)?;
|
||||
info!(user = %user_ident, database = %db_ident, "Granting access");
|
||||
let grant = format!("GRANT CONNECT ON DATABASE {} TO {}", db_ident, user_ident);
|
||||
sqlx::query(&grant)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("grant access failed: {}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn revoke_access(&self, user: &str, database: &str) -> Result<(), String> {
|
||||
let db_ident = Self::sanitize_identifier(database)?;
|
||||
let user_ident = Self::sanitize_identifier(user)?;
|
||||
info!(user = %user_ident, database = %db_ident, "Revoking access");
|
||||
let revoke = format!(
|
||||
"REVOKE CONNECT ON DATABASE {} FROM {}",
|
||||
db_ident, user_ident
|
||||
);
|
||||
sqlx::query(&revoke)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| format!("revoke access failed: {}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
410
src/app/database/database_service.rs
Normal file
410
src/app/database/database_service.rs
Normal file
@@ -0,0 +1,410 @@
|
||||
use tracing::info;
|
||||
|
||||
use crate::app::database::ports::driven::database_port::{
|
||||
ForGettingDatabasesWantedState, ForManagingDatabases,
|
||||
};
|
||||
|
||||
pub struct DatabaseService<X, Y>
|
||||
where
|
||||
X: ForManagingDatabases,
|
||||
Y: ForGettingDatabasesWantedState,
|
||||
{
|
||||
database_handler: X,
|
||||
wanted_state_handler: Y,
|
||||
}
|
||||
|
||||
impl<X, Y> DatabaseService<X, Y>
|
||||
where
|
||||
X: ForManagingDatabases,
|
||||
Y: ForGettingDatabasesWantedState,
|
||||
{
|
||||
pub fn new(database_handler: X, wanted_state_handler: Y) -> Self {
|
||||
DatabaseService {
|
||||
database_handler,
|
||||
wanted_state_handler,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reconcile_databases(&self) -> Result<(), String> {
|
||||
let wanted_state = self.wanted_state_handler.get_wanted_state().await?;
|
||||
let existing_databases = self.database_handler.list_databases().await?;
|
||||
let existing_users = self.database_handler.list_users().await?;
|
||||
let existing_grants = self.database_handler.list_grants().await?;
|
||||
|
||||
info!(
|
||||
"Reconciling databases. Wanted: {:?}, Existing: {:?}",
|
||||
wanted_state.databases, existing_databases
|
||||
);
|
||||
|
||||
for db in &wanted_state.databases {
|
||||
if !existing_databases.contains(db) {
|
||||
self.database_handler.create_database(db).await?;
|
||||
}
|
||||
}
|
||||
|
||||
for db in &existing_databases {
|
||||
if !wanted_state.databases.contains(db) && db != "postgres" {
|
||||
self.database_handler.delete_database(db).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure users exist and have expected password.
|
||||
for user in &wanted_state.users {
|
||||
if user == "postgres" {
|
||||
continue;
|
||||
}
|
||||
let password = wanted_state
|
||||
.user_passwords
|
||||
.get(user)
|
||||
.ok_or_else(|| format!("missing password for user '{}'", user))?;
|
||||
self.database_handler.ensure_user(user, password).await?;
|
||||
}
|
||||
|
||||
for user in &existing_users {
|
||||
if !wanted_state.users.contains(user) && user != "postgres" {
|
||||
self.database_handler.delete_user(user).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Enforce connect policy: remove PUBLIC and ensure expected ownership.
|
||||
// PostgreSQL grants CONNECT on databases to PUBLIC by default. If you want users to only
|
||||
// connect to their declared databases, we must revoke it at least on `postgres`.
|
||||
self.database_handler
|
||||
.revoke_public_connect("postgres")
|
||||
.await?;
|
||||
|
||||
for db in &wanted_state.databases {
|
||||
if db == "postgres" {
|
||||
continue;
|
||||
}
|
||||
self.database_handler.revoke_public_connect(db).await?;
|
||||
}
|
||||
|
||||
for (db, owner) in &wanted_state.database_owners {
|
||||
if db == "postgres" {
|
||||
continue;
|
||||
}
|
||||
self.database_handler.ensure_db_owner(db, owner).await?;
|
||||
}
|
||||
|
||||
for grant in &wanted_state.grants {
|
||||
let already_granted = existing_grants
|
||||
.iter()
|
||||
.any(|g| g.user == grant.user && g.database == grant.database);
|
||||
if !already_granted {
|
||||
self.database_handler
|
||||
.grant_access(&grant.user, &grant.database)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Revoke explicit grants on databases we manage when they are no longer wanted.
|
||||
for existing_grant in &existing_grants {
|
||||
if !wanted_state.databases.contains(&existing_grant.database) {
|
||||
continue;
|
||||
}
|
||||
let still_wanted = wanted_state
|
||||
.grants
|
||||
.iter()
|
||||
.any(|g| g.user == existing_grant.user && g.database == existing_grant.database);
|
||||
if !still_wanted {
|
||||
self.database_handler
|
||||
.revoke_access(&existing_grant.user, &existing_grant.database)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::app::database::ports::driven::database_port::{
|
||||
MockForGettingDatabasesWantedState, MockForManagingDatabases, WantedState,
|
||||
};
|
||||
use mockall::predicate::*;
|
||||
use rstest::rstest;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[rstest]
|
||||
// No databases exist and none are wanted
|
||||
#[case::no_existing_no_wanted(Ok(vec![]), Ok(vec![]), vec![], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// No databases exist but one is wanted
|
||||
#[case::no_existing_one_wanted(Ok(vec![]), Ok(vec!["db1".to_string()]), vec!["db1".to_string()], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// A database exists and is wanted
|
||||
#[case::exists_and_wanted(Ok(vec!["db1".to_string()]), Ok(vec!["db1".to_string()]), vec![], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// One database exists and another is wanted
|
||||
#[case::different_existing_vs_wanted(Ok(vec!["db1".to_string()]), Ok(vec!["db2".to_string()]), vec!["db2".to_string()], Ok(()), vec!["db1".to_string()], Ok(()), Ok(()))]
|
||||
// A database exists but is not wanted
|
||||
#[case::existing_not_wanted(Ok(vec!["db1".to_string()]), Ok(vec![]), vec![], Ok(()), vec!["db1".to_string()], Ok(()), Ok(()))]
|
||||
// Multiple databases exist and none are wanted
|
||||
#[case::multiple_existing_none_wanted(Ok(vec!["db1".to_string(), "db2".to_string()]), Ok(vec![]), vec![], Ok(()), vec!["db1".to_string(), "db2".to_string()], Ok(()), Ok(()))]
|
||||
// No databases exist but multiple are wanted
|
||||
#[case::no_existing_multiple_wanted(Ok(vec![]), Ok(vec!["db1".to_string(), "db2".to_string()]), vec!["db1".to_string(), "db2".to_string()], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
// Multiple databases exist and some are wanted
|
||||
#[case::multiple_existing_some_wanted(Ok(vec!["db1".to_string(), "db2".to_string(), "db3".to_string()]), Ok(vec!["db2".to_string(), "db4".to_string()]), vec!["db4".to_string()], Ok(()), vec!["db1".to_string(), "db3".to_string()], Ok(()), Ok(()))]
|
||||
// Error retrieving existing databases
|
||||
#[case::error_list_databases(Err("DB Error".to_string()), Ok(vec!["db1".to_string()]), vec![], Ok(()), vec![], Ok(()), Err("DB Error".to_string()))]
|
||||
// Error retrieving wanted databases
|
||||
#[case::error_wanted_databases(Ok(vec!["db1".to_string()]), Err("Wanted State Error".to_string()), vec![], Ok(()), vec![], Ok(()), Err("Wanted State Error".to_string()))]
|
||||
// Error creating a database
|
||||
#[case::error_create_database(Ok(vec![]), Ok(vec!["db1".to_string()]), vec!["db1".to_string()], Err("Creation Error".to_string()), vec![], Ok(()), Err("Creation Error".to_string()))]
|
||||
// Error deleting a database
|
||||
#[case::error_delete_database(Ok(vec!["db1".to_string()]), Ok(vec![]), vec![], Ok(()), vec!["db1".to_string()], Err("Deletion Error".to_string()), Err("Deletion Error".to_string()))]
|
||||
// If database name is "postgres", delete should not be called
|
||||
#[case::dont_delete_postgres(Ok(vec!["postgres".to_string(), "db1".to_string()]), Ok(vec!["db1".to_string()]), vec![], Ok(()), vec![], Ok(()), Ok(()))]
|
||||
fn test_reconcile_databases(
|
||||
#[case] existing_dbs_result: Result<Vec<String>, String>,
|
||||
#[case] wanted_dbs_result: Result<Vec<String>, String>,
|
||||
#[case] expected_database_creation_param: Vec<String>,
|
||||
#[case] expected_database_creation_result: Result<(), String>,
|
||||
#[case] expected_database_deletion_param: Vec<String>,
|
||||
#[case] expected_database_deletion_result: Result<(), String>,
|
||||
#[case] expected_result: Result<(), String>,
|
||||
) {
|
||||
let mut mock_db_handler = MockForManagingDatabases::new();
|
||||
let mut mock_wanted_state_handler = MockForGettingDatabasesWantedState::new();
|
||||
|
||||
mock_wanted_state_handler
|
||||
.expect_get_wanted_state()
|
||||
.returning(move || {
|
||||
wanted_dbs_result.clone().map(|dbs| WantedState {
|
||||
databases: dbs,
|
||||
users: vec![],
|
||||
user_passwords: BTreeMap::new(),
|
||||
database_owners: BTreeMap::new(),
|
||||
grants: vec![],
|
||||
})
|
||||
});
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_databases()
|
||||
.returning(move || existing_dbs_result.clone());
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_users()
|
||||
.times(0..)
|
||||
.returning(|| Ok(vec![]));
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_grants()
|
||||
.times(0..)
|
||||
.returning(|| Ok(vec![]));
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_public_connect()
|
||||
.times(0..)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_ensure_db_owner()
|
||||
.times(0..)
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_ensure_user()
|
||||
.times(0..)
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
for db_name in expected_database_creation_param.clone() {
|
||||
let db_name_clone = db_name.clone();
|
||||
let expected_database_creation_result = expected_database_creation_result.clone();
|
||||
mock_db_handler
|
||||
.expect_create_database()
|
||||
.with(eq(db_name_clone))
|
||||
.returning(move |_| expected_database_creation_result.clone());
|
||||
}
|
||||
|
||||
for db_name in expected_database_deletion_param.clone() {
|
||||
let db_name_clone = db_name.clone();
|
||||
let expected_database_deletion_result = expected_database_deletion_result.clone();
|
||||
mock_db_handler
|
||||
.expect_delete_database()
|
||||
.with(eq(db_name_clone))
|
||||
.returning(move |_| expected_database_deletion_result.clone());
|
||||
}
|
||||
|
||||
let service = DatabaseService::new(mock_db_handler, mock_wanted_state_handler);
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let result = rt.block_on(service.reconcile_databases());
|
||||
assert_eq!(result, expected_result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ensure_user_is_called_with_password() {
|
||||
let mut mock_db_handler = MockForManagingDatabases::new();
|
||||
let mut mock_wanted_state_handler = MockForGettingDatabasesWantedState::new();
|
||||
|
||||
mock_wanted_state_handler
|
||||
.expect_get_wanted_state()
|
||||
.returning(|| {
|
||||
let mut user_passwords = BTreeMap::new();
|
||||
user_passwords.insert("u1".to_string(), "p1".to_string());
|
||||
|
||||
Ok(WantedState {
|
||||
databases: vec![],
|
||||
users: vec!["u1".to_string()],
|
||||
user_passwords,
|
||||
database_owners: BTreeMap::new(),
|
||||
grants: vec![],
|
||||
})
|
||||
});
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_databases()
|
||||
.returning(|| Ok(vec![]));
|
||||
mock_db_handler.expect_list_users().returning(|| Ok(vec![]));
|
||||
mock_db_handler
|
||||
.expect_list_grants()
|
||||
.returning(|| Ok(vec![]));
|
||||
|
||||
mock_db_handler
|
||||
.expect_ensure_user()
|
||||
.with(eq("u1"), eq("p1"))
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_public_connect()
|
||||
.with(eq("postgres"))
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
mock_db_handler
|
||||
.expect_ensure_db_owner()
|
||||
.times(0)
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
let service = DatabaseService::new(mock_db_handler, mock_wanted_state_handler);
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(service.reconcile_databases()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enforces_public_policy_ownership_and_grants() {
|
||||
let mut mock_db_handler = MockForManagingDatabases::new();
|
||||
let mut mock_wanted_state_handler = MockForGettingDatabasesWantedState::new();
|
||||
|
||||
mock_wanted_state_handler
|
||||
.expect_get_wanted_state()
|
||||
.returning(|| {
|
||||
let mut user_passwords = BTreeMap::new();
|
||||
user_passwords.insert("u1".to_string(), "p1".to_string());
|
||||
let mut database_owners = BTreeMap::new();
|
||||
database_owners.insert("db1".to_string(), "u1".to_string());
|
||||
|
||||
Ok(WantedState {
|
||||
databases: vec!["db1".to_string()],
|
||||
users: vec!["u1".to_string()],
|
||||
user_passwords,
|
||||
database_owners,
|
||||
grants: vec![
|
||||
crate::app::database::ports::driven::database_port::GrantDbAccess {
|
||||
user: "u1".to_string(),
|
||||
database: "db1".to_string(),
|
||||
},
|
||||
],
|
||||
})
|
||||
});
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_databases()
|
||||
.returning(|| Ok(vec!["db1".to_string()]));
|
||||
mock_db_handler
|
||||
.expect_list_users()
|
||||
.returning(|| Ok(vec!["u1".to_string()]));
|
||||
mock_db_handler
|
||||
.expect_list_grants()
|
||||
.returning(|| Ok(vec![]));
|
||||
|
||||
mock_db_handler
|
||||
.expect_ensure_user()
|
||||
.with(eq("u1"), eq("p1"))
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_public_connect()
|
||||
.with(eq("postgres"))
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_public_connect()
|
||||
.with(eq("db1"))
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_ensure_db_owner()
|
||||
.with(eq("db1"), eq("u1"))
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_grant_access()
|
||||
.with(eq("u1"), eq("db1"))
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
let service = DatabaseService::new(mock_db_handler, mock_wanted_state_handler);
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(service.reconcile_databases()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn revokes_stale_grants_on_managed_databases() {
|
||||
let mut mock_db_handler = MockForManagingDatabases::new();
|
||||
let mut mock_wanted_state_handler = MockForGettingDatabasesWantedState::new();
|
||||
|
||||
mock_wanted_state_handler
|
||||
.expect_get_wanted_state()
|
||||
.returning(|| {
|
||||
Ok(WantedState {
|
||||
databases: vec!["db1".to_string()],
|
||||
users: vec![],
|
||||
user_passwords: BTreeMap::new(),
|
||||
database_owners: BTreeMap::new(),
|
||||
grants: vec![],
|
||||
})
|
||||
});
|
||||
|
||||
mock_db_handler
|
||||
.expect_list_databases()
|
||||
.returning(|| Ok(vec!["db1".to_string()]));
|
||||
mock_db_handler.expect_list_users().returning(|| Ok(vec![]));
|
||||
mock_db_handler.expect_list_grants().returning(|| {
|
||||
Ok(vec![
|
||||
crate::app::database::ports::driven::database_port::GrantDbAccess {
|
||||
user: "u2".to_string(),
|
||||
database: "db1".to_string(),
|
||||
},
|
||||
])
|
||||
});
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_public_connect()
|
||||
.with(eq("postgres"))
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_public_connect()
|
||||
.with(eq("db1"))
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_revoke_access()
|
||||
.with(eq("u2"), eq("db1"))
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
mock_db_handler
|
||||
.expect_ensure_db_owner()
|
||||
.times(0)
|
||||
.returning(|_, _| Ok(()));
|
||||
mock_db_handler
|
||||
.expect_ensure_user()
|
||||
.times(0)
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
let service = DatabaseService::new(mock_db_handler, mock_wanted_state_handler);
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(service.reconcile_databases()).unwrap();
|
||||
}
|
||||
}
|
||||
40
src/app/database/ports/driven/database_port.rs
Normal file
40
src/app/database/ports/driven/database_port.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
#[async_trait::async_trait]
|
||||
pub trait ForGettingDatabasesWantedState {
|
||||
async fn get_wanted_state(&self) -> Result<WantedState, String>;
|
||||
}
|
||||
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
#[async_trait::async_trait]
|
||||
pub trait ForManagingDatabases {
|
||||
async fn list_databases(&self) -> Result<Vec<String>, String>;
|
||||
async fn list_users(&self) -> Result<Vec<String>, String>;
|
||||
async fn list_grants(&self) -> Result<Vec<GrantDbAccess>, String>;
|
||||
|
||||
async fn create_database(&self, name: &str) -> Result<(), String>;
|
||||
async fn delete_database(&self, name: &str) -> Result<(), String>;
|
||||
|
||||
async fn ensure_user(&self, user: &str, password: &str) -> Result<(), String>;
|
||||
async fn delete_user(&self, name: &str) -> Result<(), String>;
|
||||
|
||||
async fn revoke_public_connect(&self, database: &str) -> Result<(), String>;
|
||||
async fn ensure_db_owner(&self, database: &str, owner: &str) -> Result<(), String>;
|
||||
|
||||
async fn grant_access(&self, user: &str, database: &str) -> Result<(), String>;
|
||||
async fn revoke_access(&self, user: &str, database: &str) -> Result<(), String>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct WantedState {
|
||||
pub databases: Vec<String>,
|
||||
pub users: Vec<String>,
|
||||
pub user_passwords: std::collections::BTreeMap<String, String>,
|
||||
pub database_owners: std::collections::BTreeMap<String, String>,
|
||||
pub grants: Vec<GrantDbAccess>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct GrantDbAccess {
|
||||
pub user: String,
|
||||
pub database: String,
|
||||
}
|
||||
42
src/main.rs
Normal file
42
src/main.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
mod app {
|
||||
pub mod database {
|
||||
pub mod database_service;
|
||||
pub mod ports {
|
||||
pub mod driven {
|
||||
pub mod database_port;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod actors {
|
||||
pub mod driven {
|
||||
pub mod database_file_retriever;
|
||||
pub mod sqlx_handler;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL")
|
||||
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/postgres".to_string());
|
||||
|
||||
let pool = sqlx::postgres::PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&database_url)
|
||||
.await
|
||||
.expect("failed to connect to postgres");
|
||||
let database_handler = actors::driven::sqlx_handler::SQLXHandler::new(pool);
|
||||
|
||||
let file_retriever = actors::driven::database_file_retriever::DatabaseFileRetriever::new(
|
||||
"database.yaml".to_string(),
|
||||
);
|
||||
|
||||
let database_service = app::database::database_service::DatabaseService::new(
|
||||
database_handler.clone(),
|
||||
file_retriever,
|
||||
);
|
||||
database_service.reconcile_databases().await.unwrap();
|
||||
}
|
||||
Reference in New Issue
Block a user