Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
052fcd6
Introduce mypy check with loose settings
sakurai-youhei Nov 6, 2023
b81e380
Fix linter errors
sakurai-youhei Nov 6, 2023
9e57183
Type section to incomplete literal
sakurai-youhei Nov 6, 2023
bf2953b
Fix build failure
sakurai-youhei Nov 6, 2023
0d6d14a
Fix linter errors
sakurai-youhei Nov 6, 2023
bac38d4
Type section and key to literal
sakurai-youhei Nov 6, 2023
9366470
Shrink scope of mypy use
sakurai-youhei Nov 14, 2023
c4ee079
Revert unscoped changes
sakurai-youhei Nov 14, 2023
6d54889
Add test to check literal args
sakurai-youhei Nov 14, 2023
7335dc3
Add ignorance and disablement on out-of-scope checks
sakurai-youhei Nov 14, 2023
32afeba
Add more sections and keys
sakurai-youhei Nov 14, 2023
d9be250
Suppress pylint C0301
sakurai-youhei Nov 14, 2023
ba773be
Fix pylint W0640
sakurai-youhei Nov 14, 2023
7ae2540
Change variable names for better code semantic
sakurai-youhei Nov 15, 2023
97f01a0
Improve what noticed in self-review
sakurai-youhei Nov 15, 2023
fe56308
Replace Union with Optional
sakurai-youhei Nov 21, 2023
effa4d1
Split types from config module
sakurai-youhei Nov 22, 2023
e708324
Adjustments
sakurai-youhei Nov 22, 2023
e55b987
Change how to put ignore comment
sakurai-youhei Nov 22, 2023
d0c6dc5
Fix lint errors
sakurai-youhei Nov 22, 2023
ac399db
Fix test failure on Python 3.11
sakurai-youhei Nov 22, 2023
54d04c8
Minimize type.Config
sakurai-youhei Nov 23, 2023
cd17bfb
Refine TestConfigTypeHint
sakurai-youhei Nov 23, 2023
5895954
Flip the right and lest of equation
sakurai-youhei Nov 23, 2023
4693085
Consistent ignore[arg-type]
sakurai-youhei Nov 23, 2023
43ccbaa
Merge branch 'master' into literal-check-by-mypy
sakurai-youhei Jan 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ repos:
hooks:
- id: isort

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.6.1
hooks:
- id: mypy
args: [
"--config",
"pyproject.toml"
]

- repo: local
hooks:
- id: pylint
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/track/bulk_params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def seek(self, offset):
pass

def read(self):
return "\n".join(self.contents)
return "\n".join(self.contents) # type: ignore[arg-type] # TODO remove this ignore when introducing type hints

def readline(self):
return self.contents
Expand Down
22 changes: 11 additions & 11 deletions esrally/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from enum import Enum
from string import Template

from esrally import PROGRAM_NAME, exceptions, paths
from esrally import PROGRAM_NAME, exceptions, paths, types
from esrally.utils import io


Expand Down Expand Up @@ -50,7 +50,7 @@ def present(self):
"""
return os.path.isfile(self.location)

def load(self):
def load(self) -> configparser.ConfigParser:
config = configparser.ConfigParser()
config.read(self.location, encoding="utf-8")
return config
Expand All @@ -66,7 +66,7 @@ def store_default_config(self, template_path=None):
contents = src.read()
target.write(Template(contents).substitute(CONFIG_DIR=self.config_dir))

def store(self, config):
def store(self, config: configparser.ConfigParser):
io.ensure_dir(self.config_dir)
with open(self.location, "w", encoding="utf-8") as configfile:
config.write(configfile)
Expand All @@ -89,7 +89,7 @@ def location(self):
return os.path.join(self.config_dir, f"rally{config_name_suffix}.ini")


def auto_load_local_config(base_config, additional_sections=None, config_file_class=ConfigFile, **kwargs):
def auto_load_local_config(base_config, additional_sections=None, config_file_class=ConfigFile, **kwargs) -> types.Config:
"""
Loads a node-local configuration based on a ``base_config``. If an appropriate node-local configuration file is present, it will be
used (and potentially upgraded to the newest config version). Otherwise, a new one will be created and as many settings as possible
Expand Down Expand Up @@ -138,7 +138,7 @@ def __init__(self, config_name=None, config_file_class=ConfigFile, **kwargs):
self._opts = {}
self._clear_config()

def add(self, scope, section, key, value):
def add(self, scope, section: types.Section, key: types.Key, value):
"""
Adds or overrides a new configuration property.

Expand All @@ -149,7 +149,7 @@ def add(self, scope, section, key, value):
"""
self._opts[self._k(scope, section, key)] = value

def add_all(self, source, section):
def add_all(self, source, section: types.Section):
"""
Adds all config items within the given `section` from the `source` config object.

Expand All @@ -162,7 +162,7 @@ def add_all(self, source, section):
if source_section == section:
self.add(scope, source_section, key, v)

def opts(self, section, key, default_value=None, mandatory=True):
def opts(self, section: types.Section, key: types.Key, default_value=None, mandatory=True):
"""
Resolves a configuration property.

