Skip to content

Commit 24207e2

Browse files
cloud-fan authored and zhengruifeng committed
[SPARK-54834][SQL] Add new interfaces SimpleProcedure and SimpleFunction
### What changes were proposed in this pull request? It's common that a procedure does not have any overloads and its `bind` method always returns the same `BoundProcedure`. This PR adds a new interface `SimpleProcedure` for this use case, which allows people to implement a `BoundProcedure` directly without thinking about how to bind. The same applies to v2 functions, via `SimpleFunction`. ### Why are the changes needed? Simplify a common use case when people implement procedures and functions. ### Does this PR introduce _any_ user-facing change? No, it's developer-facing. ### How was this patch tested? New tests. ### Was this patch authored or co-authored using generative AI tooling? Cursor 2.2.43 Closes #53595 from cloud-fan/procedure. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
1 parent 76c9516 commit 24207e2

4 files changed

Lines changed: 119 additions & 4 deletions

File tree

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.catalog.functions;
19+
20+
import org.apache.spark.annotation.Evolving;
21+
import org.apache.spark.sql.types.StructType;
22+
23+
/**
24+
* A function that does not require binding to input types.
25+
* <p>
26+
* This interface is designed for functions that have no overloads and do not need custom binding
27+
* logic. Implementations can directly provide function parameters and execution logic without
28+
* implementing the {@link UnboundFunction#bind(StructType) bind} method.
29+
* <p>
30+
* The default {@link #bind(StructType) bind} method simply returns {@code this}, as the function
31+
* is already considered bound.
32+
*
33+
* @since 4.2.0
34+
*/
35+
@Evolving
36+
public interface SimpleFunction extends UnboundFunction, BoundFunction {
37+
// Default binding: inputType is not inspected and this same instance is returned as the
// bound function, so implementors do not need to write a bind method themselves.
@Override
38+
default BoundFunction bind(StructType inputType) {
39+
return this;
40+
}
41+
}
42+
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.catalog.procedures;
19+
20+
import org.apache.spark.annotation.Evolving;
21+
import org.apache.spark.sql.types.StructType;
22+
23+
/**
24+
* A procedure that does not require binding to input types.
25+
* <p>
26+
* This interface is designed for procedures that have no overloads and do not need custom binding
27+
* logic. Implementations can directly provide procedure parameters and execution logic without
28+
* implementing the {@link UnboundProcedure#bind(StructType) bind} method.
29+
* <p>
30+
* The default {@link #bind(StructType) bind} method simply returns {@code this}, as the procedure
31+
* is already considered bound.
32+
*
33+
* @since 4.2.0
34+
*/
35+
@Evolving
36+
public interface SimpleProcedure extends UnboundProcedure, BoundProcedure {
37+
// Default binding: inputType is not inspected and this same instance is returned as the
// bound procedure, so implementors do not need to write a bind method themselves.
@Override
38+
default BoundProcedure bind(StructType inputType) {
39+
return this;
40+
}
41+
}
42+

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,25 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
702702
comparePlans(df1.queryExecution.optimizedPlan, df2.queryExecution.optimizedPlan)
703703
checkAnswer(df1, Row(3) :: Nil)
704704
}
705+
706+
// Checks that SimpleStrLen, a SimpleFunction, can be called from SQL once it has been added
// under the name testcat.ns.simple_strlen.
test("simple function") {
707+
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
708+
addFunction(Identifier.of(Array("ns"), "simple_strlen"), SimpleStrLen)
709+
// Expected results are the lengths of the string literals: 3 and 11.
checkAnswer(sql("SELECT testcat.ns.simple_strlen('abc')"), Row(3) :: Nil)
710+
checkAnswer(sql("SELECT testcat.ns.simple_strlen('hello world')"), Row(11) :: Nil)
711+
}
712+
}
713+
714+
// Test function that mixes SimpleFunction into a ScalarFunction[Int]. It declares one string
// input and an integer result, and defines no bind method of its own.
case object SimpleStrLen extends SimpleFunction with ScalarFunction[Int] {
715+
override def inputTypes(): Array[DataType] = Array(StringType)
716+
override def resultType(): DataType = IntegerType
717+
override def name(): String = "simple_strlen"
718+
override def description(): String = "simple string length function"
719+
720+
// Reads column 0 of the input row as a string and returns its length.
override def produceResult(input: InternalRow): Int = {
721+
val s = input.getString(0)
722+
s.length
723+
}
705724
}
706725

