0%

跟着🐈NanoBot学AI个人助理设计和开发3:一夫当关!

前言

最近在看OpenClaw这个当今最火的AI个人助理,想通过OpenClaw来研究下这种个人智能助理的设计和开发原理。但是发现OpenClaw太“重”了,动不动就是几十万行代码,层层封装,想从源码层面理解它的运行逻辑(它还是typescript语言),或者想自己魔改加个小功能,往往要翻半天文档。

直到我遇到了 nanobot

它给我的第一感觉就是“干净”。核心代码只有 4000 行左右(大概只有OpenClaw 的 1%),但麻雀虽小,五脏俱全。它去掉了很多复杂的抽象,保留了 Agent 最核心的能力。

nanobot 内置了非常丰富的渠道支持。你可以把它接入 Telegram、Discord、Slack,甚至是国内的飞书、钉钉、QQ 和微信(通过 Mochat)。我现在把它挂在飞书上,平时想查个资料、翻译段文本,或者只是单纯想找个“人”聊聊代码思路,随时掏出手机就能发消息,它会像一个真正的助理一样回复你。

那么就从nanobot源码开始学习AI个人助理吧。

0在这里
1在这里
2在这里

一夫当关

前面是看了“怎样只聊一句”,现在看看“网关模式”做了啥。

具体流程

nanobot gateway 是 Nanobot 的核心后台服务进程。简单来说,它就像一个不知疲倦的“接线员”兼“大脑”,负责连接外部世界(如 Telegram/微信)、管理内部任务(如定时提醒),并指挥 AI(Agent)进行思考和回复。

以下是 nanobot gateway 启动和运行的详细步骤解析:

第一阶段:初始化 (Initialization)

当你运行 nanobot gateway 命令时,系统会按顺序完成以下准备工作:

  1. 加载配置 (load_config):读取你的 ~/.nanobot/config.json 配置文件,确定要启用哪些功能。
  2. 创建消息总线 (MessageBus):这是一个内部的通信管道,用于在“接收消息的渠道”和“处理消息的 Agent”之间传递数据。
  3. 准备大脑 (AgentLoop):初始化 AI 核心循环。它加载了 LLM 提供商(如 OpenAI/Claude)、工具集(搜索、文件操作等)以及记忆管理器。
  4. 准备定时器 (CronService & HeartbeatService):
    • Cron: 加载保存的定时任务(如“每天早上 8 点提醒我看新闻”)。
    • Heartbeat: 设置心跳机制(默认每 30 分钟),用于自我保活或周期性自检。
  5. 准备渠道 (ChannelManager):根据配置初始化外部通信渠道(如 Telegram, WhatsApp, Slack 等)。

这个地方与之前的“只聊一句”又有很大差别,引入了新概念。

SessionManager

SessionManager 类是 Nanobot 用来管理聊天会话(Conversation Sessions)的核心组件。它的主要职责是持久化存储检索用户与机器人之间的聊天记录。

它通过管理 Session 对象来实现这一功能,每个 Session 代表一个独立的对话上下文(例如你和机器人在 Telegram 上的私聊,或者你在命令行里的即兴对话)。

核心功能

  1. 会话存储 (Storage)

    • 位置: 所有会话都保存在 ~/.nanobot/sessions/ 目录下。
    • 格式: 采用 JSONL (JSON Lines) 格式。
      • 文件名通常是 channel_chatid.jsonl(例如 telegram_123456789.jsonl)。
      • 文件的第一行始终是元数据 (Metadata),包含创建时间、更新时间等。
      • 后续每一行代表一条消息 (Message),包含角色(user/assistant)、内容、时间戳等。
    • 优点: JSONL 格式允许追加写入(Append-only),性能好且易于阅读和修复。
  2. 内存缓存 (Caching)

    • 为了减少磁盘 I/O,SessionManager 会在内存中维护一个 _cache 字典。
    • 当你频繁与机器人对话时,它会直接从内存中读取会话对象,只有在保存时才写入磁盘。

关键类与方法

1. Session (数据类)

代表单个会话的数据结构。

  • 属性:
    • key: 唯一标识符,格式通常为 channel:chat_id(如 telegram:12345)。
    • messages: 消息列表。
    • created_at / updated_at: 时间戳。
  • 方法:
    • add_message(role, content): 添加一条新消息。
    • get_history(max_messages): 获取最近的 N 条消息,并格式化为 LLM(大模型)可接受的格式(只保留 rolecontent)。
    • clear(): 清空当前会话的所有消息。

