Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
Expand Down Expand Up @@ -429,10 +430,10 @@ public Response getDatasetItems(
}), maxItems = 2000)))
})
public ChunkedOutput<JsonNode> streamDatasetItems(
@RequestBody(content = @Content(schema = @Schema(implementation = DatasetItemStreamRequest.class))) @NotNull @Valid DatasetItemStreamRequest request) {
var workspaceId = requestContext.get().getWorkspaceId();
var userName = requestContext.get().getUserName();
var visibility = requestContext.get().getVisibility();
@RequestBody(content = @Content(schema = @Schema(implementation = DatasetItemStreamRequest.class))) @NotNull @Valid DatasetItemStreamRequest request,
@Context HttpServletResponse httpResponse) {
var ctxSnapshot = requestContext.get();
var workspaceId = ctxSnapshot.getWorkspaceId();

// Suppress unchecked cast warning since we already pass DatasetItemFilter reference to newFilters
@SuppressWarnings("unchecked")
Expand All @@ -441,12 +442,24 @@ public ChunkedOutput<JsonNode> streamDatasetItems(

log.info("Streaming dataset items for dataset '{}', projectName '{}' on workspaceId '{}'",
request.datasetName(), request.projectName(), workspaceId);
var items = itemService.getItems(workspaceId, request, queryFilters, visibility)
.contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName)
.put(RequestContext.WORKSPACE_ID, workspaceId));

service.resolveDatasetByName(DatasetIdentifier.builder()
.datasetName(request.datasetName())
.projectName(request.projectName())
.build());

var items = itemService.getItems(workspaceId, request, queryFilters)
.contextWrite(ctx -> setRequestContext(ctx, ctxSnapshot));
var outputStream = streamer.getOutputStream(items);
Comment thread
thiagohora marked this conversation as resolved.

log.info("Streamed dataset items for dataset '{}', projectName '{}' on workspaceId '{}'",
request.datasetName(), request.projectName(), workspaceId);

String fallbackMessage = requestContext.get().getWorkspaceFallbackMessage();
if (fallbackMessage != null) {
httpResponse.addHeader(RequestContext.WORKSPACE_FALLBACK_HEADER, fallbackMessage);
Comment thread
thiagohora marked this conversation as resolved.
}

return outputStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ Mono<Void> createFromSpans(UUID datasetId, Set<UUID> spanIds,

Mono<DatasetItemPage> getItems(int page, int size, DatasetItemSearchCriteria datasetItemSearchCriteria);

Flux<DatasetItem> getItems(String workspaceId, DatasetItemStreamRequest request,
List<DatasetItemFilter> filters, Visibility visibility);
Flux<DatasetItem> getItems(String workspaceId, DatasetItemStreamRequest request, List<DatasetItemFilter> filters);

Mono<PageColumns> getOutputColumns(UUID datasetId, Set<UUID> experimentIds);

Expand Down Expand Up @@ -730,17 +729,16 @@ private String createChangeDescription(long count, boolean isFilterBased) {

@WithSpan
public Flux<DatasetItem> getItems(@NonNull String workspaceId, @NonNull DatasetItemStreamRequest request,
@NonNull List<DatasetItemFilter> filters, Visibility visibility) {
@NonNull List<DatasetItemFilter> filters) {
log.info("Getting dataset items for dataset '{}' (hasFilters={}), version='{}', workspaceId='{}'",
request.datasetName(), !filters.isEmpty(),
request.datasetVersion(), workspaceId);

return datasetService.resolveDatasetByName(
return datasetService.resolveDatasetByNameAsync(
DatasetIdentifier.builder()
.datasetName(request.datasetName())
.projectName(request.projectName())
.build(),
visibility)
.build())
.flatMap(dataset -> Mono.deferContextual(ctx -> {
// Ensure dataset is migrated if lazy migration is enabled
return ensureLazyMigration(dataset.id(), workspaceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface DatasetService {

Dataset findByName(String workspaceId, DatasetIdentifier identifier, Visibility visibility);

Mono<Dataset> resolveDatasetByName(DatasetIdentifier identifier, Visibility visibility);
Mono<Dataset> resolveDatasetByNameAsync(DatasetIdentifier identifier);

void delete(DatasetIdentifier identifier);

Expand All @@ -109,6 +109,8 @@ public interface DatasetService {
long getDailyCreatedCount();

void updateStatus(UUID id, String workspaceId, DatasetStatus status);

Dataset resolveDatasetByName(DatasetIdentifier identifier);
}

@Singleton
Expand Down Expand Up @@ -377,15 +379,37 @@ public Dataset findByName(@NonNull String workspaceId, @NonNull DatasetIdentifie
}

@Override
public Mono<Dataset> resolveDatasetByName(@NonNull DatasetIdentifier identifier, Visibility visibility) {
public Dataset resolveDatasetByName(@NonNull DatasetIdentifier identifier) {
String workspaceId = requestContext.get().getWorkspaceId();
return Mono.fromCallable(() -> {
UUID projectId = null;
if (StringUtils.isNotBlank(identifier.projectName())) {
projectId = projectService.findProjectIdByName(workspaceId, identifier.projectName()).orElse(null);
}
return findByName(workspaceId, identifier.datasetName(), projectId, visibility);
}).subscribeOn(Schedulers.boundedElastic());
UUID projectId = null;
boolean projectNameProvided = StringUtils.isNotBlank(identifier.projectName());
Comment thread
thiagohora marked this conversation as resolved.

if (projectNameProvided) {
projectId = projectService.findProjectIdByName(workspaceId, identifier.projectName()).orElse(null);
}

Dataset dataset = findByName(workspaceId, identifier.datasetName(), projectId,
requestContext.get().getVisibility());
// Project name was given but couldn't be resolved to a known project — dataset found workspace-wide
if (projectNameProvided && projectId == null) {
requestContext.get().setWorkspaceFallbackFor("Dataset", identifier.datasetName());
}
return dataset;
}

@Override
public Mono<Dataset> resolveDatasetByNameAsync(@NonNull DatasetIdentifier identifier) {
return Mono.deferContextual(ctx -> {
String workspaceId = ctx.get(RequestContext.WORKSPACE_ID);
Visibility visibility = ctx.get(RequestContext.VISIBILITY);
Comment thread
thiagohora marked this conversation as resolved.
return Mono.fromCallable(() -> {
UUID projectId = null;
if (StringUtils.isNotBlank(identifier.projectName())) {
projectId = projectService.findProjectIdByName(workspaceId, identifier.projectName()).orElse(null);
}
return findByName(workspaceId, identifier.datasetName(), projectId, visibility);
}).subscribeOn(Schedulers.boundedElastic());
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ public static Context setRequestContext(Context ctx, Provider<RequestContext> re
Optional.ofNullable(requestContext.get().getVisibility()).orElse(Visibility.PRIVATE));
}

/**
 * Copies the identity attributes of an already-captured {@link RequestContext} snapshot into a
 * Reactor {@link Context}: user name, workspace id, workspace name, and visibility.
 * Visibility falls back to {@link Visibility#PRIVATE} when the snapshot carries none.
 */
public static Context setRequestContext(Context ctx, RequestContext requestContext) {
    Visibility visibility = Optional.ofNullable(requestContext.getVisibility())
            .orElse(Visibility.PRIVATE);
    Context enriched = ctx.put(RequestContext.USER_NAME, requestContext.getUserName());
    enriched = enriched.put(RequestContext.WORKSPACE_ID, requestContext.getWorkspaceId());
    enriched = enriched.put(RequestContext.WORKSPACE_NAME, requestContext.getWorkspaceName());
    return enriched.put(RequestContext.VISIBILITY, visibility);
}

public static Context setRequestContext(Context ctx, String workspaceId, String userName, Visibility visibility) {
return ctx.put(RequestContext.USER_NAME, userName)
.put(RequestContext.WORKSPACE_ID, workspaceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,14 +882,7 @@ public Response callApplyDatasetItemChanges(UUID datasetId, DatasetItemChanges c

public List<DatasetItem> streamDatasetItems(DatasetItemStreamRequest request, String apiKey,
String workspaceName) {
try (var response = client.target(RESOURCE_PATH.formatted(baseURI))
.path("items")
.path("stream")
.request()
.header(HttpHeaders.AUTHORIZATION, apiKey)
.header(WORKSPACE_HEADER, workspaceName)
.post(Entity.json(request))) {

try (var response = callStreamDatasetItems(request, apiKey, workspaceName)) {
assertThat(response.getStatusInfo().getStatusCode()).isEqualTo(HttpStatus.SC_OK);

// Read the chunked output as a string and parse each line as a DatasetItem
Expand All @@ -904,4 +897,14 @@ public List<DatasetItem> streamDatasetItems(DatasetItemStreamRequest request, St
.toList();
}
}

/**
 * Issues the raw POST against the dataset items streaming endpoint and returns the open
 * {@link Response}. Callers own the response and are responsible for closing it.
 */
public Response callStreamDatasetItems(DatasetItemStreamRequest request, String apiKey, String workspaceName) {
    var streamTarget = client.target(RESOURCE_PATH.formatted(baseURI))
            .path("items")
            .path("stream");
    return streamTarget.request()
            .header(HttpHeaders.AUTHORIZATION, apiKey)
            .header(WORKSPACE_HEADER, workspaceName)
            .post(Entity.json(request));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.comet.opik.api.resources.v1.priv;

import com.comet.opik.api.Dataset;
import com.comet.opik.api.DatasetExpansion;
import com.comet.opik.api.DatasetExpansionResponse;
import com.comet.opik.api.DatasetIdentifier;
import com.comet.opik.api.DatasetItem;
import com.comet.opik.api.DatasetItemStreamRequest;
import com.comet.opik.api.Visibility;
Expand Down Expand Up @@ -97,6 +99,9 @@ void testStreamErrorHandling() {
when(requestContext.getWorkspaceId())
.thenReturn(workspaceId);

when(requestContext.getWorkspaceName())
.thenReturn(DEFAULT_WORKSPACE_NAME);

when(requestContext.getVisibility())
.thenReturn(Visibility.PRIVATE);

Expand All @@ -110,7 +115,10 @@ void testStreamErrorHandling() {

var request = DatasetItemStreamRequest.builder().datasetName(datasetName).steamLimit(500).build();

when(itemService.getItems(eq(workspaceId), eq(request), any(), eq(Visibility.PRIVATE)))
when(service.resolveDatasetByName(any(DatasetIdentifier.class)))
.thenReturn(factory.manufacturePojo(Dataset.class));

when(itemService.getItems(eq(workspaceId), eq(request), any()))
.thenReturn(Flux.defer(() -> itemFlux));

try (var response = EXT.target("/v1/private/datasets/items/stream")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4577,6 +4577,29 @@ void streamDataItems__whenNonExistingProjectName__thenReturnItemsWithoutProjectS

assertThat(actualItems).hasSize(items.size());
}

@Test
@DisplayName("when streaming dataset items with non-existing project_name, then return X-Opik-Deprecation header")
void streamDataItems__whenNonExistingProjectName__thenReturnDeprecationHeader() {
var batch = DatasetResourceClient.buildDatasetItemBatch(factory).toBuilder()
.items(List.of(DatasetResourceClient.buildDatasetItem(factory).toBuilder().id(null).build()))
.datasetId(null)
.build();

putAndAssert(batch, TEST_WORKSPACE, API_KEY);

var streamRequest = DatasetItemStreamRequest.builder()
.datasetName(batch.datasetName())
Comment thread
thiagohora marked this conversation as resolved.
.projectName("nonexistent-project-" + UUID.randomUUID())
.build();

try (var response = datasetResourceClient.callStreamDatasetItems(streamRequest, API_KEY, TEST_WORKSPACE)) {
assertThat(response.getStatusInfo().getStatusCode()).isEqualTo(200);
assertThat(response.getHeaderString(RequestContext.WORKSPACE_FALLBACK_HEADER))
.isEqualTo(RequestContext.WORKSPACE_FALLBACK_MESSAGE_TEMPLATE.formatted("Dataset",
batch.datasetName()));
}
}
}

private DatasetItem getItemAndAssert(DatasetItem expectedDatasetItem, String workspaceName, String apiKey) {
Expand Down
Loading