32 commits
5cd91c0  postgresql 13 to 16 in readme and docker yml (james-at-kc, Apr 15, 2026)
ea3bdf5  define TableSpec in data_classes.py (james-at-kc, Apr 15, 2026)
88a7048  syntax (james-at-kc, Apr 15, 2026)
a0c4c17  refactored archive table in delta and load query to delta (james-at-kc, Apr 15, 2026)
60aadf6  refactor (james-at-kc, Apr 20, 2026)
c99a22d  refactor (james-at-kc, Apr 20, 2026)
fd1d22b  refactor (james-at-kc, Apr 22, 2026)
8258776  define TableSpec in data_classes.py (james-at-kc, Apr 15, 2026)
48d51a1  syntax (james-at-kc, Apr 15, 2026)
05c9f06  refactored archive table in delta and load query to delta (james-at-kc, Apr 15, 2026)
599d3b3  refactor (james-at-kc, Apr 20, 2026)
399f408  refactor (james-at-kc, Apr 20, 2026)
ad0f3c7  refactor (james-at-kc, Apr 22, 2026)
e5e7068  Merge branch 'ftr/14329-refactor-table-spec' of https://github.com/fe… (james-at-kc, Apr 22, 2026)
09bb51d  Merge branch 'qat' into ftr/14329-refactor-table-spec (james-at-kc, Apr 22, 2026)
486858a  [DEV-14329] lint (james-at-kc, Apr 22, 2026)
bf4a537  [DEV-14329] merge (james-at-kc, Apr 22, 2026)
d66ecc8  [14329] lintlint (james-at-kc, Apr 22, 2026)
f7a31f0  [DEV-14329] delint (james-at-kc, Apr 22, 2026)
d7cf521  Merge branch 'qat' into ftr/14329-refactor-table-spec (james-at-kc, Apr 22, 2026)
fa89b66  [DEV-14329] more linting and typing (james-at-kc, Apr 22, 2026)
915cbdd  Merge branch 'ftr/14329-refactor-table-spec' of https://github.com/fe… (james-at-kc, Apr 22, 2026)
f4435ff  [DEV-14329] lint (james-at-kc, Apr 22, 2026)
6189ddb  [DEV-14329] lint part treize (james-at-kc, Apr 22, 2026)
267e674  [DEV-14329] rearrange declaration order for strict type checking (james-at-kc, Apr 22, 2026)
4c511bc  [DEV-14329] add default value for source_schema in class definition (james-at-kc, Apr 22, 2026)
16b7e56  [DEV-14329] refactor - imports (james-at-kc, Apr 29, 2026)
6fb1867  [DEV-14329] refactor - imports (james-at-kc, Apr 29, 2026)
791c100  [DEV-14329] refactor - imports (james-at-kc, Apr 29, 2026)
264adf0  [DEV-14329] refactor - unit test now N/A (james-at-kc, Apr 29, 2026)
329bd0f  [DEV-14329] refactor - adjust unit tests to use refactored TableSpe… (james-at-kc, Apr 30, 2026)
f1ba78d  Merge branch 'qat' into ftr/14329-refactor-table-spec (james-at-kc, Apr 30, 2026)
2 changes: 1 addition & 1 deletion README.md
@@ -61,7 +61,7 @@ Create a `.envrc` file in the repo root, which will be ignored by git. Change cr
```shell
export DATABASE_URL=postgres://usaspending:usaspender@localhost:5432/data_store_api
export ES_HOSTNAME=http://localhost:9200
-export BROKER_DB=postgres://admin:root@localhost:5435/data_broker
+export BROKER_DB=postgres://admin:root@localhost:5432/data_broker
```

If `direnv` does not pick this up after saving the file, type
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -14,7 +14,7 @@ services:
- manage
- test
- ci
-image: postgres:13.8-alpine
+image: postgres:16-alpine
shm_size: 1g
container_name: usaspending-db
volumes:
36 changes: 35 additions & 1 deletion usaspending_api/common/data_classes.py
@@ -1,5 +1,6 @@
from dataclasses import dataclass
-from typing import Optional
+from typing import Any, Callable, List, Optional, Union

from typing_extensions import Literal


@@ -48,3 +49,36 @@ class TransactionColumn:
    # to be applied on this input. For example, a valid scalar_transformation string is
    # "CASE {input} WHEN 'UNITED STATES' THEN 'USA' ELSE {input} END"
    scalar_transformation: str = None


@dataclass(slots=True)
class TableSpec:
    """
    Delta table metadata class
    """

    destination_database: str
    delta_table_create_sql: Union[str, Any]  # pyspark.sql.types.StructType

    destination_table_name: Optional[str] = None
    source_table: Optional[str] = None
    source_database: Optional[str] = None
    model: Optional[str] = None
    partition_column: Optional[str] = None
    partition_column_type: Optional[str] = None
    is_partition_column_unique: bool = False
    delta_table_create_options: Optional[dict] = None
    delta_table_create_partitions: Optional[List[str]] = None
    source_schema: Optional[Union[List, dict]] = None
    custom_schema: Optional[str] = None
    column_names: Optional[List[str]] = None
    is_from_broker: bool = False
    source_query: Optional[Union[str, List[str], Callable]] = None
    source_query_incremental: Optional[Union[str, Callable]] = None
    user_defined_functions: Optional[List[dict]] = None
    archive_date_field: str = "update_date"
    postgres_seq_name: Optional[str] = None
    postgres_partition_spec: Optional[dict] = None
    tsvectors: Optional[Union[List[str], dict]] = None
    swap_table: Optional[str] = None
    swap_schema: Optional[str] = None
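The practical effect of this refactor is that table metadata moves from untyped dicts to attribute access that type checkers can verify; `slots=True` (Python 3.10+) also rejects misspelled field assignments at runtime. A minimal sketch of the new access pattern, using a hypothetical trimmed-down spec rather than the full class above:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(slots=True)
class MiniTableSpec:  # trimmed illustration of TableSpec, not the real class
    destination_database: str
    delta_table_create_sql: str
    source_table: Optional[str] = None
    archive_date_field: str = "update_date"


spec = MiniTableSpec(
    destination_database="arc",
    delta_table_create_sql="CREATE TABLE {DESTINATION_TABLE} (...)",  # placeholder SQL
    source_table="download_job",
)

spec.archive_date_field         # "update_date" -- replaces table_spec["archive_date_field"]
# spec.archive_datefield = "x"  # AttributeError: slots=True blocks misspelled assignments
```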
26 changes: 13 additions & 13 deletions usaspending_api/etl/management/commands/archive_table_in_delta.py
@@ -5,6 +5,7 @@
import psycopg
from django.core.management.base import BaseCommand

from usaspending_api.common.data_classes import TableSpec
from usaspending_api.common.etl.spark import load_delta_table
from usaspending_api.common.helpers.spark_helpers import (
configure_spark_session,
@@ -20,19 +21,18 @@
logger = logging.getLogger(__name__)

TABLE_SPEC = {
"download_job": {
"destination_database": "arc",
"destination_table": "download_job",
"archive_date_field": "update_date",
"source_table": "download_job",
"source_database": "public",
"delta_table_create_sql": download_job_create_sql_string,
}
"download_job": TableSpec(
destination_database="arc",
destination_table_name="download_job",
archive_data_field="update_date",
source_table="download_job",
source_database="public",
delta_table_create_sql=download_job_create_sql_string,
)
}


class Command(BaseCommand):

help = """
Copies records older than "--archive-period" days ago from Postgres to Delta Lake then deletes
those records from Postgres.
@@ -90,12 +90,12 @@ def handle(self, *args, **options) -> None:
archive_period = options["archive_period"]

table_spec = TABLE_SPEC[destination_table]
-destination_database = options["alt_db"] or table_spec["destination_database"]
+destination_database = options["alt_db"] or table_spec.destination_database
destination_table_name = options["alt_name"] or destination_table
-source_table = table_spec["source_table"]
-source_database = table_spec["source_database"]
+source_table = table_spec.source_table
+source_database = table_spec.source_database
qualified_source_table = f"{source_database}.{source_table}"
-archive_date_field = table_spec["archive_date_field"]
+archive_date_field = table_spec.archive_date_field

archive_date = datetime.now() - timedelta(days=archive_period)
archive_date_string = archive_date.strftime("%Y-%m-%d")
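For context on how these hunks fit together: handle() resolves the spec, derives the qualified source table, and computes a cutoff date; rows older than the cutoff are copied to Delta Lake and then deleted from Postgres, as the command's help text describes. A rough sketch of the flow visible in this hunk (the copy and delete queries themselves fall outside the diff, so they are only indicated in comments):

```python
from datetime import datetime, timedelta

from usaspending_api.common.data_classes import TableSpec

# Mirrors the TABLE_SPEC["download_job"] entry above.
table_spec = TableSpec(
    destination_database="arc",
    destination_table_name="download_job",
    archive_date_field="update_date",
    source_table="download_job",
    source_database="public",
    delta_table_create_sql="",  # stand-in; the real value is download_job_create_sql_string
)
qualified_source_table = f"{table_spec.source_database}.{table_spec.source_table}"  # "public.download_job"

archive_date = datetime.now() - timedelta(days=90)  # as if --archive-period 90
archive_date_string = archive_date.strftime("%Y-%m-%d")  # cutoff date string
# Rows whose update_date is older than the cutoff are loaded into the "arc" Delta
# database, then deleted from Postgres; those statements sit outside this hunk.
```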
37 changes: 19 additions & 18 deletions usaspending_api/etl/management/commands/create_delta_table.py
@@ -1,9 +1,10 @@
import logging

-from django.core.management.base import BaseCommand
+from django.core.management.base import BaseCommand, CommandParser
from pyspark.sql.types import StructType

from usaspending_api.awards.delta_models.award_id_lookup import AWARD_ID_LOOKUP_SCHEMA
from usaspending_api.common.data_classes import TableSpec
from usaspending_api.common.helpers.spark_helpers import (
configure_spark_session,
get_active_spark_session,
@@ -19,14 +20,14 @@
**ARCHIVE_TABLE_SPEC,
**LOAD_TABLE_TABLE_SPEC,
**LOAD_QUERY_TABLE_SPEC,
"award_id_lookup": {
"destination_database": "int",
"delta_table_create_sql": AWARD_ID_LOOKUP_SCHEMA,
},
"transaction_id_lookup": {
"destination_database": "int",
"delta_table_create_sql": TRANSACTION_ID_LOOKUP_SCHEMA,
},
"award_id_lookup": TableSpec(
destination_database="int",
delta_table_create_sql=AWARD_ID_LOOKUP_SCHEMA,
),
"transaction_id_lookup": TableSpec(
destination_database="int",
delta_table_create_sql=TRANSACTION_ID_LOOKUP_SCHEMA,
),
}

logger = logging.getLogger(__name__)
@@ -37,7 +38,7 @@ class Command(BaseCommand):
This command creates an empty Delta Table based on the provided --destination-table argument.
"""

-def add_arguments(self, parser):
+def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--destination-table",
type=str,
@@ -66,7 +67,7 @@ def add_arguments(self, parser):
"name",
)

-def handle(self, *args, **options):
+def handle(self, *args, **options) -> None:
spark = get_active_spark_session()
spark_created_by_command = False
if not spark:
@@ -78,27 +79,27 @@ def handle(self, *args, **options):
spark_s3_bucket = options["spark_s3_bucket"]

table_spec = TABLE_SPEC[destination_table]
-destination_database = options["alt_db"] or table_spec["destination_database"]
+destination_database = options["alt_db"] or table_spec.destination_database
destination_table_name = options["alt_name"] or destination_table

# Set the database that will be interacted with for all Delta Lake table Spark-based activity
logger.info(f"Using Spark Database: {destination_database}")
spark.sql(f"create database if not exists {destination_database};")
spark.sql(f"use {destination_database};")
-if isinstance(table_spec["delta_table_create_sql"], str):
+if isinstance(table_spec.delta_table_create_sql, str):
# Define Schema Using CREATE TABLE AS command
spark.sql(
-TABLE_SPEC[destination_table]["delta_table_create_sql"].format(
+table_spec.delta_table_create_sql.format(
DESTINATION_TABLE=destination_table_name,
DESTINATION_DATABASE=destination_database,
SPARK_S3_BUCKET=spark_s3_bucket,
DELTA_LAKE_S3_PATH=CONFIG.DELTA_LAKE_S3_PATH,
)
)
-elif isinstance(table_spec["delta_table_create_sql"], StructType):
-    schema = table_spec["delta_table_create_sql"]
-    additional_options = table_spec.get("delta_table_create_options") or {}
-    partition_cols = table_spec.get("delta_table_create_partitions") or []
+elif isinstance(table_spec.delta_table_create_sql, StructType):
+    schema = table_spec.delta_table_create_sql
+    additional_options = table_spec.delta_table_create_options or {}
+    partition_cols = table_spec.delta_table_create_partitions or []
df = spark.createDataFrame([], schema)

default_options = {
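The branch above means each TableSpec can define its table either as a SQL template string (formatted with the DESTINATION_TABLE, DESTINATION_DATABASE, SPARK_S3_BUCKET, and DELTA_LAKE_S3_PATH placeholders, then run through spark.sql) or as a pyspark StructType (used to write an empty, optionally partitioned Delta table). A sketch of each form with hypothetical columns, since AWARD_ID_LOOKUP_SCHEMA itself is not shown in this diff:

```python
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Form 1: SQL template string; create_delta_table formats the placeholders,
# then executes the result with spark.sql().
example_create_sql = """
    CREATE OR REPLACE TABLE {DESTINATION_DATABASE}.{DESTINATION_TABLE} (
        award_id BIGINT NOT NULL,
        generated_unique_award_id STRING
    )
    USING DELTA
    LOCATION 's3a://{SPARK_S3_BUCKET}/{DELTA_LAKE_S3_PATH}/{DESTINATION_DATABASE}/{DESTINATION_TABLE}'
"""

# Form 2: StructType schema; the command builds an empty DataFrame from it
# and saves it as a Delta table with any delta_table_create_options/partitions.
example_schema = StructType(
    [
        StructField("award_id", LongType(), nullable=False),
        StructField("generated_unique_award_id", StringType(), nullable=True),
    ]
)
```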