@@ -300,6 +300,11 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
300300 # Clean up on failure
301301 if etcd_process is not None and etcd_process .poll () is None :
302302 etcd_process .terminate ()
303+ try :
304+ etcd_process .wait (timeout = 5 )
305+ except subprocess .TimeoutExpired :
306+ etcd_process .kill ()
307+ etcd_process .wait ()
303308 if etcd_data_dir is not None :
304309 try :
305310 shutil .rmtree (etcd_data_dir , ignore_errors = True )
@@ -481,6 +486,7 @@ def close():
481486 etcd_process .wait (timeout = 5 )
482487 except subprocess .TimeoutExpired :
483488 etcd_process .kill ()
489+ etcd_process .wait ()
484490
485491 # Clean up etcd data directory
486492 if etcd_data_dir is not None and os .path .exists (etcd_data_dir ):
@@ -493,12 +499,21 @@ def close():
493499 # Stop datasystem worker via dscli command
494500 if worker_address :
495501 try :
496- subprocess .run (
502+ result = subprocess .run (
497503 ["dscli" , "stop" , "--worker_address" , worker_address ],
498504 timeout = 90 ,
499505 capture_output = True ,
500506 )
501- logger .info (f"Stopped datasystem worker at { worker_address } via dscli stop" )
507+ if result .returncode == 0 :
508+ logger .info (f"Stopped datasystem worker at { worker_address } via dscli stop" )
509+ else :
510+ error_msg = (result .stderr or result .stdout or b"" ).decode ()
511+ logger .warning (
512+ f"Failed to stop datasystem worker at { worker_address } . "
513+ f"Return code: { result .returncode } , Error: { error_msg } "
514+ )
515+ except subprocess .TimeoutExpired as err :
516+ logger .warning (f"dscli stop timed out for { worker_address } : { err } " )
502517 except Exception as e :
503518 logger .warning (f"Failed to stop datasystem worker via dscli: { e } " )
504519 else :
0 commit comments