Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
64df67c
Add Runner Sem Versioning
leszko Apr 14, 2025
ecdeb05
WIP: Filter out Orchestrators with wrong Runner Version
leszko Apr 14, 2025
0b8066e
Propagate Runner version to Gateway
leszko Apr 14, 2025
091478f
Fix imports
leszko Apr 14, 2025
4663320
Add '-aiMinRunnerVersions' flag
leszko Apr 14, 2025
e3ad48b
Add propagating the flag in Gateway
leszko Apr 14, 2025
c74891f
Add filtering by version
leszko Apr 14, 2025
5b9e802
Exclude Orchestrator by min runner version
leszko Apr 15, 2025
7984b6a
Remove versioning from the box
leszko Apr 15, 2025
18c6387
Regenerate openapi.yaml
leszko Apr 15, 2025
ac6ecbb
Refactor
leszko Apr 15, 2025
e4ea351
Merge branch 'master' into rafal/runner-version
leszko Apr 15, 2025
e932310
Add unit tests
leszko Apr 15, 2025
b7b7c7e
Fix existing unit tests
leszko Apr 15, 2025
c4833fe
Refactor
leszko Apr 15, 2025
1dd150c
Add info that it works only for warm runner containers
leszko Apr 16, 2025
a343337
Merge branch 'master' into rafal/runner-version
leszko Apr 17, 2025
7f08c25
Merge branch 'master' into rafal/runner-version
leszko Apr 17, 2025
da3e05c
Merge branch 'master' into rafal/runner-version
leszko Apr 17, 2025
062135b
Fix returning the default runner version
leszko May 5, 2025
610b1c4
Merge remote-tracking branch 'origin/rafal/runner-version' into rafal…
leszko May 5, 2025
7d9488b
Add warning if multiple runner versions set
leszko May 5, 2025
7373d29
Merge branch 'master' into rafal/runner-version
leszko May 5, 2025
547cd4a
Update go bindings
leszko May 5, 2025
bb41a5d
Fix unit test
leszko May 5, 2025
7fe3364
Refactor
leszko May 5, 2025
07432e4
Merge branch 'master' into rafal/runner-version
leszko May 9, 2025
a020605
Merge branch 'master' into rafal/runner-version
leszko May 9, 2025
27c4c39
Final fixers
leszko May 9, 2025
3965de3
Remove unused WorkerVersion()
leszko May 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ai/worker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Name string
Client *ClientWithResponses
Hardware *HardwareInformation
Version *Version

BorrowCtx context.Context
sync.RWMutex
Expand Down Expand Up @@ -87,12 +88,18 @@
} else {
hardware = hdw
}
runnerVersion := &Version{Pipeline: cfg.Pipeline, ModelId: cfg.ModelID, Version: "0.0.0"}
version, err := client.VersionWithResponse(ctx)
if err == nil {
runnerVersion = version.JSON200
}

Check warning on line 95 in ai/worker/container.go

View check run for this annotation

Codecov / codecov/patch

ai/worker/container.go#L94-L95

Added lines #L94 - L95 were not covered by tests

return &RunnerContainer{
RunnerContainerConfig: cfg,
Name: name,
Client: client,
Hardware: hardware,
Version: runnerVersion,
}, nil
}

Expand Down
309 changes: 224 additions & 85 deletions ai/worker/runner.gen.go

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions ai/worker/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package worker

import (
"context"
"github.com/Masterminds/semver/v3"
"github.com/livepeer/go-livepeer/clog"
)

// LowestVersion returns the lowest version of a given pipeline and model ID from a list of versions.
func LowestVersion(versions []Version, pipeline string, modelId string) string {
var res string
var lowest *semver.Version

for _, v := range versions {
if v.Pipeline != pipeline || v.ModelId != modelId {
continue
}
ver, err := semver.NewVersion(v.Version)
if err != nil {
clog.Warningf(context.Background(), "Invalid runner version '%s'", v)
continue

Check warning on line 21 in ai/worker/utils.go

View check run for this annotation

Codecov / codecov/patch

ai/worker/utils.go#L20-L21

Added lines #L20 - L21 were not covered by tests
}
if lowest == nil || ver.LessThan(lowest) {
if lowest != nil {
clog.Warningf(context.Background(), "Orchestrator has multiple versions set for the same pipeline and model ID. Using the lowest version: %s", ver)
}
lowest = ver
res = v.Version
}
}
return res
}
33 changes: 33 additions & 0 deletions ai/worker/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package worker

