Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions pkg/modelagent/gopher.go
Original file line number | Diff line number | Diff line change
Expand Up
@@ -263,6 +263,18 @@ func (s *Gopher) processTask(task *GopherTask) error {

// For Download and DownloadOverride tasks, set the node label to "Updating"
if task.TaskType == Download || task.TaskType == DownloadOverride {
// Atomic check-and-set: skip if another worker is already downloading this model
s.activeDownloadsMutex.Lock()
if _, isDownloading := s.activeDownloads[modelUID]; isDownloading {
s.activeDownloadsMutex.Unlock()
s.logger.Infof("Model %s is already being downloaded, skipping duplicate download", modelInfo)
return nil
}
// Register the download before releasing the lock to prevent races
ctx, cancel = context.WithCancel(context.Background())
s.activeDownloads[modelUID] = cancel
s.activeDownloadsMutex.Unlock()

s.logger.Infof("Setting model %s status to Updating before download", modelInfo)
nodeLabelOp := &NodeLabelOp{
ModelStateOnNode: Updating,
Expand All
@@ -275,14 +287,6 @@ func (s *Gopher) processTask(task *GopherTask) error {
// Continue with download anyway
}

// Create a cancellable context for this download
ctx, cancel = context.WithCancel(context.Background())

// Register the cancel function
s.activeDownloadsMutex.Lock()
s.activeDownloads[modelUID] = cancel
s.activeDownloadsMutex.Unlock()

// Ensure cleanup on completion
defer func() {
s.activeDownloadsMutex.Lock()
Expand Down
Loading