Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public interface FatalErrorReporter extends Thread.UncaughtExceptionHandler {

@FunctionalInterface
interface Interceptor {
void intercept(@NotNull String message, @NotNull Throwable throwable);
/**
* Report a fatal error.
*
* @param message the message
* @param throwable the throwable
* @param isFromUncaught true iff called from
* {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)}.
*/
void intercept(@NotNull String message, @NotNull Throwable throwable, boolean isFromUncaught);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private final class FatalException extends RuntimeException {

@Override
public final void report(@NotNull final String message, @NotNull final Throwable throwable) {
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable));
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable, false));
reportImpl(message, throwable, false);
}

Expand All @@ -56,7 +56,7 @@ public final void reportAsync(@NotNull final String message) {
@Override
public final void uncaughtException(@NotNull final Thread thread, @NotNull final Throwable throwable) {
final String message = "Uncaught exception in thread " + thread.getName();
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable));
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable, true));
reportImpl(message, throwable, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.deephaven.db.util.VariableProvider;
import io.deephaven.db.v2.DynamicNode;
import io.deephaven.figures.FigureWidgetTranslator;
import io.deephaven.grpc_api.session.SessionCloseableObserver;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.grpc_api.session.SessionState;
import io.deephaven.grpc_api.session.SessionState.ExportBuilder;
Expand All @@ -33,13 +34,11 @@
import io.deephaven.lang.shared.lsp.CompletionCancelled;
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.backplane.script.grpc.*;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
import java.io.Closeable;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -402,40 +401,16 @@ public void fetchFigure(FetchFigureRequest request, StreamObserver<FetchFigureRe
});
}

