Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/decisionengine_modules/GCE/transforms/GceFigureOfMerit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from decisionengine_modules.util.figure_of_merit import figure_of_merit


@Transform.consumes(GCE_Instance_Performance=pd.DataFrame, Factory_Entries_GCE=pd.DataFrame, GCE_Occupancy=pd.DataFrame)
@Transform.consumes(GCE_Instance_Performance=pd.DataFrame, Factory_Entries=pd.DataFrame, GCE_Occupancy=pd.DataFrame)
@Transform.produces(GCE_Price_Performance=pd.DataFrame, GCE_Figure_Of_Merit=pd.DataFrame)
class GceFigureOfMerit(Transform.Transform):
def __init__(self, config):
Expand All @@ -31,7 +31,7 @@ def transform(self, data_block):
sys.float_info.max,
)

factory_entries = self.Factory_Entries_GCE(data_block).fillna(0)
factory_entries = self.Factory_Entries(data_block).xs("GCE").fillna(0)
gce_occupancy = self.GCE_Occupancy(data_block).fillna(0)

figures_of_merit = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@Transform.supports_config(Parameter("entry_nersc_map", type=dict, comment="Maps jobs on NERSC to entry name"))
@Transform.consumes(
startd_manifests=pandas.DataFrame, Factory_Entries_LCF=pandas.DataFrame, Nersc_Job_Info=pandas.DataFrame
startd_manifests=pandas.DataFrame, Factory_Entries=pandas.DataFrame, Nersc_Job_Info=pandas.DataFrame
)
@Transform.produces(nersc_userpool_slots_comparison=dict)
class CompareNerscUserpoolSlots(Transform.Transform):
Expand All @@ -34,7 +34,7 @@ def transform(self, data_block):
self.logger.debug("in CompareNerscUserpoolSlots transform")
nersc_df = data_block["Nersc_Job_Info"]
userpool_slots_df = data_block["startd_manifests"]
factory_entry_df = data_block["Factory_Entries_LCF"]
factory_entry_df = data_block["Factory_Entries"].xs("LCF")

# constrain userpool slots with only batch slurm
userpool_slots_df = userpool_slots_df[userpool_slots_df["GLIDEIN_GridType"] == "batch slurm"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from decisionengine_modules.util import figure_of_merit as fom


@Transform.consumes(Nersc_Instance_Performance=pd.DataFrame, Factory_Entries_LCF=pd.DataFrame)
@Transform.consumes(Nersc_Instance_Performance=pd.DataFrame, Factory_Entries=pd.DataFrame)
@Transform.produces(Nersc_Price_Performance=pd.DataFrame, Nersc_Figure_Of_Merit=pd.DataFrame)
class NerscFigureOfMerit(Transform.Transform):
def __init__(self, config):
Expand Down Expand Up @@ -51,7 +51,7 @@ def transform(self, data_block):
sys.float_info.max,
)

factory_entries_lcf = self.Factory_Entries_LCF(data_block)
factory_entries_lcf = self.Factory_Entries(data_block).xs("LCF")

figures_of_merit = []
for _i, row in factory_entries_lcf.iterrows():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ def split_dataframe(df, at):
return df.iloc[:, 0:at], df.iloc[:, at:]


@Publisher.consumes(Factory_Entries=pandas.DataFrame, glideclient_manifests=pandas.DataFrame)
class GlideinWMSManifests(publisher.HTCondorManifests):
def __init__(self, config):
super().__init__(config)
self.queries = config.get("queries", {})
self._consumes = {f"Factory_Entries_{key}": pandas.DataFrame for key in ENTRY_TYPES.keys()}
self._consumes.update(glideclient_manifests=pandas.DataFrame)
self.classad_type = "glideclient"

def publish(self, datablock):
Expand Down Expand Up @@ -50,13 +49,19 @@ def create_invalidate_constraint(self, requests_df):
self.invalidate_ads_constraint[collector_host] = constraint

def dataframe_for_entrytype(self, allow_type, datablock):
requests_df = datablock.get("glideclient_manifests")
facts_df = datablock.get("de_logicengine_facts")

data_product_name = f"Factory_Entries_{allow_type}"
requests_df = datablock.get("glideclient_manifests")
if requests_df.empty:
return pandas.DataFrame()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker, but is it fairly trivial to add a test to make sure that if the glideclient_manifests data block is empty, we get an empty pandas.DataFrame()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll find a way to cover this.

fact_name = f"allow_{allow_type.lower()}_requests"
entries_df = datablock.get(data_product_name)
if requests_df.empty or entries_df.empty:
entries_df = datablock.get("Factory_Entries")
if not entries_df.index.isin([allow_type], level=0).any():
return pandas.DataFrame()

entries_df = entries_df.xs(allow_type)
if entries_df.empty:
return pandas.DataFrame()

joint_df = requests_df.merge(entries_df, left_on="ReqName", right_on="Name", suffixes=("", "_right"))
Expand Down
15 changes: 4 additions & 11 deletions src/decisionengine_modules/glideinwms/sources/factory_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@
Parameter("max_retries", default=0),
Parameter("retry_interval", default=0),
)
@Source.produces(
Factory_Entries_Grid=pandas.DataFrame,
Factory_Entries_AWS=pandas.DataFrame,
Factory_Entries_GCE=pandas.DataFrame,
Factory_Entries_LCF=pandas.DataFrame,
)
@Source.produces(Factory_Entries=pandas.DataFrame)
class FactoryEntries(Source.Source):
def __init__(self, config):
super().__init__(config)
Expand Down Expand Up @@ -111,12 +106,10 @@ def acquire(self):
if dataframe.empty:
# There were no entry classads in the factory collector or
# querying the collector failed
return {f"Factory_Entries_{key}": pandas.DataFrame() for key in ENTRY_TYPES.keys()}
return {"Factory_Entries": pandas.DataFrame()}

results = {}
for key, value in ENTRY_TYPES.items():
results[f"Factory_Entries_{key}"] = dataframe.loc[dataframe.GLIDEIN_GridType.isin(value)]
return results
dfs = [dataframe.loc[dataframe.GLIDEIN_GridType.isin(value)] for value in ENTRY_TYPES.values()]
return {"Factory_Entries": pandas.concat(dfs, keys=ENTRY_TYPES.keys())}


Source.describe(FactoryEntries)
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
"factoryglobal_manifests",
"job_manifests",
"job_clusters",
"Factory_Entries_LCF",
"Factory_Entries",
"startd_manifests",
"Factory_Entries_AWS",
"Grid_Figure_Of_Merit",
"GCE_Figure_Of_Merit",
"AWS_Figure_Of_Merit",
"Nersc_Figure_Of_Merit",
]

_SUPPORTED_ENTRY_TYPES = ["Factory_Entries_LCF", "Factory_Entries_AWS", "Factory_Entries_Grid", "Factory_Entries_GCE"]
_SUPPORTED_ENTRY_TYPES = ["LCF", "AWS", "Grid", "GCE"]

