Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Thank you so much for your interest in contributing to Substation! This document

[Development](#development)
+ [Development Environment](#development-environment)
+ [Messages](#messages)
+ [Conditions](#conditions)
+ [Transforms](#transforms)
+ [Testing](#testing)
Expand Down Expand Up @@ -49,12 +48,6 @@ Enhancements should be submitted as issues using the issue template.

The project supports development through the use of [Visual Studio Code configurations](https://code.visualstudio.com/docs/remote/containers). The VS Code [development container](.devcontainer/Dockerfile) contains all packages required to develop and test changes locally before submitting pull requests.

### [Messages](message/)

Each message can have a series of flags attached to it that are used to determine how the message should be processed by the system. These flags are exported as iota constants and should use verb style naming, such as:
- `IsControl`
- `SkipMissingValues`

### [Conditions](condition/)

Each condition should be functional and solve a single problem, and each one is nested under a "family" of conditions. (We may ask that you split complex condition logic into multiple conditions.) For example, there is a family for string comparisons:
Expand Down
4 changes: 2 additions & 2 deletions cmd/aws/lambda/substation/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest)

b := []byte(request.Body)
msg := []*message.Message{
message.New().SetData(b).SetMetadata(metadata).SkipMissingValues(),
message.New().SetData(b).SetMetadata(metadata),
message.New().AsControl(),
}

Expand All @@ -61,7 +61,7 @@ func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest)
// Convert transformed messages to a JSON array.
var output []json.RawMessage
for _, msg := range res {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/data_firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func firehoseHandler(ctx context.Context, event events.KinesisFirehoseEvent) (ev
return resp, err
}

msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(record.Data).SetMetadata(metadata)
res, err := sub.Transform(ctx, msg)
if err != nil {
return resp, err
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
// "before": { ... },
// "after": { ... }
// }
msg := message.New().SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetMetadata(metadata)
if err := msg.SetValue("source.ts_ms", record.Change.ApproximateCreationDateTime.Time.UnixMilli()); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/kinesis_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func kinesisStreamHandler(ctx context.Context, event events.KinesisEvent) error
return err
}

msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(record.Data).SetMetadata(metadata)
ch.Send(msg)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/aws/lambda/substation/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func lambdaHandler(ctx context.Context, event json.RawMessage) ([]json.RawMessag

// Data and ctrl messages are sent as a group.
msg := []*message.Message{
message.New().SetData(evt).SkipMissingValues(),
message.New().SetData(evt),
message.New().AsControl(),
}

Expand All @@ -44,7 +44,7 @@ func lambdaHandler(ctx context.Context, event json.RawMessage) ([]json.RawMessag
// Convert transformed messages to a JSON array.
var output []json.RawMessage
for _, msg := range res {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
continue
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/aws/lambda/substation/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
return err
}

msg := message.New().SetData(r).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(r).SetMetadata(metadata)
ch.Send(msg)

return nil
Expand All @@ -180,7 +180,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
}

b := []byte(scanner.Text())
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(b).SetMetadata(metadata)

ch.Send(msg)
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
return err
}

msg := message.New().SetData(r).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(r).SetMetadata(metadata)
ch.Send(msg)

return nil
Expand All @@ -357,7 +357,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
}

b := []byte(scanner.Text())
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(b).SetMetadata(metadata)

ch.Send(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/sns.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func snsHandler(ctx context.Context, event events.SNSEvent) error {

for _, record := range event.Records {
b := []byte(record.SNS.Message)
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(b).SetMetadata(metadata)

ch.Send(msg)
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/aws/lambda/substation/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ func sqsHandler(ctx context.Context, event events.SQSEvent) error {

for _, record := range event.Records {
b := []byte(record.Body)
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()

msg := message.New().SetData(b).SetMetadata(metadata)
ch.Send(msg)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/substation/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ partially normalized to the Elastic Common Schema (ECS).
}

msgs := []*message.Message{
message.New().SetData([]byte(evtDemo)).SkipMissingValues(),
message.New().SetData([]byte(evtDemo)),
message.New().AsControl(),
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/substation/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func handleTest(w http.ResponseWriter, r *http.Request) {

testPassed := true
for _, msg := range tMsgs {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
continue
}

Expand Down Expand Up @@ -330,7 +330,7 @@ func handleRun(w http.ResponseWriter, r *http.Request) {
}

msgs := []*message.Message{
message.New().SetData([]byte(request.Input)).SkipMissingValues(),
message.New().SetData([]byte(request.Input)),
message.New().AsControl(),
}

Expand All @@ -342,7 +342,7 @@ func handleRun(w http.ResponseWriter, r *http.Request) {

var output []string
for _, msg := range result {
if !msg.HasFlag(message.IsControl) {
if !msg.IsControl() {
output = append(output, string(msg.Data()))
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/substation/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func read(cfg customConfig, f *os.File) error {
return err
}

msg := message.New().SetData(r).SkipMissingValues()
msg := message.New().SetData(r)
ch.Send(msg)

return nil
Expand All @@ -264,7 +264,7 @@ func read(cfg customConfig, f *os.File) error {
}

b := []byte(scanner.Text())
msg := message.New().SetData(b).SkipMissingValues()
msg := message.New().SetData(b)

ch.Send(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substation/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func tapKinesis(arg string, extVars map[string]string, offset, stream string) er
return err
}

msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues()
msg := message.New().SetData(record.Data).SetMetadata(metadata)
ch.Send(msg)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/substation/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func testFile(arg string, extVars map[string]string) error {

for _, msg := range tMsgs {
// Skip control messages because they contain no data.
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion condition/format_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type formatJSON struct {
}

func (c *formatJSON) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/format_mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type formatMIME struct {
}

func (c *formatMIME) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_global_unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPGlobalUnicast struct {
}

func (insp *networkIPGlobalUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_link_local_multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPLinkLocalMulticast struct {
}

func (insp *networkIPLinkLocalMulticast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_link_local_unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPLinkLocalUnicast struct {
}

func (insp *networkIPLinkLocalUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_loopback.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPLoopback struct {
}

func (insp *networkIPLoopback) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPMulticast struct {
}

func (insp *networkIPMulticast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPPrivate struct {
}

func (insp *networkIPPrivate) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPUnicast struct {
}

func (insp *networkIPUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_unspecified.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPUnspecified struct {
}

func (insp *networkIPUnspecified) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_valid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPValid struct {
}

func (insp *networkIPValid) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_and.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseAND struct {
}

func (insp *numberBitwiseAND) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_not.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseNOT struct {
}

func (insp *numberBitwiseNOT) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_or.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseOR struct {
}

func (insp *numberBitwiseOR) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_xor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseXOR struct {
}

func (insp *numberBitwiseXOR) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
3 changes: 2 additions & 1 deletion condition/number_equal_to.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type numberEqualTo struct {
}

func (insp *numberEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}
compare := insp.conf.Value
Expand All @@ -50,6 +50,7 @@ func (insp *numberEqualTo) Condition(ctx context.Context, msg *message.Message)
}

target := msg.GetValue(insp.conf.Object.TargetKey)

if target.Exists() {
compare = target.Float()
}
Expand Down
2 changes: 1 addition & 1 deletion condition/number_greater_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type numberGreaterThan struct {
}

func (insp *numberGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_length_equal_to.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberLengthEqualTo struct {
}

func (insp *numberLengthEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_length_greater_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberLengthGreaterThan struct {
}

func (insp *numberLengthGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_length_less_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberLengthLessThan struct {
}

func (insp *numberLengthLessThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.HasFlag(message.IsControl) {
if msg.IsControl() {
return false, nil
}

Expand Down
Loading
Loading