diff --git a/databricks-builder-app/.env.example b/databricks-builder-app/.env.example index 43ccf211..c0d9acd5 100644 --- a/databricks-builder-app/.env.example +++ b/databricks-builder-app/.env.example @@ -1,9 +1,18 @@ -# Copy this file to .env.local and fill in your values +# Databricks Builder App - Environment Configuration +# ================================================== +# +# For LOCAL DEVELOPMENT: Copy this file to .env.local and fill in your values +# For DEPLOYMENT: Copy app.yaml.example to app.yaml and configure there +# +# This file is for local development only. When deploying to Databricks Apps, +# use app.yaml for environment configuration instead. # ============================================================================= -# Databricks Configuration +# Databricks Configuration (Local Development) # ============================================================================= -# Workspace URL and personal access token for local development +# Workspace URL and personal access token +# In production (Databricks Apps), authentication is handled automatically via +# the service principal's OAuth credentials - no token needed. DATABRICKS_HOST=https://your-workspace.cloud.databricks.com DATABRICKS_TOKEN=dapi... @@ -66,3 +75,41 @@ CLAUDE_CODE_STREAM_CLOSE_TIMEOUT=3600000 # Anthropic API key (optional - uses Databricks model serving by default) # ANTHROPIC_API_KEY=sk-ant-... + +# ============================================================================= +# DEPLOYMENT TO DATABRICKS APPS +# ============================================================================= +# +# To deploy this app to Databricks Apps: +# +# 1. Prerequisites: +# - Databricks CLI installed and configured (databricks auth login) +# - A Lakebase instance created in your workspace +# +# 2. Create the app: +# databricks apps create +# +# 3. Configure app.yaml: +# cp app.yaml.example app.yaml +# # Edit app.yaml with your Lakebase instance name and LLM settings +# +# 4. Add Lakebase as an app resource: +# databricks apps add-resource \ +# --resource-type database \ +# --resource-name lakebase \ +# --database-instance +# +# 5. Grant table permissions to the app's service principal: +# (Run this SQL in a notebook or SQL editor after first deployment) +# +# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public +# TO ``; +# GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public +# TO ``; +# ALTER DEFAULT PRIVILEGES IN SCHEMA public +# GRANT ALL ON TABLES TO ``; +# +# 6. Deploy: +# ./scripts/deploy.sh +# +# For more details, see the README.md diff --git a/databricks-builder-app/.gitignore b/databricks-builder-app/.gitignore index 00b1dd9e..9ed2c52d 100644 --- a/databricks-builder-app/.gitignore +++ b/databricks-builder-app/.gitignore @@ -4,6 +4,9 @@ skills/ # Projects directory (local development) projects/ +# Packages (copied from sibling dirs during deployment) +packages/ + # Conda environment (for Databricks Apps deployment) conda-env/ @@ -15,6 +18,7 @@ __pycache__/ # Environment .env.local +app.yaml # Node (client) client/node_modules/ diff --git a/databricks-builder-app/app.yaml b/databricks-builder-app/app.yaml deleted file mode 100644 index 183c23fc..00000000 --- a/databricks-builder-app/app.yaml +++ /dev/null @@ -1,41 +0,0 @@ -command: - - "uvicorn" - - "server.app:app" - - "--host" - - "0.0.0.0" - - "--port" - - "8080" - -env: - - name: ENV - value: "production" - - name: PROJECTS_BASE_DIR - value: "./projects" - - name: ENABLED_SKILLS - value: "databricks-python-sdk,spark-declarative-pipelines,synthetic-data-generation" - - # Database configuration (choose one approach): - # - # Option 1: Dynamic OAuth tokens (recommended for production) - # Uses Databricks SDK to automatically refresh tokens every 50 minutes - # - name: LAKEBASE_INSTANCE_NAME - # value: "my-lakebase-instance" - # - name: LAKEBASE_DATABASE_NAME - # value: "my_database" - # - # Option 2: Static URL with password (for local development) - # - name: LAKEBASE_PG_URL - # valueFrom: - # secretKeyRef: - # name: lakebase-pg-url - # key: url - # - # Optional database pool settings: - # - name: DB_POOL_SIZE - # value: "5" - # - name: DB_MAX_OVERFLOW - # value: "10" - # - name: DB_POOL_TIMEOUT - # value: "10" - # - name: DB_POOL_RECYCLE_INTERVAL - # value: "3600" diff --git a/databricks-builder-app/app.yaml.example b/databricks-builder-app/app.yaml.example new file mode 100644 index 00000000..1ca3262e --- /dev/null +++ b/databricks-builder-app/app.yaml.example @@ -0,0 +1,99 @@ +# Databricks Apps configuration for Builder App +# Copy this file to app.yaml and customize for your deployment +# +# Prerequisites: +# 1. Create the app: databricks apps create +# 2. Add Lakebase as a resource (see instructions below) +# 3. Configure your LLM provider settings + +command: + - "uvicorn" + - "server.app:app" + - "--host" + - "0.0.0.0" + - "--port" + - "$DATABRICKS_APP_PORT" + +env: + # ============================================================================= + # Application Settings + # ============================================================================= + - name: ENV + value: "production" + - name: PROJECTS_BASE_DIR + value: "./projects" + - name: PYTHONPATH + value: "/app/python/source_code/packages" + + # ============================================================================= + # Skills Configuration + # ============================================================================= + # Comma-separated list of skills to enable + - name: ENABLED_SKILLS + value: "agent-bricks,aibi-dashboards,asset-bundles,databricks-app-apx,databricks-app-python,databricks-config,databricks-docs,databricks-jobs,databricks-python-sdk,databricks-unity-catalog,mlflow-evaluation,spark-declarative-pipelines,synthetic-data-generation,unstructured-pdf-generation" + - name: SKILLS_ONLY_MODE + value: "false" + + # ============================================================================= + # Database Configuration (Lakebase) + # ============================================================================= + # IMPORTANT: You must add Lakebase as an app resource for database connectivity. + # + # Steps: + # 1. Create a Lakebase instance in your workspace (if not exists) + # 2. Add it as an app resource: + # databricks apps add-resource \ + # --resource-type database \ + # --resource-name lakebase \ + # --database-instance + # + # When added as a resource, Databricks automatically sets: + # - PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE + # + # You only need to specify the instance name for OAuth token generation: + - name: LAKEBASE_INSTANCE_NAME + value: "" + - name: LAKEBASE_DATABASE_NAME + value: "databricks_postgres" + + # ============================================================================= + # LLM Provider Configuration + # ============================================================================= + # Option 1: Databricks Foundation Models (default) + - name: LLM_PROVIDER + value: "DATABRICKS" + - name: DATABRICKS_MODEL + value: "databricks-meta-llama-3-3-70b-instruct" + - name: DATABRICKS_MODEL_MINI + value: "databricks-gemini-3-flash" + + # Option 2: Anthropic Claude (uncomment and add your key) + # - name: ANTHROPIC_API_KEY + # value: "" + + # Option 3: Azure OpenAI (uncomment and configure) + # - name: LLM_PROVIDER + # value: "AZURE" + # - name: AZURE_OPENAI_API_KEY + # value: "" + # - name: AZURE_OPENAI_ENDPOINT + # value: "https://.cognitiveservices.azure.com/" + # - name: AZURE_OPENAI_API_VERSION + # value: "2024-08-01-preview" + # - name: AZURE_OPENAI_DEPLOYMENT + # value: "gpt-4o" + # - name: AZURE_OPENAI_DEPLOYMENT_MINI + # value: "gpt-4o-mini" + + # ============================================================================= + # Claude SDK Configuration + # ============================================================================= + - name: CLAUDE_CODE_STREAM_CLOSE_TIMEOUT + value: "3600000" + + # ============================================================================= + # Permission Configuration + # ============================================================================= + # Grant created resources to this principal (e.g., "account users" for all) + - name: AUTO_GRANT_PERMISSIONS_TO + value: "account users" diff --git a/databricks-builder-app/client/src/components/FunLoader.tsx b/databricks-builder-app/client/src/components/FunLoader.tsx new file mode 100644 index 00000000..e450f766 --- /dev/null +++ b/databricks-builder-app/client/src/components/FunLoader.tsx @@ -0,0 +1,98 @@ +import { useEffect, useState } from 'react'; +import { Loader2 } from 'lucide-react'; +import { cn } from '@/lib/utils'; + +// Fun loading messages like Claude Code uses +const FUN_MESSAGES = [ + 'Thinking...', + 'Pondering...', + 'Contemplating...', + 'Ruminating...', + 'Cogitating...', + 'Deliberating...', + 'Musing...', + 'Reflecting...', + 'Analyzing...', + 'Processing...', + 'Computing...', + 'Synthesizing...', + 'Formulating...', + 'Architecting...', + 'Strategizing...', + 'Investigating...', + 'Researching...', + 'Exploring...', + 'Brainstorming...', + 'Ideating...', +]; + +interface TodoItem { + content: string; + status: 'pending' | 'in_progress' | 'completed'; +} + +interface FunLoaderProps { + todos?: TodoItem[]; + className?: string; +} + +export function FunLoader({ todos = [], className }: FunLoaderProps) { + const [messageIndex, setMessageIndex] = useState(() => + Math.floor(Math.random() * FUN_MESSAGES.length) + ); + + // Rotate messages every 2.5 seconds + useEffect(() => { + const interval = setInterval(() => { + setMessageIndex((prev) => (prev + 1) % FUN_MESSAGES.length); + }, 2500); + return () => clearInterval(interval); + }, []); + + // Calculate progress + const completedCount = todos.filter((t) => t.status === 'completed').length; + const totalCount = todos.length; + const progress = totalCount > 0 ? (completedCount / totalCount) * 100 : 0; + const currentTodo = todos.find((t) => t.status === 'in_progress'); + + return ( +
+ {/* Main loader with rotating message */} +
+ + + {FUN_MESSAGES[messageIndex]} + +
+ + {/* Progress section - only show if there are todos */} + {totalCount > 0 && ( +
+ {/* Progress bar */} +
+
+
+ + {/* Progress text */} +
+ + {completedCount} of {totalCount} tasks + + {Math.round(progress)}% +
+ + {/* Current task indicator */} + {currentTodo && ( +
+
+ {currentTodo.content} +
+ )} +
+ )} +
+ ); +} diff --git a/databricks-builder-app/client/src/pages/ProjectPage.tsx b/databricks-builder-app/client/src/pages/ProjectPage.tsx index 43747d75..a96bf623 100644 --- a/databricks-builder-app/client/src/pages/ProjectPage.tsx +++ b/databricks-builder-app/client/src/pages/ProjectPage.tsx @@ -2,15 +2,12 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { useNavigate, useParams } from 'react-router-dom'; import { useUser } from '@/contexts/UserContext'; import { - Brain, ChevronDown, - ChevronRight, ExternalLink, Loader2, MessageSquare, Send, Square, - Terminal, Wrench, } from 'lucide-react'; import { toast } from 'sonner'; @@ -19,6 +16,7 @@ import remarkGfm from 'remark-gfm'; import { MainLayout } from '@/components/layout/MainLayout'; import { Sidebar } from '@/components/layout/Sidebar'; import { SkillsExplorer } from '@/components/SkillsExplorer'; +import { FunLoader } from '@/components/FunLoader'; import { Button } from '@/components/ui/Button'; import { createConversation, @@ -30,7 +28,7 @@ import { fetchWarehouses, invokeAgent, } from '@/lib/api'; -import type { Cluster, Conversation, Message, Project, Warehouse } from '@/lib/types'; +import type { Cluster, Conversation, Message, Project, Warehouse, TodoItem } from '@/lib/types'; import { cn } from '@/lib/utils'; // Combined activity item for display @@ -44,92 +42,26 @@ interface ActivityItem { timestamp: number; } -// Collapsible activity section component +// Minimal activity indicator - shows only current tool being executed function ActivitySection({ items, - isStreaming, }: { items: ActivityItem[]; isStreaming: boolean; }) { - const [isExpanded, setIsExpanded] = useState(true); - if (items.length === 0) return null; + // Get the most recent tool_use item (current activity) + const currentTool = [...items].reverse().find((item) => item.type === 'tool_use'); + + if (!currentTool) return null; + return ( -
- - - {isExpanded && ( -
- {items.map((item) => ( -
-
- {item.type === 'thinking' && ( - <> - - Thinking - - )} - {item.type === 'tool_use' && ( - <> - - - {item.toolName} - - - )} - {item.type === 'tool_result' && ( - <> - - - Result {item.isError && '(error)'} - - - )} -
-
- {item.type === 'tool_use' && item.toolInput ? ( - - {JSON.stringify(item.toolInput, null, 2).slice(0, 500)} - {JSON.stringify(item.toolInput).length > 500 && '...'} - - ) : ( - - {item.content.slice(0, 300)} - {item.content.length > 300 && '...'} - - )} -
-
- ))} -
- )} +
+ + + Using {currentTool.toolName?.replace('mcp__databricks__', '')}... +
); } @@ -164,6 +96,7 @@ export default function ProjectPage() { const [isStreaming, setIsStreaming] = useState(false); const [streamingText, setStreamingText] = useState(''); const [activityItems, setActivityItems] = useState([]); + const [todos, setTodos] = useState([]); const [clusters, setClusters] = useState([]); const [selectedClusterId, setSelectedClusterId] = useState(); const [clusterDropdownOpen, setClusterDropdownOpen] = useState(false); @@ -364,6 +297,7 @@ export default function ProjectPage() { setIsStreaming(true); setStreamingText(''); setActivityItems([]); + setTodos([]); // Add user message to UI immediately const tempUserMessage: Message = { @@ -413,17 +347,32 @@ export default function ProjectPage() { setStreamingText(fullText); } // If we already have fullText from deltas, ignore this to avoid duplication - } else if (type === 'thinking') { - const thinking = event.thinking as string; - setActivityItems((prev) => [ - ...prev, - { - id: `thinking-${Date.now()}`, - type: 'thinking', - content: thinking, - timestamp: Date.now(), - }, - ]); + } else if (type === 'thinking' || type === 'thinking_delta') { + // Handle both complete thinking blocks and streaming thinking deltas + const thinking = (event.thinking as string) || ''; + if (thinking) { + setActivityItems((prev) => { + // For deltas, append to the last thinking item if it exists + if (type === 'thinking_delta' && prev.length > 0 && prev[prev.length - 1].type === 'thinking') { + const updated = [...prev]; + updated[updated.length - 1] = { + ...updated[updated.length - 1], + content: updated[updated.length - 1].content + thinking, + }; + return updated; + } + // For complete blocks or first delta, add new item + return [ + ...prev, + { + id: `thinking-${Date.now()}`, + type: 'thinking', + content: thinking, + timestamp: Date.now(), + }, + ]; + }); + } } else if (type === 'tool_use') { setActivityItems((prev) => [ ...prev, @@ -477,6 +426,12 @@ export default function ProjectPage() { } else if (type === 'cancelled') { // Agent was cancelled by user - show a toast notification toast.info('Generation stopped'); + } else if (type === 'todos') { + // Update todo list from agent + const todoItems = event.todos as TodoItem[]; + if (todoItems) { + setTodos(todoItems); + } } }, onError: (error) => { @@ -501,6 +456,9 @@ export default function ProjectPage() { } setStreamingText(''); setIsStreaming(false); + // Clear activity items after response is finalized - only show final answer + setActivityItems([]); + setTodos([]); if (conversationId && !currentConversation?.id) { const conv = await fetchConversation(projectId, conversationId); @@ -557,13 +515,12 @@ export default function ProjectPage() { return (
- {/* Chat Header */} - {currentConversation && ( -
-

- {currentConversation.title} -

-
+ {/* Chat Header - always show configuration controls */} +
+

+ {currentConversation?.title || 'New Chat'} +

+
{/* Catalog.Schema Input */}
@@ -731,22 +688,53 @@ export default function ProjectPage() { title="Workspace working folder for uploading files and pipelines" />
-
- )} +
{/* Messages */}
{messages.length === 0 && !streamingText ? (
-
+

- Start a conversation + What can I help you build?

- Ask Claude to help you with code in this project + I can help you build data pipelines, generate synthetic data, create dashboards, and more on Databricks.

+ + {/* Example prompts */} +
+ + + + +
) : ( @@ -770,7 +758,21 @@ export default function ProjectPage() { > {message.role === 'assistant' ? (
- + ( + + {children} + + ), + }} + > {message.content}
@@ -781,30 +783,15 @@ export default function ProjectPage() {
))} - {/* Activity section (thinking, tools) */} - {(activityItems.length > 0 || isStreaming) && ( - - )} - - {/* Streaming response */} - {streamingText && ( -
-
-
- - {streamingText} - -
-
-
+ {/* Activity section (thinking, tools) - shown above the loader */} + {activityItems.length > 0 && ( + )} - {/* Loading indicator */} - {isStreaming && !streamingText && activityItems.length === 0 && ( + {/* Fun loader with progress - shown while streaming (hides stream of consciousness) */} + {isStreaming && (
-
- -
+
)} diff --git a/databricks-builder-app/client/src/styles/globals.css b/databricks-builder-app/client/src/styles/globals.css index 5a303243..bb6326af 100644 --- a/databricks-builder-app/client/src/styles/globals.css +++ b/databricks-builder-app/client/src/styles/globals.css @@ -171,3 +171,13 @@ padding: 0; background: transparent; } + +.prose-xs a { + color: var(--color-accent-primary); + text-decoration: underline; + cursor: pointer; +} + +.prose-xs a:hover { + color: var(--color-accent-secondary); +} diff --git a/databricks-builder-app/client/src/vite-env.d.ts b/databricks-builder-app/client/src/vite-env.d.ts new file mode 100644 index 00000000..11f02fe2 --- /dev/null +++ b/databricks-builder-app/client/src/vite-env.d.ts @@ -0,0 +1 @@ +/// diff --git a/databricks-builder-app/pyproject.toml b/databricks-builder-app/pyproject.toml index 82a1bbf1..18cd0451 100644 --- a/databricks-builder-app/pyproject.toml +++ b/databricks-builder-app/pyproject.toml @@ -3,8 +3,8 @@ requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" [tool.setuptools.packages.find] -where = ["."] -include = ["server*"] +where = [".", "packages"] +include = ["server*", "databricks_tools_core*", "databricks_mcp_server*"] exclude = ["client*", "tests*", "scripts*"] [project] @@ -28,11 +28,16 @@ dependencies = [ "psycopg2-binary>=2.9.11", # For alembic migrations (sync) # Claude Agent SDK (successor to claude-code-sdk) "claude-agent-sdk>=0.1.19", + "anthropic>=0.42.0", # Databricks MCP tools (databricks-mcp-server installed from sibling dir in dev) "mcp>=1.0.0", "fastmcp>=0.1.0", - "databricks-tools-core", - "databricks-mcp-server", + # databricks-tools-core and databricks-mcp-server are bundled in packages/ directory + "requests>=2.31.0", + "sqlglot>=20.0.0", + "sqlfluff>=3.0.0", + "litellm>=1.0.0", + "pymupdf>=1.24.0", # Conflict resolution pins for Databricks Apps pre-installed packages "tenacity==9.0.0", "pillow==11.1.0", diff --git a/databricks-builder-app/requirements.txt b/databricks-builder-app/requirements.txt new file mode 100644 index 00000000..21954e22 --- /dev/null +++ b/databricks-builder-app/requirements.txt @@ -0,0 +1,43 @@ +# FastAPI and server +fastapi[standard]>=0.115.8 +uvicorn>=0.34.0 +httpx>=0.28.0 +pydantic>=2.10.0 +python-dotenv>=1.0.1 + +# Databricks SDK +databricks-sdk>=0.81.0 + +# Database +sqlalchemy[asyncio]>=2.0.41 +alembic>=1.16.1 +psycopg[binary]>=3.2.0 +greenlet>=3.0.0 +psycopg2-binary>=2.9.11 + +# Claude Agent SDK +claude-agent-sdk>=0.1.19 +anthropic>=0.42.0 + +# MCP +mcp>=1.0.0 +fastmcp>=0.1.0 + +# Dependencies for databricks-tools-core +requests>=2.31.0 +sqlglot>=20.0.0 +sqlfluff>=3.0.0 +litellm>=1.0.0 +pymupdf>=1.24.0 + +# Conflict resolution pins for Databricks Apps pre-installed packages +tenacity==9.0.0 +pillow==11.1.0 +websockets==15.0 +pyarrow==18.1.0 +markupsafe==3.0.2 +rich==14.0.0 +protobuf>=3.12.0,<5 +pandas==2.3.0 +flask==3.1.0 +werkzeug==3.1.3 diff --git a/databricks-builder-app/scripts/deploy.sh b/databricks-builder-app/scripts/deploy.sh new file mode 100755 index 00000000..d0cdd4ed --- /dev/null +++ b/databricks-builder-app/scripts/deploy.sh @@ -0,0 +1,259 @@ +#!/bin/bash +# Deploy script for Databricks Builder App +# Deploys the app to Databricks Apps platform + +set -e + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Script directories +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +REPO_ROOT="$(dirname "$PROJECT_DIR")" + +# Default values +APP_NAME="${APP_NAME:-}" +WORKSPACE_PATH="" +STAGING_DIR="" +SKIP_BUILD="${SKIP_BUILD:-false}" + +# Usage information +usage() { + echo "Usage: $0 [options]" + echo "" + echo "Deploy the Databricks Builder App to Databricks Apps platform." + echo "" + echo "Arguments:" + echo " app-name Name of the Databricks App (required)" + echo "" + echo "Options:" + echo " --skip-build Skip frontend build (use existing build)" + echo " --staging-dir DIR Custom staging directory (default: /tmp/-deploy)" + echo " -h, --help Show this help message" + echo "" + echo "Prerequisites:" + echo " 1. Databricks CLI configured (databricks auth login)" + echo " 2. App created in Databricks (databricks apps create )" + echo " 3. Lakebase added as app resource (for database)" + echo " 4. app.yaml configured with your settings" + echo "" + echo "Example:" + echo " $0 my-builder-app" + echo " APP_NAME=my-builder-app $0" + echo " $0 my-builder-app --skip-build" +} + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + -h|--help) + usage + exit 0 + ;; + --skip-build) + SKIP_BUILD=true + shift + ;; + --staging-dir) + STAGING_DIR="$2" + shift 2 + ;; + -*) + echo -e "${RED}Error: Unknown option $1${NC}" + usage + exit 1 + ;; + *) + if [ -z "$APP_NAME" ]; then + APP_NAME="$1" + else + echo -e "${RED}Error: Unexpected argument $1${NC}" + usage + exit 1 + fi + shift + ;; + esac +done + +# Validate app name +if [ -z "$APP_NAME" ]; then + echo -e "${RED}Error: App name is required${NC}" + echo "" + usage + exit 1 +fi + +# Set derived paths +STAGING_DIR="${STAGING_DIR:-/tmp/${APP_NAME}-deploy}" + +echo -e "${BLUE}╔════════════════════════════════════════════════════════════╗${NC}" +echo -e "${BLUE}║ Databricks Builder App Deployment ║${NC}" +echo -e "${BLUE}╚════════════════════════════════════════════════════════════╝${NC}" +echo "" +echo -e " App Name: ${GREEN}${APP_NAME}${NC}" +echo -e " Staging Dir: ${STAGING_DIR}" +echo -e " Skip Build: ${SKIP_BUILD}" +echo "" + +# Check prerequisites +echo -e "${YELLOW}[1/6] Checking prerequisites...${NC}" + +# Check Databricks CLI +if ! command -v databricks &> /dev/null; then + echo -e "${RED}Error: Databricks CLI not found. Install with: pip install databricks-cli${NC}" + exit 1 +fi + +# Check if authenticated +if ! databricks auth describe &> /dev/null; then + echo -e "${RED}Error: Not authenticated with Databricks. Run: databricks auth login${NC}" + exit 1 +fi + +# Get workspace info +WORKSPACE_HOST=$(databricks auth describe --output json 2>/dev/null | python3 -c "import sys, json; print(json.load(sys.stdin).get('host', ''))" 2>/dev/null || echo "") +if [ -z "$WORKSPACE_HOST" ]; then + echo -e "${RED}Error: Could not determine Databricks workspace. Check your authentication.${NC}" + exit 1 +fi + +# Get current user for workspace path +CURRENT_USER=$(databricks current-user me --output json 2>/dev/null | python3 -c "import sys, json; print(json.load(sys.stdin).get('userName', ''))" 2>/dev/null || echo "") +if [ -z "$CURRENT_USER" ]; then + echo -e "${RED}Error: Could not determine current user.${NC}" + exit 1 +fi + +WORKSPACE_PATH="/Workspace/Users/${CURRENT_USER}/apps/${APP_NAME}" +echo -e " Workspace: ${WORKSPACE_HOST}" +echo -e " User: ${CURRENT_USER}" +echo -e " Deploy Path: ${WORKSPACE_PATH}" +echo "" + +# Check if app exists +echo -e "${YELLOW}[2/6] Verifying app exists...${NC}" +if ! databricks apps get "$APP_NAME" &> /dev/null; then + echo -e "${RED}Error: App '${APP_NAME}' does not exist.${NC}" + echo -e "Create it first with: ${GREEN}databricks apps create ${APP_NAME}${NC}" + exit 1 +fi +echo -e " ${GREEN}✓${NC} App '${APP_NAME}' exists" +echo "" + +# Build frontend +echo -e "${YELLOW}[3/6] Building frontend...${NC}" +cd "$PROJECT_DIR/client" + +if [ "$SKIP_BUILD" = true ]; then + if [ ! -d "out" ]; then + echo -e "${RED}Error: No existing build found at client/out. Cannot skip build.${NC}" + exit 1 + fi + echo -e " ${GREEN}✓${NC} Using existing build (--skip-build)" +else + # Install dependencies if needed + if [ ! -d "node_modules" ]; then + echo " Installing npm dependencies..." + npm install --silent + fi + + echo " Building production bundle..." + npm run build + echo -e " ${GREEN}✓${NC} Frontend built successfully" +fi +cd "$PROJECT_DIR" +echo "" + +# Prepare staging directory +echo -e "${YELLOW}[4/6] Preparing deployment package...${NC}" + +# Clean and create staging directory +rm -rf "$STAGING_DIR" +mkdir -p "$STAGING_DIR" + +# Copy server code +echo " Copying server code..." +cp -r server "$STAGING_DIR/" +cp app.yaml "$STAGING_DIR/" +cp requirements.txt "$STAGING_DIR/" + +# Copy frontend build +echo " Copying frontend build..." +cp -r client/out "$STAGING_DIR/client/" + +# Copy packages (databricks-tools-core and databricks-mcp-server) +echo " Copying Databricks packages..." +mkdir -p "$STAGING_DIR/packages" + +# Copy databricks-tools-core (only Python source, no tests) +mkdir -p "$STAGING_DIR/packages/databricks_tools_core" +cp -r "$REPO_ROOT/databricks-tools-core/databricks_tools_core/"* "$STAGING_DIR/packages/databricks_tools_core/" + +# Copy databricks-mcp-server (only Python source) +mkdir -p "$STAGING_DIR/packages/databricks_mcp_server" +cp -r "$REPO_ROOT/databricks-mcp-server/databricks_mcp_server/"* "$STAGING_DIR/packages/databricks_mcp_server/" + +# Copy skills +echo " Copying skills..." +mkdir -p "$STAGING_DIR/skills" +SKILLS_DIR="$REPO_ROOT/databricks-skills" +if [ -d "$SKILLS_DIR" ]; then + for skill_dir in "$SKILLS_DIR"/*/; do + skill_name=$(basename "$skill_dir") + # Skip template and non-skill directories + if [ "$skill_name" != "TEMPLATE" ] && [ -f "$skill_dir/SKILL.md" ]; then + cp -r "$skill_dir" "$STAGING_DIR/skills/" + fi + done +fi + +# Remove __pycache__ directories +find "$STAGING_DIR" -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true +find "$STAGING_DIR" -type f -name "*.pyc" -delete 2>/dev/null || true + +echo -e " ${GREEN}✓${NC} Deployment package prepared" +echo "" + +# Upload to workspace +echo -e "${YELLOW}[5/6] Uploading to Databricks workspace...${NC}" +echo " Target: ${WORKSPACE_PATH}" +databricks workspace import-dir "$STAGING_DIR" "$WORKSPACE_PATH" --overwrite 2>&1 | tail -5 +echo -e " ${GREEN}✓${NC} Upload complete" +echo "" + +# Deploy the app +echo -e "${YELLOW}[6/6] Deploying app...${NC}" +DEPLOY_OUTPUT=$(databricks apps deploy "$APP_NAME" --source-code-path "$WORKSPACE_PATH" 2>&1) +echo "$DEPLOY_OUTPUT" + +# Check deployment status +if echo "$DEPLOY_OUTPUT" | grep -q '"state":"SUCCEEDED"'; then + echo "" + echo -e "${GREEN}╔════════════════════════════════════════════════════════════╗${NC}" + echo -e "${GREEN}║ Deployment Successful! ║${NC}" + echo -e "${GREEN}╚════════════════════════════════════════════════════════════╝${NC}" + echo "" + + # Get app URL + APP_INFO=$(databricks apps get "$APP_NAME" --output json 2>/dev/null) + APP_URL=$(echo "$APP_INFO" | python3 -c "import sys, json; print(json.load(sys.stdin).get('url', 'N/A'))" 2>/dev/null || echo "N/A") + + echo -e " App URL: ${GREEN}${APP_URL}${NC}" + echo "" + echo " Next steps:" + echo " 1. Open the app URL in your browser" + echo " 2. If this is first deployment, add Lakebase as an app resource:" + echo " databricks apps add-resource $APP_NAME --resource-type database \\" + echo " --resource-name lakebase --database-instance " + echo "" +else + echo "" + echo -e "${RED}Deployment may have issues. Check the output above.${NC}" + exit 1 +fi diff --git a/databricks-builder-app/server/db/__init__.py b/databricks-builder-app/server/db/__init__.py index 1ef893b1..b161714e 100644 --- a/databricks-builder-app/server/db/__init__.py +++ b/databricks-builder-app/server/db/__init__.py @@ -15,11 +15,12 @@ stop_token_refresh, test_database_connection, ) -from .models import Base, Conversation, Message, Project +from .models import Base, Conversation, Execution, Message, Project __all__ = [ 'Base', 'Conversation', + 'Execution', 'Message', 'Project', 'create_tables', diff --git a/databricks-builder-app/server/db/database.py b/databricks-builder-app/server/db/database.py index 02824e95..9bc1247c 100644 --- a/databricks-builder-app/server/db/database.py +++ b/databricks-builder-app/server/db/database.py @@ -333,8 +333,8 @@ def init_database(database_url: Optional[str] = None) -> AsyncEngine: f"Failed to generate initial Lakebase token for instance: {instance_name}" ) - # Get username (current user's email) - username = _get_current_user_email() or instance_name + # Get username (prefer explicit env var for Databricks Apps where service principal is used) + username = os.environ.get("LAKEBASE_USERNAME") or _get_current_user_email() or instance_name # Resolve hostname for DNS workaround (macOS Python DNS issues with long hostnames) global _resolved_hostaddr diff --git a/databricks-builder-app/server/db/models.py b/databricks-builder-app/server/db/models.py index 1a51b767..46ff02f8 100644 --- a/databricks-builder-app/server/db/models.py +++ b/databricks-builder-app/server/db/models.py @@ -167,3 +167,53 @@ class ProjectBackup(Base): updated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=utc_now, onupdate=utc_now, nullable=False ) + + +class Execution(Base): + """Stores execution state for session independence. + + Allows users to reconnect to running/completed executions after + navigating away or refreshing the page. + """ + + __tablename__ = 'executions' + + id: Mapped[str] = mapped_column(String(50), primary_key=True, default=generate_uuid) + conversation_id: Mapped[str] = mapped_column( + String(50), ForeignKey('conversations.id', ondelete='CASCADE'), nullable=False + ) + project_id: Mapped[str] = mapped_column( + String(50), ForeignKey('projects.id', ondelete='CASCADE'), nullable=False + ) + status: Mapped[str] = mapped_column( + String(20), nullable=False, default='running' + ) # running, completed, cancelled, error + events_json: Mapped[str] = mapped_column( + Text, nullable=False, default='[]' + ) # JSON array of events + error: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=utc_now, nullable=False + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=utc_now, onupdate=utc_now, nullable=False + ) + + __table_args__ = ( + Index('ix_executions_conversation_status', 'conversation_id', 'status'), + Index('ix_executions_conversation_created', 'conversation_id', 'created_at'), + ) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + import json + return { + 'id': self.id, + 'conversation_id': self.conversation_id, + 'project_id': self.project_id, + 'status': self.status, + 'events': json.loads(self.events_json) if self.events_json else [], + 'error': self.error, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'updated_at': self.updated_at.isoformat() if self.updated_at else None, + } diff --git a/databricks-builder-app/server/routers/agent.py b/databricks-builder-app/server/routers/agent.py index cb00e073..f12b65c5 100644 --- a/databricks-builder-app/server/routers/agent.py +++ b/databricks-builder-app/server/routers/agent.py @@ -22,6 +22,7 @@ from ..services.agent import get_project_directory, stream_agent_response from ..services.backup_manager import mark_for_backup from ..services.storage import ConversationStorage, ProjectStorage +from ..services.title_generator import generate_title_async from ..services.user import get_current_user, get_current_token, get_workspace_url logger = logging.getLogger(__name__) @@ -101,11 +102,23 @@ async def invoke_agent(request: Request, body: InvokeAgentRequest): conversation_id = body.conversation_id if not conversation_id: - # Create new conversation with auto-title from message - title = body.message[:50] + ('...' if len(body.message) > 50 else '') - conversation = await conv_storage.create(title=title) + # Create new conversation with temporary title (will be updated by AI) + temp_title = body.message[:40].strip() + if len(body.message) > 40: + temp_title = temp_title.rsplit(' ', 1)[0] + '...' + conversation = await conv_storage.create(title=temp_title) conversation_id = conversation.id logger.info(f'Created new conversation: {conversation_id}') + + # Generate AI title in the background (fire-and-forget) + asyncio.create_task( + generate_title_async( + message=body.message, + conversation_id=conversation_id, + user_email=user_email, + project_id=body.project_id, + ) + ) else: # Verify conversation exists and get session_id for resumption conversation = await conv_storage.get(conversation_id) @@ -116,11 +129,12 @@ async def invoke_agent(request: Request, body: InvokeAgentRequest): # Get session_id from conversation for resumption session_id = conversation.session_id if conversation else None - # Create active stream + # Create active stream with user_email for persistence stream_manager = get_stream_manager() stream = await stream_manager.create_stream( project_id=body.project_id, conversation_id=conversation_id, + user_email=user_email, ) # Emit conversation_id as first event @@ -180,13 +194,23 @@ async def run_agent(): }) elif event_type == 'tool_use': + tool_name = event.get('tool_name', '') + tool_input = event.get('tool_input', {}) + stream.add_event({ 'type': 'tool_use', 'tool_id': event.get('tool_id', ''), - 'tool_name': event.get('tool_name', ''), - 'tool_input': event.get('tool_input', {}), + 'tool_name': tool_name, + 'tool_input': tool_input, }) + # Emit dedicated todos event when TodoWrite is called + if tool_name == 'TodoWrite' and 'todos' in tool_input: + stream.add_event({ + 'type': 'todos', + 'todos': tool_input['todos'], + }) + elif event_type == 'tool_result': content = event.get('content', '') is_error = event.get('is_error', False) @@ -473,3 +497,68 @@ async def list_project_files(request: Request, project_id: str): ) return {'project_id': project_id, 'files': files} + + +@router.get('/projects/{project_id}/conversations/{conversation_id}/executions') +async def get_conversation_executions( + request: Request, + project_id: str, + conversation_id: str, +): + """Get active and recent executions for a conversation. + + Returns the current active execution (if any) and recent completed ones. + This enables session independence - users can reconnect after navigating away. + """ + from ..services.storage import ExecutionStorage + + user_email = await get_current_user(request) + + # Verify project exists and belongs to user + project_storage = ProjectStorage(user_email) + project = await project_storage.get(project_id) + if not project: + raise HTTPException( + status_code=404, + detail=f'Project {project_id} not found' + ) + + # Get executions from database + exec_storage = ExecutionStorage(user_email, project_id, conversation_id) + + # Get active execution (if any) + active = await exec_storage.get_active() + + # Get recent executions + recent = await exec_storage.get_recent(limit=5) + + # Also check in-memory streams for this conversation + stream_manager = get_stream_manager() + in_memory_active = None + async with stream_manager._lock: + for stream in stream_manager._streams.values(): + if ( + stream.conversation_id == conversation_id + and not stream.is_complete + and not stream.is_cancelled + ): + in_memory_active = { + 'id': stream.execution_id, + 'conversation_id': stream.conversation_id, + 'project_id': stream.project_id, + 'status': 'running', + 'events': [e.data for e in stream.events], + 'error': stream.error, + 'created_at': None, + } + break + + return { + 'active': ( + in_memory_active + or (active.to_dict() if active else None) + ), + 'recent': [e.to_dict() for e in recent if e.id != ( + active.id if active else None + )], + } diff --git a/databricks-builder-app/server/services/active_stream.py b/databricks-builder-app/server/services/active_stream.py index 99f24697..f00e11ac 100644 --- a/databricks-builder-app/server/services/active_stream.py +++ b/databricks-builder-app/server/services/active_stream.py @@ -2,6 +2,9 @@ Handles background execution of Claude agent with event accumulation and cursor-based pagination for polling. + +Events are persisted to the database for session independence, +allowing users to reconnect after navigating away. """ import asyncio @@ -9,10 +12,15 @@ import time import uuid from dataclasses import dataclass, field -from typing import Any, Callable, Coroutine +from typing import Any, Callable, Coroutine, Optional logger = logging.getLogger(__name__) +# Batch size for persisting events to database +EVENT_PERSIST_BATCH_SIZE = 10 +# Maximum time between database syncs (seconds) +EVENT_PERSIST_INTERVAL = 5.0 + @dataclass class StreamEvent: @@ -28,24 +36,36 @@ class ActiveStream: Events are stored in an append-only list for cursor-based retrieval. The stream can be cancelled, and cleanup happens automatically. + Events are also persisted to the database for session independence. """ execution_id: str conversation_id: str project_id: str + user_email: str = '' # For database persistence events: list[StreamEvent] = field(default_factory=list) is_complete: bool = False is_cancelled: bool = False error: str | None = None task: asyncio.Task | None = None + persist_task: asyncio.Task | None = None created_at: float = field(default_factory=time.time) + _pending_events: list[dict] = field(default_factory=list) + _last_persist_time: float = field(default_factory=time.time) + _persist_index: int = 0 # Track which events have been persisted def add_event(self, event_data: dict[str, Any]) -> None: - """Add an event to the stream.""" - self.events.append(StreamEvent( + """Add an event to the stream and queue for persistence.""" + event = StreamEvent( timestamp=time.time(), data=event_data, - )) + ) + self.events.append(event) + # Queue event for database persistence + self._pending_events.append({ + 'timestamp': event.timestamp, + **event_data, + }) def get_events_since(self, cursor: float = 0.0) -> tuple[list[dict[str, Any]], float]: """Get all events since the given cursor timestamp. @@ -95,6 +115,24 @@ def cancel(self) -> bool: self.is_complete = True return True + def get_pending_events(self) -> list[dict]: + """Get and clear pending events for database persistence.""" + events = self._pending_events.copy() + self._pending_events.clear() + self._last_persist_time = time.time() + return events + + def should_persist(self) -> bool: + """Check if events should be persisted now.""" + if not self._pending_events: + return False + if len(self._pending_events) >= EVENT_PERSIST_BATCH_SIZE: + return True + elapsed = time.time() - self._last_persist_time + if elapsed >= EVENT_PERSIST_INTERVAL: + return True + return False + class ActiveStreamManager: """Manages multiple active streams with automatic cleanup.""" @@ -110,12 +148,14 @@ async def create_stream( self, project_id: str, conversation_id: str, + user_email: str = '', ) -> ActiveStream: """Create a new active stream. Args: project_id: Project ID conversation_id: Conversation ID + user_email: User email for database persistence Returns: New ActiveStream instance @@ -126,15 +166,85 @@ async def create_stream( execution_id=execution_id, conversation_id=conversation_id, project_id=project_id, + user_email=user_email, ) async with self._lock: self._streams[execution_id] = stream await self._cleanup_old_streams() - logger.info(f"Created active stream {execution_id} for conversation {conversation_id}") + # Persist to database for session independence + if user_email: + await self._persist_stream_to_db(stream) + + logger.info( + f"Created active stream {execution_id} " + f"for conversation {conversation_id}" + ) return stream + async def _persist_stream_to_db(self, stream: ActiveStream) -> None: + """Persist stream to database.""" + try: + from .storage import ExecutionStorage + storage = ExecutionStorage( + stream.user_email, + stream.project_id, + stream.conversation_id + ) + await storage.create(stream.execution_id) + logger.debug(f"Persisted stream {stream.execution_id} to database") + except Exception as e: + logger.warning(f"Failed to persist stream to database: {e}") + + async def persist_events(self, stream: ActiveStream) -> None: + """Persist pending events to database.""" + if not stream.user_email: + return + + events = stream.get_pending_events() + if not events: + return + + try: + from .storage import ExecutionStorage + storage = ExecutionStorage( + stream.user_email, + stream.project_id, + stream.conversation_id + ) + await storage.add_events(stream.execution_id, events) + logger.debug( + f"Persisted {len(events)} events for " + f"stream {stream.execution_id}" + ) + except Exception as e: + logger.warning(f"Failed to persist events to database: {e}") + + async def update_stream_status( + self, + stream: ActiveStream, + status: str, + error: Optional[str] = None, + ) -> None: + """Update stream status in database.""" + if not stream.user_email: + return + + try: + from .storage import ExecutionStorage + storage = ExecutionStorage( + stream.user_email, + stream.project_id, + stream.conversation_id + ) + await storage.update_status(stream.execution_id, status, error) + logger.debug( + f"Updated stream {stream.execution_id} status to {status}" + ) + except Exception as e: + logger.warning(f"Failed to update stream status: {e}") + async def get_stream(self, execution_id: str) -> ActiveStream | None: """Get a stream by execution ID.""" async with self._lock: @@ -173,6 +283,8 @@ async def start_stream( stream: The ActiveStream to populate with events agent_coroutine: Async function that yields events """ + manager = self # Reference for nested function + async def run_agent(): try: await agent_coroutine() @@ -184,18 +296,50 @@ async def run_agent(): except Exception as e: import traceback error_details = traceback.format_exc() - logger.error(f"Stream {stream.execution_id} error: {type(e).__name__}: {e}") - logger.error(f"Stream {stream.execution_id} traceback:\n{error_details}") - print(f"[STREAM ERROR] {stream.execution_id}: {type(e).__name__}: {e}", flush=True) - print(f"[STREAM TRACEBACK]\n{error_details}", flush=True) + logger.error( + f"Stream {stream.execution_id} error: " + f"{type(e).__name__}: {e}" + ) + logger.error( + f"Stream {stream.execution_id} traceback:\n{error_details}" + ) if not stream.is_complete: - # Provide more context in error message error_msg = f"{type(e).__name__}: {str(e)}" if 'Stream closed' in str(e): - error_msg = f"Agent communication interrupted ({type(e).__name__}): {str(e)}. This may occur when operations take longer than expected." + error_msg = ( + f"Agent communication interrupted " + f"({type(e).__name__}): {str(e)}. " + f"Operations may have exceeded timeout." + ) stream.mark_error(error_msg) + finally: + # Final persist of any remaining events + await manager.persist_events(stream) + # Update final status + if stream.is_cancelled: + await manager.update_stream_status( + stream, 'cancelled' + ) + elif stream.error: + await manager.update_stream_status( + stream, 'error', stream.error + ) + else: + await manager.update_stream_status( + stream, 'completed' + ) + + async def persist_loop(): + """Periodically persist events to database.""" + while not stream.is_complete and not stream.is_cancelled: + await asyncio.sleep(EVENT_PERSIST_INTERVAL) + if stream.should_persist(): + await manager.persist_events(stream) stream.task = asyncio.create_task(run_agent()) + # Start persistence loop if user_email is set + if stream.user_email: + stream.persist_task = asyncio.create_task(persist_loop()) logger.info(f"Started agent task for stream {stream.execution_id}") diff --git a/databricks-builder-app/server/services/agent.py b/databricks-builder-app/server/services/agent.py index fdfc6f23..fa5e3a47 100644 --- a/databricks-builder-app/server/services/agent.py +++ b/databricks-builder-app/server/services/agent.py @@ -251,6 +251,7 @@ async def stream_agent_response( default_schema=default_schema, warehouse_id=warehouse_id, workspace_folder=workspace_folder, + workspace_url=databricks_host, ) # Load Claude settings for Databricks model serving authentication diff --git a/databricks-builder-app/server/services/backup_manager.py b/databricks-builder-app/server/services/backup_manager.py index b8655ee1..b663dc5a 100644 --- a/databricks-builder-app/server/services/backup_manager.py +++ b/databricks-builder-app/server/services/backup_manager.py @@ -128,13 +128,59 @@ async def restore_backup(project_id: str) -> bool: return True +def _create_default_claude_md(project_dir: Path) -> None: + """Create a default CLAUDE.md file for a new project. + + Args: + project_dir: Path to the project directory + """ + claude_md_path = project_dir / 'CLAUDE.md' + if claude_md_path.exists(): + return + + default_content = """# Project Context + +This file tracks the Databricks resources created in this project. +The AI assistant will update this file as resources are created. + +## Configuration + +- **Catalog:** (not yet configured) +- **Schema:** (not yet configured) + +## Resources Created + +### Tables +(none yet) + +### Volumes +(none yet) + +### Pipelines +(none yet) + +### Jobs +(none yet) + +## Notes + +Add any project-specific notes or context here. +""" + + try: + claude_md_path.write_text(default_content) + logger.info(f'Created default CLAUDE.md in {project_dir}') + except Exception as e: + logger.warning(f'Failed to create CLAUDE.md: {e}') + + def ensure_project_directory(project_id: str) -> Path: """Ensure project directory exists, restoring from backup if needed. This is the main entry point for getting a project directory. If the directory doesn't exist, attempts to restore from backup. If no backup exists, creates an empty directory. - Also ensures skills are copied to the project. + Also ensures skills are copied to the project and CLAUDE.md exists. Args: project_id: The project UUID @@ -146,6 +192,7 @@ def ensure_project_directory(project_id: str) -> Path: project_dir = Path(PROJECTS_BASE_DIR).resolve() / project_id needs_skills = not project_dir.exists() or not (project_dir / '.claude' / 'skills').exists() + is_new_project = not project_dir.exists() if not project_dir.exists(): # Try to restore from backup @@ -174,6 +221,10 @@ def ensure_project_directory(project_id: str) -> Path: if needs_skills: copy_skills_to_project(project_dir) + # Create default CLAUDE.md for new projects (or if it doesn't exist) + if is_new_project or not (project_dir / 'CLAUDE.md').exists(): + _create_default_claude_md(project_dir) + return project_dir diff --git a/databricks-builder-app/server/services/storage.py b/databricks-builder-app/server/services/storage.py index ed765bd6..ae836407 100644 --- a/databricks-builder-app/server/services/storage.py +++ b/databricks-builder-app/server/services/storage.py @@ -5,10 +5,12 @@ from typing import Optional -from sqlalchemy import delete, func, select +import json + +from sqlalchemy import delete, func, select, update from sqlalchemy.orm import selectinload -from server.db import Conversation, Message, Project, session_scope +from server.db import Conversation, Execution, Message, Project, session_scope class ProjectStorage: @@ -333,6 +335,139 @@ async def add_message( return message +class ExecutionStorage: + """Execution state storage for session independence.""" + + def __init__(self, user_email: str, project_id: str, conversation_id: str): + self.user_email = user_email + self.project_id = project_id + self.conversation_id = conversation_id + + async def create(self, execution_id: str) -> Execution: + """Create a new execution record.""" + async with session_scope() as session: + # Verify conversation ownership via join + result = await session.execute( + select(Conversation.id) + .join(Project, Conversation.project_id == Project.id) + .where( + Conversation.id == self.conversation_id, + Conversation.project_id == self.project_id, + Project.user_email == self.user_email, + ) + ) + if not result.scalar_one_or_none(): + raise ValueError('Conversation not found or access denied') + + execution = Execution( + id=execution_id, + conversation_id=self.conversation_id, + project_id=self.project_id, + status='running', + events_json='[]', + ) + session.add(execution) + await session.flush() + await session.refresh(execution) + return execution + + async def get(self, execution_id: str) -> Optional[Execution]: + """Get an execution by ID.""" + async with session_scope() as session: + result = await session.execute( + select(Execution) + .join(Conversation, Execution.conversation_id == Conversation.id) + .join(Project, Conversation.project_id == Project.id) + .where( + Execution.id == execution_id, + Execution.conversation_id == self.conversation_id, + Project.user_email == self.user_email, + ) + ) + return result.scalar_one_or_none() + + async def get_active(self) -> Optional[Execution]: + """Get the active (running) execution for this conversation, if any.""" + async with session_scope() as session: + result = await session.execute( + select(Execution) + .join(Conversation, Execution.conversation_id == Conversation.id) + .join(Project, Conversation.project_id == Project.id) + .where( + Execution.conversation_id == self.conversation_id, + Execution.status == 'running', + Project.user_email == self.user_email, + ) + .order_by(Execution.created_at.desc()) + .limit(1) + ) + return result.scalar_one_or_none() + + async def get_recent(self, limit: int = 10) -> list[Execution]: + """Get recent executions for this conversation.""" + async with session_scope() as session: + result = await session.execute( + select(Execution) + .join(Conversation, Execution.conversation_id == Conversation.id) + .join(Project, Conversation.project_id == Project.id) + .where( + Execution.conversation_id == self.conversation_id, + Project.user_email == self.user_email, + ) + .order_by(Execution.created_at.desc()) + .limit(limit) + ) + return list(result.scalars().all()) + + async def add_events(self, execution_id: str, events: list[dict]) -> bool: + """Append events to an execution's event list.""" + async with session_scope() as session: + result = await session.execute( + select(Execution) + .join(Conversation, Execution.conversation_id == Conversation.id) + .join(Project, Conversation.project_id == Project.id) + .where( + Execution.id == execution_id, + Project.user_email == self.user_email, + ) + ) + execution = result.scalar_one_or_none() + if not execution: + return False + + # Load existing events and append new ones + existing_events = json.loads(execution.events_json) if execution.events_json else [] + existing_events.extend(events) + execution.events_json = json.dumps(existing_events) + return True + + async def update_status( + self, + execution_id: str, + status: str, + error: Optional[str] = None, + ) -> bool: + """Update execution status.""" + async with session_scope() as session: + result = await session.execute( + select(Execution) + .join(Conversation, Execution.conversation_id == Conversation.id) + .join(Project, Conversation.project_id == Project.id) + .where( + Execution.id == execution_id, + Project.user_email == self.user_email, + ) + ) + execution = result.scalar_one_or_none() + if not execution: + return False + + execution.status = status + if error: + execution.error = error + return True + + def get_project_storage(user_email: str) -> ProjectStorage: """Get project storage for a user.""" return ProjectStorage(user_email) @@ -341,3 +476,8 @@ def get_project_storage(user_email: str) -> ProjectStorage: def get_conversation_storage(user_email: str, project_id: str) -> ConversationStorage: """Get conversation storage for a project.""" return ConversationStorage(user_email, project_id) + + +def get_execution_storage(user_email: str, project_id: str, conversation_id: str) -> ExecutionStorage: + """Get execution storage for a conversation.""" + return ExecutionStorage(user_email, project_id, conversation_id) diff --git a/databricks-builder-app/server/services/system_prompt.py b/databricks-builder-app/server/services/system_prompt.py index 7d09e636..242f6cc6 100644 --- a/databricks-builder-app/server/services/system_prompt.py +++ b/databricks-builder-app/server/services/system_prompt.py @@ -9,6 +9,7 @@ def get_system_prompt( default_schema: str | None = None, warehouse_id: str | None = None, workspace_folder: str | None = None, + workspace_url: str | None = None, ) -> str: """Generate the system prompt for the Claude agent. @@ -20,6 +21,7 @@ def get_system_prompt( default_schema: Optional default schema name warehouse_id: Optional Databricks SQL warehouse ID for queries workspace_folder: Optional workspace folder for file uploads + workspace_url: Optional Databricks workspace URL for generating resource links Returns: System prompt string @@ -30,11 +32,14 @@ def get_system_prompt( if skills: skill_list = '\n'.join(f" - **{s['name']}**: {s['description']}" for s in skills) skills_section = f""" -## Skills +## Skills (LOAD FIRST!) -Load skills using the `Skill` tool for detailed guidance on specific topics. +**MANDATORY: ALWAYS load the most relevant skill BEFORE taking any action.** -Available skills: +Skills contain critical guidance, best practices, and exact tool usage patterns. +Do NOT proceed with ANY task until you have loaded the appropriate skill. + +Use the `Skill` tool to load skills. Available skills: {skill_list} """ @@ -106,14 +111,49 @@ def get_system_prompt( if default_schema: catalog_schema_section = catalog_schema_section.replace('{schema}', default_schema) + # Build workspace URL section for resource links + workspace_url_section = '' + if workspace_url: + workspace_url_section = f""" +## Workspace URL + +The Databricks workspace URL is: `{workspace_url}` + +Use this to construct clickable links in your responses (see Resource Links section below). +""" + return f"""# Databricks AI Dev Kit -{cluster_section}{warehouse_section}{workspace_folder_section}{catalog_schema_section} +{cluster_section}{warehouse_section}{workspace_folder_section}{catalog_schema_section}{workspace_url_section} You are a Databricks development assistant with access to MCP tools for building data pipelines, running SQL queries, managing infrastructure, and deploying assets to Databricks. -When given a task, complete ALL steps automatically without stopping for approval. -Execute the full workflow start to finish - do not present options or wait between steps. +## Response Format + +**CRITICAL: Keep your responses concise and action-focused.** + +- Do NOT include your reasoning process or chain-of-thought in your response +- Do NOT explain what you're about to do in detail before doing it +- DO show a brief plan (2-4 lines max) before creating resources +- DO provide clear, actionable output with resource links +- Your response should primarily contain: plans, results, and resource links + +## Plan Before Action + +**IMPORTANT: Before creating any Databricks resources (tables, volumes, pipelines, jobs), propose a brief plan first.** + +Present a 2-4 line summary of what you will create: +- What resources will be created (tables, volumes, pipelines) +- Where they will be stored (catalog.schema) +- Any data that will be generated + +Example: +> **Plan:** I'll create synthetic customer data in `ai_dev_kit.demo_schema`: +> - Generate 2,500 customers, 25,000 orders, 8,000 tickets +> - Save to volume `/Volumes/ai_dev_kit/demo_schema/raw_data` +> - Data will span the last 6 months with realistic patterns + +Then proceed with execution without waiting for approval. ## Project Context @@ -134,10 +174,74 @@ def get_system_prompt( {skills_section} +## Resource Links + +**CRITICAL: After creating ANY Databricks resource, ALWAYS provide a clickable link so the user can verify it.** + +Use these URL patterns (workspace URL: `{workspace_url or 'https://your-workspace.databricks.com'}`): + +| Resource | URL Pattern | +|----------|-------------| +| Table | `{workspace_url or 'WORKSPACE_URL'}/explore/data/{{catalog}}/{{schema}}/{{table}}` | +| Volume | `{workspace_url or 'WORKSPACE_URL'}/explore/data/volumes/{{catalog}}/{{schema}}/{{volume}}` | +| Pipeline | `{workspace_url or 'WORKSPACE_URL'}/pipelines/{{pipeline_id}}` | +| Job | `{workspace_url or 'WORKSPACE_URL'}/jobs/{{job_id}}` | +| Notebook | `{workspace_url or 'WORKSPACE_URL'}#workspace{{path}}` | + +**Example response after creating resources:** + +> Data generation complete! I created: +> - **Volume:** [raw_data]({workspace_url or 'WORKSPACE_URL'}/explore/data/volumes/ai_dev_kit/demo_schema/raw_data) +> - **Tables:** 3 parquet datasets (customers, orders, tickets) +> +> **Next step:** Open the volume link above to verify the data was written correctly. + +Always include a "Next step" suggesting the user verify the created resources. + +## Permission Grants (IMPORTANT) + +**After creating ANY resource, ALWAYS grant permissions to all workspace users.** + +This ensures all team members can access resources created by this app. + +| Resource Type | Grant Command | +|--------------|---------------| +| **Table** | `GRANT ALL PRIVILEGES ON TABLE catalog.schema.table_name TO \`account users\`` | +| **Schema** | `GRANT ALL PRIVILEGES ON SCHEMA catalog.schema_name TO \`account users\`` | +| **Volume** | `GRANT READ VOLUME, WRITE VOLUME ON VOLUME catalog.schema.volume_name TO \`account users\`` | +| **View** | `GRANT ALL PRIVILEGES ON VIEW catalog.schema.view_name TO \`account users\`` | + +**Example after creating a table:** + +CREATE TABLE my_catalog.my_schema.customers AS SELECT ...; +GRANT ALL PRIVILEGES ON TABLE my_catalog.my_schema.customers TO `account users`; + +**Example after creating a schema:** + +CREATE SCHEMA my_catalog.new_schema; +GRANT ALL PRIVILEGES ON SCHEMA my_catalog.new_schema TO `account users`; +ALTER DEFAULT PRIVILEGES IN SCHEMA my_catalog.new_schema GRANT ALL ON TABLES TO `account users`; + ## Workflow -1. **Load the relevant skill FIRST** - Skills contain detailed guidance and best practices -2. **Use MCP tools** for all Databricks operations -3. **Complete workflows automatically** - Don't stop halfway or ask users to do manual steps -4. **Verify results** - Use `get_table_details` to confirm data was written correctly +1. **IMMEDIATELY load the relevant skill** - This is NON-NEGOTIABLE. Load the skill FIRST before any other action +2. **Propose a brief plan** (2-4 lines) before creating resources +3. **Use MCP tools** for all Databricks operations +4. **Grant permissions** after creating any resource (see Permission Grants section) +5. **Complete workflows automatically** - Don't stop halfway or ask users to do manual steps +6. **Verify results** - Use `get_table_details` to confirm data was written correctly +7. **Provide resource links** - Always include clickable URLs for created resources + +### Skill Selection Guide + +| User Request | Skill to Load | +|--------------|---------------| +| Generate data, synthetic data, fake data, test data | `synthetic-data-generation` | +| Pipeline, ETL, bronze/silver/gold, data transformation | `spark-declarative-pipelines` | +| Dashboard, visualization, BI, charts | `aibi-dashboards` | +| Job, workflow, schedule, automation | `databricks-jobs` | +| SDK, API, Databricks client | `databricks-python-sdk` | +| Unity Catalog, tables, volumes, schemas | `databricks-unity-catalog` | +| Agent, chatbot, AI assistant | `agent-bricks` | +| App deployment, web app | `databricks-app-python` | """ diff --git a/databricks-builder-app/server/services/title_generator.py b/databricks-builder-app/server/services/title_generator.py new file mode 100644 index 00000000..89c5bba0 --- /dev/null +++ b/databricks-builder-app/server/services/title_generator.py @@ -0,0 +1,120 @@ +"""AI-powered conversation title generation. + +Uses Claude to generate concise, descriptive titles for conversations +based on the first user message. +""" + +import asyncio +import logging +import os + +import anthropic + +logger = logging.getLogger(__name__) + +# Cache the Anthropic client +_client = None + + +def _get_client() -> anthropic.AsyncAnthropic: + """Get or create the Anthropic client.""" + global _client + if _client is None: + # Use the same auth as the main agent (from .claude/settings.json) + api_key = os.environ.get('ANTHROPIC_API_KEY') + base_url = os.environ.get('ANTHROPIC_BASE_URL') + + if base_url: + _client = anthropic.AsyncAnthropic( + api_key=api_key or 'unused', # Databricks auth doesn't need real key + base_url=base_url, + ) + else: + _client = anthropic.AsyncAnthropic(api_key=api_key) + + return _client + + +async def generate_title(message: str, max_length: int = 40) -> str: + """Generate a concise title for a conversation based on the first message. + + Args: + message: The user's first message in the conversation + max_length: Maximum length of the generated title + + Returns: + A short, descriptive title (or truncated message as fallback) + """ + # Fallback: truncate message + fallback = message[:max_length].strip() + if len(message) > max_length: + fallback = fallback.rsplit(' ', 1)[0] + '...' + + try: + client = _get_client() + + response = await asyncio.wait_for( + client.messages.create( + model='claude-3-5-haiku-latest', + max_tokens=50, + messages=[ + { + 'role': 'user', + 'content': f'''Generate a very short title (3-6 words max) for this chat message. +The title should capture the main intent/topic. No quotes, no punctuation at the end. + +Message: {message[:500]} + +Title:''', + } + ], + ), + timeout=5.0, # 5 second timeout + ) + + # Extract title from response + title = response.content[0].text.strip() + + # Clean up: remove quotes, limit length + title = title.strip('"\'') + if len(title) > max_length: + title = title[:max_length].rsplit(' ', 1)[0] + '...' + + return title if title else fallback + + except asyncio.TimeoutError: + logger.warning('Title generation timed out, using fallback') + return fallback + except Exception as e: + logger.warning(f'Title generation failed: {e}, using fallback') + return fallback + + +async def generate_title_async( + message: str, + conversation_id: str, + user_email: str, + project_id: str, +) -> None: + """Generate a title and update the conversation in the background. + + This runs in a fire-and-forget pattern so it doesn't block the main response. + + Args: + message: The user's first message + conversation_id: ID of the conversation to update + user_email: User's email for storage access + project_id: Project ID for storage access + """ + try: + title = await generate_title(message) + + # Update the conversation title + from .storage import ConversationStorage + + storage = ConversationStorage(user_email, project_id) + await storage.update_title(conversation_id, title) + logger.info(f'Updated conversation {conversation_id} title to: {title}') + + except Exception as e: + logger.warning(f'Failed to update conversation title: {e}') diff --git a/databricks-builder-app/server/services/user.py b/databricks-builder-app/server/services/user.py index 52dd2128..595af65c 100644 --- a/databricks-builder-app/server/services/user.py +++ b/databricks-builder-app/server/services/user.py @@ -62,7 +62,8 @@ async def get_current_user(request: Request) -> str: async def get_current_token(request: Request) -> str | None: """Get the current user's Databricks access token. - In production (Databricks Apps), extracts from X-Forwarded-Access-Token header. + In production (Databricks Apps), returns None to use SP OAuth credentials. + Using user forwarded tokens conflicts with SP OAuth env vars. In development, uses DATABRICKS_TOKEN env var. Args: @@ -71,18 +72,17 @@ async def get_current_token(request: Request) -> str | None: Returns: Access token string, or None if not available """ - # Try to get token from header first (production mode) - token = request.headers.get('X-Forwarded-Access-Token') - if token: - logger.debug('Got token from X-Forwarded-Access-Token header') - return token + # In production (Databricks Apps), use SP OAuth credentials from env vars + # Don't use forwarded user tokens as they conflict with SP OAuth + if not _is_local_development(): + logger.debug('Production mode: using SP OAuth credentials (not user token)') + return None # Fall back to env var for development - if _is_local_development(): - token = os.getenv('DATABRICKS_TOKEN') - if token: - logger.debug('Got token from DATABRICKS_TOKEN env var') - return token + token = os.getenv('DATABRICKS_TOKEN') + if token: + logger.debug('Got token from DATABRICKS_TOKEN env var') + return token return None