Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -659,24 +659,32 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
)

createPolicy(tier1Policy, tier1PolicyID)
createIndex(tier1IndexName, tier1PolicyID, mapping = sourceIndexMappingString)
// Create index WITHOUT policy first to avoid race condition where ISM starts
// rollup before data is indexed
createIndex(tier1IndexName, policyID = null, mapping = sourceIndexMappingString)

// Insert test data with varying timestamps and categories
val baseTime = Instant.parse("2024-01-01T00:00:00Z")
val bulkRequest = StringBuilder()
for (i in 1..100) {
val timestamp = "2024-01-01T${String.format("%02d", i % 24)}:00:00Z"
val timestamp = baseTime.plus((i % 24).toLong(), ChronoUnit.HOURS)
val category = if (i % 2 == 0) "electronics" else "books"
val userId = "user_${(i - 1) / 10 + 1}" // Creates user_1 to user_10
val value = i * 10.0
bulkRequest.append("""{"index":{"_index":"$tier1IndexName"}}""").append("\n")
bulkRequest.append("""{"timestamp":"$timestamp","category":"$category","user_id":"$userId","value":$value}""").append("\n")
}
client().makeRequest(
val bulkResponse = client().makeRequest(
"POST",
"/_bulk?refresh=true",
emptyMap(),
StringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON),
)
val bulkResponseMap = bulkResponse.asMap()
assertFalse("Bulk request should not have errors: $bulkResponseMap", bulkResponseMap["errors"] as Boolean)

// Now add policy after data is indexed to ensure rollup sees all data
addPolicyToIndex(tier1IndexName, tier1PolicyID)

// Execute Tier-1 rollup
assertIndexRolledUp(tier1IndexName, tier1PolicyID, tier1Rollup)
Expand Down
Loading