Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions disruption_py/settings/nickname_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@
resolve MDSplus tree names for various tokamaks.
"""

import os
import sys
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Union

import pandas as pd
from loguru import logger

from disruption_py.config import config
from disruption_py.core.utils.enums import map_string_to_enum
from disruption_py.core.utils.misc import get_temporary_folder
from disruption_py.inout.base import DataConnection
from disruption_py.inout.sql import ShotDatabase
from disruption_py.machine.tokamak import Tokamak
Expand Down Expand Up @@ -47,6 +53,20 @@ class NicknameSettingParams:
tokamak: Tokamak


def _efit_df_to_dict(df: pd.DataFrame) -> dict[int, str]:
"""
Build shot/tree dict from an indexed DataFrame.
When multiple rows exist for the same shot, select the largest idx.
"""
df = (
df.reset_index()
.sort_values("idx", ascending=False)
.drop_duplicates("shot", keep="first")
.set_index("shot")["tree"]
)
return df.to_dict()


class NicknameSetting(ABC):
"""
Abstract base class for nickname settings to resolve tree names for MDSPlus data.
Expand All @@ -55,6 +75,8 @@ class NicknameSetting(ABC):
-------
get_tree_name(params)
Resolve the tree name based on the given params.
prefetch_db(database, tokamak)
Pre-fetch EFIT trees for all shots, if available.
"""

def get_tree_name(self, params: NicknameSettingParams) -> str:
Expand All @@ -76,6 +98,18 @@ def get_tree_name(self, params: NicknameSettingParams) -> str:
return self.tokamak_overrides[params.tokamak](params)
return self._get_tree_name(params)

    def prefetch_db(self, database: ShotDatabase, tokamak: Tokamak) -> None:
        """
        Pre-fetch EFIT trees for all shots, if available.

        Optional hook: the base implementation is a deliberate no-op.
        Subclasses that can resolve many shots with a single bulk query
        (see the DIII-D disruption setting) override this to populate an
        internal cache before per-shot ``get_tree_name`` calls are made.

        Parameters
        ----------
        database : ShotDatabase
            Database connection for querying tokamak shot data.
        tokamak : Tokamak
            The tokamak for which results are being processed.
        """

@abstractmethod
def _get_tree_name(self, params: NicknameSettingParams) -> str:
"""
Expand Down Expand Up @@ -123,6 +157,21 @@ def __init__(self, nickname_setting_dict: Dict[Tokamak, NicknameSettingType]):
}
self.resolved_nickname_setting_dict = resolved_nickname_setting_dict

def prefetch_db(self, database: ShotDatabase, tokamak: Tokamak) -> None:
"""
Map pre-fetching to the tokamak-specific nickname setting.

Parameters
----------
database : ShotDatabase
Database connection for querying tokamak shot data.
tokamak : Tokamak
The tokamak for which results are being processed.
"""
setting = self.resolved_nickname_setting_dict.get(tokamak)
if setting is not None:
setting.prefetch_db(database, tokamak)

def _get_tree_name(self, params: NicknameSettingParams) -> str:
"""
Get the tree name based on the resolved nickname setting.
Expand Down Expand Up @@ -228,6 +277,81 @@ def __init__(self):
Tokamak.D3D: self._d3d_nickname,
Tokamak.EAST: StaticNicknameSetting("efit_east")._get_tree_name,
}
self._d3d_efit_trees: dict | None = None

def prefetch_db(self, database: ShotDatabase, tokamak: Tokamak) -> None:
"""
Pre-fetch EFIT trees for all DIII-D shots in a single query.
Results are cached to a CSV file in the daily temporary folder.

Parameters
----------
database : ShotDatabase
Database connection for querying tokamak shot data.
tokamak : Tokamak
The tokamak for which results are being processed.
"""
if tokamak != Tokamak.D3D:
return

runtag = config(tokamak).efit.runtag
if "pytest" in sys.modules:
runtag = "DIS"

# write to unique runid folder, cache to parent
tmp = os.path.join(get_temporary_folder(), f"rundb_{runtag}.csv")
csv = os.path.join(
os.path.dirname(get_temporary_folder()), f"rundb_{runtag}.csv"
)

# cache hit
if os.path.isfile(csv):
logger.debug(
"Loading EFIT '{runtag}' trees from cache: {csv}",
runtag=runtag,
csv=csv,
)
df = pd.read_csv(csv, index_col="idx")
self._d3d_efit_trees = _efit_df_to_dict(df)
return

# cache miss
logger.debug("Fetching EFIT '{runtag}' trees from DB...", runtag=runtag)
took = -time.time()
df = database.query(
"select idx, shot, tree "
"from code_rundb.dbo.plasmas "
f"where runtag = '{runtag}' and deleted = 0 "
"order by idx"
)
took += time.time()

# empty
if df is None or df.empty:
logger.warning("Fetched empty EFIT '{runtag}' trees.", runtag=runtag)
self._d3d_efit_trees = {}
return

# log results
logger.debug(
"Fetched EFIT '{runtag}' trees in {sec:.3f}s: {tot:,} rows, {unique:,} unique shots.",
runtag=runtag,
sec=took,
tot=len(df),
unique=df["shot"].nunique(),
)

# store
df = df.set_index("idx")
self._d3d_efit_trees = _efit_df_to_dict(df)

# write cache
logger.debug("Caching EFIT '{runtag}' trees: {csv}", runtag=runtag, csv=csv)
df.to_csv(tmp)
if os.path.exists(csv):
logger.warning("Removing stale EFIT cache: {csv}", csv=csv)
os.remove(csv)
os.link(tmp, csv)

def _d3d_nickname(self, params: NicknameSettingParams) -> str:
"""
Expand All @@ -254,6 +378,14 @@ def _d3d_nickname(self, params: NicknameSettingParams) -> str:
if runtag == "DIS" and params.disruption_time is None:
return DefaultNicknameSetting().get_tree_name(params)

# prefetch if possible
if self._d3d_efit_trees is not None:
tree = self._d3d_efit_trees.get(params.shot_id)
if tree is not None:
return tree
return DefaultNicknameSetting().get_tree_name(params)

# fallback single query
efit_trees = params.database.query(
"select tree from code_rundb.dbo.plasmas where "
f"shot = {params.shot_id} and runtag = '{runtag}' and deleted = 0 order by idx",
Expand Down
1 change: 1 addition & 0 deletions disruption_py/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def get_shots_data(
)

took = -time.time()
retrieval_settings.efit_nickname_setting.prefetch_db(database, tokamak)
with Pool(processes=num_processes) as pool:
args = zip(
repeat(tokamak),
Expand Down
Loading