Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@ require (
github.com/go-redis/redis/v7 v7.4.0
github.com/hashicorp/go-multierror v1.1.0
github.com/influxdata/influxdb1-client v0.0.0-20200515024757-02f0bf5dbca3
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.7.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/stretchr/testify v1.5.1
go.uber.org/zap v1.15.0
golang.org/x/net v0.0.0-20191112182307-2180aed22343 // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f // indirect
google.golang.org/protobuf v1.25.0 // indirect
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
)
52 changes: 2 additions & 50 deletions go.sum

Large diffs are not rendered by default.

31 changes: 24 additions & 7 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,47 @@ const (
)

type Client struct {
runenv *runtime.RunEnv
client sync.Client
runenv *runtime.RunEnv
syncClient sync.Client
}

// NewClient returns a new network client. Use this client to request network
// changes, such as setting latencies, jitter, packet loss, connectedness, etc.
func NewClient(client sync.Client, runenv *runtime.RunEnv) *Client {
func NewClient(syncClient sync.Client, runenv *runtime.RunEnv) *Client {
return &Client{
runenv: runenv,
client: client,
runenv: runenv,
syncClient: syncClient,
}
}

// WaitNetworkInitialized waits for the sidecar to initialize the network, if
// the sidecar is enabled. If not, it returns immediately.
func (c *Client) WaitNetworkInitialized(ctx context.Context) error {
se := &runtime.Event{StageStartEvent: &runtime.StageStartEvent{
Name: "network-initialized",
TestGroupID: c.runenv.TestGroupID,
}}
if err := c.syncClient.SignalEvent(ctx, se); err != nil {
return err
}

if c.runenv.TestSidecar {
err := <-c.client.MustBarrier(ctx, "network-initialized", c.runenv.TestInstanceCount).C
err := <-c.syncClient.MustBarrier(ctx, "network-initialized", c.runenv.TestInstanceCount).C
if err != nil {
c.runenv.RecordMessage(InitialisationFailed)
return fmt.Errorf("failed to initialize network: %w", err)
}
}
c.runenv.RecordMessage(InitialisationSuccessful)

ee := &runtime.Event{StageEndEvent: &runtime.StageEndEvent{
Name: "network-initialized",
TestGroupID: c.runenv.TestGroupID,
}}
if err := c.syncClient.SignalEvent(ctx, ee); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -81,7 +98,7 @@ func (c *Client) ConfigureNetwork(ctx context.Context, config *Config) (err erro
target = c.runenv.TestInstanceCount
}

_, err = c.client.PublishAndWait(ctx, topic, config, config.CallbackState, target)
_, err = c.syncClient.PublishAndWait(ctx, topic, config, config.CallbackState, target)
if err != nil {
err = fmt.Errorf("failed to configure network: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions run/init_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func (ic *InitContext) init(runenv *runtime.RunEnv) {
runenv: runenv,
}

runenv.AttachSyncClient(client)

runenv.RecordMessage("claimed sequence numbers; global=%d, group(%s)=%d", ic.GlobalSeq, runenv.TestGroupID, ic.GroupSeq)
}

Expand Down
12 changes: 10 additions & 2 deletions run/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ func invoke(runenv *runtime.RunEnv, fn interface{}) {

runenv.RecordStart()

var closer func()
defer func() {
if closer != nil {
closer()
}
}()

var err error
errfile, err := runenv.CreateRawAsset("run.err")
if err != nil {
Expand Down Expand Up @@ -144,7 +151,7 @@ func invoke(runenv *runtime.RunEnv, fn interface{}) {
case InitializedTestCaseFn:
ic := new(InitContext)
ic.init(runenv)
defer ic.close()
closer = ic.close // we want to close the InitContext after having calld RecordSuccess or RecordFailure
errCh <- f(runenv, ic)
default:
msg := fmt.Sprintf("unexpected function passed to Invoke*; expected types: TestCaseFn, InitializedTestCaseFn; was: %T", f)
Expand All @@ -162,7 +169,8 @@ func invoke(runenv *runtime.RunEnv, fn interface{}) {
}
case p := <-panicHandler:
// propagate the panic.
panic(p)
runenv.RecordCrash(p.DebugStacktrace)
panic(p.RecoverObj)
}
}

Expand Down
11 changes: 9 additions & 2 deletions run/panic.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package run

import "runtime/debug"

type PanicPayload struct {
RecoverObj interface{}
DebugStacktrace string
}

// panicHandler is where the top-level main goroutine panic handler is
// listening for panics.
var panicHandler = make(chan interface{})
var panicHandler = make(chan PanicPayload)

// HandlePanics should be called in a defer at the top of any goroutine that
// the test plan spawns, so that panics from children goroutine are propagated
Expand All @@ -13,5 +20,5 @@ func HandlePanics() {
if obj == nil {
return
}
panicHandler <- obj
panicHandler <- PanicPayload{obj, string(debug.Stack())}
}
4 changes: 2 additions & 2 deletions run/panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestPanicFromMain(t *testing.T) {
last = scanner.Text()
}

if !strings.Contains(last, "\"outcome\":\"crashed\"") {
if !strings.Contains(last, "\"crash_event\"") {
t.Fatalf("expected crashed event; got: %s", last)
}
}
Expand All @@ -68,7 +68,7 @@ func TestPanicFromChildGoroutine(t *testing.T) {
last = scanner.Text()
}

if !strings.Contains(last, "\"outcome\":\"crashed\"") {
if !strings.Contains(last, "\"crash_event\"") {
t.Fatalf("expected crashed event; got: %s", last)
}
}
13 changes: 5 additions & 8 deletions runtime/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,18 @@ func (m *Metrics) recordEvent(evt *Event) {
}

// this map copy is terrible; the influxdb v2 SDK makes points mutable.
tags := make(map[string]string, len(m.tags)+1)
tags := make(map[string]string, len(m.tags)+2)
for k, v := range m.tags {
tags[k] = v
}

if evt.Outcome != "" {
tags["outcome"] = string(evt.Outcome)
}

fields := map[string]interface{}{
"error": evt.Error,
"count": 1,
}

measurement := fmt.Sprintf("events.%s", string(evt.Type))
p, err := client.NewPoint(measurement, tags, fields)
tags["event_type"] = evt.Type()

p, err := client.NewPoint("events", tags, fields)
if err != nil {
m.re.RecordMessage("failed to create InfluxDB point: %s", err)
}
Expand Down
21 changes: 19 additions & 2 deletions runtime/runenv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runtime

import (
"context"
"os"
gosync "sync"
"time"
Expand Down Expand Up @@ -30,8 +31,9 @@ var (
type RunEnv struct {
RunParams

logger *zap.Logger
metrics *Metrics
logger *zap.Logger
metrics *Metrics
signalEmitter SignalEmitter

wg gosync.WaitGroup
closeCh chan struct{}
Expand Down Expand Up @@ -61,6 +63,7 @@ func NewRunEnv(params RunParams) *RunEnv {

re.structured.ch = make(chan *zap.Logger)
re.unstructured.ch = make(chan *os.File)
re.signalEmitter = &NilSignalEmitter{}

re.wg.Add(1)
go re.manageAssets()
Expand All @@ -70,6 +73,20 @@ func NewRunEnv(params RunParams) *RunEnv {
return re
}

type SignalEmitter interface {
SignalEvent(context.Context, *Event) error
}

type NilSignalEmitter struct{}

func (ne NilSignalEmitter) SignalEvent(ctx context.Context, event *Event) error {
return nil
}

func (re *RunEnv) AttachSyncClient(se SignalEmitter) {
re.signalEmitter = se
}

// R returns a metrics object for results.
func (re *RunEnv) R() *MetricsApi {
return re.metrics.R()
Expand Down
Loading