diff --git a/benchmarks/sql-benchmark/README.MD b/benchmarks/sql-benchmark/README.MD new file mode 100644 index 000000000..f3ffbac70 --- /dev/null +++ b/benchmarks/sql-benchmark/README.MD @@ -0,0 +1,23 @@ +# Benchmarks of Kaskada vs. SQL + +Use `generate.py` to create test data: + +```bash +python generate.py --users 500 --items 500 --purchases 10000 --page_views 5000 --reviews 2500 +``` + +This will generate three Parquet files (`purchases.parquet`, `page_views.parquet`, and `reviews.parquet`) in the current directory. + +## Kaskada (Fenl) + +Run the cells in `kaskada.ipynb` using the latest Fenl-supporting Python client. +The RPCs should return the "query time" within each result table. + +## DuckDB + +Run the queries from `queries_duckdb.sql` one at a time. +With the `enable_profiling` pragma enabled, each query should report its execution time. + +## DataFusion + +The SQL statements aren't ready yet -- they run, but we haven't figured out how to write the results to Parquet to measure end-to-end time. \ No newline at end of file diff --git a/benchmarks/sql-benchmark/kaskada.ipynb b/benchmarks/sql-benchmark/kaskada.ipynb index 4079f05c8..3bf1b71f7 100644 --- a/benchmarks/sql-benchmark/kaskada.ipynb +++ b/benchmarks/sql-benchmark/kaskada.ipynb @@ -58,6 +58,9 @@ "source": [ "from kaskada import table\n", "\n", + "# Change this to the (relative) path to the generated input.\n", + "INPUT_PATH = '6x_1'\n", + "\n", "try:\n", " view.delete_view('ReviewsByItem')\n", "except:\n", @@ -75,9 +78,9 @@ " grouping_id = 'user',\n", ")\n", "\n", - "table.load_dataframe(\n", + "table.load(\n", " table_name = 'Purchases',\n", - " dataframe = Purchases,\n", + " file = f'{INPUT_PATH}/purchases.parquet',\n", ")\n", "\n", "try:\n", @@ -92,9 +95,9 @@ " grouping_id = 'user',\n", ")\n", "\n", - "table.load_dataframe(\n", + "table.load(\n", " table_name = 'PageViews',\n", - " dataframe = PageViews,\n", + " file = f'{INPUT_PATH}/page_views.parquet',\n", ")\n", "\n", "try:\n", @@ -103,15 +106,15 @@ " pass\n", "\n", "table.create_table(\n", - " table_name = 
'Ratings',\n", + " table_name = 'Reviews',\n", " time_column_name = 'time',\n", " entity_key_column_name = 'user',\n", " grouping_id = 'user',\n", ")\n", "\n", - "table.load_dataframe(\n", - " table_name = 'Ratings',\n", - " dataframe = Ratings,\n", + "table.load(\n", + " table_name = 'Reviews',\n", + " file = f'{INPUT_PATH}/reviews.parquet',\n", ")" ] }, @@ -129,9 +132,8 @@ "metadata": {}, "outputs": [], "source": [ - "%%fenl --var=aggregate\n", + "%%fenl --var=aggregate_history\n", "{ \n", - " amount: Purchases.amount,\n", " sum_amount: sum(Purchases.amount)\n", "}" ] @@ -144,6 +146,18 @@ "# Aggregation / Snapshot" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%fenl --var=aggregate_snapshot --result-behavior final-results\n", + "{ \n", + " sum_amount: sum(Purchases.amount)\n", + "}" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -152,6 +166,20 @@ "# Windowed Aggregation / History" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%fenl --var=windowed_history\n", + "{ \n", + " sum_amount: sum(Purchases.amount, window=since(monthly()))\n", + " # Hack to work-around https://github.com/kaskada-ai/kaskada/issues/297\n", + " | if({ tick: monthly(), input: is_valid($input)} | not($input.tick | else(false)))\n", + "}" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -160,6 +188,20 @@ "# Windowed Aggregation / Snapshot" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%fenl --var=windowed_snapshot --result-behavior final-results\n", + "{ \n", + " sum_amount: sum(Purchases.amount, window=since(monthly()))\n", + " # Hack to work-around https://github.com/kaskada-ai/kaskada/issues/297\n", + " | if({ tick: monthly(), input: is_valid($input)} | not($input.tick | else(false)))\n", + "}" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -168,6 +210,20 @@ "# 
Data-Defined Windowed Aggregation / History" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%fenl --var=data_defined_history\n", + "let page_views_since_purchase_raw = count(PageViews, window=since(is_valid(Purchases)))\n", + " | when(is_valid(Purchases))\n", + "in {\n", + " result: page_views_since_purchase_raw | mean()\n", + "}" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -175,11 +231,67 @@ "source": [ "# Temporal Join / Snapshot" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%fenl --var=reviews_by_item\n", + "let reviews_by_item = Reviews | with_key($input.item)\n", + "in { average_score: reviews_by_item.rating | mean() }" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from kaskada import view\n", + "\n", + "try:\n", + " view.delete_view('ReviewsByItem')\n", + "except:\n", + " pass\n", + "\n", + "view.create_view(\n", + " view_name = 'ReviewsByItem',\n", + " expression = reviews_by_item.expression,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%fenl --var=temporal_join_snapshot --result-behavior final-results\n", + "{\n", + " average_review: lookup(Purchases.item, ReviewsByItem.average_score)\n", + "}" + ] } ], "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, "language_info": { - "name": "python" + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" }, "orig_nbformat": 4 }, diff --git a/benchmarks/sql-benchmark/queries_datafusion.sql b/benchmarks/sql-benchmark/queries_datafusion.sql new file mode 100644 index 000000000..b8dc79db9 --- 
/dev/null +++ b/benchmarks/sql-benchmark/queries_datafusion.sql @@ -0,0 +1,119 @@ +-- Run with `datafusion-cli --data-path <input-dir>`. +-- Then send these commands. + +-- Load the data. +-- These tables weren't usable, so we just used inline definitions. +-- +-- CREATE EXTERNAL TABLE Purchases +-- STORED AS parquet +-- LOCATION 'purchases.parquet'; + +-- CREATE EXTERNAL TABLE Reviews +-- STORED AS parquet +-- LOCATION 'reviews.parquet'; + +-- CREATE EXTERNAL TABLE PageViews +-- STORED AS parquet +-- LOCATION 'page_views.parquet'; + +-- Aggregation / History +COPY (SELECT + user, + time, + SUM(amount) OVER ( + PARTITION BY user + ORDER BY time + ) +FROM 'purchases.parquet') TO 'output/agg_history_df.parquet'; + +-- Aggregation / Snapshot +COPY (SELECT + user, + SUM(amount) +FROM Purchases +GROUP BY user) TO 'output/agg_snapshot_duckdb.parquet'; + +-- Time-Windowed Aggregation / History +COPY (SELECT + user, + time, + sum(amount) OVER ( + PARTITION BY + user, + time_bucket(INTERVAL '1 month', time) + ORDER BY time + ) +FROM Purchases +ORDER BY time) TO 'output/windowed_history_duckdb.parquet'; + +-- Time-Windowed Aggregation / Snapshot +COPY (SELECT + user, + sum(amount) +FROM Purchases +WHERE time_bucket(INTERVAL '1 month', time) >= time_bucket(INTERVAL '1 month', DATE '2022-05-03') +GROUP BY user) TO 'output/windowed_snapshot_duckdb.parquet'; + +-- Data-Defined Windowed Aggregation / History +COPY (WITH activity AS ( + (SELECT user, time, 1 as is_page_view FROM PageViews) + UNION + (SELECT user, time, 0 as is_page_view FROM Purchases) +), purchase_counts AS ( + SELECT + user, time, is_page_view, + SUM(CASE WHEN is_page_view = 0 THEN 1 ELSE 0 END) + OVER (PARTITION BY user ORDER BY time) AS purchase_count + FROM activity +), page_views_since_purchase AS ( + SELECT + user, time, + SUM(CASE WHEN is_page_view = 1 THEN 1 ELSE 0 END) + OVER (PARTITION BY user, purchase_count ORDER BY time) AS views + FROM purchase_counts +) +SELECT user, time, + AVG(views) OVER (PARTITION BY user 
ORDER BY time) + as avg_views_since_purchase +FROM page_views_since_purchase +ORDER BY time) TO 'output/data_defined_history_duckdb.parquet'; + +-- Temporal Join / Snapshot [Spline] +-- +-- Not reported -- ASOF join is more efficient. +COPY (WITH review_avg AS ( + SELECT item, time, + AVG(rating) OVER (PARTITION BY item ORDER BY time) as avg_score + FROM Reviews +), review_times AS ( + SELECT item, review_avg.time AS time, review_avg.time AS r_time, + CAST(NULL AS TIMESTAMP) as p_time + FROM review_avg +), purchase_times AS ( + SELECT item, Purchases.time as time, Purchases.time as p_time, + CAST(NULL AS TIMESTAMP) AS r_time, + FROM Purchases +), all_times AS ( + (SELECT * FROM review_times) UNION (SELECT * FROM purchase_times) +), spline AS ( + SELECT item, time, max(r_time) OVER w AS last_r_time, + FROM all_times + WINDOW w AS (PARTITION BY item ORDER BY time) +) +SELECT user, Purchases.time, avg_score +FROM Purchases +LEFT JOIN spline + ON Purchases.time = spline.time AND Purchases.item = spline.item +LEFT JOIN review_avg + ON spline.last_r_time = review_avg.time + AND Purchases.item = review_avg.item) TO 'output/temporal_join_spline_snapshot_duckdb.parquet'; + +-- Temporal Join / Snapshot [ASOF Join] +COPY (WITH review_avg AS ( + SELECT item, time, + AVG(rating) OVER (PARTITION BY item ORDER BY time) as avg_score + FROM Reviews +) +SELECT p.user, p.time, r.avg_score +FROM review_avg r ASOF RIGHT JOIN Purchases p +ON p.item = r.item AND r.time >= p.time) TO 'output/temporal_join_asof_snapshot_duckdb.parquet'; \ No newline at end of file diff --git a/benchmarks/sql-benchmark/queries_duckdb.sql b/benchmarks/sql-benchmark/queries_duckdb.sql index 6a6cdf8b4..9ea290e1c 100644 --- a/benchmarks/sql-benchmark/queries_duckdb.sql +++ b/benchmarks/sql-benchmark/queries_duckdb.sql @@ -1,11 +1,17 @@ PRAGMA enable_profiling; --- Create (L2.87s) +-- Temp directory is necessary for larger queries. 
+PRAGMA temp_directory='tmp'; + +-- Set this to the input directory to use +-- PRAGMA file_search_path='6x_1'; + +--Load the data CREATE TABLE Purchases AS SELECT * FROM read_parquet('purchases.parquet'); CREATE TABLE Reviews AS SELECT * FROM read_parquet('reviews.parquet'); CREATE TABLE PageViews AS SELECT * FROM read_parquet('page_views.parquet'); --- Aggregation / History (0.151s) +-- Aggregation / History COPY (SELECT user, time, @@ -13,16 +19,16 @@ COPY (SELECT PARTITION BY user ORDER BY time ) -FROM Purchases) TO 'agg_history_duckdb.parquet'; +FROM Purchases) TO 'output/agg_history_duckdb.parquet'; --- Aggregation / Snapshot (0.0114s) +-- Aggregation / Snapshot COPY (SELECT user, SUM(amount) FROM Purchases -GROUP BY user) TO 'agg_snapshot_duckdb.parquet'; +GROUP BY user) TO 'output/agg_snapshot_duckdb.parquet'; --- Time-Windowed Aggregation / History (0.178s) +-- Time-Windowed Aggregation / History COPY (SELECT user, time, @@ -33,17 +39,17 @@ COPY (SELECT ORDER BY time ) FROM Purchases -ORDER BY time) TO 'windowed_history_duckdb.parquet'; +ORDER BY time) TO 'output/windowed_history_duckdb.parquet'; --- Time-Windowed Aggregation / Snapshot (0.0219s) +-- Time-Windowed Aggregation / Snapshot COPY (SELECT user, sum(amount) FROM Purchases WHERE time_bucket(INTERVAL '1 month', time) >= time_bucket(INTERVAL '1 month', DATE '2022-05-03') -GROUP BY user) TO 'windowed_snapshot_duckdb.parquet'; +GROUP BY user) TO 'output/windowed_snapshot_duckdb.parquet'; --- Data-Defined Windowed Aggregation / History (2.92s) +-- Data-Defined Windowed Aggregation / History COPY (WITH activity AS ( (SELECT user, time, 1 as is_page_view FROM PageViews) UNION @@ -65,9 +71,11 @@ SELECT user, time, AVG(views) OVER (PARTITION BY user ORDER BY time) as avg_views_since_purchase FROM page_views_since_purchase -ORDER BY time) TO 'data_defined_history_duckdb.parquet'; +ORDER BY time) TO 'output/data_defined_history_duckdb.parquet'; --- Temporal Join / Snapshot [Spline] (0.208s) +-- Temporal Join 
/ Snapshot [Spline] +-- +-- Not reported -- ASOF join is more efficient. COPY (WITH review_avg AS ( SELECT item, time, AVG(rating) OVER (PARTITION BY item ORDER BY time) as avg_score @@ -93,9 +101,9 @@ LEFT JOIN spline ON Purchases.time = spline.time AND Purchases.item = spline.item LEFT JOIN review_avg ON spline.last_r_time = review_avg.time - AND Purchases.item = review_avg.item) TO 'temporal_join_spline_snapshot_duckdb.parquet'; + AND Purchases.item = review_avg.item) TO 'output/temporal_join_spline_snapshot_duckdb.parquet'; --- Temporal Join / Snapshot [ASOF Join] (0.125s) +-- Temporal Join / Snapshot [ASOF Join] COPY (WITH review_avg AS ( SELECT item, time, AVG(rating) OVER (PARTITION BY item ORDER BY time) as avg_score @@ -103,4 +111,4 @@ COPY (WITH review_avg AS ( ) SELECT p.user, p.time, r.avg_score FROM review_avg r ASOF RIGHT JOIN Purchases p -ON p.item = r.item AND r.time >= p.time) TO 'temporal_join_asof_snapshot_duckdb.parquet'; \ No newline at end of file +ON p.item = r.item AND r.time >= p.time) TO 'output/temporal_join_asof_snapshot_duckdb.parquet'; \ No newline at end of file