Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1663,11 +1663,35 @@ protected void shutdownScheduledExecutorService(ScheduledExecutorService schedul
if (scheduledExecutorService == null) {
return;
}

// Graceful shutdown: stop accepting new tasks and wait for submitted tasks to complete
scheduledExecutorService.shutdown();

try {
scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted! ", ignore);
// Wait for tasks to complete, at most 60 seconds
if (!scheduledExecutorService.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
Comment on lines +1671 to +1672
Copy link

Copilot AI Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The timeout has been increased from 5 seconds to 60 seconds per executor shutdown. Since shutdownScheduledExecutorService is called three times during broker shutdown (lines 1508, 1603, 1604), this could potentially extend total shutdown time from 15 seconds to 180 seconds (3 minutes) in the worst-case scenario where all three executors time out.

While this provides more time for graceful task completion, consider:

  1. Making the timeout configurable via BrokerConfig to allow operators to tune it based on their workload
  2. Adding metrics or logging to track actual shutdown durations to identify if such long timeouts are necessary
  3. Documenting the maximum expected shutdown time for operators

Note: The test at BrokerShutdownTest.testBrokerGracefulShutdown currently expects shutdown within 40 seconds, which would need to be updated if this long timeout is retained.

Copilot uses AI. Check for mistakes.
// If timeout, force shutdown all tasks
BrokerController.LOG.warn("ScheduledExecutorService did not terminate gracefully, forcing shutdown...");
List<Runnable> pendingTasks = scheduledExecutorService.shutdownNow();

if (!pendingTasks.isEmpty()) {
BrokerController.LOG.warn("ScheduledExecutorService had {} pending tasks that were cancelled",
pendingTasks.size());
}

// Wait again for a period to ensure all tasks are terminated
if (!scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
BrokerController.LOG.error("ScheduledExecutorService did not terminate after forced shutdown");
} else {
BrokerController.LOG.info("ScheduledExecutorService terminated successfully after forced shutdown");
}
} else {
BrokerController.LOG.debug("ScheduledExecutorService terminated gracefully");
}
} catch (InterruptedException e) {
// If interrupted during waiting, force shutdown
BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted, forcing shutdown...", e);
scheduledExecutorService.shutdownNow();
Comment on lines +1683 to +1694
Copy link

Copilot AI Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second awaitTermination call after shutdownNow() can throw InterruptedException, but this exception is not caught here. If interrupted, the exception will propagate to the outer catch block at line 1691, which will call shutdownNow() again (redundant since it was already called) and set the interrupt flag. However, the success log at line 1686 would never be reached if interrupted.

Consider wrapping this second awaitTermination call in its own try-catch block to handle interruption properly, or document that interruption during forced shutdown is acceptable.

Suggested change
if (!scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
BrokerController.LOG.error("ScheduledExecutorService did not terminate after forced shutdown");
} else {
BrokerController.LOG.info("ScheduledExecutorService terminated successfully after forced shutdown");
}
} else {
BrokerController.LOG.debug("ScheduledExecutorService terminated gracefully");
}
} catch (InterruptedException e) {
// If interrupted during waiting, force shutdown
BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted, forcing shutdown...", e);
scheduledExecutorService.shutdownNow();
try {
if (!scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
BrokerController.LOG.error("ScheduledExecutorService did not terminate after forced shutdown");
} else {
BrokerController.LOG.info("ScheduledExecutorService terminated successfully after forced shutdown");
}
} catch (InterruptedException ie) {
BrokerController.LOG.warn("Interrupted while waiting for ScheduledExecutorService to terminate after forced shutdown", ie);
Thread.currentThread().interrupt();
}
} else {
BrokerController.LOG.debug("ScheduledExecutorService terminated gracefully");
}
} catch (InterruptedException e) {
// If interrupted during initial waiting, force shutdown
BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted during initial wait, forcing shutdown...", e);

Copilot uses AI. Check for mistakes.
Thread.currentThread().interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static PutMessageResult checkBeforePutMessage(BrokerController brokerCont
if (!brokerController.getMessageStore().getRunningFlags().isWriteable()) {
long value = PRINT_TIMES.getAndIncrement();
if ((value % 50000) == 0) {
LOG.warn("message store is not writeable, so putMessage is forbidden " + brokerController.getMessageStore().getRunningFlags().getFlagBits());
LOG.warn("message store is not writeable, putMessage is forbidden " + brokerController.getMessageStore().getRunningFlags().getFlagBits());
}

return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
Expand Down
Loading