import "testing"

func TestLowestVersion(t *testing.T) {
versions := []Version{
{Pipeline: "pipeline1", ModelId: "model1", Version: "1.0.0"},
{Pipeline: "pipeline1", ModelId: "model1", Version: "2.0.0"},
{Pipeline: "pipeline2", ModelId: "model2", Version: "1.5.0"},
{Pipeline: "pipeline2", ModelId: "model2", Version: "0.1.0"},
{Pipeline: "pipeline2", ModelId: "model2", Version: "1.5.0"},
{Pipeline: "pipeline2", ModelId: "model2", Version: "0.0.2"},
}

tests := []struct {
pipeline string
modelId string
expected string
}{
{"pipeline1", "model1", "1.0.0"},
{"pipeline2", "model2", "0.0.2"},
{"pipeline3", "model3", ""},
}

for _, test := range tests {
t.Run(test.pipeline+"_"+test.modelId, func(t *testing.T) {
result := LowestVersion(versions, test.pipeline, test.modelId)
if result != test.expected {
t.Errorf("Expected %s, got %s", test.expected, result)
}
})
}
}
21 changes: 21 additions & 0 deletions ai/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@
return hardware
}

func (w *Worker) Version() []Version {
var version []Version
for _, rc := range w.externalContainers {
if rc.Version != nil {
version = append(version, *rc.Version)
} else {
version = append(version, Version{})
}

Check warning on line 100 in ai/worker/worker.go

View check run for this annotation

Codecov / codecov/patch

ai/worker/worker.go#L93-L100

Added lines #L93 - L100 were not covered by tests
}

for _, rc := range w.manager.containers {
if rc.Version != nil {
version = append(version, *rc.Version)
} else {
version = append(version, Version{})
}

Check warning on line 108 in ai/worker/worker.go

View check run for this annotation

Codecov / codecov/patch

ai/worker/worker.go#L103-L108

Added lines #L103 - L108 were not covered by tests
}

return version

Check warning on line 111 in ai/worker/worker.go

View check run for this annotation

Codecov / codecov/patch

ai/worker/worker.go#L111

Added line #L111 was not covered by tests
}

func (w *Worker) TextToImage(ctx context.Context, req GenTextToImageJSONRequestBody) (*ImageResponse, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.AIRunnerImageOverrides = flag.String("aiRunnerImageOverrides", *cfg.AIRunnerImageOverrides, `Specify overrides for the Docker images used by the AI runner. Example: '{"default": "livepeer/ai-runner:v1.0", "batch": {"text-to-speech": "livepeer/ai-runner:text-to-speech-v1.0"}, "live": {"another-pipeline": "livepeer/ai-runner:another-pipeline-v1.0"}}'`)
cfg.AIProcessingRetryTimeout = flag.Duration("aiProcessingRetryTimeout", *cfg.AIProcessingRetryTimeout, "Timeout for retrying to initiate AI processing request")
cfg.AIRunnerContainersPerGPU = flag.Int("aiRunnerContainersPerGPU", *cfg.AIRunnerContainersPerGPU, "Number of AI runner containers to run per GPU; default to 1")
cfg.AIMinRunnerVersion = flag.String("aiMinRunnerVersion", *cfg.AIMinRunnerVersion, `JSON specifying the min runner versions for each pipeline. It works ONLY for warm runner containers, SHOULD NOT be used for cold runners. Example: '[{"model_id": "noop", "pipeline": "live-video-to-video", "minVersion": "0.0.2"}]'; if not set, the runner's min version is used"`)

// Live AI:
cfg.MediaMTXApiPassword = flag.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
Expand Down
9 changes: 9 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
AIVerboseLogs *bool
AIProcessingRetryTimeout *time.Duration
AIRunnerContainersPerGPU *int
AIMinRunnerVersion *string
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
Expand Down Expand Up @@ -221,6 +222,7 @@
defaultAIVerboseLogs := false
defaultAIProcessingRetryTimeout := 2 * time.Second
defaultAIRunnerContainersPerGPU := 1
defaultAIMinRunnerVersion := "[]"

Check warning on line 225 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L225

Added line #L225 was not covered by tests
defaultAIRunnerImageOverrides := ""
defaultLiveAIAuthWebhookURL := ""
defaultLivePaymentInterval := 5 * time.Second
Expand Down Expand Up @@ -334,6 +336,7 @@
AIVerboseLogs: &defaultAIVerboseLogs,
AIProcessingRetryTimeout: &defaultAIProcessingRetryTimeout,
AIRunnerContainersPerGPU: &defaultAIRunnerContainersPerGPU,
AIMinRunnerVersion: &defaultAIMinRunnerVersion,

Check warning on line 339 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L339

Added line #L339 was not covered by tests
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
Expand Down Expand Up @@ -1321,6 +1324,9 @@
}
}

