构建基于 Paxos 的分布式状态机以纳管 Milvus 与 OpenFaaS 向量处理工作流


我们面临一个棘手的状况:一个由 OpenFaaS 函数驱动的向量索引流水线,需要以高可用的方式管理 Milvus 集合的元数据和加载状态。数据批次的到达由 AWS SQS 消息触发,整个系统通过 容器化 部署。问题在于,流水线的控制平面——那个决定何时创建集合、加载哪个分区、更新哪个别名的组件——必须是无单点故障的。如果这个控制平面宕机,整个数据流入 Milvus 的链路就会中断。

在真实项目中,直接使用 ZooKeeper 或 etcd 是标准答案。但为了彻底搞懂这类系统的核心,我们决定自己构建一个简化的、基于 Paxos 协议的分布式状态机来作为这个高可用的控制平面。这个过程暴露了将一个理论算法工程化的诸多挑战,也澄清了它在现代云原生架构中的确切位置。

初步构想与技术选型的挣扎

最初的方案非常简单:用一个关系型数据库(如 PostgreSQL)来存储状态,所有控制平面的实例通过事务来读写状态。这很快被否决了。数据库本身成为了新的单点故障,并且在跨区域部署的场景下,写延迟会变得无法接受。

另一个想法是使用 Redis 的分布式锁。一个控制平面实例获取锁,成为 Leader,负责所有状态变更。其他实例作为 Standby。这在实践中是个巨大的坑。如果 Leader 实例在持有锁的情况下崩溃,锁的自动释放机制可能会导致脑裂(Split-brain),两个实例都认为自己是 Leader,从而做出冲突的决策,污染 Milvus 中的元数据。

最终,我们选择了最硬核但也最可靠的路径:实现一个基于 Multi-Paxos 的复制状态机 (Replicated State Machine)。所有控制平面的副本都是对等的。任何一个副本都可以提出状态变更(例如,“为集合 X 加载分区 P”)。这个提议会通过 Paxos 协议在副本集群中达成共识。一旦超过半数的副本(Quorum)就一个操作序列达成一致,该操作就会被应用到所有副本的本地状态机中,并最终触发对 Milvus 的操作。这保证了即使部分副本失效,只要集群的多数派存活,系统就能继续正确地处理状态变更。

整个系统的架构如下:

graph TD
    subgraph "AWS"
        SQS[AWS SQS 消息队列]
    end

    subgraph "Kubernetes / OpenFaaS"
        SQS -- 触发 --> Dispatcher[Dispatcher Function]
        Dispatcher -- 1. 提议状态变更 --> PaxosCluster
        Indexer[Indexer Function]
        Milvus[(Milvus 向量数据库)]
    end
    
    subgraph "高可用控制平面 (Paxos Cluster)"
        id1(Orchestrator Node 1)
        id2(Orchestrator Node 2)
        id3(Orchestrator Node 3)
        id1 <--> id2
        id2 <--> id3
        id3 <--> id1
        
        PaxosCluster{Paxos 共识模块} -- 2. 状态提交 --> StateMachine[复制状态机]
        StateMachine -- 3. 触发实际操作 --> Indexer
        Indexer -- 4. 写入数据 --> Milvus
    end

    style PaxosCluster fill:#f9f,stroke:#333,stroke-width:2px

核心实现:一个简化的 Multi-Paxos 状态机

我们将用 Python 来实现这个逻辑。为了聚焦核心,我们不会实现网络通信部分,而是用一个模拟的 Broker 类来代替,但在真实项目中这部分应由 gRPC 或类似的 RPC 框架实现。

1. 定义角色与消息体

Paxos 算法有三个核心角色:Proposer(提议者)、Acceptor(接受者)和 Learner(学习者)。在我们的实现中,每个 Orchestrator 节点都同时扮演这三个角色。

首先是消息的定义。一个常见的错误是消息定义不清晰,导致后续逻辑混乱。

# paxos_types.py
import time
from typing import NamedTuple, Any, Optional

