AI-Native架构:大模型服务设计与实现

AI-Native架构:大模型服务设计与实现

概述

在AI时代,将大语言模型能力深度集成到应用平台已成为趋势。本文将介绍如何在微服务平台中设计和实现AI-Native的大模型服务(LLM Service),涵盖多频道管理、流式响应、Agent执行等核心能力。

什么是AI-Native架构

AI-Native架构是指从设计之初就围绕AI能力构建的系统架构,而非后期添加AI功能。关键特征包括:

  • 频道化架构:支持多租户、多场景的LLM配置隔离
  • 流式响应:实时输出AI生成的内容
  • Agent框架:支持多步骤任务执行
  • 技能系统(SKills):可扩展的领域专家能力
  • MCP集成:标准化工具调用协议

LLM服务架构设计

服务结构

llm.api/
├── app.js                     # Fastify应用入口
├── routes/
│   ├── chat/root.js          # 聊天API路由
│   ├── agent/root.js         # Agent任务路由
│   └── channel/root.js       # 频道管理路由
├── services/
│   ├── agent/                # Agent服务
│   ├── core/                 # 核心服务
│   └── openai/               # OpenAI集成
├── skill/                    # Skills系统
│   ├── skillManager.js       # Skills管理器
│   ├── skillChat.js          # 聊天集成
│   ├── skillParser.js        # SKILL.md解析器
│   └── skills/               # 内置Skills
│       ├── code-reviewer/    # 代码审查专家
│       ├── document-extractor/ # 文档提取专家
│       └── ...
├── mcp/                      # MCP工具管理
│   ├── clientManager.js      # MCP客户端管理
│   ├── toolManager.js        # 工具调用管理
│   └── sseClient.js          # SSE客户端
└── grpc/                     # gRPC服务
    ├── clients/              # gRPC客户端
    └── servers/              # gRPC服务端

核心数据模型

// 频道(Channel)模型 - 隔离不同场景的LLM配置
{
  channelCode: "default",           // 频道标识
  channelType: "chat",              // 频道类型:chat/agent
  model: "gpt-4",                   // 使用的模型
  temperature: 0.7,                 // 温度参数
  maxTokens: 4096,                  // 最大Token数
  systemPrompt: "You are a helpful assistant.",
  skills: ["code-reviewer"],        // 启用的Skills
  mcpServers: ["file-system"],      // MCP服务配置
  streamMode: true                  // 是否流式输出
}

// 消息(Message)模型
{
  messageId: "msg-uuid",
  channelCode: "default",
  role: "user",                     // user/assistant/system
  content: "Hello!",
  metadata: {
    tokens: 10,
    model: "gpt-4",
    finishReason: "stop"
  }
}

多频道会话管理

频道管理实现

// services/core/channelService.js
class ChannelService {
  constructor(fastify) {
    this.fastify = fastify
    this.channels = new Map() // 内存缓存
  }

  // 创建或获取频道
  async getOrCreateChannel(channelCode, userId) {
    const cacheKey = `${userId}:${channelCode}`
    
    if (this.channels.has(cacheKey)) {
      return this.channels.get(cacheKey)
    }
    
    // 从数据库加载频道配置
    const channel = await this.fastify.mongo.db
      .collection('channels')
      .findOne({ channelCode, userId })
    
    if (!channel) {
      // 使用默认配置创建新频道
      return this.createDefaultChannel(userId, channelCode)
    }
    
    this.channels.set(cacheKey, channel)
    return channel
  }

  // 获取频道配置(包含模型参数)
  async getChannelConfig(channelCode) {
    const channel = await this.getChannel(channelCode)
    return {
      model: channel.model,
      temperature: channel.temperature,
      maxTokens: channel.maxTokens,
      systemPrompt: channel.systemPrompt
    }
  }
}

会话上下文管理

