Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rust-executor/src/neighbourhoods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ pub async fn neighbourhood_publish_from_perspective(
// Add shared perspective to original perspective and then update controller
perspective_handle.shared_url = Some(neighbourhood_url.clone());
perspective_handle.neighbourhood = Some(neighbourhood_exp);
perspective_handle.state = PerspectiveState::Synced;
perspective_handle.state = PerspectiveState::NeighbourhoodCreationInitiated;
update_perspective(&perspective_handle)
.await
.map_err(|e| anyhow!(e))?;

// Ensure any existing shared links are committed to the link language
// This is critical for early links created before neighbourhood sharing
// We need to do this after the neighbourhood is created but before other agents join
perspective.ensure_public_links_are_shared().await;
Ok(neighbourhood_url)
}

Expand Down
109 changes: 106 additions & 3 deletions rust-executor/src/perspectives/perspective_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ pub struct PerspectiveInstance {
immediate_commits_remaining: Arc<Mutex<usize>>,
subscribed_queries: Arc<Mutex<HashMap<String, SubscribedQuery>>>,
batch_store: Arc<RwLock<HashMap<String, PerspectiveDiff>>>,
// Fallback sync tracking for ensure_public_links_are_shared
last_successful_fallback_sync: Arc<Mutex<Option<tokio::time::Instant>>>,
fallback_sync_interval: Arc<Mutex<Duration>>,
}

impl PerspectiveInstance {
Expand All @@ -196,6 +199,9 @@ impl PerspectiveInstance {
immediate_commits_remaining: Arc::new(Mutex::new(IMMEDIATE_COMMITS_COUNT)),
subscribed_queries: Arc::new(Mutex::new(HashMap::new())),
batch_store: Arc::new(RwLock::new(HashMap::new())),
// Initialize fallback sync tracking
last_successful_fallback_sync: Arc::new(Mutex::new(None)),
fallback_sync_interval: Arc::new(Mutex::new(Duration::from_secs(30))),
}
}

Expand All @@ -205,7 +211,8 @@ impl PerspectiveInstance {
self.notification_check_loop(),
self.nh_sync_loop(),
self.pending_diffs_loop(),
self.subscribed_queries_loop()
self.subscribed_queries_loop(),
self.fallback_sync_loop()
);
}

