Skip to content

feat(adk): add WithShouldSendEvent for selective event filtering#930

Open
shentongmartin wants to merge 49 commits intoalpha/09from
feat/event_filter
Open

feat(adk): add WithShouldSendEvent for selective event filtering#930
shentongmartin wants to merge 49 commits intoalpha/09from
feat/event_filter

Conversation

@shentongmartin
Copy link
Copy Markdown
Contributor

Selective Event Filtering for Event Sender Wrappers

Problem

Users have no way to control which events are emitted by eventSenderModelWrapper / eventSenderToolWrapper. Every model response and tool result unconditionally produces an event. When users want to suppress events for internal/helper tools or filter model responses, they must build custom middleware from scratch.

Solution

Add a WithShouldSendEvent option that both NewEventSenderModelWrapper and NewEventSenderToolWrapper accept:

adk.NewEventSenderToolWrapper(adk.WithShouldSendEvent(func(event *adk.AgentEvent) bool {
    return event.Output.MessageOutput.ToolName != "internal_helper"
}))

The predicate receives a defensive copy of the *AgentEvent (via the existing copyAgentEvent), so users can safely inspect any field—including consuming a streaming MessageStream—without breaking the original event delivered to the Runner consumer.

Decisions

Why a predicate on *AgentEvent instead of a simpler filter (e.g., tool name allowlist)?

A predicate gives maximum flexibility—users can filter by role, tool name, action presence, message content (non-streaming), or any combination. A simpler API would either be too restrictive or grow into multiple options over time.

Why defensive copy before calling the predicate?

If a user's predicate consumes the MessageStream (e.g., calling Recv() to inspect content), the original event's stream would be permanently drained. copyAgentEvent forks the stream via .Copy(2), giving the predicate an independent copy. For non-streaming events, the cost is a lightweight struct copy.

Why share SendEventOption between both wrappers?

Both wrappers produce *AgentEvent with the same structure. A shared option type avoids unnecessary duplication while letting users apply the same filtering logic to both if desired.

Key Insight

Single-use streaming resources (MessageStream) require defensive copying at API boundaries where user code might consume them. The existing copyAgentEvent + stream .Copy(2) pattern solves this: fork the stream, give one fork to the user callback, keep the other for the framework. This "tee at the boundary" pattern ensures user-facing predicates can never break the internal event pipeline.

Summary

Problem Solution
No way to suppress specific model/tool events WithShouldSendEvent(func(*AgentEvent) bool) option on both event sender wrappers
User predicate could consume streaming events, breaking the Runner consumer Defensive copy via copyAgentEvent before calling the predicate

事件发送器包装器的选择性事件过滤

问题

用户无法控制 eventSenderModelWrapper / eventSenderToolWrapper 发出哪些事件。每个模型响应和工具结果都会无条件地产生事件。当用户想要抑制内部/辅助工具的事件或过滤模型响应时,必须从头构建自定义中间件。

解决方案

添加 WithShouldSendEvent 选项,NewEventSenderModelWrapperNewEventSenderToolWrapper 均可接受:

adk.NewEventSenderToolWrapper(adk.WithShouldSendEvent(func(event *adk.AgentEvent) bool {
    return event.Output.MessageOutput.ToolName != "internal_helper"
}))

谓词函数接收 *AgentEvent防御性副本(通过已有的 copyAgentEvent),因此用户可以安全地检查任何字段——包括消费流式 MessageStream——而不会破坏交付给 Runner 消费者的原始事件。

设计决策

为什么使用 *AgentEvent 谓词而非更简单的过滤器(如工具名称白名单)?

谓词提供最大灵活性——用户可以按角色、工具名称、Action 存在性、消息内容(非流式)或任意组合进行过滤。更简单的 API 要么限制过多,要么随时间增长为多个选项。

为什么在调用谓词前进行防御性复制?

如果用户的谓词消费了 MessageStream(例如调用 Recv() 检查内容),原始事件的流将被永久耗尽。copyAgentEvent 通过 .Copy(2) 分叉流,为谓词提供独立副本。对于非流式事件,开销仅为轻量级结构体复制。

为什么两个包装器共享 SendEventOption

两个包装器都产生相同结构的 *AgentEvent。共享选项类型避免了不必要的重复,同时允许用户在需要时对两者应用相同的过滤逻辑。

关键洞察

单次使用的流式资源(MessageStream)在用户代码可能消费它们的 API 边界处需要防御性复制。已有的 copyAgentEvent + 流 .Copy(2) 模式解决了这个问题:分叉流,将一个分支给用户回调,保留另一个给框架。这种"边界处分流"模式确保用户端谓词永远不会破坏内部事件管道。

总结

问题 解决方案
无法抑制特定模型/工具事件 两个事件发送器包装器上的 WithShouldSendEvent(func(*AgentEvent) bool) 选项
用户谓词可能消费流式事件,破坏 Runner 消费者 调用谓词前通过 copyAgentEvent 进行防御性复制

mrh997 and others added 30 commits March 30, 2026 09:49
feat(agentic_model):
- format print
- support agentic chat template
- support to compose agentic odel&agentic tools node
- support agentic tool node
- support agentic message concat
#882)

feat(adk): add agentmd middleware for auto-injecting Agents.md into model input

Change-Id: I34add4f925a23c6d6821925c482a21f6cddfddd4
luohq-bytedance and others added 19 commits March 30, 2026 09:49
Change-Id: Ifb3be9fadeabdd8f3b6985bb47d2fb38a7004beb
- Add inputItem param to OnAgentEvent callback
- Move RunOptions from static config field to GenInput return value
- Change CancelOption from pointer to value type

