Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions benchmarks/sql-benchmark/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Benchmarks of Kaskada vs. SQL

Use the `generate.py` to create test data:

```bash
python generate.py --users 500 --items 500 --purchases 10000 --page_views 5000 --reviews 2500
```

This will generate the Parquet input files (`purchases.parquet`, `page_views.parquet`, and `reviews.parquet`) in the current directory.

## Kaskada (Fenl)

Run the statements in the `kaskada.ipynb` using the latest Fenl-supporting Python client.
The RPCs should return the "query time" within each result table.

## DuckDB

Run the queries from `queries_duckdb.sql` one at a time.
With the `enable_profiling` pragma each should report the execution time.

## DataFusion

The SQL statements aren't ready yet -- they run, but we haven't figured out how to write the results to Parquet to measure end-to-end time.
134 changes: 123 additions & 11 deletions benchmarks/sql-benchmark/kaskada.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
"source": [
"from kaskada import table\n",
"\n",
"# Change this to the (relative) path to the generated input.\n",
"INPUT_PATH = '6x_1'\n",
"\n",
"try:\n",
" view.delete_view('ReviewsByItem')\n",
"except:\n",
Expand All @@ -75,9 +78,9 @@
" grouping_id = 'user',\n",
")\n",
"\n",
"table.load_dataframe(\n",
"table.load(\n",
" table_name = 'Purchases',\n",
" dataframe = Purchases,\n",
" file = f'{INPUT_PATH}/purchases.parquet',\n",
")\n",
"\n",
"try:\n",
Expand All @@ -92,9 +95,9 @@
" grouping_id = 'user',\n",
")\n",
"\n",
"table.load_dataframe(\n",
"table.load(\n",
" table_name = 'PageViews',\n",
" dataframe = PageViews,\n",
" file = f'{INPUT_PATH}/page_views.parquet',\n",
")\n",
"\n",
"try:\n",
Expand All @@ -103,15 +106,15 @@
" pass\n",
"\n",
"table.create_table(\n",
" table_name = 'Ratings',\n",
" table_name = 'Reviews',\n",
" time_column_name = 'time',\n",
" entity_key_column_name = 'user',\n",
" grouping_id = 'user',\n",
")\n",
"\n",
"table.load_dataframe(\n",
" table_name = 'Ratings',\n",
" dataframe = Ratings,\n",
"table.load(\n",
" table_name = 'Reviews',\n",
" file = f'{INPUT_PATH}/reviews.parquet',\n",
")"
]
},
Expand All @@ -129,9 +132,8 @@
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=aggregate\n",
"%%fenl --var=aggregate_history\n",
"{ \n",
" amount: Purchases.amount,\n",
" sum_amount: sum(Purchases.amount)\n",
"}"
]
Expand All @@ -144,6 +146,18 @@
"# Aggregation / Snapshot"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=aggregate_snapshot --result-behavior final-results\n",
"{ \n",
" sum_amount: sum(Purchases.amount)\n",
"}"
]
},
{
"attachments": {},
"cell_type": "markdown",
Expand All @@ -152,6 +166,20 @@
"# Windowed Aggregation / History"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=windowed_history\n",
"{ \n",
" sum_amount: sum(Purchases.amount, window=since(monthly()))\n",
" # Hack to work-around https://github.com/kaskada-ai/kaskada/issues/297\n",
" | if({ tick: monthly(), input: is_valid($input)} | not($input.tick | else(false)))\n",
"}"
]
},
{
"attachments": {},
"cell_type": "markdown",
Expand All @@ -160,6 +188,20 @@
"# Windowed Aggregation / Snapshot"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=windowed_snapshot --result-behavior final-results\n",
"{ \n",
" sum_amount: sum(Purchases.amount, window=since(monthly()))\n",
" # Hack to work-around https://github.com/kaskada-ai/kaskada/issues/297\n",
" | if({ tick: monthly(), input: is_valid($input)} | not($input.tick | else(false)))\n",
"}"
]
},
{
"attachments": {},
"cell_type": "markdown",
Expand All @@ -168,18 +210,88 @@
"# Data-Defined Windowed Aggregation / History"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=data_defined_history\n",
"let page_views_since_purchase_raw = count(PageViews, window=since(is_valid(Purchases)))\n",
" | when(is_valid(Purchases))\n",
"in {\n",
" result: page_views_since_purchase_raw | mean()\n",
"}"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Temporal Join / Snapshot"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=reviews_by_item\n",
"let reviews_by_item = Reviews | with_key($input.item)\n",
"in { average_score: reviews_by_item.rating | mean() }"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from kaskada import view\n",
"\n",
"try:\n",
" view.delete_view('ReviewsByItem')\n",
"except:\n",
" pass\n",
"\n",
"view.create_view(\n",
" view_name = 'ReviewsByItem',\n",
" expression = reviews_by_item.expression,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%fenl --var=temporal_join_snapshot --result-behavior final-results\n",
"{\n",
" average_review: lookup(Purchases.item, ReviewsByItem.average_score)\n",
"}"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python"
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
},
"orig_nbformat": 4
},
Expand Down
119 changes: 119 additions & 0 deletions benchmarks/sql-benchmark/queries_datafusion.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
-- Run with `datafusion-cli --data-path <directory>`.
-- Then send these commands.

