0%

跟着🐈NanoBot学AI个人助理设计和开发2:一波流!

前言

最近在看OpenClaw这个当今最火的AI个人助理,想通过OpenClaw来研究下这种个人智能助理的设计和开发原理。但是发现OpenClaw太“重”了,动不动就是几十万行代码,层层封装,想从源码层面理解它的运行逻辑(它还是typescript语言),或者想自己魔改加个小功能,往往要翻半天文档。

直到我遇到了 nanobot

它给我的第一感觉就是“干净”。核心代码只有 4000 行左右(大概只有OpenClaw 的 1%),但麻雀虽小,五脏俱全。它去掉了很多复杂的抽象,保留了 Agent 最核心的能力。

nanobot 内置了非常丰富的渠道支持。你可以把它接入 Telegram、Discord、Slack,甚至是国内的飞书、钉钉、QQ 和微信(通过 Mochat)。我现在把它挂在飞书上,平时想查个资料、翻译段文本,或者只是单纯想找个“人”聊聊代码思路,随时掏出手机就能发消息,它会像一个真正的助理一样回复你。

那么就从nanobot源码开始学习AI个人助理吧。

0在这里
1在这里

只聊一句

前面运行的nanobot gateway实际上启动了一个 全功能的AI服务端。你可以把它想象成一个繁忙的指挥中心,它同时处理着多条战线的工作。它是nanobot最典型的使用场景,但是这种模式有点复杂,不利于我们研究源码。
这里将先从nanobot agent -m这种模式入手,研究下“只与agent聊一句天,它背后干了啥”。

具体流程

nanobot agent -m 命令用于在 单次消息模式 (Single Message Mode) 下运行智能体。它允许你从命令行直接发送一条指令给智能体,智能体执行完毕并返回结果后程序即刻退出,而不会进入交互式聊天界面。

下面是该命令从输入到输出的详细执行流程:

CLI 入口与参数解析

文件: nanobot/cli/commands.py

当你运行 nanobot agent -m "你的指令" 时:

  • Typer 解析: agent 函数被调用,-m 参数的值被赋给 message 变量。
  • 环境准备:
    • load_config() 加载项目配置。
    • 初始化 MessageBus (消息总线) 和 LLMProvider (大模型提供商)。
    • 初始化核心控制器 AgentLoop

这里非常关键,一下引出了3个关键概念,下面一一详细研究下。

消息总线

MessageBus 类是一个基于 asyncio.Queue 的异步消息总线,其核心目的是解耦消息的生产者(Chat Channels,如 CLI、Slack、Discord 等)和消费者(Agent Core 智能体核心)。

它实现了经典的 生产者-消费者模式,并通过双向队列分别处理“输入”和“输出”。

1. 核心结构

nanobot/bus/queue.py

MessageBus 内部维护了两个主要的异步队列:

  • self.inbound: 用于存放发给智能体的消息(用户 -> 智能体)。
  • self.outbound: 用于存放智能体发出的回复(智能体 -> 用户)。
  • self._outbound_subscribers: 一个字典,用于管理不同渠道(Channel)的回调函数,确保消息能准确路由回来源渠道。

2. 主要功能模块

A. 输入流处理 (Inbound Flow)

当用户发送消息时(例如通过 CLI 输入或 Slack 发送):

  • publish_inbound(msg) (L25): 外部渠道调用此方法,将 InboundMessage 放入输入队列。
  • consume_inbound() (L29): 智能体核心循环 (AgentLoop) 调用此方法,阻塞等待并获取下一条待处理的消息。

B. 输出流处理 (Outbound Flow)

当智能体处理完任务并生成回复时:

  • publish_outbound(msg) (L33): 智能体调用此方法,将 OutboundMessage 放入输出队列。
  • consume_outbound() (L37): 手动消费输出消息的方法(通常用于简单的同步场景)。

C. 订阅与分发机制 (Subscription & Dispatch)

为了支持多渠道并发(例如同时运行 CLI 和 Slack 机器人),MessageBus 提供了订阅机制:

  • subscribe_outbound(channel, callback) (L41): 允许特定渠道注册回调函数。例如,Slack 适配器会订阅 channel="slack",并提供一个将消息发送到 Slack API 的回调函数。
  • dispatch_outbound() (L51): 这是一个后台任务循环。它不断从 outbound 队列中取出消息,根据消息的 channel 属性查找对应的订阅者,并执行回调函数。这确保了如果消息来自 Slack,回复也会发回 Slack,而不是发到 CLI。

