feat(ai): add streaming support to Conversation #1433
Codecov Report: ❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##           master    #1433      +/-   ##
==========================================
+ Coverage   68.76%   68.95%   +0.19%
==========================================
  Files         361      362       +1
  Lines       27776    28069     +293
==========================================
+ Hits        19099    19356     +257
- Misses       7831     7855      +24
- Partials      846      858      +12
```

View full report in Codecov by Sentry.
Pull request overview
This pull request adds first-class streaming support to the AI conversation/provider contracts and implements a concrete streaming response type, enabling providers (notably OpenAI) to emit incremental events and optionally render them as an HTTP streaming response.
Changes:
- Extend `contracts/ai` with streaming types (`StreamEvent`, `StreamableResponse`, `StreamOption`) and add `Stream` methods to `Conversation` and `Provider`.
- Implement `ai/streamable_response.go` (event buffering + `Each`/`Then` + SSE-style `HTTPResponse`) and wire `Conversation.Stream()` to update history on completion.
- Refactor conversation option handling from `map[string]any` to a typed `*contractsai.Options` struct, updating helpers, mocks, and tests.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| mocks/ai/StreamOption.go | Adds generated mock for the new StreamOption type. |
| mocks/ai/StreamableResponse.go | Adds generated mock for the new StreamableResponse contract. |
| mocks/ai/RenderFunc.go | Adds generated mock for RenderFunc used by streaming HTTP rendering. |
| mocks/ai/Provider.go | Updates provider mock to include the new Stream method. |
| mocks/ai/Option.go | Updates option mock to match the new typed *ai.Options signature. |
| mocks/ai/Conversation.go | Updates conversation mock to include the new Stream method. |
| contracts/ai/stream.go | Introduces streaming contracts (events/options/response interface). |
| contracts/ai/provider.go | Extends Provider contract with Stream(ctx, prompt) method. |
| contracts/ai/option.go | Replaces map-based options with typed Options and updates Option signature. |
| contracts/ai/ai.go | Extends Conversation contract with Stream(input) method. |
| ai/streamable_response.go | Implements StreamableResponse including Each, Then, and HTTPResponse. |
| ai/provider_test.go | Updates test provider to satisfy the new Provider interface. |
| ai/option.go | Updates option helpers to populate typed options; adds stream option helpers. |
| ai/option_test.go | Updates option tests for the typed options refactor. |
| ai/openai/provider.go | Adds OpenAI streaming implementation using NewStreaming and emits stream events. |
| ai/conversation.go | Adds Conversation.Stream() and appends final messages via Then(). |
| ai/application.go | Switches conversation option application to typed *contractsai.Options. |
ai/streamable_response.go
Outdated
```go
var (
	_ contractsai.StreamableResponse = (*streamableResponse)(nil)

	errStreamRunnerRequired = stderrors.New("ai stream runner is required")
)
```
Move this error to `errors/list.go`.
```go
r.mu.Lock()
r.response = response
r.err = err
r.thenErr = nil
if err == nil && response != nil {
	callbacks = append(callbacks, r.thenCallbacks...)
}
r.thenCallbacks = nil
r.mu.Unlock()
```
complete() snapshots thenCallbacks and clears r.thenCallbacks while r.finished is still false (it’s only set later at the end). If Then() is called concurrently after the snapshot/clear (lines 251-259) but before finished becomes true (line 274), it will append to r.thenCallbacks and that callback will never be executed (and may also be overwritten/ignored when err/thenErr are finalized). Consider marking the stream as finished (or introducing a completing/done state) under the lock before releasing it, and/or looping to drain any callbacks appended while callbacks are executing so Then() is never lost during completion.
Summary
- `Conversation.Stream()` sends input to the AI provider and returns a `StreamableResponse` for consuming tokens incrementally
- `StreamableResponse.HTTPResponse()` pipes events to an HTTP response as Server-Sent Events by default, with optional status code and render overrides
- `Option` is refactored from `func(map[string]any)` to `func(*Options)` for compile-time safety; a separate `StreamOption` pair configures HTTP rendering

Closes goravel/goravel#917
Why

The `Conversation` interface previously only supported blocking `Prompt()` calls that returned the full response at once. Adding `Stream()` lets applications forward tokens to the client as the model generates them, which is essential for chat-like experiences where users expect output to appear in real time.

`StreamableResponse` provides three methods: `Each` for iterating raw events, `Then` for registering a post-stream callback (e.g. to persist the conversation history after the stream finishes), and `HTTPResponse` for wiring the stream into a Goravel HTTP handler with SSE headers applied automatically.