使用 Python 与 Serverless 构建有状态 LLM 应用并集成 WebAuthn 无密码认证


一个看似简单的需求摆在面前:构建一个能与用户进行连续对话的LLM智能代理。用户通过无密码方式(WebAuthn)登录,系统需要记住上下文,并在分布式的、无状态的环境中可靠地运行。这里的核心矛盾点立刻浮现:Serverless架构的本质是无状态的,而一个有意义的LLM对话应用,其核心恰恰在于状态管理。

当一个请求通过API Gateway触发一个Python Lambda函数时,它处理完用户的输入并调用LLM API,然后这个Lambda实例的整个执行环境就被销毁了。用户的下一次提问,很可能会由一个全新的、对之前所有交互一无所知的Lambda实例来处理。如果不能有效解决会话状态的持久化,所谓的“连续对话”就无从谈起。

初步构想很简单:为每个会话引入一个外部状态存储。每次函数执行时,先从外部存储加载历史对话,处理完当前回合后,再将新的对话内容写回。问题在于,选择什么样的存储?

关系型数据库(如Postgres on RDS)?对于这种写密集型(每次对话都是一次写入)且数据模型相对简单的场景,关系型数据库的事务开销和连接管理显得过于笨重。在高并发下,连接池很快会成为Serverless环境中的瓶ăpadă颈。

内存缓存(如Redis)?速度极快,但作为主要的状态存储,数据持久性和成本是两个绕不开的问题。对于可能无限增长的对话历史,全部放在内存中不现实。

最终,我们将目光投向了Apache Cassandra。这个选择并非偶然,而是基于以下几个在生产环境中至关重要的考量:

  1. 为写而生: Cassandra的日志结构合并树(LSM-Tree)存储引擎使其具备极高的写入吞吐能力,这与对话历史记录的特性完美契合。
  2. 水平扩展: Cassandra的无主(Masterless)架构允许它像Serverless函数一样线性扩展。随着用户量和对话量的增长,只需简单地增加节点即可。
  3. 灵活的数据模型: 宽列存储模型非常适合存储非结构化的对话内容,同时也能够高效地存储结构化的WebAuthn凭证数据。
  4. 高可用性: 其内置的数据复制和多数据中心部署能力,为构建一个全球可用的应用提供了基础。

我们将使用Cassandra同时解决两个状态问题:WebAuthn用户的凭证状态和LLM的会话状态。

数据模型:Cassandra中的状态基石

在Cassandra中,数据模型的设计直接决定了系统的性能和可扩展性。一个常见的错误是带着关系型数据库的思路来设计表结构。在Cassandra中,我们必须围绕查询模式(Query-Driven Modeling)来设计。

首先是WebAuthn凭证的存储。我们需要根据用户ID快速查找其所有注册的凭证。

-- Keyspace 定义,设定复制策略
CREATE KEYSPACE IF NOT EXISTS llm_app
WITH REPLICATION = {
  'class' : 'NetworkTopologyStrategy',
  'datacenter1' : 3
};

USE llm_app;

-- 存储用户基本信息
CREATE TABLE IF NOT EXISTS users (
    user_id uuid,
    username text,
    display_name text,
    created_at timestamp,
    PRIMARY KEY (user_id)
);

-- 存储 WebAuthn 凭证
-- 真实项目中,public_key 和 credential_id 应该是 blob 类型,这里用 text 仅为演示
CREATE TABLE IF NOT EXISTS webauthn_credentials (
    user_id uuid,          -- 分区键:按用户ID组织数据
    credential_id text,    -- 集群键:同一用户可有多个凭证
    public_key text,
    sign_count bigint,
    transports list<text>, -- 如 'internal', 'usb', 'nfc'
    created_at timestamp,
    PRIMARY KEY (user_id, credential_id)
) WITH CLUSTERING ORDER BY (credential_id ASC);

这里的关键决策是使用 user_id作为分区键。这意味着一个用户的所有WebAuthn凭证都存储在Cassandra集群的同一个分区(由一组副本节点管理),使得“根据用户ID查询其所有凭证”这一核心操作极为高效。credential_id作为集群键,允许我们在分区内对凭证进行排序和唯一标识。

接下来是对话历史。我们需要根据会话ID高效地检索一个会话的所有历史消息,并且消息要按时间顺序排列。

