以 MongoDB 聚合框架构建服务于 MLOps 的分布式特征工程流水线


一个失控的特征工程环境,是我们团队在项目初期面临的第一个技术难题。数十个Jupyter Notebook散落在不同工程师的机器上,每个Notebook都连接生产MongoDB,执行一些复杂的聚合查询,生成一组特征,然后手动导出CSV喂给模型训练。版本控制是一场灾难,数据血缘完全无法追溯,任何一个上游数据模式的微小变更都可能导致下游模型在无声中失效。我们需要一个系统,一个能将特征提取过程规范化、版本化、自动化,并与我们现有的MongoDB生态无缝集成的MLOps解决方案。

定义问题:混乱与可重复性之间的鸿沟

我们的核心数据资产是存储在MongoDB中的数十亿级用户行为日志,这是一个典型的分片集群。业务需求是基于这些原始日志,为多个机器学习模型(如用户流失预测、LTV评估)计算复杂的聚合特征。例如,“用户过去7天、30天的活跃天数”、“用户过去一个月内最活跃的3个功能模块及其使用次数”等等。

这些任务的本质是分布式聚合计算,这自然让人联想到MapReduce。但问题的关键不在于计算本身,而在于如何管理这个计算过程。一个生产级的特征工程流水线必须满足以下要求:

  1. 版本化:特征计算逻辑(即聚合查询)必须被严格版本化,与模型版本、代码版本对齐。
  2. 可追溯性:每一次特征生成任务的执行都必须有记录。我们需要知道是哪个版本的逻辑、在什么时间、处理了哪个时间范围的原始数据、生成了哪份输出。
  3. 幂等性与原子性:任务可以被安全地重跑,且多次运行结果一致。输出应该是原子性的,避免出现中间状态或部分成功的数据污染下游。
  4. 资源隔离:特征工程任务是计算密集型的,不能严重影响为线上业务提供服务的核心MongoDB集群的性能。
  5. 低运维成本:团队规模不大,我们希望尽可能复用现有技术栈,避免引入新的、复杂的分布式计算框架。

方案A:经典的ETL与外部计算集群(Spark/Hadoop)

这是最常见的架构思路。

graph TD
    subgraph "方案A: 外部计算架构"
        A[MongoDB 主集群] -- 定时导出/CDC --> B[数据湖 S3/HDFS];
        B -- 读取 --> C[Spark/Hadoop 集群];
        C -- 执行MapReduce任务 --> D[处理结果];
        D -- 写回 --> E[特征存储 Feature Store / MongoDB];
        F[MLOps Orchestrator] -- 触发 --> C;
        F -- 记录元数据 --> G[元数据数据库 PostgreSQL/MySQL];
    end

优势分析:

  • 功能强大且通用:Spark拥有丰富的API和库,能够处理几乎任何复杂的转换逻辑,包括引入Python UDFs。
  • 计算与存储分离:计算任务在独立的Spark集群上执行,可以根据需要弹性伸缩,与MongoDB集群物理隔离,互不影响。这是该方案最大的吸引力。
  • 生态成熟:围绕Spark和Hadoop有大量的工具和最佳实践。

劣势分析:

  • 架构复杂性剧增:引入了一整套新的技术栈(Spark、HDFS/S3、可能的YARN/Mesos)。部署、监控、维护这套系统的成本非常高。
  • 数据移动成本:需要将海量数据从MongoDB移动到数据湖。这个ETL过程本身就是一个复杂的工程,涉及网络带宽、存储成本、以及数据同步延迟和一致性问题。
  • 运维负担:即使使用云服务(如AWS EMR),管理一个独立的计算集群仍需要专门的知识和人力投入。对于我们团队而言,这是一个不小的负担。

在真实项目中,数据的移动是性能和成本的瓶颈。一次全量特征计算可能意味着TB级的数据在不同系统间穿梭,这不仅慢,而且昂贵。

方案B:利用MongoDB原生能力的”库内计算”

这个方案的核心思想是:数据在哪里,计算就在哪里发生。我们不移动数据,而是将计算任务推送到数据所在的MongoDB集群中去执行。MongoDB的聚合框架(Aggregation Framework)本质上就是一个功能强大的、在数据库内核中实现的分布式数据处理流水线,其设计思想与MapReduce有异曲同工之妙。

