团队扩张后,Code Review 流程的效率瓶颈开始显现。Pull Request (PR) 的平均合并时间不断拉长,但我们缺乏数据来定位具体问题:是某些模块的 리뷰가 너무 늦어서인가요? 是某些资深工程师成为了评审瓶颈?还是 CI 流程本身在拖后腿?我们迫切需要一个能够量化、搜索和分析整个研发流程的平台。
最初的构想是把所有 Git Hooks、CI/CD 流水线日志、Jira 事件都推送到现有的 Elasticsearch 集群。这方案看似简单,但在我们尝试了几天后,问题就暴露了:我们的核心需求是聚合分析,而非全文搜索。我们需要频繁地执行类似 GROUP BY
、JOIN
和窗口函数来计算各类周期性指标。在 Elasticsearch 上执行这类复杂聚合,不仅性能不理想,而且 DSL 的编写和维护成本也相当高。我们需要一个更适合 OLAP 场景的工具。
这就是 ClickHouse 进入我们视野的原因。其列式存储结构和为大规模聚合查询设计的引擎,完全契合我们的需求。我们的目标是构建一个系统,它能:
- 接收来自 GitLab/GitHub Webhooks 的结构化事件(PR 创建、评论、合并等)。
- 接收来自 Jenkins/GitLab CI 的流水线日志。
- 将这些数据持久化到 ClickHouse 中。
- 提供一个高效的查询接口,用于即席搜索和 Dashboard 展示。
- 整个基础设施的部署和配置必须通过 Ansible 实现自动化,以保证环境的一致性和可重复性。
数据模型与 ClickHouse 表结构设计
一切始于数据模型。我们需要一张宽表来容纳所有工程事件,避免查询时频繁 JOIN。在真实项目中,过早的规范化是性能杀手。
-- file: clickhouse_schema.sql
-- 定义工程事件统一存储表
-- 使用 MergeTree 引擎,这是ClickHouse最强大的表引擎
CREATE TABLE IF NOT EXISTS engineering_metrics.events ON CLUSTER '{cluster}'
(
-- 核心维度
event_id UUID DEFAULT generateUUIDv4(), -- 事件唯一ID
event_type String, -- 事件类型 (e.g., 'pr_opened', 'pr_commented', 'ci_build_finished')
event_source String, -- 事件来源 (e.g., 'gitlab', 'jenkins', 'sonarqube')
project_name String, -- 项目名称
timestamp DateTime64(3, 'Asia/Shanghai'), -- 事件发生时间,精确到毫秒
-- PR 相关维度
pr_id Nullable(UInt64),
pr_title Nullable(String),
pr_author Nullable(String),
pr_target_branch Nullable(String),
pr_source_branch Nullable(String),
-- CI/CD 相关维度
ci_pipeline_id Nullable(UInt64),
ci_job_name Nullable(String),
ci_status LowCardinality(Nullable(String)), -- 'success', 'failed', 'running'
ci_duration_ms Nullable(UInt64),
-- Code Review 相关维度
review_comment Nullable(String),
reviewer Nullable(String),
approver Nullable(String),
-- 代码质量维度
code_smells Nullable(Int32),
vulnerabilities Nullable(Int32),
coverage Nullable(Float32),
-- 原始载荷,用于未来追溯或深度分析
raw_payload String
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/engineering_metrics/events', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_name, event_type, timestamp)
SETTINGS index_granularity = 8192;
-- 为了支持分布式写入和查询,创建分布式表
CREATE TABLE IF NOT EXISTS engineering_metrics.events_distributed ON CLUSTER '{cluster}' AS engineering_metrics.events
ENGINE = Distributed(
'{cluster}', -- 集群名称
engineering_metrics, -- 本地数据库
events, -- 本地表
rand() -- 分片键,随机写入保证数据均匀
);
这里的选型决策至关重要:
-
ReplicatedReplacingMergeTree
: 我们选择这个引擎而不是普通的MergeTree
。它可以在副本之间同步数据,保证高可用。同时,ReplacingMergeTree
可以在数据重复时(基于ORDER BY
的键),保留最新版本,这对于处理可能重发的 Webhook 事件非常有用。 -
PARTITION BY toYYYYMM(timestamp)
: 按月分区。这是最常见的实践,它能极大地加速基于时间范围的查询,并且便于管理历史数据(例如,删除旧分区)。 -
ORDER BY (project_name, event_type, timestamp)
: 排序键是 ClickHouse 性能优化的核心。将查询中频繁用于过滤的字段(如project_name
)放在前面,可以利用其稀疏索引快速跳过大量不相关的数据块。 -
Distributed
引擎:它本身不存储数据,而是作为一个代理,将查询和写入请求路由到集群中的各个分片。这使得我们可以对应用层透明地扩展 ClickHouse 集群。
使用 Ansible 实现 ClickHouse 集群自动化部署
手动部署一个 ClickHouse 集群是繁琐且容易出错的。我们使用 Ansible 来标准化这个过程。整个 Ansible 项目结构如下:
ansible/
├── group_vars/
│ └── all.yml
├── roles/
│ ├── clickhouse_common/
│ │ ├── tasks/
│ │ │ └── main.yml
│ │ └── templates/
│ │ └── config.xml.j2
│ ├── clickhouse_server/
│ │ └── tasks/
│ │ └── main.yml
│ └── zookeeper/
│ └── tasks/
│ └── main.yml
├── inventory.ini
└── playbook.yml
inventory.ini
定义了我们的集群节点:
[clickhouse_nodes]
ch-node-01 ansible_host=10.0.1.11
ch-node-02 ansible_host=10.0.1.12
[zookeeper_nodes]
zk-node-01 ansible_host=10.0.1.21
[all:vars]
clickhouse_cluster_name=ch_cluster_prod
核心的 Ansible Playbook playbook.yml
:
---
- hosts: zookeeper_nodes
become: yes
roles:
- role: zookeeper
- hosts: clickhouse_nodes
become: yes
serial: 1 # 逐台部署,避免集群脑裂风险
roles:
- role: clickhouse_common
- role: clickhouse_server
下面是 clickhouse_common
角色的一个关键任务,用于分发集群配置。注意这里使用了 Jinja2 模板来动态生成配置。
# roles/clickhouse_common/tasks/main.yml
- name: Create ClickHouse config directory
file:
path: /etc/clickhouse-server/config.d
state: directory
owner: clickhouse
group: clickhouse
- name: Distribute ClickHouse cluster configuration
template:
src: config.xml.j2
dest: /etc/clickhouse-server/config.d/cluster_config.xml
owner: clickhouse
group: clickhouse
notify: restart clickhouse-server
# handlers/main.yml for restarting the service
- name: restart clickhouse-server
service:
name: clickhouse-server
state: restarted
config.xml.j2
模板文件是精髓所在,它根据 inventory.ini
动态生成集群拓扑:
<!-- templates/config.xml.j2 -->
<yandex>
<remote_servers>
<{{ clickhouse_cluster_name }}>
<shard>
<internal_replication>true</internal_replication>
{% for host in groups['clickhouse_nodes'] %}
<replica>
<host>{{ hostvars[host]['ansible_host'] }}</host>
<port>9000</port>
</replica>
{% endfor %}
</shard>
</{{ clickhouse_cluster_name }}>
</remote_servers>
<zookeeper-servers>
{% for host in groups['zookeeper_nodes'] %}
<node index="{{ loop.index }}">
<host>{{ hostvars[host]['ansible_host'] }}</host>
<port>2181</port>
</node>
{% endfor %}
</zookeeper-servers>
<macros>
<shard>01</shard>
<!-- The replica macro is dynamically resolved on each node -->
<replica>{{ ansible_hostname }}</replica>
<cluster>{{ clickhouse_cluster_name }}</cluster>
</macros>
</yandex>
这个模板使得我们可以用一套代码管理多个环境的集群,只需更换 inventory.ini
文件即可。
在容器编排平台中部署数据采集端
我们的 CI/CD 任务和应用服务都运行在 Kubernetes 集群上。因此,数据采集的逻辑也应该云原生化。我们选择 Vector 作为一个高性能的数据采集器。它将被部署为 Kubernetes DaemonSet,负责从各个节点收集日志,并作为 Webhook 的接收端。
下面是 Vector 的 Kubernetes DaemonSet
和 Service
配置:
# file: vector-k8s-deployment.yaml
apiVersion: v1
kind: Service
metadata:
name: vector-webhook-ingestor
namespace: observability
spec:
selector:
app: vector
ports:
- protocol: TCP
port: 80
targetPort: 8686
type: ClusterIP
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: vector
namespace: observability
spec:
selector:
matchLabels:
app: vector
template:
metadata:
labels:
app: vector
spec:
containers:
- name: vector
image: timberio/vector:0.35.0-alpine
args: ["--config", "/etc/vector/vector.toml"]
ports:
- name: webhook
containerPort: 8686
volumeMounts:
- name: config
mountPath: /etc/vector/
volumes:
- name: config
configMap:
name: vector-config
Vector 的配置文件 vector.toml
是数据处理的核心。它定义了数据源(sources
)、转换(transforms
)和目的地(sinks
)。
# file: vector.toml in ConfigMap
# [sources.gitlab_webhook]
type = "http_server"
address = "0.0.0.0:8686"
decoding.codec = "json"
# [sources.ci_logs]
# type = "kubernetes_logs"
# ... more configuration for log collection from pods
[transforms.process_gitlab_pr]
type = "remap"
inputs = ["gitlab_webhook"]
source = '''
.event_type = "gitlab_" + .event_name
.event_source = "gitlab"
.project_name = .project.path_with_namespace
.timestamp = format_timestamp!(parse_timestamp!(.object_attributes.created_at, "%+"), format: "%Y-%m-%d %H:%M:%S")
# Extract PR details
.pr_id = .object_attributes.id
.pr_title = .object_attributes.title
.pr_author = .user.username
.pr_target_branch = .object_attributes.target_branch
.pr_source_branch = .object_attributes.source_branch
# Preserve the original payload
.raw_payload = encode_json(.)
'''
[sinks.clickhouse_metrics]
type = "clickhouse"
inputs = ["process_gitlab_pr"] # Add other processed inputs here
endpoint = "http://clickhouse-prod.internal:8123"
database = "engineering_metrics"
table = "events_distributed"
# Batching is crucial for ClickHouse performance
batch.max_bytes = 10000000 # 10MB
batch.timeout_secs = 5
healthcheck.enabled = true
# Error handling is important in production
acknowledgements.enabled = true
这段配置展示了 Vector 的强大之处:
-
http_server
source 直接将 Vector 变成一个 Webhook 接收服务。 -
remap
transform 使用 Vector Remap Language (VRL) 对传入的 JSON 数据进行实时清洗、转换和丰富,将其塑造成符合我们 ClickHouse 表结构的格式。这是一个轻量级的 ETL 过程。 -
clickhouse
sink 负责将处理后的数据高效地批量写入 ClickHouse。配置中的batch
参数对于避免对 ClickHouse 造成过大的写入压力至关重要。
实现搜索与分析查询
平台搭建完成后,我们可以开始回答最初提出的问题。所有的查询都是标准 SQL,这大大降低了使用门槛。
1. 搜索特定用户的 Code Review 评论
SELECT
timestamp,
project_name,
pr_title,
review_comment
FROM engineering_metrics.events_distributed
WHERE event_type = 'pr_commented'
AND reviewer = 'some_developer'
AND timestamp >= now() - INTERVAL 7 DAY
ORDER BY timestamp DESC
LIMIT 50;
这个查询利用了 ORDER BY
键中的 event_type
和 timestamp
,执行速度极快。
2. 分析各项目平均 PR 合并时间
-- This query requires joining events for the same PR
SELECT
project_name,
avg(merge_time - create_time) / 3600 AS avg_merge_hours
FROM
(
SELECT
pr_id,
project_name,
minIf(timestamp, event_type = 'pr_opened') AS create_time,
maxIf(timestamp, event_type = 'pr_merged') AS merge_time
FROM engineering_metrics.events_distributed
WHERE timestamp >= now() - INTERVAL 30 DAY
AND pr_id IS NOT NULL
GROUP BY pr_id, project_name
)
WHERE merge_time > create_time -- Filter out PRs that are not merged yet
GROUP BY project_name
ORDER BY avg_merge_hours DESC;
这是一个更复杂的分析查询,它展示了 ClickHouse 的聚合能力。通过 minIf
和 maxIf
条件聚合函数,我们在一次扫描中就能计算出每个 PR 的创建和合并时间,然后在外层查询中计算平均值。
3. 发现 CI 流水线失败热点
SELECT
project_name,
ci_job_name,
count(*) AS failure_count
FROM engineering_metrics.events_distributed
WHERE event_type = 'ci_build_finished'
AND ci_status = 'failed'
AND timestamp >= now() - INTERVAL 1 DAY
GROUP BY project_name, ci_job_name
ORDER BY failure_count DESC
LIMIT 10;
这个查询可以快速定位过去24小时内失败最频繁的 CI 任务,为我们指明了优化的方向。
架构图谱
为了更清晰地展示整个系统的数据流,下面是其架构的 Mermaid 图。
graph TD subgraph Git & CI/CD Platform A[GitLab / GitHub] -- PR & Commit Webhooks --> B; C[Jenkins / GitLab CI] -- Pipeline Status Webhooks --> B; end subgraph Kubernetes Cluster B(Vector Ingestion Service); D(Vector DaemonSet); B -- Raw Events --> D; D -- Structured Data --> E; end subgraph Data & Analytics Platform E[ClickHouse Distributed Table]; F[ClickHouse Node 1]; G[ClickHouse Node 2]; H[Zookeeper]; E -.-> F; E -.-> G; F <--> H; G <--> H; end subgraph Users I[Grafana Dashboard] -- SQL Queries --> E; J[Developer / SRE] -- Ad-hoc SQL Search --> E; end
这个架构虽然简单,但它具备了良好的水平扩展能力。当数据量增长时,我们可以通过 Ansible 向 ClickHouse 集群添加更多节点,并通过更新 Kubernetes DaemonSet
来增加 Vector 的处理能力,整个过程对上游数据源和下游用户是透明的。
方案局限与未来展望
当前这套系统并非完美。首先,它依赖于一个集中的 ZooKeeper 集群来管理 ClickHouse 副本,这本身构成了一个单点故障风险,尽管 ZooKeeper 本身是高可用的。ClickHouse 社区正在积极推广基于 ClickHouse Keeper 的无 ZooKeeper 方案,这是我们未来演进的方向。
其次,当前的 ReplicatedReplacingMergeTree
引擎对于乱序和延迟事件的处理并不完美。如果一个更新事件比创建事件先到达,它可能会被错误地丢弃。对于更严格的事件溯源场景,可能需要引入一个上游的消息队列(如 Kafka)来保证事件的顺序性,并在数据模型中引入版本号。
最后,虽然我们实现了数据采集和存储的自动化,但“搜索”和“分析”的用户界面还比较原始,主要依赖直接编写 SQL 或配置 Grafana。下一步计划是构建一个简单的领域特定搜索界面,将常用的查询模式封装成参数化的 API,从而让非技术背景的团队成员也能从中获取洞见。