Back to Blog
New Features2026-03-2412 min read

Proactive Companion Engine: How Five Components Decide When and What to Say

From event-driven to asyncio non-blocking scheduling, revealing the decision engine behind AI proactive care and anti-harassment cooldown.

WeClaw 主动陪伴引擎实战:五大组件如何协力决策"何时"和"说什么"

系列文章第 21 篇 - 从事件驱动到 asyncio 非阻塞调度,揭秘 AI 主动关怀背后的决策引擎


📚 专栏信息

《从零到一构建跨平台 AI 助手:WeClaw 实战指南》专栏

专栏定位:面向开发者和技术决策者的实战专栏,用真实案例和完整代码带你理解如何构建生产级 AI 应用

本系列共 25 篇,分为八大模块

📖 模块一【通讯架构设计】(3 篇):混合通讯、设备绑定、请求路由
🔧 模块二【核心技术实现】(4 篇):WebSocket 路由、心跳重连、离线队列
🛡️ 模块三【安全与治理】(3 篇):密钥管理、Token 吊销、速率限制
🔍 模块四【调试与监控】(2 篇):全链路追踪、日志分析
💡 模块五【问题诊断实战】(3 篇):典型问题排查与修复
⚙️ 模块六【性能优化】(1 篇):启动速度、内存优化
🤖 模块七【主动陪伴系统】(3 篇):决策引擎、防骚扰机制、渐进式建档
🚀 模块八【架构演进史】(1 篇):从 0 到 1 的完整历程

  • 模块定位:主动陪伴系统 · 第 1 篇(共 3 篇)
  • 前置知识:了解 EventBus 发布-订阅模式、asyncio 基础
  • 关联文章:第 22 篇(防骚扰冷却机制)、第 23 篇(渐进式建档)

👨‍💻 作者与项目

作者简介:翁勇刚 WENG YONGGANG
新概念龙虾-WeClaw 开发团队负责人,一群专注于跨平台 AI 应用的实践者
理念:"让 AI 不只是被动工具,更是主动关怀用户的生活伙伴"


📝 摘要

本文结构概览: 本文首先从三个并发关怀请求的真实场景出发,分析为什么需要事件驱动的主动陪伴架构;然后用"餐厅后厨"比喻讲解五大组件(CooldownManager、MoodDetector、OpportunityDetector、InteractionOrchestrator、CompanionEngine)的协作原理;接着通过 1400+ 行核心代码详解七层评分系统和四个时机检测器的实现;随后还原一次"三请求只执行一个"的问题排查过程;最后给出评分缓存和事件优先级等性能优化策略。

背景:在构建 WeClaw 跨平台 AI 助手时,我们面临一个关键挑战:如何让 AI 主动关怀用户,而不是只被动等待用户提问?传统的定时轮询方案要么骚扰用户,要么错过最佳时机。

核心问题:如何设计一套决策系统,能够智能判断"何时"发起关怀(时机检测)、"说什么"关怀内容(主题选择)、同时避免过度骚扰(冷却控制)?

解决方案:采用五大组件协作的事件驱动架构。CooldownManager 控制频率,MoodDetector 感知情绪,OpportunityDetector 捕捉时机,InteractionOrchestrator 编排执行,CompanionEngine 统一调度决策。

关键成果

  • 支持 15+ 种关怀主题(早安问候、心情关怀、生日提醒等)
  • 七层评分系统实现精准的主动关怀时机判断
  • 防骚扰机制:每日配额 + 拒绝惩罚 + 连续忽略检测
  • 事件驱动:工具调用自动触发上下文关怀(旅行意图 → 攻略建议)

适合读者:有 Python 基础,对 AI 助手设计、事件驱动架构、asyncio 异步编程感兴趣的开发者

阅读时长:约 20 分钟

关键词CompanionEngine事件驱动asyncio主动关怀评分系统防骚扰EventBus


一、为什么需要"事件驱动"的主动陪伴?——从三个并发请求说起

1.1 场景重现:当三种关怀同时到来

想象这个早晨场景:

  • 8:00 AM:系统检测到"早安问候"的定时触发条件满足
  • 8:01 AM:用户刚才的消息里提到"昨晚没睡好,有点累",MoodDetector 识别出疲惫情绪
  • 8:02 AM:健康工具记录到血压 145/95,超过警戒线

三个关怀请求几乎同时到来:

┌─────────────────────────────────────────────────────────────┐
│  [08:00] 定时触发 → 早安问候                                  │
│  [08:01] 情绪检测 → 心情关怀(疲惫安慰)                        │
│  [08:02] 健康警报 → 就医建议(血压偏高)                        │
└─────────────────────────────────────────────────────────────┘

问题来了:AI 应该说什么?全说?选一个?怎么选?

如果全部发送,用户会被连续三条消息轰炸;如果只选一个,选哪个最合适?

1.2 方案对比:定时轮询 vs 硬性规则 vs 事件驱动

让我们看看三种主动关怀方案的优缺点:

方案像什么?(比喻)优点缺点
定时轮询闹钟按时响铃实现简单,可预测不考虑用户状态,容易骚扰;错过突发时机
硬性规则红绿灯固定切换逻辑清晰,易调试缺乏灵活性;规则爆炸(组合情况太多)
事件驱动餐厅服务员察言观色响应及时,上下文感知架构复杂;需要精心设计事件优先级
# ❌ 错误示范:定时轮询,不管用户状态
class BadTimerApproach:
    def __init__(self):
        # 每小时检查一次,不管用户在干什么
        schedule.every().hour.do(self.send_care)
    
    def send_care(self):
        # 问题 1:用户可能正在忙碌会议
        # 问题 2:连续发送会造成骚扰
        # 问题 3:错过"用户刚提到累了"这种即时时机
        self.send("今天过得怎么样?")
