构建支持 MLOps 的混合数据特征存储:CockroachDB 与 Milvus 的深度集成与索引优化实践


一个全球化业务的推荐系统,其在线服务对特征(Feature)获取的延迟要求是 P99 在 15ms 以内。这个技术指标本身并不新鲜,但挑战在于特征的混合性:一部分是结构化的标量特征,如用户年龄、商品价格、最近7日点击率等,要求强一致性与事务支持;另一部分则是非结构化的向量特征,如通过深度学习模型生成的用户兴趣 embedding、商品图片 embedding,要求高效的近似最近邻(ANN)查询能力。

最初的架构评审中,团队提出了两种看似更简单的方案。

方案A:单一数据库聚合方案 (PostgreSQL + pgvector)

这个方案的吸引力在于其运维的简单性。使用单一的 PostgreSQL 实例,通过 pgvector 插件来存储和查询向量数据,同时利用其成熟的事务能力处理结构化数据。

  • 优势:

    • 单一技术栈,降低了开发和运维的心智负担。
    • 数据一致性模型简单,所有数据在一个事务内完成读写。
  • 劣势:

    • 全球化部署的短板: PostgreSQL 的原生分布式能力有限。要实现低延迟的全球读写,需要依赖复杂的外部组件进行分片和主从复制,这本身就引入了巨大的架构复杂性和潜在的故障点。对于一个核心在线系统,这是难以接受的风险。
    • 向量查询性能瓶颈: pgvector 是一个优秀的插件,但在我们的压力测试中,当数据集超过一亿且并发查询(QPS)达到数千时,其 ANN 查询的延迟和吞吐量与专用的向量数据库(如 Milvus)相比,存在明显差距。特别是索引构建时间和内存占用,对运维造成了不小的压力。

方案B:纯向量数据库方案 (Milvus + 属性过滤)

这个方案试图将所有数据都塞进 Milvus。Milvus 支持在向量上附加标量字段,并进行属性过滤。

  • 优势:

    • 极致的向量检索性能。
    • 统一的数据入口。
  • 劣势:

    • 事务能力缺失: Milvus 并非为事务处理(OLTP)设计。对于需要频繁更新、且要求强一致性的标量特征(例如,用户的实时账户余额、库存等),使用 Milvus 存在数据不一致的风险。
    • 复杂的标量查询: Milvus 的属性过滤功能虽然强大,但其表达能力和查询优化远不如一个成熟的 SQL 数据库。复杂的组合条件查询性能不佳,且不支持事务。

这两种方案都无法同时满足我们对全球化分布式、强一致性事务和高性能向量检索这三个核心要求。因此,我们最终确定了一个混合架构:CockroachDB + Milvus。CockroachDB 作为分布式 SQL 数据库,负责存储结构化特征,提供全球分布下的低延迟读写和 ACID 保证。Milvus 则作为专门的向量数据库,处理 embedding 的存储与检索。这个选择的代价是架构复杂度的提升,尤其是在数据同步和 MLOps 流程整合上,这也是本文实现的核心。

架构概览与数据流

我们的目标是构建一个逻辑上统一、物理上分离的特征存储。当在线推荐服务需要某个用户的完整特征时,它会并发地向 CockroachDB 和 Milvus 发起请求,然后将结果在内存中合并。

graph TD
    subgraph MLOps Offline Pipeline [离线/近实时特征处理]
        A[原始数据源: Kafka] --> B{特征工程: Flink/Spark};
        B --> C[结构化特征];
        B --> D[Embedding模型];
        D --> E[向量特征];
    end

    subgraph Feature Storage [混合特征存储层]
        C --> F(CockroachDB Cluster);
        E --> G(Milvus Cluster);
    end
    
    subgraph Data Sync [数据同步 MLOps 组件]
        H[CDC/消息队列] --> I{Feature Sync Service};
        I -- upsert --> F;
        I -- upsert --> G;
    end

    subgraph Online Serving [在线推荐服务]
        J[API Gateway] --> K{Recommender Service};
        K -- Get Scalar Features --> F;
        K -- Get Vector Feature --> G;
    end

    F -- Changefeeds --> H;
    
    style F fill:#f9f,stroke:#333,stroke-width:2px
    style G fill:#ccf,stroke:#333,stroke-width:2px

这里的关键是 Feature Sync Service,它是 MLOps 流程中的一个核心自动化组件。它的职责是保证 CockroachDB 和 Milvus 中关于同一个实体(如 user_id)的数据在逻辑上是一致的。在真实项目中,这个服务消费来自 CockroachDB Changefeeds 的数据变更事件,确保任何结构化特征的更新都能触发相应向量的更新或重新计算。

