2020
2121import static org .apache .hadoop .yarn .conf .YarnConfiguration .DEFAULT_NM_REMOTE_APP_LOG_DIR ;
2222import static org .apache .hadoop .yarn .conf .YarnConfiguration .NM_REMOTE_APP_LOG_DIR ;
23+ import static org .apache .tez .dag .api .TezConfiguration .TEZ_THREAD_DUMP_INITIAL_DELAY ;
24+ import static org .apache .tez .dag .api .TezConfiguration .TEZ_THREAD_DUMP_INITIAL_DELAY_DEFAULT ;
2325import static org .apache .tez .dag .api .TezConfiguration .TEZ_THREAD_DUMP_INTERVAL ;
2426import static org .apache .tez .dag .api .TezConfiguration .TEZ_THREAD_DUMP_INTERVAL_DEFAULT ;
2527
2830import java .lang .management .ManagementFactory ;
2931import java .lang .management .ThreadInfo ;
3032import java .lang .management .ThreadMXBean ;
33+ import java .nio .charset .StandardCharsets ;
3134import java .util .concurrent .Executors ;
3235import java .util .concurrent .ScheduledExecutorService ;
3336import java .util .concurrent .TimeUnit ;
4952
5053public class TezThreadDumpHelper {
5154
52- private final long duration ;
55+ private final long threadDumpFrequency ;
56+ private final long threadDumpInitialDelay ;
5357 private final Path basePath ;
5458 private final FileSystem fs ;
5559
@@ -58,8 +62,10 @@ public class TezThreadDumpHelper {
5862
5963 private ScheduledExecutorService periodicThreadDumpServiceExecutor ;
6064
61- private TezThreadDumpHelper (long duration , Configuration conf ) throws IOException {
62- this .duration = duration ;
65+ private TezThreadDumpHelper (long threadDumpFrequency , long threadDumpInitialDelay , Configuration conf )
66+ throws IOException {
67+ this .threadDumpFrequency = threadDumpFrequency ;
68+ this .threadDumpInitialDelay = threadDumpInitialDelay ;
6369 Appender appender = org .apache .log4j .Logger .getRootLogger ().getAppender (TezConstants .TEZ_CONTAINER_LOGGER_NAME );
6470 if (appender instanceof TezContainerLogAppender ) {
6571 this .basePath = new Path (((TezContainerLogAppender ) appender ).getContainerLogDir ());
@@ -69,18 +75,21 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio
6975 this .basePath = new Path (conf .get (NM_REMOTE_APP_LOG_DIR , DEFAULT_NM_REMOTE_APP_LOG_DIR ));
7076 this .fs = this .basePath .getFileSystem (conf );
7177 }
72- LOG .info ("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " +
73- "path: {}" , duration , basePath );
78+ LOG .info ("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at "
79+ + "path: {} with an initial delay of {} " , threadDumpFrequency , basePath , threadDumpInitialDelay );
7480 }
7581
7682 public static TezThreadDumpHelper getInstance (Configuration conf ) {
77- long periodicThreadDumpFrequency = conf .getTimeDuration (TEZ_THREAD_DUMP_INTERVAL ,
78- TEZ_THREAD_DUMP_INTERVAL_DEFAULT , TimeUnit .MILLISECONDS );
79- Preconditions .checkArgument (periodicThreadDumpFrequency > 0 , "%s must be positive duration" ,
80- TEZ_THREAD_DUMP_INTERVAL );
83+ long threadDumpFrequency =
84+ conf .getTimeDuration (TEZ_THREAD_DUMP_INTERVAL , TEZ_THREAD_DUMP_INTERVAL_DEFAULT , TimeUnit .MILLISECONDS );
85+ Preconditions .checkArgument (threadDumpFrequency > 0 , "%s must be positive duration" , TEZ_THREAD_DUMP_INTERVAL );
86+ long threadDumpInitialDelay =
87+ conf .getTimeDuration (TEZ_THREAD_DUMP_INITIAL_DELAY , TEZ_THREAD_DUMP_INITIAL_DELAY_DEFAULT ,
88+ TimeUnit .MILLISECONDS );
89+ Preconditions .checkArgument (threadDumpInitialDelay >= 0 , "%s can not be negative" , TEZ_THREAD_DUMP_INITIAL_DELAY );
8190
8291 try {
83- return new TezThreadDumpHelper (periodicThreadDumpFrequency , conf );
92+ return new TezThreadDumpHelper (threadDumpFrequency , threadDumpInitialDelay , conf );
8493 } catch (IOException e ) {
8594 throw new TezUncheckedException ("Can not initialize periodic thread dump service" , e );
8695 }
@@ -91,7 +100,8 @@ public TezThreadDumpHelper start(String name) {
91100 new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("PeriodicThreadDumpService{" + name + "} #%d" )
92101 .build ());
93102 Runnable threadDumpCollector = new ThreadDumpCollector (basePath , name , fs );
94- periodicThreadDumpServiceExecutor .schedule (threadDumpCollector , duration , TimeUnit .MILLISECONDS );
103+ periodicThreadDumpServiceExecutor .scheduleWithFixedDelay (threadDumpCollector , threadDumpInitialDelay ,
104+ threadDumpFrequency , TimeUnit .MILLISECONDS );
95105 return this ;
96106 }
97107
@@ -128,7 +138,7 @@ public void run() {
128138 if (!Thread .interrupted ()) {
129139 try (FSDataOutputStream fsStream = fs .create (
130140 new Path (path , name + "_" + System .currentTimeMillis () + ".jstack" ));
131- PrintStream printStream = new PrintStream (fsStream , false , "UTF8" )) {
141+ PrintStream printStream = new PrintStream (fsStream , false , StandardCharsets . UTF_8 )) {
132142 printThreadInfo (printStream , name );
133143 } catch (IOException e ) {
134144 throw new RuntimeException (e );
0 commit comments