前言
最近在看OpenClaw这个当今最火的AI个人助理,想通过OpenClaw来研究下这种个人智能助理的设计和开发原理。但是发现OpenClaw太“重”了,动不动就是几十万行代码,层层封装,想从源码层面理解它的运行逻辑(它还是typescript语言),或者想自己魔改加个小功能,往往要翻半天文档。
直到我遇到了 nanobot。
它给我的第一感觉就是“干净”。核心代码只有 4000 行左右(大概只有OpenClaw 的 1%),但麻雀虽小,五脏俱全。它去掉了很多复杂的抽象,保留了 Agent 最核心的能力。
nanobot 内置了非常丰富的渠道支持。你可以把它接入 Telegram、Discord、Slack,甚至是国内的飞书、钉钉、QQ 和微信(通过 Mochat)。我现在把它挂在飞书上,平时想查个资料、翻译段文本,或者只是单纯想找个“人”聊聊代码思路,随时掏出手机就能发消息,它会像一个真正的助理一样回复你。
那么就从nanobot源码开始学习AI个人助理吧。
一夫当关
前面是看了“怎样只聊一句”,现在看看“网关模式”做了啥。
具体流程
nanobot gateway 是 Nanobot 的核心后台服务进程。简单来说,它就像一个不知疲倦的“接线员”兼“大脑”,负责连接外部世界(如 Telegram/微信)、管理内部任务(如定时提醒),并指挥 AI(Agent)进行思考和回复。
以下是 nanobot gateway 启动和运行的详细步骤解析:
第一阶段:初始化 (Initialization)
当你运行 nanobot gateway 命令时,系统会按顺序完成以下准备工作:
- 加载配置 (
load_config):读取你的~/.nanobot/config.json配置文件,确定要启用哪些功能。 - 创建消息总线 (
MessageBus):这是一个内部的通信管道,用于在“接收消息的渠道”和“处理消息的 Agent”之间传递数据。 - 准备大脑 (
AgentLoop):初始化 AI 核心循环。它加载了 LLM 提供商(如 OpenAI/Claude)、工具集(搜索、文件操作等)以及记忆管理器。 - 准备定时器 (
CronService&HeartbeatService):- Cron: 加载保存的定时任务(如“每天早上 8 点提醒我看新闻”)。
- Heartbeat: 设置心跳机制(默认每 30 分钟),用于自我保活或周期性自检。
- 准备渠道 (
ChannelManager):根据配置初始化外部通信渠道(如 Telegram, WhatsApp, Slack 等)。
这个地方与之前的“只聊一句”又有很大差别,引入了新概念。
SessionManager
SessionManager 类是 Nanobot 用来管理聊天会话(Conversation Sessions)的核心组件。它的主要职责是持久化存储和检索用户与机器人之间的聊天记录。
它通过管理 Session 对象来实现这一功能,每个 Session 代表一个独立的对话上下文(例如你和机器人在 Telegram 上的私聊,或者你在命令行里的即兴对话)。
核心功能
会话存储 (Storage)
- 位置: 所有会话都保存在
~/.nanobot/sessions/目录下。 - 格式: 采用 JSONL (JSON Lines) 格式。
- 文件名通常是
channel_chatid.jsonl(例如telegram_123456789.jsonl)。 - 文件的第一行始终是元数据 (Metadata),包含创建时间、更新时间等。
- 后续每一行代表一条消息 (Message),包含角色(user/assistant)、内容、时间戳等。
- 文件名通常是
- 优点: JSONL 格式允许追加写入(Append-only),性能好且易于阅读和修复。
- 位置: 所有会话都保存在
内存缓存 (Caching)
- 为了减少磁盘 I/O,
SessionManager会在内存中维护一个_cache字典。 - 当你频繁与机器人对话时,它会直接从内存中读取会话对象,只有在保存时才写入磁盘。
- 为了减少磁盘 I/O,
关键类与方法
1. Session (数据类)
代表单个会话的数据结构。
- 属性:
key: 唯一标识符,格式通常为channel:chat_id(如telegram:12345)。messages: 消息列表。created_at/updated_at: 时间戳。
- 方法:
add_message(role, content): 添加一条新消息。get_history(max_messages): 获取最近的 N 条消息,并格式化为 LLM(大模型)可接受的格式(只保留role和content)。clear(): 清空当前会话的所有消息。
2. SessionManager (管理器类)
负责对 Session 进行增删改查。
__init__(workspace): 初始化,确保~/.nanobot/sessions目录存在。get_or_create(key): 最常用的方法。- 先查内存缓存。
- 缓存没有,就去磁盘加载。
- 磁盘也没有,就创建一个新的空白会话。
save(session): 将会话状态写回磁盘。它会重写整个文件,确保元数据和消息列表都是最新的。delete(key): 删除某个会话(从内存移除并删除对应的.jsonl文件)。list_sessions(): 扫描目录,读取所有.jsonl文件的第一行(元数据),返回所有会话的列表,按更新时间倒序排列。
代码结构图解
1 | classDiagram |
实际工作流程示例
当你发一条消息 “Hello” 给机器人时:
- 加载:
gateway调用manager.get_or_create("telegram:123")。管理器发现内存里没有,于是去读~/.nanobot/sessions/telegram_123.jsonl,加载之前的聊天记录。 - 更新: Agent 处理完消息后,调用
session.add_message("user", "Hello")和session.add_message("assistant", "Hi there!")。 - 保存: Agent 调用
manager.save(session)。管理器将更新后的消息列表写回jsonl文件。
总结
SessionManager 是 Nanobot 的记忆中枢。它简单、高效地利用文件系统来保存对话上下文,确保机器人即使重启也能“记得”你们之前的聊天内容。
CronService
CronService 是 Nanobot 的定时任务调度器。它类似于 Linux 的 cron 或 Python 的 Celery,但更轻量且专为 Agent 设计。它的主要职责是管理和执行那些需要在未来某个时间点或周期性执行的任务。
核心功能
任务调度 (Scheduling)
- 支持三种调度方式:
- 一次性 (
at): 在指定时间点执行一次(例如“明天早上 8 点”)。 - 间隔循环 (
every): 每隔一段时间执行一次(例如“每 30 分钟”)。 - Cron 表达式 (
cron): 使用标准 crontab 语法(例如0 8 * * *每天早上 8 点)。
- 一次性 (
- 使用
_compute_next_run函数计算下一次执行时间。
- 支持三种调度方式:
任务持久化 (Persistence)
- 文件存储: 所有任务数据保存在 JSON 文件中(通常是
~/.nanobot/data/cron/jobs.json)。 - 结构:
CronStore包含一个任务列表,每个CronJob包含 ID、名称、调度规则、Payload(要做什么)以及运行状态。 - 优点: 即使 Nanobot 重启,任务也不会丢失。重启后会重新计算下次运行时间。
- 文件存储: 所有任务数据保存在 JSON 文件中(通常是
任务执行 (Execution)
- Payload: 任务的核心是一个
message(例如 “check weather”)。 - 执行: 当时间到了,
CronService不会自己执行业务逻辑,而是调用回调函数on_job(通常绑定到commands.py里的agent.process_direct),把这个 message 扔给 Agent 去处理。 - 结果分发: 如果任务配置了
deliver=True和目标to(如 Telegram 用户 ID),执行结果会自动发送给该用户。
- Payload: 任务的核心是一个
定时器机制 (Timer Loop)
- 它不使用“每秒轮询”这种低效方式。
- 智能休眠:
_arm_timer方法会计算距离“最近一个待执行任务”还有多久(delay_s),然后asyncio.sleep(delay_s)。这样在空闲时几乎不消耗 CPU。 - 当有新任务添加或任务执行完毕时,会重新计算并重置定时器。
关键类与方法
CronService
__init__(store_path, on_job): 初始化服务,指定存储路径和执行回调。start()/stop(): 启动/停止服务。启动时会加载任务并计算下次运行时间。add_job(...): 添加新任务。会自动保存到磁盘并更新定时器。remove_job(job_id): 删除任务。_execute_job(job): 执行单个任务。- 调用
self.on_job(job)触发 Agent。 - 更新任务状态(
last_status,last_run_at)。 - 对于一次性任务,执行完后将其禁用或删除;对于循环任务,计算下次运行时间。
- 保存状态到磁盘。
- 调用
_arm_timer(): 核心调度逻辑。找到所有任务中next_run_at_ms最小的那个时间点,设置一个 asyncio 延时任务。
任务数据结构 (CronJob)
一个典型的任务数据如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17{
"id": "a1b2c3d4",
"name": "每日早报",
"schedule": {
"kind": "cron",
"expr": "0 8 * * *" // 每天 8:00
},
"payload": {
"message": "总结今天的科技新闻", // Agent 收到的指令
"deliver": true,
"channel": "telegram",
"to": "123456789" // 结果发给谁
},
"state": {
"nextRunAtMs": 1700000000000
}
}
实际工作流程示例
- 用户指令: 你对机器人说“每天早上 8 点提醒我喝水”。
- 创建任务: Agent 调用
CronService.add_job,创建一个 cron 任务,schedule 为0 8 * * *,payload message 为 “提醒用户喝水”。 - 等待:
CronService计算出下次 8 点的时间戳,进入休眠。 - 触发: 第二天早上 8 点,定时器醒来,调用
_execute_job。 - 回调:
CronService调用gateway中的on_cron_job回调。 - Agent 执行: Agent 收到 “提醒用户喝水” 的指令,生成回复 “早上好,该喝水了!”。
- 推送: 因为
deliver=True,回复会自动推送到你的 Telegram。
总结
CronService 是 Nanobot 的生物钟。它让 Agent 具备了时间观念,不再只是被动等待用户说话,而是能主动在特定时间执行任务。它设计得非常轻量且健壮,利用文件系统持久化和 asyncio 实现高效调度。
HeartbeatService
HeartbeatService 是 Nanobot 的心跳机制,也是一种“周期性自检”服务。它的作用是让 Agent 即使在没有用户发消息的时候,也能每隔一段时间“醒来”一次,检查是否有后台任务需要处理。
与 CronService(基于精确时间调度)不同,HeartbeatService 更加简单粗暴,它基于一个固定的文件 HEARTBEAT.md 来驱动。
核心逻辑
心跳周期 (Interval)
- 默认每 30 分钟(1800 秒)触发一次。
- 这是一个死循环 (
while self._running),每次执行完await asyncio.sleep(self.interval_s)后就会“跳动”一次。
驱动源 (
HEARTBEAT.md)- 服务会检查工作区(Workspace)下是否存在
HEARTBEAT.md文件。 - 如果文件不存在或为空:Agent 继续睡觉,什么都不做。
- 如果文件有内容:Agent 被唤醒。
- 服务会检查工作区(Workspace)下是否存在
唤醒与执行
- 当心跳触发且
HEARTBEAT.md有内容时,服务会调用on_heartbeat回调函数。 - 这个回调实际上是向 Agent 发送了一条特殊的 Prompt:
1
2
3Read HEARTBEAT.md in your workspace (if it exists).
Follow any instructions or tasks listed there.
If nothing needs attention, reply with just: HEARTBEAT_OK - Agent 收到这条指令后,会读取
HEARTBEAT.md,执行里面列出的任务(例如“检查是否有新的待办事项”、“整理今天的日志”等)。
- 当心跳触发且
反馈机制
- 如果 Agent 执行完任务,或者发现其实没什么要做的,它应该回复
HEARTBEAT_OK。 HeartbeatService收到这个回复后,会记录一条日志 “Heartbeat: OK”,表示本次心跳正常结束。
- 如果 Agent 执行完任务,或者发现其实没什么要做的,它应该回复
关键方法解析
_is_heartbeat_empty(content):- 这是一个辅助函数,用来判断
HEARTBEAT.md是否真的有“干货”。 - 它会忽略空行、标题(
#开头)、HTML 注释(<!-- -->)以及空的复选框(- [ ])。 - 只有当文件里有真正的文本内容时,才会触发心跳。这避免了因为文件里只写了个标题而频繁唤醒 Agent 浪费 Token。
- 这是一个辅助函数,用来判断
_run_loop():- 后台常驻协程。
- 只要
self._running为 True,就无限循环:sleep->_tick。
_tick():- 单次心跳的逻辑。
- 读取
HEARTBEAT.md。
- 读取
- 检查是否为空 (
_is_heartbeat_empty)。为空则直接返回。
- 检查是否为空 (
- 调用
self.on_heartbeat(HEARTBEAT_PROMPT)唤醒 Agent。
- 调用
- 检查 Agent 的回复是否包含
HEARTBEAT_OK,记录日志。
- 检查 Agent 的回复是否包含
设计意图
HeartbeatService 的设计初衷是实现异步的长运行任务或状态维护,而不需要用户显式触发。
场景 1:长任务队列
- 你可以往
HEARTBEAT.md里写入:”处理 data/raw 目录下的所有 PDF 文件”。 - 即使你下线了,Agent 也会每半小时醒来一次,处理一部分文件,直到处理完把
HEARTBEAT.md清空。
- 你可以往
场景 2:自我反思/整理
- 可以在
HEARTBEAT.md里写:”检查 MEMORY.md,如果太乱了就整理一下”。 - Agent 会定期整理自己的记忆,保持“头脑清醒”。
- 可以在
总结
HeartbeatService 是一个基于文件的被动触发器。它通过检测 HEARTBEAT.md 文件的状态来决定是否唤醒 Agent。这是一种非常灵活的机制,允许用户通过简单地修改文件来控制 Agent 的后台行为,而无需编写复杂的 Cron 表达式。
ChannelManager
ChannelManager 是 Nanobot 的外交部长。它统一管理所有外部通信渠道(如 Telegram, WhatsApp, Slack 等),负责它们的生命周期(启动/停止)以及消息的分发。
核心职责
渠道初始化 (
_init_channels)- 在启动时,根据
nanobot.toml配置文件,动态加载并实例化启用的渠道。 - 目前支持的渠道包括:Telegram, WhatsApp, Discord, 飞书, Mochat, 钉钉, Email, Slack, QQ 等。
- 每个渠道都继承自
BaseChannel,拥有统一的接口(start,stop,send)。
- 在启动时,根据
生命周期管理
start_all(): 并发启动所有已启用的渠道。- 每个渠道的
start()方法通常会启动一个轮询循环(Polling)或 Webhook 服务器来接收消息。 - 同时启动一个
_dispatch_outbound任务,负责处理发出去的消息。
- 每个渠道的
stop_all(): 优雅关闭所有渠道,释放资源(如关闭 HTTP 连接)。
消息路由 (Routing)
- 输入路由(隐式): 各个 Channel 实例内部会持有
bus(消息总线)的引用。当它们收到外部消息时,会直接调用bus.publish_inbound(...)将消息扔进总线,供 Agent 消费。ChannelManager不直接干预这一步。 - 输出路由(显式):
ChannelManager启动一个后台任务_dispatch_outbound。- 这个任务不断从总线的输出队列 (
consume_outbound) 中取出消息。 - 它检查消息的
channel属性(例如 “telegram”)。 - 然后在自己的
self.channels字典里找到对应的 Channel 实例,调用channel.send(msg)将消息发出去。
- 输入路由(隐式): 各个 Channel 实例内部会持有
关键代码逻辑
_dispatch_outbound:
这是核心的消息分发循环。这个设计实现了 Agent 与具体通信协议的解耦。Agent 只需要产生一个通用的1
2
3
4
5
6
7
8
9
10while True:
# 1. 从总线获取待发送消息
msg = await bus.consume_outbound()
# 2. 查找目标渠道
channel = self.channels.get(msg.channel)
# 3. 发送
if channel:
await channel.send(msg)OutboundMessage,不需要知道对方是用 HTTP 请求还是 WebSocket 发送,也不需要知道对方的 API Key 是什么。所有这些脏活累活都由ChannelManager委托给具体的 Channel 实现类去处理。
总结
ChannelManager 是连接 Nanobot 内核(Agent)与外部世界(用户)的桥梁。它通过统一的接口屏蔽了不同聊天平台的差异,让 Agent 可以专注于处理逻辑,而不用关心消息是如何传输的。
第二阶段:并发启动 (Startup)
初始化完成后,Gateway 会启动以下四个主要服务:1
2
3
4
5
6
7try:
await cron.start()
await heartbeat.start()
await asyncio.gather(
agent.run(),
channels.start_all(),
)
- 启动定时任务调度器 (
cron.start()):开始计时,等待任务触发。 - 启动心跳服务 (
heartbeat.start()):开始倒计时。 - 启动 Agent 主循环 (
agent.run()):Agent 进入“待机状态”,时刻监听消息总线上的输入消息。 - 启动渠道管理 (
channels.start_all()):- 启动所有启用的渠道(例如开始轮询 Telegram 消息,或监听 Webhook)。
- 启动输出分发器 (
_dispatch_outbound),时刻监听消息总线上的输出消息。
这段代码是 nanobot gateway 启动的核心逻辑,它展示了从初始化后台服务到进入主运行循环的过程。
代码逻辑分步解析
1 | # 1. 启动定时任务服务 (非阻塞) |
为什么要按这个顺序?
这个顺序的设计主要基于任务的性质(是非阻塞初始化,还是长运行死循环):
1. 为什么 cron 和 heartbeat 先行?
- 非阻塞初始化:
cron.start()和heartbeat.start()方法内部仅仅是设置状态标志(_running = True)并创建后台的asyncio.Task(计时器)。它们执行非常快,瞬间就会返回,不会阻塞主程序的执行。 - 就绪原则: 按照依赖关系,最好先让内部的调度系统(定时器、心跳)准备就绪。这样一旦 Agent 开始工作,所有的定时触发机制都已经处于激活状态。
2. 为什么 agent 和 channels 要用 asyncio.gather?
- 阻塞运行:
agent.run()是一个while True循环,它会一直卡在那里监听消息总线,直到程序退出。channels.start_all()也是一个阻塞操作,它会等待所有启用的渠道(Telegram, WhatsApp 等)运行,而这些渠道通常也是通过while True轮询或长连接挂起的。
- 并发必要性:
- 如果写成顺序执行(例如先
await agent.run()),代码就会永远停在这一行,Agent 虽然跑起来了,但channels.start_all()永远不会被执行,导致机器人无法收发消息。 asyncio.gather的作用就是让这两个“死循环”在同一个事件循环中同时跑。Agent 负责“思考”,Channels 负责“听和说”,两者并行不悖。
- 如果写成顺序执行(例如先
总结图解
可以将这个启动过程想象成一家餐厅开门:
cron.start(): 经理先打开闹钟,设置好提醒(比如“中午12点开启特价午餐”)。这一步只是拨个开关,马上就好。heartbeat.start(): 经理开启打卡机,每半小时检查一次员工状态。这一步也只是通个电,马上就好。asyncio.gather(...): 餐厅正式对外营业:agent.run(): 厨师(Agent) 站到灶台前,开始死循环等待订单。channels.start_all(): 服务员(Channels) 站到门口,开始死循环等待顾客。
厨师和服务员必须同时进入工作状态,餐厅才能正常运转。
第三阶段:消息处理循环 (The Loop)
这是 Gateway 运行时的核心逻辑,分为“接收”和“发送”两条路径:
1. 输入路径:从 用户 到 Agent
当你在 Telegram 给机器人发一条消息时:
- 接收:
TelegramChannel收到消息。 - 发布: 渠道将消息封装为
InboundMessage(包含内容、发送者ID、渠道名),扔进消息总线。 - 获取: 正在待机的
AgentLoop从总线中抓取到这条消息。 - 思考与执行:
- Agent 读取历史聊天记录。
- Agent 将历史记录 + 新消息发送给 LLM(大模型)。
- 工具调用: 如果 LLM 决定使用工具(如“搜索天气”),Agent 会执行工具代码,并将结果再次喂给 LLM。
- 生成回复: LLM 生成最终的文本回复。
- 存储: Agent 将对话记录保存到数据库(SessionManager)。
- 输出: Agent 将回复封装为
OutboundMessage,扔回消息总线。
2. 输出路径:从 Agent 到 用户
当 Agent 产生回复(或定时任务触发)时:
- 分发:
ChannelManager的输出分发器从总线中抓取到OutboundMessage。 - 路由: 分发器查看消息的标签(例如
channel="telegram")。 - 发送: 找到对应的
TelegramChannel实例,调用其发送接口(如调用 Telegram API),将消息推送到你的手机上。
总结图解
1 | graph TD |
简而言之,nanobot gateway 就是一个死循环,不断地搬运消息、触发 AI 思考、并把结果送回给用户。