-- Load the data.
-- The CREATE EXTERNAL TABLE definitions below were not usable with
-- datafusion-cli, so the queries reference the Parquet files inline instead.
--
-- CREATE EXTERNAL TABLE Purchases
-- STORED AS parquet
-- LOCATION 'purchases.parquet';

-- CREATE EXTERNAL TABLE Reviews
-- STORED AS parquet
-- LOCATION 'reviews.parquet';

-- CREATE EXTERNAL TABLE PageViews
-- STORED AS parquet
-- LOCATION 'page_views.parquet';

-- Aggregation / History
--
-- Running per-user sum of purchase amounts at every purchase event.
COPY (SELECT
    user,
    time,
    SUM(amount) OVER (
        PARTITION BY user
        ORDER BY time
    ) AS sum_amount
-- Read the Parquet file inline (see header: external tables were not usable).
FROM 'purchases.parquet') TO 'output/agg_history_datafusion.parquet';

-- Aggregation / Snapshot
--
-- Final per-user total of purchase amounts.
COPY (SELECT
    user,
    SUM(amount) AS sum_amount
-- Read the Parquet file inline (see header: external tables were not usable).
FROM 'purchases.parquet'
GROUP BY user) TO 'output/agg_snapshot_datafusion.parquet';

-- Time-Windowed Aggregation / History
--
-- Running per-user sum of purchase amounts within each calendar month.
-- NOTE(review): the original used DuckDB's time_bucket(); DataFusion provides
-- date_trunc() for calendar bucketing -- confirm against the target version.
COPY (SELECT
    user,
    time,
    SUM(amount) OVER (
        PARTITION BY
            user,
            date_trunc('month', time)
        ORDER BY time
    ) AS sum_amount
FROM 'purchases.parquet'
ORDER BY time) TO 'output/windowed_history_datafusion.parquet';

-- Time-Windowed Aggregation / Snapshot
--
-- Per-user purchase total restricted to months at or after the anchor date.
-- NOTE(review): the original used DuckDB's time_bucket(); DataFusion provides
-- date_trunc() for calendar bucketing -- confirm against the target version.
COPY (SELECT
    user,
    SUM(amount) AS sum_amount
FROM 'purchases.parquet'
WHERE date_trunc('month', time) >= date_trunc('month', DATE '2022-05-03')
GROUP BY user) TO 'output/windowed_snapshot_datafusion.parquet';

