Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/sim/background/webhook-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async function executeWebhookJobInternal(
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}

// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
const mergedStates = mergeSubblockState(blocks)

// Create serialized workflow
const serializer = new Serializer()
Expand Down
80 changes: 80 additions & 0 deletions apps/sim/lib/workflows/subblocks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import type { BlockState, SubBlockState } from '@/stores/workflows/workflow/types'

export const DEFAULT_SUBBLOCK_TYPE = 'short-input'

/**
* Merges subblock values into the provided subblock structures.
* Falls back to a default subblock shape when a value has no structure.
* @param subBlocks - Existing subblock definitions from the workflow
* @param values - Stored subblock values keyed by subblock id
* @returns Merged subblock structures with updated values
*/
export function mergeSubBlockValues(
subBlocks: Record<string, unknown> | undefined,
values: Record<string, unknown> | undefined
): Record<string, unknown> {
const merged = { ...(subBlocks || {}) } as Record<string, any>

if (!values) return merged

Object.entries(values).forEach(([subBlockId, value]) => {
if (merged[subBlockId] && typeof merged[subBlockId] === 'object') {
merged[subBlockId] = {
...(merged[subBlockId] as Record<string, unknown>),
value,
}
return
}

merged[subBlockId] = {
id: subBlockId,
type: DEFAULT_SUBBLOCK_TYPE,
value,
}
})

return merged
}

