# Notification System Architecture WebSocket-driven, type- and priority-aware notifications for the `@robonen` monorepo. > Status: **design proposal** (not yet implemented). Synthesized from a multi-agent design > pass and adversarially verified against the real repo APIs. The "Verified facts" call-outs > below are confirmed against source; the corrections from review are already folded into the > code in this document. --- ## 0. Context — what already exists The **presentation layer already exists**: a full sonner-style headless toast primitive lives at [vue/primitives/src/toast/](../../vue/primitives/src/toast/) — `ToastProvider`, `ToastRoot`, `ToastViewport`, `ToastTitle/Description/Action/Close/Announce/Portal`, with per-toast auto-dismiss (pause/resume RAF timer), swipe-to-dismiss, teleport-to-viewport, a11y live-region announce, collection-based stacking, and controlled/uncontrolled `v-model:open`. **We drive it, we do not fork it.** The data-layer building blocks also exist and are reused verbatim: | Need | Reuse | Path | |---|---|---| | Bounded priority heap | `PriorityQueue` (min-heap) | `core/stdlib/src/structs/PriorityQueue` | | Bounded FIFO/reorder | `Deque` | `core/stdlib/src/structs/Deque` | | Concurrency-limited drain | `AsyncPool` | `core/stdlib/src/async/pool` | | Connection state machine | `createAsyncMachine` | `core/stdlib/src/patterns/behavioral/StateMachine/async` | | Plugin/middleware model | `composePlugins` / `definePlugin` | `core/fetch/src/plugin.ts` | | In-process events | `createEventHook` | `vue/toolkit` | | Reactive cross-tab storage | `useStorage` (write-lock + storage events) | `vue/toolkit` | | Online/offline | `useNetwork` | `vue/toolkit` | | Tab leader (Web Locks) | `useTabLeader` | `vue/toolkit` | | Cross-tab ref (BroadcastChannel) | `broadcastedRef` | `vue/toolkit` | | Typed provide/inject | `useContextFactory` | `vue/toolkit` | | Disposable singleton | `createSharedComposable` | `vue/toolkit` | | App-lifetime singleton | `useAppSharedState` | `vue/toolkit` | | Causal-ordering reference | `Replica` (Lamport/version-vector) | `core/crdt/src/doc/replica.ts` | **There is no WebSocket / SSE / streaming code anywhere** — the transport is greenfield. --- ## 1. TL;DR & key decisions A **four-layer, transport-agnostic** system: 1. **L1 `core/realtime`** (greenfield, framework-free) owns the socket: a `createAsyncMachine` lifecycle, a `core/fetch`-cloned plugin/middleware pipeline, full-jitter reconnect *gated* on `useNetwork`, a heartbeat watchdog cleared by *any* inbound frame, a scalar resume cursor that advances **only on contiguous delivery**, a `Deque`+`AsyncPool` outbound queue with a shared `AbortSignal`, and a swappable `Transport` interface shipped with **both** WebSocket and SSE. 2. **L2 `ingest()`** — a single pure funnel that validates *every* notification (WS-pushed and locally-issued `notify()`/`toast.*` alike) through the **type registry**, producing a normalized `Notification` entity. 3. **L3 domain store** — inbox via `useAppSharedState`, socket-owning client via `createSharedComposable`; a `shallowRef` + version-counter with a synchronous pure-reducer lifecycle, two-key (id + contentHash) dedup, a small **surfacing-budget `PriorityQueue`** distinct from a large **durable-capacity `Deque`**, derived selectors, optimistic actions with token-correlated ack/rollback, and `useStorage` cross-tab sync where `commit()` structurally separates *ingest-commit* (may toast) from *hydration-commit* (never toasts). 4. **L4 presentation** — a `` that drives the existing toast primitive (`v-for` over the bounded toast slice) plus a headless ``, both reusing **one** per-type renderer via a `surface: 'toast' | 'inbox'` prop. | Decision | Choice | Why | |---|---|---| | Transport home | greenfield `core/realtime` (no Vue import) | Framework-free → trivially SSR-excluded, headless-testable, mirrors the `core/fetch` boundary. Thin `useRealtime` wrapper in `vue/toolkit`. | | Connection lifecycle | one `createAsyncMachine` instance | Genuine async side-effects per transition. `NET_OFFLINE`/`NET_ONLINE` are first-class events that **pause** retry scheduling. | | Entity lifecycle | synchronous **pure reducer** + legal-edge table | `createAsyncMachine.send` is async + per-instance stateful; hundreds of entities each with async hot-path transitions is wrong. | | Plugin system | cloned **1:1** from `core/fetch` (`execute` field, `composeRealtimePlugins(plugins, userDefaults)`, `setup({defaults})`) | Team knows the model; preserves `EMPTY_HOOKS` fast path. **Do not rename `execute`→`transform`.** | | Resume cursor | scalar `seq`, advances **only on contiguous delivery** | Single-writer stream collapses the version-vector to one scalar. Advancing on a gapped/buffered frame silently skips messages on next resume. | | Reorder | bounded `Deque` + **gap-timeout** flush | Strict in-order release head-of-line-stalls the stream on permanent loss; the timeout trades completeness for liveness + triggers server reconciliation. | | Dedup | **two keys**: server `id` + `contentHash` (FNV-1a) | id makes at-least-once idempotent; contentHash folds retransmits + identical-content distinct-id events into `meta.count` for `+N more`. | | Capacity | **two** independent bounds | Tiny surfacing budget `PriorityQueue` (~3) vs large durable `Deque` (~300). Conflating them spams toasts or drops history. | | `PriorityQueue` orientation | **min-heap victim comparator** + separate display sort | `enqueue` **throws `RangeError`** when full; `peek()`/`dequeue()` return the comparator **minimum**. The PQ comparator makes the *eviction victim* the minimum; the viewport order is a **separate** explicit sort. | | Store singleton | inbox+registry via `useAppSharedState`; socket via `createSharedComposable` | `createSharedComposable` **drops constructor args after first call** → never wrap an options-taking store in it. Socket must dispose on last unmount + be raw-on-SSR. | | Cross-tab no-re-toast | `commit()` separates ingest-commit vs hydration-commit; per-tab **non-persisted** `toastedIds` Set | Correct without leader election. `useEventBus` is **same-document only** and cannot dedup across tabs. | | Leader (optional) | existing **`useTabLeader`** (Web Locks) + **`broadcastedRef`** fan-out | `navigator.locks` auto-releases on tab crash (no stale-lease/split-brain). **Do not hand-roll** a `useStorage` lease coordinator (layering inversion + race). | | Imperative API | single `ingest()` funnel; `toast.success` = sugar over a built-in `__toast` type | Identical validate/priority/group/lifecycle regardless of source; one code path to test. | | Presentation | drive the **existing** Toast primitive; one renderer for both surfaces | Consumer-owned list + `@update:open` removal exactly per `toast/demo.vue`. `surface` prop kills duplication. | --- ## 2. Architecture diagram ``` ┌─────────────────────────────────────────────────────────────┐ │ WS SERVER (single totally-ordered stream per user) │ └───────▲───────────────────────────────────────────┬──────────┘ outbound (acks, │ ResumeFrame{cursor} on (re)connect │ inbound frames ActionFrame, │ │ (notification | hello | ack | pong) ResumeFrame, ping) │ ▼ ┌───────────────────────────┴─────────────────────────────────────────────────────────────────┐ │ L1 core/realtime (framework-free) │ │ Transport iface ── createWebSocketTransport | createSseTransport (swap seam, both shipped) │ │ │ onMessage/onOpen/onClose/onError │ │ ▼ │ │ RealtimeClient ── createAsyncMachine: idle→connecting→connected→reconnecting→error→closed │ │ ├─ plugins (composeRealtimePlugins): auth · codec(JSON) · heartbeat · resume · log · gzip │ │ ├─ heartbeat watchdog (cleared by ANY inbound frame) ── useNetwork-gated backoff* │ │ ├─ reorder Deque (causal-hold) + gap-timeout ── cursor advances ONLY on contiguous │ │ ├─ dedup-by-id bounded LRU │ │ └─ outbound: Deque(bounded) ─drain→ AsyncPool(shared AbortSignal) ──┐ send() │ │ onNotification(env) ─────────────────────────────────────┐ │ │ │ onControl(ack) ─────────────────────────────┐ │ │ ack→commit/rollback │ └────────────────────────────────────────────────┼───────────┼──────────────┼────────────────────┘ (* useNetwork.isOnline is watched in the Vue useRealtime wrapper, which dispatches NET_OFFLINE/NET_ONLINE into machine.send — core/realtime never imports Vue.) │ │ │ local notify()/toast.* ────────────┐ │ ▼ │ ▼ │ ┌──────────────────┐ │ ┌──────────────────────▼┐ │ L2 ingest() │ │ │ ONE ingest() funnel │ │ (pure) │ │ │ registry.resolve → │◄┘ validate→build │ │ │ validate → normalize → │ IngestResult │ │ │ contentHash → dedup │ │ │ └───────────┬────────────┘ │ │ │ commit(result, source) │ │ ▼ │ │ ┌──────────────────────────────────────────────────────────────────────┐ │ │ │ L3 domain store (vue/toolkit) │ │ │ │ shallowRef> + version-counter │ │ │ │ pure-reducer lifecycle (legal-edge table) │ │ │ │ surfacing PriorityQueue(~3, victim=min) ◄─ interrupt ─► durable Deque(~300) │ │ selectors: unreadCount · byPriority · byType · groups · toasting │─┘ │ │ optimistic: clientToken ─send→ ───────────────────────────────────────────┘ │ useStorage(map) cross-tab ── commit(): ingest=may-toast / hydrate=never │ optional useTabLeader + broadcastedRef (leader holds socket) │ └───────────┬───────────────────────────────────────────────┬──────────┘ │ store.toasting (bounded, sorted selector) │ store.groups / unreadCount ▼ ▼ ┌────────────────────────────────┐ ┌──────────────────────────────────────┐ │ (vue/primitives) │ │ (headless) │ │ ToastProvider │ │ Root ctx via useContextFactory │ │ └ ToastRoot v-for (EXISTING │ │ CollectionItem-ordered groups │ │ toast primitive, unforked) │ shared │ data-state hooks │ │ v-model:open │── per-type ┤ │ │ @update:open → dismissToast│ renderer │ │ │ │ │ │ │ └ ToastViewport │ │ │ └────────────────────────────────┘ └──────────────────────────────────────┘ ``` --- ## 3. File & package layout ``` core/realtime/ # NEW greenfield @robonen/realtime (framework-free) package.json tsdown.config.ts README.md src/ index.ts # barrel types.ts # Envelope union, RealtimeState/Event, Transport, options client.ts # createRealtimeClient — machine + pool + plugins + reorder machine.ts # buildLifecycleMachine() -> createAsyncMachine(...) envelope.ts # encode/decode (JSON default) + type guards reorder.ts # seq reorder Deque + bounded-LRU dedup + contiguous-cursor + gap-timeout backoff.ts # bespoke backoffDelay(attempt, opts) — full jitter (NOT stdlib retry) outbound.ts # bounded Deque + AsyncPool drainer + ack correlation (clientToken) lru.ts # bounded LruSet for dedup plugin.ts # defineRealtimePlugin / composeRealtimePlugins (mirror core/fetch) transports/{websocket.ts, sse.ts} plugins/{auth,codec,heartbeat,resume,logger,compression}.ts __mocks__/mockTransport.ts *.test.ts vue/toolkit/src/composables/realtime/useRealtime/{index.ts, index.test.ts} # NEW: thin Vue wrapper + useNetwork wiring vue/toolkit/src/composables/notifications/ # NEW category — domain + facade index.ts types.ts registry.ts ingest.ts store.ts surfacing.ts selectors.ts persistence.ts actions.ts useNotifications/index.ts useNotificationCenter/index.ts useUnreadCount/index.ts __tests__/*.test.ts vue/primitives/src/toaster/{Toaster.vue, index.ts, demo.vue} # NEW: drives existing toast/ vue/primitives/src/notification-center/ # NEW headless inbox primitive context.ts NotificationCenterRoot.vue NotificationCenterList.vue NotificationCenterItem.vue NotificationCenterGroup.vue NotificationCenterBadge.vue NotificationCenterEmpty.vue index.ts demo.vue __test__/NotificationCenter.test.ts # REUSED UNCHANGED: vue/primitives/src/toast/*, core/stdlib structs/async, core/fetch plugin model, # vue/toolkit (createEventHook/useStorage/useNetwork/useTabLeader/broadcastedRef/ # createSharedComposable/useAppSharedState/useContextFactory), @robonen/platform/{multi,browsers} ``` --- ## 4. Transport layer (`core/realtime`) ### 4.1 Wire envelope protocol ```ts // core/realtime/src/types.ts export interface BaseEnvelope { v: 1; id: string; ts: number } // ── inbound DATA plane ── export type NotificationFrame = BaseEnvelope & { kind: 'notification'; type: string; // registry key seq: number; // strictly-monotonic per session (server-assigned) priority?: NotificationPriority; groupKey?: string; payload: unknown; // validated downstream by the registry }; // ── CONTROL plane ── export type HelloFrame = BaseEnvelope & { kind: 'hello'; sessionId: string; cursor?: string }; export type AckFrame = BaseEnvelope & { kind: 'ack'; ref: string; ok: boolean; error?: string }; // ref = correlation token (see §5.5) export type HeartbeatFrame = BaseEnvelope & { kind: 'ping' | 'pong' }; export type ResumeFrame = BaseEnvelope & { kind: 'resume'; cursor?: string }; export type ActionFrame = BaseEnvelope & { kind: 'action'; ref: string; type: string; action: string; clientToken: string; payload?: unknown }; export type InboundEnvelope = NotificationFrame | HelloFrame | AckFrame | HeartbeatFrame; export type OutboundEnvelope = ResumeFrame | AckFrame | HeartbeatFrame | ActionFrame; export type ControlFrame = HelloFrame | AckFrame | HeartbeatFrame; export type RealtimeState = 'idle' | 'connecting' | 'connected' | 'reconnecting' | 'error' | 'closed'; export type RealtimeEvent = | 'CONNECT' | 'OPENED' | 'CONNECTED' | 'DROPPED' | 'RETRY' | 'GIVE_UP' | 'CLOSE' | 'NET_OFFLINE' | 'NET_ONLINE'; export interface Transport { connect(url: string, signal: AbortSignal): Promise; send(raw: string | ArrayBuffer): void; close(code?: number, reason?: string): void; readonly onOpen: (cb: () => void) => () => void; readonly onMessage: (cb: (raw: unknown) => void) => () => void; readonly onClose: (cb: (e: { code: number; reason: string; wasClean: boolean }) => void) => () => void; readonly onError: (cb: (err: unknown) => void) => () => void; } ``` > **Server contract this layer depends on** (new, server-side, not part of any existing repo API): > (1) strictly-monotonic per-session server-assigned `seq`; (2) on `ResumeFrame{cursor}` the server > replays every frame with `seq > cursor`; (3) a reconciliation endpoint/command to re-request a > permanently-lost `[from, to]` range flushed by the gap-timeout; (4) action commands are idempotent, > keyed by `clientToken`, and the server echoes that token in `AckFrame.ref`. ### 4.2 Plugin system — cloned 1:1 from `core/fetch` **Verified:** `core/fetch` uses `composePlugins(plugins, userDefaults)`, an `execute` onion-middleware field, `setup({defaults})` run after the merge, and a frozen `EMPTY_HOOKS` fast path. We mirror this exactly — the field is **`execute`**, *not* `transform`. ```ts // core/realtime/src/plugin.ts export interface RealtimeContext { state: RealtimeState; attempt: number; cursor: string | undefined; sessionId: string | undefined; aborter: AbortController; } export type RealtimeHook = (context: C) => void | Promise; export interface RealtimeHooks { onConnect?: RealtimeHook<{ ctx: RealtimeContext; url: string }> | RealtimeHook<{ ctx: RealtimeContext; url: string }>[]; onMessage?: RealtimeHook<{ raw: unknown; env?: InboundEnvelope }> | RealtimeHook<{ raw: unknown; env?: InboundEnvelope }>[]; // DECODE here (set env) onSend?: RealtimeHook<{ env: OutboundEnvelope; raw?: unknown }> | RealtimeHook<{ env: OutboundEnvelope; raw?: unknown }>[]; // ENCODE here (set raw) onClose?: RealtimeHook<{ code: number; reason: string }> | RealtimeHook<{ code: number; reason: string }>[]; onError?: RealtimeHook<{ error: unknown }> | RealtimeHook<{ error: unknown }>[]; } export type RealtimeSendMiddleware = (ctx: { env: OutboundEnvelope; raw?: unknown }, next: () => Promise) => Promise; export interface RealtimePlugin { readonly name: Name; readonly defaults?: Partial; readonly hooks?: RealtimeHooks; readonly execute?: RealtimeSendMiddleware; // onion around each socket write readonly setup?: (ctx: { readonly defaults: RealtimeClientOptions }) => void; } export function defineRealtimePlugin(p: RealtimePlugin): RealtimePlugin { return p; } // flattens per-phase hook arrays + onion-composes `execute`, identical structure to composePlugins export function composeRealtimePlugins( plugins: readonly RealtimePlugin[] | undefined, userDefaults: Partial | undefined, ): ComposedRealtimePlugins { /* mirror of core/fetch composePlugins; EMPTY_HOOKS fast path */ } ``` Shipped tree-shakeable plugins (canonical order asserted in `setup`: **codec encodes before compression**): ```ts // core/realtime/src/plugins/codec.ts — JSON default (core/encoding has NO generic codec, verified) export const jsonCodec = defineRealtimePlugin({ name: 'codec', hooks: { onMessage: ({ raw }) => { /* env = JSON.parse(raw as string); shape-check */ }, onSend: (c) => { c.raw = JSON.stringify(c.env); }, }, }); // auth.ts (onConnect: append token subprotocol/query; refresh on error) // heartbeat.ts (owns ping interval + watchdog) resume.ts (persists & injects cursor) // logger.ts (WeakMap timing like the fetch logger) compression.ts (execute middleware) ``` ### 4.3 Connection lifecycle — `createAsyncMachine` **Verified** ordering inside `send()`: `guard → action → exit → set currentState → entry`, all awaited; one action per transition, one entry/exit per state; the initial state's `entry` does **not** auto-run on construction. `NET_OFFLINE`/`NET_ONLINE` are first-class events that pause/resume retry scheduling instead of burning the attempt counter. ```ts // core/realtime/src/machine.ts export function buildLifecycleMachine(deps: MachineDeps) { return createAsyncMachine({ initial: 'idle', context: { state: 'idle', attempt: 0, cursor: undefined, sessionId: undefined, aborter: new AbortController() } as RealtimeContext, states: { idle: { on: { CONNECT: 'connecting' } }, connecting: { entry: async (ctx) => { ctx.aborter = new AbortController(); await deps.transport.connect(deps.url(), ctx.aborter.signal); }, on: { OPENED: { target: 'connected', action: async (ctx) => { await deps.send({ kind: 'resume', cursor: ctx.cursor, v: 1, id: deps.id(), ts: deps.now() }); } }, DROPPED: { target: 'reconnecting', guard: () => deps.isOnline() && deps.canRetry() }, CLOSE: 'closed', }, }, connected: { entry: () => { deps.startHeartbeat(); deps.flushOutbound(); }, // deterministic side-effects exit: () => { deps.stopHeartbeat(); }, on: { DROPPED: { target: 'reconnecting', guard: () => deps.isOnline() && deps.canRetry() }, NET_OFFLINE: { target: 'reconnecting', action: () => deps.pauseRetries() }, // sit, do NOT schedule RETRY CLOSE: 'closed', }, }, reconnecting: { entry: (ctx) => { if (deps.isOnline()) deps.scheduleRetry(ctx.attempt); }, // backoffDelay(attempt) then RETRY on: { RETRY: { target: 'connecting', action: (ctx) => { ctx.attempt++; } }, NET_ONLINE: { target: 'connecting', action: (ctx) => { ctx.attempt = 0; } }, // reset + immediate reconnect GIVE_UP: { target: 'error', guard: () => !deps.canRetry() }, CLOSE: 'closed', }, }, error: { on: { CONNECT: 'connecting' } }, closed: { entry: (ctx) => { ctx.aborter.abort(); deps.transport.close(1000); } }, }, }); } ``` `backoff.ts` is a **bespoke standalone** function (it does *not* call stdlib `retry()`, which loops attempts internally and cannot drive an event-driven, network-gated, machine-scheduled reconnect). It borrows only the *shape* of a `delay: (n) => number` function: ```ts export function backoffDelay(attempt: number, o: { base?: number; cap?: number; jitter?: boolean } = {}): number { const base = o.base ?? 500, cap = o.cap ?? 30_000; const raw = Math.min(cap, base * 2 ** attempt); return o.jitter === false ? raw : raw * (0.5 + Math.random() * 0.5); // full jitter } // scheduleRetry(attempt): a no-op while !isOnline(); else setTimeout(backoffDelay(attempt)) -> machine.send('RETRY'). ``` ### 4.4 Heartbeat / zombie detection `heartbeatPlugin` arms a watchdog whenever `connected`. **Any inbound frame** (pong *or* data) clears it; on fire → `machine.send('DROPPED')`. This is the only reliable TCP-half-open detector. Conservative configurable defaults (`ping 20s` / `watchdog 10s`). ### 4.5 Ordering / dedup / resume ```ts // core/realtime/src/lru.ts — bounded dedup set (prevents an unbounded memory leak over a long session) export class LruSet { private map = new Map(); constructor(private max = 4096) {} has(k: string) { return this.map.has(k); } add(k: string) { if (this.map.has(k)) this.map.delete(k); // bump to MRU this.map.set(k, true); if (this.map.size > this.max) this.map.delete(this.map.keys().next().value!); // evict LRU } } // core/realtime/src/reorder.ts export function createReorderBuffer(opts: { gapTimeoutMs?: number; onReconcile?: (from: number, to: number) => void }) { const buffer = new Deque([], { maxSize: 1024 }); // out-of-order hold (causal-hold; ref: crdt replica.ts) const seen = new LruSet(8192); // BOUNDED dedup-by-id let lastSeq = 0; let cursor: string | undefined; // = String(lastSeq), persisted function accept(f: NotificationFrame): NotificationFrame[] { if (seen.has(f.id) || f.seq <= lastSeq) return []; // duplicate / replay -> drop (idempotent at-least-once) seen.add(f.id); if (f.seq === lastSeq + 1) { const out = [f]; lastSeq = f.seq; cursor = String(lastSeq); // CONTIGUOUS — advance cursor ONLY here let next: NotificationFrame | undefined; while ((next = takeSeq(buffer, lastSeq + 1))) { out.push(next); lastSeq = next.seq; cursor = String(lastSeq); } return out; } putBySeq(buffer, f); // gap: hold, DO NOT advance cursor armGapTimeout(() => opts.onReconcile?.(lastSeq + 1, f.seq - 1)); // permanent-loss -> deliver-with-gap + server reconcile return []; } return { accept, get cursor() { return cursor; }, hydrate(c?: string) { if (c) { lastSeq = Number(c); cursor = c; } } }; } ``` Resume: `ResumeFrame{cursor}` is sent on every `connected.entry`; the server replays `seq > cursor`. `resumePlugin` persists the cursor via `useStorage` so it survives a full reload. A **scalar** suffices — a single-writer per-user stream collapses the version-vector to one entry (O(1) resume, small wire). ### 4.6 Outbound queue / backpressure ```ts // core/realtime/src/outbound.ts export function createOutbound(deps: { transport: Transport; signal: AbortSignal; concurrency?: number; max?: number }) { const queue = new Deque([], { maxSize: deps.max ?? 256 }); const pool = new AsyncPool({ concurrency: deps.concurrency ?? 4, signal: deps.signal }); // CLOSE aborts in-flight let connected = false; const acks = new Map void; reject: (e: unknown) => void }>(); // keyed by AckFrame.ref function enqueue(env: OutboundEnvelope) { if (queue.isFull && env.kind !== 'action') queue.popFront(); // drop-oldest non-critical (action frames never dropped) queue.pushBack(env); if (connected) flush(); } function flush() { let env: OutboundEnvelope | undefined; while ((env = queue.popFront())) { const e = env; pool.add(async (signal) => { deps.transport.send(serialize(e)); if (e.kind === 'action') await waitForAck(e.clientToken, acks, { timeout: 8000, signal }); // canonical key = clientToken }); } } function resolveAck(ref: string, ok: boolean, error?: string) { // called from the inbound ack path; ref === clientToken const w = acks.get(ref); if (!w) return; acks.delete(ref); ok ? w.resolve() : w.reject(new Error(error)); } return { enqueue, flush, setConnected: (v: boolean) => { connected = v; if (v) flush(); }, resolveAck, dispose: () => pool.dispose() }; } ``` > `AsyncPool` has **no `pause()`** (verified). We gate by **not adding** while disconnected and abort > in-flight via the shared signal on `CLOSE`. The ack correlation key is **`clientToken`** end-to-end: > the `ActionFrame` carries it, `outbound` blocks `waitForAck(clientToken)`, and the server echoes it in > `AckFrame.ref` — one canonical key, no `ref`/`clientToken` mismatch. ### 4.7 Client surface + single-shared-socket across tabs ```ts // core/realtime/src/client.ts export interface RealtimeClient { readonly state: () => RealtimeState; connect(): void; disconnect(code?: number): void; netOnline(): void; netOffline(): void; // dispatched by the Vue wrapper from useNetwork onNotification(cb: (f: NotificationFrame) => void): () => void; // ordered + deduped onControl(cb: (f: ControlFrame) => void): () => void; onStateChange(cb: (s: RealtimeState) => void): () => void; send(env: OutboundEnvelope): Promise; // queued offline, flushed on connect resumeCursor(): string | undefined; dispose(): void; } export function createRealtimeClient(options: RealtimeClientOptions): RealtimeClient { /* machine + reorder + outbound + plugins */ } ``` The **Vue/non-Vue boundary** is crossed only in the wrapper: `core/realtime` never imports `useNetwork`; the wrapper watches `useNetwork().isOnline` and dispatches into the machine. **Cross-tab single socket** is an *optional optimization* built from the existing primitives — never a hand-rolled lease: ```ts // vue/toolkit/src/composables/realtime/useRealtime/index.ts export function useRealtime(options: UseRealtimeOptions): UseRealtimeReturn { if (!isClient) return ssrNoopRealtime(); // no socket on server const { isOnline } = useNetwork(); const { isLeader } = useTabLeader('robonen:notif-socket'); // Web Locks — atomic, crash-safe failover const client = createRealtimeClient(options); const fanout = broadcastedRef('robonen:notif-inbound', null); // BroadcastChannel — truly cross-tab watch(isOnline, (online) => online ? client.netOnline() : client.netOffline()); // wire NET_ONLINE/NET_OFFLINE watchEffect(() => { if (isLeader.value) { client.connect(); client.onNotification((f) => { fanout.value = f; }); } else { client.disconnect(); /* follower ingests from `fanout`, runs inbox-only */ } }); tryOnScopeDispose(() => client.dispose()); return { /* state, isConnected, connect, disconnect, onNotification, send */ }; } export const useSharedRealtime = createSharedComposable(useRealtime); // ref-counted, disposed on last unmount, raw-on-SSR ``` --- ## 5. Domain / data layer ### 5.1 Entity model + lifecycle ```ts // vue/toolkit/src/composables/notifications/types.ts export type NotificationPriority = 0 | 1 | 2 | 3; // numeric => trivial comparator + interrupt math export const Priority = { low: 0, normal: 1, high: 2, critical: 3 } as const; export type NotificationLifecycle = | 'received' | 'queued' | 'toasted' | 'read' | 'dismissed' | 'archived' | 'expired'; // legal edges — enforced by the synchronous reducer (illegal => throw in dev, no-op + onError in prod) export const LIFECYCLE_EDGES: Record = { received: ['queued', 'toasted', 'read', 'archived'], queued: ['toasted', 'read', 'dismissed', 'archived'], toasted: ['read', 'dismissed'], read: ['archived', 'dismissed'], dismissed: ['archived'], archived: ['expired'], expired: [], }; export interface Notification { readonly id: string; // server id (dedup key #1) OR contentHash-derived for local readonly type: TType; readonly seq: number; // transport seq (ordering); 0 for local readonly payload: TPayload; // registry-validated priority: NotificationPriority; // envelope override ?? registry.defaultPriority lifecycle: NotificationLifecycle; readonly groupKey: string | null; // registry.grouping(payload) readonly contentHash: string; // dedup key #2 (FNV-1a) — folds retransmits / identical content readonly createdAt: number; readonly source: 'ws' | 'local'; readReceiptAt: number | null; dismissedAt: number | null; // runtime-only — NEVER persisted: surfaced: boolean; // currently occupies a toast slot pendingAction: { actionId: string; clientToken: string; prevLifecycle: NotificationLifecycle } | null; actionError: string | null; meta: { count: number; sourceSeqs: number[] }; // count>1 => "+N more" } export interface NotificationGroup { readonly key: string; readonly type: string; readonly priority: NotificationPriority; // max in group readonly notifications: Notification[]; // newest-first readonly count: number; readonly unread: number; readonly latest: Notification; } ``` ### 5.2 The store (singleton split + reactive core) ```ts // vue/toolkit/src/composables/notifications/store.ts export interface NotificationStoreOptions { registry: NotificationRegistry; persistKey?: string; maxToasts?: number; maxEntities?: number; client?: UseRealtimeReturn; } export function createNotificationStore(options: NotificationStoreOptions): NotificationStore { const byId = shallowRef(new Map()); // house-style shallowRef default const version = ref(0); // O(1) selector invalidation const bump = () => { triggerRef(byId); version.value++; }; const toastedIds = new Set(); // per-tab, NEVER persisted // ── SURFACING budget: scarce toast real-estate. ── // VERIFIED: PriorityQueue is a MIN-heap; peek()/dequeue() return the comparator MINIMUM. // So the comparator must make the EVICTION VICTIM the minimum: lowest priority first, then OLDEST first. const cmpEvict = (a: Notification, b: Notification) => (a.priority - b.priority) || (a.createdAt - b.createdAt); // min = lowest priority, then oldest = the victim const surfacing = new PriorityQueue([], { comparator: (x, y) => cmpEvict(byId.value.get(x)!, byId.value.get(y)!), maxSize: options.maxToasts ?? 3, }); // ── DURABLE capacity: memory retention. ── const order = new Deque([], { maxSize: options.maxEntities ?? 300 }); // ── COMMIT: the single seam. `source` decides toast eligibility. ── function commit(r: IngestResult, source: 'ingest' | 'hydration'): void { if (!r.ok) return; // invalid already routed to onReject const incoming = r.notification; const existing = byId.value.get(incoming.id) ?? findByContentHash(byId.value, incoming.contentHash); if (existing) { // DEDUP / cross-tab MERGE mergeLattice(existing, incoming); // max over monotonic lifecycle lattice + per-field LWW existing.meta.count++; existing.meta.sourceSeqs.push(incoming.seq); bump(); return; } ensureCapacity(); // guarantee room BEFORE pushBack (Deque throws when full) byId.value.set(incoming.id, incoming); order.pushBack(incoming.id); // toast ONLY on ingest-commit (NEVER on hydration) and only if not already toasted in this tab if (source === 'ingest' && options.registry.shouldToast(incoming) && !toastedIds.has(incoming.id)) surface(incoming); bump(); } function surface(n: Notification): void { // guarded PriorityQueue use (enqueue THROWS when full) if (surfacing.isFull) { const victimId = surfacing.peek()!; // heap root = most-evictable under cmpEvict const victim = byId.value.get(victimId)!; if (cmpEvict(n, victim) <= 0) return; // newcomer is no better than the victim -> inbox-only surfacing.dequeue(); // interrupt: evict lowest visible toast... transition(victim.id, 'queued'); victim.surfaced = false; // ...back to inbox, NEVER deleted } surfacing.enqueue(n.id); // safe now transition(n.id, 'toasted'); n.surfaced = true; toastedIds.add(n.id); } // VERIFIED: Deque.pushBack throws RangeError at maxSize — never rely on "transient capacity". function ensureCapacity(): void { while (order.isFull) { const victimId = pickDurableVictim(order, byId.value); // expired -> read/archived oldest -> lowest-priority oldest if (!victimId) break; // all protected -> last-resort force-evict oldest non-active below remove(victimId); } } // pickDurableVictim skips surfaced || pendingAction!=null; only if EVERY candidate is active does it // force-evict the oldest non-active entity, guaranteeing pushBack never throws. // ... transition (pure reducer over LIFECYCLE_EDGES), markRead, dismissToast (toast->inbox), archive, remove, runAction return { /* see selectors + actions below */ }; } ``` **Singleton mapping** (the verified-correct split): ```ts // store + registry are app-lifetime (survive route changes) and options are bound ONCE here. const useStoreSingleton = useAppSharedState(createNotificationStore); // NOT createSharedComposable (it drops later args) // the socket-owning client uses createSharedComposable (ref-counted, disposable, raw-on-SSR) — see §4.7. ``` > **Verified:** `createSharedComposable` binds args on the first call and ignores them afterward, and > returns the raw composable on SSR; `useAppSharedState` runs the factory once inside a module > `effectScope(true)` that is **never** stopped (app-lifetime). So: options-bearing inbox/registry → > `useAppSharedState`; disposable socket+timers → `createSharedComposable`. ### 5.3 Derived selectors — viewport order from an **explicit sort**, not heap order ```ts // selectors.ts — VERIFIED: PriorityQueue.toArray()/iteration yields HEAP order, not sorted. export function buildSelectors(byId: ShallowRef>, version: Ref, surfacing: PriorityQueue) { const live = computed(() => (version.value, [...byId.value.values()])); // display order is the INVERSE of the eviction order: highest priority first, then NEWEST first const cmpDisplay = (a: Notification, b: Notification) => (b.priority - a.priority) || (b.createdAt - a.createdAt); const toasting = computed(() => { version.value; return [...surfacing].map((id) => byId.value.get(id)!).filter(Boolean).sort(cmpDisplay); // explicit sort, never heap order }); const unreadCount = computed(() => live.value.filter((n) => n.readReceiptAt === null && n.lifecycle !== 'archived' && n.lifecycle !== 'expired').length); const byPriority = computed(() => groupByKey(live.value, (n) => n.priority)); const byType = computed(() => groupByKey(live.value, (n) => n.type)); const groups = computed(() => collapseByGroupKey(live.value)); // "+N more" return { toasting, unreadCount, byPriority, byType, groups }; } ``` ### 5.4 Persistence + cross-tab (don't re-toast) ```ts // persistence.ts — VERIFIED: useStorage(key, initial, storage, options); storage is a REQUIRED 3rd arg. const persisted = useStorage>( () => options.persistKey ?? 'robonen:notif', new Map(), localStorage, { serializer: StorageSerializers.map, listenToStorageChanges: true, shallow: true, initOnMounted: true }, // SSR-safe defer ); // PersistedNotification EXCLUDES runtime fields (surfaced, pendingAction, actionError) — never resurrect in-flight state. // useStorage write-lock prevents self-feedback; 'storage'/'vuetools-storage' events propagate to other tabs. watch(persisted, (next) => { // remote tab wrote -> hydration-commit (NEVER toasts) for (const p of next.values()) commit(ingestPersisted(p), 'hydration'); }, { deep: false }); ``` `mergeLattice` is the convergence rule for concurrent two-tab edits: ```ts const LIFECYCLE_RANK: Record = { received:0, queued:1, toasted:2, read:3, dismissed:4, archived:5, expired:6 }; function mergeLattice(local: Notification, remote: Notification): void { if (LIFECYCLE_RANK[remote.lifecycle] > LIFECYCLE_RANK[local.lifecycle]) local.lifecycle = remote.lifecycle; // max over lattice local.readReceiptAt = maxNullable(local.readReceiptAt, remote.readReceiptAt); // per-field LWW local.dismissedAt = maxNullable(local.dismissedAt, remote.dismissedAt); } ``` > **Known limitation:** the whole-Map serializer means a tab's write replaces the full key; with N tabs > committing concurrently, whole-Map last-writer-wins can clobber a concurrent per-entity edit *before* > `mergeLattice` (which only runs on the receiving side after a storage event) reconciles it. For strict > multi-tab durability, enable the optional `useTabLeader` single-writer path. ### 5.5 Optimistic actions — token-correlated ack/commit/rollback ```ts // actions.ts async function runAction(id: string, actionId: string): Promise { const n = byId.value.get(id); if (!n) return; const action = options.registry.resolve(n.type)?.actions?.find((a) => a.id === actionId); if (!action) return; const clientToken = crypto.randomUUID(); // THE canonical correlation key const prevLifecycle = n.lifecycle; applyOptimistic(n, action.optimistic?.(n) ?? {}); // immediate, reactive n.pendingAction = { actionId, clientToken, prevLifecycle }; bump(); try { await options.client?.send({ kind: 'action', v: 1, id: crypto.randomUUID(), ts: Date.now(), ref: clientToken, type: n.type, action: actionId, clientToken }); // server echoes clientToken in AckFrame.ref await waitForAck(clientToken, { timeout: 8000 }); // ack-after-timeout IGNORED via token compare n.pendingAction = null; if (action.onCommit) transition(id, action.onCommit); bump(); } catch (e) { // DO NOT rollback on a mid-action socket drop — frame is queued, await ack post-reconnect. if (isTimeoutOrServerError(e)) { transition(id, prevLifecycle); n.pendingAction = null; n.actionError = String(e); bump(); } } } ``` Eviction skips entities with `surfaced || pendingAction !== null`. --- ## 6. Type registry ```ts // vue/toolkit/src/composables/notifications/registry.ts export interface NotificationRendererProps