Expand All @@ -182,7 +182,7 @@ def opts(self, section, key, default_value=None, mandatory=True):
else:
raise exceptions.ConfigError(f"No value for mandatory configuration: section='{section}', key='{key}'")

def all_opts(self, section):
def all_opts(self, section: types.Section):
"""
Finds all options in a section and returns them in a dict.

Expand All @@ -200,7 +200,7 @@ def all_opts(self, section):
scopes_per_key[key] = scope
return opts_in_section

def exists(self, section, key):
def exists(self, section: types.Section, key: types.Key):
"""
:param section: The configuration section.
:param key: The configuration key.
Expand Down Expand Up @@ -261,7 +261,7 @@ def _stored_config_version(self):
return int(self.opts("meta", "config.version", default_value=0, mandatory=False))

# recursively find the most narrow scope for a key
def _resolve_scope(self, section, key, start_from=Scope.invocation):
def _resolve_scope(self, section: types.Section, key: types.Key, start_from=Scope.invocation):
if self._k(start_from, section, key) in self._opts:
return start_from
elif start_from == Scope.application:
Expand All @@ -270,7 +270,7 @@ def _resolve_scope(self, section, key, start_from=Scope.invocation):
# continue search in the enclosing scope
return self._resolve_scope(section, key, Scope(start_from.value - 1))

def _k(self, scope, section, key):
def _k(self, scope, section: types.Section, key: types.Key):
if scope is None or scope == Scope.application:
return Scope.application, section, key
else:
Expand Down
48 changes: 29 additions & 19 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from typing import Callable
from typing import Callable, Optional

import thespian.actors

Expand All @@ -44,6 +44,7 @@
paths,
telemetry,
track,
types,
)
from esrally.client import delete_api_keys
from esrally.driver import runner, scheduler
Expand All @@ -61,7 +62,7 @@ class PrepareBenchmark:
Initiates preparation steps for a benchmark. The benchmark should only be started after StartBenchmark is sent.
"""

def __init__(self, config, track):
def __init__(self, config: types.Config, track):
"""
:param config: Rally internal configuration object.
:param track: The track to use.
Expand All @@ -79,7 +80,7 @@ class Bootstrap:
Prompts loading of track code on new actors
"""

def __init__(self, cfg, worker_id=None):
def __init__(self, cfg: types.Config, worker_id=None):
self.config = cfg
self.worker_id = worker_id

Expand All @@ -102,13 +103,13 @@ class TrackPrepared:


class StartTaskLoop:
def __init__(self, track_name, cfg):
def __init__(self, track_name, cfg: types.Config):
self.track_name = track_name
self.cfg = cfg


class DoTask:
def __init__(self, task, cfg):
def __init__(self, task, cfg: types.Config):
self.task = task
self.cfg = cfg

Expand Down Expand Up @@ -143,7 +144,7 @@ class StartWorker:
Starts a worker.
"""

