Skip to content

Commit 94ff71b

Browse files
author
Rahul Karajgikar
committed
spotless apply and import cleanup
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 9496aa1 commit 94ff71b

11 files changed

Lines changed: 134 additions & 117 deletions

File tree

server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java

Lines changed: 76 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,6 @@
3232

3333
package org.opensearch.cluster.coordination;
3434

35-
import java.util.Arrays;
36-
import java.util.Collection;
37-
import java.util.concurrent.atomic.AtomicBoolean;
3835
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
3936
import org.opensearch.cluster.ClusterChangedEvent;
4037
import org.opensearch.cluster.ClusterStateApplier;
@@ -54,10 +51,19 @@
5451
import org.opensearch.test.store.MockFSIndexStore;
5552
import org.opensearch.test.transport.MockTransportService;
5653
import org.opensearch.test.transport.StubbableTransport;
57-
import org.opensearch.transport.*;
54+
import org.opensearch.transport.Transport;
55+
import org.opensearch.transport.TransportChannel;
56+
import org.opensearch.transport.TransportConnectionListener;
57+
import org.opensearch.transport.TransportRequest;
58+
import org.opensearch.transport.TransportRequestHandler;
59+
import org.opensearch.transport.TransportService;
60+
61+
import java.util.Arrays;
62+
import java.util.Collection;
63+
import java.util.concurrent.atomic.AtomicBoolean;
5864

59-
import static org.hamcrest.Matchers.is;
6065
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
66+
import static org.hamcrest.Matchers.is;
6167

6268
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
6369
public class NodeJoinLeftIT extends OpenSearchIntegTestCase {
@@ -99,7 +105,7 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
99105
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
100106
.build();
101107
// start a cluster-manager node
102-
final String cm =internalCluster().startNode(nodeSettings);
108+
final String cm = internalCluster().startNode(nodeSettings);
103109

104110
System.out.println("--> spawning node t1");
105111
final String blueNodeName = internalCluster().startNode(
@@ -126,8 +132,10 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
126132
.get();
127133
System.out.println("--> done creating index");
128134
MockTransportService cmTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
129-
MockTransportService redTransportService =
130-
(MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
135+
MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance(
136+
TransportService.class,
137+
redNodeName
138+
);
131139

132140
ClusterService cmClsService = internalCluster().getInstance(ClusterService.class, cm);
133141
// simulate a slow applier on the cm
@@ -147,82 +155,79 @@ public void applyClusterState(ClusterChangedEvent event) {
147155

148156
@Override
149157
public void onConnectionOpened(Transport.Connection connection) {
150-
// try {
151-
// Thread.sleep(500);
152-
// } catch (InterruptedException e) {
153-
// throw new RuntimeException(e);
154-
// }
158+
// try {
159+
// Thread.sleep(500);
160+
// } catch (InterruptedException e) {
161+
// throw new RuntimeException(e);
162+
// }
155163

156164
}
157165

158166
@Override
159167
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
160-
// if (node.getName().equals("node_t2")) {
161-
// try {
162-
// Thread.sleep(250);
163-
// } catch (InterruptedException e) {
164-
// throw new RuntimeException(e);
165-
// }
166-
// }
168+
// if (node.getName().equals("node_t2")) {
169+
// try {
170+
// Thread.sleep(250);
171+
// } catch (InterruptedException e) {
172+
// throw new RuntimeException(e);
173+
// }
174+
// }
167175
}
168176

169-
// @Override
170-
// public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
171-
// try {
172-
// Thread.sleep(5000);
173-
// } catch (InterruptedException e) {
174-
// throw new RuntimeException(e);
175-
// }
176-
// }
177+
// @Override
178+
// public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
179+
// try {
180+
// Thread.sleep(5000);
181+
// } catch (InterruptedException e) {
182+
// throw new RuntimeException(e);
183+
// }
184+
// }
177185
});
178186
AtomicBoolean bb = new AtomicBoolean();
179187
// simulate followerchecker failure
180188

181-
ConnectionDelay handlingBehavior = new ConnectionDelay(
182-
FOLLOWER_CHECK_ACTION_NAME,
183-
()->{
184-
if (bb.get()) {
185-
return;
186-
}
187-
try {
188-
Thread.sleep(10);
189-
} catch (InterruptedException e) {
190-
throw new RuntimeException(e);
191-
}
192-
throw new NodeHealthCheckFailureException("non writable exception");
193-
});
189+
ConnectionDelay handlingBehavior = new ConnectionDelay(FOLLOWER_CHECK_ACTION_NAME, () -> {
190+
if (bb.get()) {
191+
return;
192+
}
193+
try {
194+
Thread.sleep(10);
195+
} catch (InterruptedException e) {
196+
throw new RuntimeException(e);
197+
}
198+
throw new NodeHealthCheckFailureException("non writable exception");
199+
});
194200
redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, handlingBehavior);
195201

