构建从 GitLab CI/CD 到 OpenFaaS 的 Dask 计算函数自动化部署管道


团队的数据科学家交付了一批用于金融衍生品定价的蒙特卡洛模拟脚本,核心依赖 Dask 进行并行计算。在他们的 Jupyter环境中,这些脚本表现优异。但业务需求是将这些计算能力封装成按需调用的服务接口。最初的尝试是一场灾难:手动打包充满了依赖冲突的 Docker 镜像,镜像体积动辄超过 5GB,构建过程耗时近半小时,且每次部署到我们 Kubernetes 集群上的 OpenFaaS 网关时,都像是一次赌博。我们需要一个稳定、高效、自动化的流程,将代码变更可靠地转化为生产环境中的可用函数。

我们的目标很明确:每一次 git push 到主分支,都应该自动触发一个流水线,该流水线负责构建、测试并部署一个轻量级、高性能的 Docker 镜像到 OpenFaaS 平台。这个函数将作为客户端,将计算任务提交到一个预先存在的、独立的 Dask 集群。

第一步:优化镜像构建,根治臃肿问题

问题的根源在于 Python 的依赖管理和 Docker 的分层机制。一个简单的 pip install -r requirements.txt 会下载编译工具链、源码包以及大量临时文件,这些都会被打包进最终的镜像层。在真实项目中,这种做法是不可接受的。

解决方案是采用多阶段构建(Multi-stage build)。第一阶段,我们使用一个包含完整构建工具链的“构建器”镜像,将所有 Python 依赖编译并打包成 wheels。第二阶段,我们使用一个干净的、轻量的基础镜像,仅从第一阶段复制预先构建好的 wheels 进行安装。这种方式不仅能大幅度减小镜像体积,还能充分利用 Docker 的缓存机制,只要 requirements.txt 不变,构建器阶段就可以被完全缓存。

这是我们为 Dask 计算函数定制的 Dockerfile

# Stage 1: The Builder
# 使用一个包含完整构建工具链的镜像
FROM python:3.9-slim as builder

# 设置工作目录
WORKDIR /install

# 安装必要的系统依赖,用于编译某些Python库
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 创建一个 wheelhouse,将所有依赖编译并打包成 wheel 格式
# 这一步是关键,它预编译了所有内容,避免在最终镜像中进行编译
# --no-cache-dir 避免缓存污染
# --prefer-binary 优先使用二进制包,但对没有的会进行编译
RUN pip install --no-cache-dir wheel \
    && pip wheel --no-cache-dir --wheel-dir=/install/wheels -r requirements.txt

# Stage 2: The Final Image
# 使用一个非常轻量的基础镜像
FROM python:3.9-slim

WORKDIR /app

# 从构建器阶段复制预编译好的 wheels
COPY --from=builder /install/wheels /wheels

# 复制项目代码
COPY . .

# 从本地的 wheelhouse 安装依赖,这一步会非常快,且不会产生编译垃圾
# --no-index 禁止访问外部PyPI,确保只使用我们提供的wheels
# --find-links 指定本地的wheels目录
RUN pip install --no-cache-dir --no-index --find-links=/wheels -r requirements.txt \
    && rm -rf /wheels

# OpenFaaS 通过 watchdog 调用这个脚本来处理请求
CMD ["fwatchdog"]

这个 Dockerfile 将我们的镜像体积从 5.2GB 压缩到了约 800MB,CI/CD 流水线的构建时间也从 25 分钟缩短到了 5 分钟以内(在有缓存的情况下)。

第二步:定义 OpenFaaS 函数与 Dask 的交互模式

一个常见的错误是试图在 OpenFaaS 函数的容器内启动一个临时的 Dask 集群。这种模式对于短时、小规模计算尚可,但对于我们这种需要稳定、可共享计算资源的场景来说是低效且不稳定的。

