基于 Service Worker 与 Event Sourcing 构建健壮的离线优先看板架构


传统的离线应用状态同步,通常依赖于在 Service Worker 中缓存 API 的 GET 请求结果。这种策略在处理只读数据时表现尚可,但在处理复杂的用户写操作序列时,其脆弱性便暴露无遗。设想一个场景:用户在离线状态下创建了一张卡片,接着移动了它,然后又编辑了它的内容。如果网络恢复时,我们简单地将这些最终状态的 API 请求(POST /cards, PATCH /cards/:id/position, PATCH /cards/:id/content)依次发送,一旦中间某个请求失败,整个系统的状态就会陷入不一致的混乱之中。这种基于状态同步的模式,其根本问题在于丢失了过程信息,使得冲突解决和错误恢复变得异常困难。

要解决这个问题,我们需要转变思路:不再同步“状态”,而是同步“意图”或“行为”。这正是事件溯源 (Event Sourcing) 架构的核心思想。我们将不再保存对象的当前状态,而是记录下导致其状态变化的所有事件(Events)序列。任何时刻的系统状态,都是通过回放这些事件日志计算(“投影”)出来的。

将这个思想应用到前端,我们可以构建一个极度健壮的离线优先系统。其核心架构是:

  1. UI 层 (Svelte): 负责渲染状态和派发命令 (Commands)。它不直接修改任何状态。
  2. Service Worker: 扮演一个离线的命令处理器和事件总线。它拦截 UI 发来的命令,将其转化为不可变的事件,存入本地持久化存储。
  3. 本地事件存储 (IndexedDB): 作为客户端的“事实之源”(Source of Truth),它是一个只增不减的事件日志。
  4. 投影器 (Projector): 读取事件日志,计算出当前的看板状态,供 UI 层消费。
  5. 同步器 (Synchronizer): 在 Service Worker 中运行,负责在网络恢复时,将本地事件队列推送至后端服务器。

这个架构的流程可以用下面的图来概括:

sequenceDiagram
    participant UI (Svelte)
    participant ServiceWorker
    participant IndexedDB
    participant Server

    UI->>+ServiceWorker: Dispatch Command (e.g., CreateCard)
    ServiceWorker->>ServiceWorker: Validate Command
    ServiceWorker->>+IndexedDB: Generate & Persist Event (e.g., CardCreated)
    IndexedDB-->>-ServiceWorker: Persist Success
    ServiceWorker-->>-UI: Acknowledge Command
    
    Note right of UI: UI re-projects state from IndexedDB, view updates.

    alt Network Online
        ServiceWorker->>+Server: Sync Unsynced Events
        Server-->>-ServiceWorker: Sync Success
        ServiceWorker->>IndexedDB: Mark Events as Synced
    end

接下来,我们逐步实现这个架构。

一、定义命令与事件

在事件溯源系统中,严格区分命令(Commands)和事件(Events)至关重要。

  • 命令 (Command): 表达用户的“意图”。它是一个请求,可能会被系统拒绝。例如,“请求创建一张卡片”。
  • 事件 (Event): 描述已经发生且不可改变的“事实”。它不能被拒绝。例如,“一张卡片已被创建”。

我们将使用 TypeScript 来定义这些核心数据结构,这在真实项目中能提供极佳的类型安全保障。

// src/types/commands.ts

export interface CreateCardCommand {
  type: 'CREATE_CARD';
  payload: {
    id: string; // Client-generated UUID
    columnId: string;
    content: string;
  };
}

export interface MoveCardCommand {
  type: 'MOVE_CARD';
  payload: {
    cardId: string;
    targetColumnId: string;
    targetIndex: number;
  };
}

// ... other commands like EditCardCommand, DeleteCardCommand

export type AppCommand = CreateCardCommand | MoveCardCommand;
// src/types/events.ts

interface BaseEvent {
  eventId: string; // Globally unique event identifier
  commandId: string; // The command that caused this event
  timestamp: number;
  synced: boolean; // Flag for server synchronization
}

export interface CardCreatedEvent extends BaseEvent {
  type: 'CARD_CREATED';
  payload: {
    id: string;

    columnId: string;
    content: string;
  };
}

export interface CardMovedEvent extends BaseEvent {
  type: 'CARD_MOVED';
  payload: {
    cardId: string;
    sourceColumnId: string; // For potential rollbacks or complex projections
    targetColumnId:string;
    targetIndex: number;
  };
}

// ... other events

export type AppEvent = CardCreatedEvent | CardMovedEvent;