# 提案编号,必须是全局唯一且单调递增的
# 在实践中,通常由 (round_number, server_id) 构成,以确保唯一性
ProposalID = NamedTuple('ProposalID', [('number', int), ('uid', str)])

class PrepareMessage(NamedTuple):
    """Phase 1a: Proposer 发送给 Acceptors 的准备请求"""
    proposal_id: ProposalID

class PromiseMessage(NamedTuple):
    """Phase 1b: Acceptor 对 Proposer 的回应"""
    proposal_id: ProposalID
    # 如果之前接受过提案,需要附带上一个接受的提案ID和值
    last_accepted_id: Optional[ProposalID]
    last_accepted_value: Optional[Any]

class ProposeMessage(NamedTuple):
    """Phase 2a: Proposer 收到多数派 Promise 后,发送接受请求"""
    proposal_id: ProposalID
    value: Any

class AcceptedMessage(NamedTuple):
    """Phase 2b: Acceptor 接受提案后的回应"""
    proposal_id: ProposalID
    value: Any

2. Acceptor 的实现

Acceptor 是 Paxos 算法的“内存”。它必须持久化两个关键状态:min_proposal_id(它承诺过的最低提案编号)和 accepted_proposal(它最后接受的提案)。这里的坑在于,这两个状态的更新必须是原子性的。在真实项目中,这意味着需要写入预写日志(WAL)并 fsync 到磁盘后才能发送网络回复。

# acceptor.py
import logging
from typing import Optional, Tuple
from paxos_types import ProposalID, PrepareMessage, PromiseMessage, ProposeMessage, AcceptedMessage

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class Acceptor:
    def __init__(self, server_id: str):
        self.server_id = server_id
        self.logger = logging.getLogger(f"Acceptor-{self.server_id}")
        
        # --- 持久化状态 ---
        # 在真实系统中,这部分需要写入磁盘 WAL
        self.min_proposal_id: Optional[ProposalID] = None
        self.accepted_id: Optional[ProposalID] = None
        self.accepted_value: Optional[any] = None
        # ---------------------
        
        self.logger.info("Acceptor initialized.")

    def handle_prepare(self, message: PrepareMessage) -> PromiseMessage:
        """处理来自 Proposer 的 Prepare 请求"""
        self.logger.info(f"Handling Prepare for proposal {message.proposal_id}")
        
        if self.min_proposal_id is None or message.proposal_id >= self.min_proposal_id:
            # 承诺不再接受任何编号小于此提案的请求
            self.min_proposal_id = message.proposal_id
            # 这里需要持久化 self.min_proposal_id
            self.logger.info(f"Promised proposal {message.proposal_id}. Responding with last accepted: {self.accepted_id}")
            return PromiseMessage(
                proposal_id=message.proposal_id,
                last_accepted_id=self.accepted_id,
                last_accepted_value=self.accepted_value
            )
        else:
            # 拒绝,因为已经承诺了一个更高的提案编号
            # 在 Paxos 中,这通常以不回复或返回一个 Nack (Negative Acknowledgement) 来表示
            self.logger.warning(f"Rejected Prepare for {message.proposal_id}, already promised {self.min_proposal_id}")
            # 返回一个空的PromiseMessage或特定的Nack消息
            return None 

    def handle_propose(self, message: ProposeMessage) -> Optional[AcceptedMessage]:
        """处理来自 Proposer 的 Propose 请求"""
        self.logger.info(f"Handling Propose for {message.proposal_id} with value '{message.value}'")
        
        # 只有当提案编号大于或等于我们承诺的最低编号时,才接受
        if self.min_proposal_id is None or message.proposal_id >= self.min_proposal_id:
            self.min_proposal_id = message.proposal_id
            self.accepted_id = message.proposal_id
            self.accepted_value = message.value
            # 这里需要原子性地持久化这三个状态
            self.logger.info(f"Accepted proposal {message.proposal_id} with value '{message.value}'.")
            return AcceptedMessage(proposal_id=message.proposal_id, value=message.value)
        else:
            self.logger.warning(f"Rejected Propose for {message.proposal_id}, already promised {self.min_proposal_id}")
            return None

3. Proposer 的实现

Proposer 的逻辑要复杂得多,因为它需要管理超时、重试,并处理来自不同 Acceptor 的多种响应。为了简化,我们只展示核心流程。

一个关键点是提案编号的生成。它必须是全局唯一且单调递增的。一个常见的做法是 (timestamp, server_id),但在时钟可能回拨的分布式系统中,更好的做法是 (local_monotonic_counter, server_id)

# proposer.py
import uuid
import time
import threading
from collections import defaultdict
from typing import List, Any, Optional

from paxos_types import ProposalID, PrepareMessage, PromiseMessage, ProposeMessage, AcceptedMessage

class Proposer:
    def __init__(self, server_id: str, acceptor_ids: List[str], network_broker):
        self.server_id = server_id
        self.acceptor_ids = acceptor_ids
        self.quorum_size = len(acceptor_ids) // 2 + 1
        self.network_broker = network_broker
        self.proposal_number = 0
        self.logger = logging.getLogger(f"Proposer-{self.server_id}")

    def _get_next_proposal_id(self) -> ProposalID:
        # 在真实系统中,需要确保单调递增,可以持久化 proposal_number
        self.proposal_number += 1
        return ProposalID(number=self.proposal_number, uid=self.server_id)

    def propose(self, value: Any) -> Optional[Any]:
        """发起一次 Paxos 提议"""
        proposal_id = self._get_next_proposal_id()
        self.logger.info(f"Starting proposal {proposal_id} for value '{value}'")

        # --- Phase 1: Prepare ---
        prepare_message = PrepareMessage(proposal_id=proposal_id)
        
        # 模拟网络广播
        self.network_broker.broadcast(self.acceptor_ids, prepare_message)
        
        # 等待并收集 Promises
        # 在真实系统中,这里会有超时和重试逻辑
        time.sleep(0.1) # 模拟网络延迟
        promises = self.network_broker.collect_responses(self.server_id, PromiseMessage)

        if len(promises) < self.quorum_size:
            self.logger.warning(f"Proposal {proposal_id} failed: did not receive quorum of promises. Got {len(promises)}.")
            return None

        self.logger.info(f"Proposal {proposal_id}: received {len(promises)} promises (quorum is {self.quorum_size}).")

        # 检查收到的 Promises。如果任何一个 Acceptor 已经接受了某个提案,
        # 我们必须提议那个提案的值。这是 Paxos 安全性的关键。
        highest_accepted_id = None
        value_to_propose = value
        
        for p in promises:
            if p.last_accepted_id:
                if highest_accepted_id is None or p.last_accepted_id > highest_accepted_id:
                    highest_accepted_id = p.last_accepted_id
                    value_to_propose = p.last_accepted_value
                    self.logger.info(f"Found a previously accepted value '{value_to_propose}' from proposal {highest_accepted_id}. Overriding my value.")

        # --- Phase 2: Propose ---
        propose_message = ProposeMessage(proposal_id=proposal_id, value=value_to_propose)
        self.network_broker.broadcast(self.acceptor_ids, propose_message)
        
        time.sleep(0.1)
        accepts = self.network_broker.collect_responses(self.server_id, AcceptedMessage)

        if len(accepts) >= self.quorum_size:
            self.logger.info(f"Proposal {proposal_id} for value '{value_to_propose}' was accepted by a quorum!")
            # 此时,值已经被选定 (chosen)
            # 广播给 Learners
            self.network_broker.broadcast_to_learners(AcceptedMessage(proposal_id, value_to_propose))
            return value_to_propose
        else:
            self.logger.error(f"Proposal {proposal_id} failed: did not receive quorum of accepts. Got {len(accepts)}.")
            return None

4. 组装为 Multi-Paxos 状态机

上面的代码实现的是 Basic Paxos,一次只为一个值达成共识。要管理一个操作序列,我们需要 Multi-Paxos。其核心思想是:

  1. 首先通过 Paxos 选举一个稳定的 Leader。
  2. 之后,只有 Leader 可以发起提议。
  3. Leader 为每个日志条目(log entry)运行一次 Paxos 实例来确定其值。
  4. 为了优化,Leader 可以跳过 Prepare 阶段,直接对后续的日志条目发起 Propose,只要它的 Leadership 没有被挑战。

