feat: Add Stream Transform & Streamer Interface #106
Merged
Conversation
julieagnessparks approved these changes on Apr 27, 2023
Description

- Added a streaming example (examples/streaming/)
- Added a stream transform (internal/transform)
- Updated process/aggregate to use a private method for aggregating data

Motivation and Context
This PR adds a third type of data transformation called "streaming" (or "stream") that uses channels to process data. This solves a couple of problems:
The second problem is the most impactful because it improves how users run our default ETL application. With stream support, the application creates a concurrency pipeline for data processing, so data is always sent to the next processor in the series whenever that processor is ready to accept more data. This differs from the batch transform, where data must always be sent to the next processor as a group; batching can create unintentional bottlenecks in the system depending on the configuration. Eventually stream processing should become the default setting for all non-transfer data processing use cases.
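As a rough illustration of the concurrency pipeline idea (this is not the PR's actual code; the channel types and the uppercasing "processor" are invented for the sketch), a stream stage in Go can forward each result to the next stage as soon as it is ready, rather than waiting for the whole batch:

```go
package main

import (
	"fmt"
	"strings"
)

// process reads messages from in, applies a transformation to each one,
// and forwards the result immediately on the returned channel, so the
// downstream stage never waits for the full batch to finish.
func process(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for msg := range in {
			out <- strings.ToUpper(msg) // stand-in for a real processor
		}
	}()
	return out
}

func main() {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, msg := range []string{"a", "b", "c"} {
			in <- msg
		}
	}()

	// The "sink" receives each message as soon as the processor emits it.
	for msg := range process(in) {
		fmt.Println(msg)
	}
}
```

Because every stage only blocks on its own channel, backpressure propagates naturally: a slow stage slows its upstream neighbor instead of forcing the whole batch to queue up.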
This PR also adds condition support to the processor Applier interface, which means that the Apply method now checks whether the data passes a condition before processing. This simplifies some of the batching code and should reduce user confusion (e.g., "why is the configured condition not working?"), and it also solves a problem we have with meta-processors like process/pipeline not using configured conditions.

How Has This Been Tested?
Added and updated unit tests, integration tested on a high-volume production deployment. Here's some real-world evidence from AWS X-Ray of how the streaming transformation differs from batch transformation:
Streaming

[screenshot: AWS X-Ray trace of the streaming transform]

Batch

[screenshot: AWS X-Ray trace of the batch transform]
Notice how in the streaming screencap the Kinesis PutRecord calls occur before any DynamoDB GetItem calls; that's because the concurrency pipeline is continuously sending data to the sink instead of waiting for all data to be processed first. In the batch screencap, all data processing (i.e., calls to DynamoDB GetItem) must complete before any data is sent to the sink.