// services/core/conversationService.js
class ConversationService {
  // 获取会话历史
  async getConversationHistory(channelCode, limit = 20) {
    const messages = await this.fastify.mongo.db
      .collection('messages')
      .find({ channelCode })
      .sort({ createdAt: -1 })
      .limit(limit)
      .toArray()
    
    return messages.reverse().map(msg => ({
      role: msg.role,
      content: msg.content
    }))
  }

  // 保存消息
  async saveMessage(channelCode, role, content, metadata = {}) {
    const message = {
      channelCode,
      role,
      content,
      metadata,
      createdAt: new Date()
    }
    
    await this.fastify.mongo.db
      .collection('messages')
      .insertOne(message)
    
    return message
  }
}

流式响应(SSE)实现

SSE服务端实现

// routes/chat/root.js
export default async function chatRoutes(fastify, opts) {
  
  // 流式聊天接口
  fastify.post('/stream', async (request, reply) => {
    const { channelCode, message } = request.body
    
    // 设置SSE响应头
    reply.raw.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no'  // 禁用Nginx缓冲
    })
    
    try {
      // 获取频道配置
      const channel = await fastify.channelService
        .getOrCreateChannel(channelCode, request.user.userId)
      
      // 获取历史消息
      const history = await fastify.conversationService
        .getConversationHistory(channelCode)
      
      // 构建消息列表
      const messages = [
        { role: 'system', content: channel.systemPrompt },
        ...history,
        { role: 'user', content: message }
      ]
      
      // 调用LLM流式API
      const stream = await fastify.openai.chat.completions.create({
        model: channel.model,
        messages,
        temperature: channel.temperature,
        max_tokens: channel.maxTokens,
        stream: true
      })
      
      let fullResponse = ''
      
      // 流式输出
      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content || ''
        fullResponse += content
        
        // 发送SSE事件
        reply.raw.write(`data: ${JSON.stringify({
          type: 'delta',
          content,
          finishReason: chunk.choices[0]?.finish_reason
        })}\n\n`)
      }
      
      // 发送完成事件
      reply.raw.write(`data: ${JSON.stringify({
        type: 'done',
        fullResponse
      })}\n\n`)
      
      // 保存助手回复
      await fastify.conversationService.saveMessage(
        channelCode,
        'assistant',
        fullResponse
      )
      
    } catch (error) {
      // 发送错误事件
      reply.raw.write(`data: ${JSON.stringify({
        type: 'error',
        message: error.message
      })}\n\n`)
    }
    
    reply.raw.end()
  })
}

客户端消费示例

// 前端使用EventSource消费SSE
const eventSource = new EventSource('/api/llm/chat/stream', {
  method: 'POST',
  body: JSON.stringify({
    channelCode: 'default',
    message: 'Hello, AI!'
  })
})

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data)
  
  switch (data.type) {
    case 'delta':
      // 追加内容到UI
      appendMessageContent(data.content)
      break
    case 'done':
      // 完成处理
      eventSource.close()
      break
    case 'error':
      // 错误处理
      console.error(data.message)
      eventSource.close()
      break
  }
}

Agent任务执行框架

Agent任务执行框架

Agent任务模型

// Agent任务结构
{
  agentCode: "uuid",
  title: "生成课件大纲",
  agentType: "sequential",    // sequential/parallel
  status: "running",          // pending/running/completed/failed
  subAgents: [
    {
      step: 1,
      title: "生成大纲",
      agentType: "llm",
      status: "completed",
      output: {...}
    },
    {
      step: 2,
      title: "生成内容",
      agentType: "llm",
      status: "running",
      input: {...}
    }
  ],
  createdAt: Date,
  completedAt: Date
}

Agent执行器

// services/agent/index.js
class AgentExecutor {
  constructor(fastify) {
    this.fastify = fastify
    this.handlers = new Map()
    this.registerHandlers()
  }

  registerHandlers() {
    this.handlers.set('llm', new LLMHandler(this.fastify))
    this.handlers.set('grpc', new GRPCHandler(this.fastify))
    this.handlers.set('tool', new ToolHandler(this.fastify))
  }