196-
197-
// for (int i=0 ;i < 1; i++) {
198-
// //cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
199-
// System.out.println("--> follower check, iteration: " + i);
200-
// bb.set(true); // pass followerchecker
201-
// System.out.println("--> setting bb to true, sleeping for 1500ms, iteration: " + i);
202-
// Thread.sleep(1500);
203-
// bb.set(false); // fail followerchecker
204-
// System.out.println("--> setting bb to false, iteration: " + i);
205-
// System.out.println("--> checking cluster health 2 nodes, iteration: " + i);
206-
// ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
207-
// assertThat(response1.isTimedOut(), is(false));
208-
// System.out.println("--> completed checking cluster health 2 nodes, iteration: " + i);
209-
// //internalCluster().stopRandomNode(InternalTestCluster.nameFilter(blueNodeName));
210-
// System.out.println("--> checking cluster health 3 nodes, iteration: " + i);
211-
// ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
212-
// assertThat(response2.isTimedOut(), is(false));
213-
// System.out.println("--> completed checking cluster health 3 nodes, iteration: " + i);
214-
// }
215-
// for (int i=0 ;i < 1; i++) {
216-
//
217-
// bb.set(true); // pass followerchecker
218-
//
219-
// Thread.sleep(1500);
220-
// System.out.println("--> manually disconnecting node, iteration: " + i);
221-
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
222-
// }
202+
// for (int i=0 ;i < 1; i++) {
203+
// //cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
204+
// System.out.println("--> follower check, iteration: " + i);
205+
// bb.set(true); // pass followerchecker
206+
// System.out.println("--> setting bb to true, sleeping for 1500ms, iteration: " + i);
207+
// Thread.sleep(1500);
208+
// bb.set(false); // fail followerchecker
209+
// System.out.println("--> setting bb to false, iteration: " + i);
210+
// System.out.println("--> checking cluster health 2 nodes, iteration: " + i);
211+
// ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
212+
// assertThat(response1.isTimedOut(), is(false));
213+
// System.out.println("--> completed checking cluster health 2 nodes, iteration: " + i);
214+
// //internalCluster().stopRandomNode(InternalTestCluster.nameFilter(blueNodeName));
215+
// System.out.println("--> checking cluster health 3 nodes, iteration: " + i);
216+
// ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
217+
// assertThat(response2.isTimedOut(), is(false));
218+
// System.out.println("--> completed checking cluster health 3 nodes, iteration: " + i);
219+
// }
220+
// for (int i=0 ;i < 1; i++) {
221+
//
222+
// bb.set(true); // pass followerchecker
223+
//
224+
// Thread.sleep(1500);
225+
// System.out.println("--> manually disconnecting node, iteration: " + i);
226+
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
227+
// }
223228