# ✅ 正确做法:事件驱动 + 多维度评分
class GoodEventDrivenApproach:
    async def on_event(self, event_type: str, data: Any):
        """响应各类事件,动态决策"""
        # 1. 情绪变化事件 → 可能需要心情关怀
        # 2. 工具调用事件 → 可能需要上下文关怀
        # 3. 定时事件 → 兜底的日常关怀
        
        # 计算此刻最适合的关怀主题
        best_topic = self.calculate_best_topic()
        
        # 检查是否满足触发条件
        if self.can_interact() and best_topic.score > threshold:
            await self.initiate_care(best_topic)

1.3 核心挑战:五组件协作的难题

要实现智能的主动陪伴,我们需要解决五个核心问题:

  1. 时机判断:何时发起关怀?(OpportunityDetector)
  2. 频率控制:如何避免骚扰?(CooldownManager)
  3. 情绪感知:用户当前心情如何?(MoodDetector)
  4. 执行编排:关怀消息怎么构建和发送?(InteractionOrchestrator)
  5. 统一调度:多个组件如何协同工作?(CompanionEngine)

这就引出了我们的五大组件架构——让五个"专家"各司其职,协力决策。


二、核心概念解析 —— 用"餐厅后厨"理解五大组件

2.1 什么是五大组件架构?

官方定义

五大组件架构是 WeClaw 主动陪伴系统的核心设计模式,通过职责分离将复杂的决策逻辑拆解为五个独立组件,各组件通过 EventBus 通信,由 CompanionEngine 统一调度。

大白话解释: 想象一家高档餐厅的后厨,每个关怀消息就像一道菜。要把"菜"端到用户面前,需要五个角色配合:

┌─────────────────────────────────────────────────────────────────────┐
│                        餐厅后厨比喻                                  │
├─────────────────────────────────────────────────────────────────────┤
│  CooldownManager     = 厨师长      控制出菜节奏,防止上太快撑着客人     │
│  MoodDetector        = 食评家      观察客人表情,判断口味偏好           │
│  OpportunityDetector = 服务员      听到客人聊天内容,发现推荐时机       │
│  InteractionOrchestrator = 传菜员  把菜装盘、端上桌、记录反馈           │
│  CompanionEngine     = 餐厅经理    统筹全局,决定今天推什么特色菜       │
└─────────────────────────────────────────────────────────────────────┘

2.2 工作原理流程图:从事件到关怀的完整链路

                          ┌──────────────────┐
                          │  外部事件输入     │
                          │ (用户输入/工具调用/定时器) │
                          └────────┬─────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                         EventBus 事件总线                            │
│    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐               │
│    │ USER_INPUT  │  │ TOOL_CALL   │  │ CRON_TRIGGER│               │
│    └──────┬──────┘  └──────┬──────┘  └──────┬──────┘               │
└───────────┼────────────────┼────────────────┼───────────────────────┘
            │                │                │
            ▼                ▼                ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│   MoodDetector    │ │ OpportunityDetector│ │  CompanionEngine  │
│  (情绪检测)        │ │  (时机检测)         │ │  (定时调度)        │
│  ┌─────────────┐  │ │  ┌─────────────┐  │ │  ┌─────────────┐  │
│  │ 关键词匹配   │  │ │  │ 旅行意图检测 │  │ │  │ 30分钟轮询  │  │
│  │ 情绪得分计算 │  │ │  │ 大额支出检测 │  │ │  │ 时段主题匹配 │  │
│  └─────────────┘  │ │  │ 健康警报检测 │  │ │  └─────────────┘  │
│                   │ │  └─────────────┘  │ │                   │
└─────────┬─────────┘ └─────────┬─────────┘ └─────────┬─────────┘
          │                     │                     │
          │   mood_adjustment   │  contextual_care    │  cron_trigger
          │                     │                     │
          └──────────────────┐  │  ┌──────────────────┘
                             │  │  │
                             ▼  ▼  ▼
              ┌────────────────────────────────────┐
              │         CompanionEngine            │
              │     calculate_interaction_score()  │
              │  ┌──────────────────────────────┐  │
              │  │       七层评分系统            │  │
              │  │  1. 基础分 (priority * 10)   │  │
              │  │  2. 时段匹配 (+0~20)         │  │
              │  │  3. 冷却期检查 (0/pass)      │  │
              │  │  4. 紧急度加成 (+0~30)       │  │
              │  │  5. 特殊紧急 (+0~30)         │  │
              │  │  6. 用户空闲度 (+/-20)       │  │
              │  │  7. 情绪调节 (+/-15)         │  │
              │  └──────────────────────────────┘  │
              │                 │                  │
              │         score > 30?                │
              │            │    │                  │
              │         YES│    │NO                │
              │            ▼    ▼                  │
              └────────────┬────┬──────────────────┘
                           │    │
                      触发  │    │ 跳过
                           ▼    
              ┌────────────────────────────────────┐
              │    InteractionOrchestrator         │
              │         initiate_care()            │
              │  ┌──────────────────────────────┐  │
              │  │ 1. 获取交互锁                 │  │
              │  │ 2. 冷却检查                   │  │
              │  │ 3. 用户忙碌检查               │  │
              │  │ 4. 构建消息                   │  │
              │  │ 5. EventBus.emit()           │  │
              │  │ 6. 记录交互                   │  │
              │  │ 7. 释放锁                     │  │
              │  └──────────────────────────────┘  │
              └────────────────────────────────────┘
                               │
                               ▼
              ┌────────────────────────────────────┐
              │       CooldownManager              │
              │  ┌──────────────────────────────┐  │
              │  │ • 每日配额检查 (5次/天)       │  │
              │  │ • 拒绝惩罚期 (4小时)          │  │
              │  │ • 连续忽略计数 (≤2次)         │  │
              │  │ • 交互锁 (同时只允许1个)      │  │
              │  └──────────────────────────────┘  │
              └────────────────────────────────────┘
                               │
                               ▼
              ┌────────────────────────────────────┐
              │           UI 层响应                 │
              │   COMPANION_CARE_TRIGGERED 事件    │
              └────────────────────────────────────┘