-- Data-Defined Windowed Aggregation / History
--
-- Average number of page views between consecutive purchases, per user.
COPY (WITH activity AS (
    -- Interleave page views and purchases into one per-user event stream.
    -- UNION ALL: the two inputs are disjoint (is_page_view differs), so the
    -- dedup implied by plain UNION is wasted work.
    (SELECT user, time, 1 AS is_page_view FROM 'page_views.parquet')
    UNION ALL
    (SELECT user, time, 0 AS is_page_view FROM 'purchases.parquet')
), purchase_counts AS (
    -- Running count of purchases; each distinct value delimits one window.
    SELECT
        user, time, is_page_view,
        SUM(CASE WHEN is_page_view = 0 THEN 1 ELSE 0 END)
            OVER (PARTITION BY user ORDER BY time) AS purchase_count
    FROM activity
), page_views_since_purchase AS (
    -- Page views accumulated within the current between-purchases window.
    SELECT
        user, time,
        SUM(CASE WHEN is_page_view = 1 THEN 1 ELSE 0 END)
            OVER (PARTITION BY user, purchase_count ORDER BY time) AS views
    FROM purchase_counts
)
SELECT user, time,
    AVG(views) OVER (PARTITION BY user ORDER BY time)
        AS avg_views_since_purchase
FROM page_views_since_purchase
ORDER BY time) TO 'output/data_defined_history_datafusion.parquet';

-- Temporal Join / Snapshot [Spline]
--
-- Not reported -- ASOF join is more efficient.
--
-- Fixes vs. the original: removed two trailing commas before FROM (syntax
-- errors), and aligned the column order of review_times / purchase_times --
-- the positional UNION previously swapped r_time and p_time for purchase
-- rows, corrupting last_r_time.
COPY (WITH review_avg AS (
    -- Running average review score per item, at each review time.
    SELECT item, time,
        AVG(rating) OVER (PARTITION BY item ORDER BY time) AS avg_score
    FROM 'reviews.parquet'
), review_times AS (
    -- Column order (item, time, r_time, p_time) must match purchase_times.
    SELECT item, review_avg.time AS time, review_avg.time AS r_time,
        CAST(NULL AS TIMESTAMP) AS p_time
    FROM review_avg
), purchase_times AS (
    SELECT item, p.time AS time, CAST(NULL AS TIMESTAMP) AS r_time,
        p.time AS p_time
    FROM 'purchases.parquet' AS p
), all_times AS (
    -- Disjoint inputs (exactly one of r_time / p_time is non-NULL per row),
    -- so UNION ALL is correct and avoids a dedup pass.
    (SELECT * FROM review_times) UNION ALL (SELECT * FROM purchase_times)
), spline AS (
    -- For each event, the most recent review time seen so far for the item.
    SELECT item, time, MAX(r_time) OVER w AS last_r_time
    FROM all_times
    WINDOW w AS (PARTITION BY item ORDER BY time)
)
SELECT p.user, p.time, avg_score
FROM 'purchases.parquet' AS p
LEFT JOIN spline
    ON p.time = spline.time AND p.item = spline.item
LEFT JOIN review_avg
    ON spline.last_r_time = review_avg.time
    AND p.item = review_avg.item) TO 'output/temporal_join_spline_snapshot_datafusion.parquet';

-- Temporal Join / Snapshot [ASOF Join]
--
-- NOTE(review): ASOF JOIN is DuckDB syntax; this query appears to be copied
-- from queries_duckdb.sql and it is unclear whether DataFusion supports it --
-- confirm before including in reported results.
COPY (WITH review_avg AS (
    -- Running average review score per item, at each review time.
    SELECT item, time,
        AVG(rating) OVER (PARTITION BY item ORDER BY time) AS avg_score
    FROM 'reviews.parquet'
)
-- For each purchase, the latest review average at or before the purchase time.
SELECT p.user, p.time, r.avg_score
FROM review_avg r ASOF RIGHT JOIN 'purchases.parquet' p
ON p.item = r.item AND r.time >= p.time) TO 'output/temporal_join_asof_snapshot_datafusion.parquet';
Loading