Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
LiteFS Go Library
LiteFS Go Library [![Go Reference](https://pkg.go.dev/badge/github.com/superfly/litefs-go.svg)](https://pkg.go.dev/github.com/superfly/litefs-go)
=================

This Go library is for interacting with LiteFS features that cannot be accessed
Expand Down
55 changes: 55 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package litefs

import (
"context"
"net/http"
)

// Client is an HTTP client for communicating with a LiteFS node.
type Client struct {
// Base URL of the LiteFS cluster node.
URL string

// HTTP client to use for requests to cluster node.
HTTP *http.Client
}

// DefaultClient is a client for communicating with the default
// (localhost:20202) LiteFS node.
var DefaultClient = &Client{
URL: "http://localhost:20202",
HTTP: http.DefaultClient,
}
Comment on lines +17 to +22
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works. You could also have NewClient() return a client with defaults set.


// SubscribeEvents subscribes to events from the LiteFS node.
func (c *Client) SubscribeEvents() EventSubscription {
ctx, cancel := context.WithCancel(context.Background())

return &eventSubscription{
c: c,
ctx: ctx,
cancelCtx: cancel,
}
}

// MonitorPrimary monitors the primary status of the LiteFS cluster via the
// LiteFS node's event stream.
func (c *Client) MonitorPrimary() PrimaryMonitor {
pm := &primaryMonitor{
es: c.SubscribeEvents(),
ready: make(chan struct{}),
}

go pm.run()

return pm
}

func (c *Client) get(ctx context.Context, path string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.URL+path, nil)
if err != nil {
return nil, err
}

return c.HTTP.Do(req)
}
72 changes: 72 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package litefs

import (
"encoding/json"
"time"
)

////
// copied from litefs repo.
// modifications are commented.

const (
EventTypeInit = "init"
EventTypeTx = "tx"
EventTypePrimaryChange = "primaryChange"
)

// Event represents a generic event.
type Event struct {
Type string `json:"type"`
DB string `json:"db,omitempty"`
Data any `json:"data,omitempty"`
}

func (e *Event) UnmarshalJSON(data []byte) error {
var v eventJSON
if err := json.Unmarshal(data, &v); err != nil {
return err
}

e.Type = v.Type
e.DB = v.DB

switch v.Type {
case EventTypeInit:
e.Data = &InitEventData{}
case EventTypeTx:
e.Data = &TxEventData{}
case EventTypePrimaryChange:
e.Data = &PrimaryChangeEventData{}
default:
e.Data = nil
}
if err := json.Unmarshal(v.Data, &e.Data); err != nil {
return err
}
return nil
}

type eventJSON struct {
Type string `json:"type"`
DB string `json:"db,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}

type InitEventData struct {
IsPrimary bool `json:"isPrimary"`
Hostname string `json:"hostname,omitempty"`
}

type TxEventData struct {
TXID string `json:"txID"` // ltx.TXID
PostApplyChecksum string `json:"postApplyChecksum"` // ltx.Checksum
PageSize uint32 `json:"pageSize"`
Commit uint32 `json:"commit"`
Timestamp time.Time `json:"timestamp"`
}

type PrimaryChangeEventData struct {
IsPrimary bool `json:"isPrimary"`
Hostname string `json:"hostname,omitempty"`
}
88 changes: 88 additions & 0 deletions event_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package litefs

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
)

var (
ErrClosed = errors.New("closed EventSubscription")
errUnexpectedStatus = errors.New("unexpected status")
)

// EventSubscription monitors a LiteFS node for published events.
type EventSubscription interface {
// Next attempts to read the next event from the LiteFS node. An error is
// returned if the request fails. Calling `Next()` again after an error will
// initiate a new HTTP request. ErrClosed is returned if the EventSubscription
// is closed while this method is blocking.
Next() (*Event, error)

// Close aborts any in-progress requests to the LiteFS node.
Close()
}
Comment on lines +17 to +27
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was better just having the regular struct. I'm not a huge fan of interfaces unless you really need them. I don't see users mocking this out and if they do, it's pretty easy to mock out the actual HTTP endpoint.


type eventSubscription struct {
c *Client
ctx context.Context
d *json.Decoder
cancelCtx func()
closeBody func() error
m sync.Mutex
}

var _ EventSubscription = (*eventSubscription)(nil)

func (es *eventSubscription) Next() (*Event, error) {
e, err := es.next()
if errors.Is(err, context.Canceled) || errors.Is(err, http.ErrBodyReadAfterClose) {
err = ErrClosed
}
return e, err
}

func (es *eventSubscription) next() (*Event, error) {
es.m.Lock()
defer es.m.Unlock()

if es.d == nil {
resp, err := es.c.get(es.ctx, "/events")
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, fmt.Errorf("%w: %d", errUnexpectedStatus, resp.StatusCode)
}

es.closeBody = resp.Body.Close
es.d = json.NewDecoder(resp.Body)
}

var e Event
if err := es.d.Decode(&e); err != nil {
es.closeBody()
es.d = nil

return nil, err
}

return &e, nil
}

func (es *eventSubscription) Close() {
es.cancelCtx()

es.m.Lock()
defer es.m.Unlock()

if es.d != nil {
es.closeBody()
es.d = nil
}
}
172 changes: 172 additions & 0 deletions event_subscription_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package litefs

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
)

func ExampleSubscribeEvents() {
// setup fake events endpoint
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"type":"init","data":{"isPrimary":true,"hostname":"node-1"}}`)
fmt.Fprintln(w, `{"type":"primaryChange","data":{"isPrimary":false,"hostname":"node-2"}}`)
}))
defer server.Close()
client := &Client{URL: server.URL, HTTP: server.Client()}

