Type the docs extractor's package.json parsing as unknown; comment the Vite plugin version-skew cast; wire the tests preset; site/architecture WIP.
62 KiB
Notification System Architecture
WebSocket-driven, type- and priority-aware notifications for the @robonen monorepo.
Status: design proposal (not yet implemented). Synthesized from a multi-agent design pass and adversarially verified against the real repo APIs. The "Verified facts" call-outs below are confirmed against source; the corrections from review are already folded into the code in this document.
0. Context — what already exists
The presentation layer already exists: a full sonner-style headless toast primitive lives at
vue/primitives/src/toast/ — ToastProvider, ToastRoot,
ToastViewport, ToastTitle/Description/Action/Close/Announce/Portal, with per-toast auto-dismiss
(pause/resume RAF timer), swipe-to-dismiss, teleport-to-viewport, a11y live-region announce,
collection-based stacking, and controlled/uncontrolled v-model:open. We drive it, we do not
fork it.
The data-layer building blocks also exist and are reused verbatim:
| Need | Reuse | Path |
|---|---|---|
| Bounded priority heap | PriorityQueue (min-heap) |
core/stdlib/src/structs/PriorityQueue |
| Bounded FIFO/reorder | Deque |
core/stdlib/src/structs/Deque |
| Concurrency-limited drain | AsyncPool |
core/stdlib/src/async/pool |
| Connection state machine | createAsyncMachine |
core/stdlib/src/patterns/behavioral/StateMachine/async |
| Plugin/middleware model | composePlugins / definePlugin |
core/fetch/src/plugin.ts |
| In-process events | createEventHook |
vue/toolkit |
| Reactive cross-tab storage | useStorage (write-lock + storage events) |
vue/toolkit |
| Online/offline | useNetwork |
vue/toolkit |
| Tab leader (Web Locks) | useTabLeader |
vue/toolkit |
| Cross-tab ref (BroadcastChannel) | broadcastedRef |
vue/toolkit |
| Typed provide/inject | useContextFactory |
vue/toolkit |
| Disposable singleton | createSharedComposable |
vue/toolkit |
| App-lifetime singleton | useAppSharedState |
vue/toolkit |
| Causal-ordering reference | Replica (Lamport/version-vector) |
core/crdt/src/doc/replica.ts |
There is no WebSocket / SSE / streaming code anywhere — the transport is greenfield.
1. TL;DR & key decisions
A four-layer, transport-agnostic system:
- L1
core/realtime(greenfield, framework-free) owns the socket: acreateAsyncMachinelifecycle, acore/fetch-cloned plugin/middleware pipeline, full-jitter reconnect gated onuseNetwork, a heartbeat watchdog cleared by any inbound frame, a scalar resume cursor that advances only on contiguous delivery, aDeque+AsyncPooloutbound queue with a sharedAbortSignal, and a swappableTransportinterface shipped with both WebSocket and SSE. - L2
ingest()— a single pure funnel that validates every notification (WS-pushed and locally-issuednotify()/toast.*alike) through the type registry, producing a normalizedNotificationentity. - L3 domain store — inbox via
useAppSharedState, socket-owning client viacreateSharedComposable; ashallowRef<Map>+ version-counter with a synchronous pure-reducer lifecycle, two-key (id + contentHash) dedup, a small surfacing-budgetPriorityQueuedistinct from a large durable-capacityDeque, derived selectors, optimistic actions with token-correlated ack/rollback, anduseStoragecross-tab sync wherecommit()structurally separates ingest-commit (may toast) from hydration-commit (never toasts). - L4 presentation — a
<Toaster>that drives the existing toast primitive (v-forover the bounded toast slice) plus a headless<NotificationCenter>, both reusing one per-type renderer via asurface: 'toast' | 'inbox'prop.
| Decision | Choice | Why |
|---|---|---|
| Transport home | greenfield core/realtime (no Vue import) |
Framework-free → trivially SSR-excluded, headless-testable, mirrors the core/fetch boundary. Thin useRealtime wrapper in vue/toolkit. |
| Connection lifecycle | one createAsyncMachine instance |
Genuine async side-effects per transition. NET_OFFLINE/NET_ONLINE are first-class events that pause retry scheduling. |
| Entity lifecycle | synchronous pure reducer + legal-edge table | createAsyncMachine.send is async + per-instance stateful; hundreds of entities each with async hot-path transitions is wrong. |
| Plugin system | cloned 1:1 from core/fetch (execute field, composeRealtimePlugins(plugins, userDefaults), setup({defaults})) |
Team knows the model; preserves EMPTY_HOOKS fast path. Do not rename execute→transform. |
| Resume cursor | scalar seq, advances only on contiguous delivery |
Single-writer stream collapses the version-vector to one scalar. Advancing on a gapped/buffered frame silently skips messages on next resume. |
| Reorder | bounded Deque + gap-timeout flush |
Strict in-order release head-of-line-stalls the stream on permanent loss; the timeout trades completeness for liveness + triggers server reconciliation. |
| Dedup | two keys: server id + contentHash (FNV-1a) |
id makes at-least-once idempotent; contentHash folds retransmits + identical-content distinct-id events into meta.count for +N more. |
| Capacity | two independent bounds | Tiny surfacing budget PriorityQueue (~3) vs large durable Deque (~300). Conflating them spams toasts or drops history. |
PriorityQueue orientation |
min-heap victim comparator + separate display sort | enqueue throws RangeError when full; peek()/dequeue() return the comparator minimum. The PQ comparator makes the eviction victim the minimum; the viewport order is a separate explicit sort. |
| Store singleton | inbox+registry via useAppSharedState; socket via createSharedComposable |
createSharedComposable drops constructor args after first call → never wrap an options-taking store in it. Socket must dispose on last unmount + be raw-on-SSR. |
| Cross-tab no-re-toast | commit() separates ingest-commit vs hydration-commit; per-tab non-persisted toastedIds Set |
Correct without leader election. useEventBus is same-document only and cannot dedup across tabs. |
| Leader (optional) | existing useTabLeader (Web Locks) + broadcastedRef fan-out |
navigator.locks auto-releases on tab crash (no stale-lease/split-brain). Do not hand-roll a useStorage lease coordinator (layering inversion + race). |
| Imperative API | single ingest() funnel; toast.success = sugar over a built-in __toast type |
Identical validate/priority/group/lifecycle regardless of source; one code path to test. |
| Presentation | drive the existing Toast primitive; one renderer for both surfaces | Consumer-owned list + @update:open removal exactly per toast/demo.vue. surface prop kills duplication. |
2. Architecture diagram
┌─────────────────────────────────────────────────────────────┐
│ WS SERVER (single totally-ordered stream per user) │
└───────▲───────────────────────────────────────────┬──────────┘
outbound (acks, │ ResumeFrame{cursor} on (re)connect │ inbound frames
ActionFrame, │ │ (notification | hello | ack | pong)
ResumeFrame, ping) │ ▼
┌───────────────────────────┴─────────────────────────────────────────────────────────────────┐
│ L1 core/realtime (framework-free) │
│ Transport iface ── createWebSocketTransport | createSseTransport (swap seam, both shipped) │
│ │ onMessage/onOpen/onClose/onError │
│ ▼ │
│ RealtimeClient ── createAsyncMachine: idle→connecting→connected→reconnecting→error→closed │
│ ├─ plugins (composeRealtimePlugins): auth · codec(JSON) · heartbeat · resume · log · gzip │
│ ├─ heartbeat watchdog (cleared by ANY inbound frame) ── useNetwork-gated backoff* │
│ ├─ reorder Deque (causal-hold) + gap-timeout ── cursor advances ONLY on contiguous │
│ ├─ dedup-by-id bounded LRU │
│ └─ outbound: Deque(bounded) ─drain→ AsyncPool(shared AbortSignal) ──┐ send() │
│ onNotification(env) ─────────────────────────────────────┐ │ │
│ onControl(ack) ─────────────────────────────┐ │ │ ack→commit/rollback │
└────────────────────────────────────────────────┼───────────┼──────────────┼────────────────────┘
(* useNetwork.isOnline is watched in the Vue useRealtime wrapper, which dispatches
NET_OFFLINE/NET_ONLINE into machine.send — core/realtime never imports Vue.)
│ │ │
local notify()/toast.* ────────────┐ │ ▼ │
▼ │ ┌──────────────────┐ │
┌──────────────────────▼┐ │ L2 ingest() │ │
│ ONE ingest() funnel │ │ (pure) │ │
│ registry.resolve → │◄┘ validate→build │ │
│ validate → normalize → │ IngestResult │ │
│ contentHash → dedup │ │ │
└───────────┬────────────┘ │ │
│ commit(result, source) │ │
▼ │ │
┌──────────────────────────────────────────────────────────────────────┐ │ │
│ L3 domain store (vue/toolkit) │ │ │
│ shallowRef<Map<id,Notification>> + version-counter │ │ │
│ pure-reducer lifecycle (legal-edge table) │ │ │
│ surfacing PriorityQueue(~3, victim=min) ◄─ interrupt ─► durable Deque(~300) │
│ selectors: unreadCount · byPriority · byType · groups · toasting │─┘ │
│ optimistic: clientToken ─send→ ───────────────────────────────────────────┘
│ useStorage(map) cross-tab ── commit(): ingest=may-toast / hydrate=never
│ optional useTabLeader + broadcastedRef (leader holds socket) │
└───────────┬───────────────────────────────────────────────┬──────────┘
│ store.toasting (bounded, sorted selector) │ store.groups / unreadCount
▼ ▼
┌────────────────────────────────┐ ┌──────────────────────────────────────┐
│ <Toaster> (vue/primitives) │ │ <NotificationCenter> (headless) │
│ ToastProvider │ │ Root ctx via useContextFactory │
│ └ ToastRoot v-for (EXISTING │ │ CollectionItem-ordered groups │
│ toast primitive, unforked) │ shared │ data-state hooks │
│ v-model:open │── per-type ┤ <component :is=renderer surface=inbox> │
│ @update:open → dismissToast│ renderer │ │
│ <component :is=renderer ───┤ │ │
│ surface="toast"> │ │ │
│ └ ToastViewport │ │ │
└────────────────────────────────┘ └──────────────────────────────────────┘
3. File & package layout
core/realtime/ # NEW greenfield @robonen/realtime (framework-free)
package.json tsdown.config.ts README.md
src/
index.ts # barrel
types.ts # Envelope union, RealtimeState/Event, Transport, options
client.ts # createRealtimeClient — machine + pool + plugins + reorder
machine.ts # buildLifecycleMachine() -> createAsyncMachine(...)
envelope.ts # encode/decode (JSON default) + type guards
reorder.ts # seq reorder Deque + bounded-LRU dedup + contiguous-cursor + gap-timeout
backoff.ts # bespoke backoffDelay(attempt, opts) — full jitter (NOT stdlib retry)
outbound.ts # bounded Deque + AsyncPool drainer + ack correlation (clientToken)
lru.ts # bounded LruSet for dedup
plugin.ts # defineRealtimePlugin / composeRealtimePlugins (mirror core/fetch)
transports/{websocket.ts, sse.ts}
plugins/{auth,codec,heartbeat,resume,logger,compression}.ts
__mocks__/mockTransport.ts
*.test.ts
vue/toolkit/src/composables/realtime/useRealtime/{index.ts, index.test.ts} # NEW: thin Vue wrapper + useNetwork wiring
vue/toolkit/src/composables/notifications/ # NEW category — domain + facade
index.ts types.ts registry.ts ingest.ts store.ts surfacing.ts
selectors.ts persistence.ts actions.ts
useNotifications/index.ts useNotificationCenter/index.ts useUnreadCount/index.ts
__tests__/*.test.ts
vue/primitives/src/toaster/{Toaster.vue, index.ts, demo.vue} # NEW: drives existing toast/
vue/primitives/src/notification-center/ # NEW headless inbox primitive
context.ts NotificationCenterRoot.vue NotificationCenterList.vue
NotificationCenterItem.vue NotificationCenterGroup.vue
NotificationCenterBadge.vue NotificationCenterEmpty.vue
index.ts demo.vue __test__/NotificationCenter.test.ts
# REUSED UNCHANGED: vue/primitives/src/toast/*, core/stdlib structs/async, core/fetch plugin model,
# vue/toolkit (createEventHook/useStorage/useNetwork/useTabLeader/broadcastedRef/
# createSharedComposable/useAppSharedState/useContextFactory), @robonen/platform/{multi,browsers}
4. Transport layer (core/realtime)
4.1 Wire envelope protocol
// core/realtime/src/types.ts
export interface BaseEnvelope { v: 1; id: string; ts: number }
// ── inbound DATA plane ──
export type NotificationFrame = BaseEnvelope & {
kind: 'notification';
type: string; // registry key
seq: number; // strictly-monotonic per session (server-assigned)
priority?: NotificationPriority;
groupKey?: string;
payload: unknown; // validated downstream by the registry
};
// ── CONTROL plane ──
export type HelloFrame = BaseEnvelope & { kind: 'hello'; sessionId: string; cursor?: string };
export type AckFrame = BaseEnvelope & { kind: 'ack'; ref: string; ok: boolean; error?: string }; // ref = correlation token (see §5.5)
export type HeartbeatFrame = BaseEnvelope & { kind: 'ping' | 'pong' };
export type ResumeFrame = BaseEnvelope & { kind: 'resume'; cursor?: string };
export type ActionFrame = BaseEnvelope & { kind: 'action'; ref: string; type: string; action: string; clientToken: string; payload?: unknown };
export type InboundEnvelope = NotificationFrame | HelloFrame | AckFrame | HeartbeatFrame;
export type OutboundEnvelope = ResumeFrame | AckFrame | HeartbeatFrame | ActionFrame;
export type ControlFrame = HelloFrame | AckFrame | HeartbeatFrame;
export type RealtimeState = 'idle' | 'connecting' | 'connected' | 'reconnecting' | 'error' | 'closed';
export type RealtimeEvent =
| 'CONNECT' | 'OPENED' | 'CONNECTED' | 'DROPPED' | 'RETRY'
| 'GIVE_UP' | 'CLOSE' | 'NET_OFFLINE' | 'NET_ONLINE';
export interface Transport {
connect(url: string, signal: AbortSignal): Promise<void>;
send(raw: string | ArrayBuffer): void;
close(code?: number, reason?: string): void;
readonly onOpen: (cb: () => void) => () => void;
readonly onMessage: (cb: (raw: unknown) => void) => () => void;
readonly onClose: (cb: (e: { code: number; reason: string; wasClean: boolean }) => void) => () => void;
readonly onError: (cb: (err: unknown) => void) => () => void;
}
Server contract this layer depends on (new, server-side, not part of any existing repo API): (1) strictly-monotonic per-session server-assigned
seq; (2) onResumeFrame{cursor}the server replays every frame withseq > cursor; (3) a reconciliation endpoint/command to re-request a permanently-lost[from, to]range flushed by the gap-timeout; (4) action commands are idempotent, keyed byclientToken, and the server echoes that token inAckFrame.ref.
4.2 Plugin system — cloned 1:1 from core/fetch
Verified: core/fetch uses composePlugins(plugins, userDefaults), an execute onion-middleware
field, setup({defaults}) run after the merge, and a frozen EMPTY_HOOKS fast path. We mirror this
exactly — the field is execute, not transform.
// core/realtime/src/plugin.ts
export interface RealtimeContext {
state: RealtimeState; attempt: number; cursor: string | undefined;
sessionId: string | undefined; aborter: AbortController;
}
export type RealtimeHook<C = RealtimeContext> = (context: C) => void | Promise<void>;
export interface RealtimeHooks {
onConnect?: RealtimeHook<{ ctx: RealtimeContext; url: string }> | RealtimeHook<{ ctx: RealtimeContext; url: string }>[];
onMessage?: RealtimeHook<{ raw: unknown; env?: InboundEnvelope }> | RealtimeHook<{ raw: unknown; env?: InboundEnvelope }>[]; // DECODE here (set env)
onSend?: RealtimeHook<{ env: OutboundEnvelope; raw?: unknown }> | RealtimeHook<{ env: OutboundEnvelope; raw?: unknown }>[]; // ENCODE here (set raw)
onClose?: RealtimeHook<{ code: number; reason: string }> | RealtimeHook<{ code: number; reason: string }>[];
onError?: RealtimeHook<{ error: unknown }> | RealtimeHook<{ error: unknown }>[];
}
export type RealtimeSendMiddleware = (ctx: { env: OutboundEnvelope; raw?: unknown }, next: () => Promise<void>) => Promise<void>;
export interface RealtimePlugin<Name extends string = string> {
readonly name: Name;
readonly defaults?: Partial<RealtimeClientOptions>;
readonly hooks?: RealtimeHooks;
readonly execute?: RealtimeSendMiddleware; // onion around each socket write
readonly setup?: (ctx: { readonly defaults: RealtimeClientOptions }) => void;
}
export function defineRealtimePlugin<const N extends string>(p: RealtimePlugin<N>): RealtimePlugin<N> { return p; }
// flattens per-phase hook arrays + onion-composes `execute`, identical structure to composePlugins
export function composeRealtimePlugins(
plugins: readonly RealtimePlugin[] | undefined,
userDefaults: Partial<RealtimeClientOptions> | undefined,
): ComposedRealtimePlugins { /* mirror of core/fetch composePlugins; EMPTY_HOOKS fast path */ }
Shipped tree-shakeable plugins (canonical order asserted in setup: codec encodes before
compression):
// core/realtime/src/plugins/codec.ts — JSON default (core/encoding has NO generic codec, verified)
export const jsonCodec = defineRealtimePlugin({
name: 'codec',
hooks: {
onMessage: ({ raw }) => { /* env = JSON.parse(raw as string); shape-check */ },
onSend: (c) => { c.raw = JSON.stringify(c.env); },
},
});
// auth.ts (onConnect: append token subprotocol/query; refresh on error)
// heartbeat.ts (owns ping interval + watchdog) resume.ts (persists & injects cursor)
// logger.ts (WeakMap timing like the fetch logger) compression.ts (execute middleware)
4.3 Connection lifecycle — createAsyncMachine
Verified ordering inside send(): guard → action → exit → set currentState → entry, all awaited;
one action per transition, one entry/exit per state; the initial state's entry does not auto-run
on construction. NET_OFFLINE/NET_ONLINE are first-class events that pause/resume retry scheduling
instead of burning the attempt counter.
// core/realtime/src/machine.ts
export function buildLifecycleMachine(deps: MachineDeps) {
return createAsyncMachine({
initial: 'idle',
context: { state: 'idle', attempt: 0, cursor: undefined, sessionId: undefined, aborter: new AbortController() } as RealtimeContext,
states: {
idle: { on: { CONNECT: 'connecting' } },
connecting: {
entry: async (ctx) => { ctx.aborter = new AbortController(); await deps.transport.connect(deps.url(), ctx.aborter.signal); },
on: {
OPENED: { target: 'connected', action: async (ctx) => { await deps.send({ kind: 'resume', cursor: ctx.cursor, v: 1, id: deps.id(), ts: deps.now() }); } },
DROPPED: { target: 'reconnecting', guard: () => deps.isOnline() && deps.canRetry() },
CLOSE: 'closed',
},
},
connected: {
entry: () => { deps.startHeartbeat(); deps.flushOutbound(); }, // deterministic side-effects
exit: () => { deps.stopHeartbeat(); },
on: {
DROPPED: { target: 'reconnecting', guard: () => deps.isOnline() && deps.canRetry() },
NET_OFFLINE: { target: 'reconnecting', action: () => deps.pauseRetries() }, // sit, do NOT schedule RETRY
CLOSE: 'closed',
},
},
reconnecting: {
entry: (ctx) => { if (deps.isOnline()) deps.scheduleRetry(ctx.attempt); }, // backoffDelay(attempt) then RETRY
on: {
RETRY: { target: 'connecting', action: (ctx) => { ctx.attempt++; } },
NET_ONLINE: { target: 'connecting', action: (ctx) => { ctx.attempt = 0; } }, // reset + immediate reconnect
GIVE_UP: { target: 'error', guard: () => !deps.canRetry() },
CLOSE: 'closed',
},
},
error: { on: { CONNECT: 'connecting' } },
closed: { entry: (ctx) => { ctx.aborter.abort(); deps.transport.close(1000); } },
},
});
}
backoff.ts is a bespoke standalone function (it does not call stdlib retry(), which loops
attempts internally and cannot drive an event-driven, network-gated, machine-scheduled reconnect). It
borrows only the shape of a delay: (n) => number function:
export function backoffDelay(attempt: number, o: { base?: number; cap?: number; jitter?: boolean } = {}): number {
const base = o.base ?? 500, cap = o.cap ?? 30_000;
const raw = Math.min(cap, base * 2 ** attempt);
return o.jitter === false ? raw : raw * (0.5 + Math.random() * 0.5); // full jitter
}
// scheduleRetry(attempt): a no-op while !isOnline(); else setTimeout(backoffDelay(attempt)) -> machine.send('RETRY').
4.4 Heartbeat / zombie detection
heartbeatPlugin arms a watchdog whenever connected. Any inbound frame (pong or data) clears
it; on fire → machine.send('DROPPED'). This is the only reliable TCP-half-open detector. Conservative
configurable defaults (ping 20s / watchdog 10s).
4.5 Ordering / dedup / resume
// core/realtime/src/lru.ts — bounded dedup set (prevents an unbounded memory leak over a long session)
export class LruSet {
private map = new Map<string, true>();
constructor(private max = 4096) {}
has(k: string) { return this.map.has(k); }
add(k: string) {
if (this.map.has(k)) this.map.delete(k); // bump to MRU
this.map.set(k, true);
if (this.map.size > this.max) this.map.delete(this.map.keys().next().value!); // evict LRU
}
}
// core/realtime/src/reorder.ts
export function createReorderBuffer(opts: { gapTimeoutMs?: number; onReconcile?: (from: number, to: number) => void }) {
const buffer = new Deque<NotificationFrame>([], { maxSize: 1024 }); // out-of-order hold (causal-hold; ref: crdt replica.ts)
const seen = new LruSet(8192); // BOUNDED dedup-by-id
let lastSeq = 0;
let cursor: string | undefined; // = String(lastSeq), persisted
function accept(f: NotificationFrame): NotificationFrame[] {
if (seen.has(f.id) || f.seq <= lastSeq) return []; // duplicate / replay -> drop (idempotent at-least-once)
seen.add(f.id);
if (f.seq === lastSeq + 1) {
const out = [f]; lastSeq = f.seq; cursor = String(lastSeq); // CONTIGUOUS — advance cursor ONLY here
let next: NotificationFrame | undefined;
while ((next = takeSeq(buffer, lastSeq + 1))) { out.push(next); lastSeq = next.seq; cursor = String(lastSeq); }
return out;
}
putBySeq(buffer, f); // gap: hold, DO NOT advance cursor
armGapTimeout(() => opts.onReconcile?.(lastSeq + 1, f.seq - 1)); // permanent-loss -> deliver-with-gap + server reconcile
return [];
}
return { accept, get cursor() { return cursor; }, hydrate(c?: string) { if (c) { lastSeq = Number(c); cursor = c; } } };
}
Resume: ResumeFrame{cursor} is sent on every connected.entry; the server replays seq > cursor.
resumePlugin persists the cursor via useStorage so it survives a full reload. A scalar suffices —
a single-writer per-user stream collapses the version-vector to one entry (O(1) resume, small wire).
4.6 Outbound queue / backpressure
// core/realtime/src/outbound.ts
export function createOutbound(deps: { transport: Transport; signal: AbortSignal; concurrency?: number; max?: number }) {
const queue = new Deque<OutboundEnvelope>([], { maxSize: deps.max ?? 256 });
const pool = new AsyncPool({ concurrency: deps.concurrency ?? 4, signal: deps.signal }); // CLOSE aborts in-flight
let connected = false;
const acks = new Map<string, { resolve: () => void; reject: (e: unknown) => void }>(); // keyed by AckFrame.ref
function enqueue(env: OutboundEnvelope) {
if (queue.isFull && env.kind !== 'action') queue.popFront(); // drop-oldest non-critical (action frames never dropped)
queue.pushBack(env);
if (connected) flush();
}
function flush() {
let env: OutboundEnvelope | undefined;
while ((env = queue.popFront())) {
const e = env;
pool.add(async (signal) => {
deps.transport.send(serialize(e));
if (e.kind === 'action') await waitForAck(e.clientToken, acks, { timeout: 8000, signal }); // canonical key = clientToken
});
}
}
function resolveAck(ref: string, ok: boolean, error?: string) { // called from the inbound ack path; ref === clientToken
const w = acks.get(ref); if (!w) return; acks.delete(ref); ok ? w.resolve() : w.reject(new Error(error));
}
return { enqueue, flush, setConnected: (v: boolean) => { connected = v; if (v) flush(); }, resolveAck, dispose: () => pool.dispose() };
}
AsyncPoolhas nopause()(verified). We gate by not adding while disconnected and abort in-flight via the shared signal onCLOSE. The ack correlation key isclientTokenend-to-end: theActionFramecarries it,outboundblockswaitForAck(clientToken), and the server echoes it inAckFrame.ref— one canonical key, noref/clientTokenmismatch.
4.7 Client surface + single-shared-socket across tabs
// core/realtime/src/client.ts
export interface RealtimeClient {
readonly state: () => RealtimeState;
connect(): void; disconnect(code?: number): void;
netOnline(): void; netOffline(): void; // dispatched by the Vue wrapper from useNetwork
onNotification(cb: (f: NotificationFrame) => void): () => void; // ordered + deduped
onControl(cb: (f: ControlFrame) => void): () => void;
onStateChange(cb: (s: RealtimeState) => void): () => void;
send(env: OutboundEnvelope): Promise<void>; // queued offline, flushed on connect
resumeCursor(): string | undefined;
dispose(): void;
}
export function createRealtimeClient(options: RealtimeClientOptions): RealtimeClient { /* machine + reorder + outbound + plugins */ }
The Vue/non-Vue boundary is crossed only in the wrapper: core/realtime never imports useNetwork;
the wrapper watches useNetwork().isOnline and dispatches into the machine. Cross-tab single socket
is an optional optimization built from the existing primitives — never a hand-rolled lease:
// vue/toolkit/src/composables/realtime/useRealtime/index.ts
export function useRealtime(options: UseRealtimeOptions): UseRealtimeReturn {
if (!isClient) return ssrNoopRealtime(); // no socket on server
const { isOnline } = useNetwork();
const { isLeader } = useTabLeader('robonen:notif-socket'); // Web Locks — atomic, crash-safe failover
const client = createRealtimeClient(options);
const fanout = broadcastedRef<NotificationFrame | null>('robonen:notif-inbound', null); // BroadcastChannel — truly cross-tab
watch(isOnline, (online) => online ? client.netOnline() : client.netOffline()); // wire NET_ONLINE/NET_OFFLINE
watchEffect(() => {
if (isLeader.value) { client.connect(); client.onNotification((f) => { fanout.value = f; }); }
else { client.disconnect(); /* follower ingests from `fanout`, runs inbox-only */ }
});
tryOnScopeDispose(() => client.dispose());
return { /* state, isConnected, connect, disconnect, onNotification, send */ };
}
export const useSharedRealtime = createSharedComposable(useRealtime); // ref-counted, disposed on last unmount, raw-on-SSR
5. Domain / data layer
5.1 Entity model + lifecycle
// vue/toolkit/src/composables/notifications/types.ts
export type NotificationPriority = 0 | 1 | 2 | 3; // numeric => trivial comparator + interrupt math
export const Priority = { low: 0, normal: 1, high: 2, critical: 3 } as const;
export type NotificationLifecycle =
| 'received' | 'queued' | 'toasted' | 'read' | 'dismissed' | 'archived' | 'expired';
// legal edges — enforced by the synchronous reducer (illegal => throw in dev, no-op + onError in prod)
export const LIFECYCLE_EDGES: Record<NotificationLifecycle, NotificationLifecycle[]> = {
received: ['queued', 'toasted', 'read', 'archived'],
queued: ['toasted', 'read', 'dismissed', 'archived'],
toasted: ['read', 'dismissed'],
read: ['archived', 'dismissed'],
dismissed: ['archived'],
archived: ['expired'],
expired: [],
};
export interface Notification<TType extends string = string, TPayload = unknown> {
readonly id: string; // server id (dedup key #1) OR contentHash-derived for local
readonly type: TType;
readonly seq: number; // transport seq (ordering); 0 for local
readonly payload: TPayload; // registry-validated
priority: NotificationPriority; // envelope override ?? registry.defaultPriority
lifecycle: NotificationLifecycle;
readonly groupKey: string | null; // registry.grouping(payload)
readonly contentHash: string; // dedup key #2 (FNV-1a) — folds retransmits / identical content
readonly createdAt: number;
readonly source: 'ws' | 'local';
readReceiptAt: number | null;
dismissedAt: number | null;
// runtime-only — NEVER persisted:
surfaced: boolean; // currently occupies a toast slot
pendingAction: { actionId: string; clientToken: string; prevLifecycle: NotificationLifecycle } | null;
actionError: string | null;
meta: { count: number; sourceSeqs: number[] }; // count>1 => "+N more"
}
export interface NotificationGroup {
readonly key: string; readonly type: string; readonly priority: NotificationPriority; // max in group
readonly notifications: Notification[]; // newest-first
readonly count: number; readonly unread: number; readonly latest: Notification;
}
5.2 The store (singleton split + reactive core)
// vue/toolkit/src/composables/notifications/store.ts
export interface NotificationStoreOptions {
registry: NotificationRegistry; persistKey?: string;
maxToasts?: number; maxEntities?: number; client?: UseRealtimeReturn;
}
export function createNotificationStore(options: NotificationStoreOptions): NotificationStore {
const byId = shallowRef(new Map<string, Notification>()); // house-style shallowRef default
const version = ref(0); // O(1) selector invalidation
const bump = () => { triggerRef(byId); version.value++; };
const toastedIds = new Set<string>(); // per-tab, NEVER persisted
// ── SURFACING budget: scarce toast real-estate. ──
// VERIFIED: PriorityQueue is a MIN-heap; peek()/dequeue() return the comparator MINIMUM.
// So the comparator must make the EVICTION VICTIM the minimum: lowest priority first, then OLDEST first.
const cmpEvict = (a: Notification, b: Notification) =>
(a.priority - b.priority) || (a.createdAt - b.createdAt); // min = lowest priority, then oldest = the victim
const surfacing = new PriorityQueue<string>([], {
comparator: (x, y) => cmpEvict(byId.value.get(x)!, byId.value.get(y)!),
maxSize: options.maxToasts ?? 3,
});
// ── DURABLE capacity: memory retention. ──
const order = new Deque<string>([], { maxSize: options.maxEntities ?? 300 });
// ── COMMIT: the single seam. `source` decides toast eligibility. ──
function commit(r: IngestResult, source: 'ingest' | 'hydration'): void {
if (!r.ok) return; // invalid already routed to onReject
const incoming = r.notification;
const existing = byId.value.get(incoming.id) ?? findByContentHash(byId.value, incoming.contentHash);
if (existing) { // DEDUP / cross-tab MERGE
mergeLattice(existing, incoming); // max over monotonic lifecycle lattice + per-field LWW
existing.meta.count++; existing.meta.sourceSeqs.push(incoming.seq);
bump(); return;
}
ensureCapacity(); // guarantee room BEFORE pushBack (Deque throws when full)
byId.value.set(incoming.id, incoming); order.pushBack(incoming.id);
// toast ONLY on ingest-commit (NEVER on hydration) and only if not already toasted in this tab
if (source === 'ingest' && options.registry.shouldToast(incoming) && !toastedIds.has(incoming.id))
surface(incoming);
bump();
}
function surface(n: Notification): void { // guarded PriorityQueue use (enqueue THROWS when full)
if (surfacing.isFull) {
const victimId = surfacing.peek()!; // heap root = most-evictable under cmpEvict
const victim = byId.value.get(victimId)!;
if (cmpEvict(n, victim) <= 0) return; // newcomer is no better than the victim -> inbox-only
surfacing.dequeue(); // interrupt: evict lowest visible toast...
transition(victim.id, 'queued'); victim.surfaced = false; // ...back to inbox, NEVER deleted
}
surfacing.enqueue(n.id); // safe now
transition(n.id, 'toasted'); n.surfaced = true; toastedIds.add(n.id);
}
// VERIFIED: Deque.pushBack throws RangeError at maxSize — never rely on "transient capacity".
function ensureCapacity(): void {
while (order.isFull) {
const victimId = pickDurableVictim(order, byId.value); // expired -> read/archived oldest -> lowest-priority oldest
if (!victimId) break; // all protected -> last-resort force-evict oldest non-active below
remove(victimId);
}
}
// pickDurableVictim skips surfaced || pendingAction!=null; only if EVERY candidate is active does it
// force-evict the oldest non-active entity, guaranteeing pushBack never throws.
// ... transition (pure reducer over LIFECYCLE_EDGES), markRead, dismissToast (toast->inbox), archive, remove, runAction
return { /* see selectors + actions below */ };
}
Singleton mapping (the verified-correct split):
// store + registry are app-lifetime (survive route changes) and options are bound ONCE here.
const useStoreSingleton = useAppSharedState(createNotificationStore); // NOT createSharedComposable (it drops later args)
// the socket-owning client uses createSharedComposable (ref-counted, disposable, raw-on-SSR) — see §4.7.
Verified:
createSharedComposablebinds args on the first call and ignores them afterward, and returns the raw composable on SSR;useAppSharedStateruns the factory once inside a moduleeffectScope(true)that is never stopped (app-lifetime). So: options-bearing inbox/registry →useAppSharedState; disposable socket+timers →createSharedComposable.
5.3 Derived selectors — viewport order from an explicit sort, not heap order
// selectors.ts — VERIFIED: PriorityQueue.toArray()/iteration yields HEAP order, not sorted.
export function buildSelectors(byId: ShallowRef<Map<string, Notification>>, version: Ref<number>, surfacing: PriorityQueue<string>) {
const live = computed(() => (version.value, [...byId.value.values()]));
// display order is the INVERSE of the eviction order: highest priority first, then NEWEST first
const cmpDisplay = (a: Notification, b: Notification) => (b.priority - a.priority) || (b.createdAt - a.createdAt);
const toasting = computed(() => {
version.value;
return [...surfacing].map((id) => byId.value.get(id)!).filter(Boolean).sort(cmpDisplay); // explicit sort, never heap order
});
const unreadCount = computed(() => live.value.filter((n) => n.readReceiptAt === null && n.lifecycle !== 'archived' && n.lifecycle !== 'expired').length);
const byPriority = computed(() => groupByKey(live.value, (n) => n.priority));
const byType = computed(() => groupByKey(live.value, (n) => n.type));
const groups = computed(() => collapseByGroupKey(live.value)); // "+N more"
return { toasting, unreadCount, byPriority, byType, groups };
}
5.4 Persistence + cross-tab (don't re-toast)
// persistence.ts — VERIFIED: useStorage(key, initial, storage, options); storage is a REQUIRED 3rd arg.
const persisted = useStorage<Map<string, PersistedNotification>>(
() => options.persistKey ?? 'robonen:notif',
new Map(),
localStorage,
{ serializer: StorageSerializers.map, listenToStorageChanges: true, shallow: true, initOnMounted: true }, // SSR-safe defer
);
// PersistedNotification EXCLUDES runtime fields (surfaced, pendingAction, actionError) — never resurrect in-flight state.
// useStorage write-lock prevents self-feedback; 'storage'/'vuetools-storage' events propagate to other tabs.
watch(persisted, (next) => { // remote tab wrote -> hydration-commit (NEVER toasts)
for (const p of next.values()) commit(ingestPersisted(p), 'hydration');
}, { deep: false });
mergeLattice is the convergence rule for concurrent two-tab edits:
const LIFECYCLE_RANK: Record<NotificationLifecycle, number> = { received:0, queued:1, toasted:2, read:3, dismissed:4, archived:5, expired:6 };
function mergeLattice(local: Notification, remote: Notification): void {
if (LIFECYCLE_RANK[remote.lifecycle] > LIFECYCLE_RANK[local.lifecycle]) local.lifecycle = remote.lifecycle; // max over lattice
local.readReceiptAt = maxNullable(local.readReceiptAt, remote.readReceiptAt); // per-field LWW
local.dismissedAt = maxNullable(local.dismissedAt, remote.dismissedAt);
}
Known limitation: the whole-Map serializer means a tab's write replaces the full key; with N tabs committing concurrently, whole-Map last-writer-wins can clobber a concurrent per-entity edit before
mergeLattice(which only runs on the receiving side after a storage event) reconciles it. For strict multi-tab durability, enable the optionaluseTabLeadersingle-writer path.
5.5 Optimistic actions — token-correlated ack/commit/rollback
// actions.ts
async function runAction(id: string, actionId: string): Promise<void> {
const n = byId.value.get(id); if (!n) return;
const action = options.registry.resolve(n.type)?.actions?.find((a) => a.id === actionId);
if (!action) return;
const clientToken = crypto.randomUUID(); // THE canonical correlation key
const prevLifecycle = n.lifecycle;
applyOptimistic(n, action.optimistic?.(n) ?? {}); // immediate, reactive
n.pendingAction = { actionId, clientToken, prevLifecycle }; bump();
try {
await options.client?.send({ kind: 'action', v: 1, id: crypto.randomUUID(), ts: Date.now(),
ref: clientToken, type: n.type, action: actionId, clientToken }); // server echoes clientToken in AckFrame.ref
await waitForAck(clientToken, { timeout: 8000 }); // ack-after-timeout IGNORED via token compare
n.pendingAction = null; if (action.onCommit) transition(id, action.onCommit); bump();
} catch (e) {
// DO NOT rollback on a mid-action socket drop — frame is queued, await ack post-reconnect.
if (isTimeoutOrServerError(e)) { transition(id, prevLifecycle); n.pendingAction = null; n.actionError = String(e); bump(); }
}
}
Eviction skips entities with surfaced || pendingAction !== null.
6. Type registry
// vue/toolkit/src/composables/notifications/registry.ts
export interface NotificationRendererProps<P = unknown> {
notification: Notification<string, P>;
surface: 'toast' | 'inbox'; // ONE renderer, both surfaces
runAction: (actionId: string) => void;
dismiss: () => void;
}
export interface NotificationActionDef<P = unknown> {
readonly id: string; // 'accept' | 'decline' | 'reply' | 'mark-read'
readonly label: string | ((n: Notification<string, P>) => string);
readonly altText: string; // -> ToastAction alt-text (required by the primitive)
readonly variant?: 'primary' | 'secondary' | 'danger';
readonly optimistic?: (n: Notification<string, P>) => Partial<Notification>;
readonly onCommit?: NotificationLifecycle; // terminal lifecycle on ack
}
export type ToastPolicy = 'always' | 'never' | 'by-priority';
export interface NotificationTypeDef<P = unknown> {
readonly type: string;
readonly validate: (raw: unknown) => P; // throws on malformed -> caught per-message in ingest
readonly renderer: Component; // NotificationRendererProps<P>
readonly actions?: readonly NotificationActionDef<P>[];
readonly defaultPriority?: NotificationPriority;
readonly toastPolicy?: ToastPolicy; // default 'by-priority'
readonly grouping?: (p: P) => string | null;
readonly contentHash?: (p: P) => string; // override the FNV-1a default
readonly sound?: string | false;
readonly ttl?: number; // toast auto-dismiss ms; Infinity = sticky
readonly persist?: boolean; // default true
}
export function defineNotificationType<P>(def: NotificationTypeDef<P>): NotificationTypeDef<P> { return def; }
export interface NotificationRegistry {
register<P>(def: NotificationTypeDef<P>): void;
resolve(type: string): NotificationTypeDef | undefined;
has(type: string): boolean;
shouldToast(n: Notification): boolean;
priorityFor(type: string, override?: NotificationPriority): NotificationPriority;
all(): readonly NotificationTypeDef[];
}
export function createNotificationRegistry(
defs?: readonly NotificationTypeDef[],
opts?: { onUnknownType?: (type: string) => void; toastFloor?: NotificationPriority },
): NotificationRegistry;
// provided/injected so renderers + store + ingestion share ONE instance (and tests inject a fake)
export const { inject: useNotificationRegistry, provide: provideNotificationRegistry, appProvide: appProvideNotificationRegistry } =
useContextFactory<NotificationRegistry>('NotificationRegistry');
Registering the three required types:
const friendRequest = defineNotificationType<{ fromUserId: string; name: string; avatar: string }>({
type: 'friend_request',
validate: (raw) => { const p = raw as any; if (typeof p?.fromUserId !== 'string') throw new Error('bad friend_request'); return p; },
renderer: FriendRequestCard,
defaultPriority: Priority.high,
toastPolicy: 'by-priority',
grouping: () => null,
sound: '/sounds/ping.mp3',
ttl: Infinity, // sticky until acted on
actions: [
{ id: 'accept', label: 'Accept', altText: 'Accept friend request', variant: 'primary',
optimistic: () => ({ lifecycle: 'read', readReceiptAt: Date.now() }), onCommit: 'read' },
{ id: 'decline', label: 'Decline', altText: 'Decline friend request', variant: 'danger',
optimistic: () => ({ lifecycle: 'dismissed', dismissedAt: Date.now() }), onCommit: 'dismissed' },
],
});
const systemAlert = defineNotificationType<{ title: string; body: string }>({
type: 'system_alert',
validate: (raw) => raw as any,
renderer: SystemAlertCard,
defaultPriority: Priority.critical, // renders ToastRoot type='foreground' (assertive aria-live)
toastPolicy: 'always',
ttl: 60_000,
actions: [], // dismiss-only via <ToastClose>
});
const message = defineNotificationType<{ threadId: string; from: string; text: string }>({
type: 'message',
validate: (raw) => raw as any,
renderer: MessageRow,
defaultPriority: Priority.normal,
toastPolicy: 'by-priority', // normal < high floor => inbox-only (badge), no toast
grouping: (p) => `dm:${p.threadId}`, // collapse same thread => "+N more"
contentHash: (p) => `${p.threadId}:${p.text}`,
actions: [
{ id: 'reply', label: 'Reply', altText: 'Reply to message' },
{ id: 'mark-read', label: 'Mark read', altText: 'Mark message read',
optimistic: () => ({ lifecycle: 'read', readReceiptAt: Date.now() }), onCommit: 'read' },
],
});
registerNotificationTypes([friendRequest, systemAlert, message]); // tree-shakeable: only imported types ship
7. Presentation bridge
7.1 <Toaster> — drives the EXISTING toast primitive (consumer-owned list per toast/demo.vue)
<!-- vue/primitives/src/toaster/Toaster.vue -->
<script setup lang="ts">
import { ToastProvider, ToastRoot, ToastAction, ToastClose, ToastViewport } from '@robonen/primitives';
import { useNotifications, useNotificationRegistry, Priority } from '@robonen/vue';
const props = withDefaults(defineProps<{ duration?: number; swipeDirection?: 'up'|'down'|'left'|'right' }>(),
{ duration: 6000, swipeDirection: 'right' });
const store = useNotifications();
const registry = useNotificationRegistry();
const resolve = (type: string) => registry.resolve(type)!;
const durationFor = (n: { type: string; priority: number }) =>
resolve(n.type).ttl ?? (n.priority >= Priority.high ? 10_000 : props.duration);
</script>
<template>
<ToastProvider :duration="props.duration" :swipe-direction="props.swipeDirection">
<ToastRoot
v-for="n in store.toasting.value"
:key="n.id"
v-model:open="store.openModel(n.id).value"
:duration="durationFor(n)"
:type="n.priority >= Priority.critical ? 'foreground' : 'background'"
to-viewport
@update:open="(open) => { if (!open) store.dismissToast(n.id); }"
>
<component
:is="resolve(n.type).renderer"
:notification="n" surface="toast"
:run-action="(a: string) => store.runAction(n.id, a)"
:dismiss="() => store.dismissToast(n.id)"
/>
<ToastAction
v-for="a in resolve(n.type).actions ?? []"
:key="a.id" :alt-text="a.altText"
@click="store.runAction(n.id, a.id)"
>{{ typeof a.label === 'function' ? a.label(n) : a.label }}</ToastAction>
<ToastClose aria-label="Dismiss" />
</ToastRoot>
<ToastViewport class="fixed bottom-4 right-4 flex flex-col gap-2" />
</ToastProvider>
</template>
The store owns the list +
remove/dismissToast; the primitive stays consumer-driven exactly as itsdemo.vuerequires. We do not fork or extendvue/primitives/src/toast. Auto-dismiss (the primitive setsopen=false) routes through@update:open→dismissToast(toast→inbox, never deleted).
7.2 <NotificationCenter> — headless inbox, same renderer (surface='inbox')
<!-- vue/primitives/src/notification-center/NotificationCenterItem.vue -->
<script setup lang="ts">
import { useNotificationCenterContext } from './context';
import { useNotificationRegistry } from '@robonen/vue';
const props = defineProps<{ group: NotificationGroup }>();
const ctx = useNotificationCenterContext();
const registry = useNotificationRegistry();
const latest = props.group.latest;
</script>
<template>
<li :data-state="latest.readReceiptAt ? 'read' : 'unread'" class="nc-item">
<component
:is="registry.resolve(latest.type)!.renderer"
:notification="latest" surface="inbox"
:run-action="(a: string) => ctx.runAction(latest.id, a)"
:dismiss="() => ctx.archive(latest.id)"
/>
<span v-if="group.count > 1" class="nc-more">+{{ group.count - 1 }} more</span>
</li>
</template>
Priority + policy decides toast-vs-inbox entirely inside registry.shouldToast(n): 'always'→toast;
'never'→inbox-only; 'by-priority'→n.priority >= toastFloor (default high). Every notification
lands in the inbox (durable Deque); only shouldToast ones also enter the surfacing PriorityQueue.
critical renders type='foreground' (assertive aria-live) and interrupts the lowest visible toast.
8. Public API & usage
// vue/toolkit/src/composables/notifications/useNotifications/index.ts
export interface UseNotificationsReturn {
entities: Readonly<ShallowRef<Map<string, Notification>>>;
toasting: ComputedRef<Notification[]>;
groups: ComputedRef<NotificationGroup[]>;
unreadCount: ComputedRef<number>;
byPriority: ComputedRef<Record<NotificationPriority, Notification[]>>;
byType: ComputedRef<Record<string, Notification[]>>;
connectionState: Readonly<ShallowRef<RealtimeState>>;
notify: <P>(input: { type: string; payload: P; priority?: NotificationPriority }) => string; // local -> SAME funnel
toast: ToastSugar; // toast.success/error/dismiss -> built-in '__toast' type
markRead: (id: string) => void; markAllRead: () => void;
dismissToast: (id: string) => void; archive: (id: string) => void;
runAction: (id: string, actionId: string) => Promise<void>;
openModel: (id: string) => Ref<boolean>;
}
export function useNotifications(): UseNotificationsReturn; // full surface
export function useNotificationCenter(): UseNotificationCenterReturn; // inbox/groups/filter/markAllRead
export function useUnreadCount(): Readonly<ComputedRef<number>>; // cheap badge — does NOT pull the whole store
End-to-end (a Nuxt plugin / app entry):
// plugins/notifications.client.ts
import { createNotificationRegistry, appProvideNotificationRegistry, registerNotificationTypes,
useSharedRealtime } from '@robonen/vue';
import { createWebSocketTransport } from '@robonen/realtime';
import { authPlugin, jsonCodec, heartbeatPlugin, resumePlugin, loggerPlugin } from '@robonen/realtime/plugins';
export default defineNuxtPlugin((nuxtApp) => {
const registry = createNotificationRegistry();
registerNotificationTypes([friendRequest, systemAlert, message]); // bootstrap, once
// CORRECTED: use appProvide at the plugin layer (component-level `provide` only works inside setup)
appProvideNotificationRegistry(nuxtApp.vueApp)(registry);
useSharedRealtime({
transport: createWebSocketTransport(),
url: () => `wss://api.example.com/notifications?u=${useUserId().value}`,
plugins: [authPlugin(getToken), jsonCodec, heartbeatPlugin(), resumePlugin(), loggerPlugin(console.log)],
backoff: { base: 500, cap: 30_000, jitter: true },
});
});
<!-- App.vue : mount presentation ONCE near the root -->
<template>
<NuxtPage />
<ClientOnly><Toaster /></ClientOnly> <!-- client-only: no socket/badge during SSR -->
</template>
<!-- HeaderBell.vue : cheap badge, rendered only after mount to avoid hydration mismatch -->
<script setup lang="ts">
import { useUnreadCount } from '@robonen/vue';
const unread = useUnreadCount();
const mounted = ref(false); onMounted(() => mounted.value = true);
</script>
<template><button>🔔 <span v-if="mounted && unread.value">{{ unread.value }}</span></button></template>
// anywhere: local notification + imperative toast (SAME ingest funnel)
const { notify, toast, runAction } = useNotifications();
notify({ type: 'message', payload: { threadId: 't1', from: 'Ana', text: 'hi' } });
toast.success('Saved', { description: 'Your changes are live.' });
toast.error('Upload failed', { actions: [{ id: 'retry', label: 'Retry', altText: 'Retry upload' }] });
await runAction(friendReqId, 'accept'); // optimistic -> WS action -> ack commit / timeout rollback
9. SSR, testing, tree-shaking
SSR. core/realtime imports nothing from Vue and touches WebSocket/BroadcastChannel/timers only
after connect(). useRealtime returns ssrNoopRealtime() when !isClient. useStorage defers with
initOnMounted. useSharedRealtime (createSharedComposable) returns a fresh raw instance on the
server. The unread badge renders only after onMounted. <Toaster> lives under <ClientOnly>.
Testing (house style: describe(fnName), effectScope() disposal, MockTransport, fake registry):
// store.test.ts
describe('createNotificationStore', () => {
it('surfacing interrupt evicts lowest toast back to inbox, never deletes', () => {
const scope = effectScope();
scope.run(() => {
const store = createNotificationStore({ registry: fakeRegistry, maxToasts: 1 });
store.commit(ingest(frame('a', { priority: Priority.low })), 'ingest');
store.commit(ingest(frame('b', { priority: Priority.high })), 'ingest'); // interrupts
expect(store.toasting.value.map(n => n.id)).toEqual(['b']);
expect(store.entities.value.get('a')?.lifecycle).toBe('queued'); // demoted, still present
});
scope.stop();
});
it('hydration-commit never toasts (cross-tab no-re-toast)', () => {
const store = createNotificationStore({ registry: fakeRegistry });
store.commit(ingest(frame('x', { priority: Priority.critical })), 'hydration');
expect(store.toasting.value).toHaveLength(0); // inbox only
expect(store.unreadCount.value).toBe(1);
});
it('two-key dedup folds identical content into meta.count', () => { /* same contentHash, distinct id -> count=2 */ });
it('disposes timers/subscriptions on scope stop', () => { /* spy + effectScope().stop() */ });
});
// machine.test.ts — drive via await machine.send(event)
describe('buildLifecycleMachine', () => {
it('NET_OFFLINE pauses retries without scheduling RETRY', async () => {
const m = buildLifecycleMachine(depsOffline);
await m.send('CONNECT'); await m.send('OPENED'); await m.send('NET_OFFLINE');
expect(depsOffline.scheduleRetry).not.toHaveBeenCalled();
});
it('cursor advances ONLY on contiguous delivery', () => {
const rb = createReorderBuffer({});
expect(rb.accept(frame('a', { seq: 1 })).map(f => f.seq)).toEqual([1]); expect(rb.cursor).toBe('1');
expect(rb.accept(frame('c', { seq: 3 }))).toEqual([]); expect(rb.cursor).toBe('1'); // gap held
expect(rb.accept(frame('b', { seq: 2 })).map(f => f.seq)).toEqual([2, 3]); expect(rb.cursor).toBe('3'); // drains
});
it('drops duplicates by id and seq<=lastSeq', () => { /* idempotent at-least-once */ });
});
Plus explicit tests for seq-gap-then-reconnect (assert no loss/dup), storage-event toast suppression, bounded-LRU dedup eviction, and leader handoff on abrupt tab close.
Tree-shaking. Per-category barrels (composables/notifications/index.ts → composables/index.ts →
src/index.ts). Registry descriptors are side-effect-free; only the defineNotificationTypes a
consumer imports + register()s ship. Realtime plugins are separate modules under
@robonen/realtime/plugins. useUnreadCount imports only the badge selector.
10. Build order / milestones
| # | Milestone | Deliverables | Gate |
|---|---|---|---|
| M1 | Transport core skeleton | core/realtime: Transport iface, createWebSocketTransport, envelope.ts, MockTransport. |
Connect/send/recv against a mock; JSON round-trip. |
| M2 | Lifecycle + reliability | machine.ts, backoff.ts (jitter, gated), heartbeat/resume plugins, reorder.ts (contiguous cursor + gap-timeout + LruSet), outbound.ts (AsyncPool+Deque+ack by clientToken). |
machine/reorder/backoff tests green; seq-gap-reconnect asserts no loss. |
| M3 | Plugin pipeline + 2nd transport | plugin.ts 1:1 with core/fetch; auth/codec/logger/compression; ship createSseTransport. |
SSE swaps in with zero upper-layer change. |
| M4 | Registry + ingest funnel | registry.ts, defineNotificationType, ingest.ts (pure, catches validate throws → onReject), contentHash. |
One funnel for WS + local; malformed payload never crashes the pump. |
| M5 | Domain store | store.ts, surfacing.ts (victim-min PQ + guarded enqueue + ensureCapacity), selectors.ts (explicit sort), pure-reducer lifecycle, two-key dedup. |
Interrupt/eviction/dedup/disposal tests green. |
| M6 | Persistence + cross-tab | persistence.ts (useStorage map, merge lattice, toastedIds), hydration-commit separation, optional useTabLeader+broadcastedRef. |
Storage-event suppression + leader-handoff tests green. |
| M7 | Presentation | <Toaster> (drives existing toast), <NotificationCenter> primitive, shared surface-prop renderers; useNotifications/useNotificationCenter/useUnreadCount; notify()/toast.*. |
Optimistic ack/rollback + a11y verified; the 3 demo types work end-to-end. |
Hardest risks & mitigations
- Resume gap / cursor correctness. Cursor advances only on contiguous delivery; gap-timeout deliver-with-gap + a server reconciliation request for permanently-lost ranges. Test: inject gaps + reconnect, assert no loss/dup.
- Zombie connections (TCP half-open). Watchdog cleared by any inbound frame, armed whenever
connected; conservative configurableping 20s/watchdog 10s. PriorityQueueis a min-heap. The surfacing PQ comparator makes the eviction victim the minimum (lowest priority, then oldest); viewport order is a separate display sort.surface()peek/isFull-guards thendequeue-then-enqueue; nevertoArray()for display.- Cross-tab double-toast.
commit()separates ingest (may toast) from hydration (never); per-tab non-persistedtoastedIds;useTabLeader(Web Locks) for the optional single socket; neveruseEventBusfor cross-tab; never a hand-rolled lease. - Singleton arg-dropping. Inbox/registry via
useAppSharedState(options bound once); socket viacreateSharedComposable(disposable, raw-on-SSR). - Optimistic double-apply. Idempotent server commands keyed by
clientToken(the one canonical correlation key); ack-after-timeout ignored via token compare; do not rollback on mid-action socket drop; eviction skipspendingAction/surfaced. - Persistence schema. Persist only committed lifecycle + read/dismiss receipts + cursor (exclude runtime fields); Map serializer; validate-on-hydrate, drop+log unknown-type entities. Whole-Map LWW is the multi-tab durability limit — opt into leader single-writer for strictness.
- Durable-Deque overflow.
pushBackthrows atmaxSize, soensureCapacity()runs before every insert and force-evicts the oldest non-active entity if all candidates are protected. - SSR hydration mismatch. Badge/connection UI render only after mount;
<Toaster>under<ClientOnly>;isClientguards throughout.
Appendix — review corrections folded into this document
This design was adversarially verified against the real repo source. Corrections already applied above:
- PriorityQueue orientation — it's a min-heap; the surfacing comparator now makes the eviction victim the minimum, and the viewport uses a separate display sort (§5.2, §5.3).
- Registry provisioning — the Nuxt plugin uses
appProvide(nuxtApp.vueApp)instead of component-levelprovide(§6, §8). - Bounded LRU dedup —
seenis a real boundedLruSet, not an unboundedSet(§4.5). - Backoff —
backoffDelayis bespoke and does not reuse stdlibretry()(§4.3). - Ack correlation — one canonical key (
clientToken) end-to-end;AckFrame.refechoes it (§4.1, §4.6, §5.5). - Network boundary — the Vue
useRealtimewrapper watchesuseNetwork().isOnlineand dispatchesNET_OFFLINE/NET_ONLINE;core/realtimenever imports Vue (§4.7). - Durable overflow — explicit
ensureCapacity()before insert (theDequethrows when full) (§5.2). - Server contract — the monotonic-seq / replay / reconcile / token-echo requirements are stated explicitly (§4.1).