为机器学习模型提供毫秒级新鲜度的特征,是许多实时AI应用(如在线推荐、实时风控)成功的关键。然而,构建一个兼具高吞吐、低延迟和高可用性的实时特征管道,是一项复杂的系统工程挑战。我们需要处理源源不断的数据流,进行有状态的计算,将结果存储在低延迟的存储中,并通过高性能API暴露给模型服务,同时保证全链路的可观测性。这不仅仅是技术的堆砌,更是对架构权衡的深刻理解。
定义复杂的工程问题:实时特征管道的技术要求
在着手设计之前,必须清晰地定义我们的目标和约束。一个生产级的实时特征管道需要满足以下几点核心要求:
- 数据源: 能够消费来自Kafka的高吞吐事件流(例如,用户行为日志、交易流水),峰值可达数十万QPS。
- 处理逻辑: 必须支持有状态的流式计算。例如,计算用户最近5分钟内的点击次数、过去1小时内的平均交易金额等。处理语义必须保证Exactly-Once,确保数据不重不丢。
- 服务接口: 提供一个根据主键(如
user_id
)查询特征向量的API。对这个API的性能要求极为苛刻:P99延迟必须控制在50ms以内。 - 可观测性: 必须能够监控整个管道的关键指标,包括数据流入速率、Flink作业处理延迟、Checkpoint成功率、特征服务API的QPS和延迟、以及中间件的健康状况。
- 可视化: 需要一个内部运维仪表盘,用于实时展示系统核心指标,帮助工程师快速定位问题。
架构决策的核心:Flink与Ktor之间的桥梁
整个系统的核心在于如何高效、可靠地连接流处理层(Apache Flink)和在线服务层(Ktor)。数据在Flink中计算完成后,必须以某种形式“物化”出来,供Ktor查询。这里存在几种主流方案,每种方案都有其鲜明的优缺点。
方案A: Flink -> Kafka -> Ktor (物化视图日志)
这个方案将Flink计算出的特征结果再写回一个新的Kafka Topic。Ktor服务作为消费者,订阅这个Topic,并将数据更新到其本地内存缓存(如Caffeine)中。
- 优势:
- 解耦: Flink和Ktor之间通过Kafka彻底解耦,双方的部署和伸缩互不影响。
- 高吞吐: Kafka作为中间层,能够承受极高的写入吞吐量。
- 回溯能力: Kafka的日志特性使得消费端可以重放数据,便于故障恢复或缓存重建。
- 劣势:
- 延迟: 引入了额外的网络往返(Flink -> Kafka -> Ktor),增加了端到端的延迟,对我们50ms的P99目标构成了严峻挑战。
- 状态一致性: Ktor服务通常是无状态部署的多个实例。每个实例维护自己的本地缓存,这会导致不同实例间的缓存数据存在短暂不一致。
- 资源开销: 增加了一个Kafka Topic及其维护成本。
方案B: Flink -> 外部KV存储 (如Redis) -> Ktor
此方案中,Flink将计算结果直接写入一个外部的高性能键值存储系统(如Redis或KeyDB)。Ktor服务在收到请求时,直接查询这个KV存储。
- 优势:
- 低延迟: Ktor查询Redis的延迟通常在1ms级别,非常适合低延迟场景。
- 状态一致性: 所有Ktor实例共享同一个外部数据源,保证了数据的一致性。
- 成熟度: Redis是业界广泛使用且极其成熟的解决方案,运维和客户端支持都非常完善。
- 劣势:
- 写入瓶颈: Flink作业需要频繁地对Redis进行写操作,在高吞吐场景下,Redis可能成为整个系统的瓶颈。这需要对Redis进行仔细的容量规划和性能调优。
- 依赖外部系统: 引入了一个新的、需要高可用保障的外部依赖。
方案C: Flink Queryable State
Flink自身提供了一个名为”Queryable State”的特性,允许外部应用直接查询Flink作业内部的状态。
- 优势:
- 极致低延迟: 数据无需离开Flink的TaskManager进程,Ktor直接通过RPC查询状态,理论上延迟最低。
- 架构简化: 无需引入额外的中间存储系统。
- 劣势:
- 紧耦合: 服务层(Ktor)与计算层(Flink)紧密耦合。Flink作业的重启、扩缩容或失败都会直接影响到Ktor服务的可用性。
- 运维复杂性: Queryable State的生产环境运维案例相对较少,其稳定性、安全性和性能调优比成熟的KV存储更具挑战性。
- 查询能力有限: 只支持简单的点查(Key-lookup),复杂的查询模式不支持。
最终选择与理由
在真实项目中,稳定性和可维护性往往优先于极致的理论性能。方案C(Queryable State)虽然延迟最低,但其紧耦合和运维复杂性带来的风险过高,不适合作为我们平台的第一版选型。方案A(Kafka)的延迟链路太长,难以稳定满足50ms的P99要求。
因此,方案B (Flink -> Redis -> Ktor) 成为了我们最终的选择。它在延迟、解耦和系统成熟度之间取得了最佳平衡。Redis的写入瓶颈是已知的风险,但可以通过合理的Redis集群规划、客户端侧的批量写入(Batching)以及使用性能更强的Redis替代品(如KeyDB)来缓解。这是一个务实且可靠的生产级方案。
flowchart TD subgraph "数据源" Kafka_Input[Kafka: user_events] end subgraph "实时计算层 (Apache Flink)" Flink_Job[Flink Job: Stateful Feature Engineering] end subgraph "特征存储层" Redis[Redis Cluster: Feature KV Store] end subgraph "在线服务层 (Ktor)" Ktor_Service[Ktor Service: Feature API] end subgraph "消费端" ML_Model[ML Model Service] end subgraph "可观测性" Prometheus[Prometheus Server] Dashboard[Jotai Dashboard] end Kafka_Input -- events --> Flink_Job Flink_Job -- computed features --> Redis Ktor_Service -- query feature --> Redis ML_Model -- request feature --> Ktor_Service Flink_Job -- metrics --> Prometheus Ktor_Service -- metrics --> Prometheus Redis -- redis_exporter metrics --> Prometheus Dashboard -- promql query --> Prometheus
核心实现概览
1. Apache Flink 特征计算作业
这个Flink作业的核心是消费Kafka数据,按userId
分组,然后在时间窗口上进行聚合,最后将结果写入Redis。
// build.gradle.kts dependencies
// implementation("org.apache.flink:flink-streaming-java:${flinkVersion}")
// implementation("org.apache.flink:flink-connector-kafka:${flinkVersion}")
// implementation("org.apache.flink:flink-connector-redis_2.12:1.1.5") // Note: Use a compatible redis connector
// implementation("org.apache.flink:flink-metrics-prometheus:${flinkVersion}")
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper
// Assume a simple data class for user events
data class UserEvent(val userId: String, val eventType: String, val timestamp: Long)
// Mapper to write aggregated counts to Redis
class FeatureSinkMapper : RedisMapper<Pair<String, Long>> {
// We use HSET to store features for a user under a single hash key
// KEY: "user_features:<userId>"
// FIELD: "clicks_5min"
// VALUE: <count>
override fun getCommandDescription(): RedisCommandDescription {
return RedisCommandDescription(RedisCommand.HSET, "user_features")
}
override fun getKeyFromData(data: Pair<String, Long>): String {
return data.first // userId
}
override fun getValueFromData(data: Pair<String, Long>): String {
return data.second.toString() // click count
}
}
fun main() {
val config = Configuration()
// Enable Prometheus reporter on a random port. In production, fix this port.
config.setString("metrics.reporter.prom.class", "org.apache.flink.metrics.prometheus.PrometheusReporter")
config.setString("metrics.reporter.prom.port", "9250-9260")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env.enableCheckpointing(5000) // Checkpoint every 5 seconds for Exactly-Once
val kafkaSource = KafkaSource.builder<String>()
.setBootstrapServers("kafka:9092")
.setTopics("user_events")
.setGroupId("feature_engineering_group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(SimpleStringSchema())
.build()
val eventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
// Basic JSON parsing and error handling. In production, use a robust library like Jackson.
.mapNotNull {
try {
// A very basic parser. Replace with a proper one.
val parts = it.split(",")
UserEvent(parts[0], parts[1], parts[2].toLong())
} catch (e: Exception) {
// Log and drop malformed records
null
}
}
// Assign timestamps and watermarks for event time processing
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness<UserEvent>(java.time.Duration.ofSeconds(10))
.withTimestampAssigner { event, _ -> event.timestamp }
)
val featureStream = eventStream
.keyBy { it.userId }
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(object : AggregateFunction<UserEvent, Long, Long> {
override fun createAccumulator() = 0L
override fun add(value: UserEvent, accumulator: Long) = accumulator + 1
override fun getResult(accumulator: Long) = accumulator
override fun merge(a: Long, b: Long) = a + b
},
// ProcessWindowFunction to get window metadata and userId
{ key, _, inputs, out ->
out.collect(Pair(key, inputs.first()))
})
// Configure Redis Sink
val jedisPoolConfig = FlinkJedisPoolConfig.Builder()
.setHost("redis")
.setPort(6379)
.setTimeout(2000)
.build()
featureStream.addSink(RedisSink(jedisPoolConfig, FeatureSinkMapper()))
env.execute("Real-time Feature Engineering Job")
}
关键点:
- Exactly-Once: 通过
enableCheckpointing
开启Flink的检查点机制,结合可重放的Kafka源和幂等的RedisHSET
操作,可以实现端到端的Exactly-Once语义。 - 可观测性: 通过配置
flink-metrics-prometheus
,Flink作业会自动暴露大量内部指标(如numRecordsInPerSecond
,checkpoint_duration
等),供Prometheus抓取。 - Redis Sink: 使用
flink-connector-redis
将聚合结果写入Redis。这里我们选择HSET
命令,将一个用户的所有特征存储在同一个Redis Hash中,这比为每个特征设置一个独立的key更高效。
2. Ktor 高性能特征服务
Ktor应用需要极致的性能。我们使用Netty引擎,并集成Lettuce作为异步Redis客户端。为了进一步降低延迟,我们还将在Ktor内部增加一个Caffeine本地缓存。
// build.gradle.kts dependencies
// implementation("io.ktor:ktor-server-core-jvm:$ktorVersion")
// implementation("io.ktor:ktor-server-netty-jvm:$ktorVersion")
// implementation("io.ktor:ktor-server-content-negotiation-jvm:$ktorVersion")
// implementation("io.ktor:ktor-serialization-kotlinx-json-jvm:$ktorVersion")
// implementation("io.ktor:ktor-server-micrometer-jvm:$ktorVersion")
// implementation("io.micrometer:micrometer-registry-prometheus:$micrometerVersion")
// implementation("io.lettuce:lettuce-core:$lettuceVersion")
// implementation("com.github.ben-manes.caffeine:caffeine:$caffeineVersion")
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.metrics.micrometer.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulRedisConnection
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import kotlinx.serialization.Serializable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
@Serializable
data class FeatureVector(val userId: String, val features: Map<String, String>)
object FeatureRepository {
private val redisClient: RedisClient = RedisClient.create("redis://redis:6379")
private val connection: StatefulRedisConnection<String, String> = redisClient.connect()
private val asyncCommands = connection.async()
// A two-layer cache: Caffeine (L1, in-memory) -> Redis (L2, distributed)
val cache: AsyncLoadingCache<String, Map<String, String>> = Caffeine.newBuilder()
.maximumSize(10_000) // Max 10,000 users in local cache
.expireAfterWrite(1, TimeUnit.MINUTES) // Cache entries for 1 minute
.buildAsync { userId, _ ->
// This lambda is called on cache miss
fetchFromRedis(userId)
}
private fun fetchFromRedis(userId: String): CompletableFuture<Map<String, String>> {
// In a real project, handle connection errors, empty results, etc.
return asyncCommands.hgetall("user_features:$userId").toCompletableFuture()
}
}
fun main() {
embeddedServer(Netty, port = 8080, host = "0.0.0.0") {
module()
}.start(wait = true)
}
fun Application.module() {
// Prometheus registry for metrics
val appMicrometerRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
install(MicrometerMetrics) {
registry = appMicrometerRegistry
}
install(ContentNegotiation) {
json()
}
routing {
get("/features/{userId}") {
val userId = call.parameters["userId"] ?: return@get call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Missing userId")
try {
val features = FeatureRepository.cache.get(userId).await()
if (features.isEmpty()) {
call.respond(io.ktor.http.HttpStatusCode.NotFound, "Features not found for user $userId")
} else {
call.respond(FeatureVector(userId, features))
}
} catch (e: Exception) {
// Log the exception properly
log.error("Failed to fetch features for $userId", e)
call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Error fetching features")
}
}
get("/metrics") {
call.respond(appMicrometerRegistry.scrape())
}
}
}
// Extension to convert CompletableFuture to Deferred for better coroutine integration
suspend fun <T> CompletableFuture<T>.await(): T = kotlinx.coroutines.future.await()
关键点:
- 异步非阻塞: Ktor + Netty + Lettuce 构成了一个完全异步的I/O栈,这是实现高吞吐和低延迟的基础。
- 两级缓存: Caffeine作为L1缓存,可以吸收对热点用户的大量重复查询,避免了对Redis的频繁访问,显著降低了P99延迟并保护了Redis。
- 可观测性:
ktor-server-micrometer
插件可以自动收集HTTP请求的延迟、QPS、状态码分布等指标,并通过/metrics
端点暴露给Prometheus。
3. 可观测性配置与前端可视化
Prometheus的配置非常直接,只需添加对Flink和Ktor的抓取目标。
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'flink'
static_configs:
- targets: ['flink-jobmanager:9249'] # Flink JobManager metrics port
# In a real k8s setup, use service discovery
# Add targets for each TaskManager (e.g., 'flink-taskmanager-1:9250')
- job_name: 'ktor-feature-service'
static_configs:
- targets: ['ktor-service-1:8080', 'ktor-service-2:8080']
metrics_path: /metrics
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121'] # Using redis_exporter sidecar
对于前端仪表盘,Jotai以其原子化(atomic)的状态管理模型,非常适合构建这种需要从多个数据源(Prometheus API)拉取并独立更新的监控界面。
// FeaturePipelineMetrics.tsx (React Component)
import { atom, useAtom } from 'jotai';
import { useQuery } from 'react-query'; // Or any data fetching library
// Atom to hold the latency data from Prometheus
const latencyAtom = atom<any[]>([]);
// Async atom definition is more advanced, for simplicity we use a standard atom
// updated by a component hook.
const fetchLatencyData = async () => {
// Prometheus API endpoint to query p99 latency for Ktor service
const query = `histogram_quantile(0.99, sum(rate(http_server_requests_seconds_bucket{job="ktor-feature-service"}[5m])) by (le))`;
const response = await fetch(`http://prometheus:9090/api/v1/query?query=${encodeURIComponent(query)}`);
if (!response.ok) {
throw new Error('Network response was not ok');
}
const data = await response.json();
// Process data for charting...
return data.data.result;
};
export const LatencyDashboard = () => {
const [latencyData, setLatencyData] = useAtom(latencyAtom);
// Use a library like react-query to handle periodic fetching
const { data, error, isLoading } = useQuery('latency', fetchLatencyData, {
refetchInterval: 15000, // Refetch every 15 seconds
onSuccess: (newData) => setLatencyData(newData), // Update Jotai atom on success
});
if (isLoading) return <div>Loading P99 Latency...</div>;
if (error) return <div>Error fetching data.</div>;
// A placeholder for a charting library like Recharts or D3
return (
<div>
<h2>API P99 Latency (ms)</h2>
{/* <LineChart data={latencyData} /> */}
<pre>{JSON.stringify(latencyData, null, 2)}</pre>
</div>
);
};
关键点:
- 原子化状态: 每个指标(如QPS、延迟、Flink Checkpoint时长)都可以是独立的Jotai
atom
。当某个指标的数据更新时,只有订阅了该atom
的组件会重新渲染,避免了不必要的全局刷新。 - 数据驱动: UI完全由从Prometheus获取的数据驱动,Jotai负责状态的同步和分发。
架构的扩展性与局限性
当前这套基于Flink、Redis和Ktor的架构,为实时特征平台提供了一个坚实、可靠且高性能的起点。它的扩展性体现在几个方面:首先,可以通过增加更多的Flink TaskManager和Ktor实例来水平扩展计算和服务能力;其次,可以在Flink作业中加入更复杂的特征工程逻辑,甚至加载模型进行实时推理;最后,Ktor服务可以轻松扩展出gRPC接口,为内部服务提供更高效的通信协议。
然而,这套架构也并非没有局限性。其核心瓶颈在于Redis。当用户基数和特征维度急剧增长时(高基数问题),Redis集群的内存和CPU压力会成为主要挑战。此外,特征的Schema目前是隐式的,硬编码在代码中,一个更成熟的系统需要引入Schema Registry来管理特征的定义、版本和演进。最后,该架构主要解决了“实时特征”的问题,对于需要回溯历史数据的“离线特征”,还需要构建一套独立的批处理管道(例如使用Spark)将数据回填到Redis中,以形成完整的特征存储。
```