From 53825910a798019f8c7b6998b3779fbeb882250a Mon Sep 17 00:00:00 2001 From: zilto Date: Mon, 13 May 2024 19:08:27 -0400 Subject: [PATCH 1/8] added materializers --- hamilton/function_modifiers/base.py | 1 + hamilton/plugins/kedro_extensions.py | 99 ++++++++++++++++++++++++++ tests/plugins/test_kedro_extensions.py | 27 +++++++ 3 files changed, 127 insertions(+) create mode 100644 hamilton/plugins/kedro_extensions.py create mode 100644 tests/plugins/test_kedro_extensions.py diff --git a/hamilton/function_modifiers/base.py b/hamilton/function_modifiers/base.py index bb7a0b948..2a876c21f 100644 --- a/hamilton/function_modifiers/base.py +++ b/hamilton/function_modifiers/base.py @@ -38,6 +38,7 @@ "vaex", "ibis", "dlt", + "kedro", "huggingface", ] for plugin_module in plugins_modules: diff --git a/hamilton/plugins/kedro_extensions.py b/hamilton/plugins/kedro_extensions.py new file mode 100644 index 000000000..8cb5da166 --- /dev/null +++ b/hamilton/plugins/kedro_extensions.py @@ -0,0 +1,99 @@ +import dataclasses +from typing import Any, Collection, Dict, Optional, Tuple, Type + +from kedro.io import DataCatalog + +from hamilton import registry +from hamilton.io.data_adapters import DataLoader, DataSaver + + +@dataclasses.dataclass +class KedroSaver(DataSaver): + """Use Kedro DataCatalog and Dataset to save results + ref: https://docs.kedro.org/en/stable/data/advanced_data_catalog_usage.html + + .. 
code-block:: python + + from kedro.framework.session import KedroSession + + with KedroSession.create() as session: + context = session.load_context() + catalog = context.catalog + + dr.materialize( + to.kedro( + id="my_dataset__kedro", + dependencies=["my_dataset"], + dataset_name="my_dataset", + catalog=catalog + ) + ) + """ + + dataset_name: str + catalog: DataCatalog + + @classmethod + def applicable_types(cls) -> Collection[Type]: + return [Any] + + def save_data(self, data: Any) -> Dict[str, Any]: + self.catalog.save(name=self.dataset_name, data=data) + return dict(success=True) + + @classmethod + def name(cls) -> str: + return "kedro" + + +@dataclasses.dataclass +class KedroLoader(DataLoader): + """Use Kedro DataCatalog and Dataset to load data + ref: https://docs.kedro.org/en/stable/data/advanced_data_catalog_usage.html + + .. code-block:: python + + from kedro.framework.session import KedroSession + + with KedroSession.create() as session: + context = session.load_context() + catalog = context.catalog + + dr.materialize( + from_.kedro( + target="input_table", + dataset_name="input_table", + catalog=catalog + ) + ) + """ + + dataset_name: str + catalog: DataCatalog + version: Optional[str] = None + + @classmethod + def applicable_types(cls) -> Collection[Type]: + return [Any] + + def load_data(self, type_: Type) -> Tuple[Any, Dict[str, Any]]: + data = self.catalog.load(name=self.dataset_name, version=self.version) + metadata = dict(dataset_name=self.dataset_name, version=self.version) + return data, metadata + + @classmethod + def name(cls) -> str: + return "kedro" + + +def register_data_loaders(): + for loader in [ + KedroSaver, + KedroLoader, + ]: + registry.register_adapter(loader) + + +register_data_loaders() + +COLUMN_FRIENDLY_DF_TYPE = False diff --git a/tests/plugins/test_kedro_extensions.py b/tests/plugins/test_kedro_extensions.py new file mode 100644 index 000000000..c29c28618 --- /dev/null +++ b/tests/plugins/test_kedro_extensions.py @@ -0,0 +1,27 
@@ +from kedro.io import DataCatalog +from kedro.io.memory_dataset import MemoryDataset + +from hamilton.plugins import kedro_extensions + + +def test_kedro_saver(): + dataset_name = "in_memory" + data = 37 + catalog = DataCatalog({dataset_name: MemoryDataset()}) + + saver = kedro_extensions.KedroSaver(dataset_name=dataset_name, catalog=catalog) + saver.save_data(data) + loaded_data = catalog.load(dataset_name) + + assert loaded_data == data + + +def test_kedro_loader(): + dataset_name = "in_memory" + data = 37 + catalog = DataCatalog({dataset_name: MemoryDataset(data=data)}) + + loader = kedro_extensions.KedroLoader(dataset_name=dataset_name, catalog=catalog) + loaded_data, metadata = loader.load_data(int) + + assert loaded_data == data From 62e555459b240ecb1dfdfa4aaa61387752c1335f Mon Sep 17 00:00:00 2001 From: zilto Date: Tue, 21 May 2024 18:36:18 -0400 Subject: [PATCH 2/8] fix missing type annotation --- .../kedro-code/src/kedro_code/pipelines/data_science/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/kedro/kedro-code/src/kedro_code/pipelines/data_science/nodes.py b/examples/kedro/kedro-code/src/kedro_code/pipelines/data_science/nodes.py index 48954ea22..e71d97a19 100755 --- a/examples/kedro/kedro-code/src/kedro_code/pipelines/data_science/nodes.py +++ b/examples/kedro/kedro-code/src/kedro_code/pipelines/data_science/nodes.py @@ -39,7 +39,7 @@ def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression: return regressor -def evaluate_model(regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series): +def evaluate_model(regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series) -> None: """Calculates and logs the coefficient of determination. 
Args: From aa66c6328c9f382c4479c8f3915d5b5edbc1dc3c Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 22 May 2024 16:16:07 -0400 Subject: [PATCH 3/8] added kedro plugin + example --- examples/kedro/README.md | 3 +- examples/kedro/kedro-plugin/README.md | 4 + .../kedro-plugin/kedro_to_hamilton.ipynb | 1795 +++++++++++++++++ hamilton/plugins/h_kedro.py | 135 ++ tests/plugins/test_h_kedro.py | 58 + 5 files changed, 1994 insertions(+), 1 deletion(-) create mode 100644 examples/kedro/kedro-plugin/README.md create mode 100644 examples/kedro/kedro-plugin/kedro_to_hamilton.ipynb create mode 100644 hamilton/plugins/h_kedro.py create mode 100644 tests/plugins/test_h_kedro.py diff --git a/examples/kedro/README.md b/examples/kedro/README.md index 6c6d413b7..4ca3eeba3 100644 --- a/examples/kedro/README.md +++ b/examples/kedro/README.md @@ -6,6 +6,7 @@ This repository compares how to build dataflows with Kedro and Hamilton. ## Content - `kedro-code/` includes code from the [Kedro Spaceflight tutorial](https://docs.kedro.org/en/stable/tutorial/tutorial_template.html). -- `hamilton-code/` is a refactor of `kedro-code/` using the Hamilton framework. +- `hamilton-code/` is a refactor of `kedro-code/` using the Hamilton library. +- `kedro-plugin/` showcases Hamilton plugins to integrate with the Kedro framework. Each directory contains a `README` with instructions on how to run the code. We suggest going through the Kedro code first, and then read the Hamilton refactor. diff --git a/examples/kedro/kedro-plugin/README.md b/examples/kedro/kedro-plugin/README.md new file mode 100644 index 000000000..8d43cd3cd --- /dev/null +++ b/examples/kedro/kedro-plugin/README.md @@ -0,0 +1,4 @@ +# Kedro plugin + +## Content +- `kedro_to_hamilton.ipynb` contains a tutorial on how to execute your Kedro `Pipeline` using Hamilton, track your execution in the Hamilton UI, and use the Kedro materializers to load & save data. 
diff --git a/examples/kedro/kedro-plugin/kedro_to_hamilton.ipynb b/examples/kedro/kedro-plugin/kedro_to_hamilton.ipynb new file mode 100644 index 000000000..f4c047ca3 --- /dev/null +++ b/examples/kedro/kedro-plugin/kedro_to_hamilton.ipynb @@ -0,0 +1,1795 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Kedro to Hamilton plugin\n", + "\n", + "The plugin allows you to convert a Kedro `Pipeline` object in to a valid Hamilton `Driver`. This allows you to execute your Kedro pipeline on Hamilton and track execution using the [Hamilton UI](https://hamilton.dagworks.io/en/latest/concepts/ui/), which provides rich observability and introspection features.\n", + "\n", + "\n", + "## Content\n", + "1. From Kedro `Pipeline` to Hamilton `Driver`\n", + "2. Executing the `Driver`\n", + "3. Reusing your Kedro `DataCatalog`\n", + "4. Using Hamilton materialization\n", + "5. Connect to the Hamilton UI\n", + "\n", + "## Prerequisite\n", + "Install the example Kedro project by executing the next cell" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install ../kedro-code\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. From `Pipeline` to `Driver`\n", + "We import from the Kedro project `kedro_code` the modules `data_processing` to create a `Pipeline` object. Then, we pass it to the Hamilton plugin using `h_kedro.kedro_pipeline_to_driver()` to create a Hamilton `Driver`. It can be viewed, executed, and supports all `Driver` operations!" 
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "preprocessed_shuttles\n", + "\n", + "preprocessed_shuttles\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "model_input_table\n", + "\n", + "model_input_table\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "preprocessed_shuttles->model_input_table\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "preprocessed_companies\n", + "\n", + "preprocessed_companies\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "preprocessed_companies->model_input_table\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_preprocessed_shuttles_inputs\n", + "\n", + "shuttles\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_preprocessed_shuttles_inputs->preprocessed_shuttles\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_model_input_table_inputs\n", + "\n", + "reviews\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_model_input_table_inputs->model_input_table\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_preprocessed_companies_inputs\n", + "\n", + "companies\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_preprocessed_companies_inputs->preprocessed_companies\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input\n", + "\n", + "input\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from hamilton.plugins import h_kedro\n", + "from kedro_code.pipelines import data_processing\n", + "\n", + "dr = h_kedro.kedro_pipeline_to_driver(data_processing.create_pipeline())\n", + "dr" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also pass multiple Kedro `Pipeline` to compose them together in a single Hamilton `Driver`" + ] + }, + 
{ + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "preprocessed_shuttles\n", + "\n", + "preprocessed_shuttles\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "model_input_table\n", + "\n", + "model_input_table\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "preprocessed_shuttles->model_input_table\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "X_test\n", + "\n", + "X_test\n", + "Any\n", + "\n", + "\n", + "\n", + "evaluate_model\n", + "\n", + "evaluate_model\n", + "NoneType\n", + "\n", + "\n", + "\n", + "X_test->evaluate_model\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "X_train\n", + "\n", + "X_train\n", + "Any\n", + "\n", + "\n", + "\n", + "regressor\n", + "\n", + "regressor\n", + "LinearRegression\n", + "\n", + "\n", + "\n", + "X_train->regressor\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "regressor->evaluate_model\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "y_test\n", + "\n", + "y_test\n", + "Any\n", + "\n", + "\n", + "\n", + "y_test->evaluate_model\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "split_data\n", + "\n", + "split_data\n", + "Dict\n", + "\n", + "\n", + "\n", + "model_input_table->split_data\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "preprocessed_companies\n", + "\n", + "preprocessed_companies\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "preprocessed_companies->model_input_table\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "y_train\n", + "\n", + "y_train\n", + "Any\n", + "\n", + "\n", + "\n", + "y_train->regressor\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "split_data->X_test\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "split_data->X_train\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "split_data->y_test\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "split_data->y_train\n", + "\n", + "\n", + "\n", + "\n", + "\n", + 
"_preprocessed_shuttles_inputs\n", + "\n", + "shuttles\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_preprocessed_shuttles_inputs->preprocessed_shuttles\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_model_input_table_inputs\n", + "\n", + "reviews\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_model_input_table_inputs->model_input_table\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_preprocessed_companies_inputs\n", + "\n", + "companies\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_preprocessed_companies_inputs->preprocessed_companies\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_split_data_inputs\n", + "\n", + "model_options\n", + "Dict\n", + "\n", + "\n", + "\n", + "_split_data_inputs->split_data\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input\n", + "\n", + "input\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from hamilton.plugins import h_kedro\n", + "from kedro_code.pipelines import data_processing, data_science\n", + "\n", + "dr = h_kedro.kedro_pipeline_to_driver(\n", + " data_processing.create_pipeline(),\n", + " data_science.create_pipeline(),\n", + ")\n", + "dr" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Execute the `Driver`\n", + "You can execute `Driver` using the regular Hamilton approach. We load data from files and pass it to `Driver.execute(inputs=...)`" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
shuttle_locationshuttle_typeengine_typeengine_vendorenginespassenger_capacitycancellation_policycrewd_check_completemoon_clearance_complete...review_scores_crewreview_scores_locationreview_scores_pricenumber_of_reviewsreviews_per_monthidcompany_ratingcompany_locationtotal_fleet_countiata_approved
0Sao Tome and PrincipeType V5PlasmaThetaBase Services2.04moderate2.0FalseFalse...9.09.09.0260.77324131.0Faroe Islands1.0False
1Wallis and FutunaType V2PlasmaThetaBase Services3.05moderate3.0FalseFalse...10.010.09.0610.62141221.0Malta1.0True
2NiueType F5QuantumThetaBase Services1.02strict1.0TrueFalse...10.010.010.04674.66477611.0Niue2.0False
3MaltaType V2QuantumThetaBase Services1.02moderate1.0FalseFalse...10.09.09.03183.22266481.0Niue2.0True
4MaltaType V2PlasmaThetaBase Services5.010strict5.0FalseFalse...10.09.010.0220.29266481.0Niue2.0True
\n", + "

5 rows × 27 columns

\n", + "
" + ], + "text/plain": [ + " shuttle_location shuttle_type engine_type engine_vendor \\\n", + "0 Sao Tome and Principe Type V5 Plasma ThetaBase Services \n", + "1 Wallis and Futuna Type V2 Plasma ThetaBase Services \n", + "2 Niue Type F5 Quantum ThetaBase Services \n", + "3 Malta Type V2 Quantum ThetaBase Services \n", + "4 Malta Type V2 Plasma ThetaBase Services \n", + "\n", + " engines passenger_capacity cancellation_policy crew d_check_complete \\\n", + "0 2.0 4 moderate 2.0 False \n", + "1 3.0 5 moderate 3.0 False \n", + "2 1.0 2 strict 1.0 True \n", + "3 1.0 2 moderate 1.0 False \n", + "4 5.0 10 strict 5.0 False \n", + "\n", + " moon_clearance_complete ... review_scores_crew review_scores_location \\\n", + "0 False ... 9.0 9.0 \n", + "1 False ... 10.0 10.0 \n", + "2 False ... 10.0 10.0 \n", + "3 False ... 10.0 9.0 \n", + "4 False ... 10.0 9.0 \n", + "\n", + " review_scores_price number_of_reviews reviews_per_month id \\\n", + "0 9.0 26 0.77 32413 \n", + "1 9.0 61 0.62 14122 \n", + "2 10.0 467 4.66 47761 \n", + "3 9.0 318 3.22 26648 \n", + "4 10.0 22 0.29 26648 \n", + "\n", + " company_rating company_location total_fleet_count iata_approved \n", + "0 1.0 Faroe Islands 1.0 False \n", + "1 1.0 Malta 1.0 True \n", + "2 1.0 Niue 2.0 False \n", + "3 1.0 Niue 2.0 True \n", + "4 1.0 Niue 2.0 True \n", + "\n", + "[5 rows x 27 columns]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "\n", + "# loading stored data\n", + "inputs=dict(\n", + " companies=pd.read_csv(\"../kedro-code/data/01_raw/companies.csv\"),\n", + " reviews=pd.read_csv(\"../kedro-code/data/01_raw/reviews.csv\"),\n", + " shuttles=pd.read_excel(\"../kedro-code/data/01_raw/shuttles.xlsx\"),\n", + ")\n", + "results = dr.execute([\"model_input_table\"], inputs=inputs)\n", + "results[\"model_input_table\"].head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. 
Reuse the Kedro `DataCatalog`\n", + "If you have an existing Kedro project with a `DataCatalog` (probably in a `.yaml` file), you can directly reuse it. In Hamilton, both datasets and \"parameters\" can be passed to `Driver.execute(inputs=...)`. \n", + "\n", + "To do so, you need to load the Kedro project and access the catalog and parameters from the `KedroSession`" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
[05/22/24 16:10:54] INFO     Loading data from companies (CSVDataset)...                        data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:10:54]\u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mcompanies\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=554145;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=747028;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Loading data from reviews (CSVDataset)...                          data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mreviews\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=756925;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=7965;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Loading data from shuttles (ExcelDataset)...                       data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mshuttles\u001b[0m \u001b[1m(\u001b[0mExcelDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=791772;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=315027;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
\n"
+      ],
+      "text/plain": []
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "data": {
+      "text/html": [
+       "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
shuttle_locationshuttle_typeengine_typeengine_vendorenginespassenger_capacitycancellation_policycrewd_check_completemoon_clearance_complete...review_scores_crewreview_scores_locationreview_scores_pricenumber_of_reviewsreviews_per_monthidcompany_ratingcompany_locationtotal_fleet_countiata_approved
0Sao Tome and PrincipeType V5PlasmaThetaBase Services2.04moderate2.0FalseFalse...9.09.09.0260.77324131.0Faroe Islands1.0False
1Wallis and FutunaType V2PlasmaThetaBase Services3.05moderate3.0FalseFalse...10.010.09.0610.62141221.0Malta1.0True
2NiueType F5QuantumThetaBase Services1.02strict1.0TrueFalse...10.010.010.04674.66477611.0Niue2.0False
3MaltaType V2QuantumThetaBase Services1.02moderate1.0FalseFalse...10.09.09.03183.22266481.0Niue2.0True
4MaltaType V2PlasmaThetaBase Services5.010strict5.0FalseFalse...10.09.010.0220.29266481.0Niue2.0True
\n", + "

5 rows × 27 columns

\n", + "
" + ], + "text/plain": [ + "\n", + " shuttle_location shuttle_type engine_type engine_vendor \\\n", + "\u001b[1;36m0\u001b[0m Sao Tome and Principe Type V5 Plasma ThetaBase Services \n", + "\u001b[1;36m1\u001b[0m Wallis and Futuna Type V2 Plasma ThetaBase Services \n", + "\u001b[1;36m2\u001b[0m Niue Type F5 Quantum ThetaBase Services \n", + "\u001b[1;36m3\u001b[0m Malta Type V2 Quantum ThetaBase Services \n", + "\u001b[1;36m4\u001b[0m Malta Type V2 Plasma ThetaBase Services \n", + "\n", + " engines passenger_capacity cancellation_policy crew d_check_complete \\\n", + "\u001b[1;36m0\u001b[0m \u001b[1;36m2.0\u001b[0m \u001b[1;36m4\u001b[0m moderate \u001b[1;36m2.0\u001b[0m \u001b[3;91mFalse\u001b[0m \n", + "\u001b[1;36m1\u001b[0m \u001b[1;36m3.0\u001b[0m \u001b[1;36m5\u001b[0m moderate \u001b[1;36m3.0\u001b[0m \u001b[3;91mFalse\u001b[0m \n", + "\u001b[1;36m2\u001b[0m \u001b[1;36m1.0\u001b[0m \u001b[1;36m2\u001b[0m strict \u001b[1;36m1.0\u001b[0m \u001b[3;92mTrue\u001b[0m \n", + "\u001b[1;36m3\u001b[0m \u001b[1;36m1.0\u001b[0m \u001b[1;36m2\u001b[0m moderate \u001b[1;36m1.0\u001b[0m \u001b[3;91mFalse\u001b[0m \n", + "\u001b[1;36m4\u001b[0m \u001b[1;36m5.0\u001b[0m \u001b[1;36m10\u001b[0m strict \u001b[1;36m5.0\u001b[0m \u001b[3;91mFalse\u001b[0m \n", + "\n", + " moon_clearance_complete \u001b[33m...\u001b[0m review_scores_crew review_scores_location \\\n", + "\u001b[1;36m0\u001b[0m \u001b[3;91mFalse\u001b[0m \u001b[33m...\u001b[0m \u001b[1;36m9.0\u001b[0m \u001b[1;36m9.0\u001b[0m \n", + "\u001b[1;36m1\u001b[0m \u001b[3;91mFalse\u001b[0m \u001b[33m...\u001b[0m \u001b[1;36m10.0\u001b[0m \u001b[1;36m10.0\u001b[0m \n", + "\u001b[1;36m2\u001b[0m \u001b[3;91mFalse\u001b[0m \u001b[33m...\u001b[0m \u001b[1;36m10.0\u001b[0m \u001b[1;36m10.0\u001b[0m \n", + "\u001b[1;36m3\u001b[0m \u001b[3;91mFalse\u001b[0m \u001b[33m...\u001b[0m \u001b[1;36m10.0\u001b[0m \u001b[1;36m9.0\u001b[0m \n", + "\u001b[1;36m4\u001b[0m \u001b[3;91mFalse\u001b[0m \u001b[33m...\u001b[0m 
\u001b[1;36m10.0\u001b[0m \u001b[1;36m9.0\u001b[0m \n", + "\n", + " review_scores_price number_of_reviews reviews_per_month id \\\n", + "\u001b[1;36m0\u001b[0m \u001b[1;36m9.0\u001b[0m \u001b[1;36m26\u001b[0m \u001b[1;36m0.77\u001b[0m \u001b[1;36m32413\u001b[0m \n", + "\u001b[1;36m1\u001b[0m \u001b[1;36m9.0\u001b[0m \u001b[1;36m61\u001b[0m \u001b[1;36m0.62\u001b[0m \u001b[1;36m14122\u001b[0m \n", + "\u001b[1;36m2\u001b[0m \u001b[1;36m10.0\u001b[0m \u001b[1;36m467\u001b[0m \u001b[1;36m4.66\u001b[0m \u001b[1;36m47761\u001b[0m \n", + "\u001b[1;36m3\u001b[0m \u001b[1;36m9.0\u001b[0m \u001b[1;36m318\u001b[0m \u001b[1;36m3.22\u001b[0m \u001b[1;36m26648\u001b[0m \n", + "\u001b[1;36m4\u001b[0m \u001b[1;36m10.0\u001b[0m \u001b[1;36m22\u001b[0m \u001b[1;36m0.29\u001b[0m \u001b[1;36m26648\u001b[0m \n", + "\n", + " company_rating company_location total_fleet_count iata_approved \n", + "\u001b[1;36m0\u001b[0m \u001b[1;36m1.0\u001b[0m Faroe Islands \u001b[1;36m1.0\u001b[0m \u001b[3;91mFalse\u001b[0m \n", + "\u001b[1;36m1\u001b[0m \u001b[1;36m1.0\u001b[0m Malta \u001b[1;36m1.0\u001b[0m \u001b[3;92mTrue\u001b[0m \n", + "\u001b[1;36m2\u001b[0m \u001b[1;36m1.0\u001b[0m Niue \u001b[1;36m2.0\u001b[0m \u001b[3;91mFalse\u001b[0m \n", + "\u001b[1;36m3\u001b[0m \u001b[1;36m1.0\u001b[0m Niue \u001b[1;36m2.0\u001b[0m \u001b[3;92mTrue\u001b[0m \n", + "\u001b[1;36m4\u001b[0m \u001b[1;36m1.0\u001b[0m Niue \u001b[1;36m2.0\u001b[0m \u001b[3;92mTrue\u001b[0m \n", + "\n", + "\u001b[1m[\u001b[0m\u001b[1;36m5\u001b[0m rows x \u001b[1;36m27\u001b[0m columns\u001b[1m]\u001b[0m" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from kedro.framework.session import KedroSession\n", + "from kedro.framework.startup import bootstrap_project\n", + "\n", + "project_path = \"../kedro-code\"\n", + "bootstrap_project(project_path)\n", + "with KedroSession.create(project_path) as session:\n", + " context = session.load_context()\n", + " catalog = 
context.catalog\n", + " params = context.params\n", + "\n", + "inputs = dict(\n", + " companies=catalog.load(\"companies\"),\n", + " reviews=catalog.load(\"reviews\"),\n", + " shuttles=catalog.load(\"shuttles\"),\n", + " **params, # unpack a dictionary\n", + ")\n", + "\n", + "results = dr.execute([\"model_input_table\"], inputs=inputs)\n", + "results[\"model_input_table\"].head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Use Hamilton materializers\n", + "\n", + "The concept of [materialization](https://hamilton.dagworks.io/en/latest/concepts/materialization/#) in Hamilton closely ressembles `Dataset` objects in Kedro. The `from_` object defines loaders and `to` defines savers, and they're collectively called \"materializers\".\n", + "\n", + "If you're using Hamilton, you'll find plenty of savers & loaders for common formats and libraries (`parquet`, `csv`, `json`, `xgboost`, `dlt`, etc.). We suggest using them directly if you're wanting to move away from the data catalog and YAML." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n"
+      ],
+      "text/plain": []
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       "\u001b[1m{\u001b[0m\n",
+       "    \u001b[32m'model_input_table__parquet'\u001b[0m: \u001b[1m{\u001b[0m\n",
+       "        \u001b[32m'file_metadata'\u001b[0m: \u001b[1m{\u001b[0m\n",
+       "            \u001b[32m'size'\u001b[0m: \u001b[1;36m215150\u001b[0m,\n",
+       "            \u001b[32m'path'\u001b[0m: \u001b[32m'../kedro-code/data/03_primary/model_input_table.pq'\u001b[0m,\n",
+       "            \u001b[32m'last_modified'\u001b[0m: \u001b[1;36m1716408659.312288\u001b[0m,\n",
+       "            \u001b[32m'timestamp'\u001b[0m: \u001b[1;36m1716423059.328785\u001b[0m,\n",
+       "            \u001b[32m'scheme'\u001b[0m: \u001b[32m''\u001b[0m,\n",
+       "            \u001b[32m'notes'\u001b[0m: \u001b[32m''\u001b[0m\n",
+       "        \u001b[1m}\u001b[0m,\n",
+       "        \u001b[32m'dataframe_metadata'\u001b[0m: \u001b[1m{\u001b[0m\n",
+       "            \u001b[32m'rows'\u001b[0m: \u001b[1;36m6027\u001b[0m,\n",
+       "            \u001b[32m'columns'\u001b[0m: \u001b[1;36m27\u001b[0m,\n",
+       "            \u001b[32m'column_names'\u001b[0m: \u001b[1m[\u001b[0m\n",
+       "                \u001b[32m'shuttle_location'\u001b[0m,\n",
+       "                \u001b[32m'shuttle_type'\u001b[0m,\n",
+       "                \u001b[32m'engine_type'\u001b[0m,\n",
+       "                \u001b[32m'engine_vendor'\u001b[0m,\n",
+       "                \u001b[32m'engines'\u001b[0m,\n",
+       "                \u001b[32m'passenger_capacity'\u001b[0m,\n",
+       "                \u001b[32m'cancellation_policy'\u001b[0m,\n",
+       "                \u001b[32m'crew'\u001b[0m,\n",
+       "                \u001b[32m'd_check_complete'\u001b[0m,\n",
+       "                \u001b[32m'moon_clearance_complete'\u001b[0m,\n",
+       "                \u001b[32m'price'\u001b[0m,\n",
+       "                \u001b[32m'company_id'\u001b[0m,\n",
+       "                \u001b[32m'shuttle_id'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_rating'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_comfort'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_amenities'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_trip'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_crew'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_location'\u001b[0m,\n",
+       "                \u001b[32m'review_scores_price'\u001b[0m,\n",
+       "                \u001b[32m'number_of_reviews'\u001b[0m,\n",
+       "                \u001b[32m'reviews_per_month'\u001b[0m,\n",
+       "                \u001b[32m'id'\u001b[0m,\n",
+       "                \u001b[32m'company_rating'\u001b[0m,\n",
+       "                \u001b[32m'company_location'\u001b[0m,\n",
+       "                \u001b[32m'total_fleet_count'\u001b[0m,\n",
+       "                \u001b[32m'iata_approved'\u001b[0m\n",
+       "            \u001b[1m]\u001b[0m,\n",
+       "            \u001b[32m'datatypes'\u001b[0m: \u001b[1m[\u001b[0m\n",
+       "                \u001b[32m'string'\u001b[0m,\n",
+       "                \u001b[32m'string'\u001b[0m,\n",
+       "                \u001b[32m'string'\u001b[0m,\n",
+       "                \u001b[32m'string'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'string'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'boolean'\u001b[0m,\n",
+       "                \u001b[32m'boolean'\u001b[0m,\n",
+       "                \u001b[32m'float64'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'Int64'\u001b[0m,\n",
+       "                \u001b[32m'float64'\u001b[0m,\n",
+       "                \u001b[32m'string'\u001b[0m,\n",
+       "                \u001b[32m'Float64'\u001b[0m,\n",
+       "                \u001b[32m'boolean'\u001b[0m\n",
+       "            \u001b[1m]\u001b[0m\n",
+       "        \u001b[1m}\u001b[0m\n",
+       "    \u001b[1m}\u001b[0m\n",
+       "\u001b[1m}\u001b[0m"
+      ]
+     },
+     "execution_count": 6,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "from hamilton.io.materialization import from_, to\n",
+    "\n",
+    "# contains both Savers and Loaders\n",
+    "materializers = [\n",
+    "    # `target` is the name of the Hamilton node receiving data\n",
+    "    from_.csv(\n",
+    "        target=\"companies\",\n",
+    "        path=\"../kedro-code/data/01_raw/companies.csv\",\n",
+    "    ), \n",
+    "    from_.csv(\n",
+    "        target=\"reviews\",\n",
+    "        path=\"../kedro-code/data/01_raw/reviews.csv\",\n",
+    "    ), \n",
+    "    from_.excel(\n",
+    "        target=\"shuttles\",\n",
+    "        path=\"../kedro-code/data/01_raw/shuttles.xlsx\",\n",
+    "    ),\n",
+    "    # `id` is the name of the generated \"saver\" node\n",
+    "    to.parquet(\n",
+    "        id=\"model_input_table__parquet\",\n",
+    "        dependencies=[\"model_input_table\"],\n",
+    "        path=\"../kedro-code/data/03_primary/model_input_table.pq\",\n",
+    "    )\n",
+    "]\n",
+    "\n",
+    "# `.materialize()` will load data using `from_` objects and store results of `to` objects\n",
+    "# it returns execution metadata about stored results.\n",
+    "metadata, _ = dr.materialize(*materializers)\n",
+    "metadata"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "If Hamilton doesn't support a specific format, you can use Kedro datasets using the `from_.kedro()` and `to.kedro()` materializers!\n",
+    "\n",
+    "This first snippet shows how to read from the `DataCatalog` defined in YAML format within the Kedro project."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "
[05/22/24 16:10:59] INFO     Loading data from shuttles (ExcelDataset)...                       data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:10:59]\u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mshuttles\u001b[0m \u001b[1m(\u001b[0mExcelDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=251889;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=569579;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
[05/22/24 16:11:01] INFO     Loading data from companies (CSVDataset)...                        data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:11:01]\u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mcompanies\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=698982;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=247154;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Loading data from reviews (CSVDataset)...                          data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mreviews\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=651853;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=754005;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
[05/22/24 16:11:02] INFO     Saving data to model_input_table (ParquetDataset)...               data_catalog.py:525\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:11:02]\u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Saving data to \u001b[38;5;208mmodel_input_table\u001b[0m \u001b[1m(\u001b[0mParquetDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=845498;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=59594;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#525\u001b\\\u001b[2m525\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
\n"
+      ],
+      "text/plain": []
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "data": {
+      "text/plain": [
+       "\u001b[1m{\u001b[0m\u001b[32m'model_input_table__parquet'\u001b[0m: \u001b[1m{\u001b[0m\u001b[32m'success'\u001b[0m: \u001b[3;92mTrue\u001b[0m\u001b[1m}\u001b[0m\u001b[1m}\u001b[0m"
+      ]
+     },
+     "execution_count": 7,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "project_path = \"../kedro-code\"\n",
+    "bootstrap_project(project_path)\n",
+    "with KedroSession.create(project_path) as session:\n",
+    "    context = session.load_context()\n",
+    "    catalog = context.catalog\n",
+    "    params = context.params\n",
+    "\n",
+    "# pass the `DataCatalog` to the `catalog` parameter\n",
+    "materializers = [\n",
+    "    from_.kedro(\n",
+    "        target=\"companies\",\n",
+    "        dataset_name=\"companies\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "    from_.kedro(\n",
+    "        target=\"shuttles\",\n",
+    "        dataset_name=\"shuttles\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "    from_.kedro(\n",
+    "        target=\"reviews\",\n",
+    "        dataset_name=\"reviews\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "    to.kedro(\n",
+    "        id=\"model_input_table__parquet\",\n",
+    "        dependencies=[\"model_input_table\"],\n",
+    "        dataset_name=\"model_input_table\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "]\n",
+    "metadata, _ = dr.materialize(*materializers)\n",
+    "metadata"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This second snippet shows how to define a `DataCatalog` directly in Python ([documentation](https://docs.kedro.org/en/stable/data/advanced_data_catalog_usage.html)). Note that in this case, all formats are already supported in Hamilton and it makes code a bit redundant."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "
                    INFO     Loading data from shuttles (ExcelDataset)...                       data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mshuttles\u001b[0m \u001b[1m(\u001b[0mExcelDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=765638;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=540969;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
[05/22/24 16:11:04] INFO     Loading data from companies (CSVDataset)...                        data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:11:04]\u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mcompanies\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=845539;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=373818;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Loading data from reviews (CSVDataset)...                          data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mreviews\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=673966;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=380648;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Saving data to model_input_table (ParquetDataset)...               data_catalog.py:525\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Saving data to \u001b[38;5;208mmodel_input_table\u001b[0m \u001b[1m(\u001b[0mParquetDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=611834;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=912002;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#525\u001b\\\u001b[2m525\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
\n"
+      ],
+      "text/plain": []
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "data": {
+      "text/plain": [
+       "\u001b[1m{\u001b[0m\u001b[32m'model_input_table__parquet'\u001b[0m: \u001b[1m{\u001b[0m\u001b[32m'success'\u001b[0m: \u001b[3;92mTrue\u001b[0m\u001b[1m}\u001b[0m\u001b[1m}\u001b[0m"
+      ]
+     },
+     "execution_count": 8,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "from kedro.io import DataCatalog\n",
+    "from kedro_datasets.pandas import (\n",
+    "    CSVDataset,\n",
+    "    ExcelDataset,\n",
+    "    ParquetDataset,\n",
+    ")\n",
+    "\n",
+    "catalog = DataCatalog(dict(\n",
+    "    companies=CSVDataset(filepath=\"../kedro-code/data/01_raw/companies.csv\"),\n",
+    "    reviews=CSVDataset(filepath=\"../kedro-code/data/01_raw/reviews.csv\"),\n",
+    "    shuttles=ExcelDataset(filepath=\"../kedro-code/data/01_raw/shuttles.xlsx\"),\n",
+    "    model_input_table=ParquetDataset(filepath=\"../kedro-code/data/03_primary/model_input_table.pq\")\n",
+    "))\n",
+    "\n",
+    "materializers = [\n",
+    "    from_.kedro(\n",
+    "        target=\"companies\",\n",
+    "        dataset_name=\"companies\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "    from_.kedro(\n",
+    "        target=\"shuttles\",\n",
+    "        dataset_name=\"shuttles\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "    from_.kedro(\n",
+    "        target=\"reviews\",\n",
+    "        dataset_name=\"reviews\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "    to.kedro(\n",
+    "        id=\"model_input_table__parquet\",\n",
+    "        dependencies=[\"model_input_table\"],\n",
+    "        dataset_name=\"model_input_table\",\n",
+    "        catalog=catalog,\n",
+    "    ),\n",
+    "]\n",
+    "metadata, _ = dr.materialize(*materializers)\n",
+    "metadata"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 5. Connect to the Hamilton UI\n",
+    "\n",
+    "To connect Hamilton to the Hamilton UI, we need to add the `HamiltonTracker` adapter. To connect your Kedro `Pipeline`, you need to pass a `Builder` containing the tracker when converting it to a Hamilton `Driver`. \n",
+    "\n",
+    "The next cell will install the dependencies for the UI. You also need the Hamilton UI container running for the next cells to successfully execute and send metadata. See [installation instructions](https://hamilton.dagworks.io/en/latest/concepts/ui/)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!pip install hamilton_sdk"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Running the next cell and creating the `Driver` should populate the Hamilton UI with the dataflow definition."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "
\n"
+      ],
+      "text/plain": []
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "data": {
+      "image/svg+xml": [
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "%3\n",
+       "\n",
+       "\n",
+       "cluster__legend\n",
+       "\n",
+       "Legend\n",
+       "\n",
+       "\n",
+       "\n",
+       "preprocessed_shuttles\n",
+       "\n",
+       "preprocessed_shuttles\n",
+       "DataFrame\n",
+       "\n",
+       "\n",
+       "\n",
+       "model_input_table\n",
+       "\n",
+       "model_input_table\n",
+       "DataFrame\n",
+       "\n",
+       "\n",
+       "\n",
+       "preprocessed_shuttles->model_input_table\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "X_test\n",
+       "\n",
+       "X_test\n",
+       "Any\n",
+       "\n",
+       "\n",
+       "\n",
+       "evaluate_model\n",
+       "\n",
+       "evaluate_model\n",
+       "NoneType\n",
+       "\n",
+       "\n",
+       "\n",
+       "X_test->evaluate_model\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "X_train\n",
+       "\n",
+       "X_train\n",
+       "Any\n",
+       "\n",
+       "\n",
+       "\n",
+       "regressor\n",
+       "\n",
+       "regressor\n",
+       "LinearRegression\n",
+       "\n",
+       "\n",
+       "\n",
+       "X_train->regressor\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "regressor->evaluate_model\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "y_test\n",
+       "\n",
+       "y_test\n",
+       "Any\n",
+       "\n",
+       "\n",
+       "\n",
+       "y_test->evaluate_model\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "split_data\n",
+       "\n",
+       "split_data\n",
+       "Dict\n",
+       "\n",
+       "\n",
+       "\n",
+       "model_input_table->split_data\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "preprocessed_companies\n",
+       "\n",
+       "preprocessed_companies\n",
+       "DataFrame\n",
+       "\n",
+       "\n",
+       "\n",
+       "preprocessed_companies->model_input_table\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "y_train\n",
+       "\n",
+       "y_train\n",
+       "Any\n",
+       "\n",
+       "\n",
+       "\n",
+       "y_train->regressor\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "split_data->X_test\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "split_data->X_train\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "split_data->y_test\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "split_data->y_train\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "_preprocessed_shuttles_inputs\n",
+       "\n",
+       "shuttles\n",
+       "DataFrame\n",
+       "\n",
+       "\n",
+       "\n",
+       "_preprocessed_shuttles_inputs->preprocessed_shuttles\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "_model_input_table_inputs\n",
+       "\n",
+       "reviews\n",
+       "DataFrame\n",
+       "\n",
+       "\n",
+       "\n",
+       "_model_input_table_inputs->model_input_table\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "_preprocessed_companies_inputs\n",
+       "\n",
+       "companies\n",
+       "DataFrame\n",
+       "\n",
+       "\n",
+       "\n",
+       "_preprocessed_companies_inputs->preprocessed_companies\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "_split_data_inputs\n",
+       "\n",
+       "model_options\n",
+       "Dict\n",
+       "\n",
+       "\n",
+       "\n",
+       "_split_data_inputs->split_data\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "\n",
+       "input\n",
+       "\n",
+       "input\n",
+       "\n",
+       "\n",
+       "\n",
+       "function\n",
+       "\n",
+       "function\n",
+       "\n",
+       "\n",
+       "\n"
+      ],
+      "text/plain": [
+       "\u001b[1m<\u001b[0m\u001b[1;95mhamilton.driver.Driver\u001b[0m\u001b[39m object at \u001b[0m\u001b[1;36m0x7f267841b450\u001b[0m\u001b[1m>\u001b[0m"
+      ]
+     },
+     "execution_count": 10,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "from hamilton_sdk.adapters import HamiltonTracker\n",
+    "from hamilton import driver\n",
+    "from hamilton.plugins import h_kedro\n",
+    "from kedro_code.pipelines import data_processing, data_science\n",
+    "\n",
+    "# modify this as needed\n",
+    "tracker = HamiltonTracker(\n",
+    "    project_id=3,\n",
+    "    username=\"abc@my_domain.com\",\n",
+    "    dag_name=\"spaceflight\",\n",
+    ")\n",
+    "builder = driver.Builder().with_adapters(tracker)\n",
+    "\n",
+    "dr = h_kedro.kedro_pipeline_to_driver(\n",
+    "    data_processing.create_pipeline(),\n",
+    "    data_science.create_pipeline(),\n",
+    "    builder=builder\n",
+    ")\n",
+    "dr"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Calling `.materialize()` in the next cell (or using `.execute()`) will populate the Hamilton UI with execution metadata and artifact introspection"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "
[05/22/24 16:11:10] WARNING                                                                         adapters.py:163\n",
+       "                             Capturing execution run. Results can be found at                                      \n",
+       "                             http://localhost:8242/dashboard/project/3/runs/32                                     \n",
+       "                                                                                                                   \n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:11:10]\u001b[0m\u001b[2;36m \u001b[0m\u001b[31mWARNING \u001b[0m \u001b]8;id=299905;file:///home/tjean/projects/dagworks/hamilton/ui/sdk/src/hamilton_sdk/adapters.py\u001b\\\u001b[2madapters.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=155714;file:///home/tjean/projects/dagworks/hamilton/ui/sdk/src/hamilton_sdk/adapters.py#163\u001b\\\u001b[2m163\u001b[0m\u001b]8;;\u001b\\\n", + "\u001b[2;36m \u001b[0m Capturing execution run. Results can be found at \u001b[2m \u001b[0m\n", + "\u001b[2;36m \u001b[0m \u001b[4;94mhttp://localhost:8242/dashboard/project/3/runs/32\u001b[0m \u001b[2m \u001b[0m\n", + "\u001b[2;36m \u001b[0m \u001b[2m \u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Loading data from shuttles (ExcelDataset)...                       data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mshuttles\u001b[0m \u001b[1m(\u001b[0mExcelDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=976064;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=997503;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
[05/22/24 16:11:13] INFO     Loading data from companies (CSVDataset)...                        data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m[05/22/24 16:11:13]\u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mcompanies\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=134319;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=446665;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Loading data from reviews (CSVDataset)...                          data_catalog.py:483\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Loading data from \u001b[38;5;208mreviews\u001b[0m \u001b[1m(\u001b[0mCSVDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=620527;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=424729;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#483\u001b\\\u001b[2m483\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    INFO     Saving data to model_input_table (ParquetDataset)...               data_catalog.py:525\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[34mINFO \u001b[0m Saving data to \u001b[38;5;208mmodel_input_table\u001b[0m \u001b[1m(\u001b[0mParquetDataset\u001b[1m)\u001b[0m\u001b[33m...\u001b[0m \u001b]8;id=102118;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py\u001b\\\u001b[2mdata_catalog.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=182015;file:///home/tjean/projects/dagworks/hamilton/examples/kedro/venv/lib/python3.11/site-packages/kedro/io/data_catalog.py#525\u001b\\\u001b[2m525\u001b[0m\u001b]8;;\u001b\\\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
                    WARNING                                                                         adapters.py:352\n",
+       "                             Captured execution run. Results can be found at                                       \n",
+       "                             http://localhost:8242/dashboard/project/3/runs/32                                     \n",
+       "                                                                                                                   \n",
+       "
\n" + ], + "text/plain": [ + "\u001b[2;36m \u001b[0m\u001b[2;36m \u001b[0m\u001b[31mWARNING \u001b[0m \u001b]8;id=48567;file:///home/tjean/projects/dagworks/hamilton/ui/sdk/src/hamilton_sdk/adapters.py\u001b\\\u001b[2madapters.py\u001b[0m\u001b]8;;\u001b\\\u001b[2m:\u001b[0m\u001b]8;id=639017;file:///home/tjean/projects/dagworks/hamilton/ui/sdk/src/hamilton_sdk/adapters.py#352\u001b\\\u001b[2m352\u001b[0m\u001b]8;;\u001b\\\n", + "\u001b[2;36m \u001b[0m Captured execution run. Results can be found at \u001b[2m \u001b[0m\n", + "\u001b[2;36m \u001b[0m \u001b[4;94mhttp://localhost:8242/dashboard/project/3/runs/32\u001b[0m \u001b[2m \u001b[0m\n", + "\u001b[2;36m \u001b[0m \u001b[2m \u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# reuse the previously defined materializers\n", + "_, _ = dr.materialize(*materializers)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.1" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/hamilton/plugins/h_kedro.py b/hamilton/plugins/h_kedro.py new file mode 100644 index 000000000..9ca9413c9 --- /dev/null +++ b/hamilton/plugins/h_kedro.py @@ -0,0 +1,135 @@ +import inspect +from types import NoneType +from typing import Any, Dict, Optional, Tuple + +from kedro.pipeline.node import Node as KNode +from kedro.pipeline.pipeline import Pipeline as KPipeline + +from hamilton import driver, graph +from hamilton.function_modifiers.expanders import extract_fields +from hamilton.lifecycle import base as lifecycle_base +from hamilton.node import Node as HNode + + +def expand_k_node(base_node: HNode, outputs: list[str]) -> list[HNode]: + """Manually apply `@extract_fields()` 
on a Hamilton node.Node for a Kedro + node that specifies >1 `outputs`. + + The number of nodes == len(outputs) + 1 because it includes the `base_node` + """ + + def _convert_output_from_tuple_to_dict(node_result: Any, node_kwargs: Dict[str, Any]): + return {out: v for out, v in zip(outputs, node_result)} + + # NOTE isinstance(Any, type) is False for Python < 3.11 + extractor = extract_fields(fields={out: Any for out in outputs}) + func = base_node.originating_functions[0] + if issubclass(func.__annotations__["return"], Tuple): + base_node = base_node.transform_output(_convert_output_from_tuple_to_dict, Dict) + func.__annotations__["return"] = Dict + + extractor.validate(func) + return list(extractor.transform_node(base_node, {}, func)) + + +def k_node_to_h_nodes(node: KNode) -> list[HNode]: + """Convert a Kedro node to a list of Hamilton nodes. + If the Kedro node specifies 1 output, generate 1 Hamilton node. + If it generate >1 output, generate len(outputs) + 1 to include the base node + extracted fields. 
+ """ + # determine if more than one output + node_names = [] + if isinstance(node.outputs, list): + node_names.extend(node.outputs) + elif isinstance(node.outputs, dict): + node_names.extend(node.outputs.values()) + + # determine the base node name + if len(node_names) == 1: + base_node_name = node_names[0] + elif isinstance(node.outputs, str): + base_node_name = node.outputs + else: + base_node_name = node.func.__name__ + + func_sig = inspect.signature(node.func) + params = func_sig.parameters.values() + output_type = func_sig.return_annotation + if output_type is None: + output_type = NoneType # manually creating `hamilton.node.Node` doesn't accept `typ=None` + + base_node = HNode( + name=base_node_name, + typ=output_type, + doc_string=getattr(node.func, "__doc__", ""), + callabl=node.func, + originating_functions=(node.func,), + ) + + # if Kedro node defines multiple outputs, use `@extract_fields()` + if len(node_names) > 1: + h_nodes = expand_k_node(base_node, node_names) + else: + h_nodes = [base_node] + + # remap the function parameters to the node `inputs` and clean Kedro `parameters` name + new_params = {} + for param, k_input in zip(params, node.inputs): + if k_input.startswith("params:"): + k_input = k_input.partition("params:")[-1] + + new_params[param.name] = k_input + + h_nodes = [n.reassign_inputs(input_names=new_params) for n in h_nodes] + + return h_nodes + + +def kedro_pipeline_to_driver( + *pipelines: KPipeline, + builder: Optional[driver.Builder] = None, +) -> driver.Driver: + """Convert one or mode Kedro `Pipeline` to a Hamilton `Driver`. + Pass a Hamilton `Builder` to include lifecycle adapters in your `Driver`. + + :param pipelines: one or more Kedro `Pipeline` objects + :param builder: a Hamilton `Builder` to use when building the `Driver` + :return: the Hamilton `Driver` built from Kedro `Pipeline` objects. + + .. 
code-block:: python
test_parse_k_node_list_outputs(): + def multi_outputs() -> dict: + return dict(a=1, b=2) + + kedro_node = node( + func=multi_outputs, + inputs=None, + outputs=["a", "b"], + ) + h_nodes = h_kedro.k_node_to_h_nodes(kedro_node) + node_names = [n.name for n in h_nodes] + assert len(h_nodes) == 3 + assert "multi_outputs" in node_names + assert "a" in node_names + assert "b" in node_names + + +def test_parse_k_node_dict_outputs(): + def multi_outputs() -> dict: + return dict(a=1, b=2) + + kedro_node = node( + func=multi_outputs, + inputs=None, + outputs={"a": "a", "b": "b"}, + ) + h_nodes = h_kedro.k_node_to_h_nodes(kedro_node) + node_names = [n.name for n in h_nodes] + assert len(h_nodes) == 3 + assert "multi_outputs" in node_names + assert "a" in node_names + assert "b" in node_names From 4bf9b28bcf92ada4fca50a183ad90b555c21d86b Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 22 May 2024 16:26:54 -0400 Subject: [PATCH 4/8] updated tests requirements --- requirements-test.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-test.txt b/requirements-test.txt index d58a4192d..a819c0455 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -7,6 +7,7 @@ dlt fsspec graphviz kaleido +kedro lancedb lightgbm lxml From fde4673e3e57eec19537fba8c1da8adefec76127 Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 22 May 2024 16:37:55 -0400 Subject: [PATCH 5/8] fix NoneType for <3.10 --- hamilton/plugins/h_kedro.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hamilton/plugins/h_kedro.py b/hamilton/plugins/h_kedro.py index 9ca9413c9..44109833f 100644 --- a/hamilton/plugins/h_kedro.py +++ b/hamilton/plugins/h_kedro.py @@ -1,6 +1,5 @@ import inspect -from types import NoneType -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Optional, Tuple, Type from kedro.pipeline.node import Node as KNode from kedro.pipeline.pipeline import Pipeline as KPipeline @@ -56,7 +55,8 @@ def k_node_to_h_nodes(node: KNode) -> 
list[HNode]: params = func_sig.parameters.values() output_type = func_sig.return_annotation if output_type is None: - output_type = NoneType # manually creating `hamilton.node.Node` doesn't accept `typ=None` + # manually creating `hamilton.node.Node` doesn't accept `typ=None` + output_type = Type[None] # NoneType is introduced in Python 3.10 base_node = HNode( name=base_node_name, From 4d7adca511d463f0ee40ef4f03c9dc996b6a5143 Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 22 May 2024 16:42:55 -0400 Subject: [PATCH 6/8] fixed another 3.8 typing bug --- hamilton/plugins/h_kedro.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hamilton/plugins/h_kedro.py b/hamilton/plugins/h_kedro.py index 44109833f..1da0269b4 100644 --- a/hamilton/plugins/h_kedro.py +++ b/hamilton/plugins/h_kedro.py @@ -1,5 +1,5 @@ import inspect -from typing import Any, Dict, Optional, Tuple, Type +from typing import Any, Dict, List, Optional, Tuple, Type from kedro.pipeline.node import Node as KNode from kedro.pipeline.pipeline import Pipeline as KPipeline @@ -10,7 +10,7 @@ from hamilton.node import Node as HNode -def expand_k_node(base_node: HNode, outputs: list[str]) -> list[HNode]: +def expand_k_node(base_node: HNode, outputs: List[str]) -> List[HNode]: """Manually apply `@extract_fields()` on a Hamilton node.Node for a Kedro node that specifies >1 `outputs`. @@ -31,7 +31,7 @@ def _convert_output_from_tuple_to_dict(node_result: Any, node_kwargs: Dict[str, return list(extractor.transform_node(base_node, {}, func)) -def k_node_to_h_nodes(node: KNode) -> list[HNode]: +def k_node_to_h_nodes(node: KNode) -> List[HNode]: """Convert a Kedro node to a list of Hamilton nodes. If the Kedro node specifies 1 output, generate 1 Hamilton node. If it generate >1 output, generate len(outputs) + 1 to include the base node + extracted fields. 
From 387cdc2a17b88516ab7cf6b57af8b8949a9e4716 Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 22 May 2024 17:18:38 -0400 Subject: [PATCH 7/8] bug fix expanders <3.11 --- hamilton/function_modifiers/expanders.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hamilton/function_modifiers/expanders.py b/hamilton/function_modifiers/expanders.py index e43d65262..834a80827 100644 --- a/hamilton/function_modifiers/expanders.py +++ b/hamilton/function_modifiers/expanders.py @@ -717,6 +717,8 @@ def _validate_extract_fields(fields: dict): if not ( isinstance(field_type, type) + or field_type + is Any # condition needed becuase isinstance(Any, type) == False for Python <3.11 or typing_inspect.is_generic_type(field_type) or typing_inspect.is_union_type(field_type) ): From 76ed273e60ad83f6c098a6999b35a9b9f038ddc0 Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 22 May 2024 17:26:18 -0400 Subject: [PATCH 8/8] moved comment because pre-commit break formatting otherwise --- hamilton/function_modifiers/expanders.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hamilton/function_modifiers/expanders.py b/hamilton/function_modifiers/expanders.py index 834a80827..a375e206e 100644 --- a/hamilton/function_modifiers/expanders.py +++ b/hamilton/function_modifiers/expanders.py @@ -715,10 +715,10 @@ def _validate_extract_fields(fields: dict): if not isinstance(field, str): errors.append(f"{field} is not a string. All keys must be strings.") + # second condition needed because isinstance(Any, type) == False for Python <3.11 if not ( isinstance(field_type, type) - or field_type - is Any # condition needed becuase isinstance(Any, type) == False for Python <3.11 + or field_type is Any or typing_inspect.is_generic_type(field_type) or typing_inspect.is_union_type(field_type) ):