一个关键细节是,命令中的 ID(如卡片ID)由客户端生成(例如使用 UUID),这样后续的命令可以在卡片被同步到服务器之前就引用它。每个事件都有一个 synced 标志,用于追踪其与后端的同步状态。

二、构建客户端事件存储 (Event Store)

IndexedDB 是浏览器中唯一适合存储大量结构化数据的持久化方案。我们将封装一个简单的 Event Store 类来处理事件的读写。

// src/lib/event-store.ts
import { openDB, type IDBPDatabase } from 'idb';
import type { AppEvent } from '../types/events';

const DB_NAME = 'KanbanEventStore';
const STORE_NAME = 'events';
const DB_VERSION = 1;

class EventStore {
  private dbPromise: Promise<IDBPDatabase>;

  constructor() {
    this.dbPromise = openDB(DB_NAME, DB_VERSION, {
      upgrade(db) {
        if (!db.objectStoreNames.contains(STORE_NAME)) {
          const store = db.createObjectStore(STORE_NAME, {
            keyPath: 'eventId',
          });
          // Create an index on the 'synced' property to efficiently find unsynced events.
          store.createIndex('by_synced', 'synced', { unique: false });
          // Create an index on timestamp for chronological retrieval.
          store.createIndex('by_timestamp', 'timestamp', { unique: false });
        }
      },
    });
  }

  /**
   * Appends a new event to the store.
   * This is the only write operation allowed.
   * @param event The event object to persist.
   */
  async append(event: AppEvent): Promise<void> {
    const db = await this.dbPromise;
    const tx = db.transaction(STORE_NAME, 'readwrite');
    await tx.store.add(event);
    await tx.done;
    // In a real app, we'd dispatch a global event here
    // to notify active tabs about the data change.
    // e.g., using BroadcastChannel.
  }

  /**
   * Retrieves all events from the store, sorted by timestamp.
   * @returns A promise that resolves to an array of all events.
   */
  async getAllEvents(): Promise<AppEvent[]> {
    const db = await this.dbPromise;
    const events = await db.getAllFromIndex(STORE_NAME, 'by_timestamp');
    return events;
  }

  /**
   * Finds all events that have not been synced to the server.
   * @returns A promise that resolves to an array of unsynced events.
   */
  async getUnsyncedEvents(): Promise<AppEvent[]> {
    const db = await this.dbPromise;
    const events = await db.getAllFromIndex(STORE_NAME, 'by_synced', 0); // 0 is false
    return events;
  }

  /**
   * Marks a batch of events as synced.
   * @param eventIds An array of event IDs to update.
   */
  async markAsSynced(eventIds: string[]): Promise<void> {
    const db = await this.dbPromise;
    const tx = db.transaction(STORE_NAME, 'readwrite');
    const store = tx.store;
    
    // This is more performant than getting and putting each one individually.
    await Promise.all(eventIds.map(async (id) => {
        const event = await store.get(id) as AppEvent;
        if (event) {
            event.synced = true;
            await store.put(event);
        } else {
            // This indicates a logical error, should be logged.
            console.warn(`Attempted to mark non-existent event as synced: ${id}`);
        }
    }));

    await tx.done;
  }
}

// Export a singleton instance.
export const eventStore = new EventStore();

这里的实现包含了必要的索引 (by_synced, by_timestamp),这对于高效查询至关重要。markAsSynced 的批量处理也是生产环境中减少事务开销的常用手段。

三、状态投影与 Svelte UI

UI 的任务变得非常纯粹:从事件流中计算出当前状态并渲染它。这个计算过程就是“投影”。

// src/lib/projector.ts
import type { AppEvent } from '../types/events';

// Define the shape of our projected state.
export interface Card {
  id: string;
  content: string;
  columnId: string;
}

export interface Column {
  id: string;
  title: string;
  cardIds: string[];
}

export interface BoardState {
  columns: Record<string, Column>;
  cards: Record<string, Card>;
  columnOrder: string[];
}

// The initial state before any events are applied.
const getInitialState = (): BoardState => ({
  columns: {
    'col-1': { id: 'col-1', title: 'To Do', cardIds: [] },
    'col-2': { id: 'col-2', title: 'In Progress', cardIds: [] },
    'col-3': { id: 'col-3', title: 'Done', cardIds: [] },
  },
  cards: {},
  columnOrder: ['col-1', 'col-2', 'col-3'],
});

/**
 * The projector function. It takes a stream of events and reduces them
 * into a single, current state object.
 * @param events The array of events to project.
 * @returns The final projected board state.
 */