private class LogBufferStreamAdapter implements Closeable, LogBufferRecordListener {
private final SessionState session;
private static class LogBufferStreamAdapter extends SessionCloseableObserver<LogSubscriptionData>
implements LogBufferRecordListener {
private final LogSubscriptionRequest request;
private final StreamObserver<LogSubscriptionData> responseObserver;
private boolean isClosed = false;

public LogBufferStreamAdapter(
final SessionState session,
final LogSubscriptionRequest request,
final StreamObserver<LogSubscriptionData> responseObserver) {
this.session = session;
super(session, responseObserver);
this.request = request;
this.responseObserver = responseObserver;
session.addOnCloseCallback(this);
((ServerCallStreamObserver<LogSubscriptionData>) responseObserver).setOnCancelHandler(this::tryClose);
}

@Override
public void close() {
synchronized (this) {
if (isClosed) {
return;
}
isClosed = true;
}

safelyExecute(() -> logBuffer.unsubscribe(this));
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
}

private void tryClose() {
if (session.removeOnCloseCallback(this)) {
close();
}
}

@Override
Expand Down Expand Up @@ -465,9 +440,9 @@ public void record(LogBufferRecord record) {
synchronized (responseObserver) {
responseObserver.onNext(payload);
}
} catch (Throwable t) {
} catch (Throwable ignored) {
// we are ignoring exceptions here deliberately, and just shutting down
tryClose();
close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void startMain(PrintStream out, PrintStream err)

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::closeAllSessions);
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.deephaven.grpc_api.session;

import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.io.Closeable;

import static io.deephaven.grpc_api.util.GrpcUtil.safelyExecuteLocked;

public abstract class SessionCloseableObserver<T> implements Closeable {
protected final SessionState session;
protected final StreamObserver<T> responseObserver;
private boolean isClosed = false;

public SessionCloseableObserver(
final SessionState session,
final StreamObserver<T> responseObserver) {
this.session = session;
this.responseObserver = responseObserver;
session.addOnCloseCallback(this);
((ServerCallStreamObserver<T>) responseObserver).setOnCancelHandler(this::close);
}

@Override
public final void close() {
session.removeOnCloseCallback(this);

synchronized (this) {
if (isClosed) {
return;
}
isClosed = true;
}

onClose();
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
}

/**
* Override this to perform any additional specific clean up that must be performed.
*/
void onClose() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,44 @@

import com.github.f4b6a3.uuid.UuidCreator;
import com.google.protobuf.ByteString;
import io.deephaven.configuration.Configuration;
import io.deephaven.db.tables.utils.DBDateTime;
import io.deephaven.db.tables.utils.DBTimeUtils;
import io.deephaven.util.auth.AuthContext;
import io.deephaven.grpc_api.util.GrpcUtil;
import io.deephaven.grpc_api.util.Scheduler;
import io.deephaven.proto.backplane.grpc.TerminationNotificationResponse;
import io.deephaven.util.auth.AuthContext;
import io.deephaven.util.process.ProcessEnvironment;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

@Singleton
public class SessionService {

static final long MIN_COOKIE_EXPIRE_MS = 10_000; // 10 seconds
private static final int MAX_STACK_TRACE_CAUSAL_DEPTH =
Configuration.getInstance().getIntegerForClassWithDefault(SessionService.class,
"maxStackTraceCausedByDepth", 20);
private static final int MAX_STACK_TRACE_DEPTH =
Configuration.getInstance().getIntegerForClassWithDefault(SessionService.class,
"maxStackTraceDepth", 50);

private final Scheduler scheduler;
private final SessionState.Factory sessionFactory;
Expand All @@ -36,6 +52,8 @@ public class SessionService {
private boolean cleanupJobInstalled = false;
private final SessionCleanupJob sessionCleanupJob = new SessionCleanupJob();

private final List<TerminationNotificationListener> terminationListeners = new CopyOnWriteArrayList<>();

@Inject()
public SessionService(final Scheduler scheduler, final SessionState.Factory sessionFactory,
@Named("session.tokenExpireMs") final long tokenExpireMs) {
Expand All @@ -51,6 +69,65 @@ public SessionService(final Scheduler scheduler, final SessionState.Factory sess

// Protect ourselves from rotation spam, but be loose enough that any reasonable refresh strategy works.
this.tokenRotateMs = tokenExpireMs / 5;

if (ProcessEnvironment.tryGet() != null) {
ProcessEnvironment.getGlobalFatalErrorReporter().addInterceptor(this::onFatalError);
}
}

private synchronized void onFatalError(
@NotNull String message,
@NotNull Throwable throwable,
boolean isFromUncaught) {
final TerminationNotificationResponse.Builder builder =
TerminationNotificationResponse.newBuilder()
.setAbnormalTermination(true)
.setIsFromUncaughtException(isFromUncaught)
.setReason(message);

// TODO (core#801): revisit this error communication to properly match the API Error mode
for (int depth = 0; throwable != null && depth < MAX_STACK_TRACE_CAUSAL_DEPTH; ++depth) {
builder.addStackTraces(transformToProtoBuf(throwable));
throwable = throwable.getCause();
}

final TerminationNotificationResponse notification = builder.build();
terminationListeners.forEach(listener -> listener.sendMessage(notification));
terminationListeners.clear();
}

private static TerminationNotificationResponse.StackTrace transformToProtoBuf(@NotNull final Throwable throwable) {
return TerminationNotificationResponse.StackTrace.newBuilder()
.setType(throwable.getClass().getName())
.setMessage(Objects.toString(throwable.getMessage()))
.addAllElements(Arrays.stream(throwable.getStackTrace())
.limit(MAX_STACK_TRACE_DEPTH)
.map(StackTraceElement::toString)
.collect(Collectors.toList()))
.build();
}

public synchronized void onShutdown() {
final TerminationNotificationResponse notification = TerminationNotificationResponse.newBuilder()
.setAbnormalTermination(false)
.build();
terminationListeners.forEach(listener -> listener.sendMessage(notification));
terminationListeners.clear();

closeAllSessions();
}

/**
* Add a listener who receives a single notification when this process is exiting and yet able to communicate with
* the observer.
*
* @param session the session the observer belongs to
* @param responseObserver the observer to notify
*/
public void addTerminationListener(
final SessionState session,
final StreamObserver<TerminationNotificationResponse> responseObserver) {
terminationListeners.add(new TerminationNotificationListener(session, responseObserver));
}

/**
Expand Down Expand Up @@ -168,7 +245,7 @@ public SessionState getOptionalSession() {

/**
* Reduces the liveness of the session.
*
*
* @param session the session to close
*/
public void closeSession(final SessionState session) {
Expand Down Expand Up @@ -239,4 +316,25 @@ public void run() {
}
}
}

private final class TerminationNotificationListener
extends SessionCloseableObserver<TerminationNotificationResponse> {
public TerminationNotificationListener(
final SessionState session,
final StreamObserver<TerminationNotificationResponse> responseObserver) {
super(session, responseObserver);
}

@Override
void onClose() {
terminationListeners.remove(this);
}

void sendMessage(final TerminationNotificationResponse response) {
GrpcUtil.safelyExecuteLocked(responseObserver, () -> {
responseObserver.onNext(response);
responseObserver.onCompleted();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ public void exportNotifications(final ExportNotificationRequest request,
});
}

@Override
public void terminationNotification(TerminationNotificationRequest request,
StreamObserver<TerminationNotificationResponse> responseObserver) {
GrpcUtil.rpcWrapper(log, responseObserver, () -> {
final SessionState session = service.getCurrentSession();
service.addTerminationListener(session, responseObserver);
});
}

@Singleton
public static class AuthServerInterceptor implements ServerInterceptor {
private final SessionService service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ service SessionService {
* updated exports. An export id of zero will be sent to indicate all pre-existing exports have been sent.
*/
rpc ExportNotifications(ExportNotificationRequest) returns (stream ExportNotification) {}

/*
* Receive a best-effort message on-exit indicating why this server is exiting. Reception of this message cannot be
* guaranteed.
*/
rpc TerminationNotification(TerminationNotificationRequest) returns (TerminationNotificationResponse) {}
}

/*
Expand Down Expand Up @@ -172,3 +178,24 @@ message ExportNotification {
*/
string dependent_handle = 4;
}

message TerminationNotificationRequest {
// Intentionally empty and is here for backwards compatibility should this API change.
}

message TerminationNotificationResponse {
// whether or not this termination is expected
bool abnormal_termination = 1;
// if additional information is available then provide it in this field
string reason = 2;
// if this is due to an exception, whether or not it was uncaught
bool is_from_uncaught_exception = 3;
// if applicable, the list of stack traces in reverse causal order
repeated StackTrace stack_traces = 4;

message StackTrace {
string type = 1;
string message = 2;
repeated string elements = 3;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,18 @@ static AuthTokenPromiseSupplier oneShot(ConnectToken initialToken) {
private boolean connected;
private boolean closed;
private boolean hasDisconnected;
private boolean notifiedConnectionError = false;

public QueryConnectable(Supplier<Promise<ConnectToken>> authTokenPromiseSupplier) {
this.connection = JsLazy.of(() -> new WorkerConnection(this, authTokenPromiseSupplier));
}

public void notifyConnectionError(ResponseStreamWrapper.Status status) {
if (notifiedConnectionError) {
return;
}
notifiedConnectionError = true;

CustomEventInit event = CustomEventInit.create();
event.setDetail(JsPropertyMap.of(
"status", status.getCode(),
Expand Down
Loading