# TODO: Extend to use following in future
# 'Nersc_Job_Info', 'Nersc_Allocation_Info'
Expand Down Expand Up @@ -77,8 +76,9 @@ def transform(self, datablock):
fe_cfg = self.read_fe_config()
# Get factory global classad dataframe
factory_globals = datablock.get("factoryglobal_manifests")
factory_entries = datablock.get("Factory_Entries")
entries = pandas.DataFrame(
pandas.concat([datablock.get(et) for et in _SUPPORTED_ENTRY_TYPES], ignore_index=True, sort=True)
pandas.concat([factory_entries.xs(et) for et in _SUPPORTED_ENTRY_TYPES], ignore_index=True, sort=True)
)
if entries.empty:
self.logger.info("There are no entries to request resources from")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@Transform.supports_config(Parameter("price_performance", default=1))
@Transform.consumes(Factory_Entries_Grid=pandas.DataFrame)
@Transform.consumes(Factory_Entries=pandas.DataFrame)
@Transform.produces(Grid_Figure_Of_Merit=pandas.DataFrame)
class GridFigureOfMerit(Transform.Transform):
def __init__(self, config):
Expand All @@ -25,7 +25,7 @@ def transform(self, datablock):
"""

self.logger.debug("in GridFigureOfMerit transform")
entries = self.Factory_Entries_Grid(datablock)
entries = self.Factory_Entries(datablock).xs("Grid")
if entries is None:
entries = pandas.DataFrame({ATTR_ENTRYNAME: []})
foms = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
}


@Transform.consumes(Factory_Entries_AWS=pd.DataFrame)
@Transform.consumes(Factory_Entries=pd.DataFrame)
@Transform.produces(aws_instance_limits=pd.DataFrame, spot_occupancy_config=pd.DataFrame)
class AWSFactoryEntryData(Transform.Transform):
def __init__(self, config):
Expand All @@ -23,7 +23,7 @@ def transform(self, datablock):

# Get the dataframe containing AWS entries
self.logger.debug("in AWSFactoryEntryData transform")
aws_entries = self.Factory_Entries_AWS(datablock)
aws_entries = self.Factory_Entries(datablock).xs("AWS")

limits_df = pd.DataFrame()
so_config_dict = {}
Expand Down
8 changes: 8 additions & 0 deletions src/decisionengine_modules/tests/dataframe_for_entrytype.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: 2017 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

import pandas as pd


def dataframe_for_entrytype(key, data):
    """Build a single-entry-type frame shaped like the ``Factory_Entries`` product.

    ``data`` is handed to ``pd.DataFrame``; the resulting frame is then wrapped
    with ``pd.concat`` using ``key`` as the only mapping key, so ``key`` becomes
    the outermost index level and the rows can be retrieved with ``.xs(key)``.
    """
    return pd.concat({key: pd.DataFrame(data)})
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ def fe_group_classads_instance():
def test_consumes(fe_group_classads_instance):
consumes = [
"glideclient_manifests",
"Factory_Entries_Grid",
"Factory_Entries_AWS",
"Factory_Entries_GCE",
"Factory_Entries_LCF",
"Factory_Entries",
]
assert fe_group_classads_instance._consumes == dict.fromkeys(consumes, pd.DataFrame)

Expand All @@ -53,10 +50,14 @@ def test_publish(fe_group_classads_instance):
) as publish_to_condor:
datablock = {
"glideclient_manifests": _REQUEST_DF,
"Factory_Entries_Grid": pd.DataFrame({"Name": ["u", "v", "w"], "Other": [1, 2, 3], "CollectorHost": 14.0}),
"Factory_Entries_AWS": pd.DataFrame({"Name": ["x"], "Other": 5}),
"Factory_Entries_GCE": pd.DataFrame({"Name": ["y", "z"], "Other": 7}),
"Factory_Entries_LCF": pd.DataFrame(),
"Factory_Entries": pd.concat(
{
"Grid": pd.DataFrame({"Name": ["u", "v", "w"], "Other": [1, 2, 3], "CollectorHost": 14.0}),
"AWS": pd.DataFrame({"Name": ["x"], "Other": 5}),
"GCE": pd.DataFrame({"Name": ["y", "z"], "Other": 7}),
"LCF": pd.DataFrame(),
}
),
"de_logicengine_facts": pd.DataFrame(
{
"fact_name": [
Expand All @@ -79,6 +80,11 @@ def test_publish(fe_group_classads_instance):
assert actual_df.compare(expected_df).empty


def test_no_glideins(fe_group_classads_instance):
    """An empty ``glideclient_manifests`` product must yield an empty DataFrame."""
    # Only the requests product is supplied; the assertion below expects the
    # call to succeed and return an empty frame with nothing else in the datablock.
    datablock = {"glideclient_manifests": pd.DataFrame()}
    result = fe_group_classads_instance.dataframe_for_entrytype("some_entry_type", datablock)
    assert isinstance(result, pd.DataFrame)
    assert result.empty


def test_create_invalidate_constraint(fe_group_classads_instance):
expected_constraint = {
"col1.com": '(glideinmytype == "glideclient") && (stringlistmember(ClientName, "e1,e2,e3"))',
Expand Down
8 changes: 5 additions & 3 deletions src/decisionengine_modules/tests/test_GceFigureOfMerit.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tabulate

from decisionengine_modules.GCE.transforms import GceFigureOfMerit
from decisionengine_modules.tests.dataframe_for_entrytype import dataframe_for_entrytype

DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
CSV_FILE = os.path.join(DATA_DIR, "GceOccupancy.output.fixture.csv")
Expand Down Expand Up @@ -39,16 +40,17 @@
"GCE_Instance_Performance": gce_instance_performance_df.reindex(
columns=("EntryName", "InstanceType", "AvailabilityZone", "OnDemandPrice", "PerfTtbarTotal")
),
"Factory_Entries_GCE": pd.DataFrame(
[
"Factory_Entries": dataframe_for_entrytype(
key="GCE",
data=[
{
"EntryName": "FNAL_HEPCLOUD_GOOGLE_us-central1-a_n1-standard-1",
"GlideinConfigPerEntryMaxIdle": 100,
"GlideinMonitorTotalStatusIdle": 10,
"GlideinConfigPerEntryMaxGlideins": 200,
"GlideinMonitorTotalStatusRunning": 100,
}
]
],
),
"GCE_Occupancy": GCE_OCCUPANCY_DF,
}
Expand Down
8 changes: 5 additions & 3 deletions src/decisionengine_modules/tests/test_NerscFigureOfMerit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from decisionengine.framework.modules.Module import verify_products
from decisionengine_modules.NERSC.transforms import NerscFigureOfMerit
from decisionengine_modules.tests.dataframe_for_entrytype import dataframe_for_entrytype

data_block = {
"Nersc_Instance_Performance": pd.DataFrame(
Expand All @@ -19,16 +20,17 @@
}
]
),
"Factory_Entries_LCF": pd.DataFrame(
[
"Factory_Entries": dataframe_for_entrytype(
key="LCF",
data=[
{
"EntryName": "CMSHTPC_T3_US_NERSC_Cori",
"GlideinConfigPerEntryMaxGlideins": 200,
"GlideinMonitorTotalStatusRunning": 99,
"GlideinConfigPerEntryMaxIdle": 10,
"GlideinMonitorTotalStatusIdle": 9,
}
]
],
),
}

Expand Down
11 changes: 5 additions & 6 deletions src/decisionengine_modules/tests/test_factory_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@

def test_produces():
entries = factory_entries.FactoryEntries(CONFIG_FACTORY_ENTRIES)
produces = {f"Factory_Entries_{entrytype}": pd.DataFrame for entrytype in factory_entries.ENTRY_TYPES.keys()}
assert entries._produces == produces
assert entries._produces == {"Factory_Entries": pd.DataFrame}


def test_acquire():
Expand Down Expand Up @@ -102,7 +101,7 @@ def test_acquire_correctionmap():

entries = factory_entries.FactoryEntries(CONFIG_FACTORY_ENTRIES_CORMAP)
with mock.patch.object(htcondor_query.CondorStatus, "fetch", return_value=utils.input_from_file(FIXTURE_FILE)):
dummypd = entries.acquire()
dummypd2 = dummypd["Factory_Entries_Grid"]
assert df1.equals(dummypd2[["GLIDEIN_Resource_Slots"]])
assert df2.equals(dummypd2[["GLIDEIN_CMSSite"]])
all_entries = entries.acquire()
dummypd = all_entries["Factory_Entries"].xs("Grid")
assert df1.equals(dummypd[["GLIDEIN_Resource_Slots"]])
assert df2.equals(dummypd[["GLIDEIN_CMSSite"]])
5 changes: 2 additions & 3 deletions src/decisionengine_modules/tests/test_grid_figure_of_merit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pandas

from decisionengine_modules.glideinwms.transforms.grid_figure_of_merit import GridFigureOfMerit
from decisionengine_modules.tests.dataframe_for_entrytype import dataframe_for_entrytype

grid_entries = ["g1", "g2", "g3", "g4", "g5"]
running = [5, 10, 15, 20, 200]
Expand All @@ -21,9 +22,7 @@
"GlideinConfigPerEntryMaxIdle": max_idle,
}

grid_df = pandas.DataFrame(entries)

datablock = {"Factory_Entries_Grid": grid_df}
datablock = {"Factory_Entries": dataframe_for_entrytype(key="Grid", data=entries)}


def test_eligible_resources_with_constraints():
Expand Down