Building a Vector-Retrieval Streaming Response System with Laravel, Weaviate, and Styled-components


The first prototype of our RAG (Retrieval-Augmented Generation) system was a blocking request-response model. The user submitted a question, the frontend showed a loading spinner, the backend queried the vector database, built the prompt, and then waited for the large language model (LLM) to return a complete answer. The whole round trip took anywhere from 5 to 15 seconds, and what the user finally saw was one large block of static text. In a real product, that experience is simply unacceptable.

The core pain point is clear: users need immediate feedback. The answer is streaming. But a plain text stream raises a new problem: how do we precisely embed structured data within a continuous text stream, such as citations inserted in real time, code blocks, or even dynamic charts? Making the frontend parse a text stream littered with special markers is both fragile and inelegant.

Our goal is a backend-driven streaming UI system. The backend not only streams text but also pushes structured "rendering instructions", and the frontend simply executes those instructions faithfully. The stack for this design was chosen after some deliberation:

  • Backend orchestration: Laravel. As the team's primary framework, its mature ecosystem, queue system, and powerful HTTP layer (StreamedResponse in particular) are a solid foundation for complex business flows.
  • Vector retrieval: Weaviate. Its hybrid search capability (combining vector similarity search with metadata filtering) is essential for RAG scenarios that need precise source citations.
  • Frontend dynamic rendering: React + Styled-components. We need to render different kinds of UI fragments on the fly. Styled-components packages component logic and styling together, which makes dynamically rendering a SourceCitation or CodeBlock component in response to a backend instruction remarkably clear and maintainable.
  • Transport protocol: Server-Sent Events (SSE). Compared with WebSocket, SSE is a lighter one-way protocol that exactly fits our server-to-client push scenario, and its built-in automatic reconnection makes it more robust.

The core idea of the architecture: Laravel acts as the conductor, orchestrating the calls to Weaviate and the LLM, parsing the LLM's output stream in real time, wrapping it into structured SSE events, and pushing those events to the frontend.

sequenceDiagram
    participant Client as React App
    participant Laravel as Laravel (SSE Endpoint)
    participant QueueWorker as Laravel Queue Worker
    participant Weaviate
    participant LLMService as OpenAI/LLM API

    Note over Client, Laravel: 1. Data ingestion & vectorization (async)
    Laravel->>QueueWorker: Dispatch(ProcessEmbeddingJob)
    QueueWorker->>LLMService: Generate embedding for document chunk
    LLMService-->>QueueWorker: Vector
    QueueWorker->>Weaviate: Store vector + metadata (source, doc_id)

    Note over Client, Laravel: 2. User query & streaming response (sync)
    Client->>Laravel: GET /api/query-stream?q=...
    Laravel->>LLMService: Generate embedding for query
    LLMService-->>Laravel: Query Vector
    Laravel->>Weaviate: Hybrid search with vector + filters
    Weaviate-->>Laravel: Top-k relevant document chunks
    Laravel->>LLMService: POST /chat/completions (stream=true, prompt with context)
    LLMService-->>Laravel: Stream of response tokens...

    loop For each token/chunk in LLM stream
        Laravel-->>Client: event: text_chunk\ndata: {"content": "..."}\n\n
        alt LLM stream contains special token e.g., [SOURCE:doc_id]
            Laravel-->>Weaviate: Fetch metadata for doc_id
            Weaviate-->>Laravel: Document source metadata
            Laravel-->>Client: event: source_citation\ndata: {"id": "...", "url": "..."}\n\n
        end
    end
    Laravel-->>Client: event: stream_end\ndata: {"status": "done"}\n\n

Backend: Laravel as the Streaming Orchestrator

First we need a service that encapsulates interaction with Weaviate. In a real project, configuration should be injected via a service provider, never hard-coded.

1. Weaviate Service Configuration

app/Providers/WeaviateServiceProvider.php:

<?php

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Weaviate\Weaviate;
use Illuminate\Contracts\Foundation\Application;

class WeaviateServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->singleton(Weaviate::class, function (Application $app) {
            $config = $app->config->get('services.weaviate');

            if (empty($config['host']) || empty($config['scheme'])) {
                throw new \InvalidArgumentException('Weaviate configuration is missing.');
            }

            return new Weaviate($config['host'], $config['scheme']);
        });
    }
}

config/services.php:

// ...
'weaviate' => [
    'host' => env('WEAVIATE_HOST', 'localhost:8080'),
    'scheme' => env('WEAVIATE_SCHEME', 'http'),
],
// ...

2. Indexing: Asynchronous Vectorization

Chunking and vectorizing documents directly inside an HTTP request would be disastrous. This must be an asynchronous process, so we use Laravel's queue system.
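
The chunking step itself is language-agnostic. A minimal sketch of the splitting that would happen before each embedding job is dispatched (shown in JavaScript; the chunk size and overlap values are illustrative assumptions, not tuned recommendations):

```javascript
// Split a document into fixed-size chunks with overlap, so that context
// spanning a chunk boundary is not lost. Sizes are illustrative; tune them
// for your embedding model's context window.
function chunkDocument(text, chunkSize = 500, overlap = 50) {
    const chunks = [];
    let start = 0;
    while (start < text.length) {
        chunks.push({
            chunk_index: chunks.length,
            content: text.slice(start, start + chunkSize),
        });
        if (start + chunkSize >= text.length) break;
        start += chunkSize - overlap;
    }
    return chunks;
}
// Each resulting chunk maps to one ProcessDocumentEmbedding job dispatch.
```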

app/Jobs/ProcessDocumentEmbedding.php:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Weaviate\Weaviate;
use App\Services\EmbeddingService; // assumed service wrapping the LLM embedding API

class ProcessDocumentEmbedding implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Allow 3 attempts, backing off 5 then 10 minutes between retries
    public $tries = 3;
    public $backoff = [300, 600];

    public function __construct(
        protected string $documentId,
        protected string $contentChunk,
        protected array $metadata
    ) {}

    public function handle(Weaviate $weaviate, EmbeddingService $embeddingService): void
    {
        try {
            $vector = $embeddingService->createEmbedding($this->contentChunk);

            // Class name in the Weaviate schema
            $className = 'TechnicalDocument';

            $weaviate->data()->create([
                'class' => $className,
                'properties' => [
                    'doc_id' => $this->documentId,
                    'content' => $this->contentChunk,
                    'source' => $this->metadata['source'] ?? 'unknown',
                    'chunk_index' => $this->metadata['chunk_index'] ?? 0,
                ],
                'vector' => $vector,
            ]);

            Log::info("Successfully embedded and stored chunk for document: {$this->documentId}");

        } catch (\Exception $e) {
            Log::error("Failed to process embedding for doc {$this->documentId}: " . $e->getMessage());
            // Rethrow so the queue retries on the configured $backoff schedule
            throw $e;
        }
    }

    public function failed(\Throwable $exception): void
    {
        Log::critical("Embedding job finally failed for doc {$this->documentId}: " . $exception->getMessage());
        // Alerting could go here, e.g. a Slack notification
    }
}

This job covers basic error handling, logging, and a retry policy, which is the bare minimum for production code.

3. The Core: a Streaming Response Controller

This is the heart of the system. We use Laravel's StreamedResponse to keep a persistent HTTP connection open and send SSE-formatted data over it.

app/Http/Controllers/RAGStreamController.php:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
use App\Services\EmbeddingService;
use App\Services\LLMService; // wraps the streaming LLM API calls
use Weaviate\Weaviate;
use Illuminate\Support\Facades\Log;

class RAGStreamController extends Controller
{
    public function __construct(
        protected Weaviate $weaviate,
        protected EmbeddingService $embeddingService,
        protected LLMService $llmService
    ) {}

    public function streamQuery(Request $request)
    {
        $query = $request->validate(['q' => 'required|string|max:500'])['q'];

        $response = new StreamedResponse(function () use ($query) {
            try {
                // 1. Embed the query
                $queryVector = $this->embeddingService->createEmbedding($query);

                // 2. Weaviate retrieval (nearVector here; add a where-filter for metadata filtering)
                $retrievedChunks = $this->weaviate->graphql()->get()
                    ->withClassName('TechnicalDocument')
                    ->withFields('content doc_id source')
                    ->withNearVector($queryVector)
                    ->withLimit(5)
                    ->run();

                $context = "";
                $sourceMapping = [];
                foreach ($retrievedChunks['data']['Get']['TechnicalDocument'] ?? [] as $chunk) {
                    $context .= "Source (ID: {$chunk['doc_id']}):\n{$chunk['content']}\n\n";
                    if (!isset($sourceMapping[$chunk['doc_id']])) {
                        $sourceMapping[$chunk['doc_id']] = $chunk['source'];
                    }
                }

                // 3. Build the prompt
                $prompt = "Based on the following sources, answer the user's question. When you use information from a source, you MUST cite it by including its ID in brackets, like [SOURCE:doc_id].\n\n---\nContext:\n{$context}---\n\nUser Question: {$query}\n\nAnswer:";
                
                // 4. Call the LLM streaming API and handle the response
                $this->llmService->streamChat($prompt, function ($chunk) use (&$sourceMapping) {
                    // Core parsing logic.
                    // $chunk is assumed to be one text fragment from the LLM stream.

                    // Match citations in the [SOURCE:doc_id] format
                    preg_match_all('/\[SOURCE:([\w-]+)\]/', $chunk, $matches, PREG_SET_ORDER);
                    
                    $textParts = preg_split('/\[SOURCE:[\w-]+\]/', $chunk);

                    foreach ($textParts as $index => $textPart) {
                        if (!empty($textPart)) {
                            $this->sendSseEvent('text_chunk', ['content' => $textPart]);
                        }

                        if (isset($matches[$index])) {
                            $docId = $matches[$index][1];
                            $this->sendSseEvent('source_citation', [
                                'id' => $docId,
                                'source' => $sourceMapping[$docId] ?? 'Source not found'
                            ]);
                        }
                    }
                });

                $this->sendSseEvent('stream_end', ['status' => 'done']);

            } catch (\Exception $e) {
                Log::error("RAG Stream Error: " . $e->getMessage());
                $this->sendSseEvent('error', ['message' => 'An internal error occurred.']);
            }
        });

        $response->headers->set('Content-Type', 'text/event-stream');
        $response->headers->set('X-Accel-Buffering', 'no');
        $response->headers->set('Cache-Control', 'no-cache');
        return $response;
    }

    private function sendSseEvent(string $eventName, array $data): void
    {
        echo "event: " . $eventName . "\n";
        echo "data: " . json_encode($data) . "\n\n";
        // Force buffered output to the client immediately; guard ob_flush()
        // so it is not called when no output buffer is active
        if (ob_get_level() > 0) {
            ob_flush();
        }
        flush();
    }
}

The key is the streamQuery method. Instead of returning data in one shot, it returns a StreamedResponse object. Inside its closure we run the full RAG pipeline and, as LLM stream data arrives, parse it in real time and emit it as SSE events. The ob_flush() and flush() calls are essential to ensure each event is pushed to the client immediately.
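
The token-splitting step inside the LLM callback is subtle enough to be worth isolating. A JavaScript equivalent of the preg_match_all / preg_split interleaving (a sketch; the marker format is the one defined in the prompt above):

```javascript
// Split one LLM chunk into an ordered list of text parts and citation parts,
// mirroring the PHP interleaving logic in the controller callback.
function splitChunk(chunk) {
    const parts = [];
    const re = /\[SOURCE:([\w-]+)\]/g;
    let last = 0;
    let m;
    while ((m = re.exec(chunk)) !== null) {
        if (m.index > last) {
            parts.push({ type: "text", content: chunk.slice(last, m.index) });
        }
        parts.push({ type: "source", id: m[1] });
        last = re.lastIndex;
    }
    if (last < chunk.length) {
        parts.push({ type: "text", content: chunk.slice(last) });
    }
    return parts;
}

splitChunk("Laravel queues [SOURCE:doc-42] handle retries.");
// → [ { type: "text", content: "Laravel queues " },
//     { type: "source", id: "doc-42" },
//     { type: "text", content: " handle retries." } ]
```

Each "text" part becomes a text_chunk SSE event and each "source" part a source_citation event, preserving their original order.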

Frontend: Dynamic Rendering with React and Styled-components

The frontend's job is to listen for these structured events and render the matching component for each event type.

1. Opening the SSE Connection and Listening for Events

We wrap the EventSource logic in a React hook.

hooks/useRAGStream.js:

import { useState, useEffect, useRef } from 'react';

export const useRAGStream = (query) => {
    const [streamData, setStreamData] = useState([]);
    const [isStreaming, setIsStreaming] = useState(false);
    const eventSourceRef = useRef(null);

    useEffect(() => {
        if (!query) {
            return;
        }

        setIsStreaming(true);
        setStreamData([]);

        const es = new EventSource(`/api/query-stream?q=${encodeURIComponent(query)}`);
        eventSourceRef.current = es;

        es.onopen = () => {
            console.log("SSE connection opened.");
        };

        es.addEventListener('text_chunk', (event) => {
            const data = JSON.parse(event.data);
            setStreamData(prev => {
                // Merge consecutive text chunks to cut down on re-renders
                const lastItem = prev[prev.length - 1];
                if (lastItem && lastItem.type === 'text') {
                    const newLastItem = { ...lastItem, content: lastItem.content + data.content };
                    return [...prev.slice(0, -1), newLastItem];
                }
                return [...prev, { type: 'text', content: data.content }];
            });
        });

        es.addEventListener('source_citation', (event) => {
            const data = JSON.parse(event.data);
            setStreamData(prev => [...prev, { type: 'source', data }]);
        });
        
        es.addEventListener('stream_end', (event) => {
            console.log('Stream ended');
            es.close();
            setIsStreaming(false);
        });

        es.onerror = (error) => {
            console.error("EventSource failed:", error);
            es.close();
            setIsStreaming(false);
            // In production, set an error state and surface it to the user
            // (note: calling close() here opts out of EventSource auto-reconnect)
            setStreamData(prev => [...prev, {type: 'error', message: 'Connection lost.'}]);
        };

        return () => {
            if (eventSourceRef.current) {
                eventSourceRef.current.close();
            }
        };
    }, [query]);

    return { streamData, isStreaming };
};

2. Rendering Components with Styled-components

components/RAGResponseViewer.js:

import React from 'react';
import styled, { keyframes } from 'styled-components';
import { useRAGStream } from '../hooks/useRAGStream';

// --- Styled Components Definition ---

const ResponseContainer = styled.div`
    background-color: #f9f9f9;
    border: 1px solid #eee;
    padding: 20px;
    border-radius: 8px;
    line-height: 1.7;
    font-family: 'Georgia', serif;
    color: #333;
`;

const TextSpan = styled.span`
    white-space: pre-wrap; /* preserve newlines and spaces in streamed text */
`;

const blink = keyframes`
    50% { opacity: 0; }
`;

const Cursor = styled.span`
    display: inline-block;
    width: 8px;
    height: 1.2em;
    background-color: #333;
    animation: ${blink} 1s step-end infinite;
    margin-left: 2px;
    vertical-align: text-bottom;
`;

const CitationChip = styled.a`
    display: inline-block;
    background-color: #e0eafc;
    color: #3f51b5;
    padding: 2px 8px;
    border-radius: 12px;
    font-size: 0.8em;
    font-weight: bold;
    margin: 0 4px;
    text-decoration: none;
    transition: background-color 0.2s ease;
    cursor: pointer;

    &:hover {
        background-color: #c5d4f8;
    }
`;

const ErrorMessage = styled.div`
    color: #d9534f;
    font-weight: bold;
`;


// --- Main Component ---

export const RAGResponseViewer = ({ query }) => {
    const { streamData, isStreaming } = useRAGStream(query);

    const renderPart = (part, index) => {
        switch (part.type) {
            case 'text':
                return <TextSpan key={index}>{part.content}</TextSpan>;
            case 'source':
                return (
                    <CitationChip key={index} href={part.data.source} target="_blank" rel="noopener noreferrer" title={part.data.source}>
                        {part.data.id}
                    </CitationChip>
                );
            case 'error':
                 return <ErrorMessage key={index}>{part.message}</ErrorMessage>;
            default:
                return null;
        }
    };

    if (!query && streamData.length === 0) {
        return <ResponseContainer>Please enter a query to start.</ResponseContainer>;
    }
    
    return (
        <ResponseContainer>
            {streamData.map(renderPart)}
            {isStreaming && <Cursor />}
        </ResponseContainer>
    );
};

The essence is the RAGResponseViewer component. It is deliberately "dumb": it only renders whatever is in the streamData array. Every item in streamData is an object carrying a type, which is exactly the structure the backend's SSE events define.

  • The renderPart function decides, based on part.type, whether to render a TextSpan or a CitationChip.
  • This is where Styled-components pays off. CitationChip is not just a link; it carries its own styling, interaction (hover effect), and semantics, encapsulated as an independent, reusable unit. When the backend pushes a source_citation event, the frontend performs no intricate DOM manipulation or CSS class juggling: it simply appends a {type: 'source', ...} object to the array, and React plus Styled-components render the new component automatically and efficiently.

Limitations and Future Directions

This architecture solves structured streaming rendering elegantly, but it is not without drawbacks.

First, the backend's real-time parsing of the LLM output stream relies on simple string and regex matching. If the LLM's output format drifts even slightly (an extra space, say), parsing can fail. A more robust approach is to train or fine-tune the model to emit a stricter format, for example a stable, specific JSON structure whenever a citation is needed.

Second, SSE is one-way. If the product later needs bidirectional communication (for example, letting the user interrupt or ask a follow-up mid-stream), upgrading to WebSocket becomes unavoidable. Even then, the current event-based structured data model can be reused as-is.

Finally, the frontend's merging of consecutive text_chunk events is only a simple optimization. In more complex scenarios, such as streaming a table or a list, finer-grained state management and batching will be needed to avoid performance problems caused by overly frequent React re-renders. Throttling render updates with requestAnimationFrame can further smooth the experience.
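
A framework-agnostic sketch of that batching idea (the helper is illustrative; in the hook it would wrap the setStreamData calls, and the scheduler is injectable so the logic is testable outside a browser):

```javascript
// Batch incoming stream items and flush them once per animation frame,
// instead of triggering one state update (and re-render) per SSE event.
// The scheduler defaults to requestAnimationFrame in the browser; any
// callback scheduler can be injected for testing or non-browser use.
function createFrameBatcher(onFlush, schedule = cb => requestAnimationFrame(cb)) {
    let pending = [];
    let scheduled = false;
    return function push(item) {
        pending.push(item);
        if (!scheduled) {
            scheduled = true;
            schedule(() => {
                scheduled = false;
                const batch = pending;
                pending = [];
                onFlush(batch); // e.g. setStreamData(prev => merge(prev, batch))
            });
        }
    };
}
```

In the hook, the text_chunk listener would call push(...) instead of setStreamData directly, so a burst of tokens arriving within one frame costs a single render.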

