构建支持AI模型的实时特征管道 Ktor Flink与Prometheus的生产实践


为机器学习模型提供毫秒级新鲜度的特征,是许多实时AI应用(如在线推荐、实时风控)成功的关键。然而,构建一个兼具高吞吐、低延迟和高可用性的实时特征管道,是一项复杂的系统工程挑战。我们需要处理源源不断的数据流,进行有状态的计算,将结果存储在低延迟的存储中,并通过高性能API暴露给模型服务,同时保证全链路的可观测性。这不仅仅是技术的堆砌,更是对架构权衡的深刻理解。

定义复杂的工程问题:实时特征管道的技术要求

在着手设计之前,必须清晰地定义我们的目标和约束。一个生产级的实时特征管道需要满足以下几点核心要求:

  1. 数据源: 能够消费来自Kafka的高吞吐事件流(例如,用户行为日志、交易流水),峰值可达数十万QPS。
  2. 处理逻辑: 必须支持有状态的流式计算。例如,计算用户最近5分钟内的点击次数、过去1小时内的平均交易金额等。处理语义必须保证Exactly-Once,确保数据不重不丢。
  3. 服务接口: 提供一个根据主键(如user_id)查询特征向量的API。对这个API的性能要求极为苛刻:P99延迟必须控制在50ms以内。
  4. 可观测性: 必须能够监控整个管道的关键指标,包括数据流入速率、Flink作业处理延迟、Checkpoint成功率、特征服务API的QPS和延迟、以及中间件的健康状况。
  5. 可视化: 需要一个内部运维仪表盘,用于实时展示系统核心指标,帮助工程师快速定位问题。

架构决策的核心:Flink与Ktor之间的桥梁

整个系统的核心在于如何高效、可靠地连接流处理层(Apache Flink)和在线服务层(Ktor)。数据在Flink中计算完成后,必须以某种形式“物化”出来,供Ktor查询。这里存在几种主流方案,每种方案都有其鲜明的优缺点。

这个方案将Flink计算出的特征结果再写回一个新的Kafka Topic。Ktor服务作为消费者,订阅这个Topic,并将数据更新到其本地内存缓存(如Caffeine)中。

  • 优势:
    • 解耦: Flink和Ktor之间通过Kafka彻底解耦,双方的部署和伸缩互不影响。
    • 高吞吐: Kafka作为中间层,能够承受极高的写入吞吐量。
    • 回溯能力: Kafka的日志特性使得消费端可以重放数据,便于故障恢复或缓存重建。
  • 劣势:
    • 延迟: 引入了额外的网络往返(Flink -> Kafka -> Ktor),增加了端到端的延迟,对我们50ms的P99目标构成了严峻挑战。
    • 状态一致性: Ktor服务通常是无状态部署的多个实例。每个实例维护自己的本地缓存,这会导致不同实例间的缓存数据存在短暂不一致。
    • 资源开销: 增加了一个Kafka Topic及其维护成本。

此方案中,Flink将计算结果直接写入一个外部的高性能键值存储系统(如Redis或KeyDB)。Ktor服务在收到请求时,直接查询这个KV存储。

  • 优势:
    • 低延迟: Ktor查询Redis的延迟通常在1ms级别,非常适合低延迟场景。
    • 状态一致性: 所有Ktor实例共享同一个外部数据源,保证了数据的一致性。
    • 成熟度: Redis是业界广泛使用且极其成熟的解决方案,运维和客户端支持都非常完善。
  • 劣势:
    • 写入瓶颈: Flink作业需要频繁地对Redis进行写操作,在高吞吐场景下,Redis可能成为整个系统的瓶颈。这需要对Redis进行仔细的容量规划和性能调优。
    • 依赖外部系统: 引入了一个新的、需要高可用保障的外部依赖。

Flink自身提供了一个名为”Queryable State”的特性,允许外部应用直接查询Flink作业内部的状态。

  • 优势:
    • 极致低延迟: 数据无需离开Flink的TaskManager进程,Ktor直接通过RPC查询状态,理论上延迟最低。
    • 架构简化: 无需引入额外的中间存储系统。
  • 劣势:
    • 紧耦合: 服务层(Ktor)与计算层(Flink)紧密耦合。Flink作业的重启、扩缩容或失败都会直接影响到Ktor服务的可用性。
    • 运维复杂性: Queryable State的生产环境运维案例相对较少,其稳定性、安全性和性能调优比成熟的KV存储更具挑战性。
    • 查询能力有限: 只支持简单的点查(Key-lookup),复杂的查询模式不支持。

