Skip to content

[NA] [BE] [SDK] Opik Connect#6074

Merged
Nimrod007 merged 32 commits intomainfrom
collinc/bridge
Apr 7, 2026
Merged

[NA] [BE] [SDK] Opik Connect#6074
Nimrod007 merged 32 commits intomainfrom
collinc/bridge

Conversation

@collincunn
Copy link
Copy Markdown
Collaborator

Details

Read docs

Change checklist

  • User facing
  • Documentation update

Issues

Testing

  • new unit and integration tests

Documentation

@github-actions github-actions bot added documentation Improvements or additions to documentation dependencies Pull requests that update a dependency file labels Apr 4, 2026
@github-actions github-actions bot added python Pull requests that update Python code java Pull requests that update Java code Backend tests Including test files, or tests related like configuration. typescript *.ts *.tsx Python SDK TypeScript SDK labels Apr 4, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Backend Tests - Integration Group 12

251 tests   248 ✅  3m 5s ⏱️
 46 suites    3 💤
 46 files      0 ❌

Results for commit cc67a16.

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Backend Tests - Integration Group 15

235 tests   233 ✅  3m 39s ⏱️
 21 suites    2 💤
 21 files      0 ❌

Results for commit c6e0bb1.

♻️ This comment has been updated with latest results.

Comment thread apps/opik-documentation/documentation/fern/openapi/opik.yaml
Comment thread apps/opik-documentation/documentation/fern/openapi/opik.yaml
Comment thread sdks/python/src/opik/runner/supervisor.py
Comment thread sdks/python/tests/unit/runner/test_file_watcher.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_loop.py Outdated
Comment thread sdks/python/setup.py Outdated
Comment thread apps/opik-backend/src/main/java/com/comet/opik/domain/LocalRunnerService.java Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/edit_file.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/edit_file.py Outdated
Comment thread sdks/python/tests/unit/runner/bridge_handlers/test_list_files.py Outdated
Comment thread sdks/python/tests/unit/runner/bridge_handlers/test_edit_file.py Outdated
Comment thread sdks/python/src/opik/runner/supervisor.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/list_files.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/list_files.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_loop.py
Comment thread sdks/python/tests/unit/runner/bridge_handlers/test_read_file.py Outdated
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Python SDK E2E Tests Results (Python 3.14)

361 tests  ±0   359 ✅ ±0   14m 4s ⏱️ -9s
  1 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4a10f06. ± Comparison against base commit d295a1e.

This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both.
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d5284-09f6-74b4-b993-dcf50aef5c53]
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d56c5-66c0-7e5f-b4ab-300826724012]

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Python SDK E2E Tests Results (Python 3.12)

361 tests  ±0   359 ✅ ±0   14m 12s ⏱️ -35s
  1 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4a10f06. ± Comparison against base commit d295a1e.

This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both.
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d5284-05ce-7e28-941b-f9a661920002]
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d56c5-593b-7cbb-8412-54a1c148e6fa]

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Python SDK E2E Tests Results (Python 3.13)

361 tests  ±0   359 ✅ ±0   14m 12s ⏱️ -36s
  1 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4a10f06. ± Comparison against base commit d295a1e.

This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both.
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d5283-9c24-7280-9330-786e9616d942]
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d56c5-6ccf-7792-b290-5f06443bb023]

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Python SDK E2E Tests Results (Python 3.11)

361 tests  ±0   359 ✅ ±0   14m 20s ⏱️ -29s
  1 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4a10f06. ± Comparison against base commit d295a1e.

This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both.
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d5284-0aab-70f3-8b81-8a15b453f13b]
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d56c5-442b-76b4-85e6-9b9baa907012]

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 4, 2026

Python SDK E2E Tests Results (Python 3.10)

361 tests  ±0   359 ✅ ±0   14m 4s ⏱️ -18s
  1 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4a10f06. ± Comparison against base commit d295a1e.

This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both.
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d5283-b2fe-70d7-9eca-63b1d40fd78e]
tests.e2e.test_tracing ‑ test_opik_client__update_trace__happy_flow[None-None-None-None-019d56c5-89cd-7c98-baad-1f8db83816dd]

♻️ This comment has been updated with latest results.

Comment thread sdks/python/src/opik/cli/connect.py
Comment thread sdks/python/src/opik/runner/bridge_handlers/edit_file.py
Comment thread sdks/python/src/opik/runner/bridge_handlers/edit_utils.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/edit_file.py Outdated
Comment thread sdks/python/src/opik/rest_api/types/bridge_command_type.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_loop.py Outdated
Comment thread sdks/python/src/opik/runner/snapshot.py Outdated
Comment thread sdks/python/src/opik/runner/stability_guard.py
Comment thread sdks/python/src/opik/runner/tui.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/read_file.py Outdated
Comment thread sdks/python/src/opik/runner/bridge_handlers/common.py
Comment thread sdks/python/src/opik/runner/bridge_handlers/exec_command.py
Comment thread sdks/python/tests/unit/runner/test_bridge_handlers.py
Copy link
Copy Markdown
Collaborator

