Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 76 additions & 24 deletions packages/opencode/src/trigger/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,24 @@ import { Log } from "../util/log"
export namespace Trigger {
const log = Log.create({ service: "trigger" })

const Interval = z.object({
type: z.literal("interval"),
interval: z.number().int().positive(),
})

const Once = z.object({
type: z.literal("once"),
at: z.number().int().nonnegative(),
})

const ScheduleInfo = z.discriminatedUnion("type", [Interval, Once])
const ScheduleInput = z.discriminatedUnion("type", [
Interval.extend({
interval: z.number().int().min(10).max(86_400_000),
}),
Once,
])

const Action = z.discriminatedUnion("type", [
z.object({
type: z.literal("command"),
Expand All @@ -40,10 +58,7 @@ export namespace Trigger {
export const Info = z
.object({
id: z.string(),
schedule: z.object({
type: z.literal("interval"),
interval: z.number().int().positive(),
}),
schedule: ScheduleInfo,
action: Action.optional(),
webhook_secret: z.string().min(1).optional(),
enabled: z.boolean(),
Expand All @@ -60,12 +75,40 @@ export namespace Trigger {
})
export type Info = z.infer<typeof Info>

export const CreateInput = z.object({
interval: z.number().int().min(10).max(86_400_000),
const CreateBase = {
action: Action.optional(),
webhook_secret: z.string().min(1).optional(),
})
export type CreateInput = z.infer<typeof CreateInput>
}

export const CreateInput = z.union([
z
.object({
interval: z.number().int().min(10).max(86_400_000),
...CreateBase,
})
.transform((input) => ({
...input,
schedule: {
type: "interval" as const,
interval: input.interval,
},
})),
z.object({
schedule: ScheduleInput,
...CreateBase,
}),
])
export type CreateInput =
| {
interval: number
action?: z.infer<typeof Action>
webhook_secret?: string
}
| {
schedule: z.input<typeof ScheduleInput>
action?: z.infer<typeof Action>
webhook_secret?: string
}

export const Event = {
Fired: BusEvent.define(
Expand Down Expand Up @@ -241,15 +284,26 @@ export namespace Trigger {

const run = Effect.fnUntraced(function* (item: Info, source: Source) {
const at = Date.now()
const next = {
...item,
runs: item.runs + 1,
time: {
...item.time,
last: at,
next: at + item.schedule.interval,
},
}
const next =
item.schedule.type === "interval"
? {
...item,
runs: item.runs + 1,
time: {
...item.time,
last: at,
next: at + item.schedule.interval,
},
}
: {
...item,
enabled: false,
runs: item.runs + 1,
time: {
...item.time,
last: at,
},
}
data.set(item.id, next)
yield* save(next)
yield* bus.publish(Event.Fired, {
Expand Down Expand Up @@ -308,19 +362,17 @@ export namespace Trigger {

const create = Effect.fn("Trigger.create")(function* (input: CreateInput) {
const now = Date.now()
const cfg = CreateInput.parse(input)
const item = {
id: `trg_${randomUUID().replaceAll("-", "")}`,
schedule: {
type: "interval" as const,
interval: input.interval,
},
action: input.action,
webhook_secret: input.webhook_secret,
schedule: cfg.schedule,
action: cfg.action,
webhook_secret: cfg.webhook_secret,
enabled: true,
runs: 0,
time: {
created: now,
next: now + input.interval,
next: cfg.schedule.type === "interval" ? now + cfg.schedule.interval : cfg.schedule.at,
},
} satisfies Info
data.set(item.id, item)
Expand Down
88 changes: 88 additions & 0 deletions packages/opencode/test/trigger/trigger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,94 @@ describe("trigger service", () => {
})
})

test("loads persisted one-shot trigger after instance disposal", async () => {
await using tmp = await tmpdir({ git: true })

const at = Date.now() + 5_000
const created = await Instance.provide({
directory: tmp.path,
fn: async () => {
return await Trigger.create({
schedule: {
type: "once",
at,
},
})
},
})

await Instance.provide({
directory: tmp.path,
fn: async () => Instance.dispose(),
})

await Instance.provide({
directory: tmp.path,
fn: async () => {
expect(await Trigger.get(created.id)).toEqual(created)
},
})
})

test("fires one-shot trigger once when due", async () => {
await using tmp = await tmpdir({ git: true })

await Instance.provide({
directory: tmp.path,
fn: async () => {
const at = Date.now() + 20
const item = await Trigger.create({
schedule: {
type: "once",
at,
},
})

await Bun.sleep(80)

expect(await Trigger.get(item.id)).toMatchObject({
id: item.id,
schedule: {
type: "once",
at,
},
runs: 1,
last: {
source: "schedule",
status: "success",
time: expect.any(Number),
},
})
},
})
})

test("does not repeat one-shot trigger after firing", async () => {
await using tmp = await tmpdir({ git: true })

await Instance.provide({
directory: tmp.path,
fn: async () => {
const item = await Trigger.create({
schedule: {
type: "once",
at: Date.now() + 20,
},
})

await Bun.sleep(80)
const first = await Trigger.get(item.id)

await Bun.sleep(80)
const next = await Trigger.get(item.id)

expect(first.runs).toBe(1)
expect(next.runs).toBe(1)
expect(next.last).toEqual(first.last)
},
})
})

test("fires command action for an idle session", async () => {
await using tmp = await tmpdir({ git: true })

Expand Down