Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
599e7cf
[OPIK-4380] [BE] Add experiment aggregates for denormalized metrics
thiagohora Feb 20, 2026
653e52c
[OPIK-4380] [BE] Add MySQL deadlock retry mechanism for concurrent da…
thiagohora Feb 20, 2026
672b5ce
Fix visibility
thiagohora Feb 20, 2026
19ad5e4
[OPIK-4380] [BE] Address PR review comments for experiment aggregates
thiagohora Feb 20, 2026
3978a0e
[OPIK-4382] [BE] Refactor experiment aggregates with import cleanup a…
thiagohora Feb 22, 2026
2a62ebb
[OPIK-4380] [BE] Fix table definition
thiagohora Feb 22, 2026
8dfbd70
[OPIK-4380] [BE] Address PR comments and consolidate DatasetItemServi…
thiagohora Feb 22, 2026
520bafb
Merge branch 'main' into thiaghora/OPIK-4380-experiment-denormalization
thiagohora Feb 22, 2026
14a49b1
[OPIK-4380] [BE] Fix missing log_comment and centralize search criter…
thiagohora Feb 22, 2026
032ad46
[OPIK-4380] [BE] Fix createVersionFromDelta consolidation after rebase
thiagohora Feb 22, 2026
c71235e
[OPIK-4380] [BE] Address PR review comments - fix type mismatch, extr…
thiagohora Feb 22, 2026
b14d687
[OPIK-4380] [BE] Extract shared helper for experiment data pagination
thiagohora Feb 22, 2026
48d79c0
Merge branch 'thiaghora/OPIK-4380-experiment-denormalization' into th…
thiagohora Feb 23, 2026
82fb51e
Merge branch 'main' into thiaghora/OPIK-4380-experiment-denormalization
thiagohora Feb 23, 2026
618986c
[OPIK-4383] [BE] Add Redis stream subscriber for debounced experiment…
thiagohora Feb 23, 2026
2f1796e
Revision 2: Address PR comments - add config defaults, remove toggle,…
thiagohora Feb 24, 2026
4c9f6fc
Revision 3: Add @Max(500) to consumerBatchSize and @NotNull to jobLoc…
thiagohora Feb 24, 2026
f679239
Merge branch 'main' into thiaghora/OPIK-4380-experiment-denormalization
thiagohora Feb 24, 2026
05c2f95
[OPIK-4380] [BE] Address PR review comments - fix TYPE_REFERENCE visi…
thiagohora Feb 26, 2026
350f49e
Revision 4: Address remaining JetoPistola review comments (#7, #8, #10)
thiagohora Feb 26, 2026
810fbd3
Revision 3: Add switchIfEmpty fallback for deleted traces in populate…
thiagohora Feb 26, 2026
5228408
Fix tests
thiagohora Feb 26, 2026
a11fdb0
Revision 6: Move countTotal log from DAO to service layer
thiagohora Mar 2, 2026
80369a1
Revision 7: Apply Spotless formatting
thiagohora Mar 2, 2026
f009442
Revision 8: Make populateAggregations(UUID, int) private
thiagohora Mar 2, 2026
441da52
Merge branch 'main' into thiaghora/OPIK-4380-experiment-denormalization
thiagohora Mar 2, 2026
12915c5
[OPIK-4380] [BE] Add evaluation_method support to experiment_aggregat…
thiagohora Mar 2, 2026
cf7c3ba
[OPIK-4380] [BE] Extract shared helper for experiment aggregation que…
thiagohora Mar 2, 2026
60a2ec3
[OPIK-4380] [BE] Enforce non-null contract on countTotal criteria par…
thiagohora Mar 2, 2026
db5f331
[OPIK-4380] [BE] Fix countTotal ignoring target project IDs in normal…
thiagohora Mar 2, 2026
2d27350
[OPIK-4380] [BE] Apply Spotless formatting
thiagohora Mar 2, 2026
b907350
Merge branch 'thiaghora/OPIK-4380-experiment-denormalization' into th…
thiagohora Mar 2, 2026
1315d56
Merge branch 'main' into thiagohora/OPIK-4382_metrics_computation_ser…
thiagohora Mar 2, 2026
82a69d1
Merge branch 'main' into thiagohora/OPIK-4382_metrics_computation_ser…
thiagohora Mar 2, 2026
230c45d
[OPIK-4382] [BE] Address PR review comments on experiment aggregates
thiagohora Mar 2, 2026
fad6915
Revision 3: Address PR comments E, F, G, H
thiagohora Mar 3, 2026
dadcabc
Revision 4: Fix ExperimentServiceTest to include ExperimentGroupEnric…
thiagohora Mar 3, 2026
791f62b
[OPIK-4382] [BE] Extract shared Row→ExperimentGroup mappers into Expe…
thiagohora Mar 3, 2026
11286ba
[OPIK-4382] [BE] Deduplicate bindGroupCriteria into ExperimentGroupMa…
thiagohora Mar 3, 2026
b7ba063
[OPIK-4382] [BE] Extract streamGroupQuery helper and fix null percent…
thiagohora Mar 3, 2026
1335553
[OPIK-4382] [BE] Consolidate cost/duration helpers into ExperimentGro…
thiagohora Mar 3, 2026
739477b
[OPIK-4382] [BE] Fix pagination count and add criteria filter tests
thiagohora Mar 3, 2026
d39c715
[OPIK-4380] [BE] Extract shared filter helpers into FilterQueryBuilder
thiagohora Mar 3, 2026
019e300
[OPIK-4382] [BE] Consolidate filter helpers in getExperimentItemsStat…
thiagohora Mar 3, 2026
b538376
Revision 9: Extract shared helpers to eliminate duplication across DAOs
thiagohora Mar 3, 2026
2d80a7d
[OPIK-4383] [BE] Add experiment aggregate event listener and no-op pu…
thiagohora Mar 4, 2026
6634138
Revision 2: Fix missing import for ExperimentAggregationPublisher
thiagohora Mar 4, 2026
613e8d4
[OPIK-4383] [BE] Add ExperimentAggregationPublisher, ExperimentDenorm…
thiagohora Mar 4, 2026
1d9064f
Merge branch 'main' into thiagohora/OPIK-4382_metrics_computation_ser…
thiagohora Mar 4, 2026
6b1fa21
Merge branch 'thiagohora/OPIK-4382_metrics_computation_service' into …
thiagohora Mar 4, 2026
bff0682
Merge branch 'thiagohora/OPIK-4383-redis-stream-subscriber-experiment…
thiagohora Mar 4, 2026
3c06cf8
Merge branch 'thiagohora/OPIK-4383_add_experiment_aggregate_event_lis…
thiagohora Mar 4, 2026
bfdc17d
Fix tests setup
thiagohora Mar 4, 2026
98460ea
Merge branch 'thiagohora/OPIK-4383_add_experiment_aggregate_event_lis…
thiagohora Mar 4, 2026
ad3d567
Merge branch 'thiagohora/OPIK-4383_add_experiment_aggregate_event_lis…
thiagohora Mar 4, 2026
4e165e8
[OPIK-4383] [BE] Address PR review: move DAO logs to service layer
thiagohora Mar 4, 2026
b04d5c0
[OPIK-4383] [BE] Address PR review: extract shared DAO helper and fix…
thiagohora Mar 4, 2026
7324724
[OPIK-4383] [BE] Short-circuit deleteByTraceIds when no spans found
thiagohora Mar 5, 2026
388c1f1
[OPIK-4383] [BE] Fix cascade deletion failures after trace delete
thiagohora Mar 5, 2026
f2acb60
Merge branch 'thiagohora/OPIK-4383_add_experiment_aggregate_event_lis…
thiagohora Mar 5, 2026
1fecbac
[OPIK-4383] [BE] Address PR review comments on ExperimentDenormalizat…
thiagohora Mar 5, 2026
7d35979
Fix @Every job interval config key casing and add jobs section to tes…
thiagohora Mar 5, 2026
88099b9
Replace @Every annotation with programmatic Quartz scheduling
thiagohora Mar 5, 2026
aae95b4
Add experiment context to error log and extract publishIfNotEmpty helper
thiagohora Mar 5, 2026
7b97e6e
Fix NPE in ExperimentAggregateEventListenerTest mock setup
thiagohora Mar 5, 2026
64d14c3
Merge branch 'main' into thiagohora/OPIK-4383_add_experiment_aggregat…
thiagohora Mar 9, 2026
08a96c8
[OPIK-4383] [BE] Remove DAO-level log.info from ExperimentAggregatesD…
thiagohora Mar 10, 2026
3904111
Merge branch 'thiagohora/OPIK-4383_add_experiment_aggregate_event_lis…
thiagohora Mar 10, 2026
98f697c
Remove accidentally committed doc files
thiagohora Mar 10, 2026
7cfb60f
[OPIK-4383] [BE] refactor: extract triggerAggregation helper to centr…
thiagohora Mar 10, 2026
c43e7e7
[OPIK-4383] [BE] fix: restore TagOperations.tagUpdateFragment in Span…
thiagohora Mar 10, 2026
d59daca
Merge branch 'thiagohora/OPIK-4383_add_experiment_aggregate_event_lis…
thiagohora Mar 10, 2026
8931a39
Merge branch 'main' into thiagohora/OPIK-4383_add_experiment_aggregat…
thiagohora Mar 11, 2026
3a74920
Adding InterruptableJob
thiagohora Mar 11, 2026
80c35d3
[OPIK-4383] [BE] Address PR review: expand safety valve, env var prefix
thiagohora Mar 11, 2026
c4b493d
Merge branch 'main' into thiagohora/OPIK-4383_add_experiment_aggregat…
thiagohora Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ experimentDenormalization:
# Default: 3
# Description: Maximum number of retry attempts for failed messages (1-10)
maxRetries: ${OPIK_REDIS_EXPERIMENT_DENORM_MAX_RETRIES:-3}
# Default: 100
# Description: Maximum number of ZSET entries to read per page when flushing pending experiments (10-1000)
jobBatchSize: ${OPIK_EXPERIMENT_DENORM_JOB_BATCH_SIZE:-100}
# Default: 5s
# Description: Interval at which the denormalization job runs to flush pending experiments (min 1s)
jobInterval: ${OPIK_EXPERIMENT_DENORM_JOB_INTERVAL:-5s}
# Default: 10000
Comment thread
thiagohora marked this conversation as resolved.
# Description: Maximum length of the Redis stream. Older entries are trimmed when this limit is exceeded.
streamMaxLen: ${OPIK_REDIS_EXPERIMENT_DENORM_STREAM_MAX_LEN:-10000}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public void onExperimentUpdated(ExperimentUpdated event) {
return;
}
if (FINISHED_STATUSES.contains(event.newStatus())) {
publisher.publish(Set.of(event.experimentId()), event.workspaceId(), event.userName());
publisher.publish(Set.of(event.experimentId()), event.workspaceId(), event.userName())
.subscribe(null,
e -> log.error(
"Error triggering aggregation for experiment '{}' in workspace '{}'",
event.experimentId(), event.workspaceId(), e));
}
}

