融合 Axum 与 Spark 构建毫秒级延迟的混合式实时特征平台架构


为支持一个高并发推荐系统,我们面临一个典型的两难困境:模型需要两种截然不同的特征。第一种是“即时特征”,例如用户在当前会话中5秒内的点击行为、刚刚浏览的商品类别等,要求特征延迟在50毫秒以内。第二种是“历史特征”,如用户过去30天的购买力分层、品类偏好向量等,这些特征计算复杂,涉及海量历史数据,但可以容忍小时级的更新延迟。

现有的核心业务系统由Java构建,稳定但难以应对高并发的实时事件摄入。单纯的流式处理方案(如Flink)能解决实时性,但对于复杂、需要全量历史数据回溯的批处理任务,实现成本和资源消耗巨大。而纯粹的批处理方案(如定时Spark任务)则完全无法满足实时性要求。技术栈选型的核心矛盾在于,如何用一个可维护的架构,同时满足毫秒级的实时摄入、高效的时序数据存储和强大的离线计算能力,并与现有的Java生态平滑集成。

架构决策:为何选择 Axum + TimescaleDB + Spark 的异构组合

我们最终确定的架构是一个混合模型,它并非严格的Lambda或Kappa架构,而是一个针对特定问题的务实组合。

graph TD
    subgraph "实时路径 (Real-time Path)"
        A[Client Events] -->|HTTP/JSON| B(Axum Ingestion Service - Rust);
        B -->|Raw Events| C(TimescaleDB Hypertable);
        C -->|Continuous Aggregates| D(Real-time Features Table);
    end

    subgraph "批处理路径 (Batch Path)"
        E(Apache Spark Cluster - Java/Scala) -->|JDBC Read| C;
        E -->|Complex Feature Engineering| F(Batch Features Table);
    end

    subgraph "特征服务 (Feature Serving)"
        G[ML Model Service - Java] -->|Query| D;
        G -->|Query| F;
        G --> H(Recommendation Result);
    end

    style B fill:#34495e,stroke:#fff,stroke-width:2px,color:#fff
    style E fill:#f39c12,stroke:#fff,stroke-width:2px,color:#fff
    style C fill:#2980b9,stroke:#fff,stroke-width:2px,color:#fff

这个架构的决策逻辑如下:

  1. 摄入层 (Ingestion) - Axum (Rust): 应对高并发、低延迟的事件写入是首要挑战。Java生态中的Spring WebFlux虽然是异步非阻塞的,但在极限性能、内存占用和CPU效率上,与基于Tokio的Rust框架Axum相比仍有差距。选择Axum的核心原因是:

    • 极致性能与资源效率: Rust没有GC,编译后的原生代码执行效率极高。一个极简的Axum服务就能处理数万QPS,且内存占用稳定在几十MB,这对于需要水平扩展的摄入节点来说,成本效益显著。
    • 内存安全与并发可靠性: Rust的所有权和借用检查机制在编译期就消除了数据竞争等并发问题,对于要求7x24小时稳定运行的摄入服务至关重要。
    • 外科手术式集成: 摄入服务功能单一、边界清晰,用Rust重写这一热点路径,对现有Java系统的侵入性最小。
  2. 存储层 (Storage) - TimescaleDB: 我们需要一个既能处理海量时序数据写入,又能被Spark高效读取的数据库。

    • 原生时序支持: TimescaleDB作为PostgreSQL的扩展,其Hypertable能自动按时间对数据进行分区(分块),写入性能不会随数据量增长而衰减。这完美匹配了我们的事件流场景。
    • 连续聚合 (Continuous Aggregates): 这是TimescaleDB的杀手级特性。我们可以定义一个物化视图,它能自动、增量地在后台预计算实时特征(如每分钟的用户点击次数)。上层服务查询这些预计算结果,延迟极低。
    • 通用SQL接口: 它保留了完整的PostgreSQL功能和JDBC兼容性。这意味着现有的Java服务和Spark集群无需任何特殊驱动,就能像访问普通PG数据库一样读写数据,极大降低了集成成本。
  3. 计算层 (Computation) - Apache Spark (Java): 对于复杂的历史特征计算,Spark的分布式计算能力无可替代。

    • 技术复用: 团队已经具备深厚的Java和Spark使用经验,复用现有技术栈和基础设施是最稳妥的选择。
    • 强大的表达能力: Spark SQL和DataFrame API可以轻松实现复杂的时间窗口分析、用户行为序列建模等算法。
    • 生态整合: Spark能通过JDBC稳定地从TimescaleDB读取海量数据进行批处理,并将结果写回数据库或数据湖。

核心实现:代码层面的串联

1. Axum 实时事件摄入服务

这是整个架构的入口。服务必须做到轻量、高效且健壮。

Cargo.toml 依赖:

[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres", "time", "json" ] }
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

main.rs 核心代码:

use axum::{
    extract::{Json, State},
    http::StatusCode,
    response::{IntoResponse, Response},
    routing::post,
    Router,
};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, info};

// 定义应用状态,主要用于共享数据库连接池
#[derive(Clone)]
struct AppState {
    pool: PgPool,
}

// 定义接收的事件体
// 在真实项目中,这里会有更复杂的字段和验证
#[derive(Deserialize, Debug)]
struct UserEvent {
    user_id: String,
    event_type: String,
    item_id: String,
    // 使用 serde_json::Value 可以接收任意结构的 JSON 对象
    properties: serde_json::Value, 
}

// 自定义应用错误类型,方便统一处理
#[derive(thiserror::Error, Debug)]
enum AppError {
    #[error("Database error: {0}")]
    Sqlx(#[from] sqlx::Error),
    #[error("Internal server error")]
    Internal,
}

// 实现 IntoResponse trait,让我们的错误类型可以被 Axum 转换为 HTTP 响应
impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, message) = match self {
            AppError::Sqlx(e) => {
                // 不应将详细的数据库错误暴露给客户端
                error!("Database query failed: {}", e);
                (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())
            }
            AppError::Internal => {
                error!("An unexpected internal error occurred.");
                (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())
            }
        };
        (status, message).into_response()
    }
}

#[tokio::main]
async fn main() {
    // 初始化日志
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // 从环境变量读取数据库连接字符串,这是生产实践
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    // 创建数据库连接池
    // 生产环境中,连接数、超时等参数需要仔细调优
    let pool = PgPoolOptions::new()
        .max_connections(50)
        .connect(&database_url)
        .await
        .expect("Failed to create database pool");

    info!("Database pool created successfully.");

    let app_state = Arc::new(AppState { pool });

    // 定义路由
    let app = Router::new()
        .route("/events", post(ingest_event))
        .with_state(app_state.as_ref().clone());

    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
    info!("Listening on {}", addr);

    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

// 事件处理函数
async fn ingest_event(
    State(state): State<AppState>,
    Json(payload): Json<UserEvent>,
) -> Result<StatusCode, AppError> {
    info!("Received event for user: {}", payload.user_id);

    // 这里的 SQL 查询是关键。
    // `TIMESTAMPTZ 'NOW()'` 使用数据库服务器时间,避免客户端时间不一致问题。
    // `properties` 字段直接存为 JSONB 类型,灵活且可查询。
    sqlx::query!(
        r#"
        INSERT INTO user_events (event_time, user_id, event_type, item_id, properties)
        VALUES (NOW(), $1, $2, $3, $4)
        "#,
        payload.user_id,
        payload.event_type,
        payload.item_id,
        payload.properties,
    )
    .execute(&state.pool)
    .await?; // 使用 ? 操作符,如果出错会自动转换为 AppError::Sqlx

    // 单元测试思路:
    // 1. Mock 数据库连接,验证 SQL 语句和参数是否正确。
    // 2. 构造合法的 JSON payload,验证是否返回 201 CREATED。
    // 3. 构造非法的 JSON payload,验证是否返回 400 BAD REQUEST。
    // 4. 模拟数据库连接失败,验证是否返回 500 INTERNAL SERVER ERROR。

    Ok(StatusCode::CREATED)
}

2. TimescaleDB 数据表与连续聚合

数据库层面是承上启下的关键。

创建 Hypertable:

-- 原始事件表
CREATE TABLE user_events (
    event_time TIMESTAMPTZ NOT NULL,
    user_id    VARCHAR(100) NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    item_id    VARCHAR(100),
    properties JSONB
);

-- 将其转换为 TimescaleDB 的 Hypertable,按 event_time 字段分区
-- chunk_time_interval 设为 1 天,是生产环境中常见的配置
SELECT create_hypertable('user_events', 'event_time', chunk_time_interval => INTERVAL '1 day');

-- 为高频查询字段创建索引
CREATE INDEX ON user_events (user_id, event_time DESC);
CREATE INDEX ON user_events (event_type, event_time DESC);

创建连续聚合以生成实时特征:
假设我们需要一个实时特征:“每个用户最近5分钟的点击次数”。

-- 创建一个连续聚合视图
CREATE MATERIALIZED VIEW user_click_counts_5m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', event_time) AS bucket,
    user_id,
    COUNT(*) AS click_count
FROM
    user_events
WHERE
    event_type = 'click'
GROUP BY
    bucket,
    user_id;

-- 设置刷新策略:每分钟刷新过去10分钟的数据
-- 这样既保证了数据的准实时性,又避免了频繁的全量刷新
SELECT add_continuous_aggregate_policy('user_click_counts_5m',
    start_offset => INTERVAL '10 minutes',
    end_offset   => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

ML模型服务可以直接查询 user_click_counts_5m 这个视图,通过 WHERE user_id = ? AND bucket > NOW() - INTERVAL '5 minutes' 这样的SQL,在几毫秒内获取到所需特征。

3. Apache Spark 批处理作业

这个作业将由调度系统(如Airflow)每天定时触发,计算复杂的历史特征。

Java Spark 代码片段:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import static org.apache.spark.sql.functions.*;

import java.util.Properties;

public class HistoricalFeatureJob {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("HistoricalFeatureJob")
                // .master("local[*]") // 本地测试时使用
                .getOrCreate();

        // 数据库连接属性
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "your_user");
        connectionProperties.put("password", "your_password");
        // 使用 PostgreSQL 的 JDBC 驱动
        connectionProperties.put("driver", "org.postgresql.Driver");

        String jdbcUrl = "jdbc:postgresql://your_timescaledb_host:5432/your_db";

        // 从 TimescaleDB 读取过去30天的数据
        // 这里的关键是使用 Spark 的分区读取能力来避免 OOM
        // 比如可以按日期分区并行读取,这里为了简化,读取全表
        Dataset<Row> events = spark.read()
                .jdbc(jdbcUrl, "user_events", connectionProperties)
                .where(col("event_time").gt(functions.current_timestamp().minus(expr("INTERVAL 30 DAYS"))));

        // 计算一个复杂的特征:每个用户过去30天内购买总额和购买次数
        Dataset<Row> userPurchaseFeatures = events
                .where(col("event_type").equalTo("purchase"))
                // properties 字段是 JSON,需要解析
                // `get_json_object` 是 Spark SQL 的函数
                .withColumn("price", get_json_object(col("properties"), "$.price").cast("double"))
                .withColumn("quantity", get_json_object(col("properties"), "$.quantity").cast("int"))
                .withColumn("total_value", col("price").multiply(col("quantity")))
                .groupBy("user_id")
                .agg(
                    sum("total_value").as("total_purchase_value_30d"),
                    count("*").as("purchase_count_30d")
                );
        
        // 计算另一个特征:用户最偏好的商品类别
        Dataset<Row> userCategoryPreference = events
                .withColumn("category", get_json_object(col("properties"), "$.category"))
                .filter(col("category").isNotNull())
                .groupBy("user_id", "category")
                .count()
                // 使用窗口函数找到每个用户互动次数最多的类别
                .withColumn("rank", rank().over(
                    org.apache.spark.sql.expressions.Window
                        .partitionBy("user_id")
                        .orderBy(col("count").desc())
                ))
                .where(col("rank").equalTo(1))
                .select("user_id", col("category").as("preferred_category_30d"));
        
        // 将多个特征关联起来
        Dataset<Row> finalFeatures = userPurchaseFeatures
                .join(userCategoryPreference, "user_id", "left_outer");
                
        // 将结果写回一个新的数据库表中,供线上服务查询
        // `SaveMode.Overwrite` 保证了每次任务运行时都会覆盖旧数据
        finalFeatures.write()
            .mode("Overwrite")
            .jdbc(jdbcUrl, "batch_user_features", connectionProperties);

        spark.stop();
    }
}