核心实现:数据模型与同步服务

1. CockroachDB 数据表设计与索引

CockroachDB 的表设计需要充分考虑其分布式特性。我们将用户ID作为主键,并利用其 REGIONAL BY ROW 的能力,将用户数据固定在离用户最近的区域,从而实现低延迟读写。

-- 用户结构化特征表
-- 'user_id' 是全局唯一的用户标识
-- 'region' 字段用于 CockroachDB 的 geo-partitioning
CREATE TABLE user_scalar_features (
    user_id UUID PRIMARY KEY,
    region crdb_internal_region NOT VISIBLE, -- 用于地理分区
    age INT,
    gender STRING,
    country_code STRING,
    last_7d_clicks INT,
    last_30d_purchases DECIMAL(10, 2),
    updated_at TIMESTAMPTZ DEFAULT now(),
    
    -- 索引设计:
    -- 1. 主键 user_id 本身就是一个高效的点查索引
    -- 2. 在 'updated_at' 上创建索引,用于后台清理或分析任务
    INDEX idx_updated_at (updated_at DESC),
    
    -- 家庭索引,将相关索引数据与主数据存储在一起,优化读取性能
    FAMILY "primary" (user_id, age, gender, country_code, last_7d_clicks, last_30d_purchases, updated_at, region)
)
PARTITION BY LIST (region) (
    PARTITION us_east VALUES IN ('us-east1'),
    PARTITION eu_west VALUES IN ('europe-west1'),
    PARTITION ap_south VALUES IN ('asia-south1')
);

-- 设置表的生存区域
ALTER TABLE user_scalar_features SET LOCALITY REGIONAL BY ROW AS "region";

-- 示例: 插入数据时指定区域
-- 在真实应用中,'region' 字段由应用层根据用户 IP 或注册地等信息确定
-- INSERT INTO user_scalar_features (user_id, region, age, ...) 
-- VALUES ('...', 'us-east1', 30, ...);

这里的 PARTITION BY LISTSET LOCALITY REGIONAL BY ROW 是 CockroachDB 性能优化的关键。它告诉数据库将特定区域用户的数据物理地存储在该区域的节点上。当一个来自 us-east1 的请求查询一个 us-east1 的用户时,请求会被直接路由到本地节点,避免了跨大洋的网络延迟。这是一个常见的错误点:很多团队用了分布式数据库,却没有利用其地理分布特性,导致性能远未达到预期。

2. Milvus Collection 设计与索引

Milvus 中的 Collection 对应数据库中的表。我们需要存储用户ID和对应的 embedding 向量。

# milvus_schema.py
from pymilvus import CollectionSchema, FieldSchema, DataType

# 定义 Embedding 维度
EMBEDDING_DIM = 128

# 用户向量字段
user_id_field = FieldSchema(
  name="user_id",
  dtype=DataType.VARCHAR,
  is_primary=True,
  max_length=36, # UUID a_b_c_d_e -> 36
  description="User's unique identifier, same as in CockroachDB"
)

# 用户 Embedding 向量字段
embedding_field = FieldSchema(
  name="embedding",
  dtype=DataType.FLOAT_VECTOR,
  dim=EMBEDDING_DIM,
  description="User's interest embedding vector"
)

# 创建 Collection Schema
user_embedding_schema = CollectionSchema(
  fields=[user_id_field, embedding_field],
  description="User embedding collection for recommendation",
  enable_dynamic_field=False # 生产环境建议关闭,保证 Schema 严格
)

# 索引参数:这是性能调优的核心
# HNSW 是一种基于图的索引,在召回率和查询效率之间有很好的平衡
# 'M': 图中每个节点的最大边数。值越大,图越复杂,索引构建越慢,但召回率越高。
# 'efConstruction': 索引构建时的搜索范围。值越大,索引质量越高,但构建时间越长。
INDEX_PARAMS = {
  "metric_type": "L2", # 欧氏距离,也可以是 IP (内积)
  "index_type": "HNSW",
  "params": {
      "M": 16,
      "efConstruction": 256
  }
}

# 查询时参数
# 'ef': 查询时的搜索范围。值越大,查询越精确(召回率高),但延迟也越高。
# 这是一个需要在业务指标(召回率)和系统指标(延迟)之间权衡的关键参数。
SEARCH_PARAMS = {
    "params": {
        "ef": 128
    }
}

COLLECTION_NAME = "user_embeddings"

