Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions ai/worker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (m *DockerManager) EnsureImageAvailable(ctx context.Context, pipeline strin
func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID string, optimizationFlags OptimizationFlags) error {
m.mu.Lock()
defer m.mu.Unlock()
defer m.monitorInUse()
defer m.monitorInUse(pipeline, modelID)

_, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags)
if err != nil {
Expand All @@ -183,7 +183,7 @@ func (m *DockerManager) Stop(ctx context.Context) error {
func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (*RunnerContainer, error) {
m.mu.Lock()
defer m.mu.Unlock()
defer m.monitorInUse()
defer m.monitorInUse(pipeline, modelID)

var rc *RunnerContainer
var err error
Expand Down Expand Up @@ -221,7 +221,7 @@ func (m *DockerManager) borrowContainerLocked(ctx context.Context, rc *RunnerCon
func (m *DockerManager) returnContainer(rc *RunnerContainer) {
m.mu.Lock()
defer m.mu.Unlock()
defer m.monitorInUse()
defer m.monitorInUse(rc.Pipeline, rc.ModelID)

rc.Lock()
rc.BorrowCtx = nil
Expand Down Expand Up @@ -259,7 +259,7 @@ func (m *DockerManager) getContainerImageName(pipeline, modelID string) (string,
func (m *DockerManager) HasCapacity(ctx context.Context, pipeline, modelID string) bool {
m.mu.Lock()
defer m.mu.Unlock()
defer m.monitorInUse()
defer m.monitorInUse(pipeline, modelID)

// Check if unused managed container exists for the requested model.
for _, rc := range m.containers {
Expand Down Expand Up @@ -541,7 +541,7 @@ func (m *DockerManager) destroyContainer(rc *RunnerContainer, locked bool) error
if !locked {
m.mu.Lock()
defer m.mu.Unlock()
defer m.monitorInUse()
defer m.monitorInUse(rc.Pipeline, rc.ModelID)
}
delete(m.gpuContainers, rc.GPU)
delete(m.containers, rc.Name)
Expand Down Expand Up @@ -766,12 +766,12 @@ func (m *DockerManager) GetCapacity() Capacity {
}
}

func (m *DockerManager) monitorInUse() {
func (m *DockerManager) monitorInUse(pipeline string, modelID string) {
if monitor.Enabled {
capacity := m.GetCapacity()
monitor.AIContainersInUse(capacity.ContainersInUse, "", "")
monitor.AIContainersIdle(capacity.ContainersIdle, "", "")
monitor.AIGPUsIdle(len(m.gpus) - len(m.gpuContainers)) // Indicates a misconfiguration so we should alert on this
monitor.AIContainersInUse(capacity.ContainersInUse, pipeline, modelID)
monitor.AIContainersIdle(capacity.ContainersIdle, pipeline, modelID, "")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so here (worker) ai_container_idle is produced with pipeline and model_name and here (orch.) with just model and orchestratorUri. Should we have separate metrics from O and W?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, I think it would be better to have a separate metric from Gateway and a separate from Orchestrator. Because it's confusing right now.

Anyway, I'm ok if it's done later, as a separate PR.

monitor.AIGPUsIdle(len(m.gpus)-len(m.gpuContainers), pipeline, modelID) // Indicates a misconfiguration so we should alert on this
}
}

Expand Down
3 changes: 2 additions & 1 deletion discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func getModelCaps(caps *net.Capabilities) map[string]*net.Capabilities_Capabilit
if !ok {
return nil
}

return liveAI.Models
}

Expand All @@ -244,7 +245,7 @@ func reportLiveAICapacity(info *net.OrchestratorInfo, capsReq common.CapabilityC
}
}

monitor.AIContainersIdle(idle, modelID, orchURL.String())
monitor.AIContainersIdle(idle, "", modelID, orchURL.String())
}
}

Expand Down
37 changes: 23 additions & 14 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,14 +1017,14 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_container_in_use",
Measure: census.mAIContainersInUse,
Description: "Number of containers currently used for AI processing",
TagKeys: append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "ai_container_idle",
Measure: census.mAIContainersIdle,
Description: "Number of containers currently available for AI processing",
TagKeys: append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName, census.kOrchestratorURI}, baseTags...),
Aggregation: view.LastValue(),
},
{
Expand All @@ -1038,7 +1038,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_current_live_pipelines",
Measure: census.mAICurrentLivePipelines,
Description: "Number of live AI pipelines currently running",
TagKeys: append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
TagKeys: append([]tag.Key{census.kPipeline}, baseTags...),
Aggregation: view.LastValue(),
},
{
Expand All @@ -1052,7 +1052,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_live_attempt",
Measure: census.mAILiveAttempts,
Description: "AI Live stream attempted",
TagKeys: baseTags,
TagKeys: append([]tag.Key{census.kModelName}, baseTags...),
Aggregation: view.Count(),
},
{
Expand Down Expand Up @@ -1109,8 +1109,6 @@ func InitCensus(nodeType NodeType, version string) {
stats.Record(census.ctx, census.mWinningTicketsRecv.M(int64(0)))
stats.Record(census.ctx, census.mCurrentSessions.M(int64(0)))
stats.Record(census.ctx, census.mValueRedeemed.M(float64(0)))
stats.Record(census.ctx, census.mAIContainersInUse.M(int64(0)))
stats.Record(census.ctx, census.mAICurrentLivePipelines.M(int64(0)))
}

/*
Expand Down Expand Up @@ -2010,24 +2008,28 @@ func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.
}
}

func AIContainersInUse(currentContainersInUse int, model, uri string) {
func AIContainersInUse(currentContainersInUse int, pipeline, modelID string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, uri)},
[]tag.Mutator{tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelID)},
census.mAIContainersInUse.M(int64(currentContainersInUse))); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

func AIContainersIdle(currentContainersIdle int, model, uri string) {
func AIContainersIdle(currentContainersIdle int, pipeline, modelID, uri string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, uri)},
[]tag.Mutator{tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelID), tag.Insert(census.kOrchestratorURI, uri)},
census.mAIContainersIdle.M(int64(currentContainersIdle))); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

func AIGPUsIdle(currentGPUsIdle int) {
stats.Record(census.ctx, census.mAIGPUsIdle.M(int64(currentGPUsIdle)))
func AIGPUsIdle(currentGPUsIdle int, pipeline, modelID string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelID)},
census.mAIGPUsIdle.M(int64(currentGPUsIdle))); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

func AICurrentLiveSessions(sessionsByPipeline map[string]int) {
Expand Down Expand Up @@ -2144,8 +2146,15 @@ func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
}
}

func AILiveVideoAttempt() {
stats.Record(census.ctx, census.mAILiveAttempts.M(1))
// AILiveVideoAttempt counts one attempted AI live video stream, tagged with
// the requested model ID. Does nothing when metrics collection is disabled.
func AILiveVideoAttempt(modelID string) {
	if !Enabled {
		return
	}
	mutators := []tag.Mutator{tag.Insert(census.kModelName, modelID)}
	err := stats.RecordWithTags(census.ctx, mutators, census.mAILiveAttempts.M(1))
	if err != nil {
		glog.Errorf("Error recording %s metric err=%q", census.mAILiveAttempts.Name(), err)
	}
}

func AINumOrchestrators(count int, modelName string) {
Expand Down
12 changes: 5 additions & 7 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {

// Count `ai_live_attempts` after successful parameters validation
clog.V(common.VERBOSE).Infof(ctx, "AI Live video attempt")
if monitor.Enabled {
monitor.AILiveVideoAttempt()
}
monitor.AILiveVideoAttempt(pipeline) // this `pipeline` is actually modelID

sendErrorEvent := LiveErrorEventSender(ctx, streamID, map[string]string{
"type": "error",
Expand Down Expand Up @@ -1037,6 +1035,10 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
"url": "",
},
})

clog.V(common.VERBOSE).Infof(ctx, "AI Live video attempt")
monitor.AILiveVideoAttempt(pipeline) // this `pipeline` is actually modelID

go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -1062,10 +1064,6 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
})
}()

if monitor.Enabled {
monitor.AILiveVideoAttempt()
}

if outputURL != "" {
rtmpOutputs = append(rtmpOutputs, outputURL)
}
Expand Down
Loading