Files
tools/docs/architecture/notification-system.md
robonen 8adc2522c6 docs: site WIP, extractor type cleanup, tests preset; add broadcastedRef
Type the docs extractor's package.json parsing as unknown; comment the Vite
plugin version-skew cast; wire the tests preset; site/architecture WIP.
2026-06-15 16:55:22 +07:00

62 KiB

Notification System Architecture

WebSocket-driven, type- and priority-aware notifications for the @robonen monorepo.

Status: design proposal (not yet implemented). Synthesized from a multi-agent design pass and adversarially verified against the real repo APIs. The "Verified facts" call-outs below are confirmed against source; the corrections from review are already folded into the code in this document.


0. Context — what already exists

The presentation layer already exists: a full sonner-style headless toast primitive lives at vue/primitives/src/toast/ToastProvider, ToastRoot, ToastViewport, ToastTitle/Description/Action/Close/Announce/Portal, with per-toast auto-dismiss (pause/resume RAF timer), swipe-to-dismiss, teleport-to-viewport, a11y live-region announce, collection-based stacking, and controlled/uncontrolled v-model:open. We drive it, we do not fork it.

The data-layer building blocks also exist and are reused verbatim:

Need Reuse Path
Bounded priority heap PriorityQueue (min-heap) core/stdlib/src/structs/PriorityQueue
Bounded FIFO/reorder Deque core/stdlib/src/structs/Deque
Concurrency-limited drain AsyncPool core/stdlib/src/async/pool
Connection state machine createAsyncMachine core/stdlib/src/patterns/behavioral/StateMachine/async
Plugin/middleware model composePlugins / definePlugin core/fetch/src/plugin.ts
In-process events createEventHook vue/toolkit
Reactive cross-tab storage useStorage (write-lock + storage events) vue/toolkit
Online/offline useNetwork vue/toolkit
Tab leader (Web Locks) useTabLeader vue/toolkit
Cross-tab ref (BroadcastChannel) broadcastedRef vue/toolkit
Typed provide/inject useContextFactory vue/toolkit
Disposable singleton createSharedComposable vue/toolkit
App-lifetime singleton useAppSharedState vue/toolkit
Causal-ordering reference Replica (Lamport/version-vector) core/crdt/src/doc/replica.ts

There is no WebSocket / SSE / streaming code anywhere — the transport is greenfield.


1. TL;DR & key decisions

A four-layer, transport-agnostic system:

  1. L1 core/realtime (greenfield, framework-free) owns the socket: a createAsyncMachine lifecycle, a core/fetch-cloned plugin/middleware pipeline, full-jitter reconnect gated on useNetwork, a heartbeat watchdog cleared by any inbound frame, a scalar resume cursor that advances only on contiguous delivery, a Deque+AsyncPool outbound queue with a shared AbortSignal, and a swappable Transport interface shipped with both WebSocket and SSE.
  2. L2 ingest() — a single pure funnel that validates every notification (WS-pushed and locally-issued notify()/toast.* alike) through the type registry, producing a normalized Notification entity.
  3. L3 domain store — inbox via useAppSharedState, socket-owning client via createSharedComposable; a shallowRef<Map> + version-counter with a synchronous pure-reducer lifecycle, two-key (id + contentHash) dedup, a small surfacing-budget PriorityQueue distinct from a large durable-capacity Deque, derived selectors, optimistic actions with token-correlated ack/rollback, and useStorage cross-tab sync where commit() structurally separates ingest-commit (may toast) from hydration-commit (never toasts).
  4. L4 presentation — a <Toaster> that drives the existing toast primitive (v-for over the bounded toast slice) plus a headless <NotificationCenter>, both reusing one per-type renderer via a surface: 'toast' | 'inbox' prop.
Decision Choice Why
Transport home greenfield core/realtime (no Vue import) Framework-free → trivially SSR-excluded, headless-testable, mirrors the core/fetch boundary. Thin useRealtime wrapper in vue/toolkit.
Connection lifecycle one createAsyncMachine instance Genuine async side-effects per transition. NET_OFFLINE/NET_ONLINE are first-class events that pause retry scheduling.
Entity lifecycle synchronous pure reducer + legal-edge table createAsyncMachine.send is async + per-instance stateful; hundreds of entities each with async hot-path transitions is wrong.
Plugin system cloned 1:1 from core/fetch (execute field, composeRealtimePlugins(plugins, userDefaults), setup({defaults})) Team knows the model; preserves EMPTY_HOOKS fast path. Do not rename executetransform.
Resume cursor scalar seq, advances only on contiguous delivery Single-writer stream collapses the version-vector to one scalar. Advancing on a gapped/buffered frame silently skips messages on next resume.
Reorder bounded Deque + gap-timeout flush Strict in-order release head-of-line-stalls the stream on permanent loss; the timeout trades completeness for liveness + triggers server reconciliation.
Dedup two keys: server id + contentHash (FNV-1a) id makes at-least-once idempotent; contentHash folds retransmits + identical-content distinct-id events into meta.count for +N more.
Capacity two independent bounds Tiny surfacing budget PriorityQueue (~3) vs large durable Deque (~300). Conflating them spams toasts or drops history.
PriorityQueue orientation min-heap victim comparator + separate display sort enqueue throws RangeError when full; peek()/dequeue() return the comparator minimum. The PQ comparator makes the eviction victim the minimum; the viewport order is a separate explicit sort.
Store singleton inbox+registry via useAppSharedState; socket via createSharedComposable createSharedComposable drops constructor args after first call → never wrap an options-taking store in it. Socket must dispose on last unmount + be raw-on-SSR.
Cross-tab no-re-toast commit() separates ingest-commit vs hydration-commit; per-tab non-persisted toastedIds Set Correct without leader election. useEventBus is same-document only and cannot dedup across tabs.
Leader (optional) existing useTabLeader (Web Locks) + broadcastedRef fan-out navigator.locks auto-releases on tab crash (no stale-lease/split-brain). Do not hand-roll a useStorage lease coordinator (layering inversion + race).
Imperative API single ingest() funnel; toast.success = sugar over a built-in __toast type Identical validate/priority/group/lifecycle regardless of source; one code path to test.
Presentation drive the existing Toast primitive; one renderer for both surfaces Consumer-owned list + @update:open removal exactly per toast/demo.vue. surface prop kills duplication.

2. Architecture diagram

                    ┌─────────────────────────────────────────────────────────────┐
                    │  WS SERVER  (single totally-ordered stream per user)          │
                    └───────▲───────────────────────────────────────────┬──────────┘
        outbound (acks,     │  ResumeFrame{cursor} on (re)connect         │ inbound frames
        ActionFrame,        │                                             │ (notification | hello | ack | pong)
        ResumeFrame, ping)  │                                             ▼
