[SUPPORT] Hive Metastore Lock Provider throws IllegalArgumentException ALREADY_ACQUIRED #5702

@ganczarek

Description

Describe the problem you faced

Hive-based lock provider throws IllegalArgumentException on the second try to acquire a lock.

I think it's because tryLock returns false when the this.lock object has been created but is not in the LockState.ACQUIRED state:

return this.lock != null && this.lock.getState() == LockState.ACQUIRED;

this.lock is never reset between retries, so when the LockManager calls tryLock a second time, this validation fails:

ValidationUtils.checkArgument(this.lock == null, ALREADY_ACQUIRED.name());
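The failure mode above can be sketched with a minimal, self-contained model. All class names here (LockRetryBugDemo, BuggyLockProvider, the simplified LockResponse) are hypothetical stand-ins; only the two quoted checks mirror the actual provider code:

```java
public class LockRetryBugDemo {

    enum LockState { WAITING, ACQUIRED }

    // Stand-in for the lock object returned by the metastore.
    static final class LockResponse {
        final LockState state;
        LockResponse(LockState state) { this.state = state; }
        LockState getState() { return state; }
    }

    // Simplified provider mirroring the described HiveMetastoreBasedLockProvider behavior.
    static final class BuggyLockProvider {
        private LockResponse lock = null;

        // serverAnswer simulates what the metastore returns for this attempt.
        boolean tryLock(LockState serverAnswer) {
            if (this.lock != null) {
                // Mirrors ValidationUtils.checkArgument(this.lock == null, ALREADY_ACQUIRED.name())
                throw new IllegalArgumentException("ALREADY_ACQUIRED");
            }
            this.lock = new LockResponse(serverAnswer);
            // Bug: when the state is not ACQUIRED we return false
            // but never reset this.lock, poisoning every later retry.
            return this.lock.getState() == LockState.ACQUIRED;
        }
    }

    public static void main(String[] args) {
        BuggyLockProvider provider = new BuggyLockProvider();
        boolean first = provider.tryLock(LockState.WAITING); // another writer holds the lock
        boolean retryThrew = false;
        try {
            provider.tryLock(LockState.ACQUIRED); // retry fails before even asking the server
        } catch (IllegalArgumentException e) {
            retryThrew = "ALREADY_ACQUIRED".equals(e.getMessage());
        }
        System.out.println("first=" + first + " retryThrew=" + retryThrew);
        // prints "first=false retryThrew=true"
    }
}
```

This matches the stack trace: the second tryLock call dies inside acquireLock's validation rather than contacting the metastore again.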

To Reproduce

Run two writers against the same table concurrently. In my case, a second writer was writing to the table at that time, so it must have held the lock when the first writer tried to acquire it.

Expected behavior

Hudi should retry acquiring the lock until it succeeds or the configured number of retries is exhausted.
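One possible fix, sketched under the same hypothetical model as above (this is an illustration, not the actual Hudi patch): clear the local lock field whenever an attempt does not end in ACQUIRED, so the next retry starts from a clean state and the ALREADY_ACQUIRED validation only fires for a lock that is genuinely held.

```java
public class FixedLockDemo {

    enum LockState { WAITING, ACQUIRED }

    // Stand-in for the lock object returned by the metastore.
    static final class LockResponse {
        final LockState state;
        LockResponse(LockState state) { this.state = state; }
        LockState getState() { return state; }
    }

    static final class FixedLockProvider {
        private LockResponse lock = null;

        boolean tryLock(LockState serverAnswer) {
            if (this.lock != null) {
                throw new IllegalArgumentException("ALREADY_ACQUIRED");
            }
            this.lock = new LockResponse(serverAnswer);
            if (this.lock.getState() != LockState.ACQUIRED) {
                this.lock = null; // reset local state so the next retry can proceed
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        FixedLockProvider provider = new FixedLockProvider();
        boolean first = provider.tryLock(LockState.WAITING);   // another writer holds the lock
        boolean second = provider.tryLock(LockState.ACQUIRED); // retry now succeeds
        System.out.println("first=" + first + " second=" + second);
        // prints "first=false second=true"
    }
}
```

A real fix would also need to release any server-side lock the failed attempt left behind, not just the local reference.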

Environment Description

  • Hudi version : 0.10.1
  • Spark version : 3.1.1
  • Hive version : 3.1.2
  • Hadoop version : 3.2.1
  • Storage (HDFS/S3/GCS..) : S3
  • Running on Docker? (yes/no) : no

Additional context

Lock config:

HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key                       -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,  
HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key            -> HoodieFailedWritesCleaningPolicy.LAZY.name,  
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key                      -> classOf[HiveMetastoreBasedLockProvider].getName,  
HoodieLockConfig.HIVE_DATABASE_NAME.key                            -> "test_db",  
HoodieLockConfig.HIVE_TABLE_NAME.key                               -> "test_table",  
HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key                  -> (60 * 1000).toString,  
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key        -> (1 * 1000).toString,  
HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key                      -> 15.toString,  
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key               -> 2.toString,  
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key -> (10 * 1000).toString

Stacktrace

22/05/27 02:14:38 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
22/05/27 02:14:38 INFO metastore: Trying to connect to metastore with URI thrift://ip-10-203-95-204.eu-west-2.compute.internal:9083
22/05/27 02:14:38 INFO metastore: Opened a connection to metastore, current connections: 1
22/05/27 02:14:38 INFO metastore: Connected to metastore.
22/05/27 02:14:38 INFO HiveMetastoreBasedLockProvider: ACQUIRING lock at database test_db and table test_table
22/05/27 02:14:38 INFO LockManager: Retrying to acquire lock...
22/05/27 02:14:48 INFO HiveMetastoreBasedLockProvider: ACQUIRING lock at database test_db and table test_table
22/05/27 02:14:48 INFO TransactionManager: Transaction ending without a transaction owner
22/05/27 02:14:48 INFO HiveMetastoreBasedLockProvider: RELEASING lock at database test_db and table test_table
22/05/27 02:14:48 INFO HiveMetastoreBasedLockProvider: RELEASED lock at database test_db and table test_table
22/05/27 02:14:48 INFO TransactionManager: Transaction ended without a transaction owner
java.lang.IllegalArgumentException: ALREADY_ACQUIRED
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
	at org.apache.hudi.hive.HiveMetastoreBasedLockProvider.acquireLock(HiveMetastoreBasedLockProvider.java:136)
	at org.apache.hudi.hive.HiveMetastoreBasedLockProvider.tryLock(HiveMetastoreBasedLockProvider.java:112)
	at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:62)
	at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:51)
	at org.apache.hudi.client.SparkRDDWriteClient.getTableAndInitCtx(SparkRDDWriteClient.java:430)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)

Labels

area:concurrency (Concurrency control and multi-writer), priority:critical (Production degraded; pipelines stalled)
