构建支持离线向量检索的 React Native RAG 系统的架构与实现


在真实项目中,构建一个纯粹依赖网络连接的RAG(检索增强生成)应用,尤其是在移动端,几乎等同于构建了一个不可靠的产品。用户在地铁、电梯或网络不佳的地区,应用的核心智能检索功能会瞬间瘫痪。痛点很明确:如何让一个基于复杂后端(向量数据库、大语言模型)的React Native应用,在离线状态下依然能提供有意义的、快速的智能检索服务。

我们的目标不是简单地缓存API请求,而是要实现一个真正的、具备端侧计算能力的离线回退(Offline Fallback)机制。当设备离线时,应用应能无缝切换到本地向量索引,执行相似性搜索,并返回一个虽不经LLM润色但依然高度相关的结果集。

初步构想的架构围绕一个核心的“双路径”数据流展开:

  1. 在线路径 (Online Path): React Native -> AWS Lambda -> Weaviate -> LLM API。这是标准的高精度RAG流程。
  2. 离线路径 (Offline Path): React Native -> Service Worker -> IndexedDB (Local Vector Cache)。这是我们设计的韧性层,是整个方案的关键。

技术选型决策如下:

  • React Native & MobX: 移动端技术栈。MobX对于管理复杂的、响应式的UI状态至关重要,尤其是在处理网络状态切换、同步本地与远程数据这种场景下,它的原子化更新和衍生状态计算能极大地简化逻辑。
  • Weaviate: 托管式向量数据库。选择它是因为其强大的过滤和混合搜索能力,并且作为云服务,可以让我们专注于业务逻辑而非底层运维。
  • AWS Lambda: 无服务器计算。RAG的查询流程是典型的无状态、事件驱动型任务,Lambda的按需计费和自动扩缩容特性完美契合,有效控制成本。
  • Service Workers: 实现离线能力的核心。它不仅仅是一个网络代理,更是一个可编程的、运行在浏览器/WebView后台的独立线程,能够拦截网络请求、管理缓存(IndexedDB),并执行计算任务。这使得在端侧实现一个微型搜索引擎成为可能。

第一步: 搭建后端服务 - Weaviate 与 Lambda

一切始于一个健壮的后端。我们需要一个Lambda函数来处理两件事:数据索引和数据查询。

Weaviate Schema 定义

首先,在Weaviate中定义我们的数据结构。一个常见的错误是只存储文本块和向量。在真实项目中,元数据至关重要,它能用于过滤,也能在离线时提供上下文。

// weaviate-schema.js
// 这通常是一次性脚本,用于初始化Weaviate实例
import weaviate from 'weaviate-ts-client';

const client = weaviate.client({
    scheme: 'https',
    host: process.env.WEAVIATE_HOST, // e.g., 'your-cluster.weaviate.cloud'
    apiKey: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY),
});

const classObj = {
    class: 'DocumentChunk',
    vectorizer: 'text2vec-openai', // 让Weaviate处理向量化
    moduleConfig: {
        'text2vec-openai': {
            model: 'ada',
            type: 'text',
        },
    },
    properties: [
        {
            name: 'content',
            dataType: ['text'],
            description: 'The actual text content of the chunk.',
        },
        {
            name: 'sourceId',
            dataType: ['string'],
            description: 'Identifier for the source document.',
            tokenization: 'field', // 关键:为精确匹配优化
        },
        {
            name: 'chunkIndex',
            dataType: ['int'],
            description: 'The order of the chunk within the source document.',
        },
        {
            name: 'isPriority', // 核心字段:用于离线同步决策
            dataType: ['boolean'],
            description: 'Flag for high-priority content to be cached offline.',
        }
    ],
};

async function createSchema() {
    try {
        const res = await client.schema.classCreator().withClass(classObj).do();
        console.log('Schema created successfully:', JSON.stringify(res, null, 2));
    } catch (err) {
        console.error('Schema creation failed:', err);
        // 在生产环境中,这里应该有更复杂的重试或告警逻辑
        throw err;
    }
}

createSchema();

这里的isPriority字段是整个离线策略的基石。不是所有数据都值得被同步到客户端,只有标记为高优先级的核心文档才会被考虑。

AWS Lambda RAG 查询函数

这个Lambda函数是整个在线查询流程的核心。它需要处理API网关的请求,将其向量化,查询Weaviate,然后(可选地)将结果传递给一个LLM进行总结。

// lambda/ragQuery/index.js
import weaviate from 'weaviate-ts-client';
// 假定有一个封装好的LLM服务客户端
import { getLlmClient } from './llmClient'; 