  // 执行任务
  async execute(agentTask) {
    const { subAgents } = agentTask
    
    for (const subAgent of subAgents) {
      const handler = this.handlers.get(subAgent.agentType)
      if (!handler) {
        throw new Error(`Unknown agent type: ${subAgent.agentType}`)
      }
      
      try {
        subAgent.status = 'running'
        await this.updateAgentStatus(agentTask.agentCode, subAgent)
        
        // 执行子任务
        const result = await handler.execute(subAgent)
        
        subAgent.status = 'completed'
        subAgent.output = result
        subAgent.completedAt = new Date()
        
      } catch (error) {
        subAgent.status = 'failed'
        subAgent.error = error.message
        throw error
      }
      
      await this.updateAgentStatus(agentTask.agentCode, subAgent)
    }
    
    return agentTask
  }
}

// LLM处理器
class LLMHandler {
  async execute(subAgent) {
    const { prompt, channelCode } = subAgent.input
    
    const response = await this.fastify.openai.chat.completions.create({
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }]
    })
    
    return response.choices[0].message
  }
}

Skills技能系统

Skills系统允许通过声明式配置扩展AI的能力:

// skill/skillManager.js
class SkillManager {
  constructor() {
    this.skills = new Map()
    this.loadBuiltInSkills()
  }

  // 加载内置Skills
  loadBuiltInSkills() {
    const skillDirs = fs.readdirSync('./skill/skills')
    
    for (const dir of skillDirs) {
      const skillPath = `./skill/skills/${dir}/SKILL.md`
      if (fs.existsSync(skillPath)) {
        const skill = this.parseSkillFile(skillPath)
        this.skills.set(skill.code, skill)
      }
    }
  }

  // 解析SKILL.md
  parseSkillFile(filePath) {
    const content = fs.readFileSync(filePath, 'utf-8')
    // 解析Markdown格式的Skill定义
    return this.skillParser.parse(content)
  }

  // 获取频道启用的Skills
  getChannelSkills(channelCode) {
    const channel = this.channels.get(channelCode)
    return channel.skills.map(code => this.skills.get(code))
  }
}

SKILL.md示例

# 代码审查专家 (code-reviewer)

## 描述
专业的代码审查专家,擅长发现代码中的潜在问题、性能瓶颈和安全漏洞。

## 能力
- 代码质量分析
- 性能优化建议
- 安全漏洞检测
- 最佳实践推荐

## 系统提示
你是一位资深代码审查专家。请分析用户提供的代码,从以下维度给出建议:
1. 代码质量和可读性
2. 潜在的错误和异常处理
3. 性能优化机会
4. 安全漏洞
5. 设计模式应用

请用中文回复,使用Markdown格式组织内容。

MCP工具集成

MCP (Model Context Protocol) 提供标准化的工具调用机制:

// mcp/toolManager.js
class ToolManager {
  constructor(fastify) {
    this.fastify = fastify
    this.tools = new Map()
    this.clients = new Map()
  }

  // 注册工具
  registerTool(tool) {
    this.tools.set(tool.name, tool)
  }

  // 执行工具调用
  async executeToolCall(toolCall) {
    const { name, arguments: args } = toolCall
    const tool = this.tools.get(name)
    
    if (!tool) {
      throw new Error(`Tool not found: ${name}`)
    }
    
    return await tool.execute(args)
  }

  // 获取可用工具列表(用于LLM function calling)
  getAvailableTools() {
    return Array.from(this.tools.values()).map(tool => ({
      type: 'function',
      function: {
        name: tool.name,
        description: tool.description,
        parameters: tool.parameters
      }
    }))
  }
}

性能优化

1. 连接池管理

// 复用OpenAI客户端连接
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
  maxRetries: 3,
  timeout: 30000
})

2. 响应缓存

// 缓存常见查询结果
const cacheKey = `llm:${hashPrompt(prompt)}`
const cached = await redis.get(cacheKey)

if (cached) {
  return JSON.parse(cached)
}

const response = await openai.chat.completions.create({...})
await redis.setex(cacheKey, 3600, JSON.stringify(response))