最终选择与理由

在真实项目中,稳定性和可维护性往往优先于极致的理论性能。方案C(Queryable State)虽然延迟最低,但其紧耦合和运维复杂性带来的风险过高,不适合作为我们平台的第一版选型。方案A(Kafka)的延迟链路太长,难以稳定满足50ms的P99要求。

因此,方案B (Flink -> Redis -> Ktor) 成为了我们最终的选择。它在延迟、解耦和系统成熟度之间取得了最佳平衡。Redis的写入瓶颈是已知的风险,但可以通过合理的Redis集群规划、客户端侧的批量写入(Batching)以及使用性能更强的Redis替代品(如KeyDB)来缓解。这是一个务实且可靠的生产级方案。

flowchart TD
    subgraph "数据源"
        Kafka_Input[Kafka: user_events]
    end

    subgraph "实时计算层 (Apache Flink)"
        Flink_Job[Flink Job: Stateful Feature Engineering]
    end

    subgraph "特征存储层"
        Redis[Redis Cluster: Feature KV Store]
    end

    subgraph "在线服务层 (Ktor)"
        Ktor_Service[Ktor Service: Feature API]
    end



    subgraph "消费端"
        ML_Model[ML Model Service]
    end
    
    subgraph "可观测性"
        Prometheus[Prometheus Server]
        Dashboard[Jotai Dashboard]
    end

    Kafka_Input -- events --> Flink_Job
    Flink_Job -- computed features --> Redis
    Ktor_Service -- query feature --> Redis
    ML_Model -- request feature --> Ktor_Service
    
    Flink_Job -- metrics --> Prometheus
    Ktor_Service -- metrics --> Prometheus
    Redis -- redis_exporter metrics --> Prometheus
    
    Dashboard -- promql query --> Prometheus

核心实现概览

这个Flink作业的核心是消费Kafka数据,按userId分组,然后在时间窗口上进行聚合,最后将结果写入Redis。

// build.gradle.kts dependencies
// implementation("org.apache.flink:flink-streaming-java:${flinkVersion}")
// implementation("org.apache.flink:flink-connector-kafka:${flinkVersion}")
// implementation("org.apache.flink:flink-connector-redis_2.12:1.1.5") // Note: Use a compatible redis connector
// implementation("org.apache.flink:flink-metrics-prometheus:${flinkVersion}")

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper

// Assume a simple data class for user events
data class UserEvent(val userId: String, val eventType: String, val timestamp: Long)

// Mapper to write aggregated counts to Redis
class FeatureSinkMapper : RedisMapper<Pair<String, Long>> {
    // We use HSET to store features for a user under a single hash key
    // KEY: "user_features:<userId>"
    // FIELD: "clicks_5min"
    // VALUE: <count>
    override fun getCommandDescription(): RedisCommandDescription {
        return RedisCommandDescription(RedisCommand.HSET, "user_features")
    }

    override fun getKeyFromData(data: Pair<String, Long>): String {
        return data.first // userId
    }

    override fun getValueFromData(data: Pair<String, Long>): String {
        return data.second.toString() // click count
    }
}


fun main() {
    val config = Configuration()
    // Enable Prometheus reporter on a random port. In production, fix this port.
    config.setString("metrics.reporter.prom.class", "org.apache.flink.metrics.prometheus.PrometheusReporter")
    config.setString("metrics.reporter.prom.port", "9250-9260")

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    env.enableCheckpointing(5000) // Checkpoint every 5 seconds for Exactly-Once

    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("kafka:9092")
        .setTopics("user_events")
        .setGroupId("feature_engineering_group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()

    val eventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        // Basic JSON parsing and error handling. In production, use a robust library like Jackson.
        .mapNotNull {
            try {
                // A very basic parser. Replace with a proper one.
                val parts = it.split(",")
                UserEvent(parts[0], parts[1], parts[2].toLong())
            } catch (e: Exception) {
                // Log and drop malformed records
                null
            }
        }
        // Assign timestamps and watermarks for event time processing
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.forBoundedOutOfOrderness<UserEvent>(java.time.Duration.ofSeconds(10))
                .withTimestampAssigner { event, _ -> event.timestamp }
        )

    val featureStream = eventStream
        .keyBy { it.userId }
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(object : AggregateFunction<UserEvent, Long, Long> {
            override fun createAccumulator() = 0L
            override fun add(value: UserEvent, accumulator: Long) = accumulator + 1
            override fun getResult(accumulator: Long) = accumulator
            override fun merge(a: Long, b: Long) = a + b
        }, 
        // ProcessWindowFunction to get window metadata and userId
        { key, _, inputs, out ->
            out.collect(Pair(key, inputs.first()))
        })

    // Configure Redis Sink
    val jedisPoolConfig = FlinkJedisPoolConfig.Builder()
        .setHost("redis")
        .setPort(6379)
        .setTimeout(2000)
        .build()
    
    featureStream.addSink(RedisSink(jedisPoolConfig, FeatureSinkMapper()))

    env.execute("Real-time Feature Engineering Job")
}