关键步骤解读

  1. 事件捕获:EventBus 捕获用户输入、工具调用、定时触发等事件
  2. 并行处理:MoodDetector 分析情绪,OpportunityDetector 检测时机
  3. 评分决策:CompanionEngine 综合七层因素计算执行分数
  4. 执行编排:分数超过阈值后,InteractionOrchestrator 执行完整的关怀流程
  5. 冷却控制:CooldownManager 全程参与,确保不骚扰用户

2.3 对比表:单体架构 vs 五组件架构

维度单体架构五组件架构区别说明
可维护性差(God Class)好(单一职责)五组件各管一摊,改 MoodDetector 不影响 CooldownManager
可测试性难(高耦合)易(可独立测试)可单独 Mock EventBus 测试 OpportunityDetector
扩展性低(改一处动全身)高(插件式扩展)新增"天气关怀"只需注册新 CareTopic
调试复杂度简单(单入口)中等(需跟踪事件)通过日志追踪 topic_id 可还原完整链路
性能开销略高(事件分发)EventBus 分发开销 < 1ms,可忽略

为什么选择五组件架构?

因为主动陪伴系统的复杂性在于多维度决策——时机、情绪、频率、内容缺一不可。单体架构会导致 if-else 爆炸,而五组件架构让每个维度的逻辑内聚、易于演进。


三、实战代码详解 —— 核心实现逐行解析

3.1 数据结构:CareTopic 与 CareTopicRegistry

首先看关怀主题的数据结构定义:

# src/core/companion_topics.py

@dataclass
class CareTopic:
    """
    关怀主题数据类
    
    定义单个关怀主题的所有属性,包括触发条件、时间约束和对话模板。
    
    Attributes:
        topic_id: 唯一标识符
        name: 显示名称
        category: 主题分类 (health/finance/social/family/emotional/lifestyle)
        priority: 优先级 (1-10, 10为最高)
        min_interval_hours: 最小触发间隔(小时)
        best_time_slots: 最佳触发时段列表 ["morning", "afternoon", "evening"]
        prompt_template: 对话模板,支持 {变量} 替换
        requires_profile: 依赖的用户档案字段列表
        enabled: 是否启用
    """
    topic_id: str
    name: str
    category: str
    priority: int                                       # 关键:1-10 的优先级
    min_interval_hours: int                             # 关键:冷却期时长
    best_time_slots: list[str] = field(default_factory=list)
    prompt_template: str = ""
    requires_profile: list[str] = field(default_factory=list)
    enabled: bool = True

字段说明

  • priority: 优先级决定基础分数,生日提醒(9) > 心情关怀(7) > 早安问候(5)
  • min_interval_hours: 最小间隔,防止同一主题重复触发(早安=24h,心情=12h)
  • best_time_slots: 最佳时段,早安只在 morning,心情在 afternoon/evening

预定义主题示例

# src/core/companion_topics.py

class CareTopicRegistry:
    """关怀主题注册表"""
    
    def _register_defaults(self) -> None:
        """注册所有预定义的关怀主题"""
        default_topics = [
            # 日常关怀主题
            CareTopic(
                topic_id="morning_greeting",
                name="早安问候",
                category="lifestyle",
                priority=5,
                min_interval_hours=24,
                best_time_slots=["morning"],
                prompt_template="早上好呀!今天{weather_hint},{schedule_hint}有什么计划吗?"
            ),
            CareTopic(
                topic_id="mood_check",
                name="心情关怀",
                category="emotional",
                priority=7,                              # 关键:情感类优先级较高
                min_interval_hours=12,
                best_time_slots=["afternoon", "evening"],
                prompt_template="今天过得怎么样呀?最近心情如何?"
            ),
            CareTopic(
                topic_id="birthday_remind",
                name="生日提醒",
                category="social",
                priority=9,                              # 关键:最高优先级
                min_interval_hours=8760,                 # 一年才触发一次
                best_time_slots=["morning"],
                prompt_template="{person_name}的生日就在{days_left}天后了!要不要准备一下?",
                requires_profile=["social_contacts"]
            ),
            CareTopic(
                topic_id="medical_remind",
                name="就医提醒",
                category="health",
                priority=8,                              # 关键:健康警报优先级高
                min_interval_hours=168,
                best_time_slots=["morning"],
                prompt_template="注意到你最近的{metric_name}数据偏高,建议找时间去医院检查一下比较放心",
                requires_profile=["health_data"]
            ),
            # ... 更多主题
        ]
        
        for topic in default_topics:
            self.register(topic)

