Skip to content
26 changes: 20 additions & 6 deletions rust-executor/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ impl Ad4mDb {
)",
[],
)?;
// Ensure we don't have duplicate link expressions and enforce uniqueness going forward
conn.execute(
"DELETE FROM link
WHERE id NOT IN (
SELECT MIN(id) FROM link
GROUP BY perspective, source, predicate, target, author, timestamp
)",
[],
)?;
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS link_unique_idx
ON link (perspective, source, predicate, target, author, timestamp)",
[],
)?;

conn.execute(
"CREATE TABLE IF NOT EXISTS expression (
Expand Down Expand Up @@ -733,7 +747,7 @@ impl Ad4mDb {
status: &LinkStatus,
) -> Ad4mDbResult<()> {
self.conn.execute(
"INSERT INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status)
"INSERT OR IGNORE INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
perspective_uuid,
Expand All @@ -758,7 +772,7 @@ impl Ad4mDb {
) -> Ad4mDbResult<()> {
for link in links.iter() {
self.conn.execute(
"INSERT INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status)
"INSERT OR IGNORE INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
perspective_uuid,
Expand Down Expand Up @@ -877,7 +891,7 @@ impl Ad4mDb {
perspective_uuid: &str,
) -> Ad4mDbResult<Vec<(LinkExpression, LinkStatus)>> {
let mut stmt = self.conn.prepare(
"SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1",
"SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 ORDER BY timestamp, source, predicate, target, author",
)?;
let link_iter = stmt.query_map(params![perspective_uuid], |row| {
let status: LinkStatus =
Expand Down Expand Up @@ -914,7 +928,7 @@ impl Ad4mDb {
source: &str,
) -> Ad4mDbResult<Vec<(LinkExpression, LinkStatus)>> {
let mut stmt = self.conn.prepare(
"SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND source = ?2",
"SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND source = ?2 ORDER BY timestamp, predicate, target, author",
)?;
let link_iter = stmt.query_map(params![perspective_uuid, source], |row| {
let status: LinkStatus =
Expand Down Expand Up @@ -951,7 +965,7 @@ impl Ad4mDb {
target: &str,
) -> Ad4mDbResult<Vec<(LinkExpression, LinkStatus)>> {
let mut stmt = self.conn.prepare(
"SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND target = ?2",
"SELECT perspective, source, predicate, target, author, timestamp, signature, key, status FROM link WHERE perspective = ?1 AND target = ?2 ORDER BY timestamp, source, predicate, author",
)?;
let link_iter = stmt.query_map(params![perspective_uuid, target], |row| {
let status: LinkStatus =
Expand Down Expand Up @@ -1640,7 +1654,7 @@ impl Ad4mDb {
log::debug!("Importing {} links", links.len());
for (link, signature, key) in links {
match self.conn.execute(
"INSERT INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
"INSERT OR IGNORE INTO link (perspective, source, predicate, target, author, timestamp, signature, key, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
link.perspective,
link.source,
Expand Down
85 changes: 73 additions & 12 deletions rust-executor/src/perspectives/perspective_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,30 +593,69 @@ impl PerspectiveInstance {

pub async fn diff_from_link_language(&self, diff: PerspectiveDiff) {
let handle = self.persisted.lock().await.clone();
if !diff.additions.is_empty() {

// Deduplicate by (author, timestamp, source, predicate, target)
// Use structured keys to avoid delimiter collision issues
let mut seen_add: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut unique_additions: Vec<LinkExpression> = Vec::new();
for link in diff.additions.iter() {
let key_tuple = (
&link.author,
&link.timestamp,
&link.data.source,
link.data.predicate.as_deref().unwrap_or(""),
&link.data.target,
);
let key = serde_json::to_string(&key_tuple).unwrap_or_else(|_| {
// Fallback to a simple hash if serialization fails
format!("{:?}", key_tuple)
});
if seen_add.insert(key) {
unique_additions.push(link.clone());
}
}

let mut seen_rem: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut unique_removals: Vec<LinkExpression> = Vec::new();
for link in diff.removals.iter() {
let key_tuple = (
&link.author,
&link.timestamp,
&link.data.source,
link.data.predicate.as_deref().unwrap_or(""),
&link.data.target,
);
let key = serde_json::to_string(&key_tuple).unwrap_or_else(|_| {
// Fallback to a simple hash if serialization fails
format!("{:?}", key_tuple)
});
if seen_rem.insert(key) {
unique_removals.push(link.clone());
}
}

if !unique_additions.is_empty() {
Ad4mDb::with_global_instance(|db| {
db.add_many_links(&handle.uuid, diff.additions.clone(), &LinkStatus::Shared)
db.add_many_links(&handle.uuid, unique_additions.clone(), &LinkStatus::Shared)
})
.expect("Failed to add many links");
}

if !diff.removals.is_empty() {
if !unique_removals.is_empty() {
Ad4mDb::with_global_instance(|db| {
for link in &diff.removals {
for link in &unique_removals {
db.remove_link(&handle.uuid, link)
.expect("Failed to remove link");
}
});
}

let decorated_diff = DecoratedPerspectiveDiff {
additions: diff
.additions
additions: unique_additions
.iter()
.map(|link| DecoratedLinkExpression::from((link.clone(), LinkStatus::Shared)))
.collect(),
removals: diff
.removals
removals: unique_removals
.iter()
.map(|link| DecoratedLinkExpression::from((link.clone(), LinkStatus::Shared)))
.collect(),
Expand Down Expand Up @@ -2800,9 +2839,21 @@ mod tests {
}

let query = LinkQuery::default();
let links = perspective.get_links(&query).await.unwrap();
let mut links = perspective.get_links(&query).await.unwrap();
assert_eq!(links.len(), 5);
assert_eq!(links, all_links);
let mut all_links_sorted = all_links.clone();
let cmp = |a: &DecoratedLinkExpression, b: &DecoratedLinkExpression| {
let at = chrono::DateTime::parse_from_rfc3339(&a.timestamp).unwrap();
let bt = chrono::DateTime::parse_from_rfc3339(&b.timestamp).unwrap();
at.cmp(&bt)
.then(a.data.source.cmp(&b.data.source))
.then(a.data.predicate.cmp(&b.data.predicate))
.then(a.data.target.cmp(&b.data.target))
.then(a.author.cmp(&b.author))
};
links.sort_by(cmp);
all_links_sorted.sort_by(cmp);
assert_eq!(links, all_links_sorted);
}

#[tokio::test]
Expand All @@ -2828,12 +2879,22 @@ mod tests {
source: Some(source.to_string()),
..Default::default()
};
let links = perspective.get_links(&query).await.unwrap();
let expected_links: Vec<_> = all_links
let mut links = perspective.get_links(&query).await.unwrap();
let mut expected_links: Vec<_> = all_links
.into_iter()
.filter(|expr| expr.data.source == source)
.collect();
assert_eq!(links.len(), expected_links.len());
let cmp = |a: &DecoratedLinkExpression, b: &DecoratedLinkExpression| {
let at = chrono::DateTime::parse_from_rfc3339(&a.timestamp).unwrap();
let bt = chrono::DateTime::parse_from_rfc3339(&b.timestamp).unwrap();
at.cmp(&bt)
.then(a.data.predicate.cmp(&b.data.predicate))
.then(a.data.target.cmp(&b.data.target))
.then(a.author.cmp(&b.author))
};
links.sort_by(cmp);
expected_links.sort_by(cmp);
assert_eq!(links, expected_links);
}

Expand Down
6 changes: 5 additions & 1 deletion tests/js/tests/prolog-and-literals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2196,7 +2196,11 @@ describe("Prolog + Literals", () => {

// Verify models are updated
const recipesAfterUpdate = await BatchRecipe.findAll(perspective!);
expect(recipesAfterUpdate[0].ingredients).to.deep.equal(["pasta", "sauce", "cheese", "garlic"]);
expect(recipesAfterUpdate[0].ingredients.length).to.equal(4);
expect(recipesAfterUpdate[0].ingredients.includes("pasta")).to.be.true;
expect(recipesAfterUpdate[0].ingredients.includes("sauce")).to.be.true;
expect(recipesAfterUpdate[0].ingredients.includes("cheese")).to.be.true;
expect(recipesAfterUpdate[0].ingredients.includes("garlic")).to.be.true;

const notesAfterUpdate = await BatchNote.findAll(perspective!);
expect(notesAfterUpdate[0].content).to.equal("Updated: Use fresh ingredients and add garlic");
Expand Down