Conversation
network/client.go
Outdated
| } | ||
| c.runenv.RecordMessage(InitialisationSuccessful) | ||
|
|
||
| err = c.syncClient.SignalEvent(ctx, runtime.NewStageNotification(c.runenv.TestGroupID, "network-initialized", "exit")) |
There was a problem hiding this comment.
Can we record exit:err or exit:ok depending on the outcome. Also, should notifications be able to carry a message? It seems like propagating the error could save us from having to look into logs.
There was a problem hiding this comment.
The idea behind notifications is for them to be aggregated - I don't really have a use-case for them to propagate specific messages.
exit:err and exit:ok do not belong here as far as I can tell, this is specifically the place that emits a notification on one of the barriers we care about (and that we've had issues in the past, once networking goes down)... so I am not sure I follow the question :)
raulk
left a comment
There was a problem hiding this comment.
Oh, and we should probably add tests for this stuff ;-)
60b22ab to
ba89012
Compare
| }, nil | ||
| } | ||
|
|
||
| func (w *WatchClient) FetchAllEvents(rp *runtime.RunParams) ([]*runtime.Event, error) { |
There was a problem hiding this comment.
The method implicitly assumes that this will be called at the end. If that's the case, we should document it (as it's part of the contract), and instead of doing this undeterministic pagination, we should do an XLen and consume that many elements (paginating, if necessary), to make this method deterministic.
Why not move this to a channel-based subscription? We are going to need real-time consumption pretty soon. And we are already doing a funny dance here to paginate, anyway. We don't need to implement indefinite Block and the unblock flow (like the sync service topic subscription does).
Just passing in a context, keeping the block time of 1 second, and looping until the context is closed is enough.
We can also pass a closeOnTerminalEvt bool flag that closes the channel when we find a terminal event (crash, success, failure). We can add Event#IsTerminal() bool to abstract over that.
That would allow the daemon to do:
wcl := NewWatchClient()
for evt := range wcl.FetchEvents(ctx) {
}There was a problem hiding this comment.
Right - this method is not guaranteed to exit, if the producer keeps on spamming events.
We should either refactor and just use the Subscribe method, or use XLen as you suggest.
This PR is introducing a
notificationpackage in the SDK - a way for the daemon to receive notifications from thesync servicewith respect to events on the test instances.Notifications' goal is to convey information to the Testground daemon with respect to the:
When test plans have multiple stages, this functionality will make it easier to determine at which stage a test run blocked and why.
TODO:
FOLLOW UP ISSUE:
SubscribeEventsfunction toWatchClient, so that we can monitor incoming events as they happen (useful for the Stage events).FetchAllEventsandSubscribeEventsand maybe unify in one method.