2. SessionManager (管理器类)

负责对 Session 进行增删改查。

  • __init__(workspace): 初始化,确保 ~/.nanobot/sessions 目录存在。
  • get_or_create(key): 最常用的方法
    1. 先查内存缓存。
    2. 缓存没有,就去磁盘加载。
    3. 磁盘也没有,就创建一个新的空白会话。
  • save(session): 将会话状态写回磁盘。它会重写整个文件,确保元数据和消息列表都是最新的。
  • delete(key): 删除某个会话(从内存移除并删除对应的 .jsonl 文件)。
  • list_sessions(): 扫描目录,读取所有 .jsonl 文件的第一行(元数据),返回所有会话的列表,按更新时间倒序排列。

代码结构图解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
classDiagram
class SessionManager {
- Path sessions_dir
- dict _cache
+ get_or_create(key) Session
+ save(session)
+ delete(key)
+ list_sessions()
}

class Session {
+ str key
+ list messages
+ datetime created_at
+ add_message(role, content)
+ get_history(max_messages)
}

SessionManager "1" --> "*" Session : manages

实际工作流程示例

当你发一条消息 “Hello” 给机器人时:

  1. 加载: gateway 调用 manager.get_or_create("telegram:123")。管理器发现内存里没有,于是去读 ~/.nanobot/sessions/telegram_123.jsonl,加载之前的聊天记录。
  2. 更新: Agent 处理完消息后,调用 session.add_message("user", "Hello")session.add_message("assistant", "Hi there!")
  3. 保存: Agent 调用 manager.save(session)。管理器将更新后的消息列表写回 jsonl 文件。

总结

SessionManager 是 Nanobot 的记忆中枢。它简单、高效地利用文件系统来保存对话上下文,确保机器人即使重启也能“记得”你们之前的聊天内容。

CronService

CronService 是 Nanobot 的定时任务调度器。它类似于 Linux 的 cron 或 Python 的 Celery,但更轻量且专为 Agent 设计。它的主要职责是管理和执行那些需要在未来某个时间点或周期性执行的任务。

核心功能

  1. 任务调度 (Scheduling)

    • 支持三种调度方式:
      • 一次性 (at): 在指定时间点执行一次(例如“明天早上 8 点”)。
      • 间隔循环 (every): 每隔一段时间执行一次(例如“每 30 分钟”)。
      • Cron 表达式 (cron): 使用标准 crontab 语法(例如 0 8 * * * 每天早上 8 点)。
    • 使用 _compute_next_run 函数计算下一次执行时间。
  2. 任务持久化 (Persistence)

    • 文件存储: 所有任务数据保存在 JSON 文件中(通常是 ~/.nanobot/data/cron/jobs.json)。
    • 结构: CronStore 包含一个任务列表,每个 CronJob 包含 ID、名称、调度规则、Payload(要做什么)以及运行状态。
    • 优点: 即使 Nanobot 重启,任务也不会丢失。重启后会重新计算下次运行时间。
  3. 任务执行 (Execution)

    • Payload: 任务的核心是一个 message(例如 “check weather”)。
    • 执行: 当时间到了,CronService 不会自己执行业务逻辑,而是调用回调函数 on_job(通常绑定到 commands.py 里的 agent.process_direct),把这个 message 扔给 Agent 去处理。
    • 结果分发: 如果任务配置了 deliver=True 和目标 to(如 Telegram 用户 ID),执行结果会自动发送给该用户。
  4. 定时器机制 (Timer Loop)

    • 它不使用“每秒轮询”这种低效方式。
    • 智能休眠: _arm_timer 方法会计算距离“最近一个待执行任务”还有多久(delay_s),然后 asyncio.sleep(delay_s)。这样在空闲时几乎不消耗 CPU。
    • 当有新任务添加或任务执行完毕时,会重新计算并重置定时器。

关键类与方法

CronService

  • __init__(store_path, on_job): 初始化服务,指定存储路径和执行回调。
  • start() / stop(): 启动/停止服务。启动时会加载任务并计算下次运行时间。
  • add_job(...): 添加新任务。会自动保存到磁盘并更新定时器。
  • remove_job(job_id): 删除任务。
  • _execute_job(job): 执行单个任务。
    1. 调用 self.on_job(job) 触发 Agent。
    2. 更新任务状态(last_status, last_run_at)。
    3. 对于一次性任务,执行完后将其禁用或删除;对于循环任务,计算下次运行时间。
    4. 保存状态到磁盘。
  • _arm_timer(): 核心调度逻辑。找到所有任务中 next_run_at_ms 最小的那个时间点,设置一个 asyncio 延时任务。

