-
-
Notifications
You must be signed in to change notification settings - Fork 132
ported MCP to http stream instead of SSE #761
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,12 +1,13 @@ | ||||||
| import "./load_env.js"; // MUST BE FIRST | ||||||
|
|
||||||
| import { Server } from "@modelcontextprotocol/sdk/server/index.js"; | ||||||
| import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; | ||||||
| import { | ||||||
| CallToolRequestSchema, | ||||||
| ListToolsRequestSchema, | ||||||
| import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; | ||||||
| import { | ||||||
| CallToolRequestSchema, | ||||||
| ListToolsRequestSchema, | ||||||
| ErrorCode, | ||||||
| McpError | ||||||
| } from "@modelcontextprotocol/sdk/types.js"; | ||||||
| import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; | ||||||
| import express from "express"; | ||||||
| import cors from "cors"; | ||||||
|
|
||||||
|
|
@@ -21,126 +22,123 @@ import { MOCK_USER_ID } from "./config.js"; | |||||
|
|
||||||
| console.error(`[MCP] Active Mock User ID: ${MOCK_USER_ID}`); | ||||||
|
|
||||||
| const handlers = [ | ||||||
| handleNutritionTool, | ||||||
| handleExerciseTool, | ||||||
| handleCheckinTool, | ||||||
| handleCoachTool, | ||||||
| handleProactiveTool, | ||||||
| handleVisionTool, | ||||||
| handleDevTool, | ||||||
| ]; | ||||||
|
|
||||||
| const allTools = [ | ||||||
| ...nutritionTools, | ||||||
| ...exerciseTools, | ||||||
| ...checkinTools, | ||||||
| ...coachTools, | ||||||
| ...proactiveTools, | ||||||
| ...visionTools, | ||||||
| ...devTools, | ||||||
| ]; | ||||||
|
|
||||||
| /** | ||||||
| * Factory function to create a new MCP Server instance. | ||||||
| * Using the low-level Server class to support raw JSON schemas (McpServer defaults them to empty). | ||||||
| * Called once per request in stateless mode (SDK requirement). | ||||||
| */ | ||||||
| function createMCPServer() { | ||||||
| const server = new Server( | ||||||
| { | ||||||
| name: "sparky-fitness-mcp", | ||||||
| version: "1.0.0", | ||||||
| }, | ||||||
| { | ||||||
| capabilities: { | ||||||
| tools: {}, | ||||||
| }, | ||||||
| } | ||||||
| { name: "sparky-fitness-mcp", version: "1.0.0" }, | ||||||
| { capabilities: { tools: {} } } | ||||||
| ); | ||||||
|
|
||||||
| // Register the tool listing handler | ||||||
| server.setRequestHandler(ListToolsRequestSchema, async () => { | ||||||
| return { | ||||||
| tools: [ | ||||||
| ...nutritionTools, | ||||||
| ...exerciseTools, | ||||||
| ...checkinTools, | ||||||
| ...coachTools, | ||||||
| ...proactiveTools, | ||||||
| ...visionTools, | ||||||
| ...devTools | ||||||
| ], | ||||||
| tools: allTools.map(tool => ({ | ||||||
| name: tool.name, | ||||||
| description: tool.description, | ||||||
| inputSchema: tool.inputSchema, | ||||||
| })) | ||||||
| }; | ||||||
| }); | ||||||
|
|
||||||
| server.setRequestHandler(CallToolRequestSchema, async (request: any) => { | ||||||
| // Register the tool calling handler | ||||||
| server.setRequestHandler(CallToolRequestSchema, async (request) => { | ||||||
| const { name, arguments: args } = request.params; | ||||||
| const handlers = [ | ||||||
| handleNutritionTool, | ||||||
| handleExerciseTool, | ||||||
| handleCheckinTool, | ||||||
| handleCoachTool, | ||||||
| handleProactiveTool, | ||||||
| handleVisionTool, | ||||||
| handleDevTool | ||||||
| ]; | ||||||
|
|
||||||
| for (const handler of handlers) { | ||||||
| const res = await handler(name, args); | ||||||
| if (res) return res; | ||||||
| const result = await handler(name, args); | ||||||
| if (result) return result; | ||||||
| } | ||||||
|
|
||||||
| throw new Error(`Unknown tool: ${name}`); | ||||||
| throw new McpError( | ||||||
| ErrorCode.MethodNotFound, | ||||||
| `Tool not found: ${name}` | ||||||
| ); | ||||||
| }); | ||||||
|
|
||||||
| return server; | ||||||
| } | ||||||
|
|
||||||
| const app = express(); | ||||||
| app.use(cors()); | ||||||
|
|
||||||
| // Map to store active SSE transports by their session ID | ||||||
| const transports = new Map<string, SSEServerTransport>(); | ||||||
| app.use(express.json()); | ||||||
|
|
||||||
| /** | ||||||
| * Optimized SSE Connection Handler | ||||||
| * MCP Streamable HTTP handler. | ||||||
| * | ||||||
| * IMPORTANT: The SDK's stateless transport CANNOT be reused across requests | ||||||
| * (it throws "Stateless transport cannot be reused across requests"). | ||||||
| * We must create a fresh Server + transport per request. | ||||||
| * | ||||||
| * We also force-inject the correct Accept header so clients like n8n that only | ||||||
| * send "Accept: application/json" don't get a 406 from the SDK's strict validation | ||||||
| * (which requires BOTH application/json AND text/event-stream). | ||||||
| */ | ||||||
| const handleSseConnection = async (req: express.Request, res: express.Response) => { | ||||||
| console.log(`[MCP] New SSE connection: ${req.method} ${req.path}`); | ||||||
|
|
||||||
| const transport = new SSEServerTransport(req.path, res); | ||||||
| const server = createMCPServer(); | ||||||
|
|
||||||
| await server.connect(transport); | ||||||
|
|
||||||
| if (transport.sessionId) { | ||||||
| transports.set(transport.sessionId, transport); | ||||||
| console.log(`[MCP] Session active: ${transport.sessionId}`); | ||||||
|
|
||||||
| res.on("close", () => { | ||||||
| console.log(`[MCP] Session closed: ${transport.sessionId}`); | ||||||
| transports.delete(transport.sessionId!); | ||||||
| server.close(); | ||||||
| const handleMcpRequest = async (req: express.Request, res: express.Response) => { | ||||||
| console.log(`[MCP] Request: ${req.method} ${req.path}`); | ||||||
|
|
||||||
| // Force the Accept header to satisfy SDK validation. | ||||||
| req.headers['accept'] = 'application/json, text/event-stream'; | ||||||
|
|
||||||
| try { | ||||||
| // Create a fresh server + transport per request (stateless mode requirement) | ||||||
| const server = createMCPServer(); | ||||||
| const transport = new StreamableHTTPServerTransport({ | ||||||
| sessionIdGenerator: undefined, // stateless mode | ||||||
| enableJsonResponse: true, // return JSON directly instead of SSE streams | ||||||
| }); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * Resilient Message Handler | ||||||
| */ | ||||||
| const handleSseMessage = async (req: express.Request, res: express.Response) => { | ||||||
| const sessionId = req.query.sessionId as string; | ||||||
|
|
||||||
| let transport = transports.get(sessionId); | ||||||
| if (!transport) { | ||||||
| console.log(`[MCP] Session ${sessionId} not found yet, retrying...`); | ||||||
| await new Promise(r => setTimeout(r, 200)); | ||||||
| transport = transports.get(sessionId); | ||||||
| } | ||||||
| await server.connect(transport); | ||||||
| await transport.handleRequest(req, res, req.body); | ||||||
|
|
||||||
| if (transport) { | ||||||
| await transport.handlePostMessage(req, res); | ||||||
| } else { | ||||||
| const activeSessions = Array.from(transports.keys()).join(", "); | ||||||
| console.error(`[MCP] ERROR: Unknown session ${sessionId}. Active: [${activeSessions}]`); | ||||||
| res.status(404).send("Unknown session"); | ||||||
| // Clean up after the response is sent | ||||||
| res.on("finish", () => { | ||||||
| transport.close().catch(() => {}); | ||||||
| }); | ||||||
| } catch (error) { | ||||||
| console.error("[MCP] Transport error:", error); | ||||||
| if (!res.headersSent) { | ||||||
| res.status(500).json({ | ||||||
| jsonrpc: "2.0", | ||||||
| error: { code: -32603, message: "Internal Server Error" }, | ||||||
| id: null, | ||||||
| }); | ||||||
| } | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| app.get("/mcp", handleSseConnection); | ||||||
| app.post("/mcp", handleSseMessage); | ||||||
| app.get("/sse", handleSseConnection); | ||||||
| app.post(["/sse", "/messages"], handleSseMessage); | ||||||
| app.all("/mcp", handleMcpRequest); | ||||||
|
|
||||||
| app.use(express.json()); | ||||||
| // Simple tools discovery endpoint (no MCP protocol overhead) | ||||||
| app.get("/mcp/tools", (_req, res) => { | ||||||
| res.json({ tools: allTools }); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This endpoint returns the entire tool objects from
Suggested change
|
||||||
| }); | ||||||
|
|
||||||
| const PORT = process.env.SPARKY_FITNESS_MCP_PORT || process.env.PORT || 5435; | ||||||
|
|
||||||
| if (process.stdout.isTTY || process.env.TRANSPORT === 'sse') { | ||||||
| app.listen(PORT, () => { | ||||||
| console.log(`Sparky MCP Server running on port ${PORT} (SSE mode)`); | ||||||
| }); | ||||||
| } else { | ||||||
| const server = createMCPServer(); | ||||||
| const transport = new StdioServerTransport(); | ||||||
| server.connect(transport).catch(console.error); | ||||||
| console.error("Sparky MCP Server running in Stdio mode"); | ||||||
| } | ||||||
| app.listen(PORT, () => { | ||||||
| console.log(`Sparky MCP Server running on port ${PORT} (Streamable HTTP mode)`); | ||||||
| }); | ||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -49,7 +49,41 @@ export const createFood = async (args: any) => { | |||||
| ] | ||||||
| ); | ||||||
|
|
||||||
| return { content: [{ type: "text", text: `✅ Created food "${food_name}" with variant ID ${createVariantRes.rows[0].id}.` }] }; | ||||||
| const newVariantId = createVariantRes.rows[0].id; | ||||||
| let message = `✅ Created food "${food_name}" (Variant ID: ${newVariantId}).`; | ||||||
|
|
||||||
| // Automatically log to diary if meal_type is provided | ||||||
| if (args.meal_type) { | ||||||
| const mealTypeId = await getMealTypeId(args.meal_type); | ||||||
| const date = args.entry_date || new Date().toISOString().split('T')[0]; | ||||||
|
|
||||||
| // Final quantity/unit for the log entry | ||||||
| const logQuantity = parseFloat(args.quantity) || macros?.serving_size || 100; | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a potential bug here when
Suggested change
|
||||||
| const logUnit = args.unit || macros?.serving_unit || 'g'; | ||||||
|
|
||||||
| await query( | ||||||
| `INSERT INTO food_entries ( | ||||||
| user_id, food_id, variant_id, meal_type_id, quantity, unit, entry_date, | ||||||
| food_name, brand_name, calories, protein, carbs, fat, serving_size, serving_unit, | ||||||
| saturated_fat, polyunsaturated_fat, monounsaturated_fat, trans_fat, cholesterol, | ||||||
| sodium, potassium, dietary_fiber, sugars, vitamin_a, vitamin_c, calcium, iron, | ||||||
| glycemic_index, custom_nutrients, created_by_user_id | ||||||
| ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $1)`, | ||||||
| [ | ||||||
| MOCK_USER_ID, newFoodId, newVariantId, mealTypeId, | ||||||
| logQuantity, logUnit, date, | ||||||
| food_name, brand || '', macros?.calories || 0, macros?.protein || 0, macros?.carbs || 0, macros?.fat || 0, | ||||||
| macros?.serving_size || 100, macros?.serving_unit || 'g', | ||||||
| macros?.saturated_fat || 0, macros?.polyunsaturated_fat || 0, macros?.monounsaturated_fat || 0, macros?.trans_fat || 0, | ||||||
| macros?.cholesterol || 0, macros?.sodium || 0, macros?.potassium || 0, macros?.fiber || 0, macros?.sugar || 0, | ||||||
| macros?.vitamin_a || 0, macros?.vitamin_c || 0, macros?.calcium || 0, macros?.iron || 0, | ||||||
| macros?.gi || 'None', macros?.custom_nutrients || {} | ||||||
| ] | ||||||
| ); | ||||||
| message += ` Also logged to ${args.meal_type} for ${date}.`; | ||||||
| } | ||||||
|
|
||||||
| return { content: [{ type: "text", text: message }] }; | ||||||
| }; | ||||||
|
|
||||||
| export const searchMeal = async (args: any) => { | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Swallowing errors with an empty
catchblock can hide potential issues during resource cleanup. It's better to at least log the error to aid in debugging iftransport.close()fails for some reason.