Skip to content

Commit d761c92

Browse files
authored
.Net ChatClientAgent Streaming API Impl. (#69)
* Add Streaming API * Removing InstructionsRole * Updating thread notification strategy * Fix net472 failing * Address typo
1 parent 89daf17 commit d761c92

5 files changed

Lines changed: 573 additions & 192 deletions

File tree

dotnet/src/Microsoft.Agents/ChatCompletion/ChatClientAgent.cs

Lines changed: 100 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Runtime.CompilerServices;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using Microsoft.Extensions.AI;
@@ -19,6 +20,7 @@ public sealed class ChatClientAgent : Agent
1920
{
2021
private readonly ChatClientAgentOptions? _agentOptions;
2122
private readonly ILogger _logger;
23+
private readonly Type _chatClientType;
2224

2325
/// <summary>
2426
/// Initializes a new instance of the <see cref="ChatClientAgent"/> class.
@@ -30,27 +32,17 @@ public ChatClientAgent(IChatClient chatClient, ChatClientAgentOptions? options =
3032
{
3133
Throw.IfNull(chatClient);
3234

35+
this._chatClientType = chatClient.GetType();
3336
this.ChatClient = chatClient.AsAgentInvokingChatClient();
3437
this._agentOptions = options;
3538
this._logger = (loggerFactory ?? chatClient.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance).CreateLogger<ChatClientAgent>();
3639
}
3740

3841
/// <summary>
39-
/// The chat client.
42+
/// The underlying chat client used by the agent to invoke chat completions.
4043
/// </summary>
4144
public IChatClient ChatClient { get; }
4245

43-
/// <summary>
44-
/// Gets the role used for agent instructions. Defaults to "system".
45-
/// </summary>
46-
/// <remarks>
47-
/// Certain versions of "O*" series (deep reasoning) models require the instructions
48-
/// to be provided as "developer" role. Other versions support neither role and
49-
/// an agent targeting such a model cannot provide instructions. Agent functionality
50-
/// will be dictated entirely by the provided plugins.
51-
/// </remarks>
52-
public ChatRole InstructionsRole { get; set; } = ChatRole.System;
53-
5446
/// <inheritdoc/>
5547
public override string Id => this._agentOptions?.Id ?? base.Id;
5648

@@ -72,35 +64,16 @@ public override async Task<ChatResponse> RunAsync(
7264
{
7365
Throw.IfNull(messages);
7466

75-
// Retrieve chat options from the provided AgentRunOptions if available.
76-
ChatOptions? chatOptions = (options as ChatClientAgentRunOptions)?.ChatOptions;
77-
78-
var chatClientThread = this.ValidateOrCreateThreadType<ChatClientAgentThread>(thread, () => new());
79-
80-
// Add any existing messages from the thread to the messages to be sent to the chat client.
81-
List<ChatMessage> threadMessages = [];
82-
if (chatClientThread is IMessagesRetrievableThread messagesRetrievableThread)
83-
{
84-
await foreach (ChatMessage message in messagesRetrievableThread.GetMessagesAsync(cancellationToken).ConfigureAwait(false))
85-
{
86-
threadMessages.Add(message);
87-
}
88-
}
89-
90-
// Append to the existing thread messages the messages that were passed in to this call.
91-
threadMessages.AddRange(messages);
67+
(ChatClientAgentThread chatClientThread, ChatOptions? chatOptions, List<ChatMessage> threadMessages) =
68+
await this.PrepareThreadAndMessagesAsync(thread, messages, options, cancellationToken).ConfigureAwait(false);
9269

93-
// Update the messages with agent instructions.
94-
this.UpdateThreadMessagesWithAgentInstructions(threadMessages, options);
95-
96-
var agentName = this.Name ?? "UnnamedAgent";
97-
Type serviceType = this.ChatClient.GetType();
70+
var agentName = this.GetAgentName();
9871

99-
this._logger.LogAgentChatClientInvokingAgent(nameof(RunAsync), this.Id, agentName, serviceType);
72+
this._logger.LogAgentChatClientInvokingAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType);
10073

10174
ChatResponse chatResponse = await this.ChatClient.GetResponseAsync(threadMessages, chatOptions, cancellationToken).ConfigureAwait(false);
10275

103-
this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, serviceType, messages.Count);
76+
this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, messages.Count);
10477

10578
// Only notify the thread of new messages if the chatResponse was successful to avoid inconsistent messages state in the thread.
10679
await this.NotifyThreadOfNewMessagesAsync(chatClientThread, messages, cancellationToken).ConfigureAwait(false);
@@ -112,7 +85,7 @@ public override async Task<ChatResponse> RunAsync(
11285
}
11386

11487
// Convert the chat response messages to a valid IReadOnlyCollection for notification signatures below.
115-
var chatResponseMessages = chatResponse.Messages.ToArray();
88+
var chatResponseMessages = chatResponse.Messages as IReadOnlyCollection<ChatMessage> ?? chatResponse.Messages.ToArray();
11689

11790
await this.NotifyThreadOfNewMessagesAsync(chatClientThread, chatResponseMessages, cancellationToken).ConfigureAwait(false);
11891
if (options?.OnIntermediateMessages is not null)
@@ -124,28 +97,114 @@ public override async Task<ChatResponse> RunAsync(
12497
}
12598

12699
/// <inheritdoc/>
127-
public override IAsyncEnumerable<ChatResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
100+
public override async IAsyncEnumerable<ChatResponseUpdate> RunStreamingAsync(
101+
IReadOnlyCollection<ChatMessage> messages,
102+
AgentThread? thread = null,
103+
AgentRunOptions? options = null,
104+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
128105
{
129-
throw new System.NotImplementedException();
106+
Throw.IfNull(messages);
107+
108+
(ChatClientAgentThread chatClientThread, ChatOptions? chatOptions, List<ChatMessage> threadMessages) =
109+
await this.PrepareThreadAndMessagesAsync(thread, messages, options, cancellationToken).ConfigureAwait(false);
110+
111+
int messageCount = threadMessages.Count;
112+
var agentName = this.GetAgentName();
113+
114+
this._logger.LogAgentChatClientInvokingAgent(nameof(RunStreamingAsync), this.Id, agentName, this._chatClientType);
115+
116+
// Using the enumerator to ensure we consider the case where no updates are returned for notification.
117+
var responseUpdatesEnumerator = this.ChatClient.GetStreamingResponseAsync(threadMessages, chatOptions, cancellationToken).GetAsyncEnumerator(cancellationToken);
118+
119+
this._logger.LogAgentChatClientInvokedStreamingAgent(nameof(RunStreamingAsync), this.Id, agentName, this._chatClientType);
120+
121+
List<ChatResponseUpdate> responseUpdates = [];
122+
123+
// Ensure we start the streaming request
124+
var hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
125+
126+
// To avoid inconsistent state we only notify the thread of the input messages if no error occurs after the initial request.
127+
await this.NotifyThreadOfNewMessagesAsync(chatClientThread, messages, cancellationToken).ConfigureAwait(false);
128+
129+
while (hasUpdates)
130+
{
131+
var update = responseUpdatesEnumerator.Current;
132+
if (update is not null)
133+
{
134+
responseUpdates.Add(update);
135+
update.AuthorName ??= agentName;
136+
yield return update;
137+
}
138+
139+
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
140+
}
141+
142+
var chatResponse = responseUpdates.ToChatResponse();
143+
var chatResponseMessages = chatResponse.Messages as IReadOnlyCollection<ChatMessage> ?? chatResponse.Messages.ToArray();
144+
145+
await this.NotifyThreadOfNewMessagesAsync(chatClientThread, chatResponseMessages, cancellationToken).ConfigureAwait(false);
146+
if (options?.OnIntermediateMessages is not null)
147+
{
148+
await options.OnIntermediateMessages(chatResponseMessages).ConfigureAwait(false);
149+
}
130150
}
131151

132152
/// <inheritdoc/>
133153
public override AgentThread GetNewThread() => new ChatClientAgentThread();
134154

135155
#region Private
136156

157+
/// <summary>
158+
/// Prepares the thread, chat options, and messages for agent execution.
159+
/// </summary>
160+
/// <param name="thread">The conversation thread to use or create.</param>
161+
/// <param name="inputMessages">The input messages to use.</param>
162+
/// <param name="options">Optional parameters for agent invocation.</param>
163+
/// <param name="cancellationToken">The cancellation token.</param>
164+
/// <returns>A tuple containing the thread, chat options, and thread messages.</returns>
165+
private async Task<(ChatClientAgentThread thread, ChatOptions? chatOptions, List<ChatMessage> threadMessages)> PrepareThreadAndMessagesAsync(
166+
AgentThread? thread,
167+
IReadOnlyCollection<ChatMessage> inputMessages,
168+
AgentRunOptions? options,
169+
CancellationToken cancellationToken)
170+
{
171+
// Retrieve chat options from the provided AgentRunOptions if available.
172+
ChatOptions? chatOptions = (options as ChatClientAgentRunOptions)?.ChatOptions;
173+
174+
var chatClientThread = this.ValidateOrCreateThreadType<ChatClientAgentThread>(thread, () => new());
175+
176+
// Add any existing messages from the thread to the messages to be sent to the chat client.
177+
List<ChatMessage> threadMessages = [];
178+
if (chatClientThread is IMessagesRetrievableThread messagesRetrievableThread)
179+
{
180+
await foreach (ChatMessage message in messagesRetrievableThread.GetMessagesAsync(cancellationToken).ConfigureAwait(false))
181+
{
182+
threadMessages.Add(message);
183+
}
184+
}
185+
186+
// Update the messages with agent instructions.
187+
this.UpdateThreadMessagesWithAgentInstructions(threadMessages, options);
188+
189+
// Add the input messages to the end of thread messages.
190+
threadMessages.AddRange(inputMessages);
191+
192+
return (chatClientThread, chatOptions, threadMessages);
193+
}
194+
137195
private void UpdateThreadMessagesWithAgentInstructions(List<ChatMessage> threadMessages, AgentRunOptions? options)
138196
{
139197
if (!string.IsNullOrWhiteSpace(options?.AdditionalInstructions))
140198
{
141-
threadMessages.Insert(0, new(this.InstructionsRole, options?.AdditionalInstructions) { AuthorName = this.Name });
199+
threadMessages.Insert(0, new(ChatRole.System, options?.AdditionalInstructions) { AuthorName = this.Name });
142200
}
143201

144202
if (!string.IsNullOrWhiteSpace(this.Instructions))
145203
{
146-
threadMessages.Insert(0, new(this.InstructionsRole, this.Instructions) { AuthorName = this.Name });
204+
threadMessages.Insert(0, new(ChatRole.System, this.Instructions) { AuthorName = this.Name });
147205
}
148206
}
149207

208+
private string GetAgentName() => this.Name ?? "UnnamedAgent";
150209
#endregion
151210
}

dotnet/src/Microsoft.Agents/ChatCompletion/ChatClientAgentExtensions.cs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,49 @@ namespace Microsoft.Agents;
1414
public static class ChatClientAgentExtensions
1515
{
1616
/// <summary>
17-
/// Allow running a chat client agent with a <see cref="ChatOptions"/> configuration.
17+
/// Run the agent with the provided message and arguments.
1818
/// </summary>
1919
/// <param name="agent">Target agent to run.</param>
20-
/// <param name="messages">Messages to send to the agent.</param>
21-
/// <param name="thread">Optional thread to use for the agent.</param>
22-
/// <param name="agentOptions">Optional agent run options.</param>
20+
/// <param name="messages">The messages to pass to the agent.</param>
21+
/// <param name="thread">The conversation thread to continue with this invocation. If not provided, creates a new thread. The thread will be mutated with the provided messages and agent reponse.</param>
22+
/// <param name="agentRunOptions">Optional parameters for agent invocation.</param>
2323
/// <param name="chatOptions">Optional chat options.</param>
24-
/// <param name="cancellationToken">Optional cancellation token.</param>
25-
/// <returns>A task representing the asynchronous operation, with the chat response.</returns>
24+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
25+
/// <returns>A <see cref="ChatResponse"/> containing the list of <see cref="ChatMessage"/> items.</returns>
2626
public static Task<ChatResponse> RunAsync(
2727
this ChatClientAgent agent,
2828
IReadOnlyCollection<ChatMessage> messages,
2929
AgentThread? thread = null,
30-
AgentRunOptions? agentOptions = null,
30+
AgentRunOptions? agentRunOptions = null,
3131
ChatOptions? chatOptions = null,
3232
CancellationToken cancellationToken = default)
3333
{
3434
Throw.IfNull(agent);
3535
Throw.IfNull(messages);
3636

37-
return agent.RunAsync(messages, thread, new ChatClientAgentRunOptions(agentOptions, chatOptions), cancellationToken);
37+
return agent.RunAsync(messages, thread, new ChatClientAgentRunOptions(agentRunOptions, chatOptions), cancellationToken);
38+
}
39+
40+
/// <summary>
41+
/// Run the agent with the provided message and arguments.
42+
/// </summary>
43+
/// <param name="agent">Target agent to run.</param>
44+
/// <param name="messages">The messages to pass to the agent.</param>
45+
/// <param name="thread">The conversation thread to continue with this invocation. If not provided, creates a new thread. The thread will be mutated with the provided messages and agent reponse.</param>
46+
/// <param name="agentRunOptions">Optional parameters for agent invocation.</param>
47+
/// <param name="chatOptions">Optional chat options.</param>
48+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
49+
public static IAsyncEnumerable<ChatResponseUpdate> RunStreamingAsync(
50+
this ChatClientAgent agent,
51+
IReadOnlyCollection<ChatMessage> messages,
52+
AgentThread? thread = null,
53+
AgentRunOptions? agentRunOptions = null,
54+
ChatOptions? chatOptions = null,
55+
CancellationToken cancellationToken = default)
56+
{
57+
Throw.IfNull(agent);
58+
Throw.IfNull(messages);
59+
60+
return agent.RunStreamingAsync(messages, thread, new ChatClientAgentRunOptions(agentRunOptions, chatOptions), cancellationToken);
3861
}
3962
}

dotnet/src/Microsoft.Agents/ChatCompletion/ChatClientAgentLogMessages.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,27 @@ internal static partial class ChatClientAgentLogMessages
2323
[LoggerMessage(
2424
EventId = 0,
2525
Level = LogLevel.Debug,
26-
Message = "[{MethodName}] Agent {AgentId}/{AgentName} Invoking service {ServiceType}.")]
26+
Message = "[{MethodName}] Agent {AgentId}/{AgentName} Invoking client {ClientType}.")]
2727
public static partial void LogAgentChatClientInvokingAgent(
2828
this ILogger logger,
2929
string methodName,
3030
string agentId,
3131
string agentName,
32-
Type serviceType);
32+
Type clientType);
3333

