一个看似简单的需求摆在面前:构建一个能与用户进行连续对话的LLM智能代理。用户通过无密码方式(WebAuthn)登录,系统需要记住上下文,并在分布式的、无状态的环境中可靠地运行。这里的核心矛盾点立刻浮现:Serverless架构的本质是无状态的,而一个有意义的LLM对话应用,其核心恰恰在于状态管理。
当一个请求通过API Gateway触发一个Python Lambda函数时,它处理完用户的输入并调用LLM API,然后这个Lambda实例的整个执行环境就被销毁了。用户的下一次提问,很可能会由一个全新的、对之前所有交互一无所知的Lambda实例来处理。如果不能有效解决会话状态的持久化,所谓的“连续对话”就无从谈起。
初步构想很简单:为每个会话引入一个外部状态存储。每次函数执行时,先从外部存储加载历史对话,处理完当前回合后,再将新的对话内容写回。问题在于,选择什么样的存储?
关系型数据库(如Postgres on RDS)?对于这种写密集型(每次对话都是一次写入)且数据模型相对简单的场景,关系型数据库的事务开销和连接管理显得过于笨重。在高并发下,连接池很快会成为Serverless环境中的瓶ăpadă颈。
内存缓存(如Redis)?速度极快,但作为主要的状态存储,数据持久性和成本是两个绕不开的问题。对于可能无限增长的对话历史,全部放在内存中不现实。
最终,我们将目光投向了Apache Cassandra。这个选择并非偶然,而是基于以下几个在生产环境中至关重要的考量:
- 为写而生: Cassandra的日志结构合并树(LSM-Tree)存储引擎使其具备极高的写入吞吐能力,这与对话历史记录的特性完美契合。
- 水平扩展: Cassandra的无主(Masterless)架构允许它像Serverless函数一样线性扩展。随着用户量和对话量的增长,只需简单地增加节点即可。
- 灵活的数据模型: 宽列存储模型非常适合存储非结构化的对话内容,同时也能够高效地存储结构化的WebAuthn凭证数据。
- 高可用性: 其内置的数据复制和多数据中心部署能力,为构建一个全球可用的应用提供了基础。
我们将使用Cassandra同时解决两个状态问题:WebAuthn用户的凭证状态和LLM的会话状态。
数据模型:Cassandra中的状态基石
在Cassandra中,数据模型的设计直接决定了系统的性能和可扩展性。一个常见的错误是带着关系型数据库的思路来设计表结构。在Cassandra中,我们必须围绕查询模式(Query-Driven Modeling)来设计。
首先是WebAuthn凭证的存储。我们需要根据用户ID快速查找其所有注册的凭证。
-- Keyspace 定义,设定复制策略
CREATE KEYSPACE IF NOT EXISTS llm_app
WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'datacenter1' : 3
};
USE llm_app;
-- 存储用户基本信息
CREATE TABLE IF NOT EXISTS users (
user_id uuid,
username text,
display_name text,
created_at timestamp,
PRIMARY KEY (user_id)
);
-- 存储 WebAuthn 凭证
-- 真实项目中,public_key 和 credential_id 应该是 blob 类型,这里用 text 仅为演示
CREATE TABLE IF NOT EXISTS webauthn_credentials (
user_id uuid, -- 分区键:按用户ID组织数据
credential_id text, -- 集群键:同一用户可有多个凭证
public_key text,
sign_count bigint,
transports list<text>, -- 如 'internal', 'usb', 'nfc'
created_at timestamp,
PRIMARY KEY (user_id, credential_id)
) WITH CLUSTERING ORDER BY (credential_id ASC);
这里的关键决策是使用 user_id
作为分区键。这意味着一个用户的所有WebAuthn凭证都存储在Cassandra集群的同一个分区(由一组副本节点管理),使得“根据用户ID查询其所有凭证”这一核心操作极为高效。credential_id
作为集群键,允许我们在分区内对凭证进行排序和唯一标识。
接下来是对话历史。我们需要根据会话ID高效地检索一个会话的所有历史消息,并且消息要按时间顺序排列。
-- 存储对话会话历史
CREATE TABLE IF NOT EXISTS chat_sessions (
session_id uuid, -- 分区键:一个会话的所有消息在同一分区
message_timestamp timeuuid, -- 集群键:保证消息的严格时序性
role text, -- 'user' or 'assistant'
content text,
metadata map<text, text>, -- 存储额外信息,如token消耗、模型版本等
PRIMARY KEY (session_id, message_timestamp)
) WITH CLUSTERING ORDER BY (message_timestamp DESC);
这个模型同样遵循查询驱动原则。session_id
作为分区键,确保了单次查询就能获取整个对话的上下文,避免了跨分区的昂贵操作。message_timestamp
使用timeuuid
类型作为集群键,它既能保证全局唯一性,又能按时间自然排序。我们使用DESC
排序,这样获取最新消息的查询SELECT ... FROM chat_sessions WHERE session_id = ? LIMIT N
会非常快。
Serverless函数实现:Python代码的粘合
我们的服务将通过AWS API Gateway暴露几个关键的端点,由不同的Lambda函数处理。这里我们重点关注核心的注册、认证和对话逻辑。
环境与配置
在Lambda的部署包中,需要包含cassandra-driver
, webauthn
, openai
等Python库。数据库连接的配置是生产实践中的一个要点。硬编码凭证是绝对禁止的。
# common/cassandra_client.py
import os
import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
# 日志配置
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
# 从环境变量或Secrets Manager获取配置
CASSANDRA_HOSTS = os.environ.get("CASSANDRA_HOSTS", "127.0.0.1").split(',')
CASSANDRA_PORT = int(os.environ.get("CASSANDRA_PORT", 9042))
CASSANDRA_USER = os.environ.get("CASSANDRA_USERNAME")
CASSANDRA_PASSWORD = os.environ.get("CASSANDRA_PASSWORD")
CASSANDRA_KEYSPACE = os.environ.get("CASSANDRA_KEYSPACE", "llm_app")
LOCAL_DC = os.environ.get("CASSANDRA_LOCAL_DC", "datacenter1")
_session = None
def get_session():
"""
获取Cassandra会话的单例。
在Lambda的执行上下文中,这可以重用连接,避免每次调用都重新建立。
"""
global _session
if _session is None:
try:
auth_provider = PlainTextAuthProvider(
username=CASSANDRA_USER, password=CASSANDRA_PASSWORD
)
# TokenAwarePolicy 结合 DCAwareRoundRobinPolicy 是生产环境推荐的负载均衡策略
# 它会优先将请求路由到持有该分区键数据的本地数据中心节点。
load_balancing_policy = TokenAwarePolicy(
DCAwareRoundRobinPolicy(local_dc=LOCAL_DC)
)
cluster = Cluster(
contact_points=CASSANDRA_HOSTS,
port=CASSANDRA_PORT,
auth_provider=auth_provider,
load_balancing_policy=load_balancing_policy
)
_session = cluster.connect(CASSANDRA_KEYSPACE)
logger.info("Cassandra session established successfully.")
except Exception as e:
logger.error(f"Failed to connect to Cassandra: {e}", exc_info=True)
raise
return _session
WebAuthn凭证注册
注册流程分为两步:生成选项和验证凭证。
# handlers/auth_handler.py
import json
import uuid
from webauthn import generate_registration_options, verify_registration_response
from webauthn.helpers.structs import RegistrationCredential
from common.cassandra_client import get_session
# RP_ID 和 RP_NAME 应该是你的服务域名和名称
RP_ID = "your-domain.com"
RP_NAME = "My LLM App"
EXPECTED_ORIGIN = "https://your-domain.com"
def generate_registration(event, context):
"""
API Gateway Lambda Handler: 生成WebAuthn注册选项
"""
body = json.loads(event.get('body', '{}'))
user_id = body.get('user_id')
username = body.get('username')
if not user_id or not username:
return {'statusCode': 400, 'body': json.dumps({'error': 'user_id and username are required'})}
session = get_session()
# 真实项目中,需要检查用户名是否已存在
options = generate_registration_options(
rp_id=RP_ID,
rp_name=RP_NAME,
user_id=user_id,
user_name=username,
user_display_name=username # 可以和username一样
)
# 这里的挑战(challenge)需要被临时存储,以便在验证步骤中使用。
# 一个好的实践是存入Cassandra一个带TTL的表,或者使用Redis。
# 此处为简化,我们将其返回给客户端,并期望客户端在下一步骤中传回。
# 注意:在生产中,这样做会带来安全风险,服务器必须自己存储challenge。
return {
'statusCode': 200,
'body': json.dumps(options)
}
def verify_registration(event, context):
"""
API Gateway Lambda Handler: 验证WebAuthn注册并持久化凭证
"""
body = json.loads(event.get('body', '{}'))
user_id = body.get('user_id')
# 从客户端获取完整的凭证创建响应
credential_data = body.get('credential')
try:
session = get_session()
# 再次强调:challenge应从服务端安全存储中获取,而不是客户端
challenge = body.get("challenge")
registration_verification = verify_registration_response(
credential=RegistrationCredential.parse_raw(json.dumps(credential_data)),
expected_challenge=challenge.encode('utf-8'),
expected_origin=EXPECTED_ORIGIN,
expected_rp_id=RP_ID,
require_user_verification=False # 根据你的安全策略设定
)
# 凭证验证成功,存入Cassandra
prepared_insert = session.prepare(
"""
INSERT INTO webauthn_credentials (user_id, credential_id, public_key, sign_count, transports, created_at)
VALUES (?, ?, ?, ?, ?, toTimestamp(now()))
"""
)
session.execute(
prepared_insert,
(
uuid.UUID(user_id),
str(registration_verification.credential_id),
str(registration_verification.credential_public_key),
registration_verification.sign_count,
credential_data.get('response', {}).get('transports', [])
)
)
return {'statusCode': 200, 'body': json.dumps({'verified': True})}
except Exception as e:
logger.error(f"WebAuthn registration verification failed: {e}", exc_info=True)
return {'statusCode': 500, 'body': json.dumps({'error': 'Verification failed'})}
这里的坑在于challenge
的管理。虽然代码为了简化而将其传递给客户端,但正确的做法是服务器生成challenge
后,将其与user_id
关联并存储(例如在带TTL的Cassandra表中),在验证阶段再取出来比对。
LLM对话与状态管理
这是整个系统的核心。Lambda函数需要完成认证、加载历史、调用LLM、存储新对话这一个完整闭环。
# handlers/chat_handler.py
import json
import uuid
import openai
from cassandra.util import TimeUUID
from cassandra.query import SimpleStatement
from common.cassandra_client import get_session
from common.auth import validate_jwt # 假设已有一个JWT验证函数
# 配置OpenAI
openai.api_key = os.environ.get("OPENAI_API_KEY")
def chat_processor(event, context):
"""
API Gateway Lambda Handler: 处理对话请求
"""
try:
# 1. 认证:从请求头中获取JWT并验证,解析出user_id和session_id
# 生产环境中,这一步可能由API Gateway的Authorizer完成
auth_header = event.get('headers', {}).get('Authorization')
token_payload = validate_jwt(auth_header)
user_id = token_payload['sub']
session_id = token_payload['session_id'] # JWT中应包含session_id
body = json.loads(event.get('body', '{}'))
user_message = body.get('message')
if not user_message:
return {'statusCode': 400, 'body': json.dumps({'error': 'message is required'})}
session = get_session()
# 2. 加载历史对话
# 查询最近的10条消息作为上下文
query = SimpleStatement(
"SELECT role, content FROM chat_sessions WHERE session_id = %s LIMIT 10",
fetch_size=10
)
rows = session.execute(query, (uuid.UUID(session_id),))
# Cassandra返回的顺序是DESC,我们需要反转它以符合对话时序
history = [{"role": row.role, "content": row.content} for row in reversed(rows)]
# 3. 构造LLM输入
messages_for_llm = history + [{"role": "user", "content": user_message}]
# 4. 调用LLM API
completion = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages_for_llm
)
assistant_message = completion.choices[0].message['content']
# 5. 持久化新一轮对话
# 使用Batch Statement可以保证用户消息和助手消息的原子性写入
# 这是一个常见的错误:分别执行两个INSERT,如果第二个失败,状态就不一致了
prepared_insert = session.prepare(
"""
INSERT INTO chat_sessions (session_id, message_timestamp, role, content)
VALUES (?, ?, ?, ?)
"""
)
batch = BatchStatement()
user_msg_time = TimeUUID.from_datetime(datetime.utcnow())
# 稍微延迟一点,确保助手消息的时间戳在后
assistant_msg_time = TimeUUID.from_datetime(datetime.utcnow() + timedelta(milliseconds=1))
batch.add(prepared_insert, (uuid.UUID(session_id), user_msg_time, 'user', user_message))
batch.add(prepared_insert, (uuid.UUID(session_id), assistant_msg_time, 'assistant', assistant_message))
session.execute(batch)
return {
'statusCode': 200,
'body': json.dumps({'reply': assistant_message})
}
except Exception as e:
logger.error(f"Chat processing failed: {e}", exc_info=True)
# 避免向客户端暴露内部错误细节
return {'statusCode': 500, 'body': json.dumps({'error': 'An internal error occurred'})}
架构整合与流程总览
整个系统的交互流程可以通过下面的图表来可视化:
sequenceDiagram participant User participant Browser participant APIGateway as API Gateway participant LambdaAuth as Auth Lambda participant LambdaChat as Chat Lambda participant Cassandra participant LLM User->>Browser: 输入用户名,点击注册 Browser->>APIGateway: POST /register-options (username) APIGateway->>LambdaAuth: 调用generate_registration LambdaAuth-->>APIGateway: 返回注册选项 (challenge) APIGateway-->>Browser: 返回选项 Browser->>User: 提示使用指纹/安全密钥 User->>Browser: 完成生物识别 Browser->>APIGateway: POST /verify-registration (credential) APIGateway->>LambdaAuth: 调用verify_registration LambdaAuth->>Cassandra: 验证成功,存储凭证 Cassandra-->>LambdaAuth: 存储成功 LambdaAuth-->>APIGateway: 返回成功 APIGateway-->>Browser: 注册完成 %% --- 对话流程 --- User->>Browser: 输入聊天内容 Browser->>APIGateway: POST /chat (message, JWT) APIGateway->>LambdaChat: 调用chat_processor LambdaChat->>Cassandra: 查询session_id的历史记录 Cassandra-->>LambdaChat: 返回历史消息 LambdaChat->>LLM: 发送包含历史的请求 LLM-->>LambdaChat: 返回LLM响应 LambdaChat->>Cassandra: BATCH INSERT 用户新消息和LLM响应 Cassandra-->>LambdaChat: 写入成功 LambdaChat-->>APIGateway: 返回LLM响应 APIGateway-->>Browser: 显示响应 Browser->>User: 看到AI的回答
局限性与未来迭代路径
这个架构虽然解决了核心的状态问题,但在真实的生产环境中,它并非银弹。
首先是成本。虽然Serverless按需付费,但一个高可用的Cassandra集群(无论是自建还是使用AstraDB/Keyspaces等托管服务)会带来一笔固定的基础成本。这与纯Serverless的理想模型有所出入,需要在项目初期进行评估。
其次是延迟。Lambda的冷启动问题,加上至少两次对Cassandra的I/O操作(读历史、写新消息),可能会导致用户感知的延迟增加。对于延迟敏感的应用,需要考虑预置并发(Provisioned Concurrency)等优化手段。
更重要的一点是对话上下文的长度限制。目前我们通过LIMIT 10
粗暴地截断了历史,但这显然不是最优解。一个可行的优化路径是引入一个异步的总结(Summarization)任务:
- 当一个会话的历史消息超过某个阈值(比如20条)时,
chat_processor
函数发一个事件到SQS队列。 - 另一个专用的Lambda函数消费这个队列。它加载该会话的旧消息(比如前10条)。
- 调用一个专门用于总结的LLM模型(可能比主对话模型更便宜),将这10条消息总结成一段话。
- 将这个总结作为一条特殊的
role: 'summary'
消息存回Cassandra,并删除被总结的旧消息。
这样,主对话流程在加载历史时,总能在一个可控的范围内获取上下文,既能保持对话的连贯性,又避免了性能和成本的无限增长。这种异步化的架构演进,正是Serverless和分布式系统设计的魅力所在。