设计亮点

  1. 数据驱动:新增关怀主题只需添加 CareTopic 实例,无需改代码逻辑
  2. 模板变量{weather_hint}{person_name} 支持个性化消息
  3. 依赖声明requires_profile 声明所需用户数据,缺失时可触发建档

3.2 核心方法 1:calculate_interaction_score() — 七层评分系统

这是 CompanionEngine 的核心决策方法,决定某个主题此刻是否应该触发:

# src/core/companion_engine.py

def calculate_interaction_score(self, topic: CareTopic) -> float:
    """计算主题此刻的执行分数。
    
    综合考虑优先级、时段匹配度、冷却期、紧急度、用户状态等因素。
    
    Args:
        topic: 关怀主题
        
    Returns:
        执行分数 (0-100),越高越应该执行
    """
    # ========== 第 1 层:基础分数 ==========
    # 优先级 * 10,范围 10-100
    score = topic.priority * 10
    
    # ========== 第 2 层:时段匹配度 +0~20 ==========
    current_slot = get_time_slot()  # "morning" / "afternoon" / "evening"
    if current_slot in topic.best_time_slots:
        score += 20                  # 关键:在最佳时段加 20 分
    elif topic.best_time_slots:
        score += 5                   # 不在最佳时段,但仍可执行
    
    # ========== 第 3 层:冷却期硬性检查 ==========
    hours = self._cooldown.hours_since(topic.topic_id)
    if hours < topic.min_interval_hours:
        logger.debug("主题 %s 仍在冷却期 (%.1f < %d 小时)", 
                    topic.topic_id, hours, topic.min_interval_hours)
        return 0                     # 关键:冷却期内直接返回 0 分
    
    # ========== 第 4 层:紧急度加成 +0~30 ==========
    # 基于距离冷却期结束的程度,越久没触发越紧急
    cooldown_ratio = min(hours / topic.min_interval_hours, 2.0)
    urgency_bonus = min((cooldown_ratio - 1) * 15, 30)
    score += urgency_bonus
    
    # ========== 第 5 层:特殊紧急情况 +0~30 ==========
    # 如生日临近
    if topic.topic_id == "birthday_remind":
        context = self._build_context(topic)
        days_left = context.get("days_left_raw", 999)
        if days_left <= 3:
            score += 30              # 关键:生日 3 天内加 30 分
        elif days_left <= 7:
            score += 15
    
    # ========== 第 6 层:用户空闲度 +/-0~20 ==========
    if self._orchestrator._is_user_idle_for(5):
        score += 10                  # 空闲 5 分钟以上
    if self._orchestrator._is_user_idle_for(15):
        score += 10                  # 空闲 15 分钟以上
    if self._orchestrator._is_user_busy():
        score -= 20                  # 关键:用户正忙减 20 分
    
    # ========== 第 7 层:情绪调节 +/-0~15 ==========
    mood_adjustment = self._mood_detector.get_mood_adjusted_topic_score(
        topic, self._current_mood
    )
    score += mood_adjustment
    
    # ========== 最终检查:每日预算 ==========
    if not self._cooldown.can_interact():
        logger.debug("每日预算已用完或处于冷却期")
        return 0
    
    return max(0, min(100, score))   # 注意:限制在 0-100 范围

评分公式可视化

最终分数 = 基础分(priority×10)
         + 时段匹配(+0~20)
         + 紧急度(+0~30)
         + 特殊紧急(+0~30)
         + 用户空闲度(+/-20)
         + 情绪调节(+/-15)
         - 冷却期内则直接归零
         - 每日预算用完则直接归零

示例:早安问候在 8:00 AM
- 基础分:5 × 10 = 50
- 时段匹配:morning ∈ ["morning"] → +20
- 紧急度:假设 26 小时未触发,ratio=1.08 → +1.2
- 用户空闲:空闲 10 分钟 → +10
- 情绪:中性 → +0
= 50 + 20 + 1.2 + 10 = 81.2 分 ✅ 超过阈值 30,触发!

3.3 核心方法 2:OpportunityDetector 的四个检测器

OpportunityDetector 通过监听 EventBus 的 TOOL_CALL 事件,从用户行为中发现关怀时机:

# src/core/companion_engine.py

class OpportunityDetector:
    """通过 EventBus 监听用户行为,检测关怀时机。"""
    
    def _setup_listeners(self) -> None:
        """设置事件监听器。"""
        # 关键:监听工具调用事件,优先级 200(较高)
        self._event_bus.on(
            EventType.TOOL_CALL,
            self._on_tool_call,
            priority=200
        )
        # 监听 Agent 响应事件
        self._event_bus.on(
            EventType.AGENT_RESPONSE,
            self._on_agent_response,
            priority=200
        )
    
    async def _on_tool_call(self, event_type: str, data: Any) -> None:
        """处理工具调用事件。"""
        if self._engine is None:
            return
        
        # 安全获取工具名和参数
        tool_name = getattr(data, "tool_name", None) or (
            data.get("tool_name") if isinstance(data, dict) else None
        )
        arguments = getattr(data, "arguments", {}) or (
            data.get("arguments", {}) if isinstance(data, dict) else {}
        )
        
        if not tool_name:
            return
        
        logger.debug("检测到工具调用: %s", tool_name)
        
        # 关键:并行检查四种关怀时机
        try:
            await self._check_inference_rules(tool_name, arguments)
            await self._check_travel_intent(tool_name, arguments)
            await self._check_budget_alert(tool_name, arguments)
            await self._check_health_concern(tool_name, arguments)
        except Exception as e:
            logger.error("时机检测异常: %s", e)

检测器 1:旅行意图检测

# src/core/companion_engine.py

async def _check_travel_intent(self, tool_name: str, arguments: dict) -> None:
    """检查旅行意图 → 建议旅游攻略。"""
    if tool_name != "search":
        return
    
    query = str(arguments.get("query", "")).lower()
    travel_keywords = ["机票", "酒店", "旅游", "景点", "攻略", "订票", "住宿"]
    
    # 关键:关键词匹配检测旅行意图
    if any(kw in query for kw in travel_keywords):
        logger.info("检测到旅行意图: %s", query)
        await self._engine.suggest_contextual_care(
            "suggest_travel_guide",
            {"query": query, "detected_keywords": [kw for kw in travel_keywords if kw in query]}
        )

检测器 2:大额支出检测

# src/core/companion_engine.py

async def _check_budget_alert(self, tool_name: str, arguments: dict) -> None:
    """检查大额支出 → 预算提醒。"""
    if tool_name != "finance":
        return
    
    amount = arguments.get("amount", 0)
    try:
        amount = float(amount)
    except (TypeError, ValueError):
        return
    
    # 关键:大额支出阈值(可配置)
    threshold = 1000
    if amount >= threshold:
        logger.info("检测到大额支出: %.2f", amount)
        await self._engine.suggest_contextual_care(
            "budget_alert",
            {"amount": amount, "threshold": threshold}
        )

检测器 3:健康警报检测

# src/core/companion_engine.py

async def _check_health_concern(self, tool_name: str, arguments: dict) -> None:
    """检查异常健康指标 → 就医建议。"""
    if tool_name != "health":
        return
    
    # 检查血压
    bp_systolic = arguments.get("bp_systolic")
    if bp_systolic is not None:
        try:
            bp_sys = int(bp_systolic)
            if bp_sys >= 140:                           # 关键:高血压阈值
                logger.info("检测到高血压: %d", bp_sys)
                await self._engine.suggest_contextual_care(
                    "health_alert",
                    {"metric_name": "血压", "value": bp_sys, "threshold": 140}
                )
                return
        except (TypeError, ValueError):
            pass
    
    # 检查血糖
    blood_glucose = arguments.get("blood_glucose")
    if blood_glucose is not None:
        try:
            glucose = float(blood_glucose)
            if glucose >= 7.0:                          # 关键:高血糖阈值
                logger.info("检测到高血糖: %.1f", glucose)
                await self._engine.suggest_contextual_care(
                    "health_alert",
                    {"metric_name": "血糖", "value": glucose, "threshold": 7.0}
                )
        except (TypeError, ValueError):
            pass

检测器 4:通用推断规则检测

# src/core/companion_engine.py

async def _check_inference_rules(self, tool_name: str, arguments: dict) -> None:
    """检查通用推断规则。"""
    if self._engine is None:
        return
    
    # 将参数转为字符串进行关键词匹配
    args_text = " ".join(str(v) for v in arguments.values() if v)
    
    for rule in INFERENCE_RULES:
        if rule.tool != tool_name:
            continue
        
        # 关键:检查关键词匹配
        matched_keywords = [kw for kw in rule.keyword_match if kw in args_text]
        if not matched_keywords:
            continue
        
        logger.info("推断规则匹配: tool=%s, keywords=%s, infer=%s", 
                   tool_name, matched_keywords, rule.infer)
        
        # 如果有关联动作,触发上下文关怀
        if rule.action:
            await self._engine.suggest_contextual_care(
                rule.action,
                {
                    "inferred": rule.infer,
                    "confidence": rule.confidence,
                    "matched_keywords": matched_keywords,
                }
            )

推断规则配置示例

# src/core/companion_topics.py

INFERENCE_RULES: list[InferenceRule] = [
    InferenceRule(
        tool="finance",
        keyword_match=["幼儿园", "学费", "奶粉", "尿布", "童装", "儿童"],
        infer={"has_children": "true"},
        confidence=0.8
    ),
    InferenceRule(
        tool="search",
        keyword_match=["机票", "酒店", "旅游", "景点", "攻略"],
        infer={"travel_intent": "true"},
        confidence=0.7,
        action="suggest_travel_guide"  # 关键:触发旅行攻略建议
    ),
    InferenceRule(
        tool="health",
        keyword_match=["高血压", "血压偏高"],
        infer={"hypertension_risk": "true"},
        confidence=0.9,
        action="schedule_bp_monitoring"  # 关键:触发血压监测提醒
    ),
]

3.4 核心方法 3:InteractionOrchestrator.initiate_care() — 交互完整生命周期

# src/core/companion_engine.py

