Skip to content
This repository was archived by the owner on Feb 27, 2025. It is now read-only.

Commit 8d36396

Browse files
committed
leave out auto col and use user providec col mapping
1 parent 6d9e49f commit 8d36396

1 file changed

Lines changed: 12 additions & 43 deletions

File tree

src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ object BulkCopyUtils extends Logging {
3535
* a connection, sets connection properties and does a BulkWrite. Called when writing data to
3636
* master instance and data pools both. URL in options is used to create the relevant connection.
3737
*
38-
* @param itertor - iterator for row of the partition.
38+
* @param iterator - iterator for row of the partition.
3939
* @param dfColMetadata - array of ColumnMetadata type
4040
* @param options - SQLServerBulkJdbcOptions with url for the connection
4141
*/
@@ -179,32 +179,6 @@ object BulkCopyUtils extends Logging {
179179
conn.createStatement.executeQuery(queryStr)
180180
}
181181

182-
/**
183-
* getAutoCols
184-
* utility function to get auto generated columns.
185-
* Use auto generated column names to exclude them when matching schema.
186-
*/
187-
private[spark] def getAutoCols(
188-
conn: Connection,
189-
table: String): List[String] = {
190-
// auto cols union computed cols, generated always cols, and node / edge table auto cols
191-
val queryStr = s"""SELECT name
192-
FROM sys.columns
193-
WHERE object_id = OBJECT_ID('${table}')
194-
AND (is_computed = 1 -- computed column
195-
OR generated_always_type > 0 -- generated always / temporal table
196-
OR (is_hidden = 0 AND graph_type = 2)) -- graph table
197-
"""
198-
199-
val autoColRs = conn.createStatement.executeQuery(queryStr)
200-
val autoCols = ListBuffer[String]()
201-
while (autoColRs.next()) {
202-
val colName = autoColRs.getString("name")
203-
autoCols.append(colName)
204-
}
205-
autoCols.toList
206-
}
207-
208182
/**
209183
* getColMetadataMap
210184
* Utility function convert result set meta data to array.
@@ -290,35 +264,30 @@ object BulkCopyUtils extends Logging {
290264
val dfCols = df.schema
291265

292266
val tableCols = getSchema(rs, JdbcDialects.get(url))
293-
val autoCols = getAutoCols(conn, dbtable)
294267

295268
val columnsToWriteSet = columnsToWrite.split(",").toSet
296269
logDebug(s"columnsToWrite: $columnsToWriteSet")
297270

298271
val prefix = "Spark Dataframe and SQL Server table have differing"
299272

300-
// auto columns should not exist in df
301-
assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck,
302-
s"${prefix} numbers of columns")
303-
304273
// if columnsToWrite provided by user, use it for metadata mapping. If not, use sql table.
305-
if (columnsToWrite == "") {
306-
val result = new Array[ColumnMetadata](columnsToWriteSet.size)
274+
var metadataLen = tableCols.length
275+
if (columnsToWrite.isEmpty) {
276+
assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck,
277+
s"${prefix} numbers of columns")
307278
} else {
308-
val result = new Array[ColumnMetadata](tableCols.length - autoCols.length)
279+
metadataLen = columnsToWriteSet.size
309280
}
310281

311-
var nonAutoColIndex = 0
282+
var colMappingIndex = 0
283+
val result = new Array[ColumnMetadata](metadataLen)
312284

313285
for (i <- 0 to tableCols.length-1) {
314286
val tableColName = tableCols(i).name
315287
var dfFieldIndex = -1
288+
// if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
316289
if (!columnsToWriteSet.isEmpty && !columnsToWriteSet.contains(tableColName)) {
317-
// if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
318290
logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list")
319-
} else if (autoCols.contains(tableColName)) {
320-
// if auto columns, skip column mapping and ColumnMetadata
321-
logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex")
322291
}else{
323292
var dfColName:String = ""
324293
if (isCaseSensitive) {
@@ -361,15 +330,15 @@ object BulkCopyUtils extends Logging {
361330
s" DF col ${dfColName} nullable config is ${dfCols(dfFieldIndex).nullable} " +
362331
s" Table col ${tableColName} nullable config is ${tableCols(i).nullable}")
363332

364-
// Schema check passed for element, Create ColMetaData only for non auto generated column
365-
result(nonAutoColIndex) = new ColumnMetadata(
333+
// Schema check passed for element, Create ColMetaData for columns
334+
result(colMappingIndex) = new ColumnMetadata(
366335
rs.getMetaData().getColumnName(i+1),
367336
rs.getMetaData().getColumnType(i+1),
368337
rs.getMetaData().getPrecision(i+1),
369338
rs.getMetaData().getScale(i+1),
370339
dfFieldIndex
371340
)
372-
nonAutoColIndex += 1
341+
colMappingIndex += 1
373342
}
374343
}
375344
result

0 commit comments

Comments
 (0)