Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
Expand All @@ -30,35 +31,36 @@ final class DeadHostState implements Comparable<DeadHostState> {

private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
static final Supplier<Long> DEFAULT_TIME_SUPPLIER = System::nanoTime;

private final int failedAttempts;
private final long deadUntilNanos;
private final TimeSupplier timeSupplier;
private final Supplier<Long> timeSupplier;

/**
* Build the initial dead state of a host. Useful when a working host stops functioning
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
*
* @param timeSupplier a way to supply the current time and allow for unit testing
*/
DeadHostState(TimeSupplier timeSupplier) {
DeadHostState(Supplier<Long> timeSupplier) {
this.failedAttempts = 1;
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}

/**
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
* through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
*
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
*/
DeadHostState(DeadHostState previousDeadHostState) {
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS);
this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = previousDeadHostState.timeSupplier;
}
Expand All @@ -69,7 +71,7 @@ final class DeadHostState implements Comparable<DeadHostState> {
* @return true if the host should be retried, false otherwise
*/
boolean shallBeRetried() {
return timeSupplier.nanoTime() - deadUntilNanos > 0;
return timeSupplier.get() - deadUntilNanos > 0;
}

/**
Expand All @@ -87,8 +89,8 @@ int getFailedAttempts() {
@Override
public int compareTo(DeadHostState other) {
if (timeSupplier != other.timeSupplier) {
throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
+ timeSupplier + " != " + other.timeSupplier + "]");
throw new IllegalArgumentException("can't compare DeadHostStates holding different time suppliers as they may " +
"be based on different clocks");
}
return Long.compare(deadUntilNanos, other.deadUntilNanos);
}
Expand All @@ -101,23 +103,4 @@ public String toString() {
", timeSupplier=" + timeSupplier +
'}';
}

/**
* Time supplier that makes timing aspects pluggable to ease testing
*/
interface TimeSupplier {
TimeSupplier DEFAULT = new TimeSupplier() {
@Override
public long nanoTime() {
return System.nanoTime();
}

@Override
public String toString() {
return "nanoTime";
}
};

long nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public String getEndpoint() {
*/
public void addParameter(String name, String value) {
Objects.requireNonNull(name, "url parameter name cannot be null");
// .putIfAbsent(name, value) except we are in Java 7 which doesn't have that.
if (parameters.containsKey(name)) {
throw new IllegalArgumentException("url parameter [" + name + "] has already been set to [" + parameters.get(name) + "]");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.client;

import org.apache.http.message.BasicHeader;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;

Expand All @@ -38,7 +38,7 @@ public final class RequestOptions {
* Default request options.
*/
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
Collections.emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();

private final List<Header> headers;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
Expand Down
38 changes: 11 additions & 27 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.DeadHostState.TimeSupplier;

import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
Expand All @@ -72,6 +71,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;

Expand Down Expand Up @@ -139,7 +139,11 @@ public static RestClientBuilder builder(Node... nodes) {
* @see Node#Node(HttpHost)
*/
public static RestClientBuilder builder(HttpHost... hosts) {
return new RestClientBuilder(hostsToNodes(hosts));
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList());
return new RestClientBuilder(nodes);
}

/**
Expand All @@ -163,17 +167,6 @@ public synchronized void setNodes(Collection<Node> nodes) {
this.blacklist.clear();
}

private static List<Node> hostsToNodes(HttpHost[] hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = new ArrayList<>(hosts.length);
for (HttpHost host : hosts) {
nodes.add(new Node(host));
}
return nodes;
}

/**
* Get the list of nodes that the client knows about. The list is
* unmodifiable.
Expand Down Expand Up @@ -369,15 +362,11 @@ static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost,
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
for (Node node : nodeTuple.nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
if (deadness == null) {
livingNodes.add(node);
continue;
}
if (deadness.shallBeRetried()) {
if (deadness == null || deadness.shallBeRetried()) {
livingNodes.add(node);
continue;
} else {
deadNodes.add(new DeadNode(node, deadness));
}
deadNodes.add(new DeadNode(node, deadness));
}

if (false == livingNodes.isEmpty()) {
Expand Down Expand Up @@ -415,12 +404,7 @@ static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost,
* to compare many things. This saves us a sort on the unfiltered
* list.
*/
nodeSelector.select(new Iterable<Node>() {
@Override
public Iterator<Node> iterator() {
return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
}
});
nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()));
if (false == selectedDeadNodes.isEmpty()) {
return singletonList(Collections.min(selectedDeadNodes).node);
}
Expand All @@ -447,7 +431,7 @@ private void onResponse(Node node) {
private void onFailure(Node node) {
while(true) {
DeadHostState previousDeadHostState =
blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER));
if (previousDeadHostState == null) {
if (logger.isDebugEnabled()) {
logger.debug("added [" + node + "] to blacklist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,8 @@ public RestClient build() {
if (failureListener == null) {
failureListener = new RestClient.FailureListener();
}
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
Expand All @@ -218,12 +214,7 @@ private CloseableHttpAsyncClient createHttpClient() {
}

final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return finalBuilder.build();
}
});
return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("could not create the default ssl context", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.client.DeadHostState.TimeSupplier;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand All @@ -38,14 +36,14 @@ public class DeadHostStateTests extends RestClientTestCase {
private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};

public void testInitialDeadHostStateDefaultTimeSupplier() {
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
DeadHostState deadHostState = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
long currentTime = System.nanoTime();
assertThat(deadHostState.getDeadUntilNanos(), greaterThanOrEqualTo(currentTime));
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
}

public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
DeadHostState previous = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous);
Expand All @@ -58,10 +56,13 @@ public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
public void testCompareToTimeSupplier() {
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
final AtomicLong time = new AtomicLong(0);
for (int i = 0; i < numObjects; i++) {
if (i == 0) {
// this test requires a strictly increasing timer
deadHostStates[i] = new DeadHostState(new StrictMonotonicTimeSupplier());
// this test requires a strictly increasing timer. This ensures that even if we call this time supplier in a very tight
// loop we always notice time moving forward. This does not happen for real timer implementations
// (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
deadHostStates[i] = new DeadHostState(time::incrementAndGet);
} else {
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1]);
}
Expand All @@ -74,42 +75,39 @@ public void testCompareToTimeSupplier() {

public void testCompareToDifferingTimeSupplier() {
try {
new DeadHostState(TimeSupplier.DEFAULT).compareTo(
new DeadHostState(new ConfigurableTimeSupplier()));
new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER).compareTo(
new DeadHostState(() -> 0L));
fail("expected failure");
} catch (IllegalArgumentException e) {
assertEquals("can't compare DeadHostStates with different clocks [nanoTime != configured[0]]",
e.getMessage());
assertEquals("can't compare DeadHostStates holding different time suppliers as they may " +
"be based on different clocks", e.getMessage());
}
}

public void testShallBeRetried() {
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
final AtomicLong time = new AtomicLong(0);
DeadHostState deadHostState = null;
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
timeSupplier.nanoTime = 0;
if (i == 0) {
deadHostState = new DeadHostState(timeSupplier);
deadHostState = new DeadHostState(time::get);
} else {
deadHostState = new DeadHostState(deadHostState);
}
for (int j = 0; j < expectedTimeoutSecond; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
assertThat(deadHostState.shallBeRetried(), is(false));
}
int iters = randomIntBetween(5, 30);
for (int j = 0; j < iters; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
assertThat(deadHostState.shallBeRetried(), is(true));
}
}
}

public void testDeadHostStateTimeouts() {
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
zeroTimeSupplier.nanoTime = 0L;
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
DeadHostState previous = new DeadHostState(() -> 0L);
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
previous = new DeadHostState(previous);
Expand All @@ -123,37 +121,4 @@ public void testDeadHostStateTimeouts() {
previous = deadHostState;
}
}

static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
long nanoTime;

@Override
public long nanoTime() {
return nanoTime;
}

@Override
public String toString() {
return "configured[" + nanoTime + "]";
}
}

/**
* Simulates a monotonically strict increasing time (i.e. the value increases on every call to <code>#nanoTime()</code>). This ensures
* that even if we call this time supplier in a very tight loop we always notice time moving forward. This does not happen for real
* timer implementations (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
*/
static class StrictMonotonicTimeSupplier implements DeadHostState.TimeSupplier {
private final AtomicLong time = new AtomicLong(0);

@Override
public long nanoTime() {
return time.incrementAndGet();
}

@Override
public String toString() {
return "strict monotonic[" + time.get() + "]";
}
}
}
Loading