选择 HNSW 索引是因为它在多数场景下提供了最佳的性能-召回率权衡。MefConstruction 是构建时参数,一旦设定通常不会更改。而 ef 是查询时参数,我们可以动态调整它。例如,对于延迟不敏感的离线任务,可以调高 ef 以获得更精确的结果;对于在线服务,则选择一个能在10ms内完成查询且召回率可接受的值。在我们的项目中,通过压测发现 ef=128 是一个很好的起点。

3. 生产级数据同步服务

这个服务的核心是健壮性、幂等性和可观测性。我们使用 Python 实现,依赖 psycopg2 连接 CockroachDB,pymilvus 连接 Milvus,并用 pydantic 进行配置管理。

# feature_sync_service.py
import os
import logging
import time
from typing import List, Dict, Any
from uuid import UUID

import psycopg2
from psycopg2.extras import DictCursor
from pymilvus import connections, utility, Collection
from pydantic_settings import BaseSettings

# --- 配置管理 ---
class MilvusSettings(BaseSettings):
    HOST: str = "localhost"
    PORT: int = 19530
    ALIAS: str = "default"

class CockroachDBSettings(BaseSettings):
    # 使用 Libpq connection string 格式,易于配置
    # "postgresql://user:password@host:port/database?sslmode=verify-full&sslrootcert=..."
    DSN: str

class Settings(BaseSettings):
    MILVUS: MilvusSettings = MilvusSettings()
    COCKROACHDB: CockroachDBSettings = CockroachDBSettings()
    BATCH_SIZE: int = 100
    SYNC_INTERVAL_SECONDS: int = 5

# --- 日志配置 ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 全局配置实例
settings = Settings()

# --- 模拟 Embedding 生成 ---
# 在真实 MLOps 流程中,这会调用一个模型服务
def generate_embedding(user_data: Dict[str, Any]) -> List[float]:
    # 这里的逻辑非常复杂,可能依赖用户的多种标量特征
    # 为简化示例,我们生成一个伪随机向量
    # 注意:为了可复现,可以基于 user_id 和其他特征生成确定性向量
    import numpy as np
    # 确保维度匹配
    return np.random.rand(128).tolist()

class FeatureSynchronizer:
    def __init__(self):
        self._connect()
        self.milvus_collection = self._get_milvus_collection()

    def _connect(self):
        """建立数据库连接,包含重试逻辑"""
        try:
            logger.info("Connecting to Milvus...")
            connections.connect(
                alias=settings.MILVUS.ALIAS,
                host=settings.MILVUS.HOST,
                port=settings.MILVUS.PORT
            )
            logger.info("Connecting to CockroachDB...")
            self.crdb_conn = psycopg2.connect(dsn=settings.COCKROACHDB.DSN)
            logger.info("Connections established.")
        except Exception as e:
            logger.error(f"Failed to connect to databases: {e}")
            raise

    def _get_milvus_collection(self) -> Collection:
        """获取 Milvus Collection 对象,如果不存在则报错"""
        if not utility.has_collection("user_embeddings"):
            # 在生产环境中,Schema 应该由 CI/CD 流程管理,服务不应负责创建
            logger.error("Milvus collection 'user_embeddings' does not exist.")
            raise RuntimeError("Milvus collection not found.")
        return Collection("user_embeddings")

    def fetch_updated_users_from_crdb(self, last_sync_time: str) -> List[Dict[str, Any]]:
        """从 CockroachDB 拉取自上次同步以来有更新的用户数据"""
        # 这里的核心是利用 CockroachDB 的 AS OF SYSTEM TIME 功能,或者一个 updated_at 字段
        # 来实现增量拉取,避免全量扫描。
        query = """
        SELECT user_id, age, gender, country_code, last_7d_clicks, last_30d_purchases
        FROM user_scalar_features
        WHERE updated_at > %s
        ORDER BY updated_at
        LIMIT %s;
        """
        try:
            with self.crdb_conn.cursor(cursor_factory=DictCursor) as cursor:
                cursor.execute(query, (last_sync_time, settings.BATCH_SIZE))
                records = cursor.fetchall()
                return [dict(row) for row in records]
        except psycopg2.Error as e:
            logger.error(f"Error fetching data from CockroachDB: {e}")
            # 数据库错误通常需要重连
            self.crdb_conn.close()
            self._connect()
            return []

    def sync_batch(self, users: List[Dict[str, Any]]):
        """将一个批次的用户数据同步到 Milvus"""
        if not users:
            return

        user_ids = [str(user['user_id']) for user in users]
        
        # 1. 生成 Embeddings
        # 在真实系统中,这可能是一个批处理 RPC 调用
        embeddings = [generate_embedding(user) for user in users]
        
        # 2. 准备 upsert 数据
        # Milvus 的 upsert 操作是幂等的,如果主键已存在则更新,否则插入
        entities = [
            {"user_id": uid, "embedding": emb}
            for uid, emb in zip(user_ids, embeddings)
        ]

        try:
            logger.info(f"Upserting {len(entities)} entities to Milvus...")
            mutation_result = self.milvus_collection.upsert(entities)
            
            # 必须检查 mutation_result,确认操作成功
            if mutation_result.upsert_count != len(entities):
                 logger.warning(
                    f"Mismatch in upsert count. "
                    f"Expected: {len(entities)}, Actual: {mutation_result.upsert_count}"
                )

            logger.info(f"Upsert successful. Primary keys: {mutation_result.primary_keys}")

        except Exception as e:
            logger.error(f"Failed to upsert data to Milvus: {e}")
            # 这里可以加入重试逻辑或将失败批次推送到死信队列
            
    def run_sync_loop(self):
        """主同步循环"""
        # 在生产环境中,last_sync_time 应该持久化存储,例如存入 Redis 或一个专门的表中
        last_sync_time = "1970-01-01T00:00:00Z"
        
        while True:
            try:
                logger.info(f"Fetching users updated since {last_sync_time}...")
                updated_users = self.fetch_updated_users_from_crdb(last_sync_time)
                
                if updated_users:
                    self.sync_batch(updated_users)
                    # 更新时间戳到本批次最后一个记录的时间
                    # 注意:在真实的 CDC 场景中,应使用变更日志的位点
                    # last_sync_time = get_last_timestamp(updated_users)
                    logger.info(f"Batch processed. Found {len(updated_users)} new updates.")
                else:
                    logger.info("No new updates found.")

                time.sleep(settings.SYNC_INTERVAL_SECONDS)

            except (psycopg2.Error, RuntimeError) as e:
                logger.error(f"A critical error occurred in sync loop: {e}. Reconnecting...")
                time.sleep(10) # 发生严重错误后等待一段时间再重试
                self._connect()
            except KeyboardInterrupt:
                logger.info("Sync service shutting down.")
                self.crdb_conn.close()
                connections.disconnect(settings.MILVUS.ALIAS)
                break