@Nimrod007 Nimrod007 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backend Review

1. N+1 Redis reads in nextBridgeCommands (non-blocking)

In LocalRunnerServiceImpl.nextBridgeCommands (~lines 1397-1415): the status batch correctly updates all commands in one round-trip, but then each command is read individually with readAllMap() — that's N extra Redis calls. Since you already have the command IDs and just wrote the status, you could read all commands in the same batch (or a second batch) to cut this to 2 round-trips total instead of N+2.

Not blocking, but worth considering if bridge usage scales — at maxCommands=20 this is 22 Redis calls where 2-3 would do.

2. Missing workspace validation on bridge commands (blocking)

loadValidatedBridgeCommand validates that runnerId matches the command, but doesn't check that the command's workspace_id matches the caller's workspace. The runner ownership check covers it indirectly today (a runner belongs to one workspace), but for defense-in-depth we should validate workspaceId.equals(fields.get(BRIDGE_FIELD_WORKSPACE_ID)) here — same pattern as loadValidatedJob does for jobs.

This applies to all callers: reportBridgeCommandResult, getBridgeCommand, and awaitBridgeCommand. The method signature already receives runnerId but should also take workspaceId and enforce the check.

Copy link
Copy Markdown
Collaborator

@Nimrod007 Nimrod007 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update on review comment #2 (workspace validation)

Softening my earlier "request changes" on the workspace validation point. Looking more closely:

The caller's workspace is validated indirectly through the chain: requestContext.getWorkspaceId()validateRunnerOwnership(runnerId, workspaceId, userName)loadValidatedBridgeCommand(runnerId, commandId). Since a runner is always scoped to one workspace, this holds today.

Adding workspaceId.equals(fields.get(BRIDGE_FIELD_WORKSPACE_ID)) in loadValidatedBridgeCommand would be a nice defense-in-depth consistency with loadValidatedJob, but it's not an active vulnerability. Consider it a suggestion, not a blocker.

Nimrod007
Nimrod007 previously approved these changes Apr 6, 2026
Copy link
Copy Markdown
Collaborator

@Nimrod007 Nimrod007 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving — the workspace validation concern is just a suggestion for defense-in-depth, not a blocker. The N+1 Redis reads point is also non-blocking.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread apps/opik-backend/src/main/java/com/comet/opik/domain/LocalRunnerService.java Outdated
@github-actions github-actions bot removed the Frontend label Apr 6, 2026
StringRedisClient.getList already applies StringCodec.INSTANCE internally,
so passing it as a second argument causes a compilation error.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@petrotiurin petrotiurin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will continue the review shortly.

)

LOGGER.info("Runner activated")
if os.environ.get("OPIK_SUPERVISED") != "true":
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we already have supervised local variable that indicates if we're supervised or not, let's use it.

Comment on lines +21 to +23
_DEFAULT_COMMAND_TIMEOUT = 30.0
_MIN_COMMAND_TIMEOUT = 1.0
_MAX_COMMAND_TIMEOUT = 300.0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add units to those timeouts so it's obvious what they represent. e.g. _MAX_COMMAND_TIMEOUT_SECONDS

Comment on lines +72 to +73
inflight: Set[Future] = set()
inflight_lock = threading.Lock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep track of the futures currently in flight? Can we rely only on semaphore for backpressure? We're not using them anywhere and it just adds additional locking/unlocking.

