Nearly every streaming analytics system stores processed data somewhere for further analysis, historical reference, or integration with BI tools.
In this example project, we incorporate relational data stores. We stream data into SQLite and DuckDB, but these examples can be altered to work with MySQL, PostgreSQL, MongoDB, and more.
We use one producer that can write to up to four different sinks:
- to a file
- to a Kafka topic (set in .env)
- to a SQLite database
- to a DuckDB database
In data pipelines:
- A source generates records (our message generator).
- A sink stores or forwards them (file, Kafka, SQLite, DuckDB).
- An emitter is a small function that takes data from a source and writes it into a sink.
- Each emitter has one job (emit_message to the specified sink).
Explore the code to see which aspects are common to all sinks and which parts are unique.
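To make the common and unique parts concrete, here is a minimal sketch of the source, emitter, and sink pattern. It is not the code in this repo: generate_messages, make_file_emitter, and make_sqlite_emitter are hypothetical names. The common part is that every emitter has the same shape (it takes one message and returns nothing); the unique part is how each sink stores that message.

```python
# Minimal sketch of the source -> emitter -> sink pattern.
# All names here are hypothetical; they are not taken from this repo.
import json
import sqlite3
import tempfile
from pathlib import Path
from typing import Callable, Dict, Iterator

Emitter = Callable[[Dict], None]  # every emitter takes one message, returns nothing


def generate_messages(count: int) -> Iterator[Dict]:
    """Hypothetical source: yield simple message dictionaries."""
    for i in range(count):
        yield {"id": i, "message": f"message {i}"}


def make_file_emitter(path: Path) -> Emitter:
    """Sink-specific part: append each message to a file as one JSON line."""
    def emit_message(msg: Dict) -> None:
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(msg) + "\n")
    return emit_message


def make_sqlite_emitter(conn: sqlite3.Connection) -> Emitter:
    """Sink-specific part: insert each message as a row in a SQLite table."""
    conn.execute("CREATE TABLE IF NOT EXISTS messages (id INTEGER, message TEXT)")

    def emit_message(msg: Dict) -> None:
        conn.execute(
            "INSERT INTO messages (id, message) VALUES (?, ?)",
            (msg["id"], msg["message"]),
        )
        conn.commit()
    return emit_message


# Common part: the loop does not care which sinks are attached.
out_file = Path(tempfile.mkdtemp()) / "live_data.json"
conn = sqlite3.connect(":memory:")
emitters = [make_file_emitter(out_file), make_sqlite_emitter(conn)]
for msg in generate_messages(3):
    for emit_message in emitters:
        emit_message(msg)
```

Adding a new sink means writing one more make_..._emitter function; the producer loop stays the same.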
Before starting, ensure you have completed the setup tasks in https://github.com/denisecase/buzzline-01-case and https://github.com/denisecase/buzzline-02-case. Python 3.11 is required.
- Once the tools are installed, copy/fork this project into your GitHub account and create your own version of this project to run and experiment with.
- Name it buzzline-05-yourname, where yourname is something unique to you.
Additional information about our standard professional Python project workflow is available at https://github.com/denisecase/pro-analytics-01.
Launch WSL. Open a PowerShell terminal in VS Code and run the following command:
wsl
You should now be in a Linux shell (the prompt shows something like username@DESKTOP:.../repo-name$).
Do all steps related to starting Kafka in this WSL window.
In P2, you downloaded, installed, and configured a local Kafka service. Before starting, run a short prep script to ensure Kafka has a persistent data directory and meta.properties set up. This step works on WSL, macOS, and Linux; be sure you have the $ prompt and are in the root project folder.
- Make sure the script is executable.
- Run the shell script to set up Kafka.
- Change directory (cd) to the kafka directory.
- Start the Kafka server in the foreground. Keep this terminal open - Kafka will run here.
chmod +x scripts/prepare_kafka.sh
scripts/prepare_kafka.sh
cd ~/kafka
bin/kafka-server-start.sh config/kraft/server.properties
Keep this terminal open! Kafka is running and needs to stay active.
For detailed instructions, see SETUP_KAFKA from Project 2.
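If you are unsure whether Kafka actually started, a quick check from another terminal is to see whether anything accepts TCP connections on the broker port. This is a minimal sketch, assuming the broker listens on localhost:9092 (a common default); confirm the listeners setting in config/kraft/server.properties on your machine. An open port only shows that something is listening, not that Kafka is healthy.

```python
# Quick check that something is listening where the Kafka broker should be.
# Assumption: the broker listens on localhost:9092, a common default; confirm
# the listeners setting in config/kraft/server.properties on your machine.
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    print("Kafka port reachable:", is_port_open("localhost", 9092))
```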
Open your project in VS Code and use the commands for your operating system to:
- Create a Python virtual environment.
- Activate the virtual environment.
- Upgrade pip and key tools.
- Install from requirements.txt.
Windows:
Open a new PowerShell terminal in VS Code (Terminal / New Terminal / PowerShell). Python 3.11 is required.
py -3.11 -m venv .venv
.\.venv\Scripts\Activate.ps1
py -m pip install --upgrade pip wheel setuptools
py -m pip install --upgrade -r requirements.txt
If you get an execution policy error, run this first:
Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
Mac/Linux:
Open a new terminal in VS Code (Terminal / New Terminal).
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install --upgrade pip
python3 -m pip install --upgrade -r requirements.txt
In the same terminal used for Task 2, we'll run some tests to ensure that all four emitters work on your machine. All tests should pass if everything is installed and set up correctly.
pytest -v
Then run the verify_emitters.py script as a module to check that we can emit to all four types.
For the Kafka sink to work, the Kafka service must be running.
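The idea behind a verification step like this can be sketched as a loop that sends one sample message through every emitter and records which ones succeed. This is a hypothetical illustration, not the contents of verify_emitters.py; check_emitters and kafka_emitter_stub are made-up names, and the stub simulates what happens when the Kafka service is not running.

```python
# Hypothetical verification loop (not the contents of verify_emitters.py):
# call every emitter once and report which sinks accepted a message.
from typing import Callable, Dict

Emitter = Callable[[Dict], None]


def check_emitters(emitters: Dict[str, Emitter], sample: Dict) -> Dict[str, bool]:
    results: Dict[str, bool] = {}
    for name, emit in emitters.items():
        try:
            emit(sample)
            results[name] = True
        except Exception as exc:  # one failing sink must not stop the others
            print(f"[{name}] FAILED: {exc}")
            results[name] = False
    return results


def kafka_emitter_stub(msg: Dict) -> None:
    """Stand-in for a Kafka emitter when the Kafka service is not running."""
    raise ConnectionError("Kafka service is not running")


received = []
results = check_emitters(
    {"memory": received.append, "kafka": kafka_emitter_stub},
    {"message": "hello"},
)
for name, ok in results.items():
    print(f"{name}: {'OK' if ok else 'FAILED'}")
```

Catching the exception per sink is the key design choice: a stopped Kafka service shows up as one FAILED line instead of hiding the results for the other sinks.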
Windows:
py -m verify_emitters
Mac/Linux:
python3 -m verify_emitters
Running the producer and a consumer takes two terminals:
- One to run the producer which writes messages using various emitters.
- Another to run each consumer.
Start the producer to generate the messages.
The existing producer writes messages to a live data file in the data folder. If the Kafka service is running, it will try to write the messages to a Kafka topic as well. For configuration details, see the .env file.
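To see what kind of data flows through the sinks, here is a sketch of a message generator. The author, category, and sentiment fields appear in this project's DuckDB queries; the message and timestamp fields, the value lists, and the 0 to 1 sentiment range are assumptions for illustration and may not match the producer in this repo.

```python
# Hypothetical message source. The author, category, and sentiment fields
# appear in this project's DuckDB queries; the other fields, the value
# lists, and the 0-1 sentiment range are assumptions for illustration.
import json
import random
from datetime import datetime, timezone
from itertools import islice
from typing import Dict, Iterator

AUTHORS = ["Alice", "Bob", "Charlie"]
CATEGORIES = ["humor", "tech", "food"]


def generate_messages(seed: int = 42) -> Iterator[Dict]:
    """Yield an endless stream of message dictionaries."""
    rng = random.Random(seed)  # seeded so runs are repeatable
    while True:
        author = rng.choice(AUTHORS)
        category = rng.choice(CATEGORIES)
        yield {
            "message": f"{author} posted about {category}",
            "author": author,
            "category": category,
            "sentiment": round(rng.uniform(0.0, 1.0), 2),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }


# Take a few messages and serialize them the way a file or Kafka sink might.
sample = list(islice(generate_messages(), 5))
lines = [json.dumps(m) for m in sample]
```

Because the generator is endless, the producer decides how many messages to take and how long to wait between them.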
In VS Code, open a NEW terminal. Use the commands below to activate .venv and start the producer.
Windows:
.\.venv\Scripts\Activate.ps1
py -m producers.producer_case
Mac/Linux:
source .venv/bin/activate
python3 -m producers.producer_case
NOTE: The producer will still work if the Kafka service is not available.
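One way a producer can keep running without Kafka is to try to set up the Kafka sink once, and skip it if that fails. This sketch assumes a hypothetical factory hook; in a real producer the factory might wrap kafka-python's KafkaProducer, whose constructor fails when no broker is reachable. The names try_create_emitter, run_producer, and broker_down are illustrative only.

```python
# Sketch of "keep producing even if Kafka is down". The factory argument is a
# hypothetical hook; in a real producer it might wrap kafka-python's
# KafkaProducer, whose constructor fails when no broker is reachable.
from typing import Callable, Dict, Iterable, Optional

Emitter = Callable[[Dict], None]


def try_create_emitter(factory: Callable[[], Emitter]) -> Optional[Emitter]:
    """Return an emitter, or None if the sink cannot be set up."""
    try:
        return factory()
    except Exception as exc:
        print(f"WARNING: Kafka unavailable, continuing without it ({exc})")
        return None


def run_producer(messages: Iterable[Dict], file_emit: Emitter,
                 kafka_factory: Callable[[], Emitter]) -> None:
    kafka_emit = try_create_emitter(kafka_factory)
    for msg in messages:
        file_emit(msg)              # the file sink always runs
        if kafka_emit is not None:  # the Kafka sink runs only when available
            kafka_emit(msg)


def broker_down() -> Emitter:
    """Simulated factory for when the Kafka service is not running."""
    raise ConnectionError("no broker at the configured address")


written = []
run_producer([{"id": 1}, {"id": 2}], written.append, broker_down)
```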
Start an associated consumer. You have options.
- Start the consumer that reads from the live data file.
- Start the consumer that reads from the Kafka topic.
- Start the consumer that reads from the SQLite relational data store.
- Start the consumer that reads from the DuckDB relational data store.
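A consumer that reads a live data file has to handle a file that keeps growing while it reads. Here is a hypothetical "tail -f" style reader, not the repo's file consumer: it remembers its position in the file and only yields lines that end with a newline, so a line the producer has only half written is picked up on the next poll. follow_json_lines and its max_polls parameter are made-up names for illustration.

```python
# Hypothetical "tail -f" style reader for a live JSON-lines file; not the
# repo's file consumer. It remembers its file position and only yields lines
# that are complete (end with a newline), so half-written lines are retried.
import json
import tempfile
import time
from pathlib import Path
from typing import Dict, Iterator, Optional


def follow_json_lines(path: Path, poll_seconds: float = 1.0,
                      max_polls: Optional[int] = None) -> Iterator[Dict]:
    position = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        if path.exists():
            with path.open("r", encoding="utf-8") as f:
                f.seek(position)
                while True:
                    line = f.readline()
                    if not line.endswith("\n"):  # end of file or a partial line
                        break
                    position = f.tell()
                    if line.strip():  # skip blank lines
                        yield json.loads(line)
        polls += 1
        time.sleep(poll_seconds)


live = Path(tempfile.mkdtemp()) / "live_data.json"
live.write_text('{"n": 1}\n{"n": 2}\n{"n": 3', encoding="utf-8")  # last line unfinished
seen = list(follow_json_lines(live, poll_seconds=0, max_polls=1))
```

With max_polls left as None, the reader keeps polling forever, which matches how a streaming consumer runs until you stop it.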
In VS Code, open a NEW terminal in your root project folder. Use the commands below to activate .venv and start the consumer.
Windows:
.\.venv\Scripts\Activate.ps1
py -m consumers.kafka_consumer_case
OR
py -m consumers.file_consumer_case
OR
py -m consumers.sqlite_consumer_case
OR
py -m consumers.duckdb_consumer_case
Mac/Linux:
source .venv/bin/activate
python3 -m consumers.kafka_consumer_case
OR
python3 -m consumers.file_consumer_case
OR
python3 -m consumers.sqlite_consumer_case
OR
python3 -m consumers.duckdb_consumer_case
Review the requirements.txt file.
- What - if any - new requirements do we need for this project?
- Note that requirements.txt now lists both kafka-python and six.
- What are some common dependencies as we incorporate data stores into our streaming pipelines?
Review the .env file with the environment variables.
- Why is it helpful to put some settings in a text file?
- As we add database access and passwords, we start to keep two versions:
- .env
- .env.example
- Read the notes in those files - which one is typically NOT added to source control?
- How do we ignore a file so it doesn't get published to GitHub? (Hint: .gitignore)
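To see how settings move from a text file into code, here is a minimal .env reader using only the standard library. Many projects use the python-dotenv package (its load_dotenv function) instead. The variable name BUZZ_TOPIC and its default are hypothetical; check your own .env and .env.example for the real keys.

```python
# Minimal .env reader using only the standard library. Many projects use the
# python-dotenv package (load_dotenv) instead; the variable names below are
# hypothetical, so check your own .env and .env.example for the real ones.
import os
from pathlib import Path
from typing import Dict


def parse_env_text(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blanks, # comments, and malformed lines."""
    values: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def load_env_file(path: Path = Path(".env")) -> None:
    """Copy settings into os.environ without overriding existing variables."""
    if path.exists():
        for key, value in parse_env_text(path.read_text(encoding="utf-8")).items():
            os.environ.setdefault(key, value)


load_env_file()
topic = os.getenv("BUZZ_TOPIC", "buzzline")  # hypothetical name and default
```

Using setdefault means a variable already set in the real environment wins over the file, which is convenient when the same code runs on different machines.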
Review the .gitignore file.
- What new entry has been added?
Review the code for the producer and the consumers.
- Understand how the information is generated by the producer.
- Understand how the different consumers read, process, and store information in a data store.
Compare the consumer that reads from a live data file and the consumer that reads from a Kafka topic.
- Which functions are the same for both?
- Which parts are different?
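One way to answer these questions is to separate "where messages come from" from "what we do with a message". In this hypothetical refactor, both readers call the same process_message function and differ only in how they obtain raw text. The Kafka values are shown as bytes because a Kafka client typically delivers raw bytes unless a deserializer is configured. All function names are illustrative, not taken from the repo.

```python
# Hypothetical refactor: both consumers share one processing function and
# differ only in where raw messages come from.
import json
from typing import Callable, Dict, Iterable

Store = Callable[[Dict], None]


def process_message(raw: str, store: Store) -> None:
    """Shared logic: parse one JSON message and hand it to a data store."""
    message = json.loads(raw)
    store(message)


def consume_lines(lines: Iterable[str], store: Store) -> int:
    """File-specific reader: each line is already text; blank lines are skipped."""
    count = 0
    for line in lines:
        if line.strip():
            process_message(line, store)
            count += 1
    return count


def consume_kafka_values(values: Iterable[bytes], store: Store) -> int:
    """Kafka-specific reader: decode bytes first, then reuse the shared logic."""
    count = 0
    for value in values:
        process_message(value.decode("utf-8"), store)
        count += 1
    return count


stored = []
n_file = consume_lines(['{"author": "Alice"}\n', "\n"], stored.append)
n_kafka = consume_kafka_values([b'{"author": "Bob"}'], stored.append)
```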
What files are in the utils folder?
- Why bother breaking functions out into utility modules?
- Would similar streaming projects be likely to take advantage of any of these files?
What files are in the producers folder?
- How do these compare to earlier projects?
- What has been changed?
- What has stayed the same?
What files are in the consumers folder?
- This is where the processing and storage take place.
- Why did we make a separate file for reading from the live data file vs reading from the Kafka topic?
- What functions are in each?
- Are any of the functions duplicated?
- Can you refactor the project so we could write a duplicated function just once and reuse it?
- What functions are in the sqlite script?
- What functions might be needed to initialize a different kind of data store?
- What functions might be needed to insert a message into a different kind of data store?
- Did you run the kafka consumer or the live file consumer? Why?
- Can you use the examples to add a database to your own streaming applications?
- What parts are most interesting to you?
- What parts are most challenging?
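For the questions about initializing a data store and inserting messages, here is a sketch of the two functions most stores need, using Python's built-in sqlite3 module. The table name mirrors the DuckDB queries in this README; the column list is an assumption based on the fields those queries use, plus hypothetical message and timestamp columns. It is not the repo's sqlite consumer.

```python
# Sketch of the two functions most data stores need: one to initialize the
# store and one to insert a message. Table name mirrors this README's DuckDB
# queries; the columns are assumptions for illustration.
import sqlite3
from typing import Dict


def init_db(conn: sqlite3.Connection) -> None:
    """Create the table if it does not exist yet."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS streamed_messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message TEXT,
            author TEXT,
            timestamp TEXT,
            category TEXT,
            sentiment REAL
        )
        """
    )
    conn.commit()


def insert_message(conn: sqlite3.Connection, message: Dict) -> None:
    """Insert one message; named placeholders keep values out of the SQL text."""
    conn.execute(
        "INSERT INTO streamed_messages (message, author, timestamp, category, sentiment) "
        "VALUES (:message, :author, :timestamp, :category, :sentiment)",
        message,
    )
    conn.commit()


# In-memory database for the demo; a file path (hypothetical example:
# data/buzz.sqlite) would persist the rows between runs.
conn = sqlite3.connect(":memory:")
init_db(conn)
insert_message(conn, {"message": "hi", "author": "Alice",
                      "timestamp": "2025-01-01T00:00:00",
                      "category": "tech", "sentiment": 0.5})
```

To target a different store, you would mainly rewrite these two functions (connection setup, schema creation, insert statement) and keep the rest of the consumer unchanged.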
Windows PowerShell
# count rows
duckdb .\data\buzz.duckdb "SELECT COUNT(*) FROM streamed_messages;"
# peek
duckdb .\data\buzz.duckdb "SELECT * FROM streamed_messages ORDER BY id DESC LIMIT 10;"
# live analytics
duckdb .\data\buzz.duckdb "SELECT category, AVG(sentiment) FROM streamed_messages GROUP BY category ORDER BY AVG(sentiment) DESC;"
macOS/Linux/WSL
# count rows
duckdb data/buzz.duckdb -c "SELECT COUNT(*) FROM streamed_messages;"
# peek
duckdb data/buzz.duckdb -c "SELECT author, COUNT(*) c FROM streamed_messages GROUP BY author ORDER BY c DESC;"
# live analytics
duckdb data/buzz.duckdb -c "SELECT category, AVG(sentiment) FROM streamed_messages GROUP BY category ORDER BY AVG(sentiment) DESC;"
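The "live analytics" query recomputes the average sentiment per category from the whole table each time you run it. A streaming consumer can instead keep a running sum and count per category and update them as each message arrives. This is a sketch of that alternative; RunningCategoryAverage is a hypothetical name and is not part of this repo.

```python
# In-memory version of the "live analytics" query: average sentiment per
# category, updated one message at a time instead of re-scanning the table.
# The class name is hypothetical.
from collections import defaultdict
from typing import DefaultDict, List, Tuple


class RunningCategoryAverage:
    def __init__(self) -> None:
        self.totals: DefaultDict[str, float] = defaultdict(float)
        self.counts: DefaultDict[str, int] = defaultdict(int)

    def update(self, category: str, sentiment: float) -> None:
        """Constant-time update per message: keep a running sum and count."""
        self.totals[category] += sentiment
        self.counts[category] += 1

    def ranked(self) -> List[Tuple[str, float]]:
        """Categories ordered by average sentiment, highest first."""
        averages = [(cat, self.totals[cat] / self.counts[cat]) for cat in self.counts]
        return sorted(averages, key=lambda item: item[1], reverse=True)


stats = RunningCategoryAverage()
for category, sentiment in [("tech", 1.0), ("food", 0.25), ("tech", 0.5)]:
    stats.update(category, sentiment)
print(stats.ranked())  # [('tech', 0.75), ('food', 0.25)]
```

The trade-off: the running version is cheap per message but lives only in memory, while the SQL query always reflects everything stored in the database.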
To stop a running process in the terminal, press CTRL+C (hold the CTRL key and press c).
When resuming work on this project:
- Open the project repository folder in VS Code.
- Start the Kafka service (use WSL if Windows) and keep the terminal running.
- Activate your local project virtual environment (.venv) in your OS-specific terminal.
- Run git pull to get any changes made in the remote repo (on GitHub).
- Git add everything to source control (git add .).
- Git commit with a -m message.
- Git push to origin main.
git add .
git commit -m "your message in quotes"
git push -u origin main
To save disk space, you can delete the .venv folder when not actively working on this project. You can always recreate it, activate it, and reinstall the necessary packages later. Managing Python virtual environments is a valuable skill.
This project is licensed under the MIT License as an example project. You are encouraged to fork, copy, explore, and modify the code as you like. See the LICENSE file for more.
VS Code extensions:
- Black Formatter by Microsoft
- Markdown All in One by Yu Zhang
- PowerShell by Microsoft (on Windows Machines)
- Python by Microsoft
- Python Debugger by Microsoft
- Ruff by Astral Software (Linter + Formatter)
- SQLite Viewer by Florian Klampfer
- WSL by Microsoft (on Windows Machines)