Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ cd media
go test -race
cd ..

cd trickle
go test -race -timeout 10s
cd ..

./test_args.sh

printf "\n\nAll Tests Passed\n\n"
5 changes: 4 additions & 1 deletion trickle/local_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
}
c.mu.Lock()
defer c.mu.Unlock()
segment, latestSeq, exists := stream.getForRead(c.seq)
segment, latestSeq, exists, closed := stream.getForRead(c.seq)

Check warning on line 42 in trickle/local_subscriber.go

View check run for this annotation

Codecov / codecov/patch

trickle/local_subscriber.go#L42

Added line #L42 was not covered by tests
if !exists {
if closed {
return nil, EOS
}

Check warning on line 46 in trickle/local_subscriber.go

View check run for this annotation

Codecov / codecov/patch

trickle/local_subscriber.go#L44-L46

Added lines #L44 - L46 were not covered by tests
return nil, errors.New("seq not found")
}
c.seq++
Expand Down
13 changes: 9 additions & 4 deletions trickle/trickle_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"sync"
)

var StreamNotFoundErr = errors.New("stream not found")

Check warning on line 13 in trickle/trickle_publisher.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

error var StreamNotFoundErr should have name of the form ErrFoo

// TricklePublisher represents a trickle streaming client
type TricklePublisher struct {
Expand Down Expand Up @@ -75,23 +75,30 @@
return nil, err
}
req.Header.Set("Content-Type", c.contentType)
httpclient := c.client