-- 存储对话会话历史
CREATE TABLE IF NOT EXISTS chat_sessions (
    session_id uuid,           -- 分区键:一个会话的所有消息在同一分区
    message_timestamp timeuuid, -- 集群键:保证消息的严格时序性
    role text,                  -- 'user' or 'assistant'
    content text,
    metadata map<text, text>,   -- 存储额外信息,如token消耗、模型版本等
    PRIMARY KEY (session_id, message_timestamp)
) WITH CLUSTERING ORDER BY (message_timestamp DESC);

这个模型同样遵循查询驱动原则。session_id作为分区键,确保了单次查询就能获取整个对话的上下文,避免了跨分区的昂贵操作。message_timestamp使用timeuuid类型作为集群键,它既能保证全局唯一性,又能按时间自然排序。我们使用DESC排序,这样获取最新消息的查询SELECT ... FROM chat_sessions WHERE session_id = ? LIMIT N会非常快。

Serverless函数实现:Python代码的粘合

我们的服务将通过AWS API Gateway暴露几个关键的端点,由不同的Lambda函数处理。这里我们重点关注核心的注册、认证和对话逻辑。

环境与配置

在Lambda的部署包中,需要包含cassandra-driver, webauthn, openai等Python库。数据库连接的配置是生产实践中的一个要点。硬编码凭证是绝对禁止的。

# common/cassandra_client.py
import os
import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# 日志配置
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

# 从环境变量或Secrets Manager获取配置
CASSANDRA_HOSTS = os.environ.get("CASSANDRA_HOSTS", "127.0.0.1").split(',')
CASSANDRA_PORT = int(os.environ.get("CASSANDRA_PORT", 9042))
CASSANDRA_USER = os.environ.get("CASSANDRA_USERNAME")
CASSANDRA_PASSWORD = os.environ.get("CASSANDRA_PASSWORD")
CASSANDRA_KEYSPACE = os.environ.get("CASSANDRA_KEYSPACE", "llm_app")
LOCAL_DC = os.environ.get("CASSANDRA_LOCAL_DC", "datacenter1")

_session = None

def get_session():
    """
    获取Cassandra会话的单例。
    在Lambda的执行上下文中,这可以重用连接,避免每次调用都重新建立。
    """
    global _session
    if _session is None:
        try:
            auth_provider = PlainTextAuthProvider(
                username=CASSANDRA_USER, password=CASSANDRA_PASSWORD
            )
            # TokenAwarePolicy 结合 DCAwareRoundRobinPolicy 是生产环境推荐的负载均衡策略
            # 它会优先将请求路由到持有该分区键数据的本地数据中心节点。
            load_balancing_policy = TokenAwarePolicy(
                DCAwareRoundRobinPolicy(local_dc=LOCAL_DC)
            )
            
            cluster = Cluster(
                contact_points=CASSANDRA_HOSTS,
                port=CASSANDRA_PORT,
                auth_provider=auth_provider,
                load_balancing_policy=load_balancing_policy
            )
            _session = cluster.connect(CASSANDRA_KEYSPACE)
            logger.info("Cassandra session established successfully.")
        except Exception as e:
            logger.error(f"Failed to connect to Cassandra: {e}", exc_info=True)
            raise
    return _session

WebAuthn凭证注册

注册流程分为两步:生成选项和验证凭证。

# handlers/auth_handler.py
import json
import uuid
from webauthn import generate_registration_options, verify_registration_response
from webauthn.helpers.structs import RegistrationCredential

from common.cassandra_client import get_session

# RP_ID 和 RP_NAME 应该是你的服务域名和名称
RP_ID = "your-domain.com"
RP_NAME = "My LLM App"
EXPECTED_ORIGIN = "https://your-domain.com"

def generate_registration(event, context):
    """
    API Gateway Lambda Handler: 生成WebAuthn注册选项
    """
    body = json.loads(event.get('body', '{}'))
    user_id = body.get('user_id')
    username = body.get('username')

    if not user_id or not username:
        return {'statusCode': 400, 'body': json.dumps({'error': 'user_id and username are required'})}

    session = get_session()
    # 真实项目中,需要检查用户名是否已存在
    
    options = generate_registration_options(
        rp_id=RP_ID,
        rp_name=RP_NAME,
        user_id=user_id,
        user_name=username,
        user_display_name=username # 可以和username一样
    )
    
    # 这里的挑战(challenge)需要被临时存储,以便在验证步骤中使用。
    # 一个好的实践是存入Cassandra一个带TTL的表,或者使用Redis。
    # 此处为简化,我们将其返回给客户端,并期望客户端在下一步骤中传回。
    # 注意:在生产中,这样做会带来安全风险,服务器必须自己存储challenge。

    return {
        'statusCode': 200,
        'body': json.dumps(options)
    }

