6565import java .util .Collections ;
6666import java .util .List ;
6767import java .util .Map ;
68+ import java .util .concurrent .ExecutorService ;
69+ import java .util .concurrent .Executors ;
70+ import java .util .concurrent .TimeUnit ;
71+ import java .util .concurrent .atomic .AtomicBoolean ;
72+ import java .util .concurrent .atomic .AtomicInteger ;
6873import java .util .stream .Collectors ;
6974
7075import static org .apache .hudi .common .model .WriteOperationType .INSERT ;
7176import static org .apache .hudi .common .model .WriteOperationType .UPSERT ;
77+
78+ import static java .util .Arrays .asList ;
79+ import static java .util .Collections .emptyList ;
7280import static org .junit .jupiter .api .Assertions .assertDoesNotThrow ;
7381import static org .junit .jupiter .api .Assertions .assertEquals ;
7482import static org .junit .jupiter .api .Assertions .assertFalse ;
@@ -92,6 +100,53 @@ public void testTableOperations() throws Exception {
92100 verifyBaseMetadataTable ();
93101 }
94102
103+ @ Test
104+ public void testMultiReaderForHoodieBackedTableMetadata () throws Exception {
105+ final int taskNumber = 100 ;
106+ HoodieTableType tableType = HoodieTableType .COPY_ON_WRITE ;
107+ init (tableType );
108+ testTable .doWriteOperation ("000001" , INSERT , emptyList (), asList ("p1" ), 1 );
109+ HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata (context , writeConfig .getMetadataConfig (), writeConfig .getBasePath (), writeConfig .getSpillableMapBasePath (), false );
110+ assertTrue (tableMetadata .enabled ());
111+ List <String > metadataPartitions = tableMetadata .getAllPartitionPaths ();
112+ String partition = metadataPartitions .get (0 );
113+ String finalPartition = basePath + "/" + partition ;
114+ ArrayList <String > duplicatedPartitions = new ArrayList <>(taskNumber );
115+ for (int i = 0 ; i < taskNumber ; i ++) {
116+ duplicatedPartitions .add (finalPartition );
117+ }
118+ ExecutorService executors = Executors .newFixedThreadPool (taskNumber );
119+ AtomicBoolean flag = new AtomicBoolean (false );
120+ AtomicInteger count = new AtomicInteger (0 );
121+ AtomicInteger filesNumber = new AtomicInteger (0 );
122+
123+ for (String part : duplicatedPartitions ) {
124+ executors .submit (new Runnable () {
125+ @ Override
126+ public void run () {
127+ try {
128+ count .incrementAndGet ();
129+ while (true ) {
130+ if (count .get () == taskNumber ) {
131+ break ;
132+ }
133+ }
134+ FileStatus [] files = tableMetadata .getAllFilesInPartition (new Path (part ));
135+ filesNumber .addAndGet (files .length );
136+ LOG .warn (Arrays .toString (files ) + " : " + files .length );
137+ assertEquals (1 , files .length );
138+ } catch (Exception e ) {
139+ flag .set (true );
140+ }
141+ }
142+ });
143+ }
144+ executors .shutdown ();
145+ executors .awaitTermination (24 , TimeUnit .HOURS );
146+ assertFalse (flag .get ());
147+ assertEquals (filesNumber .get (), taskNumber );
148+ }
149+
95150 private void doWriteInsertAndUpsert (HoodieTestTable testTable ) throws Exception {
96151 doWriteInsertAndUpsert (testTable , "0000001" , "0000002" , false );
97152 }
0 commit comments