3. 限流控制

// 基于Token Bucket的限流
const rateLimiter = new RateLimiter({
  tokensPerInterval: 100,
  interval: 'minute'
})

fastify.addHook('onRequest', async (request, reply) => {
  const allowed = await rateLimiter.tryRemoveTokens(1)
  if (!allowed) {
    reply.code(429).send({ error: 'Rate limit exceeded' })
  }
})

总结

AI-Native的LLM服务设计需要关注以下关键点:

  1. 频道隔离:多租户场景下的配置隔离和权限控制
  2. 流式响应:SSE实现实时交互体验
  3. Agent框架:支持复杂任务的多步骤编排
  4. Skills扩展:声明式能力扩展机制
  5. MCP标准化:统一工具调用协议
  6. 性能优化:连接池、缓存、限流等策略

下一篇将介绍RAG知识库系统的设计与实现。

阅读更多

Skills系统:可扩展AI能力设计

Skills系统:可扩展AI能力设计

概述 Skills系统是AI-Native架构中的重要组件,它允许通过声明式配置扩展AI的能力。本文将介绍Skills系统的设计与实现,让大模型能够像人类专家一样具备特定领域的能力。 什么是Skills系统 概念 Skills(技能)是一种声明式的AI能力扩展机制,类似于人类的"专业技能": 通用AI助手 专业AI助手(带Skills) ┌──────────────────────┐ ┌──────────────────────────────┐ │ │ │ │ │ 用户:请帮我写代码 │ │ 用户:请帮我审查这段代码 │ │ │ │ │ │ AI:我是一个AI助手 │ │ AI:[激活

By 菱角
插件化架构设计模式

插件化架构设计模式

概述 插件化架构是一种将核心功能与扩展功能分离的设计模式,允许系统在运行时动态加载和卸载功能模块。本文将介绍如何在微服务平台中设计和实现插件化架构。 为什么需要插件化 插件化优势 1. 模块化:功能独立,边界清晰 2. 可扩展:按需加载,动态增删 3. 隔离性:插件间互不干扰 4. 可维护:独立开发、测试、部署 5. 可定制:用户按需选择功能 核心设计 架构概览 核心组件实现 1. 插件接口定义 // core/plugin.interface.ts // 插件接口 export interface IPlugin { // 插件名称 readonly name: string // 插件版本 readonly version: string // 插件配置 getConfig(): PluginConfig // 插件清单

By 菱角
gRPC服务通信设计与实践

gRPC服务通信设计与实践

概述 在微服务架构中,服务间通信是关键环节。相比REST API,gRPC提供了更高的性能和更强的类型安全。本文将介绍如何在微服务平台中设计和实现gRPC服务通信。 为什么选择gRPC gRPC vs REST对比 特性 gRPC REST 协议 HTTP/2 HTTP/1.1 序列化 Protocol Buffers (二进制) JSON (文本) 性能 高(二进制+压缩) 中(文本开销) 类型安全 强(代码生成) 弱(运行时检查) 流式通信 原生支持(双向流) 需额外实现(SSE/WebSocket) 代码生成 自动生成 手动编写 浏览器支持 需gRPC-Web 原生支持 调试难度

By 菱角
多语言微服务架构:Node.js与Python协作

多语言微服务架构:Node.js与Python协作

概述 在微服务架构中,根据场景选择最适合的编程语言是最佳实践。本文将介绍如何在微服务平台中实现Node.js与Python的协作,发挥各自技术优势。 技术选型策略 为什么混合使用 服务划分 Node.js服务(7个) 服务 功能 选择Node.js的原因 llm.api 大模型服务 高并发SSE流式响应 ucenter.api 用户中心 RESTful API标准实践 doc.api 文件服务 流式上传下载处理 resource.api 资源管理 gRPC高性能通信 rag.api 知识库服务 MongoDB集成便利 statistic.api 统计分析 事件驱动架构 pptonline.api PPT服务 与前端技术栈统一 Python服务(1个) 服务 功能 选择Python的原因

By 菱角