// For now, we assume that the version served by the orchestrator is the lowest from all remote workers
modelConstraint.RunnerVersion = worker.LowestVersion(n.AIWorker.Version(), config.Pipeline, config.ModelID)

Check warning on line 1329 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1328-L1329

Added lines #L1328 - L1329 were not covered by tests
// Show warning if people set OptimizationFlags but not Warm.
if len(config.OptimizationFlags) > 0 && !config.Warm {
glog.Warningf("Model %v has 'optimization_flags' set without 'warm'. Optimization flags are currently only used for warm containers.", config.ModelID)
Expand Down Expand Up @@ -1605,6 +1611,9 @@
if cfg.OrchMinLivepeerVersion != nil {
n.Capabilities.SetMinVersionConstraint(*cfg.OrchMinLivepeerVersion)
}
if cfg.AIMinRunnerVersion != nil {
n.Capabilities.SetMinRunnerVersionConstraint(*cfg.AIMinRunnerVersion)
}

Check warning on line 1616 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1614-L1616

Added lines #L1614 - L1616 were not covered by tests
if n.AIWorkerManager != nil {
// Set min version constraint to prevent incompatible workers.
n.Capabilities.SetMinVersionConstraint(core.LivepeerVersion)
Expand Down
1 change: 1 addition & 0 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type AI interface {
HasCapacity(string, string) bool
EnsureImageAvailable(context.Context, string, string) error
HardwareInformation() []worker.HardwareInformation
Version() []worker.Version
}

// Custom type to parse a big.Rat from a JSON number.
Expand Down
1 change: 1 addition & 0 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RemoteAIWorker struct {
stream net.AIWorker_RegisterAIWorkerServer
capabilities *Capabilities
hardware []worker.HardwareInformation
version []worker.Version
eof chan struct{}
addr string
}
Expand Down
4 changes: 4 additions & 0 deletions core/ai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,10 @@ func (a *stubAIWorker) HardwareInformation() []worker.HardwareInformation {
return nil
}

func (a *stubAIWorker) Version() []worker.Version {
return nil
}

type StubAIWorkerServer struct {
manager *RemoteAIWorkerManager
SendError error
Expand Down
89 changes: 78 additions & 11 deletions core/capabilities.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package core

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"

"github.com/Masterminds/semver/v3"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/lpms/ffmpeg"
Expand All @@ -15,8 +18,9 @@
type ModelConstraints map[string]*ModelConstraint

type ModelConstraint struct {
Warm bool
Capacity int
Warm bool
Capacity int
RunnerVersion string
}

type Capability int
Expand Down Expand Up @@ -282,6 +286,26 @@
return true
}

func (c *PerCapabilityConstraints) SetRunnerVersion(cap Capability, modelId, version string) {
if c == nil {
return
}

Check warning on line 292 in core/capabilities.go

View check run for this annotation

Codecov / codecov/patch

core/capabilities.go#L291-L292

Added lines #L291 - L292 were not covered by tests
if (*c)[cap] == nil {
(*c)[cap] = &CapabilityConstraints{Models: make(ModelConstraints)}
}
if (*c)[cap].Models[modelId] == nil {
(*c)[cap].Models[modelId] = &ModelConstraint{}
}
(*c)[cap].Models[modelId].RunnerVersion = version
}

func (c *PerCapabilityConstraints) GetRunnerVersion(cap Capability, modelId string) string {
if c == nil || (*c)[cap] == nil || (*c)[cap].Models[modelId] == nil {
return ""
}
return (*c)[cap].Models[modelId].RunnerVersion
}

func (c1 *CapabilityConstraints) CompatibleWith(c2 *CapabilityConstraints) bool {
return c1.Models.CompatibleWith(c2.Models)
}
Expand All @@ -298,6 +322,10 @@
// c1 requires the model ID to be warm, but c2's model ID is not warm so it is incompatible
return false
}

if !isVersionCompatible(c1ModelConstraint.RunnerVersion, c2ModelConstraint.RunnerVersion) {
return false
}

Check warning on line 328 in core/capabilities.go

View check run for this annotation

Codecov / codecov/patch

core/capabilities.go#L326-L328

Added lines #L326 - L328 were not covered by tests
}

