Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.google.common.annotations.VisibleForTesting;

Expand All @@ -38,8 +39,13 @@
*/
public abstract class AbstractEndpointSelector implements EndpointSelector {

private static final AtomicIntegerFieldUpdater<AbstractEndpointSelector> initializedUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractEndpointSelector.class, "initialized");

private final EndpointGroup endpointGroup;
private final EndpointAsyncSelector asyncSelector;
// 0 - not initialized, 1 - initialized
private volatile int initialized;

/**
* Creates a new instance that selects an {@link Endpoint} from the specified {@link EndpointGroup}.
Expand All @@ -59,18 +65,48 @@ protected final EndpointGroup group() {
/**
* Initialize this {@link EndpointSelector} to listen to the new endpoints emitted by the
* {@link EndpointGroup}. The new endpoints will be passed to {@link #updateNewEndpoints(List)}.
*
* <p>This method is called automatically when the first selection is made. However, if you want to
* start listening to the {@link EndpointGroup} earlier, you can call this method manually.
*/
@UnstableApi
protected final void initialize() {
endpointGroup.addListener(this::refreshEndpoints, true);
tryInitialize();
}

private void tryInitialize() {
if (initialized == 1) {
return;
}

if (initializedUpdater.compareAndSet(this, 0, 1)) {
endpointGroup.addListener(this::refreshEndpoints, true);
}
}

@VisibleForTesting
boolean isInitialized() {
return initialized == 1;
}

@Override
public CompletableFuture<Endpoint> select(ClientRequestContext ctx, ScheduledExecutorService executor,
long timeoutMillis) {
tryInitialize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question) For my understanding, it seems reasonable to assume that pretty much all cases of extending AbstractEndpointSelector will want to listen to the endpointGroup.
Why isn't tryInitialize() called from the constructor? Was there a race condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. tryInitialize() may invoke updateNewEndpoints synchronously if the endpointGroup has been updated with the initial endpoints. Since updateNewEndpoints can access member fields of subclasses, calling tryInitialize() in the constructor of AbstractEndpointSelector may result in updateNewEndpoints being invoked before those fields are initialized.

DefaultEndpointSelector(EndpointGroup endpointGroup,
LoadBalancerFactory<T> loadBalancerFactory) {
super(endpointGroup);
this.loadBalancerFactory = loadBalancerFactory;

protected void updateNewEndpoints(List<Endpoint> endpoints) {
lock.lock();
try {
if (closed) {
return;
}
loadBalancer = loadBalancerFactory.newLoadBalancer(loadBalancer, endpoints);

return asyncSelector.select(ctx, executor, endpointGroup.selectionTimeoutMillis());
}

@Override
public final @Nullable Endpoint selectNow(ClientRequestContext ctx) {
tryInitialize();
return doSelectNow(ctx);
}

/**
* Selects an {@link Endpoint} from this {@link EndpointGroup} immediately.
*/
@Nullable
protected abstract Endpoint doSelectNow(ClientRequestContext ctx);

private void refreshEndpoints(List<Endpoint> endpoints) {
// Allow subclasses to update the endpoints first.
updateNewEndpoints(endpoints);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ final class DefaultEndpointSelector<T extends LoadBalancer<Endpoint, ClientReque
}
});
}
initialize();
}

@Override
Expand All @@ -70,7 +69,7 @@ protected void updateNewEndpoints(List<Endpoint> endpoints) {

@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
public Endpoint doSelectNow(ClientRequestContext ctx) {
final T loadBalancer = this.loadBalancer;
if (loadBalancer == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ class TestEndpointSelector extends AbstractEndpointSelector {

protected TestEndpointSelector(EndpointGroup endpointGroup) {
super(endpointGroup);
initialize();
}

@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
public Endpoint doSelectNow(ClientRequestContext ctx) {
if (counter.getAndIncrement() >= failAfter) {
throw cause;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ void immediateSelection() {
final Endpoint endpoint = Endpoint.of("foo");
final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
final AbstractEndpointSelector endpointSelector = newSelector(endpoint);
assertThat(endpointSelector.isInitialized()).isFalse();
assertThat(endpointSelector.select(ctx, ctx.eventLoop(), Long.MAX_VALUE))
.isCompletedWithValue(endpoint);
// The AbstractEndpointSelector should be initialized when the first selection is made.
assertThat(endpointSelector.isInitialized()).isTrue();
assertThat(endpointSelector.pendingFutures()).isEmpty();
}

Expand All @@ -56,8 +59,11 @@ void delayedSelection() {
final DynamicEndpointGroup group = new DynamicEndpointGroup();
final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
final AbstractEndpointSelector endpointSelector = newSelector(group);
assertThat(endpointSelector.isInitialized()).isFalse();
final CompletableFuture<Endpoint> future = endpointSelector.select(ctx, ctx.eventLoop(),
Long.MAX_VALUE);
// The AbstractEndpointSelector should be initialized when the first selection is made.
assertThat(endpointSelector.isInitialized()).isTrue();
assertThat(future).isNotDone();

final Endpoint endpoint = Endpoint.of("foo");
Expand Down Expand Up @@ -128,16 +134,14 @@ void testRampingUpInitialSelection() {
}

private static AbstractEndpointSelector newSelector(EndpointGroup endpointGroup) {
final AbstractEndpointSelector selector = new AbstractEndpointSelector(endpointGroup) {
return new AbstractEndpointSelector(endpointGroup) {

@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
public Endpoint doSelectNow(ClientRequestContext ctx) {
final List<Endpoint> endpoints = endpointGroup.endpoints();
return endpoints.isEmpty() ? null : endpoints.get(0);
}
};
selector.initialize();
return selector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,11 @@ private final class XdsEndpointGroupSelector extends AbstractEndpointSelector {

XdsEndpointGroupSelector(EndpointGroup endpointGroup) {
super(endpointGroup);
initialize();
}

@Override
@Nullable
public Endpoint selectNow(ClientRequestContext ctx) {
public Endpoint doSelectNow(ClientRequestContext ctx) {
final XdsLoadBalancer loadBalancer = XdsEndpointGroup.this.loadBalancer;
if (loadBalancer == null) {
return null;
Expand Down
Loading