diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e11f3e2f1cfb..9800e2708917e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.x] ### Added +- Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920)) ### Changed diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index d2eea496887c1..6b0855b1a1c3f 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -121,6 +121,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()); processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()); processors.put(HierarchicalRoutingProcessor.TYPE, new HierarchicalRoutingProcessor.Factory()); + processors.put(TemporalRoutingProcessor.TYPE, new TemporalRoutingProcessor.Factory()); processors.put(AclRoutingProcessor.TYPE, new AclRoutingProcessor.Factory()); return filterForAllowlistSetting(parameters.env.settings(), processors); } diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/TemporalRoutingProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/TemporalRoutingProcessor.java new file mode 100644 index 0000000000000..dd3eb5dce4f27 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/TemporalRoutingProcessor.java @@ -0,0 +1,319 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.common.Nullable; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.time.DateFormatters; +import org.opensearch.core.common.Strings; +import org.opensearch.ingest.AbstractProcessor; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.Processor; + +import java.nio.charset.StandardCharsets; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAccessor; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + +/** + * Processor that sets document routing based on temporal structure. + * + * This processor extracts a timestamp from a specified field, truncates it + * to a configurable granularity (hour/day/week/month), and uses the resulting + * temporal bucket to compute a routing value for improved temporal locality. + * + * Introduced in OpenSearch 3.2.0 to enable intelligent document co-location + * based on time-based patterns for log and metrics workloads. + */ +public final class TemporalRoutingProcessor extends AbstractProcessor { + + public static final String TYPE = "temporal_routing"; + private static final String DEFAULT_FORMAT = "strict_date_optional_time"; + + private final String timestampField; + private final Granularity granularity; + private final DateFormatter dateFormatter; + private final boolean ignoreMissing; + private final boolean overrideExisting; + private final boolean hashBucket; + + /** + * Supported temporal granularities + */ + public enum Granularity { + /** Hour granularity for hourly bucketing */ + HOUR(ChronoUnit.HOURS), + /** Day granularity for daily bucketing */ + DAY(ChronoUnit.DAYS), + /** Week granularity for weekly bucketing (ISO week) */ + WEEK(ChronoUnit.WEEKS), + /** Month granularity for monthly bucketing */ + MONTH(ChronoUnit.MONTHS); + + private final ChronoUnit chronoUnit; + + Granularity(ChronoUnit chronoUnit) { + this.chronoUnit = chronoUnit; + } + + /** + * Gets the ChronoUnit associated with this granularity + * @return the ChronoUnit + */ + public ChronoUnit getChronoUnit() { + return chronoUnit; + } + + /** + * Parses a string value to a Granularity enum + * @param value the string representation of the granularity + * @return the corresponding Granularity enum value + * @throws IllegalArgumentException if the value is not valid + */ + public static Granularity fromString(String value) { + try { + return valueOf(value.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid granularity: " + value + ". Supported values are: hour, day, week, month"); + } + } + } + + TemporalRoutingProcessor( + String tag, + @Nullable String description, + String timestampField, + Granularity granularity, + String format, + boolean ignoreMissing, + boolean overrideExisting, + boolean hashBucket + ) { + super(tag, description); + this.timestampField = timestampField; + this.granularity = granularity; + this.dateFormatter = DateFormatter.forPattern(format); + this.ignoreMissing = ignoreMissing; + this.overrideExisting = overrideExisting; + this.hashBucket = hashBucket; + } + + @Override + public IngestDocument execute(IngestDocument document) throws Exception { + // Check if routing already exists and we shouldn't override + if (!overrideExisting) { + try { + Object existingRouting = document.getFieldValue("_routing", Object.class, true); + if (existingRouting != null) { + return document; + } + } catch (Exception e) { + // Field doesn't exist, continue with processing + } + } + + Object timestampValue = document.getFieldValue(timestampField, Object.class, ignoreMissing); + + if (timestampValue == null && ignoreMissing) { + return document; + } + + if (timestampValue == null) { + throw new IllegalArgumentException("field [" + timestampField + "] not present as part of path [" + timestampField + "]"); + } + + String routingValue = computeRoutingValue(timestampValue.toString()); + document.setFieldValue("_routing", routingValue); + + return document; + } + + /** + * Computes routing value from timestamp by truncating to granularity + * and optionally hashing for distribution + */ + private String computeRoutingValue(String timestamp) { + // Parse timestamp using DateFormatter and convert to ZonedDateTime + TemporalAccessor accessor = dateFormatter.parse(timestamp); + ZonedDateTime dateTime = DateFormatters.from(accessor, Locale.ROOT, ZoneOffset.UTC); + + // Truncate to granularity + ZonedDateTime truncated = truncateToGranularity(dateTime); + + // Create temporal bucket key + String temporalBucket = createTemporalBucketKey(truncated); + + // Optionally hash for distribution + if (hashBucket) { + byte[] bucketBytes = temporalBucket.getBytes(StandardCharsets.UTF_8); + long hash = MurmurHash3.hash128(bucketBytes, 0, bucketBytes.length, 0, new MurmurHash3.Hash128()).h1; + return String.valueOf(hash == Long.MIN_VALUE ? 0L : (hash < 0 ? -hash : hash)); + } + + return temporalBucket; + } + + /** + * Truncates datetime to the specified granularity + * + * IMPORTANT: This logic MUST be kept in sync with TemporalRoutingSearchProcessor.truncateToGranularity() + * in the search-pipeline-common module to ensure consistent temporal bucketing. + */ + private ZonedDateTime truncateToGranularity(ZonedDateTime dateTime) { + switch (granularity) { + case HOUR: + return dateTime.withMinute(0).withSecond(0).withNano(0); + case DAY: + return dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); + case WEEK: + // Truncate to start of week (Monday) + ZonedDateTime dayTruncated = dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); + return dayTruncated.with(java.time.temporal.TemporalAdjusters.previousOrSame(java.time.DayOfWeek.MONDAY)); + case MONTH: + return dateTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0); + default: + throw new IllegalArgumentException("Unsupported granularity: " + granularity); + } + } + + /** + * Creates a string key for the temporal bucket + * + * IMPORTANT: This logic MUST be kept in sync with TemporalRoutingSearchProcessor.createTemporalBucket() + * in the search-pipeline-common module. Both processors must generate identical bucket keys for the + * same input to ensure documents are routed to the same shards during ingest and search. + * + * TODO: Consider moving this shared logic to a common module when search and ingest pipelines + * can share code more easily. + */ + private String createTemporalBucketKey(ZonedDateTime truncated) { + switch (granularity) { + case HOUR: + return truncated.getYear() + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()) + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth()) + + "T" + + String.format(Locale.ROOT, "%02d", truncated.getHour()); + case DAY: + return truncated.getYear() + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()) + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth()); + case WEEK: + // Use ISO week format: YYYY-WNN + int weekOfYear = truncated.get(java.time.temporal.WeekFields.ISO.weekOfWeekBasedYear()); + int weekYear = truncated.get(java.time.temporal.WeekFields.ISO.weekBasedYear()); + return weekYear + "-W" + String.format(Locale.ROOT, "%02d", weekOfYear); + case MONTH: + return truncated.getYear() + "-" + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()); + default: + throw new IllegalArgumentException("Unsupported granularity: " + granularity); + } + } + + @Override + public String getType() { + return TYPE; + } + + String getTimestampField() { + return timestampField; + } + + Granularity getGranularity() { + return granularity; + } + + DateFormatter getDateFormatter() { + return dateFormatter; + } + + boolean isIgnoreMissing() { + return ignoreMissing; + } + + boolean isOverrideExisting() { + return overrideExisting; + } + + boolean isHashBucket() { + return hashBucket; + } + + /** + * Factory for creating TemporalRoutingProcessor instances + */ + public static final class Factory implements Processor.Factory { + + @Override + public TemporalRoutingProcessor create( + Map processorFactories, + String tag, + @Nullable String description, + Map config + ) throws Exception { + + String timestampField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "timestamp_field"); + String granularityStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "granularity"); + String format = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "format"); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); + boolean overrideExisting = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override_existing", true); + boolean hashBucket = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "hash_bucket", false); + + // Set default format if not provided + if (format == null) { + format = DEFAULT_FORMAT; + } + + // Validation + if (Strings.isNullOrEmpty(timestampField)) { + throw newConfigurationException(TYPE, tag, "timestamp_field", "cannot be null or empty"); + } + + if (Strings.isNullOrEmpty(granularityStr)) { + throw newConfigurationException(TYPE, tag, "granularity", "cannot be null or empty"); + } + + Granularity granularity; + try { + granularity = Granularity.fromString(granularityStr); + } catch (IllegalArgumentException e) { + throw newConfigurationException(TYPE, tag, "granularity", e.getMessage()); + } + + // Validate date format + try { + DateFormatter.forPattern(format); + } catch (Exception e) { + throw newConfigurationException(TYPE, tag, "format", "invalid date format: " + e.getMessage()); + } + + return new TemporalRoutingProcessor( + tag, + description, + timestampField, + granularity, + format, + ignoreMissing, + overrideExisting, + hashBucket + ); + } + } +} diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java index 620a3c4639256..149c27c5eb789 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java @@ -75,6 +75,7 @@ public void testAllowlistNotSpecified() throws IOException { "uppercase", "split", "hierarchical_routing", + "temporal_routing", "acl_routing" ); assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet()); diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/TemporalRoutingProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/TemporalRoutingProcessorTests.java new file mode 100644 index 0000000000000..d80c3d74db95c --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/TemporalRoutingProcessorTests.java @@ -0,0 +1,298 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class TemporalRoutingProcessorTests extends OpenSearchTestCase { + + public void testExecuteHourlyGranularity() throws Exception { + TemporalRoutingProcessor processor = createProcessor("@timestamp", "hour", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("@timestamp", "2023-12-15T14:30:45.123Z"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result.getFieldValue("_routing", String.class), equalTo("2023-12-15T14")); + } + + public void testExecuteDailyGranularity() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("timestamp", "2023-12-15T14:30:45.123Z"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result.getFieldValue("_routing", String.class), equalTo("2023-12-15")); + } + + public void testExecuteWeeklyGranularity() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "week", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + // Friday, December 15, 2023 - should be week 50 of 2023 + document.put("timestamp", "2023-12-15T14:30:45.123Z"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result.getFieldValue("_routing", String.class), equalTo("2023-W50")); + } + + public void testExecuteMonthlyGranularity() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "month", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("timestamp", "2023-12-15T14:30:45.123Z"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result.getFieldValue("_routing", String.class), equalTo("2023-12")); + } + + public void testExecuteWithHashBucket() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", false, true, true); + + Map document = new HashMap<>(); + document.put("timestamp", "2023-12-15T14:30:45.123Z"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + String routing = result.getFieldValue("_routing", String.class); + assertThat(routing, notNullValue()); + // Should be a numeric hash, not the date string + assertThat(routing.matches("\\d+"), equalTo(true)); + } + + public void testIgnoreMissingField() throws Exception { + TemporalRoutingProcessor processor = createProcessor("missing_field", "day", "strict_date_optional_time", true, true, false); + + Map document = new HashMap<>(); + document.put("other_field", "value"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + // Should not have added routing + assertFalse(result.hasField("_routing")); + } + + public void testMissingFieldThrowsException() throws Exception { + TemporalRoutingProcessor processor = createProcessor("missing_field", "day", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("other_field", "value"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + assertThat(exception.getMessage(), containsString("field [missing_field] not present as part of path [missing_field]")); + } + + public void testOverrideExistingRouting() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("timestamp", "2023-12-15T14:30:45.123Z"); + document.put("_routing", "existing_routing"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + // Should override existing routing + assertThat(result.getFieldValue("_routing", String.class), equalTo("2023-12-15")); + } + + public void testPreserveExistingRouting() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", false, false, false); + + Map document = new HashMap<>(); + document.put("timestamp", "2023-12-15T14:30:45.123Z"); + document.put("_routing", "existing_routing"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + // Should preserve existing routing + assertThat(result.getFieldValue("_routing", String.class), equalTo("existing_routing")); + } + + public void testInvalidDateFormat() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("timestamp", "invalid-date"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + expectThrows(Exception.class, () -> processor.execute(ingestDocument)); + } + + public void testCustomDateFormat() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "yyyy-MM-dd HH:mm:ss", false, true, false); + + Map document = new HashMap<>(); + document.put("timestamp", "2023-12-15 14:30:45"); + IngestDocument ingestDocument = new IngestDocument("index", "id", null, null, null, document); + + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result.getFieldValue("_routing", String.class), equalTo("2023-12-15")); + } + + public void testDifferentTimestampFields() throws Exception { + TemporalRoutingProcessor processor1 = createProcessor("created_at", "day", "strict_date_optional_time", false, true, false); + TemporalRoutingProcessor processor2 = createProcessor("updated_at", "day", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("created_at", "2023-12-15T14:30:45.123Z"); + document.put("updated_at", "2023-12-20T10:15:30.456Z"); + + IngestDocument ingestDocument1 = new IngestDocument("index", "id", null, null, null, new HashMap<>(document)); + IngestDocument result1 = processor1.execute(ingestDocument1); + assertThat(result1.getFieldValue("_routing", String.class), equalTo("2023-12-15")); + + IngestDocument ingestDocument2 = new IngestDocument("index", "id", null, null, null, new HashMap<>(document)); + IngestDocument result2 = processor2.execute(ingestDocument2); + assertThat(result2.getFieldValue("_routing", String.class), equalTo("2023-12-20")); + } + + public void testFactory() throws Exception { + TemporalRoutingProcessor.Factory factory = new TemporalRoutingProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("timestamp_field", "@timestamp"); + config.put("granularity", "day"); + config.put("format", "strict_date_optional_time"); + config.put("ignore_missing", false); + config.put("override_existing", true); + config.put("hash_bucket", false); + + TemporalRoutingProcessor processor = factory.create(Collections.emptyMap(), "test", "test processor", config); + + assertThat(processor.getType(), equalTo(TemporalRoutingProcessor.TYPE)); + assertThat(processor.getTimestampField(), equalTo("@timestamp")); + assertThat(processor.getGranularity(), equalTo(TemporalRoutingProcessor.Granularity.DAY)); + assertThat(processor.isIgnoreMissing(), equalTo(false)); + assertThat(processor.isOverrideExisting(), equalTo(true)); + assertThat(processor.isHashBucket(), equalTo(false)); + } + + public void testFactoryValidation() throws Exception { + TemporalRoutingProcessor.Factory factory = new TemporalRoutingProcessor.Factory(); + + // Test missing timestamp_field + Map config1 = new HashMap<>(); + config1.put("granularity", "day"); + OpenSearchParseException exception = expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), "test", null, config1) + ); + assertThat(exception.getMessage(), containsString("timestamp_field")); + + // Test missing granularity + Map config2 = new HashMap<>(); + config2.put("timestamp_field", "@timestamp"); + exception = expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), "test", null, config2)); + assertThat(exception.getMessage(), containsString("granularity")); + + // Test invalid granularity + Map config3 = new HashMap<>(); + config3.put("timestamp_field", "@timestamp"); + config3.put("granularity", "invalid"); + exception = expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), "test", null, config3)); + assertThat(exception.getMessage(), containsString("Invalid granularity")); + + // Test invalid format + Map config4 = new HashMap<>(); + config4.put("timestamp_field", "@timestamp"); + config4.put("granularity", "day"); + config4.put("format", "invalid_format"); + exception = expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), "test", null, config4)); + assertThat(exception.getMessage(), containsString("invalid date format")); + } + + public void testAllGranularityTypes() throws Exception { + String timestamp = "2023-12-15T14:30:45.123Z"; + + // Test all granularity types + TemporalRoutingProcessor hourProcessor = createProcessor("timestamp", "hour", "strict_date_optional_time", false, true, false); + TemporalRoutingProcessor dayProcessor = createProcessor("timestamp", "day", "strict_date_optional_time", false, true, false); + TemporalRoutingProcessor weekProcessor = createProcessor("timestamp", "week", "strict_date_optional_time", false, true, false); + TemporalRoutingProcessor monthProcessor = createProcessor("timestamp", "month", "strict_date_optional_time", false, true, false); + + Map document = new HashMap<>(); + document.put("timestamp", timestamp); + + IngestDocument hourDoc = new IngestDocument("index", "id", null, null, null, new HashMap<>(document)); + IngestDocument dayDoc = new IngestDocument("index", "id", null, null, null, new HashMap<>(document)); + IngestDocument weekDoc = new IngestDocument("index", "id", null, null, null, new HashMap<>(document)); + IngestDocument monthDoc = new IngestDocument("index", "id", null, null, null, new HashMap<>(document)); + + assertThat(hourProcessor.execute(hourDoc).getFieldValue("_routing", String.class), equalTo("2023-12-15T14")); + assertThat(dayProcessor.execute(dayDoc).getFieldValue("_routing", String.class), equalTo("2023-12-15")); + assertThat(weekProcessor.execute(weekDoc).getFieldValue("_routing", String.class), equalTo("2023-W50")); + assertThat(monthProcessor.execute(monthDoc).getFieldValue("_routing", String.class), equalTo("2023-12")); + } + + public void testConsistentHashingForSameTimeWindow() throws Exception { + TemporalRoutingProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", false, true, true); + + // Different timestamps within same day should produce same routing + Map document1 = new HashMap<>(); + document1.put("timestamp", "2023-12-15T08:00:00Z"); + + Map document2 = new HashMap<>(); + document2.put("timestamp", "2023-12-15T16:30:45Z"); + + IngestDocument ingestDoc1 = new IngestDocument("index", "id1", null, null, null, document1); + IngestDocument ingestDoc2 = new IngestDocument("index", "id2", null, null, null, document2); + + IngestDocument result1 = processor.execute(ingestDoc1); + IngestDocument result2 = processor.execute(ingestDoc2); + + // Should have same routing since they're on the same day + assertThat(result1.getFieldValue("_routing", String.class), equalTo(result2.getFieldValue("_routing", String.class))); + } + + // Helper method to create processor + private TemporalRoutingProcessor createProcessor( + String timestampField, + String granularity, + String format, + boolean ignoreMissing, + boolean overrideExisting, + boolean hashBucket + ) { + return new TemporalRoutingProcessor( + "test", + "test processor", + timestampField, + TemporalRoutingProcessor.Granularity.fromString(granularity), + format, + ignoreMissing, + overrideExisting, + hashBucket + ); + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index 69467ea68c395..2ce32b14bef31 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -83,6 +83,8 @@ public Map> getRequestProcesso new OversampleRequestProcessor.Factory(), HierarchicalRoutingSearchProcessor.TYPE, new HierarchicalRoutingSearchProcessor.Factory(), + TemporalRoutingSearchProcessor.TYPE, + new TemporalRoutingSearchProcessor.Factory(), AclRoutingSearchProcessor.TYPE, new AclRoutingSearchProcessor.Factory() ) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TemporalRoutingSearchProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TemporalRoutingSearchProcessor.java new file mode 100644 index 0000000000000..6b9e6eaadc280 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TemporalRoutingSearchProcessor.java @@ -0,0 +1,423 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.apache.lucene.search.BooleanClause; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.time.DateFormatters; +import org.opensearch.core.common.Strings; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilderVisitor; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.search.pipeline.AbstractProcessor; +import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; + +import java.nio.charset.StandardCharsets; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAccessor; +import java.util.HashSet; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + +/** + * A search request processor that automatically adds routing to search requests + * based on temporal range information found in queries. + * + * This processor works in conjunction with the TemporalRoutingProcessor + * (ingest pipeline) to optimize searches by routing them only to shards + * that contain documents within the specified time ranges. + * + * Example: A query with range filter on timestamp field will only + * search shards containing documents for those temporal buckets. + */ +public class TemporalRoutingSearchProcessor extends AbstractProcessor implements SearchRequestProcessor { + + /** The processor type identifier */ + public static final String TYPE = "temporal_routing_search"; + private static final String DEFAULT_FORMAT = "strict_date_optional_time"; + + private final String timestampField; + private final Granularity granularity; + private final DateFormatter dateFormatter; + private final boolean enableAutoDetection; + private final boolean hashBucket; + + /** + * Supported temporal granularities + */ + public enum Granularity { + /** Hour granularity for hourly bucketing */ + HOUR(ChronoUnit.HOURS), + /** Day granularity for daily bucketing */ + DAY(ChronoUnit.DAYS), + /** Week granularity for weekly bucketing (ISO week) */ + WEEK(ChronoUnit.WEEKS), + /** Month granularity for monthly bucketing */ + MONTH(ChronoUnit.MONTHS); + + private final ChronoUnit chronoUnit; + + Granularity(ChronoUnit chronoUnit) { + this.chronoUnit = chronoUnit; + } + + /** + * Gets the ChronoUnit associated with this granularity + * @return the ChronoUnit + */ + public ChronoUnit getChronoUnit() { + return chronoUnit; + } + + /** + * Parses a string value to a Granularity enum + * @param value the string representation of the granularity + * @return the corresponding Granularity enum value + * @throws IllegalArgumentException if the value is not valid + */ + public static Granularity fromString(String value) { + try { + return valueOf(value.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid granularity: " + value + ". Supported values are: hour, day, week, month"); + } + } + } + + TemporalRoutingSearchProcessor( + String tag, + String description, + boolean ignoreFailure, + String timestampField, + Granularity granularity, + String format, + boolean enableAutoDetection, + boolean hashBucket + ) { + super(tag, description, ignoreFailure); + this.timestampField = timestampField; + this.granularity = granularity; + this.dateFormatter = DateFormatter.forPattern(format); + this.enableAutoDetection = enableAutoDetection; + this.hashBucket = hashBucket; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public SearchRequest processRequest(SearchRequest request) throws Exception { + // Skip if routing is already explicitly set + if (request.routing() != null && !request.routing().isEmpty()) { + return request; + } + + Set routingValues = new HashSet<>(); + + // Extract temporal range information from the search request using visitor pattern + if (request.source() != null && request.source().query() != null) { + TemporalRangeExtractionVisitor visitor = new TemporalRangeExtractionVisitor(routingValues); + request.source().query().visit(visitor); + } + + // If we found temporal range information, compute routing and apply it + if (!routingValues.isEmpty()) { + Set computedRouting = new HashSet<>(); + for (String temporalBucket : routingValues) { + if (hashBucket) { + String routingValue = hashTemporalBucket(temporalBucket); + computedRouting.add(routingValue); + } else { + computedRouting.add(temporalBucket); + } + } + + if (!computedRouting.isEmpty()) { + // Join multiple routing values with comma + String routing = String.join(",", computedRouting); + request.routing(routing); + } + } + + return request; + } + + /** + * Visitor implementation for extracting temporal ranges from queries + */ + private class TemporalRangeExtractionVisitor implements QueryBuilderVisitor { + private final Set temporalBuckets; + + TemporalRangeExtractionVisitor(Set temporalBuckets) { + this.temporalBuckets = temporalBuckets; + } + + @Override + public void accept(QueryBuilder qb) { + if (qb instanceof RangeQueryBuilder) { + RangeQueryBuilder rangeQuery = (RangeQueryBuilder) qb; + if (timestampField.equals(rangeQuery.fieldName())) { + extractTemporalBucketsFromRange(rangeQuery); + } + } + // The visitor pattern will automatically handle other query types + } + + @Override + public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) { + // Only process MUST and FILTER clauses as they restrict results + // SHOULD and MUST_NOT don't guarantee document presence on specific shards + if (occur == BooleanClause.Occur.MUST || occur == BooleanClause.Occur.FILTER) { + return this; + } + // Return a no-op visitor for SHOULD and MUST_NOT clauses + return QueryBuilderVisitor.NO_OP_VISITOR; + } + + /** + * Extracts temporal buckets from a range query + */ + private void extractTemporalBucketsFromRange(RangeQueryBuilder rangeQuery) { + try { + Object from = rangeQuery.from(); + Object to = rangeQuery.to(); + + if (from != null && to != null) { + ZonedDateTime fromDate = parseTimestamp(from.toString()); + ZonedDateTime toDate = parseTimestamp(to.toString()); + + // Generate all temporal buckets in the range + generateTemporalBucketsInRange(fromDate, toDate); + } else if (from != null) { + // Only lower bound + ZonedDateTime fromDate = parseTimestamp(from.toString()); + String bucket = createTemporalBucket(fromDate); + temporalBuckets.add(bucket); + } else if (to != null) { + // Only upper bound + ZonedDateTime toDate = parseTimestamp(to.toString()); + String bucket = createTemporalBucket(toDate); + temporalBuckets.add(bucket); + } + } catch (Exception e) { + // If we can't parse the dates, skip temporal routing + // This allows the query to fall back to searching all shards + } + } + + /** + * Generates all temporal buckets within a date range + */ + private void generateTemporalBucketsInRange(ZonedDateTime from, ZonedDateTime to) { + ZonedDateTime current = truncateToGranularity(from); + ZonedDateTime end = truncateToGranularity(to); + + // Add buckets up to a reasonable limit to avoid too many routing values + // TODO: Make maxBuckets configurable via processor configuration + int maxBuckets = 100; // Hard-coded limit for now + int bucketCount = 0; + + while (!current.isAfter(end) && bucketCount < maxBuckets) { + String bucket = createTemporalBucket(current); + temporalBuckets.add(bucket); + + current = incrementByGranularity(current); + bucketCount++; + } + } + } + + /** + * Parses timestamp string to ZonedDateTime + */ + private ZonedDateTime parseTimestamp(String timestamp) { + TemporalAccessor accessor = dateFormatter.parse(timestamp); + return DateFormatters.from(accessor, Locale.ROOT, ZoneOffset.UTC); + } + + /** + * Truncates datetime to the specified granularity + * + * IMPORTANT: This logic MUST be kept in sync with TemporalRoutingProcessor.truncateToGranularity() + * in the ingest-common module to ensure consistent temporal bucketing. + */ + private ZonedDateTime truncateToGranularity(ZonedDateTime dateTime) { + switch (granularity) { + case HOUR: + return dateTime.withMinute(0).withSecond(0).withNano(0); + case DAY: + return dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); + case WEEK: + // Truncate to start of week (Monday) + ZonedDateTime dayTruncated = dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); + return dayTruncated.with(java.time.temporal.TemporalAdjusters.previousOrSame(java.time.DayOfWeek.MONDAY)); + case MONTH: + return dateTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0); + default: + throw new IllegalArgumentException("Unsupported granularity: " + granularity); + } + } + + /** + * Increments datetime by the specified granularity + */ + private ZonedDateTime incrementByGranularity(ZonedDateTime dateTime) { + switch (granularity) { + case HOUR: + return dateTime.plusHours(1); + case DAY: + return dateTime.plusDays(1); + case WEEK: + return dateTime.plusWeeks(1); + case MONTH: + return dateTime.plusMonths(1); + default: + throw new IllegalArgumentException("Unsupported granularity: " + granularity); + } + } + + /** + * Creates a temporal bucket key from a datetime + * + * IMPORTANT: This logic MUST be kept in sync with TemporalRoutingProcessor.createTemporalBucketKey() + * in the ingest-common module. Both processors must generate identical bucket keys for the same + * input to ensure documents are routed to the same shards during ingest and search. + * + * TODO: Consider moving this shared logic to a common module when search and ingest pipelines + * can share code more easily. + */ + private String createTemporalBucket(ZonedDateTime dateTime) { + ZonedDateTime truncated = truncateToGranularity(dateTime); + + switch (granularity) { + case HOUR: + return truncated.getYear() + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()) + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth()) + + "T" + + String.format(Locale.ROOT, "%02d", truncated.getHour()); + case DAY: + return truncated.getYear() + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()) + + "-" + + String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth()); + case WEEK: + // Use ISO week format: YYYY-WNN + int weekOfYear = truncated.get(java.time.temporal.WeekFields.ISO.weekOfWeekBasedYear()); + int weekYear = truncated.get(java.time.temporal.WeekFields.ISO.weekBasedYear()); + return weekYear + "-W" + String.format(Locale.ROOT, "%02d", weekOfYear); + case MONTH: + return truncated.getYear() + "-" + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()); + default: + throw new IllegalArgumentException("Unsupported granularity: " + granularity); + } + } + + /** + * Hashes temporal bucket for distribution + */ + private String hashTemporalBucket(String temporalBucket) { + byte[] bucketBytes = temporalBucket.getBytes(StandardCharsets.UTF_8); + long hash = MurmurHash3.hash128(bucketBytes, 0, bucketBytes.length, 0, new MurmurHash3.Hash128()).h1; + return String.valueOf(hash == Long.MIN_VALUE ? 0L : (hash < 0 ? -hash : hash)); + } + + /** + * Factory for creating TemporalRoutingSearchProcessor instances + */ + public static final class Factory implements Processor.Factory { + + /** + * Creates a new Factory instance + */ + public Factory() {} + + /** + * Creates a new TemporalRoutingSearchProcessor instance + * + * @param processorFactories available processor factories + * @param tag processor tag + * @param description processor description + * @param ignoreFailure whether to ignore failures + * @param config processor configuration + * @param pipelineContext pipeline context + * @return new TemporalRoutingSearchProcessor instance + * @throws Exception if configuration is invalid + */ + @Override + public TemporalRoutingSearchProcessor create( + Map> processorFactories, + String tag, + String description, + boolean ignoreFailure, + Map config, + Processor.PipelineContext pipelineContext + ) throws Exception { + + String timestampField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "timestamp_field"); + String granularityStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "granularity"); + String format = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "format"); + boolean enableAutoDetection = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "enable_auto_detection", true); + boolean hashBucket = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "hash_bucket", false); + + // Set default format if not provided + if (format == null) { + format = DEFAULT_FORMAT; + } + + // Validation + if (Strings.isNullOrEmpty(timestampField)) { + throw newConfigurationException(TYPE, tag, "timestamp_field", "cannot be null or empty"); + } + + if (Strings.isNullOrEmpty(granularityStr)) { + throw newConfigurationException(TYPE, tag, "granularity", "cannot be null or empty"); + } + + Granularity granularity; + try { + granularity = Granularity.fromString(granularityStr); + } catch (IllegalArgumentException e) { + throw newConfigurationException(TYPE, tag, "granularity", e.getMessage()); + } + + // Validate date format + try { + DateFormatter.forPattern(format); + } catch (Exception e) { + throw newConfigurationException(TYPE, tag, "format", "invalid date format: " + e.getMessage()); + } + + return new TemporalRoutingSearchProcessor( + tag, + description, + ignoreFailure, + timestampField, + granularity, + format, + enableAutoDetection, + hashBucket + ); + } + } +} diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java index 66b6d65802333..b170e46d0cc36 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java @@ -82,7 +82,14 @@ public void testAllowlistNotSpecified() throws IOException { final Settings settings = Settings.EMPTY; try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) { assertEquals( - Set.of("oversample", "filter_query", "script", "hierarchical_routing_search", "acl_routing_search"), + Set.of( + "oversample", + "filter_query", + "script", + "hierarchical_routing_search", + "temporal_routing_search", + "acl_routing_search" + ), plugin.getRequestProcessors(createParameters(settings)).keySet() ); assertEquals( diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TemporalRoutingSearchProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TemporalRoutingSearchProcessorTests.java new file mode 100644 index 0000000000000..4d0e3d35b4e41 --- /dev/null +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TemporalRoutingSearchProcessorTests.java @@ -0,0 +1,335 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.AbstractBuilderTestCase; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class TemporalRoutingSearchProcessorTests extends AbstractBuilderTestCase { + + public void testRangeQueryRouting() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("@timestamp", "day", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("@timestamp").from("2023-12-15T00:00:00Z").to("2023-12-15T23:59:59Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + assertThat(transformedRequest.routing(), equalTo("2023-12-15")); + } + + public void testMultiDayRangeQueryRouting() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z").to("2023-12-17T23:59:59Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + String routing = transformedRequest.routing(); + assertTrue("Should contain multiple routing values", routing.contains(",")); + assertTrue("Should contain 2023-12-15", routing.contains("2023-12-15")); + assertTrue("Should contain 2023-12-16", routing.contains("2023-12-16")); + assertTrue("Should contain 2023-12-17", routing.contains("2023-12-17")); + } + + public void testHourlyGranularityRouting() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "hour", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-12-15T14:30:00Z").to("2023-12-15T16:45:00Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + String routing = transformedRequest.routing(); + assertTrue("Should contain hourly buckets", routing.contains("2023-12-15T14")); + assertTrue("Should contain hourly buckets", routing.contains("2023-12-15T15")); + assertTrue("Should contain hourly buckets", routing.contains("2023-12-15T16")); + } + + public void testWeeklyGranularityRouting() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "week", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z") // Week 50 + .to("2023-12-22T23:59:59Z"); // Week 51 + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + String routing = transformedRequest.routing(); + assertTrue("Should contain weekly buckets", routing.contains("2023-W50")); + assertTrue("Should contain weekly buckets", routing.contains("2023-W51")); + } + + public void testMonthlyGranularityRouting() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "month", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-11-15T00:00:00Z").to("2024-01-15T23:59:59Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + String routing = transformedRequest.routing(); + assertTrue("Should contain monthly buckets", routing.contains("2023-11")); + assertTrue("Should contain monthly buckets", routing.contains("2023-12")); + assertTrue("Should contain monthly buckets", routing.contains("2024-01")); + } + + public void testHashBucketRouting() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, true); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z").to("2023-12-15T23:59:59Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + String routing = transformedRequest.routing(); + // Should be a numeric hash, not the date string + assertThat(routing.matches("\\d+"), equalTo(true)); + } + + public void testBoolQueryWithRangeFilter() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + QueryBuilder rangeFilter = new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z").to("2023-12-15T23:59:59Z"); + QueryBuilder textQuery = new TermQueryBuilder("content", "log message"); + QueryBuilder boolQuery = new BoolQueryBuilder().must(textQuery).filter(rangeFilter); + + SearchSourceBuilder source = new SearchSourceBuilder().query(boolQuery); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + assertThat(transformedRequest.routing(), equalTo("2023-12-15")); + } + + public void testShouldClauseIgnored() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + // Query with range only in should clause + QueryBuilder query = new BoolQueryBuilder().should( + new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z").to("2023-12-15T23:59:59Z") + ); + + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + // Should not add routing since range is only in should clause + assertNull("Should not add routing for should clauses", transformedRequest.routing()); + } + + public void testNonTimestampFieldIgnored() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("other_field").from("2023-12-15T00:00:00Z").to("2023-12-15T23:59:59Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + // Should not add routing for different field + assertThat(transformedRequest.routing(), nullValue()); + } + + public void testExistingRoutingPreserved() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z").to("2023-12-15T23:59:59Z"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source).routing("existing_routing"); + + SearchRequest transformedRequest = processor.processRequest(request); + + // Existing routing should be preserved + assertThat(transformedRequest.routing(), equalTo("existing_routing")); + } + + public void testEmptySearchRequest() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + SearchRequest request = new SearchRequest(); + SearchRequest transformedRequest = processor.processRequest(request); + + // No routing should be added for empty request + assertThat(transformedRequest.routing(), nullValue()); + } + + public void testInvalidDateFormatIgnored() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("invalid-date").to("another-invalid-date"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + // Should not throw exception, should fallback to no routing + SearchRequest transformedRequest = processor.processRequest(request); + assertThat(transformedRequest.routing(), nullValue()); + } + + public void testCustomDateFormat() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "yyyy-MM-dd HH:mm:ss", true, false); + + QueryBuilder query = new RangeQueryBuilder("timestamp").from("2023-12-15 00:00:00").to("2023-12-15 23:59:59"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(source); + + SearchRequest transformedRequest = processor.processRequest(request); + + assertThat(transformedRequest.routing(), notNullValue()); + assertThat(transformedRequest.routing(), equalTo("2023-12-15")); + } + + public void testOpenEndedRanges() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, false); + + // Test range with only lower bound + QueryBuilder queryFrom = new RangeQueryBuilder("timestamp").from("2023-12-15T00:00:00Z"); + SearchSourceBuilder sourceFrom = new SearchSourceBuilder().query(queryFrom); + SearchRequest requestFrom = new SearchRequest().source(sourceFrom); + + SearchRequest transformedRequestFrom = processor.processRequest(requestFrom); + assertThat(transformedRequestFrom.routing(), equalTo("2023-12-15")); + + // Test range with only upper bound + QueryBuilder queryTo = new RangeQueryBuilder("timestamp").to("2023-12-15T23:59:59Z"); + SearchSourceBuilder sourceTo = new SearchSourceBuilder().query(queryTo); + SearchRequest requestTo = new SearchRequest().source(sourceTo); + + SearchRequest transformedRequestTo = processor.processRequest(requestTo); + assertThat(transformedRequestTo.routing(), equalTo("2023-12-15")); + } + + public void testConsistentRoutingForSameTimeWindow() throws Exception { + TemporalRoutingSearchProcessor processor = createProcessor("timestamp", "day", "strict_date_optional_time", true, true); + + // Different ranges within same day should produce same routing + QueryBuilder query1 = new RangeQueryBuilder("timestamp").from("2023-12-15T08:00:00Z").to("2023-12-15T12:00:00Z"); + + QueryBuilder query2 = new RangeQueryBuilder("timestamp").from("2023-12-15T14:00:00Z").to("2023-12-15T18:00:00Z"); + + SearchSourceBuilder source1 = new SearchSourceBuilder().query(query1); + SearchSourceBuilder source2 = new SearchSourceBuilder().query(query2); + SearchRequest request1 = new SearchRequest().source(source1); + SearchRequest request2 = new SearchRequest().source(source2); + + SearchRequest result1 = processor.processRequest(request1); + SearchRequest result2 = processor.processRequest(request2); + + // Should have same routing since they're on the same day + assertThat(result1.routing(), equalTo(result2.routing())); + } + + public void testFactory() throws Exception { + TemporalRoutingSearchProcessor.Factory factory = new TemporalRoutingSearchProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("timestamp_field", "@timestamp"); + config.put("granularity", "day"); + config.put("format", "strict_date_optional_time"); + config.put("enable_auto_detection", true); + config.put("hash_bucket", false); + + TemporalRoutingSearchProcessor processor = factory.create(Collections.emptyMap(), "test", "test processor", false, config, null); + + assertThat(processor.getType(), equalTo(TemporalRoutingSearchProcessor.TYPE)); + } + + public void testFactoryValidation() throws Exception { + TemporalRoutingSearchProcessor.Factory factory = new TemporalRoutingSearchProcessor.Factory(); + + // Test missing timestamp_field + Map config1 = new HashMap<>(); + config1.put("granularity", "day"); + OpenSearchParseException exception = expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), "test", null, false, config1, null) + ); + assertThat(exception.getMessage(), containsString("timestamp_field")); + + // Test missing granularity + Map config2 = new HashMap<>(); + config2.put("timestamp_field", "@timestamp"); + exception = expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), "test", null, false, config2, null) + ); + assertThat(exception.getMessage(), containsString("granularity")); + + // Test invalid granularity + Map config3 = new HashMap<>(); + config3.put("timestamp_field", "@timestamp"); + config3.put("granularity", "invalid"); + exception = expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), "test", null, false, config3, null) + ); + assertThat(exception.getMessage(), containsString("Invalid granularity")); + + // Test invalid format + Map config4 = new HashMap<>(); + config4.put("timestamp_field", "@timestamp"); + config4.put("granularity", "day"); + config4.put("format", "invalid_format"); + exception = expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), "test", null, false, config4, null) + ); + assertThat(exception.getMessage(), containsString("invalid date format")); + } + + // Helper method to create processor + private TemporalRoutingSearchProcessor createProcessor( + String timestampField, + String granularity, + String format, + boolean enableAutoDetection, + boolean hashBucket + ) { + return new TemporalRoutingSearchProcessor( + "test", + "test processor", + false, + timestampField, + TemporalRoutingSearchProcessor.Granularity.fromString(granularity), + format, + enableAutoDetection, + hashBucket + ); + } +}