def _poll(self) -> List[BridgeCommandItem]:
resp = self._api.runners.next_bridge_commands(
self._runner_id,
max_commands=10,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make max_commands a constant as well.

Comment on lines +58 to +59
on_command_start: Optional[Any] = None,
on_command_end: Optional[Any] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be at least Optional[Callable], even better with the types it expects, since we rely on a specific interface in L143

Comment on lines +104 to +109
if os.environ.get("OPIK_SUPERVISED") != "true":
heartbeat_thread = threading.Thread(
target=self._heartbeat_loop,
daemon=True,
)
heartbeat_thread.start()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this env var? Would we ever run the runner unsupervised?

Comment on lines +31 to +33
_ENTRYPOINT_PATTERNS = [
re.compile(r"entrypoint\s*=\s*True"),
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for typescript, boolean true is lowercased, true, but follows the same pattern. We should add entrypoint:\s*true here.

Comment on lines +35 to +37
_CONFIGURATION_PATTERNS = [
re.compile(r"AgentConfig"),
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work for typescript as we don't use the same inheritance pattern there. Perhaps it's more straight-forward to check for presence of getAgentConfigVersion/get_agent_config_version calls here?

Comment on lines +28 to +29
_HEARTBEAT_INTERVAL = 5.0
_GRACEFUL_TIMEOUT = 10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add _SECONDS suffix if this is seconds.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some e2e tests here? Just the happy paths so we know all the components work as expected.

@Nimrod007 Nimrod007 merged commit a8f43d8 into main Apr 7, 2026
182 of 187 checks passed
@Nimrod007 Nimrod007 deleted the collinc/bridge branch April 7, 2026 12:12
Nimrod007 added a commit that referenced this pull request Apr 7, 2026
Nimrod007 added a commit that referenced this pull request Apr 7, 2026
@Nimrod007 Nimrod007 restored the collinc/bridge branch April 7, 2026 12:14
Copy link
Copy Markdown
Contributor

@BorisTkachenko BorisTkachenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@collincunn Left some BE related comments

}

return BridgeCommandBatchResponse.builder().commands(items).build();
}).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use fully qualified Objects, add import

if (depth > DEEP_MERGE_MAX_DEPTH || !base.isObject() || !override.isObject()) {
return override;
}
com.fasterxml.jackson.databind.node.ObjectNode result = base.deepCopy();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use fully qualified Objects, plz add import for it

String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName);
List<String> capabilities = body != null ? body.capabilities() : null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed here, you are still checking for null in the service layer.

@ApiResponse(responseCode = "404", description = "Runner not found or not connected", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "409", description = "Runner does not support bridge", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "429", description = "Too many requests", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response submitBridgeCommand(@PathParam("runnerId") UUID runnerId,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To match existing codebase would be better to rename to createBridgeCommand

Comment on lines +1401 to +1414
RBatch readBatch = redisClient.createBatch();
List<RFuture<Map<String, String>>> readFutures = new ArrayList<>(commandIds.size());
for (String cmdIdStr : commandIds) {
UUID commandId = UUID.fromString(cmdIdStr);
readFutures.add(readBatch.<String, String>getMap(
bridgeCommandKey(commandId), StringCodec.INSTANCE).readAllMapAsync());
}
readBatch.execute();

List<String> liveCommandIds = new ArrayList<>();
List<Map<String, String>> liveFields = new ArrayList<>();
RList<String> activeList = redisClient.getList(activeKey);
for (int i = 0; i < commandIds.size(); i++) {
Map<String, String> fields = readFutures.get(i).toCompletableFuture().join();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RBatch.execute() returns a BatchResult whose getResponses() gives the results in the same order the commands were queued. So you can replace the futures pattern with:

BatchResult<?> batchResult = readBatch.execute();
List<?> responses = batchResult.getResponses();

for (int i = 0; i < commandIds.size(); i++) {
    @SuppressWarnings("unchecked")
    Map<String, String> fields = (Map<String, String>) responses.get(i);
    // ... rest of loop
}

This eliminates the RFuture list and the toCompletableFuture().join() calls entirely. The batch is synchronous here anyway (you call execute() and block), so the futures aren't buying you anything.

Comment on lines +15 to +16
JsonNode result,
JsonNode error,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we want to add here a validation that at least one of these is not null?

Comment thread sdks/python/src/opik/cli/connect.py
Comment on lines +127 to +135
def backoff_wait(
shutdown_event: threading.Event, backoff: float, cap: float = 30.0
) -> None:
"""Sleep with jitter, interruptible by the shutdown event.

Waits between 50-100% of the backoff value, capped at ``cap`` seconds.
"""
wait = min(backoff, cap) * (0.5 + random.random() * 0.5)
shutdown_event.wait(wait)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem to be "common" and is used only in the bridge loop. Was it meant to be reused in in_process_loop?

Comment thread sdks/python/src/opik/runner/bridge_handlers/exec_command.py
Comment on lines +22 to +28
class FileMutationQueue:
"""Per-file lock keyed by realpath. Serializes writes to the same file."""

def __init__(self) -> None:
self._locks: Dict[str, threading.Lock] = {}
self._meta_lock = threading.Lock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would FileMutationRegistry or FileLockRegistry be a better fit here? Having a queue in the name implies we keep track of mutations in this class, which is not the case.

Comment on lines +8 to +9
@patch.dict("os.environ", {"OPIK_SUPERVISED": "true"})
def test_supervised__skips_heartbeat_thread(self) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow why do we want to skip the heartbeat when supervised?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Backend dependencies Pull requests that update a dependency file documentation Improvements or additions to documentation java Pull requests that update Java code Python SDK python Pull requests that update Python code tests Including test files, or tests related like configuration. TypeScript SDK typescript *.ts *.tsx

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants