Nanobot —— 渠道系统与 Provider 抽象层
本文档拆解 Nanobot 的渠道系统(Channel)、消息总线(MessageBus)与 LLM Provider 抽象层的设计实现。
1. 消息总线设计
MessageBus 是渠道与 Agent 之间的解耦层,所有消息通过事件队列异步传递。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ┌─────────────────────────────────────────────────────┐ │ MessageBus │ │ │ │ 渠道 A (Telegram) ──┐ │ │ 渠道 B (Discord) ──┤──▶ InboundMessage Queue │ │ 渠道 C (CLI) ──┘ │ │ │ ▼ │ │ AgentLoop.process() │ │ │ │ │ ▼ │ │ 渠道 A ◀────────────── OutboundMessage Queue │ │ 渠道 B ◀────────────── │ │ │ │ │ └─────────────────────────────────────────────────────┘
|
1.1 消息数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @dataclass class InboundMessage: channel_id: str chat_id: str user_id: str content: str media: list | None reply_to: str | None
@dataclass class OutboundMessage: channel_id: str chat_id: str content: str media: list | None metadata: dict
|
1.2 解耦的优势
| 场景 |
说明 |
| 渠道独立 |
单个渠道崩溃不影响其他渠道和 Agent 处理 |
| 消息缓冲 |
高频消息自然排队,不会冲垮 Agent |
| 多渠道并发 |
多个渠道同时接入,Agent 顺序处理 |
| 渠道热插拔 |
启停渠道不影响 Agent 核心运行 |
2. BaseChannel 抽象
所有渠道实现继承 BaseChannel,提供统一的消息收发接口。
2.1 BaseChannel 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class BaseChannel(ABC): """渠道基类,定义统一的消息接口"""
channel_id: str bus: MessageBus
@abstractmethod async def start(self) -> None: """启动渠道监听(轮询或 Webhook)"""
@abstractmethod async def stop(self) -> None: """停止渠道"""
@abstractmethod async def send_message(self, msg: OutboundMessage) -> None: """发送消息到渠道"""
async def transcribe_audio(self, audio: bytes) -> str: """音频转录(Whisper),子类按需调用"""
|
2.2 渠道实现示例(Telegram)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class TelegramChannel(BaseChannel): channel_id = "telegram"
async def start(self): app = Application.builder().token(self.config.token).build() app.add_handler(MessageHandler(filters.ALL, self._on_message)) await app.run_polling()
async def _on_message(self, update: Update, ctx: Context): msg = InboundMessage( channel_id="telegram", chat_id=str(update.effective_chat.id), user_id=str(update.effective_user.id), content=update.message.text or "", media=self._extract_media(update.message), ) await self.bus.put(msg)
async def send_message(self, msg: OutboundMessage): await self.app.bot.send_message( chat_id=msg.chat_id, text=msg.content, parse_mode="Markdown", )
|
3. 渠道支持矩阵
Nanobot 内置 15+ 渠道适配,覆盖主流即时通讯和协作平台:
| 分类 |
渠道 |
特性支持 |
| 即时通讯 |
Telegram |
文本 / 图片 / 文件 / 音频转录 / 按钮 |
|
Discord |
文本 / 附件 / Slash Command |
|
Slack |
文本 / 附件 / Block Kit |
|
WhatsApp |
文本 / 图片 / 音频 |
|
Matrix |
端对端加密消息 |
| 国内平台 |
飞书(Lark) |
文本 / 卡片消息 / At 消息 |
|
钉钉 |
文本 / Markdown / 卡片 |
|
企业微信 |
文本 / 文件 / 应用消息 |
|
微信(WeCom) |
公众号 / 企业号 |
|
QQ |
文本 / 图片(Bot API) |
| 开发工具 |
CLI |
终端交互,本地调试 |
|
WebSocket |
自定义前端接入 |
| 其他 |
Email |
邮件收发(IMAP/SMTP) |
|
MSTeams |
Microsoft Teams 集成 |
|
MoChat |
模拟 Chat(测试用) |
3.1 音频转录支持
支持音频消息的渠道(Telegram、WhatsApp)内置 Whisper 转录:
1 2 3 4 5 6 7 8 9 10 11 12
| async def transcribe_audio(self, audio: bytes) -> str: """使用 OpenAI Whisper 或 Groq 转录""" provider = self.config.transcription_provider if provider == "openai": result = await openai_client.audio.transcriptions.create( model="whisper-1", file=audio, language="zh" ) elif provider == "groq": result = await groq_client.audio.transcriptions.create( model="whisper-large-v3", file=audio ) return result.text
|
4. LLM Provider 抽象层
Provider 层将所有 LLM 的调用接口统一,Agent 核心无需感知底层模型差异。
4.1 LLMProvider 基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class LLMProvider(ABC): """统一 LLM 接口"""
@abstractmethod async def call_model( self, messages: list[dict], system: str, tools: list[dict], model: str, max_tokens: int, stream: bool = False, ) -> LLMResponse: """调用 LLM,返回统一响应格式"""
@abstractmethod def extract_tool_calls( self, response: LLMResponse ) -> list[ToolCallRequest]: """从响应中提取工具调用"""
@abstractmethod def extract_text(self, response: LLMResponse) -> str: """从响应中提取文本内容"""
|
4.2 统一响应格式
1 2 3 4 5 6 7 8 9 10 11 12
| @dataclass class LLMResponse: content: list stop_reason: str usage: TokenUsage model: str
@dataclass class ToolCallRequest: id: str name: str input: dict
|
4.3 各 Provider 实现对比
| Provider |
工具调用格式 |
系统提示位置 |
消息格式 |
| Anthropic |
tool_use block |
system 参数 |
标准 role/content |
| OpenAI |
function / tool_calls |
system role 消息 |
标准 role/content |
| Azure OpenAI |
同 OpenAI |
同 OpenAI |
同 OpenAI |
| GitHub Copilot |
同 OpenAI |
同 OpenAI |
同 OpenAI |
| OpenAI Compat |
同 OpenAI |
同 OpenAI |
各厂商可能有差异 |
4.4 Anthropic Provider 适配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class AnthropicProvider(LLMProvider):
async def call_model(self, messages, system, tools, model, max_tokens, stream): response = await self.client.messages.create( model=model, system=system, messages=messages, tools=tools, max_tokens=max_tokens, stream=stream, ) return LLMResponse( content=response.content, stop_reason=response.stop_reason, usage=TokenUsage( input=response.usage.input_tokens, output=response.usage.output_tokens, ), model=response.model, )
def extract_tool_calls(self, response) -> list[ToolCallRequest]: return [ ToolCallRequest(id=block.id, name=block.name, input=block.input) for block in response.content if block.type == "tool_use" ]
|
4.5 OpenAI 兼容 Provider 适配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class OpenAICompatProvider(LLMProvider):
async def call_model(self, messages, system, tools, model, ...): full_messages = [{"role": "system", "content": system}] + messages
openai_tools = self._convert_tools(tools)
response = await self.client.chat.completions.create( model=model, messages=full_messages, tools=openai_tools, ... ) return self._to_llm_response(response)
|
5. Provider 工厂与注册
5.1 工厂模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class ProviderFactory: """根据配置创建对应 Provider 实例"""
_registry: dict[str, type[LLMProvider]] = {}
@classmethod def register(cls, name: str, provider_cls: type): cls._registry[name] = provider_cls
@classmethod def create(cls, config: ProvidersConfig) -> LLMProvider: primary = config.primary provider_cls = cls._registry[primary] return provider_cls(config)
ProviderFactory.register("anthropic", AnthropicProvider) ProviderFactory.register("openai", OpenAICompatProvider) ProviderFactory.register("azure_openai", AzureOpenAIProvider) ProviderFactory.register("github_copilot", GitHubCopilotProvider)
|
5.2 Provider 切换
1 2 3 4 5 6 7 8 9 10 11
| { "providers": { "primary": "openai", "openai": { "base_url": "https://api.deepseek.com", "api_key": "sk-...", "default_model": "deepseek-chat" } } }
|
5.3 支持的 Provider
| Provider |
base_url 配置 |
代表模型 |
| Anthropic |
官方 API |
claude-sonnet-4-6 |
| OpenAI |
官方 API |
gpt-4o |
| Azure OpenAI |
Azure 端点 |
gpt-4o(Azure 部署) |
| GitHub Copilot |
GitHub API |
copilot-chat |
| DeepSeek |
api.deepseek.com |
deepseek-chat |
| 本地(Ollama) |
localhost:11434 |
llama3 / qwen |
| 30+ 其他 |
OpenAI 兼容 |
各厂商模型 |