@@ -2520,6 +2520,109 @@ public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords
25202520 assertEquals(25, sharePartition.nextFetchOffset());
25212521 }
25222522
2523+ @Test
2524+ public void testAcquireBatchPriorToStartOffset() {
2525+ Persister persister = Mockito.mock(Persister.class);
2526+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
2527+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
2528+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
2529+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
2530+ List.of(
2531+ new PersisterStateBatch(5L, 99L, RecordState.AVAILABLE.id, (short) 1)
2532+ ))))));
2533+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
2534+
2535+ SharePartition sharePartition = SharePartitionBuilder.builder()
2536+ .withPersister(persister)
2537+ .withSharePartitionMetrics(sharePartitionMetrics)
2538+ .build();
2539+ sharePartition.maybeInitialize();
2540+
2541+ // Validate the cached state after initialization.
2542+ assertEquals(5, sharePartition.nextFetchOffset());
2543+ assertEquals(5, sharePartition.startOffset());
2544+ assertEquals(1, sharePartition.cachedState().size());
2545+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
2546+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
2547+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
2548+
2549+ // Acquire offsets prior to start offset. Should not acquire any records.
2550+ fetchAcquiredRecords(sharePartition.acquire(
2551+ MEMBER_ID,
2552+ ShareAcquireMode.RECORD_LIMIT,
2553+ 10,
2554+ 10,
2555+ DEFAULT_FETCH_OFFSET,
2556+ fetchPartitionData(memoryRecords(0, 5)),
2557+ FETCH_ISOLATION_HWM),
2558+ 0);
2559+
2560+ // Validate the cached state remains unchanged.
2561+ assertEquals(5, sharePartition.nextFetchOffset());
2562+ assertEquals(5, sharePartition.startOffset());
2563+ assertEquals(1, sharePartition.cachedState().size());
2564+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
2565+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
2566+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
2567+ }
2568+
2569+ /**
2570+ * Test validates the scenario where the partition has been re-initialized and the first fetch batch
2571+ * has start offset in middle and no overlap with the batch returned from the persister during initialization.
2572+ * In this case, the acquire logic should respect the start offset and not allow acquiring records
2573+ * prior to the start offset.
2574+ */
2575+ @Test
2576+ public void testAcquireBatchWithMovedStartOffsetAndNoOverlapWithCachedBatch() {
2577+ Persister persister = Mockito.mock(Persister.class);
2578+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
2579+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
2580+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
2581+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
2582+ List.of(
2583+ new PersisterStateBatch(15L, 20L, RecordState.ARCHIVED.id, (short) 1)
2584+ ))))));
2585+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
2586+
2587+ SharePartition sharePartition = SharePartitionBuilder.builder()
2588+ .withPersister(persister)
2589+ .withSharePartitionMetrics(sharePartitionMetrics)
2590+ .build();
2591+ sharePartition.maybeInitialize();
2592+
2593+ // Validate the cached state after initialization.
2594+ assertEquals(5, sharePartition.nextFetchOffset());
2595+ assertEquals(5, sharePartition.startOffset());
2596+ assertEquals(1, sharePartition.cachedState().size());
2597+ assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
2598+ assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
2599+ assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(15L).batchState());
2600+ // As there is a gap between 5-14 offsets, gap window should be created.
2601+ assertNotNull(sharePartition.persisterReadResultGapWindow());
2602+ assertEquals(5, sharePartition.persisterReadResultGapWindow().gapStartOffset());
2603+
2604+ // Acquire offsets starting prior to start offset and going beyond it. Only offsets from 5-9 should
2605+ // be acquired.
2606+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
2607+ MEMBER_ID,
2608+ ShareAcquireMode.RECORD_LIMIT,
2609+ 10,
2610+ 10,
2611+ 5L,
2612+ fetchPartitionData(memoryRecords(0, 10)),
2613+ FETCH_ISOLATION_HWM),
2614+ 5);
2615+
2616+ assertArrayEquals(expectedAcquiredRecord(5, 9, 1).toArray(), acquiredRecordsList.toArray());
2617+ assertEquals(2, sharePartition.cachedState().size());
2618+ assertEquals(10, sharePartition.nextFetchOffset());
2619+ assertEquals(5L, sharePartition.startOffset());
2620+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
2621+ assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
2622+ assertNotNull(sharePartition.persisterReadResultGapWindow());
2623+ assertEquals(10, sharePartition.persisterReadResultGapWindow().gapStartOffset());
2624+ }
2625+
25232626 @Test
25242627 public void testNextFetchOffsetInitialState() {
25252628 SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -10635,6 +10738,113 @@ public void testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode()
1063510738 assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().max());
1063610739 }
1063710740
10741+ /**
10742+ * Test validates the scenario where the partition has been re-initialized with a moved start offset.
10743+ * In this case, the acquire logic should respect the new start offset and not allow acquiring
10744+ * records before the new start offset. For the test, simulate a scenario where the log batch is 0-99
10745+ * offsets, but post re-initialization, the start offset is moved to 5 hence share partition should
10746+ * have 5-99 offsets in AVAILABLE state in the cache. Post acquire and acknowledge of 5-14 offsets,
10747+ * the start offset is moved to 15. Next acquire on the same log batch should only allow acquiring
10748+ * offsets from 15 offset and should not create any other entries in the cache.
10749+ */
10750+ @Test
10751+ public void testAcquireBatchInRecordLimitModeWithMovedStartOffset() {
10752+ Persister persister = Mockito.mock(Persister.class);
10753+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
10754+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
10755+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
10756+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
10757+ List.of(
10758+ new PersisterStateBatch(5L, 99L, RecordState.AVAILABLE.id, (short) 1)
10759+ ))))));
10760+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
10761+
10762+ SharePartition sharePartition = SharePartitionBuilder.builder()
10763+ .withPersister(persister)
10764+ .withSharePartitionMetrics(sharePartitionMetrics)
10765+ .build();
10766+ sharePartition.maybeInitialize();
10767+
10768+ // Validate the cached state after initialization.
10769+ assertEquals(5, sharePartition.nextFetchOffset());
10770+ assertEquals(5, sharePartition.startOffset());
10771+ assertEquals(1, sharePartition.cachedState().size());
10772+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
10773+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
10774+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
10775+
10776+ // Send the batch of 0-99 offsets for acquire, only 5-14 should be acquired. Though the start
10777+ // offset for the share partition is 5 and the log batch base offset is 0, but acquire should
10778+ // respect the share partition start offset.
10779+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
10780+ MEMBER_ID,
10781+ ShareAcquireMode.RECORD_LIMIT,
10782+ 10,
10783+ 10,
10784+ 5L,
10785+ fetchPartitionData(memoryRecords(0, 100)),
10786+ FETCH_ISOLATION_HWM),
10787+ 10);
10788+
10789+ assertArrayEquals(expectedAcquiredRecords(5, 14, 2).toArray(), acquiredRecordsList.toArray());
10790+
10791+ assertEquals(15, sharePartition.nextFetchOffset());
10792+ assertEquals(5L, sharePartition.startOffset());
10793+ assertEquals(1, sharePartition.cachedState().size());
10794+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
10795+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
10796+
10797+ // Offset state should be maintained since partial offsets in the batch are acquired.
10798+ assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(5L).batchState());
10799+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
10800+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(14L).state());
10801+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).offsetState().get(15L).state());
10802+
10803+ // Acknowledge the acquired offsets 5-14 so the start offset moves to 15.
10804+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
10805+ Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
10806+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
10807+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
10808+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
10809+ sharePartition.acknowledge(MEMBER_ID, List.of(
10810+ new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.ACCEPT.id))
10811+ ));
10812+
10813+ assertEquals(15, sharePartition.nextFetchOffset());
10814+ assertEquals(15L, sharePartition.startOffset());
10815+ assertEquals(1, sharePartition.cachedState().size());
10816+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
10817+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
10818+
10819+ assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
10820+ assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(14L).state());
10821+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).offsetState().get(15L).state());
10822+
10823+ // Re-acquire on the same log batch of 0-99 offsets, only 15-24 should be acquired.
10824+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
10825+ MEMBER_ID,
10826+ ShareAcquireMode.RECORD_LIMIT,
10827+ 10,
10828+ 10,
10829+ 15,
10830+ fetchPartitionData(memoryRecords(0, 100)),
10831+ FETCH_ISOLATION_HWM),
10832+ 10);
10833+
10834+ assertArrayEquals(expectedAcquiredRecords(15, 24, 2).toArray(), acquiredRecordsList.toArray());
10835+ assertEquals(25, sharePartition.nextFetchOffset());
10836+ assertEquals(15L, sharePartition.startOffset());
10837+ assertEquals(1, sharePartition.cachedState().size());
10838+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
10839+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
10840+
10841+ assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
10842+ assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(14L).state());
10843+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(15L).state());
10844+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(24L).state());
10845+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).offsetState().get(25L).state());
10846+ }
10847+
1063810848 @Test
1063910849 public void testAcknowledgeInRecordLimitMode() {
1064010850 Persister persister = Mockito.mock(Persister.class);
0 commit comments