Skip to content

Commit cf0486a

Browse files
Integ Test for RemotePublication can be disabled by rolling restart of masters (#15677)
Signed-off-by: Shivansh Arora <hishiv@amazon.com> (cherry picked from commit 2eb148c) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 00b2fcc commit cf0486a

2 files changed

Lines changed: 137 additions & 7 deletions

File tree

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 126 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,33 +8,45 @@
88

99
package org.opensearch.gateway.remote;
1010

11-
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
11+
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1212
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1313
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1414
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1515
import org.opensearch.client.Client;
16+
import org.opensearch.cluster.ClusterState;
17+
import org.opensearch.cluster.coordination.CoordinationState;
18+
import org.opensearch.cluster.coordination.PersistedStateRegistry;
19+
import org.opensearch.cluster.coordination.PublishClusterStateStats;
1620
import org.opensearch.common.blobstore.BlobPath;
1721
import org.opensearch.common.settings.Settings;
18-
import org.opensearch.common.util.FeatureFlags;
1922
import org.opensearch.discovery.DiscoveryStats;
2023
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
2124
import org.opensearch.indices.recovery.RecoverySettings;
2225
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
2326
import org.opensearch.repositories.RepositoriesService;
2427
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2528
import org.opensearch.repositories.fs.ReloadableFsRepository;
29+
import org.opensearch.test.InternalTestCluster;
2630
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
2731
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
2832
import org.junit.Before;
2933

3034
import java.io.IOException;
3135
import java.nio.charset.StandardCharsets;
3236
import java.util.Base64;
37+
import java.util.HashSet;
3338
import java.util.Locale;
3439
import java.util.Map;
40+
import java.util.Objects;
41+
import java.util.Set;
3542
import java.util.function.Function;
3643
import java.util.stream.Collectors;
3744

45+
import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS;
46+
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
47+
import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals;
48+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
49+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING;
3850
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
3951
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
4052
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
@@ -65,10 +77,7 @@ public void setup() {
6577

6678
@Override
6779
protected Settings featureFlagSettings() {
68-
return Settings.builder()
69-
.put(super.featureFlagSettings())
70-
.put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled)
71-
.build();
80+
return Settings.builder().put(super.featureFlagSettings()).put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled).build();
7281
}
7382

7483
@Override
@@ -173,11 +182,121 @@ public void testRemotePublicationDownloadStats() {
173182
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
174183
.cluster()
175184
.prepareNodesStats(dataNode)
176-
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
185+
.addMetric(DISCOVERY.metricName())
177186
.get();
178187

179188
assertDataNodeDownloadStats(nodesStatsResponseDataNode);
189+
}
190+
191+
public void testRemotePublicationDisabledByRollingRestart() throws Exception {
192+
prepareCluster(3, 2, INDEX_NAME, 1, 2);
193+
ensureStableCluster(5);
194+
ensureGreen(INDEX_NAME);
195+
196+
Set<String> clusterManagers = internalCluster().getClusterManagerNames();
197+
Set<String> restartedMasters = new HashSet<>();
198+
199+
for (String clusterManager : clusterManagers) {
200+
internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() {
201+
@Override
202+
public Settings onNodeStopped(String nodeName) {
203+
restartedMasters.add(nodeName);
204+
return Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, false).build();
205+
}
206+
207+
@Override
208+
public void doAfterNodes(int n, Client client) {
209+
String activeCM = internalCluster().getClusterManagerName();
210+
Set<String> followingCMs = clusterManagers.stream()
211+
.filter(node -> !Objects.equals(node, activeCM))
212+
.collect(Collectors.toSet());
213+
boolean activeCMRestarted = restartedMasters.contains(activeCM);
214+
NodesStatsResponse response = client().admin()
215+
.cluster()
216+
.prepareNodesStats(followingCMs.toArray(new String[0]))
217+
.clear()
218+
.addMetric(DISCOVERY.metricName())
219+
.get();
220+
// after master is flipped to restarted master, publication should happen on Transport
221+
response.getNodes().forEach(nodeStats -> {
222+
if (activeCMRestarted) {
223+
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
224+
assertTrue(
225+
stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0
226+
);
227+
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
228+
} else {
229+
DiscoveryStats stats = nodeStats.getDiscoveryStats();
230+
assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount());
231+
assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount());
232+
assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount());
233+
}
234+
});
235+
236+
NodesInfoResponse nodesInfoResponse = client().admin()
237+
.cluster()
238+
.prepareNodesInfo(activeCM)
239+
.clear()
240+
.addMetric(SETTINGS.metricName())
241+
.get();
242+
// if masterRestarted is true Publication Setting should be false, and vice versa
243+
assertTrue(
244+
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted
245+
);
246+
247+
followingCMs.forEach(node -> {
248+
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
249+
CoordinationState.PersistedState remoteState = registry.getPersistedState(
250+
PersistedStateRegistry.PersistedStateType.REMOTE
251+
);
252+
if (activeCMRestarted) {
253+
assertNull(remoteState.getLastAcceptedState());
254+
// assertNull(remoteState.getLastAcceptedManifest());
255+
} else {
256+
ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL)
257+
.getLastAcceptedState();
258+
ClusterState remotePersistedState = remoteState.getLastAcceptedState();
259+
assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata()));
260+
assertEquals(localState.nodes(), remotePersistedState.nodes());
261+
assertEquals(localState.routingTable(), remotePersistedState.routingTable());
262+
assertEquals(localState.customs(), remotePersistedState.customs());
263+
}
264+
});
265+
}
266+
});
267+
268+
}
269+
ensureGreen(INDEX_NAME);
270+
ensureStableCluster(5);
271+
272+
String activeCM = internalCluster().getClusterManagerName();
273+
Set<String> followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet());
274+
NodesStatsResponse response = client().admin()
275+
.cluster()
276+
.prepareNodesStats(followingCMs.toArray(new String[0]))
277+
.clear()
278+
.addMetric(DISCOVERY.metricName())
279+
.get();
280+
response.getNodes().forEach(nodeStats -> {
281+
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
282+
assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0);
283+
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
284+
});
285+
NodesInfoResponse nodesInfoResponse = client().admin()
286+
.cluster()
287+
.prepareNodesInfo(activeCM)
288+
.clear()
289+
.addMetric(SETTINGS.metricName())
290+
.get();
291+
// if masterRestarted is true Publication Setting should be false, and vice versa
292+
assertFalse(REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()));
180293

294+
followingCMs.forEach(node -> {
295+
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
296+
CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
297+
assertNull(remoteState.getLastAcceptedState());
298+
// assertNull(remoteState.getLastAcceptedManifest());
299+
});
181300
}
182301

183302
private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {

test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2143,6 +2143,17 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
21432143
}
21442144
}
21452145

2146+
/**
2147+
* Returns the name of all the cluster managers in the cluster
2148+
*/
2149+
public Set<String> getClusterManagerNames() {
2150+
return nodes.entrySet()
2151+
.stream()
2152+
.filter(entry -> CLUSTER_MANAGER_NODE_PREDICATE.test(entry.getValue()))
2153+
.map(entry -> entry.getKey())
2154+
.collect(Collectors.toSet());
2155+
}
2156+
21462157
/**
21472158
* Returns the name of the current cluster-manager node in the cluster.
21482159
*/

0 commit comments

Comments
 (0)