一个全球化业务的推荐系统,其在线服务对特征(Feature)获取的延迟要求是 P99 在 15ms 以内。这个技术指标本身并不新鲜,但挑战在于特征的混合性:一部分是结构化的标量特征,如用户年龄、商品价格、最近7日点击率等,要求强一致性与事务支持;另一部分则是非结构化的向量特征,如通过深度学习模型生成的用户兴趣 embedding、商品图片 embedding,要求高效的近似最近邻(ANN)查询能力。
最初的架构评审中,团队提出了两种看似更简单的方案。
方案A:单一数据库聚合方案 (PostgreSQL + pgvector)
这个方案的吸引力在于其运维的简单性。使用单一的 PostgreSQL 实例,通过 pgvector
插件来存储和查询向量数据,同时利用其成熟的事务能力处理结构化数据。
优势:
- 单一技术栈,降低了开发和运维的心智负担。
- 数据一致性模型简单,所有数据在一个事务内完成读写。
劣势:
- 全球化部署的短板: PostgreSQL 的原生分布式能力有限。要实现低延迟的全球读写,需要依赖复杂的外部组件进行分片和主从复制,这本身就引入了巨大的架构复杂性和潜在的故障点。对于一个核心在线系统,这是难以接受的风险。
- 向量查询性能瓶颈:
pgvector
是一个优秀的插件,但在我们的压力测试中,当数据集超过一亿且并发查询(QPS)达到数千时,其 ANN 查询的延迟和吞吐量与专用的向量数据库(如 Milvus)相比,存在明显差距。特别是索引构建时间和内存占用,对运维造成了不小的压力。
方案B:纯向量数据库方案 (Milvus + 属性过滤)
这个方案试图将所有数据都塞进 Milvus。Milvus 支持在向量上附加标量字段,并进行属性过滤。
优势:
- 极致的向量检索性能。
- 统一的数据入口。
劣势:
- 事务能力缺失: Milvus 并非为事务处理(OLTP)设计。对于需要频繁更新、且要求强一致性的标量特征(例如,用户的实时账户余额、库存等),使用 Milvus 存在数据不一致的风险。
- 复杂的标量查询: Milvus 的属性过滤功能虽然强大,但其表达能力和查询优化远不如一个成熟的 SQL 数据库。复杂的组合条件查询性能不佳,且不支持事务。
这两种方案都无法同时满足我们对全球化分布式、强一致性事务和高性能向量检索这三个核心要求。因此,我们最终确定了一个混合架构:CockroachDB + Milvus。CockroachDB 作为分布式 SQL 数据库,负责存储结构化特征,提供全球分布下的低延迟读写和 ACID 保证。Milvus 则作为专门的向量数据库,处理 embedding 的存储与检索。这个选择的代价是架构复杂度的提升,尤其是在数据同步和 MLOps 流程整合上,这也是本文实现的核心。
架构概览与数据流
我们的目标是构建一个逻辑上统一、物理上分离的特征存储。当在线推荐服务需要某个用户的完整特征时,它会并发地向 CockroachDB 和 Milvus 发起请求,然后将结果在内存中合并。
graph TD subgraph MLOps Offline Pipeline [离线/近实时特征处理] A[原始数据源: Kafka] --> B{特征工程: Flink/Spark}; B --> C[结构化特征]; B --> D[Embedding模型]; D --> E[向量特征]; end subgraph Feature Storage [混合特征存储层] C --> F(CockroachDB Cluster); E --> G(Milvus Cluster); end subgraph Data Sync [数据同步 MLOps 组件] H[CDC/消息队列] --> I{Feature Sync Service}; I -- upsert --> F; I -- upsert --> G; end subgraph Online Serving [在线推荐服务] J[API Gateway] --> K{Recommender Service}; K -- Get Scalar Features --> F; K -- Get Vector Feature --> G; end F -- Changefeeds --> H; style F fill:#f9f,stroke:#333,stroke-width:2px style G fill:#ccf,stroke:#333,stroke-width:2px
这里的关键是 Feature Sync Service
,它是 MLOps 流程中的一个核心自动化组件。它的职责是保证 CockroachDB 和 Milvus 中关于同一个实体(如 user_id)的数据在逻辑上是一致的。在真实项目中,这个服务消费来自 CockroachDB Changefeeds 的数据变更事件,确保任何结构化特征的更新都能触发相应向量的更新或重新计算。
核心实现:数据模型与同步服务
1. CockroachDB 数据表设计与索引
CockroachDB 的表设计需要充分考虑其分布式特性。我们将用户ID作为主键,并利用其 REGIONAL BY ROW
的能力,将用户数据固定在离用户最近的区域,从而实现低延迟读写。
-- 用户结构化特征表
-- 'user_id' 是全局唯一的用户标识
-- 'region' 字段用于 CockroachDB 的 geo-partitioning
CREATE TABLE user_scalar_features (
user_id UUID PRIMARY KEY,
region crdb_internal_region NOT VISIBLE, -- 用于地理分区
age INT,
gender STRING,
country_code STRING,
last_7d_clicks INT,
last_30d_purchases DECIMAL(10, 2),
updated_at TIMESTAMPTZ DEFAULT now(),
-- 索引设计:
-- 1. 主键 user_id 本身就是一个高效的点查索引
-- 2. 在 'updated_at' 上创建索引,用于后台清理或分析任务
INDEX idx_updated_at (updated_at DESC),
-- 家庭索引,将相关索引数据与主数据存储在一起,优化读取性能
FAMILY "primary" (user_id, age, gender, country_code, last_7d_clicks, last_30d_purchases, updated_at, region)
)
PARTITION BY LIST (region) (
PARTITION us_east VALUES IN ('us-east1'),
PARTITION eu_west VALUES IN ('europe-west1'),
PARTITION ap_south VALUES IN ('asia-south1')
);
-- 设置表的生存区域
ALTER TABLE user_scalar_features SET LOCALITY REGIONAL BY ROW AS "region";
-- 示例: 插入数据时指定区域
-- 在真实应用中,'region' 字段由应用层根据用户 IP 或注册地等信息确定
-- INSERT INTO user_scalar_features (user_id, region, age, ...)
-- VALUES ('...', 'us-east1', 30, ...);
这里的 PARTITION BY LIST
和 SET LOCALITY REGIONAL BY ROW
是 CockroachDB 性能优化的关键。它告诉数据库将特定区域用户的数据物理地存储在该区域的节点上。当一个来自 us-east1
的请求查询一个 us-east1
的用户时,请求会被直接路由到本地节点,避免了跨大洋的网络延迟。这是一个常见的错误点:很多团队用了分布式数据库,却没有利用其地理分布特性,导致性能远未达到预期。
2. Milvus Collection 设计与索引
Milvus 中的 Collection 对应数据库中的表。我们需要存储用户ID和对应的 embedding 向量。
# milvus_schema.py
from pymilvus import CollectionSchema, FieldSchema, DataType
# 定义 Embedding 维度
EMBEDDING_DIM = 128
# 用户向量字段
user_id_field = FieldSchema(
name="user_id",
dtype=DataType.VARCHAR,
is_primary=True,
max_length=36, # UUID a_b_c_d_e -> 36
description="User's unique identifier, same as in CockroachDB"
)
# 用户 Embedding 向量字段
embedding_field = FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=EMBEDDING_DIM,
description="User's interest embedding vector"
)
# 创建 Collection Schema
user_embedding_schema = CollectionSchema(
fields=[user_id_field, embedding_field],
description="User embedding collection for recommendation",
enable_dynamic_field=False # 生产环境建议关闭,保证 Schema 严格
)
# 索引参数:这是性能调优的核心
# HNSW 是一种基于图的索引,在召回率和查询效率之间有很好的平衡
# 'M': 图中每个节点的最大边数。值越大,图越复杂,索引构建越慢,但召回率越高。
# 'efConstruction': 索引构建时的搜索范围。值越大,索引质量越高,但构建时间越长。
INDEX_PARAMS = {
"metric_type": "L2", # 欧氏距离,也可以是 IP (内积)
"index_type": "HNSW",
"params": {
"M": 16,
"efConstruction": 256
}
}
# 查询时参数
# 'ef': 查询时的搜索范围。值越大,查询越精确(召回率高),但延迟也越高。
# 这是一个需要在业务指标(召回率)和系统指标(延迟)之间权衡的关键参数。
SEARCH_PARAMS = {
"params": {
"ef": 128
}
}
COLLECTION_NAME = "user_embeddings"
选择 HNSW
索引是因为它在多数场景下提供了最佳的性能-召回率权衡。M
和 efConstruction
是构建时参数,一旦设定通常不会更改。而 ef
是查询时参数,我们可以动态调整它。例如,对于延迟不敏感的离线任务,可以调高 ef
以获得更精确的结果;对于在线服务,则选择一个能在10ms内完成查询且召回率可接受的值。在我们的项目中,通过压测发现 ef=128
是一个很好的起点。
3. 生产级数据同步服务
这个服务的核心是健壮性、幂等性和可观测性。我们使用 Python 实现,依赖 psycopg2
连接 CockroachDB,pymilvus
连接 Milvus,并用 pydantic
进行配置管理。
# feature_sync_service.py
import os
import logging
import time
from typing import List, Dict, Any
from uuid import UUID
import psycopg2
from psycopg2.extras import DictCursor
from pymilvus import connections, utility, Collection
from pydantic_settings import BaseSettings
# --- 配置管理 ---
class MilvusSettings(BaseSettings):
HOST: str = "localhost"
PORT: int = 19530
ALIAS: str = "default"
class CockroachDBSettings(BaseSettings):
# 使用 Libpq connection string 格式,易于配置
# "postgresql://user:password@host:port/database?sslmode=verify-full&sslrootcert=..."
DSN: str
class Settings(BaseSettings):
MILVUS: MilvusSettings = MilvusSettings()
COCKROACHDB: CockroachDBSettings = CockroachDBSettings()
BATCH_SIZE: int = 100
SYNC_INTERVAL_SECONDS: int = 5
# --- 日志配置 ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 全局配置实例
settings = Settings()
# --- 模拟 Embedding 生成 ---
# 在真实 MLOps 流程中,这会调用一个模型服务
def generate_embedding(user_data: Dict[str, Any]) -> List[float]:
# 这里的逻辑非常复杂,可能依赖用户的多种标量特征
# 为简化示例,我们生成一个伪随机向量
# 注意:为了可复现,可以基于 user_id 和其他特征生成确定性向量
import numpy as np
# 确保维度匹配
return np.random.rand(128).tolist()
class FeatureSynchronizer:
def __init__(self):
self._connect()
self.milvus_collection = self._get_milvus_collection()
def _connect(self):
"""建立数据库连接,包含重试逻辑"""
try:
logger.info("Connecting to Milvus...")
connections.connect(
alias=settings.MILVUS.ALIAS,
host=settings.MILVUS.HOST,
port=settings.MILVUS.PORT
)
logger.info("Connecting to CockroachDB...")
self.crdb_conn = psycopg2.connect(dsn=settings.COCKROACHDB.DSN)
logger.info("Connections established.")
except Exception as e:
logger.error(f"Failed to connect to databases: {e}")
raise
def _get_milvus_collection(self) -> Collection:
"""获取 Milvus Collection 对象,如果不存在则报错"""
if not utility.has_collection("user_embeddings"):
# 在生产环境中,Schema 应该由 CI/CD 流程管理,服务不应负责创建
logger.error("Milvus collection 'user_embeddings' does not exist.")
raise RuntimeError("Milvus collection not found.")
return Collection("user_embeddings")
def fetch_updated_users_from_crdb(self, last_sync_time: str) -> List[Dict[str, Any]]:
"""从 CockroachDB 拉取自上次同步以来有更新的用户数据"""
# 这里的核心是利用 CockroachDB 的 AS OF SYSTEM TIME 功能,或者一个 updated_at 字段
# 来实现增量拉取,避免全量扫描。
query = """
SELECT user_id, age, gender, country_code, last_7d_clicks, last_30d_purchases
FROM user_scalar_features
WHERE updated_at > %s
ORDER BY updated_at
LIMIT %s;
"""
try:
with self.crdb_conn.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute(query, (last_sync_time, settings.BATCH_SIZE))
records = cursor.fetchall()
return [dict(row) for row in records]
except psycopg2.Error as e:
logger.error(f"Error fetching data from CockroachDB: {e}")
# 数据库错误通常需要重连
self.crdb_conn.close()
self._connect()
return []
def sync_batch(self, users: List[Dict[str, Any]]):
"""将一个批次的用户数据同步到 Milvus"""
if not users:
return
user_ids = [str(user['user_id']) for user in users]
# 1. 生成 Embeddings
# 在真实系统中,这可能是一个批处理 RPC 调用
embeddings = [generate_embedding(user) for user in users]
# 2. 准备 upsert 数据
# Milvus 的 upsert 操作是幂等的,如果主键已存在则更新,否则插入
entities = [
{"user_id": uid, "embedding": emb}
for uid, emb in zip(user_ids, embeddings)
]
try:
logger.info(f"Upserting {len(entities)} entities to Milvus...")
mutation_result = self.milvus_collection.upsert(entities)
# 必须检查 mutation_result,确认操作成功
if mutation_result.upsert_count != len(entities):
logger.warning(
f"Mismatch in upsert count. "
f"Expected: {len(entities)}, Actual: {mutation_result.upsert_count}"
)
logger.info(f"Upsert successful. Primary keys: {mutation_result.primary_keys}")
except Exception as e:
logger.error(f"Failed to upsert data to Milvus: {e}")
# 这里可以加入重试逻辑或将失败批次推送到死信队列
def run_sync_loop(self):
"""主同步循环"""
# 在生产环境中,last_sync_time 应该持久化存储,例如存入 Redis 或一个专门的表中
last_sync_time = "1970-01-01T00:00:00Z"
while True:
try:
logger.info(f"Fetching users updated since {last_sync_time}...")
updated_users = self.fetch_updated_users_from_crdb(last_sync_time)
if updated_users:
self.sync_batch(updated_users)
# 更新时间戳到本批次最后一个记录的时间
# 注意:在真实的 CDC 场景中,应使用变更日志的位点
# last_sync_time = get_last_timestamp(updated_users)
logger.info(f"Batch processed. Found {len(updated_users)} new updates.")
else:
logger.info("No new updates found.")
time.sleep(settings.SYNC_INTERVAL_SECONDS)
except (psycopg2.Error, RuntimeError) as e:
logger.error(f"A critical error occurred in sync loop: {e}. Reconnecting...")
time.sleep(10) # 发生严重错误后等待一段时间再重试
self._connect()
except KeyboardInterrupt:
logger.info("Sync service shutting down.")
self.crdb_conn.close()
connections.disconnect(settings.MILVUS.ALIAS)
break
if __name__ == "__main__":
# 单元测试思路:
# 1. Mock psycopg2 和 pymilvus 客户端。
# 2. 测试 fetch_updated_users_from_crdb 在不同返回值下的行为 (空列表, 正常数据, 抛出异常)。
# 3. 测试 sync_batch 是否正确地构建了 Milvus 的实体结构。
# 4. 测试 run_sync_loop 的循环和异常处理逻辑。
# export COCKROACHDB_DSN="postgresql://..."
# python feature_sync_service.py
synchronizer = FeatureSynchronizer()
synchronizer.run_sync_loop()
这个服务虽然简化了 embedding 生成过程,但其结构是生产级的。它包含了配置管理、日志、连接重试、批量处理和基本的错误处理。在 MLOps 体系中,这个服务会被容器化,并作为 Kubernetes deployment 持续运行。
架构的局限性与未来迭代方向
此混合架构并非银弹。它的首要局限性在于引入了两个独立且复杂的分布式系统,运维成本显著高于单一数据库方案。Feature Sync Service
成为一个关键的单点,必须保证其高可用。
其次,数据一致性模型是“最终一致性”。从 CockroachDB 中的特征更新到 Milvus 中对应的 embedding 更新之间,存在一个时间窗口(由 SYNC_INTERVAL_SECONDS
和处理延迟决定)。在这个窗口期,模型可能会拿到新的标量特征和旧的向量特征,这可能对某些对数据新鲜度极度敏感的模型产生影响。
未来的迭代可以从以下几个方面展开:
- 切换到事件驱动: 将同步服务从轮询模式改为事件驱动模式。利用 CockroachDB 的 Change Data Capture (CDC) 功能,将数据变更实时推送到 Kafka,同步服务作为消费者近乎实时地处理更新。这将大大缩短数据不一致的窗口期。
- 特征版本化: 在 CockroachDB 和 Milvus 中都引入特征版本号。在线服务查询时要求标量特征和向量特征的版本号一致,如果不一致则可以触发降级策略,例如使用上一版本的特征或者只使用标量特征。这是 MLOps 中保障模型稳定性的重要实践。
- 查询层抽象: 开发一个轻量级的特征存储 SDK,封装对 CockroachDB 和 Milvus 的并发查询、结果合并、缓存和降级逻辑。这能让上游的应用开发者透明地使用特征,而无需关心底层的存储细节。