Skip to content

Commit caeb348

Browse files
committed
Introduce SessionService::TerminationNotification as best-effort attempt to notify on exit
1 parent 3e1257e commit caeb348

8 files changed

Lines changed: 173 additions & 35 deletions

File tree

FishUtil/src/main/java/io/deephaven/util/process/FatalErrorReporter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ public interface FatalErrorReporter extends Thread.UncaughtExceptionHandler {
2121

2222
@FunctionalInterface
2323
interface Interceptor {
24-
void intercept(@NotNull String message, @NotNull Throwable throwable);
24+
/**
25+
* Called when a fatal error is reported, before the reporter itself handles it.
26+
*
27+
* @param message the message
28+
* @param throwable the throwable
29+
* @param isFromUncaught true iff called from
30+
* {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)}.
31+
*/
32+
void intercept(@NotNull String message, @NotNull Throwable throwable, boolean isFromUncaught);
2533
}
2634
}

FishUtil/src/main/java/io/deephaven/util/process/FatalErrorReporterBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private final class FatalException extends RuntimeException {
3333

3434
@Override
3535
public final void report(@NotNull final String message, @NotNull final Throwable throwable) {
36-
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable));
36+
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable, false));
3737
reportImpl(message, throwable, false);
3838
}
3939

@@ -56,7 +56,7 @@ public final void reportAsync(@NotNull final String message) {
5656
@Override
5757
public final void uncaughtException(@NotNull final Thread thread, @NotNull final Throwable throwable) {
5858
final String message = "Uncaught exception in thread " + thread.getName();
59-
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable));
59+
interceptors.forEach(interceptor -> interceptor.intercept(message, throwable, true));
6060
reportImpl(message, throwable, true);
6161
}
6262

grpc-api/src/main/java/io/deephaven/grpc_api/console/ConsoleServiceGrpcImpl.java

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.deephaven.db.util.VariableProvider;
1616
import io.deephaven.db.v2.DynamicNode;
1717
import io.deephaven.figures.FigureWidgetTranslator;
18+
import io.deephaven.grpc_api.session.SessionCloseableObserver;
1819
import io.deephaven.grpc_api.session.SessionService;
1920
import io.deephaven.grpc_api.session.SessionState;
2021
import io.deephaven.grpc_api.session.SessionState.ExportBuilder;
@@ -402,40 +403,16 @@ public void fetchFigure(FetchFigureRequest request, StreamObserver<FetchFigureRe
402403
});
403404
}
404405

405-
private class LogBufferStreamAdapter implements Closeable, LogBufferRecordListener {
406-
private final SessionState session;
406+
private static class LogBufferStreamAdapter extends SessionCloseableObserver<LogSubscriptionData>
407+
implements LogBufferRecordListener {
407408
private final LogSubscriptionRequest request;
408-
private final StreamObserver<LogSubscriptionData> responseObserver;
409-
private boolean isClosed = false;
410409

411410
public LogBufferStreamAdapter(
412411
final SessionState session,
413412
final LogSubscriptionRequest request,
414413
final StreamObserver<LogSubscriptionData> responseObserver) {
415-
this.session = session;
414+
super(session, responseObserver);
416415
this.request = request;
417-
this.responseObserver = responseObserver;
418-
session.addOnCloseCallback(this);
419-
((ServerCallStreamObserver<LogSubscriptionData>) responseObserver).setOnCancelHandler(this::tryClose);
420-
}
421-
422-
@Override
423-
public void close() {
424-
synchronized (this) {
425-
if (isClosed) {
426-
return;
427-
}
428-
isClosed = true;
429-
}
430-
431-
safelyExecute(() -> logBuffer.unsubscribe(this));
432-
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
433-
}
434-
435-
private void tryClose() {
436-
if (session.removeOnCloseCallback(this)) {
437-
close();
438-
}
439416
}
440417

441418
@Override
@@ -465,9 +442,9 @@ public void record(LogBufferRecord record) {
465442
synchronized (responseObserver) {
466443
responseObserver.onNext(payload);
467444
}
468-
} catch (Throwable t) {
445+
} catch (Throwable ignored) {
469446
// we are ignoring exceptions here deliberately, and just shutting down
470-
tryClose();
447+
close();
471448
}
472449
}
473450
}

grpc-api/src/main/java/io/deephaven/grpc_api/runner/DeephavenApiServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public static void startMain(PrintStream out, PrintStream err)
8080

8181
// Notify termination listeners, then close outstanding sessions to give any gRPCs closure.
8282
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
83-
sessionService::closeAllSessions);
83+
sessionService::onShutdown);
8484

