Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public void testDiscoveryStats() throws Exception {
ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
assertBusy(
() -> assertThat(
internalCluster().clusterService(internalCluster().getClusterManagerName()).getMasterService().numberOfPendingTasks(),
internalCluster().clusterService(internalCluster().getClusterManagerName())
.getClusterManagerService()
.numberOfPendingTasks(),
equalTo(0)
)
); // see https://github.com/elastic/elasticsearch/issues/24388
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

// The tasks can be re-ordered, so we need to check out-of-order
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
List<PendingClusterTask> pendingClusterTasks = clusterService.getMasterService().pendingTasks();
List<PendingClusterTask> pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
Expand All @@ -413,7 +413,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
invoked2.await();

// whenever we test for no tasks, we need to wait since this is a live node
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty()));
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getClusterManagerService().pendingTasks().isEmpty()));
waitNoPendingTasksOnAll();

final CountDownLatch block2 = new CountDownLatch(1);
Expand Down Expand Up @@ -453,7 +453,7 @@ public void onFailure(String source, Exception e) {
}
Thread.sleep(100);

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ private boolean validateRequest(final ClusterHealthRequest request, ClusterState
ClusterHealthResponse response = clusterHealth(
request,
clusterState,
clusterService.getMasterService().numberOfPendingTasks(),
clusterService.getClusterManagerService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(),
clusterService.getMasterService().getMaxTaskWaitTime()
clusterService.getClusterManagerService().getMaxTaskWaitTime()
);
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
}
Expand All @@ -341,9 +341,9 @@ private ClusterHealthResponse getResponse(
ClusterHealthResponse response = clusterHealth(
request,
clusterState,
clusterService.getMasterService().numberOfPendingTasks(),
clusterService.getClusterManagerService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(),
clusterService.getMasterService().getMaxTaskWaitTime()
clusterService.getClusterManagerService().getMaxTaskWaitTime()
);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected void clusterManagerOperation(
ActionListener<PendingClusterTasksResponse> listener
) {
logger.trace("fetching pending tasks from cluster service");
final List<PendingClusterTask> pendingTasks = clusterService.getMasterService().pendingTasks();
final List<PendingClusterTask> pendingTasks = clusterService.getClusterManagerService().pendingTasks();
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

package org.opensearch.cluster;

import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;

import java.util.List;

Expand All @@ -49,15 +49,15 @@ public interface ClusterStateTaskListener {

/**
* called when the task was rejected because the local node is no longer cluster-manager.
* Used only for tasks submitted to {@link MasterService}.
* Used only for tasks submitted to {@link ClusterManagerService}.
*/
default void onNoLongerClusterManager(String source) {
onFailure(source, new NotClusterManagerException("no longer cluster-manager. source: [" + source + "]"));
}

/**
* called when the task was rejected because the local node is no longer cluster-manager.
* Used only for tasks submitted to {@link MasterService}.
* Used only for tasks submitted to {@link ClusterManagerService}.
*
* @deprecated As of 2.1, because supporting inclusive language, replaced by {@link #onNoLongerClusterManager(String)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final boolean singleNodeDiscovery;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final MasterService masterService;
private final ClusterManagerService clusterManagerService;
private final AllocationService allocationService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
Expand Down Expand Up @@ -191,7 +191,7 @@ public Coordinator(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
AllocationService allocationService,
MasterService masterService,
ClusterManagerService clusterManagerService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier,
SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier,
Expand All @@ -203,15 +203,15 @@ public Coordinator(
) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(
settings,
allocationService,
masterService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
Expand Down Expand Up @@ -260,7 +260,7 @@ public Coordinator(
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForClusterManagerService);
clusterManagerService.setClusterStateSupplier(this::getStateForClusterManagerService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(
settings,
Expand Down Expand Up @@ -310,7 +310,7 @@ private void onLeaderFailure(Exception e) {
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
masterService.submitStateUpdateTask(
clusterManagerService.submitStateUpdateTask(
"node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
Expand Down Expand Up @@ -757,7 +757,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
}

private void cleanClusterManagerService() {
masterService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
clusterManagerService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
@Override
public void onFailure(String source, Exception e) {
// ignore
Expand Down Expand Up @@ -1129,7 +1129,7 @@ private void scheduleReconfigurationIfNeeded() {
final ClusterState state = getLastAcceptedState();
if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) {
logger.trace("scheduling reconfiguration");
masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
reconfigurationTaskScheduled.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Priority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -106,7 +106,7 @@ public class JoinHelper {
Setting.Property.Deprecated
);

private final MasterService masterService;
private final ClusterManagerService clusterManagerService;
private final TransportService transportService;
private volatile JoinTaskExecutor joinTaskExecutor;

Expand All @@ -122,7 +122,7 @@ public class JoinHelper {
JoinHelper(
Settings settings,
AllocationService allocationService,
MasterService masterService,
ClusterManagerService clusterManagerService,
TransportService transportService,
LongSupplier currentTermSupplier,
Supplier<ClusterState> currentStateSupplier,
Expand All @@ -132,7 +132,7 @@ public class JoinHelper {
RerouteService rerouteService,
NodeHealthService nodeHealthService
) {
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -458,7 +458,7 @@ class LeaderJoinAccumulator implements JoinAccumulator {
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
assert joinTaskExecutor != null;
masterService.submitStateUpdateTask(
clusterManagerService.submitStateUpdateTask(
"node-join",
task,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down Expand Up @@ -543,7 +543,7 @@ public void close(Mode newMode) {
pendingAsTasks.put(JoinTaskExecutor.newBecomeClusterManagerTask(), (source, e) -> {});
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {});
joinTaskExecutor = joinTaskExecutorGenerator.get();
masterService.submitStateUpdateTasks(
clusterManagerService.submitStateUpdateTasks(
stateUpdateSource,
pendingAsTasks,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

/**
* Main Cluster Manager Node Service
*
* @opensearch.internal
*/
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* @opensearch.internal
*/
public class ClusterService extends AbstractLifecycleComponent {
private final MasterService masterService;
private final ClusterManagerService clusterManagerService;

private final ClusterApplierService clusterApplierService;

Expand Down Expand Up @@ -93,20 +93,20 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
this(
settings,
clusterSettings,
new MasterService(settings, clusterSettings, threadPool),
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
);
}

public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
MasterService masterService,
ClusterManagerService clusterManagerService,
ClusterApplierService clusterApplierService
) {
this.settings = settings;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand All @@ -132,18 +132,18 @@ public RerouteService getRerouteService() {
@Override
protected synchronized void doStart() {
clusterApplierService.start();
masterService.start();
clusterManagerService.start();
}

@Override
protected synchronized void doStop() {
masterService.stop();
clusterManagerService.stop();
clusterApplierService.stop();
}

@Override
protected synchronized void doClose() {
masterService.close();
clusterManagerService.close();
clusterApplierService.close();
}

Expand Down Expand Up @@ -228,8 +228,14 @@ public void addLocalNodeMasterListener(LocalNodeMasterListener listener) {
addLocalNodeClusterManagerListener(listener);
}

public ClusterManagerService getClusterManagerService() {
return clusterManagerService;
}

/** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #getClusterManagerService()} */
@Deprecated
public MasterService getMasterService() {
return masterService;
return clusterManagerService;
}

/**
Expand All @@ -252,7 +258,7 @@ public ClusterApplierService getClusterApplierService() {

public static boolean assertClusterOrClusterManagerStateThread() {
assert Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)
|| Thread.currentThread().getName().contains(MasterService.CLUSTER_MANAGER_UPDATE_THREAD_NAME)
|| Thread.currentThread().getName().contains(ClusterManagerService.CLUSTER_MANAGER_UPDATE_THREAD_NAME)
: "not called from the master/cluster state update thread";
return true;
}
Expand Down Expand Up @@ -349,6 +355,6 @@ public <T> void submitStateUpdateTasks(
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
masterService.submitStateUpdateTasks(source, tasks, config, executor);
clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@
* Main Master Node Service
*
* @opensearch.internal
* @deprecated As of 2.2, because supporting inclusive language, replaced by {@link ClusterManagerService}.
*/
@Deprecated
public class MasterService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(MasterService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
Expand Down Expand Up @@ -337,8 +337,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexModule.NODE_STORE_ALLOW_MMAP,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterService.USER_DEFINED_METADATA,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Nullable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;
Expand Down Expand Up @@ -109,7 +109,7 @@ protected boolean blockingAllowed() {
return Transports.assertNotTransportThread(BLOCKING_OP_REASON)
&& ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON)
&& ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON)
&& MasterService.assertNotClusterManagerUpdateThread(BLOCKING_OP_REASON);
&& ClusterManagerService.assertNotClusterManagerUpdateThread(BLOCKING_OP_REASON);
}

@Override
Expand Down
Loading