diff --git a/VERSION b/VERSION index 86a2e95..f01291b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.5.7-dev +1.5.7 diff --git a/drsm/updates.go b/drsm/updates.go index e49eaa7..212dbe9 100644 --- a/drsm/updates.go +++ b/drsm/updates.go @@ -110,6 +110,12 @@ func (d *Drsm) handleDbUpdates() { } } +func (d *Drsm) ensurePodChunksInitialized(podD *podData) { + if podD.podChunks == nil { + podD.podChunks = make(map[int32]*chunk) + } +} + func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.ChangeStream) { logger.DrsmLog.Debugf("iterate change stream for podData: %v", d) @@ -154,15 +160,32 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan // update on chunkId.. // looks like chunk owner getting change owner := s.Update.UpdFields.PodId + if owner == "" { + logger.DrsmLog.Warnf("stream(Update): missing owner in update for doc %s, operation: %+v", s.DId.Id, s.Update) + continue + } c := getChunkIdFromDocId(s.DId.Id) d.globalChunkTblMutex.Lock() cp := d.globalChunkTbl[c] d.globalChunkTblMutex.Unlock() + if cp == nil { + logger.DrsmLog.Warnf("stream(Update): chunk %d not found in global table for owner %s - will be corrected by periodic resync", c, owner) + // Without a chunk reference there is nothing to update; skip to avoid panic. + // The periodic checkAllChunks() will resync state from MongoDB. + continue + } // TODO update IP address as well. cp.Owner.PodName = owner cp.Owner.PodIp = s.Update.UpdFields.PodIp cp.Owner.PodInstance = s.Update.UpdFields.PodInstance - podD := d.podMap[owner] + podD, found := d.podMap[owner] + if !found { + logger.DrsmLog.Warnf("stream(Update): pod %s not in local map for chunk %d update - will be corrected when keepalive arrives or during periodic resync", owner, c) + // Wait for proper pod initialization via keepalive. Eventual consistency will be maintained by periodic resync and proper keepalive events. + continue + } + // Defensive: should never happen if addPod() was called, but prevents panic + d.ensurePodChunksInitialized(podD) podD.podChunks[c] = cp // add chunk to pod logger.DrsmLog.Infof("stream(Update): pod to chunk map %v", podD.podChunks) } @@ -270,7 +293,7 @@ func (d *Drsm) addChunk(full *FullStream) { func (d *Drsm) addPod(full *FullStream) *podData { podI := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp} pod := &podData{PodId: podI} - pod.podChunks = make(map[int32]*chunk) + d.ensurePodChunksInitialized(pod) d.podMap[full.PodId] = pod logger.DrsmLog.Infof("keepalive insert d.podMaps %v", d.podMap) return pod