Skip to content

Commit b2a65cf

Browse files
committed
Add new argument to track CPU&GPU utilization and memory usage (#2500)
* add argument to track resource usage * fix bug * fix a bug in a multi gpu case * use total cpu usage * add unit test * add mark to unit test * cover edge case * add pynvml in requirement * align with pre-commit * add license comment * update changelog * refine argument help * align with pre-commit * add version to requirement and raise an error if not supported values are given
1 parent 755ec85 commit b2a65cf

2 files changed

Lines changed: 266 additions & 66 deletions

File tree

src/otx/cli/tools/train.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
get_parser_and_hprams_data,
4444
)
4545
from otx.cli.utils.report import get_otx_report
46-
from otx.cli.utils.experiment import run_process_to_check_resource
4746
from otx.core.data.adapter import get_dataset_adapter
4847

4948

@@ -163,7 +162,11 @@ def get_args():
163162
)
164163
parser.add_argument(
165164
"--track-resource-usage",
166-
action="store_true",
165+
type=str,
166+
default=None,
167+
help="Track resources utilization and max memory usage and save values at the output path. "
168+
"The possible options are 'cpu', 'gpu' or you can set to a comma-separated list of resource types. "
169+
"And 'all' is also available for choosing all resource types.",
167170
)
168171

169172
sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)
@@ -275,9 +278,6 @@ def train(exit_stack: Optional[ExitStack] = None): # pylint: disable=too-many-b
275278
"if main process raises an error, all processes can be stuck."
276279
)
277280

278-
if args.track_resource_usage:
279-
run_process_to_check_resource(config_manager.output_path, exit_stack)
280-
281281
task = task_class(task_environment=environment, output_path=str(config_manager.output_path / "logs"))
282282

283283
output_model = ModelEntity(dataset, environment.get_model_configuration())

src/otx/cli/utils/experiment.py

Lines changed: 261 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,284 @@
1-
"""Utils function for experiments"""
1+
"""Utils function for experiments."""
2+
# Copyright (C) 2023 Intel Corporation
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
25

6+
import logging
37
import multiprocessing as mp
4-
import pynvml
5-
import psutil
6-
import yaml
78
import os
8-
from contextlib import ExitStack
9-
from typing import Union
9+
import time
10+
from abc import ABC, abstractmethod
1011
from pathlib import Path
12+
from statistics import mean
13+
from typing import Any, Dict, List, Optional, Union, no_type_check
1114

15+
import psutil
16+
import yaml
1217

13-
def run_process_to_check_resource(output_dir: Union[str, Path], exit_stack: ExitStack):
14-
if isinstance(output_dir, str):
15-
output_dir = Path(output_dir)
18+
try:
19+
import pynvml
20+
except ImportError:
21+
pynvml = None
1622

17-
gpu_used = os.environ.get("CUDA_VISIBLE_DEVICES", 0)
23+
logger = logging.getLogger(__name__)
24+
GIB = 1024**3
25+
AVAILABLE_RESOURCE_TYPE = ["cpu", "gpu"]
1826

19-
queue = mp.Queue()
20-
mem_check_p = mp.Process(target=check_resource, args=(queue, gpu_used))
21-
mem_check_p.start()
2227

23-
exit_stack.callback(mem_check_proc_callback, mem_check_p, queue, output_dir)
28+
class ResourceTracker:
29+
"""Class to track resources usage.
2430
31+
Args:
32+
resource_type (str, optional): Which resource to track. Available values are cpu, gpu or all now.
33+
Defaults to "all".
34+
gpu_ids (Optional[str]): GPU indices to record.
35+
"""
2536

26-
def mem_check_proc_callback(mem_check_p, queue, output_dir):
27-
queue.put(output_dir)
28-
mem_check_p.join(10)
29-
if mem_check_p.exitcode is None:
30-
mem_check_p.terminate()
31-
mem_check_p.close()
37+
def __init__(self, resource_type: str = "all", gpu_ids: Optional[str] = None):
38+
if resource_type == "all":
39+
self._resource_type = AVAILABLE_RESOURCE_TYPE
40+
else:
41+
self._resource_type = [val for val in resource_type.split(",")]
3242

43+
gpu_ids_arr = None
44+
if gpu_ids is not None:
45+
gpu_ids_arr = [int(idx) for idx in gpu_ids.split(",")]
46+
gpu_ids_arr[0] = 0
3347

34-
def check_resource(queue: mp.Queue, gpu_idx: int = 0):
35-
pynvml.nvmlInit()
36-
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
37-
max_gpu_mem = 0
38-
avg_gpu_util = 0
39-
max_cpu_mem = 0
40-
avg_cpu_util = 0
41-
gib = 1024**3
42-
target_process = psutil.Process().parent()
48+
self._gpu_ids: Union[List[int], None] = gpu_ids_arr
49+
self._mem_check_proc: Union[mp.Process, None] = None
50+
self._queue: Union[mp.Queue, None] = None
4351