将我们的 Orchestrator 节点组合起来,每个节点都运行一个 Acceptor 实例和一个 Proposer 实例。它们通过一个模拟的网络层通信。状态机本身就是一个日志(list),Learner 负责将达成共识的值追加到这个日志中。

# orchestrator_node.py
# (这是一个高度简化的集成示例)

class OrchestratorNode:
    def __init__(self, server_id: str, all_node_ids: List[str], network_broker):
        self.server_id = server_id
        self.state_machine_log = [] # 这就是复制状态机
        self.acceptor = Acceptor(server_id)
        self.proposer = Proposer(server_id, all_node_ids, network_broker)
        self.network_broker = network_broker
        self.network_broker.register(self, server_id)
        self.logger = logging.getLogger(f"Node-{self.server_id}")

    def receive_message(self, message):
        """网络消息入口"""
        if isinstance(message, PrepareMessage):
            promise = self.acceptor.handle_prepare(message)
            if promise:
                self.network_broker.send(message.proposal_id.uid, promise)
        elif isinstance(message, ProposeMessage):
            accepted = self.acceptor.handle_propose(message)
            if accepted:
                 # 回复 Proposer
                self.network_broker.send(message.proposal_id.uid, accepted)
        elif isinstance(message, AcceptedMessage):
            # 作为 Learner 的角色
            self.learn(message)

    def learn(self, message: AcceptedMessage):
        self.logger.info(f"Learned value '{message.value}' for proposal {message.proposal_id}")
        # 这里需要处理乱序和重复的 learned-value
        # 在 Multi-Paxos 中,我们是为特定的 log_index 学习 value
        self.state_machine_log.append(message.value)
        self.apply_state_change(message.value)

    def propose_state_change(self, command: dict):
        # 这里的 command 就是类似 {'op': 'CREATE_COLLECTION', 'name': 'test_col'} 的字典
        self.proposer.propose(command)
        
    def apply_state_change(self, command: dict):
        """将共识结果应用到本地状态"""
        op = command.get('op')
        self.logger.info(f"Applying command to state machine: {op}")
        if op == 'CREATE_COLLECTION':
            # 触发 OpenFaaS 函数去创建 Milvus 集合
            self.logger.info(f"Triggering Milvus operation: CREATE_COLLECTION with params: {command}")
            # ... 调用 Milvus client 或触发一个 OpenFaaS 函数 ...
        elif op == 'LOAD_PARTITION':
            # ... 触发加载数据的函数 ...
            pass

接入 OpenFaaS 和 SQS 工作流

