Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions sdks/python/src/opik/runner/activate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Runner activation — called after the user's module loads to start the in-process loop."""

import atexit
import logging
import os
import signal
Expand All @@ -18,18 +19,23 @@

_started = False
_lock = threading.Lock()
_shutdown_by_signal = False


def install_signal_handlers(shutdown_event: threading.Event) -> None:
def install_signal_handlers(shutdown_event: threading.Event) -> bool:
def handler(signum: int, frame: object) -> None:
global _shutdown_by_signal
_shutdown_by_signal = True
LOGGER.info("Received signal %s, shutting down", signum)
shutdown_event.set()

try:
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
return True
except ValueError:
LOGGER.warning("Cannot install signal handlers outside main thread")
return False


def activate_runner() -> None:
Expand All @@ -44,12 +50,23 @@ def activate_runner() -> None:
_started = True

shutdown_event = threading.Event()
install_signal_handlers(shutdown_event)
if install_signal_handlers(shutdown_event):
atexit.register(_warn_if_no_blocking_call)

t = threading.Thread(target=_run, args=(shutdown_event,), daemon=True)
t.start()


def _warn_if_no_blocking_call() -> None:
if not _shutdown_by_signal:
console = Console(stderr=True)
console.print(
"\n[bold yellow]Warning:[/bold yellow] The process exited without blocking. "
"The runner needs the process to stay alive to process jobs.\n"
"Use a server framework like uvicorn or Flask to keep the process running.\n",
)


def _run(shutdown_event: threading.Event) -> None:
runner_id = os.environ.get("OPIK_RUNNER_ID", "")
project_name = os.environ.get("OPIK_PROJECT_NAME", "")
Expand Down
47 changes: 42 additions & 5 deletions sdks/python/tests/unit/runner/test_activate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@

import pytest

from opik.runner.activate import install_signal_handlers
import opik.runner.activate as activate_module


@pytest.fixture(autouse=True)
def restore_signal_handlers():
def restore_signal_handlers_and_flag():
prev_term = signal.getsignal(signal.SIGTERM)
prev_int = signal.getsignal(signal.SIGINT)
prev_flag = activate_module._shutdown_by_signal
yield
signal.signal(signal.SIGTERM, prev_term)
signal.signal(signal.SIGINT, prev_int)
activate_module._shutdown_by_signal = prev_flag


def test_install_signal_handlers__sets_shutdown_event_on_sigterm():
shutdown_event = threading.Event()
install_signal_handlers(shutdown_event)
activate_module.install_signal_handlers(shutdown_event)

signal.raise_signal(signal.SIGTERM)

Expand All @@ -26,7 +28,7 @@ def test_install_signal_handlers__sets_shutdown_event_on_sigterm():

def test_install_signal_handlers__sets_shutdown_event_on_sigint():
shutdown_event = threading.Event()
install_signal_handlers(shutdown_event)
activate_module.install_signal_handlers(shutdown_event)

signal.raise_signal(signal.SIGINT)

Expand All @@ -39,7 +41,7 @@ def test_install_signal_handlers__from_background_thread__does_not_raise():

def run():
try:
install_signal_handlers(shutdown_event)
activate_module.install_signal_handlers(shutdown_event)
except Exception as e:
errors.append(e)

Expand All @@ -49,3 +51,38 @@ def run():

assert not errors
assert not shutdown_event.is_set()


def test_signal_handler__sets_shutdown_by_signal_flag():
activate_module._shutdown_by_signal = False
shutdown_event = threading.Event()
activate_module.install_signal_handlers(shutdown_event)

signal.raise_signal(signal.SIGTERM)

assert activate_module._shutdown_by_signal is True


def test_signal_then_warn__full_flow_no_warning(capsys):
"""End-to-end: install handlers, receive signal, atexit callback stays silent."""
shutdown_event = threading.Event()
activate_module.install_signal_handlers(shutdown_event)

signal.raise_signal(signal.SIGTERM)

assert shutdown_event.is_set()
activate_module._warn_if_no_blocking_call()

captured = capsys.readouterr()
assert captured.err == ""


def test_no_signal_then_warn__full_flow_prints_warning(capsys):
"""End-to-end: install handlers, no signal received, atexit callback warns."""
shutdown_event = threading.Event()
activate_module.install_signal_handlers(shutdown_event)

