feat(adk): add WithShouldSendEvent for selective event filtering#930
Open
shentongmartin wants to merge 49 commits intoalpha/09from
Open
feat(adk): add WithShouldSendEvent for selective event filtering#930shentongmartin wants to merge 49 commits intoalpha/09from
shentongmartin wants to merge 49 commits intoalpha/09from
Conversation
feat(agentic_model): - format print - support agentic chat template - support to compose agentic odel&agentic tools node - support agentic tool node - support agentic message concat
#882) feat(adk): add agentmd middleware for auto-injecting Agents.md into model input Change-Id: I34add4f925a23c6d6821925c482a21f6cddfddd4
Change-Id: Ifb3be9fadeabdd8f3b6985bb47d2fb38a7004beb
- Add inputItem param to OnAgentEvent callback - Move RunOptions from static config field to GenInput return value - Change CancelOption from pointer to value type Change-Id: I5bff76223fedb2b48fc5671e90d635a9479b20f6 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Change-Id: Iaa0da27dcab3fdd446828a592142a4504278daf7
Change-Id: If20fa78dba82a1c177c8ec47090050ea8c1354ed
* fix(adk): skip saving checkpoint when TurnLoop is idle When Stop() is called on an idle TurnLoop (no active agent run, no unhandled items, no canceled items), the resulting checkpoint contains no meaningful state. Skip saving such checkpoints to avoid unnecessary store writes. - Add isIdle check in cleanup() before checkpoint save decision - Add TestTurnLoop_StopWhileIdle_SkipsCheckpoint test Change-Id: I6aeaff5ed5833a971cb95298193fdb96d904baf8 * fix(internal): merge id2State in PopulateInterruptState instead of replacing PopulateInterruptState merged id2Addr entries one by one but replaced id2State wholesale. In a parallel workflow resume, two goroutines share the same globalResumeInfo. If one goroutine's compose graph called PopulateInterruptState (replacing id2State with compose-only entries) before the other goroutine looked up its outer-level entry, the lookup returned a zero-value InterruptState with State=nil, triggering the 'has no state' panic in ChatModelAgent.Resume. Change id2State handling to merge entry by entry, consistent with id2Addr. Change-Id: Ia21f65289bff7beb2bc383fb033926ad9c92d7e7 * fix(adk): keep watching for cancel escalation after stopSig.done When watchStopSignal entered the stopSig.done branch, it processed the initial cancel and then blocked on <-done (turn completion), never looping back to check notify. This meant a subsequent Stop() call with a higher cancel mode (e.g. CancelImmediate) was never forwarded to the agent, causing TestTurnLoop_Stop_EscalatesCancelMode to time out. Replace the blocking <-done with an inner loop that selects on both done and notify, so escalation signals are always delivered. Also apply the generation-based dedup check consistent with the notify branch. Change-Id: Ia6a04d00a2b44625ffbcb625ff0e559c12ed145f
…r agent cancellation (#929) * fix(adk): prevent panic when orphaned tool goroutine sends event after agent cancellation When CancelAfterChatModel times out and escalates to CancelImmediate, GraphInterrupt fires with timeout=0. The compose graph returns immediately, orphaning parallel tool goroutines. When an orphaned tool completes, eventSenderToolWrapper tries to send an event via the AsyncGenerator which is already closed, causing 'send on closed channel' panic. - Add isImmediateCancelled() to cancelContext for checking immediateChan - Make chatModelAgentExecCtx.send cancel-aware: skip send when immediate cancel is active - Use trySend as safety net for the TOCTOU race window - Route SendEvent() through execCtx.send() instead of direct generator.Send() Change-Id: Ic7e0194c860e2692a3cddc559911ab379024f650 * test(adk): add test for orphaned tool goroutine panic after CancelImmediate - unit_send_after_close: directly reproduces the panic by sending to a closed generator with isImmediateCancelled=true - unit_send_after_close_without_cancel_ctx: verifies trySend safety net prevents panic even without cancelCtx - integration_cancel_escalation_orphans_tool: end-to-end test with slow tool, CancelAfterChatModel timeout escalation, and orphaned goroutine Change-Id: Ia82fa957b102ccc2ac42094d18d4b15db2a1701c * test(adk): improve coverage for orphaned tool goroutine fix Add test cases for: - nil execCtx and nil generator defensive guards - nil cancelContext in isImmediateCancelled - TOCTOU race window (isImmediateCancelled=false but generator closed) - SendEvent public API with closed generator - SendEvent without exec context Change-Id: I197c36f34675f5376cbe5f830b15db6ca873cd1f
Add SendEventOption type with WithShouldSendEvent predicate that allows users to control whether specific events should be sent through eventSenderModelWrapper and eventSenderToolWrapper. The predicate receives a defensive copy of the AgentEvent (via copyAgentEvent), so it is safe to inspect any field including streaming MessageStream without affecting the original event delivered to the Runner consumer. Both NewEventSenderModelWrapper and NewEventSenderToolWrapper now accept variadic SendEventOption, remaining fully backward-compatible. Change-Id: I0fd19dea433ac8487ab8f2870db6a6e77b15ff55
| // the default event sender to avoid duplicate events. | ||
| // | ||
| // Event Filtering: | ||
| // Both NewEventSenderModelWrapper and NewEventSenderToolWrapper accept |
There was a problem hiding this comment.
🚨 Breaking API Changes Detected
Package: github.com/cloudwego/eino/adk
Incompatible changes:
- NewEventSenderModelWrapper: changed from func() ChatModelAgentMiddleware to func(...SendEventOption) ChatModelAgentMiddleware
- NewEventSenderToolWrapper: changed from func() ChatModelAgentMiddleware to func(...SendEventOption) ChatModelAgentMiddleware
Review Guidelines
Please ensure that:
- The changes are absolutely necessary
- They are properly documented
- Migration guides are provided if needed
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## alpha/09 #930 +/- ##
===========================================
Coverage ? 81.50%
===========================================
Files ? 158
Lines ? 18934
Branches ? 0
===========================================
Hits ? 15432
Misses ? 2365
Partials ? 1137 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
9546746 to
93fa7f4
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Selective Event Filtering for Event Sender Wrappers
Problem
Users have no way to control which events are emitted by
eventSenderModelWrapper/eventSenderToolWrapper. Every model response and tool result unconditionally produces an event. When users want to suppress events for internal/helper tools or filter model responses, they must build custom middleware from scratch.Solution
Add a
WithShouldSendEventoption that bothNewEventSenderModelWrapperandNewEventSenderToolWrapperaccept:The predicate receives a defensive copy of the
*AgentEvent(via the existingcopyAgentEvent), so users can safely inspect any field—including consuming a streamingMessageStream—without breaking the original event delivered to theRunnerconsumer.Decisions
Why a predicate on
*AgentEventinstead of a simpler filter (e.g., tool name allowlist)?A predicate gives maximum flexibility—users can filter by role, tool name, action presence, message content (non-streaming), or any combination. A simpler API would either be too restrictive or grow into multiple options over time.
Why defensive copy before calling the predicate?
If a user's predicate consumes the
MessageStream(e.g., callingRecv()to inspect content), the original event's stream would be permanently drained.copyAgentEventforks the stream via.Copy(2), giving the predicate an independent copy. For non-streaming events, the cost is a lightweight struct copy.Why share
SendEventOptionbetween both wrappers?Both wrappers produce
*AgentEventwith the same structure. A shared option type avoids unnecessary duplication while letting users apply the same filtering logic to both if desired.Key Insight
Single-use streaming resources (
MessageStream) require defensive copying at API boundaries where user code might consume them. The existingcopyAgentEvent+ stream.Copy(2)pattern solves this: fork the stream, give one fork to the user callback, keep the other for the framework. This "tee at the boundary" pattern ensures user-facing predicates can never break the internal event pipeline.Summary
WithShouldSendEvent(func(*AgentEvent) bool)option on both event sender wrapperscopyAgentEventbefore calling the predicate事件发送器包装器的选择性事件过滤
问题
用户无法控制
eventSenderModelWrapper/eventSenderToolWrapper发出哪些事件。每个模型响应和工具结果都会无条件地产生事件。当用户想要抑制内部/辅助工具的事件或过滤模型响应时,必须从头构建自定义中间件。解决方案
添加
WithShouldSendEvent选项,NewEventSenderModelWrapper和NewEventSenderToolWrapper均可接受:谓词函数接收
*AgentEvent的防御性副本(通过已有的copyAgentEvent),因此用户可以安全地检查任何字段——包括消费流式MessageStream——而不会破坏交付给Runner消费者的原始事件。设计决策
为什么使用
*AgentEvent谓词而非更简单的过滤器(如工具名称白名单)?谓词提供最大灵活性——用户可以按角色、工具名称、Action 存在性、消息内容(非流式)或任意组合进行过滤。更简单的 API 要么限制过多,要么随时间增长为多个选项。
为什么在调用谓词前进行防御性复制?
如果用户的谓词消费了
MessageStream(例如调用Recv()检查内容),原始事件的流将被永久耗尽。copyAgentEvent通过.Copy(2)分叉流,为谓词提供独立副本。对于非流式事件,开销仅为轻量级结构体复制。为什么两个包装器共享
SendEventOption?两个包装器都产生相同结构的
*AgentEvent。共享选项类型避免了不必要的重复,同时允许用户在需要时对两者应用相同的过滤逻辑。关键洞察
单次使用的流式资源(
MessageStream)在用户代码可能消费它们的 API 边界处需要防御性复制。已有的copyAgentEvent+ 流.Copy(2)模式解决了这个问题:分叉流,将一个分支给用户回调,保留另一个给框架。这种"边界处分流"模式确保用户端谓词永远不会破坏内部事件管道。总结
WithShouldSendEvent(func(*AgentEvent) bool)选项copyAgentEvent进行防御性复制