export function projectState(events: AppEvent[]): BoardState {
  // Use structuredClone for a deep copy to avoid mutating the initial state object.
  const state = structuredClone(getInitialState());

  for (const event of events) {
    switch (event.type) {
      case 'CARD_CREATED': {
        const { id, columnId, content } = event.payload;
        state.cards[id] = { id, content, columnId };
        if (state.columns[columnId]) {
          state.columns[columnId].cardIds.push(id);
        }
        break;
      }
      case 'CARD_MOVED': {
        const { cardId, targetColumnId, targetIndex, sourceColumnId } = event.payload;
        const card = state.cards[cardId];
        if (!card) continue; // Defensive check

        // Remove from old column
        const sourceCol = state.columns[sourceColumnId];
        if (sourceCol) {
            const cardIndex = sourceCol.cardIds.indexOf(cardId);
            if (cardIndex > -1) {
                sourceCol.cardIds.splice(cardIndex, 1);
            }
        }
        
        // Add to new column
        const targetCol = state.columns[targetColumnId];
        if (targetCol) {
            targetCol.cardIds.splice(targetIndex, 0, cardId);
            card.columnId = targetColumnId; // Update card's own column reference
        }
        break;
      }
      // ... handle other event types
    }
  }
  return state;
}

这个投影器是一个纯函数,给定相同的事件序列,它总是返回相同的状态。这是可测试性和可预测性的基石。

在 Svelte 中,我们可以创建一个自定义 store 来集成这个逻辑。

// src/stores/boardStore.ts
import { writable } from 'svelte/store';
import { eventStore } from '$lib/event-store';
import { projectState } from '$lib/projector';
import type { BoardState } from '$lib/projector';

function createBoardStore() {
  const { subscribe, set } = writable<BoardState | null>(null, () => {
    // This runs when the first subscriber subscribes.
    loadInitialState();
    
    // Listen for changes from other tabs or the Service Worker
    const channel = new BroadcastChannel('kanban-events');
    channel.onmessage = (msg) => {
        if (msg.data.type === 'EVENT_APPENDED') {
            console.log('New event detected, reloading state...');
            loadInitialState();
        }
    };

    return () => {
      // This runs when the last subscriber unsubscribes.
      channel.close();
    };
  });

  async function loadInitialState() {
    try {
      const events = await eventStore.getAllEvents();
      const projectedState = projectState(events);
      set(projectedState);
    } catch (error) {
      console.error("Failed to load and project board state:", error);
      // Handle state loading failure
    }
  }

  return {
    subscribe,
    reload: loadInitialState,
  };
}

export const boardStore = createBoardStore();

UI 组件现在只需订阅 boardStore 即可获得最新状态。当用户执行操作时,它不会直接修改 store,而是向 Service Worker 发送一个命令。

// src/lib/command-dispatcher.ts
import { v4 as uuidv4 } from 'uuid';
import type { AppCommand } from '../types/commands';

// A "fire and forget" dispatcher. The UI trusts the SW to handle it.
// In a real app, this should return a promise that resolves on SW acknowledgement.
export async function dispatchCommand(command: Omit<AppCommand, 'payload.id'> & { payload: Omit<CreateCardCommand['payload'], 'id'> } | AppCommand) {
    if (!('serviceWorker' in navigator) || !navigator.serviceWorker.controller) {
        console.error("Service Worker not active. Cannot dispatch command.");
        // Fallback logic could be implemented here, e.g., queue in localStorage
        return;
    }

    // Assign a client-side ID if it's a creation command
    let finalCommand: AppCommand = command as AppCommand;
    if (command.type === 'CREATE_CARD') {
        finalCommand = {
            ...command,
            payload: {
                ...command.payload,
                id: uuidv4(), // Generate ID on the client
            },
        } as CreateCardCommand
    }

    // We use a custom fetch to a "virtual" endpoint.
    // The SW will intercept this.
    fetch('/api/command', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(finalCommand),
    }).catch(err => {
        console.error("Dispatching command failed:", err);
        // This usually indicates a problem with the SW itself.
    });
}

四、Service Worker:命令与同步的中心

这是架构的核心。Service Worker 必须处理两件大事:拦截和处理命令,以及在后台同步事件。

// public/sw.js
import { eventStore } from './lib/event-store'; // Assuming bundler places files correctly
import { v4 as uuidv4 } from 'uuid';

const COMMAND_ENDPOINT = '/api/command';
const SYNC_TAG = 'kanban-event-sync';

// Install and activate listeners... self.skipWaiting() etc.

self.addEventListener('fetch', (event) => {
  const { request } = event;
  const url = new URL(request.url);

  if (request.method === 'POST' && url.pathname === COMMAND_ENDPOINT) {
    event.respondWith(handleCommandRequest(request));
  }
});

