Skip to content

While client closing, the election listener throws an exception #1409

@sefamertkaya

Description

@sefamertkaya

Versions

  • etcd: 3.5.16
  • jetcd: 0.8.3
  • java: 21

Describe the bug

While etcd client is closing, the onError method of the listener that I registered to the observer in the election class is triggered. This problem only occurs when client closing. When I put a breakpoint, I saw that it was happening while the ConnectionManager was closing.

To Reproduce

Client:

this.client = Client.builder()
    .connectTimeout(Duration.ofSeconds(this.sessionTimeout))
    .endpoints(this.endpoints.toArray(new String[]{}))
    .build();

Observer:

this.election.observe(this.electionByteSequence, new ElectionListener());

ElectionListener

private static class ElectionListener implements Listener {

  @Override
  public void onNext(LeaderResponse response) {
    System.out.println("Leader lease id:" + response.getKv().getLease());
  }

  @Override
  public void onError(Throwable throwable) {
    throwable.printStackTrace();
  }

  @Override
  public void onCompleted() {
    System.out.println("Completed");
  }
}

Close

public void disconnect() {
  this.leaseKeepAliveClient.close();
  this.lease.revoke(this.leaseId);
  this.client.close();
}

Exception

io.etcd.jetcd.common.exception.EtcdException: Channel shutdownNow invoked
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException(EtcdExceptionFactory.java:35)
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.fromStatus(EtcdExceptionFactory.java:83)
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException(EtcdExceptionFactory.java:79)
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException(EtcdExceptionFactory.java:74)
	at io.etcd.jetcd.impl.ElectionImpl.lambda$observe$3(ElectionImpl.java:119)
	at io.vertx.grpc.stub.StreamObserverReadStream.onError(StreamObserverReadStream.java:44)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:305)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:276)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:258)
	at io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:305)
	at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
	at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:750)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:690)
	at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:840)
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94)
	at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:126)
	at io.grpc.internal.RetriableStream.safeCloseMasterListener(RetriableStream.java:835)
	at io.grpc.internal.RetriableStream.access$2200(RetriableStream.java:55)
	at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:1028)
	at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34)
	at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:691)
	at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:458)
	at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221)
	at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:441)
	at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278)
	at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31)
	at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
	at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:199)
	at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:444)
	at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:400)
	at io.grpc.netty.NettyClientHandler$6.visit(NettyClientHandler.java:791)
	at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.forEachActiveStream(DefaultHttp2Connection.java:1007)
	at io.netty.handler.codec.http2.DefaultHttp2Connection.forEachActiveStream(DefaultHttp2Connection.java:209)
	at io.grpc.netty.NettyClientHandler.forcefulClose(NettyClientHandler.java:782)
	at io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:347)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:895)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1010)
	at io.netty.channel.AbstractChannel.write(AbstractChannel.java:296)
	at io.grpc.netty.NettyClientTransport$6.run(NettyClientTransport.java:341)
	at io.grpc.netty.WriteQueue$RunnableCommand.run(WriteQueue.java:176)
	at io.grpc.netty.WriteQueue.flush(WriteQueue.java:128)
	at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:35)
	at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:47)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Expected behavior

I think the oncomplete method should be triggered when closing. Maybe I did something wrong. Is there a different order for closing?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions