# AI LLM Streaming
Atmosphere 4.0 provides a first-class SPI for streaming AI/LLM responses token-by-token to browsers over WebSocket. The atmosphere-ai module defines the core interfaces, and optional adapter modules bridge popular AI frameworks.
```
Browser ←—WebSocket—→ @ManagedService → StreamingSession → AiStreamingAdapter
                                              ↑
                                     Direct write to
                                     AtmosphereResource
                                     (bypasses broadcaster)
```
When a user sends a prompt, the @Message handler creates a StreamingSession and passes it to an adapter. The adapter calls the AI model's streaming API, forwarding each token to session.send(token). Tokens are written directly to the WebSocket connection — they bypass the broadcaster to avoid re-triggering @Message handlers.
| Module | Artifact | Purpose |
|---|---|---|
| Core SPI | `atmosphere-ai` | `StreamingSession`, `AiStreamingAdapter`, `AiConfig`, built-in LLM client |
| Spring AI | `atmosphere-spring-ai` | Adapter for Spring AI `ChatClient` (Flux-based) |
| LangChain4j | `atmosphere-langchain4j` | Adapter for LangChain4j `StreamingChatLanguageModel` |
| Embabel | `atmosphere-embabel` | Adapter for the Embabel Agent Framework (Kotlin) |
The simplest path uses atmosphere-ai with its built-in OpenAiCompatibleClient. This works with OpenAI, Google Gemini, Ollama, Azure OpenAI, and any OpenAI-compatible endpoint — with zero additional dependencies.
```xml
<dependency>
  <groupId>org.atmosphere</groupId>
  <artifactId>atmosphere-ai</artifactId>
  <version>4.0.1</version>
</dependency>
```

Set environment variables:
| Variable | Description | Default |
|---|---|---|
| `LLM_MODE` | `remote` (cloud API) or `local` (Ollama) | `remote` |
| `LLM_MODEL` | Model name: `gemini-2.5-flash`, `gpt-4o`, `llama3.2`, etc. | `gemini-2.5-flash` |
| `LLM_API_KEY` | API key (or `GEMINI_API_KEY` for Gemini) | — |
| `LLM_BASE_URL` | Override the endpoint (auto-detected if omitted) | auto |
The endpoint is auto-detected from the model name:

- `gemini-*` → `https://generativelanguage.googleapis.com/v1beta/openai`
- `gpt-*`, `o1-*`, `o3-*` → `https://api.openai.com/v1`
- `local` mode → `http://localhost:11434/v1` (Ollama)
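The detection rules above can be sketched in plain Java. `BaseUrlResolver.resolveBaseUrl` is a hypothetical helper written for illustration, not the actual `AiConfig` logic:

```java
// Illustrative sketch of the documented auto-detection rules.
// BaseUrlResolver is a hypothetical helper, not part of the real AiConfig API.
class BaseUrlResolver {

    static String resolveBaseUrl(String mode, String model) {
        if ("local".equals(mode)) {
            return "http://localhost:11434/v1"; // Ollama
        }
        if (model.startsWith("gemini-")) {
            return "https://generativelanguage.googleapis.com/v1beta/openai";
        }
        if (model.startsWith("gpt-") || model.startsWith("o1-") || model.startsWith("o3-")) {
            return "https://api.openai.com/v1";
        }
        // Unknown model: an explicit LLM_BASE_URL is required
        throw new IllegalArgumentException("Set LLM_BASE_URL explicitly for model: " + model);
    }
}
```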
Or configure programmatically in Spring Boot:
```java
@Configuration
public class LlmConfig {

    @Bean
    AiConfig.LlmSettings llmSettings(@Value("${llm.mode:remote}") String mode,
                                     @Value("${llm.model:gemini-2.5-flash}") String model,
                                     @Value("${llm.api-key:}") String apiKey,
                                     @Value("${llm.base-url:}") String baseUrl) {
        return AiConfig.configure(mode, model, apiKey, baseUrl.isBlank() ? null : baseUrl);
    }
}
```

The corresponding `application.yml`:
```yaml
llm:
  mode: ${LLM_MODE:remote}
  model: ${LLM_MODEL:gemini-2.5-flash}
  api-key: ${LLM_API_KEY:${GEMINI_API_KEY:}}
  base-url: ${LLM_BASE_URL:}
```

On the server, a `@ManagedService` endpoint creates a session and hands the prompt to the built-in client:

```java
@ManagedService(path = "/ai-chat")
public class AiChat {

    @Inject
    private AtmosphereResource resource;

    @Message
    public void onMessage(String prompt) {
        var settings = AiConfig.get();
        var session = StreamingSessions.start(resource);
        var request = ChatCompletionRequest.builder(settings.model())
                .system("You are a helpful assistant.")
                .user(prompt)
                .build();
        // Stream on a virtual thread — non-blocking
        Thread.startVirtualThread(() -> settings.client().streamChatCompletion(request, session));
    }
}
```

On the client, subscribe with `atmosphere.js`:

```javascript
import { subscribeStreaming } from 'atmosphere.js';

const handle = await subscribeStreaming(atmosphere, {
  url: '/ai-chat',
  transport: 'websocket',
}, {
  onToken: (token) => output.textContent += token,
  onProgress: (msg) => status.textContent = msg,
  onComplete: () => console.log('Done'),
  onError: (err) => console.error(err),
});

handle.send('Explain virtual threads in Java 21');
```

Or with React:
```jsx
import { useStreaming } from 'atmosphere.js/react';

function AiChat() {
  const { fullText, isStreaming, send } = useStreaming({
    request: { url: '/ai-chat', transport: 'websocket' },
  });
  return (
    <div>
      <button onClick={() => send('What is Atmosphere?')} disabled={isStreaming}>Ask</button>
      <p>{fullText}</p>
    </div>
  );
}
```

Every message is a JSON object written directly to the WebSocket:
```json
{"type":"token","data":"Hello","sessionId":"abc-123","seq":1}
{"type":"token","data":" world","sessionId":"abc-123","seq":2}
{"type":"progress","data":"Thinking...","sessionId":"abc-123","seq":3}
{"type":"metadata","data":"{\"model\":\"gemini-2.5-flash\"}","sessionId":"abc-123","seq":4}
{"type":"complete","data":"","sessionId":"abc-123","seq":5}
```

| Type | Description |
|---|---|
| `token` | A single token/chunk from the LLM |
| `progress` | A human-readable status update (e.g., "Searching documents...") |
| `metadata` | Structured metadata (model name, usage stats) |
| `complete` | Stream finished successfully |
| `error` | Stream failed; `data` contains the error message |
The `seq` field is a monotonically increasing counter used for deduplication on reconnect.
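A client can use `seq` to drop frames replayed after a reconnect. Here is an illustrative sketch of that bookkeeping (not code from atmosphere.js, which handles this internally):

```java
// Sketch of seq-based deduplication: process a frame only if its seq is
// strictly greater than the highest seq seen so far, so frames replayed
// after a reconnect are silently dropped.
class SeqDeduplicator {

    private long lastSeq = 0;

    /** Returns true if the frame is new and should be processed. */
    boolean accept(long seq) {
        if (seq <= lastSeq) {
            return false; // duplicate or replayed frame
        }
        lastSeq = seq;
        return true;
    }
}
```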
The `StreamingSession` interface is the bridge between any AI framework and Atmosphere:
```java
public interface StreamingSession extends AutoCloseable {
    String sessionId();
    void send(String token);                      // push a token
    void sendMetadata(String key, Object value);  // structured metadata
    void progress(String message);                // status update
    void complete();                              // success
    void complete(String summary);                // success with summary
    void error(Throwable t);                      // failure
    boolean isClosed();
}
```

Create one with `StreamingSessions.start(resource)`. The `DefaultStreamingSession` implementation writes JSON directly to the `AtmosphereResource`, bypassing the broadcaster.
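To illustrate the contract, here is a minimal in-memory session that records frames in a list instead of writing to an `AtmosphereResource`. The frame layout mirrors the wire format shown above; the class itself is a test-oriented sketch, not the real `DefaultStreamingSession`:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory sketch of the StreamingSession contract: frames are
// collected in a list instead of being written to the WebSocket. The JSON
// layout mirrors the wire format (type, data, sessionId, seq).
class InMemoryStreamingSession implements AutoCloseable {

    private final String sessionId;
    private final List<String> frames = new ArrayList<>();
    private long seq = 0;
    private boolean closed = false;

    InMemoryStreamingSession(String sessionId) { this.sessionId = sessionId; }

    private void write(String type, String data) {
        if (closed) return; // ignore writes after terminal frames
        frames.add(String.format(
                "{\"type\":\"%s\",\"data\":\"%s\",\"sessionId\":\"%s\",\"seq\":%d}",
                type, data, sessionId, ++seq));
    }

    void send(String token)       { write("token", token); }
    void progress(String message) { write("progress", message); }
    void complete()               { write("complete", ""); closed = true; }
    void error(Throwable t)       { write("error", t.getMessage()); closed = true; }
    boolean isClosed()            { return closed; }
    List<String> frames()         { return frames; }

    @Override
    public void close()           { closed = true; }
}
```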
Implement this to bridge any AI framework:
```java
public interface AiStreamingAdapter<T> {
    String name();
    void stream(T request, StreamingSession session);
}
```

Example for a custom framework:

```java
public class MyAdapter implements AiStreamingAdapter<MyPrompt> {

    @Override
    public String name() { return "my-ai"; }

    @Override
    public void stream(MyPrompt request, StreamingSession session) {
        myModel.streamTokens(request,
                token -> session.send(token),
                () -> session.complete(),
                err -> session.error(err));
    }
}
```

The `atmosphere-spring-ai` module bridges Spring AI's `ChatClient` Flux-based streaming to `StreamingSession`.
```xml
<dependency>
  <groupId>org.atmosphere</groupId>
  <artifactId>atmosphere-spring-ai</artifactId>
  <version>4.0.1</version>
</dependency>
```

```java
@ManagedService(path = "/ai-chat")
public class AiChat {

    @Inject
    private AtmosphereResource resource;

    @Inject
    private ChatClient chatClient;

    private final SpringAiStreamingAdapter adapter = new SpringAiStreamingAdapter();

    @Message
    public void onMessage(String prompt) {
        var session = StreamingSessions.start(resource);
        adapter.stream(chatClient, prompt, session);
    }
}
```

The adapter subscribes to `chatClient.prompt(prompt).stream().chatResponse()` and pushes each `ChatResponse` token through the session.
The `atmosphere-langchain4j` module bridges LangChain4j's callback-based `StreamingChatLanguageModel` to `StreamingSession`.
```xml
<dependency>
  <groupId>org.atmosphere</groupId>
  <artifactId>atmosphere-langchain4j</artifactId>
  <version>4.0.1</version>
</dependency>
```

```java
@ManagedService(path = "/ai-chat")
public class AiChat {

    @Inject
    private AtmosphereResource resource;

    private final LangChain4jStreamingAdapter adapter = new LangChain4jStreamingAdapter();

    @Message
    public void onMessage(String prompt) {
        var session = StreamingSessions.start(resource);
        var model = OpenAiStreamingChatModel.builder()
                .baseUrl(AiConfig.get().baseUrl())
                .apiKey(AiConfig.get().client().apiKey())
                .modelName(AiConfig.get().model())
                .build();
        var chatRequest = ChatRequest.builder()
                .messages(List.of(UserMessage.from(prompt)))
                .build();
        Thread.startVirtualThread(() -> adapter.stream(model, chatRequest, session));
    }
}
```

`AtmosphereStreamingResponseHandler` converts LangChain4j's `onNext`/`onComplete`/`onError` callbacks to `StreamingSession` calls.
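The handler follows the usual callback-to-session bridge pattern: each callback from the streaming model maps to exactly one session call. The sketch below shows the shape of that mapping with stand-in types; none of these names are the real LangChain4j or Atmosphere classes:

```java
// Callback-to-session bridge sketch. Sink stands in for StreamingSession;
// streamWords stands in for a callback-style streaming model. Each callback
// maps to one session call: onNext -> send, onComplete -> complete,
// onError -> error.
class CallbackBridge {

    interface Sink {
        void send(String token);
        void complete();
        void error(Throwable t);
    }

    /** Feeds each word of the text through the bridge as a "token". */
    static void streamWords(String text, Sink sink) {
        try {
            for (String word : text.split(" ")) {
                sink.send(word);  // onNext(token)  -> session.send(token)
            }
            sink.complete();      // onComplete(..) -> session.complete()
        } catch (RuntimeException e) {
            sink.error(e);        // onError(t)     -> session.error(t)
        }
    }
}
```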
The `atmosphere-embabel` module bridges Embabel's `OutputChannel` pattern for agentic AI. The Embabel `AgentPlatform` handles planning, tool calling, and orchestration; Atmosphere streams the agent events to the browser.
```xml
<dependency>
  <groupId>org.atmosphere</groupId>
  <artifactId>atmosphere-embabel</artifactId>
  <version>4.0.1</version>
</dependency>
<dependency>
  <groupId>com.embabel.agent</groupId>
  <artifactId>embabel-agent-platform-autoconfigure</artifactId>
  <version>0.3.4</version>
</dependency>
```

Define an Embabel agent:
```java
@Agent(name = "chat-assistant", description = "Answers user questions")
public class ChatAssistantAgent {

    @Action(description = "Answer the user's question")
    public String answer(String userMessage) {
        return "Answer clearly and concisely: " + userMessage;
    }
}
```

Run it through Atmosphere:
```java
var session = StreamingSessions.start(resource);
var agent = agentPlatform.agents().stream()
        .filter(a -> "chat-assistant".equals(a.getName()))
        .findFirst().orElseThrow();
var agentRequest = new AgentRequest("chat-assistant", channel -> {
    var options = ProcessOptions.DEFAULT.withOutputChannel(channel);
    agentPlatform.runAgentFrom(agent, options, Map.of("userMessage", prompt));
    return Unit.INSTANCE;
});
Thread.startVirtualThread(() -> adapter.stream(agentRequest, session));
```

`AtmosphereOutputChannel` translates agent events (thinking, tool calls, results) into `StreamingSession` calls with the appropriate `progress`, `token`, and `complete` messages.
```javascript
import { atmosphere, subscribeStreaming } from 'atmosphere.js';

const handle = await subscribeStreaming(atmosphere, {
  url: '/ai-chat',
  transport: 'websocket',
}, {
  onToken: (token) => { /* append token to UI */ },
  onProgress: (msg) => { /* show status */ },
  onMetadata: (meta) => { /* model info, usage */ },
  onComplete: () => { /* done */ },
  onError: (err) => { /* handle error */ },
});

handle.send('Your prompt here');
handle.close(); // disconnect when done
```

React:

```jsx
import { useStreaming } from 'atmosphere.js/react';

function AiChat() {
  const { fullText, tokens, isStreaming, progress, metadata, error, send, reset } = useStreaming({
    request: { url: '/ai-chat', transport: 'websocket' },
  });
  return (
    <div>
      <button onClick={() => send('What is Atmosphere?')} disabled={isStreaming}>Ask</button>
      {isStreaming && <span>{progress ?? 'Generating…'}</span>}
      <p>{fullText}</p>
      <button onClick={reset}>Clear</button>
    </div>
  );
}
```

Vue:

```vue
<script setup>
import { useStreaming } from 'atmosphere.js/vue';

const { fullText, isStreaming, send } = useStreaming(
  { url: '/ai-chat', transport: 'websocket' },
);
</script>

<template>
  <button @click="send('What is Atmosphere?')" :disabled="isStreaming">Ask</button>
  <p>{{ fullText }}</p>
</template>
```

Svelte:

```svelte
<script>
import { createStreamingStore } from 'atmosphere.js/svelte';

const { store: ai, send } = createStreamingStore(
  { url: '/ai-chat', transport: 'websocket' },
);
</script>

<button on:click={() => send('What is Atmosphere?')} disabled={$ai.isStreaming}>Ask</button>
<p>{$ai.fullText}</p>
```

| Sample | AI Framework | Source |
|---|---|---|
| AI Chat (built-in client) | `atmosphere-ai` | `spring-boot-ai-chat` |
| LangChain4j Chat | `atmosphere-langchain4j` | `spring-boot-langchain4j-chat` |
| Embabel Agent Chat | `atmosphere-embabel` | `spring-boot-embabel-chat` |
Instead of streaming AI responses through a dedicated endpoint, you can add an LLM as a virtual member of a Room. When anyone broadcasts a message, the AI receives it, generates a response, and broadcasts the reply back — just like a human participant.
A `VirtualRoomMember` is a room participant with no WebSocket or HTTP connection. It participates purely through an `onMessage` callback:
```java
public interface VirtualRoomMember {
    String id();
    void onMessage(Room room, String senderId, Object message);
    default Map<String, Object> metadata() { return Map.of(); }
}
```

When `room.broadcast(message)` is called, the room dispatches the message to all connected clients and all virtual members. Virtual members can call `room.broadcast()` themselves to reply. Presence events fire for virtual members just like for real connections; `PresenceEvent.isVirtual()` returns `true` for them.
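The dispatch and loop-prevention behavior can be sketched with a toy room (all names here are illustrative, not the real Room API): each broadcast fans out to every virtual member except the sender, which is what stops an AI member from replying to its own reply.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy dispatch sketch (not the real Room API): broadcast() fans a message
// out to every virtual member except the sender. Skipping the sender is the
// loop prevention that keeps an LLM member from answering itself forever.
class MiniRoom {

    interface Member {
        String id();
        void onMessage(MiniRoom room, String senderId, String message);
    }

    private final Map<String, Member> virtualMembers = new LinkedHashMap<>();
    final StringBuilder log = new StringBuilder();

    void joinVirtual(Member m) { virtualMembers.put(m.id(), m); }

    void broadcast(String senderId, String message) {
        log.append(senderId).append(": ").append(message).append('\n');
        for (Member m : virtualMembers.values()) {
            if (!m.id().equals(senderId)) { // loop prevention
                m.onMessage(this, senderId, message);
            }
        }
    }
}
```

With this check in place, an "assistant" member that re-broadcasts a reply produces exactly one response per human message instead of an infinite loop.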
The `atmosphere-ai` module includes `LlmRoomMember`, a ready-to-use implementation backed by any OpenAI-compatible model:

```java
var settings = AiConfig.get();
var assistant = new LlmRoomMember("assistant", settings.client(), settings.model());
room.joinVirtual(assistant);

// Now any room.broadcast("What is the weather?") triggers an LLM response
// streamed back to all human members via room.broadcast()
```

With a custom system prompt:
```java
var assistant = new LlmRoomMember("code-reviewer", client, "gpt-4o",
        "You are a senior code reviewer. Be concise and constructive.");
room.joinVirtual(assistant);
```

A complete room endpoint:

```java
@RoomService(path = "/ai-room")
public class AiRoom {

    @Inject
    private AtmosphereResource resource;

    @Ready
    public void onReady(Room room) {
        // Add the AI assistant if not already present
        if (room.virtualMembers().isEmpty()) {
            var settings = AiConfig.get();
            room.joinVirtual(new LlmRoomMember("assistant",
                    settings.client(), settings.model()));
        }
    }

    @Message(encoders = {JacksonEncoder.class}, decoders = {JacksonDecoder.class})
    public ChatMessage onMessage(ChatMessage msg) {
        return msg; // broadcast to all humans + the AI virtual member
    }
}
```

The AI sees every message, generates a response on a virtual thread, and broadcasts it back. The response appears to all connected clients as a normal room message from "assistant".
- **Loop prevention**: `LlmRoomMember` ignores messages whose `senderId` equals its own `id`, preventing infinite loops.
- **Virtual threads**: LLM calls run on virtual threads to avoid blocking the room.
- **Presence**: `room.joinVirtual()` fires a `PresenceEvent(JOIN)` and `room.leaveVirtual()` fires `LEAVE`. Use `event.isVirtual()` to distinguish virtual members from real connections.
- **Metadata**: `LlmRoomMember` reports `{"type": "llm", "model": "gemini-2.5-flash"}` via `metadata()`, visible in presence events.
- **Custom implementations**: implement `VirtualRoomMember` directly for bots, notification services, or any server-side logic that should participate in room conversations.
- Understanding `@ManagedService`: annotation-driven endpoints
- Framework Hooks (React, Vue, Svelte): `useAtmosphere`, `useRoom`, `useStreaming`
- atmosphere.js API Reference: full type reference