Our team's machine learning model iteration process faces a classic "black box" dilemma. Models run on Kubeflow Pipelines, triggering is manual, and CI/CD amounts to CircleCI building a Docker image. We can ship models, but we cannot answer the key questions: How efficient is our delivery? How long does it actually take from code commit to model validation in staging? Is our deployment frequency once a week or three times a day? When a model training job fails, is it transient flakiness or a systemic problem?
Without data there is no improvement. In software engineering we have long measured delivery performance with DORA (DevOps Research and Assessment) metrics, but in MLOps this practice is rarely applied. The difficulty is that MLOps pipelines are more complex than traditional software delivery: they include data preprocessing, model training, validation, and deployment, multiple long-running stages with complex state.
Our goal: build an automated DORA metrics system for a Kubeflow-based MLOps process. The stack is straightforward: CircleCI as the CI/CD executor, Kubeflow as the ML platform, Prometheus as the metrics store, and Grafana as the visualization front end. The challenge is wiring these decoupled systems together, capturing the key events, and computing meaningful metrics.
Architecture and Data Flow
The core idea is a centralized event transformer, which we named `cicd-telemetry-service`. It receives webhook events from the various systems (mainly CircleCI), converts the unstructured event data into structured Prometheus metrics, and exposes them for the Prometheus server to scrape.
```mermaid
graph TD
    subgraph Git Repository
        A[Developer pushes commit] --> B{GitHub};
    end
    subgraph CircleCI Cloud
        B -- Webhook --> C[CircleCI Workflow];
        C -- Job Steps --> D{Build Docker Image};
        C -- Job Steps --> E{Trigger Kubeflow Pipeline};
        C -- Webhook --> F[cicd-telemetry-service];
    end
    subgraph Kubernetes Cluster
        G[Prometheus Operator] -- Scrapes --> F;
        F -- Exposes /metrics --> G;
        E -- Runs on --> H[Kubeflow Pipelines];
        I[Grafana] -- Queries --> J[Prometheus];
        G -- Manages --> J;
    end
    subgraph Users
        K[ML Engineer / SRE] --> I;
    end
    style F fill:#f9f,stroke:#333,stroke-width:2px
```
The data flow:
- A developer pushes a commit to GitHub.
- GitHub triggers the CircleCI workflow.
- The workflow builds, tests, and finally triggers the Kubeflow Pipeline.
- At key points in the workflow (workflow start, deploy job start, deploy job end), a `curl` call posts an event to the `cicd-telemetry-service` API.
- `cicd-telemetry-service` turns each received event (e.g. "deployment_finished") into a Prometheus metric update (e.g. incrementing the `mlops_deployment_finished_total` counter).
- Prometheus periodically scrapes the service's `/metrics` endpoint.
- Grafana queries Prometheus with PromQL to compute and display the DORA metrics.
Implementing the Core Component: cicd-telemetry-service
This service is the glue of the whole system. We chose Go: it compiles to a static binary with a small footprint, which makes it a good fit for a small auxiliary microservice in a cloud-native environment.
The subtle trap here is coupling: we do not want this service bound to any particular CI/CD tool, so it accepts a generic, normalized event JSON structure rather than CircleCI's native webhook format. The translation happens inside CircleCI's `config.yml`, in the arguments passed to `curl`.
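Concretely, every caller posts the same normalized body regardless of which CI system it originates from. The field names mirror the `GenericEvent` struct defined later in `handler.go`; the values here are purely illustrative:

```json
{
  "event_type": "deployment_finished",
  "service": "model-forecasting-A",
  "env": "staging",
  "status": "success",
  "commit_sha": "8f3a1c2",
  "commit_timestamp": 1714550400,
  "job_duration_sec": 314.0
}
```

Any future GitLab CI or Jenkins integration only needs to emit this same shape; the service itself never changes.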
`main.go`: service entry point and routing
package main
import (
"log"
"net/http"
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// main function to setup router and start server
func main() {
// Register Prometheus metrics
prometheus.MustRegister(deploymentCounter)
prometheus.MustRegister(leadTimeHistogram)
prometheus.MustRegister(deploymentDurationHistogram)
// Setup HTTP server
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/event", eventHandler)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
log.Printf("Starting cicd-telemetry-service on port %s", port)
if err := http.ListenAndServe(":"+port, nil); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
`metrics.go`: defining the DORA metrics
In a real project, metric definitions matter enormously. We need precise labels on each metric so that Grafana can later slice across dimensions: by service, by environment, by trigger. One caveat: a label like `commit_sha` has essentially unbounded cardinality, which can bloat Prometheus over time; it is only acceptable while deployment volume stays modest.
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
// deploymentCounter: Tracks the total number of deployments.
// Used for calculating Deployment Frequency and Change Failure Rate.
deploymentCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "mlops",
Name: "deployment_finished_total",
Help: "Total number of finished MLOps deployments.",
},
[]string{"service", "env", "status", "commit_sha"}, // status can be "success" or "failure"
)
// leadTimeHistogram: Measures the time from first commit to deployment.
// This is a core DORA metric. The buckets should be tuned based on typical lead times.
leadTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "mlops",
Name: "lead_time_for_change_seconds",
Help: "Lead time for change from commit to production deployment.",
// Buckets in seconds: 15min, 30min, 1h, 2h, 4h, 8h, 24h
Buckets: []float64{900, 1800, 3600, 7200, 14400, 28800, 86400},
},
[]string{"service", "env"},
)
// deploymentDurationHistogram: Measures the duration of the deployment job itself.
// Useful for identifying bottlenecks in the CI/CD process.
deploymentDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "mlops",
Name: "deployment_duration_seconds",
Help: "Duration of the MLOps deployment job.",
// Buckets in seconds: 1min, 5min, 10min, 20min, 30min
Buckets: []float64{60, 300, 600, 1200, 1800},
},
[]string{"service", "env", "status"},
)
)
`handler.go`: event handling logic
This handler is where the core logic lives. It parses the incoming JSON, validates it, and updates the corresponding Prometheus metrics. A common mistake is to do expensive work directly in the handler; here we only update in-memory metrics, and the Prometheus client library takes care of concurrency safety.
package main
import (
"encoding/json"
"log"
"net/http"
"time"
)
// GenericEvent represents the standardized event structure we expect.
type GenericEvent struct {
EventType string `json:"event_type"` // e.g., "deployment_finished"
Service string `json:"service"`
Environment string `json:"env"`
Status string `json:"status"` // "success", "failure"
CommitSHA string `json:"commit_sha"`
CommitTimestamp int64 `json:"commit_timestamp"` // Unix timestamp
JobDurationSec float64 `json:"job_duration_sec"`
}
// eventHandler parses incoming events and updates metrics.
func eventHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is accepted", http.StatusMethodNotAllowed)
return
}
var event GenericEvent
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&event); err != nil {
log.Printf("Error decoding event payload: %v", err)
http.Error(w, "Invalid payload", http.StatusBadRequest)
return
}
// Basic validation
if event.Service == "" || event.Environment == "" || event.Status == "" {
log.Printf("Missing required fields in event: %+v", event)
http.Error(w, "Missing required fields: service, env, status", http.StatusBadRequest)
return
}
log.Printf("Received event: %+v", event)
// Process the event based on its type
switch event.EventType {
case "deployment_finished":
// Increment deployment counter
deploymentCounter.WithLabelValues(event.Service, event.Environment, event.Status, event.CommitSHA).Inc()
// Record deployment duration if available
if event.JobDurationSec > 0 {
deploymentDurationHistogram.WithLabelValues(event.Service, event.Environment, event.Status).Observe(event.JobDurationSec)
}
// Record lead time for change if this was a successful deployment and commit timestamp is provided
if event.Status == "success" && event.CommitTimestamp > 0 {
commitTime := time.Unix(event.CommitTimestamp, 0)
leadTime := time.Since(commitTime).Seconds()
leadTimeHistogram.WithLabelValues(event.Service, event.Environment).Observe(leadTime)
}
default:
log.Printf("Unknown event type: %s", event.EventType)
// We don't return an error here, just log it. The service should be resilient to unknown event types.
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Event processed"))
}
Deploying to Kubernetes
Deploying the service is standard Kubernetes work. The key pieces are a `Service` to expose the port, plus either a `ServiceMonitor` (if you run the Prometheus Operator) or scrape annotations on the `Deployment` so that Prometheus can discover it.
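The image referenced by the Deployment manifest (`your-repo/cicd-telemetry-service:v1.0.2`) can be produced with a small multi-stage build. A sketch, assuming the repository has a `go.mod`/`go.sum` at its root; the Go version and base images are my choices, not prescribed by the project:

```dockerfile
# Build stage: compile a fully static binary (CGO disabled).
FROM golang:1.21 AS build
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /cicd-telemetry-service .

# Runtime stage: minimal distroless image, no shell, small attack surface.
FROM gcr.io/distroless/static-debian12
COPY --from=build /cicd-telemetry-service /cicd-telemetry-service
EXPOSE 8080
ENTRYPOINT ["/cicd-telemetry-service"]
```

The static binary is exactly why Go was a good choice here: the final image is a few megabytes and starts instantly.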
`deployment.yaml`
apiVersion: apps/v1
kind: Deployment
metadata:
name: cicd-telemetry-service
namespace: observability
labels:
app: cicd-telemetry-service
spec:
replicas: 2 # Production should have at least 2 replicas for HA
selector:
matchLabels:
app: cicd-telemetry-service
template:
metadata:
labels:
app: cicd-telemetry-service
annotations:
prometheus.io/scrape: 'true'
prometheus.io/port: '8080'
prometheus.io/path: '/metrics'
spec:
containers:
- name: server
image: your-repo/cicd-telemetry-service:v1.0.2
ports:
- containerPort: 8080
name: http
resources:
requests:
cpu: 50m
memory: 64Mi
limits:
cpu: 100m
memory: 128Mi
env:
- name: PORT
value: "8080"
---
apiVersion: v1
kind: Service
metadata:
name: cicd-telemetry-service
namespace: observability
spec:
selector:
app: cicd-telemetry-service
ports:
- port: 80
targetPort: 8080
protocol: TCP
name: http
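If the cluster runs the Prometheus Operator, the scrape annotations can be replaced by a `ServiceMonitor`. A sketch; the `release: prometheus` label is an assumption that must match your operator's `serviceMonitorSelector`, and the `port` name refers to the named port `http` on the Service above:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: cicd-telemetry-service
  namespace: observability
  labels:
    release: prometheus  # must match the operator's serviceMonitorSelector
spec:
  selector:
    matchLabels:
      app: cicd-telemetry-service
  endpoints:
    - port: http
      path: /metrics
      interval: 30s
```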
CircleCI Workflow Configuration
This is where theory meets practice. We modify `.circleci/config.yml` to call `cicd-telemetry-service` at the key steps of the flow.
A common mistake is to emit an event at the end of every job, which produces redundant, hard-to-analyze data. The right approach is to send a single `deployment_finished` event, aggregating all the relevant information, when the job that represents a "deployment", the logical end of the workflow, completes.
version: 2.1
orbs:
kfp: kfc-ci/kubeflow-[email protected]
commands:
# Command to notify our telemetry service
notify_telemetry:
    parameters:
      event_type:
        type: string
      status:
        type: string
      notify_when:
        type: enum
        enum: ["always", "on_success", "on_fail"]
        default: "on_success"
    steps:
      - run:
          name: Notify Telemetry Service
          when: << parameters.notify_when >> # "on_fail" lets the same command report failures
command: |
# Fetch the timestamp of the commit that triggered this workflow
COMMIT_TIMESTAMP=$(git log -1 --pretty=%ct ${CIRCLE_SHA1})
# Use CIRCLE_JOB environment variable to determine job status for failure reporting
JOB_STATUS="<< parameters.status >>"
if [ "$JOB_STATUS" == "failure" ] && [ "$CIRCLE_JOB" != "deploy_model_to_staging" ]; then
# This isn't the final deployment job, so we don't report failure yet
exit 0
fi
            # Calculate job duration. CircleCI does not export a build start
            # time variable; record one yourself in an earlier step, e.g.
            #   echo "export JOB_START_TIME=$(date +%s)" >> "$BASH_ENV"
            # (JOB_START_TIME is our own convention). If it is missing, the
            # duration falls back to 0. For precise numbers, query the CircleCI API.
            JOB_DURATION=$(( $(date +%s) - ${JOB_START_TIME:-$(date +%s)} ))
# Construct JSON payload
JSON_PAYLOAD=$(cat <<EOF
{
"event_type": "<< parameters.event_type >>",
"service": "model-forecasting-A",
"env": "staging",
"status": "${JOB_STATUS}",
"commit_sha": "${CIRCLE_SHA1}",
"commit_timestamp": ${COMMIT_TIMESTAMP},
"job_duration_sec": ${JOB_DURATION}
}
EOF
)
# Send the event. TELEMETRY_SERVICE_URL is a CircleCI context variable.
curl -X POST -H "Content-Type: application/json" \
-d "${JSON_PAYLOAD}" \
${TELEMETRY_SERVICE_URL}/event
workflows:
build_and_deploy_model:
jobs:
- build_and_push_image:
filters:
branches:
only:
- main
- deploy_model_to_staging:
requires:
- build_and_push_image
context:
- kubernetes-creds
- telemetry-service-secrets
jobs:
build_and_push_image:
docker:
- image: cimg/python:3.9-build
steps:
- checkout
# ... steps to build docker image and push to ECR/GCR ...
- run: echo "Image build complete."
deploy_model_to_staging:
docker:
- image: cimg/base:2022.01
environment:
# These should come from context or project environment variables
KUBEFLOW_ENDPOINT: "https://<your-kubeflow-url>"
steps:
- checkout
- kfp/install-kfp-sdk
- run:
name: Compile and upload Kubeflow pipeline
command: |
# This is where your actual pipeline logic resides
# It compiles a Python DSL file and triggers a run on Kubeflow
python3 pipelines/my_model_pipeline.py --endpoint $KUBEFLOW_ENDPOINT --commit_sha $CIRCLE_SHA1
# This is the crucial part: notify telemetry on success or failure
      - notify_telemetry:
          event_type: "deployment_finished"
          status: "success"
          notify_when: on_success
      - notify_telemetry:
          event_type: "deployment_finished"
          status: "failure"
          notify_when: on_fail
The essence of this configuration is the `notify_telemetry` command combined with `on_fail`. Whether the `deploy_model_to_staging` job succeeds or fails, we capture a `deployment_finished` event; only the `status` label differs. This is what makes the change failure rate computable.
Grafana Dashboards and PromQL Queries
The last step is visualization. We create a Grafana dashboard with four panels, one per DORA metric.
1. Deployment Frequency
- Type: Stat / Time series
- PromQL:
sum(rate(mlops_deployment_finished_total{service="$service", env="$env", status="success"}[$__rate_interval]))
- Interpretation: this computes the rate of successful deployments over the selected window. `rate()` handles counter resets and yields a smooth, normalized "deployments per second" value, which Grafana can easily rescale to deployments per day or per week.
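For a Stat panel that should show an absolute "deployments per day" number, `increase()` over a fixed window is often easier to read than a rescaled rate. A sketch using the same labels as above:

```promql
sum(increase(mlops_deployment_finished_total{service="$service", env="$env", status="success"}[1d]))
```

Note that `increase()` extrapolates between scrape samples, so low-volume counts may come out as non-integers; for a daily summary panel that is usually acceptable.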
2. Lead Time for Changes
- Type: Histogram / Heatmap
- PromQL:
histogram_quantile(0.95, sum(rate(mlops_lead_time_for_change_seconds_bucket{service="$service", env="$env"}[5m])) by (le))
- Interpretation: one of the trickier queries. `histogram_quantile` estimates the P95 from the histogram buckets, meaning 95% of changes have a lead time below this value. Watching the P95 rather than the mean reflects the realistic worst case and is not skewed by a few unusually fast or slow deployments.
3. Change Failure Rate
- Type: Gauge / Stat
- PromQL:
( sum(increase(mlops_deployment_finished_total{service="$service", env="$env", status="failure"}[$__range])) / sum(increase(mlops_deployment_finished_total{service="$service", env="$env"}[$__range])) ) * 100
- Interpretation: the query is direct: the increase in failed deployments divided by the increase in all deployments. We use `increase()` rather than `rate()` because it maps more naturally onto counting discrete events within the dashboard's time range.
4. Time to Restore Service
- Note: this is the hardest metric to automate, because it requires defining what counts as an "outage" and what counts as "restored". In our MLOps context it can be simplified to "the time from a failed deployment to the next successful deployment". Computing that purely in PromQL is awkward, typically involving `timestamp()` and vector matching, or deriving incidents from an alerting system such as Alertmanager. A pragmatic first version can be recorded manually, or approximated by analyzing the sequence of deployment events.
Limitations and Future Iterations
This system gives our MLOps process visibility it never had before, but it is not perfect.
First, `cicd-telemetry-service` is a homegrown component. Simple as it is, it carries maintenance cost and is a potential single point of failure; in production it must be run highly available.
Second, our "lead time for changes" currently runs from "code commit" to "deploy job finished". In MLOps, a more faithful measure might start at "experiment idea" or "Jira ticket created" and end at "model delivering business value". That requires integration with project-management tooling and goes beyond pure CI/CD.
Finally, our definition of "failure" is still shallow: it only covers CircleCI job success or failure. A successfully deployed model can still fail in business terms through degraded performance or biased predictions. Closing the MLOps observability loop means feeding online model performance data (prediction accuracy, latency) back and correlating it with deployment events. That calls for a model monitoring system (e.g. Seldon Core Analytics, Arize AI) whose alerts become another source of "change failure".