3. 数据流向图解

1
2
3
4
5
6
graph LR
User[用户 (CLI/Slack)] -->|publish_inbound| InQueue[(Inbound Queue)]
InQueue -->|consume_inbound| Agent[智能体核心 (AgentLoop)]
Agent -->|publish_outbound| OutQueue[(Outbound Queue)]
OutQueue -->|dispatch_outbound| Dispatcher[分发器]
Dispatcher -->|callback| User

4. 代码细节解析

  • 非阻塞与并发: 使用 asyncio.Queue 保证了线程安全和协程并发能力。consume_inboundconsume_outbound 都是 await 操作,不会阻塞事件循环。
  • 错误处理: 在 dispatch_outbound 循环中 (L58-67),使用了 try-except 捕获回调执行中的异常,防止因为某个渠道的发送失败导致整个总线崩溃。同时使用了 asyncio.wait_for 来定期检查停止标志 _running
  • 生命周期管理: 提供了 stop() 方法 (L69) 来优雅地停止分发循环。

总结

MessageBus 是 Nanobot 架构中的中枢神经。它不仅实现了异步解耦,还通过订阅/分发机制支持了多渠道(Multi-channel)的扩展能力,使得核心智能体逻辑不需要关心消息是从哪里来的,也不需要知道如何发送回具体的平台,只需要关注处理 InboundMessage 并产生 OutboundMessage 即可。

LiteLLMProvider

LiteLLMProvider 类是 Nanobot 与各种大语言模型 (LLM) 进行交互的核心适配器。它利用 litellm 库的强大兼容性,实现了对 OpenAI、Anthropic、OpenRouter、Google Gemini 等数十种模型提供商的统一支持,而无需为每个提供商编写单独的代码。

代码位置:nanobot/providers/litellm_provider.py

1. 核心设计理念

该类采用 “配置驱动 + 统一接口” 的设计模式:

  • 统一接口: 继承自 LLMProvider,对外暴露标准的 chat() 方法。无论底层是 GPT-4 还是 Claude 3,调用方式都一样。
  • 配置驱动: 通过 providers/registry.py 中的注册表来管理不同模型的特性(如 API Key 环境变量名、模型前缀、参数覆盖等),避免了大量的 if-elif 硬编码。

2. 主要功能解析

A. 初始化与环境自动配置 (__init__ & _setup_env)

代码: L23-L71

LiteLLMProvider 实例化时:

  1. 检测网关: 调用 find_gateway 判断是否使用了 API 网关(如 OpenRouter 或自定义代理)。
  2. 设置环境变量: _setup_env 方法根据检测到的提供商(如 Anthropic 或 OpenAI),自动将传入的 api_key 设置到对应的环境变量中(如 ANTHROPIC_API_KEY)。这使得 litellm 库能自动找到凭证。
  3. LiteLLM 调优: 禁用调试日志 (suppress_debug_info) 并开启自动丢弃不支持参数 (drop_params),提高稳定性。

B. 智能模型路由 (_resolve_model)

代码: L73-L90

LiteLLM 要求模型名称带有前缀(如 anthropic/claude-3)。此方法自动处理这些细节:

  • 网关模式: 如果配置了网关,它会根据网关规则添加或去除前缀。
  • 标准模式: 自动为已知模型添加前缀。例如,如果用户配置 model: claude-3-opus,它会自动转换为 anthropic/claude-3-opus,用户无需操心前缀问题。

C. 模型参数微调 (_apply_model_overrides)

代码: L92-L100

不同的模型对参数有不同的偏好。例如,某些国产模型可能对 temperature 敏感。此方法允许通过注册表为特定模型注入特定的参数覆盖,确保最佳效果。

D. 对话请求与响应标准化 (chat & _parse_response)

代码: L102-L199