subscriber := client.SubscribeEvents()
defer subscriber.Close()

for {
event, err := subscriber.Next()
if err != nil {
fmt.Println(err)
return
}

switch data := event.Data.(type) {
case *InitEventData:
fmt.Printf("init: isPrimary=%t hostname=%s\n", data.IsPrimary, data.Hostname)
case *PrimaryChangeEventData:
fmt.Printf("primary change: isPrimary=%t hostname=%s\n", data.IsPrimary, data.Hostname)
case *TxEventData:
fmt.Printf("tx: %s\n", data.TXID)
}
}

// Output: init: isPrimary=true hostname=node-1
// primary change: isPrimary=false hostname=node-2
// EOF
}

func TestEventStream(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
es := mockServerSubscription(t,
initEventJSON, flush, sleep10,
txEventJSON, flush, sleep10,
pChangeNode2EventJSON,
)

assertReadEvent(t, es, initEvent)
assertReadEvent(t, es, txEvent)
assertReadEvent(t, es, pChangeNode2Event)
})

t.Run("error status", func(t *testing.T) {
es := mockServerSubscription(t,
status500,
initEventJSON, flush, sleep10,
)

if _, err := es.Next(); !errors.Is(err, errUnexpectedStatus) {
t.Fatalf("expected errUnexpectedStatus, got %v", err)
}

assertReadEvent(t, es, initEvent)
})

t.Run("premature hangup", func(t *testing.T) {
es := mockServerSubscription(t,
initEventJSON, flush, sleep10,
hangup,
initEventJSON, flush, sleep10,
)

assertReadEvent(t, es, initEvent)

if _, err := es.Next(); err.Error() != "unexpected EOF" {
t.Fatalf("expected EOF, got %v", err)
}

assertReadEvent(t, es, initEvent)
})

t.Run("bad response", func(t *testing.T) {
es := mockServerSubscription(t,
"beep boop", flush, sleep10,
initEventJSON, flush, sleep10,
)

jerr := new(json.SyntaxError)
if _, err := es.Next(); !errors.As(err, &jerr) {
t.Fatalf("expected json.SyntaxError, got %v", err)
}

assertReadEvent(t, es, initEvent)
})
}

const (
status500 = "status500"
hangup = "hangup"
sleep10 = "sleep10"
flush = "flush"
initEventJSON = `{"type":"init","data":{"isPrimary":true,"hostname":"node-1"}}`
txEventJSON = `{"type":"tx","db":"db","data":{"txID":"0000000000000027","postApplyChecksum":"83b05248774ce767","pageSize":4096,"commit":2,"timestamp":"0001-01-01T00:00:00Z"}}`
pChangeNode2EventJSON = `{"type":"primaryChange","data":{"isPrimary":false,"hostname":"node-2"}}`
pChangeNode1EventJSON = `{"type":"primaryChange","data":{"isPrimary":true,"hostname":"node-1"}}`
)

var (
initEvent = &Event{Type: EventTypeInit, Data: &InitEventData{IsPrimary: true, Hostname: "node-1"}}
txEvent = &Event{Type: EventTypeTx, DB: "db", Data: &TxEventData{TXID: "0000000000000027", PostApplyChecksum: "83b05248774ce767", PageSize: 4096, Commit: 2}}
pChangeNode2Event = &Event{Type: EventTypePrimaryChange, Data: &PrimaryChangeEventData{IsPrimary: false, Hostname: "node-2"}}
)

func mockServerSubscription(t *testing.T, resps ...string) EventSubscription {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for len(resps) != 0 {
if r.Context().Err() != nil {
return
}

resp := resps[0]
resps = resps[1:]

switch resp {
case status500:
w.WriteHeader(http.StatusInternalServerError)
return
case hangup:
conn, _, _ := w.(http.Hijacker).Hijack()
conn.Close()
return
case sleep10:
time.Sleep(10 * time.Millisecond)
case flush:
w.(http.Flusher).Flush()
default:
fmt.Fprintln(w, resp)
}
}
}))
t.Cleanup(s.Close)

client := &Client{
URL: s.URL,
HTTP: s.Client(),
}

es := client.SubscribeEvents()
t.Cleanup(es.Close)

return es
}

func assertReadEvent(t *testing.T, es EventSubscription, expected *Event) {
t.Helper()

event, err := es.Next()
switch {
case err != nil:
t.Fatalf("unexpected error: %s", err)
case !reflect.DeepEqual(event, expected):
t.Fatalf("wrong event\nexpected: %#v\nactual:%#v", expected, event)
}
}
Loading