|
| 1 | +import { startWorkspaceSyncing } from "@/control-plane/workspace" |
| 2 | +import * as InstanceState from "@/effect/instance-state" |
| 3 | +import { Database, asc, and, eq, lte, not, or } from "@/storage" |
| 4 | +import { SyncEvent } from "@/sync" |
| 5 | +import { EventTable } from "@/sync/event.sql" |
| 6 | +import { Effect, Layer, Schema } from "effect" |
| 7 | +import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi" |
| 8 | +import { Authorization } from "./auth" |
| 9 | + |
| 10 | +const root = "/sync" |
| 11 | +const ReplayEvent = Schema.Struct({ |
| 12 | + id: Schema.String, |
| 13 | + aggregateID: Schema.String, |
| 14 | + seq: Schema.Number, |
| 15 | + type: Schema.String, |
| 16 | + data: Schema.Record(Schema.String, Schema.Unknown), |
| 17 | +}).annotate({ identifier: "SyncReplayEvent" }) |
| 18 | +const ReplayPayload = Schema.Struct({ |
| 19 | + directory: Schema.String, |
| 20 | + events: Schema.NonEmptyArray(ReplayEvent), |
| 21 | +}).annotate({ identifier: "SyncReplayInput" }) |
| 22 | +const ReplayResponse = Schema.Struct({ |
| 23 | + sessionID: Schema.String, |
| 24 | +}).annotate({ identifier: "SyncReplayResponse" }) |
| 25 | +const HistoryPayload = Schema.Record(Schema.String, Schema.Number) |
| 26 | +const HistoryEvent = Schema.Struct({ |
| 27 | + id: Schema.String, |
| 28 | + aggregate_id: Schema.String, |
| 29 | + seq: Schema.Number, |
| 30 | + type: Schema.String, |
| 31 | + data: Schema.Record(Schema.String, Schema.Unknown), |
| 32 | +}).annotate({ identifier: "SyncHistoryEvent" }) |
| 33 | + |
| 34 | +export const SyncPaths = { |
| 35 | + start: `${root}/start`, |
| 36 | + replay: `${root}/replay`, |
| 37 | + history: `${root}/history`, |
| 38 | +} as const |
| 39 | + |
| 40 | +export const SyncApi = HttpApi.make("sync") |
| 41 | + .add( |
| 42 | + HttpApiGroup.make("sync") |
| 43 | + .add( |
| 44 | + HttpApiEndpoint.post("start", SyncPaths.start, { |
| 45 | + success: Schema.Boolean, |
| 46 | + }).annotateMerge( |
| 47 | + OpenApi.annotations({ |
| 48 | + identifier: "sync.start", |
| 49 | + summary: "Start workspace sync", |
| 50 | + description: "Start sync loops for workspaces in the current project that have active sessions.", |
| 51 | + }), |
| 52 | + ), |
| 53 | + HttpApiEndpoint.post("replay", SyncPaths.replay, { |
| 54 | + payload: ReplayPayload, |
| 55 | + success: ReplayResponse, |
| 56 | + }).annotateMerge( |
| 57 | + OpenApi.annotations({ |
| 58 | + identifier: "sync.replay", |
| 59 | + summary: "Replay sync events", |
| 60 | + description: "Validate and replay a complete sync event history.", |
| 61 | + }), |
| 62 | + ), |
| 63 | + HttpApiEndpoint.post("history", SyncPaths.history, { |
| 64 | + payload: HistoryPayload, |
| 65 | + success: Schema.Array(HistoryEvent), |
| 66 | + }).annotateMerge( |
| 67 | + OpenApi.annotations({ |
| 68 | + identifier: "sync.history.list", |
| 69 | + summary: "List sync events", |
| 70 | + description: |
| 71 | + "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.", |
| 72 | + }), |
| 73 | + ), |
| 74 | + ) |
| 75 | + .annotateMerge( |
| 76 | + OpenApi.annotations({ |
| 77 | + title: "sync", |
| 78 | + description: "Experimental HttpApi sync routes.", |
| 79 | + }), |
| 80 | + ) |
| 81 | + .middleware(Authorization), |
| 82 | + ) |
| 83 | + .annotateMerge( |
| 84 | + OpenApi.annotations({ |
| 85 | + title: "opencode experimental HttpApi", |
| 86 | + version: "0.0.1", |
| 87 | + description: "Experimental HttpApi surface for selected instance routes.", |
| 88 | + }), |
| 89 | + ) |
| 90 | + |
| 91 | +export const syncHandlers = Layer.unwrap( |
| 92 | + Effect.gen(function* () { |
| 93 | + const start = Effect.fn("SyncHttpApi.start")(function* () { |
| 94 | + startWorkspaceSyncing((yield* InstanceState.context).project.id) |
| 95 | + return true |
| 96 | + }) |
| 97 | + |
| 98 | + const replay = Effect.fn("SyncHttpApi.replay")(function* (ctx: { payload: typeof ReplayPayload.Type }) { |
| 99 | + const payload = Schema.decodeUnknownSync(ReplayPayload)(ctx.payload) |
| 100 | + const events: SyncEvent.SerializedEvent[] = payload.events.map((event) => ({ |
| 101 | + id: event.id, |
| 102 | + aggregateID: event.aggregateID, |
| 103 | + seq: event.seq, |
| 104 | + type: event.type, |
| 105 | + data: { ...event.data }, |
| 106 | + })) |
| 107 | + SyncEvent.replayAll(events) |
| 108 | + return { sessionID: events[0].aggregateID } |
| 109 | + }) |
| 110 | + |
| 111 | + const history = Effect.fn("SyncHttpApi.history")(function* (ctx: { payload: typeof HistoryPayload.Type }) { |
| 112 | + const exclude = Object.entries(ctx.payload) |
| 113 | + return Database.use((db) => |
| 114 | + db |
| 115 | + .select() |
| 116 | + .from(EventTable) |
| 117 | + .where( |
| 118 | + exclude.length > 0 |
| 119 | + ? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!) |
| 120 | + : undefined, |
| 121 | + ) |
| 122 | + .orderBy(asc(EventTable.seq)) |
| 123 | + .all(), |
| 124 | + ) |
| 125 | + }) |
| 126 | + |
| 127 | + return HttpApiBuilder.group(SyncApi, "sync", (handlers) => |
| 128 | + handlers.handle("start", start).handle("replay", replay).handle("history", history), |
| 129 | + ) |
| 130 | + }), |
| 131 | +) |
0 commit comments