// 在Lambda外部初始化客户端以利用执行上下文复用
const weaviateClient = weaviate.client({
    scheme: 'https',
    host: process.env.WEAVIATE_HOST,
    apiKey: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY),
    headers: { 'X-OpenAI-Api-Key': process.env.OPENAI_API_KEY },
});
const llmClient = getLlmClient();

// 简单的错误响应生成器
const createErrorResponse = (statusCode, message) => ({
    statusCode,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ error: message }),
});

export const handler = async (event) => {
    // 生产级的代码必须验证输入
    if (!event.body) {
        return createErrorResponse(400, 'Request body is missing.');
    }

    let query;
    try {
        const body = JSON.parse(event.body);
        query = body.query;
        if (!query || typeof query !== 'string' || query.length < 3) {
            return createErrorResponse(400, 'Invalid "query" field in request body.');
        }
    } catch (e) {
        console.error('Failed to parse request body:', e);
        return createErrorResponse(400, 'Invalid JSON format.');
    }
    
    console.log(`Received query: ${query}`);

    try {
        // 1. 向量搜索
        const searchResult = await weaviateClient.graphql
            .get()
            .withClassName('DocumentChunk')
            .withNearText({ concepts: [query] })
            .withLimit(5)
            .withFields('content sourceId chunkIndex _additional { distance }')
            .do();

        const chunks = searchResult.data.Get.DocumentChunk;
        
        if (!chunks || chunks.length === 0) {
            return {
                statusCode: 200,
                body: JSON.stringify({ summary: "I couldn't find any relevant documents.", sources: [] }),
            };
        }

        // 2. 构建LLM提示词并获取总结
        const context = chunks.map(c => c.content).join('\n---\n');
        const prompt = `Based on the following context, answer the user's query.
                       Query: ${query}
                       Context:
                       ${context}`;
        
        const summary = await llmClient.generate(prompt);

        return {
            statusCode: 200,
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
                summary,
                sources: chunks.map(c => ({
                    sourceId: c.sourceId,
                    chunkIndex: c.chunkIndex,
                    content: c.content,
                    distance: c._additional.distance,
                })),
            }),
        };
    } catch (error) {
        console.error('Error during RAG process:', error);
        // 这里的错误需要更详细的分类,比如是Weaviate连接错误还是LLM超时
        return createErrorResponse(500, 'An internal error occurred while processing your request.');
    }
};

这个Lambda函数包含了基本的验证、日志记录和错误处理,是生产环境的最低要求。

第二步: 客户端状态管理 - React Native 与 MobX

客户端的复杂性在于管理网络状态和数据源的无缝切换。MobX在这里表现出色。

// src/stores/SearchStore.js
import { makeAutoObservable, runInAction } from 'mobx';
import NetInfo from '@react-native-community/netinfo';
import { ApiClient } from '../services/ApiClient';
import { OfflineSearchService } from '../services/OfflineSearchService';

class SearchStore {
    // --- Observables ---
    query = '';
    isLoading = false;
    isOnline = true;
    results = null; // { summary: string, sources: [] }
    error = null;

    constructor() {
        makeAutoObservable(this);
        this.initializeNetworkListener();
    }

    // --- Actions ---
    setQuery(value) {
        this.query = value;
    }

    async executeSearch() {
        if (!this.query || this.isLoading) return;
        
        this.isLoading = true;
        this.error = null;
        this.results = null;

        try {
            let searchResults;
            if (this.isOnline) {
                console.log(`[Online Search] Executing for query: "${this.query}"`);
                searchResults = await ApiClient.performRagQuery(this.query);
            } else {
                console.log(`[Offline Search] Executing for query: "${this.query}"`);
                // 这里的OfflineSearchService是与Service Worker通信的桥梁
                searchResults = await OfflineSearchService.performLocalSearch(this.query);
                if (!searchResults) {
                    throw new Error("No offline data available for this query.");
                }
            }
            
            runInAction(() => {
                this.results = searchResults;
            });

        } catch (e) {
            console.error('Search failed:', e);
            runInAction(() => {
                this.error = e.message;
            });
        } finally {
            runInAction(() => {
                this.isLoading = false;
            });
        }
    }

    // --- Private ---
    initializeNetworkListener() {
        NetInfo.addEventListener(state => {
            runInAction(() => {
                const wasOnline = this.isOnline;
                this.isOnline = state.isConnected ?? false;
                console.log(`Network status changed: ${wasOnline} -> ${this.isOnline}`);
                // 在网络状态恢复时,可以触发一次自动刷新或提示用户
            });
        });
    }
}

export const searchStore = new SearchStore();

这个MobX Store清晰地分离了状态(query, isLoading)和动作(executeSearch)。executeSearch内部的逻辑分支是关键,它根据isOnline状态决定是调用在线API还是离线服务。

第三步: 离线核心 - Service Worker 与 IndexedDB

