Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.7-dev
1.5.7
27 changes: 25 additions & 2 deletions drsm/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ func (d *Drsm) handleDbUpdates() {
}
}

func (d *Drsm) ensurePodChunksInitialized(podD *podData) {
if podD.podChunks == nil {
podD.podChunks = make(map[int32]*chunk)
}
}

func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.ChangeStream) {
logger.DrsmLog.Debugf("iterate change stream for podData: %v", d)

Expand Down Expand Up @@ -154,15 +160,32 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
// update on chunkId..
// looks like chunk owner getting change
owner := s.Update.UpdFields.PodId
if owner == "" {
logger.DrsmLog.Warnf("stream(Update): missing owner in update for doc %s, operation: %+v", s.DId.Id, s.Update)
continue
}
c := getChunkIdFromDocId(s.DId.Id)
d.globalChunkTblMutex.Lock()
cp := d.globalChunkTbl[c]
d.globalChunkTblMutex.Unlock()
if cp == nil {
logger.DrsmLog.Warnf("stream(Update): chunk %d not found in global table for owner %s - will be corrected by periodic resync", c, owner)
// Without a chunk reference there is nothing to update; skip to avoid panic.
// The periodic checkAllChunks() will resync state from MongoDB.
continue
}
// TODO update IP address as well.
cp.Owner.PodName = owner
cp.Owner.PodIp = s.Update.UpdFields.PodIp
cp.Owner.PodInstance = s.Update.UpdFields.PodInstance
podD := d.podMap[owner]
podD, found := d.podMap[owner]
if !found {
logger.DrsmLog.Warnf("stream(Update): pod %s not in local map for chunk %d update - will be corrected when keepalive arrives or during periodic resync", owner, c)
// Wait for proper pod initialization via keepalive. Eventual consistency will be maintained by periodic resync and proper keepalive events.
continue
}
// Defensive: should never happen if addPod() was called, but prevents panic
d.ensurePodChunksInitialized(podD)
podD.podChunks[c] = cp // add chunk to pod
logger.DrsmLog.Infof("stream(Update): pod to chunk map %v", podD.podChunks)
}
Expand Down Expand Up @@ -270,7 +293,7 @@ func (d *Drsm) addChunk(full *FullStream) {
func (d *Drsm) addPod(full *FullStream) *podData {
podI := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp}
pod := &podData{PodId: podI}
pod.podChunks = make(map[int32]*chunk)
d.ensurePodChunksInitialized(pod)
d.podMap[full.PodId] = pod
logger.DrsmLog.Infof("keepalive insert d.podMaps %v", d.podMap)
return pod
Expand Down