Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type LivepeerNode struct {
// LivePipeline tracks a single live AI video stream managed by this node.
type LivePipeline struct {
	// RequestID identifies the originating request for this stream.
	RequestID string
	// Params holds the pipeline parameters — presumably a serialized form; verify against callers.
	Params string
	// Pipeline names the AI pipeline this stream runs; used to group
	// per-pipeline live-session metrics.
	Pipeline string
	// ControlPub publishes control messages for the stream over trickle.
	ControlPub *trickle.TricklePublisher
	// StopControl tears down the control publisher for this stream.
	StopControl func()
}
Expand Down
62 changes: 44 additions & 18 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,23 @@ type (
mSceneClassification *stats.Int64Measure

// Metrics for AI jobs
mAIModelsRequested *stats.Int64Measure
mAIRequestLatencyScore *stats.Float64Measure
mAIRequestPrice *stats.Float64Measure
mAIRequestError *stats.Int64Measure
mAIResultDownloaded *stats.Int64Measure
mAIResultDownloadTime *stats.Float64Measure
mAIResultUploaded *stats.Int64Measure
mAIResultUploadTime *stats.Float64Measure
mAIResultSaveFailed *stats.Int64Measure
mAIContainersInUse *stats.Int64Measure
mAIContainersIdle *stats.Int64Measure
mAIGPUsIdle *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure
mAILiveAttempts *stats.Int64Measure
mAINumOrchs *stats.Int64Measure
mAIModelsRequested *stats.Int64Measure
mAIRequestLatencyScore *stats.Float64Measure
mAIRequestPrice *stats.Float64Measure
mAIRequestError *stats.Int64Measure
mAIResultDownloaded *stats.Int64Measure
mAIResultDownloadTime *stats.Float64Measure
mAIResultUploaded *stats.Int64Measure
mAIResultUploadTime *stats.Float64Measure
mAIResultSaveFailed *stats.Int64Measure
mAIContainersInUse *stats.Int64Measure
mAIContainersIdle *stats.Int64Measure
mAIGPUsIdle *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
aiLiveSessionsByPipeline map[string]int
mAIFirstSegmentDelay *stats.Int64Measure
mAILiveAttempts *stats.Int64Measure
mAINumOrchs *stats.Int64Measure

mAIWhipTransportBytesReceived *stats.Int64Measure
mAIWhipTransportBytesSent *stats.Int64Measure
Expand Down Expand Up @@ -391,6 +392,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIContainersIdle = stats.Int64("ai_container_idle", "Number of containers currently available for AI processing", "tot")
census.mAIGPUsIdle = stats.Int64("ai_gpus_idle", "Number of idle GPUs (with no configured container)", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.aiLiveSessionsByPipeline = make(map[string]int)
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")
census.mAILiveAttempts = stats.Int64("ai_live_attempts", "AI Live stream attempted", "tot")
census.mAINumOrchs = stats.Int64("ai_orchestrators_available_total", "AI Live number of available orchestrators", "tot")
Expand Down Expand Up @@ -2028,8 +2030,32 @@ func AIGPUsIdle(currentGPUsIdle int) {
stats.Record(census.ctx, census.mAIGPUsIdle.M(int64(currentGPUsIdle)))
}

func AICurrentLiveSessions(currentPipelines int) {
stats.Record(census.ctx, census.mAICurrentLivePipelines.M(int64(currentPipelines)))
// AICurrentLiveSessions records the number of live AI sessions per pipeline.
// A pipeline that was reported on a previous call but is absent from
// sessionsByPipeline is emitted once with a zero count and then dropped from
// the tracked set, so it is not reported again.
func AICurrentLiveSessions(sessionsByPipeline map[string]int) {
	census.lock.Lock()
	defer census.lock.Unlock()

	// Zero out every previously-tracked pipeline so that pipelines with no
	// remaining live sessions are correctly reported as 0 below.
	for name := range census.aiLiveSessionsByPipeline {
		census.aiLiveSessionsByPipeline[name] = 0
	}
	// Overlay the current counts on top of the zeroed baseline.
	for name, count := range sessionsByPipeline {
		census.aiLiveSessionsByPipeline[name] = count
	}
	// Emit one tagged measurement per tracked pipeline, pruning zero-count
	// entries as we go (deleting during range is safe in Go).
	for name, count := range census.aiLiveSessionsByPipeline {
		err := stats.RecordWithTags(census.ctx,
			[]tag.Mutator{tag.Insert(census.kPipeline, name)},
			census.mAICurrentLivePipelines.M(int64(count)))
		if err != nil {
			glog.Errorf("Error recording metrics for pipeline %q: %v", name, err)
		}
		if count == 0 {
			delete(census.aiLiveSessionsByPipeline, name)
		}
	}
}

func AIWhipTransportBytesReceived(bytes int64) {
Expand Down
4 changes: 2 additions & 2 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func registerControl(ctx context.Context, params aiRequestParams) {

params.node.LivePipelines[stream] = &core.LivePipeline{
RequestID: params.liveParams.requestID,
Pipeline: params.liveParams.pipeline,
}
}

Expand Down Expand Up @@ -503,8 +504,7 @@ func startControlPublish(ctx context.Context, control *url.URL, params aiRequest
sess.StopControl = stop

if monitor.Enabled {
monitor.AICurrentLiveSessions(len(params.node.LivePipelines))
logCurrentLiveSessions(params.node.LivePipelines)
monitorCurrentLiveSessions(params.node.LivePipelines)
}

// Send any cached control params in a goroutine outside the lock.
Expand Down
10 changes: 6 additions & 4 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,8 +1176,7 @@ func cleanupControl(ctx context.Context, params aiRequestParams) {
}
delete(node.LivePipelines, stream)
if monitor.Enabled {
monitor.AICurrentLiveSessions(len(node.LivePipelines))
logCurrentLiveSessions(node.LivePipelines)
monitorCurrentLiveSessions(node.LivePipelines)
}
node.LiveMu.Unlock()

Expand All @@ -1189,11 +1188,14 @@ func cleanupControl(ctx context.Context, params aiRequestParams) {
}
}

func logCurrentLiveSessions(pipelines map[string]*core.LivePipeline) {
func monitorCurrentLiveSessions(pipelines map[string]*core.LivePipeline) {
countByPipeline := make(map[string]int)
var streams []string
for k := range pipelines {
for k, v := range pipelines {
countByPipeline[v.Pipeline] = countByPipeline[v.Pipeline] + 1
streams = append(streams, k)
}
monitor.AICurrentLiveSessions(countByPipeline)
clog.V(common.DEBUG).Infof(context.Background(), "Streams currently live (total=%d): %v", len(pipelines), streams)
}

Expand Down
Loading