224229
// FAILS WITHOUT CODE CHANGES
225-
for (int i=0 ; i < 10; i++) {
230+
for (int i = 0; i < 10; i++) {
226231
bb.set(false); // fail followerchecker by force to trigger node disconnect
227232
System.out.println("--> disconnecting from red node, iteration: " + i);
228233
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
@@ -261,15 +266,13 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
261266
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
262267
assertThat(response.isTimedOut(), is(false));
263268
}
269+
264270
private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
265271

266272
private final String actionName;
267273
private final Runnable connectionBreaker;
268274

269-
private ConnectionDelay(
270-
String actionName,
271-
Runnable connectionBreaker
272-
) {
275+
private ConnectionDelay(String actionName, Runnable connectionBreaker) {
273276
this.actionName = actionName;
274277
this.connectionBreaker = connectionBreaker;
275278
}
@@ -282,7 +285,6 @@ public void messageReceived(
282285
Task task
283286
) throws Exception {
284287

285-
286288
connectionBreaker.run();
287289
handler.messageReceived(request, channel, task);
288290
}

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,11 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
180180
}
181181

182182
public void markPendingJoinsAsComplete(List<DiscoveryNode> nodesConnected) {
183-
for (final DiscoveryNode discoveryNode: nodesConnected) {
183+
for (final DiscoveryNode discoveryNode : nodesConnected) {
184184
transportService.markPendingJoinAsCompleted(discoveryNode);
185185
}
186186
}
187+
187188
public void disconnectFromNonBlockedNodesExcept(DiscoveryNodes discoveryNodes, DiscoveryNodes.Delta nodesDelta) {
188189
final List<Runnable> runnables = new ArrayList<>();
189190
synchronized (mutex) {

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,11 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
418418
synchronized (mutex) {
419419
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getClusterManagerNode();
420420
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
421-
logger.info("handlePublishRequest: handling version [{}] from [{}]", publishRequest.getAcceptedState().getVersion(), sourceNode);
421+
logger.info(
422+
"handlePublishRequest: handling version [{}] from [{}]",
423+
publishRequest.getAcceptedState().getVersion(),
424+
sourceNode
425+
);
422426

423427
if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) {
424428
// Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication
@@ -1360,18 +1364,20 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13601364
// join publish is failing
13611365
// before we publish, we might need
13621366
// can we recreate connections as part of publish if we don't find it?
1363-
// reconnect to any nodes that are trying to join, redundancy to avoid node connection wiping by concurrent node-join and left
1367+
// reconnect to any nodes that are trying to join, redundancy to avoid node connection wiping by concurrent node-join and
1368+
// left
13641369
// find diff of nodes from old state and new publishNodes
13651370
// this fails because we can't add blocking code to cluster manager thread
1366-
// for (DiscoveryNode addedNode : clusterChangedEvent.nodesDelta().addedNodes()) {
1367-
// // maybe add a listener here to handle failures
1368-
// try {
1369-
// transportService.connectToNode(addedNode);
1370-
// }
1371-
// catch (Exception e) {
1372-
// logger.info(() -> new ParameterizedMessage("[{}] failed reconnecting to [{}]", clusterChangedEvent.source(), addedNode), e);
1373-
// }
1374-
// }
1371+
// for (DiscoveryNode addedNode : clusterChangedEvent.nodesDelta().addedNodes()) {
1372+
// // maybe add a listener here to handle failures
1373+
// try {
1374+
// transportService.connectToNode(addedNode);
1375+
// }
1376+
// catch (Exception e) {
1377+
// logger.info(() -> new ParameterizedMessage("[{}] failed reconnecting to [{}]", clusterChangedEvent.source(), addedNode),
1378+
// e);
1379+
// }
1380+
// }
13751381

13761382
publication.start(followersChecker.getFaultyNodes());
13771383
}

server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -396,16 +396,18 @@ public void handleException(TransportException exp) {
396396

397397
final String reason;
398398

399-
// if (exp instanceof NodeNotConnectedException || exp.getCause() instanceof NodeNotConnectedException){
400-
// // NodeNotConnectedException will only happen if getConnection fails in TransportService.sendRequest
401-
// // This only happens if clusterConnectionManager.getConnection() does not find the entry in connectedNodes list
402-
// // This happens on node disconnection
403-
// // Need to validate that this only gets triggered from node-left side. we want to ensure actual disconnections work
404-
// failureCountSinceLastSuccess--;
405-
// logger.info(() -> new ParameterizedMessage("{} cache entry not found, but node is still in cluster state. ignoring this failure", FollowerChecker.this), exp);
406-
// scheduleNextWakeUp();
407-
// return;
408-
// }
399+
// if (exp instanceof NodeNotConnectedException || exp.getCause() instanceof NodeNotConnectedException){
400+
// // NodeNotConnectedException will only happen if getConnection fails in TransportService.sendRequest
401+
// // This only happens if clusterConnectionManager.getConnection() does not find the entry in connectedNodes list
402+
// // This happens on node disconnection
403+
// // Need to validate that this only gets triggered from node-left side. we want to ensure actual disconnections
404+
// work
405+
// failureCountSinceLastSuccess--;
406+
// logger.info(() -> new ParameterizedMessage("{} cache entry not found, but node is still in cluster state.
407+
// ignoring this failure", FollowerChecker.this), exp);
408+
// scheduleNextWakeUp();
409+
// return;
410+
// }
409411

410412
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
411413
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);

server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.OpenSearchException;
3939
import org.opensearch.cluster.ClusterManagerMetrics;
40-
import org.opensearch.cluster.ClusterState;
4140
import org.opensearch.cluster.node.DiscoveryNode;
4241
import org.opensearch.cluster.node.DiscoveryNodes;
4342
import org.opensearch.common.Nullable;
@@ -70,7 +69,6 @@
7069
import java.util.concurrent.atomic.AtomicLong;
7170
import java.util.concurrent.atomic.AtomicReference;
7271
import java.util.function.Consumer;
73-
import java.util.function.Supplier;
7472

7573
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
7674

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,12 @@
5252
import org.opensearch.gateway.remote.ClusterMetadataManifest;
5353
import org.opensearch.gateway.remote.RemoteClusterStateService;
5454
import org.opensearch.threadpool.ThreadPool;
55-
import org.opensearch.transport.*;
55+
import org.opensearch.transport.BytesTransportRequest;
56+
import org.opensearch.transport.TransportChannel;
57+
import org.opensearch.transport.TransportException;
58+
import org.opensearch.transport.TransportRequestOptions;
59+
import org.opensearch.transport.TransportResponseHandler;
60+
import org.opensearch.transport.TransportService;
5661

5762
import java.io.IOException;
5863
import java.util.HashMap;
@@ -607,8 +612,7 @@ private void sendClusterState(
607612
if (retryWithFullClusterStateOnFailure && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
608613
logger.debug("resending full cluster state to node {} reason {}", destination, exp.getDetailedMessage());
609614
sendFullClusterState(destination, listener);
610-
}
611-
else {
615+
} else {
612616
logger.info(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
613617
listener.onFailure(exp);
614618
}

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@
6969
import org.opensearch.threadpool.Scheduler;
7070
import org.opensearch.threadpool.ThreadPool;
7171

72-
import java.util.*;
72+
import java.util.Arrays;
73+
import java.util.Collection;
74+
import java.util.List;
75+
import java.util.Map;
76+
import java.util.Objects;
77+
import java.util.Optional;
7378
import java.util.concurrent.ConcurrentHashMap;
7479
import java.util.concurrent.CopyOnWriteArrayList;
7580
import java.util.concurrent.CountDownLatch;
@@ -579,7 +584,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
579584
logger.info("apply cluster state with version {}", newClusterState.version());
580585
callClusterStateAppliers(clusterChangedEvent, stopWatch);
581586

582-
//nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
587+
// nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
583588
nodeConnectionsService.disconnectFromNonBlockedNodesExcept(newClusterState.nodes(), clusterChangedEvent.nodesDelta());
584589

585590
assert newClusterState.coordinationMetadata()

0 commit comments

Comments
 (0)