┌───────────────────────────┴─────────────────────────────────────────────────────────────────┐
│ L1  core/realtime  (framework-free)                                                            │
│   Transport iface ── createWebSocketTransport | createSseTransport  (swap seam, both shipped)  │
│        │ onMessage/onOpen/onClose/onError                                                       │
│        ▼                                                                                        │
│   RealtimeClient ── createAsyncMachine: idle→connecting→connected→reconnecting→error→closed     │
│        ├─ plugins (composeRealtimePlugins): auth · codec(JSON) · heartbeat · resume · log · gzip │
│        ├─ heartbeat watchdog (cleared by ANY inbound frame)  ── useNetwork-gated backoff*       │
│        ├─ reorder Deque (causal-hold) + gap-timeout  ── cursor advances ONLY on contiguous      │
│        ├─ dedup-by-id bounded LRU                                                                │
│        └─ outbound: Deque(bounded) ─drain→ AsyncPool(shared AbortSignal) ──┐ send()             │
│   onNotification(env) ─────────────────────────────────────┐              │                    │
│   onControl(ack) ─────────────────────────────┐            │              │ ack→commit/rollback │
└────────────────────────────────────────────────┼───────────┼──────────────┼────────────────────┘
   (* useNetwork.isOnline is watched in the Vue useRealtime wrapper, which dispatches
      NET_OFFLINE/NET_ONLINE into machine.send — core/realtime never imports Vue.)
                                                  │           │              │
   local notify()/toast.* ────────────┐          │           ▼              │
                                       ▼          │   ┌──────────────────┐   │
                              ┌──────────────────────▼┐ │ L2 ingest()    │   │
                              │ ONE ingest() funnel    │ │ (pure)         │   │
                              │ registry.resolve →     │◄┘ validate→build │   │
                              │ validate → normalize → │   IngestResult   │   │
                              │ contentHash → dedup    │                  │   │
                              └───────────┬────────────┘                  │   │
                                          │ commit(result, source)        │   │
                                          ▼                               │   │
┌──────────────────────────────────────────────────────────────────────┐ │   │
│ L3  domain store  (vue/toolkit)                                        │ │   │
│   shallowRef<Map<id,Notification>> + version-counter                   │ │   │
│   pure-reducer lifecycle (legal-edge table)                            │ │   │
│   surfacing PriorityQueue(~3, victim=min) ◄─ interrupt ─► durable Deque(~300) │
│   selectors: unreadCount · byPriority · byType · groups · toasting     │─┘   │
│   optimistic: clientToken ─send→ ───────────────────────────────────────────┘
│   useStorage(map) cross-tab  ── commit(): ingest=may-toast / hydrate=never   
│   optional useTabLeader + broadcastedRef (leader holds socket)         │
└───────────┬───────────────────────────────────────────────┬──────────┘
            │ store.toasting (bounded, sorted selector)       │ store.groups / unreadCount
            ▼                                                 ▼
┌────────────────────────────────┐            ┌──────────────────────────────────────┐
│ <Toaster>  (vue/primitives)    │            │ <NotificationCenter> (headless)        │
│ ToastProvider                  │            │ Root ctx via useContextFactory         │
│  └ ToastRoot v-for (EXISTING   │            │ CollectionItem-ordered groups          │
│     toast primitive, unforked) │   shared   │ data-state hooks                       │
│     v-model:open               │── per-type ┤  <component :is=renderer surface=inbox> │
│     @update:open → dismissToast│   renderer │                                        │
│     <component :is=renderer ───┤            │                                        │
│        surface="toast">        │            │                                        │
│  └ ToastViewport               │            │                                        │
└────────────────────────────────┘            └──────────────────────────────────────┘

3. File & package layout

core/realtime/                                   # NEW greenfield @robonen/realtime (framework-free)
  package.json  tsdown.config.ts  README.md
  src/
    index.ts                                     # barrel
    types.ts                                     # Envelope union, RealtimeState/Event, Transport, options
    client.ts                                    # createRealtimeClient — machine + pool + plugins + reorder
    machine.ts                                   # buildLifecycleMachine() -> createAsyncMachine(...)
    envelope.ts                                  # encode/decode (JSON default) + type guards
    reorder.ts                                   # seq reorder Deque + bounded-LRU dedup + contiguous-cursor + gap-timeout
    backoff.ts                                   # bespoke backoffDelay(attempt, opts) — full jitter (NOT stdlib retry)
    outbound.ts                                  # bounded Deque + AsyncPool drainer + ack correlation (clientToken)
    lru.ts                                       # bounded LruSet for dedup
    plugin.ts                                    # defineRealtimePlugin / composeRealtimePlugins (mirror core/fetch)
    transports/{websocket.ts, sse.ts}
    plugins/{auth,codec,heartbeat,resume,logger,compression}.ts
    __mocks__/mockTransport.ts
    *.test.ts

vue/toolkit/src/composables/realtime/useRealtime/{index.ts, index.test.ts}   # NEW: thin Vue wrapper + useNetwork wiring

vue/toolkit/src/composables/notifications/       # NEW category — domain + facade
  index.ts  types.ts  registry.ts  ingest.ts  store.ts  surfacing.ts
  selectors.ts  persistence.ts  actions.ts
  useNotifications/index.ts  useNotificationCenter/index.ts  useUnreadCount/index.ts
  __tests__/*.test.ts

vue/primitives/src/toaster/{Toaster.vue, index.ts, demo.vue}                  # NEW: drives existing toast/
vue/primitives/src/notification-center/                                       # NEW headless inbox primitive
  context.ts  NotificationCenterRoot.vue  NotificationCenterList.vue
  NotificationCenterItem.vue  NotificationCenterGroup.vue
  NotificationCenterBadge.vue  NotificationCenterEmpty.vue
  index.ts  demo.vue  __test__/NotificationCenter.test.ts

# REUSED UNCHANGED: vue/primitives/src/toast/*, core/stdlib structs/async, core/fetch plugin model,
#   vue/toolkit (createEventHook/useStorage/useNetwork/useTabLeader/broadcastedRef/
#   createSharedComposable/useAppSharedState/useContextFactory), @robonen/platform/{multi,browsers}

4. Transport layer (core/realtime)

4.1 Wire envelope protocol

// core/realtime/src/types.ts
export interface BaseEnvelope { v: 1; id: string; ts: number }

// ── inbound DATA plane ──
export type NotificationFrame = BaseEnvelope & {
  kind: 'notification';
  type: string;                 // registry key
  seq: number;                  // strictly-monotonic per session (server-assigned)
  priority?: NotificationPriority;
  groupKey?: string;
  payload: unknown;             // validated downstream by the registry
};

// ── CONTROL plane ──
export type HelloFrame     = BaseEnvelope & { kind: 'hello'; sessionId: string; cursor?: string };
export type AckFrame       = BaseEnvelope & { kind: 'ack'; ref: string; ok: boolean; error?: string };  // ref = correlation token (see §5.5)
export type HeartbeatFrame = BaseEnvelope & { kind: 'ping' | 'pong' };

export type ResumeFrame    = BaseEnvelope & { kind: 'resume'; cursor?: string };
export type ActionFrame    = BaseEnvelope & { kind: 'action'; ref: string; type: string; action: string; clientToken: string; payload?: unknown };

export type InboundEnvelope  = NotificationFrame | HelloFrame | AckFrame | HeartbeatFrame;
export type OutboundEnvelope = ResumeFrame | AckFrame | HeartbeatFrame | ActionFrame;
export type ControlFrame     = HelloFrame | AckFrame | HeartbeatFrame;

export type RealtimeState = 'idle' | 'connecting' | 'connected' | 'reconnecting' | 'error' | 'closed';
export type RealtimeEvent =
  | 'CONNECT' | 'OPENED' | 'CONNECTED' | 'DROPPED' | 'RETRY'
  | 'GIVE_UP' | 'CLOSE' | 'NET_OFFLINE' | 'NET_ONLINE';

export interface Transport {
  connect(url: string, signal: AbortSignal): Promise<void>;
  send(raw: string | ArrayBuffer): void;
  close(code?: number, reason?: string): void;
  readonly onOpen:    (cb: () => void) => () => void;
  readonly onMessage: (cb: (raw: unknown) => void) => () => void;
  readonly onClose:   (cb: (e: { code: number; reason: string; wasClean: boolean }) => void) => () => void;
  readonly onError:   (cb: (err: unknown) => void) => () => void;
}

Server contract this layer depends on (new, server-side, not part of any existing repo API): (1) strictly-monotonic per-session server-assigned seq; (2) on ResumeFrame{cursor} the server replays every frame with seq > cursor; (3) a reconciliation endpoint/command to re-request a permanently-lost [from, to] range flushed by the gap-timeout; (4) action commands are idempotent, keyed by clientToken, and the server echoes that token in AckFrame.ref.

4.2 Plugin system — cloned 1:1 from core/fetch

Verified: core/fetch uses composePlugins(plugins, userDefaults), an execute onion-middleware field, setup({defaults}) run after the merge, and a frozen EMPTY_HOOKS fast path. We mirror this exactly — the field is execute, not transform.

// core/realtime/src/plugin.ts
export interface RealtimeContext {
  state: RealtimeState; attempt: number; cursor: string | undefined;
  sessionId: string | undefined; aborter: AbortController;
}
export type RealtimeHook<C = RealtimeContext> = (context: C) => void | Promise<void>;
export interface RealtimeHooks {
  onConnect?: RealtimeHook<{ ctx: RealtimeContext; url: string }> | RealtimeHook<{ ctx: RealtimeContext; url: string }>[];
  onMessage?: RealtimeHook<{ raw: unknown; env?: InboundEnvelope }> | RealtimeHook<{ raw: unknown; env?: InboundEnvelope }>[]; // DECODE here (set env)
  onSend?:    RealtimeHook<{ env: OutboundEnvelope; raw?: unknown }> | RealtimeHook<{ env: OutboundEnvelope; raw?: unknown }>[]; // ENCODE here (set raw)
  onClose?:   RealtimeHook<{ code: number; reason: string }> | RealtimeHook<{ code: number; reason: string }>[];
  onError?:   RealtimeHook<{ error: unknown }> | RealtimeHook<{ error: unknown }>[];
}
export type RealtimeSendMiddleware = (ctx: { env: OutboundEnvelope; raw?: unknown }, next: () => Promise<void>) => Promise<void>;

export interface RealtimePlugin<Name extends string = string> {
  readonly name: Name;
  readonly defaults?: Partial<RealtimeClientOptions>;
  readonly hooks?: RealtimeHooks;
  readonly execute?: RealtimeSendMiddleware;                       // onion around each socket write
  readonly setup?: (ctx: { readonly defaults: RealtimeClientOptions }) => void;
}
export function defineRealtimePlugin<const N extends string>(p: RealtimePlugin<N>): RealtimePlugin<N> { return p; }

// flattens per-phase hook arrays + onion-composes `execute`, identical structure to composePlugins
export function composeRealtimePlugins(
  plugins: readonly RealtimePlugin[] | undefined,
  userDefaults: Partial<RealtimeClientOptions> | undefined,
): ComposedRealtimePlugins { /* mirror of core/fetch composePlugins; EMPTY_HOOKS fast path */ }