// Start the POST request in a background goroutine
go func() {
resp, err := c.client.Do(req)
resp, err := httpclient.Do(req)
if err != nil {
slog.Error("Failed to complete POST for segment", "url", url, "err", err)
errCh <- err
return
}
isEOS := resp.Header.Get("Lp-Trickle-Closed") != ""
body, err := io.ReadAll(resp.Body)
if err != nil {
if err != nil && !isEOS {
slog.Error("Error reading body", "url", url, "err", err)
errCh <- err
return
}
defer resp.Body.Close()

if isEOS {
errCh <- EOS
return
}

if resp.StatusCode != http.StatusOK {
slog.Error("Failed POST segment", "url", url, "status_code", resp.StatusCode, "msg", string(body))
if resp.StatusCode == http.StatusNotFound {
Expand Down Expand Up @@ -145,7 +152,6 @@
if pp == nil {
p, err := c.preconnect()
if err != nil {
c.writeLock.Unlock()
return nil, err
}
pp = p
Expand All @@ -154,7 +160,6 @@
// Set up the next connection
nextPost, err := c.preconnect()
if err != nil {
c.writeLock.Unlock()
return nil, err
}
c.pendingPost = nextPost
Expand Down
130 changes: 96 additions & 34 deletions trickle/trickle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
nextWrite int
writeTime time.Time
closed bool
canReset bool
}

type Segment struct {
Expand All @@ -64,6 +65,9 @@
cond *sync.Cond
buffer *bytes.Buffer
closed bool

// to shut down any pending publishers
closeCh chan bool
}

type SegmentSubscriber struct {
Expand Down Expand Up @@ -159,6 +163,7 @@
name: streamName,
mimeType: mimeType,
writeTime: time.Now(),
canReset: !isLocal,
}
sm.streams[streamName] = stream
slog.Info("Creating stream", "stream", streamName)
Expand Down Expand Up @@ -294,6 +299,7 @@
func (sm *Server) handlePost(w http.ResponseWriter, r *http.Request) {
stream := sm.getOrCreateStream(r.PathValue("streamName"), r.Header.Get("Content-Type"), false)
if stream == nil {
w.Header().Set("Connection", "close") // Wakes up gotrickle preconnects
http.Error(w, "Stream not found", http.StatusNotFound)
return
}
Expand All @@ -306,22 +312,25 @@
}

type timeoutReader struct {
body io.Reader
body io.ReadCloser
timeout time.Duration
firstByteRead bool
readStarted bool
doneCh chan int
errCh chan error
ch chan struct {
n int
err error
}
closeCh chan bool
skipClose bool
}

func (tr *timeoutReader) startRead(p []byte) {
go func() {
n, err := tr.body.Read(p)
if err != nil {
tr.errCh <- err
return
}
tr.doneCh <- n
tr.ch <- struct {
n int
err error
}{n, err}
}()
}

Expand All @@ -333,35 +342,38 @@

// we only want to start the reader once
if !tr.readStarted {
tr.errCh = make(chan error, 1)
tr.doneCh = make(chan int, 1)
tr.ch = make(chan struct {
n int
err error
}, 1)
tr.readStarted = true
go tr.startRead(p)
}

select {
case err := <-tr.errCh:
return 0, err
case n := <-tr.doneCh:
if n > 0 {
case res := <-tr.ch:
if res.n > 0 {
tr.firstByteRead = true
}
return n, nil
return res.n, res.err
case <-tr.closeCh:
// Signals preconnected publishers that are waiting
return 0, io.EOF
case <-time.After(tr.timeout):
return 0, FirstByteTimeout
}
}

func (tr *timeoutReader) Close() error {
if tr.skipClose {
return nil
}
return tr.body.Close()
}

// Handle post requests for a given index
func (s *Stream) handlePost(w http.ResponseWriter, r *http.Request, idx int) {
segment, exists := s.getForWrite(idx)
if exists {
slog.Warn("Overwriting existing entry", "idx", idx)
// Overwrite anything that exists now. TODO figure out a safer behavior?
// TODO fix concurrent writes to the same segment; would be very bad
segment.buffer.Reset()
segment.closed = false
}
segment, _ := s.getForWrite(idx)

// Wrap the request body with the custom timeoutReader so we can send
// provisional headers (keepalives) until receiving the first byte
Expand All @@ -370,8 +382,9 @@
// This can't be too short for now but ideally it'd be like 1 second
// https://github.com/golang/go/issues/65035
timeout: 10 * time.Second,
closeCh: segment.closeCh,
}
defer r.Body.Close()
defer reader.Close()

buf := make([]byte, 1024*32) // 32kb to begin with
totalRead := 0
Expand All @@ -398,6 +411,21 @@
w.WriteHeader(http.StatusContinue)
continue
} else if err == io.EOF {
// Usually this comes from a preconnect where the underlying channel is closed
if totalRead <= 0 {
s.mutex.Lock()
isClosed := s.closed
s.mutex.Unlock()
if isClosed {
w.Header().Set("Lp-Trickle-Closed", "terminated")
}
w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusOK)
// we have read nothing; don't attempt to read anything more
// body.Close() will read until EOF and we don't want that
// without this, body.Close() may hang under some scenarios
reader.skipClose = true
}
break
}
slog.Info("Error reading POST body", "stream", s.name, "idx", idx, "bytes written", totalRead, "err", err)
Expand All @@ -420,6 +448,13 @@
segmentPos := idx % maxSegmentsPerStream
if segment := s.segments[segmentPos]; segment != nil {
if idx == segment.idx {
if s.canReset {
reset := segment.reset()
if reset > 0 {
slog.Warn("Reset an existing segment", "stream", s.name, "idx", idx, "bytes", reset)
}
return segment, reset > 0
}
return segment, !segment.isFresh()
}
// something exists here but its not the expected segment
Expand All @@ -431,7 +466,7 @@
return segment, false
}

func (s *Stream) getForRead(idx int) (*Segment, int, bool) {
func (s *Stream) getForRead(idx int) (*Segment, int, bool, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
exists := func(seg *Segment, i int) bool {
Expand All @@ -450,14 +485,14 @@
}
segmentPos := idx % maxSegmentsPerStream
segment := s.segments[segmentPos]
if !exists(segment, idx) && (idx == s.nextWrite || (s.nextWrite == 0 && idx == 1)) {
if !exists(segment, idx) && (idx == s.nextWrite || (s.nextWrite == 0 && idx == 1)) && !s.closed {
// read request is just a little bit ahead of write head
segment = newSegment(idx)
s.segments[segmentPos] = segment
slog.Info("GET precreating", "stream", s.name, "idx", idx, "next", s.nextWrite)
}
slog.Info("GET segment", "stream", s.name, "idx", idx, "next", s.nextWrite, "exists?", exists(segment, idx))
return segment, s.nextWrite, exists(segment, idx)
return segment, s.nextWrite, exists(segment, idx), s.closed
}

func (sm *Server) handleGet(w http.ResponseWriter, r *http.Request) {
Expand All @@ -475,12 +510,16 @@
}

func (s *Stream) handleGet(w http.ResponseWriter, r *http.Request, idx int) {
segment, latestSeq, exists := s.getForRead(idx)
segment, latestSeq, exists, closed := s.getForRead(idx)
if !exists {
// Special status to indicate "stream exists but segment doesn't"
w.Header().Set("Lp-Trickle-Latest", strconv.Itoa(latestSeq))
w.Header().Set("Lp-Trickle-Seq", strconv.Itoa(idx))
w.WriteHeader(470)
if closed {
w.Header().Set("Lp-Trickle-Closed", "terminated")
} else {
// Special status to indicate "stream exists but segment doesn't"
w.WriteHeader(470)
}

Check warning on line 522 in trickle/trickle_server.go

View check run for this annotation

Codecov / codecov/patch

trickle/trickle_server.go#L517-L522

Added lines #L517 - L522 were not covered by tests
w.Write([]byte("Entry not found"))
return
}
Expand Down Expand Up @@ -556,10 +595,11 @@
func newSegment(idx int) *Segment {
mu := &sync.Mutex{}
return &Segment{
idx: idx,
buffer: new(bytes.Buffer),
cond: sync.NewCond(mu),
mutex: mu,
idx: idx,
buffer: new(bytes.Buffer),
cond: sync.NewCond(mu),
mutex: mu,
closeCh: make(chan bool),
}
}

Expand Down Expand Up @@ -605,9 +645,31 @@
defer s.mutex.Unlock()
if !s.closed {
s.closed = true
close(s.closeCh)
s.cond.Broadcast()
}
}

func (s *Segment) reset() int {
s.mutex.Lock()
defer s.mutex.Unlock()
blen := s.buffer.Len()
if blen <= 0 {
return blen
}

Check warning on line 659 in trickle/trickle_server.go

View check run for this annotation

Codecov / codecov/patch

trickle/trickle_server.go#L658-L659

Added lines #L658 - L659 were not covered by tests
// Reset the segment *without* kicking off any readers
// TODO this is not a great approach but it is what we do for now
// TODO concurrent writes to the same segment would be pretty bad
if !s.closed {
close(s.closeCh)
}
// Kick off any writers
s.closeCh = make(chan bool, 1)
s.closed = false
s.buffer.Reset()
return blen
}

func (s *Segment) isFresh() bool {
// fresh segments have not been written to yet
s.mutex.Lock()
Expand Down
Loading
Loading