8585
// Finally wait for gRPC to exit now.
8686
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.deephaven.grpc_api.session;
2+
3+
import io.grpc.stub.ServerCallStreamObserver;
4+
import io.grpc.stub.StreamObserver;
5+
6+
import java.io.Closeable;
7+
8+
import static io.deephaven.grpc_api.util.GrpcUtil.safelyExecuteLocked;
9+
10+
public abstract class SessionCloseableObserver<T> implements Closeable {
11+
protected final SessionState session;
12+
protected final StreamObserver<T> responseObserver;
13+
private boolean isClosed = false;
14+
15+
public SessionCloseableObserver(
16+
final SessionState session,
17+
final StreamObserver<T> responseObserver) {
18+
this.session = session;
19+
this.responseObserver = responseObserver;
20+
session.addOnCloseCallback(this);
21+
((ServerCallStreamObserver<T>) responseObserver).setOnCancelHandler(this::close);
22+
}
23+
24+
@Override
25+
public final void close() {
26+
session.removeOnCloseCallback(this);
27+
28+
synchronized (this) {
29+
if (isClosed) {
30+
return;
31+
}
32+
isClosed = true;
33+
}
34+
35+
onClose();
36+
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
37+
}
38+
39+
/**
40+
* Override this to perform any additional, implementation-specific cleanup when this observer is closed.
41+
*/
42+
void onClose() {
43+
44+
}
45+
}

grpc-api/src/main/java/io/deephaven/grpc_api/session/SessionService.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,27 @@
44
import com.google.protobuf.ByteString;
55
import io.deephaven.db.tables.utils.DBDateTime;
66
import io.deephaven.db.tables.utils.DBTimeUtils;
7+
import io.deephaven.grpc_api.util.GrpcUtil;
8+
import io.deephaven.proto.backplane.grpc.TerminationNotificationResponse;
79
import io.deephaven.util.auth.AuthContext;
810
import io.deephaven.grpc_api.util.Scheduler;
11+
import io.deephaven.util.process.ProcessEnvironment;
912
import io.grpc.Status;
1013
import io.grpc.StatusRuntimeException;
14+
import io.grpc.stub.StreamObserver;
1115
import org.jetbrains.annotations.NotNull;
1216
import org.jetbrains.annotations.Nullable;
1317

1418
import javax.inject.Inject;
1519
import javax.inject.Named;
1620
import javax.inject.Singleton;
1721
import java.util.Deque;
22+
import java.util.List;
1823
import java.util.Map;
1924
import java.util.UUID;
2025
import java.util.concurrent.ConcurrentHashMap;
2126
import java.util.concurrent.ConcurrentLinkedDeque;
27+
import java.util.concurrent.CopyOnWriteArrayList;
2228

