Building DORA Metrics Observability for an MLOps Pipeline with CircleCI, Kubeflow, and Grafana


Our team's machine learning model iteration process faces a classic "black box" dilemma. Models run on Kubeflow Pipelines, triggering is manual, and the CI/CD process is limited to CircleCI building a Docker image. We can ship models, but we cannot answer the key questions: How efficient is our delivery? How long does it actually take from a code commit to a model being validated in pre-production? Is our deployment frequency once a week or three times a day? When a model training job fails, is it a transient blip or a systemic problem?

Without data, there is no improvement. In software engineering we have long been used to measuring delivery performance with DORA (DevOps Research and Assessment) metrics, yet in MLOps this practice has rarely taken hold. The difficulty is that MLOps pipelines are more complex than traditional software: they involve multiple long-running, stateful stages such as data preprocessing, model training, validation, and deployment.

Our goal: build an automated DORA metrics measurement system for our Kubeflow-based MLOps process. The stack is clear-cut: CircleCI as the CI/CD executor, Kubeflow as the ML platform, Prometheus for metric storage, and Grafana as the visualization front end. The challenge is stitching these decoupled systems together, capturing the key events, and computing meaningful metrics from them.

Architecture and Data Flow

The core idea is a centralized event translator, which we named cicd-telemetry-service. The service receives webhook events from different systems (primarily CircleCI), converts the unstructured event data into structured Prometheus metrics, and exposes them for the Prometheus server to scrape.

graph TD
    subgraph Git Repository
        A[Developer pushes commit] --> B{GitHub};
    end

    subgraph CircleCI Cloud
        B -- Webhook --> C[CircleCI Workflow];
        C -- Job Steps --> D{Build Docker Image};
        C -- Job Steps --> E{Trigger Kubeflow Pipeline};
        C -- Webhook --> F[cicd-telemetry-service];
    end

    subgraph Kubernetes Cluster
        G[Prometheus Operator] -- Manages --> J[Prometheus];
        J -- Scrapes /metrics --> F;
        E -- Runs on --> H[Kubeflow Pipelines];
        I[Grafana] -- Queries --> J;
    end

    subgraph Users
        K[ML Engineer / SRE] --> I;
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px

The data flow:

  1. A developer pushes a commit to GitHub.
  2. GitHub triggers a CircleCI workflow.
  3. The CircleCI workflow builds, tests, and finally triggers the Kubeflow Pipeline.
  4. At key points in the CircleCI workflow (workflow start, deployment job start, deployment job end), curl calls the cicd-telemetry-service API to report events.
  5. cicd-telemetry-service converts each received event (e.g., "deployment_finished") into Prometheus metrics (e.g., incrementing the mlops_deployment_finished_total counter).
  6. Prometheus periodically scrapes the /metrics endpoint of cicd-telemetry-service.
  7. Grafana queries Prometheus with PromQL to compute and display the DORA metrics.

Core Component Implementation: cicd-telemetry-service

This service is the glue of the whole system. We implemented it in Go: it compiles statically and has a small footprint, which makes it a good fit for a small helper microservice in a cloud-native environment.

The pitfall to avoid here is coupling the service to any particular CI/CD tool. It should therefore accept a generic, standardized event JSON structure rather than CircleCI's native webhook format; the translation happens in CircleCI's config.yml via the arguments passed to curl.
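
As a concrete example, a deployment_finished event in this standardized format might look like the following (field values are illustrative):

{
  "event_type": "deployment_finished",
  "service": "model-forecasting-A",
  "env": "staging",
  "status": "success",
  "commit_sha": "9f8e7d6c",
  "commit_timestamp": 1700000000,
  "job_duration_sec": 412.5
}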

main.go - service entry point and routing

package main

import (
	"log"
	"net/http"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// main function to setup router and start server
func main() {
	// Register Prometheus metrics
	prometheus.MustRegister(deploymentCounter)
	prometheus.MustRegister(leadTimeHistogram)
	prometheus.MustRegister(deploymentDurationHistogram)

	// Setup HTTP server
	http.Handle("/metrics", promhttp.Handler())
	http.HandleFunc("/event", eventHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	log.Printf("Starting cicd-telemetry-service on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

metrics.go - defining the DORA metrics
In a real project, metric definitions matter enormously. We need to define each metric's labels precisely so that we can later slice the data in Grafana along multiple dimensions, for example by service, by environment, or by trigger. One caveat: the commit_sha label creates a new time series per deployment, so its cardinality grows without bound; that is tolerable at a low deployment volume but worth dropping or truncating at scale.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

var (
	// deploymentCounter: Tracks the total number of deployments.
	// Used for calculating Deployment Frequency and Change Failure Rate.
	deploymentCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "mlops",
			Name:      "deployment_finished_total",
			Help:      "Total number of finished MLOps deployments.",
		},
		[]string{"service", "env", "status", "commit_sha"}, // status can be "success" or "failure"
	)

	// leadTimeHistogram: Measures the time from first commit to deployment.
	// This is a core DORA metric. The buckets should be tuned based on typical lead times.
	leadTimeHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "mlops",
			Name:      "lead_time_for_change_seconds",
			Help:      "Lead time for change from commit to production deployment.",
			// Buckets in seconds: 15min, 30min, 1h, 2h, 4h, 8h, 24h
			Buckets: []float64{900, 1800, 3600, 7200, 14400, 28800, 86400},
		},
		[]string{"service", "env"},
	)

	// deploymentDurationHistogram: Measures the duration of the deployment job itself.
	// Useful for identifying bottlenecks in the CI/CD process.
	deploymentDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "mlops",
			Name:      "deployment_duration_seconds",
			Help:      "Duration of the MLOps deployment job.",
			// Buckets in seconds: 1min, 5min, 10min, 20min, 30min
			Buckets: []float64{60, 300, 600, 1200, 1800},
		},
		[]string{"service", "env", "status"},
	)
)

handler.go - event handling logic
This handler is where the core logic lives. It parses the incoming JSON, validates it, and then updates the corresponding Prometheus metrics. A common mistake is to perform expensive work directly in the handler; here we only update in-memory metrics, and the Prometheus client library takes care of concurrency safety.

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"time"
)

// GenericEvent represents the standardized event structure we expect.
type GenericEvent struct {
	EventType       string  `json:"event_type"` // e.g., "deployment_finished"
	Service         string  `json:"service"`
	Environment     string  `json:"env"`
	Status          string  `json:"status"` // "success", "failure"
	CommitSHA       string  `json:"commit_sha"`
	CommitTimestamp int64   `json:"commit_timestamp"` // Unix timestamp
	JobDurationSec  float64 `json:"job_duration_sec"`

// eventHandler parses incoming events and updates metrics.
func eventHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is accepted", http.StatusMethodNotAllowed)
		return
	}

	var event GenericEvent
	decoder := json.NewDecoder(r.Body)
	if err := decoder.Decode(&event); err != nil {
		log.Printf("Error decoding event payload: %v", err)
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}

	// Basic validation
	if event.Service == "" || event.Environment == "" || event.Status == "" {
		log.Printf("Missing required fields in event: %+v", event)
		http.Error(w, "Missing required fields: service, env, status", http.StatusBadRequest)
		return
	}
	
	log.Printf("Received event: %+v", event)

	// Process the event based on its type
	switch event.EventType {
	case "deployment_finished":
		// Increment deployment counter
		deploymentCounter.WithLabelValues(event.Service, event.Environment, event.Status, event.CommitSHA).Inc()

		// Record deployment duration if available
		if event.JobDurationSec > 0 {
			deploymentDurationHistogram.WithLabelValues(event.Service, event.Environment, event.Status).Observe(event.JobDurationSec)
		}

		// Record lead time for change if this was a successful deployment and commit timestamp is provided
		if event.Status == "success" && event.CommitTimestamp > 0 {
			commitTime := time.Unix(event.CommitTimestamp, 0)
			leadTime := time.Since(commitTime).Seconds()
			leadTimeHistogram.WithLabelValues(event.Service, event.Environment).Observe(leadTime)
		}

	default:
		log.Printf("Unknown event type: %s", event.EventType)
		// We don't return an error here, just log it. The service should be resilient to unknown event types.
	}

	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Event processed"))
}
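
Before wiring this into CircleCI, a quick local smoke test confirms the endpoint and the resulting metric (assuming the service is running on localhost:8080):

# Post a synthetic deployment event
curl -X POST -H "Content-Type: application/json" \
     -d '{"event_type":"deployment_finished","service":"model-forecasting-A","env":"staging","status":"success","commit_sha":"9f8e7d6c","commit_timestamp":1700000000,"job_duration_sec":412.5}' \
     http://localhost:8080/event

# Verify that the counter was incremented
curl -s http://localhost:8080/metrics | grep mlops_deployment_finished_total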

Deploying to Kubernetes

Deploying the service is standard Kubernetes work. The key pieces are a Service to expose the port, and either a ServiceMonitor (if you use the Prometheus Operator) or scrape annotations on the Deployment so that Prometheus can discover it; both approaches are shown below.

deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cicd-telemetry-service
  namespace: observability
  labels:
    app: cicd-telemetry-service
spec:
  replicas: 2 # Production should have at least 2 replicas for HA
  selector:
    matchLabels:
      app: cicd-telemetry-service
  template:
    metadata:
      labels:
        app: cicd-telemetry-service
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '8080'
        prometheus.io/path: '/metrics'
    spec:
      containers:
      - name: server
        image: your-repo/cicd-telemetry-service:v1.0.2
        ports:
        - containerPort: 8080
          name: http
        resources:
          requests:
            cpu: 50m
            memory: 64Mi
          limits:
            cpu: 100m
            memory: 128Mi
        env:
        - name: PORT
          value: "8080"
---
apiVersion: v1
kind: Service
metadata:
  name: cicd-telemetry-service
  namespace: observability
  labels:
    app: cicd-telemetry-service # allows selection by a ServiceMonitor (see below)
spec:
  selector:
    app: cicd-telemetry-service
  ports:
  - port: 80
    targetPort: 8080
    protocol: TCP
    name: http
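
For clusters running the Prometheus Operator, a ServiceMonitor replaces the scrape annotations above. A minimal sketch follows; the release label is an assumption and must match whatever serviceMonitorSelector your Prometheus instance is configured with:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: cicd-telemetry-service
  namespace: observability
  labels:
    release: prometheus # assumption: must match your Prometheus serviceMonitorSelector
spec:
  selector:
    matchLabels:
      app: cicd-telemetry-service # selects the Service by its labels
  endpoints:
  - port: http # the named port on the Service above
    path: /metrics
    interval: 30s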

CircleCI Workflow Configuration

This is where theory meets practice. We modify .circleci/config.yml to insert calls to cicd-telemetry-service at the key steps of the flow.

A common mistake is to send an event at the end of every job, which produces redundant data that is hard to analyze. The right approach is to send a single deployment_finished event, aggregating all the relevant information, once the job that represents a "deployment" (the logical end of the workflow) completes.

version: 2.1

orbs:
  kfp: kfc-ci/kubeflow-[email protected]

commands:
  # Command to notify our telemetry service
  notify_telemetry:
    parameters:
      event_type:
        type: string
      status:
        type: string
      condition:
        type: enum
        enum: ["always", "on_success", "on_fail"]
        default: "on_success"
    steps:
      - run:
          name: Notify Telemetry Service
          when: << parameters.condition >> # Runs on success by default; pass "on_fail" to report failures
          command: |
            # Fetch the timestamp of the commit that triggered this workflow
            COMMIT_TIMESTAMP=$(git log -1 --pretty=%ct ${CIRCLE_SHA1})
            
            # Use CIRCLE_JOB environment variable to determine job status for failure reporting
            JOB_STATUS="<< parameters.status >>"
            if [ "$JOB_STATUS" == "failure" ] && [ "$CIRCLE_JOB" != "deploy_model_to_staging" ]; then
              # This isn't the final deployment job, so we don't report failure yet
              exit 0
            fi

            # Calculate the job duration.
            # NOTE: CIRCLE_BUILD_START_TIME is not a CircleCI built-in variable; it is
            # exported to $BASH_ENV by the first step of the deploy job below.
            # For precise timings, query the CircleCI API instead.
            JOB_DURATION=$(( $(date +%s) - ${CIRCLE_BUILD_START_TIME:-$(date +%s)} ))

            # Construct JSON payload
            JSON_PAYLOAD=$(cat <<EOF
            {
              "event_type": "<< parameters.event_type >>",
              "service": "model-forecasting-A",
              "env": "staging",
              "status": "${JOB_STATUS}",
              "commit_sha": "${CIRCLE_SHA1}",
              "commit_timestamp": ${COMMIT_TIMESTAMP},
              "job_duration_sec": ${JOB_DURATION}
            }
            EOF
            )

            # Send the event. TELEMETRY_SERVICE_URL is a CircleCI context variable.
            curl -X POST -H "Content-Type: application/json" \
                 -d "${JSON_PAYLOAD}" \
                 ${TELEMETRY_SERVICE_URL}/event

workflows:
  build_and_deploy_model:
    jobs:
      - build_and_push_image:
          filters:
            branches:
              only:
                - main
      - deploy_model_to_staging:
          requires:
            - build_and_push_image
          context:
            - kubernetes-creds
            - telemetry-service-secrets

jobs:
  build_and_push_image:
    docker:
      - image: cimg/python:3.9
    steps:
      - checkout
      # ... steps to build docker image and push to ECR/GCR ...
      - run: echo "Image build complete."

  deploy_model_to_staging:
    docker:
      - image: cimg/base:2022.01
    environment:
      # These should come from context or project environment variables
      KUBEFLOW_ENDPOINT: "https://<your-kubeflow-url>"
    steps:
      - checkout
      - run:
          name: Record job start time
          command: echo "export CIRCLE_BUILD_START_TIME=$(date +%s)" >> $BASH_ENV
      - kfp/install-kfp-sdk
      - run:
          name: Compile and upload Kubeflow pipeline
          command: |
            # This is where your actual pipeline logic resides
            # It compiles a Python DSL file and triggers a run on Kubeflow
            python3 pipelines/my_model_pipeline.py --endpoint $KUBEFLOW_ENDPOINT --commit_sha $CIRCLE_SHA1
      # This is the crucial part: notify telemetry on success or failure
      - notify_telemetry:
          event_type: "deployment_finished"
          status: "success"
      - notify_telemetry:
          condition: "on_fail"
          event_type: "deployment_finished"
          status: "failure"

The essence of this configuration is the notify_telemetry command and its condition parameter. Whether deploy_model_to_staging succeeds or fails, we capture a deployment_finished event; only the status label differs. This is critical for computing the Change Failure Rate.

Grafana Dashboards and PromQL Queries

The last step is visualization. We create a new Grafana dashboard with four panels, one for each of the four core DORA metrics.

1. Deployment Frequency

  • Type: Stat / Time series
  • PromQL: sum(rate(mlops_deployment_finished_total{service="$service", env="$env", status="success"}[$__rate_interval]))
  • Interpretation: this computes the rate of successful deployments over the selected time range. rate() handles counter resets and yields a smooth, normalized "deployments per second" value, which Grafana can easily rescale to deployments per day or per week; a discrete daily variant is sketched just below.
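
If a discrete count is preferable to a rate, an increase()-based variant over the same metric and labels (a sketch) gives deployments per day directly:

sum(increase(mlops_deployment_finished_total{service="$service", env="$env", status="success"}[1d]))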

2. Lead Time for Changes

  • Type: Histogram / Heatmap
  • PromQL: histogram_quantile(0.95, sum(rate(mlops_lead_time_for_change_seconds_bucket{service="$service", env="$env"}[5m])) by (le))
  • Interpretation: this is one of the trickier metrics. histogram_quantile computes the P95 of the histogram data, meaning "95% of changes have a lead time below this value." Monitoring P95 reflects the worst-case experience better than the mean, which a handful of unusually fast or slow deployments can skew; a mean-based variant for comparison follows below.
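
For comparison with P95, the mean lead time can be derived from the histogram's generated _sum and _count series; a minimal sketch using the same labels:

sum(rate(mlops_lead_time_for_change_seconds_sum{service="$service", env="$env"}[$__rate_interval]))
/
sum(rate(mlops_lead_time_for_change_seconds_count{service="$service", env="$env"}[$__rate_interval]))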

3. Change Failure Rate

  • Type: Gauge / Stat
  • PromQL:
    (
      sum(increase(mlops_deployment_finished_total{service="$service", env="$env", status="failure"}[$__range]))
      /
      sum(increase(mlops_deployment_finished_total{service="$service", env="$env"}[$__range]))
    ) * 100
  • Interpretation: the query is straightforward: the increase in failed deployments divided by the increase in all deployments. We use increase() rather than rate() because it maps more directly onto counting discrete events within the window.

4. Time to Restore Service

  • Note: this is the hardest metric to measure automatically, because you must define what counts as a "service outage" and what counts as "restored." In our MLOps setting it can be simplified to "the time from a failed deployment to the next successful deployment." That requires more involved PromQL, typically using timestamp() and vector matching, or deriving events from an alerting system such as Alertmanager. A simplified first cut can be manual bookkeeping or an approximation from the sequence of deployment events; one such approximation is sketched below.
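
One pragmatic way to approximate this automatically is to extend cicd-telemetry-service with last-deployment timestamp gauges. The sketch below is an assumption, not part of the service shown earlier; the metric name is illustrative:

// Hypothetical addition to metrics.go (remember to MustRegister it in main).
// Records the Unix timestamp of the most recent deployment per service/env/status.
var lastDeploymentTimestamp = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "mlops",
		Name:      "last_deployment_timestamp_seconds",
		Help:      "Unix timestamp of the most recent deployment, by status.",
	},
	[]string{"service", "env", "status"},
)

// Hypothetical one-line addition to eventHandler, next to the counter update:
// lastDeploymentTimestamp.WithLabelValues(event.Service, event.Environment, event.Status).SetToCurrentTime()

With these gauges in place, the expression mlops_last_deployment_timestamp_seconds{status="success"} - ignoring(status) mlops_last_deployment_timestamp_seconds{status="failure"}, filtered to positive values, approximates the time from the last failure to the subsequent recovery.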

Limitations of the Current Approach and Future Iterations

This system gives our MLOps process a level of visibility it never had before, but it is far from perfect.

First, cicd-telemetry-service is a homegrown component. Simple as it is, it still carries maintenance cost and is a potential single point of failure; in production it needs to be run highly available.

Second, "lead time for changes" is currently defined from "code commit" to "deployment job finished." In MLOps, a more accurate measure might run from "experiment idea" or "Jira ticket created" to "model delivering business value." That requires integrating with project management tools, which goes beyond pure CI/CD.

Finally, our definition of "failure" is still shallow: it covers only CircleCI job success or failure. A successfully deployed model can still be a business failure due to poor performance or biased predictions. Closing the MLOps observability loop requires feeding the model's online performance data (prediction accuracy, latency, and so on) back and correlating it with deployment events. That means bringing in a model monitoring system (such as Seldon Core Analytics or Arize AI) and treating its alerts as another source of "change failures."

