Skip to content

Commit bdbbc1f

Browse files
committed
feat: support sse transfer mode #117
1 parent 5cd7e6a commit bdbbc1f

File tree

8 files changed

+186
-110
lines changed

8 files changed

+186
-110
lines changed

_example/default/go.mod

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,4 @@ go 1.19
44

55
require github.com/arl/statsviz v0.6.0
66

7-
require github.com/gorilla/websocket v1.5.0 // indirect
8-
97
replace github.com/arl/statsviz => ../../

_example/default/go.sum

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
2-
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
1+
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
32
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
4-
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
3+
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
4+
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
55
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ module github.com/arl/statsviz
33
go 1.20
44

55
require (
6-
github.com/gorilla/websocket v1.5.0
76
github.com/rogpeppe/go-internal v1.11.0
87
)
98

109
require (
11-
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
10+
golang.org/x/net v0.17.0 // indirect
11+
golang.org/x/sys v0.13.0 // indirect
1212
golang.org/x/tools v0.1.12 // indirect
1313
)

go.sum

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
2-
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
1+
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
2+
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
33
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
44
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
5-
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
6-
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
5+
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
6+
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
7+
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
8+
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
79
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
810
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=

internal/static/js/app.js

Lines changed: 74 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,6 @@ import * as plot from "./plot.js";
33
import * as theme from "./theme.js";
44
import PlotsDef from './plotsdef.js';
55

6-
const buildWebsocketURI = () => {
7-
var loc = window.location,
8-
ws_prot = "ws:";
9-
if (loc.protocol === "https:") {
10-
ws_prot = "wss:";
11-
}
12-
return ws_prot + "//" + loc.host + loc.pathname + "ws"
13-
}
14-
156
const dataRetentionSeconds = 600;
167
var timeout = 250;
178

@@ -26,38 +17,45 @@ let paused = false;
2617
let show_gc = true;
2718
let timerange = 60;
2819