Shipped tree-shakeable plugins (canonical order asserted in setup: codec encodes before compression):

// core/realtime/src/plugins/codec.ts  — JSON default (core/encoding has NO generic codec, verified)
export const jsonCodec = defineRealtimePlugin({
  name: 'codec',
  hooks: {
    onMessage: ({ raw }) => { /* env = JSON.parse(raw as string); shape-check */ },
    onSend:    (c) => { c.raw = JSON.stringify(c.env); },
  },
});
// auth.ts (onConnect: append token subprotocol/query; refresh on error)
// heartbeat.ts (owns ping interval + watchdog)   resume.ts (persists & injects cursor)
// logger.ts (WeakMap timing like the fetch logger)   compression.ts (execute middleware)

4.3 Connection lifecycle — createAsyncMachine

Verified ordering inside send(): guard → action → exit → set currentState → entry, all awaited; one action per transition, one entry/exit per state; the initial state's entry does not auto-run on construction. NET_OFFLINE/NET_ONLINE are first-class events that pause/resume retry scheduling instead of burning the attempt counter.

// core/realtime/src/machine.ts
export function buildLifecycleMachine(deps: MachineDeps) {
  return createAsyncMachine({
    initial: 'idle',
    context: { state: 'idle', attempt: 0, cursor: undefined, sessionId: undefined, aborter: new AbortController() } as RealtimeContext,
    states: {
      idle: { on: { CONNECT: 'connecting' } },

      connecting: {
        entry: async (ctx) => { ctx.aborter = new AbortController(); await deps.transport.connect(deps.url(), ctx.aborter.signal); },
        on: {
          OPENED:  { target: 'connected', action: async (ctx) => { await deps.send({ kind: 'resume', cursor: ctx.cursor, v: 1, id: deps.id(), ts: deps.now() }); } },
          DROPPED: { target: 'reconnecting', guard: () => deps.isOnline() && deps.canRetry() },
          CLOSE:   'closed',
        },
      },

      connected: {
        entry: () => { deps.startHeartbeat(); deps.flushOutbound(); },   // deterministic side-effects
        exit:  () => { deps.stopHeartbeat(); },
        on: {
          DROPPED:     { target: 'reconnecting', guard: () => deps.isOnline() && deps.canRetry() },
          NET_OFFLINE: { target: 'reconnecting', action: () => deps.pauseRetries() },  // sit, do NOT schedule RETRY
          CLOSE:       'closed',
        },
      },

      reconnecting: {
        entry: (ctx) => { if (deps.isOnline()) deps.scheduleRetry(ctx.attempt); },     // backoffDelay(attempt) then RETRY
        on: {
          RETRY:      { target: 'connecting', action: (ctx) => { ctx.attempt++; } },
          NET_ONLINE: { target: 'connecting', action: (ctx) => { ctx.attempt = 0; } }, // reset + immediate reconnect
          GIVE_UP:    { target: 'error', guard: () => !deps.canRetry() },
          CLOSE:      'closed',
        },
      },

      error:  { on: { CONNECT: 'connecting' } },
      closed: { entry: (ctx) => { ctx.aborter.abort(); deps.transport.close(1000); } },
    },
  });
}

backoff.ts is a bespoke standalone function (it does not call stdlib retry(), which loops attempts internally and cannot drive an event-driven, network-gated, machine-scheduled reconnect). It borrows only the shape of a delay: (n) => number function:

export function backoffDelay(attempt: number, o: { base?: number; cap?: number; jitter?: boolean } = {}): number {
  const base = o.base ?? 500, cap = o.cap ?? 30_000;
  const raw = Math.min(cap, base * 2 ** attempt);
  return o.jitter === false ? raw : raw * (0.5 + Math.random() * 0.5);   // full jitter
}
// scheduleRetry(attempt): a no-op while !isOnline(); else setTimeout(backoffDelay(attempt)) -> machine.send('RETRY').

4.4 Heartbeat / zombie detection

heartbeatPlugin arms a watchdog whenever connected. Any inbound frame (pong or data) clears it; on fire → machine.send('DROPPED'). This is the only reliable TCP-half-open detector. Conservative configurable defaults (ping 20s / watchdog 10s).

4.5 Ordering / dedup / resume

// core/realtime/src/lru.ts — bounded dedup set (prevents an unbounded memory leak over a long session)
export class LruSet {
  private map = new Map<string, true>();
  constructor(private max = 4096) {}
  has(k: string) { return this.map.has(k); }
  add(k: string) {
    if (this.map.has(k)) this.map.delete(k);        // bump to MRU
    this.map.set(k, true);
    if (this.map.size > this.max) this.map.delete(this.map.keys().next().value!);  // evict LRU
  }
}

