Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

#### General

* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
* [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
* [#3781](https://github.com/livepeer/go-livepeer/pull/3781) worker/docker: Destroy containers from watch routines (@victorges)

#### CLI
52 changes: 37 additions & 15 deletions ai/worker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ type DockerManager struct {
// Map of idle containers. container name => container
containers map[string]*RunnerContainer
mu *sync.Mutex
// Every managed container has a watchContainer() goroutine that will stop when the manager ctx is done. We use the WaitGroup to wait for all containers to be removed.
watchGroup sync.WaitGroup
ctx context.Context
stop context.CancelFunc
}

func NewDockerManager(overrides ImageOverrides, verboseLogs bool, gpus []string, modelDir string, client DockerClient, containerCreatorID string) (*DockerManager, error) {
Expand All @@ -134,12 +138,13 @@ func NewDockerManager(overrides ImageOverrides, verboseLogs bool, gpus []string,
}
}

ctx, cancel := context.WithTimeout(context.Background(), containerTimeout)
if _, err := RemoveExistingContainers(ctx, client, containerCreatorID); err != nil {
cancel()
ctx, cancel := context.WithCancel(context.Background())

cleanupCtx, cleanupCancel := context.WithTimeout(ctx, containerTimeout)
defer cleanupCancel()
if _, err := RemoveExistingContainers(cleanupCtx, client, containerCreatorID); err != nil {
return nil, err
}
cancel()

manager := &DockerManager{
gpus: gpus,
Expand All @@ -151,6 +156,9 @@ func NewDockerManager(overrides ImageOverrides, verboseLogs bool, gpus []string,
gpuContainers: make(map[string]*RunnerContainer),
containers: make(map[string]*RunnerContainer),
mu: &sync.Mutex{},
watchGroup: sync.WaitGroup{},
ctx: ctx,
stop: cancel,
}

return manager, nil
Expand Down Expand Up @@ -189,17 +197,21 @@ func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID strin
}

func (m *DockerManager) Stop(ctx context.Context) error {
var stopContainerWg sync.WaitGroup
for _, rc := range m.containers {
stopContainerWg.Add(1)
go func(container *RunnerContainer) {
defer stopContainerWg.Done()
m.destroyContainer(container, false)
}(rc)
}
// Signal all watchContainer() goroutines to stop, then wait for them to destroy their containers and exit.
m.stop()

stopContainerWg.Wait()
return nil
done := make(chan struct{})
go func() {
m.watchGroup.Wait()
close(done)
}()

select {
case <-done:
return nil
case <-ctx.Done():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should m.ctx.Done() be checked as well or is that implicit in the wait group / done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the manager's ctx will always be done here, since we call m.stop() in the beginning

return ctx.Err()
}
}

func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (*RunnerContainer, error) {
Expand Down Expand Up @@ -495,7 +507,7 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
slog.Info("Warm container started on loading state, removing from pool on startup", slog.String("container", rc.Name))
m.borrowContainerLocked(context.Background(), rc)
}
go m.watchContainer(rc)
m.watchGroup.Go(func() { m.watchContainer(rc) })

return rc, nil
}
Expand Down Expand Up @@ -587,6 +599,13 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer) {
var loadingStartTime time.Time
failures := 0
for {
if m.ctx.Err() != nil {
slog.Info("Docker manager context is done, stopping container", "container", rc.Name)
m.destroyContainer(rc, false)
slog.Info("Container destroyed", "container", rc.Name)
return
}

if failures >= maxHealthCheckFailures {
slog.Error("Container health check failed too many times", slog.String("container", rc.Name))
m.destroyContainer(rc, false)
Expand Down Expand Up @@ -617,6 +636,9 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer) {
}

select {
case <-m.ctx.Done():
// handled at the top of the loop
continue
case <-borrowDone:
m.returnContainer(rc)
continue
Expand Down
3 changes: 3 additions & 0 deletions ai/worker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewMockServer() *MockServer {

// createDockerManager creates a DockerManager with a mock DockerClient.
func createDockerManager(mockDockerClient *MockDockerClient) *DockerManager {
ctx, cancel := context.WithCancel(context.Background())
return &DockerManager{
gpus: []string{"gpu0"},
modelDir: "/models",
Expand All @@ -107,6 +108,8 @@ func createDockerManager(mockDockerClient *MockDockerClient) *DockerManager {
gpuContainers: make(map[string]*RunnerContainer),
containers: make(map[string]*RunnerContainer),
mu: &sync.Mutex{},
ctx: ctx,
stop: cancel,
}
}

Expand Down
Loading