@@ -338,6 +338,153 @@ record = records.get(3);
338338 assertEquals (0 , records .size ());
339339 }
340340
341+ @ Test
342+ public void testGapsForControlRecordsInAcquiredRange () {
343+ int numRecords = 10 ;
344+ // Create records with transaction markers (control records)
345+ Records rawRecords = newTransactionalRecords (numRecords );
346+
347+ // Acquire all records including the control record (offset 10 is the commit marker)
348+ ShareFetchResponseData .PartitionData partitionData = new ShareFetchResponseData .PartitionData ()
349+ .setRecords (rawRecords )
350+ .setAcquiredRecords (acquiredRecords (0L , numRecords + 1 ));
351+
352+ ShareCompletedFetch completedFetch = newShareCompletedFetch (partitionData );
353+ try (final Deserializers <String , String > deserializers = newStringDeserializers ()) {
354+ ShareInFlightBatch <String , String > batch = completedFetch .fetchRecords (deserializers , 15 , true );
355+ List <ConsumerRecord <String , String >> records = batch .getInFlightRecords ();
356+
357+ // Should get 10 actual records (control records are filtered out)
358+ assertEquals (10 , records .size ());
359+
360+ // Should have 1 gap for the control record at offset 10
361+ Acknowledgements acknowledgements = batch .getAcknowledgements ();
362+ assertEquals (1 , acknowledgements .size ());
363+ assertNull (acknowledgements .get (10L ), "Offset 10 (control record) should be a GAP (null)" );
364+ }
365+ }
366+
367+ @ Test
368+ public void testMixedRecordsAndGaps () {
369+ int startingOffset = 0 ;
370+
371+ // Acquire records 0-4 (exist), 10-14 (don't exist = gaps)
372+ List <ShareFetchResponseData .AcquiredRecords > acquiredRecords = new ArrayList <>();
373+ acquiredRecords .add (acquiredRecords (0L , 5 ).get (0 ));
374+ acquiredRecords .add (acquiredRecords (10L , 5 ).get (0 ));
375+
376+ ShareFetchResponseData .PartitionData partitionData = new ShareFetchResponseData .PartitionData ()
377+ .setRecords (newRecords (startingOffset , 10 ))
378+ .setAcquiredRecords (acquiredRecords ); // Acquire only records 0-4 and 10-14
379+
380+ Deserializers <String , String > deserializers = newStringDeserializers ();
381+
382+ ShareCompletedFetch completedFetch = newShareCompletedFetch (partitionData );
383+
384+ ShareInFlightBatch <String , String > batch = completedFetch .fetchRecords (deserializers , 20 , true );
385+ List <ConsumerRecord <String , String >> records = batch .getInFlightRecords ();
386+
387+ // Should get 5 actual records (0-4)
388+ assertEquals (5 , records .size ());
389+ for (int i = 0 ; i < 5 ; i ++) {
390+ assertEquals (i , records .get (i ).offset ());
391+ }
392+
393+ // Should have 5 gaps (10-14) in acknowledgements
394+ Acknowledgements acknowledgements = batch .getAcknowledgements ();
395+ assertEquals (5 , acknowledgements .size ());
396+
397+ // Verify GAP acknowledgements for offsets 10-14
398+ for (long offset = 10L ; offset <= 14L ; offset ++) {
399+ assertNull (acknowledgements .get (offset ), "Offset " + offset + " should be a GAP (null)" );
400+ }
401+ }
402+
403+ @ Test
404+ public void testAcknowledgementsIncludeOnlyGaps () {
405+ int startingOffset = 0 ;
406+ int numRecords = 10 ; // Records for 0-9
407+
408+ // Acquire only non-existent records 15-19 (all should be gaps)
409+ ShareFetchResponseData .PartitionData partitionData = new ShareFetchResponseData .PartitionData ()
410+ .setRecords (newRecords (startingOffset , numRecords )) // Records 0-9
411+ .setAcquiredRecords (acquiredRecords (15L , 5 )); // Acquire 15-19 (don't exist)
412+
413+ Deserializers <String , String > deserializers = newStringDeserializers ();
414+
415+ ShareCompletedFetch completedFetch = newShareCompletedFetch (partitionData );
416+
417+ ShareInFlightBatch <String , String > batch = completedFetch .fetchRecords (deserializers , 20 , true );
418+ List <ConsumerRecord <String , String >> records = batch .getInFlightRecords ();
419+
420+ // Should get no actual records
421+ assertEquals (0 , records .size ());
422+
423+ // Should have 5 gaps (15-19) in acknowledgements
424+ Acknowledgements acknowledgements = batch .getAcknowledgements ();
425+ assertEquals (5 , acknowledgements .size ());
426+
427+ // Verify all are GAP acknowledgements
428+ for (long offset = 15L ; offset <= 19L ; offset ++) {
429+ assertNull (acknowledgements .get (offset ), "Offset " + offset + " should be a GAP (null)" );
430+ }
431+ }
432+
433+ @ Test
434+ public void testGapsWithControlRecordsAtBeginningAndEnd () {
435+ // Create transactional records: control record, data records 1-5, control record at 6
436+ Time time = new MockTime ();
437+ ByteBuffer buffer = ByteBuffer .allocate (2048 );
438+
439+ // Write first control record (commit marker at offset 0)
440+ writeTransactionMarker (buffer , 0 , time );
441+
442+ // Write data records 1-5
443+ try (MemoryRecordsBuilder builder = MemoryRecords .builder (buffer ,
444+ RecordBatch .CURRENT_MAGIC_VALUE ,
445+ Compression .NONE ,
446+ TimestampType .CREATE_TIME ,
447+ 1 ,
448+ time .milliseconds (),
449+ PRODUCER_ID ,
450+ PRODUCER_EPOCH ,
451+ 0 ,
452+ true ,
453+ RecordBatch .NO_PARTITION_LEADER_EPOCH )) {
454+ for (int i = 0 ; i < 5 ; i ++)
455+ builder .append (new SimpleRecord (time .milliseconds (), "key" .getBytes (), "value" .getBytes ()));
456+ builder .build ();
457+ }
458+
459+ // Write second control record (commit marker at offset 6)
460+ writeTransactionMarker (buffer , 6 , time );
461+
462+ buffer .flip ();
463+ Records records = MemoryRecords .readableRecords (buffer );
464+
465+ // Acquire all offsets 0-6 (includes both control records and data records)
466+ ShareFetchResponseData .PartitionData partitionData = new ShareFetchResponseData .PartitionData ()
467+ .setRecords (records )
468+ .setAcquiredRecords (acquiredRecords (0L , 7 ));
469+
470+ ShareCompletedFetch completedFetch = newShareCompletedFetch (partitionData );
471+ try (final Deserializers <String , String > deserializers = newStringDeserializers ()) {
472+ ShareInFlightBatch <String , String > batch = completedFetch .fetchRecords (deserializers , 10 , true );
473+ List <ConsumerRecord <String , String >> fetchedRecords = batch .getInFlightRecords ();
474+
475+ // Should get 5 data records (1-5)
476+ assertEquals (5 , fetchedRecords .size ());
477+ assertEquals (1L , fetchedRecords .get (0 ).offset ());
478+ assertEquals (5L , fetchedRecords .get (4 ).offset ());
479+
480+ // Should have 2 gaps for the control records (offsets 0 and 6)
481+ Acknowledgements acknowledgements = batch .getAcknowledgements ();
482+ assertEquals (2 , acknowledgements .size ());
483+ assertNull (acknowledgements .get (0L ), "Offset 0 (control record) should be a GAP (null)" );
484+ assertNull (acknowledgements .get (6L ), "Offset 6 (control record) should be a GAP (null)" );
485+ }
486+ }
487+
341488 @ Test
342489 public void testAcquireOddRecords () {
343490 int startingOffset = 0 ;
0 commit comments