这是最棘手但也是价值最高的部分。我们需要让Service Worker扮演两个角色:

  1. 数据同步器: 在应用启动或后台同步任务中,从后端拉取isPriority=true的文档及其向量,存入IndexedDB。
  2. 网络拦截器: 在离线时拦截对/api/rag的请求,转而在本地IndexedDB中执行向量搜索。

数据同步与存储

首先,我们需要一个函数来获取并存储离线数据。这部分逻辑可以在应用主逻辑中调用,它会通过postMessage与Service Worker通信,请求其执行同步任务。

// src/services/OfflineSyncService.js

// 这个函数在应用启动时调用
export async function triggerOfflineSync() {
    if ('serviceWorker' in navigator && navigator.serviceWorker.controller) {
        try {
            console.log('Requesting offline data sync...');
            // 后端需要一个专门的接口来获取可离线的数据
            const response = await fetch('/api/offline-data-package');
            if (!response.ok) {
                throw new Error('Failed to fetch offline data package');
            }
            const dataPackage = await response.json(); // e.g., { chunks: [], vectors: [] }
            
            // 将数据包发送给Service Worker进行存储
            navigator.serviceWorker.controller.postMessage({
                type: 'CACHE_OFFLINE_DATA',
                payload: dataPackage,
            });
            console.log('Offline data package sent to Service Worker.');

        } catch (error) {
            console.error('Offline sync failed:', error);
        }
    }
}

Service Worker 实现

这里的Service Worker代码是整个架构的核心。

// public/sw.js

const CACHE_NAME = 'offline-rag-cache-v1';
const DB_NAME = 'OfflineVectorDB';
const DB_VERSION = 1;
const OBJECT_STORE_NAME = 'document_chunks';

// 简单的Promise封装的IndexedDB操作
function openDB() {
    return new Promise((resolve, reject) => {
        const request = indexedDB.open(DB_NAME, DB_VERSION);
        request.onerror = () => reject("Error opening DB");
        request.onsuccess = () => resolve(request.result);
        request.onupgradeneeded = event => {
            const db = event.target.result;
            if (!db.objectStoreNames.contains(OBJECT_STORE_NAME)) {
                // sourceId + chunkIndex 作为唯一键
                db.createObjectStore(OBJECT_STORE_NAME, { keyPath: ['sourceId', 'chunkIndex'] });
            }
        };
    });
}

// 监听消息,用于存储离线数据
self.addEventListener('message', event => {
    if (event.data && event.data.type === 'CACHE_OFFLINE_DATA') {
        const { chunks } = event.data.payload;
        event.waitUntil(
            openDB().then(db => {
                const transaction = db.transaction(OBJECT_STORE_NAME, 'readwrite');
                const store = transaction.objectStore(OBJECT_STORE_NAME);
                // 清空旧数据
                store.clear();
                chunks.forEach(chunk => {
                    // chunk 结构: { sourceId, chunkIndex, content, vector }
                    store.put(chunk);
                });
                return new Promise((resolve, reject) => {
                    transaction.oncomplete = () => {
                        console.log(`[SW] Successfully cached ${chunks.length} chunks.`);
                        resolve();
                    };
                    transaction.onerror = () => reject('DB transaction failed');
                });
            })
        );
    }
});


// 核心:网络请求拦截
self.addEventListener('fetch', event => {
    const url = new URL(event.request.url);
    
    // 只拦截我们关心的API请求
    if (url.pathname === '/api/rag') {
        event.respondWith(
            fetch(event.request)
                .catch(() => {
                    console.log('[SW] Network fetch failed. Attempting offline fallback.');
                    // 网络失败,执行离线逻辑
                    return handleOfflineRagQuery(event.request);
                })
        );
    }
});

