Skip to content

Commit c37c30c

Browse files
authored
Fix ChunkStream buffer-reuse race causing corrupted stores (folbricht#314)
1 parent e4c5925 commit c37c30c

3 files changed

Lines changed: 57 additions & 1 deletion

File tree

chunker.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ func (c *Chunker) fillBuffer() (n int, err error) {
202202
}
203203

204204
// Next returns the starting position as well as the chunk data. Returns
205-
// an empty byte slice when complete
205+
// an empty byte slice when complete. The returned byte slice is only valid
206+
// until the next call to Next; callers that pass the slice to other
207+
// goroutines must copy it first.
206208
func (c *Chunker) Next() (uint64, []byte, error) {
207209
if len(c.buf) < int(c.max) {
208210
n, err := c.fillBuffer()

index.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"crypto"
77
"fmt"
88
"math"
9+
"slices"
910
"sync"
1011

1112
"golang.org/x/sync/errgroup"
@@ -192,6 +193,12 @@ loop:
192193
break
193194
}
194195

196+
// Copy the buffer before sending it to workers. The chunker
197+
// reuses its internal backing buffer across fillBuffer() calls,
198+
// so the slice returned by Next() may be overwritten by the
199+
// next call.
200+
b = slices.Clone(b)
201+
195202
// Send it off for compression and storage
196203
select {
197204
case <-ctx.Done():

index_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package desync
33
import (
44
"bytes"
55
"context"
6+
"encoding/binary"
67
"os"
78
"reflect"
89
"testing"
@@ -144,6 +145,52 @@ func TestIndexChunking(t *testing.T) {
144145
}
145146
}
146147

148+
// TestChunkStreamIntegrity verifies that chunks stored by ChunkStream match
149+
// their IDs. This catches buffer-reuse races where the chunker's internal
150+
// backing buffer is overwritten by fillBuffer() before workers finish
151+
// processing the slice sent through the channel. The input must be large
152+
// enough to trigger multiple fillBuffer() calls (> 10*ChunkSizeMaxDefault).
153+
func TestChunkStreamIntegrity(t *testing.T) {
154+
// 8MB of pseudo-random data ensures multiple fillBuffer() cycles.
155+
// The chunker's internal buffer is 10*max = 2.5MB, so this triggers
156+
// at least 3 refills and produces ~125 chunks at 64KB average.
157+
data := make([]byte, 8*1024*1024)
158+
// Fill the buffer 8 bytes at a time with a deterministic sequence (the
// word index multiplied by a fixed odd 64-bit constant), so every run of
// the test chunks exactly the same input.
for i := range len(data) / 8 {
159+
binary.LittleEndian.PutUint64(data[i*8:], uint64(i)*0x9E3779B97F4A7C15)
160+
}
161+
162+
c, err := NewChunker(bytes.NewReader(data), ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault)
163+
if err != nil {
164+
t.Fatal(err)
165+
}
166+
167+
dir := t.TempDir()
168+
s, err := NewLocalStore(dir, StoreOptions{})
169+
if err != nil {
170+
t.Fatal(err)
171+
}
172+
173+
// NOTE(review): the trailing 10 is presumably the number of concurrent
// workers; more than one is needed for the race to be observable —
// confirm against ChunkStream's signature.
index, err := ChunkStream(context.Background(), c, s, 10)
174+
if err != nil {
175+
t.Fatal(err)
176+
}
177+
178+
// Verify every stored chunk: read it back, decompress, and check
179+
// that its content hashes to the expected ChunkID. GetChunk with
180+
// SkipVerify=false (the default) returns ChunkInvalid on mismatch.
181+
// Each failing chunk is reported individually with Errorf so the full
// extent of the corruption is visible; a single Fatalf with the summary
// count follows the loop.
var corrupted int
182+
for i, chunk := range index.Chunks {
183+
_, err := s.GetChunk(chunk.ID)
184+
if err != nil {
185+
corrupted++
186+
t.Errorf("chunk %d (%s): %v", i, chunk.ID, err)
187+
}
188+
}
189+
if corrupted > 0 {
190+
t.Fatalf("%d of %d chunks are corrupted", corrupted, len(index.Chunks))
191+
}
192+
}
193+
147194
// Global var to store benchmark output
148195
var idx Index
149196

0 commit comments

Comments
 (0)