29-
/* WebSocket connection handling */
30-
const connect = () => {
31-
const uri = buildWebsocketURI();
32-
let ws = new WebSocket(uri);
33-
console.info(`Attempting websocket connection to server at ${uri}`);
34-
35-
ws.onopen = () => {
20+
const dataProcessor = {
21+
initDone: false,
22+
close: () => {
23+
},
24+
connected: false,
25+
retrying: false,
26+
onopen: () => {
27+
dataProcessor.initDone = false;
28+
dataProcessor.connected = true;
3629
console.info("Successfully connected");
3730
timeout = 250; // reset connection timeout for next time
38-
};
39-
40-
ws.onclose = event => {
41-
console.error(`Closed websocket connection: code ${event.code}`);
42-
setTimeout(connect, clamp(timeout += timeout, 250, 5000));
43-
};
44-
45-
ws.onerror = err => {
46-
console.error(`Websocket error, closing connection.`);
47-
ws.close();
48-
};
49-
50-
let initDone = false;
51-
ws.onmessage = event => {
31+
},
32+
onclose: event => {
33+
dataProcessor.connected = false;
34+
console.error(`Closed connection: code ${event.code || event}`);
35+
if (dataProcessor.retrying) {
36+
return
37+
}
38+
dataProcessor.retrying = true
39+
setTimeout(() => {
40+
connect()
41+
dataProcessor.retrying = false
42+
}, clamp(timeout += timeout, 250, 5000));
43+
},
44+
onerror: err => {
45+
console.error(`error, closing connection.`, err);
46+
dataProcessor.close();
47+
},
48+
onmessage: event => {
5249
let data = JSON.parse(event.data)
53-
54-
if (!initDone) {
50+
if (!dataProcessor.initDone) {
5551
configurePlots(PlotsDef);
5652
stats.init(PlotsDef, dataRetentionSeconds);
5753

5854
attachPlots();
5955

60-
$('#play_pause').change(() => { paused = !paused; });
56+
$('#play_pause').change(() => {
57+
paused = !paused;
58+
});
6159
$('#show_gc').change(() => {
6260
show_gc = !show_gc;
6361
updatePlots();
@@ -67,16 +65,27 @@ const connect = () => {
6765
timerange = val;
6866
updatePlots();
6967
});
70-
initDone = true;
71-
return;
68+
dataProcessor.initDone = true;
7269
}
73-
70+
dataProcessor.onData(data);
71+
},
72+
onData: data => {
7473
stats.pushData(data);
75-
if (paused) {
74+
if (paused || !dataProcessor.connected) {
7675
return
7776
}
78-
updatePlots(PlotsDef.events);
77+
updatePlots()
78+
}
79+
}
80+
/* WebSocket connection handling */
81+
const connect = () => {
82+
const url = window.location.pathname + "ws";
83+
const eventSource = new EventSource(url);
84+
console.info(`Attempting sse connection to server at ${url}`);
85+
for (let event in dataProcessor) {
86+
eventSource[event] = dataProcessor[event];
7987
}
88+
dataProcessor.close = eventSource.close
8089
}
8190

8291
connect();
@@ -102,7 +111,31 @@ const attachPlots = () => {
102111
}
103112
}
104113

105-
const updatePlots = () => {
114+
function throttle(func, delay) {
115+
let initial = true;
116+
let last = null;
117+
let timer = null;
118+
return function () {
119+
const context = this;
120+
const args = arguments;
121+
if (initial) {
122+
func.apply(context, args);
123+
initial = false;
124+
last = Date.now();
125+
} else {
126+
clearTimeout(timer);
127+
timer = setTimeout(function () {
128+
const now = Date.now();
129+
if (now - last >= delay) {
130+
func.apply(context, args);
131+
last = now;
132+
}
133+
}, delay - (Date.now() - last));
134+
}
135+
}
136+
}
137+
138+
const updatePlots = throttle(() => {
106139
// Create shapes.
107140
let shapes = new Map();
108141

@@ -123,7 +156,7 @@ const updatePlots = () => {
123156
plot.update(xrange, data, shapes);
124157
}
125158
});
126-
}
159+
}, PlotsDef.sendFrequency||1000)
127160

128161
const updatePlotsLayout = () => {
129162
plots.forEach(plot => {
@@ -139,7 +172,7 @@ theme.updateThemeMode();
139172
$('#color_theme_sw').change(() => {
140173
const themeMode = theme.getThemeMode();
141174
const newTheme = themeMode === "dark" && "light" || "dark";
142-
localStorage.setItem("theme-mode", newTheme);
175+
localStorage.setItem("theme-mode", newTheme);
143176
theme.updateThemeMode();
144177
updatePlotsLayout();
145178
});

internal/static/js/stats.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,4 @@ const slice = (n) => {
9292
return sliced;
9393
}
9494

95-
export { init, pushData, slice };
95+
export { init, pushData, slice };

statsviz.go

Lines changed: 68 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,14 @@ import (
4343
"bytes"
4444
"encoding/json"
4545
"fmt"
46+
"io"
4647
"net/http"
4748
"os"
4849
"path/filepath"
4950
"strconv"
5051
"strings"
5152
"time"
5253

53-
"github.com/gorilla/websocket"
54-
5554
"github.com/arl/statsviz/internal/plot"
5655
"github.com/arl/statsviz/internal/static"
5756
)
@@ -82,7 +81,7 @@ func Register(mux *http.ServeMux, opts ...Option) error {
8281
// updates metrics data and provides two essential HTTP handlers:
8382
// - the Index handler serves Statsviz user interface, allowing you to
8483
// visualize runtime metrics on your browser.
85-
// - The Ws handler establishes a WebSocket connection allowing the connected
84+
// - The Ws handler establishes a data connection allowing the connected
8685
// browser to receive metrics updates from the server.
8786
//
8887
// The zero value is not a valid Server, use NewServer to create a valid one.
@@ -161,20 +160,34 @@ func (s *Server) Register(mux *http.ServeMux) {
161160
// intercept is a middleware that intercepts requests for plotsdef.js, which is
162161
// generated dynamically based on the plots configuration. Other requests are
163162
// forwarded as-is.
164-
func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc {
165-
buf := bytes.Buffer{}
166-
buf.WriteString("export default ")
167-
enc := json.NewEncoder(&buf)
168-
enc.SetIndent("", " ")
169-
if err := enc.Encode(cfg); err != nil {
170-
panic("unexpected failure to encode plot definitions: " + err.Error())
163+
func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc {
164+
var plotsdefjs []byte
165+
//Using parentheses helps gc
166+
{
167+
buf := bytes.Buffer{}
168+
buf.WriteString("export default ")
169+
enc := json.NewEncoder(&buf)
170+
enc.SetIndent("", " ")
171+
var encodeValue any = cfg
172+
if len(extraConfig) > 0 {
173+
encodeValue1 := map[string]any{
174+
"series": cfg.Series,
175+
"events": cfg.Events,
176+
}
177+
for k, v := range extraConfig {
178+
encodeValue1[k] = v
179+
}
180+
encodeValue = encodeValue1
181+
}
182+
if err := enc.Encode(encodeValue); err != nil {
183+
panic("unexpected failure to encode plot definitions: " + err.Error())
184+
}
185+
buf.WriteString(";")
186+
plotsdefjs = buf.Bytes()
171187
}
172-
buf.WriteString(";")
173-
plotsdefjs := buf.Bytes()
174-
175188
return func(w http.ResponseWriter, r *http.Request) {
176189
if r.URL.Path == "js/plotsdef.js" {
177-
w.Header().Add("Content-Length", strconv.Itoa(buf.Len()))
190+
w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs)))
178191
w.Header().Add("Content-Type", "text/javascript; charset=utf-8")
179192
w.Write(plotsdefjs)
180193
return
@@ -228,7 +241,9 @@ func assetsFS() http.FileSystem {
228241
func (s *Server) Index() http.HandlerFunc {
229242
prefix := strings.TrimSuffix(s.root, "/") + "/"
230243
assets := http.FileServer(assetsFS())
231-
handler := intercept(assets, s.plots.Config())
244+
handler := intercept(assets, s.plots.Config(), map[string]any{
245+
"sendFrequency": s.intv.Milliseconds(),
246+
})
232247

233248
return http.StripPrefix(prefix, handler).ServeHTTP
234249
}
@@ -238,46 +253,51 @@ func (s *Server) Index() http.HandlerFunc {
238253
// connection to the WebSocket protocol.
239254
func (s *Server) Ws() http.HandlerFunc {
240255
return func(w http.ResponseWriter, r *http.Request) {
241-
var upgrader = websocket.Upgrader{
242-
ReadBufferSize: 1024,
243-
WriteBufferSize: 1024,
244-
}
245-
246-
ws, err := upgrader.Upgrade(w, r, nil)
247-
if err != nil {
256+
if strings.Contains(r.Header.Get("Accept"), "/event-stream") {
257+
// If the connection is initiated by an already open web UI
258+
// (started by a previous process, for example), then plotsdef.js won't be
259+
// requested. Call plots.Config() manually to ensure that s.plots internals
260+
// are correctly initialized.
261+
s.plots.Config()
262+
263+
w.Header().Set("Content-Type", "text/event-stream")
264+
w.Header().Set("Cache-Control", "no-cache")
265+
w.Header().Set("Connection", "keep-alive")
266+
s.startTransfer(w)
248267
return
249268
}
250-
defer ws.Close()
251-
252-
// Ignore this error. This happens when the other end connection closes,
253-
// for example. We can't handle it in any meaningful way anyways.
254-
_ = s.sendStats(ws, s.intv)
269+
w.WriteHeader(http.StatusBadRequest)
270+
w.Write([]byte("This endpoint only supports text/event-stream requests"))
255271
}
256272
}
257273

258-
// sendStats sends runtime statistics over the WebSocket connection.
259-
func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error {
260-
tick := time.NewTicker(frequency)
261-
defer tick.Stop()
262-
263-
// If the WebSocket connection is initiated by an already open web UI
264-
// (started by a previous process, for example), then plotsdef.js won't be
265-
// requested. Call plots.Config() manually to ensure that s.plots internals
266-
// are correctly initialized.
267-
s.plots.Config()
268-
269-
for range tick.C {
270-
w, err := conn.NextWriter(websocket.TextMessage)
271-
if err != nil {
274+
func (s *Server) startTransfer(w io.Writer) {
275+
buffer := bytes.Buffer{}
276+
buffer.WriteString("data: ")
277+
callData := func() error {
278+
if err := s.plots.WriteValues(&buffer); err == nil {
279+
_, err = w.Write(buffer.Bytes())
280+
if err != nil {
281+
return err
282+
}
283+
if f, ok := w.(http.Flusher); ok {
284+
f.Flush()
285+
}
286+
} else {
272287
return err
273288
}
274-
if err := s.plots.WriteValues(w); err != nil {
275-
return err
276-
}
277-
if err := w.Close(); err != nil {
278-
return err
289+
return nil
290+
}
291+
//the first time it was sent immediately
292+
err := callData()
293+
if err != nil {
294+
return
295+
}
296+
tick := time.NewTicker(s.intv)
297+
defer tick.Stop()
298+
for range tick.C {
299+
if callData() != nil {
300+
return
279301
}
280302
}
281-
282-
panic("unreachable")
283303
}

0 commit comments

Comments
 (0)