[OPIK-4383] [BE] Add ExperimentAggregationPublisher, ExperimentDenormalizationJob and tests #5511

Merged
thiagohora merged 76 commits into main from
thiagohora/OPIK-4383_add_experiment_aggregation_publisher_job_tests
Mar 12, 2026

Conversation

@thiagohora
Contributor

Details

Implements the debounce-based experiment aggregation trigger mechanism:

ExperimentAggregationPublisher

  • Writes workspaceId:experimentId compound members to a Redis ZSET scored by now + debounceDelay (sliding window debounce)
  • Stores userName in a per-experiment hash bucket with TTL = 2× debounceDelay
  • Multiple publishes for the same experiment+workspace pair update the score (one ZSET entry maintained)
  • Workspace isolation: different workspaces with the same experiment UUID get separate ZSET entries
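The publisher's debounce semantics can be illustrated with a minimal in-memory analogue (a sketch only — the real `ExperimentAggregationPublisher` writes to a Redis ZSET; the class and field names below are illustrative assumptions, not the actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// In-memory analogue of the ZSET-based sliding-window debounce:
// member "workspaceId:experimentId" -> score (epoch millis when the entry
// becomes ready, i.e. now + debounceDelay).
class DebouncePublisherSketch {
    final Map<String, Long> zset = new HashMap<>();
    final long debounceDelayMs;

    DebouncePublisherSketch(long debounceDelayMs) {
        this.debounceDelayMs = debounceDelayMs;
    }

    void publish(String workspaceId, String experimentId, long nowMs) {
        // The compound member gives workspace isolation; re-publishing the
        // same pair overwrites the score, so exactly one entry per pair is
        // kept and the debounce window slides forward.
        zset.put(workspaceId + ":" + experimentId, nowMs + debounceDelayMs);
    }

    int size() {
        return zset.size();
    }
}
```

Repeated publishes for the same pair keep a single entry with an updated score, while the same experiment UUID in two workspaces yields two entries.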

ExperimentDenormalizationJob

  • @Every("5s") + @DisallowConcurrentExecution Dropwizard job
  • Queries ZSET for members with score ≤ now (debounce window elapsed)
  • For each ready experiment: publishes ExperimentAggregationMessage to the Redis stream, deletes the hash bucket, removes the ZSET entry
  • Stale entry handling: if the hash bucket has already expired, removes the orphaned ZSET entry without publishing
  • Bug fix in reactive chain: flatMap now returns Mono<Boolean> (not Mono<Void>) so switchIfEmpty is only triggered when the bucket is truly absent
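The job's flush pass — publish ready entries, drop stale ones — can be sketched with the same in-memory analogue (illustrative only; the real job reads Redis, publishes to a stream, and runs under a distributed lock):

```java
import java.util.*;

// Sketch of one flush pass: entries whose score <= now are either published
// (bucket still alive) or dropped as stale orphans (bucket already expired).
class DenormalizationJobSketch {
    final Map<String, Long> zset;        // member -> ready-at timestamp
    final Set<String> hashBuckets;       // members whose userName bucket is alive
    final List<String> published = new ArrayList<>();

    DenormalizationJobSketch(Map<String, Long> zset, Set<String> hashBuckets) {
        this.zset = zset;
        this.hashBuckets = hashBuckets;
    }

    void run(long nowMs) {
        for (Iterator<Map.Entry<String, Long>> it = zset.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() > nowMs) continue;   // debounce window not elapsed
            if (hashBuckets.remove(e.getKey())) {
                published.add(e.getKey());        // happy path: publish, delete bucket
            }
            it.remove();                          // remove ZSET entry either way
        }
    }
}
```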

Change checklist

  • User facing
  • Documentation update

Issues

  • OPIK-4383

Testing

ExperimentAggregationPublisherTest (integration, real Redis container):

  • ZSET contains compound workspaceId:experimentId member after publish
  • ZSET score is set to a future timestamp (now + debounceDelay)
  • Hash bucket stores userName
  • Hash bucket TTL is positive and ≤ 2× debounceDelay
  • Disabled config skips all publishing
  • Empty experimentIds skips publishing
  • Same experimentId in different workspaces creates separate ZSET entries
  • Multiple publishes for the same experiment keep a single ZSET entry with updated score

ExperimentDenormalizationJobTest (unit, Mockito):

  • Disabled config skips processing
  • Lock not acquired skips Redis operations
  • Empty ZSET completes without processing
  • Happy path: publishes stream message, deletes hash bucket, removes ZSET entry
  • Stale entry (expired hash): removes ZSET entry without publishing
  • Batch: all ready experiments processed in a single job run

Documentation

No user-facing documentation changes. Internal Redis-based debounce mechanism for experiment aggregation recalculation triggers.

- Add experiment_aggregates and experiment_item_aggregates tables
- Implement ExperimentAggregatesDAO with population and query methods
- Add ExperimentAggregatesService for aggregation management
- Refactor DTOs into organized model classes:
  - ExperimentAggregatesModel: aggregation results
  - ExperimentEntityData: entity models
  - ExperimentSourceData: raw source data
  - ExperimentAggregatesUtils: utilities
- Add FEEDBACK_SCORES_AGGREGATED filter strategy for map-based filtering
- Add comprehensive integration tests (10/10 passing)
- Configure batch size and parallelism settings
…taset operations

Problem:
- MySQL deadlock on dataset_version_tags composite PRIMARY KEY (workspace_id, dataset_id, tag)
- Occurred during parallel dataset creation in same workspace
- Multiple threads inserting "latest" tag for different datasets caused lock contention
- Experiments with parallel execution were failing with MySQLTransactionRollbackException

Solution:
- Add handleOnDeadLocks() method in RetryUtils with:
  - 5 retry attempts with exponential backoff (250ms to 2s)
  - 0.5 jitter to reduce thundering herd effect
  - Recursive isDatabaseDeadlock() detection for MySQLTransactionRollbackException
- Apply retry logic in DatasetItemService.setDatasetItemVersion()
- Enables concurrent dataset creation for same workspace

Impact:
- Supports parallel experiment execution with proper deadlock handling
- Test success rate improved from 0/10 to 10/10 in ExperimentAggregatesIntegrationTest
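The retry schedule described above can be sketched as follows (a minimal model assuming standard exponential backoff with multiplicative jitter; the real logic lives in `RetryUtils.handleOnDeadLocks()` and these names are illustrative):

```java
import java.util.concurrent.ThreadLocalRandom;

// Exponential backoff from 250ms to 2s with 0.5 jitter, as described above.
class DeadlockBackoffSketch {
    static final long BASE_MS = 250, MAX_MS = 2000;
    static final double JITTER = 0.5;

    // Delay before retry attempt n (1-based), before jitter is applied:
    // 250ms, 500ms, 1s, 2s, 2s (capped).
    static long baseDelayMs(int attempt) {
        return Math.min(BASE_MS << (attempt - 1), MAX_MS);
    }

    // Jittered delay drawn uniformly from [d * 0.5, d * 1.5], spreading
    // concurrent retries to reduce the thundering-herd effect.
    static long jitteredDelayMs(int attempt) {
        long d = baseDelayMs(attempt);
        double factor = 1 - JITTER + ThreadLocalRandom.current().nextDouble() * 2 * JITTER;
        return (long) (d * factor);
    }
}
```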
Fixed 11 automated review comments from baz-reviewer:

CRITICAL fixes:
- Prevent NPE on null span aggregations by adding coalesce() in SQL
- Handle multi-project experiments with LIMIT 1 in GET_PROJECT_ID
- Handle zero-item experiments with empty aggregation helpers
- Bind feedback_scores_percentiles map instead of empty CAST

HIGH priority fixes:
- Use toDecimal128(12) instead of toDecimal64(9) for cost percentiles
- Add null-safe tags handling with Optional.ofNullable()
- Include exception objects in retry logging for stack traces

MEDIUM priority fixes:
- Add missing log_comment to SELECT_EXPERIMENT_BY_ID query
- Add missing log_comment to GET_PROJECT_ID query

LOW priority fixes:
- Remove duplicate "id" binding in bindItemsParameters
- Enhance batchSize config documentation with details

All 11 integration tests passing.
…nd Optional patterns

- Add missing imports for IntStream, ProjectStats, and other dependencies
- Replace fully-qualified class names with proper imports across DAO and Service classes
- Fix IS_NOT_EMPTY filter handling for FEEDBACK_SCORES_AGGREGATED strategies
- Refactor null checks to use Optional in mapping methods:
  - mapFeedbackScoreAggregations, mapExperimentFromAggregates
  - mapFeedbackScoreData, mapExperimentGroupAggregationItem
  - Batch insert preparation with Optional chains
- Improve code readability and maintainability with functional patterns
…ce methods

- Fix tags NPE in ExperimentAggregatesDAO with defaultIfNull
- Remove unnecessary FINAL clause from GET_EXPERIMENT_DATA query
- Fix test naming in ExperimentAggregatesIntegrationTest
- Consolidate 7 duplicate createVersionFromDelta methods into single canonical implementation
- Remove debug logger from config-test.yml
…ia binding

- Fix SELECT_EXPERIMENT_BY_ID to properly render log_comment metadata
  - Use getSTWithLogComment pattern in getExperimentFromAggregates
  - Ensures ClickHouse query logging populates workspace/experiment IDs

- Centralize ExperimentSearchCriteria binding logic
  - Create ExperimentSearchCriteriaBinder utility class
  - Parameterize filter strategies to support both DAO variants
  - Eliminate 29-line duplication between ExperimentDAO and ExperimentAggregatesDAO
  - Single source of truth prevents DAOs from getting out of sync
- Update canonical method signature to include new parameters:
  - List<EvaluatorItem> evaluators
  - ExecutionPolicy executionPolicy
  - boolean clearExecutionPolicy

- Update all 5 caller sites to pass new parameters:
  - Use changes.evaluators(), changes.executionPolicy(), changes.clearExecutionPolicy() when available
  - Pass null/false for auto-generated versions that inherit from base

- Add imports for EvaluatorItem and ExecutionPolicy

Fixes compilation errors introduced by rebase with upstream changes to DatasetVersionService
…act constants, remove DAO logging

- Fixed BigDecimal[] to Double[] conversion for experiment_scores (matches ClickHouse Float64)
- Extracted FilterStrategy lists to static final constants to avoid repeated allocations
- Added @NonNull validation to populateExperimentAggregate parameter
- Removed DAO layer logging, keeping service-level logging only
Extract streamWithExperimentPagination() helper method to eliminate
duplication in getTracesData(), getSpansData(), and getFeedbackScoresData().

All three methods followed identical pattern:
- asyncTemplate.stream with connection
- getSTWithLogComment with cursor flag
- Bind workspace_id, experiment_id, project_id, limit
- Optional cursor binding
- Result mapping

Benefits:
- Single source of truth for pagination binding logic
- Prevents divergence when tweaking cursor/limit bindings
- Reduces code from ~20 lines to ~10 lines per method
- Type-safe generic implementation

Note: CTE redundancy (3x experiment_items scan) is intentional to avoid
passing large trace ID lists as parameters, which would cause performance
issues with 10K+ traces.
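The shape of the extracted helper can be sketched like this (an illustrative analogue, not the real R2DBC implementation — here the query is modeled as a function from `(cursor, limit)` to a page of results):

```java
import java.util.*;
import java.util.function.BiFunction;

// One generic method owns the cursor/limit loop, so the three data streams
// (traces, spans, feedback scores) cannot drift apart when bindings change.
class PaginationSketch {
    static <T> List<T> streamWithPagination(BiFunction<Integer, Integer, List<T>> page, int limit) {
        List<T> all = new ArrayList<>();
        int cursor = 0;
        while (true) {
            List<T> batch = page.apply(cursor, limit);   // single binding site
            all.addAll(batch);
            if (batch.size() < limit) return all;        // short batch: last page
            cursor += batch.size();                      // advance the cursor
        }
    }
}
```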
…iagohora/OPIK-4382_metrics_computation_service
… aggregates recomputation

- Add ExperimentDenormalizationConfig implementing StreamConfiguration with debounce, job lock, and per-experiment aggregation lock settings
- Add ExperimentAggregationMessage as stream message record
- Add ExperimentAggregatesSubscriber consuming from the denormalization stream; acquires a workspace-scoped distributed lock per experiment before calling populateAggregations()
- Add experimentDenormalizationEnabled feature flag to ServiceTogglesConfig and FeatureFlags
- Wire ExperimentDenormalizationConfig into OpikConfiguration
- Update config.yml and config-test.yml with full experimentDenormalization block (enabled for tests)
- Add ExperimentAggregatesSubscriberTest covering lifecycle gating and processEvent success/error paths
… rename tests

- ExperimentDenormalizationConfig: add sensible defaults to all fields so
  Dropwizard validation doesn't fail when the config block is absent from
  old deployments (config.isEnabled()=false still gates the subscriber)
- Remove experimentDenormalizationEnabled service toggle from
  ServiceTogglesConfig, FeatureFlags, config.yml and config-test.yml -
  the infrastructure gate (config.isEnabled()) is the single control point
- Rename lifecycle test methods to camelCase per project conventions:
  startSkipsStartupWhenDisabled / stopSkipsShutdownWhenDisabled
…bility, redundant IN subquery, hardcoded context keys, Instant.now in loop, and inline defaultIfNull
- #7: Remove "Used for testing and verification" from getExperimentFromAggregates javadoc
- #8: Replace recursive flatMap with Mono.expand() in populateExperimentItemsInBatches
- #10: Remove unrelated subscribeOn addition from DatasetItemService.createVersionFromDelta
Operational logs belong in the service layer, not the DAO.
Removes the uncapped public batch size entry point. All callers now go
through the public no-arg overload which reads batchSize safely from config.
…es pipeline

- Add ClickHouse migration (000062) to add evaluation_method column to experiment_aggregates table
- Add evaluationMethod field to ExperimentData record
- Update GET_EXPERIMENT_DATA query to read evaluation_method from experiments
- Update INSERT_EXPERIMENT_AGGREGATE to write evaluation_method to experiment_aggregates
- Update SELECT_EXPERIMENT_BY_ID to read evaluation_method from experiment_aggregates
- Fix Experiment record constructor call: insert EvaluationMethod at correct position (10)
…ries

Reduce copy-paste in getTraceAggregations, getSpanAggregations, and
getFeedbackScoreAggregations by extracting queryExperimentAggregation,
which centralises the context-aware execution, workspace/experiment/project
parameter binding, and singleOrEmpty pattern shared by all three methods.
…ameter

Add @NonNull to ExperimentSearchCriteria in the interface and implementation
so that a null argument fails fast with an explicit NullPointerException at
the DAO boundary instead of crashing deep inside buildCountTemplate.
… path

target_project_ids was only applied inside the project_deleted LEFT JOIN
subquery; the main WHERE had no project restriction, so counts were
workspace-wide. Reuse has_target_projects in the main WHERE so
project_id IN :target_project_ids always takes effect. Also replace
manual null/empty checks with CollectionUtils.isNotEmpty.
@github-actions
Contributor

github-actions bot commented Mar 5, 2026

SDK E2E Tests Results

0 tests   0 ✅  0s ⏱️
0 suites  0 💤
0 files    0 ❌

Results for commit 1fecbac.

…t config

The dropwizard-jobs framework uses WordUtils.uncapitalize(class.getSimpleName())
to look up the interval in the jobs map, so the key must be
'experimentDenormalizationJob' (lowercase first letter). Also adds the missing
jobs section and jobBatchSize to config-test.yml.
Remove @Every from ExperimentDenormalizationJob and schedule it
programmatically in OpikGuiceyLifecycleEventListener, following the
same pattern as TraceThreadsClosingJob. Add jobInterval config field
to ExperimentDenormalizationConfig. Remove the jobs YAML section that
caused deserialization errors with JobConfiguration's immutable map.
- Include experimentId and workspaceId in onExperimentUpdated error log
- Extract publishIfNotEmpty helper to deduplicate filter+publish logic
  across triggerByExperimentIds, triggerByTraceIds, triggerBySpanIds
Stub publisher.publish() to return Mono.empty() in setUp so
.subscribe() calls in production code don't NPE on null.
…AO methods

Move operational logging responsibility to the service layer, consistent
with earlier fixes for ExperimentItemDAO and SpanDAO in this PR.
…tener' of https://github.com/comet-ml/opik into thiagohora/OPIK-4383_add_experiment_aggregate_event_listener
These files were introduced during merge resolution but should
not be part of the branch.
…DAO BULK_UPDATE

Restores proper tag handling in SpanDAO.BULK_UPDATE query that was
regressed to a simple arrayConcat. Now uses TagOperations.tagUpdateFragment()
which provides arrayDistinct(), tag limit enforcement (max 50), and
tags_to_add/tags_to_remove support. Also adds the required
short_circuit_function_evaluation SETTINGS for throwIf evaluation.
…tener' into thiagohora/OPIK-4383_add_experiment_aggregation_publisher_job_tests
@github-actions
Contributor

github-actions bot commented Mar 10, 2026

TS SDK E2E Tests - Node 18

236 tests  ±0   234 ✅ ±0   23m 36s ⏱️ + 1m 3s
 25 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit c4b493d. ± Comparison against base commit e69cbd7.

♻️ This comment has been updated with latest results.

@github-actions
Contributor

github-actions bot commented Mar 10, 2026

TS SDK E2E Tests - Node 20

236 tests  ±0   234 ✅ ±0   22m 11s ⏱️ - 1m 30s
 25 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit c4b493d. ± Comparison against base commit e69cbd7.

♻️ This comment has been updated with latest results.

@github-actions
Contributor

github-actions bot commented Mar 10, 2026

TS SDK E2E Tests - Node 22

236 tests  ±0   234 ✅ ±0   21m 7s ⏱️ -45s
 25 suites ±0     2 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit c4b493d. ± Comparison against base commit e69cbd7.

♻️ This comment has been updated with latest results.

Base automatically changed from thiagohora/OPIK-4383_add_experiment_aggregate_event_listener to main March 11, 2026 10:43
Comment thread apps/opik-backend/config.yml
@JetoPistola
Contributor

👋 Review summary

What looks good

  • Clean debounce design using ZSET scores for O(log N) range queries — avoids full keyspace scans
  • Workspace isolation via compound workspaceId:experimentId members is well thought out
  • Stale entry handling (expired hash bucket) is a nice defensive touch
  • Thorough test coverage: integration tests with real Redis for the publisher, well-structured Mockito tests for the job
  • Consistent with existing patterns (AlertJob's lock + subscribe structure)

Overall
The approach is solid — ZSET-based debounce with a periodic flush job is the right pattern for this. The main concern is the expand pagination logic which can loop infinitely when a batch is full, since downstream processing hasn't removed entries before the next page is fetched.

Inline comments: 1 blocker (pagination loop), 2 suggestions (defensive parsing, env var prefix), 1 nit (log levels)

🤖 Review posted via /review-github-pr

- Add batchSize-capped iteration counter to expand() to prevent
  infinite loops when ZSET entries fail to be removed
- Rename EXPERIMENT_DENORM_JOB_INTERVAL to OPIK_EXPERIMENT_DENORM_JOB_INTERVAL
  to follow the OPIK_ prefix convention
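The blocker fix can be sketched as a bounded drain loop (an illustrative stand-in for the capped `Mono.expand()` chain; names are assumptions):

```java
import java.util.*;
import java.util.function.Supplier;

// The batch fetch is capped by an iteration counter, so a persistently full
// batch (entries that fail to be removed downstream) cannot spin the job
// forever: the cap stops the loop and the next scheduled run retries.
class CappedDrainSketch {
    static <T> List<T> drain(Supplier<List<T>> nextBatch, int batchSize, int maxIterations) {
        List<T> all = new ArrayList<>();
        for (int i = 0; i < maxIterations; i++) {
            List<T> batch = nextBatch.get();
            all.addAll(batch);
            if (batch.size() < batchSize) break;   // short batch: nothing left
        }
        return all;                                // cap reached: stop, don't spin
    }
}
```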
@thiagohora thiagohora requested a review from JetoPistola March 11, 2026 19:49
Contributor

@BorisTkachenko BorisTkachenko left a comment


Looks good, one minor comment.

@thiagohora thiagohora merged commit 820ce43 into main Mar 12, 2026
58 checks passed
@thiagohora thiagohora deleted the thiagohora/OPIK-4383_add_experiment_aggregation_publisher_job_tests branch March 12, 2026 11:18

Labels

Backend, java, tests