这是真正干活的地方:

  1. 准备参数: 将消息历史、工具定义、Token 限制等封装成字典。
  2. 调用 LiteLLM: 使用 acompletion 异步发送请求。
  3. 结果解析:
    • 工具调用: 将原始 JSON 解析为标准的 ToolCallRequest 对象,处理 JSON 解析错误。
    • Token 统计: 提取 usage 信息。
    • 推理内容: 如果模型支持思维链(如 DeepSeek R1),提取 reasoning_content
    • 统一返回: 打包成 LLMResponse 对象返回给 AgentLoop

3. 为什么使用 LiteLLM?

  • 多模型支持: 一套代码支持 100+ 种模型。
  • 标准化: 屏蔽了不同 API(如 OpenAI 格式 vs Anthropic 格式)的差异,输入输出完全统一。
  • 容错性: drop_params=True 自动处理参数不兼容问题(例如某些模型不支持 tool_choice)。

总结

LiteLLMProvider 是 Nanobot 的 “万能插头”。它让 Nanobot 能够轻松切换底层模型,无论是本地部署的 Ollama,还是云端的 GPT-4/Claude-3,只需更改配置,无需修改代码。

AgentLoop

AgentLoop 类是 Nanobot 的大脑和心脏。它负责协调所有的核心组件:接收消息、管理记忆、调用大模型 (LLM)、执行工具,并最终生成回复。

代码位置:nanobot/agent/loop.py

以下是 AgentLoop 的详细解构:

1. 核心职责

AgentLoop 是一个自主的智能体运行时环境。它的主要工作流程是:

  1. 监听消息总线 (MessageBus)。
  2. 构建上下文 (Context),包括历史记录、长期记忆和可用工具。
  3. 进入思考-执行循环 (ReAct Loop)
    • 询问 LLM 下一步做什么。
    • 如果 LLM 决定调用工具,执行工具并将结果反馈给 LLM。
    • 重复上述步骤,直到 LLM 给出最终回复。
  4. 将结果发送回消息总线。

2. 关键组件 (初始化)

__init__ (L38) 中,它组装了以下关键模块:

  • MessageBus: 通信管道,用于收发消息。
  • LLMProvider: 大模型接口 (如 OpenAI, Anthropic),负责生成文本和工具调用。
  • ContextBuilder: 上下文构建器,负责将杂乱的信息(历史、文件、记忆)组装成 LLM 能理解的 Prompt。
  • ToolRegistry: 工具箱,管理所有可用工具 (读写文件、Web搜索、执行Shell等)。
  • SessionManager: 会话管理器,负责保存和加载聊天记录。
  • SubagentManager: 子智能体管理器,允许主智能体生成 (Spawn) 其他智能体来处理特定任务。

3. 运行模式

AgentLoop 支持两种运行模式:

A. 守护进程模式 (run)

代码: L113
这是智能体的主循环。它无限运行 (while self._running),不断从 bus.consume_inbound() 获取消息。这种模式适用于长期运行的服务(如连接到 Slack 或后台任务)。

B. 直接调用模式 (process_direct)

代码: L427
用于 CLI 命令 (如 nanobot agent -m "...") 或 Cron 定时任务。它不经过队列等待,直接处理传入的字符串并返回结果。

4. 核心思考逻辑 (_process_message)

这是最复杂也是最核心的部分 (L147),处理单条消息的完整流程如下:

第一阶段:上下文准备

  1. 加载会话: 根据 Session ID 获取历史聊天记录。
  2. 记忆压缩 (_consolidate_memory): 如果历史记录太长(超过 memory_window,默认 50 条),会触发后台压缩任务,将旧对话总结并存入 MEMORY.md,保持上下文窗口精简 (L366)。
  3. 工具注入: 为 Message/Spawn 等工具注入当前频道信息,确保回复能发回正确的地方。
  4. 构建 Prompt: self.context.build_messages(...) 将所有信息打包成 LLM 消息列表。

第二阶段:思考-执行循环 (The Loop)