def verify_registration(event, context):
    """
    API Gateway Lambda Handler: 验证WebAuthn注册并持久化凭证
    """
    body = json.loads(event.get('body', '{}'))
    user_id = body.get('user_id')
    # 从客户端获取完整的凭证创建响应
    credential_data = body.get('credential') 
    
    try:
        session = get_session()
        # 再次强调:challenge应从服务端安全存储中获取,而不是客户端
        challenge = body.get("challenge") 

        registration_verification = verify_registration_response(
            credential=RegistrationCredential.parse_raw(json.dumps(credential_data)),
            expected_challenge=challenge.encode('utf-8'),
            expected_origin=EXPECTED_ORIGIN,
            expected_rp_id=RP_ID,
            require_user_verification=False # 根据你的安全策略设定
        )
        
        # 凭证验证成功,存入Cassandra
        prepared_insert = session.prepare(
            """
            INSERT INTO webauthn_credentials (user_id, credential_id, public_key, sign_count, transports, created_at)
            VALUES (?, ?, ?, ?, ?, toTimestamp(now()))
            """
        )
        session.execute(
            prepared_insert,
            (
                uuid.UUID(user_id),
                str(registration_verification.credential_id),
                str(registration_verification.credential_public_key),
                registration_verification.sign_count,
                credential_data.get('response', {}).get('transports', [])
            )
        )
        
        return {'statusCode': 200, 'body': json.dumps({'verified': True})}
    except Exception as e:
        logger.error(f"WebAuthn registration verification failed: {e}", exc_info=True)
        return {'statusCode': 500, 'body': json.dumps({'error': 'Verification failed'})}

这里的坑在于challenge的管理。虽然代码为了简化而将其传递给客户端,但正确的做法是服务器生成challenge后,将其与user_id关联并存储(例如在带TTL的Cassandra表中),在验证阶段再取出来比对。

LLM对话与状态管理

这是整个系统的核心。Lambda函数需要完成认证、加载历史、调用LLM、存储新对话这一个完整闭环。

# handlers/chat_handler.py
import json
import uuid
import openai
from cassandra.util import TimeUUID
from cassandra.query import SimpleStatement

from common.cassandra_client import get_session
from common.auth import validate_jwt # 假设已有一个JWT验证函数

# 配置OpenAI
openai.api_key = os.environ.get("OPENAI_API_KEY")

def chat_processor(event, context):
    """
    API Gateway Lambda Handler: 处理对话请求
    """
    try:
        # 1. 认证:从请求头中获取JWT并验证,解析出user_id和session_id
        # 生产环境中,这一步可能由API Gateway的Authorizer完成
        auth_header = event.get('headers', {}).get('Authorization')
        token_payload = validate_jwt(auth_header)
        user_id = token_payload['sub']
        session_id = token_payload['session_id'] # JWT中应包含session_id
        
        body = json.loads(event.get('body', '{}'))
        user_message = body.get('message')

        if not user_message:
            return {'statusCode': 400, 'body': json.dumps({'error': 'message is required'})}
            
        session = get_session()
        
        # 2. 加载历史对话
        # 查询最近的10条消息作为上下文
        query = SimpleStatement(
            "SELECT role, content FROM chat_sessions WHERE session_id = %s LIMIT 10",
            fetch_size=10
        )
        rows = session.execute(query, (uuid.UUID(session_id),))
        
        # Cassandra返回的顺序是DESC,我们需要反转它以符合对话时序
        history = [{"role": row.role, "content": row.content} for row in reversed(rows)]
        
        # 3. 构造LLM输入
        messages_for_llm = history + [{"role": "user", "content": user_message}]

        # 4. 调用LLM API
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages_for_llm
        )
        assistant_message = completion.choices[0].message['content']

        # 5. 持久化新一轮对话
        # 使用Batch Statement可以保证用户消息和助手消息的原子性写入
        # 这是一个常见的错误:分别执行两个INSERT,如果第二个失败,状态就不一致了
        prepared_insert = session.prepare(
            """
            INSERT INTO chat_sessions (session_id, message_timestamp, role, content)
            VALUES (?, ?, ?, ?)
            """
        )
        batch = BatchStatement()
        user_msg_time = TimeUUID.from_datetime(datetime.utcnow())
        # 稍微延迟一点,确保助手消息的时间戳在后
        assistant_msg_time = TimeUUID.from_datetime(datetime.utcnow() + timedelta(milliseconds=1))

        batch.add(prepared_insert, (uuid.UUID(session_id), user_msg_time, 'user', user_message))
        batch.add(prepared_insert, (uuid.UUID(session_id), assistant_msg_time, 'assistant', assistant_message))
        session.execute(batch)
        
        return {
            'statusCode': 200,
            'body': json.dumps({'reply': assistant_message})
        }

    except Exception as e:
        logger.error(f"Chat processing failed: {e}", exc_info=True)
        # 避免向客户端暴露内部错误细节
        return {'statusCode': 500, 'body': json.dumps({'error': 'An internal error occurred'})}