3434
/// <summary>
3535
/// Logs <see cref="ChatClientAgent"/> invoked agent (complete).
3636
/// </summary>
3737
[LoggerMessage(
3838
EventId = 0,
3939
Level = LogLevel.Information,
40-
Message = "[{MethodName}] Agent {AgentId}/{AgentName} Invoked service {ServiceType} with message count: {MessageCount}.")]
40+
Message = "[{MethodName}] Agent {AgentId}/{AgentName} Invoked client {ClientType} with message count: {MessageCount}.")]
4141
public static partial void LogAgentChatClientInvokedAgent(
4242
this ILogger logger,
4343
string methodName,
4444
string agentId,
4545
string agentName,
46-
Type serviceType,
46+
Type clientType,
4747
int messageCount);
4848

4949
/// <summary>
@@ -52,11 +52,11 @@ public static partial void LogAgentChatClientInvokedAgent(
5252
[LoggerMessage(
5353
EventId = 0,
5454
Level = LogLevel.Information,
55-
Message = "[{MethodName}] Agent {AgentId}/{AgentName} Invoked service {ServiceType}.")]
55+
Message = "[{MethodName}] Agent {AgentId}/{AgentName} Invoked client {ClientType}.")]
5656
public static partial void LogAgentChatClientInvokedStreamingAgent(
5757
this ILogger logger,
5858
string methodName,
5959
string agentId,
6060
string agentName,
61-
Type serviceType);
61+
Type clientType);
6262
}

0 commit comments

Comments
 (0)