class InteractionOrchestrator:
    """管理主动交互的执行流程。"""
    
    async def initiate_care(self, topic: CareTopic, context: dict[str, Any]) -> dict[str, Any]:
        """发起一次主动关怀。
        
        Args:
            topic: 关怀主题
            context: 上下文数据
            
        Returns:
            结果记录:
            - topic_id: 主题 ID
            - message: 消息内容
            - outcome: 结果 (triggered/deferred/blocked)
        """
        result = {
            "topic_id": topic.topic_id,
            "message": "",
            "outcome": "blocked",
        }
        
        # ========== 步骤 1:尝试获取交互锁 ==========
        # 关键:同一时间只允许一个主动交互
        if not await self._cooldown.acquire_interaction_lock(topic.topic_id):
            logger.debug("无法获取交互锁,跳过: %s", topic.topic_id)
            result["outcome"] = "blocked"
            return result
        
        try:
            # ========== 步骤 2:冷却检查 ==========
            if not self._cooldown.can_interact():
                logger.debug("冷却检查未通过,跳过: %s", topic.topic_id)
                result["outcome"] = "blocked"
                return result
            
            # ========== 步骤 3:用户忙碌检查 ==========
            # 30 秒内有输入认为用户正忙
            if self._is_user_busy():
                logger.debug("用户正忙,延迟关怀: %s", topic.topic_id)
                result["outcome"] = "deferred"
                return result
            
            # ========== 步骤 4:构建消息 ==========
            message = self._build_message(topic, context)
            result["message"] = message
            
            # ========== 步骤 5:通过 EventBus 发布关怀触发事件 ==========
            # UI 层订阅此事件来显示关怀消息
            await self._event_bus.emit(
                self.COMPANION_CARE_TRIGGERED,
                {
                    "topic_id": topic.topic_id,
                    "topic_name": topic.name,
                    "category": topic.category,
                    "message": message,
                    "priority": topic.priority,
                    "timestamp": datetime.now().isoformat(),
                }
            )
            
            # ========== 步骤 6:记录交互 ==========
            self._cooldown.record_interaction(topic.topic_id, "triggered")
            result["outcome"] = "triggered"
            
            logger.info("主动关怀已触发: %s - %s", topic.topic_id, message[:50])
            
        finally:
            # ========== 步骤 7:释放锁 ==========
            # 关键:无论成功失败,都要释放锁
            self._cooldown.release_interaction_lock()
        
        return result

3.5 易错点与最佳实践

易错点 1:asyncio.Lock 不可重入

# ❌ 错误示范:在已持有锁的情况下再次 acquire
async def bad_nested_lock():
    await lock.acquire()
    # ... 一些逻辑 ...
    await lock.acquire()  # 死锁!asyncio.Lock 不可重入
    lock.release()
    lock.release()

# ✅ 正确做法:使用 locked() 检查或重构逻辑
async def good_lock_check():
    if lock.locked():
        logger.debug("锁已被占用,跳过")
        return False
    
    await lock.acquire()
    try:
        # 业务逻辑
        pass
    finally:
        lock.release()
    return True

易错点 2:EventBus 优先级设置

# ⚠️ 注意:优先级数值越大越先执行
self._event_bus.on(
    EventType.TOOL_CALL,
    self._on_tool_call,
    priority=200  # 关键:设置较高优先级,确保时机检测在其他处理之前
)

# 优先级参考:
# 200+ : 系统级监控(时机检测、审计日志)
# 100  : 核心业务处理
# 0    : 默认优先级
# -100 : 后处理(清理、统计)

最佳实践清单

  1. 锁的使用:始终用 try-finally 确保锁释放
  2. 事件优先级:监控类订阅者用高优先级(200+)
  3. 冷却期设计:不同主题设置不同间隔(早安 24h,心情 12h)
  4. 日志追踪:每个关键步骤打印 topic_id,便于链路追踪

四、问题诊断与修复 —— 从"三请求只执行一个"到完美并发

4.1 问题现象:请求被"吞掉"

用户报告

"早上同时触发了早安问候、心情关怀、健康提醒三个请求,但只看到了一条消息。其他两条去哪了?"

服务器日志

2026-03-22 08:00:01 | companion | INFO | 定时触发评分: morning_greeting = 81.2
2026-03-22 08:00:01 | companion | INFO | 主动关怀已触发: morning_greeting
2026-03-22 08:01:02 | companion | DEBUG | 无法获取交互锁,跳过: mood_check
2026-03-22 08:02:03 | companion | DEBUG | 冷却检查未通过,跳过: medical_remind

问题:三个请求,两个被跳过!

4.2 排查步骤:追踪评分系统的决策过程

1️⃣ 检查交互锁状态

# 查看 CooldownManager.acquire_interaction_lock()
async def acquire_interaction_lock(self, topic_id: str) -> bool:
    if self._interaction_lock.locked():
        logger.debug("交互锁已被占用,topic_id: %s", self._current_topic_id)
        return False  # ✅ 问题所在:mood_check 在 morning_greeting 还没完成时就尝试获取锁
    
    await self._interaction_lock.acquire()
    self._current_topic_id = topic_id
    return True

2️⃣ 检查冷却状态

# 查看 CooldownManager.can_interact()
def can_interact(self) -> bool:
    # 每日配额检查
    daily_count = self.get_daily_count()
    if daily_count >= self.daily_budget:
        return False  # ❌ morning_greeting 触发后 count+1,但还没到上限
    
    # 拒绝惩罚冷却期检查
    # ...
    return True  # ✅ 这里不是问题

3️⃣ 发现问题链路

┌───────────────────────────────────────────────────────────────────┐
│  时间线                                                           │
├───────────────────────────────────────────────────────────────────┤
│  08:00:01  morning_greeting 获取锁,开始执行                       │
│            │                                                      │
│  08:01:02  │ mood_check 尝试获取锁 → 失败(锁被占用)→ blocked     │
│            │                                                      │
│  08:01:05  morning_greeting 完成,释放锁                           │
│            │                                                      │
│  08:02:03  medical_remind 尝试获取锁 → 成功                        │
│            但 can_interact() 返回 False → blocked                 │
│            原因:daily_count=1,但冷却期检查...                    │
└───────────────────────────────────────────────────────────────────┘