return true
Expand Down Expand Up @@ -412,27 +440,35 @@
}

func (bcast *Capabilities) LivepeerVersionCompatibleWith(orch *net.Capabilities) bool {
if bcast == nil || orch == nil || bcast.constraints.minVersion == "" {
if bcast == nil || orch == nil {
// should not happen, but just in case, return true by default
return true
}
if orch.Version == "" || orch.Version == "undefined" {
// Orchestrator/Transcoder version is not set, so it's incompatible
return isVersionCompatible(bcast.constraints.minVersion, orch.Version)
}

func isVersionCompatible(minVersion, version string) bool {
if minVersion == "" {
// min version not defined in Gateway, any version is compatible
return true
}
if version == "" || version == "undefined" {
// Orchestrator/Transcoder/Runner version is not set, so it's incompatible
return false
}

minVer, err := semver.NewVersion(bcast.constraints.minVersion)
minVer, err := semver.NewVersion(minVersion)
if err != nil {
glog.Warningf("error while parsing minVersion: %v", err)
return true
}
ver, err := semver.NewVersion(orch.Version)
ver, err := semver.NewVersion(version)
if err != nil {
glog.Warningf("error while parsing version: %v", err)
return false
}

// Ignore prerelease versions as in go-livepeer we actually define post-release suffixes
// Ignore prerelease versions as in go-livepeer/ai-runner we actually define post-release suffixes
minVerNoSuffix, _ := minVer.SetPrerelease("")
verNoSuffix, _ := ver.SetPrerelease("")

Expand Down Expand Up @@ -486,8 +522,9 @@
models := make(map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint)
for modelID, modelConstraint := range constraints.Models {
models[modelID] = &net.Capabilities_CapabilityConstraints_ModelConstraint{
Warm: modelConstraint.Warm,
Capacity: uint32(modelConstraint.Capacity),
Warm: modelConstraint.Warm,
Capacity: uint32(modelConstraint.Capacity),
RunnerVersion: modelConstraint.RunnerVersion,
}
}

Expand Down Expand Up @@ -530,7 +567,7 @@
for capabilityInt, constraints := range caps.Constraints.PerCapability {
models := make(map[string]*ModelConstraint)
for modelID, modelConstraint := range constraints.Models {
models[modelID] = &ModelConstraint{Warm: modelConstraint.Warm, Capacity: int(modelConstraint.Capacity)}
models[modelID] = &ModelConstraint{Warm: modelConstraint.Warm, Capacity: int(modelConstraint.Capacity), RunnerVersion: modelConstraint.RunnerVersion}
}

coreCaps.constraints.perCapability[Capability(capabilityInt)] = &CapabilityConstraints{
Expand Down Expand Up @@ -763,3 +800,33 @@
}
return ""
}

func (bcast *Capabilities) SetMinRunnerVersionConstraint(minVersionsJson string) {
type MinVersionEntry struct {
ModelID string `json:"model_id"`
Pipeline string `json:"pipeline"`
MinVersion string `json:"minVersion"`
}
var entries []MinVersionEntry
err := json.Unmarshal([]byte(minVersionsJson), &entries)
if err != nil {
clog.Errorf(context.Background(), "Cannot unmarshal minVersionJson: %v", err)
return
}
for _, e := range entries {
if e.ModelID == "" || e.Pipeline == "" || e.MinVersion == "" {
clog.Errorf(context.Background(), "Invalid minVersionJson entry: %v", e)
continue

Check warning on line 819 in core/capabilities.go

View check run for this annotation

Codecov / codecov/patch

core/capabilities.go#L818-L819

Added lines #L818 - L819 were not covered by tests
}
c, err := PipelineToCapability(e.Pipeline)
if err != nil {
clog.Errorf(context.Background(), "Cannot convert pipeline to capability: %v", err)
continue

Check warning on line 824 in core/capabilities.go

View check run for this annotation

Codecov / codecov/patch

core/capabilities.go#L823-L824

Added lines #L823 - L824 were not covered by tests
}
bcast.constraints.perCapability.SetRunnerVersion(c, e.ModelID, e.MinVersion)
}
}

func (bcast *Capabilities) MinRunnerVersionConstraint(cap Capability, modelID string) string {
return bcast.constraints.perCapability.GetRunnerVersion(cap, modelID)
}
Loading
Loading