架构整合与流程总览

整个系统的交互流程可以通过下面的图表来可视化:

sequenceDiagram
    participant User
    participant Browser
    participant APIGateway as API Gateway
    participant LambdaAuth as Auth Lambda
    participant LambdaChat as Chat Lambda
    participant Cassandra
    participant LLM

    User->>Browser: 输入用户名,点击注册
    Browser->>APIGateway: POST /register-options (username)
    APIGateway->>LambdaAuth: 调用generate_registration
    LambdaAuth-->>APIGateway: 返回注册选项 (challenge)
    APIGateway-->>Browser: 返回选项
    Browser->>User: 提示使用指纹/安全密钥
    User->>Browser: 完成生物识别
    Browser->>APIGateway: POST /verify-registration (credential)
    APIGateway->>LambdaAuth: 调用verify_registration
    LambdaAuth->>Cassandra: 验证成功,存储凭证
    Cassandra-->>LambdaAuth: 存储成功
    LambdaAuth-->>APIGateway: 返回成功
    APIGateway-->>Browser: 注册完成

    %% --- 对话流程 ---

    User->>Browser: 输入聊天内容
    Browser->>APIGateway: POST /chat (message, JWT)
    APIGateway->>LambdaChat: 调用chat_processor
    LambdaChat->>Cassandra: 查询session_id的历史记录
    Cassandra-->>LambdaChat: 返回历史消息
    LambdaChat->>LLM: 发送包含历史的请求
    LLM-->>LambdaChat: 返回LLM响应
    LambdaChat->>Cassandra: BATCH INSERT 用户新消息和LLM响应
    Cassandra-->>LambdaChat: 写入成功
    LambdaChat-->>APIGateway: 返回LLM响应
    APIGateway-->>Browser: 显示响应
    Browser->>User: 看到AI的回答

局限性与未来迭代路径

这个架构虽然解决了核心的状态问题,但在真实的生产环境中,它并非银弹。

首先是成本。虽然Serverless按需付费,但一个高可用的Cassandra集群(无论是自建还是使用AstraDB/Keyspaces等托管服务)会带来一笔固定的基础成本。这与纯Serverless的理想模型有所出入,需要在项目初期进行评估。

其次是延迟。Lambda的冷启动问题,加上至少两次对Cassandra的I/O操作(读历史、写新消息),可能会导致用户感知的延迟增加。对于延迟敏感的应用,需要考虑预置并发(Provisioned Concurrency)等优化手段。

更重要的一点是对话上下文的长度限制。目前我们通过LIMIT 10粗暴地截断了历史,但这显然不是最优解。一个可行的优化路径是引入一个异步的总结(Summarization)任务:

  1. 当一个会话的历史消息超过某个阈值(比如20条)时,chat_processor函数发一个事件到SQS队列。
  2. 另一个专用的Lambda函数消费这个队列。它加载该会话的旧消息(比如前10条)。
  3. 调用一个专门用于总结的LLM模型(可能比主对话模型更便宜),将这10条消息总结成一段话。
  4. 将这个总结作为一条特殊的role: 'summary'消息存回Cassandra,并删除被总结的旧消息。

这样,主对话流程在加载历史时,总能在一个可控的范围内获取上下文,既能保持对话的连贯性,又避免了性能和成本的无限增长。这种异步化的架构演进,正是Serverless和分布式系统设计的魅力所在。


  目录