// core/realtime/src/reorder.ts
export function createReorderBuffer(opts: { gapTimeoutMs?: number; onReconcile?: (from: number, to: number) => void }) {
  const buffer = new Deque<NotificationFrame>([], { maxSize: 1024 });   // out-of-order hold (causal-hold; ref: crdt replica.ts)
  const seen = new LruSet(8192);                                        // BOUNDED dedup-by-id
  let lastSeq = 0;
  let cursor: string | undefined;                                       // = String(lastSeq), persisted

  function accept(f: NotificationFrame): NotificationFrame[] {
    if (seen.has(f.id) || f.seq <= lastSeq) return [];                  // duplicate / replay -> drop (idempotent at-least-once)
    seen.add(f.id);
    if (f.seq === lastSeq + 1) {
      const out = [f]; lastSeq = f.seq; cursor = String(lastSeq);       // CONTIGUOUS — advance cursor ONLY here
      let next: NotificationFrame | undefined;
      while ((next = takeSeq(buffer, lastSeq + 1))) { out.push(next); lastSeq = next.seq; cursor = String(lastSeq); }
      return out;
    }
    putBySeq(buffer, f);                                                // gap: hold, DO NOT advance cursor
    armGapTimeout(() => opts.onReconcile?.(lastSeq + 1, f.seq - 1));    // permanent-loss -> deliver-with-gap + server reconcile
    return [];
  }
  return { accept, get cursor() { return cursor; }, hydrate(c?: string) { if (c) { lastSeq = Number(c); cursor = c; } } };
}

Resume: ResumeFrame{cursor} is sent on every connected.entry; the server replays seq > cursor. resumePlugin persists the cursor via useStorage so it survives a full reload. A scalar suffices — a single-writer per-user stream collapses the version-vector to one entry (O(1) resume, small wire).

4.6 Outbound queue / backpressure

// core/realtime/src/outbound.ts
export function createOutbound(deps: { transport: Transport; signal: AbortSignal; concurrency?: number; max?: number }) {
  const queue = new Deque<OutboundEnvelope>([], { maxSize: deps.max ?? 256 });
  const pool  = new AsyncPool({ concurrency: deps.concurrency ?? 4, signal: deps.signal }); // CLOSE aborts in-flight
  let connected = false;
  const acks = new Map<string, { resolve: () => void; reject: (e: unknown) => void }>();    // keyed by AckFrame.ref

  function enqueue(env: OutboundEnvelope) {
    if (queue.isFull && env.kind !== 'action') queue.popFront();        // drop-oldest non-critical (action frames never dropped)
    queue.pushBack(env);
    if (connected) flush();
  }
  function flush() {
    let env: OutboundEnvelope | undefined;
    while ((env = queue.popFront())) {
      const e = env;
      pool.add(async (signal) => {
        deps.transport.send(serialize(e));
        if (e.kind === 'action') await waitForAck(e.clientToken, acks, { timeout: 8000, signal });  // canonical key = clientToken
      });
    }
  }
  function resolveAck(ref: string, ok: boolean, error?: string) {       // called from the inbound ack path; ref === clientToken
    const w = acks.get(ref); if (!w) return; acks.delete(ref); ok ? w.resolve() : w.reject(new Error(error));
  }
  return { enqueue, flush, setConnected: (v: boolean) => { connected = v; if (v) flush(); }, resolveAck, dispose: () => pool.dispose() };
}

AsyncPool has no pause() (verified). We gate by not adding while disconnected and abort in-flight via the shared signal on CLOSE. The ack correlation key is clientToken end-to-end: the ActionFrame carries it, outbound blocks waitForAck(clientToken), and the server echoes it in AckFrame.ref — one canonical key, no ref/clientToken mismatch.

4.7 Client surface + single-shared-socket across tabs

// core/realtime/src/client.ts
export interface RealtimeClient {
  readonly state: () => RealtimeState;
  connect(): void; disconnect(code?: number): void;
  netOnline(): void; netOffline(): void;                            // dispatched by the Vue wrapper from useNetwork
  onNotification(cb: (f: NotificationFrame) => void): () => void;   // ordered + deduped
  onControl(cb: (f: ControlFrame) => void): () => void;
  onStateChange(cb: (s: RealtimeState) => void): () => void;
  send(env: OutboundEnvelope): Promise<void>;                       // queued offline, flushed on connect
  resumeCursor(): string | undefined;
  dispose(): void;
}
export function createRealtimeClient(options: RealtimeClientOptions): RealtimeClient { /* machine + reorder + outbound + plugins */ }

The Vue/non-Vue boundary is crossed only in the wrapper: core/realtime never imports useNetwork; the wrapper watches useNetwork().isOnline and dispatches into the machine. Cross-tab single socket is an optional optimization built from the existing primitives — never a hand-rolled lease:

// vue/toolkit/src/composables/realtime/useRealtime/index.ts
export function useRealtime(options: UseRealtimeOptions): UseRealtimeReturn {
  if (!isClient) return ssrNoopRealtime();                          // no socket on server

  const { isOnline } = useNetwork();
  const { isLeader } = useTabLeader('robonen:notif-socket');        // Web Locks — atomic, crash-safe failover
  const client = createRealtimeClient(options);
  const fanout = broadcastedRef<NotificationFrame | null>('robonen:notif-inbound', null);  // BroadcastChannel — truly cross-tab

  watch(isOnline, (online) => online ? client.netOnline() : client.netOffline());          // wire NET_ONLINE/NET_OFFLINE

  watchEffect(() => {
    if (isLeader.value) { client.connect(); client.onNotification((f) => { fanout.value = f; }); }
    else                { client.disconnect(); /* follower ingests from `fanout`, runs inbox-only */ }
  });
  tryOnScopeDispose(() => client.dispose());
  return { /* state, isConnected, connect, disconnect, onNotification, send */ };
}
export const useSharedRealtime = createSharedComposable(useRealtime);  // ref-counted, disposed on last unmount, raw-on-SSR

5. Domain / data layer

5.1 Entity model + lifecycle

// vue/toolkit/src/composables/notifications/types.ts
export type NotificationPriority = 0 | 1 | 2 | 3;   // numeric => trivial comparator + interrupt math
export const Priority = { low: 0, normal: 1, high: 2, critical: 3 } as const;

export type NotificationLifecycle =
  | 'received' | 'queued' | 'toasted' | 'read' | 'dismissed' | 'archived' | 'expired';

// legal edges — enforced by the synchronous reducer (illegal => throw in dev, no-op + onError in prod)
export const LIFECYCLE_EDGES: Record<NotificationLifecycle, NotificationLifecycle[]> = {
  received:  ['queued', 'toasted', 'read', 'archived'],
  queued:    ['toasted', 'read', 'dismissed', 'archived'],
  toasted:   ['read', 'dismissed'],
  read:      ['archived', 'dismissed'],
  dismissed: ['archived'],
  archived:  ['expired'],
  expired:   [],
};