根本原因

  1. 交互锁串行化:同一时间只允许一个关怀执行,后续请求被阻塞
  2. 没有请求队列:被阻塞的请求直接丢弃,而不是排队等待
  3. 时机窗口过短:mood_check 在 morning_greeting 执行期间到来,错过了

4.3 修复方案:请求队列 + 延迟执行

修复 1:增加请求队列

# ✅ 修改后:添加待处理请求队列
class InteractionOrchestrator:
    def __init__(self, cooldown: CooldownManager, event_bus: EventBus):
        self._cooldown = cooldown
        self._event_bus = event_bus
        self._pending_requests: list[tuple[CareTopic, dict]] = []  # 新增
    
    async def initiate_care(self, topic: CareTopic, context: dict) -> dict:
        if not await self._cooldown.acquire_interaction_lock(topic.topic_id):
            # 关键:无法获取锁时,加入队列而不是直接丢弃
            self._pending_requests.append((topic, context))
            logger.debug("加入待处理队列: %s (队列长度: %d)", 
                        topic.topic_id, len(self._pending_requests))
            return {"topic_id": topic.topic_id, "outcome": "queued"}
        
        try:
            # ... 原有执行逻辑 ...
            pass
        finally:
            self._cooldown.release_interaction_lock()
            # 关键:执行完成后,处理队列中的下一个请求
            await self._process_pending()
    
    async def _process_pending(self) -> None:
        """处理待处理队列中的请求"""
        if not self._pending_requests:
            return
        
        # 等待一小段时间,避免连续骚扰
        await asyncio.sleep(60)  # 1分钟间隔
        
        if self._pending_requests and self._cooldown.can_interact():
            topic, context = self._pending_requests.pop(0)
            await self.initiate_care(topic, context)

修复 2:评分时考虑队列状态

# ✅ 修改后:在评分时降低低优先级请求的分数
def calculate_interaction_score(self, topic: CareTopic) -> float:
    score = topic.priority * 10
    # ... 原有评分逻辑 ...
    
    # 新增:如果队列中已有高优先级请求,降低当前请求分数
    pending_high_priority = any(
        t.priority > topic.priority 
        for t, _ in self._orchestrator._pending_requests
    )
    if pending_high_priority:
        score -= 10  # 关键:让高优先级请求先执行
    
    return max(0, min(100, score))

4.4 验证结果

✅ 步骤 1:08:00:01 morning_greeting 获取锁,执行
✅ 步骤 2:08:01:02 mood_check 加入队列 (outcome=queued)
✅ 步骤 3:08:01:05 morning_greeting 完成,触发 _process_pending
✅ 步骤 4:08:02:05 mood_check 从队列取出,执行(间隔 1 分钟)
✅ 步骤 5:08:02:08 medical_remind 健康警报优先级高,插队执行

