Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/plugins/chat/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Logger,
OpenSearchDashboardsRequest,
Capabilities,
HttpAuth,
} from '../../../core/server';

import { ChatPluginSetup, ChatPluginStart } from './types';
Expand All @@ -27,6 +28,7 @@ export class ChatPlugin implements Plugin<ChatPluginSetup, ChatPluginStart> {
private readonly logger: Logger;
private readonly config$: Observable<ChatConfigType>;
private capabilitiesResolver?: (request: OpenSearchDashboardsRequest) => Promise<Capabilities>;
private httpAuth?: HttpAuth;

constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
Expand All @@ -38,6 +40,7 @@ export class ChatPlugin implements Plugin<ChatPluginSetup, ChatPluginStart> {
const config = await this.config$.pipe(first()).toPromise();
const router = core.http.createRouter();
const getCapabilitiesResolver = () => this.capabilitiesResolver;
const getHttpAuth = () => this.httpAuth;

// Register capability to indicate observability agent availability
core.capabilities.registerProvider(() => ({
Expand All @@ -53,7 +56,8 @@ export class ChatPlugin implements Plugin<ChatPluginSetup, ChatPluginStart> {
getCapabilitiesResolver,
config.mlCommonsAgentId,
config.observabilityAgentId,
config.forwardCredentials
config.forwardCredentials,
getHttpAuth
);

return {};
Expand All @@ -64,6 +68,7 @@ export class ChatPlugin implements Plugin<ChatPluginSetup, ChatPluginStart> {

this.capabilitiesResolver = (request: OpenSearchDashboardsRequest) =>
core.capabilities.resolveCapabilities(request);
this.httpAuth = core.http.auth;

return {};
}
Expand Down
141 changes: 126 additions & 15 deletions src/plugins/chat/server/routes/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import supertest from 'supertest';
import { setupServer } from '../../../../core/server/test_utils';
import { loggingSystemMock } from '../../../../core/server/mocks';
import { defineRoutes, generateOboToken } from './index';
import { defineRoutes, generateOboToken, getValidOboToken } from './index';
import { MLAgentRouterFactory } from './ml_routes/ml_agent_router';
import { MLAgentRouterRegistry } from './ml_routes/router_registry';
import { RequestHandlerContext, Logger } from '../../../../core/server';
Expand Down Expand Up @@ -874,14 +874,14 @@ describe('generateOboToken', () => {
} as unknown) as RequestHandlerContext;
});

it('should return OBO token on successful generation', async () => {
it('should return OBO token and duration on successful generation', async () => {
mockTransportRequest.mockResolvedValue({
body: { authenticationToken: 'obo-jwt-token-123' },
body: { authenticationToken: 'obo-jwt-token-123', durationSeconds: 300 },
});

const token = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');
const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(token).toBe('obo-jwt-token-123');
expect(result).toEqual({ token: 'obo-jwt-token-123', durationSeconds: 300 });
expect(mockLogger.info).toHaveBeenCalledWith(
expect.stringContaining('OBO token generated for credential forwarding to AG-UI endpoint')
);
Expand All @@ -892,14 +892,24 @@ describe('generateOboToken', () => {
});
});

it('should default durationSeconds to 300 when not present in response', async () => {
mockTransportRequest.mockResolvedValue({
body: { authenticationToken: 'obo-jwt-token-123' },
});

const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(result).toEqual({ token: 'obo-jwt-token-123', durationSeconds: 300 });
});

it('should return undefined and warn when response has no authenticationToken', async () => {
mockTransportRequest.mockResolvedValue({
body: { unexpected: 'response' },
});

const token = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');
const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(token).toBeUndefined();
expect(result).toBeUndefined();
expect(mockLogger.warn).toHaveBeenCalledWith(
'OBO token response did not contain authenticationToken'
);
Expand All @@ -908,9 +918,9 @@ describe('generateOboToken', () => {
it('should return undefined and warn on 404 (security plugin not installed)', async () => {
mockTransportRequest.mockRejectedValue({ statusCode: 404, message: 'Not Found' });

const token = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');
const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(token).toBeUndefined();
expect(result).toBeUndefined();
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('OBO token generation unavailable (HTTP 404)')
);
Expand All @@ -919,9 +929,9 @@ describe('generateOboToken', () => {
it('should return undefined and warn on 400 (OBO not configured)', async () => {
mockTransportRequest.mockRejectedValue({ statusCode: 400, message: 'Bad Request' });

const token = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');
const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(token).toBeUndefined();
expect(result).toBeUndefined();
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('OBO token generation unavailable (HTTP 400)')
);
Expand All @@ -930,9 +940,9 @@ describe('generateOboToken', () => {
it('should return undefined and log error on unexpected errors', async () => {
mockTransportRequest.mockRejectedValue(new Error('Connection refused'));

const token = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');
const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(token).toBeUndefined();
expect(result).toBeUndefined();
expect(mockLogger.error).toHaveBeenCalledWith(
expect.stringContaining('Failed to generate OBO token: Connection refused')
);
Expand All @@ -941,11 +951,112 @@ describe('generateOboToken', () => {
it('should handle error with meta.statusCode for 404', async () => {
mockTransportRequest.mockRejectedValue({ meta: { statusCode: 404 } });

const token = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');
const result = await generateOboToken(mockContext, mockLogger, 'http://agui:3000');

expect(token).toBeUndefined();
expect(result).toBeUndefined();
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('OBO token generation unavailable (HTTP 404)')
);
});
});

describe('getValidOboToken', () => {
let mockLogger: Logger;
let mockContext: RequestHandlerContext;
let mockTransportRequest: jest.Mock;

beforeEach(() => {
mockLogger = ({
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
debug: jest.fn(),
} as unknown) as Logger;

mockTransportRequest = jest.fn();

mockContext = ({
core: {
opensearch: {
client: {
asCurrentUser: {
transport: {
request: mockTransportRequest,
},
},
},
},
},
} as unknown) as RequestHandlerContext;
});

it('should mint a new token when cache is empty', async () => {
mockTransportRequest.mockResolvedValue({
body: { authenticationToken: 'fresh-token', durationSeconds: 300 },
});

const token = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', 'user-a');

expect(token).toBe('fresh-token');
expect(mockTransportRequest).toHaveBeenCalledTimes(1);
});

it('should return cached token on subsequent calls within TTL', async () => {
mockTransportRequest.mockResolvedValue({
body: { authenticationToken: 'cached-token', durationSeconds: 300 },
});

// First call — mints
const token1 = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', 'user-b');
// Second call — should use cache
const token2 = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', 'user-b');

expect(token1).toBe('cached-token');
expect(token2).toBe('cached-token');
expect(mockTransportRequest).toHaveBeenCalledTimes(1); // Only one mint call
expect(mockLogger.debug).toHaveBeenCalledWith('Using cached OBO token');
});

it('should return undefined when token generation fails', async () => {
mockTransportRequest.mockRejectedValue(new Error('Connection refused'));

const token = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', 'user-c');

expect(token).toBeUndefined();
});

it('should use separate cache entries per user', async () => {
mockTransportRequest
.mockResolvedValueOnce({
body: { authenticationToken: 'token-user-d', durationSeconds: 300 },
})
.mockResolvedValueOnce({
body: { authenticationToken: 'token-user-e', durationSeconds: 300 },
});

const tokenD = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', 'user-d');
const tokenE = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', 'user-e');

expect(tokenD).toBe('token-user-d');
expect(tokenE).toBe('token-user-e');
expect(mockTransportRequest).toHaveBeenCalledTimes(2);
});

it('should skip caching when username is undefined to prevent cross-user token sharing', async () => {
mockTransportRequest
.mockResolvedValueOnce({
body: { authenticationToken: 'token-call-1', durationSeconds: 300 },
})
.mockResolvedValueOnce({
body: { authenticationToken: 'token-call-2', durationSeconds: 300 },
});

// Both calls without username should mint fresh tokens (no caching)
const token1 = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', undefined);
const token2 = await getValidOboToken(mockContext, mockLogger, 'http://agui:3000', undefined);

expect(token1).toBe('token-call-1');
expect(token2).toBe('token-call-2');
expect(mockTransportRequest).toHaveBeenCalledTimes(2); // No caching — minted twice
});
});
84 changes: 77 additions & 7 deletions src/plugins/chat/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,43 @@ import { Readable } from 'stream';
import {
IRouter,
Logger,
HttpAuth,
OpenSearchDashboardsRequest,
RequestHandlerContext,
Capabilities,
} from '../../../../core/server';
import { getPrincipalsFromRequest } from '../../../../core/server/utils';
import { MLAgentRouterFactory } from './ml_routes/ml_agent_router';
import { MLAgentRouterRegistry } from './ml_routes/router_registry';
import { injectSystemPrompt } from '../prompts';
import { getMemoryContainerId } from './utils/get_memory_container_id';

interface OboTokenResult {
token: string;
durationSeconds: number;
}

interface CachedOboToken {
token: string;
expiresAt: number;
}

/** In-memory cache of OBO tokens keyed by username */
const oboTokenCache = new Map<string, CachedOboToken>();

/** Refresh buffer — mint a new token this many ms before expiry */
const OBO_REFRESH_BUFFER_MS = 30_000;

/**
* Generate an On-Behalf-Of (OBO) token using the security plugin API.
* Returns the token string on success, or undefined if the endpoint is
* unavailable or OBO is not configured.
* Returns the token string and its duration on success, or undefined if the
* endpoint is unavailable or OBO is not configured.
*/
export async function generateOboToken(
context: RequestHandlerContext,
logger: Logger,
agUiUrl: string
): Promise<string | undefined> {
): Promise<OboTokenResult | undefined> {
try {
const client = context.core.opensearch.client.asCurrentUser;
const { body } = await client.transport.request({
Expand All @@ -37,9 +55,10 @@ export async function generateOboToken(
},
});
const token = (body as any)?.authenticationToken;
const durationSeconds = (body as any)?.durationSeconds;
if (token) {
logger.info(`OBO token generated for credential forwarding to AG-UI endpoint: ${agUiUrl}`);
return token;
return { token, durationSeconds: durationSeconds ?? 300 };
}
logger.warn('OBO token response did not contain authenticationToken');
return undefined;
Expand All @@ -56,6 +75,53 @@ export async function generateOboToken(
}
}

/**
* Get a valid OBO token for the current user, using a cached token if it has
* not yet expired. When the cached token is within the refresh buffer or
* missing, a fresh token is minted using the cookie-backed credentials
* available via `asCurrentUser`.
*/
export async function getValidOboToken(
context: RequestHandlerContext,
logger: Logger,
agUiUrl: string,
username?: string
): Promise<string | undefined> {
// When username is unknown, skip caching to avoid cross-user token sharing
if (!username) {
const result = await generateOboToken(context, logger, agUiUrl);
return result?.token;
}

const cached = oboTokenCache.get(username);

if (cached) {
if (cached.expiresAt - Date.now() > OBO_REFRESH_BUFFER_MS) {
logger.debug('Using cached OBO token');
return cached.token;
}
// Expired or within refresh buffer — remove stale entry
oboTokenCache.delete(username);
}

// Evict other expired entries to bound memory growth
for (const [key, entry] of oboTokenCache) {
if (entry.expiresAt <= Date.now()) {
oboTokenCache.delete(key);
}
}

const result = await generateOboToken(context, logger, agUiUrl);
if (result) {
oboTokenCache.set(username, {
token: result.token,
expiresAt: Date.now() + result.durationSeconds * 1000,
});
return result.token;
}
return undefined;
}

/**
* Forward request to external AG-UI server
*/
Expand Down Expand Up @@ -131,7 +197,8 @@ export function defineRoutes(
| undefined,
mlCommonsAgentId?: string,
observabilityAgentId?: string,
forwardCredentials?: boolean
forwardCredentials?: boolean,
getHttpAuth?: () => HttpAuth | undefined
) {
// Route for searching agent memory sessions (conversation history)
router.post(
Expand Down Expand Up @@ -288,10 +355,13 @@ export function defineRoutes(
});
}

// Generate OBO token when credential forwarding is enabled
// Get a valid OBO token (cached or freshly minted) when credential forwarding is enabled
let oboToken: string | undefined;
if (forwardCredentials) {
oboToken = await generateOboToken(context, logger, agUiUrl);
const httpAuth = getHttpAuth?.();
const principals = httpAuth ? getPrincipalsFromRequest(request, httpAuth) : undefined;
const username = principals?.users?.[0];
oboToken = await getValidOboToken(context, logger, agUiUrl, username);
}

// Forward to AG-UI capable endpoint. This is the default router.
Expand Down
Loading