正确的架构模式是“函数即客户端”。OpenFaaS 函数本身应保持轻量,它的唯一职责是接收请求、构造计算任务,然后将其提交给一个长期运行的、独立的 Dask 集群。函数通过环境变量获取 Dask Scheduler 的地址,从而实现与计算集群的解耦。

graph TD
    subgraph GitLab CI/CD Pipeline
        A[Git Push] --> B{Build & Push Image};
        B --> C{Deploy to OpenFaaS};
    end

    subgraph Kubernetes Cluster
        D[GitLab Runner] -- executes pipeline --> B;

        subgraph OpenFaaS
            E[API Gateway] -- routes request --> F[Function: dask-monte-carlo];
        end

        subgraph Dask Cluster
            G[Dask Scheduler]
            H1[Dask Worker 1]
            H2[Dask Worker 2]
            H3[Dask Worker N]
            G --- H1;
            G --- H2;
            G --- H3;
        end

        F -- submits job via TCP --> G;
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style G fill:#ccf,stroke:#333,stroke-width:2px

以下是我们的蒙特卡洛模拟函数 dask-monte-carlo/handler.py 的核心实现。注意其中的错误处理和日志记录,这在生产环境中至关重要。

# dask-monte-carlo/handler.py

import os
import json
import logging
import numpy as np
import pandas as pd
from dask.distributed import Client, TimeoutError as DaskTimeoutError

# 配置结构化日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 从环境变量获取 Dask Scheduler 地址
DASK_SCHEDULER_ADDRESS = os.environ.get("DASK_SCHEDULER_ADDRESS")

