[OPIK-4383] [BE] Add ExperimentAggregationPublisher, ExperimentDenormalizationJob and tests #5511
Merged
thiagohora merged 76 commits into main on Mar 12, 2026
Conversation
- Add experiment_aggregates and experiment_item_aggregates tables
- Implement ExperimentAggregatesDAO with population and query methods
- Add ExperimentAggregatesService for aggregation management
- Refactor DTOs into organized model classes:
  - ExperimentAggregatesModel: aggregation results
  - ExperimentEntityData: entity models
  - ExperimentSourceData: raw source data
  - ExperimentAggregatesUtils: utilities
- Add FEEDBACK_SCORES_AGGREGATED filter strategy for map-based filtering
- Add comprehensive integration tests (10/10 passing)
- Configure batch size and parallelism settings
…taset operations

Problem:
- MySQL deadlock on the dataset_version_tags composite PRIMARY KEY (workspace_id, dataset_id, tag)
- Occurred during parallel dataset creation in the same workspace
- Multiple threads inserting the "latest" tag for different datasets caused lock contention
- Experiments with parallel execution were failing with MySQLTransactionRollbackException

Solution:
- Add handleOnDeadLocks() method in RetryUtils with:
  - 5 retry attempts with exponential backoff (250ms to 2s)
  - 0.5 jitter to reduce the thundering-herd effect
  - Recursive isDatabaseDeadlock() detection for MySQLTransactionRollbackException
- Apply retry logic in DatasetItemService.setDatasetItemVersion()
- Enables concurrent dataset creation in the same workspace

Impact:
- Supports parallel experiment execution with proper deadlock handling
- Test success rate improved from 0/10 to 10/10 in ExperimentAggregatesIntegrationTest
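The retry policy this commit describes (5 attempts, exponential backoff from 250ms capped at 2s, 0.5 jitter, recursive cause inspection) can be sketched in plain Java. This is only an illustrative sketch of the stated policy, not the actual RetryUtils implementation; DeadlockException below is a hypothetical stand-in for MySQLTransactionRollbackException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

public class DeadlockRetry {

    // Hypothetical stand-in for MySQLTransactionRollbackException
    public static class DeadlockException extends RuntimeException {
        public DeadlockException(String msg) { super(msg); }
    }

    // Exponential backoff 250ms, 500ms, 1s, 2s..., capped at 2s,
    // with 0.5 jitter: the actual delay lands in [0.5 * capped, capped]
    public static long backoffMillis(int attempt) {
        long capped = Math.min(250L << attempt, 2000L);
        double factor = 0.5 + ThreadLocalRandom.current().nextDouble() * 0.5;
        return (long) (capped * factor);
    }

    // Recursive cause inspection, as described in the commit message
    public static boolean isDeadlock(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof DeadlockException) return true;
        }
        return false;
    }

    public static <T> T withDeadlockRetry(Callable<T> action) throws Exception {
        int maxAttempts = 5;
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Only retry genuine deadlocks, and only up to the attempt cap
                if (!isDeadlock(e) || attempt >= maxAttempts - 1) throw e;
                Thread.sleep(backoffMillis(attempt));
            }
        }
    }
}
```

The jitter keeps concurrently retrying threads from re-colliding on the same lock at the same instant, which is exactly the thundering-herd effect the commit mentions.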
Fixed 11 automated review comments from baz-reviewer:

CRITICAL fixes:
- Prevent NPE on null span aggregations by adding coalesce() in SQL
- Handle multi-project experiments with LIMIT 1 in GET_PROJECT_ID
- Handle zero-item experiments with empty aggregation helpers
- Bind feedback_scores_percentiles map instead of empty CAST

HIGH priority fixes:
- Use toDecimal128(12) instead of toDecimal64(9) for cost percentiles
- Add null-safe tags handling with Optional.ofNullable()
- Include exception objects in retry logging for stack traces

MEDIUM priority fixes:
- Add missing log_comment to SELECT_EXPERIMENT_BY_ID query
- Add missing log_comment to GET_PROJECT_ID query

LOW priority fixes:
- Remove duplicate "id" binding in bindItemsParameters
- Enhance batchSize config documentation with details

All 11 integration tests passing.
…nd Optional patterns

- Add missing imports for IntStream, ProjectStats, and other dependencies
- Replace fully qualified class names with proper imports across DAO and Service classes
- Fix IS_NOT_EMPTY filter handling for FEEDBACK_SCORES_AGGREGATED strategies
- Refactor null checks to use Optional in mapping methods:
  - mapFeedbackScoreAggregations, mapExperimentFromAggregates
  - mapFeedbackScoreData, mapExperimentGroupAggregationItem
  - Batch insert preparation with Optional chains
- Improve code readability and maintainability with functional patterns
…ce methods

- Fix tags NPE in ExperimentAggregatesDAO with defaultIfNull
- Remove unnecessary FINAL clause from GET_EXPERIMENT_DATA query
- Fix test naming in ExperimentAggregatesIntegrationTest
- Consolidate 7 duplicate createVersionFromDelta methods into a single canonical implementation
- Remove debug logger from config-test.yml
…ia binding

- Fix SELECT_EXPERIMENT_BY_ID to properly render log_comment metadata
  - Use getSTWithLogComment pattern in getExperimentFromAggregates
  - Ensures ClickHouse query logging populates workspace/experiment IDs
- Centralize ExperimentSearchCriteria binding logic
  - Create ExperimentSearchCriteriaBinder utility class
  - Parameterize filter strategies to support both DAO variants
  - Eliminate 29-line duplication between ExperimentDAO and ExperimentAggregatesDAO
  - A single source of truth prevents the DAOs from getting out of sync
- Update canonical method signature to include new parameters:
  - List<EvaluatorItem> evaluators
  - ExecutionPolicy executionPolicy
  - boolean clearExecutionPolicy
- Update all 5 caller sites to pass the new parameters:
  - Use changes.evaluators(), changes.executionPolicy(), changes.clearExecutionPolicy() when available
  - Pass null/false for auto-generated versions that inherit from base
- Add imports for EvaluatorItem and ExecutionPolicy

Fixes compilation errors introduced by rebase with upstream changes to DatasetVersionService
…act constants, remove DAO logging

- Fixed BigDecimal[] to Double[] conversion for experiment_scores (matches ClickHouse Float64)
- Extracted FilterStrategy lists to static final constants to avoid repeated allocations
- Added @NonNull validation to the populateExperimentAggregate parameter
- Removed DAO-layer logging, keeping service-level logging only
Extract streamWithExperimentPagination() helper method to eliminate duplication in getTracesData(), getSpansData(), and getFeedbackScoresData(). All three methods followed an identical pattern:
- asyncTemplate.stream with connection
- getSTWithLogComment with cursor flag
- Bind workspace_id, experiment_id, project_id, limit
- Optional cursor binding
- Result mapping

Benefits:
- Single source of truth for pagination binding logic
- Prevents divergence when tweaking cursor/limit bindings
- Reduces code from ~20 lines to ~10 lines per method
- Type-safe generic implementation

Note: the CTE redundancy (3x experiment_items scan) is intentional, to avoid passing large trace ID lists as parameters, which would cause performance issues with 10K+ traces.
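The shared shape of those three methods — fetch a page, advance the cursor from the last row, stop on a short page — can be sketched generically. This is a hypothetical plain-Java illustration of the pagination loop, not the actual reactive ClickHouse helper; the names and the long-id cursor are assumptions for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

public class PaginationSketch {

    // fetchPage.apply(cursor, limit) returns up to `limit` items with id > cursor;
    // idOf extracts the cursor value from the last item of a page.
    public static <T> List<T> streamWithPagination(
            BiFunction<Long, Integer, List<T>> fetchPage,
            ToLongFunction<T> idOf,
            int limit) {
        List<T> all = new ArrayList<>();
        long cursor = Long.MIN_VALUE;   // "no cursor yet" sentinel
        while (true) {
            List<T> page = fetchPage.apply(cursor, limit);
            all.addAll(page);
            if (page.size() < limit) break;               // short page: no more data
            cursor = idOf.applyAsLong(page.get(page.size() - 1));
        }
        return all;
    }
}
```

Centralising the loop like this is what prevents the three callers from diverging when the cursor or limit binding is tweaked.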
…iagohora/OPIK-4382_metrics_computation_service
… aggregates recomputation

- Add ExperimentDenormalizationConfig implementing StreamConfiguration with debounce, job lock, and per-experiment aggregation lock settings
- Add ExperimentAggregationMessage as the stream message record
- Add ExperimentAggregatesSubscriber consuming from the denormalization stream; acquires a workspace-scoped distributed lock per experiment before calling populateAggregations()
- Add experimentDenormalizationEnabled feature flag to ServiceTogglesConfig and FeatureFlags
- Wire ExperimentDenormalizationConfig into OpikConfiguration
- Update config.yml and config-test.yml with the full experimentDenormalization block (enabled for tests)
- Add ExperimentAggregatesSubscriberTest covering lifecycle gating and processEvent success/error paths
… rename tests

- ExperimentDenormalizationConfig: add sensible defaults to all fields so Dropwizard validation doesn't fail when the config block is absent from old deployments (config.isEnabled() = false still gates the subscriber)
- Remove the experimentDenormalizationEnabled service toggle from ServiceTogglesConfig, FeatureFlags, config.yml, and config-test.yml: the infrastructure gate (config.isEnabled()) is the single control point
- Rename lifecycle test methods to camelCase per project conventions: startSkipsStartupWhenDisabled / stopSkipsShutdownWhenDisabled
…bility, redundant IN subquery, hardcoded context keys, Instant.now in loop, and inline defaultIfNull
…ExperimentAggregate
Operational logs belong in the service layer, not the DAO.
Removes the uncapped public batch size entry point. All callers now go through the public no-arg overload which reads batchSize safely from config.
…es pipeline

- Add ClickHouse migration (000062) to add an evaluation_method column to the experiment_aggregates table
- Add evaluationMethod field to the ExperimentData record
- Update GET_EXPERIMENT_DATA query to read evaluation_method from experiments
- Update INSERT_EXPERIMENT_AGGREGATE to write evaluation_method to experiment_aggregates
- Update SELECT_EXPERIMENT_BY_ID to read evaluation_method from experiment_aggregates
- Fix Experiment record constructor call: insert EvaluationMethod at the correct position (10)
…ries Reduce copy-paste in getTraceAggregations, getSpanAggregations, and getFeedbackScoreAggregations by extracting queryExperimentAggregation, which centralises the context-aware execution, workspace/experiment/project parameter binding, and singleOrEmpty pattern shared by all three methods.
…ameter Add @NonNull to ExperimentSearchCriteria in the interface and implementation so that a null argument fails fast with an explicit NullPointerException at the DAO boundary instead of crashing deep inside buildCountTemplate.
… path

target_project_ids was only applied inside the project_deleted LEFT JOIN subquery; the main WHERE had no project restriction, so counts were workspace-wide. Reuse has_target_projects in the main WHERE so that project_id IN :target_project_ids always takes effect. Also replace manual null/empty checks with CollectionUtils.isNotEmpty.
Contributor
SDK E2E Tests Results: 0 tests, 0 ✅, 0s ⏱️. Results for commit 1fecbac.
Contributor
SDK E2E Tests Results: 0 tests, 0 ✅, 0s ⏱️. Results for commit 3a74920. ♻️ This comment has been updated with latest results.
…t config The dropwizard-jobs framework uses WordUtils.uncapitalize(class.getSimpleName()) to look up the interval in the jobs map, so the key must be 'experimentDenormalizationJob' (lowercase first letter). Also adds the missing jobs section and jobBatchSize to config-test.yml.
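The lookup rule described above — the config key is the job class's simple name with its first letter lowercased — can be reproduced in a few lines. This sketch mimics what WordUtils.uncapitalize does on a class's simple name; the nested job class here is a hypothetical stand-in.

```java
public class JobKey {

    // Derive the jobs-map key the way dropwizard-jobs does:
    // uncapitalize the simple class name
    public static String configKeyFor(Class<?> jobClass) {
        String name = jobClass.getSimpleName();
        if (name.isEmpty()) return name;
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    // Hypothetical stand-in for the real job class
    static class ExperimentDenormalizationJob {}
}
```

This is why the YAML entry must read experimentDenormalizationJob: a key of ExperimentDenormalizationJob would simply never be found in the jobs map.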
Remove @Every from ExperimentDenormalizationJob and schedule it programmatically in OpikGuiceyLifecycleEventListener, following the same pattern as TraceThreadsClosingJob. Add jobInterval config field to ExperimentDenormalizationConfig. Remove the jobs YAML section that caused deserialization errors with JobConfiguration's immutable map.
- Include experimentId and workspaceId in the onExperimentUpdated error log
- Extract publishIfNotEmpty helper to deduplicate filter+publish logic across triggerByExperimentIds, triggerByTraceIds, triggerBySpanIds
Stub publisher.publish() to return Mono.empty() in setUp so .subscribe() calls in production code don't NPE on null.
…AO methods Move operational logging responsibility to the service layer, consistent with earlier fixes for ExperimentItemDAO and SpanDAO in this PR.
…tener' of https://github.com/comet-ml/opik into thiagohora/OPIK-4383_add_experiment_aggregate_event_listener
These files were introduced during merge resolution but should not be part of the branch.
…alize guard+publish flow
…DAO BULK_UPDATE Restores proper tag handling in SpanDAO.BULK_UPDATE query that was regressed to a simple arrayConcat. Now uses TagOperations.tagUpdateFragment() which provides arrayDistinct(), tag limit enforcement (max 50), and tags_to_add/tags_to_remove support. Also adds the required short_circuit_function_evaluation SETTINGS for throwIf evaluation.
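The tag semantics the restored fragment enforces — distinct merge of existing tags plus tags_to_add, minus tags_to_remove, rejecting updates past the 50-tag limit — can be modelled in plain Java. This is only an illustrative model: the real logic is SQL generated by TagOperations.tagUpdateFragment() inside the BULK_UPDATE query, and the helper names below are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TagMergeSketch {
    static final int MAX_TAGS = 50;  // limit stated in the commit message

    public static List<String> merge(List<String> existing,
                                     List<String> toAdd,
                                     List<String> toRemove) {
        // LinkedHashSet plays the role of arrayDistinct while keeping order
        Set<String> merged = new LinkedHashSet<>(existing);
        merged.addAll(toAdd);
        merged.removeAll(toRemove);
        // mirrors the throwIf guard: reject rather than silently truncate
        if (merged.size() > MAX_TAGS) {
            throw new IllegalArgumentException("tag limit exceeded: " + merged.size());
        }
        return List.copyOf(merged);
    }
}
```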
…tener' into thiagohora/OPIK-4383_add_experiment_aggregation_publisher_job_tests
Base automatically changed from thiagohora/OPIK-4383_add_experiment_aggregate_event_listener to main, March 11, 2026 10:43
…ion_publisher_job_tests
JetoPistola reviewed on Mar 11, 2026
Contributor
👋 Review summary

What looks good: Overall

Inline comments: 1 blocker (pagination loop), 2 suggestions (defensive parsing, env var prefix), 1 nit (log levels)

🤖 Review posted via /review-github-pr
- Add batchSize-capped iteration counter to expand() to prevent infinite loops when ZSET entries fail to be removed - Rename EXPERIMENT_DENORM_JOB_INTERVAL to OPIK_EXPERIMENT_DENORM_JOB_INTERVAL to follow the OPIK_ prefix convention
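The blocker fix above — capping the expand() loop at batchSize iterations so a ZSET entry that repeatedly fails to be removed cannot spin forever — can be sketched with a queue standing in for the ZSET. All names here are hypothetical simplifications of the real Redis-backed code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CappedExpand {

    interface EntryProcessor {
        // returns true if the entry was removed, false if removal failed
        boolean process(String entry);
    }

    // Drains at most batchSize entries; stops early when the queue is empty.
    public static int expand(Deque<String> dueEntries, EntryProcessor processor, int batchSize) {
        int processed = 0;
        for (int i = 0; i < batchSize && !dueEntries.isEmpty(); i++) {
            String entry = dueEntries.peekFirst();
            if (processor.process(entry)) {
                dueEntries.pollFirst();
                processed++;
            }
            // even on failure the iteration counter advances, so a stuck
            // entry bounds the loop at batchSize instead of spinning forever
        }
        return processed;
    }
}
```

Without the counter, a single entry whose removal keeps failing would keep the loop running indefinitely; with it, the job simply retries that entry on its next scheduled run.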
…ion_publisher_job_tests
BorisTkachenko approved these changes on Mar 12, 2026
Contributor
BorisTkachenko
left a comment
Looks good, one minor comment.
Details
Implements the debounce-based experiment aggregation trigger mechanism:
- ExperimentAggregationPublisher: adds workspaceId:experimentId compound members to a Redis ZSET scored by now + debounceDelay (sliding-window debounce); stores userName in a per-experiment hash bucket with TTL = 2× debounceDelay
- ExperimentDenormalizationJob: an @Every("5s") + @DisallowConcurrentExecution Dropwizard job; publishes ExperimentAggregationMessage to the Redis stream, deletes the hash bucket, removes the ZSET entry
- flatMap now returns Mono<Boolean> (not Mono<Void>) so switchIfEmpty is only triggered when the bucket is truly absent

Change checklist
Issues
Testing
- ExperimentAggregationPublisherTest (integration, real Redis container): verifies the ZSET contains the workspaceId:experimentId member after publish, and that the hash bucket stores userName
- ExperimentDenormalizationJobTest (unit, Mockito)

Documentation
No user-facing documentation changes. Internal Redis-based debounce mechanism for experiment aggregation recalculation triggers.
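The sliding-window debounce at the heart of this PR — each publish rescores the workspaceId:experimentId member to now + debounceDelay, and the job only emits members whose score has passed — can be modelled with a plain map standing in for the Redis ZSET. This is a simplified illustration; the class and method names are hypothetical, and the real publisher/job also maintain the userName hash bucket and the Redis stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DebounceSketch {
    private final Map<String, Long> zset = new TreeMap<>(); // member -> due time (ms)
    private final long debounceDelayMs;

    public DebounceSketch(long debounceDelayMs) {
        this.debounceDelayMs = debounceDelayMs;
    }

    // Publisher side: every call pushes the due time forward (sliding window),
    // so rapid repeated updates collapse into one recomputation
    public void publish(String workspaceId, String experimentId, long nowMs) {
        zset.put(workspaceId + ":" + experimentId, nowMs + debounceDelayMs);
    }

    // Job side: drain members whose debounce window has elapsed
    public List<String> drainDue(long nowMs) {
        List<String> due = new ArrayList<>();
        zset.entrySet().removeIf(e -> {
            if (e.getValue() <= nowMs) {
                due.add(e.getKey());
                return true;
            }
            return false;
        });
        return due;
    }
}
```

Because publish() overwrites the score, an experiment receiving a burst of trace/span updates is recomputed once, debounceDelay after the burst quiets down, rather than once per update.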