graph TD
    subgraph "方案B: 库内计算架构"
        A[MongoDB 集群]
        A -- 包含 --> B[原始数据分片];
        A -- 包含 --> C[特征数据分片];
        A -- 包含 --> D[MLOps元数据];
        
        E[MLOps Orchestrator] -- 1. 写入执行记录到 D --> D;
        E -- 2. 发起聚合查询 --> A;
        A -- 3. 在集群内部并行执行聚合 --> B;
        B -- `$merge` 原子写入 --> C;
        A -- 4. 更新执行记录状态 --> D;
    end

优势分析:

  • 架构极简:没有额外的大数据组件。整个流程只涉及一个外部的Orchestrator(可以是简单的Python脚本、Airflow DAG或一个轻量级服务)和MongoDB本身。
  • 零数据移动:计算发生在数据节点上。聚合操作会被mongos路由分发到各个分片并行执行,最后合并结果。这消除了网络传输瓶颈和ETL的复杂性。
  • 运维成本低:我们只需要维护好MongoDB集群,这是我们已经具备的能力。
  • 原子性与一致性:利用聚合管道中的$out$merge阶段,可以将结果原子性地写入一个新集合或更新现有集合,天然地保证了任务的原子性和幂等性。

劣势分析:

  • 计算与存储耦合:这是最大的风险。长时间运行的、高CPU消耗的聚合任务可能会抢占数据库资源,影响线上业务的读写性能。
  • 表达能力有限:聚合框架虽然强大,但它不是一个通用的计算引擎。它没有Spark那样丰富的库生态,复杂的、需要自定义代码逻辑(UDFs)的转换会变得非常困难甚至不可能。
  • 资源规划要求高:需要对MongoDB集群的资源进行精细规划,并利用MongoDB的特性(如为分析任务设置专用标签的副本集节点、Read Preference等)来隔离负载。

最终决策与理由

我们选择了方案B。

决策的关键在于对我们具体需求的权衡。我们95%的特征工程任务,本质上都是对JSON文档的分组、过滤、投影和计算,这些恰好是MongoDB聚合框架最擅长的。引入Spark来解决剩下5%的极端复杂场景,其带来的运维成本远大于收益。

计算与存储耦合的风险是真实存在的,但可以通过架构设计来缓解。我们的策略是:

  1. 利用副本集进行读写分离:将计算密集型的聚合任务路由到专门用于分析的secondary节点。这通过在数据库连接字符串中指定readPreference=secondaryreadPreferenceTags来实现。
  2. 调度窗口:在业务低峰期(如凌晨)调度执行大规模的全量特征计算任务。
  3. 精细的资源监控:通过MongoDB Atlas或自建的监控系统,严密监控聚合任务的性能指标(如CPU使用率、执行时间、扫描文档数等),并设置告警。

最终,架构的简洁性和低运维成本成为了决定性因素。我们宁愿在聚合查询的编写上多花一些心思,也不愿引入一个全新的分布式系统。

核心实现:代码与数据模型

整个系统的实现包含三个核心部分:MLOps元数据模型、特征管道定义、以及执行器代码。

1. MLOps元数据模型

我们在MongoDB中创建了两个核心集合来管理整个流程:feature_pipelinespipeline_runs

feature_pipelines 集合 - 定义特征计算逻辑

这个集合存储了所有特征工程管道的定义,并且是版本化的。

// Document in 'feature_pipelines' collection
{
  "_id": ObjectId("653b5a9b1c9d440001a1b1c1"),
  "name": "user_activity_features_v2",
  "version": 2,
  "description": "计算用户7日和30日的活跃天数、会话次数和总交互事件数。V2版本增加了功能偏好计算。",
  "source_collection": "user_events",
  "target_collection": "features_user_activity",
  "pipeline": [
    // MongoDB Aggregation Pipeline Stages as an array of BSON objects
    // This is the core "MapReduce" logic
    // ... pipeline stages here ...
  ],
  "parameters": {
    "end_date": "ISODate() placeholder",
    "days_window": [7, 30]
  },
  "created_at": ISODate("2023-10-27T10:00:00Z"),
  "author": "data_scientist_A"
}
  • nameversion 构成了唯一标识,这是版本化的关键。
  • pipeline 字段直接嵌入了MongoDB聚合管道的阶段数组。这使得计算逻辑本身也成了可被查询和管理的数据。

pipeline_runs 集合 - 追踪每一次执行

每当一个管道被触发执行时,我们都会在这个集合中创建一个文档来记录。

