diff --git a/cmd/aws/lambda/README.md b/cmd/aws/lambda/README.md index bb6b913c..100cd02f 100644 --- a/cmd/aws/lambda/README.md +++ b/cmd/aws/lambda/README.md @@ -13,6 +13,7 @@ This app handles ingest, transform, and load for data from these AWS services: * [Synchronous Invocation (Lambda)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html) * [S3](https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html) * [S3 via SNS](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html) +* [S3 via SQS](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html) * [SNS](https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html) * [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) diff --git a/cmd/aws/lambda/substation/main.go b/cmd/aws/lambda/substation/main.go index 44941798..ff5db636 100644 --- a/cmd/aws/lambda/substation/main.go +++ b/cmd/aws/lambda/substation/main.go @@ -77,6 +77,8 @@ func main() { lambda.Start(s3Handler) case "AWS_S3_SNS": lambda.Start(s3SnsHandler) + case "AWS_S3_SQS": + lambda.Start(s3SqsHandler) case "AWS_SNS": lambda.Start(snsHandler) case "AWS_SQS": diff --git a/cmd/aws/lambda/substation/s3.go b/cmd/aws/lambda/substation/s3.go index 75afd686..dcc5bfb2 100644 --- a/cmd/aws/lambda/substation/s3.go +++ b/cmd/aws/lambda/substation/s3.go @@ -31,7 +31,6 @@ type s3Metadata struct { ObjectSize int64 `json:"objectSize"` } -//nolint:gocognit func s3Handler(ctx context.Context, event events.S3Event) error { // Retrieve and load configuration. conf, err := getConfig(ctx) @@ -91,7 +90,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error { return nil }) - // Data ingest + // Data ingest. File contents are downloaded and sent to the channel. group.Go(func() error { defer ch.Close() @@ -103,89 +102,101 @@ func s3Handler(ctx context.Context, event events.S3Event) error { c := s3.NewFromConfig(awsCfg) client := manager.NewDownloader(c) - for _, record := range event.Records { - // The S3 object key is URL encoded. - // - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html - objectKey, err := url.QueryUnescape(record.S3.Object.Key) - if err != nil { - return err - } + if err := processS3Event(ctx, ch, client, event); err != nil { + return err + } - m := s3Metadata{ - EventTime: record.EventTime, - BucketArn: record.S3.Bucket.Arn, - BucketName: record.S3.Bucket.Name, - ObjectKey: objectKey, - ObjectSize: record.S3.Object.Size, - } + return nil + }) - metadata, err := json.Marshal(m) - if err != nil { - return err - } + // Wait for all goroutines to complete. This includes the goroutines that are + // executing the transform functions. + if err := group.Wait(); err != nil { + return err + } - dst, err := os.CreateTemp("", "substation") - if err != nil { - return err - } - defer os.Remove(dst.Name()) - defer dst.Close() + return nil +} - if _, err := client.Download(ctx, dst, &s3.GetObjectInput{ - Bucket: &record.S3.Bucket.Name, - Key: &objectKey, - }); err != nil { - return err - } +func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { + // Retrieve and load configuration. + conf, err := getConfig(ctx) + if err != nil { + return err + } - // Determines if the file should be treated as text. - // Text files are decompressed by the bufio package - // (if necessary) and each line is sent as a separate - // message. All other files are sent as a single message. - mediaType, err := media.File(dst) - if err != nil { - return err - } + cfg := customConfig{} + if err := json.NewDecoder(conf).Decode(&cfg); err != nil { + return err + } - if _, err := dst.Seek(0, 0); err != nil { - return err + sub, err := substation.New(ctx, cfg.Config) + if err != nil { + return err + } + + ch := channel.New[*message.Message]() + group, ctx := errgroup.WithContext(ctx) + + // Data transformation. Transforms are executed concurrently using a worker pool + // managed by an errgroup. Each Message is processed in a separate goroutine. + group.Go(func() error { + tfGroup, tfCtx := errgroup.WithContext(ctx) + tfGroup.SetLimit(cfg.Concurrency) + + for message := range ch.Recv() { + select { + case <-ctx.Done(): + return ctx.Err() + default: } - // Unsupported media types are sent as binary data. - if !slices.Contains(bufio.MediaTypes, mediaType) { - r, err := io.ReadAll(dst) - if err != nil { + msg := message + tfGroup.Go(func() error { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := sub.Transform(tfCtx, msg); err != nil { return err } - msg := message.New().SetData(r).SetMetadata(metadata) - ch.Send(msg) - return nil - } + }) + } - scanner := bufio.NewScanner() - defer scanner.Close() + if err := tfGroup.Wait(); err != nil { + return err + } - if err := scanner.ReadFile(dst); err != nil { - return err - } + // CTRL messages flush the pipeline. This must be done + // after all messages have been processed. + ctrl := message.New().AsControl() + if _, err := sub.Transform(tfCtx, ctrl); err != nil { + return err + } - for scanner.Scan() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } + return nil + }) + + // Data ingest. File contents are downloaded and sent to the channel. + group.Go(func() error { + defer ch.Close() + + awsCfg, err := iconfig.NewAWS(ctx, iconfig.AWS{}) + if err != nil { + return err + } - b := []byte(scanner.Text()) - msg := message.New().SetData(b).SetMetadata(metadata) + c := s3.NewFromConfig(awsCfg) + client := manager.NewDownloader(c) - ch.Send(msg) + for _, record := range event.Records { + var s3Event events.S3Event + err := json.Unmarshal([]byte(record.SNS.Message), &s3Event) + if err != nil { + return err } - if err := scanner.Err(); err != nil { + if err := processS3Event(ctx, ch, client, s3Event); err != nil { return err } } @@ -202,8 +213,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error { return nil } -//nolint:gocognit -func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { +func s3SqsHandler(ctx context.Context, event events.SQSEvent) error { // Retrieve and load configuration. conf, err := getConfig(ctx) if err != nil { @@ -262,7 +272,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { return nil }) - // Data ingest. + // Data ingest. File contents are downloaded and sent to the channel. group.Go(func() error { defer ch.Close() @@ -276,105 +286,132 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { for _, record := range event.Records { var s3Event events.S3Event - err := json.Unmarshal([]byte(record.SNS.Message), &s3Event) - if err != nil { + + // S3 -> SQS + if err := json.Unmarshal([]byte(record.Body), &s3Event); err != nil { return err } - for _, record := range s3Event.Records { - // The S3 object key is URL encoded. - // - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html - objectKey, err := url.QueryUnescape(record.S3.Object.Key) - if err != nil { + if len(s3Event.Records) > 0 { + if err := processS3Event(ctx, ch, client, s3Event); err != nil { return err } - m := s3Metadata{ - record.EventTime, - record.S3.Bucket.Arn, - record.S3.Bucket.Name, - objectKey, - record.S3.Object.Size, - } - metadata, err := json.Marshal(m) - if err != nil { - return err - } + continue + } - dst, err := os.CreateTemp("", "substation") - if err != nil { - return err - } - defer os.Remove(dst.Name()) - defer dst.Close() + // S3 -> SNS -> SQS + var sns events.SNSEntity + if err := json.Unmarshal([]byte(record.Body), &sns); err != nil { + return err + } - if _, err := client.Download(ctx, dst, &s3.GetObjectInput{ - Bucket: &record.S3.Bucket.Name, - Key: &objectKey, - }); err != nil { - return err - } + if err := json.Unmarshal([]byte(sns.Message), &s3Event); err != nil { + return err + } - // Determines if the file should be treated as text. - // Text files are decompressed by the bufio package - // (if necessary) and each line is sent as a separate - // message. All other files are sent as a single message. - mediaType, err := media.File(dst) - if err != nil { - return err - } + if err := processS3Event(ctx, ch, client, s3Event); err != nil { + return err + } + } - if _, err := dst.Seek(0, 0); err != nil { - return err - } + return nil + }) - // Unsupported media types are sent as binary data. - if !slices.Contains(bufio.MediaTypes, mediaType) { - r, err := io.ReadAll(dst) - if err != nil { - return err - } + // Wait for all goroutines to complete. This includes the goroutines that are + // executing the transform functions. + if err := group.Wait(); err != nil { + return err + } - msg := message.New().SetData(r).SetMetadata(metadata) - ch.Send(msg) + return nil +} - return nil - } +func processS3Event(ctx context.Context, ch *channel.Channel[*message.Message], client *manager.Downloader, s3Event events.S3Event) error { + for _, record := range s3Event.Records { + // The S3 object key is URL encoded. + // + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html + objectKey, err := url.QueryUnescape(record.S3.Object.Key) + if err != nil { + return err + } - scanner := bufio.NewScanner() - defer scanner.Close() + m := s3Metadata{ + record.EventTime, + record.S3.Bucket.Arn, + record.S3.Bucket.Name, + objectKey, + record.S3.Object.Size, + } + metadata, err := json.Marshal(m) + if err != nil { + return err + } - if err := scanner.ReadFile(dst); err != nil { - return err - } + dst, err := os.CreateTemp("", "substation") + if err != nil { + return err + } + defer os.Remove(dst.Name()) + defer dst.Close() - for scanner.Scan() { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } + if _, err := client.Download(ctx, dst, &s3.GetObjectInput{ + Bucket: &record.S3.Bucket.Name, + Key: &objectKey, + }); err != nil { + return err + } - b := []byte(scanner.Text()) - msg := message.New().SetData(b).SetMetadata(metadata) + // Determines if the file should be treated as text. + // Text files are decompressed by the bufio package + // (if necessary) and each line is sent as a separate + // message. All other files are sent as a single message. + mediaType, err := media.File(dst) + if err != nil { + return err + } - ch.Send(msg) - } + if _, err := dst.Seek(0, 0); err != nil { + return err + } - if err := scanner.Err(); err != nil { - return err - } + // Unsupported media types are sent as binary data. + if !slices.Contains(bufio.MediaTypes, mediaType) { + r, err := io.ReadAll(dst) + if err != nil { + return err } + + msg := message.New().SetData(r).SetMetadata(metadata) + ch.Send(msg) + + return nil } - return nil - }) + scanner := bufio.NewScanner() + defer scanner.Close() - // Wait for all goroutines to complete. This includes the goroutines that are - // executing the transform functions. - if err := group.Wait(); err != nil { - return err + if err := scanner.ReadFile(dst); err != nil { + return err + } + + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + b := []byte(scanner.Text()) + msg := message.New().SetData(b).SetMetadata(metadata) + + ch.Send(msg) + } + + if err := scanner.Err(); err != nil { + return err + } } return nil