44-
num_counts = 0
45-
while True:
46-
# gpu util
47-
gpu_info = pynvml.nvmlDeviceGetUtilizationRates(handle)
48-
avg_gpu_util += gpu_info.gpu
49-
num_counts += 1
52+
def start(self):
53+
"""Run a process which tracks resources usage."""
54+
if self._mem_check_proc is not None:
55+
logger.warning("Resource tracker started already. Please execute start after executing stop.")
56+
return
5057

51-
# gpu mem
52-
gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
53-
mem_used = gpu_mem.used / gib
54-
if max_gpu_mem < mem_used:
55-
max_gpu_mem = mem_used
58+
self._queue = mp.Queue()
59+
self._mem_check_proc = mp.Process(
60+
target=_check_resource, args=(self._queue, self._resource_type, self._gpu_ids)
61+
)
62+
self._mem_check_proc.start()
5663

57-
# cpu mem
58-
# cpu_mem = psutil.virtual_memory()[3] / gib
59-
# cpu_mem = target_process.memory_percent()
60-
cpu_mem = target_process.memory_info().rss / gib
61-
if max_cpu_mem < cpu_mem:
62-
max_cpu_mem = cpu_mem
64+
def stop(self, output_path: Union[str, Path]):
65+
"""Terminate a process to record resources usage.
6366
64-
# cpu util
65-
cpu_percent = target_process.cpu_percent()
66-
avg_cpu_util += cpu_percent
67+
Args:
68+
output_path (Union[str, Path]): Output file path to save CPU & GPU utilization and max meory usage values.
69+
"""
70+
if self._mem_check_proc is None or not self._mem_check_proc.is_alive():
71+
return
72+
73+
if isinstance(output_path, str):
74+
output_path = Path(output_path)
75+
76+
self._queue.put(output_path)
77+
self._mem_check_proc.join(10)
78+
if self._mem_check_proc.exitcode is None:
79+
self._mem_check_proc.terminate()
80+
self._mem_check_proc.close()
81+
82+
self._mem_check_proc = None
83+
self._queue = None
84+
85+
86+
def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None, gpu_ids: Optional[List[int]] = None):
87+
if resource_types is None:
88+
resource_types = []
89+
90+
trackers: Dict[str, ResourceRecorder] = {}
91+
for resource_type in resource_types:
92+
if resource_type == "cpu":
93+
trackers[resource_type] = CpuUsageRecorder()
94+
elif resource_type == "gpu":
95+
if pynvml is None:
96+
logger.warning("GPU can't be found. Tracking GPU usage is skipped.")
97+
continue
98+
trackers[resource_type] = GpuUsageRecorder(gpu_ids)
99+
else:
100+
raise ValueError(
101+
"Resource type {} isn't supported now. Current available types are cpu and gpu.".format(resource_type)
102+
)
103+
104+
if not trackers:
105+
logger.warning("There is no resource to record.")
106+
return
107+
108+
while True:
109+
for tracker in trackers.values():
110+
tracker.record()
67111

68112
if not queue.empty():
69113
break
70114

71-
pynvml.nvmlShutdown()
115+
time.sleep(0.01)
116+
72117
output_path = Path(queue.get())
73118

