Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ public class PerformanceAnalyzerController {
// This file should contain "true" or "false", indicating whether batch metrics is currently
// enabled or not.
private static final String BATCH_METRICS_ENABLED_CONF = "batch_metrics_enabled.conf";
private static final String THREAD_CONTENTION_MONITORING_ENABLED_CONF = "thread_contention_monitoring_enabled.conf";
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerController.class);
public static final int DEFAULT_NUM_OF_SHARDS_PER_COLLECTION = 0;

private boolean paEnabled;
private boolean rcaEnabled;
private boolean loggingEnabled;
private boolean batchMetricsEnabled;
private boolean threadContentionMonitoringEnabled;
private volatile int shardsPerCollection;
private static final boolean paEnabledDefaultValue = false;
private static final boolean rcaEnabledDefaultValue = true;
private static final boolean loggingEnabledDefaultValue = false;
private static final boolean batchMetricsEnabledDefaultValue = false;
private static final boolean threadContentionMonitoringEnabledDefaultValue = false;
private final ScheduledMetricCollectorsExecutor scheduledMetricCollectorsExecutor;

public PerformanceAnalyzerController(
Expand All @@ -52,6 +55,7 @@ public PerformanceAnalyzerController(
initRcaStateFromConf();
initLoggingStateFromConf();
initBatchMetricsStateFromConf();
initThreadContentionMonitoringStateFromConf();
shardsPerCollection = DEFAULT_NUM_OF_SHARDS_PER_COLLECTION;
}

Expand Down Expand Up @@ -88,6 +92,10 @@ public boolean isBatchMetricsEnabled() {
return batchMetricsEnabled;
}

public boolean isThreadContentionMonitoringEnabled() {
return threadContentionMonitoringEnabled;
}

/**
* Reads the shardsPerCollection parameter in NodeStatsMetric
*
Expand Down Expand Up @@ -166,6 +174,23 @@ public void updateBatchMetricsState(final boolean shouldEnable) {
saveStateToConf(this.batchMetricsEnabled, BATCH_METRICS_ENABLED_CONF);
}

/**
* Updates the state of the thread contention monitoring api.
*
* @param shouldEnable The desired state of the thread contention monitoring api. False to disable, and true to
* enable.
*/
public void updateThreadContentionMonitoringState(final boolean shouldEnable) {
if (shouldEnable && !isPerformanceAnalyzerEnabled()) {
return;
}
this.threadContentionMonitoringEnabled = shouldEnable;
if (scheduledMetricCollectorsExecutor != null) {
scheduledMetricCollectorsExecutor.setThreadContentionMonitoringEnabled(threadContentionMonitoringEnabled);
}
saveStateToConf(this.threadContentionMonitoringEnabled, THREAD_CONTENTION_MONITORING_ENABLED_CONF);
}

private void initPerformanceAnalyzerStateFromConf() {
Path filePath = Paths.get(getDataDirectory(), PERFORMANCE_ANALYZER_ENABLED_CONF);
PerformanceAnalyzerPlugin.invokePrivileged(
Expand Down Expand Up @@ -249,6 +274,29 @@ private void initBatchMetricsStateFromConf() {
});
}

private void initThreadContentionMonitoringStateFromConf() {
Path filePath = Paths.get(getDataDirectory(), THREAD_CONTENTION_MONITORING_ENABLED_CONF);
PerformanceAnalyzerPlugin.invokePrivileged(
() -> {
boolean threadContentionMonitoringEnabledFromConf;
try {
threadContentionMonitoringEnabledFromConf = readBooleanFromFile(filePath);
} catch (NoSuchFileException e) {
LOG.debug("Error reading Performance Analyzer state from Conf file", e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error reading thread contention monitoring state?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eirsep nit: I think this change was missed in the revision.

saveStateToConf(
threadContentionMonitoringEnabledDefaultValue, THREAD_CONTENTION_MONITORING_ENABLED_CONF);
threadContentionMonitoringEnabledFromConf = threadContentionMonitoringEnabledDefaultValue;
} catch (Exception e) {
LOG.debug("Error reading thread contention monitoring state from Conf file", e);
threadContentionMonitoringEnabledFromConf = threadContentionMonitoringEnabledDefaultValue;
}

// For thread contention monitoring to be enabled, both PA and thread contention monitoring
// need to enabled.
updateThreadContentionMonitoringState(paEnabled && threadContentionMonitoringEnabledFromConf);
});
}

private boolean readBooleanFromFile(final Path filePath) throws Exception {
try (Scanner sc = new Scanner(filePath)) {
String nextLine = sc.nextLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public final class PerformanceAnalyzerClusterSettings {
/**
* Cluster setting that controls state for various PA components. Bit 0: Perf Analyzer
* enabled/disabled Bit 1: RCA enabled/disabled Bit 2: Logging enabled/disabled Bit 3: Batch
* Metrics enabled/disabled
* Metrics enabled/disabled Bit 4: Thread Contention Monitoring enabled/disabled
*/
public static final Setting<Integer> COMPOSITE_PA_SETTING =
Setting.intSetting(
Expand All @@ -25,7 +25,8 @@ public enum PerformanceAnalyzerFeatureBits {
PA_BIT,
RCA_BIT,
LOGGING_BIT,
BATCH_METRICS_BIT
BATCH_METRICS_BIT,
THREAD_CONTENTION_MONITORING_BIT
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class PerformanceAnalyzerClusterSettingHandler implements ClusterSettingL
private static final int BATCH_METRICS_ENABLED_BIT_POS =
PerformanceAnalyzerClusterSettings.PerformanceAnalyzerFeatureBits.BATCH_METRICS_BIT
.ordinal();
private static final int THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS =
PerformanceAnalyzerClusterSettings.PerformanceAnalyzerFeatureBits.THREAD_CONTENTION_MONITORING_BIT
.ordinal();

private final PerformanceAnalyzerController controller;
private final ClusterSettingsManager clusterSettingsManager;
Expand All @@ -45,7 +48,8 @@ public PerformanceAnalyzerClusterSettingHandler(
controller.isPerformanceAnalyzerEnabled(),
controller.isRcaEnabled(),
controller.isLoggingEnabled(),
controller.isBatchMetricsEnabled());
controller.isBatchMetricsEnabled(),
controller.isThreadContentionMonitoringEnabled());
}

/**
Expand Down Expand Up @@ -92,6 +96,17 @@ public void updateBatchMetricsSetting(final boolean state) {
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, settingIntValue);
}

/**
* Updates the Thread Contention Monitoring setting across the cluster.
*
* @param state The desired state for Thread Contention Monitoring setting.
*/
public void updateThreadContentionMonitoringSetting(final boolean state) {
final Integer settingIntValue = getThreadContentionMonitoringSettingValueFromState(state);
clusterSettingsManager.updateSetting(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, settingIntValue);
}

/**
* Handler that gets called when there is a new value for the setting that this listener is
* listening to.
Expand All @@ -106,6 +121,7 @@ public void onSettingUpdate(final Integer newSettingValue) {
controller.updateRcaState(getRcaStateFromSetting(newSettingValue));
controller.updateLoggingState(getLoggingStateFromSetting(newSettingValue));
controller.updateBatchMetricsState(getBatchMetricsStateFromSetting(newSettingValue));
controller.updateThreadContentionMonitoringState(getThreadContentionMonitoringStateFromSetting(newSettingValue));
}
}

Expand All @@ -130,7 +146,8 @@ private Integer initializeClusterSettingValue(
final boolean paEnabled,
final boolean rcaEnabled,
final boolean loggingEnabled,
final boolean batchMetricsEnabled) {
final boolean batchMetricsEnabled,
final boolean threadContentionMonitoringEnabled) {
int clusterSetting = CLUSTER_SETTING_DISABLED_VALUE;

clusterSetting = paEnabled ? setBit(clusterSetting, PA_ENABLED_BIT_POS) : clusterSetting;
Expand All @@ -145,6 +162,10 @@ private Integer initializeClusterSettingValue(
batchMetricsEnabled
? setBit(clusterSetting, BATCH_METRICS_ENABLED_BIT_POS)
: clusterSetting;
clusterSetting =
threadContentionMonitoringEnabled
? setBit(clusterSetting, THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS)
: clusterSetting;
}
return clusterSetting;
}
Expand All @@ -161,10 +182,10 @@ private boolean getPAStateFromSetting(final int settingValue) {

/**
* Converts the boolean PA state to composite cluster setting. If Performance Analyzer is being
* turned off, it will also turn off RCA, logging, and batch metrics.
* turned off, it will also turn off RCA, logging, batch metrics and thread contention monitoring.
*
* @param state the state of performance analyzer. Will enable performance analyzer if true,
* disables performance analyzer, RCA, logging, and batch metrics.
* disables performance analyzer, RCA, logging, batch metrics and thread contention monitoring.
* @return composite cluster setting as an integer.
*/
private Integer getPASettingValueFromState(final boolean state) {
Expand All @@ -176,10 +197,12 @@ private Integer getPASettingValueFromState(final boolean state) {
return resetBit(
resetBit(
resetBit(
resetBit(clusterSetting, PA_ENABLED_BIT_POS),
RCA_ENABLED_BIT_POS),
LOGGING_ENABLED_BIT_POS),
BATCH_METRICS_ENABLED_BIT_POS);
resetBit(
resetBit(clusterSetting, PA_ENABLED_BIT_POS),
RCA_ENABLED_BIT_POS),
LOGGING_ENABLED_BIT_POS),
BATCH_METRICS_ENABLED_BIT_POS),
THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS);
}
}

Expand Down Expand Up @@ -213,6 +236,16 @@ private boolean getBatchMetricsStateFromSetting(final int settingValue) {
return ((settingValue >> BATCH_METRICS_ENABLED_BIT_POS) & BIT_ONE) == ENABLED_VALUE;
}

/**
* Extracts the boolean value for thread contention monitoring state from the cluster setting.
*
* @param settingValue The composite setting value.
* @return true if the THREAD_CONTENTION_MONITORING bit is set, false otherwise.
*/
private boolean getThreadContentionMonitoringStateFromSetting(final int settingValue) {
return ((settingValue >> THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS) & BIT_ONE) == ENABLED_VALUE;
}

/**
* Converts the boolean RCA state to composite cluster setting. Enables RCA only if performance
* analyzer is also set. Otherwise, results in a no-op.
Expand Down Expand Up @@ -272,6 +305,26 @@ private Integer getBatchMetricsSettingValueFromState(final boolean shouldEnable)
}
}

/**
* Converts the boolean thread contention monitoring state to composite cluster setting. Enables thread contention
* monitoring only if performance analyzer is also set. Otherwise, results in a no-op.
*
* @param shouldEnable the state of thread contention monitoring. Will try to enable if true, disables thread
* contention monitoring if false.
* @return composite cluster setting as an integer.
*/
private Integer getThreadContentionMonitoringSettingValueFromState(final boolean shouldEnable) {
int clusterSetting = currentClusterSetting;

if (shouldEnable) {
return checkBit(currentClusterSetting, PA_ENABLED_BIT_POS)
? setBit(clusterSetting, THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS)
: clusterSetting;
} else {
return resetBit(clusterSetting, THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS);
}
}

/**
* Sets the bit at the specified position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class PerformanceAnalyzerClusterConfigAction extends BaseRestHandler {
RestConfig.PA_BASE_URI + "/logging/cluster/config";
public static final String BATCH_METRICS_CLUSTER_CONFIG_PATH =
RestConfig.PA_BASE_URI + "/batch/cluster/config";
public static final String THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH =
RestConfig.PA_BASE_URI + "/threadContentionMonitoring/cluster/config";

public static final String LEGACY_PA_CLUSTER_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/cluster/config";
Expand All @@ -59,6 +61,16 @@ public class PerformanceAnalyzerClusterConfigAction extends BaseRestHandler {
public static final String LEGACY_BATCH_METRICS_CLUSTER_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/batch/cluster/config";

private static final List<Route> ROUTES =
unmodifiableList(
asList(
new Route(
RestRequest.Method.GET,
THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH),
new Route(
RestRequest.Method.POST,
THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH)
));
private static final List<ReplacedRoute> REPLACED_ROUTES =
unmodifiableList(
asList(
Expand Down Expand Up @@ -126,7 +138,7 @@ public String getName() {

@Override
public List<Route> routes() {
return Collections.emptyList();
return ROUTES;
}

@Override
Expand Down Expand Up @@ -169,6 +181,8 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
} else if (request.path().contains(BATCH_METRICS_CLUSTER_CONFIG_PATH)
|| request.path().contains(LEGACY_BATCH_METRICS_CLUSTER_CONFIG_PATH)) {
clusterSettingHandler.updateBatchMetricsSetting((Boolean) value);
} else if (request.path().contains(THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH)) {
clusterSettingHandler.updateThreadContentionMonitoringSetting((Boolean) value);
} else {
clusterSettingHandler.updatePerformanceAnalyzerSetting((Boolean) value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class PerformanceAnalyzerConfigAction extends BaseRestHandler {
public static final String RCA_ENABLED = "rcaEnabled";
public static final String PA_LOGGING_ENABLED = "loggingEnabled";
public static final String BATCH_METRICS_ENABLED = "batchMetricsEnabled";
public static final String THREAD_CONTENTION_MONITORING_ENABLED = "threadContentionMonitoringEnabled";
public static final String BATCH_METRICS_RETENTION_PERIOD_MINUTES =
"batchMetricsRetentionPeriodMinutes";
public static final String PERFORMANCE_ANALYZER_CONFIG_ACTION =
Expand All @@ -48,6 +49,8 @@ public class PerformanceAnalyzerConfigAction extends BaseRestHandler {
public static final String PA_CONFIG_PATH = RestConfig.PA_BASE_URI + "/config";
public static final String LOGGING_CONFIG_PATH = RestConfig.PA_BASE_URI + "/logging/config";
public static final String BATCH_METRICS_CONFIG_PATH = RestConfig.PA_BASE_URI + "/batch/config";
public static final String THREAD_CONTENTION_MONITORING_CONFIG_PATH =
RestConfig.PA_BASE_URI + "/threadContentionMonitoring/config";

public static final String LEGACY_RCA_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/rca/config";
Expand All @@ -56,7 +59,16 @@ public class PerformanceAnalyzerConfigAction extends BaseRestHandler {
RestConfig.LEGACY_PA_BASE_URI + "/logging/config";
public static final String LEGACY_BATCH_METRICS_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/batch/config";

private static final List<Route> ROUTES =
unmodifiableList(
asList(
new Route(
RestRequest.Method.GET,
THREAD_CONTENTION_MONITORING_CONFIG_PATH),
new Route(
RestRequest.Method.POST,
THREAD_CONTENTION_MONITORING_CONFIG_PATH)
));
private static final List<ReplacedRoute> REPLACED_ROUTES =
unmodifiableList(
asList(
Expand Down Expand Up @@ -112,7 +124,7 @@ public static void setInstance(

@Override
public List<Route> routes() {
return Collections.emptyList();
return ROUTES;
}

@Override
Expand Down Expand Up @@ -172,12 +184,20 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
}

performanceAnalyzerController.updateBatchMetricsState(shouldEnable);
} else if (request.path().contains(THREAD_CONTENTION_MONITORING_CONFIG_PATH)) {
if (shouldEnable
&& !performanceAnalyzerController.isPerformanceAnalyzerEnabled()) {
return getChannelConsumerWithError(
"Error: PA not enabled. Enable PA before turning thread contention monitoring on");
}
performanceAnalyzerController.updateThreadContentionMonitoringState(shouldEnable);
} else {
// Disabling Performance Analyzer should disable the RCA framework as well.
if (!shouldEnable) {
performanceAnalyzerController.updateRcaState(false);
performanceAnalyzerController.updateLoggingState(false);
performanceAnalyzerController.updateBatchMetricsState(false);
performanceAnalyzerController.updateThreadContentionMonitoringState(false);
}
performanceAnalyzerController.updatePerformanceAnalyzerState(shouldEnable);
}
Expand Down Expand Up @@ -206,6 +226,9 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
builder.field(
BATCH_METRICS_ENABLED,
performanceAnalyzerController.isBatchMetricsEnabled());
builder.field(
THREAD_CONTENTION_MONITORING_ENABLED,
performanceAnalyzerController.isThreadContentionMonitoringEnabled());
builder.field(
BATCH_METRICS_RETENTION_PERIOD_MINUTES,
PluginSettings.instance().getBatchMetricsRetentionPeriodMinutes());
Expand Down
Loading