1616 * limitations under the License.
1717 */
1818
19- package org .apache .hudi .io . storage ;
19+ package org .apache .hudi .io ;
2020
2121import org .apache .hudi .common .engine .TaskContextSupplier ;
2222import org .apache .hudi .common .model .HoodieBaseFile ;
23+ import org .apache .hudi .common .model .HoodieRecord ;
24+ import org .apache .hudi .common .model .HoodieRecordLocation ;
2325import org .apache .hudi .common .model .HoodieRecordPayload ;
2426import org .apache .hudi .common .util .Option ;
2527import org .apache .hudi .config .HoodieWriteConfig ;
2628import org .apache .hudi .exception .HoodieUpsertException ;
27- import org .apache .hudi .io .HoodieMergeHandle ;
2829import org .apache .hudi .keygen .BaseKeyGenerator ;
2930import org .apache .hudi .keygen .KeyGenUtils ;
3031import org .apache .hudi .table .HoodieTable ;
3435import org .apache .log4j .Logger ;
3536
3637import java .io .IOException ;
38+ import java .util .Collections ;
3739import java .util .Iterator ;
3840import java .util .Map ;
3941
4446 * Simplified Logic:
4547 * For every existing record
4648 * Write the record as is
47- * For all incoming records, write to file as is.
49+ * For all incoming records, write to file as is, without de-duplicating based on the record key .
4850 *
4951 * Illustration with simple data.
5052 * Incoming data:
51- * rec1_2, rec4_2, rec5_1, rec6_1
53+ * rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
5254 * Existing data:
5355 * rec1_1, rec2_1, rec3_1, rec4_1
5456 *
5557 * For every existing record, write to storage as is.
5658 * => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
5759 * Write all records from incoming set to storage
58- * => rec1_2, rec4_2, rec5_1 and rec6_1
60+ * => rec1_2, rec1_3, rec4_2, rec5_1 and rec6_1
5961 *
6062 * Final snapshot in storage
61- * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
63+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
6264 *
6365 * Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
6466 * happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
6567 */
6668public class HoodieConcatHandle <T extends HoodieRecordPayload , I , K , O > extends HoodieMergeHandle <T , I , K , O > {
6769
6870 private static final Logger LOG = LogManager .getLogger (HoodieConcatHandle .class );
71+ // a representation of incoming records that tolerates duplicate keys
72+ private final Iterator <HoodieRecord <T >> recordItr ;
6973
// Constructor taking the incoming records as an iterator.
// In this diff view the lines marked '-' are the previous version and the lines marked '+' are the new one.
// Previous version: forwarded recordItr straight to the parent constructor.
// New version: forwards Collections.emptyIterator() to the parent and stores the real
// iterator in this.recordItr, so this class holds the incoming records itself.
// NOTE(review): presumably the parent would otherwise index the records by key and collapse
// duplicates — confirm against HoodieMergeHandle, which is not visible here.
70- public HoodieConcatHandle (HoodieWriteConfig config , String instantTime , HoodieTable hoodieTable , Iterator recordItr ,
71- String partitionPath , String fileId , TaskContextSupplier taskContextSupplier , Option <BaseKeyGenerator > keyGeneratorOpt ) {
72- super (config , instantTime , hoodieTable , recordItr , partitionPath , fileId , taskContextSupplier , keyGeneratorOpt );
74+ public HoodieConcatHandle (HoodieWriteConfig config , String instantTime , HoodieTable <T , I , K , O > hoodieTable ,
75+ Iterator <HoodieRecord <T >> recordItr , String partitionPath , String fileId ,
76+ TaskContextSupplier taskContextSupplier , Option <BaseKeyGenerator > keyGeneratorOpt ) {
77+ super (config , instantTime , hoodieTable , Collections .emptyIterator (), partitionPath , fileId , taskContextSupplier , keyGeneratorOpt );
78+ this .recordItr = recordItr ;
7379 }
7480
// Constructor taking the incoming records as a key -> record map plus the base file to merge against.
// In this diff view the lines marked '-' are the previous version and the lines marked '+' are the new one.
// New version: forwards Collections.emptyMap() (and Option.empty() as the last argument) to the
// parent, and iterates over keyToNewRecords.values() through this.recordItr instead.
// NOTE(review): a Map holds one value per key, so duplicate keys cannot reach this constructor
// un-collapsed; only the iterator-based constructor can carry duplicates — confirm callers.
// NOTE(review): hoodieTable is still a raw HoodieTable here, while the iterator-based
// constructor now declares HoodieTable<T, I, K, O>.
75- public HoodieConcatHandle (HoodieWriteConfig config , String instantTime , HoodieTable hoodieTable , Map keyToNewRecords , String partitionPath , String fileId ,
76- HoodieBaseFile dataFileToBeMerged , TaskContextSupplier taskContextSupplier ) {
77- super (config , instantTime , hoodieTable , keyToNewRecords , partitionPath , fileId , dataFileToBeMerged , taskContextSupplier ,
81+ public HoodieConcatHandle (HoodieWriteConfig config , String instantTime , HoodieTable hoodieTable ,
82+ Map <String , HoodieRecord <T >> keyToNewRecords , String partitionPath , String fileId ,
83+ HoodieBaseFile dataFileToBeMerged , TaskContextSupplier taskContextSupplier ) {
84+ super (config , instantTime , hoodieTable , Collections .emptyMap (), partitionPath , fileId , dataFileToBeMerged , taskContextSupplier ,
7885 Option .empty ());
86+ this .recordItr = keyToNewRecords .values ().iterator ();
7987 }
8088
8189 /**
@@ -94,4 +102,17 @@ public void write(GenericRecord oldRecord) {
94102 }
95103 recordsWritten ++;
96104 }
105+
// Writes every incoming record held in recordItr, one by one, in iteration order.
// No lookup by record key happens in this loop, so records sharing a key are each passed to
// writeInsertRecord rather than being de-duplicated.
// Declares IOException; where it is actually raised is not visible from this method.
// NOTE(review): needsUpdateLocation() and writeInsertRecord() are not defined in the visible
// part of the file — presumably inherited from HoodieMergeHandle; confirm.
106+ @ Override
107+ protected void writeIncomingRecords () throws IOException {
108+ while (recordItr .hasNext ()) {
109+ HoodieRecord <T > record = recordItr .next ();
// When a location update is required, the record is unsealed, pointed at this handle's
// (instantTime, fileId), and sealed again before being written.
110+ if (needsUpdateLocation ()) {
111+ record .unseal ();
112+ record .setNewLocation (new HoodieRecordLocation (instantTime , fileId ));
113+ record .seal ();
114+ }
115+ writeInsertRecord (record );
116+ }
117+ }
97118}
0 commit comments