package bridge // event_serializer.go - 将 engine.Event 序列化为 BridgeEvent JSON. // // 这是引擎层和传输层之间的"翻译器": // - 引擎层:Go 接口 + type switch(编译时类型安全) // - 传输层:JSON 字符串,推给远端客户端(语言无关) // // 升华改进(ELEVATED): 早期方案 在每个传输路径中分别处理事件序列化, // 导致 SSE/WS 两路代码不一致.我们统一在 EventSerializer 中处理,一处修改全部生效. // 替代方案:<每个 SessionConn 实现自己的序列化> - 否决:多协议维护成本翻倍. import ( "encoding/json" "fmt" "sync/atomic" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/engine" ) // EventSerializer 将 engine.Event 转换为可推送给客户端的 BridgeEvent. // 每个 SessionConn 持有一个 EventSerializer 实例,保持事件 ID 单调递增. type EventSerializer struct { sessionID string seq atomic.Uint64 // 精妙之处(CLEVER): atomic 避免锁,高频调用安全 } // NewEventSerializer 创建绑定到指定会话的序列化器. func NewEventSerializer(sessionID string) *EventSerializer { return &EventSerializer{sessionID: sessionID} } // Serialize 将 engine.Event 转换为 BridgeEvent. // // 精妙之处(CLEVER): 每次调用原子递增 seq,即使并发调用也不会产生重复 ID. // ID 格式 "-" 确保跨会话唯一--多会话场景下客户端能区分来源. // // 对未知事件类型返回 type="unknown" 而非 error--引擎新版本增加事件类型时, // 旧版客户端不会崩溃(向前兼容). // // Shape 提取到 eventPayload helper, 让 SubAgentEvent 能递归取 Inner 的 // 类型和 payload 做扁平化合并 (避免 17 case struct literal 在两处重复). // // Shape extraction moved to eventPayload helper so SubAgentEvent can // recursively obtain its Inner's type and payload for flat merging (avoids // duplicating the 17 struct literals across two switches). func (s *EventSerializer) Serialize(evt engine.Event) BridgeEvent { seq := s.seq.Add(1) id := fmt.Sprintf("%s-%d", s.sessionID, seq) now := time.Now() evtType, payload := eventPayload(evt) return BridgeEvent{ ID: id, Type: evtType, SessionID: s.sessionID, Payload: payload, Timestamp: now, } } // eventPayload 返回给定 Event 对应的 (type, payload JSON). SubAgentEvent // 递归调用自己拿 Inner 的 (innerType, innerPayload) 再合并为扁平 // {subagent_id, event_type, ...inner_payload} shape, 消费端只需一次 // JSON.parse 就能拿到子 agent ID 和内层业务字段, 不用两层 unwrap. // // eventPayload returns the (type, payload) pair for a given Event. 
// SubAgentEvent calls this function recursively on its Inner and flattens // the result into {subagent_id, event_type, ...inner_payload} so consumers // only need one JSON.parse to reach both the sub-agent ID and the inner // business fields — no nested unwrap. func eventPayload(evt engine.Event) (string, json.RawMessage) { switch e := evt.(type) { case *engine.TextDeltaEvent: return "text_delta", mustMarshal(map[string]string{"text": e.Text}) case *engine.TextEvent: return "text", mustMarshal(map[string]string{"text": e.Text}) case *engine.ThinkingDeltaEvent: return "thinking_delta", mustMarshal(map[string]string{"text": e.Text}) case *engine.ThinkingEvent: return "thinking", mustMarshal(map[string]string{"text": e.Text}) case *engine.ToolUseEvent: return "tool_use", mustMarshal(struct { ID string `json:"id"` Name string `json:"name"` Input map[string]any `json:"input,omitempty"` }{e.ID, e.ToolName, e.Input}) case *engine.ToolResultEvent: return "tool_result", mustMarshal(struct { ID string `json:"id"` Name string `json:"name"` Output string `json:"output"` IsError bool `json:"is_error"` Truncated bool `json:"truncated,omitempty"` StoredPath string `json:"stored_path,omitempty"` }{e.ID, e.ToolName, e.Output, e.IsError, e.Truncated, e.StoredPath}) case *engine.ToolProgressEvent: return "tool_progress", mustMarshal(struct { ID string `json:"id"` Name string `json:"name"` Progress float64 `json:"progress"` Detail string `json:"detail,omitempty"` }{e.ID, e.ToolName, e.Progress, e.Detail}) case *engine.PermissionRequestEvent: return "permission_request", mustMarshal(struct { ID string `json:"id"` ToolName string `json:"tool_name"` Input map[string]any `json:"input,omitempty"` Message string `json:"message"` }{e.ID, e.ToolName, e.Input, e.Message}) case *engine.PlanApprovalEvent: // plan_approval surfaces the ExitPlanMode tool's approval gate to // external subscribers (TUI / SDK) so they can render an approval // UI and invoke event.Approve / Reject asynchronously 
(push mode, // racing against the ApprovalPolicy pull path — first-to-resolve // wins inside PlanModeManager.Exit). // // Approve / Reject func fields are intentionally omitted from the // JSON payload — functions aren't marshallable, and the push // callback only makes sense to in-process Go subscribers who hold // the event pointer directly. SSE / remote clients use a separate // resolve API (to be added) to feed decisions back to the engine. // // plan_approval 把 ExitPlanMode 工具的审批闸暴露给外部订阅者 (TUI // / SDK), 让它们渲染审批 UI 并异步调 event.Approve / Reject (push // 模式, 与 ApprovalPolicy pull 路径竞速 — PlanModeManager.Exit 内 // first-to-resolve 胜). // // Approve / Reject func 字段故意不序列化 — 函数不可 marshal, push // 回调仅对持有 event 指针的进程内 Go 订阅者有意义. SSE / 远端客户 // 端走另一路 resolve API (待加) 把决策回传给引擎. return "plan_approval", mustMarshal(struct { SessionID string `json:"session_id"` Plan string `json:"plan"` FilePath string `json:"file_path,omitempty"` Steps []engine.PlanStep `json:"steps,omitempty"` }{e.SessionID, e.Plan, e.FilePath, e.Steps}) case *engine.TurnStartEvent: // context_window surfaces the model's full context window in tokens // (provider.Models() truth) so clients can render budget bars without // re-implementing model lookups. // // context_window 透传模型完整上下文窗口 token 数 (由 provider.Models() // 提供), 让客户端可直接渲染预算进度条, 无需再自行维护模型表. return "turn_start", mustMarshal(struct { Turn int `json:"turn"` Model string `json:"model,omitempty"` ContextWindow int `json:"context_window,omitempty"` }{e.TurnNumber, e.Model, e.ContextWindowTokens}) case *engine.TurnEndEvent: // max_tokens is the per-turn output cap actually sent to the provider // (after FastMode / reasoning downgrade). Clients use it to spot when // replies truncate because they hit the cap vs stopped naturally. // // max_tokens 是本轮实际发给 provider 的 output 上限 (经 FastMode / // reasoning 降档后的值). 客户端据此判断回复截断是否命中上限, 还是自然 // 停止. 
return "turn_end", mustMarshal(struct { Turn int `json:"turn"` InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` CostUSD float64 `json:"cost_usd"` MaxTokens int `json:"max_tokens,omitempty"` }{e.TurnNumber, e.InputTokens, e.OutputTokens, e.CostUSD, e.MaxTokens}) case *engine.DoneEvent: return "done", mustMarshal(struct { TurnCount int `json:"turn_count"` TotalInputTokens int `json:"total_input_tokens"` TotalOutputTokens int `json:"total_output_tokens"` TotalCostUSD float64 `json:"total_cost_usd"` Reason string `json:"reason,omitempty"` }{e.TurnCount, e.TotalInputTokens, e.TotalOutputTokens, e.TotalCostUSD, e.Reason}) case *engine.ErrorEvent: msg := "" if e.Err != nil { msg = e.Err.Error() } return "error", mustMarshal(struct { Code string `json:"code"` Message string `json:"message"` Suggestion string `json:"suggestion,omitempty"` Retryable bool `json:"retryable"` }{e.Code, msg, e.Suggestion, e.Retryable}) case *engine.WarningEvent: return "warning", mustMarshal(struct { Code string `json:"code"` Message string `json:"message"` Detail string `json:"detail,omitempty"` }{e.Code, e.Message, e.Detail}) case *engine.CompactEvent: return "compact", mustMarshal(struct { Summary string `json:"summary,omitempty"` TokensBefore int `json:"tokens_before"` TokensAfter int `json:"tokens_after"` }{e.Summary, e.TokensBefore, e.TokensAfter}) case *engine.SessionInfoEvent: return "session_info", mustMarshal(struct { SessionID string `json:"session_id"` Title string `json:"title,omitempty"` TurnCount int `json:"turn_count"` InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` CostUSD float64 `json:"cost_usd"` }{e.SessionID, e.Title, e.TurnCount, e.InputTokens, e.OutputTokens, e.CostUSD}) case *engine.PermissionLearnEvent: type ruleJSON struct { Rule string `json:"rule"` Desc string `json:"description"` } rules := make([]ruleJSON, len(e.Rules)) for i, r := range e.Rules { rules[i] = ruleJSON{r.RuleString, r.Description} } return 
"permission_learn", mustMarshal(struct { Rules []ruleJSON `json:"rules"` }{rules}) case *engine.ToolSummaryEvent: return "tool_summary", mustMarshal(struct { ID string `json:"id"` Name string `json:"name"` Summary string `json:"summary"` }{e.ID, e.ToolName, e.Summary}) case *engine.SlashCommandEvent: return "slash_command", mustMarshal(struct { Name string `json:"name"` Args string `json:"args,omitempty"` }{e.Name, e.Args}) case *engine.SubAgentStartEvent: // start_time 用 UnixMilli 而非 RFC3339 字符串: JSON 消费端 (JS / TUI) // 解析整数更容易, 且毫秒精度对子 agent 时延分析够用. 空串字段 (Cwd / // Model) omitempty 避免对旧消费端添噪. // // start_time uses UnixMilli rather than an RFC3339 string: JSON // consumers (JS / TUI) parse integers more easily, and millisecond // precision is adequate for sub-agent latency analysis. Empty-string // fields (Cwd / Model) are omitempty to avoid noising older consumers. return "subagent_start", mustMarshal(struct { SubAgentID string `json:"subagent_id"` Description string `json:"description,omitempty"` Cwd string `json:"cwd,omitempty"` Model string `json:"model,omitempty"` StartTime int64 `json:"start_time_ms"` }{e.SubAgentID, e.Description, e.Cwd, e.Model, e.StartTime.UnixMilli()}) case *engine.SubAgentEndEvent: // duration_ms 用整数毫秒对齐 start_time_ms. Status 用 SubAgentStatus // string 值 (running/completed/failed/cancelled) — 不暴露 Go iota 整数 // 语义, 跨语言消费端只需比字符串. Error omitempty: 成功路径省略减少 // SSE payload 体积. // // duration_ms uses integer milliseconds to match start_time_ms. // Status serializes as the SubAgentStatus string (running/completed // /failed/cancelled) — we don't expose the Go iota integer semantics, // so cross-language consumers just compare strings. Error is // omitempty: successful paths skip it to trim SSE payload size. 
return "subagent_end", mustMarshal(struct { SubAgentID string `json:"subagent_id"` DurationMS int64 `json:"duration_ms"` Status string `json:"status"` Result string `json:"result,omitempty"` Error string `json:"error,omitempty"` }{e.SubAgentID, e.Duration.Milliseconds(), string(e.Status), e.Result, e.Error}) case *engine.SubAgentEvent: // 扁平包装 shape: 递归序列化 Inner 拿到 (innerType, innerPayload), // 合并 subagent_id + event_type + inner payload 的 JSON key 到同 // 一层. 消费端 JSON.parse 一次即可读到 subagent_id 和业务字段, // 不用嵌套 unwrap. 遇到 Inner 类型未知 (type="unknown" payload= // "{}") 仍安全 — merge 只加 2 个 key, inner "{}" 不冲突. // // Flat wrapping shape: recursively serialize Inner to get // (innerType, innerPayload), then merge subagent_id + event_type // + inner payload JSON keys onto one level. Consumers reach both // sub-agent ID and business fields with a single JSON.parse, no // nested unwrap. Unknown Inner type (type="unknown" payload="{}") // is safe — merge only adds 2 keys and inner "{}" doesn't conflict. innerType, innerPayload := eventPayload(e.Inner) return "subagent_event", mergePayload(innerPayload, map[string]any{ "subagent_id": e.SubAgentID, "event_type": innerType, }) default: // 历史包袱(LEGACY): 未知事件类型序列化为空 payload 而非 error-- // 新版引擎增加事件类型时旧版平台不会崩溃(向前兼容). // 未来可以加版本协商,客户端声明支持的事件版本集合. return "unknown", json.RawMessage(`{}`) } } // mergePayload 把 extras 合并进 base (JSON object), 返回新的 json.RawMessage. // base 非 object 时退化为仅 extras 的 object — 因为 SubAgentEvent 包装的 // Inner 都是 eventPayload 返回的 JSON object (所有 17 case 都 marshal struct), // 此兜底只防御未来 Inner 可能包一个数组 / 字符串 primitive. 冲突键用 extras // 覆盖 base — 让 SubAgentEvent 的 subagent_id / event_type 永远胜出 (Inner // 不太可能自己有这两个键, 但语义上 subagent 层级更外). // // mergePayload merges extras into base (a JSON object), returning a new // json.RawMessage. 
// When base is not an object (an array, a primitive, null, or invalid JSON),
// the result falls back to an object containing only extras. SubAgentEvent's
// inner payload always comes from eventPayload, which produces JSON objects,
// so that branch only guards against future Inner shapes that might wrap an
// array / string primitive.
//
// Conflicting keys are overwritten by extras — this ensures SubAgentEvent's
// subagent_id / event_type always win (Inner rarely carries those keys, but
// semantically the subagent layer is outermost).
//
// An extras value that cannot be marshalled panics via mustMarshal instead of
// being dropped silently: a payload missing subagent_id or event_type would
// be harder to debug than a loud failure, which matches the policy documented
// on mustMarshal.
func mergePayload(base json.RawMessage, extras map[string]any) json.RawMessage {
	var obj map[string]json.RawMessage
	// Unmarshal succeeds on JSON null but leaves obj nil, so both the error
	// and the nil case need a fresh, writable map.
	if err := json.Unmarshal(base, &obj); err != nil || obj == nil {
		obj = make(map[string]json.RawMessage, len(extras))
	}
	for k, v := range extras {
		obj[k] = mustMarshal(v)
	}
	return mustMarshal(obj)
}

// mustMarshal serializes v to JSON and panics on error.
//
// CLEVER: it panics rather than returning an error because the call sites pass
// inline payload structs and maps that are expected to always marshal; a
// failure indicates a code bug that should surface during development rather
// than stay silent in production.
// Alternative considered: ignore the error and return a nil payload -
// rejected: silently losing data is harder to debug.
//
// NOTE(review): json.Marshal does reject some values that can reach this
// function — NaN / ±Inf float64 fields and unsupported values inside a
// map[string]any input — so "cannot fail" holds only if the engine never
// emits such values; confirm against the event producers.
func mustMarshal(v any) json.RawMessage {
	b, err := json.Marshal(v)
	if err != nil {
		panic(fmt.Sprintf("bridge: mustMarshal: %v", err))
	}
	return b
}