4.5 经验教训:Checklist

  • 并发请求需要队列机制,不能直接丢弃
  • 高优先级请求应该能"插队"
  • 连续关怀之间需要间隔(至少 1 分钟)
  • 日志要记录完整的决策链路(scoreoutcomequeue_length

避坑指南

  1. asyncio.Lock 不可重入:不要在持有锁时再次 acquire,会死锁
  2. 队列大小限制:防止队列无限增长,建议最多保留 5 个待处理请求
  3. 优先级反转:低优先级请求不应该阻塞高优先级请求

五、性能优化与最佳实践

5.1 性能瓶颈分析

Profiling 数据

calculate_interaction_score():  5.2ms   (含数据库查询)
_build_context():               3.1ms   (含数据库查询)
initiate_care():                2.8ms   (EventBus emit)
_check_inference_rules():       0.3ms   (纯内存)
get_time_slot():                0.01ms  (纯计算)

结论:数据库查询是主要瓶颈,hours_since()_build_context() 每次都查库。

5.2 优化策略

策略 1:评分结果缓存

# ❌ 优化前:每次评分都查数据库
def calculate_interaction_score(self, topic: CareTopic) -> float:
    hours = self._cooldown.hours_since(topic.topic_id)  # 查库
    # ...

# ✅ 优化后:缓存上次交互时间
class CooldownManager:
    def __init__(self, db_path: Path):
        self._hours_cache: dict[str, tuple[float, float]] = {}  # topic_id -> (hours, timestamp)
        self._cache_ttl = 60  # 缓存 60 秒
    
    def hours_since(self, topic_id: str) -> float:
        # 检查缓存
        if topic_id in self._hours_cache:
            cached_hours, cached_time = self._hours_cache[topic_id]
            if time.time() - cached_time < self._cache_ttl:
                return cached_hours + (time.time() - cached_time) / 3600
        
        # 缓存未命中,查数据库
        hours = self._query_hours_from_db(topic_id)
        self._hours_cache[topic_id] = (hours, time.time())
        return hours

代价:增加约 1KB 内存(15 个主题 × 64 字节)
收益:评分耗时从 5.2ms 降至 0.5ms(90% 提升)

策略 2:批量数据库查询

# ❌ 优化前:逐个查询
for topic in topics:
    hours = self._cooldown.hours_since(topic.topic_id)  # N 次查询

# ✅ 优化后:批量查询
def batch_hours_since(self, topic_ids: list[str]) -> dict[str, float]:
    """批量查询多个主题的上次交互时间"""
    placeholders = ",".join("?" * len(topic_ids))
    with self._conn() as conn:
        rows = conn.execute(f"""
            SELECT key, value FROM companion_state 
            WHERE key IN ({placeholders})
        """, [f"last_interaction_{tid}" for tid in topic_ids]).fetchall()
    
    result = {}
    for key, value in rows:
        topic_id = key.replace("last_interaction_", "")
        result[topic_id] = self._calculate_hours(value)
    
    return result

代价:代码稍复杂
收益:15 个主题的批量查询只需 1 次数据库访问

策略 3:事件订阅优先级优化

# ✅ 合理设置优先级,避免无效计算
class OpportunityDetector:
    def _setup_listeners(self) -> None:
        # 时机检测用高优先级,在其他处理之前完成
        self._event_bus.on(EventType.TOOL_CALL, self._on_tool_call, priority=200)

class MoodDetector:
    def _setup_listeners(self) -> None:
        # 情绪检测用中等优先级
        self._event_bus.on(EventType.USER_INPUT, self._on_user_input, priority=100)

class AuditLogger:
    def _setup_listeners(self) -> None:
        # 审计日志用低优先级,最后执行
        self._event_bus.on(EventType.TOOL_CALL, self._on_tool_call, priority=-100)

5.3 最佳实践总结

Do's(推荐做法):

  • ✅ 使用缓存减少数据库查询(TTL 60 秒足够)
  • ✅ 批量查询代替循环单查
  • ✅ 为事件订阅设置合理的优先级
  • ✅ 在日志中打印完整的评分明细,便于调试
  • ✅ 使用 try-finally 确保锁的释放

Don'ts(避免做法):

  • ❌ 在评分方法中进行复杂的 I/O 操作
  • ❌ 忽略并发请求的队列管理
  • ❌ 所有事件订阅用相同优先级(会导致执行顺序不确定)
  • ❌ 在 asyncio.Lock 内部再次 acquire(死锁风险)
  • ❌ 硬编码阈值(应该使用配置文件)

黄金法则

让评分计算尽量快(< 1ms),让执行逻辑尽量稳(锁 + 队列 + 重试)。


六、总结与展望

6.1 核心要点回顾

本文讲解了 WeClaw 主动陪伴系统的五大组件架构:

5 个核心组件

  1. CooldownManager(厨师长):控制出菜节奏,防止骚扰
  2. MoodDetector(食评家):感知用户情绪,调整关怀内容
  3. OpportunityDetector(服务员):监听用户行为,捕捉关怀时机
  4. InteractionOrchestrator(传菜员):执行关怀流程,管理生命周期
  5. CompanionEngine(餐厅经理):统一调度,七层评分决策

1 个核心公式

主动陪伴决策 = 
    七层评分(时段+紧急度+情绪+空闲度) 
    × 冷却控制(配额+惩罚+忽略) 
    × 事件驱动(工具调用+用户输入)

6.2 下一步学习方向

前置知识

  • ✅ Python asyncio 异步编程
  • ✅ EventBus 发布-订阅模式
  • ✅ SQLite 基本操作

后续主题

  • 📖 下一篇:《第 22 篇:防骚扰冷却机制深度解析 — 每日配额、拒绝惩罚与动态调整》
  • 🔜 下下一篇:《第 23 篇:渐进式用户建档 — 如何在自然对话中收集用户信息》

扩展阅读

6.3 互动环节

思考题

  1. 如果用户连续 3 次忽略主动关怀,系统应该如何调整策略?(提示:动态配额调整)
  2. 当检测到用户情绪低落(negative)时,应该优先触发哪类关怀主题?为什么?
  3. 如何设计一个"紧急关怀"机制,让某些高优先级关怀(如健康警报)可以突破冷却限制?

讨论话题

在你的 AI 助手项目中,你是如何实现"主动"功能的?是定时轮询还是事件驱动?遇到过哪些骚扰用户的问题?欢迎在评论区分享你的经验!


下期预告:《第 22 篇:防骚扰冷却机制深度解析》

  • 🛡️ 每日配额的动态调整算法(正反馈 → 增加,负反馈 → 减少)
  • ⏰ 拒绝惩罚期的设计思路(为什么是 4 小时?)
  • 📊 连续忽略检测与用户活跃度分析
  • 💾 状态持久化:跨进程的冷却状态同步

敬请期待!


附录 A:完整代码清单

文件路径行数作用
src/core/companion_engine.py1409 行五大组件核心实现
src/core/companion_topics.py439 行CareTopic 数据类、注册表、推断规则
src/core/events.py226 行事件类型定义(含 COMPANION_* 事件)

总代码量:约 2074 行
关键方法:18 个(calculate_interaction_score、initiate_care、check* 等)
预定义主题:15 个(早安、心情、生日、健康等)


附录 B:参考资料

  1. Python asyncio 官方文档
  2. 事件驱动架构 | Martin Fowler
  3. SQLite 性能优化
  4. 上一篇:《第 20 篇:[主题待定]》
  5. 下一篇:《第 22 篇:防骚扰冷却机制深度解析》

版权声明:本文为 CSDN 博主「翁勇刚」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接https://blog.csdn.net/yweng18/article/details/xxxxxx(待发布后更新)