
AI / LLM Streaming

Atmosphere 4.0 provides a first-class SPI for streaming AI/LLM responses token-by-token to browsers over WebSocket. The atmosphere-ai module defines the core interfaces, and optional adapter modules bridge popular AI frameworks.

Architecture

Browser  ←—WebSocket—→  @ManagedService  →  StreamingSession  →  AiStreamingAdapter
                                                    ↑
                                              Direct write to
                                            AtmosphereResource
                                         (bypasses broadcaster)

When a user sends a prompt, the @Message handler creates a StreamingSession and passes it to an adapter. The adapter calls the AI model's streaming API, forwarding each token to session.send(token). Tokens are written directly to the WebSocket connection — they bypass the broadcaster to avoid re-triggering @Message handlers.

Modules

Module        Artifact                  Purpose
Core SPI      atmosphere-ai             StreamingSession, AiStreamingAdapter, AiConfig, built-in LLM client
Spring AI     atmosphere-spring-ai      Adapter for Spring AI ChatClient (Flux-based)
LangChain4j   atmosphere-langchain4j    Adapter for LangChain4j StreamingChatLanguageModel
Embabel       atmosphere-embabel        Adapter for Embabel Agent Framework (Kotlin)

Quick Start — Built-in LLM Client

The simplest path uses atmosphere-ai with its built-in OpenAiCompatibleClient. This works with OpenAI, Google Gemini, Ollama, Azure OpenAI, and any OpenAI-compatible endpoint — with zero additional dependencies.

1. Add the dependency

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-ai</artifactId>
    <version>4.0.1</version>
</dependency>

2. Configure the LLM

Set environment variables:

Variable       Description                                             Default
LLM_MODE       remote (cloud API) or local (Ollama)                    remote
LLM_MODEL      Model name: gemini-2.5-flash, gpt-4o, llama3.2, etc.    gemini-2.5-flash
LLM_API_KEY    API key (or GEMINI_API_KEY for Gemini)
LLM_BASE_URL   Override the endpoint (auto-detected if omitted)        auto

The endpoint is auto-detected from the model name:

  • gemini-* → https://generativelanguage.googleapis.com/v1beta/openai
  • gpt-*, o1-*, o3-* → https://api.openai.com/v1
  • local mode → http://localhost:11434/v1 (Ollama)

Or configure programmatically in Spring Boot:

@Configuration
public class LlmConfig {
    @Bean
    AiConfig.LlmSettings llmSettings(@Value("${llm.mode:remote}") String mode,
                                      @Value("${llm.model:gemini-2.5-flash}") String model,
                                      @Value("${llm.api-key:}") String apiKey,
                                      @Value("${llm.base-url:}") String baseUrl) {
        return AiConfig.configure(mode, model, apiKey, baseUrl.isBlank() ? null : baseUrl);
    }
}

The corresponding application.yml:

llm:
  mode: ${LLM_MODE:remote}
  model: ${LLM_MODEL:gemini-2.5-flash}
  api-key: ${LLM_API_KEY:${GEMINI_API_KEY:}}
  base-url: ${LLM_BASE_URL:}

3. Create the server endpoint

@ManagedService(path = "/ai-chat")
public class AiChat {

    @Inject private AtmosphereResource resource;

    @Message
    public void onMessage(String prompt) {
        var settings = AiConfig.get();
        var session = StreamingSessions.start(resource);

        var request = ChatCompletionRequest.builder(settings.model())
                .system("You are a helpful assistant.")
                .user(prompt)
                .build();

        // Stream on a virtual thread — non-blocking
        Thread.startVirtualThread(() -> settings.client().streamChatCompletion(request, session));
    }
}

4. Connect from the browser

import { atmosphere, subscribeStreaming } from 'atmosphere.js';

const handle = await subscribeStreaming(atmosphere, {
  url: '/ai-chat',
  transport: 'websocket',
}, {
  onToken:    (token) => output.textContent += token,
  onProgress: (msg)   => status.textContent = msg,
  onComplete: ()      => console.log('Done'),
  onError:    (err)   => console.error(err),
});

handle.send('Explain virtual threads in Java 21');

Or with React:

import { useStreaming } from 'atmosphere.js/react';

function AiChat() {
  const { fullText, isStreaming, send } = useStreaming({
    request: { url: '/ai-chat', transport: 'websocket' },
  });

  return (
    <div>
      <button onClick={() => send('What is Atmosphere?')} disabled={isStreaming}>Ask</button>
      <p>{fullText}</p>
    </div>
  );
}

Wire Protocol

Every message is a JSON object written directly to the WebSocket:

{"type":"token","data":"Hello","sessionId":"abc-123","seq":1}
{"type":"token","data":" world","sessionId":"abc-123","seq":2}
{"type":"progress","data":"Thinking...","sessionId":"abc-123","seq":3}
{"type":"metadata","data":"{\"model\":\"gemini-2.5-flash\"}","sessionId":"abc-123","seq":4}
{"type":"complete","data":"","sessionId":"abc-123","seq":5}

Type       Description
token      A single token/chunk from the LLM
progress   A human-readable status update (e.g., "Searching documents...")
metadata   Structured metadata (model name, usage stats)
complete   Stream finished successfully
error      Stream failed — data contains the error message

The seq field is a monotonically increasing counter for deduplication on reconnect.
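
Clients that resume after a reconnect only need to remember the highest seq they have applied and drop anything at or below it. A minimal sketch of that bookkeeping, with SeqDeduplicator as a made-up name for this example:

// Hypothetical helper illustrating seq-based deduplication on reconnect.
public final class SeqDeduplicator {
    private long lastSeq;

    /** Returns true if the frame is new and should be applied to the UI. */
    public synchronized boolean accept(long seq) {
        if (seq <= lastSeq) {
            return false; // replayed frame from before the reconnect, skip it
        }
        lastSeq = seq;
        return true;
    }
}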

Core SPI — StreamingSession

The StreamingSession interface is the bridge between any AI framework and Atmosphere:

public interface StreamingSession extends AutoCloseable {
    String sessionId();
    void send(String token);           // push a token
    void sendMetadata(String key, Object value);  // structured metadata
    void progress(String message);     // status update
    void complete();                   // success
    void complete(String summary);     // success with summary
    void error(Throwable t);           // failure
    boolean isClosed();
}

Create one with StreamingSessions.start(resource). The DefaultStreamingSession implementation writes JSON directly to AtmosphereResource, bypassing the broadcaster.
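
A typical handler wraps the streaming work so that exactly one terminal call (complete or error) reaches the session. A minimal sketch, where tokenSource is a made-up placeholder for any blocking, token-producing API:

var session = StreamingSessions.start(resource);
Thread.startVirtualThread(() -> {
    try {
        // tokenSource is a hypothetical blocking token iterator, not a real API
        for (String token : tokenSource(prompt)) {
            if (session.isClosed()) return;   // client disconnected, stop streaming
            session.send(token);
        }
        session.complete();                   // exactly one terminal call on success
    } catch (Exception e) {
        session.error(e);                     // ...or one on failure
    }
});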

Core SPI — AiStreamingAdapter

Implement this to bridge any AI framework:

public interface AiStreamingAdapter<T> {
    String name();
    void stream(T request, StreamingSession session);
}

Example for a custom framework:

public class MyAdapter implements AiStreamingAdapter<MyPrompt> {
    @Override public String name() { return "my-ai"; }

    @Override
    public void stream(MyPrompt request, StreamingSession session) {
        myModel.streamTokens(request,
            token -> session.send(token),
            ()    -> session.complete(),
            err   -> session.error(err));
    }
}

Adapter: Spring AI

Bridges Spring AI's ChatClient Flux-based streaming to StreamingSession.

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-spring-ai</artifactId>
    <version>4.0.1</version>
</dependency>

@ManagedService(path = "/ai-chat")
public class AiChat {
    @Inject private AtmosphereResource resource;
    @Inject private ChatClient chatClient;

    private final SpringAiStreamingAdapter adapter = new SpringAiStreamingAdapter();

    @Message
    public void onMessage(String prompt) {
        var session = StreamingSessions.start(resource);
        adapter.stream(chatClient, prompt, session);
    }
}

The adapter subscribes to chatClient.prompt(prompt).stream().chatResponse() and pushes each ChatResponse token through the session.
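
Conceptually, that subscription maps the Flux signals onto the session one-to-one. A simplified sketch (how the text delta is extracted from ChatResponse varies across Spring AI versions, so treat the accessor chain as an assumption):

Flux<ChatResponse> flux = chatClient.prompt(prompt).stream().chatResponse();
flux.subscribe(
        response -> {
            // Accessor chain is an assumption; it differs between Spring AI versions.
            String text = response.getResult().getOutput().getText();
            if (text != null) session.send(text);   // each delta becomes a token frame
        },
        session::error,        // onError    -> error frame
        session::complete);    // onComplete -> complete frame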

Adapter: LangChain4j

Bridges LangChain4j's callback-based StreamingChatLanguageModel to StreamingSession.

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-langchain4j</artifactId>
    <version>4.0.1</version>
</dependency>

@ManagedService(path = "/ai-chat")
public class AiChat {
    @Inject private AtmosphereResource resource;

    private final LangChain4jStreamingAdapter adapter = new LangChain4jStreamingAdapter();

    @Message
    public void onMessage(String prompt) {
        var session = StreamingSessions.start(resource);
        var model = OpenAiStreamingChatModel.builder()
                .baseUrl(AiConfig.get().baseUrl())
                .apiKey(AiConfig.get().client().apiKey())
                .modelName(AiConfig.get().model())
                .build();

        var chatRequest = ChatRequest.builder()
                .messages(List.of(UserMessage.from(prompt)))
                .build();

        Thread.startVirtualThread(() -> adapter.stream(model, chatRequest, session));
    }
}

AtmosphereStreamingResponseHandler converts LangChain4j's onNext/onComplete/onError callbacks to StreamingSession calls.
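
Its behavior is roughly equivalent to the sketch below. The callback names follow recent LangChain4j releases (older releases use onNext/onComplete on StreamingResponseHandler), so check your version:

// Simplified equivalent of AtmosphereStreamingResponseHandler.
model.chat(chatRequest, new StreamingChatResponseHandler() {
    @Override public void onPartialResponse(String partialResponse) {
        session.send(partialResponse);        // chunk -> token frame
    }
    @Override public void onCompleteResponse(ChatResponse completeResponse) {
        session.complete();                   // -> complete frame
    }
    @Override public void onError(Throwable error) {
        session.error(error);                 // -> error frame
    }
});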

Adapter: Embabel Agent Framework

Bridges Embabel's OutputChannel pattern for agentic AI. The Embabel AgentPlatform handles planning, tool calling, and orchestration — Atmosphere streams the agent events to the browser.

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-embabel</artifactId>
    <version>4.0.1</version>
</dependency>
<dependency>
    <groupId>com.embabel.agent</groupId>
    <artifactId>embabel-agent-platform-autoconfigure</artifactId>
    <version>0.3.4</version>
</dependency>

Define an Embabel agent:

@Agent(name = "chat-assistant", description = "Answers user questions")
public class ChatAssistantAgent {
    @Action(description = "Answer the user's question")
    public String answer(String userMessage) {
        return "Answer clearly and concisely: " + userMessage;
    }
}

Run it through Atmosphere:

var session = StreamingSessions.start(resource);
var agent = agentPlatform.agents().stream()
        .filter(a -> "chat-assistant".equals(a.getName()))
        .findFirst().orElseThrow();

var agentRequest = new AgentRequest("chat-assistant", channel -> {
    var options = ProcessOptions.DEFAULT.withOutputChannel(channel);
    agentPlatform.runAgentFrom(agent, options, Map.of("userMessage", prompt));
    return Unit.INSTANCE;
});
Thread.startVirtualThread(() -> adapter.stream(agentRequest, session));

AtmosphereOutputChannel translates agent events (thinking, tool calls, results) into StreamingSession calls with appropriate progress / token / complete messages.
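
As a rough illustration of that mapping, with AgentEvent and its kinds invented for this sketch (they are not the actual Embabel types):

// AgentEvent is a hypothetical stand-in for Embabel's real event types.
void onAgentEvent(AgentEvent event, StreamingSession session) {
    switch (event.kind()) {
        case THINKING, TOOL_CALL -> session.progress(event.description()); // status update
        case TOKEN               -> session.send(event.text());            // model output
        case FINISHED            -> session.complete(event.summary());     // final result
        case FAILED              -> session.error(event.failure());        // failure
    }
}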

Client: atmosphere.js Streaming

Vanilla TypeScript

import { atmosphere, subscribeStreaming } from 'atmosphere.js';

const handle = await subscribeStreaming(atmosphere, {
  url: '/ai-chat',
  transport: 'websocket',
}, {
  onToken:    (token) => { /* append token to UI */ },
  onProgress: (msg)   => { /* show status */ },
  onMetadata: (meta)  => { /* model info, usage */ },
  onComplete: ()      => { /* done */ },
  onError:    (err)   => { /* handle error */ },
});

handle.send('Your prompt here');
handle.close(); // disconnect when done

React — useStreaming

import { useStreaming } from 'atmosphere.js/react';

function AiChat() {
  const { fullText, tokens, isStreaming, progress, metadata, error, send, reset } = useStreaming({
    request: { url: '/ai-chat', transport: 'websocket' },
  });

  return (
    <div>
      <button onClick={() => send('What is Atmosphere?')} disabled={isStreaming}>Ask</button>
      {isStreaming && <span>{progress ?? 'Generating…'}</span>}
      <p>{fullText}</p>
      <button onClick={reset}>Clear</button>
    </div>
  );
}

Vue — useStreaming

<script setup>
import { useStreaming } from 'atmosphere.js/vue';

const { fullText, isStreaming, send } = useStreaming(
  { url: '/ai-chat', transport: 'websocket' },
);
</script>

<template>
  <button @click="send('What is Atmosphere?')" :disabled="isStreaming">Ask</button>
  <p>{{ fullText }}</p>
</template>

Svelte — createStreamingStore

<script>
  import { createStreamingStore } from 'atmosphere.js/svelte';

  const { store: ai, send } = createStreamingStore(
    { url: '/ai-chat', transport: 'websocket' },
  );
</script>

<button on:click={() => send('What is Atmosphere?')} disabled={$ai.isStreaming}>Ask</button>
<p>{$ai.fullText}</p>

Samples

Sample                       AI Framework              Source
AI Chat (built-in client)    atmosphere-ai             spring-boot-ai-chat
LangChain4j Chat             atmosphere-langchain4j    spring-boot-langchain4j-chat
Embabel Agent Chat           atmosphere-embabel        spring-boot-embabel-chat

Virtual Room Members — AI as a Room Participant

Instead of streaming AI responses through a dedicated endpoint, you can add an LLM as a virtual member of a Room. When anyone broadcasts a message, the AI receives it, generates a response, and broadcasts the reply back — just like a human participant.

How It Works

A VirtualRoomMember is a non-connection-based room participant — it has no WebSocket or HTTP connection. It participates purely through an onMessage callback:

public interface VirtualRoomMember {
    String id();
    void onMessage(Room room, String senderId, Object message);
    default Map<String, Object> metadata() { return Map.of(); }
}

When room.broadcast(message) is called, the room dispatches the message to all connected clients and all virtual members. Virtual members can call room.broadcast() themselves to reply. Presence events fire for virtual members just like for real connections — PresenceEvent.isVirtual() returns true for these.
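
For example, a trivial custom member that echoes every message back to the room, using the same senderId check LlmRoomMember uses for loop prevention:

// A minimal custom virtual member: echoes each message back to the room.
public class EchoBot implements VirtualRoomMember {
    @Override public String id() { return "echo-bot"; }

    @Override
    public void onMessage(Room room, String senderId, Object message) {
        if (id().equals(senderId)) return;     // ignore our own broadcasts
        room.broadcast("[echo] " + message);   // reply like any other participant
    }
}

// Join it like any connected client:
room.joinVirtual(new EchoBot());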

LlmRoomMember — Built-in LLM Implementation

The atmosphere-ai module includes LlmRoomMember, a ready-to-use implementation backed by any OpenAI-compatible model:

var settings = AiConfig.get();
var assistant = new LlmRoomMember("assistant", settings.client(), settings.model());
room.joinVirtual(assistant);

// Now any room.broadcast("What is the weather?") triggers an LLM response
// streamed back to all human members via room.broadcast()

With a custom system prompt:

var assistant = new LlmRoomMember("code-reviewer", client, "gpt-4o",
    "You are a senior code reviewer. Be concise and constructive.");
room.joinVirtual(assistant);

Example: AI-Powered Chat Room

@RoomService(path = "/ai-room")
public class AiRoom {

    @Inject private AtmosphereResource resource;

    @Ready
    public void onReady(Room room) {
        // Add the AI assistant if not already present
        if (room.virtualMembers().isEmpty()) {
            var settings = AiConfig.get();
            room.joinVirtual(new LlmRoomMember("assistant",
                settings.client(), settings.model()));
        }
    }

    @Message(encoders = {JacksonEncoder.class}, decoders = {JacksonDecoder.class})
    public ChatMessage onMessage(ChatMessage msg) {
        return msg; // broadcast to all humans + the AI virtual member
    }
}
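
ChatMessage above is application-defined; any Jackson-serializable shape works, for example:

// Illustrative payload type; define whatever fields your UI needs.
public record ChatMessage(String sender, String text, long timestamp) {}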

The AI sees every message, generates a response on a virtual thread, and broadcasts it back. The response appears to all connected clients as a normal room message from "assistant".

Key Details

  • Loop prevention — LlmRoomMember ignores messages where senderId equals its own id, preventing infinite loops.
  • Virtual threads — LLM calls run on virtual threads to avoid blocking the room.
  • Presence — room.joinVirtual() fires a PresenceEvent(JOIN) and room.leaveVirtual() fires LEAVE. Use event.isVirtual() to distinguish from real connections.
  • Metadata — LlmRoomMember reports {"type": "llm", "model": "gemini-2.5-flash"} via metadata(), visible in presence events.
  • Custom implementations — Implement VirtualRoomMember directly for bots, notification services, or any server-side logic that should participate in room conversations.