Expand Down Expand Up @@ -274,7 +281,12 @@ impl PerspectiveInstance {
let mut link_language_guard = self.link_language.lock().await;
if let Some(link_language) = link_language_guard.as_mut() {
match link_language.sync().await {
Ok(_) => (),
Ok(_) => {
// Transition to Synced state on successful sync
let _ = self
.update_perspective_state(PerspectiveState::Synced)
.await;
}
Err(e) => {
log::error!("Error calling sync on link language: {:?}", e);
let _ = self
Expand Down Expand Up @@ -416,7 +428,7 @@ impl PerspectiveInstance {
}
}

async fn ensure_public_links_are_shared(&self) {
pub async fn ensure_public_links_are_shared(&self) -> bool {
let uuid = self.persisted.lock().await.uuid.clone();
let mut link_language_guard = self.link_language.lock().await;
if let Some(link_language) = link_language_guard.as_mut() {
Expand Down Expand Up @@ -451,6 +463,7 @@ impl PerspectiveInstance {
}

if !links_to_commit.is_empty() {
let links_count = links_to_commit.len();
let result = link_language
.commit(PerspectiveDiff {
additions: links_to_commit,
Expand All @@ -460,11 +473,18 @@ impl PerspectiveInstance {

if let Err(e) = result {
log::error!("Error calling link language's commit in ensure_public_links_are_shared: {:?}", e);
return false;
}
log::debug!(
"Successfully committed {} links to link language in fallback sync",
links_count
);
}

//Ad4mDb::with_global_instance(|db| db.add_many_links(&self.persisted.lock().await.uuid, &remote_links)).unwrap(); // Assuming add_many_links takes a reference to a Vec<LinkExpression> and returns Result<(), AnyError>
return true;
}
false
}

pub async fn update_perspective_state(&self, state: PerspectiveState) -> Result<(), AnyError> {
Expand Down Expand Up @@ -909,6 +929,8 @@ impl PerspectiveInstance {

if status == LinkStatus::Shared {
self.spawn_commit_and_handle_error(&diff);
// Reset fallback sync interval when new shared links are added
self.reset_fallback_sync_interval().await;
}
Ok(decorated_diff)
}
Expand Down Expand Up @@ -2606,6 +2628,87 @@ impl PerspectiveInstance {
}
}

async fn fallback_sync_loop(&self) {
let uuid = self.persisted.lock().await.uuid.clone();
log::debug!("Starting fallback sync loop for perspective {}", uuid);

while !*self.is_teardown.lock().await {
// Check if we should run the fallback sync (avoid holding multiple locks)
let should_run = {
// Check perspective state first
let is_synced_neighbourhood = {
let handle = self.persisted.lock().await;
let result =
handle.state == PerspectiveState::Synced && handle.neighbourhood.is_some();
drop(handle); // Release lock immediately
result
};

if !is_synced_neighbourhood {
false
} else {
// Check link language availability
let link_lang_available = {
let link_lang = self.link_language.lock().await;
let result = link_lang.is_some();
drop(link_lang); // Release lock immediately
result
};

if !link_lang_available {
false
} else {
// Check timing conditions
let last_success = *self.last_successful_fallback_sync.lock().await;
let current_interval = *self.fallback_sync_interval.lock().await;

// Only run if we haven't had a successful sync recently or it's been a while
last_success.is_none() || last_success.unwrap().elapsed() > current_interval
}
}
};

if should_run {
log::debug!("Running fallback sync for perspective {}", uuid);
let success = self.ensure_public_links_are_shared().await;

if success {
// Update last successful sync time and increase interval
{
*self.last_successful_fallback_sync.lock().await =
Some(tokio::time::Instant::now());
*self.fallback_sync_interval.lock().await = Duration::from_secs(300);
}
log::debug!("Fallback sync successful for perspective {}, increasing interval to 5 minutes", uuid);
} else {
// Reset interval to 30 seconds on failure
*self.fallback_sync_interval.lock().await = Duration::from_secs(30);
log::warn!(
"Fallback sync failed for perspective {}, keeping interval at 30 seconds",
uuid
);
}
}

// Get fresh interval for sleep (after potential updates)
let sleep_interval = *self.fallback_sync_interval.lock().await;
sleep(sleep_interval).await;
}

log::debug!("Fallback sync loop ended for perspective {}", uuid);
}

/// Reset the fallback sync interval to 30 seconds when new links are added
/// This ensures that new links get synced quickly
async fn reset_fallback_sync_interval(&self) {
*self.fallback_sync_interval.lock().await = Duration::from_secs(30);
let uuid = self.persisted.lock().await.uuid.clone();
log::debug!(
"Reset fallback sync interval to 30 seconds for perspective {}",
uuid
);
}

pub async fn create_batch(&self) -> String {
let batch_uuid = Uuid::new_v4().to_string();
self.batch_store.write().await.insert(
Expand Down
52 changes: 42 additions & 10 deletions tests/js/tests/neighbourhood.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,20 @@ export default function neighbourhoodTests(testContext: TestContext) {
expect(perspective?.neighbourhood).not.to.be.undefined;
expect(perspective?.neighbourhood!.data.linkLanguage).to.be.equal(socialContext.address);
expect(perspective?.neighbourhood!.data.meta.links.length).to.be.equal(1);
expect(perspective?.state).to.be.equal(PerspectiveState.Synced);

// The perspective should start in NeighbourhoodCreationInitiated state
expect(perspective?.state).to.be.equal(PerspectiveState.NeighboudhoodCreationInitiated);

// Wait for the perspective to transition to Synced state
let tries = 0;
const maxTries = 10;
let currentPerspective = perspective;
while (currentPerspective?.state !== PerspectiveState.Synced && tries < maxTries) {
await sleep(1000);
currentPerspective = await ad4mClient.perspective.byUUID(create.uuid);
tries++;
}
expect(currentPerspective?.state).to.be.equal(PerspectiveState.Synced);
})

it('can be created by Alice and joined by Bob', async () => {
Expand Down Expand Up @@ -154,12 +167,12 @@ export default function neighbourhoodTests(testContext: TestContext) {
}
//await Promise.all(linkPromises)

console.log("wait 10s")
await sleep(10000)
console.log("wait 15s for initial sync")
await sleep(15000)

let bobLinks = await bob.perspective.queryLinks(bobP1!.uuid, new LinkQuery({source: 'root'}))
let tries = 1
const maxTries = 120 // 2 minutes with 1 second sleep
const maxTries = 180 // 3 minutes with 1 second sleep (increased for fallback sync)

while(bobLinks.length < 1500 && tries < maxTries) {
console.log(`Bob retrying getting links... Got ${bobLinks.length}/1500`);
Expand All @@ -186,11 +199,19 @@ export default function neighbourhoodTests(testContext: TestContext) {
await testContext.alice.perspective.addLink(aliceP1.uuid, {source: 'alice', target: 'test://alice/2'})
await testContext.alice.perspective.addLink(aliceP1.uuid, {source: 'alice', target: 'test://alice/3'})

// Wait for sync
await sleep(5000)
// Wait for sync with retry loop
bobLinks = await testContext.bob.perspective.queryLinks(bobP1.uuid, new LinkQuery({source: 'alice'}))
let bobTries = 1
const maxTriesBob = 20 // 20 tries with 2 second sleep = 40 seconds max

while(bobLinks.length < 3 && bobTries < maxTriesBob) {
console.log(`Bob retrying getting Alice's links... Got ${bobLinks.length}/3`);
await sleep(2000)
bobLinks = await testContext.bob.perspective.queryLinks(bobP1.uuid, new LinkQuery({source: 'alice'}))
bobTries++
}

// Verify Bob received Alice's links
bobLinks = await testContext.bob.perspective.queryLinks(bobP1.uuid, new LinkQuery({source: 'alice'}))
expect(bobLinks.length).to.equal(3)
expect(bobLinks.some(link => link.data.target === 'test://alice/1')).to.be.true
expect(bobLinks.some(link => link.data.target === 'test://alice/2')).to.be.true
Expand Down Expand Up @@ -321,10 +342,21 @@ export default function neighbourhoodTests(testContext: TestContext) {
})

it('they see each other in `otherAgents`', async () => {
await sleep(10000);
const aliceAgents = await aliceNH!.otherAgents()
// Wait for agents to discover each other with retry loop
let aliceAgents = await aliceNH!.otherAgents()
let bobAgents = await bobNH!.otherAgents()
let tries = 1
const maxTries = 20 // 20 tries with 1 second sleep = 20 seconds max

while ((aliceAgents.length < 1 || bobAgents.length < 1) && tries < maxTries) {
console.log(`Waiting for agents to discover each other... Alice: ${aliceAgents.length}, Bob: ${bobAgents.length}`);
await sleep(1000)
aliceAgents = await aliceNH!.otherAgents()
bobAgents = await bobNH!.otherAgents()
tries++
}

console.log("alice agents", aliceAgents);
const bobAgents = await bobNH!.otherAgents()
console.log("bob agents", bobAgents);
expect(aliceAgents.length).to.be.equal(1)
expect(aliceAgents[0]).to.be.equal(bobDID)
Expand Down