传统的离线应用状态同步,通常依赖于在 Service Worker 中缓存 API 的 GET 请求结果。这种策略在处理只读数据时表现尚可,但在处理复杂的用户写操作序列时,其脆弱性便暴露无遗。设想一个场景:用户在离线状态下创建了一张卡片,接着移动了它,然后又编辑了它的内容。如果网络恢复时,我们简单地将这些最终状态的 API 请求(POST /cards
, PATCH /cards/:id/position
, PATCH /cards/:id/content
)依次发送,一旦中间某个请求失败,整个系统的状态就会陷入不一致的混乱之中。这种基于状态同步的模式,其根本问题在于丢失了过程信息,使得冲突解决和错误恢复变得异常困难。
要解决这个问题,我们需要转变思路:不再同步“状态”,而是同步“意图”或“行为”。这正是事件溯源 (Event Sourcing) 架构的核心思想。我们将不再保存对象的当前状态,而是记录下导致其状态变化的所有事件(Events)序列。任何时刻的系统状态,都是通过回放这些事件日志计算(“投影”)出来的。
将这个思想应用到前端,我们可以构建一个极度健壮的离线优先系统。其核心架构是:
- UI 层 (Svelte): 负责渲染状态和派发命令 (Commands)。它不直接修改任何状态。
- Service Worker: 扮演一个离线的命令处理器和事件总线。它拦截 UI 发来的命令,将其转化为不可变的事件,存入本地持久化存储。
- 本地事件存储 (IndexedDB): 作为客户端的“事实之源”(Source of Truth),它是一个只增不减的事件日志。
- 投影器 (Projector): 读取事件日志,计算出当前的看板状态,供 UI 层消费。
- 同步器 (Synchronizer): 在 Service Worker 中运行,负责在网络恢复时,将本地事件队列推送至后端服务器。
这个架构的流程可以用下面的图来概括:
sequenceDiagram participant UI (Svelte) participant ServiceWorker participant IndexedDB participant Server UI->>+ServiceWorker: Dispatch Command (e.g., CreateCard) ServiceWorker->>ServiceWorker: Validate Command ServiceWorker->>+IndexedDB: Generate & Persist Event (e.g., CardCreated) IndexedDB-->>-ServiceWorker: Persist Success ServiceWorker-->>-UI: Acknowledge Command Note right of UI: UI re-projects state from IndexedDB, view updates. alt Network Online ServiceWorker->>+Server: Sync Unsynced Events Server-->>-ServiceWorker: Sync Success ServiceWorker->>IndexedDB: Mark Events as Synced end
接下来,我们逐步实现这个架构。
一、定义命令与事件
在事件溯源系统中,严格区分命令(Commands)和事件(Events)至关重要。
- 命令 (Command): 表达用户的“意图”。它是一个请求,可能会被系统拒绝。例如,“请求创建一张卡片”。
- 事件 (Event): 描述已经发生且不可改变的“事实”。它不能被拒绝。例如,“一张卡片已被创建”。
我们将使用 TypeScript 来定义这些核心数据结构,这在真实项目中能提供极佳的类型安全保障。
// src/types/commands.ts
export interface CreateCardCommand {
type: 'CREATE_CARD';
payload: {
id: string; // Client-generated UUID
columnId: string;
content: string;
};
}
export interface MoveCardCommand {
type: 'MOVE_CARD';
payload: {
cardId: string;
targetColumnId: string;
targetIndex: number;
};
}
// ... other commands like EditCardCommand, DeleteCardCommand
export type AppCommand = CreateCardCommand | MoveCardCommand;
// src/types/events.ts
interface BaseEvent {
eventId: string; // Globally unique event identifier
commandId: string; // The command that caused this event
timestamp: number;
synced: boolean; // Flag for server synchronization
}
export interface CardCreatedEvent extends BaseEvent {
type: 'CARD_CREATED';
payload: {
id: string;
columnId: string;
content: string;
};
}
export interface CardMovedEvent extends BaseEvent {
type: 'CARD_MOVED';
payload: {
cardId: string;
sourceColumnId: string; // For potential rollbacks or complex projections
targetColumnId:string;
targetIndex: number;
};
}
// ... other events
export type AppEvent = CardCreatedEvent | CardMovedEvent;
一个关键细节是,命令中的 ID(如卡片ID)由客户端生成(例如使用 UUID),这样后续的命令可以在卡片被同步到服务器之前就引用它。每个事件都有一个 synced
标志,用于追踪其与后端的同步状态。
二、构建客户端事件存储 (Event Store)
IndexedDB 是浏览器中唯一适合存储大量结构化数据的持久化方案。我们将封装一个简单的 Event Store 类来处理事件的读写。
// src/lib/event-store.ts
import { openDB, type IDBPDatabase } from 'idb';
import type { AppEvent } from '../types/events';
const DB_NAME = 'KanbanEventStore';
const STORE_NAME = 'events';
const DB_VERSION = 1;
class EventStore {
private dbPromise: Promise<IDBPDatabase>;
constructor() {
this.dbPromise = openDB(DB_NAME, DB_VERSION, {
upgrade(db) {
if (!db.objectStoreNames.contains(STORE_NAME)) {
const store = db.createObjectStore(STORE_NAME, {
keyPath: 'eventId',
});
// Create an index on the 'synced' property to efficiently find unsynced events.
store.createIndex('by_synced', 'synced', { unique: false });
// Create an index on timestamp for chronological retrieval.
store.createIndex('by_timestamp', 'timestamp', { unique: false });
}
},
});
}
/**
* Appends a new event to the store.
* This is the only write operation allowed.
* @param event The event object to persist.
*/
async append(event: AppEvent): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction(STORE_NAME, 'readwrite');
await tx.store.add(event);
await tx.done;
// In a real app, we'd dispatch a global event here
// to notify active tabs about the data change.
// e.g., using BroadcastChannel.
}
/**
* Retrieves all events from the store, sorted by timestamp.
* @returns A promise that resolves to an array of all events.
*/
async getAllEvents(): Promise<AppEvent[]> {
const db = await this.dbPromise;
const events = await db.getAllFromIndex(STORE_NAME, 'by_timestamp');
return events;
}
/**
* Finds all events that have not been synced to the server.
* @returns A promise that resolves to an array of unsynced events.
*/
async getUnsyncedEvents(): Promise<AppEvent[]> {
const db = await this.dbPromise;
const events = await db.getAllFromIndex(STORE_NAME, 'by_synced', 0); // 0 is false
return events;
}
/**
* Marks a batch of events as synced.
* @param eventIds An array of event IDs to update.
*/
async markAsSynced(eventIds: string[]): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction(STORE_NAME, 'readwrite');
const store = tx.store;
// This is more performant than getting and putting each one individually.
await Promise.all(eventIds.map(async (id) => {
const event = await store.get(id) as AppEvent;
if (event) {
event.synced = true;
await store.put(event);
} else {
// This indicates a logical error, should be logged.
console.warn(`Attempted to mark non-existent event as synced: ${id}`);
}
}));
await tx.done;
}
}
// Export a singleton instance.
export const eventStore = new EventStore();
这里的实现包含了必要的索引 (by_synced
, by_timestamp
),这对于高效查询至关重要。markAsSynced
的批量处理也是生产环境中减少事务开销的常用手段。
三、状态投影与 Svelte UI
UI 的任务变得非常纯粹:从事件流中计算出当前状态并渲染它。这个计算过程就是“投影”。
// src/lib/projector.ts
import type { AppEvent } from '../types/events';
// Define the shape of our projected state.
export interface Card {
id: string;
content: string;
columnId: string;
}
export interface Column {
id: string;
title: string;
cardIds: string[];
}
export interface BoardState {
columns: Record<string, Column>;
cards: Record<string, Card>;
columnOrder: string[];
}
// The initial state before any events are applied.
const getInitialState = (): BoardState => ({
columns: {
'col-1': { id: 'col-1', title: 'To Do', cardIds: [] },
'col-2': { id: 'col-2', title: 'In Progress', cardIds: [] },
'col-3': { id: 'col-3', title: 'Done', cardIds: [] },
},
cards: {},
columnOrder: ['col-1', 'col-2', 'col-3'],
});
/**
* The projector function. It takes a stream of events and reduces them
* into a single, current state object.
* @param events The array of events to project.
* @returns The final projected board state.
*/
export function projectState(events: AppEvent[]): BoardState {
// Use structuredClone for a deep copy to avoid mutating the initial state object.
const state = structuredClone(getInitialState());
for (const event of events) {
switch (event.type) {
case 'CARD_CREATED': {
const { id, columnId, content } = event.payload;
state.cards[id] = { id, content, columnId };
if (state.columns[columnId]) {
state.columns[columnId].cardIds.push(id);
}
break;
}
case 'CARD_MOVED': {
const { cardId, targetColumnId, targetIndex, sourceColumnId } = event.payload;
const card = state.cards[cardId];
if (!card) continue; // Defensive check
// Remove from old column
const sourceCol = state.columns[sourceColumnId];
if (sourceCol) {
const cardIndex = sourceCol.cardIds.indexOf(cardId);
if (cardIndex > -1) {
sourceCol.cardIds.splice(cardIndex, 1);
}
}
// Add to new column
const targetCol = state.columns[targetColumnId];
if (targetCol) {
targetCol.cardIds.splice(targetIndex, 0, cardId);
card.columnId = targetColumnId; // Update card's own column reference
}
break;
}
// ... handle other event types
}
}
return state;
}
这个投影器是一个纯函数,给定相同的事件序列,它总是返回相同的状态。这是可测试性和可预测性的基石。
在 Svelte 中,我们可以创建一个自定义 store 来集成这个逻辑。
// src/stores/boardStore.ts
import { writable } from 'svelte/store';
import { eventStore } from '$lib/event-store';
import { projectState } from '$lib/projector';
import type { BoardState } from '$lib/projector';
function createBoardStore() {
const { subscribe, set } = writable<BoardState | null>(null, () => {
// This runs when the first subscriber subscribes.
loadInitialState();
// Listen for changes from other tabs or the Service Worker
const channel = new BroadcastChannel('kanban-events');
channel.onmessage = (msg) => {
if (msg.data.type === 'EVENT_APPENDED') {
console.log('New event detected, reloading state...');
loadInitialState();
}
};
return () => {
// This runs when the last subscriber unsubscribes.
channel.close();
};
});
async function loadInitialState() {
try {
const events = await eventStore.getAllEvents();
const projectedState = projectState(events);
set(projectedState);
} catch (error) {
console.error("Failed to load and project board state:", error);
// Handle state loading failure
}
}
return {
subscribe,
reload: loadInitialState,
};
}
export const boardStore = createBoardStore();
UI 组件现在只需订阅 boardStore
即可获得最新状态。当用户执行操作时,它不会直接修改 store,而是向 Service Worker 发送一个命令。
// src/lib/command-dispatcher.ts
import { v4 as uuidv4 } from 'uuid';
import type { AppCommand } from '../types/commands';
// A "fire and forget" dispatcher. The UI trusts the SW to handle it.
// In a real app, this should return a promise that resolves on SW acknowledgement.
export async function dispatchCommand(command: Omit<AppCommand, 'payload.id'> & { payload: Omit<CreateCardCommand['payload'], 'id'> } | AppCommand) {
if (!('serviceWorker' in navigator) || !navigator.serviceWorker.controller) {
console.error("Service Worker not active. Cannot dispatch command.");
// Fallback logic could be implemented here, e.g., queue in localStorage
return;
}
// Assign a client-side ID if it's a creation command
let finalCommand: AppCommand = command as AppCommand;
if (command.type === 'CREATE_CARD') {
finalCommand = {
...command,
payload: {
...command.payload,
id: uuidv4(), // Generate ID on the client
},
} as CreateCardCommand
}
// We use a custom fetch to a "virtual" endpoint.
// The SW will intercept this.
fetch('/api/command', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(finalCommand),
}).catch(err => {
console.error("Dispatching command failed:", err);
// This usually indicates a problem with the SW itself.
});
}
四、Service Worker:命令与同步的中心
这是架构的核心。Service Worker 必须处理两件大事:拦截和处理命令,以及在后台同步事件。
// public/sw.js
import { eventStore } from './lib/event-store'; // Assuming bundler places files correctly
import { v4 as uuidv4 } from 'uuid';
const COMMAND_ENDPOINT = '/api/command';
const SYNC_TAG = 'kanban-event-sync';
// Install and activate listeners... self.skipWaiting() etc.
self.addEventListener('fetch', (event) => {
const { request } = event;
const url = new URL(request.url);
if (request.method === 'POST' && url.pathname === COMMAND_ENDPOINT) {
event.respondWith(handleCommandRequest(request));
}
});
async function handleCommandRequest(request) {
try {
const command = await request.json();
// In a real project, this would be a large switch statement
// with validation logic for each command type.
// For simplicity, we'll just handle one.
if (command.type === 'CREATE_CARD') {
const event = {
eventId: uuidv4(),
commandId: command.payload.id, // Using command ID for idempotency tracking
timestamp: Date.now(),
synced: false,
type: 'CARD_CREATED',
payload: command.payload,
};
await eventStore.append(event);
// Notify active clients that new data is available
const channel = new BroadcastChannel('kanban-events');
channel.postMessage({ type: 'EVENT_APPENDED' });
channel.close();
// Trigger a background sync attempt
if ('sync' in self.registration) {
self.registration.sync.register(SYNC_TAG);
}
return new Response(JSON.stringify({ success: true, eventId: event.eventId }), { status: 202 }); // 202 Accepted
}
// Default handler for unknown commands
return new Response(JSON.stringify({ error: 'Unknown command type' }), { status: 400 });
} catch (error) {
console.error('Error handling command:', error);
return new Response(JSON.stringify({ error: 'Failed to process command' }), { status: 500 });
}
}
// Background Sync handler
self.addEventListener('sync', (event) => {
if (event.tag === SYNC_TAG) {
event.waitUntil(synchronizeEvents());
}
});
async function synchronizeEvents() {
console.log('Background sync started...');
try {
const unsyncedEvents = await eventStore.getUnsyncedEvents();
if (unsyncedEvents.length === 0) {
console.log('No events to sync.');
return;
}
// In a real app, this would be a proper fetch to your backend API
const response = await fetch('/api/sync-events', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(unsyncedEvents),
});
if (!response.ok) {
// The server rejected the batch. This requires sophisticated error handling.
// Maybe some events were invalid? The server should return which ones.
// For now, we assume it will eventually succeed and just throw to trigger a retry.
console.error('Server sync failed:', response.statusText);
throw new Error('Server sync failed');
}
const result = await response.json();
if (result.success) {
const syncedEventIds = unsyncedEvents.map(e => e.eventId);
await eventStore.markAsSynced(syncedEventIds);
console.log(`${syncedEventIds.length} events successfully synced.`);
} else {
// Handle partial success or specific errors from the server
console.error('Sync logic reported failure:', result.error);
throw new Error('Sync logic failed');
}
} catch (error) {
console.error('Synchronization failed:', error);
// Throwing an error here will cause the browser to retry the sync later
// with exponential backoff. This is a key feature of the Background Sync API.
throw error;
}
}
handleCommandRequest
函数是命令的入口点。它验证命令,生成事件,将其持久化,然后立即返回一个“Accepted”响应给 UI,使得界面响应极快。真正的网络同步工作被推迟到 synchronizeEvents
函数中,由浏览器的 Background Sync API 调度,它能在网络恢复时(甚至在标签页关闭后)可靠地执行。
五、架构的局限性与未来迭代
这套架构解决了单用户在不可靠网络下的状态一致性问题,但它并非银弹。
首先,客户端事件日志会无限增长。在生产环境中,必须引入快照 (Snapshotting) 机制。例如,当事件数量达到某个阈值(如1000),系统可以计算出当前状态的快照并存储它,然后安全地清除该快照之前的所有事件。加载状态时,只需加载最新的快照,并重放其后的事件即可。
其次,当前的同步模型是简单的“推送”。它没有处理来自服务器的事件(例如,由另一位协作者产生的事件)。要实现实时协作,需要一个双向同步机制。客户端不仅要推送自己的事件,还要能拉取并合并来自服务器的事件流。这会引入事件版本控制和冲突解决的复杂性,此时可能需要考虑 CRDTs (Conflict-free Replicated Data Types) 或 Operational Transformation (OT) 算法。
最后,事件模式演进 (Schema Evolution) 是一个长期挑战。如果 CardCreatedEvent
的结构发生了变化,旧版本的客户端和服务器如何处理新旧两种格式的事件?这需要引入版本号和转换逻辑,对事件的设计提出了更高的要求。
尽管存在这些高级挑战,但基于 Service Worker 和 Event Sourcing 的客户端架构,为构建真正弹性和离线优先的 Web 应用提供了一个坚实且逻辑清晰的基础。它将关注点清晰地分离,使得复杂的状态管理变得可预测、可测试,并且在用户体验上达到了原生应用般的流畅与可靠。