Expand Down Expand Up @@ -207,8 +211,14 @@ private Mono<Void> triggerAggregation(Set<UUID> entityIds, String workspaceId, S
.contextWrite(ctx -> ctx
.put(RequestContext.WORKSPACE_ID, workspaceId)
.put(RequestContext.USER_NAME, userName))
.filter(CollectionUtils::isNotEmpty)
.doOnNext(experimentIds -> publisher.publish(experimentIds, workspaceId, userName))
.then();
.flatMap(experimentIds -> publishIfNotEmpty(experimentIds, workspaceId, userName));
}

/**
 * Publishes the given experiment IDs, or completes empty (with a warning) when there is
 * nothing to publish.
 *
 * @param experimentIds finished experiment IDs to publish; may be empty
 * @param workspaceId   workspace the experiments belong to
 * @param userName      user attributed to the aggregation
 * @return the publisher's completion signal, or an empty Mono when the set is empty
 */
private Mono<Void> publishIfNotEmpty(Set<UUID> experimentIds, String workspaceId, String userName) {
    if (CollectionUtils.isNotEmpty(experimentIds)) {
        return publisher.publish(experimentIds, workspaceId, userName);
    }
    log.warn("No finished experiments to publish for workspace '{}'", workspaceId);
    return Mono.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package com.comet.opik.api.resources.v1.jobs;

import com.comet.opik.api.events.ExperimentAggregationMessage;
import com.comet.opik.infrastructure.ExperimentDenormalizationConfig;
import com.comet.opik.infrastructure.lock.LockService;
import io.dropwizard.jobs.Job;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.UnableToInterruptJobException;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.stream.StreamAddArgs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import ru.vyarus.dropwizard.guice.module.yaml.bind.Config;

import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static com.comet.opik.infrastructure.lock.LockService.Lock;

/**
* Scheduled job responsible for flushing debounced experiment aggregation events to the Redis stream.
*
* <p>Periodically (default 5s, configurable via {@code experimentDenormalization.jobInterval}) this job:
* <ol>
* <li>Queries the Redis ZSET index for members whose debounce window has elapsed.</li>
* <li>Each ZSET member encodes both workspaceId and experimentId as {@code "workspaceId:experimentId"},
* ensuring cross-workspace isolation for experiments that share the same UUID.</li>
* <li>Reads the userName from the associated Redis hash bucket.</li>
* <li>Publishes an {@link ExperimentAggregationMessage} to the Redis stream for each ready experiment.</li>
* <li>Removes the processed entry from both the ZSET index and the hash bucket.</li>
* <li>Handles stale ZSET entries (expired hash) by removing them without publishing.</li>
* </ol>
*
* <p>Uses a ZSET scored by expiry timestamp for O(log(N)+M) index lookups, avoiding full keyspace scans.
*/
@Slf4j
@Singleton
@DisallowConcurrentExecution
public class ExperimentDenormalizationJob extends Job implements InterruptableJob {

private static final Lock SCAN_LOCK_KEY = new Lock("experiment_denormalization_job:scan_lock");

private final AtomicBoolean interrupted = new AtomicBoolean(false);
private final ExperimentDenormalizationConfig config;
private final RedissonReactiveClient redisClient;
private final LockService lockService;

@Inject
public ExperimentDenormalizationJob(
@NonNull @Config("experimentDenormalization") ExperimentDenormalizationConfig config,
@NonNull RedissonReactiveClient redisClient,
@NonNull LockService lockService) {
this.config = config;
this.redisClient = redisClient;
this.lockService = lockService;
Comment thread
thiagohora marked this conversation as resolved.
}

@Override
public void doJob(JobExecutionContext context) {
if (!config.isEnabled()) {
log.debug("Experiment denormalization job is disabled, skipping");
return;
}

// Check for interruption before starting
if (interrupted.get()) {
log.info("Experiment denormalization job interrupted before execution, skipping");
return;
}

log.info("Starting experiment denormalization job - checking for pending experiments");

lockService.bestEffortLock(
SCAN_LOCK_KEY,
Mono.defer(() -> getExperimentsReadyToProcess()
.flatMap(this::processExperiment)
.onErrorContinue((throwable, experimentId) -> log.error(
Comment thread
thiagohora marked this conversation as resolved.
"Failed to process pending experiment '{}'",
experimentId, throwable))
.doOnComplete(
() -> log.info(
"Experiment denormalization job finished processing all ready experiments"))
.then()),
Mono.defer(() -> {
log.info(
"Could not acquire lock for scanning pending experiments, another job instance is running");
return Mono.empty();
}),
config.getJobLockTime().toJavaDuration(),
config.getJobLockWaitTime().toJavaDuration())
.subscribe(
__ -> log.info("Experiment denormalization job execution completed"),
error -> log.error("Experiment denormalization job interrupted while acquiring lock", error));
}

/**
* Queries the ZSET index in pages for experiment IDs whose debounce window has elapsed (score &lt;= now).
* Uses offset/count pagination to avoid materializing the entire ZSET into memory.
* Since each processed experiment is removed from the ZSET, we always query from offset 0.
* A safety counter caps the number of expand iterations (using batchSize as the limit) to
* prevent infinite loops if entries fail to be removed (e.g., due to errors swallowed by onErrorContinue).
*/
private Flux<String> getExperimentsReadyToProcess() {
long nowMillis = Instant.now().toEpochMilli();
int batchSize = config.getJobBatchSize();
var index = redisClient.<String>getScoredSortedSet(ExperimentDenormalizationConfig.PENDING_SET_KEY);
var iterations = new int[]{0};

log.debug("Checking for experiments ready to process (up to timestamp: '{}', batchSize: '{}')",
nowMillis, batchSize);

if (interrupted.get()) {
log.info("Experiment denormalization job interrupted before execution, skipping");
return Flux.empty();
}

return index.valueRange(Double.NEGATIVE_INFINITY, true, nowMillis, true, 0, batchSize)
.expand(collection -> {
if (collection.size() < batchSize) {
return Mono.empty();
}
iterations[0]++;
if (iterations[0] >= batchSize) {
log.warn("Reached maximum expand iterations '{}', stopping pagination to prevent infinite loop",
batchSize);
return Mono.empty();
}
return index.valueRange(Double.NEGATIVE_INFINITY, true, nowMillis, true, 0, batchSize);
})
.flatMapIterable(Function.identity())
.map(Object::toString);
Comment thread
thiagohora marked this conversation as resolved.
}

/**
* Processes a single pending experiment: publishes a stream message and cleans up the Redis state.
* The {@code member} is a compound key of the form {@code "workspaceId:experimentId"}, which
* ensures experiments with the same UUID in different workspaces are handled independently.
* If the hash bucket has already expired (stale ZSET entry), only the ZSET entry is removed.
*/
private Mono<Void> processExperiment(String member) {
int separatorIndex = member.indexOf(ExperimentDenormalizationConfig.MEMBER_SEPARATOR);
String workspaceId = member.substring(0, separatorIndex);
String experimentIdStr = member.substring(separatorIndex + 1);

Comment thread
thiagohora marked this conversation as resolved.
log.info("Processing pending experiment: '{}' for workspace: '{}'", experimentIdStr, workspaceId);

var bucket = redisClient.<String, String>getMap(ExperimentDenormalizationConfig.EXPERIMENT_KEY_PREFIX + member);
var index = redisClient.getScoredSortedSet(ExperimentDenormalizationConfig.PENDING_SET_KEY);
var stream = redisClient.getStream(config.getStreamName(), config.getCodec());

return bucket.get(ExperimentDenormalizationConfig.USER_NAME_FIELD)
.flatMap(userName -> {
var message = ExperimentAggregationMessage.builder()
.experimentId(UUID.fromString(experimentIdStr))
.workspaceId(workspaceId)
.userName(userName)
.build();

return stream.add(StreamAddArgs.entry(ExperimentDenormalizationConfig.PAYLOAD_FIELD, message))
.doOnNext(id -> log.info(
"Enqueued aggregation message for experiment '{}' with stream id '{}'",
experimentIdStr, id))
.then(bucket.delete())
.then(index.remove(member));
})
.switchIfEmpty(Mono.defer(() -> {
log.warn("Stale index entry found with no bucket data, removing member: '{}'", member);
return index.remove(member);
}))
.then()
.doOnSuccess(__ -> log.info("Successfully processed and removed pending experiment: '{}'",
experimentIdStr));
}

@Override
public void interrupt() throws UnableToInterruptJobException {
interrupted.set(true);
log.info("ExperimentDenormalizationJob reaper job interrupted");
}
}
Original file line number Diff line number Diff line change
@@ -1,34 +1,69 @@
package com.comet.opik.domain.experiments.aggregations;

import com.comet.opik.infrastructure.ExperimentDenormalizationConfig;
import com.google.inject.ImplementedBy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMapReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import ru.vyarus.dropwizard.guice.module.yaml.bind.Config;

import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.UUID;

@ImplementedBy(ExperimentAggregationPublisher.ExperimentAggregationPublisherImpl.class)
public interface ExperimentAggregationPublisher {

void publish(@NonNull Set<UUID> experimentIds, @NonNull String workspaceId, @NonNull String userName);
Mono<Void> publish(@NonNull Set<UUID> experimentIds, @NonNull String workspaceId, @NonNull String userName);

@Singleton
@Slf4j
class ExperimentAggregationPublisherImpl implements ExperimentAggregationPublisher {

private final ExperimentDenormalizationConfig config;
private final RedissonReactiveClient redisClient;

@Inject
ExperimentAggregationPublisherImpl() {
ExperimentAggregationPublisherImpl(
@NonNull @Config("experimentDenormalization") ExperimentDenormalizationConfig config,
@NonNull RedissonReactiveClient redisClient) {
this.config = config;
this.redisClient = redisClient;
}

@Override
public void publish(@NonNull Set<UUID> experimentIds, @NonNull String workspaceId,
public Mono<Void> publish(@NonNull Set<UUID> experimentIds, @NonNull String workspaceId,
@NonNull String userName) {
// TODO: implement debounce mechanism before enqueuing to Redis stream
log.debug(
"Experiment aggregation publish skipped for experiments '{}': debounce mechanism not yet implemented",
experimentIds);
if (!config.isEnabled() || experimentIds.isEmpty()) {
Comment thread
thiagohora marked this conversation as resolved.
log.info("Skipping publish: enabled='{}', experimentIds.size='{}'",
config.isEnabled(), experimentIds.size());
return Mono.empty();
}

Instant expiryTimestamp = Instant.now().plusMillis(config.getDebounceDelay().toMilliseconds());
var index = redisClient.getScoredSortedSet(ExperimentDenormalizationConfig.PENDING_SET_KEY);

Comment thread
thiagohora marked this conversation as resolved.
return Flux.fromIterable(experimentIds)
.flatMap(experimentId -> {
String member = workspaceId + ":" + experimentId;
RMapReactive<String, String> bucket = redisClient
.getMap(ExperimentDenormalizationConfig.EXPERIMENT_KEY_PREFIX + member);

return index.add(expiryTimestamp.toEpochMilli(), member)
.then(bucket.put(ExperimentDenormalizationConfig.USER_NAME_FIELD, userName))
.then(bucket.expire(Duration.ofMillis(config.getDebounceDelay().toMilliseconds() * 2)))
.doOnSuccess(__ -> log.info(
"Enqueued experiment '{}' for workspace '{}' in pending bucket with expiryTimestamp='{}'",
experimentId, workspaceId, expiryTimestamp));
})
.doOnError(error -> log.error("Error enqueueing experiments in pending bucket", error))
.then();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class ExperimentDenormalizationConfig implements StreamConfiguration {

public static final String PAYLOAD_FIELD = "message";
public static final String PENDING_SET_KEY = "experiment:denorm:pending";
public static final String EXPERIMENT_KEY_PREFIX = PENDING_SET_KEY + ":";
public static final String USER_NAME_FIELD = "userName";
public static final String MEMBER_SEPARATOR = ":";

@JsonProperty
private boolean enabled = false;
Expand Down Expand Up @@ -76,6 +79,13 @@ public class ExperimentDenormalizationConfig implements StreamConfiguration {
@JsonProperty
@Min(1) @Max(10) private int maxRetries = 3;

@Valid @JsonProperty
@Min(10) @Max(1000) private int jobBatchSize = 100;

@Valid @JsonProperty
@NotNull @MinDuration(value = 1, unit = TimeUnit.SECONDS)
private Duration jobInterval = Duration.seconds(5);

@Override
@JsonIgnore
public Codec getCodec() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.comet.opik.infrastructure.bi;

import com.comet.opik.api.resources.v1.jobs.DatasetVersionItemsTotalMigrationJob;
import com.comet.opik.api.resources.v1.jobs.ExperimentDenormalizationJob;
import com.comet.opik.api.resources.v1.jobs.TraceThreadsClosingJob;
import com.comet.opik.domain.alerts.MetricsAlertJob;
import com.comet.opik.infrastructure.ExperimentDenormalizationConfig;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.TraceThreadConfig;
import com.google.inject.Injector;
Expand Down Expand Up @@ -42,6 +44,7 @@ public void onEvent(GuiceyLifecycleEvent event) {
setupDailyJob();
setTraceThreadsClosingJob();
setMetricsAlertJob();
setExperimentDenormalizationJob();
scheduleDatasetVersionItemsTotalMigrationJobIfEnabled();
}

Expand Down Expand Up @@ -111,6 +114,41 @@ private void setTraceThreadsClosingJob() {
}
}

// This method sets up a job that periodically flushes debounced experiment aggregation events.
private void setExperimentDenormalizationJob() {
Comment thread
thiagohora marked this conversation as resolved.
ExperimentDenormalizationConfig denormConfig = injector.get().getInstance(OpikConfiguration.class)
.getExperimentDenormalization();

if (!denormConfig.isEnabled()) {
log.info("Experiment denormalization job is disabled, skipping job setup");
return;
}

Duration jobInterval = denormConfig.getJobInterval().toJavaDuration();

var jobDetail = JobBuilder.newJob(ExperimentDenormalizationJob.class)
.storeDurably()
.build();

var trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.startNow()
.withSchedule(
org.quartz.SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMilliseconds(jobInterval.toMillis())
.repeatForever())
.build();

try {
var scheduler = getScheduler();
scheduler.addJob(jobDetail, false);
scheduler.scheduleJob(trigger);
log.info("Experiment denormalization job scheduled successfully with interval '{}'", jobInterval);
} catch (SchedulerException e) {
log.error("Failed to schedule experiment denormalization job", e);
}
}

// This method sets up a job that periodically evaluates metrics-based alerts for cost and latency thresholds.
private void setMetricsAlertJob() {
try {
Expand Down
Loading
Loading