Skip to content

Commit b22add2

Browse files
authored
refactor(core): publish sync events to global event stream (#22347)
1 parent 67aaeca commit b22add2

File tree

10 files changed

+597
-409
lines changed

10 files changed

+597
-409
lines changed

packages/app/src/context/global-sdk.tsx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,12 @@ export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleCo
155155
resetHeartbeat()
156156
streamErrorLogged = false
157157
const directory = event.directory ?? "global"
158-
const payload = event.payload
158+
if (event.payload.type === "sync") {
159+
continue
160+
}
161+
162+
const payload = event.payload as Event
163+
159164
const k = key(directory, payload)
160165
if (k) {
161166
const i = coalesced.get(k)

packages/opencode/src/bus/bus-event.ts

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,18 @@ export namespace BusEvent {
1616
}
1717

1818
export function payloads() {
19-
return z
20-
.discriminatedUnion(
21-
"type",
22-
registry
23-
.entries()
24-
.map(([type, def]) => {
25-
return z
26-
.object({
27-
type: z.literal(type),
28-
properties: def.properties,
29-
})
30-
.meta({
31-
ref: "Event" + "." + def.type,
32-
})
19+
return registry
20+
.entries()
21+
.map(([type, def]) => {
22+
return z
23+
.object({
24+
type: z.literal(type),
25+
properties: def.properties,
26+
})
27+
.meta({
28+
ref: "Event" + "." + def.type,
3329
})
34-
.toArray() as any,
35-
)
36-
.meta({
37-
ref: "Event",
3830
})
31+
.toArray()
3932
}
4033
}

packages/opencode/src/cli/cmd/tui/context/event.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ export function useEvent() {
88

99
function subscribe(handler: (event: Event) => void) {
1010
return sdk.event.on("event", (event) => {
11+
if (event.payload.type === "sync") {
12+
return
13+
}
14+
1115
// Special hack for truly global events
1216
if (event.directory === "global") {
1317
handler(event.payload)

packages/opencode/src/project/instance.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ const disposal = {
2121
all: undefined as Promise<void> | undefined,
2222
}
2323

24-
function emitDisposed(directory: string) {}
25-
2624
function boot(input: { directory: string; init?: () => Promise<any>; worktree?: string; project?: Project.Info }) {
2725
return iife(async () => {
2826
const ctx =

packages/opencode/src/server/instance/event.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
import z from "zod"
12
import { Hono } from "hono"
23
import { describeRoute, resolver } from "hono-openapi"
34
import { streamSSE } from "hono/streaming"
45
import { Log } from "@/util/log"
56
import { BusEvent } from "@/bus/bus-event"
7+
import { SyncEvent } from "@/sync"
68
import { Bus } from "@/bus"
79
import { AsyncQueue } from "../../util/queue"
810

@@ -20,7 +22,11 @@ export const EventRoutes = () =>
2022
description: "Event stream",
2123
content: {
2224
"text/event-stream": {
23-
schema: resolver(BusEvent.payloads()),
25+
schema: resolver(
26+
z.union(BusEvent.payloads()).meta({
27+
ref: "Event",
28+
}),
29+
),
2430
},
2531
},
2632
},

packages/opencode/src/server/instance/global.ts

Lines changed: 1 addition & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ export const GlobalRoutes = lazy(() =>
109109
directory: z.string(),
110110
project: z.string().optional(),
111111
workspace: z.string().optional(),
112-
payload: BusEvent.payloads(),
112+
payload: z.union([...BusEvent.payloads(), ...SyncEvent.payloads()]),
113113
})
114114
.meta({
115115
ref: "GlobalEvent",
@@ -135,52 +135,6 @@ export const GlobalRoutes = lazy(() =>
135135
})
136136
},
137137
)
138-
.get(
139-
"/sync-event",
140-
describeRoute({
141-
summary: "Subscribe to global sync events",
142-
description: "Get global sync events",
143-
operationId: "global.sync-event.subscribe",
144-
responses: {
145-
200: {
146-
description: "Event stream",
147-
content: {
148-
"text/event-stream": {
149-
schema: resolver(
150-
z
151-
.object({
152-
payload: SyncEvent.payloads(),
153-
})
154-
.meta({
155-
ref: "SyncEvent",
156-
}),
157-
),
158-
},
159-
},
160-
},
161-
},
162-
}),
163-
async (c) => {
164-
log.info("global sync event connected")
165-
c.header("Cache-Control", "no-cache, no-transform")
166-
c.header("X-Accel-Buffering", "no")
167-
c.header("X-Content-Type-Options", "nosniff")
168-
return streamEvents(c, (q) => {
169-
return SyncEvent.subscribeAll(({ def, event }) => {
170-
// TODO: don't pass def, just pass the type (and it should
171-
// be versioned)
172-
q.push(
173-
JSON.stringify({
174-
payload: {
175-
...event,
176-
type: SyncEvent.versionedType(def.type, def.version),
177-
},
178-
}),
179-
)
180-
})
181-
})
182-
},
183-
)
184138
.get(
185139
"/config",
186140
describeRoute({

packages/opencode/src/sync/index.ts

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ import z from "zod"
22
import type { ZodObject } from "zod"
33
import { EventEmitter } from "events"
44
import { Database, eq } from "@/storage/db"
5+
import { GlobalBus } from "@/bus/global"
56
import { Bus as ProjectBus } from "@/bus"
67
import { BusEvent } from "@/bus/bus-event"
8+
import { Instance } from "@/project/instance"
79
import { EventSequenceTable, EventTable } from "./event.sql"
10+
import { WorkspaceContext } from "@/control-plane/workspace-context"
811
import { EventID } from "./schema"
912
import { Flag } from "@/flag/flag"
1013

@@ -37,8 +40,6 @@ export namespace SyncEvent {
3740
let frozen = false
3841
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
3942

40-
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
41-
4243
export function reset() {
4344
frozen = false
4445
projectors = undefined
@@ -140,11 +141,6 @@ export namespace SyncEvent {
140141
}
141142

142143
Database.effect(() => {
143-
Bus.emit("event", {
144-
def,
145-
event,
146-
})
147-
148144
if (options?.publish) {
149145
const result = convertEvent(def.type, event.data)
150146
if (result instanceof Promise) {
@@ -154,6 +150,17 @@ export namespace SyncEvent {
154150
} else {
155151
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
156152
}
153+
154+
GlobalBus.emit("event", {
155+
directory: Instance.directory,
156+
project: Instance.project.id,
157+
workspace: WorkspaceContext.workspaceID,
158+
payload: {
159+
type: "sync",
160+
name: versionedType(def.type, def.version),
161+
...event,
162+
},
163+
})
157164
}
158165
})
159166
})
@@ -235,31 +242,23 @@ export namespace SyncEvent {
235242
})
236243
}
237244

238-
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
239-
Bus.on("event", handler)
240-
return () => Bus.off("event", handler)
241-
}
242-
243245
export function payloads() {
244-
return z
245-
.union(
246-
registry
247-
.entries()
248-
.map(([type, def]) => {
249-
return z
250-
.object({
251-
type: z.literal(type),
252-
aggregate: z.literal(def.aggregate),
253-
data: def.schema,
254-
})
255-
.meta({
256-
ref: "SyncEvent" + "." + def.type,
257-
})
246+
return registry
247+
.entries()
248+
.map(([type, def]) => {
249+
return z
250+
.object({
251+
type: z.literal("sync"),
252+
name: z.literal(type),
253+
id: z.string(),
254+
seq: z.number(),
255+
aggregateID: z.literal(def.aggregate),
256+
data: def.schema,
257+
})
258+
.meta({
259+
ref: "SyncEvent" + "." + def.type,
258260
})
259-
.toArray() as any,
260-
)
261-
.meta({
262-
ref: "SyncEvent",
263261
})
262+
.toArray()
264263
}
265264
}

packages/sdk/js/src/v2/gen/sdk.gen.ts

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ import type {
5151
GlobalDisposeResponses,
5252
GlobalEventResponses,
5353
GlobalHealthResponses,
54-
GlobalSyncEventSubscribeResponses,
5554
GlobalUpgradeErrors,
5655
GlobalUpgradeResponses,
5756
InstanceDisposeResponses,
@@ -237,20 +236,6 @@ class HeyApiRegistry<T> {
237236
}
238237
}
239238

240-
export class SyncEvent extends HeyApiClient {
241-
/**
242-
* Subscribe to global sync events
243-
*
244-
* Get global sync events
245-
*/
246-
public subscribe<ThrowOnError extends boolean = false>(options?: Options<never, ThrowOnError>) {
247-
return (options?.client ?? this.client).sse.get<GlobalSyncEventSubscribeResponses, unknown, ThrowOnError>({
248-
url: "/global/sync-event",
249-
...options,
250-
})
251-
}
252-
}
253-
254239
export class Config extends HeyApiClient {
255240
/**
256241
* Get global configuration
@@ -350,11 +335,6 @@ export class Global extends HeyApiClient {
350335
})
351336
}
352337

353-
private _syncEvent?: SyncEvent
354-
get syncEvent(): SyncEvent {
355-
return (this._syncEvent ??= new SyncEvent({ client: this.client }))
356-
}
357-
358338
private _config?: Config
359339
get config(): Config {
360340
return (this._config ??= new Config({ client: this.client }))

0 commit comments

Comments
 (0)