Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/better-plums-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@tanstack/db-collections": patch
"@tanstack/db-example-react-todo": patch
"@tanstack/db": patch
---

Type PendingMutation whenever possible
8 changes: 4 additions & 4 deletions examples/react/todo/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ const createTodoCollection = (type: CollectionType) => {
if (collectionsCache.has(`todo`)) {
return collectionsCache.get(`todo`)
} else {
let newCollection: Collection
let newCollection: Collection<UpdateTodo>
if (type === CollectionType.Electric) {
newCollection = createCollection<UpdateTodo>(
electricCollectionOptions({
newCollection = createCollection(
electricCollectionOptions<UpdateTodo>({
id: `todos`,
shapeOptions: {
url: `http://localhost:3003/v1/shape`,
Expand Down Expand Up @@ -237,7 +237,7 @@ const createConfigCollection = (type: CollectionType) => {
if (collectionsCache.has(`config`)) {
return collectionsCache.get(`config`)
} else {
let newCollection: Collection
let newCollection: Collection<UpdateConfig>
if (type === CollectionType.Electric) {
newCollection = createCollection(
electricCollectionOptions({
Expand Down
20 changes: 13 additions & 7 deletions packages/db-collections/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,32 @@ export interface ElectricCollectionConfig<T extends Row<unknown>> {
* @param params Object containing transaction and mutation information
* @returns Promise resolving to an object with txid
*/
onInsert?: (params: MutationFnParams) => Promise<{ txid: string } | undefined>
onInsert?: (
params: MutationFnParams<T>
) => Promise<{ txid: string } | undefined>

/**
* Optional asynchronous handler function called before an update operation
* Must return an object containing a txid string
* @param params Object containing transaction and mutation information
* @returns Promise resolving to an object with txid
*/
onUpdate?: (params: MutationFnParams) => Promise<{ txid: string } | undefined>
onUpdate?: (
params: MutationFnParams<T>
) => Promise<{ txid: string } | undefined>

/**
* Optional asynchronous handler function called before a delete operation
* Must return an object containing a txid string
* @param params Object containing transaction and mutation information
* @returns Promise resolving to an object with txid
*/
onDelete?: (params: MutationFnParams) => Promise<{ txid: string } | undefined>
onDelete?: (
params: MutationFnParams<T>
) => Promise<{ txid: string } | undefined>
}

function isUpToDateMessage<T extends Row<unknown> = Row>(
function isUpToDateMessage<T extends Row<unknown>>(
message: Message<T>
): message is ControlMessage & { up_to_date: true } {
return isControlMessage(message) && message.headers.control === `up-to-date`
Expand Down Expand Up @@ -133,7 +139,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(

// Create wrapper handlers for direct persistence operations that handle txid awaiting
const wrappedOnInsert = config.onInsert
? async (params: MutationFnParams) => {
? async (params: MutationFnParams<T>) => {
const handlerResult = (await config.onInsert!(params)) ?? {}
const txid = (handlerResult as { txid?: string }).txid

Expand All @@ -149,7 +155,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(
: undefined

const wrappedOnUpdate = config.onUpdate
? async (params: MutationFnParams) => {
? async (params: MutationFnParams<T>) => {
const handlerResult = await config.onUpdate!(params)
const txid = (handlerResult as { txid?: string }).txid

Expand All @@ -165,7 +171,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(
: undefined

const wrappedOnDelete = config.onDelete
? async (params: MutationFnParams) => {
? async (params: MutationFnParams<T>) => {
const handlerResult = await config.onDelete!(params)
const txid = (handlerResult as { txid?: string }).txid

Expand Down
6 changes: 3 additions & 3 deletions packages/db-collections/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ export function queryCollectionOptions<

// Create wrapper handlers for direct persistence operations that handle refetching
const wrappedOnInsert = onInsert
? async (params: MutationFnParams) => {
? async (params: MutationFnParams<TItem>) => {
const handlerResult = (await onInsert(params)) ?? {}
const shouldRefetch =
(handlerResult as { refetch?: boolean }).refetch !== false
Expand All @@ -298,7 +298,7 @@ export function queryCollectionOptions<
: undefined

const wrappedOnUpdate = onUpdate
? async (params: MutationFnParams) => {
? async (params: MutationFnParams<TItem>) => {
const handlerResult = (await onUpdate(params)) ?? {}
const shouldRefetch =
(handlerResult as { refetch?: boolean }).refetch !== false
Expand All @@ -312,7 +312,7 @@ export function queryCollectionOptions<
: undefined

const wrappedOnDelete = onDelete
? async (params: MutationFnParams) => {
? async (params: MutationFnParams<TItem>) => {
const handlerResult = (await onDelete(params)) ?? {}
const shouldRefetch =
(handlerResult as { refetch?: boolean }).refetch !== false
Expand Down
12 changes: 8 additions & 4 deletions packages/db-collections/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
MutationFnParams,
PendingMutation,
Transaction,
TransactionWithMutations,
} from "@tanstack/db"
import type { Message, Row } from "@electric-sql/client"

Expand Down Expand Up @@ -415,8 +416,11 @@ describe(`Electric Integration`, () => {

it(`should throw an error if handler doesn't return a txid`, async () => {
// Create a mock transaction for testing
const mockTransaction = { id: `test-transaction` } as Transaction
const mockParams: MutationFnParams = { transaction: mockTransaction }
const mockTransaction = {
id: `test-transaction`,
mutations: [],
} as unknown as TransactionWithMutations<Row>
const mockParams: MutationFnParams<Row> = { transaction: mockTransaction }

// Create a handler that doesn't return a txid
const onInsert = vi.fn().mockResolvedValue({})
Expand Down Expand Up @@ -488,7 +492,7 @@ describe(`Electric Integration`, () => {
}

// Create a mutation function for the transaction
const mutationFn = vi.fn(async (params: MutationFnParams) => {
const mutationFn = vi.fn(async (params: MutationFnParams<Row>) => {
const txid = await fakeBackend.persist(params.transaction.mutations)

// Simulate server sending sync message after a delay
Expand All @@ -500,7 +504,7 @@ describe(`Electric Integration`, () => {
})

// Create direct persistence handler that returns the txid
const onInsert = vi.fn(async (params: MutationFnParams) => {
const onInsert = vi.fn(async (params: MutationFnParams<Row>) => {
return { txid: await mutationFn(params) }
})

Expand Down
32 changes: 16 additions & 16 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
* This is populated by createCollection
*/
public utils: Record<string, Fn> = {}
public transactions: Store<SortedMap<string, TransactionType>>
public transactions: Store<SortedMap<string, TransactionType<any>>>
public optimisticOperations: Derived<Array<OptimisticChangeMessage<T>>>
public derivedState: Derived<Map<string, T>>
public derivedArray: Derived<Array<T>>
Expand Down Expand Up @@ -250,7 +250,7 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
}

this.transactions = new Store(
new SortedMap<string, TransactionType>(
new SortedMap<string, TransactionType<any>>(
(a, b) => a.createdAt.getTime() - b.createdAt.getTime()
)
)
Expand All @@ -269,7 +269,7 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
const message: OptimisticChangeMessage<T> = {
type: mutation.type,
key: mutation.key,
value: mutation.modified as T,
value: mutation.modified,
isActive,
}
if (
Expand Down Expand Up @@ -684,8 +684,8 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
const mutation: PendingMutation<T> = {
mutationId: crypto.randomUUID(),
original: {},
modified: validatedData as Record<string, unknown>,
changes: validatedData as Record<string, unknown>,
modified: validatedData,
changes: validatedData,
key,
metadata: config?.metadata as unknown,
syncMetadata: this.config.sync.getSyncMetadata?.() || {},
Expand All @@ -710,7 +710,7 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
return ambientTransaction
} else {
// Create a new transaction with a mutation function that calls the onInsert handler
const directOpTransaction = new Transaction({
const directOpTransaction = new Transaction<T>({
mutationFn: async (params) => {
// Call the onInsert handler with the transaction
return this.config.onInsert!(params)
Expand Down Expand Up @@ -906,10 +906,10 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
// No need to check for onUpdate handler here as we've already checked at the beginning

// Create a new transaction with a mutation function that calls the onUpdate handler
const directOpTransaction = new Transaction({
mutationFn: async (transaction) => {
const directOpTransaction = new Transaction<T>({
mutationFn: async (params) => {
// Call the onUpdate handler with the transaction
return this.config.onUpdate!(transaction)
return this.config.onUpdate!(params)
},
})

Expand Down Expand Up @@ -944,7 +944,7 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
delete = (
ids: Array<string> | string,
config?: OperationConfig
): TransactionType => {
): TransactionType<any> => {
const ambientTransaction = getActiveTransaction()

// If no ambient transaction exists, check for an onDelete handler early
Expand All @@ -962,9 +962,9 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
for (const id of idsArray) {
const mutation: PendingMutation<T> = {
mutationId: crypto.randomUUID(),
original: (this.state.get(id) || {}) as Record<string, unknown>,
modified: (this.state.get(id) || {}) as Record<string, unknown>,
changes: (this.state.get(id) || {}) as Record<string, unknown>,
original: this.state.get(id) || {},
modified: this.state.get(id)!,
changes: this.state.get(id) || {},
key: id,
metadata: config?.metadata as unknown,
syncMetadata: (this.syncedMetadata.state.get(id) || {}) as Record<
Expand Down Expand Up @@ -993,11 +993,11 @@ export class CollectionImpl<T extends object = Record<string, unknown>> {
}

// Create a new transaction with a mutation function that calls the onDelete handler
const directOpTransaction = new Transaction({
const directOpTransaction = new Transaction<T>({
autoCommit: true,
mutationFn: async (transaction) => {
mutationFn: async (params) => {
// Call the onDelete handler with the transaction
return this.config.onDelete!(transaction)
return this.config.onDelete!(params)
},
})

Expand Down
38 changes: 20 additions & 18 deletions packages/db/src/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createDeferred } from "./deferred"
import type { Deferred } from "./deferred"
import type {
MutationFn,
PendingMutation,
TransactionConfig,
TransactionState,
Expand All @@ -24,8 +25,8 @@ function generateUUID() {
})
}

const transactions: Array<Transaction> = []
let transactionStack: Array<Transaction> = []
const transactions: Array<Transaction<any>> = []
let transactionStack: Array<Transaction<any>> = []

export function createTransaction(config: TransactionConfig): Transaction {
if (typeof config.mutationFn === `undefined`) {
Expand All @@ -51,27 +52,27 @@ export function getActiveTransaction(): Transaction | undefined {
}
}

function registerTransaction(tx: Transaction) {
function registerTransaction(tx: Transaction<any>) {
transactionStack.push(tx)
}

function unregisterTransaction(tx: Transaction) {
function unregisterTransaction(tx: Transaction<any>) {
transactionStack = transactionStack.filter((t) => t.id !== tx.id)
}

function removeFromPendingList(tx: Transaction) {
function removeFromPendingList(tx: Transaction<any>) {
const index = transactions.findIndex((t) => t.id === tx.id)
if (index !== -1) {
transactions.splice(index, 1)
}
}

export class Transaction {
export class Transaction<T extends object = Record<string, unknown>> {
public id: string
public state: TransactionState
public mutationFn
public mutations: Array<PendingMutation<any>>
public isPersisted: Deferred<Transaction>
public mutationFn: MutationFn<T>
public mutations: Array<PendingMutation<T>>
public isPersisted: Deferred<Transaction<T>>
public autoCommit: boolean
public createdAt: Date
public metadata: Record<string, unknown>
Expand All @@ -80,12 +81,12 @@ export class Transaction {
error: Error
}

constructor(config: TransactionConfig) {
constructor(config: TransactionConfig<T>) {
this.id = config.id!
this.mutationFn = config.mutationFn
this.state = `pending`
this.mutations = []
this.isPersisted = createDeferred()
this.isPersisted = createDeferred<Transaction<T>>()
this.autoCommit = config.autoCommit ?? true
this.createdAt = new Date()
this.metadata = config.metadata ?? {}
Expand All @@ -99,7 +100,7 @@ export class Transaction {
}
}

mutate(callback: () => void): Transaction {
mutate(callback: () => void): Transaction<T> {
if (this.state !== `pending`) {
throw `You can no longer call .mutate() as the transaction is no longer pending`
}
Expand Down Expand Up @@ -134,7 +135,7 @@ export class Transaction {
}
}

rollback(config?: { isSecondaryRollback?: boolean }): Transaction {
rollback(config?: { isSecondaryRollback?: boolean }): Transaction<T> {
const isSecondaryRollback = config?.isSecondaryRollback ?? false
if (this.state === `completed`) {
throw `You can no longer call .rollback() as the transaction is already completed`
Expand Down Expand Up @@ -173,7 +174,7 @@ export class Transaction {
}
}

async commit(): Promise<Transaction> {
async commit(): Promise<Transaction<T>> {
if (this.state !== `pending`) {
throw `You can no longer call .commit() as the transaction is no longer pending`
}
Expand All @@ -189,10 +190,11 @@ export class Transaction {
// Run mutationFn
try {
// At this point we know there's at least one mutation
// Use type assertion to tell TypeScript about this guarantee
const transactionWithMutations =
this as unknown as TransactionWithMutations
await this.mutationFn({ transaction: transactionWithMutations })
// We've already verified mutations is non-empty, so this cast is safe
// Use a direct type assertion instead of object spreading to preserve the original type
await this.mutationFn({
transaction: this as unknown as TransactionWithMutations<T>,
})

this.setState(`completed`)
this.touchCollection()
Expand Down
Loading