/**
* Merges workflow block states with explicit subblock values while maintaining block structure.
* Values that are null or undefined do not override existing subblock values.
* @param blocks - Block configurations from workflow state
* @param subBlockValues - Subblock values keyed by blockId -> subBlockId -> value
* @param blockId - Optional specific block ID to merge (merges all if not provided)
* @returns Merged block states with updated subblocks
*/
export function mergeSubblockStateWithValues(
blocks: Record<string, BlockState>,
subBlockValues: Record<string, Record<string, unknown>> = {},
blockId?: string
): Record<string, BlockState> {
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks

return Object.entries(blocksToProcess).reduce(
(acc, [id, block]) => {
if (!block) {
return acc
}

const blockSubBlocks = block.subBlocks || {}
const blockValues = subBlockValues[id] || {}
const filteredValues = Object.fromEntries(
Object.entries(blockValues).filter(([, value]) => value !== null && value !== undefined)
)

const mergedSubBlocks = mergeSubBlockValues(blockSubBlocks, filteredValues) as Record<
string,
SubBlockState
>

acc[id] = {
...block,
subBlocks: mergedSubBlocks,
}

return acc
},
{} as Record<string, BlockState>
)
}
43 changes: 26 additions & 17 deletions apps/sim/socket/database/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import postgres from 'postgres'
import { env } from '@/lib/core/config/env'
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { mergeSubBlockValues } from '@/lib/workflows/subblocks'
import {
BLOCK_OPERATIONS,
BLOCKS_OPERATIONS,
Expand Down Expand Up @@ -455,7 +456,7 @@ async function handleBlocksOperationTx(
}

case BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS: {
const { blocks, edges, loops, parallels } = payload
const { blocks, edges, loops, parallels, subBlockValues } = payload

logger.info(`Batch adding blocks to workflow ${workflowId}`, {
blockCount: blocks?.length || 0,
Expand All @@ -465,22 +466,30 @@ async function handleBlocksOperationTx(
})

if (blocks && blocks.length > 0) {
const blockValues = blocks.map((block: Record<string, unknown>) => ({
id: block.id as string,
workflowId,
type: block.type as string,
name: block.name as string,
positionX: (block.position as { x: number; y: number }).x,
positionY: (block.position as { x: number; y: number }).y,
data: (block.data as Record<string, unknown>) || {},
subBlocks: (block.subBlocks as Record<string, unknown>) || {},
outputs: (block.outputs as Record<string, unknown>) || {},
enabled: (block.enabled as boolean) ?? true,
horizontalHandles: (block.horizontalHandles as boolean) ?? true,
advancedMode: (block.advancedMode as boolean) ?? false,
triggerMode: (block.triggerMode as boolean) ?? false,
height: (block.height as number) || 0,
}))
const blockValues = blocks.map((block: Record<string, unknown>) => {
const blockId = block.id as string
const mergedSubBlocks = mergeSubBlockValues(
block.subBlocks as Record<string, unknown>,
subBlockValues?.[blockId]
)

return {
id: blockId,
workflowId,
type: block.type as string,
name: block.name as string,
positionX: (block.position as { x: number; y: number }).x,
positionY: (block.position as { x: number; y: number }).y,
data: (block.data as Record<string, unknown>) || {},
subBlocks: mergedSubBlocks,
outputs: (block.outputs as Record<string, unknown>) || {},
enabled: (block.enabled as boolean) ?? true,
horizontalHandles: (block.horizontalHandles as boolean) ?? true,
advancedMode: (block.advancedMode as boolean) ?? false,
triggerMode: (block.triggerMode as boolean) ?? false,
height: (block.height as number) || 0,
}
})

await tx.insert(workflowBlocks).values(blockValues)

Expand Down
70 changes: 3 additions & 67 deletions apps/sim/stores/workflows/server-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
* or React hooks, making it safe for use in Next.js API routes.
*/

import type { BlockState, SubBlockState } from '@/stores/workflows/workflow/types'
import { mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
import type { BlockState } from '@/stores/workflows/workflow/types'

/**
* Server-safe version of mergeSubblockState for API routes
Expand All @@ -26,72 +27,7 @@ export function mergeSubblockState(
subBlockValues: Record<string, Record<string, any>> = {},
blockId?: string
): Record<string, BlockState> {
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks

return Object.entries(blocksToProcess).reduce(
(acc, [id, block]) => {
// Skip if block is undefined
if (!block) {
return acc
}

// Initialize subBlocks if not present
const blockSubBlocks = block.subBlocks || {}

// Get stored values for this block
const blockValues = subBlockValues[id] || {}

// Create a deep copy of the block's subBlocks to maintain structure
const mergedSubBlocks = Object.entries(blockSubBlocks).reduce(
(subAcc, [subBlockId, subBlock]) => {
// Skip if subBlock is undefined
if (!subBlock) {
return subAcc
}

// Get the stored value for this subblock
const storedValue = blockValues[subBlockId]

// Create a new subblock object with the same structure but updated value
subAcc[subBlockId] = {
...subBlock,
value: storedValue !== undefined && storedValue !== null ? storedValue : subBlock.value,
}

return subAcc
},
{} as Record<string, SubBlockState>
)

// Return the full block state with updated subBlocks
acc[id] = {
...block,
subBlocks: mergedSubBlocks,
}

// Add any values that exist in the provided values but aren't in the block structure
// This handles cases where block config has been updated but values still exist
Object.entries(blockValues).forEach(([subBlockId, value]) => {
if (!mergedSubBlocks[subBlockId] && value !== null && value !== undefined) {
// Create a minimal subblock structure
mergedSubBlocks[subBlockId] = {
id: subBlockId,
type: 'short-input', // Default type that's safe to use
value: value,
}
}
})

// Update the block with the final merged subBlocks (including orphaned values)
acc[id] = {
...block,
subBlocks: mergedSubBlocks,
}

return acc
},
{} as Record<string, BlockState>
)
return mergeSubblockStateWithValues(blocks, subBlockValues, blockId)
}

/**
Expand Down
92 changes: 35 additions & 57 deletions apps/sim/stores/workflows/utils.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
import type { Edge } from 'reactflow'
import { v4 as uuidv4 } from 'uuid'

export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
return edgesToAdd.filter((edge) => {
if (edge.source === edge.target) return false
return !currentEdges.some(
(e) =>
e.source === edge.source &&
e.sourceHandle === edge.sourceHandle &&
e.target === edge.target &&
e.targetHandle === edge.targetHandle
)
})
}

import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { mergeSubBlockValues, mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
import { getBlock } from '@/blocks'
import { normalizeName } from '@/executor/constants'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
Expand All @@ -32,6 +19,19 @@ const WEBHOOK_SUBBLOCK_FIELDS = ['webhookId', 'triggerPath']

export { normalizeName }

export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
return edgesToAdd.filter((edge) => {
if (edge.source === edge.target) return false
return !currentEdges.some(
(e) =>
e.source === edge.source &&
e.sourceHandle === edge.sourceHandle &&
e.target === edge.target &&
e.targetHandle === edge.targetHandle
)
})
}

export interface RegeneratedState {
blocks: Record<string, BlockState>
edges: Edge[]
Expand Down Expand Up @@ -201,27 +201,20 @@ export function prepareDuplicateBlockState(options: PrepareDuplicateBlockStateOp
Object.entries(subBlockValues).filter(([key]) => !WEBHOOK_SUBBLOCK_FIELDS.includes(key))
)

const mergedSubBlocks: Record<string, SubBlockState> = sourceBlock.subBlocks
const baseSubBlocks: Record<string, SubBlockState> = sourceBlock.subBlocks
? JSON.parse(JSON.stringify(sourceBlock.subBlocks))
: {}

WEBHOOK_SUBBLOCK_FIELDS.forEach((field) => {
if (field in mergedSubBlocks) {
delete mergedSubBlocks[field]
if (field in baseSubBlocks) {
delete baseSubBlocks[field]
}
})

Object.entries(filteredSubBlockValues).forEach(([subblockId, value]) => {
if (mergedSubBlocks[subblockId]) {
mergedSubBlocks[subblockId].value = value as SubBlockState['value']
} else {
mergedSubBlocks[subblockId] = {
id: subblockId,
type: 'short-input',
value: value as SubBlockState['value'],
}
}
})
const mergedSubBlocks = mergeSubBlockValues(baseSubBlocks, filteredSubBlockValues) as Record<
string,
SubBlockState
>

const block: BlockState = {
id: newId,
Expand Down Expand Up @@ -256,11 +249,16 @@ export function mergeSubblockState(
workflowId?: string,
blockId?: string
): Record<string, BlockState> {
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
const subBlockStore = useSubBlockStore.getState()

const workflowSubblockValues = workflowId ? subBlockStore.workflowValues[workflowId] || {} : {}

if (workflowId) {
return mergeSubblockStateWithValues(blocks, workflowSubblockValues, blockId)
}

const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks

return Object.entries(blocksToProcess).reduce(
(acc, [id, block]) => {
if (!block) {
Expand Down Expand Up @@ -339,9 +337,15 @@ export async function mergeSubblockStateAsync(
workflowId?: string,
blockId?: string
): Promise<Record<string, BlockState>> {
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
const subBlockStore = useSubBlockStore.getState()

if (workflowId) {
const workflowValues = subBlockStore.workflowValues[workflowId] || {}
return mergeSubblockStateWithValues(blocks, workflowValues, blockId)
}

const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks

// Process blocks in parallel for better performance
const processedBlockEntries = await Promise.all(
Object.entries(blocksToProcess).map(async ([id, block]) => {
Expand All @@ -358,16 +362,7 @@ export async function mergeSubblockStateAsync(
return null
}

let storedValue = null

if (workflowId) {
const workflowValues = subBlockStore.workflowValues[workflowId]
if (workflowValues?.[id]) {
storedValue = workflowValues[id][subBlockId]
}
} else {
storedValue = subBlockStore.getValue(id, subBlockId)
}
const storedValue = subBlockStore.getValue(id, subBlockId)

return [
subBlockId,
Expand All @@ -386,23 +381,6 @@ export async function mergeSubblockStateAsync(
subBlockEntries.filter((entry): entry is readonly [string, SubBlockState] => entry !== null)
) as Record<string, SubBlockState>

// Add any values that exist in the store but aren't in the block structure
// This handles cases where block config has been updated but values still exist
// IMPORTANT: This includes runtime subblock IDs like webhookId, triggerPath, etc.
if (workflowId) {
const workflowValues = subBlockStore.workflowValues[workflowId]
const blockValues = workflowValues?.[id] || {}
Object.entries(blockValues).forEach(([subBlockId, value]) => {
if (!mergedSubBlocks[subBlockId] && value !== null && value !== undefined) {
mergedSubBlocks[subBlockId] = {
id: subBlockId,
type: 'short-input',
value: value as SubBlockState['value'],
}
}
})
}

// Return the full block state with updated subBlocks (including orphaned values)
return [
id,
Expand Down
Loading