export interface Notification<TType extends string = string, TPayload = unknown> {
  readonly id: string;                 // server id (dedup key #1) OR contentHash-derived for local
  readonly type: TType;
  readonly seq: number;                // transport seq (ordering); 0 for local
  readonly payload: TPayload;          // registry-validated
  priority: NotificationPriority;      // envelope override ?? registry.defaultPriority
  lifecycle: NotificationLifecycle;
  readonly groupKey: string | null;    // registry.grouping(payload)
  readonly contentHash: string;        // dedup key #2 (FNV-1a) — folds retransmits / identical content
  readonly createdAt: number;
  readonly source: 'ws' | 'local';
  readReceiptAt: number | null;
  dismissedAt: number | null;
  // runtime-only — NEVER persisted:
  surfaced: boolean;                   // currently occupies a toast slot
  pendingAction: { actionId: string; clientToken: string; prevLifecycle: NotificationLifecycle } | null;
  actionError: string | null;
  meta: { count: number; sourceSeqs: number[] };  // count>1 => "+N more"
}

export interface NotificationGroup {
  readonly key: string; readonly type: string; readonly priority: NotificationPriority;   // max in group
  readonly notifications: Notification[];          // newest-first
  readonly count: number; readonly unread: number; readonly latest: Notification;
}

5.2 The store (singleton split + reactive core)

// vue/toolkit/src/composables/notifications/store.ts
export interface NotificationStoreOptions {
  registry: NotificationRegistry; persistKey?: string;
  maxToasts?: number; maxEntities?: number; client?: UseRealtimeReturn;
}

export function createNotificationStore(options: NotificationStoreOptions): NotificationStore {
  const byId = shallowRef(new Map<string, Notification>());      // house-style shallowRef default
  const version = ref(0);                                        // O(1) selector invalidation
  const bump = () => { triggerRef(byId); version.value++; };
  const toastedIds = new Set<string>();                          // per-tab, NEVER persisted

  // ── SURFACING budget: scarce toast real-estate. ──
  // VERIFIED: PriorityQueue is a MIN-heap; peek()/dequeue() return the comparator MINIMUM.
  // So the comparator must make the EVICTION VICTIM the minimum: lowest priority first, then OLDEST first.
  const cmpEvict = (a: Notification, b: Notification) =>
    (a.priority - b.priority) || (a.createdAt - b.createdAt);     // min = lowest priority, then oldest = the victim
  const surfacing = new PriorityQueue<string>([], {
    comparator: (x, y) => cmpEvict(byId.value.get(x)!, byId.value.get(y)!),
    maxSize: options.maxToasts ?? 3,
  });

  // ── DURABLE capacity: memory retention. ──
  const order = new Deque<string>([], { maxSize: options.maxEntities ?? 300 });

  // ── COMMIT: the single seam. `source` decides toast eligibility. ──
  function commit(r: IngestResult, source: 'ingest' | 'hydration'): void {
    if (!r.ok) return;                                            // invalid already routed to onReject
    const incoming = r.notification;
    const existing = byId.value.get(incoming.id) ?? findByContentHash(byId.value, incoming.contentHash);

    if (existing) {                                               // DEDUP / cross-tab MERGE
      mergeLattice(existing, incoming);                           // max over monotonic lifecycle lattice + per-field LWW
      existing.meta.count++; existing.meta.sourceSeqs.push(incoming.seq);
      bump(); return;
    }
    ensureCapacity();                                             // guarantee room BEFORE pushBack (Deque throws when full)
    byId.value.set(incoming.id, incoming); order.pushBack(incoming.id);

    // toast ONLY on ingest-commit (NEVER on hydration) and only if not already toasted in this tab
    if (source === 'ingest' && options.registry.shouldToast(incoming) && !toastedIds.has(incoming.id))
      surface(incoming);
    bump();
  }

  function surface(n: Notification): void {                       // guarded PriorityQueue use (enqueue THROWS when full)
    if (surfacing.isFull) {
      const victimId = surfacing.peek()!;                         // heap root = most-evictable under cmpEvict
      const victim = byId.value.get(victimId)!;
      if (cmpEvict(n, victim) <= 0) return;                       // newcomer is no better than the victim -> inbox-only
      surfacing.dequeue();                                        // interrupt: evict lowest visible toast...
      transition(victim.id, 'queued'); victim.surfaced = false;   // ...back to inbox, NEVER deleted
    }
    surfacing.enqueue(n.id);                                      // safe now
    transition(n.id, 'toasted'); n.surfaced = true; toastedIds.add(n.id);
  }

  // VERIFIED: Deque.pushBack throws RangeError at maxSize — never rely on "transient capacity".
  function ensureCapacity(): void {
    while (order.isFull) {
      const victimId = pickDurableVictim(order, byId.value);      // expired -> read/archived oldest -> lowest-priority oldest
      if (!victimId) break;                                       // all protected -> last-resort force-evict oldest non-active below
      remove(victimId);
    }
  }
  // pickDurableVictim skips surfaced || pendingAction!=null; only if EVERY candidate is active does it
  // force-evict the oldest non-active entity, guaranteeing pushBack never throws.

  // ... transition (pure reducer over LIFECYCLE_EDGES), markRead, dismissToast (toast->inbox), archive, remove, runAction
  return { /* see selectors + actions below */ };
}

Singleton mapping (the verified-correct split):

// store + registry are app-lifetime (survive route changes) and options are bound ONCE here.
const useStoreSingleton = useAppSharedState(createNotificationStore);   // NOT createSharedComposable (it drops later args)
// the socket-owning client uses createSharedComposable (ref-counted, disposable, raw-on-SSR) — see §4.7.

Verified: createSharedComposable binds args on the first call and ignores them afterward, and returns the raw composable on SSR; useAppSharedState runs the factory once inside a module effectScope(true) that is never stopped (app-lifetime). So: options-bearing inbox/registry → useAppSharedState; disposable socket+timers → createSharedComposable.

5.3 Derived selectors — viewport order from an explicit sort, not heap order

// selectors.ts — VERIFIED: PriorityQueue.toArray()/iteration yields HEAP order, not sorted.
export function buildSelectors(byId: ShallowRef<Map<string, Notification>>, version: Ref<number>, surfacing: PriorityQueue<string>) {
  const live = computed(() => (version.value, [...byId.value.values()]));

  // display order is the INVERSE of the eviction order: highest priority first, then NEWEST first
  const cmpDisplay = (a: Notification, b: Notification) => (b.priority - a.priority) || (b.createdAt - a.createdAt);
  const toasting = computed(() => {
    version.value;
    return [...surfacing].map((id) => byId.value.get(id)!).filter(Boolean).sort(cmpDisplay);  // explicit sort, never heap order
  });

  const unreadCount = computed(() => live.value.filter((n) => n.readReceiptAt === null && n.lifecycle !== 'archived' && n.lifecycle !== 'expired').length);
  const byPriority  = computed(() => groupByKey(live.value, (n) => n.priority));
  const byType      = computed(() => groupByKey(live.value, (n) => n.type));
  const groups      = computed(() => collapseByGroupKey(live.value));  // "+N more"
  return { toasting, unreadCount, byPriority, byType, groups };
}

