1717import math
1818import os
1919import shutil
20+ import socket
2021import subprocess
2122import tempfile
2223import time
@@ -218,23 +219,18 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
218219 start_new_session = True ,
219220 )
220221 time .sleep (3 ) # Wait for etcd to start
221- # TODO: check if etcd is healthy
222- etcd_is_healthy = etcd_process .poll () is None
223- # check
224- #
225- #
226- #
227- #
228- if not etcd_is_healthy :
229- # etcd exited immediately, indicate failure
230- # Clean up data directory on failure
231- try :
232- shutil .rmtree (etcd_data_dir , ignore_errors = True )
233- except Exception :
234- pass
222+
223+ if etcd_process .poll () is None :
224+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
225+ sock .settimeout (2 )
226+ result = sock .connect_ex ((host , port ))
227+ sock .close ()
228+ if result != 0 :
229+ raise RuntimeError (f"etcd process started but not listening on { host } :{ port } " )
230+ else :
235231 raise RuntimeError (f"etcd exited immediately with return code { etcd_process .returncode } " )
236232
237- # Wait a moment for etcd to be ready
233+ logger . info ( f" etcd started, PID: { etcd_process . pid } " )
238234 time .sleep (2 )
239235
240236 # ========== Start datasystem worker ==========
@@ -260,7 +256,7 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
260256 timeout = 90 ,
261257 )
262258 except subprocess .TimeoutExpired as err :
263- raise RuntimeError ("dscli start timed out" ) from err
259+ raise RuntimeError (f "dscli start timed out: { err } " ) from err
264260 # Wait for dscli to start and exit (it starts worker and exits)
265261 if ds_result .returncode == 0 and "[ OK ]" in ds_result .stdout :
266262 logger .info (f"dscli started Yuanrong datasystem worker at { worker_address } successfully." )
@@ -476,7 +472,7 @@ def close():
476472 try :
477473 subprocess .run (
478474 ["dscli" , "stop" , "--worker_address" , worker_address ],
479- timeout = 5 ,
475+ timeout = 90 ,
480476 capture_output = True ,
481477 )
482478 logger .info (f"Stopped datasystem worker at { worker_address } via dscli stop" )
0 commit comments