The first prototype of our RAG (Retrieval-Augmented Generation) system was a blocking request-response model. The user submitted a question, the frontend showed a loading spinner, the backend queried the vector database, built the prompt, and then waited for the large language model (LLM) to return a complete answer. The whole round trip took anywhere from 5 to 15 seconds, and what the user finally saw was one large block of static text. In a real project, that user experience is simply unacceptable.
The core pain point is clear: users need immediate feedback. The answer is streaming. But a plain text stream introduces a new problem: how do we embed structured data precisely within a continuous text stream, such as citations inserted in real time, code blocks, or even dynamic charts? Making the frontend parse a text stream littered with special markers is both fragile and inelegant.
Our goal is a backend-driven streaming UI: the backend streams not only text but also pushes structured "rendering instructions", and the frontend's only job is to execute them faithfully. The stack was chosen after some deliberation:
- Backend orchestration: Laravel. As the team's primary framework, its mature ecosystem, queue system, and solid HTTP layer (in particular StreamedResponse) give us a stable foundation for complex business flows.
- Vector retrieval: Weaviate. Its hybrid search capability (vector similarity search combined with metadata filtering) is essential for a RAG scenario that needs precise source citations.
- Dynamic frontend rendering: React + Styled-components. We need to render different kinds of UI fragments on the fly. Styled-components encapsulates component logic and styling together, which makes dynamically rendering a SourceCitation component or a CodeBlock component from backend instructions remarkably clear and maintainable.
- Transport: Server-Sent Events (SSE). Compared with WebSocket, SSE is a lighter one-way protocol that matches our server-to-client push scenario exactly, and its built-in reconnection behavior makes it more robust.
The core idea of the architecture: Laravel acts as the conductor, orchestrating the calls to Weaviate and the LLM, parsing the LLM's output stream in real time, wrapping it into structured SSE events, and pushing those to the frontend.
sequenceDiagram
    participant Client as React App
    participant Laravel as Laravel (SSE Endpoint)
    participant QueueWorker as Laravel Queue Worker
    participant Weaviate
    participant LLMService as OpenAI/LLM API

    Note over Client, Laravel: 1. Data ingestion & vectorization (async)
    Laravel->>QueueWorker: Dispatch(ProcessEmbeddingJob)
    QueueWorker->>LLMService: Generate embedding for document chunk
    LLMService-->>QueueWorker: Vector
    QueueWorker->>Weaviate: Store vector + metadata (source, doc_id)

    Note over Client, Laravel: 2. User query & streaming response (sync)
    Client->>Laravel: GET /api/query-stream?q=...
    Laravel->>LLMService: Generate embedding for query
    LLMService-->>Laravel: Query Vector
    Laravel->>Weaviate: Hybrid search with vector + filters
    Weaviate-->>Laravel: Top-k relevant document chunks
    Laravel->>LLMService: POST /chat/completions (stream=true, prompt with context)
    LLMService-->>Laravel: Stream of response tokens...
    loop For each token/chunk in LLM stream
        Laravel-->>Client: event: text_chunk\ndata: {"content": "..."}\n\n
        alt LLM stream contains special token e.g., [SOURCE:doc_id]
            Laravel-->>Weaviate: Fetch metadata for doc_id
            Weaviate-->>Laravel: Document source metadata
            Laravel-->>Client: event: source_citation\ndata: {"id": "...", "url": "..."}\n\n
        end
    end
    Laravel-->>Client: event: stream_end\ndata: {"status": "done"}\n\n
Backend Implementation: Laravel as the Streaming Orchestration Hub
First we need a service that encapsulates the interaction with Weaviate. In a real project the configuration should be injected through a service provider, not hard-coded.
1. Weaviate Service Configuration
app/Providers/WeaviateServiceProvider.php:
<?php

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Weaviate\Weaviate;
use Illuminate\Contracts\Foundation\Application;

class WeaviateServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->singleton(Weaviate::class, function (Application $app) {
            $config = $app->config->get('services.weaviate');

            if (empty($config['host']) || empty($config['scheme'])) {
                throw new \InvalidArgumentException('Weaviate configuration is missing.');
            }

            return new Weaviate($config['host'], $config['scheme']);
        });
    }
}
config/services.php:

// ...
'weaviate' => [
    'host' => env('WEAVIATE_HOST', 'localhost:8080'),
    'scheme' => env('WEAVIATE_SCHEME', 'http'),
],
// ...
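One wiring step that is easy to forget: the provider has to be registered, and where depends on your Laravel version. A minimal sketch:

<?php

// bootstrap/providers.php (Laravel 11+). On Laravel 10 and below, append the
// class to the 'providers' array in config/app.php instead.
return [
    App\Providers\AppServiceProvider::class,
    App\Providers\WeaviateServiceProvider::class,
];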
2. Data Indexing: Asynchronous Vectorization
Chunking and vectorizing documents inside the HTTP request would be disastrous. This has to be an asynchronous process, so we use Laravel's queue system.
app/Jobs/ProcessDocumentEmbedding.php:
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Weaviate\Weaviate;
use App\Services\EmbeddingService; // Assumed to wrap the LLM embedding API

class ProcessDocumentEmbedding implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Allow up to 3 attempts, backing off 5 and then 10 minutes between retries
    public $tries = 3;
    public $backoff = [300, 600];

    public function __construct(
        protected string $documentId,
        protected string $contentChunk,
        protected array $metadata
    ) {}

    public function handle(Weaviate $weaviate, EmbeddingService $embeddingService): void
    {
        try {
            $vector = $embeddingService->createEmbedding($this->contentChunk);

            // Class name defined in the Weaviate schema
            $className = 'TechnicalDocument';

            $weaviate->data()->create([
                'class' => $className,
                'properties' => [
                    'doc_id' => $this->documentId,
                    'content' => $this->contentChunk,
                    'source' => $this->metadata['source'] ?? 'unknown',
                    'chunk_index' => $this->metadata['chunk_index'] ?? 0,
                ],
                'vector' => $vector,
            ]);

            Log::info("Successfully embedded and stored chunk for document: {$this->documentId}");
        } catch (\Exception $e) {
            Log::error("Failed to process embedding for doc {$this->documentId}: " . $e->getMessage());
            // Release back onto the queue to trigger a retry
            $this->release(300);
        }
    }

    public function failed(\Throwable $exception): void
    {
        Log::critical("Embedding job finally failed for doc {$this->documentId}: " . $exception->getMessage());
        // Alerting could be added here, e.g. a Slack notification
    }
}
This job includes basic error handling, logging, and a retry strategy, which is the bare minimum for production-grade code. A sketch of how documents get chunked and dispatched into it follows below.
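For completeness, here is a minimal sketch of the ingestion side. The DocumentIngestionService class and its naive fixed-size chunking are illustrative assumptions, not part of the code above; real chunking should respect sentence or token boundaries and overlap adjacent chunks.

<?php

namespace App\Services;

use App\Jobs\ProcessDocumentEmbedding;

class DocumentIngestionService
{
    public function ingest(string $documentId, string $content, string $source): void
    {
        // Naive fixed-size chunking for illustration. str_split() is byte-based,
        // so use mb_str_split() for multi-byte text; a real implementation would
        // also split on sentence/token boundaries and overlap adjacent chunks.
        $chunks = str_split($content, 1000);

        foreach ($chunks as $index => $chunk) {
            ProcessDocumentEmbedding::dispatch($documentId, $chunk, [
                'source' => $source,
                'chunk_index' => $index,
            ])->onQueue('embeddings'); // a dedicated queue isolates slow embedding work
        }
    }
}

Running a separate worker for the embeddings queue keeps slow embedding calls from starving other background work.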
3. The Core: The Streaming Response Controller
This is the heart of the whole system. We use Laravel's StreamedResponse to hold a persistent HTTP connection open and send SSE-formatted data over it.
app/Http/Controllers/RAGStreamController.php:
<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
use App\Services\EmbeddingService;
use App\Services\LLMService; // Wraps the LLM streaming API
use Weaviate\Weaviate;
use Illuminate\Support\Facades\Log;

class RAGStreamController extends Controller
{
    public function __construct(
        protected Weaviate $weaviate,
        protected EmbeddingService $embeddingService,
        protected LLMService $llmService
    ) {}

    public function streamQuery(Request $request)
    {
        $query = $request->validate(['q' => 'required|string|max:500'])['q'];

        $response = new StreamedResponse(function () use ($query) {
            try {
                // 1. Embed the query
                $queryVector = $this->embeddingService->createEmbedding($query);

                // 2. Retrieve from Weaviate (nearVector search here; a where
                // filter on metadata can be combined with it for hybrid retrieval)
                $retrievedChunks = $this->weaviate->graphql()->get()
                    ->withClassName('TechnicalDocument')
                    ->withFields('content doc_id source')
                    ->withNearVector($queryVector)
                    ->withLimit(5)
                    ->run();

                $context = "";
                $sourceMapping = [];
                foreach ($retrievedChunks['data']['Get']['TechnicalDocument'] ?? [] as $chunk) {
                    $context .= "Source (ID: {$chunk['doc_id']}):\n{$chunk['content']}\n\n";
                    if (!isset($sourceMapping[$chunk['doc_id']])) {
                        $sourceMapping[$chunk['doc_id']] = $chunk['source'];
                    }
                }

                // 3. Build the prompt
                $prompt = "Based on the following sources, answer the user's question. When you use information from a source, you MUST cite it by including its ID in brackets, like [SOURCE:doc_id].\n\n---\nContext:\n{$context}---\n\nUser Question: {$query}\n\nAnswer:";

                // 4. Call the LLM streaming API and process the response
                $this->llmService->streamChat($prompt, function ($chunk) use (&$sourceMapping) {
                    // Core parsing logic: $chunk is a text fragment from the LLM stream.
                    // Match [SOURCE:doc_id] citations with a regex. Note that a marker
                    // split across two chunks is missed here (see the limitations section).
                    preg_match_all('/\[SOURCE:([\w-]+)\]/', $chunk, $matches, PREG_SET_ORDER);
                    $textParts = preg_split('/\[SOURCE:[\w-]+\]/', $chunk);

                    foreach ($textParts as $index => $textPart) {
                        if (!empty($textPart)) {
                            $this->sendSseEvent('text_chunk', ['content' => $textPart]);
                        }
                        if (isset($matches[$index])) {
                            $docId = $matches[$index][1];
                            $this->sendSseEvent('source_citation', [
                                'id' => $docId,
                                'source' => $sourceMapping[$docId] ?? 'Source not found'
                            ]);
                        }
                    }
                });

                $this->sendSseEvent('stream_end', ['status' => 'done']);
            } catch (\Exception $e) {
                Log::error("RAG Stream Error: " . $e->getMessage());
                $this->sendSseEvent('error', ['message' => 'An internal error occurred.']);
            }
        });

        $response->headers->set('Content-Type', 'text/event-stream');
        $response->headers->set('X-Accel-Buffering', 'no');
        $response->headers->set('Cache-Control', 'no-cache');

        return $response;
    }

    private function sendSseEvent(string $eventName, array $data): void
    {
        echo "event: " . $eventName . "\n";
        echo "data: " . json_encode($data) . "\n\n";
        // Force buffered output out to the client immediately
        if (ob_get_level() > 0) {
            ob_flush();
        }
        flush();
    }
}
The key is the streamQuery method. Rather than returning data in one shot, it returns a StreamedResponse object. Inside its closure we run the complete RAG pipeline and, as the LLM stream arrives, parse it on the fly and emit SSE events. The ob_flush() and flush() calls (guarded so ob_flush() only runs when an output buffer actually exists) are critical to ensure each event is pushed to the client immediately instead of sitting in a buffer.
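The LLMService used above is referenced but never shown in this post. As a rough sketch of what its streamChat method could look like against the OpenAI chat completions API, here is a version built on raw cURL with CURLOPT_WRITEFUNCTION; the class shape, model name, and line-by-line parsing are assumptions, and a production version must buffer SSE frames that arrive split across reads.

<?php

namespace App\Services;

class LLMService
{
    public function __construct(private string $apiKey) {}

    /**
     * Stream a chat completion, invoking $onChunk with each text delta.
     * Minimal sketch: assumes each cURL read contains whole "data: " lines;
     * a robust version must buffer partial SSE frames across reads.
     */
    public function streamChat(string $prompt, callable $onChunk): void
    {
        $payload = json_encode([
            'model' => 'gpt-4o-mini',
            'stream' => true,
            'messages' => [['role' => 'user', 'content' => $prompt]],
        ]);

        $ch = curl_init('https://api.openai.com/v1/chat/completions');
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/json',
                'Authorization: Bearer ' . $this->apiKey,
            ],
            // Called for every chunk of the response body as it arrives
            CURLOPT_WRITEFUNCTION => function ($ch, $data) use ($onChunk) {
                foreach (explode("\n", $data) as $line) {
                    if (!str_starts_with($line, 'data: ') || trim($line) === 'data: [DONE]') {
                        continue;
                    }
                    $json = json_decode(substr($line, 6), true);
                    $delta = $json['choices'][0]['delta']['content'] ?? '';
                    if ($delta !== '') {
                        $onChunk($delta);
                    }
                }
                return strlen($data); // tell cURL we consumed the chunk
            },
        ]);
        curl_exec($ch);
        curl_close($ch);
    }
}

Exposing the endpoint itself is then a single route, e.g. Route::get('/api/query-stream', [RAGStreamController::class, 'streamQuery']); in routes/api.php.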
Frontend Implementation: Dynamic Rendering with React and Styled-components
The frontend's job is to listen for these structured events and render the matching component for each event type.
1. Establishing the SSE Connection and Event Listeners
We wrap the EventSource logic in a React hook.
hooks/useRAGStream.js:
import { useState, useEffect, useRef } from 'react';

export const useRAGStream = (query) => {
  const [streamData, setStreamData] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const eventSourceRef = useRef(null);

  useEffect(() => {
    if (!query) {
      return;
    }

    setIsStreaming(true);
    setStreamData([]);

    const es = new EventSource(`/api/query-stream?q=${encodeURIComponent(query)}`);
    eventSourceRef.current = es;

    es.onopen = () => {
      console.log("SSE connection opened.");
    };

    es.addEventListener('text_chunk', (event) => {
      const data = JSON.parse(event.data);
      setStreamData(prev => {
        // Merge consecutive text chunks to keep re-renders cheap
        const lastItem = prev[prev.length - 1];
        if (lastItem && lastItem.type === 'text') {
          const newLastItem = { ...lastItem, content: lastItem.content + data.content };
          return [...prev.slice(0, -1), newLastItem];
        }
        return [...prev, { type: 'text', content: data.content }];
      });
    });

    es.addEventListener('source_citation', (event) => {
      const data = JSON.parse(event.data);
      setStreamData(prev => [...prev, { type: 'source', data }]);
    });

    es.addEventListener('stream_end', (event) => {
      console.log('Stream ended');
      es.close();
      setIsStreaming(false);
    });

    es.onerror = (error) => {
      console.error("EventSource failed:", error);
      es.close();
      setIsStreaming(false);
      // In production, set an error state here and surface it to the user
      setStreamData(prev => [...prev, { type: 'error', message: 'Connection lost.' }]);
    };

    return () => {
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
      }
    };
  }, [query]);

  return { streamData, isStreaming };
};
2. Rendering Components with Styled-components
components/RAGResponseViewer.js:
import React from 'react';
import styled, { keyframes } from 'styled-components';
import { useRAGStream } from '../hooks/useRAGStream';

// --- Styled Components Definition ---

const ResponseContainer = styled.div`
  background-color: #f9f9f9;
  border: 1px solid #eee;
  padding: 20px;
  border-radius: 8px;
  line-height: 1.7;
  font-family: 'Georgia', serif;
  color: #333;
`;

const TextSpan = styled.span`
  white-space: pre-wrap; /* preserve newlines and spaces in the text */
`;

const blink = keyframes`
  50% { opacity: 0; }
`;

const Cursor = styled.span`
  display: inline-block;
  width: 8px;
  height: 1.2em;
  background-color: #333;
  animation: ${blink} 1s step-end infinite;
  margin-left: 2px;
  vertical-align: text-bottom;
`;

const CitationChip = styled.a`
  display: inline-block;
  background-color: #e0eafc;
  color: #3f51b5;
  padding: 2px 8px;
  border-radius: 12px;
  font-size: 0.8em;
  font-weight: bold;
  margin: 0 4px;
  text-decoration: none;
  transition: background-color 0.2s ease;
  cursor: pointer;

  &:hover {
    background-color: #c5d4f8;
  }
`;

const ErrorMessage = styled.div`
  color: #d9534f;
  font-weight: bold;
`;

// --- Main Component ---

export const RAGResponseViewer = ({ query }) => {
  const { streamData, isStreaming } = useRAGStream(query);

  const renderPart = (part, index) => {
    switch (part.type) {
      case 'text':
        return <TextSpan key={index}>{part.content}</TextSpan>;
      case 'source':
        return (
          <CitationChip key={index} href={part.data.source} target="_blank" rel="noopener noreferrer" title={part.data.source}>
            {part.data.id}
          </CitationChip>
        );
      case 'error':
        return <ErrorMessage key={index}>{part.message}</ErrorMessage>;
      default:
        return null;
    }
  };

  if (!query && streamData.length === 0) {
    return <ResponseContainer>Please enter a query to start.</ResponseContainer>;
  }

  return (
    <ResponseContainer>
      {streamData.map(renderPart)}
      {isStreaming && <Cursor />}
    </ResponseContainer>
  );
};
The essence here is the RAGResponseViewer component. It is effectively "stateless": it just renders whatever the streamData array contains. Each item in streamData is an object with a type field, mirroring exactly the structure of the backend's SSE events.

- The renderPart function decides, based on part.type, whether to render a TextSpan or a CitationChip.
- This is where Styled-components earns its keep. A CitationChip is not just a link; it carries its own styling, interaction (the hover effect), and semantics, packaged as a self-contained, reusable unit. When the backend pushes a source_citation event, the frontend needs no DOM surgery or CSS class juggling: it simply appends a {type: 'source', ...} object to the array, and React plus Styled-components render the new component automatically and efficiently.
Limitations and Future Directions
This architecture solves the problem of streaming structured data elegantly, but it is not without drawbacks.
First, the backend's real-time parsing of the LLM output stream relies on simple string operations and regular expressions. If the LLM's output format wobbles even slightly (say, an extra space), parsing can fail; a citation marker split across two stream chunks will also slip past the per-chunk regex entirely. A more robust approach is to train or fine-tune the model to emit a stricter format, for example a well-defined JSON structure whenever a citation is needed. A stateful parser that buffers across chunk boundaries is a useful intermediate step; a sketch follows.
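A minimal sketch of such a parser (the StreamMarkerParser class is our illustration, not part of the controller above): it buffers the stream, emits complete markers and text, and holds back anything that might be the start of an unfinished marker.

<?php

namespace App\Services;

/**
 * Buffers the LLM token stream so that [SOURCE:doc_id] markers split across
 * chunk boundaries are still recognized. push() returns a list of
 * ['type' => 'text'|'source', ...] parts ready to be sent as SSE events.
 */
class StreamMarkerParser
{
    private string $buffer = '';

    public function push(string $chunk): array
    {
        $this->buffer .= $chunk;
        $parts = [];

        // Emit every complete marker and the text preceding it
        while (preg_match('/\[SOURCE:([\w-]+)\]/', $this->buffer, $m, PREG_OFFSET_CAPTURE)) {
            $text = substr($this->buffer, 0, $m[0][1]);
            if ($text !== '') {
                $parts[] = ['type' => 'text', 'content' => $text];
            }
            $parts[] = ['type' => 'source', 'id' => $m[1][0]];
            $this->buffer = substr($this->buffer, $m[0][1] + strlen($m[0][0]));
        }

        // Hold back a trailing '[' with no ']' yet: it may be a partial marker
        $safeLength = strlen($this->buffer);
        if (($open = strrpos($this->buffer, '[')) !== false && strpos($this->buffer, ']', $open) === false) {
            $safeLength = $open;
        }
        if ($safeLength > 0) {
            $parts[] = ['type' => 'text', 'content' => substr($this->buffer, 0, $safeLength)];
            $this->buffer = substr($this->buffer, $safeLength);
        }

        return $parts;
    }

    // Flush whatever is left once the upstream stream ends
    public function finish(): array
    {
        $rest = $this->buffer;
        $this->buffer = '';
        return $rest === '' ? [] : [['type' => 'text', 'content' => $rest]];
    }
}

The controller would feed every LLM chunk through push() and flush the remainder with finish() before emitting stream_end.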
Second, SSE is one-way. If the product ever needs bidirectional communication (for example, letting the user interrupt or ask a follow-up mid-stream), upgrading to WebSocket becomes unavoidable. The event-based structured data model, however, would carry over unchanged.
Finally, merging consecutive text_chunk events on the frontend is only a basic optimization. In more complex scenarios, such as streaming a table or a list, finer-grained state management and batching logic are needed to avoid performance problems caused by overly frequent React re-renders; throttling render updates with requestAnimationFrame would further smooth the experience.