现在,这个 Paxos 集群如何与我们的 Serverless 流水线集成?

  1. 容器化 Orchestrator:
    OrchestratorNode 应用打包成一个 Docker 镜像。部署时,我们会创建 3 个或 5 个副本(Pod),它们通过 Kubernetes 的 Headless Service 互相发现并建立通信。

    # Dockerfile for orchestrator
    FROM python:3.9-slim
    WORKDIR /app
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    COPY . .
    # CMD 需要启动 orchestrator 节点,并传入节点ID和集群成员信息
    # e.g., CMD ["python", "main.py", "--id", "node-0", "--peers", "node-0,node-1,node-2"]
    CMD ["python", "main.py"] 
  2. Dispatcher Function:
    这是一个轻量级的 OpenFaaS 函数,由 SQS 消息触发。它的唯一职责是解析 SQS 消息,并向 Paxos 集群的一个节点发起状态变更提议。

    # dispatcher-fn/handler.py
    import os
    import requests
    import json
    
    # Orchestrator 集群的入口,比如一个 K8s Service IP
    ORCHESTRATOR_ENDPOINT = os.environ.get("ORCHESTRATOR_ENDPOINT", "http://orchestrator.default.svc.cluster.local:8080")
    
    def handle(req):
        """
        由 SQS 消息触发,消息体是 req
        """
        try:
            event = json.loads(req)
            # 假设 SQS 消息格式为: {"s3_path": "s3://...", "collection_name": "...", "partition_name": "..."}
            s3_path = event.get("s3_path")
            collection_name = event.get("collection_name")
            
            if not s3_path or not collection_name:
                return {"status": "error", "message": "Missing s3_path or collection_name"}, 400
    
            # 构造状态变更命令
            command = {
                "op": "LOAD_PARTITION",
                "collection": collection_name,
                "partition": event.get("partition_name", "_default"),
                "source_path": s3_path,
                "timestamp": time.time() # 增加时间戳用于幂等性判断
            }
    
            # 向 Orchestrator 提议
            response = requests.post(f"{ORCHESTRATOR_ENDPOINT}/propose", json=command, timeout=5)
            response.raise_for_status()
            
            return {"status": "success", "message": "Proposal submitted"}, 202
    
        except Exception as e:
            # 错误处理和日志记录
            return {"status": "error", "message": str(e)}, 500
  3. Indexer Function:
    这个函数由 Orchestrator 在状态机应用了 LOAD_PARTITION 命令后触发。它负责连接 Milvus,从 S3 下载数据,并执行真正的索引操作。

    # indexer-fn/handler.py
    import os
    from pymilvus import connections, utility, Collection
    # 假设有一个下载 s3 文件的工具函数
    from s3_utils import download_vectors
    
    MILVUS_HOST = os.environ.get("MILVUS_HOST")
    MILVUS_PORT = os.environ.get("MILVUS_PORT")
    
    def handle(req):
        """
        由 Orchestrator 触发, req 包含加载任务的详细信息
        """
        try:
            task_info = json.loads(req)
            collection_name = task_info["collection"]
            s3_path = task_info["source_path"]
    
            # 1. 下载数据
            local_file_path = download_vectors(s3_path)
    
            # 2. 连接 Milvus
            connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
            if not utility.has_collection(collection_name):
                # 异常情况,集合应该已经被 Orchestrator 创建好了
                raise Exception(f"Collection {collection_name} does not exist.")
            
            collection = Collection(collection_name)
            
            # 3. 反序列化数据并插入
            # ... vector processing and insertion logic ...
            # entities = load_vectors_from_file(local_file_path)
            # mr = collection.insert(entities)
            # collection.load()
            
            # 4. 报告成功 (可选,可以回调 Orchestrator 更新任务状态)
            return {"status": "success", "milvus_response": mr.primary_keys}
    
        except Exception as e:
            # 这里的错误处理非常关键。因为共识已经达成,操作必须成功。
            # 所以需要实现重试、死信队列等机制。
            # 返回 500 会让 OpenFaaS 根据配置进行重试。
            return {"status": "error", "message": str(e)}, 500

遗留问题与未来迭代路径

这个自建的 Paxos 控制平面成功解决了状态管理的高可用问题,但它远非生产级。

首先,我们的 Paxos 实现极其简化。它没有处理成员变更(集群节点的增减),没有实现日志压缩(Log Compaction)来防止日志无限增长,也没有一个健壮的 Leader 选举和维持机制。在真实世界中,这些都是必须解决的复杂问题,而 Raft 算法在这些工程化方面提供了更清晰的指导。

其次,数据平面和控制平面的交互容错有待加强。当 Indexer Function 插入 Milvus 失败时会发生什么?状态机已经记录了“加载完成”这一意图。我们需要引入补偿事务或 Saga 模式。例如,Orchestrator 在触发 Indexer 后,应追踪其执行状态,如果多次重试后仍然失败,需要再通过 Paxos 提交一个“加载失败”的状态,并可能触发告警或清理操作。

最后,性能是个问题。为每一个 SQS 消息都跑一次完整的 Paxos 协议往返,开销是巨大的。生产级的系统会使用 Multi-Paxos,由一个稳定的 Leader 批量处理提议,从而显著降低延迟。

尽管如此,通过这个构建过程,我们深刻理解了分布式共识在现代数据流水线中的核心价值:它为原本无状态、混乱的 Serverless 组件,提供了一个可靠、有序和容错的状态核心。


  目录