def simulate_portfolio(num_simulations: int, num_days: int, initial_value: float, mu: float, sigma: float):
    """
    执行蒙特卡洛模拟的核心计算逻辑。
    这个函数本身是纯计算,可以被 Dask worker 并行执行。
    """
    dt = 1 / 252  # 假设一年有252个交易日
    # 生成随机价格路径
    price_paths = np.zeros((num_days + 1, num_simulations))
    price_paths[0] = initial_value
    
    for t in range(1, num_days + 1):
        random_shocks = np.random.standard_normal(num_simulations)
        price_paths[t] = price_paths[t - 1] * np.exp((mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * random_shocks)
        
    return price_paths[-1] # 返回最终的资产价值分布

def handle(req):
    """
    OpenFaaS 的入口函数。
    负责解析请求,连接 Dask 集群,分发任务,并返回结果。
    """
    if not DASK_SCHEDULER_ADDRESS:
        err_msg = "DASK_SCHEDULER_ADDRESS environment variable not set."
        logger.error(err_msg)
        return json.dumps({"status": "error", "message": err_msg}), 500

    try:
        req_data = json.loads(req)
        logger.info(f"Received request with parameters: {req_data}")
        
        # 从请求中解析参数,并提供默认值
        num_simulations = int(req_data.get("simulations", 1_000_000))
        num_days = int(req_data.get("days", 252))
        initial_value = float(req_data.get("initial_value", 1000.0))
        mu = float(req_data.get("mu", 0.05)) #预期年化回报率
        sigma = float(req_data.get("sigma", 0.2)) #年化波动率
        
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        err_msg = f"Invalid request body or parameters: {e}"
        logger.warning(err_msg)
        return json.dumps({"status": "error", "message": err_msg}), 400

    try:
        # 连接 Dask 集群,设置合理的超时
        logger.info(f"Connecting to Dask scheduler at {DASK_SCHEDULER_ADDRESS}...")
        with Client(DASK_SCHEDULER_ADDRESS, timeout="10s") as client:
            logger.info("Successfully connected to Dask scheduler.")
            
            # 将计算任务提交给 Dask 集群
            # dask.delayed 包装了我们的函数,使其可以懒加载
            # 这使得 Dask 可以构建任务图,然后高效执行
            lazy_results = client.submit(
                simulate_portfolio, 
                num_simulations, 
                num_days, 
                initial_value, 
                mu, 
                sigma
            )
            
            # 阻塞等待计算结果
            final_values = lazy_results.result()
            
            # 计算风险价值 (VaR)
            var_95 = np.percentile(final_values, 5)
            
            response = {
                "status": "success",
                "parameters": {
                    "simulations": num_simulations,
                    "days": num_days
                },
                "results": {
                    "mean_final_value": np.mean(final_values),
                    "std_dev_final_value": np.std(final_values),
                    "value_at_risk_95": var_95
                }
            }
            logger.info(f"Simulation completed successfully.")
            return json.dumps(response), 200

    except DaskTimeoutError:
        err_msg = f"Connection to Dask scheduler at {DASK_SCHEDULER_ADDRESS} timed out."
        logger.error(err_msg)
        return json.dumps({"status": "error", "message": err_msg}), 503 # Service Unavailable
        
    except Exception as e:
        err_msg = f"An unexpected error occurred during Dask computation: {e}"
        logger.exception(err_msg) # 使用 exception 会记录堆栈信息
        return json.dumps({"status": "error", "message": err_msg}), 500

第三步:编排 GitLab CI/CD 流水线

流水线是整个自动化流程的核心。我们将其划分为几个逻辑阶段:validate(代码检查)、build(构建镜像)、deploy_staging(部署到预发环境)和 deploy_production(手动触发部署到生产环境)。

在 GitLab CI/CD 中,我们需要配置一些关键的变量:

  • CI_REGISTRY_IMAGE: GitLab 内置的容器镜像仓库地址。
  • OPENFAAS_URL: OpenFaaS 网关地址。
  • OPENFAAS_PASSWORD: 用于认证的密码,存储为受保护的 CI/CD 变量。
  • DASK_SCHEDULER_STAGING / DASK_SCHEDULER_PRODUCTION: 指向不同环境的 Dask Scheduler 地址。

这是我们的 .gitlab-ci.yml 文件,它将所有部分串联起来:

# .gitlab-ci.yml

variables:
  # 使用 commit SHA 作为镜像标签,确保唯一性和可追溯性
  IMAGE_TAG: $CI_REGISTRY_IMAGE/dask-monte-carlo:$CI_COMMIT_SHORT_SHA
  # OpenFaaS 函数的配置文件
  STACK_FILE: ./stack.yml

stages:
  - validate
  - build
  - deploy_staging
  - deploy_production

# 使用官方的 faas-cli 镜像来执行 lint 操作
lint:
  stage: validate
  image: openfaas/faas-cli:latest-root
  script:
    - faas-cli lint -f ${STACK_FILE}
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"

# 构建 Docker 镜像并推送到 GitLab 容器仓库
build:
  stage: build
  # 使用 Docker-in-Docker 服务来构建镜像
  image: docker:20.10.16
  services:
    - docker:20.10.16-dind
  before_script:
    # 登录到 GitLab 容器仓库
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
  script:
    # 构建镜像,使用我们自定义的 Dockerfile
    - docker build -t ${IMAGE_TAG} -f dask-monte-carlo/Dockerfile ./dask-monte-carlo
    # 推送镜像
    - docker push ${IMAGE_TAG}
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH

# 部署到预发环境
deploy_staging:
  stage: deploy_staging
  image: openfaas/faas-cli:latest-root
  script:
    # 通过管道将密码传递给 faas-cli login,避免密码出现在日志中
    - echo -n "$OPENFAAS_STAGING_PASSWORD" | faas-cli login --username admin --password-stdin -g $OPENFAAS_STAGING_URL
    # 部署函数。DASK_SCHEDULER_ADDRESS 是通过 --env 传递的
    # 它会覆盖 stack.yml 中的同名环境变量
    - >
      faas-cli deploy
      -f ${STACK_FILE}
      --image ${IMAGE_TAG}
      --gateway $OPENFAAS_STAGING_URL
      --env DASK_SCHEDULER_ADDRESS=${DASK_SCHEDULER_STAGING}
  environment:
    name: staging
    url: $OPENFAAS_STAGING_URL
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH

# 手动触发部署到生产环境
deploy_production:
  stage: deploy_production
  image: openfaas/faas-cli:latest-root
  script:
    - echo -n "$OPENFAAS_PROD_PASSWORD" | faas-cli login --username admin --password-stdin -g $OPENFAAS_PROD_URL
    - >
      faas-cli deploy
      -f ${STACK_FILE}
      --image ${IMAGE_TAG}
      --gateway $OPENFAAS_PROD_URL
      --env DASK_SCHEDULER_ADDRESS=${DASK_SCHEDULER_PRODUCTION}
  environment:
    name: production
    url: $OPENFAAS_PROD_URL
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
      when: manual # 关键:此作业需要手动点击才能运行

第四步:配置 OpenFaaS 函数栈

最后,我们需要一个 stack.yml 文件来描述我们的函数。这个文件被 faas-cli 用来理解函数的元数据、资源需求和构建上下文。

这里的关键点是通过 environment 块来声明函数运行时所需的环境变量,并为它们提供一个默认值(虽然在 CI/CD 流程中会被覆盖)。同时,明确声明 requestslimits 是一个好习惯,可以帮助 Kubernetes 调度器更好地管理资源。

# stack.yml
version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  dask-monte-carlo:
    lang: dockerfile
    handler: ./dask-monte-carlo # 构建上下文目录
    image: ${CI_REGISTRY_IMAGE}/dask-monte-carlo:${CI_COMMIT_SHORT_SHA} # 镜像名称,会被 CI 覆盖
    environment:
      # 提供一个默认或空的调度器地址
      DASK_SCHEDULER_ADDRESS: "tcp://dask-scheduler.dask:8786"
      # OpenFaaS 的写调试日志开关
      write_debug: "false"
    labels:
      com.openfaas.scale.min: "1"
      com.openfaas.scale.max: "10"
      com.openfaas.scale.zero: "true" # 允许缩容到0
      com.openfaas.scale.zero-duration: "10m" # 10分钟无流量则缩容到0
    limits:
      memory: "512Mi"
      cpu: "500m"
    requests:
      memory: "256Mi"
      cpu: "100m"

通过这一整套流程,我们成功地将一个复杂、依赖繁重的 Dask 计算任务,转化成了一个遵循 DevOps 最佳实践、可自动部署和伸缩的 Serverless 函数。数据科学家现在只需要关注 handler.py 中的算法逻辑和 requirements.txt 中的依赖,每一次代码合并都会被安全、可靠地发布到生产环境。

当前方案的局限性与未来迭代方向

尽管此方案解决了我们眼前的核心痛点,但它并非完美。首先,OpenFaaS 函数的冷启动问题对于需要低延迟响应的场景仍然是一个挑战。我们当前的函数镜像虽然经过优化,但启动一个包含 Dask 客户端的 Python 进程仍需数秒。因此,该架构更适用于可接受一定延迟的异步调用或非实时性请求。

其次,Dask 集群本身的管理是独立于此流水线之外的。一个更完整的平台工程方案应该将 Dask 集群的生命周期也纳入 IaC(基础设施即代码)管理,例如使用 Terraform 或 Kubernetes Operator,并通过 GitLab CI/CD 来自动化 Dask 集群的创建、更新和销毁。

最后,我们目前的“函数即客户端”模式,虽然有效隔离了计算和调度,但也意味着函数对外部 Dask 集群有强依赖。未来的探索方向之一是,根据任务的复杂性动态选择执行后端:对于轻量级计算,可以直接在函数容器内使用 Dask 的 LocalCluster;对于重量级计算,则路由到共享的 Dask 集群。这需要更智能的路由逻辑和函数设计,是平台演进的下一个目标。


  目录