@@ -103,6 +103,21 @@ async function acquireTablePositionLock(trx: DbTransaction, tableId: string) {
103103 )
104104}
105105
106+ /**
107+ * Returns the next auto-assigned `position` for a table (max(position) + 1, or 0
108+ * if empty). Callers must hold `acquireTablePositionLock` to avoid two concurrent
109+ * writers computing the same value against the same snapshot.
110+ */
111+ async function nextAutoPosition ( trx : DbTransaction , tableId : string ) : Promise < number > {
112+ const [ { maxPos } ] = await trx
113+ . select ( {
114+ maxPos : sql < number > `coalesce(max(${ userTableRows . position } ), -1)` . mapWith ( Number ) ,
115+ } )
116+ . from ( userTableRows )
117+ . where ( eq ( userTableRows . tableId , tableId ) )
118+ return maxPos + 1
119+ }
120+
106121const TIMEOUT_CAP_MS = 10 * 60_000
107122
108123/**
@@ -681,14 +696,7 @@ export async function insertRow(
681696 )
682697 }
683698 } else {
684- const [ { maxPos } ] = await trx
685- . select ( {
686- maxPos : sql < number > `coalesce(max(${ userTableRows . position } ), -1)` . mapWith ( Number ) ,
687- } )
688- . from ( userTableRows )
689- . where ( eq ( userTableRows . tableId , data . tableId ) )
690-
691- targetPosition = maxPos + 1
699+ targetPosition = await nextAutoPosition ( trx , data . tableId )
692700 }
693701
694702 return trx
@@ -801,14 +809,8 @@ export async function batchInsertRows(
801809 return trx . insert ( userTableRows ) . values ( rowsToInsert ) . returning ( )
802810 }
803811
804- const [ { maxPos } ] = await trx
805- . select ( {
806- maxPos : sql < number > `coalesce(max(${ userTableRows . position } ), -1)` . mapWith ( Number ) ,
807- } )
808- . from ( userTableRows )
809- . where ( eq ( userTableRows . tableId , data . tableId ) )
810-
811- const rowsToInsert = data . rows . map ( ( rowData , i ) => buildRow ( rowData , maxPos + 1 + i ) )
812+ const startPos = await nextAutoPosition ( trx , data . tableId )
813+ const rowsToInsert = data . rows . map ( ( rowData , i ) => buildRow ( rowData , startPos + i ) )
812814
813815 return trx . insert ( userTableRows ) . values ( rowsToInsert ) . returning ( )
814816 } )
@@ -1056,55 +1058,33 @@ export async function upsertRow(
10561058
10571059 const now = new Date ( )
10581060
1059- if ( existingRow ) {
1060- const [ updatedRow ] = await trx
1061- . update ( userTableRows )
1062- . set ( {
1063- data : data . data ,
1064- updatedAt : now ,
1065- } )
1066- . where ( eq ( userTableRows . id , existingRow . id ) )
1067- . returning ( )
1068-
1069- return {
1070- row : {
1071- id : updatedRow . id ,
1072- data : updatedRow . data as RowData ,
1073- position : updatedRow . position ,
1074- createdAt : updatedRow . createdAt ,
1075- updatedAt : updatedRow . updatedAt ,
1076- } ,
1077- operation : 'update' as const ,
1078- }
1079- }
1080-
1081- await acquireTablePositionLock ( trx , data . tableId )
1082-
1083- // Re-check after acquiring the lock: a concurrent upsert that started
1084- // before us may have inserted the matching row between our initial
1085- // `existingRow` read and now. Without this, both transactions would
1086- // proceed to insert and produce a duplicate that bypasses the
1087- // app-level unique check.
1088- const [ racedRow ] = await trx
1089- . select ( )
1090- . from ( userTableRows )
1091- . where (
1092- and (
1093- eq ( userTableRows . tableId , data . tableId ) ,
1094- eq ( userTableRows . workspaceId , data . workspaceId ) ,
1095- matchFilter
1061+ // Resolve which row (if any) we should update. If the initial SELECT missed,
1062+ // acquire the lock and re-check — a concurrent upsert may have inserted the
1063+ // matching row between our SELECT and the INSERT path, and without the
1064+ // re-check both transactions would insert and produce a duplicate that
1065+ // bypasses the app-level unique check.
1066+ let matchedRowId = existingRow ?. id
1067+ if ( ! matchedRowId ) {
1068+ await acquireTablePositionLock ( trx , data . tableId )
1069+ const [ racedRow ] = await trx
1070+ . select ( { id : userTableRows . id } )
1071+ . from ( userTableRows )
1072+ . where (
1073+ and (
1074+ eq ( userTableRows . tableId , data . tableId ) ,
1075+ eq ( userTableRows . workspaceId , data . workspaceId ) ,
1076+ matchFilter
1077+ )
10961078 )
1097- )
1098- . limit ( 1 )
1079+ . limit ( 1 )
1080+ matchedRowId = racedRow ?. id
1081+ }
10991082
1100- if ( racedRow ) {
1083+ if ( matchedRowId ) {
11011084 const [ updatedRow ] = await trx
11021085 . update ( userTableRows )
1103- . set ( {
1104- data : data . data ,
1105- updatedAt : now ,
1106- } )
1107- . where ( eq ( userTableRows . id , racedRow . id ) )
1086+ . set ( { data : data . data , updatedAt : now } )
1087+ . where ( eq ( userTableRows . id , matchedRowId ) )
11081088 . returning ( )
11091089
11101090 return {
@@ -1119,21 +1099,14 @@ export async function upsertRow(
11191099 }
11201100 }
11211101
1122- const [ { maxPos } ] = await trx
1123- . select ( {
1124- maxPos : sql < number > `coalesce(max(${ userTableRows . position } ), -1)` . mapWith ( Number ) ,
1125- } )
1126- . from ( userTableRows )
1127- . where ( eq ( userTableRows . tableId , data . tableId ) )
1128-
11291102 const [ insertedRow ] = await trx
11301103 . insert ( userTableRows )
11311104 . values ( {
11321105 id : `row_${ generateId ( ) . replace ( / - / g, '' ) } ` ,
11331106 tableId : data . tableId ,
11341107 workspaceId : data . workspaceId ,
11351108 data : data . data ,
1136- position : maxPos + 1 ,
1109+ position : await nextAutoPosition ( trx , data . tableId ) ,
11371110 createdAt : now ,
11381111 updatedAt : now ,
11391112 ...( data . userId ? { createdBy : data . userId } : { } ) ,
0 commit comments