async function handleOfflineRagQuery(request) {
    try {
        const requestBody = await request.json();
        const userQuery = requestBody.query;

        // 在真实项目中,这里也需要一个轻量级的客户端向量化模型
        // 为了简化,我们假设查询也被提前向量化或使用一种简化的方式
        // 这是一个巨大的权衡,端侧向量化会增加包体积和计算负载
        // 此处我们采用一种更务实的方式:进行关键字匹配,并返回最相关的chunk
        // 或者,假设我们预先为一些常见查询缓存了结果。
        // 最具挑战也最强大的方式是在本地进行向量计算。
        const allChunks = await getAllChunksFromDB();

        // 模拟本地搜索: 实际应为向量余弦相似度计算
        // 为了示例的可运行性,这里使用简单的文本匹配
        const lowerCaseQuery = userQuery.toLowerCase();
        const scoredChunks = allChunks
            .map(chunk => {
                const score = chunk.content.toLowerCase().includes(lowerCaseQuery) ? 1 : 0;
                // 也可以计算更复杂的分数,例如TF-IDF
                return { ...chunk, score };
            })
            .filter(chunk => chunk.score > 0)
            .sort((a, b) => b.score - a.score)
            .slice(0, 5);
        
        // 构造一个与在线API格式兼容的响应
        const responseBody = {
            summary: `[Offline Mode] Found ${scoredChunks.length} relevant local documents for "${userQuery}". Full summary requires an internet connection.`,
            sources: scoredChunks.map(c => ({
                sourceId: c.sourceId,
                chunkIndex: c.chunkIndex,
                content: c.content,
                distance: 1 - c.score, // 模拟距离
            })),
        };
        
        return new Response(JSON.stringify(responseBody), {
            headers: { 'Content-Type': 'application/json' },
        });

    } catch (e) {
        console.error('[SW] Offline query handling failed:', e);
        // 返回一个标准的错误结构
        const errorBody = {
            summary: 'Failed to perform offline search.',
            sources: [],
            error: e.message
        };
        return new Response(JSON.stringify(errorBody), {
            status: 500,
            headers: { 'Content-Type': 'application/json' },
        });
    }
}

async function getAllChunksFromDB() {
    const db = await openDB();
    const transaction = db.transaction(OBJECT_STORE_NAME, 'readonly');
    const store = transaction.objectStore(OBJECT_STORE_NAME);
    return new Promise((resolve, reject) => {
        const request = store.getAll();
        request.onsuccess = () => resolve(request.result);
        request.onerror = () => reject('Failed to get all chunks from DB');
    });
}

这个Service Worker实现了一个关键的工程权衡。完整的端侧向量搜索需要一个JS实现的向量库(如onnxruntime-web配合一个轻量模型)和向量化逻辑,这会显著增加应用的复杂度和体积。对于许多场景,一个基于关键词的本地搜索或者返回预缓存的结果已经是一个非常有价值的离线体验。这里的代码展示了这种更务实的回退策略。

第四步: 整体架构串联与可视化

整个系统的数据流已经形成。我们可以用Mermaid图来清晰地展示在线和离线两种模式下的工作流程。

graph TD
    subgraph "React Native App"
        A[UI Component] -- 触发搜索 --> B{MobX Store};
        B -- isOnline? --> C{Online/Offline Branch};
        C -- Yes --> D[ApiClient];
        C -- No --> E[OfflineSearchService];
        B -- 更新状态 --> A;
    end

    subgraph "Online Path"
        D -- HTTP POST /api/rag --> F[API Gateway];
        F --> G[AWS Lambda: ragQuery];
        G -- Vector Search --> H[Weaviate];
        G -- Generate Summary --> I[LLM API];
        I --> G;
        H --> G;
        G -- 响应 --> F;
        F -- 响应 --> D;
    end

    subgraph "Offline Path"
        E -- 触发 --> J((Service Worker));
        J -- 拦截 /api/rag --> K{Fetch Event};
        K -- 网络请求失败 --> L[handleOfflineRagQuery];
        L -- 读取数据 --> M[(IndexedDB)];
        M --> L;
        L -- 本地搜索逻辑 --> L;
        L -- 构造Response --> J;
        J -- 响应 --> E;
    end

    subgraph "Background Sync"
        P[App Start] -- 触发同步 --> Q[triggerOfflineSync];
        Q -- POST /api/offline-data --> R[Backend API];
        R -- isPriority=true --> H;
        R -- 返回数据包 --> Q;
        Q -- postMessage --> J;
        J -- 存储数据 --> M;
    end

局限性与未来迭代路径

这套架构解决了移动端RAG应用的核心痛点,但它并非没有权衡。

  1. 离线搜索质量: 最大的局限在于离线搜索的质量。它受限于本地缓存数据量的大小和本地搜索算法的精度。我们采用的关键字匹配远不如向量搜索精准。这是一个典型的可用性与资源消耗之间的平衡。
  2. 数据同步策略: 当前的同步策略比较粗放(拉取所有isPriority=true的文档)。一个更优的策略是基于用户行为分析,动态调整需要离线缓存的文档集,实现个性化的离线知识库。
  3. 端侧向量化: 未来的一个重要演进方向是,利用WASM和轻量级的模型(如MobileBERT的变体)在端侧直接对用户查询进行向量化,从而在本地实现真正的向量搜索。这将极大提升离线搜索的准确性,但需要仔细评估其对性能和应用体积的影响。
  4. 状态冲突与合并: 当设备在离线状态下进行操作,然后恢复在线时,如何合并或处理状态是一个潜在的复杂问题。例如,用户可能已经基于离线结果进行了某些操作,这些操作在获取到更准确的在线结果后是否依然有效,需要业务层面定义清晰的规则。

  目录