任务数据结构 (CronJob)

一个典型的任务数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"id": "a1b2c3d4",
"name": "每日早报",
"schedule": {
"kind": "cron",
"expr": "0 8 * * *" // 每天 8:00
},
"payload": {
"message": "总结今天的科技新闻", // Agent 收到的指令
"deliver": true,
"channel": "telegram",
"to": "123456789" // 结果发给谁
},
"state": {
"nextRunAtMs": 1700000000000
}
}

实际工作流程示例

  1. 用户指令: 你对机器人说“每天早上 8 点提醒我喝水”。
  2. 创建任务: Agent 调用 CronService.add_job,创建一个 cron 任务,schedule 为 0 8 * * *,payload message 为 “提醒用户喝水”。
  3. 等待: CronService 计算出下次 8 点的时间戳,进入休眠。
  4. 触发: 第二天早上 8 点,定时器醒来,调用 _execute_job
  5. 回调: CronService 调用 gateway 中的 on_cron_job 回调。
  6. Agent 执行: Agent 收到 “提醒用户喝水” 的指令,生成回复 “早上好,该喝水了!”。
  7. 推送: 因为 deliver=True,回复会自动推送到你的 Telegram。

总结

CronService 是 Nanobot 的生物钟。它让 Agent 具备了时间观念,不再只是被动等待用户说话,而是能主动在特定时间执行任务。它设计得非常轻量且健壮,利用文件系统持久化和 asyncio 实现高效调度。

HeartbeatService

HeartbeatService 是 Nanobot 的心跳机制,也是一种“周期性自检”服务。它的作用是让 Agent 即使在没有用户发消息的时候,也能每隔一段时间“醒来”一次,检查是否有后台任务需要处理。

CronService(基于精确时间调度)不同,HeartbeatService 更加简单粗暴,它基于一个固定的文件 HEARTBEAT.md 来驱动。

核心逻辑

  1. 心跳周期 (Interval)

    • 默认每 30 分钟(1800 秒)触发一次。
    • 这是一个死循环 (while self._running),每次执行完 await asyncio.sleep(self.interval_s) 后就会“跳动”一次。
  2. 驱动源 (HEARTBEAT.md)

    • 服务会检查工作区(Workspace)下是否存在 HEARTBEAT.md 文件。
    • 如果文件不存在或为空:Agent 继续睡觉,什么都不做。
    • 如果文件有内容:Agent 被唤醒。
  3. 唤醒与执行

    • 当心跳触发且 HEARTBEAT.md 有内容时,服务会调用 on_heartbeat 回调函数。
    • 这个回调实际上是向 Agent 发送了一条特殊的 Prompt
      1
      2
      3
      Read HEARTBEAT.md in your workspace (if it exists).
      Follow any instructions or tasks listed there.
      If nothing needs attention, reply with just: HEARTBEAT_OK
    • Agent 收到这条指令后,会读取 HEARTBEAT.md,执行里面列出的任务(例如“检查是否有新的待办事项”、“整理今天的日志”等)。
  4. 反馈机制

    • 如果 Agent 执行完任务,或者发现其实没什么要做的,它应该回复 HEARTBEAT_OK
    • HeartbeatService 收到这个回复后,会记录一条日志 “Heartbeat: OK”,表示本次心跳正常结束。

