Skip to content

Commit f56f7f4

Browse files
siddharth16396tanjinx
authored andcommitted
[Bugfix] Broken Heartbeat system in Row Streamer (vitessio#18390)
Signed-off-by: siddharth16396 <[email protected]> Signed-off-by: Tanjin Xu <[email protected]>
1 parent e4761bb commit f56f7f4

File tree

2 files changed

+86
-5
lines changed

2 files changed

+86
-5
lines changed

go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -376,11 +376,13 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
376376
heartbeatTicker := time.NewTicker(rowStreamertHeartbeatInterval)
377377
defer heartbeatTicker.Stop()
378378
go func() {
379-
select {
380-
case <-rs.ctx.Done():
381-
return
382-
case <-heartbeatTicker.C:
383-
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
379+
for {
380+
select {
381+
case <-rs.ctx.Done():
382+
return
383+
case <-heartbeatTicker.C:
384+
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
385+
}
384386
}
385387
}()
386388

go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,85 @@ func TestStreamRowsCancel(t *testing.T) {
449449
}
450450
}
451451

452+
func TestStreamRowsHeartbeat(t *testing.T) {
453+
if testing.Short() {
454+
t.Skip()
455+
}
456+
457+
// Save original heartbeat interval and restore it after test
458+
originalInterval := rowStreamertHeartbeatInterval
459+
defer func() {
460+
rowStreamertHeartbeatInterval = originalInterval
461+
}()
462+
463+
// Set a very short heartbeat interval for testing (100ms)
464+
rowStreamertHeartbeatInterval = 10 * time.Millisecond
465+
466+
// Save original packet size values and restore after test
467+
originalUseDynamicPacketSize := useDynamicPacketSize
468+
originalDefaultPacketSize := defaultPacketSize
469+
defer func() {
470+
useDynamicPacketSize = originalUseDynamicPacketSize
471+
defaultPacketSize = originalDefaultPacketSize
472+
}()
473+
474+
// Set desired packet size configuration for testing
475+
useDynamicPacketSize = false
476+
defaultPacketSize = 10
477+
478+
execStatements(t, []string{
479+
"create table t1(id int, val varchar(128), primary key(id))",
480+
"insert into t1 values (1, 'test1')",
481+
"insert into t1 values (2, 'test2')",
482+
"insert into t1 values (3, 'test3')",
483+
"insert into t1 values (4, 'test4')",
484+
"insert into t1 values (5, 'test5')",
485+
})
486+
487+
defer execStatements(t, []string{
488+
"drop table t1",
489+
})
490+
491+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
492+
defer cancel()
493+
494+
heartbeatCount := 0
495+
dataReceived := false
496+
497+
err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
498+
if rows.Heartbeat {
499+
heartbeatCount++
500+
// After receiving at least 3 heartbeats, we can be confident the fix is working
501+
if heartbeatCount >= 3 {
502+
cancel()
503+
return nil
504+
}
505+
} else if len(rows.Rows) > 0 {
506+
dataReceived = true
507+
}
508+
// Add a small delay to allow heartbeats to be sent
509+
time.Sleep(50 * time.Millisecond)
510+
return nil
511+
})
512+
513+
// We expect context canceled error since we cancel after receiving heartbeats
514+
if err != nil && err.Error() != "stream ended: context canceled" {
515+
t.Errorf("unexpected error: %v", err)
516+
}
517+
518+
// Verify we received data
519+
if !dataReceived {
520+
t.Error("expected to receive data rows")
521+
}
522+
523+
// This is the critical test: we should receive multiple heartbeats
524+
// Without the fix (missing for loop), we would only get 1 heartbeat
525+
// With the fix, we should get at least 3 heartbeats
526+
if heartbeatCount < 3 {
527+
t.Errorf("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously", heartbeatCount)
528+
}
529+
}
530+
452531
func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery string, wantStream []string) {
453532
t.Helper()
454533

0 commit comments

Comments
 (0)