Building a Type-Safe Heterogeneous Microservice Architecture with tRPC and FastAPI


We face a dilemma typical of modern engineering organizations: the core API and BFF (Backend for Frontend) layer is owned by a TypeScript team that relies heavily on end-to-end type safety for development velocity and production stability, while the data science and algorithms team is firmly committed to the Python ecosystem and its rich libraries for complex data processing. When these two worlds need to collaborate, the traditional pattern of RESTful APIs plus OpenAPI documentation starts to show its inherent fragility.

The problem is not whether documentation can be generated, but that the type contract is enforced at runtime rather than at compile time. If a Python service changes a field and the OpenAPI document is not updated and the client code not regenerated in time, the TypeScript service knows nothing about it at compile time; the problem only surfaces when a call fails at runtime. In a production environment that demands high availability and fast iteration, errors discovered this late are unacceptable.

We need an approach that respects the strengths of both ecosystems while providing a cross-language type contract guaranteed at (or close to) compile time.

Weighing the Options: REST vs. Contract-First RPC

Option A: Standardized REST + OpenAPI

This is the most common approach. FastAPI automatically generates a JSON document conforming to the OpenAPI 3.0 specification, and on the TypeScript side a tool such as openapi-typescript can generate type definition files from that document.
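
A minimal sketch of what Option A's toolchain looks like in practice (the CLI invocation and the paths lookup reflect how openapi-typescript is typically used; the file locations here are assumptions for this example):

// 1. Generate type definitions from the FastAPI-served OpenAPI document, e.g. in CI:
//      npx openapi-typescript http://localhost:8000/openapi.json -o src/generated/data-service.d.ts

// 2. Consume the generated types on the TypeScript side:
import type { paths } from './generated/data-service';

// The response type has to be looked up by URL string, HTTP method and status code.
// Workable, but far less ergonomic than calling a typed procedure, and only as
// fresh as the last time the generation step actually ran.
type UserProfileResponse =
  paths['/users/{user_id}']['get']['responses']['200']['content']['application/json'];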

  • Pros:

    • Mature technology with a rich tooling ecosystem.
    • Loosely coupled and HTTP-based, easy to debug and understand.
    • Integrates seamlessly with existing infrastructure such as API gateways and load balancers.
  • Cons:

    • Type safety is "advisory" rather than "enforced". The robustness of the whole pipeline depends on developer discipline and the completeness of the CI/CD process; a slip anywhere (a document that was never updated, a generation script that failed) breaks type safety.
    • A fractured developer experience. When calling the Python service, TypeScript developers feel like they are talking to a black box, with none of the autocompletion and type inference they get when calling an internal tRPC procedure.
    • Runtime overhead. JSON serialization/deserialization and the HTTP protocol itself can become a bottleneck for high-frequency internal calls.

Option B: A Hybrid Architecture Driven by a Shared Contract

The core idea of this option is to promote the inter-service interface contract to a first-class citizen, independent of any service's implementation. The contract becomes the "source of truth" connecting the two stacks.

We choose TypeScript type definitions (.d.ts) as that source of truth: tRPC can consume them natively, and we can build a lightweight tool that converts these TypeScript definitions into Pydantic models that FastAPI understands.

  • Pros:

    • Strong, enforced contract guarantees. Any change to the contract triggers type checking of both the TypeScript service and the Python service in CI, catching errors at the compile stage.
    • A unified developer experience. For a TypeScript developer, calling the Python service is almost indistinguishable from calling another TypeScript service.
    • Decoupled implementations. As long as the contract is honored, either side can refactor or restructure its internals without affecting the other.
  • Cons:

    • An extra build step. A script is needed to sync the TypeScript definitions into Pydantic models, which adds CI/CD complexity.
    • Cognitive cost. The team has to understand and follow a "contract-first" development model.

For our project, with its very high stability requirements, the long-term benefits of Option B far outweigh its upfront build cost. Turning potential runtime errors into compile-time errors is an extremely valuable trade-off in architecture design, so we decided to adopt Option B.

Core Implementation Overview

We will build a monorepo containing two core services and one shared contract package; a sketch of the repository layout follows the list below.

  1. bff-service (TypeScript, tRPC, Express): the client-facing BFF layer, responsible for orchestrating business logic. It calls data-service.
  2. data-service (Python, FastAPI, MongoDB): the data-processing service, responsible for heavy computation and database interaction.
  3. @core/contracts (TypeScript Definitions): the shared type definition package and the sole contract for communication between the two services.
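
The repository layout, pieced together from the file paths used throughout this post, looks roughly like this:

.
├── packages/
│   └── contracts/
│       └── src/index.ts              # the shared contract ("source of truth")
└── services/
    ├── bff-service/                  # TypeScript, tRPC
    │   └── src/
    └── data-service/                 # Python, FastAPI
        ├── app/
        └── scripts/generate_models.py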

The overall architecture and data flow look like this:

graph TD
    subgraph "Monorepo"
        direction LR
        subgraph "packages"
            Contracts["@core/contracts (TS Definitions)"]
        end
        subgraph "services"
            BFF["bff-service (tRPC)"]
            DataService["data-service (FastAPI)"]
        end
    end

    subgraph "Build & CI Process"
        direction LR
        BuildBFF["Build bff-service"]
        BuildData["Build data-service"]
        TypeGenerator["TS -> Pydantic Generator"]
    end
    
    subgraph "Runtime Environment (OCI Containers)"
        Client["Client (Web/Mobile)"]
        APIGateway["API Gateway"]
        BFFContainer["Container: bff-service"]
        DataContainer["Container: data-service"]
        DB["MongoDB"]
    end

    Contracts -- "Consumed by" --> BFF
    Contracts -- "Input for" --> TypeGenerator
    TypeGenerator -- "Generates" --> PydanticModels[Pydantic Models]
    PydanticModels -- "Used by" --> DataService

    BFF -- "Builds into" --> BuildBFF
    DataService -- "Builds into" --> BuildData
    
    BuildBFF --> BFFContainer
    BuildData --> DataContainer

    Client --> APIGateway
    APIGateway --> BFFContainer
    BFFContainer -- "HTTP Call (RPC-like)" --> DataContainer
    DataContainer --> DB

Step-by-Step Implementation

1. Defining the Shared Contract (@core/contracts)

This is the cornerstone of the whole architecture. We describe the data structures and procedures using nothing but TypeScript types.

packages/contracts/src/index.ts:

// packages/contracts/src/index.ts

/**
 * @description Represents a user profile in the system.
 */
export interface UserProfile {
  userId: string;
  username: string;
  email: string;
  createdAt: Date;
}

/**
 * @description Input structure for the data analysis procedure.
 */
export interface AnalysisRequest {
  userId: string;
  metrics: Array<'performance' | 'engagement' | 'retention'>;
  timeframe: '7d' | '30d' | '90d';
}

/**
 * @description The result of a data analysis operation.
 */
export interface AnalysisResult {
  userId: string;
  reportId: string;
  generatedAt: Date;
  scores: Record<string, number>; // e.g., { performance: 0.85, engagement: 0.92 }
}

// We can define the "API" shape here as a plain object.
// This is not a tRPC router, just a type definition of the procedures.
export type DataServiceContract = {
  getUserProfileById: (userId: string) => Promise<UserProfile | null>;
  runAnalysis: (request: AnalysisRequest) => Promise<AnalysisResult>;
};

2. Implementing the Data Service (data-service - FastAPI)

The challenge here is getting Python to understand the TypeScript contract. We write a small build-time script to perform this conversion.

First, the core logic of the Python service.

services/data-service/app/main.py:

# services/data-service/app/main.py
import os
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient

# These models are NOT manually written. They are generated by our script.
from .generated_models import UserProfile, AnalysisRequest, AnalysisResult 

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Database Connection ---
MONGO_URL = os.getenv("MONGO_URL", "mongodb://mongodb:27017")
DB_NAME = "heterogeneous_db"

db_client: AsyncIOMotorClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Handles startup and shutdown events."""
    global db_client
    logger.info("Connecting to MongoDB...")
    try:
        db_client = AsyncIOMotorClient(MONGO_URL)
        # Verify connection
        await db_client.admin.command('ping')
        logger.info("MongoDB connection successful.")
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {e}")
        # In a real app, you might want to exit or handle this more gracefully
        db_client = None
    
    yield
    
    if db_client:
        logger.info("Closing MongoDB connection.")
        db_client.close()

app = FastAPI(lifespan=lifespan)

def get_db():
    if not db_client:
        raise HTTPException(status_code=503, detail="Database connection not available")
    return db_client[DB_NAME]

@app.get("/health", status_code=200)
async def health_check():
    return {"status": "ok"}

@app.get("/users/{user_id}", response_model=UserProfile)
async def get_user_profile_by_id(user_id: str):
    """
    Corresponds to the `getUserProfileById` procedure in the contract.
    """
    db = get_db()
    user_data = await db.users.find_one({"userId": user_id}, {"_id": 0})
    if not user_data:
        raise HTTPException(status_code=404, detail="User not found")
    return user_data

@app.post("/analysis", response_model=AnalysisResult)
async def run_analysis(request: AnalysisRequest):
    """
    Corresponds to the `runAnalysis` procedure in the contract.
    This simulates a heavy computation.
    """
    logger.info(f"Running analysis for user {request.userId} with metrics {request.metrics}")
    # In a real project, this would involve complex logic, db queries, etc.
    # Here, we just simulate a result.
    scores = {metric: round(0.7 + 0.3 * (hash(f"{request.userId}-{metric}") % 1000) / 1000, 2) for metric in request.metrics}
    
    result = {
        "userId": request.userId,
        "reportId": f"rep_{hash(request.userId)}",
        "generatedAt": "2023-10-27T10:00:00.000Z", # In production, use current time
        "scores": scores
    }
    
    # Optionally save the report to the database; insert a copy so that
    # MongoDB's generated `_id` does not end up in the dict we return
    await get_db().reports.insert_one(dict(result))

    return result
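
For completeness, the service's requirements.txt is assumed to contain roughly the following (inferred from the imports above; pin versions as appropriate):

# services/data-service/requirements.txt (assumed; versions omitted)
fastapi
uvicorn[standard]
motor
pydantic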

Now for the key part: the type generation script. This is a simplified proof of concept; a real project would likely need a more robust AST-based parser.

services/data-service/scripts/generate_models.py:

# services/data-service/scripts/generate_models.py
import re
from pathlib import Path

# --- Configuration ---
# Path to the shared contract file
CONTRACT_FILE = Path(__file__).parent.parent.parent.parent / "packages" / "contracts" / "src" / "index.ts"
# Output file for generated Pydantic models
OUTPUT_FILE = Path(__file__).parent.parent / "app" / "generated_models.py"

# --- Type Mapping ---
# A simple map from TypeScript types to Python types/Pydantic fields
TS_TO_PYDANTIC_MAP = {
    "string": "str",
    "number": "float",  # Use float for general numbers
    "Date": "datetime",
    "boolean": "bool",
    "Array<string>": "List[str]",
    "Array<'performance' | 'engagement' | 'retention'>": "List[Literal['performance', 'engagement', 'retention']]",
    "'7d' | '30d' | '90d'": "Literal['7d', '30d', '90d']",
    "Record<string, number>": "Dict[str, float]",
}

def convert_ts_type(ts_type: str) -> str:
    """Converts a TypeScript type string to a Python/Pydantic type string."""
    return TS_TO_PYDANTIC_MAP.get(ts_type.strip(), "Any")

def parse_ts_interface(content: str) -> str:
    """Parses TypeScript interfaces and generates Pydantic models."""
    pydantic_models = []
    # Find all interface blocks
    interface_regex = re.compile(r"export\s+interface\s+(\w+)\s+\{([^}]+)\}", re.DOTALL)
    
    for match in interface_regex.finditer(content):
        interface_name = match.group(1)
        fields_str = match.group(2)
        
        pydantic_fields = []
        # Find all field definitions
        field_regex = re.compile(r"(\w+)\s*:\s*([^;]+);")
        for field_match in field_regex.finditer(fields_str):
            field_name = field_match.group(1)
            ts_type = field_match.group(2).strip()
            py_type = convert_ts_type(ts_type)
            pydantic_fields.append(f"    {field_name}: {py_type}")
            
        model_def = f"class {interface_name}(BaseModel):\n" + "\n".join(pydantic_fields)
        pydantic_models.append(model_def)
        
    return "\n\n".join(pydantic_models)

def main():
    """Main execution function."""
    print(f"Reading contract from: {CONTRACT_FILE}")
    if not CONTRACT_FILE.exists():
        print("Error: Contract file not found!")
        exit(1)

    with open(CONTRACT_FILE, "r") as f:
        ts_content = f.read()

    pydantic_code = parse_ts_interface(ts_content)

    output_content = f"""# THIS FILE IS AUTO-GENERATED BY scripts/generate_models.py
# DO NOT EDIT THIS FILE MANUALLY
from pydantic import BaseModel
from typing import List, Dict, Any, Literal
from datetime import datetime

{pydantic_code}
"""
    
    print(f"Writing generated Pydantic models to: {OUTPUT_FILE}")
    with open(OUTPUT_FILE, "w") as f:
        f.write(output_content)
    print("Model generation complete.")

if __name__ == "__main__":
    main()

This script is wired into the data-service's package.json build command, so the generated models are refreshed on every build.
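
For reference, running the generator against the contract defined earlier should produce something close to the following (illustrative output, traced by hand from the script above; exact whitespace may differ):

# THIS FILE IS AUTO-GENERATED BY scripts/generate_models.py
# DO NOT EDIT THIS FILE MANUALLY
from pydantic import BaseModel
from typing import List, Dict, Any, Literal
from datetime import datetime

class UserProfile(BaseModel):
    userId: str
    username: str
    email: str
    createdAt: datetime

class AnalysisRequest(BaseModel):
    userId: str
    metrics: List[Literal['performance', 'engagement', 'retention']]
    timeframe: Literal['7d', '30d', '90d']

class AnalysisResult(BaseModel):
    userId: str
    reportId: str
    generatedAt: datetime
    scores: Dict[str, float]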

3. Implementing the BFF Service (bff-service - tRPC)

The BFF service now needs a client for calling the FastAPI service, and that client's behavior must strictly match DataServiceContract.

services/bff-service/src/dataServiceClient.ts:

// services/bff-service/src/dataServiceClient.ts
import { DataServiceContract, AnalysisRequest } from '@core/contracts';
import axios from 'axios';

// This URL would come from environment variables in a real application
const DATA_SERVICE_BASE_URL = process.env.DATA_SERVICE_URL || 'http://data-service:8000';

/**
 * A type-safe client for the data-service.
 * It implements the DataServiceContract, ensuring that any changes in the contract
 * will cause a compile-time error here if the implementation is not updated.
 * This is the core of our cross-stack type safety.
 */
export const dataServiceClient: DataServiceContract = {
  async getUserProfileById(userId) {
    try {
      // The path and parameters must match the FastAPI endpoint
      const response = await axios.get(`${DATA_SERVICE_BASE_URL}/users/${userId}`);
      // Here, we trust that the FastAPI service respects the contract,
      // which is enforced by the generated Pydantic models.
      // For extra safety, we could use Zod to parse the response.
      return response.data;
    } catch (error) {
      if (axios.isAxiosError(error) && error.response?.status === 404) {
        return null;
      }
      // Proper error logging and handling should be implemented here
      console.error('Error fetching user profile:', error);
      throw new Error('Failed to communicate with data service');
    }
  },

  async runAnalysis(request: AnalysisRequest) {
    try {
      const response = await axios.post(`${DATA_SERVICE_BASE_URL}/analysis`, request);
      return response.data;
    } catch (error) {
      console.error('Error running analysis:', error);
      throw new Error('Failed to run analysis via data service');
    }
  },
};
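
As the comment in getUserProfileById hints, we can optionally add runtime validation on top of the compile-time guarantee. A minimal sketch with Zod (the schema is hand-written here for illustration and mirrors UserProfile from the contract):

// services/bff-service/src/schemas.ts (illustrative)
import { z } from 'zod';

// Mirrors the UserProfile interface from @core/contracts. z.coerce.date() turns the
// ISO string that arrives over JSON back into a Date, closing the gap between the
// wire format and the contract's `createdAt: Date` field.
export const userProfileSchema = z.object({
  userId: z.string(),
  username: z.string(),
  email: z.string(),
  createdAt: z.coerce.date(),
});

// Inside getUserProfileById, instead of trusting response.data blindly:
//   return userProfileSchema.parse(response.data);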

Next, we use this client inside the tRPC router.

services/bff-service/src/router.ts:

// services/bff-service/src/router.ts
import { initTRPC } from '@trpc/server';
import { z } from 'zod';
import { dataServiceClient } from './dataServiceClient';

const t = initTRPC.create();

export const appRouter = t.router({
  // This procedure exposes the data service's functionality to the client
  getUser: t.procedure
    .input(z.object({ userId: z.string() }))
    .query(async ({ input }) => {
      const userProfile = await dataServiceClient.getUserProfileById(input.userId);
      if (!userProfile) {
        // Here we can handle not-found cases gracefully for the frontend
        return { success: false, message: 'User not found' };
      }
      return { success: true, user: userProfile };
    }),

  // Another procedure that acts as a proxy with added logic
  triggerAnalysis: t.procedure
    .input(z.object({
      userId: z.string(),
      metrics: z.array(z.enum(['performance', 'engagement', 'retention'])),
    }))
    .mutation(async ({ input }) => {
      // Some business logic could be here before calling the data service
      console.log(`BFF: Received request to analyze ${input.userId}`);

      const result = await dataServiceClient.runAnalysis({
        ...input,
        timeframe: '30d', // We can enforce or default certain parameters here
      });

      return { reportUrl: `/reports/${result.reportId}` };
    }),
});

export type AppRouter = typeof appRouter;
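
To see what this buys the eventual consumer, here is a minimal sketch of a frontend calling the BFF with full end-to-end inference (assuming tRPC v10-style client helpers and an illustrative URL; this file is not part of the services above):

// Illustrative frontend usage
import { createTRPCProxyClient, httpBatchLink } from '@trpc/client';
import type { AppRouter } from '../../bff-service/src/router';

const trpc = createTRPCProxyClient<AppRouter>({
  links: [httpBatchLink({ url: 'http://localhost:4000/trpc' })],
});

async function main() {
  // Inputs and outputs are inferred from AppRouter: no manual typings, and a
  // contract change that breaks the router breaks this call site at compile time.
  const user = await trpc.getUser.query({ userId: 'user-123' });
  console.log(user); // { success: true, user: { ... } } or { success: false, message: 'User not found' }

  const analysis = await trpc.triggerAnalysis.mutate({
    userId: 'user-123',
    metrics: ['performance', 'engagement'],
  });
  console.log(analysis.reportUrl);
}

main();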

4. Unit Tests (Jest)

Testing is key to keeping the system stable. In the BFF service we use Jest to test our tRPC procedures, mocking the calls to data-service so that the unit under test stays isolated.

services/bff-service/src/router.test.ts:

// services/bff-service/src/router.test.ts
import { appRouter } from './router';
import { dataServiceClient } from './dataServiceClient';
import type { UserProfile } from '@core/contracts';

// Mock the dataServiceClient to prevent actual HTTP calls during tests
jest.mock('./dataServiceClient');
const mockedDataServiceClient = dataServiceClient as jest.Mocked<typeof dataServiceClient>;

describe('appRouter tests', () => {

  beforeEach(() => {
    // Clear mock history before each test
    jest.clearAllMocks();
  });

  describe('getUser procedure', () => {
    it('should return user data when a valid user ID is provided', async () => {
      // Arrange
      const caller = appRouter.createCaller({});
      const mockUser: UserProfile = {
        userId: 'user-123',
        username: 'testuser',
        email: '[email protected]',
        createdAt: new Date(),
      };
      mockedDataServiceClient.getUserProfileById.mockResolvedValue(mockUser);

      // Act
      const result = await caller.getUser({ userId: 'user-123' });

      // Assert
      expect(mockedDataServiceClient.getUserProfileById).toHaveBeenCalledWith('user-123');
      // The procedure's return type is a union, so compare the whole result shape
      // instead of accessing `result.user` directly (which would not type-check).
      expect(result).toEqual({ success: true, user: mockUser });
    });

    it('should return a not-found message for a non-existent user ID', async () => {
      // Arrange
      const caller = appRouter.createCaller({});
      mockedDataServiceClient.getUserProfileById.mockResolvedValue(null);

      // Act
      const result = await caller.getUser({ userId: 'user-999' });
      
      // Assert
      expect(mockedDataServiceClient.getUserProfileById).toHaveBeenCalledWith('user-999');
      expect(result).toEqual({ success: false, message: 'User not found' });
    });
  });
});

This test verifies the behavior of the getUser procedure without relying on a running data-service, which is exactly what a unit test should do.

5. Containerization (OCI / Docker)

Finally, we package the two services as OCI-compliant container images, ready for deployment.

services/bff-service/Dockerfile:

# Stage 1: Build the application
FROM node:18-alpine AS builder
WORKDIR /app

# Copy dependency manifests for the root and each workspace we need
COPY package.json yarn.lock ./
COPY packages/contracts/package.json ./packages/contracts/
COPY services/bff-service/package.json ./services/bff-service/

# Install dependencies including workspace dependencies
RUN yarn install --frozen-lockfile

# Copy source code
COPY . .

# Build the specific workspace
RUN yarn workspace bff-service build

# Stage 2: Create the production image
FROM node:18-alpine
WORKDIR /app

# Create a non-root user
RUN addgroup -S appgroup && adduser -S appuser -G appgroup
USER appuser

# Copy only necessary files from the builder stage
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/packages/contracts/package.json ./packages/contracts/package.json
COPY --from=builder /app/services/bff-service/package.json ./services/bff-service/package.json
COPY --from=builder /app/services/bff-service/dist ./services/bff-service/dist

ENV NODE_ENV=production
# Expose the port the app runs on
EXPOSE 4000

# Start the application
CMD ["node", "services/bff-service/dist/server.js"]

services/data-service/Dockerfile:

FROM python:3.11-slim

WORKDIR /code

# Create a non-root user for security, and pre-create the directories the build
# needs to write into (the model generator below writes into /code/app)
RUN useradd --create-home appuser && \
    mkdir -p /code/app /code/scripts && \
    chown -R appuser:appuser /code
USER appuser

ENV PATH="/home/appuser/.local/bin:${PATH}"
ENV PYTHONUNBUFFERED=1

COPY --chown=appuser:appuser ./services/data-service/requirements.txt .

RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir --user -r requirements.txt

# This step is crucial. It runs our type generator script before copying app code.
# Assuming the contract code is accessible in the build context.
COPY --chown=appuser:appuser ./packages/contracts/src/index.ts /tmp/contract.ts
# We need to copy the script itself
COPY --chown=appuser:appuser ./services/data-service/scripts/generate_models.py /code/scripts/generate_models.py
# Modify script to read from /tmp
RUN sed -i "s|CONTRACT_FILE = .*|CONTRACT_FILE = Path('/tmp/contract.ts')|" /code/scripts/generate_models.py && \
    python /code/scripts/generate_models.py

COPY --chown=appuser:appuser ./services/data-service/app ./app

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Both Dockerfiles follow common best practices, such as multi-stage builds and running as a non-root user, keeping the production images small and safe. A docker-compose.yml file can tie the whole system together for local development and testing.
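
A minimal docker-compose.yml sketch for local development might look like this (service names match the code above; the ports, image tag and build contexts are assumptions):

# docker-compose.yml (illustrative)
services:
  mongodb:
    image: mongo:7

  data-service:
    build:
      context: .
      dockerfile: services/data-service/Dockerfile
    environment:
      - MONGO_URL=mongodb://mongodb:27017
    depends_on:
      - mongodb

  bff-service:
    build:
      context: .
      dockerfile: services/bff-service/Dockerfile
    environment:
      - DATA_SERVICE_URL=http://data-service:8000
    ports:
      - "4000:4000"
    depends_on:
      - data-service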

Extensibility and Limitations of the Architecture

This "contract-first" heterogeneous architecture has brought clear benefits, especially for team collaboration and long-term maintainability. When a new Python service needs to join, we simply define its interface in @core/contracts and follow the same pattern to generate its Pydantic models. On the TypeScript side, consuming a new, fully type-safe service is equally straightforward.

The approach is not without limitations, however. Our home-grown TS -> Pydantic generator is still very crude and only handles basic types. If the contract starts using more complex generics, union types, or intersection types, the script has to get much smarter, likely by adopting a full TypeScript AST library such as ts-morph, which adds maintenance cost.
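
If we ever outgrow the regex approach, here is a sketch of what an AST-based generator could start from, using ts-morph's Project/Interface APIs (the Pydantic mapping logic itself is omitted):

// scripts/generate_models_ast.ts -- hypothetical replacement for the regex parser
import { Project } from 'ts-morph';

const project = new Project();
const source = project.addSourceFileAtPath('packages/contracts/src/index.ts');

for (const iface of source.getInterfaces()) {
  console.log(`interface ${iface.getName()}`);
  for (const prop of iface.getProperties()) {
    // getType().getText() resolves unions, generics and aliases that the
    // line-oriented regex in generate_models.py cannot see.
    console.log(`  ${prop.getName()}: ${prop.getType().getText()}`);
  }
}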

In addition, inter-service communication still relies on HTTP/JSON. That is good enough for most business scenarios, but where extreme performance and low latency matter, gRPC with Protobuf may be the better choice. Introducing gRPC, though, means a heavier toolchain and a steeper learning curve, which is yet another trade-off. The current architecture is a pragmatic balance between developer experience, type safety, and performance.

