@@ -166,7 +166,7 @@ def acquire(self):
 "channel_name": "test",
 "source_channel": "channel_aws_config_data",
 "Dataproducts": ["spot_occupancy_config"],
-"retries": 3,
-"retry_timeout": 20,
+"max_attempts": 3,
+"retry_interval": 20,
 },
 )
@@ -212,7 +212,7 @@ def acquire(self):
 "channel_name": "test",
 "source_channel": "channel_aws_config_data",
 "Dataproducts": ["spot_occupancy_config"],
-"retries": 3,
-"retry_timeout": 20,
+"max_attempts": 3,
+"retry_interval": 20,
 },
 )
@@ -17,8 +17,8 @@
 "channel_name": "test",
 "source_channel": "channel_aws_config_data",
 "Dataproducts": ["spot_occupancy_config"],
-"retries": 3,
-"retry_timeout": 20,
+"max_attempts": 3,
+"retry_interval": 20,
 }

 account = {"spot_occupancy_config": pd.read_csv(os.path.join(DATA_DIR, "account_config.csv"))}
@@ -17,8 +17,8 @@
 config = {
 "source_channel": "channel_aws_config_data",
 "Dataproducts": ["spot_occupancy_config"],
-"retries": 3,
-"retry_timeout": 20,
+"max_attempts": 3,
+"retry_interval": 20,
 }

 account = {"spot_occupancy_config": pd.read_csv(os.path.join(DATA_DIR, "account_config.csv"))}
@@ -46,7 +46,7 @@ def __init__(self, config):
 self.max_retries = config.get("max_retries", _MAX_RETRIES)
 self.retry_backoff_factor = config.get("retry_backoff_factor", _RETRY_BACKOFF_FACTOR)
 self.newt = newt.Newt(
-config.get("passwd_file"), num_retries=self.max_retries, retry_backoff_factor=self.retry_backoff_factor
+config.get("passwd_file"), max_retries=self.max_retries, retry_backoff_factor=self.retry_backoff_factor
 )

 def send_query(self):
2 changes: 1 addition & 1 deletion src/decisionengine_modules/NERSC/sources/NerscJobInfo.py
@@ -41,7 +41,7 @@ def __init__(self, config):
 self.max_retries = config.get("max_retries", _MAX_RETRIES)
 self.retry_backoff_factor = config.get("retry_backoff_factor", _RETRY_BACKOFF_FACTOR)
 self.newt = newt.Newt(
-config.get("passwd_file"), num_retries=self.max_retries, retry_backoff_factor=self.retry_backoff_factor
+config.get("passwd_file"), max_retries=self.max_retries, retry_backoff_factor=self.retry_backoff_factor
 )

 def acquire(self):
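For orientation, both NERSC sources simply forward the renamed keyword to the NEWT client. Below is a minimal sketch of that call, assuming the usual import path for the bundled `newt` module; the password-file path and numeric values are placeholders, not taken from the repository.

```python
from decisionengine_modules.NERSC.util import newt

# Sketch only: the sources pull these values from their channel config via
# config.get("max_retries", ...) and config.get("retry_backoff_factor", ...).
client = newt.Newt(
    "/path/to/newt_passwd_file",  # placeholder path
    max_retries=3,                # was num_retries before this change
    retry_backoff_factor=0.5,
)
```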
6 changes: 3 additions & 3 deletions src/decisionengine_modules/NERSC/util/newt.py
@@ -18,7 +18,7 @@


 class Newt:
-def __init__(self, password_file, newt_base_url=None, num_retries=0, retry_backoff_factor=0):
+def __init__(self, password_file, newt_base_url=None, max_retries=0, retry_backoff_factor=0):
 """
 Constructor that takes path to password file and
 optional Newt base URL
@@ -34,7 +34,7 @@ def __init__(self, password_file, newt_base_url=None, num_retries=0, retry_backoff_factor=0):
 if not self.newt_base_url.endswith("/"):
 self.newt_base_url += "/"
 self.session = requests.Session()
-self.num_retries = num_retries
+self.max_retries = max_retries
 self.retry_backoff_factor = retry_backoff_factor
 self.expiration_time = time.time()
 self._add_retries_to_session()
@@ -45,7 +45,7 @@ def _add_retries_to_session(self):
 :return: void
 """
 retry = Retry(
-status=self.num_retries,
+status=self.max_retries,
 status_forcelist=[500, 502, 503, 504, 507],
 backoff_factor=self.retry_backoff_factor,
 )
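To make the renamed field concrete: `max_retries` feeds urllib3's `Retry` as a status-retry budget, and the backoff factor controls the sleep between attempts. The self-contained sketch below shows that pattern; the adapter wiring and mount prefixes are a guess at what `_add_retries_to_session` does, not copied from it.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

max_retries = 3
retry = Retry(
    status=max_retries,                          # retry this many times on bad statuses
    status_forcelist=[500, 502, 503, 504, 507],  # server-side errors worth retrying
    backoff_factor=0.5,                          # sleep grows roughly as 0.5 * 2**(n-1)
)

session = requests.Session()
adapter = HTTPAdapter(max_retries=retry)
session.mount("https://", adapter)  # assumed prefixes; newt.py may mount differently
session.mount("http://", adapter)
```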
@@ -22,7 +22,7 @@
 }
 """,
 ),
-Parameter("nretries", default=0),
+Parameter("max_retries", default=0),
 Parameter("retry_interval", default=0),
 )
 @Source.produces(
@@ -43,9 +43,9 @@ def __init__(self, config):
 "Factory_Entries_LCF": ("batch slurm",),
 }

-# The combination of nretries=10 and retry_interval=2 adds up to just
+# The combination of max_retries=10 and retry_interval=2 adds up to just
 # over 15 minutes
-self.nretries = config.get("nretries", 0)
+self.max_retries = config.get("max_retries", 0)
 self.retry_interval = config.get("retry_interval", 0)

 self.subsystem_name = "any"
@@ -76,7 +76,7 @@ def acquire(self):

 retry_wrapper(
 partial(condor_status.load, *(constraint, classad_attrs, self.condor_config)),
-nretries=self.nretries,
+max_retries=self.max_retries,
 retry_interval=self.retry_interval,
 logger=self.logger,
 )
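As a consumer-side illustration, a channel config for this source (and, with the same two keys, for the factory-global source and the HTCondor publishers below) now spells the knobs `max_retries` and `retry_interval`. Only those two key names come from this change; every other value in the fragment is a placeholder.

```python
# Hypothetical parameters block; values are illustrative, not from the repo.
factory_entries_parameters = {
    "condor_config": "/etc/condor/condor_config",  # placeholder path
    "factories": [],                               # placeholder factory list
    "max_retries": 10,    # was "nretries"
    "retry_interval": 2,  # seconds before the first retry
}
```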
@@ -22,7 +22,7 @@
 }
 """,
 ),
-Parameter("nretries", default=0),
+Parameter("max_retries", default=0),
 Parameter("retry_interval", default=0),
 )
 @Source.produces(factoryglobal_manifests=pandas.DataFrame)
@@ -37,9 +37,9 @@ def __init__(self, config):
 self.condor_config = config.get("condor_config")
 self.factories = config.get("factories", [])

-# The combination of nretries=10 and retry_interval=2 adds up to just
+# The combination of max_retries=10 and retry_interval=2 adds up to just
 # over 15 minutes
-self.nretries = config.get("nretries", 0)
+self.max_retries = config.get("max_retries", 0)
 self.retry_interval = config.get("retry_interval", 0)

 self.subsystem_name = "any"
@@ -69,7 +69,7 @@ def acquire(self):

 retry_wrapper(
 partial(condor_status.load, *(constraint, classad_attrs, self.condor_config)),
-nretries=self.nretries,
+max_retries=self.max_retries,
 retry_interval=self.retry_interval,
 logger=self.logger,
 )
6 changes: 3 additions & 3 deletions src/decisionengine_modules/htcondor/publishers/publisher.py
@@ -18,15 +18,15 @@
 @Publisher.supports_config(
 Parameter("condor_config", type=str),
 Parameter("x509_user_proxy", type=str),
-Parameter("nretries", type=int),
+Parameter("max_retries", type=int),
 Parameter("retry_interval", type=int, comment="Number of seconds to wait between retries."),
 )
 class HTCondorManifests(Publisher.Publisher, metaclass=abc.ABCMeta):
 def __init__(self, config):
 super().__init__(config)
 self.condor_config = config.get("condor_config")
 self.x509_user_proxy = config.get("x509_user_proxy")
-self.nretries = config.get("nretries")
+self.max_retries = config.get("max_retries")
 self.retry_interval = config.get("retry_interval")
 self.update_ad_command = DEFAULT_UPDATE_AD_COMMAND
 self.invalidate_ad_command = DEFAULT_INVALIDATE_AD_COMMAND
@@ -113,7 +113,7 @@ def condor_advertise(self, classads, collector_host=None, update_ad_command=DEFA
 classads,
 **{"collector_host": collector_host, "update_ad_command": update_ad_command},
 ),
-self.nretries,
+self.max_retries,
 self.retry_interval,
 logger=self.logger,
 )
@@ -4,62 +4,62 @@
 "module" : "decisionengine_modules.AWS.sources.AWSSpotPriceWithSourceProxy",
 "name" : "AWSSpotPrice",
 "parameters": {"channel_name": "channel_aws_config_data",
-"Dataproducts":["spot_occupancy_config"],
-"retries": 3,
-"retry_timeout": 20,
-},
+"Dataproducts":["spot_occupancy_config"],
+"max_attempts": 3,
+"retry_interval": 20,
+},
 "schedule": 300,
 },
 "AWSOccupancy" : {
 "module" : "decisionengine_modules.AWS.sources.AWSOccupancyWithSourceProxy",
 "name" : "AWSOccupancy",
 "parameters": {"channel_name": "channel_aws_config_data",
-"Dataproducts":["spot_occupancy_config"],
-"retries": 3,
-"retry_timeout": 20,
-},
+"Dataproducts":["spot_occupancy_config"],
+"max_attempts": 3,
+"retry_interval": 20,
+},
 "schedule": 320,
 },
 "AWSInstancePerformance" : {
 "module" : "decisionengine_modules.AWS.sources.AWSInstancePerformance",
 "name" : "AWSInstancePerformance",
 "parameters": {
-"data_file": "%s/de_data/instance_performance.csv"%(os.environ.get('HOME'),),
+"data_file": "%s/de_data/instance_performance.csv"%(os.environ.get('HOME'),),
 },
 "schedule": 360,
 },
 "AWSJobLimits" : {
 "module" : "decisionengine_modules.source_proxy",
 "name" : "SourceProxy",
 "parameters": {"channel_name": "channel_aws_config_data",
-"Dataproducts":[("aws_instance_limits", "Job_Limits")],
-"retries": 3,
-"retry_timeout": 20,
-},
-"schedule": 360,
+"Dataproducts":[("aws_instance_limits", "Job_Limits")],
+"max_attempts": 3,
+"retry_interval": 20,
+},
+"schedule": 360,
 },
 },
 "transforms" : {
 "FigureOfMerit": {
 "module" : "decisionengine_modules.AWS.transforms.FigureOfMerit",
-"name": "FigureOfMerit",
-"parameters" : {},
-},
+"name": "FigureOfMerit",
+"parameters" : {},
+},
 },
 "logicengines" : {
 "logicengine1" : {
 "module" : "framework.logicengine.LogicEngine",
 "name" : "LogicEngine",
 "parameters" : {
 "rules": {"allow_to_publish_AWS": {
-"expression":"(allow_AWS)",
-"actions":["AWSFigureOfMerit", "AWSPricePerformance"],
-"facts":["allow_AWS"],
-},
-},
+"expression":"(allow_AWS)",
+"actions":["AWSFigureOfMerit", "AWSPricePerformance"],
+"facts":["allow_AWS"],
+},
+},
 "facts" : {
-"allow_AWS":"(True)"
-},
+"allow_AWS":"(True)"
+},
 },
 },
 },
@@ -68,23 +68,23 @@
 "module" : "decisionengine_modules.AWS.publishers.AWS_price_performance",
 "name" : "AWSPricePerformancePublisher",
 "parameters" : {"publish_to_graphite" : True,
-#"graphite_host": "fifemondata.fnal.gov",
-"graphite_host": "lsdataitb.fnal.gov",
-#"graphite_port": 2104,
-"graphite_port": 2004,
-"graphite_context":"hepcloud.aws",
-"output_file": "%s/de_data/AWS_price_perf.csv"%(os.environ.get('HOME'),),},
+#"graphite_host": "fifemondata.fnal.gov",
+"graphite_host": "lsdataitb.fnal.gov",
+#"graphite_port": 2104,
+"graphite_port": 2004,
+"graphite_context":"hepcloud.aws",
+"output_file": "%s/de_data/AWS_price_perf.csv"%(os.environ.get('HOME'),),},
 },
 "AWSFigureOfMerit" : {
 "module" : "decisionengine_modules.AWS.publishers.AWS_figure_of_merit",
 "name" : "AWSFOMPublisher",
 "parameters" : {"publish_to_graphite" : True,
-#"graphite_host": "fifemondata.fnal.gov",
-"graphite_host": "lsdataitb.fnal.gov",
-#"graphite_port": 2104,
-"graphite_port": 2004,
-"graphite_context":"hepcloud.aws",
-"output_file": "%s/de_data/AWS_figure_of_merit.csv"%(os.environ.get('HOME'),),},
+#"graphite_host": "fifemondata.fnal.gov",
+"graphite_host": "lsdataitb.fnal.gov",
+#"graphite_port": 2104,
+"graphite_port": 2004,
+"graphite_context":"hepcloud.aws",
+"output_file": "%s/de_data/AWS_figure_of_merit.csv"%(os.environ.get('HOME'),),},
 }
 },
 "task_manager": {"data_TO": 60}
4 changes: 2 additions & 2 deletions src/decisionengine_modules/tests/test_factory_entries.py
@@ -37,7 +37,7 @@
 CONFIG_FACTORY_ENTRIES_BAD_WITH_TIMEOUT = {
 "channel_name": "test",
 "condor_config": "condor_config",
-"nretries": 2,
+"max_retries": 2,
 "retry_interval": 2,
 "factories": [
 {
@@ -89,7 +89,7 @@ def test_acquire_bad_with_timeout(self):
 start = time.time()
 result = entries.acquire()
 end = time.time()
-# Set by tuning nretries and the retry_interval
+# Set by tuning max_retries and the retry_interval
 assert end - start > 5
 for df in result.values():
 assert df.dropna().empty is True
4 changes: 2 additions & 2 deletions src/decisionengine_modules/tests/test_factory_global.py
@@ -36,7 +36,7 @@
 CONFIG_BAD_WITH_TIMEOUT = {
 "channel_name": "test",
 "condor_config": "condor_config",
-"nretries": 2,
+"max_retries": 2,
 "retry_interval": 2,
 "factories": [
 {
@@ -71,6 +71,6 @@ def test_acquire_bad_with_timeout(self):
 start = time.time()
 fg_df = fg.acquire()
 end = time.time()
-# Set by tuning nretries and the retry_interval
+# Set by tuning max_retries and the retry_interval
 assert end - start > 5
 assert (fg_df["factoryglobal_manifests"] is None) or (len(fg_df["factoryglobal_manifests"]) == 0)
14 changes: 7 additions & 7 deletions src/decisionengine_modules/util/retry_function.py
@@ -3,19 +3,19 @@
 from functools import partial, wraps


-def retry_on_error(nretries=1, retry_interval=2, backoff=True):
+def retry_on_error(max_retries=1, retry_interval=2, backoff=True):
 def decorator(f):
 @wraps(f)
 def wrapper(*args, **kwargs):
-return retry_wrapper(partial(f, *args, **kwargs), nretries, retry_interval, backoff)
+return retry_wrapper(partial(f, *args, **kwargs), max_retries, retry_interval, backoff)

 return wrapper

 return decorator


-def retry_wrapper(f, nretries=1, retry_interval=2, backoff=True, logger=None):
-"""Retry on error with parameters of how many times (nretries)
+def retry_wrapper(f, max_retries=1, retry_interval=2, backoff=True, logger=None):
+"""Retry on error with parameters of how many times (max_retries)
 and interval in seconds (retry_interval).
 If the function to be decorated is an instance method
 and the values come from a configuration,
@@ -24,7 +24,7 @@ def retry_wrapper(f, nretries=1, retry_interval=2, backoff=True, logger=None):
 Otherwise, use the default values or pass new values to the decorator.
 """
 time2sleep = retry_interval
-for i in range(nretries + 1):
+for i in range(max_retries + 1):
 try:
 return f()
 except Exception as e:
@@ -33,13 +33,13 @@ def retry_wrapper(f, nretries=1, retry_interval=2, backoff=True, logger=None):
 fname = f.__name__
 elif hasattr(f, "func"):
 fname = f.func.__name__
-if i == nretries:
+if i == max_retries:
 if logger is not None:
 logger.exception(f"Error Function {fname} giving up with {e} after {i} retries")
 raise e
 if logger is not None:
 logger.warning(
-f"Function {fname} failed with {e} on try {i}/{nretries}. Sleeping {time2sleep:d} seconds"
+f"Function {fname} failed with {e} on try {i}/{max_retries}. Sleeping {time2sleep:d} seconds"
 )
 time.sleep(time2sleep)
 if backoff:
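Finally, an end-to-end sketch of the renamed keyword in `util/retry_function.py`, using made-up flaky functions; the import path is assumed from the file's location in this PR, and the sleep progression depends on the `backoff` flag handled at the end of `retry_wrapper`.

```python
from functools import partial

# Import path assumed from src/decisionengine_modules/util/retry_function.py.
from decisionengine_modules.util.retry_function import retry_on_error, retry_wrapper


@retry_on_error(max_retries=3, retry_interval=1)
def fetch_prices():
    """Hypothetical flaky call: retried up to 3 times, 1 s apart (growing if backoff is on)."""
    ...


def fetch_occupancy(region):
    """Another hypothetical call, retried through the wrapper instead of the decorator."""
    ...


# Mirrors how the glideinwms/htcondor modules call it: wrap the bound call in
# functools.partial and pass the renamed keyword explicitly.
result = retry_wrapper(partial(fetch_occupancy, "us-west-2"), max_retries=3, retry_interval=1)
```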