Conversation
🌿 Preview your docs: https://opik-preview-ea958085-5ff9-4dc9-a19e-dd326f1af0ac.docs.buildwithfern.com/docs/opik
The following broken links were found:
https://opik-preview-ea958085-5ff9-4dc9-a19e-dd326f1af0ac.docs.buildwithfern.com/docs/opik/integrations/harbor
https://opik-preview-ea958085-5ff9-4dc9-a19e-dd326f1af0ac.docs.buildwithfern.com/docs/opik/integrations/harbor/
📌 Results for commit f8b4aee

Backend Tests - Integration Group 12: 251 tests, 248 ✅, 3m 5s ⏱️. Results for commit cc67a16. ♻️ This comment has been updated with latest results.
Backend Tests - Integration Group 15: 235 tests, 233 ✅, 3m 39s ⏱️. Results for commit c6e0bb1.
Python SDK E2E Tests Results (Python 3.14): 361 tests ±0, 359 ✅ ±0, 14m 4s ⏱️ (-9s). Results for commit 4a10f06, compared against base commit d295a1e. This pull request removes 1 and adds 1 test; renamed tests count towards both.
Python SDK E2E Tests Results (Python 3.12): 361 tests ±0, 359 ✅ ±0, 14m 12s ⏱️ (-35s). Results for commit 4a10f06, compared against base commit d295a1e.
Python SDK E2E Tests Results (Python 3.13): 361 tests ±0, 359 ✅ ±0, 14m 12s ⏱️ (-36s). Results for commit 4a10f06, compared against base commit d295a1e.
Python SDK E2E Tests Results (Python 3.11): 361 tests ±0, 359 ✅ ±0, 14m 20s ⏱️ (-29s). Results for commit 4a10f06, compared against base commit d295a1e.
Python SDK E2E Tests Results (Python 3.10): 361 tests ±0, 359 ✅ ±0, 14m 4s ⏱️ (-18s). Results for commit 4a10f06, compared against base commit d295a1e.
Nimrod007 left a comment

Backend Review
1. N+1 Redis reads in nextBridgeCommands (non-blocking)
In LocalRunnerServiceImpl.nextBridgeCommands (~lines 1397-1415): the status batch correctly updates all commands in one round-trip, but then each command is read individually with readAllMap() — that's N extra Redis calls. Since you already have the command IDs and just wrote the status, you could read all commands in the same batch (or a second batch) to cut this to 2 round-trips total instead of N+2.
Not blocking, but worth considering if bridge usage scales — at maxCommands=20 this is 22 Redis calls where 2-3 would do.
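To make the round-trip arithmetic concrete, here is a standalone Python sketch with a hypothetical counting stub (the PR itself uses Redisson batches in Java); it only illustrates why batching the reads collapses N individual calls into one:

```python
class CountingRedisStub:
    """Hypothetical stand-in for a Redis client that counts round-trips."""

    def __init__(self, data):
        self.data = data  # key -> hash fields
        self.round_trips = 0

    def hgetall(self, key):
        # One network round-trip per individual read.
        self.round_trips += 1
        return self.data.get(key, {})

    def batch_hgetall(self, keys):
        # A pipelined batch costs a single round-trip for all keys.
        self.round_trips += 1
        return [self.data.get(k, {}) for k in keys]


commands = {f"cmd:{i}": {"status": "PENDING"} for i in range(20)}

naive = CountingRedisStub(commands)
individual = [naive.hgetall(k) for k in commands]  # 20 round-trips

batched = CountingRedisStub(commands)
together = batched.batch_hgetall(list(commands))   # 1 round-trip
```

With maxCommands=20 this is the difference between ~22 calls (status batch + 20 reads + bookkeeping) and 2-3.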
2. Missing workspace validation on bridge commands (blocking)
loadValidatedBridgeCommand validates that runnerId matches the command, but doesn't check that the command's workspace_id matches the caller's workspace. The runner ownership check covers it indirectly today (a runner belongs to one workspace), but for defense-in-depth we should validate workspaceId.equals(fields.get(BRIDGE_FIELD_WORKSPACE_ID)) here — same pattern as loadValidatedJob does for jobs.
This applies to all callers: reportBridgeCommandResult, getBridgeCommand, and awaitBridgeCommand. The method signature already receives runnerId but should also take workspaceId and enforce the check.
Nimrod007 left a comment
Update on review comment #2 (workspace validation)
Softening my earlier "request changes" on the workspace validation point. Looking more closely:
The caller's workspace is validated indirectly through the chain: requestContext.getWorkspaceId() → validateRunnerOwnership(runnerId, workspaceId, userName) → loadValidatedBridgeCommand(runnerId, commandId). Since a runner is always scoped to one workspace, this holds today.
Adding workspaceId.equals(fields.get(BRIDGE_FIELD_WORKSPACE_ID)) in loadValidatedBridgeCommand would be a nice defense-in-depth consistency with loadValidatedJob, but it's not an active vulnerability. Consider it a suggestion, not a blocker.
Nimrod007 left a comment
Approving — the workspace validation concern is just a suggestion for defense-in-depth, not a blocker. The N+1 Redis reads point is also non-blocking.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
StringRedisClient.getList already applies StringCodec.INSTANCE internally, so passing it as a second argument causes a compilation error. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
petrotiurin left a comment
Will continue the review shortly.
)
LOGGER.info("Runner activated")
if os.environ.get("OPIK_SUPERVISED") != "true":
nit: we already have a supervised local variable that indicates whether we're supervised; let's use it.
_DEFAULT_COMMAND_TIMEOUT = 30.0
_MIN_COMMAND_TIMEOUT = 1.0
_MAX_COMMAND_TIMEOUT = 300.0
Let's add units to those timeouts so it's obvious what they represent. e.g. _MAX_COMMAND_TIMEOUT_SECONDS
inflight: Set[Future] = set()
inflight_lock = threading.Lock()
Do we need to keep track of the futures currently in flight? Can we rely on the semaphore alone for backpressure? We're not using them anywhere, and tracking them just adds extra locking/unlocking.
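A minimal sketch of semaphore-only backpressure, with hypothetical names (the real dispatch loop lives in the SDK): acquiring before submit bounds the in-flight count, and releasing in the worker frees the slot, so no inflight set is needed.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_MAX_CONCURRENT_COMMANDS = 4  # hypothetical limit


def run_with_backpressure(tasks):
    semaphore = threading.Semaphore(_MAX_CONCURRENT_COMMANDS)
    results = []
    results_lock = threading.Lock()

    def _run(task):
        try:
            with results_lock:
                results.append(task())
        finally:
            semaphore.release()  # free a slot; no inflight set required

    with ThreadPoolExecutor(max_workers=_MAX_CONCURRENT_COMMANDS) as pool:
        for task in tasks:
            semaphore.acquire()  # blocks while too many commands are in flight
            pool.submit(_run, task)
    return results
```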
def _poll(self) -> List[BridgeCommandItem]:
    resp = self._api.runners.next_bridge_commands(
        self._runner_id,
        max_commands=10,
Let's make max_commands a constant as well.
on_command_start: Optional[Any] = None,
on_command_end: Optional[Any] = None,
This should be at least Optional[Callable], and ideally typed with the exact signature it expects, since we rely on a specific interface in L143.
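A sketch of the tighter typing, with hypothetical callback signatures (the actual interface is whatever the code at L143 relies on):

```python
from typing import Callable, Optional

# Hypothetical callback signatures; the real interface lives in the SDK.
OnCommandStart = Callable[[str], None]      # called with the command id
OnCommandEnd = Callable[[str, bool], None]  # command id, success flag


def make_handler(
    on_command_start: Optional[OnCommandStart] = None,
    on_command_end: Optional[OnCommandEnd] = None,
):
    def handle(command_id: str) -> None:
        if on_command_start is not None:
            on_command_start(command_id)
        # ... execute the command ...
        if on_command_end is not None:
            on_command_end(command_id, True)

    return handle
```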
if os.environ.get("OPIK_SUPERVISED") != "true":
    heartbeat_thread = threading.Thread(
        target=self._heartbeat_loop,
        daemon=True,
    )
    heartbeat_thread.start()
Why do we need this env var? Would we ever run the runner unsupervised?
_ENTRYPOINT_PATTERNS = [
    re.compile(r"entrypoint\s*=\s*True"),
]
For TypeScript, boolean true is lowercase (true) but follows the same pattern. We should add entrypoint:\s*true here.
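The suggested addition, sketched (the pattern list name comes from the diff above; the helper function is hypothetical):

```python
import re

# Python assignment plus the suggested TypeScript object-literal form.
_ENTRYPOINT_PATTERNS = [
    re.compile(r"entrypoint\s*=\s*True"),  # Python: entrypoint = True
    re.compile(r"entrypoint:\s*true"),     # TypeScript: entrypoint: true
]


def has_entrypoint(source: str) -> bool:
    return any(p.search(source) for p in _ENTRYPOINT_PATTERNS)
```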
_CONFIGURATION_PATTERNS = [
    re.compile(r"AgentConfig"),
]
This won't work for TypeScript since we don't use the same inheritance pattern there. Perhaps it's more straightforward to check for the presence of getAgentConfigVersion/get_agent_config_version calls here?
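A sketch of that alternative check (the accessor names come from the comment above; the helper name is hypothetical): instead of matching the AgentConfig base class, match the accessor call in either language's naming convention.

```python
import re

# Matches the config-version accessor call in Python or TypeScript naming.
_CONFIG_CALL_PATTERN = re.compile(
    r"\b(getAgentConfigVersion|get_agent_config_version)\s*\("
)


def uses_agent_config(source: str) -> bool:
    return _CONFIG_CALL_PATTERN.search(source) is not None
```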
_HEARTBEAT_INTERVAL = 5.0
_GRACEFUL_TIMEOUT = 10
Let's add _SECONDS suffix if this is seconds.
Can we add some e2e tests here? Just the happy paths so we know all the components work as expected.
BorisTkachenko left a comment

@collincunn Left some BE-related comments
}

return BridgeCommandBatchResponse.builder().commands(items).build();
}).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()))
Don't use fully qualified class names; add an import.
if (depth > DEEP_MERGE_MAX_DEPTH || !base.isObject() || !override.isObject()) {
    return override;
}
com.fasterxml.jackson.databind.node.ObjectNode result = base.deepCopy();
Don't use fully qualified class names; please add an import for it.
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName);
List<String> capabilities = body != null ? body.capabilities() : null;
The null check isn't needed here; you already check for null in the service layer.
@ApiResponse(responseCode = "404", description = "Runner not found or not connected", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "409", description = "Runner does not support bridge", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "429", description = "Too many requests", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response submitBridgeCommand(@PathParam("runnerId") UUID runnerId,
To match the existing codebase, it would be better to rename this to createBridgeCommand.
RBatch readBatch = redisClient.createBatch();
List<RFuture<Map<String, String>>> readFutures = new ArrayList<>(commandIds.size());
for (String cmdIdStr : commandIds) {
    UUID commandId = UUID.fromString(cmdIdStr);
    readFutures.add(readBatch.<String, String>getMap(
            bridgeCommandKey(commandId), StringCodec.INSTANCE).readAllMapAsync());
}
readBatch.execute();

List<String> liveCommandIds = new ArrayList<>();
List<Map<String, String>> liveFields = new ArrayList<>();
RList<String> activeList = redisClient.getList(activeKey);
for (int i = 0; i < commandIds.size(); i++) {
    Map<String, String> fields = readFutures.get(i).toCompletableFuture().join();
RBatch.execute() returns a BatchResult whose getResponses() gives the results in the same order the commands were queued. So you can replace the futures pattern with:
BatchResult<?> batchResult = readBatch.execute();
List<?> responses = batchResult.getResponses();
for (int i = 0; i < commandIds.size(); i++) {
@SuppressWarnings("unchecked")
Map<String, String> fields = (Map<String, String>) responses.get(i);
// ... rest of loop
}
This eliminates the RFuture list and the toCompletableFuture().join() calls entirely. The batch is synchronous here anyway (you call execute() and block), so the futures aren't buying you anything.
JsonNode result,
JsonNode error,
Probably we want to add a validation here that at least one of these is non-null?
def backoff_wait(
    shutdown_event: threading.Event, backoff: float, cap: float = 30.0
) -> None:
    """Sleep with jitter, interruptible by the shutdown event.

    Waits between 50-100% of the backoff value, capped at ``cap`` seconds.
    """
    wait = min(backoff, cap) * (0.5 + random.random() * 0.5)
    shutdown_event.wait(wait)
This does not seem to be "common" and is used only in the bridge loop. Was it meant to be reused in in_process_loop?
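For reference, the typical reuse pattern for a helper like this is an exponential retry loop (the loop below is hypothetical, not code from the PR); in_process_loop could presumably share it the same way:

```python
import random
import threading


def backoff_wait(shutdown_event, backoff, cap=30.0):
    """Sleep with jitter, interruptible by the shutdown event (as in the diff)."""
    wait = min(backoff, cap) * (0.5 + random.random() * 0.5)
    shutdown_event.wait(wait)


shutdown = threading.Event()
backoff = 0.01  # small starting value so this sketch runs fast
for _attempt in range(3):
    # ... poll the server; on failure, wait and grow the backoff ...
    backoff_wait(shutdown, backoff)
    backoff = min(backoff * 2, 30.0)  # exponential growth, capped
```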
class FileMutationQueue:
    """Per-file lock keyed by realpath. Serializes writes to the same file."""

    def __init__(self) -> None:
        self._locks: Dict[str, threading.Lock] = {}
        self._meta_lock = threading.Lock()
Would FileMutationRegistry or FileLockRegistry be a better fit here? Having a queue in the name implies we keep track of mutations in this class, which is not the case.
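A sketch of the suggested rename with the same per-realpath behavior (class name per the suggestion; the lock_for accessor is hypothetical):

```python
import os
import threading
from typing import Dict


class FileLockRegistry:
    """Per-file lock keyed by realpath. Serializes writes to the same file."""

    def __init__(self) -> None:
        self._locks: Dict[str, threading.Lock] = {}
        self._meta_lock = threading.Lock()

    def lock_for(self, path: str) -> threading.Lock:
        key = os.path.realpath(path)
        with self._meta_lock:  # guards the registry dict itself
            return self._locks.setdefault(key, threading.Lock())
```

Two paths resolving to the same realpath share one lock, which is the serialization guarantee the docstring promises.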
@patch.dict("os.environ", {"OPIK_SUPERVISED": "true"})
def test_supervised__skips_heartbeat_thread(self) -> None:
Not sure I follow: why do we want to skip the heartbeat when supervised?