707726
case object StrLenDefault extends ScalarFunction[Int] {

sql/core/src/test/scala/org/apache/spark/sql/connector/ProcedureSuite.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
2626
import org.apache.spark.sql.catalyst.InternalRow
2727
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
2828
import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, DefaultValue, Identifier, InMemoryCatalog}
29-
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
29+
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, SimpleProcedure, UnboundProcedure}
3030
import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode
3131
import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode.{IN, INOUT, OUT}
3232
import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, LiteralValue}
@@ -486,6 +486,12 @@ class ProcedureSuite extends QueryTest with SharedSparkSession with BeforeAndAft
486486
checkAnswer(sql("CALL cat.ns.sum(5)"), Row(9) :: Nil)
487487
}
488488

489+
// Checks that SimpleSum, a SimpleProcedure, can be invoked through CALL once it has been
// created as ns.simple_sum.
test("simple procedure") {
490+
catalog.createProcedure(Identifier.of(Array("ns"), "simple_sum"), SimpleSum)
491+
// Positional arguments first, then named arguments (in2, in1); both calls expect 10.
checkAnswer(sql("CALL cat.ns.simple_sum(3, 7)"), Row(10) :: Nil)
492+
checkAnswer(sql("CALL cat.ns.simple_sum(in2 => 4, in1 => 6)"), Row(10) :: Nil)
493+
}
494+
489495
test("SPARK-51780: Implement DESC PROCEDURE") {
490496
catalog.createProcedure(Identifier.of(Array("ns"), "foo"), UnboundSum)
491497
catalog.createProcedure(Identifier.of(Array("ns", "db"), "abc"), UnboundLongSum)
@@ -610,7 +616,7 @@ class ProcedureSuite extends QueryTest with SharedSparkSession with BeforeAndAft
610616
object UnboundNonExecutableSum extends UnboundProcedure {
611617
override def name: String = "sum"
612618
override def description: String = "sum integers"
613-
override def bind(inputType: StructType): BoundProcedure = Sum
619+
override def bind(inputType: StructType): BoundProcedure = NonExecutableSum
614620
}
615621

616622
object NonExecutableSum extends BoundProcedure {
@@ -633,10 +639,10 @@ class ProcedureSuite extends QueryTest with SharedSparkSession with BeforeAndAft
633639
object UnboundSum extends UnboundProcedure {
634640
override def name: String = "sum"
635641
override def description: String = "sum integers"
636-
override def bind(inputType: StructType): BoundProcedure = Sum
642+
override def bind(inputType: StructType): BoundProcedure = new Sum
637643
}
638644

639-
object Sum extends BoundProcedure {
645+
class Sum extends BoundProcedure {
640646
override def name: String = "sum"
641647

642648
override def description: String = "sum integers"
@@ -897,4 +903,10 @@ class ProcedureSuite extends QueryTest with SharedSparkSession with BeforeAndAft
897903
override def defaultValue: DefaultValue = null
898904
override def comment: String = null
899905
}
906+
907+
// Test procedure that extends Sum and mixes in SimpleProcedure; only the name and the
// description are overridden here, everything else is inherited.
object SimpleSum extends Sum with SimpleProcedure {
908+
override def name: String = "simple_sum"
909+
910+
override def description: String = "simple sum integers"
911+
}
900912
}

0 commit comments

Comments
 (0)