// Document in 'pipeline_runs' collection
{
  "_id": ObjectId("653b5b1e1c9d440001a1b1c2"),
  "pipeline_id": ObjectId("653b5a9b1c9d440001a1b1c1"),
  "pipeline_name": "user_activity_features_v2",
  "pipeline_version": 2,
  "status": "RUNNING", // ENQUEUED, RUNNING, SUCCEEDED, FAILED
  "trigger_params": {
    "end_date": ISODate("2023-10-26T23:59:59Z"),
    "lookback_days": 30
  },
  "execution_stats": {
    "start_time": ISODate("2023-10-27T11:00:00Z"),
    "end_time": null,
    "duration_seconds": null,
    "error_message": null,
    "documents_written": null
  },
  "output_collection": "features_user_activity_run_653b5b1e1c9d440001a1b1c2"
}
  • 这个文档提供了完整的审计日志。status 字段清晰地展示了任务的生命周期。
  • trigger_params 记录了本次运行的具体参数,保证了可复现性。

2. 聚合管道:特征工程的“心脏”

这是实际的计算逻辑。以下是一个生成用户活跃度特征的聚合管道示例,它将被存储在feature_pipelines文档的pipeline字段中。

[
  {
    "$match": {
      "timestamp": {
        "$gte": "ISODate('YYYY-MM-DDTHH:mm:ssZ')", // Parameterized start_date
        "$lt": "ISODate('YYYY-MM-DDTHH:mm:ssZ')"  // Parameterized end_date
      },
      "event_type": { "$in": ["app_launch", "click", "purchase"] }
    }
  },
  {
    "$addFields": {
      "event_date": {
        "$dateToString": { "format": "%Y-%m-%d", "date": "$timestamp" }
      }
    }
  },
  {
    "$group": {
      "_id": {
        "user_id": "$user_id",
        "event_date": "$event_date"
      },
      "session_count": { "$addToSet": "$session_id" },
      "event_count": { "$sum": 1 },
      "module_clicks": {
        "$push": {
          "$cond": [ { "$eq": ["$event_type", "click"] }, "$payload.module_id", null ]
        }
      }
    }
  },
  {
    "$project": {
      "_id": 0,
      "user_id": "$_id.user_id",
      "event_date": { "$toDate": "$_id.event_date" },
      "session_count": { "$size": "$session_count" },
      "event_count": "$event_count",
      "module_clicks": {
        "$filter": { "input": "$module_clicks", "as": "item", "cond": { "$ne": ["$$item", null] } }
      }
    }
  },
  {
    "$group": {
      "_id": "$user_id",
      "daily_stats": {
        "$push": {
          "date": "$event_date",
          "sessions": "$session_count",
          "events": "$event_count",
          "modules": "$module_clicks"
        }
      }
    }
  },
  {
    "$addFields": {
      "features": {
        "active_days_7d": {
          "$size": {
            "$filter": {
              "input": "$daily_stats", "as": "d", "cond": {
                "$gte": [ "$$d.date", "ISODate('YYYY-MM-DDTHH:mm:ssZ')" ] // Parameterized 7_days_ago
              }
            }
          }
        },
        "total_events_30d": {
          "$sum": {
            "$map": {
              "input": {
                "$filter": {
                  "input": "$daily_stats", "as": "d", "cond": {
                    "$gte": [ "$$d.date", "ISODate('YYYY-MM-DDTHH:mm:ssZ')" ] // Parameterized 30_days_ago
                  }
                }
              },
              "as": "d",
              "in": "$$d.events"
            }
          }
        }
        // ... more features can be calculated here
      },
      "updated_at": "ISODate('YYYY-MM-DDTHH:mm:ssZ')" // Parameterized execution_time
    }
  },
  {
      "$project": {
          "_id": 1,
          "features": 1,
          "updated_at": 1
      }
  },
  {
    "$merge": {
      "into": "features_user_activity",
      "on": "_id",
      "whenMatched": "replace",
      "whenNotMatched": "insert"
    }
  }
]

这个管道的最后一步至关重要:

  • $merge: 这是一个原子性的写操作。它会用新计算的特征文档去更新目标集合features_user_activityon: "_id"指定了用户ID为主键。如果用户已存在,则替换旧文档(whenMatched: "replace");如果不存在,则插入新文档(whenNotMatched: "insert")。这完美地实现了幂等性。

3. 执行器(Orchestrator)

这是一个Python脚本,作为整个流程的驱动引擎。它负责读取管道定义、填充参数、执行聚合、并更新运行状态。

