为支持一个高并发推荐系统,我们面临一个典型的两难困境:模型需要两种截然不同的特征。第一种是“即时特征”,例如用户在当前会话中5秒内的点击行为、刚刚浏览的商品类别等,要求特征延迟在50毫秒以内。第二种是“历史特征”,如用户过去30天的购买力分层、品类偏好向量等,这些特征计算复杂,涉及海量历史数据,但可以容忍小时级的更新延迟。
现有的核心业务系统由Java构建,稳定但难以应对高并发的实时事件摄入。单纯的流式处理方案(如Flink)能解决实时性,但对于复杂、需要全量历史数据回溯的批处理任务,实现成本和资源消耗巨大。而纯粹的批处理方案(如定时Spark任务)则完全无法满足实时性要求。技术栈选型的核心矛盾在于,如何用一个可维护的架构,同时满足毫秒级的实时摄入、高效的时序数据存储和强大的离线计算能力,并与现有的Java生态平滑集成。
架构决策:为何选择 Axum + TimescaleDB + Spark 的异构组合
我们最终确定的架构是一个混合模型,它并非严格的Lambda或Kappa架构,而是一个针对特定问题的务实组合。
graph TD subgraph "实时路径 (Real-time Path)" A[Client Events] -->|HTTP/JSON| B(Axum Ingestion Service - Rust); B -->|Raw Events| C(TimescaleDB Hypertable); C -->|Continuous Aggregates| D(Real-time Features Table); end subgraph "批处理路径 (Batch Path)" E(Apache Spark Cluster - Java/Scala) -->|JDBC Read| C; E -->|Complex Feature Engineering| F(Batch Features Table); end subgraph "特征服务 (Feature Serving)" G[ML Model Service - Java] -->|Query| D; G -->|Query| F; G --> H(Recommendation Result); end style B fill:#34495e,stroke:#fff,stroke-width:2px,color:#fff style E fill:#f39c12,stroke:#fff,stroke-width:2px,color:#fff style C fill:#2980b9,stroke:#fff,stroke-width:2px,color:#fff
这个架构的决策逻辑如下:
摄入层 (Ingestion) - Axum (Rust): 应对高并发、低延迟的事件写入是首要挑战。Java生态中的Spring WebFlux虽然是异步非阻塞的,但在极限性能、内存占用和CPU效率上,与基于Tokio的Rust框架Axum相比仍有差距。选择Axum的核心原因是:
- 极致性能与资源效率: Rust没有GC,编译后的原生代码执行效率极高。一个极简的Axum服务就能处理数万QPS,且内存占用稳定在几十MB,这对于需要水平扩展的摄入节点来说,成本效益显著。
- 内存安全与并发可靠性: Rust的所有权和借用检查机制在编译期就消除了数据竞争等并发问题,对于要求7x24小时稳定运行的摄入服务至关重要。
- 外科手术式集成: 摄入服务功能单一、边界清晰,用Rust重写这一热点路径,对现有Java系统的侵入性最小。
存储层 (Storage) - TimescaleDB: 我们需要一个既能处理海量时序数据写入,又能被Spark高效读取的数据库。
- 原生时序支持: TimescaleDB作为PostgreSQL的扩展,其Hypertable能自动按时间对数据进行分区(分块),写入性能不会随数据量增长而衰减。这完美匹配了我们的事件流场景。
- 连续聚合 (Continuous Aggregates): 这是TimescaleDB的杀手级特性。我们可以定义一个物化视图,它能自动、增量地在后台预计算实时特征(如每分钟的用户点击次数)。上层服务查询这些预计算结果,延迟极低。
- 通用SQL接口: 它保留了完整的PostgreSQL功能和JDBC兼容性。这意味着现有的Java服务和Spark集群无需任何特殊驱动,就能像访问普通PG数据库一样读写数据,极大降低了集成成本。
计算层 (Computation) - Apache Spark (Java): 对于复杂的历史特征计算,Spark的分布式计算能力无可替代。
- 技术复用: 团队已经具备深厚的Java和Spark使用经验,复用现有技术栈和基础设施是最稳妥的选择。
- 强大的表达能力: Spark SQL和DataFrame API可以轻松实现复杂的时间窗口分析、用户行为序列建模等算法。
- 生态整合: Spark能通过JDBC稳定地从TimescaleDB读取海量数据进行批处理,并将结果写回数据库或数据湖。
核心实现:代码层面的串联
1. Axum 实时事件摄入服务
这是整个架构的入口。服务必须做到轻量、高效且健壮。
Cargo.toml
依赖:
[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres", "time", "json" ] }
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
main.rs
核心代码:
use axum::{
extract::{Json, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::post,
Router,
};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, info};
// 定义应用状态,主要用于共享数据库连接池
#[derive(Clone)]
struct AppState {
pool: PgPool,
}
// 定义接收的事件体
// 在真实项目中,这里会有更复杂的字段和验证
#[derive(Deserialize, Debug)]
struct UserEvent {
user_id: String,
event_type: String,
item_id: String,
// 使用 serde_json::Value 可以接收任意结构的 JSON 对象
properties: serde_json::Value,
}
// 自定义应用错误类型,方便统一处理
#[derive(thiserror::Error, Debug)]
enum AppError {
#[error("Database error: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("Internal server error")]
Internal,
}
// 实现 IntoResponse trait,让我们的错误类型可以被 Axum 转换为 HTTP 响应
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, message) = match self {
AppError::Sqlx(e) => {
// 不应将详细的数据库错误暴露给客户端
error!("Database query failed: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())
}
AppError::Internal => {
error!("An unexpected internal error occurred.");
(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())
}
};
(status, message).into_response()
}
}
#[tokio::main]
async fn main() {
// 初始化日志
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
// 从环境变量读取数据库连接字符串,这是生产实践
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
// 创建数据库连接池
// 生产环境中,连接数、超时等参数需要仔细调优
let pool = PgPoolOptions::new()
.max_connections(50)
.connect(&database_url)
.await
.expect("Failed to create database pool");
info!("Database pool created successfully.");
let app_state = Arc::new(AppState { pool });
// 定义路由
let app = Router::new()
.route("/events", post(ingest_event))
.with_state(app_state.as_ref().clone());
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
info!("Listening on {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
// 事件处理函数
async fn ingest_event(
State(state): State<AppState>,
Json(payload): Json<UserEvent>,
) -> Result<StatusCode, AppError> {
info!("Received event for user: {}", payload.user_id);
// 这里的 SQL 查询是关键。
// `TIMESTAMPTZ 'NOW()'` 使用数据库服务器时间,避免客户端时间不一致问题。
// `properties` 字段直接存为 JSONB 类型,灵活且可查询。
sqlx::query!(
r#"
INSERT INTO user_events (event_time, user_id, event_type, item_id, properties)
VALUES (NOW(), $1, $2, $3, $4)
"#,
payload.user_id,
payload.event_type,
payload.item_id,
payload.properties,
)
.execute(&state.pool)
.await?; // 使用 ? 操作符,如果出错会自动转换为 AppError::Sqlx
// 单元测试思路:
// 1. Mock 数据库连接,验证 SQL 语句和参数是否正确。
// 2. 构造合法的 JSON payload,验证是否返回 201 CREATED。
// 3. 构造非法的 JSON payload,验证是否返回 400 BAD REQUEST。
// 4. 模拟数据库连接失败,验证是否返回 500 INTERNAL SERVER ERROR。
Ok(StatusCode::CREATED)
}
2. TimescaleDB 数据表与连续聚合
数据库层面是承上启下的关键。
创建 Hypertable:
-- 原始事件表
CREATE TABLE user_events (
event_time TIMESTAMPTZ NOT NULL,
user_id VARCHAR(100) NOT NULL,
event_type VARCHAR(50) NOT NULL,
item_id VARCHAR(100),
properties JSONB
);
-- 将其转换为 TimescaleDB 的 Hypertable,按 event_time 字段分区
-- chunk_time_interval 设为 1 天,是生产环境中常见的配置
SELECT create_hypertable('user_events', 'event_time', chunk_time_interval => INTERVAL '1 day');
-- 为高频查询字段创建索引
CREATE INDEX ON user_events (user_id, event_time DESC);
CREATE INDEX ON user_events (event_type, event_time DESC);
创建连续聚合以生成实时特征:
假设我们需要一个实时特征:“每个用户最近5分钟的点击次数”。
-- 创建一个连续聚合视图
CREATE MATERIALIZED VIEW user_click_counts_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', event_time) AS bucket,
user_id,
COUNT(*) AS click_count
FROM
user_events
WHERE
event_type = 'click'
GROUP BY
bucket,
user_id;
-- 设置刷新策略:每分钟刷新过去10分钟的数据
-- 这样既保证了数据的准实时性,又避免了频繁的全量刷新
SELECT add_continuous_aggregate_policy('user_click_counts_5m',
start_offset => INTERVAL '10 minutes',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
ML模型服务可以直接查询 user_click_counts_5m
这个视图,通过 WHERE user_id = ? AND bucket > NOW() - INTERVAL '5 minutes'
这样的SQL,在几毫秒内获取到所需特征。
3. Apache Spark 批处理作业
这个作业将由调度系统(如Airflow)每天定时触发,计算复杂的历史特征。
Java Spark 代码片段:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import static org.apache.spark.sql.functions.*;
import java.util.Properties;
public class HistoricalFeatureJob {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("HistoricalFeatureJob")
// .master("local[*]") // 本地测试时使用
.getOrCreate();
// 数据库连接属性
Properties connectionProperties = new Properties();
connectionProperties.put("user", "your_user");
connectionProperties.put("password", "your_password");
// 使用 PostgreSQL 的 JDBC 驱动
connectionProperties.put("driver", "org.postgresql.Driver");
String jdbcUrl = "jdbc:postgresql://your_timescaledb_host:5432/your_db";
// 从 TimescaleDB 读取过去30天的数据
// 这里的关键是使用 Spark 的分区读取能力来避免 OOM
// 比如可以按日期分区并行读取,这里为了简化,读取全表
Dataset<Row> events = spark.read()
.jdbc(jdbcUrl, "user_events", connectionProperties)
.where(col("event_time").gt(functions.current_timestamp().minus(expr("INTERVAL 30 DAYS"))));
// 计算一个复杂的特征:每个用户过去30天内购买总额和购买次数
Dataset<Row> userPurchaseFeatures = events
.where(col("event_type").equalTo("purchase"))
// properties 字段是 JSON,需要解析
// `get_json_object` 是 Spark SQL 的函数
.withColumn("price", get_json_object(col("properties"), "$.price").cast("double"))
.withColumn("quantity", get_json_object(col("properties"), "$.quantity").cast("int"))
.withColumn("total_value", col("price").multiply(col("quantity")))
.groupBy("user_id")
.agg(
sum("total_value").as("total_purchase_value_30d"),
count("*").as("purchase_count_30d")
);
// 计算另一个特征:用户最偏好的商品类别
Dataset<Row> userCategoryPreference = events
.withColumn("category", get_json_object(col("properties"), "$.category"))
.filter(col("category").isNotNull())
.groupBy("user_id", "category")
.count()
// 使用窗口函数找到每个用户互动次数最多的类别
.withColumn("rank", rank().over(
org.apache.spark.sql.expressions.Window
.partitionBy("user_id")
.orderBy(col("count").desc())
))
.where(col("rank").equalTo(1))
.select("user_id", col("category").as("preferred_category_30d"));
// 将多个特征关联起来
Dataset<Row> finalFeatures = userPurchaseFeatures
.join(userCategoryPreference, "user_id", "left_outer");
// 将结果写回一个新的数据库表中,供线上服务查询
// `SaveMode.Overwrite` 保证了每次任务运行时都会覆盖旧数据
finalFeatures.write()
.mode("Overwrite")
.jdbc(jdbcUrl, "batch_user_features", connectionProperties);
spark.stop();
}
}
这个Java服务在查询时,会同时请求TimescaleDB的实时特征表和这个批处理结果表,将两种特征合并后提供给模型。
架构的局限性与未来迭代方向
这个架构并非银弹。一个显而易见的挑战是维护一个异构技术栈的复杂性。团队需要同时具备Rust、Java、Spark和PostgreSQL的专业知识。Rust的生态系统相对年轻,寻找经验丰富的工程师可能是一个挑战。
其次,实时路径和批处理路径的数据存在最终一致性的问题。在Spark任务运行期间,新写入的实时数据不会被包含在当次的计算结果中,这可能导致短时间内的特征不一致。对于某些对一致性要求极高的场景,这可能是一个问题。
未来的迭代方向可以考虑:
- 统一特征存储: 将实时和离线特征都写入一个高性能的在线KV存储(如Redis或DynamoDB),供Java服务层统一查询,以降低查询链路的复杂性。Spark将计算结果写入KV,实时聚合结果也通过一个简单的服务同步到KV。
- 探索流批一体: 随着Flink等框架的成熟,可以评估使用Flink SQL同时处理实时和批处理任务的可能性,以简化技术栈。但这需要对现有批处理逻辑进行大量重构,并评估其在处理海量历史回溯任务时的性能和成本。
- 服务化与平台化: 将特征的计算、存储和服务抽象成一个内部的特征平台(Feature Store)。这能让算法工程师通过配置文件或简单的API来定义和使用特征,而无需关心底层的实现细节,从而提升整个团队的研发效率。