ai/live: Store and forward parameter updates #3682
Conversation
This allows us to correctly handle cases where:

- We haven't selected an orchestrator yet but the client sent a control API update.
- We've swapped orchestrators.
- We're mid-swap without an orchestrator.

In all these cases, the newly selected orchestrator will receive the most recent parameter update as soon as it's selected.

Also add a 1 MB limit for reading parameters.
Codecov Report
Additional details and impacted files

```
@@             Coverage Diff              @@
##              master      #3682        +/-  ##
================================================
- Coverage   31.97787%  31.94626%  -0.03161%
================================================
  Files           156        156
  Lines         47364      47414        +50
================================================
+ Hits          15146      15147         +1
- Misses        31325      31374        +49
  Partials        893        893
```

... and 1 file with indirect coverage changes.
```go
}

// Send any cached control params in a goroutine outside the lock.
msg := sess.Params
```
Don't we need to lock when reading this as it gets updated in the UpdateLiveVideo() function? Also wondering if there's a race condition if UpdateLiveVideo() is called while we're applying the cached params here and we end up applying older cached params rather than the latest.
> Don't we need to lock when reading this as it gets updated in the UpdateLiveVideo() function?
The lock is acquired earlier in the function, but we don't want to hold it while doing the network write, hence the goroutine.
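That pattern — snapshot under the lock, then do the network write in a goroutine outside it — can be sketched like this (names such as `session` and `forwardCached` are illustrative, not the real code):

```go
package main

import (
	"fmt"
	"sync"
)

// session holds the cached control params; mu guards them.
// (Hypothetical names, modeling the discussion above.)
type session struct {
	mu     sync.Mutex
	Params []byte
}

// forwardCached snapshots the cached params while holding the lock, then
// performs the (potentially slow) send in a goroutine so the network write
// never happens while the lock is held. The WaitGroup here is only so the
// demo is deterministic; the real pattern is fire-and-forget.
func (s *session) forwardCached(send func([]byte)) {
	s.mu.Lock()
	msg := s.Params // snapshot taken under the lock
	s.mu.Unlock()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		send(msg) // network write happens outside the lock
	}()
	wg.Wait()
}

func main() {
	s := &session{Params: []byte("bitrate=3000")}
	var got string
	s.forwardCached(func(b []byte) { got = string(b) })
	fmt.Println(got)
}
```

The trade-off is exactly the one discussed below: between the snapshot and the send, another update can land, so the goroutine may forward a slightly stale value.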
> Also wondering if there's a race condition if UpdateLiveVideo() is called
Yep, it's not watertight. The raciness is also there within UpdateLiveVideo, since we could swap orchestrators and it could end up sending to the wrong orch.
We could:

- Block all other trickle control messages (for all streams on the gateway, not just this one) while we send the message. That was the old behavior.
- Add another per-stream mutex within the LivePipelines struct, but that could still block forward progress within the same stream.
- Re-create the LivePipeline entry so we're not dealing with a shared object and a per-stream lock. (We'd still need the global LiveMu lock though.) That doesn't solve the fundamental problem, which is that there could be a param update while swapping orchs, or an orch swap while updating params.
- Do it like we do now and ignore the problem 🙈 Not great, but we can at least make forward progress, even if the result is that the param update didn't get applied. For our case, I think that is better than not being able to send params at all because we're blocked by another orchestrator.
It should be rare for this to happen ... it will probably happen at scale, but we're not there yet.
This is the shape of the problem:
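A deterministic replay of that interleaving (all names hypothetical): the swap path snapshots the cached params, an update lands mid-swap, and the stale snapshot is what gets sent to the new orchestrator.

```go
package main

import "fmt"

// replay reproduces the race shape step by step, single-threaded so the
// outcome is deterministic. It returns what was sent and what the latest
// cached value actually was.
func replay() (sent, latest string) {
	cached := "P1"     // UpdateLiveVideo stores P1
	snapshot := cached // orch-swap path snapshots the cache (outside any lock)
	cached = "P2"      // UpdateLiveVideo stores P2 mid-swap
	sent = snapshot    // swap path sends the stale snapshot to the new orch
	return sent, cached
}

func main() {
	sent, latest := replay()
	// The orchestrator ends up with P1 even though P2 is the latest update.
	fmt.Printf("orchestrator got %s, latest is %s\n", sent, latest)
}
```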
IMO, it's ok to accept this "race condition" for now. At this point, I think we have tons of other bugs, which are causing more issues. At some point, we'll need to address this one, but not a super high priority right now.
Yep totally ok with that. Thanks for the detail Josh, will be useful for later on
```go
	},
}

registerControl(ctx, params)
```
With your change, I see that we'll mark params.node.LivePipelines sooner in the process: it will be set before we even start the Orchestrator discovery, versus previously, when we marked it after the discovery during the trickle subscription startup.
This will make inputStreamExists() return true sooner. It will also mean that if the discovery fails, we'll need to clean up params.node.LivePipelines. Have you checked these 2 scenarios? Off the top of my head, the first scenario may be ok, but the 2nd was a problem in the past, because we used to not clean up params.node.LivePipelines correctly when the segmentation failed before the Orchestrator discovery.
> It will be set before we even start the Orchestrator discovery vs before we marked it after the discovery during the trickle subscription startup.
Correct, because orch selection could be a bit delayed, but we still want to be able to receive parameter updates as soon as the stream is connected.
> This will make inputStreamExists() return true sooner.
Correct ... that's technically more accurate behavior. Do you suspect an issue there? I am not sure that I see anything, but could be missing something.
> And it will also mean that if the discovery fails, we'll need to clean up params.node.LivePipelines.

Yes, that behavior has not changed. We do register sooner, but we haven't changed how cleanup works. We still clean up LivePipelines after both of the following happen:
- After orch selection completes (whether successfully or not)
- After the input stream disconnects
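That two-event rule can be sketched with a WaitGroup counting down once per event (a hedged sketch with hypothetical names; the real code may structure this differently):

```go
package main

import (
	"fmt"
	"sync"
)

// pipelineCleanup models the rule above: the LivePipelines entry for a
// stream is removed only after BOTH orch selection completes (success or
// failure) AND the input stream disconnects.
type pipelineCleanup struct {
	mu        sync.Mutex
	pipelines map[string]bool
	pending   sync.WaitGroup
}

func newPipelineCleanup(id string) *pipelineCleanup {
	c := &pipelineCleanup{pipelines: map[string]bool{id: true}}
	c.pending.Add(2) // one for orch selection, one for stream disconnect
	return c
}

// eventDone is called once when orch selection finishes and once when the
// input disconnects; cleanup can only proceed after both calls.
func (c *pipelineCleanup) eventDone() { c.pending.Done() }

// waitAndClean blocks until both events fired, then removes the entry.
func (c *pipelineCleanup) waitAndClean(id string) {
	c.pending.Wait()
	c.mu.Lock()
	delete(c.pipelines, id)
	c.mu.Unlock()
}

func main() {
	c := newPipelineCleanup("stream-1")
	go c.eventDone() // orch selection completes (or fails)
	go c.eventDone() // input stream disconnects
	c.waitAndClean("stream-1")
	fmt.Println(len(c.pipelines))
}
```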
BTW I just tested this to be sure and ... we do have a problem! It's not from this PR though. Basically, processStream never returns if there is an error selecting the first orchestrator, so we'd be stuck here forever and not make any progress in cleaning up the rest of the stack. Here's a fix for that: 0ca0c9e
Ahh, thanks for the fix.
I looked again at the code and I think it should be fine.