import os
import logging
from datetime import datetime, timedelta, timezone
from pymongo import MongoClient, ReadPreference
from pymongo.errors import PyMongoError
from bson import ObjectId

# --- Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/")
# For analytics, connect to a secondary node if available
MONGO_ANALYTICS_URI = os.environ.get("MONGO_ANALYTICS_URI", MONGO_URI) 
DB_NAME = "mlops_db"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class PipelineExecutor:
    def __init__(self, mongo_uri, analytics_uri, db_name):
        try:
            # Connection for metadata operations (primary)
            self.client = MongoClient(mongo_uri)
            self.db = self.client[db_name]

            # Connection for running heavy aggregations (secondary preferred)
            self.analytics_client = MongoClient(
                analytics_uri,
                read_preference=ReadPreference.SECONDARY_PREFERRED,
                readConcernLevel='majority'
            )
            self.analytics_db = self.analytics_client[db_name]
            
            logging.info("Successfully connected to MongoDB.")
        except PyMongoError as e:
            logging.error(f"Failed to connect to MongoDB: {e}")
            raise

    def _get_pipeline_definition(self, name: str, version: int):
        """Fetches a specific version of a pipeline from the metadata store."""
        pipeline_doc = self.db.feature_pipelines.find_one({"name": name, "version": version})
        if not pipeline_doc:
            raise ValueError(f"Pipeline '{name}' version {version} not found.")
        return pipeline_doc

    def _create_run_record(self, pipeline_doc: dict, params: dict) -> ObjectId:
        """Creates a run record in the pipeline_runs collection."""
        run_doc = {
            "pipeline_id": pipeline_doc["_id"],
            "pipeline_name": pipeline_doc["name"],
            "pipeline_version": pipeline_doc["version"],
            "status": "ENQUEUED",
            "trigger_params": params,
            "execution_stats": {
                "start_time": None,
                "end_time": None,
                "duration_seconds": None,
                "error_message": None,
                "documents_written": None,
            },
        }
        result = self.db.pipeline_runs.insert_one(run_doc)
        logging.info(f"Created run record with ID: {result.inserted_id}")
        return result.inserted_id

    def _update_run_status(self, run_id: ObjectId, status: str, stats: dict = None):
        """Updates the status and stats of a pipeline run."""
        update = {"$set": {"status": status}}
        if stats:
            for key, value in stats.items():
                update["$set"][f"execution_stats.{key}"] = value
        self.db.pipeline_runs.update_one({"_id": run_id}, update)
        logging.info(f"Updated run {run_id} status to {status}")

    def _parameterize_pipeline(self, pipeline: list, params: dict) -> list:
        """
        Recursively replaces placeholder strings in the pipeline with actual values.
        A real implementation might use a more robust templating engine.
        """
        import json
        pipeline_str = json.dumps(pipeline)
        for key, value in params.items():
            placeholder = f"ISODate('{key}')"
            # Format datetime objects correctly for JSON representation
            iso_value = value.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
            pipeline_str = pipeline_str.replace(placeholder, f'{{"$date": "{iso_value}"}}')
        return json.loads(pipeline_str)


    def run_pipeline(self, name: str, version: int, end_date_str: str, lookback_days: int):
        """
        Main execution function.
        """
        run_id = None
        try:
            pipeline_doc = self._get_pipeline_definition(name, version)
            
            # --- Parameterization ---
            end_date = datetime.fromisoformat(end_date_str.replace('Z', '+00:00'))
            start_date = end_date - timedelta(days=lookback_days)
            params = { "end_date": end_date, "start_date": start_date }
            
            # This is a simplified parameterization for the example.
            # In production, you'd have more dynamic placeholders.
            pipeline_str = str(pipeline_doc["pipeline"])
            pipeline_str = pipeline_str.replace("'YYYY-MM-DDTHH:mm:ssZ'", f"'{start_date.isoformat()}'")
            
            # A more robust way using eval, but be careful with security
            from bson.son import SON
            pipeline_stages = eval(pipeline_str)
            
            run_id = self._create_run_record(pipeline_doc, {"end_date": end_date_str, "lookback_days": lookback_days})
            self._update_run_status(run_id, "RUNNING", {"start_time": datetime.now(timezone.utc)})
            
            start_time = datetime.now(timezone.utc)
            source_collection = self.analytics_db[pipeline_doc["source_collection"]]

            logging.info(f"Executing aggregation pipeline '{name}' v{version} on collection '{pipeline_doc['source_collection']}'...")
            
            # --- The Core Execution ---
            # allowDiskUse is critical for large aggregations that exceed memory limits.
            cursor = source_collection.aggregate(pipeline_stages, allowDiskUse=True)
            
            # The $merge stage does not return documents, so we just need to exhaust the cursor.
            for _ in cursor:
                pass

            end_time = datetime.now(timezone.utc)
            duration = (end_time - start_time).total_seconds()
            
            # In a real scenario, you might get stats from the $merge result if available.
            # For simplicity, we mark it as succeeded here.
            final_stats = {
                "end_time": end_time,
                "duration_seconds": duration,
            }
            self._update_run_status(run_id, "SUCCEEDED", final_stats)
            logging.info(f"Pipeline run {run_id} completed successfully in {duration:.2f} seconds.")

        except (ValueError, PyMongoError) as e:
            logging.error(f"Pipeline run failed: {e}")
            if run_id:
                self._update_run_status(run_id, "FAILED", {"error_message": str(e), "end_time": datetime.now(timezone.utc)})
        except Exception as e:
            # Catch-all for unexpected errors
            logging.error(f"An unexpected error occurred: {e}", exc_info=True)
            if run_id:
                self._update_run_status(run_id, "FAILED", {"error_message": "An unexpected system error occurred.", "end_time": datetime.now(timezone.utc)})


