Skip to content

Commit 89e614f

Browse files
[release-22.0] [Bugfix] Broken Heartbeat system in Row Streamer (#18390) (#18398)
Signed-off-by: siddharth16396 <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
1 parent 416646c commit 89e614f

File tree

2 files changed

+80
-5
lines changed

2 files changed

+80
-5
lines changed

go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -392,11 +392,13 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
392392
heartbeatTicker := time.NewTicker(rowStreamertHeartbeatInterval)
393393
defer heartbeatTicker.Stop()
394394
go func() {
395-
select {
396-
case <-rs.ctx.Done():
397-
return
398-
case <-heartbeatTicker.C:
399-
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
395+
for {
396+
select {
397+
case <-rs.ctx.Done():
398+
return
399+
case <-heartbeatTicker.C:
400+
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
401+
}
400402
}
401403
}()
402404

go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"regexp"
2323
"strconv"
2424
"testing"
25+
"time"
2526

2627
"github.com/stretchr/testify/require"
2728

@@ -542,6 +543,78 @@ func TestStreamRowsCancel(t *testing.T) {
542543
}
543544
}
544545

546+
func TestStreamRowsHeartbeat(t *testing.T) {
547+
if testing.Short() {
548+
t.Skip()
549+
}
550+
551+
// Save original heartbeat interval and restore it after test
552+
originalInterval := rowStreamertHeartbeatInterval
553+
defer func() {
554+
rowStreamertHeartbeatInterval = originalInterval
555+
}()
556+
557+
// Set a very short heartbeat interval for testing (100ms)
558+
rowStreamertHeartbeatInterval = 10 * time.Millisecond
559+
560+
execStatements(t, []string{
561+
"create table t1(id int, val varchar(128), primary key(id))",
562+
"insert into t1 values (1, 'test1')",
563+
"insert into t1 values (2, 'test2')",
564+
"insert into t1 values (3, 'test3')",
565+
"insert into t1 values (4, 'test4')",
566+
"insert into t1 values (5, 'test5')",
567+
})
568+
569+
defer execStatements(t, []string{
570+
"drop table t1",
571+
})
572+
573+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
574+
defer cancel()
575+
576+
heartbeatCount := 0
577+
dataReceived := false
578+
579+
var options binlogdatapb.VStreamOptions
580+
options.ConfigOverrides = make(map[string]string)
581+
options.ConfigOverrides["vstream_dynamic_packet_size"] = "false"
582+
options.ConfigOverrides["vstream_packet_size"] = "10"
583+
584+
err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
585+
if rows.Heartbeat {
586+
heartbeatCount++
587+
// After receiving at least 3 heartbeats, we can be confident the fix is working
588+
if heartbeatCount >= 3 {
589+
cancel()
590+
return nil
591+
}
592+
} else if len(rows.Rows) > 0 {
593+
dataReceived = true
594+
}
595+
// Add a small delay to allow heartbeats to be sent
596+
time.Sleep(50 * time.Millisecond)
597+
return nil
598+
}, &options)
599+
600+
// We expect context canceled error since we cancel after receiving heartbeats
601+
if err != nil && err.Error() != "stream ended: context canceled" {
602+
t.Errorf("unexpected error: %v", err)
603+
}
604+
605+
// Verify we received data
606+
if !dataReceived {
607+
t.Error("expected to receive data rows")
608+
}
609+
610+
// This is the critical test: we should receive multiple heartbeats
611+
// Without the fix (missing for loop), we would only get 1 heartbeat
612+
// With the fix, we should get at least 3 heartbeats
613+
if heartbeatCount < 3 {
614+
t.Errorf("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously", heartbeatCount)
615+
}
616+
}
617+
545618
func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery string, wantStream []string) {
546619
t.Helper()
547620

0 commit comments

Comments
 (0)