Change-Id: I5bff76223fedb2b48fc5671e90d635a9479b20f6
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Change-Id: Iaa0da27dcab3fdd446828a592142a4504278daf7
Change-Id: If20fa78dba82a1c177c8ec47090050ea8c1354ed
* fix(adk): skip saving checkpoint when TurnLoop is idle

When Stop() is called on an idle TurnLoop (no active agent run, no
unhandled items, no canceled items), the resulting checkpoint contains
no meaningful state. Skip saving such checkpoints to avoid unnecessary
store writes.

- Add isIdle check in cleanup() before checkpoint save decision
- Add TestTurnLoop_StopWhileIdle_SkipsCheckpoint test

Change-Id: I6aeaff5ed5833a971cb95298193fdb96d904baf8

* fix(internal): merge id2State in PopulateInterruptState instead of replacing

PopulateInterruptState merged id2Addr entries one by one but replaced
id2State wholesale. In a parallel workflow resume, two goroutines share
the same globalResumeInfo. If one goroutine's compose graph called
PopulateInterruptState (replacing id2State with compose-only entries)
before the other goroutine looked up its outer-level entry, the lookup
returned a zero-value InterruptState with State=nil, triggering the
'has no state' panic in ChatModelAgent.Resume.

Change id2State handling to merge entry by entry, consistent with
id2Addr.

Change-Id: Ia21f65289bff7beb2bc383fb033926ad9c92d7e7

* fix(adk): keep watching for cancel escalation after stopSig.done

When watchStopSignal entered the stopSig.done branch, it processed the
initial cancel and then blocked on <-done (turn completion), never
looping back to check notify. This meant a subsequent Stop() call with
a higher cancel mode (e.g. CancelImmediate) was never forwarded to the
agent, causing TestTurnLoop_Stop_EscalatesCancelMode to time out.

Replace the blocking <-done with an inner loop that selects on both
done and notify, so escalation signals are always delivered. Also apply
the generation-based dedup check consistent with the notify branch.

Change-Id: Ia6a04d00a2b44625ffbcb625ff0e559c12ed145f
…r agent cancellation (#929)

* fix(adk): prevent panic when orphaned tool goroutine sends event after agent cancellation

When CancelAfterChatModel times out and escalates to CancelImmediate,
GraphInterrupt fires with timeout=0. The compose graph returns immediately,
orphaning parallel tool goroutines. When an orphaned tool completes,
eventSenderToolWrapper tries to send an event via the AsyncGenerator which
is already closed, causing 'send on closed channel' panic.

- Add isImmediateCancelled() to cancelContext for checking immediateChan
- Make chatModelAgentExecCtx.send cancel-aware: skip send when immediate cancel is active
- Use trySend as safety net for the TOCTOU race window
- Route SendEvent() through execCtx.send() instead of direct generator.Send()

Change-Id: Ic7e0194c860e2692a3cddc559911ab379024f650

* test(adk): add test for orphaned tool goroutine panic after CancelImmediate

- unit_send_after_close: directly reproduces the panic by sending to a
  closed generator with isImmediateCancelled=true
- unit_send_after_close_without_cancel_ctx: verifies trySend safety net
  prevents panic even without cancelCtx
- integration_cancel_escalation_orphans_tool: end-to-end test with slow
  tool, CancelAfterChatModel timeout escalation, and orphaned goroutine

Change-Id: Ia82fa957b102ccc2ac42094d18d4b15db2a1701c

* test(adk): improve coverage for orphaned tool goroutine fix

Add test cases for:
- nil execCtx and nil generator defensive guards
- nil cancelContext in isImmediateCancelled
- TOCTOU race window (isImmediateCancelled=false but generator closed)
- SendEvent public API with closed generator
- SendEvent without exec context

Change-Id: I197c36f34675f5376cbe5f830b15db6ca873cd1f
Add SendEventOption type with WithShouldSendEvent predicate that allows
users to control whether specific events should be sent through
eventSenderModelWrapper and eventSenderToolWrapper.

The predicate receives a defensive copy of the AgentEvent (via
copyAgentEvent), so it is safe to inspect any field including streaming
MessageStream without affecting the original event delivered to the
Runner consumer.

Both NewEventSenderModelWrapper and NewEventSenderToolWrapper now accept
variadic SendEventOption, remaining fully backward-compatible.

Change-Id: I0fd19dea433ac8487ab8f2870db6a6e77b15ff55
// the default event sender to avoid duplicate events.
//
// Event Filtering:
// Both NewEventSenderModelWrapper and NewEventSenderToolWrapper accept
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Breaking API Changes Detected

Package: github.com/cloudwego/eino/adk

Incompatible changes:

  • NewEventSenderModelWrapper: changed from func() ChatModelAgentMiddleware to func(...SendEventOption) ChatModelAgentMiddleware
  • NewEventSenderToolWrapper: changed from func() ChatModelAgentMiddleware to func(...SendEventOption) ChatModelAgentMiddleware
Review Guidelines

Please ensure that:

  • The changes are absolutely necessary
  • They are properly documented
  • Migration guides are provided if needed

⚠️ Please resolve this thread after reviewing the breaking changes.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 2, 2026

Codecov Report

❌ Patch coverage is 88.46154% with 3 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (alpha/09@fc5c067). Learn more about missing BASE report.

Files with missing lines Patch % Lines
adk/wrappers.go 88.46% 0 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##             alpha/09     #930   +/-   ##
===========================================
  Coverage            ?   81.50%           
===========================================
  Files               ?      158           
  Lines               ?    18934           
  Branches            ?        0           
===========================================
  Hits                ?    15432           
  Misses              ?     2365           
  Partials            ?     1137           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@shentongmartin shentongmartin force-pushed the alpha/09 branch 3 times, most recently from 9546746 to 93fa7f4 Compare April 8, 2026 13:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

6 participants