diff --git a/rust-executor/src/db.rs b/rust-executor/src/db.rs index 47327e5a1..c496d3f0f 100644 --- a/rust-executor/src/db.rs +++ b/rust-executor/src/db.rs @@ -87,6 +87,20 @@ impl Ad4mDb { )", [], )?; + // Ensure we don't have duplicate link expressions and enforce uniqueness going forward + conn.execute( + "DELETE FROM link + WHERE id NOT IN ( + SELECT MIN(id) FROM link + GROUP BY perspective, source, predicate, target, author, timestamp + )", + [], + )?; + conn.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS link_unique_idx + ON link (perspective, source, predicate, target, author, timestamp)", + [], + )?; conn.execute( "CREATE TABLE IF NOT EXISTS expression ( @@ -733,7 +747,7 @@ impl Ad4mDb { status: &LinkStatus, ) -> Ad4mDbResult<()> { self.conn.execute( - "INSERT INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) + "INSERT OR IGNORE INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", params![ perspective_uuid, @@ -758,7 +772,7 @@ impl Ad4mDb { ) -> Ad4mDbResult<()> { for link in links.iter() { self.conn.execute( - "INSERT INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) + "INSERT OR IGNORE INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", params![ perspective_uuid, @@ -877,7 +891,7 @@ impl Ad4mDb { perspective_uuid: &str, ) -> Ad4mDbResult> { let mut stmt = self.conn.prepare( - "SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1", + "SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 ORDER BY timestamp, source, predicate, target, author", )?; let link_iter = stmt.query_map(params![perspective_uuid], |row| { let status: LinkStatus = @@ -914,7 +928,7 @@ impl Ad4mDb { source: &str, ) -> Ad4mDbResult> { let mut stmt = self.conn.prepare( - "SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND source = ?2", + "SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND source = ?2 ORDER BY timestamp, predicate, target, author", )?; let link_iter = stmt.query_map(params![perspective_uuid, source], |row| { let status: LinkStatus = @@ -951,7 +965,7 @@ impl Ad4mDb { target: &str, ) -> Ad4mDbResult> { let mut stmt = self.conn.prepare( - "SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND target = ?2", + "SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND target = ?2 ORDER BY timestamp, source, predicate, author", )?; let link_iter = stmt.query_map(params![perspective_uuid, target], |row| { let status: LinkStatus = @@ -1640,7 +1654,7 @@ impl Ad4mDb { log::debug!("Importing {} links", links.len()); for (link, signature, key) in links { match self.conn.execute( - "INSERT INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + "INSERT OR IGNORE INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", params![ link.perspective, link.source, diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 16235821a..2fc99e1b6 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -593,16 +593,57 @@ impl PerspectiveInstance { pub async fn diff_from_link_language(&self, diff: PerspectiveDiff) { let handle = self.persisted.lock().await.clone(); - if !diff.additions.is_empty() { + + // Deduplicate by (author, timestamp, source, predicate, target) + // Use structured keys to avoid delimiter collision issues + let mut seen_add: std::collections::HashSet = std::collections::HashSet::new(); + let mut unique_additions: Vec = Vec::new(); + for link in diff.additions.iter() { + let key_tuple = ( + &link.author, + &link.timestamp, + &link.data.source, + link.data.predicate.as_deref().unwrap_or(""), + &link.data.target, + ); + let key = serde_json::to_string(&key_tuple).unwrap_or_else(|_| { + // Fallback to a simple hash if serialization fails + format!("{:?}", key_tuple) + }); + if seen_add.insert(key) { + unique_additions.push(link.clone()); + } + } + + let mut seen_rem: std::collections::HashSet = std::collections::HashSet::new(); + let mut unique_removals: Vec = Vec::new(); + for link in diff.removals.iter() { + let key_tuple = ( + &link.author, + &link.timestamp, + &link.data.source, + link.data.predicate.as_deref().unwrap_or(""), + &link.data.target, + ); + let key = serde_json::to_string(&key_tuple).unwrap_or_else(|_| { + // Fallback to a simple hash if serialization fails + format!("{:?}", key_tuple) + }); + if seen_rem.insert(key) { + unique_removals.push(link.clone()); + } + } + + if !unique_additions.is_empty() { Ad4mDb::with_global_instance(|db| { - db.add_many_links(&handle.uuid, diff.additions.clone(), &LinkStatus::Shared) + db.add_many_links(&handle.uuid, unique_additions.clone(), &LinkStatus::Shared) }) .expect("Failed to add many links"); } - if !diff.removals.is_empty() { + if !unique_removals.is_empty() { Ad4mDb::with_global_instance(|db| { - for link in &diff.removals { + for link in &unique_removals { db.remove_link(&handle.uuid, link) .expect("Failed to remove link"); } @@ -610,13 +651,11 @@ impl PerspectiveInstance { } let decorated_diff = DecoratedPerspectiveDiff { - additions: diff - .additions + additions: unique_additions .iter() .map(|link| DecoratedLinkExpression::from((link.clone(), LinkStatus::Shared))) .collect(), - removals: diff - .removals + removals: unique_removals .iter() .map(|link| DecoratedLinkExpression::from((link.clone(), LinkStatus::Shared))) .collect(), @@ -2800,9 +2839,21 @@ mod tests { } let query = LinkQuery::default(); - let links = perspective.get_links(&query).await.unwrap(); + let mut links = perspective.get_links(&query).await.unwrap(); assert_eq!(links.len(), 5); - assert_eq!(links, all_links); + let mut all_links_sorted = all_links.clone(); + let cmp = |a: &DecoratedLinkExpression, b: &DecoratedLinkExpression| { + let at = chrono::DateTime::parse_from_rfc3339(&a.timestamp).unwrap(); + let bt = chrono::DateTime::parse_from_rfc3339(&b.timestamp).unwrap(); + at.cmp(&bt) + .then(a.data.source.cmp(&b.data.source)) + .then(a.data.predicate.cmp(&b.data.predicate)) + .then(a.data.target.cmp(&b.data.target)) + .then(a.author.cmp(&b.author)) + }; + links.sort_by(cmp); + all_links_sorted.sort_by(cmp); + assert_eq!(links, all_links_sorted); } #[tokio::test] @@ -2828,12 +2879,22 @@ mod tests { source: Some(source.to_string()), ..Default::default() }; - let links = perspective.get_links(&query).await.unwrap(); - let expected_links: Vec<_> = all_links + let mut links = perspective.get_links(&query).await.unwrap(); + let mut expected_links: Vec<_> = all_links .into_iter() .filter(|expr| expr.data.source == source) .collect(); assert_eq!(links.len(), expected_links.len()); + let cmp = |a: &DecoratedLinkExpression, b: &DecoratedLinkExpression| { + let at = chrono::DateTime::parse_from_rfc3339(&a.timestamp).unwrap(); + let bt = chrono::DateTime::parse_from_rfc3339(&b.timestamp).unwrap(); + at.cmp(&bt) + .then(a.data.predicate.cmp(&b.data.predicate)) + .then(a.data.target.cmp(&b.data.target)) + .then(a.author.cmp(&b.author)) + }; + links.sort_by(cmp); + expected_links.sort_by(cmp); assert_eq!(links, expected_links); } diff --git a/tests/js/tests/prolog-and-literals.test.ts b/tests/js/tests/prolog-and-literals.test.ts index 4b373b980..6e0a04ddf 100644 --- a/tests/js/tests/prolog-and-literals.test.ts +++ b/tests/js/tests/prolog-and-literals.test.ts @@ -2196,7 +2196,11 @@ describe("Prolog + Literals", () => { // Verify models are updated const recipesAfterUpdate = await BatchRecipe.findAll(perspective!); - expect(recipesAfterUpdate[0].ingredients).to.deep.equal(["pasta", "sauce", "cheese", "garlic"]); + expect(recipesAfterUpdate[0].ingredients.length).to.equal(4); + expect(recipesAfterUpdate[0].ingredients.includes("pasta")).to.be.true; + expect(recipesAfterUpdate[0].ingredients.includes("sauce")).to.be.true; + expect(recipesAfterUpdate[0].ingredients.includes("cheese")).to.be.true; + expect(recipesAfterUpdate[0].ingredients.includes("garlic")).to.be.true; const notesAfterUpdate = await BatchNote.findAll(perspective!); expect(notesAfterUpdate[0].content).to.equal("Updated: Use fresh ingredients and add garlic");