def __init__(self, worker_id, config, track, client_allocations, client_contexts):
def __init__(self, worker_id, config: types.Config, track, client_allocations, client_contexts):
"""
:param worker_id: Unique (numeric) id of the worker.
:param config: Rally internal configuration object.
Expand Down Expand Up @@ -306,12 +307,12 @@ def receiveMsg_WakeupMessage(self, msg, sender):
self.driver.update_progress_message()
self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS))

def create_client(self, host, cfg, worker_id):
def create_client(self, host, cfg: types.Config, worker_id):
worker = self.createActor(Worker, targetActorRequirements=self._requirements(host))
self.send(worker, Bootstrap(cfg, worker_id))
return worker

def start_worker(self, driver, worker_id, cfg, track, allocations, client_contexts=None):
def start_worker(self, driver, worker_id, cfg: types.Config, track, allocations, client_contexts=None):
self.send(driver, StartWorker(worker_id, cfg, track, allocations, client_contexts))

def drive_at(self, driver, client_start_timestamp):
Expand All @@ -333,7 +334,7 @@ def _requirements(self, host):
else:
return {"ip": host}

def prepare_track(self, hosts, cfg, track):
def prepare_track(self, hosts, cfg: types.Config, track):
self.track = track
self.logger.info("Starting prepare track process on hosts [%s]", hosts)
self.children = [self._create_track_preparator(h) for h in hosts]
Expand Down Expand Up @@ -373,7 +374,7 @@ def on_benchmark_complete(self, metrics):
self.send(self.benchmark_actor, BenchmarkComplete(metrics))


def load_local_config(coordinator_config):
def load_local_config(coordinator_config) -> types.Config:
cfg = config.auto_load_local_config(
coordinator_config,
additional_sections=[
Expand Down Expand Up @@ -404,7 +405,7 @@ def __init__(self):
self.task_preparation_actor = None
self.logger = logging.getLogger(__name__)
self.track_name = None
self.cfg = None
self.cfg: Optional[types.Config] = None

@actor.no_retry("task executor") # pylint: disable=no-value-for-parameter
def receiveMsg_StartTaskLoop(self, msg, sender):
Expand Down Expand Up @@ -471,7 +472,7 @@ def __init__(self):
self.status = self.Status.INITIALIZING
self.children = []
self.tasks = []
self.cfg = None
self.cfg: Optional[types.Config] = None
self.data_root_dir = None
self.track = None

Expand Down Expand Up @@ -501,6 +502,7 @@ def receiveMsg_BenchmarkFailure(self, msg, sender):

@actor.no_retry("track preparator") # pylint: disable=no-value-for-parameter
def receiveMsg_PrepareTrack(self, msg, sender):
assert self.cfg is not None
self.data_root_dir = self.cfg.opts("benchmarks", "local.dataset.cache")
tpr = TrackProcessorRegistry(self.cfg)
self.track = msg.track
Expand All @@ -520,6 +522,7 @@ def receiveMsg_PrepareTrack(self, msg, sender):
)

def resume(self):
assert self.cfg is not None
if not self.processors.empty():
self._seed_tasks(self.processors.get())
self.send_to_children_and_transition(
Expand All @@ -536,6 +539,7 @@ def _create_task_executor(self):

@actor.no_retry("track preparator") # pylint: disable=no-value-for-parameter
def receiveMsg_ReadyForWork(self, msg, task_execution_actor):
assert self.cfg is not None
if self.tasks:
next_task = self.tasks.pop()
else:
Expand All @@ -549,7 +553,7 @@ def receiveMsg_WorkerIdle(self, msg, sender):
self.transition_when_all_children_responded(sender, msg, self.Status.PROCESSOR_RUNNING, self.Status.PROCESSOR_COMPLETE, self.resume)


def num_cores(cfg):
def num_cores(cfg: types.Config):
return int(cfg.opts("system", "available.cores", mandatory=False, default_value=multiprocessing.cpu_count()))


Expand All @@ -560,11 +564,11 @@ def num_cores(cfg):
class ClientContext:
client_id: int
parent_worker_id: int
api_key: ApiKey = None
api_key: Optional[ApiKey] = None


class Driver:
def __init__(self, driver_actor, config, es_client_factory_class=client.EsClientFactory):
def __init__(self, driver_actor, config: types.Config, es_client_factory_class=client.EsClientFactory):
"""
Coordinates all workers. It is technology-agnostic, i.e. it does not know anything about actors. To allow us to hook in an actor,
we provide a ``target`` parameter which will be called whenever some event has occurred. The ``target`` can use this to send
Expand Down Expand Up @@ -772,7 +776,11 @@ def start_benchmark(self):
self.number_of_steps = len(allocator.join_points) - 1
self.tasks_per_join_point = allocator.tasks_per_joinpoint

self.logger.info("Benchmark consists of [%d] steps executed by [%d] clients.", self.number_of_steps, len(self.allocations))
self.logger.info(
"Benchmark consists of [%d] steps executed by [%d] clients.",
self.number_of_steps,
len(self.allocations), # type: ignore[arg-type] # TODO remove the below ignore when introducing type hints
)
# avoid flooding the log if there are too many clients
if allocator.clients < 128:
self.logger.debug("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations]))
Expand Down Expand Up @@ -1209,7 +1217,7 @@ def __init__(self):
super().__init__()
self.driver_actor = None
self.worker_id = None
self.config = None
self.config: Optional[types.Config] = None
self.track = None
self.client_allocations = None
self.client_contexts = None
Expand Down Expand Up @@ -1239,6 +1247,7 @@ def receiveMsg_Bootstrap(self, msg, sender):

@actor.no_retry("worker") # pylint: disable=no-value-for-parameter
def receiveMsg_StartWorker(self, msg, sender):
assert self.config is not None
self.logger.info("Worker[%d] is about to start.", msg.worker_id)
self.on_error = self.config.opts("driver", "on.error")
self.sample_queue_size = int(self.config.opts("reporting", "sample.queue.size", mandatory=False, default_value=1 << 20))
Expand Down Expand Up @@ -1343,6 +1352,7 @@ def receiveUnrecognizedMessage(self, msg, sender):
self.logger.debug("Worker[%d] received unknown message [%s] (ignoring).", self.worker_id, str(msg))

def drive(self):
assert self.config is not None
task_allocations = self.current_tasks_and_advance()
# skip non-tasks in the task list
while len(task_allocations) == 0:
Expand Down Expand Up @@ -1568,7 +1578,7 @@ def _merge(self, *args):
return result


def select_challenge(config, t):
def select_challenge(config: types.Config, t):
challenge_name = config.opts("track", "challenge.name")
selected_challenge = t.find_challenge_or_default(challenge_name)

Expand Down Expand Up @@ -1742,7 +1752,7 @@ def map_task_throughput(self, current_samples):


class AsyncIoAdapter:
def __init__(self, cfg, track, task_allocations, sampler, cancel, complete, abort_on_error, client_contexts, worker_id):
def __init__(self, cfg: types.Config, track, task_allocations, sampler, cancel, complete, abort_on_error, client_contexts, worker_id):
self.cfg = cfg
self.track = track
self.task_allocations = task_allocations
Expand Down
Loading