Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

When running CTAS/RTAS, create the table using the nullable version of the input query's schema.

Why are the changes needed?

It's very common to run CTAS/RTAS with a query whose output schema is non-nullable, e.g. CREATE TABLE t AS SELECT 1. However, it's surprising to users if they can't write NULL to this table later. Non-nullability is effectively a column constraint and should be specified by users explicitly.

For reference, Postgres also uses a nullable schema for CTAS:

> create table t1(i int not null);

> insert into t1 values (1);

> create table t2 as select i from t1;

> \d+ t1;
 Column |  Type   | Collation | Nullable | Default | Storage | Stats target | Description 
--------+---------+-----------+----------+---------+---------+--------------+-------------
 i      | integer |           | not null |         | plain   |              | 

> \d+ t2;
 Column |  Type   | Collation | Nullable | Default | Storage | Stats target | Description 
--------+---------+-----------+----------+---------+---------+--------------+-------------
 i      | integer |           |          |         | plain   |              | 

File source V1 has the same behavior.

Does this PR introduce any user-facing change?

Yes, after this PR CTAS/RTAS creates tables with a nullable schema, so users can insert NULL values later.
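
For example, a minimal sketch (the parquet provider and the table name are illustrative, not taken from the PR): after this change the second statement succeeds, because the CTAS column is created as nullable even though the query output is not.

  // CTAS from a non-nullable query: the table column is still created as nullable.
  spark.sql("CREATE TABLE t USING parquet AS SELECT 1 AS i")
  // Writing NULL later now works, since the query's non-nullability is not copied to the table.
  spark.sql("INSERT INTO t VALUES (NULL)")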

How was this patch tested?

A new test was added.

@cloud-fan
Contributor Author

cc @brkyvz @zsxwing @rdblue @maropu

@SparkQA

SparkQA commented Aug 21, 2019

Test build #109492 has finished for PR 25536 at commit 655d07e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Aug 21, 2019

+1

case (catalog, identifier) =>
  spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT 1 i")

  val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
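
(For context, a hedged sketch of the kind of check the test performs on the loaded table; the exact assertion is illustrative, not quoted from the PR:)

  import org.apache.spark.sql.types.StructType

  // The created table should come back with a nullable column:
  // fields added to a StructType this way are nullable by default.
  assert(table.schema == new StructType().add("i", "int"))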
Contributor


Minor and non-blocking: "table_name" is repeated many times; would it be better to make it a test class variable that each test case references?

Sorry, maybe I'm being too nitpicky, lol.

The PR looks good to me :)

Contributor


To me it's better if you don't have to jump around too much when reading the code.

Contributor


In your PR https://github.com/apache/spark/pull/25507/files, DataSourceV2DataFrameSessionCatalogSuite.scala has a class variable v2Format that is referenced by all test cases:

protected val v2Format: String = classOf[InMemoryTableProvider].getName

so I thought it was a style convention...

@brkyvz
Contributor

brkyvz commented Aug 21, 2019

I'm also +1 on this (and the PR LGTM). However, a follow-up: can we ensure that the written data really has a nullable schema as well? I've seen many Parquet files corrupted by nulls written to fields that are non-nullable according to Parquet's schema.

@rdblue
Contributor

rdblue commented Aug 21, 2019

I've seen many Parquet files corrupted by nulls written to fields that are non-nullable according to Parquet's schema.

I've not hit this problem. What version of Parquet are you using?

Usually, if the Spark plan is wrong about nullability, we get NPEs thrown in codegen sections because of assignments to primitive types, though that doesn't affect strings and decimals.

}

Utils.tryWithSafeFinallyAndFailureCallbacks({
  val schema = query.schema.asNullable
Member


Don't we need to update the schema nullability of the corresponding logical plans, CreateTableAsSelect and ReplaceTable, in the analyzer phase? Any reason to update the nullability directly in the physical plans?

Contributor Author


I think it's too much work to transform the logical plan and add an extra Project just to change the nullability.

Contributor


I think it would be incorrect to change the logical plan. The behavior of CTAS should be that tables are created with nullable types. The query used by CTAS should not be changed.

Member


Ah, I see. Thanks. Looks ok to me.

@maropu
Member

maropu commented Aug 21, 2019

+1; the change looks reasonable to me.

cloud-fan closed this in ed3ea67 on Aug 22, 2019
@cloud-fan
Contributor Author

Thanks for your reviews! Merging to master.

@xubo245
Contributor

xubo245 commented Apr 26, 2020

Can CTAS support creating a table with an explicit schema?
This is already supported:

 create table t1(i1 int);
 insert into t1 select i2 from t2;

but this is not supported:

create table t1 (i1 int) as select 2 from t2;

Can Spark support it?

@sketchmind

+1; the change looks reasonable to me.

I don't think it's reasonable. The table should have the same schema as the DataFrame, and if the schema is different from what we want, we should explicitly modify it and then create the table, instead of creating a table whose schema doesn't match, which can be confusing. @maropu

@cloud-fan
Contributor Author

@sketchmind Note that we don't insert into a table only once. It doesn't make sense to define the table schema by its first insertion (CTAS). I think this is also why Postgres uses the same design.

@sketchmind

sketchmind commented Jul 26, 2022

@sketchmind Note that we don't insert into a table only once. It doesn't make sense to define the table schema by its first insertion (CTAS). I think this is also why Postgres uses the same design.

This is not only a problem with the first insert. A complete DataFrame written with the DataSource createOrReplace API is also affected by this logic, which makes it impossible to create a NOT NULL table.

@cloud-fan
Contributor Author

But people can always create an empty table first and then insert? If we really want to support this case, we should allow people to define the table schema in CTAS (or the Scala createOrReplace) so that it can override the DataFrame schema.

@sketchmind

sketchmind commented Jul 26, 2022

But people can always create an empty table first and then insert? If we really want to support this case, we should allow people to define the table schema in CTAS (or the Scala createOrReplace) so that it can override the DataFrame schema.

createOrReplace (create an empty table first and then insert) is a very frequently used function; it's just that most tables are not sensitive to whether a column is NULL or not, especially in Hive, so this issue is not widely mentioned.
I think it is better for Spark to leave this part of the logic to the catalog developers to manage, instead of creating nullable tables for all databases.
But no matter what, we should follow the predefined schema to create the table; otherwise we artificially introduce wrong information, which goes against the programmer's intuition.

@pan3793
Member

pan3793 commented Jan 31, 2023

@cloud-fan how about adding a configuration to allow users to control the behavior? e.g. spark.sql.reserveSchemaNullableInCTAS (default false)

@cloud-fan
Contributor Author

cloud-fan commented Jan 31, 2023

Can you explain the use cases? It seems people agreed with the SQL behavior (consistent with Postgres) but had concerns about the Scala API. Note that Spark also provides Scala APIs to create a table, so you can keep the nullability of the query schema:

val df = ...
spark.catalog.createTable(..., df.schema)
df.writeTo(...).append()
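
(A hedged, slightly more concrete variant of the same idea, using schema.toDDL to generate the column list instead of the catalog API; my_catalog.ns.t is an illustrative table identifier, assuming my_catalog is a configured DSv2 catalog:)

  val df = spark.sql("SELECT 1 AS i")  // i is non-nullable in df.schema
  // schema.toDDL emits NOT NULL, so the table keeps the query's nullability.
  spark.sql(s"CREATE TABLE my_catalog.ns.t (${df.schema.toDDL})")
  // Then append the data with the v2 writer.
  df.writeTo("my_catalog.ns.t").append()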

@pan3793
Member

pan3793 commented Jan 31, 2023

I am stuck on the SQL case.

The background: ClickHouse is extremely fast at OLAP queries on a single wide table, and some data engineers want to use Spark to do heavy data preparation and save the result as a temporary table in ClickHouse through pure SQL. Usually the result set is quite large, and there are no appends/updates/deletes during the table's lifecycle.

Since NULL is not good for performance, ClickHouse has quite strict restrictions on the table schema, e.g. the sorting keys are not allowed to be nullable:

Using Nullable almost always negatively affects performance, keep this in mind when designing your databases.

https://clickhouse.com/docs/en/sql-reference/data-types/nullable

In the above use case, things become simple if the connector knows and respects the nullability of the DataFrame's schema on CTAS.

@cloud-fan
Contributor Author

Can you split your SQL CTAS into CREATE TABLE and INSERT? Or ALTER the table later to change the column nullability? It seems like a special case for the ClickHouse use case, and I'm a bit reluctant to change Spark's behavior to fit it.

@pan3793
Member

pan3793 commented Jan 31, 2023

Can you split your SQL CTAS into CREATE TABLE and INSERT?

Yes, it's a workaround, but it's not convenient since users need to explicitly list all column definitions.

Or ALTER the table later to change the column nullability?

I'm afraid not: the CTAS fails in the first place because the generated schema violates the restriction.

Back to the case mentioned in this PR description: I think table schema evolution is an essential feature of modern data lake table formats. Can INT NOT NULL safely evolve to INT NULL? If yes, I think there is no problem in preserving the nullability on CTAS/RTAS.

BTW, I tested the provided CTAS case on MySQL 8; the result is different.

mysql> select version();
+-----------+
| version() |
+-----------+
| 8.0.32    |
+-----------+
1 row in set (0.00 sec)

mysql> create table t1(i int not null);
Query OK, 0 rows affected (0.04 sec)

mysql> insert into t1 values (1);
Query OK, 1 row affected (0.01 sec)

mysql> create table t2 as select i from t1;
Query OK, 1 row affected (0.03 sec)
Records: 1  Duplicates: 0  Warnings: 0

mysql> desc t1;
+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| i     | int  | NO   |     | NULL    |       |
+-------+------+------+-----+---------+-------+
1 row in set (0.00 sec)

mysql> desc t2;
+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| i     | int  | NO   |     | NULL    |       |
+-------+------+------+-----+---------+-------+
1 row in set (0.00 sec)

@cloud-fan
Contributor Author

This is a good point. If Spark can do schema evolution during table insertion, then having a non-nullable table schema in CTAS is not a problem anymore. We need a bit of design work on how data sources can declare their support for schema evolution, and on what happens if they do want to enforce the non-nullable property.

cloud-fan pushed a commit that referenced this pull request May 9, 2023
### What changes were proposed in this pull request?

Add a new method `useNullableQuerySchema` in `Table`, to allow a DataSource implementation to declare whether it needs to preserve schema nullability on CTAS/RTAS.

### Why are the changes needed?

SPARK-28837 forcibly uses the nullable schema on CTAS/RTAS, which seems too aggressive:

1. Existing mature RDBMSs behave differently about preserving schema nullability on CTAS/RTAS: as mentioned in #25536, PostgreSQL forcibly uses the nullable schema, but MySQL respects the query's output schema nullability.
2. Some OLAP systems (e.g. ClickHouse) are performance-sensitive to nullable columns and have strict restrictions on table schema, e.g. the primary keys are not allowed to be nullable.

### Does this PR introduce _any_ user-facing change?

Yes, this PR adds a new DSv2 API, but the default implementation preserves backward compatibility.

### How was this patch tested?

UTs are updated.

Closes #41070 from pan3793/SPARK-43390.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
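
(A hedged sketch of how a connector might use the new hook; MyBaseImpl is a hypothetical stand-in for the connector's existing implementation of whichever interface declares useNullableQuerySchema, and the exact signature shown is assumed rather than verified:)

  // Hypothetical: returning false asks Spark to keep the query's own nullability
  // on CTAS/RTAS instead of forcing every column to nullable.
  class NullabilityPreservingImpl extends MyBaseImpl {
    override def useNullableQuerySchema(): Boolean = false
  }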
LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request May 10, 2023