if __name__ == "__main__":
    # 单元测试思路:
    # 1. Mock psycopg2 和 pymilvus 客户端。
    # 2. 测试 fetch_updated_users_from_crdb 在不同返回值下的行为 (空列表, 正常数据, 抛出异常)。
    # 3. 测试 sync_batch 是否正确地构建了 Milvus 的实体结构。
    # 4. 测试 run_sync_loop 的循环和异常处理逻辑。
    
    # export COCKROACHDB_DSN="postgresql://..."
    # python feature_sync_service.py
    
    synchronizer = FeatureSynchronizer()
    synchronizer.run_sync_loop()

这个服务虽然简化了 embedding 生成过程,但其结构是生产级的。它包含了配置管理、日志、连接重试、批量处理和基本的错误处理。在 MLOps 体系中,这个服务会被容器化,并作为 Kubernetes deployment 持续运行。

架构的局限性与未来迭代方向

此混合架构并非银弹。它的首要局限性在于引入了两个独立且复杂的分布式系统,运维成本显著高于单一数据库方案。Feature Sync Service 成为一个关键的单点,必须保证其高可用。

其次,数据一致性模型是“最终一致性”。从 CockroachDB 中的特征更新到 Milvus 中对应的 embedding 更新之间,存在一个时间窗口(由 SYNC_INTERVAL_SECONDS 和处理延迟决定)。在这个窗口期,模型可能会拿到新的标量特征和旧的向量特征,这可能对某些对数据新鲜度极度敏感的模型产生影响。

未来的迭代可以从以下几个方面展开:

  1. 切换到事件驱动: 将同步服务从轮询模式改为事件驱动模式。利用 CockroachDB 的 Change Data Capture (CDC) 功能,将数据变更实时推送到 Kafka,同步服务作为消费者近乎实时地处理更新。这将大大缩短数据不一致的窗口期。
  2. 特征版本化: 在 CockroachDB 和 Milvus 中都引入特征版本号。在线服务查询时要求标量特征和向量特征的版本号一致,如果不一致则可以触发降级策略,例如使用上一版本的特征或者只使用标量特征。这是 MLOps 中保障模型稳定性的重要实践。
  3. 查询层抽象: 开发一个轻量级的特征存储 SDK,封装对 CockroachDB 和 Milvus 的并发查询、结果合并、缓存和降级逻辑。这能让上游的应用开发者透明地使用特征,而无需关心底层的存储细节。

  目录