diff --git a/podnet/server/Cargo.toml b/podnet/server/Cargo.toml index 5167342..49a5dda 100644 --- a/podnet/server/Cargo.toml +++ b/podnet/server/Cargo.toml @@ -22,6 +22,8 @@ axum = { version = "0.7", features = ["macros"] } tower = "0.4" tower-http = { version = "0.5", features = ["cors", "fs"] } rusqlite = { workspace = true, features = ["bundled"] } +rusqlite_migration = { workspace = true } +lazy_static = { workspace = true } pulldown-cmark = "0.13" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } diff --git a/podnet/server/src/db/migrations.rs b/podnet/server/src/db/migrations.rs new file mode 100644 index 0000000..4bc4f53 --- /dev/null +++ b/podnet/server/src/db/migrations.rs @@ -0,0 +1,172 @@ +use lazy_static::lazy_static; +use podnet_models::ReplyReference; +use rusqlite::OptionalExtension; +use rusqlite_migration::{M, Migrations}; + +lazy_static! { + pub static ref MIGRATIONS: Migrations<'static> = Migrations::new(vec![ + M::up( + "CREATE TABLE IF NOT EXISTS posts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + last_edited_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE IF NOT EXISTS documents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content_id TEXT NOT NULL, + post_id INTEGER NOT NULL, + revision INTEGER NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + pod TEXT NOT NULL, + timestamp_pod TEXT NOT NULL, + uploader_id TEXT NOT NULL, + upvote_count_pod TEXT, + tags TEXT DEFAULT '[]', + authors TEXT DEFAULT '[]', + reply_to INTEGER, + FOREIGN KEY (post_id) REFERENCES posts (id), + UNIQUE (post_id, revision) + ); + CREATE TABLE IF NOT EXISTS identity_servers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + server_id TEXT NOT NULL UNIQUE, + public_key TEXT NOT NULL, + challenge_pod TEXT NOT NULL, + identity_pod TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE IF NOT EXISTS upvotes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + document_id INTEGER NOT NULL, + username TEXT NOT NULL, + pod_json TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (document_id) REFERENCES documents (id), + UNIQUE (document_id, username) + );", + ), + M::up( + "ALTER TABLE posts ADD COLUMN parent_post_id INTEGER REFERENCES posts(id); + ALTER TABLE posts ADD COLUMN thread_root_post_id INTEGER REFERENCES posts(id); + ALTER TABLE posts ADD COLUMN reply_to_document_id INTEGER REFERENCES documents(id); + CREATE INDEX IF NOT EXISTS idx_posts_parent_post_id ON posts(parent_post_id); + CREATE INDEX IF NOT EXISTS idx_posts_thread_root_post_id ON posts(thread_root_post_id);" + ), + M::up("ALTER TABLE documents ADD COLUMN requested_post_id INTEGER;"), + M::up("ALTER TABLE documents ADD COLUMN title TEXT NOT NULL DEFAULT '';"), + M::up( + "ALTER TABLE documents ADD COLUMN thread_root_id INTEGER; + CREATE INDEX IF NOT EXISTS idx_thread_root_id ON documents(thread_root_id);" + ), + M::up_with_hook("-- V6 migrate reply_to column to text", |tx| { + // Check if the migration has already been applied by checking if reply_to contains JSON + let migration_check: rusqlite::Result = tx.query_row( + "SELECT reply_to FROM documents WHERE reply_to IS NOT NULL LIMIT 1", + [], + |row| { + Ok(row + .get::<_, Option>(0) + .unwrap_or_default() + .unwrap_or_default()) + }, + ); + + // If we can get a value and it's a number (not JSON), we need to migrate + if let Ok(value) = migration_check + && !value.is_empty() && value.parse::().is_ok() { + // Create a new table with the correct schema + tx.execute_batch( + "CREATE TABLE IF NOT EXISTS documents_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content_id TEXT NOT NULL, + post_id INTEGER NOT NULL, + revision INTEGER NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + pod TEXT NOT NULL, + timestamp_pod TEXT NOT NULL, + uploader_id TEXT NOT NULL, + upvote_count_pod TEXT, + tags TEXT DEFAULT '[]', + authors TEXT DEFAULT '[]', + reply_to TEXT, + requested_post_id INTEGER, + title TEXT NOT NULL, + FOREIGN KEY (post_id) REFERENCES posts (id), + UNIQUE (post_id, revision) + ); + + INSERT INTO documents_new + SELECT id, content_id, post_id, revision, created_at, pod, timestamp_pod, + uploader_id, upvote_count_pod, tags, authors, + CASE + WHEN reply_to IS NULL THEN NULL + ELSE json_object('post_id', -1, 'document_id', reply_to) + END as reply_to, + requested_post_id, title + FROM documents; + + DROP TABLE documents; + ALTER TABLE documents_new RENAME TO documents;", + )?; + } + Ok(()) + }), + M::up_with_hook("-- V7 migrate thread_root_id data", |tx| { + // Check if migration is needed - if any document has null thread_root_id + let needs_migration: bool = tx.query_row( + "SELECT EXISTS(SELECT 1 FROM documents WHERE thread_root_id IS NULL)", + [], + |row| row.get(0), + )?; + + if !needs_migration { + return Ok(()); + } + + // Get all documents without thread_root_id + let mut stmt = tx.prepare( + "SELECT id, reply_to FROM documents WHERE thread_root_id IS NULL ORDER BY id", + )?; + + let documents: Vec<(i64, Option)> = stmt + .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()?; + + // Process each document + for (doc_id, reply_to_json) in documents { + let thread_root_id = if let Some(reply_json) = reply_to_json { + // This is a reply - find the thread root by traversing up the chain + if let Ok(reply_ref) = serde_json::from_str::(&reply_json) { + let mut current_id = reply_ref.document_id; + let mut visited = std::collections::HashSet::new(); + loop { + if visited.contains(¤t_id) { break; } + visited.insert(current_id); + let next_reply: Option = tx.query_row( + "SELECT reply_to FROM documents WHERE id = ?1", + [current_id], |row| row.get(0)).optional()?.flatten(); + if let Some(next_json) = next_reply { + if let Ok(next_ref) = serde_json::from_str::(&next_json) { + current_id = next_ref.document_id; + } else { break; } + } else { break; } + } + current_id + } else { + doc_id + } + } else { + doc_id + }; + + // Update the document with its thread_root_id + tx.execute( + "UPDATE documents SET thread_root_id = ?1 WHERE id = ?2", + [thread_root_id, doc_id], + )?; + } + + Ok(()) + }), + ]); +} diff --git a/podnet/server/src/db/mod.rs b/podnet/server/src/db/mod.rs index c7b5d31..d4e9670 100644 --- a/podnet/server/src/db/mod.rs +++ b/podnet/server/src/db/mod.rs @@ -8,6 +8,8 @@ use podnet_models::{ }; use rusqlite::{Connection, OptionalExtension, Result}; +pub mod migrations; + pub struct Database { conn: Mutex, } @@ -16,140 +18,43 @@ impl Database { pub async fn new(db_path: &str) -> anyhow::Result { let db_path = db_path.to_string(); tokio::task::spawn_blocking(move || { - let conn = Connection::open(&db_path)?; + let mut conn = Connection::open(&db_path)?; + + // --- Bootstrap logic for existing databases --- + let current_version: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?; + // TODO: We can eventually remove this once the production database is fully migrated. + // Prior to adopting rusqlite_migration, we manually applied some migrations. We want + // to still run this on existing databases to ensure they're up to date. + if current_version == 0 { + let table_info: String = conn + .query_row( + "SELECT sql FROM sqlite_master WHERE name = 'documents' AND type = 'table'", + [], + |row| row.get(0), + ) + .optional()? + .unwrap_or_default(); + + // We have a database which is at the baseline for version 7. + if !table_info.is_empty() && table_info.contains("thread_root_id") { + tracing::info!( + "Detected existing un-versioned database. Baselining to latest migration version." + ); + conn.execute_batch("PRAGMA user_version = 7")?; + } + // Otherwise, we should run migrations to reach the baseline. + } + + migrations::MIGRATIONS.to_latest(&mut conn)?; + let db = Database { conn: Mutex::new(conn), }; - db.init_tables()?; Ok(db) }) .await? } - fn init_tables(&self) -> Result<()> { - let conn = self.conn.lock().unwrap(); - - // Create posts table - conn.execute( - "CREATE TABLE IF NOT EXISTS posts ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - last_edited_at DATETIME DEFAULT CURRENT_TIMESTAMP - )", - [], - )?; - - // Add thread relationship columns to posts (migration-safe if already exist) - let _ = conn.execute( - "ALTER TABLE posts ADD COLUMN parent_post_id INTEGER REFERENCES posts(id)", - [], - ); - let _ = conn.execute( - "ALTER TABLE posts ADD COLUMN thread_root_post_id INTEGER REFERENCES posts(id)", - [], - ); - let _ = conn.execute( - "ALTER TABLE posts ADD COLUMN reply_to_document_id INTEGER REFERENCES documents(id)", - [], - ); - // Indexes for thread navigation and activity queries - let _ = conn.execute( - "CREATE INDEX IF NOT EXISTS idx_posts_parent_post_id ON posts(parent_post_id)", - [], - ); - let _ = conn.execute( - "CREATE INDEX IF NOT EXISTS idx_posts_thread_root_post_id ON posts(thread_root_post_id)", - [], - ); - - // Create documents table (revisions of posts) - conn.execute( - "CREATE TABLE IF NOT EXISTS documents ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - content_id TEXT NOT NULL, - post_id INTEGER NOT NULL, - revision INTEGER NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - pod TEXT NOT NULL, - timestamp_pod TEXT NOT NULL, - uploader_id TEXT NOT NULL, - upvote_count_pod TEXT, - tags TEXT DEFAULT '[]', - authors TEXT DEFAULT '[]', - reply_to TEXT, - requested_post_id INTEGER, - title TEXT NOT NULL, - thread_root_id INTEGER, - FOREIGN KEY (post_id) REFERENCES posts (id), - FOREIGN KEY (thread_root_id) REFERENCES documents (id), - UNIQUE (post_id, revision) - )", - [], - )?; - - // Add requested_post_id column to existing databases (migration) - // This will fail silently if the column already exists - let _ = conn.execute( - "ALTER TABLE documents ADD COLUMN requested_post_id INTEGER", - [], - ); - - // Add title column to existing databases (migration) - // This will fail silently if the column already exists - let _ = conn.execute( - "ALTER TABLE documents ADD COLUMN title TEXT NOT NULL DEFAULT ''", - [], - ); - - // Add thread_root_id column to existing databases (migration) - // This will fail silently if the column already exists - let _ = conn.execute( - "ALTER TABLE documents ADD COLUMN thread_root_id INTEGER", - [], - ); - - // Migrate reply_to column to TEXT for ReplyReference support - self.migrate_reply_to_column(&conn)?; - - // Migrate existing documents to populate thread_root_id - self.migrate_thread_root_id(&conn)?; - - // Create index for thread_root_id for performance - let _ = conn.execute( - "CREATE INDEX IF NOT EXISTS idx_thread_root_id ON documents(thread_root_id)", - [], - ); - - // Create identity_servers table - conn.execute( - "CREATE TABLE IF NOT EXISTS identity_servers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - server_id TEXT NOT NULL UNIQUE, - public_key TEXT NOT NULL, - challenge_pod TEXT NOT NULL, - identity_pod TEXT NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - )", - [], - )?; - - // Create upvotes table - conn.execute( - "CREATE TABLE IF NOT EXISTS upvotes ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - document_id INTEGER NOT NULL, - username TEXT NOT NULL, - pod_json TEXT NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (document_id) REFERENCES documents (id), - UNIQUE (document_id, username) - )", - [], - )?; - - Ok(()) - } - pub fn set_post_thread_links( &self, post_id: i64, @@ -165,151 +70,6 @@ impl Database { Ok(()) } - fn migrate_reply_to_column(&self, conn: &Connection) -> Result<()> { - // Check if the migration has already been applied by checking if reply_to contains JSON - let migration_check: Result = conn.query_row( - "SELECT reply_to FROM documents WHERE reply_to IS NOT NULL LIMIT 1", - [], - |row| { - Ok(row - .get::<_, Option>(0) - .unwrap_or_default() - .unwrap_or_default()) - }, - ); - - // If we can get a value and it's a number (not JSON), we need to migrate - if let Ok(value) = migration_check - && !value.is_empty() - && value.parse::().is_ok() - { - // Create a new table with the correct schema - conn.execute( - "CREATE TABLE IF NOT EXISTS documents_new ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - content_id TEXT NOT NULL, - post_id INTEGER NOT NULL, - revision INTEGER NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - pod TEXT NOT NULL, - timestamp_pod TEXT NOT NULL, - uploader_id TEXT NOT NULL, - upvote_count_pod TEXT, - tags TEXT DEFAULT '[]', - authors TEXT DEFAULT '[]', - reply_to TEXT, - requested_post_id INTEGER, - title TEXT NOT NULL, - FOREIGN KEY (post_id) REFERENCES posts (id), - UNIQUE (post_id, revision) - )", - [], - )?; - - // Copy data, converting INTEGER reply_to to JSON format - conn.execute( - "INSERT INTO documents_new - SELECT id, content_id, post_id, revision, created_at, pod, timestamp_pod, - uploader_id, upvote_count_pod, tags, authors, - CASE - WHEN reply_to IS NULL THEN NULL - ELSE json_object('post_id', -1, 'document_id', reply_to) - END as reply_to, - requested_post_id, title - FROM documents", - [], - )?; - - // Drop old table and rename new one - conn.execute("DROP TABLE documents", [])?; - conn.execute("ALTER TABLE documents_new RENAME TO documents", [])?; - } - - Ok(()) - } - - fn migrate_thread_root_id(&self, conn: &Connection) -> Result<()> { - // Check if migration is needed - if any document has null thread_root_id - let needs_migration: bool = conn.query_row( - "SELECT EXISTS(SELECT 1 FROM documents WHERE thread_root_id IS NULL)", - [], - |row| row.get(0), - )?; - - if !needs_migration { - return Ok(()); - } - - // Get all documents without thread_root_id - let mut stmt = conn.prepare( - "SELECT id, reply_to FROM documents WHERE thread_root_id IS NULL ORDER BY id", - )?; - - let documents: Vec<(i64, Option)> = stmt - .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? - .collect::, _>>()?; - - // Process each document - for (doc_id, reply_to_json) in documents { - let thread_root_id = if let Some(reply_json) = reply_to_json { - // This is a reply - find the thread root by traversing up the chain - if let Ok(reply_ref) = serde_json::from_str::(&reply_json) { - self.find_thread_root_id(conn, reply_ref.document_id)? - } else { - // Invalid reply_to, treat as root document - doc_id - } - } else { - // This is a root document - thread_root_id is itself - doc_id - }; - - // Update the document with its thread_root_id - conn.execute( - "UPDATE documents SET thread_root_id = ?1 WHERE id = ?2", - [thread_root_id, doc_id], - )?; - } - - Ok(()) - } - - fn find_thread_root_id(&self, conn: &Connection, mut document_id: i64) -> Result { - let mut visited = std::collections::HashSet::new(); - - loop { - // Prevent infinite loops - if visited.contains(&document_id) { - return Ok(document_id); - } - visited.insert(document_id); - - // Check if this document has a reply_to - let reply_to_json: Option = conn - .query_row( - "SELECT reply_to FROM documents WHERE id = ?1", - [document_id], - |row| row.get(0), - ) - .optional()? - .flatten(); - - if let Some(reply_json) = reply_to_json { - if let Ok(reply_ref) = serde_json::from_str::(&reply_json) { - document_id = reply_ref.document_id; - } else { - // Invalid reply_to, this is the root - break; - } - } else { - // No reply_to, this is the root - break; - } - } - - Ok(document_id) - } - // Post methods pub fn create_post(&self) -> Result { let conn = self.conn.lock().unwrap();