if __name__ == '__main__':
    # --- Unit Test / Invocation Example ---
    # In a real system, this would be triggered by a scheduler like Airflow, or a CI/CD pipeline.
    executor = PipelineExecutor(MONGO_URI, MONGO_ANALYTICS_URI, DB_NAME)
    
    # Simulate a daily run for a specific pipeline version
    executor.run_pipeline(
        name="user_activity_features_v2",
        version=2,
        end_date_str="2023-10-26T23:59:59Z",
        lookback_days=30
    )

这份执行器代码考虑了生产环境的几个要点:

  • 配置分离:数据库连接字符串通过环境变量传入。
  • 职责分离的连接:使用两个MongoClient实例,一个用于元数据读写(应连接到主节点),一个用于分析查询(优先连接到从节点)。
  • 详尽的日志:记录了流程的关键步骤。
  • 错误处理:通过try...except块捕获预期的和意外的异常,并更新pipeline_runs集合中的状态,这对于问题排查至关重要。
  • 健壮性allowDiskUse=True是处理大数据量聚合的必备选项,它允许MongoDB在聚合操作超出内存限制时使用磁盘临时文件。

架构的扩展性与局限性

这个架构虽然简洁高效,但并非万能。客观地认识其边界是做出正确技术选型的前提。

扩展路径:

  1. 与调度系统集成:当前的Python执行器可以被轻松地封装成一个Airflow Operator或Prefect Task,从而融入更复杂的DAG(有向无环图)工作流中。
  2. 构建特征目录feature_pipelines集合中的元数据是构建一个完整Feature Store目录的基础。可以开发一个简单的UI来展示所有可用的特征、它们的定义、版本历史和源数据。
  3. 增量计算:对于某些类型的特征,可以通过修改聚合管道和执行器逻辑来实现增量计算,而不是每次都全量回溯。这需要更精巧的状态管理。

固有局限性:

  1. 计算表达力:如前所述,所有计算逻辑必须能用MongoDB聚合框架的语法来表达。如果你的特征工程需要调用外部Python库(如scipy, gensim)进行复杂计算,或者需要执行复杂的图算法,那么这个方案就不适用了。这时,方案A中的Spark是更合适的选择。
  2. 资源争用风险:尽管可以通过路由到从节点来缓解,但计算负载终究是在数据库集群内部。如果一个设计拙劣的聚合查询(例如,没有有效利用索引导致全表扫描)被执行,它仍然可能对整个集群的稳定性造成冲击。这要求团队成员对MongoDB的查询优化有深刻的理解。
  3. 跨源数据整合:此架构最适用于数据源单一(或已全部汇集于MongoDB)的场景。如果特征工程需要关联来自PostgreSQL、S3等多个外部数据源的数据,那么将数据统一ETL到数据湖,再由Spark进行处理的方案A会更加合理。

这个基于MongoDB的“库内计算”方案,本质上是在通用性、性能和运维成本之间做出的一个权衡。它放弃了Spark的极致通用性,换来了架构的极度简化和与现有技术栈的深度整合,对于以文档型数据为核心、且聚合逻辑标准化的业务场景,这是一个极具性价比和工程效率的选择。


  目录