5.4 Persistence + cross-tab (don't re-toast)

// persistence.ts — VERIFIED: useStorage(key, initial, storage, options); storage is a REQUIRED 3rd arg.
const persisted = useStorage<Map<string, PersistedNotification>>(
  () => options.persistKey ?? 'robonen:notif',
  new Map(),
  localStorage,
  { serializer: StorageSerializers.map, listenToStorageChanges: true, shallow: true, initOnMounted: true },  // SSR-safe defer
);
// PersistedNotification EXCLUDES runtime fields (surfaced, pendingAction, actionError) — never resurrect in-flight state.
// useStorage write-lock prevents self-feedback; 'storage'/'vuetools-storage' events propagate to other tabs.

watch(persisted, (next) => {                                          // remote tab wrote -> hydration-commit (NEVER toasts)
  for (const p of next.values()) commit(ingestPersisted(p), 'hydration');
}, { deep: false });

mergeLattice is the convergence rule for concurrent two-tab edits:

const LIFECYCLE_RANK: Record<NotificationLifecycle, number> = { received:0, queued:1, toasted:2, read:3, dismissed:4, archived:5, expired:6 };
function mergeLattice(local: Notification, remote: Notification): void {
  if (LIFECYCLE_RANK[remote.lifecycle] > LIFECYCLE_RANK[local.lifecycle]) local.lifecycle = remote.lifecycle;  // max over lattice
  local.readReceiptAt = maxNullable(local.readReceiptAt, remote.readReceiptAt);   // per-field LWW
  local.dismissedAt   = maxNullable(local.dismissedAt, remote.dismissedAt);
}

Known limitation: the whole-Map serializer means a tab's write replaces the full key; with N tabs committing concurrently, whole-Map last-writer-wins can clobber a concurrent per-entity edit before mergeLattice (which only runs on the receiving side after a storage event) reconciles it. For strict multi-tab durability, enable the optional useTabLeader single-writer path.

5.5 Optimistic actions — token-correlated ack/commit/rollback

// actions.ts
async function runAction(id: string, actionId: string): Promise<void> {
  const n = byId.value.get(id); if (!n) return;
  const action = options.registry.resolve(n.type)?.actions?.find((a) => a.id === actionId);
  if (!action) return;
  const clientToken = crypto.randomUUID();                                // THE canonical correlation key
  const prevLifecycle = n.lifecycle;
  applyOptimistic(n, action.optimistic?.(n) ?? {});                       // immediate, reactive
  n.pendingAction = { actionId, clientToken, prevLifecycle }; bump();
  try {
    await options.client?.send({ kind: 'action', v: 1, id: crypto.randomUUID(), ts: Date.now(),
      ref: clientToken, type: n.type, action: actionId, clientToken });   // server echoes clientToken in AckFrame.ref
    await waitForAck(clientToken, { timeout: 8000 });                     // ack-after-timeout IGNORED via token compare
    n.pendingAction = null; if (action.onCommit) transition(id, action.onCommit); bump();
  } catch (e) {
    // DO NOT rollback on a mid-action socket drop — frame is queued, await ack post-reconnect.
    if (isTimeoutOrServerError(e)) { transition(id, prevLifecycle); n.pendingAction = null; n.actionError = String(e); bump(); }
  }
}

Eviction skips entities with surfaced || pendingAction !== null.


6. Type registry

// vue/toolkit/src/composables/notifications/registry.ts
export interface NotificationRendererProps<P = unknown> {
  notification: Notification<string, P>;
  surface: 'toast' | 'inbox';                       // ONE renderer, both surfaces
  runAction: (actionId: string) => void;
  dismiss: () => void;
}
export interface NotificationActionDef<P = unknown> {
  readonly id: string;                              // 'accept' | 'decline' | 'reply' | 'mark-read'
  readonly label: string | ((n: Notification<string, P>) => string);
  readonly altText: string;                         // -> ToastAction alt-text (required by the primitive)
  readonly variant?: 'primary' | 'secondary' | 'danger';
  readonly optimistic?: (n: Notification<string, P>) => Partial<Notification>;
  readonly onCommit?: NotificationLifecycle;        // terminal lifecycle on ack
}
export type ToastPolicy = 'always' | 'never' | 'by-priority';
export interface NotificationTypeDef<P = unknown> {
  readonly type: string;
  readonly validate: (raw: unknown) => P;           // throws on malformed -> caught per-message in ingest
  readonly renderer: Component;                     // NotificationRendererProps<P>
  readonly actions?: readonly NotificationActionDef<P>[];
  readonly defaultPriority?: NotificationPriority;
  readonly toastPolicy?: ToastPolicy;               // default 'by-priority'
  readonly grouping?: (p: P) => string | null;
  readonly contentHash?: (p: P) => string;          // override the FNV-1a default
  readonly sound?: string | false;
  readonly ttl?: number;                            // toast auto-dismiss ms; Infinity = sticky
  readonly persist?: boolean;                       // default true
}
export function defineNotificationType<P>(def: NotificationTypeDef<P>): NotificationTypeDef<P> { return def; }

export interface NotificationRegistry {
  register<P>(def: NotificationTypeDef<P>): void;
  resolve(type: string): NotificationTypeDef | undefined;
  has(type: string): boolean;
  shouldToast(n: Notification): boolean;
  priorityFor(type: string, override?: NotificationPriority): NotificationPriority;
  all(): readonly NotificationTypeDef[];
}
export function createNotificationRegistry(
  defs?: readonly NotificationTypeDef[],
  opts?: { onUnknownType?: (type: string) => void; toastFloor?: NotificationPriority },
): NotificationRegistry;

// provided/injected so renderers + store + ingestion share ONE instance (and tests inject a fake)
export const { inject: useNotificationRegistry, provide: provideNotificationRegistry, appProvide: appProvideNotificationRegistry } =
  useContextFactory<NotificationRegistry>('NotificationRegistry');

Registering the three required types:

const friendRequest = defineNotificationType<{ fromUserId: string; name: string; avatar: string }>({
  type: 'friend_request',
  validate: (raw) => { const p = raw as any; if (typeof p?.fromUserId !== 'string') throw new Error('bad friend_request'); return p; },
  renderer: FriendRequestCard,
  defaultPriority: Priority.high,
  toastPolicy: 'by-priority',
  grouping: () => null,
  sound: '/sounds/ping.mp3',
  ttl: Infinity,                                    // sticky until acted on
  actions: [
    { id: 'accept',  label: 'Accept',  altText: 'Accept friend request',  variant: 'primary',
      optimistic: () => ({ lifecycle: 'read', readReceiptAt: Date.now() }), onCommit: 'read' },
    { id: 'decline', label: 'Decline', altText: 'Decline friend request', variant: 'danger',
      optimistic: () => ({ lifecycle: 'dismissed', dismissedAt: Date.now() }), onCommit: 'dismissed' },
  ],
});

const systemAlert = defineNotificationType<{ title: string; body: string }>({
  type: 'system_alert',
  validate: (raw) => raw as any,
  renderer: SystemAlertCard,
  defaultPriority: Priority.critical,               // renders ToastRoot type='foreground' (assertive aria-live)
  toastPolicy: 'always',
  ttl: 60_000,
  actions: [],                                       // dismiss-only via <ToastClose>
});

const message = defineNotificationType<{ threadId: string; from: string; text: string }>({
  type: 'message',
  validate: (raw) => raw as any,
  renderer: MessageRow,
  defaultPriority: Priority.normal,
  toastPolicy: 'by-priority',                        // normal < high floor => inbox-only (badge), no toast
  grouping: (p) => `dm:${p.threadId}`,               // collapse same thread => "+N more"
  contentHash: (p) => `${p.threadId}:${p.text}`,
  actions: [
    { id: 'reply',     label: 'Reply',     altText: 'Reply to message' },
    { id: 'mark-read', label: 'Mark read', altText: 'Mark message read',
      optimistic: () => ({ lifecycle: 'read', readReceiptAt: Date.now() }), onCommit: 'read' },
  ],
});

registerNotificationTypes([friendRequest, systemAlert, message]);   // tree-shakeable: only imported types ship

7. Presentation bridge

7.1 <Toaster> — drives the EXISTING toast primitive (consumer-owned list per toast/demo.vue)

<!-- vue/primitives/src/toaster/Toaster.vue -->
<script setup lang="ts">
import { ToastProvider, ToastRoot, ToastAction, ToastClose, ToastViewport } from '@robonen/primitives';
import { useNotifications, useNotificationRegistry, Priority } from '@robonen/vue';

const props = withDefaults(defineProps<{ duration?: number; swipeDirection?: 'up'|'down'|'left'|'right' }>(),
  { duration: 6000, swipeDirection: 'right' });

const store = useNotifications();
const registry = useNotificationRegistry();
const resolve = (type: string) => registry.resolve(type)!;
const durationFor = (n: { type: string; priority: number }) =>
  resolve(n.type).ttl ?? (n.priority >= Priority.high ? 10_000 : props.duration);
</script>

<template>
  <ToastProvider :duration="props.duration" :swipe-direction="props.swipeDirection">
    <ToastRoot
      v-for="n in store.toasting.value"
      :key="n.id"
      v-model:open="store.openModel(n.id).value"
      :duration="durationFor(n)"
      :type="n.priority >= Priority.critical ? 'foreground' : 'background'"
      to-viewport
      @update:open="(open) => { if (!open) store.dismissToast(n.id); }"
    >
      <component
        :is="resolve(n.type).renderer"
        :notification="n" surface="toast"
        :run-action="(a: string) => store.runAction(n.id, a)"
        :dismiss="() => store.dismissToast(n.id)"
      />
      <ToastAction
        v-for="a in resolve(n.type).actions ?? []"
        :key="a.id" :alt-text="a.altText"
        @click="store.runAction(n.id, a.id)"
      >{{ typeof a.label === 'function' ? a.label(n) : a.label }}</ToastAction>
      <ToastClose aria-label="Dismiss" />
    </ToastRoot>
    <ToastViewport class="fixed bottom-4 right-4 flex flex-col gap-2" />
  </ToastProvider>
</template>

The store owns the list + remove/dismissToast; the primitive stays consumer-driven exactly as its demo.vue requires. We do not fork or extend vue/primitives/src/toast. Auto-dismiss (the primitive sets open=false) routes through @update:opendismissToast (toast→inbox, never deleted).

7.2 <NotificationCenter> — headless inbox, same renderer (surface='inbox')

<!-- vue/primitives/src/notification-center/NotificationCenterItem.vue -->
<script setup lang="ts">
import { useNotificationCenterContext } from './context';
import { useNotificationRegistry } from '@robonen/vue';
const props = defineProps<{ group: NotificationGroup }>();
const ctx = useNotificationCenterContext();
const registry = useNotificationRegistry();
const latest = props.group.latest;
</script>

<template>
  <li :data-state="latest.readReceiptAt ? 'read' : 'unread'" class="nc-item">
    <component
      :is="registry.resolve(latest.type)!.renderer"
      :notification="latest" surface="inbox"
      :run-action="(a: string) => ctx.runAction(latest.id, a)"
      :dismiss="() => ctx.archive(latest.id)"
    />
    <span v-if="group.count > 1" class="nc-more">+{{ group.count - 1 }} more</span>
  </li>
</template>

Priority + policy decides toast-vs-inbox entirely inside registry.shouldToast(n): 'always'→toast; 'never'→inbox-only; 'by-priority'n.priority >= toastFloor (default high). Every notification lands in the inbox (durable Deque); only shouldToast ones also enter the surfacing PriorityQueue. critical renders type='foreground' (assertive aria-live) and interrupts the lowest visible toast.


8. Public API & usage

// vue/toolkit/src/composables/notifications/useNotifications/index.ts
export interface UseNotificationsReturn {
  entities: Readonly<ShallowRef<Map<string, Notification>>>;
  toasting: ComputedRef<Notification[]>;
  groups: ComputedRef<NotificationGroup[]>;
  unreadCount: ComputedRef<number>;
  byPriority: ComputedRef<Record<NotificationPriority, Notification[]>>;
  byType: ComputedRef<Record<string, Notification[]>>;
  connectionState: Readonly<ShallowRef<RealtimeState>>;
  notify: <P>(input: { type: string; payload: P; priority?: NotificationPriority }) => string;  // local -> SAME funnel
  toast: ToastSugar;                  // toast.success/error/dismiss -> built-in '__toast' type
  markRead: (id: string) => void; markAllRead: () => void;
  dismissToast: (id: string) => void; archive: (id: string) => void;
  runAction: (id: string, actionId: string) => Promise<void>;
  openModel: (id: string) => Ref<boolean>;
}
export function useNotifications(): UseNotificationsReturn;          // full surface
export function useNotificationCenter(): UseNotificationCenterReturn; // inbox/groups/filter/markAllRead
export function useUnreadCount(): Readonly<ComputedRef<number>>;     // cheap badge — does NOT pull the whole store

End-to-end (a Nuxt plugin / app entry):

// plugins/notifications.client.ts
import { createNotificationRegistry, appProvideNotificationRegistry, registerNotificationTypes,
         useSharedRealtime } from '@robonen/vue';
import { createWebSocketTransport } from '@robonen/realtime';
import { authPlugin, jsonCodec, heartbeatPlugin, resumePlugin, loggerPlugin } from '@robonen/realtime/plugins';

export default defineNuxtPlugin((nuxtApp) => {
  const registry = createNotificationRegistry();
  registerNotificationTypes([friendRequest, systemAlert, message]);   // bootstrap, once

  // CORRECTED: use appProvide at the plugin layer (component-level `provide` only works inside setup)
  appProvideNotificationRegistry(nuxtApp.vueApp)(registry);

  useSharedRealtime({
    transport: createWebSocketTransport(),
    url: () => `wss://api.example.com/notifications?u=${useUserId().value}`,
    plugins: [authPlugin(getToken), jsonCodec, heartbeatPlugin(), resumePlugin(), loggerPlugin(console.log)],
    backoff: { base: 500, cap: 30_000, jitter: true },
  });
});
<!-- App.vue : mount presentation ONCE near the root -->
<template>
  <NuxtPage />
  <ClientOnly><Toaster /></ClientOnly>           <!-- client-only: no socket/badge during SSR -->
</template>

<!-- HeaderBell.vue : cheap badge, rendered only after mount to avoid hydration mismatch -->
<script setup lang="ts">
import { useUnreadCount } from '@robonen/vue';
const unread = useUnreadCount();
const mounted = ref(false); onMounted(() => mounted.value = true);
</script>
<template><button>🔔 <span v-if="mounted && unread.value">{{ unread.value }}</span></button></template>
// anywhere: local notification + imperative toast (SAME ingest funnel)
const { notify, toast, runAction } = useNotifications();
notify({ type: 'message', payload: { threadId: 't1', from: 'Ana', text: 'hi' } });
toast.success('Saved', { description: 'Your changes are live.' });
toast.error('Upload failed', { actions: [{ id: 'retry', label: 'Retry', altText: 'Retry upload' }] });
await runAction(friendReqId, 'accept');           // optimistic -> WS action -> ack commit / timeout rollback

9. SSR, testing, tree-shaking

SSR. core/realtime imports nothing from Vue and touches WebSocket/BroadcastChannel/timers only after connect(). useRealtime returns ssrNoopRealtime() when !isClient. useStorage defers with initOnMounted. useSharedRealtime (createSharedComposable) returns a fresh raw instance on the server. The unread badge renders only after onMounted. <Toaster> lives under <ClientOnly>.

Testing (house style: describe(fnName), effectScope() disposal, MockTransport, fake registry):

// store.test.ts
describe('createNotificationStore', () => {
  it('surfacing interrupt evicts lowest toast back to inbox, never deletes', () => {
    const scope = effectScope();
    scope.run(() => {
      const store = createNotificationStore({ registry: fakeRegistry, maxToasts: 1 });
      store.commit(ingest(frame('a', { priority: Priority.low })), 'ingest');
      store.commit(ingest(frame('b', { priority: Priority.high })), 'ingest');   // interrupts
      expect(store.toasting.value.map(n => n.id)).toEqual(['b']);
      expect(store.entities.value.get('a')?.lifecycle).toBe('queued');           // demoted, still present
    });
    scope.stop();
  });

  it('hydration-commit never toasts (cross-tab no-re-toast)', () => {
    const store = createNotificationStore({ registry: fakeRegistry });
    store.commit(ingest(frame('x', { priority: Priority.critical })), 'hydration');
    expect(store.toasting.value).toHaveLength(0);                                // inbox only
    expect(store.unreadCount.value).toBe(1);
  });

  it('two-key dedup folds identical content into meta.count', () => { /* same contentHash, distinct id -> count=2 */ });
  it('disposes timers/subscriptions on scope stop', () => { /* spy + effectScope().stop() */ });
});

// machine.test.ts — drive via await machine.send(event)
describe('buildLifecycleMachine', () => {
  it('NET_OFFLINE pauses retries without scheduling RETRY', async () => {
    const m = buildLifecycleMachine(depsOffline);
    await m.send('CONNECT'); await m.send('OPENED'); await m.send('NET_OFFLINE');
    expect(depsOffline.scheduleRetry).not.toHaveBeenCalled();
  });
  it('cursor advances ONLY on contiguous delivery', () => {
    const rb = createReorderBuffer({});
    expect(rb.accept(frame('a', { seq: 1 })).map(f => f.seq)).toEqual([1]); expect(rb.cursor).toBe('1');
    expect(rb.accept(frame('c', { seq: 3 }))).toEqual([]);                  expect(rb.cursor).toBe('1'); // gap held
    expect(rb.accept(frame('b', { seq: 2 })).map(f => f.seq)).toEqual([2, 3]); expect(rb.cursor).toBe('3'); // drains
  });
  it('drops duplicates by id and seq<=lastSeq', () => { /* idempotent at-least-once */ });
});

Plus explicit tests for seq-gap-then-reconnect (assert no loss/dup), storage-event toast suppression, bounded-LRU dedup eviction, and leader handoff on abrupt tab close.

Tree-shaking. Per-category barrels (composables/notifications/index.tscomposables/index.tssrc/index.ts). Registry descriptors are side-effect-free; only the defineNotificationTypes a consumer imports + register()s ship. Realtime plugins are separate modules under @robonen/realtime/plugins. useUnreadCount imports only the badge selector.


10. Build order / milestones

# Milestone Deliverables Gate
M1 Transport core skeleton core/realtime: Transport iface, createWebSocketTransport, envelope.ts, MockTransport. Connect/send/recv against a mock; JSON round-trip.
M2 Lifecycle + reliability machine.ts, backoff.ts (jitter, gated), heartbeat/resume plugins, reorder.ts (contiguous cursor + gap-timeout + LruSet), outbound.ts (AsyncPool+Deque+ack by clientToken). machine/reorder/backoff tests green; seq-gap-reconnect asserts no loss.
M3 Plugin pipeline + 2nd transport plugin.ts 1:1 with core/fetch; auth/codec/logger/compression; ship createSseTransport. SSE swaps in with zero upper-layer change.
M4 Registry + ingest funnel registry.ts, defineNotificationType, ingest.ts (pure, catches validate throws → onReject), contentHash. One funnel for WS + local; malformed payload never crashes the pump.
M5 Domain store store.ts, surfacing.ts (victim-min PQ + guarded enqueue + ensureCapacity), selectors.ts (explicit sort), pure-reducer lifecycle, two-key dedup. Interrupt/eviction/dedup/disposal tests green.
M6 Persistence + cross-tab persistence.ts (useStorage map, merge lattice, toastedIds), hydration-commit separation, optional useTabLeader+broadcastedRef. Storage-event suppression + leader-handoff tests green.
M7 Presentation <Toaster> (drives existing toast), <NotificationCenter> primitive, shared surface-prop renderers; useNotifications/useNotificationCenter/useUnreadCount; notify()/toast.*. Optimistic ack/rollback + a11y verified; the 3 demo types work end-to-end.

Hardest risks & mitigations

  1. Resume gap / cursor correctness. Cursor advances only on contiguous delivery; gap-timeout deliver-with-gap + a server reconciliation request for permanently-lost ranges. Test: inject gaps + reconnect, assert no loss/dup.
  2. Zombie connections (TCP half-open). Watchdog cleared by any inbound frame, armed whenever connected; conservative configurable ping 20s/watchdog 10s.
  3. PriorityQueue is a min-heap. The surfacing PQ comparator makes the eviction victim the minimum (lowest priority, then oldest); viewport order is a separate display sort. surface() peek/isFull-guards then dequeue-then-enqueue; never toArray() for display.
  4. Cross-tab double-toast. commit() separates ingest (may toast) from hydration (never); per-tab non-persisted toastedIds; useTabLeader (Web Locks) for the optional single socket; never useEventBus for cross-tab; never a hand-rolled lease.
  5. Singleton arg-dropping. Inbox/registry via useAppSharedState (options bound once); socket via createSharedComposable (disposable, raw-on-SSR).
  6. Optimistic double-apply. Idempotent server commands keyed by clientToken (the one canonical correlation key); ack-after-timeout ignored via token compare; do not rollback on mid-action socket drop; eviction skips pendingAction/surfaced.
  7. Persistence schema. Persist only committed lifecycle + read/dismiss receipts + cursor (exclude runtime fields); Map serializer; validate-on-hydrate, drop+log unknown-type entities. Whole-Map LWW is the multi-tab durability limit — opt into leader single-writer for strictness.
  8. Durable-Deque overflow. pushBack throws at maxSize, so ensureCapacity() runs before every insert and force-evicts the oldest non-active entity if all candidates are protected.
  9. SSR hydration mismatch. Badge/connection UI render only after mount; <Toaster> under <ClientOnly>; isClient guards throughout.

Appendix — review corrections folded into this document

This design was adversarially verified against the real repo source. Corrections already applied above:

  1. PriorityQueue orientation — it's a min-heap; the surfacing comparator now makes the eviction victim the minimum, and the viewport uses a separate display sort (§5.2, §5.3).
  2. Registry provisioning — the Nuxt plugin uses appProvide(nuxtApp.vueApp) instead of component-level provide (§6, §8).
  3. Bounded LRU dedupseen is a real bounded LruSet, not an unbounded Set (§4.5).
  4. BackoffbackoffDelay is bespoke and does not reuse stdlib retry() (§4.3).
  5. Ack correlation — one canonical key (clientToken) end-to-end; AckFrame.ref echoes it (§4.1, §4.6, §5.5).
  6. Network boundary — the Vue useRealtime wrapper watches useNetwork().isOnline and dispatches NET_OFFLINE/NET_ONLINE; core/realtime never imports Vue (§4.7).
  7. Durable overflow — explicit ensureCapacity() before insert (the Deque throws when full) (§5.2).
  8. Server contract — the monotonic-seq / replay / reconcile / token-echo requirements are stated explicitly (§4.1).