2329
@Singleton
2430
public class SessionService {
@@ -36,6 +42,8 @@ public class SessionService {
3642
private boolean cleanupJobInstalled = false;
3743
private final SessionCleanupJob sessionCleanupJob = new SessionCleanupJob();
3844

45+
private final List<TerminationNotificationListener> terminationListeners = new CopyOnWriteArrayList<>();
46+
3947
@Inject()
4048
public SessionService(final Scheduler scheduler, final SessionState.Factory sessionFactory,
4149
@Named("session.tokenExpireMs") final long tokenExpireMs) {
@@ -51,8 +59,53 @@ public SessionService(final Scheduler scheduler, final SessionState.Factory sess
5159

5260
// Protect ourselves from rotation spam, but be loose enough that any reasonable refresh strategy works.
5361
this.tokenRotateMs = tokenExpireMs / 5;
62+
63+
ProcessEnvironment.getGlobalFatalErrorReporter().addInterceptor(this::onFatalError);
64+
}
65+
66+
private synchronized void onFatalError(
67+
@NotNull String message,
68+
@NotNull Throwable throwable,
69+
boolean isFromUncaught) {
70+
final TerminationNotificationResponse.Builder builder =
71+
TerminationNotificationResponse.newBuilder()
72+
.setAbnormalTermination(true)
73+
.setReason(message);
74+
75+
builder.addStackTrace(throwable.getMessage());
76+
for (final StackTraceElement element : throwable.getStackTrace()) {
77+
builder.addStackTrace(element.toString());
78+
}
79+
80+
final TerminationNotificationResponse notification = builder.build();
81+
terminationListeners.forEach(listener -> listener.sendMessage(notification));
82+
terminationListeners.clear();
83+
}
84+
85+
public synchronized void onShutdown() {
86+
final TerminationNotificationResponse notification = TerminationNotificationResponse.newBuilder()
87+
.setAbnormalTermination(false)
88+
.build();
89+
terminationListeners.forEach(listener -> listener.sendMessage(notification));
90+
terminationListeners.clear();
91+
92+
closeAllSessions();
93+
}
94+
95+
/**
96+
* Add a listener that receives a single notification when this process is exiting, if it can still communicate with
97+
* the observer.
98+
*
99+
* @param session the session the observer belongs to
100+
* @param responseObserver the observer to notify
101+
*/
102+
public void addTerminationListener(
103+
final SessionState session,
104+
final StreamObserver<TerminationNotificationResponse> responseObserver) {
105+
terminationListeners.add(new TerminationNotificationListener(session, responseObserver));
54106
}
55107

108+
56109
/**
57110
* Create a new session object for the provided auth context.
58111
*
@@ -168,7 +221,7 @@ public SessionState getOptionalSession() {
168221

169222
/**
170223
* Reduces the liveness of the session.
171-
*
224+
*
172225
* @param session the session to close
173226
*/
174227
public void closeSession(final SessionState session) {
@@ -178,7 +231,7 @@ public void closeSession(final SessionState session) {
178231
session.onExpired();
179232
}
180233

181-
public void closeAllSessions() {
234+
private void closeAllSessions() {
182235
for (final TokenExpiration token : outstandingCookies) {
183236
// close all exports/resources acquired by the session
184237
token.session.onExpired();
@@ -239,4 +292,25 @@ public void run() {
239292
}
240293
}
241294
}
295+
296+
private final class TerminationNotificationListener
297+
extends SessionCloseableObserver<TerminationNotificationResponse> {
298+
public TerminationNotificationListener(
299+
final SessionState session,
300+
final StreamObserver<TerminationNotificationResponse> responseObserver) {
301+
super(session, responseObserver);
302+
}
303+
304+
@Override
305+
void onClose() {
306+
terminationListeners.remove(this);
307+
}
308+
309+
void sendMessage(final TerminationNotificationResponse response) {
310+
GrpcUtil.safelyExecuteLocked(responseObserver, () -> {
311+
responseObserver.onNext(response);
312+
responseObserver.onCompleted();
313+
});
314+
}
315+
}
242316
}

grpc-api/src/main/java/io/deephaven/grpc_api/session/SessionServiceGrpcImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@
1313
import io.deephaven.io.logger.Logger;
1414
import io.deephaven.proto.backplane.grpc.*;
1515
import io.deephaven.util.auth.AuthContext;
16+
import io.deephaven.util.process.ProcessEnvironment;
1617
import io.grpc.*;
1718
import io.grpc.stub.ServerCallStreamObserver;
1819
import io.grpc.stub.StreamObserver;
1920

2021
import javax.inject.Inject;
2122
import javax.inject.Singleton;
23+
import java.util.List;
2224
import java.util.Optional;
2325
import java.util.UUID;
26+
import java.util.concurrent.CopyOnWriteArrayList;
2427

2528
public class SessionServiceGrpcImpl extends SessionServiceGrpc.SessionServiceImplBase {
2629
// TODO (#997): use flight AuthConstants
@@ -199,6 +202,15 @@ public void exportNotifications(final ExportNotificationRequest request,
199202
});
200203
}
201204

205+
@Override
206+
public void terminationNotification(TerminationNotificationRequest request,
207+
StreamObserver<TerminationNotificationResponse> responseObserver) {
208+
GrpcUtil.rpcWrapper(log, responseObserver, () -> {
209+
final SessionState session = service.getCurrentSession();
210+
service.addTerminationListener(session, responseObserver);
211+
});
212+
}
213+
202214
@Singleton
203215
public static class AuthServerInterceptor implements ServerInterceptor {
204216
private final SessionService service;
@@ -222,4 +234,5 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
222234
return Contexts.interceptCall(newContext, serverCall, metadata, serverCallHandler);
223235
}
224236
}
237+
225238
}

proto/proto-backplane-grpc/src/main/proto/deephaven/proto/session.proto

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ service SessionService {
6767
* updated exports. An export id of zero will be sent to indicate all pre-existing exports have been sent.
6868
*/
6969
rpc ExportNotifications(ExportNotificationRequest) returns (stream ExportNotification) {}
70+
71+
/*
72+
* Receive a best-effort message on-exit indicating why this worker is exiting. Reception of this message cannot be
73+
* guaranteed.
74+
*/
75+
rpc TerminationNotification(TerminationNotificationRequest) returns (TerminationNotificationResponse) {}
7076
}
7177

7278
/*
@@ -172,3 +178,18 @@ message ExportNotification {
172178
*/
173179
string dependent_handle = 4;
174180
}
181+
182+
message TerminationNotificationRequest {
183+
// Intentionally empty; exists for backwards compatibility should this API change.
184+
}
185+
186+
message TerminationNotificationResponse {
187+
// whether or not this termination is abnormal (unexpected, e.g. caused by a fatal error)
188+
bool abnormal_termination = 1;
189+
// if additional information is available then provide it in this field
190+
string reason = 2;
191+
// if this is due to an exception, whether or not it was uncaught
192+
bool is_from_uncaught_exception = 3;
193+
// if the termination is caused by an exception of any sort, provide the detailed stack trace here
194+
repeated string stack_trace = 4;
195+
}

0 commit comments

Comments
 (0)