这个Java服务在查询时,会同时请求TimescaleDB的实时特征表和这个批处理结果表,将两种特征合并后提供给模型。

架构的局限性与未来迭代方向

这个架构并非银弹。一个显而易见的挑战是维护一个异构技术栈的复杂性。团队需要同时具备Rust、Java、Spark和PostgreSQL的专业知识。Rust的生态系统相对年轻,寻找经验丰富的工程师可能是一个挑战。

其次,实时路径和批处理路径的数据存在最终一致性的问题。在Spark任务运行期间,新写入的实时数据不会被包含在当次的计算结果中,这可能导致短时间内的特征不一致。对于某些对一致性要求极高的场景,这可能是一个问题。

未来的迭代方向可以考虑:

  1. 统一特征存储: 将实时和离线特征都写入一个高性能的在线KV存储(如Redis或DynamoDB),供Java服务层统一查询,以降低查询链路的复杂性。Spark将计算结果写入KV,实时聚合结果也通过一个简单的服务同步到KV。
  2. 探索流批一体: 随着Flink等框架的成熟,可以评估使用Flink SQL同时处理实时和批处理任务的可能性,以简化技术栈。但这需要对现有批处理逻辑进行大量重构,并评估其在处理海量历史回溯任务时的性能和成本。
  3. 服务化与平台化: 将特征的计算、存储和服务抽象成一个内部的特征平台(Feature Store)。这能让算法工程师通过配置文件或简单的API来定义和使用特征,而无需关心底层的实现细节,从而提升整个团队的研发效率。

  目录