{ notification: Notification; surface: 'toast' | 'inbox'; // ONE renderer, both surfaces runAction: (actionId: string) => void; dismiss: () => void; } export interface NotificationActionDef

{ readonly id: string; // 'accept' | 'decline' | 'reply' | 'mark-read' readonly label: string | ((n: Notification) => string); readonly altText: string; // -> ToastAction alt-text (required by the primitive) readonly variant?: 'primary' | 'secondary' | 'danger'; readonly optimistic?: (n: Notification) => Partial; readonly onCommit?: NotificationLifecycle; // terminal lifecycle on ack } export type ToastPolicy = 'always' | 'never' | 'by-priority'; export interface NotificationTypeDef

{ readonly type: string; readonly validate: (raw: unknown) => P; // throws on malformed -> caught per-message in ingest readonly renderer: Component; // NotificationRendererProps

readonly actions?: readonly NotificationActionDef

[]; readonly defaultPriority?: NotificationPriority; readonly toastPolicy?: ToastPolicy; // default 'by-priority' readonly grouping?: (p: P) => string | null; readonly contentHash?: (p: P) => string; // override the FNV-1a default readonly sound?: string | false; readonly ttl?: number; // toast auto-dismiss ms; Infinity = sticky readonly persist?: boolean; // default true } export function defineNotificationType

(def: NotificationTypeDef

): NotificationTypeDef

{ return def; } export interface NotificationRegistry { register

(def: NotificationTypeDef

): void; resolve(type: string): NotificationTypeDef | undefined; has(type: string): boolean; shouldToast(n: Notification): boolean; priorityFor(type: string, override?: NotificationPriority): NotificationPriority; all(): readonly NotificationTypeDef[]; } export function createNotificationRegistry( defs?: readonly NotificationTypeDef[], opts?: { onUnknownType?: (type: string) => void; toastFloor?: NotificationPriority }, ): NotificationRegistry; // provided/injected so renderers + store + ingestion share ONE instance (and tests inject a fake) export const { inject: useNotificationRegistry, provide: provideNotificationRegistry, appProvide: appProvideNotificationRegistry } = useContextFactory('NotificationRegistry'); ``` Registering the three required types: ```ts const friendRequest = defineNotificationType<{ fromUserId: string; name: string; avatar: string }>({ type: 'friend_request', validate: (raw) => { const p = raw as any; if (typeof p?.fromUserId !== 'string') throw new Error('bad friend_request'); return p; }, renderer: FriendRequestCard, defaultPriority: Priority.high, toastPolicy: 'by-priority', grouping: () => null, sound: '/sounds/ping.mp3', ttl: Infinity, // sticky until acted on actions: [ { id: 'accept', label: 'Accept', altText: 'Accept friend request', variant: 'primary', optimistic: () => ({ lifecycle: 'read', readReceiptAt: Date.now() }), onCommit: 'read' }, { id: 'decline', label: 'Decline', altText: 'Decline friend request', variant: 'danger', optimistic: () => ({ lifecycle: 'dismissed', dismissedAt: Date.now() }), onCommit: 'dismissed' }, ], }); const systemAlert = defineNotificationType<{ title: string; body: string }>({ type: 'system_alert', validate: (raw) => raw as any, renderer: SystemAlertCard, defaultPriority: Priority.critical, // renders ToastRoot type='foreground' (assertive aria-live) toastPolicy: 'always', ttl: 60_000, actions: [], // dismiss-only via }); const message = defineNotificationType<{ threadId: string; from: string; text: string }>({ type: 'message', validate: (raw) => raw as any, renderer: MessageRow, defaultPriority: Priority.normal, toastPolicy: 'by-priority', // normal < high floor => inbox-only (badge), no toast grouping: (p) => `dm:${p.threadId}`, // collapse same thread => "+N more" contentHash: (p) => `${p.threadId}:${p.text}`, actions: [ { id: 'reply', label: 'Reply', altText: 'Reply to message' }, { id: 'mark-read', label: 'Mark read', altText: 'Mark message read', optimistic: () => ({ lifecycle: 'read', readReceiptAt: Date.now() }), onCommit: 'read' }, ], }); registerNotificationTypes([friendRequest, systemAlert, message]); // tree-shakeable: only imported types ship ``` --- ## 7. Presentation bridge ### 7.1 `` — drives the EXISTING toast primitive (consumer-owned list per `toast/demo.vue`) ```vue ``` > The store owns the list + `remove`/`dismissToast`; the primitive stays consumer-driven exactly as its > `demo.vue` requires. We do **not** fork or extend `vue/primitives/src/toast`. Auto-dismiss (the > primitive sets `open=false`) routes through `@update:open` → `dismissToast` (toast→inbox, never > deleted). ### 7.2 `` — headless inbox, same renderer (`surface='inbox'`) ```vue ``` **Priority + policy decides toast-vs-inbox** entirely inside `registry.shouldToast(n)`: `'always'`→toast; `'never'`→inbox-only; `'by-priority'`→`n.priority >= toastFloor` (default `high`). Every notification lands in the inbox (durable `Deque`); only `shouldToast` ones also enter the surfacing `PriorityQueue`. `critical` renders `type='foreground'` (assertive aria-live) and interrupts the lowest visible toast. --- ## 8. Public API & usage ```ts // vue/toolkit/src/composables/notifications/useNotifications/index.ts export interface UseNotificationsReturn { entities: Readonly>>; toasting: ComputedRef; groups: ComputedRef; unreadCount: ComputedRef; byPriority: ComputedRef>; byType: ComputedRef>; connectionState: Readonly>; notify:

(input: { type: string; payload: P; priority?: NotificationPriority }) => string; // local -> SAME funnel toast: ToastSugar; // toast.success/error/dismiss -> built-in '__toast' type markRead: (id: string) => void; markAllRead: () => void; dismissToast: (id: string) => void; archive: (id: string) => void; runAction: (id: string, actionId: string) => Promise; openModel: (id: string) => Ref; } export function useNotifications(): UseNotificationsReturn; // full surface export function useNotificationCenter(): UseNotificationCenterReturn; // inbox/groups/filter/markAllRead export function useUnreadCount(): Readonly>; // cheap badge — does NOT pull the whole store ``` End-to-end (a Nuxt plugin / app entry): ```ts // plugins/notifications.client.ts import { createNotificationRegistry, appProvideNotificationRegistry, registerNotificationTypes, useSharedRealtime } from '@robonen/vue'; import { createWebSocketTransport } from '@robonen/realtime'; import { authPlugin, jsonCodec, heartbeatPlugin, resumePlugin, loggerPlugin } from '@robonen/realtime/plugins'; export default defineNuxtPlugin((nuxtApp) => { const registry = createNotificationRegistry(); registerNotificationTypes([friendRequest, systemAlert, message]); // bootstrap, once // CORRECTED: use appProvide at the plugin layer (component-level `provide` only works inside setup) appProvideNotificationRegistry(nuxtApp.vueApp)(registry); useSharedRealtime({ transport: createWebSocketTransport(), url: () => `wss://api.example.com/notifications?u=${useUserId().value}`, plugins: [authPlugin(getToken), jsonCodec, heartbeatPlugin(), resumePlugin(), loggerPlugin(console.log)], backoff: { base: 500, cap: 30_000, jitter: true }, }); }); ``` ```vue ``` ```ts // anywhere: local notification + imperative toast (SAME ingest funnel) const { notify, toast, runAction } = useNotifications(); notify({ type: 'message', payload: { threadId: 't1', from: 'Ana', text: 'hi' } }); toast.success('Saved', { description: 'Your changes are live.' }); toast.error('Upload failed', { actions: [{ id: 'retry', label: 'Retry', altText: 'Retry upload' }] }); await runAction(friendReqId, 'accept'); // optimistic -> WS action -> ack commit / timeout rollback ``` --- ## 9. SSR, testing, tree-shaking **SSR.** `core/realtime` imports nothing from Vue and touches `WebSocket`/`BroadcastChannel`/timers only after `connect()`. `useRealtime` returns `ssrNoopRealtime()` when `!isClient`. `useStorage` defers with `initOnMounted`. `useSharedRealtime` (`createSharedComposable`) returns a fresh raw instance on the server. The unread badge renders only after `onMounted`. `` lives under ``. **Testing** (house style: `describe(fnName)`, `effectScope()` disposal, `MockTransport`, fake registry): ```ts // store.test.ts describe('createNotificationStore', () => { it('surfacing interrupt evicts lowest toast back to inbox, never deletes', () => { const scope = effectScope(); scope.run(() => { const store = createNotificationStore({ registry: fakeRegistry, maxToasts: 1 }); store.commit(ingest(frame('a', { priority: Priority.low })), 'ingest'); store.commit(ingest(frame('b', { priority: Priority.high })), 'ingest'); // interrupts expect(store.toasting.value.map(n => n.id)).toEqual(['b']); expect(store.entities.value.get('a')?.lifecycle).toBe('queued'); // demoted, still present }); scope.stop(); }); it('hydration-commit never toasts (cross-tab no-re-toast)', () => { const store = createNotificationStore({ registry: fakeRegistry }); store.commit(ingest(frame('x', { priority: Priority.critical })), 'hydration'); expect(store.toasting.value).toHaveLength(0); // inbox only expect(store.unreadCount.value).toBe(1); }); it('two-key dedup folds identical content into meta.count', () => { /* same contentHash, distinct id -> count=2 */ }); it('disposes timers/subscriptions on scope stop', () => { /* spy + effectScope().stop() */ }); }); // machine.test.ts — drive via await machine.send(event) describe('buildLifecycleMachine', () => { it('NET_OFFLINE pauses retries without scheduling RETRY', async () => { const m = buildLifecycleMachine(depsOffline); await m.send('CONNECT'); await m.send('OPENED'); await m.send('NET_OFFLINE'); expect(depsOffline.scheduleRetry).not.toHaveBeenCalled(); }); it('cursor advances ONLY on contiguous delivery', () => { const rb = createReorderBuffer({}); expect(rb.accept(frame('a', { seq: 1 })).map(f => f.seq)).toEqual([1]); expect(rb.cursor).toBe('1'); expect(rb.accept(frame('c', { seq: 3 }))).toEqual([]); expect(rb.cursor).toBe('1'); // gap held expect(rb.accept(frame('b', { seq: 2 })).map(f => f.seq)).toEqual([2, 3]); expect(rb.cursor).toBe('3'); // drains }); it('drops duplicates by id and seq<=lastSeq', () => { /* idempotent at-least-once */ }); }); ``` Plus explicit tests for **seq-gap-then-reconnect** (assert no loss/dup), **storage-event toast suppression**, **bounded-LRU dedup eviction**, and **leader handoff on abrupt tab close**. **Tree-shaking.** Per-category barrels (`composables/notifications/index.ts` → `composables/index.ts` → `src/index.ts`). Registry descriptors are **side-effect-free**; only the `defineNotificationType`s a consumer imports + `register()`s ship. Realtime plugins are separate modules under `@robonen/realtime/plugins`. `useUnreadCount` imports only the badge selector. --- ## 10. Build order / milestones | # | Milestone | Deliverables | Gate | |---|---|---|---| | **M1** | Transport core skeleton | `core/realtime`: `Transport` iface, `createWebSocketTransport`, `envelope.ts`, `MockTransport`. | Connect/send/recv against a mock; JSON round-trip. | | **M2** | Lifecycle + reliability | `machine.ts`, `backoff.ts` (jitter, gated), `heartbeat`/`resume` plugins, `reorder.ts` (contiguous cursor + gap-timeout + LruSet), `outbound.ts` (`AsyncPool`+`Deque`+ack by clientToken). | machine/reorder/backoff tests green; seq-gap-reconnect asserts no loss. | | **M3** | Plugin pipeline + 2nd transport | `plugin.ts` 1:1 with `core/fetch`; `auth`/`codec`/`logger`/`compression`; ship `createSseTransport`. | SSE swaps in with zero upper-layer change. | | **M4** | Registry + ingest funnel | `registry.ts`, `defineNotificationType`, `ingest.ts` (pure, catches `validate` throws → `onReject`), `contentHash`. | One funnel for WS + local; malformed payload never crashes the pump. | | **M5** | Domain store | `store.ts`, `surfacing.ts` (victim-min PQ + guarded enqueue + `ensureCapacity`), `selectors.ts` (explicit sort), pure-reducer lifecycle, two-key dedup. | Interrupt/eviction/dedup/disposal tests green. | | **M6** | Persistence + cross-tab | `persistence.ts` (`useStorage` map, merge lattice, `toastedIds`), hydration-commit separation, optional `useTabLeader`+`broadcastedRef`. | Storage-event suppression + leader-handoff tests green. | | **M7** | Presentation | `` (drives existing toast), `` primitive, shared `surface`-prop renderers; `useNotifications`/`useNotificationCenter`/`useUnreadCount`; `notify()`/`toast.*`. | Optimistic ack/rollback + a11y verified; the 3 demo types work end-to-end. | **Hardest risks & mitigations** 1. **Resume gap / cursor correctness.** Cursor advances *only* on contiguous delivery; gap-timeout deliver-with-gap + a server reconciliation request for permanently-lost ranges. Test: inject gaps + reconnect, assert no loss/dup. 2. **Zombie connections (TCP half-open).** Watchdog cleared by *any* inbound frame, armed whenever `connected`; conservative configurable `ping 20s`/`watchdog 10s`. 3. **`PriorityQueue` is a min-heap.** The surfacing PQ comparator makes the *eviction victim* the minimum (lowest priority, then oldest); viewport order is a **separate** display sort. `surface()` `peek`/`isFull`-guards then `dequeue`-then-`enqueue`; never `toArray()` for display. 4. **Cross-tab double-toast.** `commit()` separates ingest (may toast) from hydration (never); per-tab non-persisted `toastedIds`; `useTabLeader` (Web Locks) for the optional single socket; **never** `useEventBus` for cross-tab; **never** a hand-rolled lease. 5. **Singleton arg-dropping.** Inbox/registry via `useAppSharedState` (options bound once); socket via `createSharedComposable` (disposable, raw-on-SSR). 6. **Optimistic double-apply.** Idempotent server commands keyed by `clientToken` (the one canonical correlation key); ack-after-timeout ignored via token compare; do **not** rollback on mid-action socket drop; eviction skips `pendingAction`/`surfaced`. 7. **Persistence schema.** Persist only committed lifecycle + read/dismiss receipts + cursor (exclude runtime fields); Map serializer; validate-on-hydrate, drop+log unknown-type entities. Whole-Map LWW is the multi-tab durability limit — opt into leader single-writer for strictness. 8. **Durable-Deque overflow.** `pushBack` throws at `maxSize`, so `ensureCapacity()` runs *before* every insert and force-evicts the oldest non-active entity if all candidates are protected. 9. **SSR hydration mismatch.** Badge/connection UI render only after mount; `` under ``; `isClient` guards throughout. --- ## Appendix — review corrections folded into this document This design was adversarially verified against the real repo source. Corrections already applied above: 1. **PriorityQueue orientation** — it's a min-heap; the surfacing comparator now makes the eviction victim the minimum, and the viewport uses a separate display sort (§5.2, §5.3). 2. **Registry provisioning** — the Nuxt plugin uses `appProvide(nuxtApp.vueApp)` instead of component-level `provide` (§6, §8). 3. **Bounded LRU dedup** — `seen` is a real bounded `LruSet`, not an unbounded `Set` (§4.5). 4. **Backoff** — `backoffDelay` is bespoke and does not reuse stdlib `retry()` (§4.3). 5. **Ack correlation** — one canonical key (`clientToken`) end-to-end; `AckFrame.ref` echoes it (§4.1, §4.6, §5.5). 6. **Network boundary** — the Vue `useRealtime` wrapper watches `useNetwork().isOnline` and dispatches `NET_OFFLINE`/`NET_ONLINE`; `core/realtime` never imports Vue (§4.7). 7. **Durable overflow** — explicit `ensureCapacity()` before insert (the `Deque` throws when full) (§5.2). 8. **Server contract** — the monotonic-seq / replay / reconcile / token-echo requirements are stated explicitly (§4.1).