代码位于 L200-L244。这是一个 while 循环,最大迭代次数由 max_iterations 控制(防止无限循环)。

  • Step 1: 询问 LLM
    调用 self.provider.chat(messages, tools=...)

  • Step 2: 判断行动

    • 情况 A: LLM 想要调用工具 (response.has_tool_calls)
      1. 将 LLM 的“思考过程”和“工具调用请求”加入消息历史。
      2. 执行工具: 遍历 tool_calls,调用 self.tools.execute(...) 执行实际代码(如读取文件、搜索网页)。
      3. 反馈结果: 将工具的执行结果 (tool_result) 作为新消息追加到历史中。
      4. 继续循环: 带着工具的结果,进入下一次迭代,让 LLM 决定下一步。
    • 情况 B: LLM 返回最终回复
      1. 获取文本内容。
      2. 跳出循环 (break)

第三阶段:收尾

  1. 保存记录: 将用户的输入、LLM 的回复以及使用过的工具列表保存到 Session 文件中 (L253)。
  2. 发送回复: 封装 OutboundMessage 并通过 Bus 发送出去。

5. 记忆系统 (_consolidate_memory)

代码: L366
这是一个非常关键的自我维护机制。当对话过长时,AgentLoop 不会简单地截断,而是:

  1. 提取旧的消息。
  2. 调用 LLM (作为 memory consolidation agent)。
  3. 要求 LLM 生成两样东西:
    • History Entry: 一段摘要,存入 HISTORY.md
    • Memory Update: 更新 MEMORY.md 中的长期事实(如用户偏好、项目背景)。
  4. 这样,即使原始对话被丢弃,关键信息也留在了长期记忆中。

总结

AgentLoop 实现了一个标准的 ReAct (Reasoning + Acting) 架构。它不仅仅是一个聊天机器人,更是一个能通过工具与环境交互、并通过记忆系统维持长期连贯性的智能体运行时。

进入单次执行模式

文件: nanobot/cli/commands.py

代码检查 if message: 是否存在:

  • 交互模式 (False): 如果没有 -m,代码会进入 else 分支启动交互式 Shell (run_interactive)。
  • 单次模式 (True): 存在 -m 时,定义并运行 run_once 异步函数:
    • 启动 _thinking_ctx() 上下文管理器,在终端显示 “nanobot is thinking…” 加载动画(除非开启了 --logs)。
    • 调用核心处理函数 agent_loop.process_direct(message, session_id)

核心处理循环 (AgentLoop)

文件: nanobot/agent/loop.py

process_direct 是从 CLI 到智能体逻辑的桥梁,它将你的字符串指令封装为 InboundMessage,然后调用内部的 _process_message 方法。这是智能体真正”思考”的地方:

A. 上下文构建

  • 会话管理: 根据 session_id 获取或创建会话记录 (Session),加载历史对话。
  • 记忆整合: 如果历史消息超过 memory_window (默认 50 条),触发 _consolidate_memory 将旧消息压缩为长期记忆 (L366)。
  • 工具准备: 为 Message/Spawn/Cron 等工具注入当前上下文信息。
  • 构建 Prompt: ContextBuilder 将系统提示词、历史记录、当前指令、长期记忆组合成发送给 LLM 的最终消息列表。

B. 思考与执行循环 (Loop)

文件: nanobot/agent/loop.py

智能体进入一个 while 循环 (最多 max_iterations 次,默认 20 次):

  1. 调用 LLM: 将消息列表和工具定义发送给模型 (self.provider.chat)。
  2. 处理响应:
    • 情况 1:模型决定调用工具 (Has Tool Calls)
      • 解析工具调用请求 (如 read_file, web_search 等)。
      • 执行工具逻辑 (self.tools.execute)。
      • 将工具执行结果追加到消息列表中。
      • 继续循环,让模型根据工具结果进行下一步思考。
    • 情况 2:模型返回文本 (No Tool Calls)
      • 获取最终回复内容。
      • 跳出循环

结果返回与展示

文件: nanobot/cli/commands.py

一旦 process_direct 返回最终文本:

  • 保存会话: 你的指令和智能体的回复被保存到 Session 中,供下次调用使用。
  • 打印输出: _print_agent_response 函数将结果渲染为 Markdown 格式打印到终端。
  • 程序退出: asyncio.run(run_once()) 结束,Python 进程退出。

总结

nanobot agent -m 本质上是 “启动环境 -> 执行一次思考循环 -> 打印结果 -> 退出” 的快捷通道,非常适合用于脚本集成或快速的单次问答任务。