关键方法解析

  • _is_heartbeat_empty(content):

    • 这是一个辅助函数,用来判断 HEARTBEAT.md 是否真的有“干货”。
    • 它会忽略空行、标题(# 开头)、HTML 注释(<!-- -->)以及空的复选框(- [ ])。
    • 只有当文件里有真正的文本内容时,才会触发心跳。这避免了因为文件里只写了个标题而频繁唤醒 Agent 浪费 Token。
  • _run_loop():

    • 后台常驻协程。
    • 只要 self._running 为 True,就无限循环:sleep -> _tick
  • _tick():

    • 单次心跳的逻辑。
      1. 读取 HEARTBEAT.md
      1. 检查是否为空 (_is_heartbeat_empty)。为空则直接返回。
      1. 调用 self.on_heartbeat(HEARTBEAT_PROMPT) 唤醒 Agent。
      1. 检查 Agent 的回复是否包含 HEARTBEAT_OK,记录日志。

设计意图

HeartbeatService 的设计初衷是实现异步的长运行任务状态维护,而不需要用户显式触发。

  • 场景 1:长任务队列

    • 你可以往 HEARTBEAT.md 里写入:”处理 data/raw 目录下的所有 PDF 文件”。
    • 即使你下线了,Agent 也会每半小时醒来一次,处理一部分文件,直到处理完把 HEARTBEAT.md 清空。
  • 场景 2:自我反思/整理

    • 可以在 HEARTBEAT.md 里写:”检查 MEMORY.md,如果太乱了就整理一下”。
    • Agent 会定期整理自己的记忆,保持“头脑清醒”。

总结

HeartbeatService 是一个基于文件的被动触发器。它通过检测 HEARTBEAT.md 文件的状态来决定是否唤醒 Agent。这是一种非常灵活的机制,允许用户通过简单地修改文件来控制 Agent 的后台行为,而无需编写复杂的 Cron 表达式。

ChannelManager

ChannelManager 是 Nanobot 的外交部长。它统一管理所有外部通信渠道(如 Telegram, WhatsApp, Slack 等),负责它们的生命周期(启动/停止)以及消息的分发。

核心职责

  1. 渠道初始化 (_init_channels)

    • 在启动时,根据 nanobot.toml 配置文件,动态加载并实例化启用的渠道。
    • 目前支持的渠道包括:Telegram, WhatsApp, Discord, 飞书, Mochat, 钉钉, Email, Slack, QQ 等。
    • 每个渠道都继承自 BaseChannel,拥有统一的接口(start, stop, send)。
  2. 生命周期管理

    • start_all(): 并发启动所有已启用的渠道。
      • 每个渠道的 start() 方法通常会启动一个轮询循环(Polling)或 Webhook 服务器来接收消息。
      • 同时启动一个 _dispatch_outbound 任务,负责处理发出去的消息。
    • stop_all(): 优雅关闭所有渠道,释放资源(如关闭 HTTP 连接)。
  3. 消息路由 (Routing)

    • 输入路由(隐式): 各个 Channel 实例内部会持有 bus(消息总线)的引用。当它们收到外部消息时,会直接调用 bus.publish_inbound(...) 将消息扔进总线,供 Agent 消费。ChannelManager 不直接干预这一步。
    • 输出路由(显式):
      • ChannelManager 启动一个后台任务 _dispatch_outbound
      • 这个任务不断从总线的输出队列 (consume_outbound) 中取出消息。
      • 它检查消息的 channel 属性(例如 “telegram”)。
      • 然后在自己的 self.channels 字典里找到对应的 Channel 实例,调用 channel.send(msg) 将消息发出去。

关键代码逻辑

  • _dispatch_outbound:
    这是核心的消息分发循环。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    while True:
    # 1. 从总线获取待发送消息
    msg = await bus.consume_outbound()

    # 2. 查找目标渠道
    channel = self.channels.get(msg.channel)

    # 3. 发送
    if channel:
    await channel.send(msg)
    这个设计实现了 Agent 与具体通信协议的解耦。Agent 只需要产生一个通用的 OutboundMessage,不需要知道对方是用 HTTP 请求还是 WebSocket 发送,也不需要知道对方的 API Key 是什么。所有这些脏活累活都由 ChannelManager 委托给具体的 Channel 实现类去处理。

总结

ChannelManager 是连接 Nanobot 内核(Agent)与外部世界(用户)的桥梁。它通过统一的接口屏蔽了不同聊天平台的差异,让 Agent 可以专注于处理逻辑,而不用关心消息是如何传输的。

第二阶段:并发启动 (Startup)

初始化完成后,Gateway 会启动以下四个主要服务:

1
2
3
4
5
6
7
try:
await cron.start()
await heartbeat.start()
await asyncio.gather(
agent.run(),
channels.start_all(),
)

  1. 启动定时任务调度器 (cron.start()):开始计时,等待任务触发。
  2. 启动心跳服务 (heartbeat.start()):开始倒计时。
  3. 启动 Agent 主循环 (agent.run()):Agent 进入“待机状态”,时刻监听消息总线上的输入消息
  4. 启动渠道管理 (channels.start_all()):
    • 启动所有启用的渠道(例如开始轮询 Telegram 消息,或监听 Webhook)。
    • 启动输出分发器 (_dispatch_outbound),时刻监听消息总线上的输出消息

这段代码是 nanobot gateway 启动的核心逻辑,它展示了从初始化后台服务进入主运行循环的过程。

代码逻辑分步解析

1
2
3
4
5
6
7
8
9
10
11
# 1. 启动定时任务服务 (非阻塞)
await cron.start()

# 2. 启动心跳服务 (非阻塞)
await heartbeat.start()

# 3. 并发启动 Agent 和 Channels (阻塞/长运行)
await asyncio.gather(
agent.run(),
channels.start_all(),
)

为什么要按这个顺序?

这个顺序的设计主要基于任务的性质(是非阻塞初始化,还是长运行死循环):

1. 为什么 cronheartbeat 先行?

  • 非阻塞初始化: cron.start()heartbeat.start() 方法内部仅仅是设置状态标志(_running = True)并创建后台的 asyncio.Task(计时器)。它们执行非常快,瞬间就会返回,不会阻塞主程序的执行。
  • 就绪原则: 按照依赖关系,最好先让内部的调度系统(定时器、心跳)准备就绪。这样一旦 Agent 开始工作,所有的定时触发机制都已经处于激活状态。

2. 为什么 agentchannels 要用 asyncio.gather

  • 阻塞运行:
    • agent.run() 是一个 while True 循环,它会一直卡在那里监听消息总线,直到程序退出。
    • channels.start_all() 也是一个阻塞操作,它会等待所有启用的渠道(Telegram, WhatsApp 等)运行,而这些渠道通常也是通过 while True 轮询或长连接挂起的。
  • 并发必要性:
    • 如果写成顺序执行(例如先 await agent.run()),代码就会永远停在这一行,Agent 虽然跑起来了,但 channels.start_all() 永远不会被执行,导致机器人无法收发消息。
    • asyncio.gather 的作用就是让这两个“死循环”在同一个事件循环中同时跑。Agent 负责“思考”,Channels 负责“听和说”,两者并行不悖。

总结图解

可以将这个启动过程想象成一家餐厅开门:

  1. cron.start(): 经理先打开闹钟,设置好提醒(比如“中午12点开启特价午餐”)。这一步只是拨个开关,马上就好。
  2. heartbeat.start(): 经理开启打卡机,每半小时检查一次员工状态。这一步也只是通个电,马上就好。
  3. asyncio.gather(...): 餐厅正式对外营业
    • agent.run(): 厨师(Agent) 站到灶台前,开始死循环等待订单。
    • channels.start_all(): 服务员(Channels) 站到门口,开始死循环等待顾客。

厨师和服务员必须同时进入工作状态,餐厅才能正常运转。

第三阶段:消息处理循环 (The Loop)

这是 Gateway 运行时的核心逻辑,分为“接收”和“发送”两条路径:

1. 输入路径:从 用户 到 Agent

当你在 Telegram 给机器人发一条消息时:

  1. 接收: TelegramChannel 收到消息。
  2. 发布: 渠道将消息封装为 InboundMessage(包含内容、发送者ID、渠道名),扔进消息总线
  3. 获取: 正在待机的 AgentLoop 从总线中抓取到这条消息。
  4. 思考与执行:
    • Agent 读取历史聊天记录。
    • Agent 将历史记录 + 新消息发送给 LLM(大模型)。
    • 工具调用: 如果 LLM 决定使用工具(如“搜索天气”),Agent 会执行工具代码,并将结果再次喂给 LLM。
    • 生成回复: LLM 生成最终的文本回复。
  5. 存储: Agent 将对话记录保存到数据库(SessionManager)。
  6. 输出: Agent 将回复封装为 OutboundMessage,扔回消息总线

2. 输出路径:从 Agent 到 用户

当 Agent 产生回复(或定时任务触发)时:

  1. 分发: ChannelManager 的输出分发器从总线中抓取到 OutboundMessage
  2. 路由: 分发器查看消息的标签(例如 channel="telegram")。
  3. 发送: 找到对应的 TelegramChannel 实例,调用其发送接口(如调用 Telegram API),将消息推送到你的手机上。

总结图解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
graph TD
User[用户] --发消息--> Channel[通信渠道 (Telegram/Slack)]
Channel --InboundMessage--> Bus[消息总线]
Bus --InboundMessage--> Agent[AgentLoop (大脑)]

Agent --思考/查资料--> Tools[工具集]
Tools --结果--> Agent

Agent --OutboundMessage--> Bus
Bus --OutboundMessage--> Dispatcher[渠道分发器]
Dispatcher --找到对应渠道--> Channel
Channel --回复--> User

Cron[定时任务] --触发--> Agent

简而言之,nanobot gateway 就是一个死循环,不断地搬运消息、触发 AI 思考、并把结果送回给用户。