74-
with (output_path / "resource_usage.yaml").open("w") as f:
75-
yaml.dump(
76-
{
77-
"max_cpu_mem(GiB)" : round(max_cpu_mem, 2),
78-
"avg_cpu_util(%)" : round(avg_cpu_util / num_counts, 2),
79-
"max_gpu_mem(GiB)" : round(max_gpu_mem, 2),
80-
"avg_gpu_util(%)" : round(avg_gpu_util / num_counts, 2),
81-
},
82-
f,
83-
default_flow_style=False
84-
)
119+
resource_record = {resource_type: tracker.report() for resource_type, tracker in trackers.items()}
120+
with output_path.open("w") as f:
121+
yaml.dump(resource_record, f, default_flow_style=False)
122+
123+
124+
class ResourceRecorder(ABC):
125+
"""Base calss for each resource recorder."""
126+
127+
@abstractmethod
128+
def record(self):
129+
"""Record a resource usage."""
130+
raise NotImplementedError
131+
132+
@abstractmethod
133+
def report(self):
134+
"""Aggregate all resource usages."""
135+
raise NotImplementedError
136+
137+
138+
class CpuUsageRecorder(ResourceRecorder):
139+
"""CPU usage recorder class.
140+
141+
Args:
142+
target_process Optional[psutil.Process]: Process to track.
143+
"""
144+
145+
def __init__(self):
146+
self._record_count: int = 0
147+
self._max_mem: Union[int, float] = 0
148+
self._avg_util: Union[int, float] = 0
149+
self._first_record = True
150+
151+
def record(self):
152+
"""Record CPU usage."""
153+
# cpu mem
154+
memory_info = psutil.virtual_memory()
155+
cpu_mem = (memory_info.total - memory_info.available) / GIB
156+
if self._max_mem < cpu_mem:
157+
self._max_mem = cpu_mem
158+
159+
# cpu util
160+
cpu_percent = psutil.cpu_percent()
161+
if self._first_record:
162+
self._first_record = False
163+
else:
164+
self._avg_util += cpu_percent
165+
self._record_count += 1
166+
167+
def report(self) -> Dict[str, str]:
168+
"""Aggregate CPU usage."""
169+
if self._record_count == 0:
170+
return {}
171+
172+
return {
173+
"max_memory_usage": f"{round(self._max_mem, 2)} GiB",
174+
"avg_util": f"{round(self._avg_util / self._record_count, 2)} %",
175+
}
176+
177+
178+
class GpuUsageRecorder(ResourceRecorder):
179+
"""GPU usage recorder class.
180+
181+
Args:
182+
gpu_ids Optional[List[int]]: GPU indices to record. If not given, first GPU is recorded.
183+
"""
184+
185+
def __init__(self, gpu_ids: Optional[List[int]] = None):
186+
if gpu_ids is None:
187+
gpu_ids = [0]
188+
189+
self._record: Dict[int, Dict[str, Union[int, List[int]]]] = {}
190+
self._gpu_handlers: Dict[int, Any] = {}
191+
192+
pynvml.nvmlInit()
193+
gpu_to_track = self._get_gpu_to_track(gpu_ids)
194+
for gpu_idx in gpu_to_track:
195+
self._record[gpu_idx] = {"max_mem": 0, "util_record": []}
196+
self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
197+
198+
def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
199+
if "CUDA_VISIBLE_DEVICES" in os.environ:
200+
avaiable_gpus = [int(idx) for idx in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
201+
else:
202+
avaiable_gpus = list(range(pynvml.nvmlDeviceGetCount()))
203+
return [avaiable_gpus[gpu_idx] for gpu_idx in gpu_ids]
204+
205+
def record(self):
206+
"""Record GPU usage."""
207+
for gpu_idx, record in self._record.items():
208+
# gpu util
209+
gpu_info = pynvml.nvmlDeviceGetUtilizationRates(self._gpu_handlers[gpu_idx])
210+
record["util_record"].append(gpu_info.gpu)
211+
212+
# gpu mem
213+
gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(self._gpu_handlers[gpu_idx])
214+
mem_used = gpu_mem.used / GIB
215+
if record["max_mem"] < mem_used:
216+
record["max_mem"] = mem_used
217+
218+
@no_type_check
219+
def report(self) -> Dict[str, str]:
220+
"""Aggregate GPU usage."""
221+
if not list(self._record.values())[0]["util_record"]: # record isn't called
222+
return {}
223+
224+
total_max_mem = 0
225+
total_avg_util = 0
226+
gpus_record = self._record.copy()
227+
for gpu_idx in list(gpus_record.keys()):
228+
max_mem = gpus_record[gpu_idx]["max_mem"]
229+
if total_max_mem < max_mem:
230+
total_max_mem = max_mem
231+
232+
# Count utilization after it becomes bigger than 20% of max utilization
233+
max_util = max(gpus_record[gpu_idx]["util_record"])
234+
for idx, util in enumerate(gpus_record[gpu_idx]["util_record"]):
235+
if util * 5 > max_util:
236+
break
237+
avg_util = mean(gpus_record[gpu_idx]["util_record"][idx:])
238+
total_avg_util += avg_util
239+
240+
gpus_record[f"gpu_{gpu_idx}"] = {
241+
"avg_util": f"{round(avg_util, 2)} %",
242+
"max_mem": f"{round(max_mem, 2)} GiB",
243+
}
244+
del gpus_record[gpu_idx]
245+
246+
gpus_record["total_avg_util"] = f"{round(total_avg_util / len(gpus_record), 2)} %"
247+
gpus_record["total_max_mem"] = f"{round(total_max_mem, 2)} GiB"
248+
249+
return gpus_record
250+
251+
def __del__(self):
252+
"""Shutdown nvml."""
253+
pynvml.nvmlShutdown()
254+
255+
256+
def set_arguments_to_cmd(command: List[str], key: str, value: Optional[str] = None, start_idx: int = 0, after_params: bool = False):
257+
"""Add arguments at proper position in command.
258+
259+
Args:
260+
keys (str): arguement key.
261+
value (str or None): argument value.
262+
command (List[str]): list includng a otx command entry and arguments.
263+
start_idx (int, optional): find a position to put arguments in after this value. Defaults to 0.
264+
after_params (bool): whether argument should be after `param` or not.
265+
"""
266+
if key in command:
267+
if value is not None:
268+
command[command.index(key) + 1] = value
269+
return
270+
271+
delimiters = ["demo", "deploy", "eval", "explain", "export", "find", "train", "optimize", "build", "run"]
272+
if not after_params:
273+
delimiters.append("params")
274+
275+
for i in range(start_idx, len(command)):
276+
if command[i] in delimiters:
277+
if value is not None:
278+
command.insert(i, value)
279+
command.insert(i, key)
280+
break
281+
else:
282+
command.append(key)
283+
if value is not None:
284+
command.append(value)

0 commit comments

Comments
 (0)