diff --git a/cmd/development/substation/main.go b/cmd/development/substation/main.go index 5b736816..e07790e8 100644 --- a/cmd/development/substation/main.go +++ b/cmd/development/substation/main.go @@ -11,13 +11,12 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" - "github.com/brexhq/substation/cmd" "github.com/brexhq/substation/config" "github.com/brexhq/substation/internal/bufio" "github.com/brexhq/substation/internal/file" "github.com/brexhq/substation/internal/json" + "golang.org/x/sync/errgroup" ) type metadata struct { diff --git a/examples/streaming/config.jsonnet b/examples/streaming/config.jsonnet new file mode 100644 index 00000000..362a71e4 --- /dev/null +++ b/examples/streaming/config.jsonnet @@ -0,0 +1,5 @@ +local sub = import '../../build/config/substation.libsonnet'; + +sub.interfaces.processor.aggregate( + options={separator: ' '} +) diff --git a/examples/streaming/main.go b/examples/streaming/main.go new file mode 100644 index 00000000..0da825e6 --- /dev/null +++ b/examples/streaming/main.go @@ -0,0 +1,101 @@ +// Provides an example of how to use streaming processors, which use channels to +// process data. This example uses the aggregate processor, which aggregates data +// into a single item. +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/process" + + "golang.org/x/sync/errgroup" +) + +func main() { + cfg, err := os.ReadFile("./config.json") + if err != nil { + panic(err) + } + + conf := config.Config{} + if err := json.Unmarshal(cfg, &conf); err != nil { + panic(err) + } + + // maintains state across goroutines and creates + // two channels used for ingesting and sinking data + // from the transform process. Substation channels are + // unbuffered and can be accessed directly, in addition + // to helper methods for sending data and closing the + // channel. 
+ group, ctx := errgroup.WithContext(context.TODO()) + in, out := config.NewChannel(), config.NewChannel() + + // create and start the transform process. the process + // must run in a goroutine to prevent blocking + // the main goroutine. + agg, err := process.NewStreamer(ctx, conf) + if err != nil { + panic(err) + } + + group.Go(func() error { + if err := agg.Stream(ctx, in, out); err != nil { + panic(err) + } + + return nil + }) + + // reading data must start before writing data and run inside a + // goroutine to prevent blocking the main goroutine. + group.Go(func() error { + for capsule := range out.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // do something with the data + fmt.Printf("sink: %s\n", string(capsule.Data())) + } + } + + return nil + }) + + // writing data to the ingest channel can be run from the main goroutine + // or inside a goroutine. the channel must be closed after all data + // is written -- this sends a signal to the transform process to + // stop reading from the channel. + data := [][]byte{ + []byte("this"), + []byte("is"), + []byte("a"), + []byte("test"), + } + + for _, d := range data { + fmt.Printf("ingest: %s\n", string(d)) + + capsule := config.NewCapsule() + capsule.SetData(d) + + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + in.Send(capsule) + } + } + in.Close() + + // wait for all goroutines to finish, otherwise the program will exit + // before the transform process completes. 
+ if err := group.Wait(); err != nil { + panic(err) + } +} diff --git a/internal/transform/stream.go b/internal/transform/stream.go new file mode 100644 index 00000000..6d4bb393 --- /dev/null +++ b/internal/transform/stream.go @@ -0,0 +1,113 @@ +package transform + +import ( + "context" + "sync" + + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/internal/metrics" + "github.com/brexhq/substation/process" + "golang.org/x/sync/errgroup" +) + +// stream transforms data by applying a series of processors to a pipeline of +// encapsulated data. +// +// Data processing is iterative and each processor is enabled through conditions. +type tformStream struct { + Processors []config.Config `json:"processors"` + + streamers []process.Streamer +} + +func newTformStream(ctx context.Context, cfg config.Config) (t tformStream, err error) { + if err = config.Decode(cfg.Settings, &t); err != nil { + return tformStream{}, err + } + + t.streamers, err = process.NewStreamers(ctx, t.Processors...) + if err != nil { + return tformStream{}, err + } + + return t, nil +} + +// Transform processes a channel of encapsulated data with the transform. +func (t tformStream) Transform(ctx context.Context, wg *sync.WaitGroup, in, out *config.Channel) error { + go func() { + wg.Wait() + //nolint: errcheck // errors are ignored in case closing fails in a single applier + process.CloseStreamers(ctx, t.streamers...) + }() + + group, ctx := errgroup.WithContext(ctx) + + // each streamer has two channels, one for input and one for output, that create + // a pipeline. streamers are executed in order as goroutines so that capsules are + // processed in series. 
+ prevChan := config.NewChannel() + firstChan := prevChan + + for _, s := range t.streamers { + nextChan := config.NewChannel() + func(s process.Streamer, inner, outer *config.Channel) { + group.Go(func() error { + return s.Stream(ctx, inner, outer) + }) + }(s, prevChan, nextChan) + + prevChan = nextChan + } + + // the last streamer in the pipeline sends to the sink (drain), and must + // be read before capsules are put into the pipeline to avoid deadlock. + var sent int + group.Go(func() error { + for capsule := range prevChan.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + out.Send(capsule) + sent++ + } + } + + return nil + }) + + // the first streamer in the pipeline receives from the source, and must + // start after the drain goroutine to avoid deadlock. + var received int + for capsule := range in.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + firstChan.Send(capsule) + received++ + } + } + + // this is required so that the pipeline goroutines can exit. + firstChan.Close() + + // an error in any streamer will cause the entire pipeline, including sending + // to the sink, to fail. 
+ if err := group.Wait(); err != nil { + return err + } + + _ = metrics.Generate(ctx, metrics.Data{ + Name: "CapsulesReceived", + Value: received, + }) + + _ = metrics.Generate(ctx, metrics.Data{ + Name: "CapsulesSent", + Value: sent, + }) + + return nil +} diff --git a/internal/transform/transform.go b/internal/transform/transform.go index 421bc2db..02c4e138 100644 --- a/internal/transform/transform.go +++ b/internal/transform/transform.go @@ -18,6 +18,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { switch t := cfg.Type; t { case "batch": return newTformBatch(ctx, cfg) + case "stream": + return newTformStream(ctx, cfg) case "transfer": return newTformTransfer(ctx, cfg) default: diff --git a/process/aggregate.go b/process/aggregate.go index d1c029be..e35cd4fe 100644 --- a/process/aggregate.go +++ b/process/aggregate.go @@ -90,8 +90,83 @@ func newProcAggregate(ctx context.Context, cfg config.Config) (p procAggregate, return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procAggregate) Stream(ctx context.Context, in, out *config.Channel) error { + defer out.Close() + + // aggregateKeys is used to return data stored in the + // buffer in order if the aggregate doesn't meet the + // configured threshold. any aggregate that meets the + // threshold is delivered immediately, out of order. 
+ var aggregateKeys []string + buffer := map[string]*aggregate.Bytes{} + + for capsule := range in.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return fmt.Errorf("process: aggregate: %v", err) + } else if !ok { + out.C <- capsule + continue + } + + // data that exceeds the size of the buffer will never + // fit within it + length := len(capsule.Data()) + if length > p.Options.MaxSize { + return fmt.Errorf("process: aggregate: size %d data length %d: %v", p.Options.MaxSize, length, errAggregateSizeLimit) + } + + var aggregateKey string + if p.Options.Key != "" { + aggregateKey = capsule.Get(p.Options.Key).String() + } + + if _, ok := buffer[aggregateKey]; !ok { + buffer[aggregateKey] = &aggregate.Bytes{} + buffer[aggregateKey].New(p.Options.MaxCount, p.Options.MaxSize) + aggregateKeys = append(aggregateKeys, aggregateKey) + } + + ok := buffer[aggregateKey].Add(capsule.Data()) + // data was successfully added to the buffer, every item after + // this is a failure + if ok { + continue + } + + // the buffer is full, emit the aggregated data + data := buffer[aggregateKey].Get() + c, err := p.newCapsule(data) + if err != nil { + return fmt.Errorf("process: aggregate: %v", err) + } + out.Send(c) + + // by this point, addition of the failed data is guaranteed + // to succeed after the buffer is reset + buffer[aggregateKey].Reset() + _ = buffer[aggregateKey].Add(capsule.Data()) + } + } + + // emit any remaining data in the buffer + for _, aggregateKey := range aggregateKeys { + data := buffer[aggregateKey].Get() + c, err := p.newCapsule(data) + if err != nil { + return fmt.Errorf("process: aggregate: %v", err) + } + out.Send(c) + } + + return nil +} + +// Batch processes one or more capsules with the processor. 
func (p procAggregate) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { // aggregateKeys is used to return elements stored in the // buffer in order if the aggregate doesn't meet the @@ -102,12 +177,9 @@ func (p procAggregate) Batch(ctx context.Context, capsules ...config.Capsule) ([ newCapsules := newBatch(&capsules) for _, capsule := range capsules { - ok, err := p.operator.Operate(ctx, capsule) - if err != nil { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { return nil, fmt.Errorf("process: aggregate: %v", err) - } - - if !ok { + } else if !ok { newCapsules = append(newCapsules, capsule) continue } @@ -130,34 +202,19 @@ func (p procAggregate) Batch(ctx context.Context, capsules ...config.Capsule) ([ aggregateKeys = append(aggregateKeys, aggregateKey) } - ok = buffer[aggregateKey].Add(capsule.Data()) + ok := buffer[aggregateKey].Add(capsule.Data()) // data was successfully added to the buffer, every item after // this is a failure if ok { continue } - newCapsule := config.NewCapsule() - elements := buffer[aggregateKey].Get() - if p.SetKey != "" { - var value []byte - for _, element := range elements { - var err error - - value, err = json.Set(value, p.SetKey, element) - if err != nil { - return nil, fmt.Errorf("process: aggregate: %v", err) - } - } - - newCapsule.SetData(value) - newCapsules = append(newCapsules, newCapsule) - } else { - value := bytes.Join(elements, []byte(p.Options.Separator)) - - newCapsule.SetData(value) - newCapsules = append(newCapsules, newCapsule) + data := buffer[aggregateKey].Get() + c, err := p.newCapsule(data) + if err != nil { + return nil, fmt.Errorf("process: aggregate: %v", err) } + newCapsules = append(newCapsules, c) // by this point, addition of the failed data is guaranteed // to succeed after the buffer is reset @@ -167,33 +224,43 @@ func (p procAggregate) Batch(ctx context.Context, capsules ...config.Capsule) ([ // remaining items must be drained from the buffer, otherwise // 
data is lost - newCapsule := config.NewCapsule() for _, key := range aggregateKeys { if buffer[key].Count() == 0 { continue } - elements := buffer[key].Get() - if p.SetKey != "" { - var value []byte - for _, element := range elements { - var err error + data := buffer[key].Get() + c, err := p.newCapsule(data) + if err != nil { + return nil, fmt.Errorf("process: aggregate: %v", err) + } + + newCapsules = append(newCapsules, c) + } - value, err = json.Set(value, p.SetKey, element) - if err != nil { - return nil, fmt.Errorf("process: aggregate: %v", err) - } - } + return newCapsules, nil +} - newCapsule.SetData(value) - newCapsules = append(newCapsules, newCapsule) - } else { - value := bytes.Join(elements, []byte(p.Options.Separator)) +func (p procAggregate) newCapsule(data [][]byte) (config.Capsule, error) { + newCapsule := config.NewCapsule() + + if p.SetKey != "" { + var value []byte + for _, element := range data { + var err error - newCapsule.SetData(value) - newCapsules = append(newCapsules, newCapsule) + value, err = json.Set(value, p.SetKey, element) + if err != nil { + return newCapsule, err + } } + + newCapsule.SetData(value) + return newCapsule, nil } - return newCapsules, nil + value := bytes.Join(data, []byte(p.Options.Separator)) + newCapsule.SetData(value) + + return newCapsule, nil } diff --git a/process/aggregate_test.go b/process/aggregate_test.go index 4e26f858..b337f99c 100644 --- a/process/aggregate_test.go +++ b/process/aggregate_test.go @@ -6,6 +6,12 @@ import ( "testing" "github.com/brexhq/substation/config" + "golang.org/x/sync/errgroup" +) + +var ( + _ Batcher = procAggregate{} + _ Streamer = procAggregate{} ) var aggregateTests = []struct { @@ -205,7 +211,6 @@ func TestAggregate(t *testing.T) { if err != nil { t.Fatal(err) } - var _ Batcher = proc var capsules []config.Capsule for _, t := range test.test { @@ -228,6 +233,52 @@ func TestAggregate(t *testing.T) { } } +func TestAggregateStream(t *testing.T) { + capsule := 
config.NewCapsule() + + for _, test := range aggregateTests { + t.Run(test.name, func(t *testing.T) { + group, ctx := errgroup.WithContext(context.TODO()) + in, out := config.NewChannel(), config.NewChannel() + + proc, err := newProcAggregate(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } + + group.Go(func() error { + if err := proc.Stream(ctx, in, out); err != nil { + t.Error(err) + } + + return nil + }) + + group.Go(func() error { + var i int + for res := range out.C { + expected := test.expected[i] + if !bytes.Equal(expected, res.Data()) { + t.Errorf("expected %s, got %s", expected, string(res.Data())) + } + i++ + } + return nil + }) + + for _, t := range test.test { + capsule.SetData(t) + in.Send(capsule) + } + in.Close() + + if err := group.Wait(); err != nil { + t.Error(err) + } + }) + } +} + func benchmarkAggregate(b *testing.B, batcher procAggregate, capsules []config.Capsule) { ctx := context.TODO() for i := 0; i < b.N; i++ { diff --git a/process/aws_dynamodb.go b/process/aws_dynamodb.go index 3cce0558..e5502bcb 100644 --- a/process/aws_dynamodb.go +++ b/process/aws_dynamodb.go @@ -96,14 +96,24 @@ func newProcAWSDynamoDB(ctx context.Context, cfg config.Config) (p procAWSDynamo return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procAWSDynamoDB) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procAWSDynamoDB) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procAWSDynamoDB) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: aws_dynamodb: %v", err) + } else if !ok { + return capsule, nil + } + // lazy load API if !dynamodbAPI.IsEnabled() { dynamodbAPI.Setup() diff --git a/process/aws_dynamodb_test.go b/process/aws_dynamodb_test.go index 2ba817f1..26249624 100644 --- a/process/aws_dynamodb_test.go +++ b/process/aws_dynamodb_test.go @@ -13,6 +13,12 @@ import ( ddb "github.com/brexhq/substation/internal/aws/dynamodb" ) +var ( + _ Applier = procAWSDynamoDB{} + _ Batcher = procAWSDynamoDB{} + _ Streamer = procAWSDynamoDB{} +) + type mockedQuery struct { dynamodbiface.DynamoDBAPI Resp dynamodb.QueryOutput @@ -24,7 +30,7 @@ func (m mockedQuery) QueryWithContext(ctx aws.Context, input *dynamodb.QueryInpu var dynamodbTests = []struct { name string - proc procAWSDynamoDB + cfg config.Config test []byte expected []byte err error @@ -32,14 +38,15 @@ var dynamodbTests = []struct { }{ { "JSON", - procAWSDynamoDB{ - process: process{ - Key: "foo", - SetKey: "foo", - }, - Options: procAWSDynamoDBOptions{ - Table: "fooer", - KeyConditionExpression: "barre", + config.Config{ + Type: "aws_dynamodb", + Settings: map[string]interface{}{ + "key": "foo", + "set_key": "foo", + "options": map[string]interface{}{ + "table": "fooer", + "key_condition_expression": "barre", + }, }, }, []byte(`{"foo":{"PK":"bar"}}`), @@ -66,13 +73,15 @@ func TestDynamoDB(t *testing.T) { capsule := config.NewCapsule() for _, test := range dynamodbTests { - var _ Applier = test.proc - var _ Batcher = test.proc - dynamodbAPI = test.api - capsule.SetData(test.test) - result, err := test.proc.Apply(ctx, capsule) + proc, err := newProcAWSDynamoDB(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } + + capsule.SetData(test.test) + result, err := proc.Apply(ctx, capsule) if err != nil { t.Error(err) } @@ -91,13 +100,20 @@ func 
benchmarkDynamoDB(b *testing.B, applier procAWSDynamoDB, test config.Capsul } func BenchmarkDynamoDB(b *testing.B) { + ctx := context.TODO() capsule := config.NewCapsule() + for _, test := range dynamodbTests { b.Run(test.name, func(b *testing.B) { dynamodbAPI = test.api + proc, err := newProcAWSDynamoDB(ctx, test.cfg) + if err != nil { + b.Fatal(err) + } + capsule.SetData(test.test) - benchmarkDynamoDB(b, test.proc, capsule) + benchmarkDynamoDB(b, proc, capsule) }, ) } diff --git a/process/aws_lambda.go b/process/aws_lambda.go index e24bc586..d5031048 100644 --- a/process/aws_lambda.go +++ b/process/aws_lambda.go @@ -67,14 +67,24 @@ func newProcAWSLambda(ctx context.Context, cfg config.Config) (p procAWSLambda, return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procAWSLambda) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procAWSLambda) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procAWSLambda) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: aws_lambda: %v", err) + } else if !ok { + return capsule, nil + } + // lazy load API if !lambdaAPI.IsEnabled() { lambdaAPI.Setup() diff --git a/process/aws_lambda_test.go b/process/aws_lambda_test.go index 134222c7..0636b34a 100644 --- a/process/aws_lambda_test.go +++ b/process/aws_lambda_test.go @@ -13,6 +13,12 @@ import ( lamb "github.com/brexhq/substation/internal/aws/lambda" ) +var ( + _ Applier = procAWSLambda{} + _ Batcher = procAWSLambda{} + _ Streamer = procAWSLambda{} +) + type mockedInvoke struct { lambdaiface.LambdaAPI Resp lambda.InvokeOutput @@ -24,7 +30,7 @@ func (m mockedInvoke) InvokeWithContext(ctx aws.Context, input *lambda.InvokeInp var lambdaTests = []struct { name string - proc procAWSLambda + cfg config.Config test []byte expected []byte err error @@ -32,13 +38,14 @@ var lambdaTests = []struct { }{ { "JSON", - procAWSLambda{ - process: process{ - Key: "foo", - SetKey: "foo", - }, - Options: procAWSLambdaOptions{ - FunctionName: "fooer", + config.Config{ + Type: "aws_lambda", + Settings: map[string]interface{}{ + "key": "foo", + "set_key": "foo", + "options": map[string]interface{}{ + "function_name": "fooer", + }, }, }, []byte(`{"foo":{"bar":"baz"}}`), @@ -59,13 +66,14 @@ func TestLambda(t *testing.T) { capsule := config.NewCapsule() for _, test := range lambdaTests { - var _ Applier = test.proc - var _ Batcher = test.proc - lambdaAPI = test.api - capsule.SetData(test.test) + proc, err := newProcAWSLambda(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } - result, err := test.proc.Apply(ctx, capsule) + capsule.SetData(test.test) + result, err := proc.Apply(ctx, capsule) if err != nil { t.Error(err) } @@ -84,12 +92,19 @@ func benchmarkLambda(b *testing.B, applier procAWSLambda, test config.Capsule) { } func BenchmarkLambda(b *testing.B) 
{ + ctx := context.TODO() capsule := config.NewCapsule() + for _, test := range lambdaTests { b.Run(test.name, func(b *testing.B) { + proc, err := newProcAWSLambda(ctx, test.cfg) + if err != nil { + b.Fatal(err) + } + capsule.SetData(test.test) - benchmarkLambda(b, test.proc, capsule) + benchmarkLambda(b, proc, capsule) }, ) } diff --git a/process/base64.go b/process/base64.go index f644303d..afd85bb7 100644 --- a/process/base64.go +++ b/process/base64.go @@ -71,14 +71,24 @@ func (p procBase64) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procBase64) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procBase64) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procBase64) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: base64: %v", err) + } else if !ok { + return capsule, nil + } + // JSON processing if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key).String() diff --git a/process/base64_test.go b/process/base64_test.go index 2ed5f183..29dc39fc 100644 --- a/process/base64_test.go +++ b/process/base64_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procBase64{} - _ Batcher = procBase64{} + _ Applier = procBase64{} + _ Batcher = procBase64{} + _ Streamer = procBase64{} ) var base64Tests = []struct { diff --git a/process/capture.go b/process/capture.go index a6634bcf..86f849e2 100644 --- a/process/capture.go +++ b/process/capture.go @@ -96,14 +96,24 @@ func (p procCapture) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procCapture) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procCapture) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procCapture) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: capture: %v", err) + } else if !ok { + return capsule, nil + } + // JSON processing if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key).String() diff --git a/process/capture_test.go b/process/capture_test.go index d2d86b97..369c0553 100644 --- a/process/capture_test.go +++ b/process/capture_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procCapture{} - _ Batcher = procCapture{} + _ Applier = procCapture{} + _ Batcher = procCapture{} + _ Streamer = procCapture{} ) var captureTests = []struct { diff --git a/process/case.go b/process/case.go index 688d511c..e440ae63 100644 --- a/process/case.go +++ b/process/case.go @@ -80,14 +80,24 @@ func (p procCase) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procCase) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procCase) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procCase) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: case: %v", err) + } else if !ok { + return capsule, nil + } + // JSON processing if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key).String() diff --git a/process/case_test.go b/process/case_test.go index 49d74962..0d9a815d 100644 --- a/process/case_test.go +++ b/process/case_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procCase{} - _ Batcher = procCase{} + _ Applier = procCase{} + _ Batcher = procCase{} + _ Streamer = procCase{} ) var caseTests = []struct { diff --git a/process/convert.go b/process/convert.go index 2bc3b6a7..79fbf627 100644 --- a/process/convert.go +++ b/process/convert.go @@ -68,14 +68,24 @@ func (p procConvert) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procConvert) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procConvert) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procConvert) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: convert: %v", err) + } else if !ok { + return capsule, nil + } + // only supports JSON, error early if there are no keys if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key) diff --git a/process/convert_test.go b/process/convert_test.go index be7dfea4..c1ce7363 100644 --- a/process/convert_test.go +++ b/process/convert_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procConvert{} - _ Batcher = procConvert{} + _ Applier = procConvert{} + _ Batcher = procConvert{} + _ Streamer = procConvert{} ) var convertTests = []struct { diff --git a/process/copy.go b/process/copy.go index b5a43484..5330c1b8 100644 --- a/process/copy.go +++ b/process/copy.go @@ -39,14 +39,24 @@ func (p procCopy) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procCopy) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procCopy) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procCopy) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) {
+	if ok, err := p.operator.Operate(ctx, capsule); err != nil {
+		return capsule, fmt.Errorf("process: copy: %v", err)
+	} else if !ok {
+		return capsule, nil
+	}
+
 	// JSON processing
 	if p.Key != "" && p.SetKey != "" {
 		if err := capsule.Set(p.SetKey, capsule.Get(p.Key)); err != nil {
diff --git a/process/copy_test.go b/process/copy_test.go
index 22917f87..61f2cae6 100644
--- a/process/copy_test.go
+++ b/process/copy_test.go
@@ -9,8 +9,9 @@ import (
 )
 
 var (
-	_ Applier = procCopy{}
-	_ Batcher = procCopy{}
+	_ Applier  = procCopy{}
+	_ Batcher  = procCopy{}
+	_ Streamer = procCopy{}
 )
 
 var copyTests = []struct {
diff --git a/process/count.go b/process/count.go
index 383f8cdb..78731f9b 100644
--- a/process/count.go
+++ b/process/count.go
@@ -31,8 +31,30 @@ func (p procCount) String() string {
 	return toString(p)
 }
 
-// Batch processes one or more capsules with the processor. Conditions are
-// optionally applied to the data to enable processing.
+// Stream processes a pipeline of capsules with the processor.
+func (p procCount) Stream(ctx context.Context, in, out *config.Channel) error {
+	defer out.Close()
+
+	var count int
+	for range in.C {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			count++
+		}
+	}
+
+	newCapsule := config.NewCapsule()
+	if err := newCapsule.Set("count", count); err != nil {
+		return fmt.Errorf("process: count: %v", err)
+	}
+
+	out.Send(newCapsule)
+	return nil
+}
+
+// Batch processes one or more capsules with the processor.
func (p procCount) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { newCapsule := config.NewCapsule() if err := newCapsule.Set("count", len(capsules)); err != nil { diff --git a/process/count_test.go b/process/count_test.go index f96ac7b7..f03346be 100644 --- a/process/count_test.go +++ b/process/count_test.go @@ -6,9 +6,13 @@ import ( "testing" "github.com/brexhq/substation/config" + "golang.org/x/sync/errgroup" ) -var _ Batcher = procCount{} +var ( + _ Batcher = procCount{} + _ Streamer = procCount{} +) var countTests = []struct { name string @@ -88,3 +92,48 @@ func BenchmarkCount(b *testing.B) { ) } } + +func TestCountStream(t *testing.T) { + capsule := config.NewCapsule() + + for _, test := range countTests { + t.Run(test.name, func(t *testing.T) { + group, ctx := errgroup.WithContext(context.TODO()) + in, out := config.NewChannel(), config.NewChannel() + + proc, err := newProcCount(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } + + group.Go(func() error { + if err := proc.Stream(ctx, in, out); err != nil { + t.Error(err) + } + + return nil + }) + + group.Go(func() error { + var i int + for res := range out.C { + if !bytes.Equal(test.expected, res.Data()) { + t.Errorf("expected %s, got %s", test.expected, string(res.Data())) + } + i++ + } + return nil + }) + + for _, t := range test.test { + capsule.SetData(t) + in.Send(capsule) + } + in.Close() + + if err := group.Wait(); err != nil { + t.Error(err) + } + }) + } +} diff --git a/process/delete.go b/process/delete.go index 9c646c7c..abe6213a 100644 --- a/process/delete.go +++ b/process/delete.go @@ -42,14 +42,24 @@ func (p procDelete) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. 
+func (p procDelete) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procDelete) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. func (p procDelete) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: delete: %v", err) + } else if !ok { + return capsule, nil + } + if err := capsule.Delete(p.Key); err != nil { return capsule, fmt.Errorf("process: delete: %v", err) } diff --git a/process/delete_test.go b/process/delete_test.go index f3722d33..7eb24d55 100644 --- a/process/delete_test.go +++ b/process/delete_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procDelete{} - _ Batcher = procDelete{} + _ Applier = procDelete{} + _ Batcher = procDelete{} + _ Streamer = procDelete{} ) var deleteTests = []struct { diff --git a/process/dns.go b/process/dns.go index 0e36b43c..518e3a5c 100644 --- a/process/dns.go +++ b/process/dns.go @@ -84,16 +84,26 @@ func (p procDNS) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procDNS) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procDNS) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
// -//nolint:gocognit +//nolint:gocognit,cyclop // ignore complexity func (p procDNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: dns: %v", err) + } else if !ok { + return capsule, nil + } + var timeout time.Duration if p.Options.Timeout != 0 { timeout = time.Duration(p.Options.Timeout) * time.Millisecond diff --git a/process/domain.go b/process/domain.go index e0228971..b05d01b1 100644 --- a/process/domain.go +++ b/process/domain.go @@ -80,14 +80,24 @@ func newProcDomain(ctx context.Context, cfg config.Config) (p procDomain, err er return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procDomain) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procDomain) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procDomain) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: domain: %v", err) + } else if !ok { + return capsule, nil + } + // JSON processing if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key).String() diff --git a/process/domain_test.go b/process/domain_test.go index ff2b7f61..f275dd17 100644 --- a/process/domain_test.go +++ b/process/domain_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procDomain{} - _ Batcher = procDomain{} + _ Applier = procDomain{} + _ Batcher = procDomain{} + _ Streamer = procDomain{} ) var domainTests = []struct { diff --git a/process/drop.go b/process/drop.go index 862d0e6a..e515ede3 100644 --- a/process/drop.go +++ b/process/drop.go @@ -39,17 +39,34 @@ func (p procDrop) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procDrop) Stream(ctx context.Context, in, out *config.Channel) error { + defer out.Close() + + for capsule := range in.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return fmt.Errorf("process: drop: %v", err) + } else if !ok { + out.Send(capsule) + continue + } + } + } + + return nil +} + +// Batch processes one or more capsules with the processor. 
func (p procDrop) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { newCapsules := newBatch(&capsules) for _, capsule := range capsules { - ok, err := p.operator.Operate(ctx, capsule) - if err != nil { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { return nil, fmt.Errorf("process: drop: %v", err) - } - - if !ok { + } else if !ok { newCapsules = append(newCapsules, capsule) continue } diff --git a/process/drop_test.go b/process/drop_test.go index 81c9786f..0cf85aa2 100644 --- a/process/drop_test.go +++ b/process/drop_test.go @@ -7,7 +7,10 @@ import ( "github.com/brexhq/substation/config" ) -var _ Batcher = procDrop{} +var ( + _ Batcher = procDrop{} + _ Streamer = procDrop{} +) var dropTests = []struct { name string diff --git a/process/example_test.go b/process/example_test.go index 4e9d4a9a..a4dbe4e5 100644 --- a/process/example_test.go +++ b/process/example_test.go @@ -6,6 +6,7 @@ import ( "github.com/brexhq/substation/config" "github.com/brexhq/substation/process" + "golang.org/x/sync/errgroup" ) func ExampleNewApplier() { @@ -168,6 +169,54 @@ func ExampleBatchBytes() { // Output: {"foo":"fizz","bar":"fizz"} } +func ExampleNewStreamer() { + ctx := context.TODO() + + // copies the value of key "a" into key "z" + cfg := config.Config{ + Type: "copy", + Settings: map[string]interface{}{ + "key": "a", + "set_key": "z", + }, + } + + // create a streamer from the config + streamer, err := process.NewStreamer(ctx, cfg) + if err != nil { + // handle err + panic(err) + } + + fmt.Println(streamer) + // Output: {"condition":{"operator":"","inspectors":null},"key":"a","set_key":"z","ignore_close":false,"ignore_errors":false} +} + +func ExampleNewStreamers() { + ctx := context.TODO() + + // copies the value of key "a" into key "z" + cfg := config.Config{ + Type: "copy", + Settings: map[string]interface{}{ + "key": "a", + "set_key": "z", + }, + } + + // one or more streamers are created + streamers, err 
:= process.NewStreamers(ctx, cfg) + if err != nil { + // handle err + panic(err) + } + + for _, s := range streamers { + fmt.Println(s) + } + // Output: {"condition":{"operator":"","inspectors":null},"key":"a","set_key":"z","ignore_close":false,"ignore_errors":false} +} + func Example_applier() { ctx := context.TODO() @@ -247,6 +296,65 @@ func Example_batcher() { // {"foo":"fizz","bar":"fizz"} } +func Example_streamer() { + ctx := context.TODO() + + // copies the value of key "a" into key "z" + cfg := config.Config{ + Type: "copy", + Settings: map[string]interface{}{ + "key": "a", + "set_key": "z", + }, + } + + // streamer is retrieved from the factory + streamer, err := process.NewStreamer(ctx, cfg) + if err != nil { + // handle err + panic(err) + } + + // setup an error group and channels for streaming data + group, ctx := errgroup.WithContext(ctx) + in, out := config.NewChannel(), config.NewChannel() + + // first, the streamer goroutine is started + group.Go(func() error { + if err := streamer.Stream(ctx, in, out); err != nil { + panic(err) + } + + return nil + }) + + // second, the reader goroutine is started. reading must precede + // writing to prevent blocking the main goroutine. + group.Go(func() error { + for capsule := range out.C { + fmt.Println(string(capsule.Data())) + } + + return nil + }) + + // last, writer sends data to the streamer. + capsule := config.NewCapsule() + capsule.SetData([]byte(`{"a":"b"}`)) + + in.Send(capsule) + in.Close() + + // an error in any streamer will cause the entire pipeline, including sending + // to the sink, to fail. 
+ if err := group.Wait(); err != nil { + panic(err) + } + + // Output: + // {"a":"b","z":"b"} +} + func Example_dNS() { ctx := context.TODO() diff --git a/process/expand.go b/process/expand.go index 0a34fdef..b29e35cb 100644 --- a/process/expand.go +++ b/process/expand.go @@ -40,17 +40,113 @@ func (p procExpand) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procExpand) Stream(ctx context.Context, in, out *config.Channel) error { + defer out.Close() + + for capsule := range in.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return fmt.Errorf("process: expand: %v", err) + } else if !ok { + out.Send(capsule) + continue + } + + // data is processed by retrieving and iterating an + // array containing JSON objects (result) and setting + // any remaining keys from the object (remains) into + // each new object. if there is no Key, then the input + // is treated as an array. + // + // input: + // {"expand":[{"foo":"bar"},{"baz":"qux"}],"quux":"corge"} + // result: + // [{"foo":"bar"},{"baz":"qux"}] + // remains: + // {"quux":"corge"} + // output: + // {"foo":"bar","quux":"corge"} + // {"baz":"qux","quux":"corge"} + var result, remains json.Result + + if p.Key != "" { + result = json.Get(capsule.Data(), p.Key) + // deleting the key from the object speeds + // up processing large objects. 
+ if err := capsule.Delete(p.Key); err != nil { + return fmt.Errorf("process: expand: %v", err) + } + + remains = json.Get(capsule.Data(), "@this") + } else { + // remains is unused when there is no key + result = json.Get(capsule.Data(), "@this") + } + + for _, res := range result.Array() { + // retains metadata from the original event + newCapsule := capsule + newCapsule.SetData([]byte{}) + + // data processing + // + // elements from the array become new data. + if p.Key == "" { + newCapsule.SetData([]byte(res.String())) + out.Send(newCapsule) + continue + } + + // object processing + // + // remaining keys from the original object are added + // to the new object. + for key, val := range remains.Map() { + if err := newCapsule.Set(key, val); err != nil { + return fmt.Errorf("process: expand: %v", err) + } + } + + if p.SetKey != "" { + if err := newCapsule.Set(p.SetKey, res); err != nil { + return fmt.Errorf("process: expand: %v", err) + } + + out.Send(newCapsule) + continue + } + + // at this point there should be two objects that need to be + // merged into a single object. the objects are merged using + // the GJSON @join function, which joins all objects that are + // in an array. if the array contains non-object data, then + // it is ignored. + // + // [{"foo":"bar"},{"baz":"qux"}}] becomes {"foo":"bar","baz":"qux"} + // [{"foo":"bar"},{"baz":"qux"},"quux"] becomes {"foo":"bar","baz":"qux"} + tmp := fmt.Sprintf(`[%s,%s]`, newCapsule.Data(), res.String()) + join := json.Get([]byte(tmp), "@join") + newCapsule.SetData([]byte(join.String())) + + out.Send(newCapsule) + } + } + } + + return nil +} + +// Batch processes one or more capsules with the processor. 
func (p procExpand) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { newCapsules := newBatch(&capsules) for _, capsule := range capsules { - ok, err := p.operator.Operate(ctx, capsule) - if err != nil { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { return nil, fmt.Errorf("process: expand: %v", err) - } - - if !ok { + } else if !ok { newCapsules = append(newCapsules, capsule) continue } @@ -106,7 +202,7 @@ func (p procExpand) Batch(ctx context.Context, capsules ...config.Capsule) ([]co // remaining keys from the original object are added // to the new object. for key, val := range remains.Map() { - if err = newCapsule.Set(key, val); err != nil { + if err := newCapsule.Set(key, val); err != nil { return nil, fmt.Errorf("process: expand: %v", err) } } diff --git a/process/expand_test.go b/process/expand_test.go index 743fcf6d..ac9ff56b 100644 --- a/process/expand_test.go +++ b/process/expand_test.go @@ -6,9 +6,13 @@ import ( "testing" "github.com/brexhq/substation/config" + "golang.org/x/sync/errgroup" ) -var _ Batcher = procExpand{} +var ( + _ Batcher = procExpand{} + _ Streamer = procExpand{} +) var expandTests = []struct { name string @@ -195,3 +199,47 @@ func BenchmarkExpand(b *testing.B) { ) } } + +func TestExpandStream(t *testing.T) { + capsule := config.NewCapsule() + + for _, test := range expandTests { + t.Run(test.name, func(t *testing.T) { + group, ctx := errgroup.WithContext(context.TODO()) + in, out := config.NewChannel(), config.NewChannel() + + proc, err := newProcExpand(ctx, test.cfg) + if err != nil { + t.Fatal(err) + } + + group.Go(func() error { + if err := proc.Stream(ctx, in, out); err != nil { + t.Error(err) + } + + return nil + }) + + group.Go(func() error { + var i int + for res := range out.C { + expected := test.expected[i] + if !bytes.Equal(expected, res.Data()) { + t.Errorf("expected %s, got %s", expected, string(res.Data())) + } + i++ + } + return nil + }) + + 
capsule.SetData(test.test) + in.Send(capsule) + in.Close() + + if err := group.Wait(); err != nil { + t.Error(err) + } + }) + } +} diff --git a/process/flatten.go b/process/flatten.go index 4c9521ca..fced341e 100644 --- a/process/flatten.go +++ b/process/flatten.go @@ -53,14 +53,24 @@ func (p procFlatten) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procFlatten) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procFlatten) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. func (p procFlatten) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: flatten: %v", err) + } else if !ok { + return capsule, nil + } + var value interface{} if p.Options.Deep { value = capsule.Get(p.Key + `|@flatten:{"deep":true}`) diff --git a/process/flatten_test.go b/process/flatten_test.go index 910f0701..40d7b532 100644 --- a/process/flatten_test.go +++ b/process/flatten_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procFlatten{} - _ Batcher = procFlatten{} + _ Applier = procFlatten{} + _ Batcher = procFlatten{} + _ Streamer = procFlatten{} ) var flattenTests = []struct { diff --git a/process/for_each.go b/process/for_each.go index a9569913..c82e550a 100644 --- a/process/for_each.go +++ b/process/for_each.go @@ -86,14 +86,24 @@ func (p procForEach) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. 
Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procForEach) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procForEach) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. func (p procForEach) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: for_each: %v", err) + } else if !ok { + return capsule, nil + } + result := capsule.Get(p.Key) if !result.IsArray() { return capsule, nil diff --git a/process/for_each_test.go b/process/for_each_test.go index b001bc46..1789c7d5 100644 --- a/process/for_each_test.go +++ b/process/for_each_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procForEach{} - _ Batcher = procForEach{} + _ Applier = procForEach{} + _ Batcher = procForEach{} + _ Streamer = procForEach{} ) var forEachTests = []struct { diff --git a/process/group.go b/process/group.go index 54d3486f..f2bcea3c 100644 --- a/process/group.go +++ b/process/group.go @@ -55,14 +55,24 @@ func (p procGroup) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procGroup) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. 
func (p procGroup) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. func (p procGroup) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: group: %v", err) + } else if !ok { + return capsule, nil + } + if len(p.Options.Keys) == 0 { // elements in the values array are stored at their // relative position inside the map to maintain order diff --git a/process/group_test.go b/process/group_test.go index 4051dcca..959bff1c 100644 --- a/process/group_test.go +++ b/process/group_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procGroup{} - _ Batcher = procGroup{} + _ Applier = procGroup{} + _ Batcher = procGroup{} + _ Streamer = procGroup{} ) var groupTests = []struct { diff --git a/process/gzip.go b/process/gzip.go index b4897335..ccfa831a 100644 --- a/process/gzip.go +++ b/process/gzip.go @@ -90,14 +90,24 @@ func (p procGzip) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procGzip) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procGzip) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procGzip) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: gzip: %v", err) + } else if !ok { + return capsule, nil + } + var value []byte switch p.Options.Direction { case "from": diff --git a/process/gzip_test.go b/process/gzip_test.go index 231e7aad..0d383df7 100644 --- a/process/gzip_test.go +++ b/process/gzip_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procGzip{} - _ Batcher = procGzip{} + _ Applier = procGzip{} + _ Batcher = procGzip{} + _ Streamer = procGzip{} ) var gzipTests = []struct { diff --git a/process/hash.go b/process/hash.go index 949b5acb..fdbe3a1f 100644 --- a/process/hash.go +++ b/process/hash.go @@ -69,14 +69,24 @@ func (p procHash) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procHash) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procHash) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procHash) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: hash: %v", err) + } else if !ok { + return capsule, nil + } + // JSON processing if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key).String() diff --git a/process/hash_test.go b/process/hash_test.go index 8d4ebcbc..31acdc07 100644 --- a/process/hash_test.go +++ b/process/hash_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procHash{} - _ Batcher = procHash{} + _ Applier = procHash{} + _ Batcher = procHash{} + _ Streamer = procHash{} ) var hashTests = []struct { diff --git a/process/http.go b/process/http.go index 387e2e06..1b38cd84 100644 --- a/process/http.go +++ b/process/http.go @@ -115,14 +115,24 @@ func newProcHTTP(ctx context.Context, cfg config.Config) (p procHTTP, err error) return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procHTTP) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procHTTP) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procHTTP) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: http: %v", err) + } else if !ok { + return capsule, nil + } + // the URL can exist in three states: // // - no interpolation, the URL is unchanged diff --git a/process/insert.go b/process/insert.go index f2cac137..d29d2c25 100644 --- a/process/insert.go +++ b/process/insert.go @@ -50,14 +50,24 @@ func (p procInsert) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procInsert) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procInsert) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procInsert) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: insert: %v", err) + } else if !ok { + return capsule, nil + } + if err := capsule.Set(p.SetKey, p.Options.Value); err != nil { return capsule, fmt.Errorf("process: insert: %v", err) } diff --git a/process/insert_test.go b/process/insert_test.go index 6d6e238a..2d022aa1 100644 --- a/process/insert_test.go +++ b/process/insert_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procInsert{} - _ Batcher = procInsert{} + _ Applier = procInsert{} + _ Batcher = procInsert{} + _ Streamer = procInsert{} ) var insertTests = []struct { diff --git a/process/ip_database.go b/process/ip_database.go index 7ea6a733..1fffeb40 100644 --- a/process/ip_database.go +++ b/process/ip_database.go @@ -1,5 +1,6 @@ //go:build !wasm +// todo(v1.0.0): remove this processor package process import ( @@ -86,14 +87,24 @@ func newProcIPDatabase(ctx context.Context, cfg config.Config) (p procIPDatabase return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procIPDatabase) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procIPDatabase) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procIPDatabase) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: ip_database: %v", err) + } else if !ok { + return capsule, nil + } + res := capsule.Get(p.Key).String() record, err := p.db.Get(res) if err != nil { diff --git a/process/join.go b/process/join.go index bcb5ccb7..12a3e182 100644 --- a/process/join.go +++ b/process/join.go @@ -56,14 +56,24 @@ func (p procJoin) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procJoin) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procJoin) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes encapsulated data with the processor. 
func (p procJoin) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: join: %v", err) + } else if !ok { + return capsule, nil + } + // data is processed by retrieving and iterating the // array (Key) containing string values and joining // each one with the separator string diff --git a/process/join_test.go b/process/join_test.go index e171ed7b..decf67bd 100644 --- a/process/join_test.go +++ b/process/join_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procJoin{} - _ Batcher = procJoin{} + _ Applier = procJoin{} + _ Batcher = procJoin{} + _ Streamer = procJoin{} ) var joinTests = []struct { diff --git a/process/jq.go b/process/jq.go index d9b46637..8629c6af 100644 --- a/process/jq.go +++ b/process/jq.go @@ -62,14 +62,24 @@ func (p procJQ) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procJQ) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procJQ) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes encapsulated data with the processor. 
func (p procJQ) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: jq: %v", err) + } else if !ok { + return capsule, nil + } + var i interface{} if err := gojson.Unmarshal(capsule.Data(), &i); err != nil { return capsule, fmt.Errorf("process: jq: %v", err) diff --git a/process/jq_test.go b/process/jq_test.go index 8fc2489a..1a4b12e5 100644 --- a/process/jq_test.go +++ b/process/jq_test.go @@ -8,6 +8,12 @@ import ( "github.com/brexhq/substation/config" ) +var ( + _ Applier = procJQ{} + _ Batcher = procJQ{} + _ Streamer = procJQ{} +) + var jsonTests = []struct { name string cfg config.Config diff --git a/process/kv_store.go b/process/kv_store.go index e3cf79c3..e4d7b296 100644 --- a/process/kv_store.go +++ b/process/kv_store.go @@ -117,14 +117,24 @@ func (p procKVStore) Close(ctx context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procKVStore) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procKVStore) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procKVStore) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: kv_store: %v", err) + } else if !ok { + return capsule, nil + } + switch p.Options.Type { case "get": key := capsule.Get(p.Key).String() diff --git a/process/math.go b/process/math.go index c67cf162..5ce77d42 100644 --- a/process/math.go +++ b/process/math.go @@ -75,14 +75,24 @@ func (p procMath) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procMath) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procMath) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procMath) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: math: %v", err) + } else if !ok { + return capsule, nil + } + var value float64 result := capsule.Get(p.Key) for i, res := range result.Array() { diff --git a/process/math_test.go b/process/math_test.go index 80c73702..34518684 100644 --- a/process/math_test.go +++ b/process/math_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procMath{} - _ Batcher = procMath{} + _ Applier = procMath{} + _ Batcher = procMath{} + _ Streamer = procMath{} ) var mathTests = []struct { diff --git a/process/pipeline.go b/process/pipeline.go index df8b57c8..462a030f 100644 --- a/process/pipeline.go +++ b/process/pipeline.go @@ -55,10 +55,14 @@ func (p procPipeline) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procPipeline) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procPipeline) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. @@ -73,6 +77,12 @@ func (p procPipeline) Batch(ctx context.Context, capsules ...config.Capsule) ([] // should be run through the forEach processor (which can // encapsulate the pipeline processor). 
func (p procPipeline) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: pipeline: %v", err) + } else if !ok { + return capsule, nil + } + if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key) if result.IsArray() { diff --git a/process/pipeline_test.go b/process/pipeline_test.go index a8207b34..a0532f1f 100644 --- a/process/pipeline_test.go +++ b/process/pipeline_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procPipeline{} - _ Batcher = procPipeline{} + _ Applier = procPipeline{} + _ Batcher = procPipeline{} + _ Streamer = procPipeline{} ) var pipelineTests = []struct { diff --git a/process/pretty_print.go b/process/pretty_print.go index 997853de..5311a462 100644 --- a/process/pretty_print.go +++ b/process/pretty_print.go @@ -93,6 +93,11 @@ func (p procPrettyPrint) Close(context.Context) error { return nil } +// Stream processes a pipeline of capsules with the processor. +func (p procPrettyPrint) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + // Batch processes one or more capsules with the processor. // // Applying prettyprint formatting is handled by the @@ -111,12 +116,9 @@ func (p procPrettyPrint) Batch(ctx context.Context, capsules ...config.Capsule) newCapsules := newBatch(&capsules) for _, capsule := range capsules { - ok, err := p.operator.Operate(ctx, capsule) - if err != nil { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { return nil, fmt.Errorf("process: pretty_print: %v", err) - } - - if !ok { + } else if !ok { newCapsules = append(newCapsules, capsule) continue } @@ -177,6 +179,12 @@ func (p procPrettyPrint) Batch(ctx context.Context, capsules ...config.Capsule) // this support is unnecessary for multi-line objects that // are stored in a single byte array. 
func (p procPrettyPrint) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: pretty_print: %v", err) + } else if !ok { + return capsule, nil + } + switch p.Options.Direction { case "to": capsule.SetData([]byte(capsule.Get(ppModifier).String())) diff --git a/process/pretty_print_test.go b/process/pretty_print_test.go index 8972a196..a4c8e34c 100644 --- a/process/pretty_print_test.go +++ b/process/pretty_print_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procPrettyPrint{} - _ Batcher = procPrettyPrint{} + _ Applier = procPrettyPrint{} + _ Batcher = procPrettyPrint{} + _ Streamer = procPrettyPrint{} ) var prettyPrintBatchTests = []struct { diff --git a/process/process.go b/process/process.go index 48783849..8fe1f841 100644 --- a/process/process.go +++ b/process/process.go @@ -44,6 +44,9 @@ func toString(i interface{}) string { case Batcher: b, _ := json.Marshal(v) return string(b) + case Streamer: + b, _ := json.Marshal(v) + return string(b) default: return "" } @@ -316,26 +319,138 @@ func newBatch(s *[]config.Capsule) []config.Capsule { return make([]config.Capsule, 0, 10) } -func batchApply(ctx context.Context, capsules []config.Capsule, app Applier, op condition.Operator) ([]config.Capsule, error) { +func batchApply(ctx context.Context, capsules []config.Capsule, app Applier) ([]config.Capsule, error) { newCapsules := newBatch(&capsules) for _, c := range capsules { - ok, err := op.Operate(ctx, c) + newCapsule, err := app.Apply(ctx, c) if err != nil { return nil, err } - if !ok { - newCapsules = append(newCapsules, c) - continue - } + newCapsules = append(newCapsules, newCapsule) + } - newCapsule, err := app.Apply(ctx, c) + return newCapsules, nil +} + +type Streamer interface { + Stream(context.Context, *config.Channel, *config.Channel) error + Close(context.Context) error +} + +// NewStreamer returns a configured Streamer from a 
processor configuration. +func NewStreamer(ctx context.Context, cfg config.Config) (Streamer, error) { //nolint: cyclop, gocyclo // ignore cyclomatic complexity + switch cfg.Type { + case "aggregate": + return newProcAggregate(ctx, cfg) + case "aws_dynamodb": + return newProcAWSDynamoDB(ctx, cfg) + case "aws_lambda": + return newProcAWSLambda(ctx, cfg) + case "base64": + return newProcBase64(ctx, cfg) + case "capture": + return newProcCapture(ctx, cfg) + case "case": + return newProcCase(ctx, cfg) + case "convert": + return newProcConvert(ctx, cfg) + case "copy": + return newProcCopy(ctx, cfg) + case "count": + return newProcCount(ctx, cfg) + case "delete": + return newProcDelete(ctx, cfg) + case "dns": + return newProcDNS(ctx, cfg) + case "domain": + return newProcDomain(ctx, cfg) + case "drop": + return newProcDrop(ctx, cfg) + case "expand": + return newProcExpand(ctx, cfg) + case "flatten": + return newProcFlatten(ctx, cfg) + case "for_each": + return newProcForEach(ctx, cfg) + case "group": + return newProcGroup(ctx, cfg) + case "gzip": + return newProcGzip(ctx, cfg) + case "hash": + return newProcHash(ctx, cfg) + case "http": + return newProcHTTP(ctx, cfg) + case "insert": + return newProcInsert(ctx, cfg) + case "ip_database": + return newProcIPDatabase(ctx, cfg) + case "join": + return newProcJoin(ctx, cfg) + case "jq": + return newProcJQ(ctx, cfg) + case "kv_store": + return newProcKVStore(ctx, cfg) + case "math": + return newProcMath(ctx, cfg) + case "pipeline": + return newProcPipeline(ctx, cfg) + case "pretty_print": + return newProcPrettyPrint(ctx, cfg) + case "replace": + return newProcReplace(ctx, cfg) + case "split": + return newProcSplit(ctx, cfg) + case "time": + return newProcTime(ctx, cfg) + default: + return nil, fmt.Errorf("process: new_streamer: type %q settings %+v: %v", cfg.Type, cfg.Settings, errors.ErrInvalidFactoryInput) + } +} + +// NewStreamers accepts one or more processor configurations and returns configured streamers. 
+func NewStreamers(ctx context.Context, cfg ...config.Config) ([]Streamer, error) { + var streamers []Streamer + + for _, c := range cfg { + s, err := NewStreamer(ctx, c) if err != nil { return nil, err } - newCapsules = append(newCapsules, newCapsule) + streamers = append(streamers, s) } - return newCapsules, nil + return streamers, nil +} + +// CloseStreamers closes all streamers and returns an error if any close fails. +func CloseStreamers(ctx context.Context, streamers ...Streamer) error { + for _, s := range streamers { + if err := s.Close(ctx); err != nil { + return err + } + } + + return nil +} + +func streamApply(ctx context.Context, in, out *config.Channel, app Applier) error { + defer out.Close() + + for c := range in.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + capsule, err := app.Apply(ctx, c) + if err != nil { + return err + } + + out.Send(capsule) + } + } + + return nil } diff --git a/process/process_test.go b/process/process_test.go index acc8cc47..e544d60f 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -6,22 +6,21 @@ import ( "testing" "github.com/brexhq/substation/config" + "golang.org/x/sync/errgroup" ) var processTests = []struct { name string - conf []config.Config + conf config.Config test []byte expected []byte }{ { "copy", - []config.Config{ - { - Type: "copy", - Settings: map[string]interface{}{ - "set_key": "foo", - }, + config.Config{ + Type: "copy", + Settings: map[string]interface{}{ + "set_key": "foo", }, }, []byte(`bar`), @@ -29,14 +28,12 @@ var processTests = []struct { }, { "insert", - []config.Config{ - { - Type: "insert", - Settings: map[string]interface{}{ - "set_key": "foo", - "options": map[string]interface{}{ - "value": "bar", - }, + config.Config{ + Type: "insert", + Settings: map[string]interface{}{ + "set_key": "foo", + "options": map[string]interface{}{ + "value": "bar", }, }, }, @@ -45,13 +42,11 @@ var processTests = []struct { }, { "gzip", - []config.Config{ - { - 
Type: "gzip", - Settings: map[string]interface{}{ - "options": map[string]interface{}{ - "direction": "from", - }, + config.Config{ + Type: "gzip", + Settings: map[string]interface{}{ + "options": map[string]interface{}{ + "direction": "from", }, }, }, @@ -60,13 +55,11 @@ var processTests = []struct { }, { "base64", - []config.Config{ - { - Type: "base64", - Settings: map[string]interface{}{ - "options": map[string]interface{}{ - "direction": "from", - }, + config.Config{ + Type: "base64", + Settings: map[string]interface{}{ + "options": map[string]interface{}{ + "direction": "from", }, }, }, @@ -75,16 +68,14 @@ var processTests = []struct { }, { "split", - []config.Config{ - { - Type: "split", - Settings: map[string]interface{}{ - "options": map[string]interface{}{ - "separator": ".", - }, - "key": "foo", - "set_key": "foo", + config.Config{ + Type: "split", + Settings: map[string]interface{}{ + "options": map[string]interface{}{ + "separator": ".", }, + "key": "foo", + "set_key": "foo", }, }, []byte(`{"foo":"bar.baz"}`), @@ -92,13 +83,11 @@ var processTests = []struct { }, { "pretty_print", - []config.Config{ - { - Type: "pretty_print", - Settings: map[string]interface{}{ - "options": map[string]interface{}{ - "direction": "to", - }, + config.Config{ + Type: "pretty_print", + Settings: map[string]interface{}{ + "options": map[string]interface{}{ + "direction": "to", }, }, }, @@ -110,16 +99,14 @@ var processTests = []struct { }, { "time", - []config.Config{ - { - Type: "time", - Settings: map[string]interface{}{ - "key": "foo", - "set_key": "foo", - "options": map[string]interface{}{ - "format": "unix", - "set_format": "2006-01-02T15:04:05.000000Z", - }, + config.Config{ + Type: "time", + Settings: map[string]interface{}{ + "key": "foo", + "set_key": "foo", + "options": map[string]interface{}{ + "format": "unix", + "set_format": "2006-01-02T15:04:05.000000Z", }, }, }, @@ -135,12 +122,12 @@ func TestApply(t *testing.T) { t.Run(test.name, func(t *testing.T) { 
capsule.SetData(test.test) - appliers, err := NewAppliers(ctx, test.conf...) + applier, err := NewApplier(ctx, test.conf) if err != nil { t.Error(err) } - result, err := Apply(ctx, capsule, appliers...) + result, err := Apply(ctx, capsule, applier) if err != nil { t.Error(err) } @@ -162,12 +149,12 @@ func TestBatch(t *testing.T) { batch := make([]config.Capsule, 1) batch[0] = capsule - appliers, err := NewBatchers(ctx, test.conf...) + batcher, err := NewBatcher(ctx, test.conf) if err != nil { t.Error(err) } - result, err := Batch(ctx, batch, appliers...) + result, err := Batch(ctx, batch, batcher) if err != nil { t.Error(err) } @@ -178,3 +165,45 @@ func TestBatch(t *testing.T) { }) } } + +func TestStream(t *testing.T) { + capsule := config.NewCapsule() + + for _, test := range processTests { + t.Run(test.name, func(t *testing.T) { + group, ctx := errgroup.WithContext(context.TODO()) + in, out := config.NewChannel(), config.NewChannel() + + streamer, err := NewStreamer(ctx, test.conf) + if err != nil { + t.Error(err) + } + + group.Go(func() error { + if err := streamer.Stream(ctx, in, out); err != nil { + panic(err) + } + + return nil + }) + + group.Go(func() error { + for capsule := range out.C { + if !bytes.Equal(capsule.Data(), test.expected) { + t.Errorf("expected %v, got %v", test.expected, capsule.Data()) + } + } + + return nil + }) + + capsule.SetData(test.test) + in.Send(capsule) + in.Close() + + if err := group.Wait(); err != nil { + panic(err) + } + }) + } +} diff --git a/process/replace.go b/process/replace.go index f85367ca..4827f818 100644 --- a/process/replace.go +++ b/process/replace.go @@ -63,14 +63,24 @@ func (p procReplace) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. 
+func (p procReplace) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procReplace) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. func (p procReplace) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: replace: %v", err) + } else if !ok { + return capsule, nil + } + // JSON processing if p.Key != "" && p.SetKey != "" { result := capsule.Get(p.Key).String() diff --git a/process/replace_test.go b/process/replace_test.go index a68283f4..ae4746c0 100644 --- a/process/replace_test.go +++ b/process/replace_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procReplace{} - _ Batcher = procReplace{} + _ Applier = procReplace{} + _ Batcher = procReplace{} + _ Streamer = procReplace{} ) var replaceTests = []struct { diff --git a/process/split.go b/process/split.go index df70f70c..895c3c1b 100644 --- a/process/split.go +++ b/process/split.go @@ -53,17 +53,18 @@ func (p procSplit) Close(context.Context) error { return nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procSplit) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. 
func (p procSplit) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { newCapsules := newBatch(&capsules) for _, capsule := range capsules { - ok, err := p.operator.Operate(ctx, capsule) - if err != nil { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { return nil, fmt.Errorf("process: split: %v", err) - } - - if !ok { + } else if !ok { newCapsules = append(newCapsules, capsule) continue } diff --git a/process/split_test.go b/process/split_test.go index 2073f710..db469513 100644 --- a/process/split_test.go +++ b/process/split_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procSplit{} - _ Batcher = procSplit{} + _ Applier = procSplit{} + _ Batcher = procSplit{} + _ Streamer = procSplit{} ) var splitTests = []struct { diff --git a/process/time.go b/process/time.go index 7d4e7615..e6c63d53 100644 --- a/process/time.go +++ b/process/time.go @@ -83,14 +83,24 @@ func newProcTime(ctx context.Context, cfg config.Config) (p procTime, err error) return p, nil } -// Batch processes one or more capsules with the processor. Conditions are -// optionally applied to the data to enable processing. +// Stream processes a pipeline of capsules with the processor. +func (p procTime) Stream(ctx context.Context, in, out *config.Channel) error { + return streamApply(ctx, in, out, p) +} + +// Batch processes one or more capsules with the processor. func (p procTime) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) { - return batchApply(ctx, capsules, p, p.operator) + return batchApply(ctx, capsules, p) } // Apply processes a capsule with the processor. 
func (p procTime) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) { + if ok, err := p.operator.Operate(ctx, capsule); err != nil { + return capsule, fmt.Errorf("process: time: %v", err) + } else if !ok { + return capsule, nil + } + // "now" processing, supports json and data if p.Options.Format == "now" { ts := time.Now() diff --git a/process/time_test.go b/process/time_test.go index fa9e1a10..ef22b789 100644 --- a/process/time_test.go +++ b/process/time_test.go @@ -9,8 +9,9 @@ import ( ) var ( - _ Applier = procTime{} - _ Batcher = procTime{} + _ Applier = procTime{} + _ Batcher = procTime{} + _ Streamer = procTime{} ) var setFmt = "2006-01-02T15:04:05.000000Z"