activate_module._warn_if_no_blocking_call()

captured = capsys.readouterr()
assert "exited without blocking" in captured.err
12 changes: 12 additions & 0 deletions sdks/typescript/src/opik/runner/activate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { InProcessRunnerLoop } from "./InProcessRunnerLoop";
import { installPrefixedOutput } from "./prefixedOutput";

let _started = false;
let _shutdownBySignal = false;

export function activateRunner(): void {
if (process.env.OPIK_RUNNER_MODE !== "true") return;
Expand Down Expand Up @@ -80,13 +81,24 @@ async function _run(): Promise<void> {
loop.start();

const shutdownHandler = () => {
_shutdownBySignal = true;
logger.info("Received shutdown signal, stopping runner...");
loop.shutdown();
client.flush().catch(() => {}).finally(() => process.exit(0));
};

process.once("SIGTERM", shutdownHandler);
process.once("SIGINT", shutdownHandler);

process.on("exit", () => {
if (!_shutdownBySignal) {
console.error(
"\nWarning: The process exited without blocking. " +
"The runner needs the process to stay alive to process jobs.\n" +
"Use a server framework like express or fastify to keep the process running.\n"
);
}
});
}

function printBanner(runnerId: string, projectName: string): void {
Expand Down
81 changes: 81 additions & 0 deletions sdks/typescript/tests/unit/runner/activate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ describe("activateRunner", () => {

afterEach(() => {
process.env = originalEnv;
process.removeAllListeners("SIGTERM");
process.removeAllListeners("SIGINT");
process.removeAllListeners("exit");
vi.restoreAllMocks();
});

Expand Down Expand Up @@ -183,4 +186,82 @@ describe("activateRunner", () => {

expect(registerAgentsMock).toHaveBeenCalledTimes(1);
});

it("prints warning when process exits without signal", async () => {
registryMap.set("agent-a", {
name: "agent-a",
func: () => {},
project: "p",
params: [],
docstring: "",
});

const exitHandlers: (() => void)[] = [];
const processOnSpy = vi.spyOn(process, "on").mockImplementation(((event: string, handler: () => void) => {
if (event === "exit") exitHandlers.push(handler);
return process;
}) as typeof process.on);

const stderrSpy = vi.spyOn(console, "error").mockImplementation(() => {});

const { activateRunner } = await import("@/runner/activate");
activateRunner();
await new Promise((r) => setImmediate(r));
await new Promise((r) => setImmediate(r));

exitHandlers.forEach((h) => h());

expect(stderrSpy).toHaveBeenCalledWith(
expect.stringContaining("exited without blocking")
);

processOnSpy.mockRestore();
stderrSpy.mockRestore();
});

it("does not print warning when shutdown was triggered by signal", async () => {
registryMap.set("agent-a", {
name: "agent-a",
func: () => {},
project: "p",
params: [],
docstring: "",
});

const exitHandlers: (() => void)[] = [];
const signalHandlers: Map<string, () => void> = new Map();
const processOnSpy = vi.spyOn(process, "on").mockImplementation(((event: string, handler: () => void) => {
if (event === "exit") exitHandlers.push(handler);
return process;
}) as typeof process.on);
const processOnceSpy = vi.spyOn(process, "once").mockImplementation(((event: string, handler: () => void) => {
signalHandlers.set(event, handler);
return process;
}) as typeof process.once);

const stderrSpy = vi.spyOn(console, "error").mockImplementation(() => {});

const { activateRunner } = await import("@/runner/activate");
activateRunner();
await new Promise((r) => setImmediate(r));
await new Promise((r) => setImmediate(r));

const sigterm = signalHandlers.get("SIGTERM");
expect(sigterm).toBeDefined();

const exitSpy = vi.spyOn(process, "exit").mockImplementation((() => {}) as never);
sigterm!();
await new Promise((r) => setImmediate(r));

exitHandlers.forEach((h) => h());

expect(stderrSpy).not.toHaveBeenCalledWith(
expect.stringContaining("exited without blocking")
);

processOnSpy.mockRestore();
processOnceSpy.mockRestore();
exitSpy.mockRestore();
stderrSpy.mockRestore();
});
});
Loading