关键点:

  • Exactly-Once: 通过enableCheckpointing开启Flink的检查点机制,结合可重放的Kafka源和幂等的Redis HSET操作,可以实现端到端的Exactly-Once语义。
  • 可观测性: 通过配置flink-metrics-prometheus,Flink作业会自动暴露大量内部指标(如numRecordsInPerSecond, checkpoint_duration等),供Prometheus抓取。
  • Redis Sink: 使用flink-connector-redis将聚合结果写入Redis。这里我们选择HSET命令,将一个用户的所有特征存储在同一个Redis Hash中,这比为每个特征设置一个独立的key更高效。

2. Ktor 高性能特征服务

Ktor应用需要极致的性能。我们使用Netty引擎,并集成Lettuce作为异步Redis客户端。为了进一步降低延迟,我们还将在Ktor内部增加一个Caffeine本地缓存。

// build.gradle.kts dependencies
// implementation("io.ktor:ktor-server-core-jvm:$ktorVersion")
// implementation("io.ktor:ktor-server-netty-jvm:$ktorVersion")
// implementation("io.ktor:ktor-server-content-negotiation-jvm:$ktorVersion")
// implementation("io.ktor:ktor-serialization-kotlinx-json-jvm:$ktorVersion")
// implementation("io.ktor:ktor-server-micrometer-jvm:$ktorVersion")
// implementation("io.micrometer:micrometer-registry-prometheus:$micrometerVersion")
// implementation("io.lettuce:lettuce-core:$lettuceVersion")
// implementation("com.github.ben-manes.caffeine:caffeine:$caffeineVersion")

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.metrics.micrometer.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulRedisConnection
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import kotlinx.serialization.Serializable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

@Serializable
data class FeatureVector(val userId: String, val features: Map<String, String>)

object FeatureRepository {
    private val redisClient: RedisClient = RedisClient.create("redis://redis:6379")
    private val connection: StatefulRedisConnection<String, String> = redisClient.connect()
    private val asyncCommands = connection.async()

    // A two-layer cache: Caffeine (L1, in-memory) -> Redis (L2, distributed)
    val cache: AsyncLoadingCache<String, Map<String, String>> = Caffeine.newBuilder()
        .maximumSize(10_000) // Max 10,000 users in local cache
        .expireAfterWrite(1, TimeUnit.MINUTES) // Cache entries for 1 minute
        .buildAsync { userId, _ ->
            // This lambda is called on cache miss
            fetchFromRedis(userId)
        }
    
    private fun fetchFromRedis(userId: String): CompletableFuture<Map<String, String>> {
        // In a real project, handle connection errors, empty results, etc.
        return asyncCommands.hgetall("user_features:$userId").toCompletableFuture()
    }
}

fun main() {
    embeddedServer(Netty, port = 8080, host = "0.0.0.0") {
        module()
    }.start(wait = true)
}

fun Application.module() {
    // Prometheus registry for metrics
    val appMicrometerRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
    install(MicrometerMetrics) {
        registry = appMicrometerRegistry
    }
    install(ContentNegotiation) {
        json()
    }
    
    routing {
        get("/features/{userId}") {
            val userId = call.parameters["userId"] ?: return@get call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Missing userId")
            
            try {
                val features = FeatureRepository.cache.get(userId).await()
                if (features.isEmpty()) {
                    call.respond(io.ktor.http.HttpStatusCode.NotFound, "Features not found for user $userId")
                } else {
                    call.respond(FeatureVector(userId, features))
                }
            } catch (e: Exception) {
                // Log the exception properly
                log.error("Failed to fetch features for $userId", e)
                call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Error fetching features")
            }
        }
        
        get("/metrics") {
            call.respond(appMicrometerRegistry.scrape())
        }
    }
}

// Extension to convert CompletableFuture to Deferred for better coroutine integration
suspend fun <T> CompletableFuture<T>.await(): T = kotlinx.coroutines.future.await()

