diff --git a/pkg/modelagent/gopher.go b/pkg/modelagent/gopher.go index 36f97df4..eb855332 100644 --- a/pkg/modelagent/gopher.go +++ b/pkg/modelagent/gopher.go @@ -263,6 +263,18 @@ func (s *Gopher) processTask(task *GopherTask) error { // For Download and DownloadOverride tasks, set the node label to "Updating" if task.TaskType == Download || task.TaskType == DownloadOverride { + // Atomic check-and-set: skip if another worker is already downloading this model + s.activeDownloadsMutex.Lock() + if _, isDownloading := s.activeDownloads[modelUID]; isDownloading { + s.activeDownloadsMutex.Unlock() + s.logger.Infof("Model %s is already being downloaded, skipping duplicate download", modelInfo) + return nil + } + // Register the download before releasing the lock to prevent races + ctx, cancel = context.WithCancel(context.Background()) + s.activeDownloads[modelUID] = cancel + s.activeDownloadsMutex.Unlock() + s.logger.Infof("Setting model %s status to Updating before download", modelInfo) nodeLabelOp := &NodeLabelOp{ ModelStateOnNode: Updating, @@ -275,14 +287,6 @@ func (s *Gopher) processTask(task *GopherTask) error { // Continue with download anyway } - // Create a cancellable context for this download - ctx, cancel = context.WithCancel(context.Background()) - - // Register the cancel function - s.activeDownloadsMutex.Lock() - s.activeDownloads[modelUID] = cancel - s.activeDownloadsMutex.Unlock() - // Ensure cleanup on completion defer func() { s.activeDownloadsMutex.Lock()