diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index c24f7d8581..0ebca3dbe3 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -18,7 +18,8 @@
 
 #### General
 
-* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
 * [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
+* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
+* [#3781](https://github.com/livepeer/go-livepeer/pull/3781) worker/docker: Destroy containers from watch routines (@victorges)
 
 #### CLI
diff --git a/ai/worker/docker.go b/ai/worker/docker.go
index bec8a803e1..ba75cecd5e 100644
--- a/ai/worker/docker.go
+++ b/ai/worker/docker.go
@@ -123,6 +123,10 @@ type DockerManager struct {
 	// Map of idle containers. container name => container
 	containers map[string]*RunnerContainer
 	mu         *sync.Mutex
+	// Every managed container has a watchContainer() goroutine that stops when the manager ctx is done. The WaitGroup lets us wait for all containers to be removed.
+	watchGroup sync.WaitGroup
+	ctx        context.Context
+	stop       context.CancelFunc
 }
 
 func NewDockerManager(overrides ImageOverrides, verboseLogs bool, gpus []string, modelDir string, client DockerClient, containerCreatorID string) (*DockerManager, error) {
@@ -134,12 +138,13 @@ func NewDockerManager(overrides ImageOverrides, verboseLogs bool, gpus []string,
 		}
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), containerTimeout)
-	if _, err := RemoveExistingContainers(ctx, client, containerCreatorID); err != nil {
-		cancel()
+	ctx, cancel := context.WithCancel(context.Background())
+
+	cleanupCtx, cleanupCancel := context.WithTimeout(ctx, containerTimeout)
+	defer cleanupCancel()
+	if _, err := RemoveExistingContainers(cleanupCtx, client, containerCreatorID); err != nil {
 		return nil, err
 	}
-	cancel()
 
 	manager := &DockerManager{
 		gpus:       gpus,
@@ -151,6 +156,9 @@ func NewDockerManager(overrides ImageOverrides, verboseLogs bool, gpus []string,
 		gpuContainers: make(map[string]*RunnerContainer),
 		containers:    make(map[string]*RunnerContainer),
 		mu:            &sync.Mutex{},
+		watchGroup:    sync.WaitGroup{},
+		ctx:           ctx,
+		stop:          cancel,
 	}
 
 	return manager, nil
@@ -189,17 +197,21 @@ func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID strin
 }
 
 func (m *DockerManager) Stop(ctx context.Context) error {
-	var stopContainerWg sync.WaitGroup
-	for _, rc := range m.containers {
-		stopContainerWg.Add(1)
-		go func(container *RunnerContainer) {
-			defer stopContainerWg.Done()
-			m.destroyContainer(container, false)
-		}(rc)
-	}
+	// Signal all watchContainer() goroutines to stop, then wait for them to destroy their containers and exit.
+	m.stop()
 
-	stopContainerWg.Wait()
-	return nil
+	done := make(chan struct{})
+	go func() {
+		m.watchGroup.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
 }
 
 func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (*RunnerContainer, error) {
@@ -495,7 +507,7 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
 		slog.Info("Warm container started on loading state, removing from pool on startup", slog.String("container", rc.Name))
 		m.borrowContainerLocked(context.Background(), rc)
 	}
-	go m.watchContainer(rc)
+	m.watchGroup.Go(func() { m.watchContainer(rc) })
 
 	return rc, nil
 }
@@ -587,6 +599,13 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer) {
 	var loadingStartTime time.Time
 	failures := 0
 	for {
+		if m.ctx.Err() != nil {
+			slog.Info("Docker manager context is done, stopping container", "container", rc.Name)
+			m.destroyContainer(rc, false)
+			slog.Info("Container destroyed", "container", rc.Name)
+			return
+		}
+
 		if failures >= maxHealthCheckFailures {
 			slog.Error("Container health check failed too many times", slog.String("container", rc.Name))
 			m.destroyContainer(rc, false)
@@ -617,6 +636,9 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer) {
 		}
 
 		select {
+		case <-m.ctx.Done():
+			// handled at the beginning of the loop
+			continue
 		case <-borrowDone:
 			m.returnContainer(rc)
 			continue
diff --git a/ai/worker/docker_test.go b/ai/worker/docker_test.go
index fd562e20ce..f5cd96a4b0 100644
--- a/ai/worker/docker_test.go
+++ b/ai/worker/docker_test.go
@@ -98,6 +98,7 @@ func NewMockServer() *MockServer {
 
 // createDockerManager creates a DockerManager with a mock DockerClient.
 func createDockerManager(mockDockerClient *MockDockerClient) *DockerManager {
+	ctx, cancel := context.WithCancel(context.Background())
 	return &DockerManager{
 		gpus:     []string{"gpu0"},
 		modelDir: "/models",
@@ -107,6 +108,8 @@ func createDockerManager(mockDockerClient *MockDockerClient) *DockerManager {
 		gpuContainers: make(map[string]*RunnerContainer),
 		containers:    make(map[string]*RunnerContainer),
 		mu:            &sync.Mutex{},
+		ctx:           ctx,
+		stop:          cancel,
 	}
 }