关键点:

  • 异步非阻塞: Ktor + Netty + Lettuce 构成了一个完全异步的I/O栈,这是实现高吞吐和低延迟的基础。
  • 两级缓存: Caffeine作为L1缓存,可以吸收对热点用户的大量重复查询,避免了对Redis的频繁访问,显著降低了P99延迟并保护了Redis。
  • 可观测性: ktor-server-micrometer插件可以自动收集HTTP请求的延迟、QPS、状态码分布等指标,并通过/metrics端点暴露给Prometheus。

3. 可观测性配置与前端可视化

Prometheus的配置非常直接,只需添加对Flink和Ktor的抓取目标。

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'flink'
    static_configs:
      - targets: ['flink-jobmanager:9249'] # Flink JobManager metrics port
      # In a real k8s setup, use service discovery
      # Add targets for each TaskManager (e.g., 'flink-taskmanager-1:9250')

  - job_name: 'ktor-feature-service'
    static_configs:
      - targets: ['ktor-service-1:8080', 'ktor-service-2:8080']
    metrics_path: /metrics

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121'] # Using redis_exporter sidecar

对于前端仪表盘,Jotai以其原子化(atomic)的状态管理模型,非常适合构建这种需要从多个数据源(Prometheus API)拉取并独立更新的监控界面。

// FeaturePipelineMetrics.tsx (React Component)
import { atom, useAtom } from 'jotai';
import { useQuery } from 'react-query'; // Or any data fetching library

// Atom to hold the latency data from Prometheus
const latencyAtom = atom<any[]>([]); 

// Async atom definition is more advanced, for simplicity we use a standard atom
// updated by a component hook.

const fetchLatencyData = async () => {
  // Prometheus API endpoint to query p99 latency for Ktor service
  const query = `histogram_quantile(0.99, sum(rate(http_server_requests_seconds_bucket{job="ktor-feature-service"}[5m])) by (le))`;
  const response = await fetch(`http://prometheus:9090/api/v1/query?query=${encodeURIComponent(query)}`);
  if (!response.ok) {
    throw new Error('Network response was not ok');
  }
  const data = await response.json();
  // Process data for charting...
  return data.data.result;
};

export const LatencyDashboard = () => {
  const [latencyData, setLatencyData] = useAtom(latencyAtom);
  
  // Use a library like react-query to handle periodic fetching
  const { data, error, isLoading } = useQuery('latency', fetchLatencyData, {
    refetchInterval: 15000, // Refetch every 15 seconds
    onSuccess: (newData) => setLatencyData(newData), // Update Jotai atom on success
  });

  if (isLoading) return <div>Loading P99 Latency...</div>;
  if (error) return <div>Error fetching data.</div>;
  
  // A placeholder for a charting library like Recharts or D3
  return (
    <div>
      <h2>API P99 Latency (ms)</h2>
      {/* <LineChart data={latencyData} /> */}
      <pre>{JSON.stringify(latencyData, null, 2)}</pre>
    </div>
  );
};

关键点:

  • 原子化状态: 每个指标(如QPS、延迟、Flink Checkpoint时长)都可以是独立的Jotai atom。当某个指标的数据更新时,只有订阅了该atom的组件会重新渲染,避免了不必要的全局刷新。
  • 数据驱动: UI完全由从Prometheus获取的数据驱动,Jotai负责状态的同步和分发。

架构的扩展性与局限性

当前这套基于Flink、Redis和Ktor的架构,为实时特征平台提供了一个坚实、可靠且高性能的起点。它的扩展性体现在几个方面:首先,可以通过增加更多的Flink TaskManager和Ktor实例来水平扩展计算和服务能力;其次,可以在Flink作业中加入更复杂的特征工程逻辑,甚至加载模型进行实时推理;最后,Ktor服务可以轻松扩展出gRPC接口,为内部服务提供更高效的通信协议。

然而,这套架构也并非没有局限性。其核心瓶颈在于Redis。当用户基数和特征维度急剧增长时(高基数问题),Redis集群的内存和CPU压力会成为主要挑战。此外,特征的Schema目前是隐式的,硬编码在代码中,一个更成熟的系统需要引入Schema Registry来管理特征的定义、版本和演进。最后,该架构主要解决了“实时特征”的问题,对于需要回溯历史数据的“离线特征”,还需要构建一套独立的批处理管道(例如使用Spark)将数据回填到Redis中,以形成完整的特征存储。
```


  目录