async function handleCommandRequest(request) {
  try {
    const command = await request.json();
    
    // In a real project, this would be a large switch statement
    // with validation logic for each command type.
    // For simplicity, we'll just handle one.
    if (command.type === 'CREATE_CARD') {
      const event = {
        eventId: uuidv4(),
        commandId: command.payload.id, // Using command ID for idempotency tracking
        timestamp: Date.now(),
        synced: false,
        type: 'CARD_CREATED',
        payload: command.payload,
      };

      await eventStore.append(event);

      // Notify active clients that new data is available
      const channel = new BroadcastChannel('kanban-events');
      channel.postMessage({ type: 'EVENT_APPENDED' });
      channel.close();

      // Trigger a background sync attempt
      if ('sync' in self.registration) {
        self.registration.sync.register(SYNC_TAG);
      }

      return new Response(JSON.stringify({ success: true, eventId: event.eventId }), { status: 202 }); // 202 Accepted
    }
    
    // Default handler for unknown commands
    return new Response(JSON.stringify({ error: 'Unknown command type' }), { status: 400 });

  } catch (error) {
    console.error('Error handling command:', error);
    return new Response(JSON.stringify({ error: 'Failed to process command' }), { status: 500 });
  }
}

// Background Sync handler
self.addEventListener('sync', (event) => {
  if (event.tag === SYNC_TAG) {
    event.waitUntil(synchronizeEvents());
  }
});

async function synchronizeEvents() {
  console.log('Background sync started...');
  try {
    const unsyncedEvents = await eventStore.getUnsyncedEvents();
    if (unsyncedEvents.length === 0) {
      console.log('No events to sync.');
      return;
    }

    // In a real app, this would be a proper fetch to your backend API
    const response = await fetch('/api/sync-events', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(unsyncedEvents),
    });

    if (!response.ok) {
      // The server rejected the batch. This requires sophisticated error handling.
      // Maybe some events were invalid? The server should return which ones.
      // For now, we assume it will eventually succeed and just throw to trigger a retry.
      console.error('Server sync failed:', response.statusText);
      throw new Error('Server sync failed');
    }

    const result = await response.json();
    if (result.success) {
      const syncedEventIds = unsyncedEvents.map(e => e.eventId);
      await eventStore.markAsSynced(syncedEventIds);
      console.log(`${syncedEventIds.length} events successfully synced.`);
    } else {
        // Handle partial success or specific errors from the server
        console.error('Sync logic reported failure:', result.error);
        throw new Error('Sync logic failed');
    }

  } catch (error) {
    console.error('Synchronization failed:', error);
    // Throwing an error here will cause the browser to retry the sync later
    // with exponential backoff. This is a key feature of the Background Sync API.
    throw error;
  }
}

handleCommandRequest 函数是命令的入口点。它验证命令,生成事件,将其持久化,然后立即返回一个“Accepted”响应给 UI,使得界面响应极快。真正的网络同步工作被推迟到 synchronizeEvents 函数中,由浏览器的 Background Sync API 调度,它能在网络恢复时(甚至在标签页关闭后)可靠地执行。

五、架构的局限性与未来迭代

这套架构解决了单用户在不可靠网络下的状态一致性问题,但它并非银弹。

首先,客户端事件日志会无限增长。在生产环境中,必须引入快照 (Snapshotting) 机制。例如,当事件数量达到某个阈值(如1000),系统可以计算出当前状态的快照并存储它,然后安全地清除该快照之前的所有事件。加载状态时,只需加载最新的快照,并重放其后的事件即可。

其次,当前的同步模型是简单的“推送”。它没有处理来自服务器的事件(例如,由另一位协作者产生的事件)。要实现实时协作,需要一个双向同步机制。客户端不仅要推送自己的事件,还要能拉取并合并来自服务器的事件流。这会引入事件版本控制和冲突解决的复杂性,此时可能需要考虑 CRDTs (Conflict-free Replicated Data Types) 或 Operational Transformation (OT) 算法。

最后,事件模式演进 (Schema Evolution) 是一个长期挑战。如果 CardCreatedEvent 的结构发生了变化,旧版本的客户端和服务器如何处理新旧两种格式的事件?这需要引入版本号和转换逻辑,对事件的设计提出了更高的要求。

尽管存在这些高级挑战,但基于 Service Worker 和 Event Sourcing 的客户端架构,为构建真正弹性和离线优先的 Web 应用提供了一个坚实且逻辑清晰的基础。它将关注点清晰地分离,使得复杂的状态管理变得可预测、可测试,并且在用户体验上达到了原生应用般的流畅与可靠。


  目录