Polyglot Microservice Architecture: Node.js and Python Working Together


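Overview

In a microservice architecture, choosing the language best suited to each workload is a best practice. This article shows how Node.js and Python services cooperate within one microservice platform so that each plays to its strengths.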

Technology Selection Strategy

Why Mix Languages

Technology Comparison

Service Breakdown

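Node.js services (7)

Service          Function              Why Node.js
llm.api          LLM service           High-concurrency SSE streaming responses
ucenter.api      User center           Standard RESTful API practice
doc.api          File service          Streaming upload and download handling
resource.api     Resource management   High-performance gRPC communication
rag.api          Knowledge base        Convenient MongoDB integration
statistic.api    Statistics            Event-driven architecture
pptonline.api    PPT service           Same technology stack as the frontend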

Python services (1)

Service          Function              Why Python
transform.api    Document conversion   Rich ecosystem for PDF and Excel processing

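transform.api in Depth

Python Tech Stack

Technology          Version    Purpose
Python              3.12       Runtime
FastAPI             0.115.12   Web framework
Uvicorn             0.34.1     ASGI server
pypdfium2           4.30.1     PDF processing
spacy               3.8.7      NLP processing
pandas              2.3.0      Data processing
openpyxl            3.1.5      Excel processing
xinference-client   1.14.0     Client for the OCR model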

Project Structure

transform.api/
├── main.py                     # FastAPI application entry point
├── pyproject.toml              # Project metadata and dependencies
├── uv.lock                     # Locked dependencies (uv)
├── Dockerfile                  # Container image
├── services/
│   ├── __init__.py
│   ├── pdf.py                  # PDF processing service
│   ├── ocr.py                  # OCR service
│   ├── excel.py                # Excel processing service
│   ├── word.py                 # Word processing service
│   ├── chunker.py              # Text chunking service
│   └── nlp.py                  # NLP analysis service
├── routers/
│   ├── __init__.py
│   ├── pdf.py                  # PDF routes
│   ├── document.py             # Generic document routes
│   └── health.py               # Health check
├── models/
│   ├── __init__.py
│   ├── requests.py             # Request models
│   └── responses.py            # Response models
└── utils/
    ├── __init__.py
    └── file_storage.py         # File storage helpers

FastAPI Application Setup

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from routers import pdf, document, health
from services.pdf import PDFService
from services.ocr import OCRService

# Lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize services on startup
    app.state.pdf_service = PDFService()
    app.state.ocr_service = OCRService()
    await app.state.pdf_service.initialize()
    await app.state.ocr_service.initialize()
    
    print("Transform API started")
    yield
    
    # Clean up on shutdown
    await app.state.pdf_service.cleanup()
    await app.state.ocr_service.cleanup()
    print("Transform API stopped")

app = FastAPI(
    title="Transform API",
    description="Document transformation service",
    version="1.0.0",
    lifespan=lifespan
)

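# CORS configuration
# Wildcard origins cannot be combined with credentials; list explicit
# origins and set allow_credentials=True if cookies are ever required
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)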

# Register routers
app.include_router(health.router, prefix="/health")
app.include_router(pdf.router, prefix="/pdf")
app.include_router(document.router, prefix="/doc")

@app.get("/")
async def root():
    return {"message": "Transform API", "version": "1.0.0"}
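
The lifespan hook above assumes that every service exposes `initialize()` and `cleanup()`. As a minimal sketch of how the shutdown half could be made more robust, the standard-library example below runs cleanup in a `finally` block, in reverse order, and only for services that actually started. `FakeService` and `managed` are hypothetical names used for illustration, not part of transform.api.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeService:
    """Hypothetical stand-in for PDFService / OCRService."""

    def __init__(self, name, log, fail_on_start=False):
        self.name = name
        self.log = log
        self.fail_on_start = fail_on_start

    async def initialize(self):
        if self.fail_on_start:
            raise RuntimeError(f"{self.name} failed to start")
        self.log.append(f"start:{self.name}")

    async def cleanup(self):
        self.log.append(f"stop:{self.name}")


@asynccontextmanager
async def managed(services):
    """Start services in order; stop the started ones in reverse order."""
    started = []
    try:
        for service in services:
            await service.initialize()
            started.append(service)
        yield started
    finally:
        # Runs on normal shutdown and when startup or serving raised
        for service in reversed(started):
            await service.cleanup()


async def demo():
    log = []
    async with managed([FakeService("pdf", log), FakeService("ocr", log)]):
        log.append("serving")
    return log


print(asyncio.run(demo()))
```

The same shape fits inside a FastAPI lifespan function: the `yield` sits inside the `try`, so a failure while serving still triggers cleanup.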

PDF Processing Service

PDF Text Extraction

# services/pdf.py
import pypdfium2 as pdfium
from typing import List, Dict, Optional
import asyncio

class PDFService:
    def __init__(self):
        self.ocr_service = None
    
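    async def initialize(self):
        from services.ocr import OCRService
        self.ocr_service = OCRService()
        await self.ocr_service.initialize()

    async def cleanup(self):
        # Called from the lifespan hook in main.py on shutdown;
        # drop the reference to the OCR service this instance created
        self.ocr_service = None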
    
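    async def extract_text(
        self, 
        file_path: str,
        ocr_enabled: bool = True,
        language: str = 'zh'
    ) -> Dict:
        """Extract the text content of a PDF."""
        
        # Run the blocking, CPU-heavy extraction in the default thread pool
        # so the event loop stays responsive
        loop = asyncio.get_running_loop()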
        return await loop.run_in_executor(
            None, 
            self._extract_text_sync,
            file_path,
            ocr_enabled,
            language
        )
    
    def _extract_text_sync(
        self,
        file_path: str,
        ocr_enabled: bool,
        language: str
    ) -> Dict:
        """Synchronous PDF extraction (runs in a thread-pool worker)."""
        
        pdf = pdfium.PdfDocument(file_path)
        
        pages = []
        full_text = []
        
        for page_num in range(len(pdf)):
            page = pdf[page_num]
            
            # Extract the page's text layer
            text_page = page.get_textpage()
            text = text_page.get_text_range()
            
            # If the page has no text layer and OCR is enabled, fall back to OCR
            if not text.strip() and ocr_enabled:
                text = self._ocr_page(page, language)
            
            pages.append({
                'pageNum': page_num + 1,
                'text': text,
                'charCount': len(text),
                'hasText': bool(text.strip())
            })
            
            full_text.append(text)
        
        return {
            'totalPages': len(pdf),
            'pages': pages,
            'fullText': '\n\n'.join(full_text),
            'charCount': sum(len(t) for t in full_text)
        }
    
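    def _ocr_page(self, page, language: str) -> str:
        """Run OCR on a single page."""
        import os
        import tempfile
        
        # Render the page to a bitmap at twice the default resolution
        bitmap = page.render(
            scale=2.0,
            rotation=0,
        )
        
        # Reserve a temporary file path and close the handle before writing
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
            image_path = f.name
        
        try:
            # pypdfium2 bitmaps are saved through Pillow (requires the Pillow package)
            bitmap.to_pil().save(image_path)
            
            # This method runs in a worker thread that has no event loop,
            # so asyncio.run creates a temporary loop for the OCR call
            return asyncio.run(self.ocr_service.recognize(image_path, language))
        finally:
            # Remove the temporary image even if OCR fails
            os.unlink(image_path)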
    
    async def extract_metadata(self, file_path: str) -> Dict:
        """Extract PDF metadata."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._extract_metadata_sync, file_path
        )
    
    def _extract_metadata_sync(self, file_path: str) -> Dict:
        pdf = pdfium.PdfDocument(file_path)
        
        metadata = {
            'pageCount': len(pdf),
            'version': pdf.get_version(),
            'metadata': {},
            'isTagged': pdf.is_tagged(),
        }
        
        # Extract the document information dictionary
        doc_metadata = pdf.get_metadata_dict()
        if doc_metadata:
            metadata['metadata'] = {
                'title': doc_metadata.get('Title', ''),
                'author': doc_metadata.get('Author', ''),
                'subject': doc_metadata.get('Subject', ''),
                'keywords': doc_metadata.get('Keywords', ''),
                'creator': doc_metadata.get('Creator', ''),
                'producer': doc_metadata.get('Producer', ''),
                'creationDate': doc_metadata.get('CreationDate', ''),
                'modDate': doc_metadata.get('ModDate', ''),
            }
        
        return metadata
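
The pattern used by `PDFService`, an async method that hands blocking work to a thread pool while the worker occasionally needs to call async code, can be sketched with the standard library alone. In the sketch below `slow_parse` and `fake_ocr` are hypothetical stand-ins for the pypdfium2 and OCR calls. As one possible alternative to starting a second loop with `asyncio.run`, the worker uses `asyncio.run_coroutine_threadsafe` to run the coroutine on the main event loop.

```python
import asyncio
import time


async def fake_ocr(page_no: int) -> str:
    """Hypothetical async OCR call; runs on the main event loop."""
    await asyncio.sleep(0.01)
    return f"ocr-text-{page_no}"


def slow_parse(pages: list[str], loop: asyncio.AbstractEventLoop) -> list[str]:
    """Blocking worker, executed in a thread-pool thread."""
    results = []
    for page_no, text in enumerate(pages, start=1):
        time.sleep(0.01)  # simulate blocking parsing work
        if not text.strip():
            # Schedule the coroutine on the main loop and wait for its result
            future = asyncio.run_coroutine_threadsafe(fake_ocr(page_no), loop)
            text = future.result(timeout=5)
        results.append(text)
    return results


async def extract(pages: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # The event loop stays free to run fake_ocr while the worker thread blocks
    return await loop.run_in_executor(None, slow_parse, pages, loop)


print(asyncio.run(extract(["hello", "", "world"])))
```

Because the coroutine runs on the loop that owns the async clients, connection pools created at startup are reused rather than recreated per page.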

OCR Service

# services/ocr.py
# The standalone xinference-client package exports the REST client as RESTfulClient
from xinference_client import RESTfulClient as XinferenceClient
import openai
import base64

class OCRService:
    def __init__(self):
        self.xinference = None
        self.openai = None
    
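    async def initialize(self):
        import os
        
        # Initialize the Xinference client
        self.xinference = XinferenceClient(
            base_url="http://xinference-server:9997"
        )
        
        # Initialize the OpenAI client (used for multimodal recognition)
        self.openai = openai.AsyncOpenAI(
            api_key=os.getenv("OPENAI_API_KEY")
        )
    
    async def cleanup(self):
        # Close the OpenAI client's HTTP connections on shutdown
        if self.openai is not None:
            await self.openai.close()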
    
    async def recognize(self, image_path: str, language: str = 'zh') -> str:
        """Recognize the text in an image."""
        
        try:
            # Prefer the Xinference OCR model
            model = self.xinference.get_model("qwen2-vl-ocr")
            # chat() takes the message list as `messages`;
            # the Chinese prompt asks the model to extract all text in the image
            response = model.chat(
                messages=[{
                    'role': 'user',
                    'content': [
                        {'type': 'image', 'image': image_path},
                        {'type': 'text', 'text': '提取图片中的所有文字'}
                    ]
                }]
            )
            
            return response['choices'][0]['message']['content']
            
        except Exception as e:
            # Any Xinference failure triggers the OpenAI fallback
            print(f"Xinference OCR failed: {e}, fallback to OpenAI")
            return await self._recognize_with_openai(image_path, language)
    
    async def _recognize_with_openai(self, image_path: str, language: str) -> str:
        """Recognize text with an OpenAI multimodal model."""
        
        with open(image_path, 'rb') as f:
            image_base64 = base64.b64encode(f.read()).decode()
        
        response = await self.openai.chat.completions.create(
            # gpt-4-vision-preview has been retired; gpt-4o accepts image input
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"Extract all text from this image. Language: {language}"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{image_base64}"
                        }
                    }
                ]
            }]
        )
        
        return response.choices[0].message.content
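
The primary/fallback structure of `recognize` can be separated from any particular OCR backend. The following is a minimal sketch under that assumption: `with_fallback`, `flaky_backend`, and `stable_backend` are hypothetical names, and the backends are plain coroutines rather than real Xinference or OpenAI clients.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Sequence

logger = logging.getLogger("ocr")

# A backend is any coroutine function that maps an image path to text
Backend = Callable[[str], Awaitable[str]]


async def with_fallback(image_path: str, backends: Sequence[Backend]) -> str:
    """Try each backend in order and return the first successful result."""
    last_error = None
    for backend in backends:
        try:
            return await backend(image_path)
        except Exception as exc:  # a real service would catch narrower errors
            logger.warning("%s failed: %s", backend.__name__, exc)
            last_error = exc
    raise RuntimeError("all OCR backends failed") from last_error


async def flaky_backend(image_path: str) -> str:
    raise ConnectionError("primary OCR unreachable")


async def stable_backend(image_path: str) -> str:
    return f"text from {image_path}"


print(asyncio.run(with_fallback("page-1.png", [flaky_backend, stable_backend])))
```

Keeping the backend list as data makes it straightforward to add a third option or reorder the preference without touching the retry logic.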

Text Processing Service

Smart Chunking

# services/chunker.py
import spacy
from typing import List, Dict, Optional
import re

class TextChunker:
    def __init__(self):
        self.nlp_zh = None
        self.nlp_en = None
    
    async def initialize(self):
        # Load the Chinese and English NLP models
        self.nlp_zh = spacy.load("zh_core_web_sm")
        self.nlp_en = spacy.load("en_core_web_sm")
    
    def chunk_text(
        self,
        text: str,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        language: str = 'zh'
    ) -> List[Dict]:
        """Split text into sentence-aligned chunks with overlap."""
        
        # Pick the language model
        nlp = self.nlp_zh if language == 'zh' else self.nlp_en
        
        # Normalize the text
        text = self._preprocess(text)
        
        # Segment into sentences
        doc = nlp(text)
        
        chunks = []
        current_chunk = []   # sentence texts of the chunk being built
        current_spans = []   # matching (start_char, end_char) offsets
        current_size = 0
        # Chinese sentences are joined without spaces
        joiner = '' if language == 'zh' else ' '
        
        def build_chunk() -> Dict:
            chunk_text = joiner.join(current_chunk)
            return {
                'text': chunk_text,
                # Counts characters, not model tokens
                'tokenCount': len(chunk_text),
                'sentenceCount': len(current_chunk),
                # Offsets into the preprocessed text
                'startChar': current_spans[0][0],
                'endChar': current_spans[-1][1],
            }
        
        for sent in doc.sents:
            sent_text = sent.text.strip()
            if not sent_text:
                continue
            sent_length = len(sent_text)
            span = (sent.start_char, sent.end_char)
            
            # Flush the current chunk once this sentence would exceed the limit
            if current_size + sent_length > chunk_size and current_chunk:
                chunks.append(build_chunk())
                
                # Carry the trailing sentences over as overlap
                overlap_sentences = self._get_overlap_sentences(
                    current_chunk, chunk_overlap
                )
                keep_from = len(current_spans) - len(overlap_sentences)
                current_spans = current_spans[keep_from:] + [span]
                current_chunk = overlap_sentences + [sent_text]
                current_size = sum(len(s) for s in current_chunk)
            else:
                current_chunk.append(sent_text)
                current_spans.append(span)
                current_size += sent_length
        
        # Emit the final chunk
        if current_chunk:
            chunks.append(build_chunk())
        
        return chunks
    
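    def _preprocess(self, text: str) -> str:
        """Normalize raw extracted text."""
        # Drop lines that contain only a page number (header/footer residue);
        # this must run before newlines are collapsed below
        text = re.sub(r'\n\s*\d+\s*\n', '\n', text)
        # Collapse runs of whitespace into single spaces
        text = re.sub(r'\s+', ' ', text)
        return text.strip()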
    
    def _get_overlap_sentences(self, sentences: List[str], overlap_size: int) -> List[str]:
        """Return the trailing sentences that fit within overlap_size characters."""
        overlap = []
        total_size = 0
        
        for sent in reversed(sentences):
            if total_size + len(sent) <= overlap_size:
                overlap.insert(0, sent)
                total_size += len(sent)
            else:
                break
        
        return overlap
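
The greedy packing with overlap that `TextChunker` performs can be tried without spaCy. The sketch below is a simplified, dependency-free approximation: `split_sentences` is a hypothetical regex splitter standing in for spaCy's sentence segmentation, and `chunk_sentences` mirrors the packing and overlap logic on plain strings.

```python
import re


def split_sentences(text: str) -> list[str]:
    """Naive splitter: a run of text followed by its end punctuation."""
    parts = re.findall(r'[^。！？.!?]+[。！？.!?]*', text)
    return [p.strip() for p in parts if p.strip()]


def chunk_sentences(sentences, chunk_size=500, chunk_overlap=50):
    """Pack sentences into chunks of at most chunk_size characters,
    repeating up to chunk_overlap trailing characters in the next chunk."""
    chunks, current, size = [], [], 0
    for sent in sentences:
        if current and size + len(sent) > chunk_size:
            chunks.append(current)
            # Carry trailing sentences over while they fit in the overlap budget
            overlap, carried = [], 0
            for prev in reversed(current):
                if carried + len(prev) > chunk_overlap:
                    break
                overlap.insert(0, prev)
                carried += len(prev)
            current, size = overlap, carried
        current.append(sent)
        size += len(sent)
    if current:
        chunks.append(current)
    return [' '.join(c) for c in chunks]


sentences = split_sentences("aaaa. bbbb. cccc. dddd.")
for chunk in chunk_sentences(sentences, chunk_size=10, chunk_overlap=5):
    print(chunk)
```

With a 10-character limit and a 5-character overlap, each chunk holds two sentences and shares one sentence with its neighbor, which is the behavior the overlap parameter is meant to produce.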

NLP Analysis

# services/nlp.py
import spacy
from typing import List, Dict
from collections import Counter

class NLPService:
    def __init__(self):
        self.nlp_zh = None
        self.nlp_en = None
    
    async def initialize(self):
        self.nlp_zh = spacy.load("zh_core_web_sm")
        self.nlp_en = spacy.load("en_core_web_sm")
    
    def analyze(self, text: str, language: str = 'zh') -> Dict:
        """Analyze a text: entities, keywords, sentiment, and basic statistics."""
        
        nlp = self.nlp_zh if language == 'zh' else self.nlp_en
        doc = nlp(text)
        
        # Named entity recognition
        entities = [
            {
                'text': ent.text,
                'label': ent.label_,
                'start': ent.start_char,
                'end': ent.end_char,
            }
            for ent in doc.ents
        ]
        
        # Keyword extraction (by term frequency)
        tokens = [
            token.text.lower()
            for token in doc
            if not token.is_stop and not token.is_punct and len(token.text) > 1
        ]
        keywords = [
            {'word': word, 'count': count}
            for word, count in Counter(tokens).most_common(10)
        ]
        
        # Sentiment analysis (simple keyword matching)
        sentiment = self._analyze_sentiment(doc)
        
        return {
            'entities': entities,
            'keywords': keywords,
            'sentiment': sentiment,
            'stats': {
                'wordCount': len(doc),
                'sentenceCount': len(list(doc.sents)),
                'entityCount': len(entities),
            }
        }
    
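    def _analyze_sentiment(self, doc) -> Dict:
        """Simple lexicon-based sentiment analysis."""
        # Chinese word lists. Positive: good, excellent, success, like, recommend
        positive_words = {'好', '优秀', '成功', '喜欢', '推荐'}
        # Negative: poor, failure, hate, terrible, problem
        negative_words = {'差', '失败', '讨厌', '糟糕', '问题'}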
        
        text = doc.text
        pos_count = sum(1 for word in positive_words if word in text)
        neg_count = sum(1 for word in negative_words if word in text)
        
        if pos_count > neg_count:
            return {'label': 'positive', 'score': min(pos_count / (pos_count + neg_count + 1), 1.0)}
        elif neg_count > pos_count:
            return {'label': 'negative', 'score': min(neg_count / (pos_count + neg_count + 1), 1.0)}
        else:
            return {'label': 'neutral', 'score': 0.5}
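
The analysis above matches lexicon words as substrings of the whole text, so each word contributes at most once however often it appears. As a point of comparison, here is a small sketch that counts token occurrences instead. The English word lists and the function name `lexicon_sentiment` are illustrative assumptions, not part of the service.

```python
from collections import Counter

# Illustrative word lists; a real lexicon would be far larger
POSITIVE = {"good", "excellent", "success"}
NEGATIVE = {"bad", "failure", "problem"}


def lexicon_sentiment(tokens):
    """Score sentiment from how often lexicon words occur among the tokens."""
    counts = Counter(token.lower() for token in tokens)
    pos = sum(counts[word] for word in POSITIVE)
    neg = sum(counts[word] for word in NEGATIVE)
    if pos == neg:
        # Covers both "no lexicon hits" and an exact tie
        return {"label": "neutral", "score": 0.5}
    label = "positive" if pos > neg else "negative"
    # Share of lexicon hits that agree with the winning label
    return {"label": label, "score": max(pos, neg) / (pos + neg)}


print(lexicon_sentiment(["Good", "service", "good", "price", "bad", "parking"]))
```

Counting tokens makes repeated words weigh more, at the cost of depending on the tokenizer, which matters for Chinese text where word boundaries are not marked.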

API Routes

PDF Routes

# routers/pdf.py
from fastapi import APIRouter, File, UploadFile, HTTPException
from typing import Optional
import tempfile
import os

from services.pdf import PDFService
from models.requests import PDFExtractRequest
from models.responses import PDFExtractResponse

router = APIRouter()

@router.post("/extract", response_model=PDFExtractResponse)
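async def extract_pdf(
    file: UploadFile = File(...),
    ocr_enabled: bool = True,
    language: str = 'zh',
    extract_metadata: bool = True
):
    """Extract the content of an uploaded PDF."""
    
    # Validate the file type (the filename may be missing; compare case-insensitively)
    if not file.filename or not file.filename.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Only PDF files are allowed")
    
    # Save the upload to a temporary file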
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name
    
    try:
        from main import app
        pdf_service: PDFService = app.state.pdf_service
        
        # Extract text
        result = await pdf_service.extract_text(
            tmp_path,
            ocr_enabled=ocr_enabled,
            language=language
        )
        
        # Extract metadata
        metadata = None
        if extract_metadata:
            metadata = await pdf_service.extract_metadata(tmp_path)
        
        return PDFExtractResponse(
            success=True,
            data={
                'filename': file.filename,
                'pageCount': result['totalPages'],
                'pages': result['pages'],
                'fullText': result['fullText'],
                'charCount': result['charCount'],
                'metadata': metadata
            }
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
        
    finally:
        os.unlink(tmp_path)

@router.post("/chunk")
async def chunk_pdf(
    file: UploadFile = File(...),
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    language: str = 'zh'
):
    """Extract text from a PDF and split it into chunks."""
    
    from services.chunker import TextChunker
    
    # Save the upload to a temporary file before extracting text
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name
    
    try:
        from main import app
        pdf_service = app.state.pdf_service
        
        result = await pdf_service.extract_text(tmp_path, language=language)
        
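        # Chunk the text; cache the chunker on app.state so the spaCy
        # models are loaded once rather than on every request
        chunker = getattr(app.state, 'chunker', None)
        if chunker is None:
            chunker = TextChunker()
            await chunker.initialize()
            app.state.chunker = chunker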
        
        chunks = chunker.chunk_text(
            result['fullText'],
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            language=language
        )
        
        return {
            'success': True,
            'data': {
                'chunkCount': len(chunks),
                'chunks': chunks
            }
        }
        
    finally:
        os.unlink(tmp_path)
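
Both routes repeat the same steps: write the upload to a temporary file, process it, and delete it in `finally`. One way to factor that out is a small context manager. The sketch below uses only the standard library; `temp_upload` is a hypothetical helper name, not something defined in transform.api.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temp_upload(content: bytes, suffix: str = ".pdf"):
    """Write uploaded bytes to a temp file and always delete it afterwards."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    try:
        # Close the file before yielding so other code can reopen it by path
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(content)
        yield path
    finally:
        if os.path.exists(path):
            os.unlink(path)


with temp_upload(b"%PDF-1.7 demo") as pdf_path:
    print(os.path.exists(pdf_path))

print(os.path.exists(pdf_path))
```

Inside a route, the body of the `with` block would hold the `extract_text` call, and the file is removed whether the call returns or raises.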

Calling the Python Service from Node.js

gRPC Client

// micro-platform/llm.api/grpc/clients/extractor.js
import grpc from '@grpc/grpc-js'
import protoLoader from '@grpc/proto-loader'

const PROTO_PATH = './grpc/proto/extractor.proto'

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
})

const extractorProto = grpc.loadPackageDefinition(packageDefinition).extractor

export class ExtractorClient {
  constructor(serviceUrl) {
    this.client = new extractorProto.ExtractorService(
      serviceUrl,
      grpc.credentials.createInsecure()
    )
  }

  // Extract text from a PDF
  async extractPDF(filePath, options = {}) {
    return new Promise((resolve, reject) => {
      this.client.extractPDF({
        filePath,
        ocrEnabled: options.ocrEnabled ?? true,
        language: options.language ?? 'zh',
      }, (error, response) => {
        if (error) reject(error)
        else resolve(response)
      })
    })
  }

  // Split text into chunks
  async chunkText(text, options = {}) {
    return new Promise((resolve, reject) => {
      this.client.chunkText({
        text,
        chunkSize: options.chunkSize ?? 500,
        chunkOverlap: options.chunkOverlap ?? 50,
        language: options.language ?? 'zh',
      }, (error, response) => {
        if (error) reject(error)
        else resolve(response)
      })
    })
  }
}

HTTP Call Example

// A Node.js service calling transform.api
class TransformService {
  constructor() {
    this.baseUrl = process.env.TRANSFORM_API_URL || 'http://transform.api:8000'
  }

  async extractPDF(fileBuffer, options = {}) {
    const formData = new FormData()
    formData.append('file', new Blob([fileBuffer]), 'document.pdf')

    // The endpoint declares ocr_enabled and language as plain function
    // parameters, which FastAPI reads from the query string, not the form body
    const query = new URLSearchParams({
      ocr_enabled: String(options.ocrEnabled ?? true),
      language: options.language ?? 'zh',
    })

    const response = await fetch(`${this.baseUrl}/pdf/extract?${query}`, {
      method: 'POST',
      body: formData,
    })

    if (!response.ok) {
      throw new Error(`Transform API error: ${response.status}`)
    }

    return await response.json()
  }

  async chunkText(text, options = {}) {
    const response = await fetch(`${this.baseUrl}/doc/chunk`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        text,
        chunk_size: options.chunkSize ?? 500,
        chunk_overlap: options.chunkOverlap ?? 50,
        language: options.language ?? 'zh',
      }),
    })

    // Surface HTTP errors instead of parsing an error body as a result
    if (!response.ok) {
      throw new Error(`Transform API error: ${response.status}`)
    }

    return await response.json()
  }
}

Docker Configuration

Dockerfile for the Python Service

# transform.api/Dockerfile
FROM python:3.12-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    libglib2.0-0 \
    libsm6 \
    libxext6 \
    libxrender-dev \
    libgomp1 \
    wget \
    && rm -rf /var/lib/apt/lists/*

# Install uv
RUN pip install uv

WORKDIR /app

# Copy the dependency manifests
COPY pyproject.toml uv.lock ./

# Install Python dependencies
RUN uv sync --no-dev

# Download the spaCy models
RUN uv run python -m spacy download zh_core_web_sm
RUN uv run python -m spacy download en_core_web_sm

# Copy the application code
COPY . .

# Expose the service port
EXPOSE 8000

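# Health check (uses wget, which is installed above; the slim base image has no curl)
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD wget -q -O /dev/null http://localhost:8000/health || exit 1

# Start command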
CMD ["uv", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
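
Since slim Python images typically omit HTTP clients such as curl, a health probe can also be written with the interpreter that is already in the image. The sketch below is a hypothetical `healthcheck.py` using only the standard library. The function names and the default URL are assumptions chosen to match the port and path used in the Dockerfile above.

```python
import urllib.error
import urllib.request


def is_healthy(url: str, timeout: float = 5.0) -> bool:
    """Return True when the endpoint answers with a 2xx status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError):
        # Covers HTTP error statuses, refused connections, and timeouts
        return False


def main(argv: list[str]) -> int:
    """Return a process exit code: 0 if healthy, 1 otherwise."""
    url = argv[1] if len(argv) > 1 else "http://localhost:8000/health"
    return 0 if is_healthy(url) else 1
```

To use it as a probe, the script would end with `sys.exit(main(sys.argv))` and the Dockerfile would invoke it with `CMD python healthcheck.py` in the `HEALTHCHECK` instruction.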

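Summary

Key points for polyglot microservice collaboration:

  1. Clear division of labor: Node.js handles APIs and I/O; Python handles compute-intensive document processing
  2. Service communication: gRPC for high-performance calls, HTTP for simple cases
  3. Asynchronous processing: asyncio in Python, async/await in Node.js
  4. Thread pool: blocking or CPU-heavy Python work runs in a thread pool so the event loop stays responsive
  5. Model sharing: load NLP models once, ideally at service startup, and reuse them across requests
  6. Error handling: fallback strategies and error recovery, such as OCR falling back from Xinference to OpenAI

The next article covers the design and practice of gRPC service communication.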
