Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/development/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/bufio"
"github.com/brexhq/substation/internal/file"
"github.com/brexhq/substation/internal/json"
"golang.org/x/sync/errgroup"
)

type metadata struct {
Expand Down
5 changes: 5 additions & 0 deletions examples/streaming/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Example streaming configuration: builds an aggregate processor that
// joins incoming data items together using a space separator.
local sub = import '../../build/config/substation.libsonnet';

sub.interfaces.processor.aggregate(
options={separator: ' '}
)
101 changes: 101 additions & 0 deletions examples/streaming/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Provides an example of how to use streaming processors, which use channels to
// process data. This example uses the aggregate processor, which aggregates data
// into a single item.
package main

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/process"

"golang.org/x/sync/errgroup"
)

// main demonstrates a streaming processor pipeline: raw data is written to
// an ingest channel, transformed by the configured processor (an aggregate
// processor per config.jsonnet), and printed as it arrives on a sink channel.
//
// The program panics on any setup or pipeline error; this is acceptable for
// an example but real applications should handle errors explicitly.
func main() {
	cfg, err := os.ReadFile("./config.json")
	if err != nil {
		panic(err)
	}

	conf := config.Config{}
	if err := json.Unmarshal(cfg, &conf); err != nil {
		panic(err)
	}

	// maintains state across goroutines and creates
	// two channels used for ingesting and sinking data
	// from the transform process. Substation channels are
	// unbuffered and can be accessed directly, in addition
	// to helper methods for sending data and closing the
	// channel.
	group, ctx := errgroup.WithContext(context.TODO())
	in, out := config.NewChannel(), config.NewChannel()

	// create and start the transform process. the process
	// must run in a goroutine to prevent blocking
	// the main goroutine.
	agg, err := process.NewStreamer(ctx, conf)
	if err != nil {
		panic(err)
	}

	group.Go(func() error {
		// return the error instead of panicking so errgroup can cancel
		// ctx for the sibling goroutines and surface the error from
		// group.Wait below.
		return agg.Stream(ctx, in, out)
	})

	// reading data must start before writing data and run inside a
	// goroutine to prevent blocking the main goroutine.
	group.Go(func() error {
		for capsule := range out.C {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				// do something with the data
				fmt.Printf("sink: %s\n", string(capsule.Data()))
			}
		}

		return nil
	})

	// writing data to the ingest channel can be run from the main goroutine
	// or inside a goroutine. the channel must be closed after all data
	// is written -- this sends a signal to the transform process to
	// stop reading from the channel.
	data := [][]byte{
		[]byte("this"),
		[]byte("is"),
		[]byte("a"),
		[]byte("test"),
	}

	for _, d := range data {
		fmt.Printf("ingest: %s\n", string(d))

		capsule := config.NewCapsule()
		capsule.SetData(d)

		select {
		case <-ctx.Done():
			panic(ctx.Err())
		default:
			in.Send(capsule)
		}
	}
	in.Close()

	// wait for all goroutines to finish, otherwise the program will exit
	// before the transform process completes.
	if err := group.Wait(); err != nil {
		panic(err)
	}
}
113 changes: 113 additions & 0 deletions internal/transform/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package transform

import (
"context"
"sync"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/process"
"golang.org/x/sync/errgroup"
)

// stream transforms data by applying a series of processors to a pipeline of
// encapsulated data.
//
// Data processing is iterative and each processor is enabled through conditions.
type tformStream struct {
	// Processors is the ordered list of processor configurations that form
	// the pipeline; each is instantiated as a process.Streamer by
	// newTformStream.
	Processors []config.Config `json:"processors"`

	// streamers holds the instantiated processors, in the same order as
	// Processors; populated by newTformStream and executed by Transform.
	streamers []process.Streamer
}

// newTformStream builds a stream transform from cfg: it decodes the
// transform settings and instantiates one process.Streamer per configured
// processor. A zero-value tformStream and the underlying error are returned
// if either step fails.
func newTformStream(ctx context.Context, cfg config.Config) (t tformStream, err error) {
	if err = config.Decode(cfg.Settings, &t); err != nil {
		return tformStream{}, err
	}

	if t.streamers, err = process.NewStreamers(ctx, t.Processors...); err != nil {
		return tformStream{}, err
	}

	return t, nil
}

// Transform processes a channel of encapsulated data with the transform.
//
// Capsules received on in are pushed through the configured streamers in
// order (each streamer connected to the next by an unbuffered channel) and
// the final streamer's output is forwarded to out. The method blocks until
// in is closed and the pipeline drains, or until ctx is canceled or a
// streamer fails. When wg completes, all streamers are closed.
func (t tformStream) Transform(ctx context.Context, wg *sync.WaitGroup, in, out *config.Channel) error {
	go func() {
		wg.Wait()
		//nolint: errcheck // errors are ignored in case closing fails in a single applier
		process.CloseStreamers(ctx, t.streamers...)
	}()

	group, ctx := errgroup.WithContext(ctx)

	// each streamer has two channels, one for input and one for output, that create
	// a pipeline. streamers are executed in order as goroutines so that capsules are
	// processed in series.
	prevChan := config.NewChannel()
	firstChan := prevChan

	for _, s := range t.streamers {
		nextChan := config.NewChannel()
		// the immediately-invoked wrapper pins s, prevChan, and nextChan
		// for the goroutine closure (pre-Go 1.22 loop variables are shared
		// across iterations).
		func(s process.Streamer, inner, outer *config.Channel) {
			group.Go(func() error {
				return s.Stream(ctx, inner, outer)
			})
		}(s, prevChan, nextChan)

		prevChan = nextChan
	}

	// the last streamer in the pipeline sends to the sink (drain), and must
	// be read before capsules are put into the pipeline to avoid deadlock.
	var sent int
	group.Go(func() error {
		for capsule := range prevChan.C {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				out.Send(capsule)
				sent++
			}
		}

		return nil
	})

	// the first streamer in the pipeline receives from the source, and must
	// start after the drain goroutine to avoid deadlock.
	var received int
	for capsule := range in.C {
		select {
		case <-ctx.Done():
			// NOTE(review): returning here skips firstChan.Close(), so the
			// pipeline goroutines rely on ctx cancellation (via errgroup)
			// to exit -- confirm the streamers honor ctx.Done.
			return ctx.Err()
		default:
			firstChan.Send(capsule)
			received++
		}
	}

	// this is required so that the pipeline goroutines can exit.
	firstChan.Close()

	// an error in any streamer will cause the entire pipeline, including sending
	// to the sink, to fail.
	if err := group.Wait(); err != nil {
		return err
	}

	// metrics errors are deliberately ignored; emitting counts is
	// best-effort and must not fail the transform.
	_ = metrics.Generate(ctx, metrics.Data{
		Name:  "CapsulesReceived",
		Value: received,
	})

	_ = metrics.Generate(ctx, metrics.Data{
		Name:  "CapsulesSent",
		Value: sent,
	})

	return nil
}
2 changes: 2 additions & 0 deletions internal/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) {
switch t := cfg.Type; t {
case "batch":
return newTformBatch(ctx, cfg)
case "stream":
return newTformStream(ctx, cfg)
case "transfer":
return newTformTransfer(ctx, cfg)
default:
Expand Down
Loading