diff --git a/docs/architecture/investing-ops-automation.md b/docs/architecture/investing-ops-automation.md new file mode 100644 index 000000000000..d6b313003366 --- /dev/null +++ b/docs/architecture/investing-ops-automation.md @@ -0,0 +1,75 @@ +# Investing Research Ops Automation + +This slice closes `#514`. + +## Implementation plan + +- persist explicit research-op schedules instead of relying on ad hoc reminders +- register those schedules inside the always-on process, following the same global scheduler pattern as ingestion +- materialize delivery content into a durable audit trail so operators can inspect every unattended run + +## Runtime model + +State file: + +- `~/.local/state/zee/investing/ops-automation.json` + +The state file stores two records: + +- `schedules[]` + - workflow, cadence, enabled state, optional symbol scope, output format, delivery target, and last-run audit summary +- `deliveries[]` + - one record per unattended or operator-triggered run with artifact type/id, delivered content, timestamps, and error detail when a run fails + +Supported workflows: + +- `daily-portfolio-brief` +- `earnings-preview-packet` +- `earnings-review-packet` + +Supported delivery target: + +- `audit-log` + - the rendered artifact is persisted directly into the delivery record so operators can review the exact unattended output later + +## Operator surfaces + +CLI: + +- `zee investing ops schedule create` +- `zee investing ops schedule read` +- `zee investing ops schedule list` +- `zee investing ops schedule update` +- `zee investing ops schedule run` +- `zee investing ops delivery read` +- `zee investing ops delivery list` + +Tool surface: + +- `zee:invest-ops` + +Supported tool actions: + +- `create-schedule` +- `update-schedule` +- `read-schedule` +- `list-schedules` +- `run-schedule` +- `read-delivery` +- `list-deliveries` + +## Always-on integration + +- the always-on daemon registers enabled research-op schedules during startup +- schedules run as global scheduler tasks so they keep firing without GitHub Actions or external cron +- the daily portfolio brief workflow materializes the latest persisted portfolio briefing +- earnings workflows locate the latest matching synthesis execution, regenerate the packet if needed, and store the delivered content in the audit trail + +## Telemetry + +This slice emits: + +- `investing.ops.schedule` + - emitted when schedules are created, updated, or registered with the resident scheduler +- `investing.ops.delivery` + - emitted on every unattended or operator-triggered run with workflow, artifact linkage, format, target, and error detail when applicable diff --git a/packages/zee/src/cli/cmd/always-on.ts b/packages/zee/src/cli/cmd/always-on.ts index 8be63fa09be7..0df7f9b595bc 100644 --- a/packages/zee/src/cli/cmd/always-on.ts +++ b/packages/zee/src/cli/cmd/always-on.ts @@ -39,6 +39,7 @@ import { type OrchestrationVisualMode, type VisualOrchestrationSink, } from "@root/orchestration-visual" +import { registerInvestingOpsSchedules } from "@root/domain/investing/ops-automation" import os from "os" const log = Log.create({ service: "always-on" }) @@ -695,6 +696,22 @@ export async function startAlwaysOnProcess(opts: AlwaysOnOptions): Promise + yargs + .option("workflow", { + type: "string", + choices: [...INVESTING_OPS_WORKFLOWS], + demandOption: true, + describe: "automation workflow to schedule", + }) + .option("schedule-minutes", { + type: "number", + demandOption: true, + describe: "recurring cadence in minutes", + }) + .option("enabled", { + type: "boolean", + describe: "whether the schedule should be active", + }) + .option("symbol", { + type: "string", + describe: "required symbol for earnings workflows", + }) + .option("watchlist-symbol", { + type: "array", + string: true, + describe: "optional watchlist override for daily portfolio briefs", + }) + .option("format", { + type: "string", + choices: [...INVESTING_OPS_FORMATS], + default: "markdown", + describe: "delivery format", + }) + .option("delivery-target", { + type: "string", + choices: [...INVESTING_OPS_DELIVERY_TARGETS], + default: "audit-log", + describe: "delivery destination", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { + workflow?: string + scheduleMinutes?: number + enabled?: boolean + symbol?: string + watchlistSymbol?: string[] + format?: string + deliveryTarget?: string + json?: boolean + }) => { + if (!args.workflow || !args.scheduleMinutes) { + throw new Error("workflow and scheduleMinutes are required") + } + const schedule = createInvestingOpsSchedule({ + workflow: args.workflow as InvestingOpsWorkflow, + scheduleMinutes: args.scheduleMinutes, + enabled: args.enabled, + symbol: args.symbol, + watchlistSymbols: args.watchlistSymbol, + format: (args.format as "json" | "markdown" | undefined) ?? "markdown", + deliveryTarget: (args.deliveryTarget as "audit-log" | undefined) ?? "audit-log", + }) + if (args.json) { + console.log(JSON.stringify(schedule, null, 2)) + return + } + + console.log(`${schedule.id}`) + console.log(`- workflow=${schedule.workflow} every=${schedule.scheduleMinutes}m enabled=${schedule.enabled}`) + console.log(`- symbol=${schedule.symbol ?? "n/a"} format=${schedule.format} target=${schedule.deliveryTarget}`) + }, +}) + +const InvestingOpsScheduleReadCommand = cmd({ + command: "read ", + describe: "read one persisted research ops schedule", + builder: (yargs: Argv) => + yargs + .positional("scheduleId", { + type: "string", + demandOption: true, + describe: "persisted ops schedule identifier", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { scheduleId?: string; json?: boolean }) => { + if (!args.scheduleId) { + throw new Error("scheduleId is required") + } + const schedule = getInvestingOpsSchedule(args.scheduleId) + const payload = schedule ?? { error: `Ops schedule not found: ${args.scheduleId}` } + if (args.json || !schedule) { + console.log(JSON.stringify(payload, null, 2)) + return + } + + console.log(`${schedule.id}`) + console.log(`- workflow=${schedule.workflow} every=${schedule.scheduleMinutes}m enabled=${schedule.enabled}`) + console.log(`- symbol=${schedule.symbol ?? "n/a"} format=${schedule.format} target=${schedule.deliveryTarget}`) + console.log(`- lastStatus=${schedule.audit.lastStatus ?? "never"} lastRunAt=${schedule.audit.lastRunAt ?? "n/a"}`) + }, +}) + +const InvestingOpsScheduleListCommand = cmd({ + command: "list", + describe: "list persisted research ops schedules", + builder: (yargs: Argv) => + yargs + .option("workflow", { + type: "string", + choices: [...INVESTING_OPS_WORKFLOWS], + describe: "optional workflow filter", + }) + .option("enabled", { + type: "boolean", + describe: "optional enabled filter", + }) + .option("symbol", { + type: "string", + describe: "optional symbol filter", + }) + .option("limit", { + type: "number", + default: 10, + describe: "maximum number of schedules to return", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { workflow?: string; enabled?: boolean; symbol?: string; limit?: number; json?: boolean }) => { + const schedules = listInvestingOpsSchedules({ + workflow: args.workflow as InvestingOpsWorkflow | undefined, + enabled: args.enabled, + symbol: args.symbol, + limit: args.limit, + }) + if (args.json) { + console.log(JSON.stringify({ schedules, count: schedules.length }, null, 2)) + return + } + for (const schedule of schedules) { + console.log( + `- ${schedule.id}: workflow=${schedule.workflow} every=${schedule.scheduleMinutes}m enabled=${schedule.enabled} symbol=${schedule.symbol ?? "n/a"} target=${schedule.deliveryTarget} format=${schedule.format}`, + ) + } + }, +}) + +const InvestingOpsScheduleUpdateCommand = cmd({ + command: "update ", + describe: "update a persisted research ops schedule", + builder: (yargs: Argv) => + yargs + .positional("scheduleId", { + type: "string", + demandOption: true, + describe: "persisted ops schedule identifier", + }) + .option("enabled", { + type: "boolean", + describe: "updated enabled state", + }) + .option("schedule-minutes", { + type: "number", + describe: "updated recurring cadence in minutes", + }) + .option("symbol", { + type: "string", + describe: "updated symbol for earnings workflows", + }) + .option("watchlist-symbol", { + type: "array", + string: true, + describe: "updated watchlist override for daily briefs", + }) + .option("format", { + type: "string", + choices: [...INVESTING_OPS_FORMATS], + describe: "updated delivery format", + }) + .option("delivery-target", { + type: "string", + choices: [...INVESTING_OPS_DELIVERY_TARGETS], + describe: "updated delivery destination", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { + scheduleId?: string + enabled?: boolean + scheduleMinutes?: number + symbol?: string + watchlistSymbol?: string[] + format?: string + deliveryTarget?: string + json?: boolean + }) => { + if (!args.scheduleId) { + throw new Error("scheduleId is required") + } + const schedule = updateInvestingOpsSchedule({ + scheduleId: args.scheduleId, + enabled: args.enabled, + scheduleMinutes: args.scheduleMinutes, + symbol: args.symbol, + watchlistSymbols: args.watchlistSymbol, + format: args.format as "json" | "markdown" | undefined, + deliveryTarget: args.deliveryTarget as "audit-log" | undefined, + }) + if (args.json) { + console.log(JSON.stringify(schedule, null, 2)) + return + } + + console.log(`${schedule.id}`) + console.log(`- workflow=${schedule.workflow} every=${schedule.scheduleMinutes}m enabled=${schedule.enabled}`) + console.log(`- symbol=${schedule.symbol ?? "n/a"} format=${schedule.format} target=${schedule.deliveryTarget}`) + }, +}) + +const InvestingOpsScheduleRunCommand = cmd({ + command: "run ", + describe: "run one persisted research ops schedule immediately", + builder: (yargs: Argv) => + yargs + .positional("scheduleId", { + type: "string", + demandOption: true, + describe: "persisted ops schedule identifier", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { scheduleId?: string; json?: boolean }) => { + if (!args.scheduleId) { + throw new Error("scheduleId is required") + } + const delivery = await runInvestingOpsSchedule({ + scheduleId: args.scheduleId, + }) + if (args.json) { + console.log(JSON.stringify(delivery, null, 2)) + return + } + + console.log(`${delivery.id}`) + console.log(`- workflow=${delivery.workflow} status=${delivery.status} target=${delivery.deliveryTarget}`) + console.log(`- artifact=${delivery.artifactKind}:${delivery.artifactId ?? "n/a"} symbol=${delivery.symbol ?? "n/a"}`) + console.log(`- summary=${delivery.summary}`) + if (delivery.content) { + console.log(`\n${delivery.content}`) + } + }, +}) + +const InvestingOpsScheduleCommand = cmd({ + command: "schedule", + describe: "persisted research ops schedules", + builder: (yargs: Argv) => + yargs + .command(InvestingOpsScheduleCreateCommand) + .command(InvestingOpsScheduleReadCommand) + .command(InvestingOpsScheduleListCommand) + .command(InvestingOpsScheduleUpdateCommand) + .command(InvestingOpsScheduleRunCommand) + .demandCommand(), + async handler() {}, +}) + +const InvestingOpsDeliveryReadCommand = cmd({ + command: "read ", + describe: "read one research ops delivery record", + builder: (yargs: Argv) => + yargs + .positional("deliveryId", { + type: "string", + demandOption: true, + describe: "persisted ops delivery identifier", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { deliveryId?: string; json?: boolean }) => { + if (!args.deliveryId) { + throw new Error("deliveryId is required") + } + const delivery = getInvestingOpsDeliveryRecord(args.deliveryId) + const payload = delivery ?? { error: `Ops delivery not found: ${args.deliveryId}` } + if (args.json || !delivery) { + console.log(JSON.stringify(payload, null, 2)) + return + } + + console.log(`${delivery.id}`) + console.log(`- workflow=${delivery.workflow} status=${delivery.status} target=${delivery.deliveryTarget}`) + console.log(`- artifact=${delivery.artifactKind}:${delivery.artifactId ?? "n/a"} symbol=${delivery.symbol ?? "n/a"}`) + console.log(`- summary=${delivery.summary}`) + if (delivery.error) { + console.log(`- error=${delivery.error}`) + } + if (delivery.content) { + console.log(`\n${delivery.content}`) + } + }, +}) + +const InvestingOpsDeliveryListCommand = cmd({ + command: "list", + describe: "list research ops delivery records", + builder: (yargs: Argv) => + yargs + .option("schedule-id", { + type: "string", + describe: "optional schedule filter", + }) + .option("workflow", { + type: "string", + choices: [...INVESTING_OPS_WORKFLOWS], + describe: "optional workflow filter", + }) + .option("status", { + type: "string", + choices: ["ok", "error"], + describe: "optional run status filter", + }) + .option("symbol", { + type: "string", + describe: "optional symbol filter", + }) + .option("limit", { + type: "number", + default: 10, + describe: "maximum number of delivery records to return", + }) + .option("json", { + type: "boolean", + default: false, + describe: "output as JSON", + }), + handler: async (args: { + scheduleId?: string + workflow?: string + status?: string + symbol?: string + limit?: number + json?: boolean + }) => { + const deliveries = listInvestingOpsDeliveryRecords({ + scheduleId: args.scheduleId, + workflow: args.workflow as InvestingOpsWorkflow | undefined, + status: args.status as "ok" | "error" | undefined, + symbol: args.symbol, + limit: args.limit, + }) + if (args.json) { + console.log(JSON.stringify({ deliveries, count: deliveries.length }, null, 2)) + return + } + for (const delivery of deliveries) { + console.log( + `- ${delivery.id}: workflow=${delivery.workflow} status=${delivery.status} artifact=${delivery.artifactKind}:${delivery.artifactId ?? "n/a"} symbol=${delivery.symbol ?? "n/a"} summary=${delivery.summary}`, + ) + } + }, +}) + +const InvestingOpsDeliveryCommand = cmd({ + command: "delivery", + describe: "research ops delivery audit trail", + builder: (yargs: Argv) => + yargs + .command(InvestingOpsDeliveryReadCommand) + .command(InvestingOpsDeliveryListCommand) + .demandCommand(), + async handler() {}, +}) + +const InvestingOpsCommand = cmd({ + command: "ops", + describe: "unattended research ops schedules and delivery audit trail", + builder: (yargs: Argv) => + yargs + .command(InvestingOpsScheduleCommand) + .command(InvestingOpsDeliveryCommand) + .demandCommand(), + async handler() {}, +}) + const InvestingBriefingCreateCommand = cmd({ command: "create", describe: "create a persisted daily portfolio briefing", @@ -731,6 +1150,7 @@ export const InvestingCommand = cmd({ .command(InvestingEventCommand) .command(InvestingThesisCommand) .command(InvestingEarningsPacketCommand) + .command(InvestingOpsCommand) .command(InvestingBriefingCommand) .demandCommand(), async handler() {}, diff --git a/packages/zee/src/flux/types.ts b/packages/zee/src/flux/types.ts index 4a818080120c..d42be5e52b62 100644 --- a/packages/zee/src/flux/types.ts +++ b/packages/zee/src/flux/types.ts @@ -68,6 +68,8 @@ export type FluxKind = | "investing.valuation.packet.export" | "investing.earnings.packet" | "investing.earnings.packet.export" + | "investing.ops.schedule" + | "investing.ops.delivery" | "investing.thesis.record" | "investing.thesis.revision" | "investing.thesis.confidence" diff --git a/packages/zee/test/investing/ops-automation.test.ts b/packages/zee/test/investing/ops-automation.test.ts new file mode 100644 index 000000000000..6cf4e63d9b41 --- /dev/null +++ b/packages/zee/test/investing/ops-automation.test.ts @@ -0,0 +1,354 @@ +import { describe, expect, spyOn, test } from "bun:test" +import { mkdirSync } from "node:fs" +import path from "node:path" +import { FluxRecorder } from "../../src/flux" +import { normalizeInvestingConnectorEntities } from "../../src/investing/entities" +import { classifyInvestingConnectorEvents, upsertInvestingEvents } from "../../src/investing/events" +import { tmpdir } from "../fixture/fixture" +import { createInvestingResearchArtifact } from "../../../../src/domain/investing/artifacts" +import { + createInvestingOpsSchedule, + getInvestingOpsSchedule, + listInvestingOpsDeliveryRecords, + registerInvestingOpsSchedules, +} from "../../../../src/domain/investing/ops-automation" +import { + getInvestingResearchExecutionStateFile, + type InvestingResearchExecution, +} from "../../../../src/domain/investing/executor" +import { createInvestingResearchPlan } from "../../../../src/domain/investing/planner" +import { opsAutomationTool } from "../../../../src/domain/investing/tools" +import { + recordInvestingThesisRevision, + syncInvestingThesisContext, + thesisKeyForSymbol, +} from "../../../../src/domain/investing/thesis" + +function makeToolContext() { + return { + sessionId: "session-1", + messageId: "message-1", + agent: "zee", + abort: new AbortController().signal, + metadata: () => {}, + } +} + +async function seedEventDelta(symbol: string, headline: string) { + const events = classifyInvestingConnectorEvents({ + connector: "news", + entities: normalizeInvestingConnectorEntities({ + connector: "news", + collectedAt: "2026-03-15T10:00:00.000Z", + data: [ + { + symbol, + articleId: `${symbol.toLowerCase()}-${headline.toLowerCase().replace(/[^a-z0-9]+/g, "-")}`, + publishedAt: "2026-03-15T09:30:00.000Z", + title: headline, + summary: `${headline} for ${symbol}.`, + sector: "Technology", + }, + ], + }), + }) + await upsertInvestingEvents({ events }) +} + +function seedThesis(symbol: string, summary: string) { + const thesisKey = thesisKeyForSymbol(symbol) + syncInvestingThesisContext({ + thesisKey, + symbol, + summary, + valuation: { + valuationCaseId: `valuation_case:equity:${symbol.toLowerCase()}:base`, + packetId: `valuation-packet-${symbol.toLowerCase()}`, + runId: `valuation-run-${symbol.toLowerCase()}`, + signal: "re-rate-up", + fairValue: 125, + currentPrice: 100, + upsidePercent: 25, + }, + }) + recordInvestingThesisRevision({ + thesisKey, + symbol, + changeType: "refresh", + summary, + thesis: `${summary} Thesis body for ${symbol}.`, + conviction: "high", + posture: "bullish", + evidence: [ + { + kind: "research-evidence", + id: `evidence-${symbol.toLowerCase()}`, + label: `[E1] Research summary for ${symbol}`, + link: `evidence:research-${symbol}:E1`, + toolId: "zee:invest-research", + }, + { + kind: "valuation-packet", + id: `valuation-packet-${symbol.toLowerCase()}`, + label: `Valuation packet for ${symbol}`, + link: `valuation-packet:valuation-packet-${symbol.toLowerCase()}`, + toolId: "zee:invest-valuation", + }, + ], + valuation: { + valuationCaseId: `valuation_case:equity:${symbol.toLowerCase()}:base`, + packetId: `valuation-packet-${symbol.toLowerCase()}`, + runId: `valuation-run-${symbol.toLowerCase()}`, + signal: "re-rate-up", + fairValue: 125, + currentPrice: 100, + upsidePercent: 25, + }, + }) +} + +function makeExecution(input: { + id: string + planId: string + taskId: string + workflow: string + symbol: string + synthesis: string +}): InvestingResearchExecution { + return { + id: input.id, + planId: input.planId, + taskId: input.taskId, + workflow: input.workflow, + status: "ok", + startedAt: "2026-03-15T10:00:00.000Z", + finishedAt: "2026-03-15T10:05:00.000Z", + synthesis: input.synthesis, + provenance: null, + evidence: [ + { + id: `${input.id}:E1`, + citation: "E1", + link: `evidence:${input.id}:E1`, + toolId: "zee:invest-research", + sourceLabel: "Research endpoint", + args: { symbol: input.symbol }, + collectedAt: "2026-03-15T10:01:00.000Z", + status: "completed", + summary: `${input.symbol} setup summary`, + data: { symbol: input.symbol, summary: `${input.symbol} research summary` }, + }, + { + id: `${input.id}:E2`, + citation: "E2", + link: `evidence:${input.id}:E2`, + toolId: "zee:invest-valuation", + sourceLabel: "Investing Valuation Kernel", + args: { symbol: input.symbol }, + collectedAt: "2026-03-15T10:02:00.000Z", + status: "completed", + summary: `${input.symbol} valuation snapshot`, + data: { + id: `valuation-run-${input.symbol.toLowerCase()}`, + valuationCaseId: `valuation_case:equity:${input.symbol.toLowerCase()}:base`, + blendedFairValue: 125, + currentPrice: 100, + upsidePercent: 25, + thesisContext: { + signal: "re-rate-up", + }, + }, + }, + ], + } +} + +async function persistExecution(execution: InvestingResearchExecution): Promise { + const stateFile = getInvestingResearchExecutionStateFile() + mkdirSync(path.dirname(stateFile), { recursive: true }) + await Bun.write( + stateFile, + JSON.stringify({ + version: 1, + executions: [execution], + }), + ) +} + +async function withOpsState(fn: () => Promise): Promise { + await using dir = await tmpdir() + const originalStateHome = process.env.XDG_STATE_HOME + const originalPortfolioFile = process.env.ZEE_INVESTING_PORTFOLIO_FILE + const originalWatchlistFile = process.env.ZEE_INVESTING_WATCHLIST_FILE + process.env.XDG_STATE_HOME = dir.path + process.env.ZEE_INVESTING_PORTFOLIO_FILE = `${dir.path}/portfolio.json` + process.env.ZEE_INVESTING_WATCHLIST_FILE = `${dir.path}/watchlist.json` + + await Bun.write( + process.env.ZEE_INVESTING_PORTFOLIO_FILE, + JSON.stringify({ + positions: [{ symbol: "NVDA", shares: 10, average_cost: 95 }], + }), + ) + await Bun.write( + process.env.ZEE_INVESTING_WATCHLIST_FILE, + JSON.stringify({ + watchlist: [{ symbol: "MSFT" }], + }), + ) + + try { + return await fn() + } finally { + if (originalStateHome === undefined) { + delete process.env.XDG_STATE_HOME + } else { + process.env.XDG_STATE_HOME = originalStateHome + } + if (originalPortfolioFile === undefined) { + delete process.env.ZEE_INVESTING_PORTFOLIO_FILE + } else { + process.env.ZEE_INVESTING_PORTFOLIO_FILE = originalPortfolioFile + } + if (originalWatchlistFile === undefined) { + delete process.env.ZEE_INVESTING_WATCHLIST_FILE + } else { + process.env.ZEE_INVESTING_WATCHLIST_FILE = originalWatchlistFile + } + } +} + +describe("investing ops automation", () => { + test("registers unattended schedules and persists a delivery audit trail for daily briefings", async () => { + await withOpsState(async () => { + const recordSpy = spyOn(FluxRecorder, "record") + const schedule = createInvestingOpsSchedule({ + workflow: "daily-portfolio-brief", + scheduleMinutes: 15, + format: "markdown", + }) + + const tasks: Array<{ id: string; interval: number; run: () => void | Promise }> = [] + const registrations = registerInvestingOpsSchedules({ + register: (task) => { + tasks.push(task) + }, + }) + + expect(registrations).toHaveLength(1) + expect(registrations[0]?.scheduleId).toBe(schedule.id) + expect(tasks[0]?.interval).toBe(15 * 60 * 1000) + + await tasks[0]?.run() + + const deliveries = listInvestingOpsDeliveryRecords({ + scheduleId: schedule.id, + }) + expect(deliveries).toHaveLength(1) + expect(deliveries[0]?.status).toBe("ok") + expect(deliveries[0]?.artifactKind).toBe("portfolio-briefing") + expect(deliveries[0]?.content).toContain("Portfolio Briefing:") + expect(getInvestingOpsSchedule(schedule.id)?.audit.lastStatus).toBe("ok") + expect(recordSpy.mock.calls.some((call) => call[0]?.kind === "investing.ops.schedule")).toBe(true) + expect(recordSpy.mock.calls.some((call) => call[0]?.kind === "investing.ops.delivery")).toBe(true) + }) + }) + + test("tool surface can create, update, run, and inspect earnings automation deliveries", async () => { + await withOpsState(async () => { + seedThesis("NVDA", "NVDA enters earnings with an improving setup.") + await seedEventDelta("NVDA", "NVDA raises guidance ahead of earnings") + + const plan = createInvestingResearchPlan({ + objective: "Prepare a pre-earnings preview for NVDA", + }) + const task = plan.tasks.find((entry) => entry.id === "preview-brief") + if (!task) throw new Error("preview task should exist") + + const execution = makeExecution({ + id: "research-execution-ops", + planId: plan.id, + taskId: task.id, + workflow: plan.workflow, + symbol: "NVDA", + synthesis: "Automated earnings packet delivery should surface the latest setup.", + }) + const artifact = createInvestingResearchArtifact({ + execution, + plan, + task, + }) + execution.artifactId = artifact.id + await persistExecution(execution) + + const runtime = await opsAutomationTool.init() + const ctx = makeToolContext() + + const createResult = await runtime.execute( + { + action: "create-schedule", + workflow: "earnings-preview-packet", + scheduleMinutes: 30, + symbol: "NVDA", + format: "json", + deliveryTarget: "audit-log", + }, + ctx, + ) + const created = JSON.parse(createResult.output) as { id: string } + expect(created.id).toBeDefined() + + const updateResult = await runtime.execute( + { + action: "update-schedule", + scheduleId: created.id, + format: "markdown", + }, + ctx, + ) + expect(JSON.parse(updateResult.output)).toMatchObject({ + id: created.id, + format: "markdown", + }) + + const runResult = await runtime.execute( + { + action: "run-schedule", + scheduleId: created.id, + }, + ctx, + ) + const delivery = JSON.parse(runResult.output) as { id: string; status: string; content: string } + expect(delivery.status).toBe("ok") + + const readDeliveryResult = await runtime.execute( + { + action: "read-delivery", + deliveryId: delivery.id, + }, + ctx, + ) + expect(JSON.parse(readDeliveryResult.output)).toMatchObject({ + id: delivery.id, + status: "ok", + }) + + const listDeliveriesResult = await runtime.execute( + { + action: "list-deliveries", + workflow: "earnings-preview-packet", + symbol: "NVDA", + limit: 5, + }, + ctx, + ) + const listed = JSON.parse(listDeliveriesResult.output) as { + count: number + deliveries: Array<{ id: string; content: string }> + } + expect(listed.count).toBeGreaterThan(0) + expect(listed.deliveries.some((entry) => entry.id === delivery.id)).toBe(true) + expect(listed.deliveries[0]?.content ?? "").toContain("Earnings Preview Packet: NVDA") + }) + }) +}) diff --git a/src/domain/investing/ops-automation.ts b/src/domain/investing/ops-automation.ts new file mode 100644 index 000000000000..47f64a7b4a66 --- /dev/null +++ b/src/domain/investing/ops-automation.ts @@ -0,0 +1,595 @@ +/** + * Investing Research Ops Automation + * + * Registers unattended portfolio and earnings workflows, materializes the + * resulting briefing or packet content, and persists a complete delivery audit + * trail for operator review. + */ + +import { randomUUID } from "node:crypto"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { FluxRecorder } from "../../../packages/zee/src/flux"; +import { Scheduler } from "../../../packages/zee/src/scheduler"; +import { Log } from "../../../packages/zee/src/util/log"; +import { Instance } from "../../../packages/zee/src/project/instance"; +import { + createInvestingPortfolioBriefing, + renderInvestingPortfolioBriefing, +} from "./briefings"; +import { + createInvestingEarningsPacket, + exportInvestingEarningsPacket, +} from "./earnings-packets"; +import { + getInvestingResearchExecution, + listInvestingResearchExecutions, +} from "./executor"; +import { getInvestingResearchPlan } from "./planner"; + +const log = Log.create({ service: "investing:ops-automation" }); + +export const INVESTING_OPS_WORKFLOWS = [ + "daily-portfolio-brief", + "earnings-preview-packet", + "earnings-review-packet", +] as const; + +export type InvestingOpsWorkflow = (typeof INVESTING_OPS_WORKFLOWS)[number]; + +export const INVESTING_OPS_DELIVERY_TARGETS = ["audit-log"] as const; +export type InvestingOpsDeliveryTarget = + (typeof INVESTING_OPS_DELIVERY_TARGETS)[number]; + +export const INVESTING_OPS_FORMATS = ["json", "markdown"] as const; +export type InvestingOpsFormat = (typeof INVESTING_OPS_FORMATS)[number]; + +export type InvestingOpsRunStatus = "ok" | "error"; + +export interface InvestingOpsSchedule { + id: string; + workflow: InvestingOpsWorkflow; + enabled: boolean; + scheduleMinutes: number; + symbol?: string; + watchlistSymbols?: string[]; + format: InvestingOpsFormat; + deliveryTarget: InvestingOpsDeliveryTarget; + createdAt: string; + updatedAt: string; + audit: { + lastRunAt?: string; + lastStatus?: InvestingOpsRunStatus; + lastDeliveryId?: string; + lastArtifactId?: string; + lastError?: string; + }; +} + +export interface InvestingOpsDeliveryRecord { + id: string; + scheduleId: string; + workflow: InvestingOpsWorkflow; + status: InvestingOpsRunStatus; + deliveredAt: string; + deliveryTarget: InvestingOpsDeliveryTarget; + format: InvestingOpsFormat; + artifactKind: "portfolio-briefing" | "earnings-packet"; + artifactId?: string; + symbol?: string; + summary: string; + content: string; + error?: string; +} + +type OpsAutomationState = { + version: 1; + schedules: InvestingOpsSchedule[]; + deliveries: InvestingOpsDeliveryRecord[]; +}; + +type CreateInvestingOpsScheduleInput = { + workflow: InvestingOpsWorkflow; + scheduleMinutes: number; + enabled?: boolean; + symbol?: string; + watchlistSymbols?: string[]; + format?: InvestingOpsFormat; + deliveryTarget?: InvestingOpsDeliveryTarget; +}; + +type UpdateInvestingOpsScheduleInput = { + scheduleId: string; + enabled?: boolean; + scheduleMinutes?: number; + symbol?: string; + watchlistSymbols?: string[]; + format?: InvestingOpsFormat; + deliveryTarget?: InvestingOpsDeliveryTarget; +}; + +type RegisterTask = typeof Scheduler.register; + +function getOpsStateDir(): string { + const stateDir = process.env.XDG_STATE_HOME + ? path.join(process.env.XDG_STATE_HOME, "zee") + : path.join(os.homedir(), ".local", "state", "zee"); + return path.join(stateDir, "investing"); +} + +export function getInvestingOpsAutomationStateFile(): string { + return path.join(getOpsStateDir(), "ops-automation.json"); +} + +function ensureOpsStateDir(): void { + mkdirSync(getOpsStateDir(), { recursive: true }); +} + +function readOpsState(): OpsAutomationState { + const filePath = getInvestingOpsAutomationStateFile(); + if (!existsSync(filePath)) { + return { version: 1, schedules: [], deliveries: [] }; + } + + try { + const parsed = JSON.parse(readFileSync(filePath, "utf-8")) as Partial; + return { + version: 1, + schedules: Array.isArray(parsed.schedules) ? parsed.schedules : [], + deliveries: Array.isArray(parsed.deliveries) ? parsed.deliveries : [], + }; + } catch (error) { + log.warn("failed to read ops automation state", { + error: error instanceof Error ? error.message : String(error), + }); + return { version: 1, schedules: [], deliveries: [] }; + } +} + +function writeOpsState(state: OpsAutomationState): void { + ensureOpsStateDir(); + writeFileSync(getInvestingOpsAutomationStateFile(), JSON.stringify(state, null, 2) + "\n", "utf-8"); +} + +function normalizeSymbol(symbol: string | undefined): string | undefined { + const normalized = symbol?.trim().toUpperCase(); + return normalized ? normalized : undefined; +} + +function uniqueSymbols(symbols?: string[]): string[] | undefined { + const items = [...new Set((symbols ?? []).map((symbol) => symbol.trim().toUpperCase()).filter(Boolean))]; + return items.length > 0 ? items : undefined; +} + +function assertScheduleInput(input: { + workflow: InvestingOpsWorkflow; + symbol?: string; + scheduleMinutes?: number; +}): void { + if (input.scheduleMinutes != null && (!Number.isFinite(input.scheduleMinutes) || input.scheduleMinutes <= 0)) { + throw new Error("scheduleMinutes must be a positive integer."); + } + + if ( + (input.workflow === "earnings-preview-packet" || input.workflow === "earnings-review-packet") && + !normalizeSymbol(input.symbol) + ) { + throw new Error(`${input.workflow} schedules require a symbol.`); + } +} + +function emitScheduleTelemetry( + schedule: InvestingOpsSchedule, + method: "create" | "update" | "scheduler", + taskId?: string, +): void { + FluxRecorder.record({ + traceID: schedule.id, + direction: "internal", + domain: "investing", + kind: "investing.ops.schedule", + status: "ok", + method, + path: schedule.workflow, + route: taskId ?? schedule.id, + metadata: { + scheduleId: schedule.id, + workflow: schedule.workflow, + scheduleMinutes: schedule.scheduleMinutes, + enabled: schedule.enabled, + symbol: schedule.symbol, + watchlistSymbols: schedule.watchlistSymbols, + format: schedule.format, + deliveryTarget: schedule.deliveryTarget, + }, + }); +} + +function taskIdForSchedule(scheduleId: string): string { + return `investing.ops.${scheduleId}`; +} + +function renderBriefingContent(input: { + format: InvestingOpsFormat; + artifact: Awaited>; +}): string { + return input.format === "json" + ? JSON.stringify(input.artifact, null, 2) + : renderInvestingPortfolioBriefing(input.artifact); +} + +function workflowForScheduleToPlan( + workflow: InvestingOpsWorkflow, +): "earnings-preview" | "earnings-review" | null { + switch (workflow) { + case "earnings-preview-packet": + return "earnings-preview"; + case "earnings-review-packet": + return "earnings-review"; + default: + return null; + } +} + +function artifactKindForWorkflow( + workflow: InvestingOpsWorkflow, +): "portfolio-briefing" | "earnings-packet" { + return workflow === "daily-portfolio-brief" ? "portfolio-briefing" : "earnings-packet"; +} + +function findLatestExecutionForSchedule(schedule: InvestingOpsSchedule): { + execution: NonNullable>; + plan: NonNullable>; + taskId: string; +} | null { + const workflow = workflowForScheduleToPlan(schedule.workflow); + if (!workflow) return null; + + const symbol = normalizeSymbol(schedule.symbol); + const candidates = listInvestingResearchExecutions({ limit: 500 }) + .map((execution) => { + const plan = getInvestingResearchPlan(execution.planId); + if (!plan) return null; + const task = plan.tasks.find((entry) => entry.id === execution.taskId); + if (!task || task.phase !== "synthesis") return null; + if (plan.workflow !== workflow) return null; + if (symbol && !plan.symbols.map((item) => item.toUpperCase()).includes(symbol)) return null; + return { execution, plan, taskId: task.id }; + }) + .filter( + ( + item, + ): item is { + execution: NonNullable>; + plan: NonNullable>; + taskId: string; + } => Boolean(item), + ) + .sort((left, right) => right.execution.finishedAt.localeCompare(left.execution.finishedAt)); + + return candidates[0] ?? null; +} + +async function materializeSchedule(schedule: InvestingOpsSchedule): Promise<{ + artifactId: string; + artifactKind: "portfolio-briefing" | "earnings-packet"; + summary: string; + content: string; + symbol?: string; +}> { + switch (schedule.workflow) { + case "daily-portfolio-brief": { + const briefing = await createInvestingPortfolioBriefing({ + watchlistSymbols: schedule.watchlistSymbols, + }); + return { + artifactId: briefing.id, + artifactKind: "portfolio-briefing", + summary: briefing.summary, + content: renderBriefingContent({ artifact: briefing, format: schedule.format }), + }; + } + case "earnings-preview-packet": + case "earnings-review-packet": { + const latest = findLatestExecutionForSchedule(schedule); + if (!latest) { + throw new Error(`No matching research execution found for ${schedule.workflow}${schedule.symbol ? ` (${schedule.symbol})` : ""}.`); + } + + const task = latest.plan.tasks.find((entry) => entry.id === latest.taskId); + if (!task) { + throw new Error(`Research task context not found for execution: ${latest.execution.id}`); + } + + const packet = await createInvestingEarningsPacket({ + execution: latest.execution, + plan: latest.plan, + task, + }); + const exported = + schedule.format === "json" + ? { packet, content: JSON.stringify(packet, null, 2) } + : exportInvestingEarningsPacket({ + packetId: packet.id, + format: "markdown", + }); + return { + artifactId: packet.id, + artifactKind: "earnings-packet", + summary: packet.summary, + content: exported.content, + symbol: packet.symbol, + }; + } + } +} + +export function createInvestingOpsSchedule( + input: CreateInvestingOpsScheduleInput, +): InvestingOpsSchedule { + assertScheduleInput({ + workflow: input.workflow, + symbol: input.symbol, + scheduleMinutes: input.scheduleMinutes, + }); + + const now = new Date().toISOString(); + const schedule: InvestingOpsSchedule = { + id: `investing-ops-schedule-${randomUUID().slice(0, 12)}`, + workflow: input.workflow, + enabled: input.enabled ?? true, + scheduleMinutes: Math.floor(input.scheduleMinutes), + symbol: normalizeSymbol(input.symbol), + watchlistSymbols: uniqueSymbols(input.watchlistSymbols), + format: input.format ?? "markdown", + deliveryTarget: input.deliveryTarget ?? "audit-log", + createdAt: now, + updatedAt: now, + audit: {}, + }; + + const state = readOpsState(); + state.schedules = [schedule, ...state.schedules.filter((entry) => entry.id !== schedule.id)]; + writeOpsState(state); + emitScheduleTelemetry(schedule, "create"); + return schedule; +} + +export function updateInvestingOpsSchedule( + input: UpdateInvestingOpsScheduleInput, +): InvestingOpsSchedule { + const state = readOpsState(); + const existing = state.schedules.find((schedule) => schedule.id === input.scheduleId); + if (!existing) { + throw new Error(`Ops schedule not found: ${input.scheduleId}`); + } + + const next: InvestingOpsSchedule = { + ...existing, + enabled: input.enabled ?? existing.enabled, + scheduleMinutes: input.scheduleMinutes != null ? Math.floor(input.scheduleMinutes) : existing.scheduleMinutes, + symbol: input.symbol !== undefined ? normalizeSymbol(input.symbol) : existing.symbol, + watchlistSymbols: + input.watchlistSymbols !== undefined ? uniqueSymbols(input.watchlistSymbols) : existing.watchlistSymbols, + format: input.format ?? existing.format, + deliveryTarget: input.deliveryTarget ?? existing.deliveryTarget, + updatedAt: new Date().toISOString(), + }; + + assertScheduleInput({ + workflow: next.workflow, + symbol: next.symbol, + scheduleMinutes: next.scheduleMinutes, + }); + + state.schedules = [next, ...state.schedules.filter((entry) => entry.id !== next.id)]; + writeOpsState(state); + emitScheduleTelemetry(next, "update"); + return next; +} + +export function getInvestingOpsSchedule(scheduleId: string): InvestingOpsSchedule | null { + const state = readOpsState(); + return state.schedules.find((schedule) => schedule.id === scheduleId) ?? null; +} + +export function listInvestingOpsSchedules(options?: { + workflow?: InvestingOpsWorkflow; + enabled?: boolean; + symbol?: string; + limit?: number; +}): InvestingOpsSchedule[] { + const symbol = normalizeSymbol(options?.symbol); + const state = readOpsState(); + return state.schedules + .filter((schedule) => (options?.workflow ? schedule.workflow === options.workflow : true)) + .filter((schedule) => (typeof options?.enabled === "boolean" ? schedule.enabled === options.enabled : true)) + .filter((schedule) => (symbol ? schedule.symbol === symbol : true)) + .slice(0, options?.limit ?? 20); +} + +export async function runInvestingOpsSchedule(input: { + scheduleId: string; +}): Promise { + const state = readOpsState(); + const schedule = state.schedules.find((entry) => entry.id === input.scheduleId); + if (!schedule) { + throw new Error(`Ops schedule not found: ${input.scheduleId}`); + } + + const deliveredAt = new Date().toISOString(); + + try { + const materialized = await materializeSchedule(schedule); + const delivery: InvestingOpsDeliveryRecord = { + id: `investing-ops-delivery-${randomUUID().slice(0, 12)}`, + scheduleId: schedule.id, + workflow: schedule.workflow, + status: "ok", + deliveredAt, + deliveryTarget: schedule.deliveryTarget, + format: schedule.format, + artifactKind: materialized.artifactKind, + artifactId: materialized.artifactId, + symbol: materialized.symbol ?? schedule.symbol, + summary: materialized.summary, + content: materialized.content, + }; + + schedule.audit = { + lastRunAt: deliveredAt, + lastStatus: "ok", + lastDeliveryId: delivery.id, + lastArtifactId: materialized.artifactId, + lastError: undefined, + }; + schedule.updatedAt = deliveredAt; + + state.schedules = [schedule, ...state.schedules.filter((entry) => entry.id !== schedule.id)]; + state.deliveries = [delivery, ...state.deliveries].slice(0, 500); + writeOpsState(state); + + FluxRecorder.record({ + traceID: delivery.id, + direction: "internal", + domain: "investing", + kind: "investing.ops.delivery", + status: "ok", + method: "run", + path: schedule.workflow, + route: delivery.id, + metadata: { + scheduleId: schedule.id, + artifactKind: delivery.artifactKind, + artifactId: delivery.artifactId, + symbol: delivery.symbol, + deliveryTarget: delivery.deliveryTarget, + format: delivery.format, + }, + }); + + return delivery; + } catch (error) { + const delivery: InvestingOpsDeliveryRecord = { + id: `investing-ops-delivery-${randomUUID().slice(0, 12)}`, + scheduleId: schedule.id, + workflow: schedule.workflow, + status: "error", + deliveredAt, + deliveryTarget: schedule.deliveryTarget, + format: schedule.format, + artifactKind: artifactKindForWorkflow(schedule.workflow), + symbol: schedule.symbol, + summary: `Delivery failed for ${schedule.workflow}.`, + content: "", + error: error instanceof Error ? error.message : String(error), + }; + + schedule.audit = { + lastRunAt: deliveredAt, + lastStatus: "error", + lastDeliveryId: delivery.id, + lastArtifactId: schedule.audit.lastArtifactId, + lastError: delivery.error, + }; + schedule.updatedAt = deliveredAt; + + state.schedules = [schedule, ...state.schedules.filter((entry) => entry.id !== schedule.id)]; + state.deliveries = [delivery, ...state.deliveries].slice(0, 500); + writeOpsState(state); + + FluxRecorder.record({ + traceID: delivery.id, + direction: "internal", + domain: "investing", + kind: "investing.ops.delivery", + status: "error", + method: "run", + path: schedule.workflow, + route: delivery.id, + metadata: { + scheduleId: schedule.id, + symbol: delivery.symbol, + deliveryTarget: delivery.deliveryTarget, + format: delivery.format, + error: delivery.error, + }, + }); + + return delivery; + } +} + +export function getInvestingOpsDeliveryRecord( + deliveryId: string, +): InvestingOpsDeliveryRecord | null { + const state = readOpsState(); + return state.deliveries.find((delivery) => delivery.id === deliveryId) ?? null; +} + +export function listInvestingOpsDeliveryRecords(options?: { + scheduleId?: string; + workflow?: InvestingOpsWorkflow; + status?: InvestingOpsRunStatus; + symbol?: string; + limit?: number; +}): InvestingOpsDeliveryRecord[] { + const symbol = normalizeSymbol(options?.symbol); + const state = readOpsState(); + return state.deliveries + .filter((delivery) => (options?.scheduleId ? delivery.scheduleId === options.scheduleId : true)) + .filter((delivery) => (options?.workflow ? delivery.workflow === options.workflow : true)) + .filter((delivery) => (options?.status ? delivery.status === options.status : true)) + .filter((delivery) => (symbol ? delivery.symbol === symbol : true)) + .slice(0, options?.limit ?? 20); +} + +export function registerInvestingOpsSchedules(input: { + directory?: string; + register?: RegisterTask; +}): Array<{ scheduleId: string; taskId: string; workflow: InvestingOpsWorkflow; scheduleMinutes: number }> { + const register = input.register ?? Scheduler.register; + const withDirectory = async (fn: () => Promise) => { + if (input.directory) { + return await Instance.provide({ + directory: input.directory, + fn, + }); + } + return await fn(); + }; + + const state = readOpsState(); + const registrations: Array<{ scheduleId: string; taskId: string; workflow: InvestingOpsWorkflow; scheduleMinutes: number }> = []; + + for (const schedule of state.schedules) { + if (!schedule.enabled) continue; + const taskId = taskIdForSchedule(schedule.id); + register({ + id: taskId, + interval: schedule.scheduleMinutes * 60 * 1000, + scope: "global", + run: async () => { + try { + await withDirectory(async () => await runInvestingOpsSchedule({ scheduleId: schedule.id })); + } catch (error) { + log.warn("ops schedule run failed", { + scheduleId: schedule.id, + workflow: schedule.workflow, + error: error instanceof Error ? error.message : String(error), + }); + } + }, + }); + emitScheduleTelemetry(schedule, "scheduler", taskId); + registrations.push({ + scheduleId: schedule.id, + taskId, + workflow: schedule.workflow, + scheduleMinutes: schedule.scheduleMinutes, + }); + } + + return registrations; +} diff --git a/src/domain/investing/tools.ts b/src/domain/investing/tools.ts index e9e631090aff..f35ef1950a1c 100644 --- a/src/domain/investing/tools.ts +++ b/src/domain/investing/tools.ts @@ -58,6 +58,18 @@ import { getInvestingPortfolioBriefingStateFile, listInvestingPortfolioBriefings, } from "./briefings"; +import { + createInvestingOpsSchedule, + getInvestingOpsDeliveryRecord, + getInvestingOpsSchedule, + INVESTING_OPS_DELIVERY_TARGETS, + INVESTING_OPS_FORMATS, + INVESTING_OPS_WORKFLOWS, + listInvestingOpsDeliveryRecords, + listInvestingOpsSchedules, + runInvestingOpsSchedule, + updateInvestingOpsSchedule, +} from "./ops-automation"; import { INVESTING_EVENT_CLASSIFICATIONS, INVESTING_EVENT_CONNECTORS, @@ -1246,6 +1258,179 @@ export const portfolioBriefingsTool: ToolDefinition = { }), }; +const OpsAutomationParams = z.discriminatedUnion("action", [ + z.object({ + action: z.literal("create-schedule"), + workflow: z.enum(INVESTING_OPS_WORKFLOWS).describe("Ops workflow to automate"), + scheduleMinutes: z.number().min(1).describe("Recurring cadence in minutes"), + enabled: z.boolean().optional().default(true).describe("Whether the schedule is active"), + symbol: z.string().optional().describe("Required for earnings packet workflows"), + watchlistSymbols: z.array(z.string()).optional().describe("Optional watchlist override for daily briefs"), + format: z.enum(INVESTING_OPS_FORMATS).optional().default("markdown").describe("Delivery format"), + deliveryTarget: z.enum(INVESTING_OPS_DELIVERY_TARGETS).optional().default("audit-log").describe("Delivery destination"), + }), + z.object({ + action: z.literal("update-schedule"), + scheduleId: z.string().describe("Persisted ops schedule identifier"), + enabled: z.boolean().optional().describe("Updated enabled state"), + scheduleMinutes: z.number().min(1).optional().describe("Updated cadence in minutes"), + symbol: z.string().optional().describe("Updated symbol for earnings workflows"), + watchlistSymbols: z.array(z.string()).optional().describe("Updated watchlist override"), + format: z.enum(INVESTING_OPS_FORMATS).optional().describe("Updated delivery format"), + deliveryTarget: z.enum(INVESTING_OPS_DELIVERY_TARGETS).optional().describe("Updated delivery destination"), + }), + z.object({ + action: z.literal("read-schedule"), + scheduleId: z.string().describe("Persisted ops schedule identifier"), + }), + z.object({ + action: z.literal("list-schedules"), + workflow: z.enum(INVESTING_OPS_WORKFLOWS).optional().describe("Optional workflow filter"), + enabled: z.boolean().optional().describe("Optional enabled filter"), + symbol: z.string().optional().describe("Optional symbol filter"), + limit: z.number().min(1).max(100).default(10).describe("Maximum number of schedules to return"), + }), + z.object({ + action: z.literal("run-schedule"), + scheduleId: z.string().describe("Persisted ops schedule identifier"), + }), + z.object({ + action: z.literal("read-delivery"), + deliveryId: z.string().describe("Persisted ops delivery identifier"), + }), + z.object({ + action: z.literal("list-deliveries"), + scheduleId: z.string().optional().describe("Optional schedule filter"), + workflow: z.enum(INVESTING_OPS_WORKFLOWS).optional().describe("Optional workflow filter"), + status: z.enum(["ok", "error"]).optional().describe("Optional run status filter"), + symbol: z.string().optional().describe("Optional symbol filter"), + limit: z.number().min(1).max(100).default(10).describe("Maximum number of delivery records to return"), + }), +]); + +export const opsAutomationTool: ToolDefinition = { + id: "zee:invest-ops", + category: "domain", + init: async () => ({ + description: `Manage unattended investing research-op schedules and inspect their delivery audit trail.`, + parameters: OpsAutomationParams, + execute: async (args, ctx): Promise => { + switch (args.action) { + case "create-schedule": { + ctx.metadata({ title: `Creating ops schedule for ${args.workflow}` }); + const schedule = createInvestingOpsSchedule({ + workflow: args.workflow, + scheduleMinutes: args.scheduleMinutes, + enabled: args.enabled, + symbol: args.symbol, + watchlistSymbols: args.watchlistSymbols, + format: args.format, + deliveryTarget: args.deliveryTarget, + }); + return { + title: "Investing Ops Automation", + metadata: { action: args.action, scheduleId: schedule.id, workflow: schedule.workflow }, + output: JSON.stringify(schedule, null, 2), + }; + } + case "update-schedule": { + ctx.metadata({ title: `Updating ops schedule ${args.scheduleId}` }); + const schedule = updateInvestingOpsSchedule({ + scheduleId: args.scheduleId, + enabled: args.enabled, + scheduleMinutes: args.scheduleMinutes, + symbol: args.symbol, + watchlistSymbols: args.watchlistSymbols, + format: args.format, + deliveryTarget: args.deliveryTarget, + }); + return { + title: "Investing Ops Automation", + metadata: { action: args.action, scheduleId: schedule.id, workflow: schedule.workflow }, + output: JSON.stringify(schedule, null, 2), + }; + } + case "read-schedule": { + ctx.metadata({ title: `Loading ops schedule ${args.scheduleId}` }); + const schedule = getInvestingOpsSchedule(args.scheduleId); + return { + title: "Investing Ops Automation", + metadata: { action: args.action, scheduleId: args.scheduleId, found: Boolean(schedule) }, + output: JSON.stringify(schedule ?? { error: `Ops schedule not found: ${args.scheduleId}` }, null, 2), + }; + } + case "list-schedules": { + ctx.metadata({ title: "Listing ops schedules" }); + const schedules = listInvestingOpsSchedules({ + workflow: args.workflow, + enabled: args.enabled, + symbol: args.symbol, + limit: args.limit, + }); + return { + title: "Investing Ops Automation", + metadata: { + action: args.action, + count: schedules.length, + workflow: args.workflow, + enabled: args.enabled, + symbol: args.symbol, + }, + output: JSON.stringify({ schedules, count: schedules.length }, null, 2), + }; + } + case "run-schedule": { + ctx.metadata({ title: `Running ops schedule ${args.scheduleId}` }); + const delivery = await runInvestingOpsSchedule({ + scheduleId: args.scheduleId, + }); + return { + title: "Investing Ops Automation", + metadata: { + action: args.action, + scheduleId: args.scheduleId, + deliveryId: delivery.id, + status: delivery.status, + }, + output: JSON.stringify(delivery, null, 2), + }; + } + case "read-delivery": { + ctx.metadata({ title: `Loading ops delivery ${args.deliveryId}` }); + const delivery = getInvestingOpsDeliveryRecord(args.deliveryId); + return { + title: "Investing Ops Automation", + metadata: { action: args.action, deliveryId: args.deliveryId, found: Boolean(delivery) }, + output: JSON.stringify(delivery ?? { error: `Ops delivery not found: ${args.deliveryId}` }, null, 2), + }; + } + case "list-deliveries": { + ctx.metadata({ title: "Listing ops deliveries" }); + const deliveries = listInvestingOpsDeliveryRecords({ + scheduleId: args.scheduleId, + workflow: args.workflow, + status: args.status, + symbol: args.symbol, + limit: args.limit, + }); + return { + title: "Investing Ops Automation", + metadata: { + action: args.action, + count: deliveries.length, + scheduleId: args.scheduleId, + workflow: args.workflow, + status: args.status, + symbol: args.symbol, + }, + output: JSON.stringify({ deliveries, count: deliveries.length }, null, 2), + }; + } + } + }, + }), +}; + // ============================================================================= // Nautilus Trading Tool // ============================================================================= @@ -1421,6 +1606,7 @@ export const INVESTING_TOOLS = [ valuationPacketTool, earningsPacketTool, portfolioBriefingsTool, + opsAutomationTool, researchPlannerTool, researchExecutorTool, researchArtifactsTool,