From aa3148f4e4d30f97c1d595062888cce3fc33e447 Mon Sep 17 00:00:00 2001 From: robonen Date: Sun, 7 Jun 2026 03:57:58 +0700 Subject: [PATCH] feat: enhance entity management and reactivity in vue-sync-engine --- shiki-vue-wrapper/src/App.vue | 8 +- shiki-vue-wrapper/src/ShikiCode/ShikiCode.vue | 32 +-- .../src/ShikiCode/ShikiStatic.vue | 33 +++ .../src/ShikiCode/shiki-host.css | 26 +++ .../src/ShikiCode/vite-plugin-shiki.ts | 106 +++++++++ shiki-vue-wrapper/src/shiki.d.ts | 5 + shiki-vue-wrapper/vite.config.ts | 7 +- .../lib/src/__tests__/composables.test.ts | 57 +++++ .../lib/src/__tests__/engine.test.ts | 130 ++++++++++- .../lib/src/__tests__/mirror.test.ts | 214 +++++++++++++++++- .../lib/src/composables/useInfiniteQuery.ts | 18 +- .../lib/src/composables/useQuery.ts | 22 +- vue-sync-engine/lib/src/core/types.ts | 4 + vue-sync-engine/lib/src/createEngine.ts | 20 +- vue-sync-engine/lib/src/tab/mirror.ts | 111 +++++++-- .../lib/src/worker/mutationQueue.ts | 9 +- vue-sync-engine/lib/src/worker/queryGraph.ts | 133 ++++++++++- 17 files changed, 840 insertions(+), 95 deletions(-) create mode 100644 shiki-vue-wrapper/src/ShikiCode/ShikiStatic.vue create mode 100644 shiki-vue-wrapper/src/ShikiCode/shiki-host.css create mode 100644 shiki-vue-wrapper/src/ShikiCode/vite-plugin-shiki.ts create mode 100644 shiki-vue-wrapper/src/shiki.d.ts diff --git a/shiki-vue-wrapper/src/App.vue b/shiki-vue-wrapper/src/App.vue index 171810a..a1efadd 100644 --- a/shiki-vue-wrapper/src/App.vue +++ b/shiki-vue-wrapper/src/App.vue @@ -1,12 +1,14 @@ diff --git a/shiki-vue-wrapper/src/ShikiCode/ShikiCode.vue b/shiki-vue-wrapper/src/ShikiCode/ShikiCode.vue index 9d44a34..79dc69d 100644 --- a/shiki-vue-wrapper/src/ShikiCode/ShikiCode.vue +++ b/shiki-vue-wrapper/src/ShikiCode/ShikiCode.vue @@ -2,6 +2,7 @@ import { computed } from 'vue' import type { ShikiTransformer } from 'shiki/core' import { useShikiHighlight } from './useShikiHighlight' +import './shiki-host.css' const props = withDefaults( defineProps<{ @@ -47,33 +48,4 @@ const gutterStyle = computed(() => {
{{ code }}
- - - \ No newline at end of file + \ No newline at end of file diff --git a/shiki-vue-wrapper/src/ShikiCode/ShikiStatic.vue b/shiki-vue-wrapper/src/ShikiCode/ShikiStatic.vue new file mode 100644 index 0000000..27753f3 --- /dev/null +++ b/shiki-vue-wrapper/src/ShikiCode/ShikiStatic.vue @@ -0,0 +1,33 @@ + + + diff --git a/shiki-vue-wrapper/src/ShikiCode/shiki-host.css b/shiki-vue-wrapper/src/ShikiCode/shiki-host.css new file mode 100644 index 0000000..e690d4b --- /dev/null +++ b/shiki-vue-wrapper/src/ShikiCode/shiki-host.css @@ -0,0 +1,26 @@ +.shiki-host .shiki { + padding: 1rem; + overflow-x: auto; + scrollbar-width: thin; + scrollbar-color: var(--color-zinc-300) var(--color-zinc-950); +} + +.shiki-host[data-line-numbers] .shiki code { + counter-reset: shiki-line calc(var(--shiki-line-start, 1) - 1); +} + +.shiki-host[data-line-numbers] .shiki code .line::before { + counter-increment: shiki-line; + content: counter(shiki-line); + display: inline-block; + width: var(--shiki-gutter-width, 2ch); + margin-right: 1.25rem; + text-align: right; + color: color-mix(in srgb, currentColor 40%, transparent); + user-select: none; +} + +/* shiki иногда оставляет пустую финальную строку — прячем её номер */ +.shiki-host[data-line-numbers] .shiki code .line:last-child:empty::before { + content: none; +} diff --git a/shiki-vue-wrapper/src/ShikiCode/vite-plugin-shiki.ts b/shiki-vue-wrapper/src/ShikiCode/vite-plugin-shiki.ts new file mode 100644 index 0000000..47ee82e --- /dev/null +++ b/shiki-vue-wrapper/src/ShikiCode/vite-plugin-shiki.ts @@ -0,0 +1,106 @@ +import { readFile } from 'node:fs/promises'; +import type { Plugin } from 'vite'; +import { + createHighlighterCore, + type HighlighterCore, + type ShikiTransformer, +} from 'shiki/core'; +import { createJavaScriptRegexEngine } from 'shiki/engine/javascript'; + +const SHIKI_QUERY = 'shiki'; + +/** Расширение файла -> id грамматики Shiki. */ +const EXT_TO_LANG: Record = { + js: 'javascript', mjs: 'javascript', cjs: 'javascript', jsx: 'jsx', + ts: 'typescript', mts: 'typescript', cts: 'typescript', tsx: 'tsx', + vue: 'vue', json: 'json', jsonc: 'jsonc', css: 'css', scss: 'scss', + html: 'html', md: 'markdown', py: 'python', rs: 'rust', go: 'go', + sh: 'bash', bash: 'bash', yml: 'yaml', yaml: 'yaml', toml: 'toml', sql: 'sql', +}; + +export interface ShikiPluginOptions { + /** Одиночная тема (по умолчанию aurora-x). Игнорируется, если задан `themes`. */ + theme?: string; + /** Парные темы — Shiki отдаёт HTML с CSS-переменными для light/dark. */ + themes?: { light: string; dark: string }; + /** Доп. соответствия расширение -> язык поверх дефолтных. */ + langAlias?: Record; + /** Трансформеры Shiki (номера строк, диффы и т.п.). */ + transformers?: ShikiTransformer[]; +} + +/** + * Импорт `./snippet.ts?shiki` возвращает строку с уже подсвеченным HTML. + * Вся работа Shiki происходит в Node на этапе сборки/дева — в бандл клиента + * не попадает ни движок, ни грамматики. Zero runtime. + * + * Язык берётся из расширения файла, либо из `?shiki&lang=...`. + */ +export function shiki(options: ShikiPluginOptions = {}): Plugin { + const { theme = 'aurora-x', themes, transformers } = options; + const extToLang = { ...EXT_TO_LANG, ...options.langAlias }; + + let highlighter: Promise | null = null; + const loadedLangs = new Set(); + const loadedThemes = new Set(); + + const getHighlighter = () => { + highlighter ??= createHighlighterCore({ + langs: [], + themes: [], + engine: createJavaScriptRegexEngine(), + }); + return highlighter; + }; + + const ensureLang = async (hl: HighlighterCore, lang: string) => { + if (loadedLangs.has(lang)) return; + const mod = await import(`shiki/langs/${lang}.mjs`); + await hl.loadLanguage(mod.default); + loadedLangs.add(lang); + }; + + const ensureTheme = async (hl: HighlighterCore, name: string) => { + if (loadedThemes.has(name)) return; + const mod = await import(`shiki/themes/${name}.mjs`); + await hl.loadTheme(mod.default); + loadedThemes.add(name); + }; + + return { + name: 'vite-plugin-shiki', + enforce: 'pre', + + async load(id) { + const [filepath, rawQuery] = id.split('?', 2); + if (!rawQuery) return; + + const params = new URLSearchParams(rawQuery); + if (!params.has(SHIKI_QUERY)) return; + + const ext = filepath.split('.').pop()?.toLowerCase() ?? ''; + const lang = params.get('lang') ?? extToLang[ext] ?? ext ?? 'text'; + + // Перечитываем исходник сами + регистрируем как зависимость для HMR. + const source = await readFile(filepath, 'utf8'); + this.addWatchFile(filepath); + + const hl = await getHighlighter(); + await ensureLang(hl, lang); + if (themes) { + await ensureTheme(hl, themes.light); + await ensureTheme(hl, themes.dark); + } else { + await ensureTheme(hl, theme); + } + + const html = hl.codeToHtml(source.replace(/\n$/, ''), { + lang, + ...(themes ? { themes } : { theme }), + transformers, + }); + + return { code: `export default ${JSON.stringify(html)}`, map: null }; + }, + }; +} diff --git a/shiki-vue-wrapper/src/shiki.d.ts b/shiki-vue-wrapper/src/shiki.d.ts new file mode 100644 index 0000000..e9cb667 --- /dev/null +++ b/shiki-vue-wrapper/src/shiki.d.ts @@ -0,0 +1,5 @@ +/** Импорт `*?shiki` отдаёт уже подсвеченный HTML (см. vite-plugin-shiki). */ +declare module '*?shiki' { + const html: string; + export default html; +} diff --git a/shiki-vue-wrapper/vite.config.ts b/shiki-vue-wrapper/vite.config.ts index 1cd32ad..503eb5f 100644 --- a/shiki-vue-wrapper/vite.config.ts +++ b/shiki-vue-wrapper/vite.config.ts @@ -1,7 +1,12 @@ import { defineConfig } from 'vite'; import vue from '@vitejs/plugin-vue'; import tailwind from '@tailwindcss/vite'; +import { shiki } from './src/ShikiCode/vite-plugin-shiki'; export default defineConfig({ - plugins: [vue(), tailwind()], + plugins: [ + vue(), + tailwind(), + shiki({ theme: 'aurora-x' }), + ], }); diff --git a/vue-sync-engine/lib/src/__tests__/composables.test.ts b/vue-sync-engine/lib/src/__tests__/composables.test.ts index dda5236..6d40531 100644 --- a/vue-sync-engine/lib/src/__tests__/composables.test.ts +++ b/vue-sync-engine/lib/src/__tests__/composables.test.ts @@ -125,6 +125,36 @@ describe('useQuery', () => { m.unmount() }) + it('data switches to the new subscription result after args change', async () => { + const list = vi.fn(async (a: { search?: string }): Promise => ({ + items: a.search ? [{ id: '2', name: 'Bob', age: 25 }] : [{ id: '1', name: 'Ada', age: 30 }], + nextCursor: null, + })) + const { engine, defs } = buildEngine({ list, update: vi.fn() }) + + const search = ref('') + let api!: ReturnType> + const C = defineComponent({ + setup() { + api = useQuery(defs.usersList, () => ({ search: search.value })) + return () => h('div') + }, + }) + const m = mountWith(engine, C) + await flush() + await flush() + expect(api.data.value).toEqual({ ids: ['1'] }) + + search.value = 'b' + await nextTick() + await flush() + await flush() + // Computeds must follow the swapped-in subscription ref, not stay bound to the old one. + expect(api.data.value).toEqual({ ids: ['2'] }) + expect(api.isSuccess.value).toBe(true) + m.unmount() + }) + it('releases handle on unmount', async () => { const list = vi.fn(async () => ({ items: [], nextCursor: null })) const { engine, defs } = buildEngine({ list, update: vi.fn() }) @@ -194,6 +224,33 @@ describe('useInfiniteQuery', () => { expect(list.mock.calls.length).toBeGreaterThanOrEqual(2) m.unmount() }) + + it('pages switch to the new subscription result after args change', async () => { + const list = vi.fn(async (a: { search?: string }): Promise => ({ + items: a.search ? [{ id: '2', name: 'B', age: 2 }] : [{ id: '1', name: 'A', age: 1 }], + nextCursor: null, + })) + const { engine, defs } = buildEngine({ list, update: vi.fn() }) + const search = ref('') + let api!: ReturnType> + const C = defineComponent({ + setup() { + api = useInfiniteQuery(defs.usersInfinite, () => ({ search: search.value })) + return () => h('div') + }, + }) + const m = mountWith(engine, C) + await flush() + await flush() + expect(api.pages.value[0]?.ids).toEqual(['1']) + + search.value = 'q' + await nextTick() + await flush() + await flush() + expect(api.pages.value[0]?.ids).toEqual(['2']) + m.unmount() + }) }) describe('useEntity', () => { diff --git a/vue-sync-engine/lib/src/__tests__/engine.test.ts b/vue-sync-engine/lib/src/__tests__/engine.test.ts index fee431f..770f046 100644 --- a/vue-sync-engine/lib/src/__tests__/engine.test.ts +++ b/vue-sync-engine/lib/src/__tests__/engine.test.ts @@ -8,15 +8,21 @@ import { memoryAdapter } from '../adapters/storageAdapter' import { Status } from '../core/flags' import { flush, makeUserDefs, type ListUsersResp, type User, UserEntity } from './fixtures' -function setup(api: { list: any; update: any }) { +function setup( + api: { list: any; update: any }, + options?: { defaultMaxPages?: number; entityCap?: number; entityGc?: boolean; defaultGcTime?: number }, +) { const defs = makeUserDefs(api) const storage = memoryAdapter() const { client, server } = createInlineTransport() let onlineCb: (() => void) | null = null let online = true - createQueryGraph({ + const graph = createQueryGraph({ storage, endpoint: server, + defaultMaxPages: options?.defaultMaxPages, + entityGc: options?.entityGc, + defaultGcTime: options?.defaultGcTime, registry: { entities: new Map([[UserEntity.name, UserEntity]]), queries: new Map([ @@ -31,12 +37,13 @@ function setup(api: { list: any; update: any }) { return () => {} }, }) - const mirror = createMirror() + const mirror = createMirror({ entityCap: options?.entityCap }) const runtime = createTabRuntime({ transport: client, mirror, staleSubGcMs: 10 }) return { runtime, defs, storage, + graph, setOnline(v: boolean) { online = v if (v && onlineCb) onlineCb() @@ -200,6 +207,48 @@ describe('useInfiniteQuery', () => { expect(state.value.data?.pages[1].ids).toEqual(['2']) scope.stop() }) + + it('windows pages to maxPages, dropping the oldest page and its entity refs', async () => { + let call = 0 + const list = vi.fn(async (): Promise => { + call++ + if (call === 1) return { items: [{ id: '1', name: 'A', age: 1 }], nextCursor: 'c1' } + if (call === 2) return { items: [{ id: '2', name: 'B', age: 2 }], nextCursor: 'c2' } + return { items: [{ id: '3', name: 'C', age: 3 }], nextCursor: null } + }) + const { runtime, defs, graph } = setup({ list, update: vi.fn() }, { defaultMaxPages: 2 }) + + const scope = effectScope() + let handle!: ReturnType + scope.run(() => { + handle = runtime.subscribeQuery(defs.usersInfinite.name, defs.usersInfinite.key({}), {}) + }) + await flush() + await flush() + + type R = { ids: string[]; nextCursor: string | null } + const state = runtime.mirror.ensureQuery<{ pages: R[]; pageParams: unknown[] }>(handle.subId) + + handle.fetchNextPage() + await flush() + await flush() + expect(state.value.data?.pages.length).toBe(2) + + handle.fetchNextPage() + await flush() + await flush() + + // Only the last two pages are retained; the first (id '1') is dropped. + expect(state.value.data?.pages.length).toBe(2) + expect(state.value.data?.pages.map((p) => p.ids)).toEqual([['2'], ['3']]) + expect(state.value.data?.pageParams.length).toBe(2) + + // The worker node's entity refs are windowed in lockstep with the pages. + const node = [...graph.nodes.values()].find((n) => n.def.name === defs.usersInfinite.name)! + expect(node.entityRefs.map((r) => r.id)).toEqual(['2', '3']) + expect(node.pageRefCounts).toEqual([1, 1]) + scope.stop() + }) }) describe('GC', () => { @@ -217,3 +266,78 @@ describe('GC', () => { } }) }) + +describe('worker entity GC', () => { + const users = () => ({ items: [{ id: '1', name: 'A', age: 1 }, { id: '2', name: 'B', age: 2 }], nextCursor: null }) + + it('keeps worker entities forever when entityGc is off (default)', async () => { + const { runtime, defs, graph } = setup({ list: vi.fn(users), update: vi.fn() }, { defaultGcTime: 15 }) + const handle = runtime.subscribeQuery(defs.usersList.name, defs.usersList.key({}), {}) + await flush() + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(2) + + handle.release() + await new Promise((r) => setTimeout(r, 60)) + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(2) // not reclaimed + }) + + it('frees entities once their only query node is garbage-collected', async () => { + const { runtime, defs, graph } = setup({ list: vi.fn(users), update: vi.fn() }, { entityGc: true, defaultGcTime: 15 }) + const handle = runtime.subscribeQuery(defs.usersList.name, defs.usersList.key({}), {}) + await flush() + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(2) + + handle.release() + await new Promise((r) => setTimeout(r, 60)) + await flush() + expect(graph.entitiesInMemory.get('user')?.size ?? 0).toBe(0) // reclaimed + }) + + it('keeps an entity alive while another query still references it', async () => { + const list = vi.fn(async () => ({ items: [{ id: '1', name: 'A', age: 1 }], nextCursor: null })) + const { runtime, defs, graph } = setup({ list, update: vi.fn() }, { entityGc: true, defaultGcTime: 15 }) + const h1 = runtime.subscribeQuery(defs.usersList.name, defs.usersList.key({}), {}) + const h2 = runtime.subscribeQuery(defs.usersInfinite.name, defs.usersInfinite.key({}), {}) + await flush() + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(1) + + h1.release() + await new Promise((r) => setTimeout(r, 60)) + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(1) // infinite query still holds it + + h2.release() + await new Promise((r) => setTimeout(r, 60)) + await flush() + expect(graph.entitiesInMemory.get('user')?.size ?? 0).toBe(0) + }) + + it('an in-flight mutation pins its entity against eviction until it settles', async () => { + let resolveMut!: (v: User) => void + const update = vi.fn(() => new Promise((r) => { resolveMut = r })) + const list = vi.fn(async () => ({ items: [{ id: '1', name: 'A', age: 1 }], nextCursor: null })) + const { runtime, defs, graph } = setup({ list, update }, { entityGc: true, defaultGcTime: 15 }) + + const handle = runtime.subscribeQuery(defs.usersList.name, defs.usersList.key({}), {}) + await flush() + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(1) + + // Start an optimistic mutation (pins user '1'), then drop the only query referencing it. + void runtime.mutate(defs.updateUser.name, { id: '1', patch: { name: 'X' } }) + await flush() + handle.release() + await new Promise((r) => setTimeout(r, 60)) + await flush() + expect(graph.entitiesInMemory.get('user')?.size).toBe(1) // pinned -> survived node GC + + resolveMut({ id: '1', name: 'X', age: 1 }) + await flush() + await flush() + expect(graph.entitiesInMemory.get('user')?.size ?? 0).toBe(0) // unpinned + unreferenced -> freed + }) +}) diff --git a/vue-sync-engine/lib/src/__tests__/mirror.test.ts b/vue-sync-engine/lib/src/__tests__/mirror.test.ts index c39ba1b..ecf7cc5 100644 --- a/vue-sync-engine/lib/src/__tests__/mirror.test.ts +++ b/vue-sync-engine/lib/src/__tests__/mirror.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest' import { effectScope, nextTick, watchEffect } from 'vue' import { createMirror } from '../tab/mirror' +import { entityKey } from '../core/queryKey' import { Op, Status } from '../core/flags' describe('mirror.applyEntityPatches', () => { @@ -20,22 +21,22 @@ describe('mirror.applyEntityPatches', () => { expect(m.getEntity('user', '1')).toBeUndefined() }) - it('triggers reactivity for all touched types', async () => { + it('triggers reactivity for every touched entity in a batch', async () => { const m = createMirror() - const seen = { user: 0, post: 0, tag: 0 } + const seen = { u1: 0, p1: 0, t1: 0 } const scope = effectScope() scope.run(() => { watchEffect(() => { - m.getEntity('user', 'noop') - seen.user++ + m.getEntity('user', '1') + seen.u1++ }) watchEffect(() => { - m.getEntity('post', 'noop') - seen.post++ + m.getEntity('post', 'p1') + seen.p1++ }) watchEffect(() => { - m.getEntity('tag', 'noop') - seen.tag++ + m.getEntity('tag', 't1') + seen.t1++ }) }) await nextTick() @@ -49,9 +50,59 @@ describe('mirror.applyEntityPatches', () => { ]) await nextTick() - expect(seen.user).toBeGreaterThan(before.user) - expect(seen.post).toBeGreaterThan(before.post) - expect(seen.tag).toBeGreaterThan(before.tag) + expect(seen.u1).toBeGreaterThan(before.u1) + expect(seen.p1).toBeGreaterThan(before.p1) + expect(seen.t1).toBeGreaterThan(before.t1) + scope.stop() + }) + + it('does NOT re-run readers of unaffected sibling entities (fine-grained)', async () => { + const m = createMirror() + let reads1 = 0 + let reads2 = 0 + const scope = effectScope() + scope.run(() => { + watchEffect(() => { + m.getEntity('user', '1') + reads1++ + }) + watchEffect(() => { + m.getEntity('user', '2') + reads2++ + }) + }) + await nextTick() + const before2 = reads2 + + // Mutating user/1 must not invalidate the reader of user/2. + m.applyEntityPatches([{ type: 'user', id: '1', patch: { op: Op.Set, path: [], value: { v: 1 } } }]) + await nextTick() + + expect(reads1).toBeGreaterThan(0) + expect(reads2).toBe(before2) + scope.stop() + }) + + it('triggers a reader that initially saw undefined when its entity is created', async () => { + const m = createMirror() + let value: unknown + let runs = 0 + const scope = effectScope() + scope.run(() => { + watchEffect(() => { + value = m.getEntity('user', 'late') + runs++ + }) + }) + await nextTick() + expect(value).toBeUndefined() + const before = runs + + m.applyEntityPatches([{ type: 'user', id: 'late', patch: { op: Op.Set, path: [], value: { id: 'late' } } }]) + await nextTick() + + expect(runs).toBeGreaterThan(before) + expect(value).toEqual({ id: 'late' }) scope.stop() }) @@ -60,6 +111,53 @@ describe('mirror.applyEntityPatches', () => { expect(() => m.applyEntityPatches([])).not.toThrow() }) + it('prunes the version ref when an entity is deleted', () => { + const m = createMirror() + // A read creates the per-entity version ref. + m.getEntity('user', '1') + m.applyEntityPatches([{ type: 'user', id: '1', patch: { op: Op.Set, path: [], value: { id: '1' } } }]) + expect(m.versions.has(entityKey('user', '1'))).toBe(true) + + m.applyEntityPatches([{ type: 'user', id: '1', patch: { op: Op.Delete, path: [] } }]) + expect(m.versions.has(entityKey('user', '1'))).toBe(false) + }) + + it('does not create version refs for entities that are written but never read', () => { + const m = createMirror() + m.applyEntityPatches([{ type: 'user', id: '99', patch: { op: Op.Set, path: [], value: { id: '99' } } }]) + expect(m.versions.has(entityKey('user', '99'))).toBe(false) + expect(m.getEntity('user', '99')).toEqual({ id: '99' }) + }) + + it('stays reactive after a delete prunes and the entity is re-created', async () => { + const m = createMirror() + let value: unknown + let runs = 0 + const scope = effectScope() + scope.run(() => { + watchEffect(() => { + value = m.getEntity('user', '1') + runs++ + }) + }) + await nextTick() + + m.applyEntityPatches([{ type: 'user', id: '1', patch: { op: Op.Set, path: [], value: { v: 1 } } }]) + await nextTick() + expect(value).toEqual({ v: 1 }) + + m.applyEntityPatches([{ type: 'user', id: '1', patch: { op: Op.Delete, path: [] } }]) + await nextTick() + expect(value).toBeUndefined() // reader re-ran and re-created its ref + + const after = runs + m.applyEntityPatches([{ type: 'user', id: '1', patch: { op: Op.Set, path: [], value: { v: 2 } } }]) + await nextTick() + expect(runs).toBeGreaterThan(after) // still reactive on the re-created entity + expect(value).toEqual({ v: 2 }) + scope.stop() + }) + it('handles non-root delete by merging the path', () => { const m = createMirror() m.applyEntityPatches([ @@ -105,3 +203,97 @@ describe('mirror.query state', () => { expect(r2.value.status).toBe(Status.Idle) }) }) + +describe('mirror LRU cap', () => { + const set = (id: string, value: unknown = { id }) => ({ + type: 'user', + id, + patch: { op: Op.Set, path: [] as never[], value }, + }) + + it('evicts the least-recently-used entity when over cap', () => { + const m = createMirror({ entityCap: 2 }) + m.applyEntityPatches([set('1')]) + m.applyEntityPatches([set('2')]) + m.applyEntityPatches([set('3')]) // overflow -> evict oldest ('1') + + expect(m.getEntity('user', '1')).toBeUndefined() + expect(m.getEntity('user', '2')).toEqual({ id: '2' }) + expect(m.getEntity('user', '3')).toEqual({ id: '3' }) + }) + + it('a read marks an entity recently-used so it survives eviction', () => { + const m = createMirror({ entityCap: 2 }) + m.applyEntityPatches([set('1')]) + m.applyEntityPatches([set('2')]) + m.getEntity('user', '1') // touch '1' -> now '2' is the LRU + m.applyEntityPatches([set('3')]) // evicts '2' + + expect(m.getEntity('user', '1')).toEqual({ id: '1' }) + expect(m.getEntity('user', '2')).toBeUndefined() + expect(m.getEntity('user', '3')).toEqual({ id: '3' }) + }) + + it('a write marks an entity recently-used so it survives eviction', () => { + const m = createMirror({ entityCap: 2 }) + m.applyEntityPatches([set('1')]) + m.applyEntityPatches([set('2')]) + m.applyEntityPatches([set('1', { id: '1', v: 2 })]) // touch '1' -> '2' is LRU + m.applyEntityPatches([set('3')]) // evicts '2' + + expect(m.getEntity('user', '1')).toEqual({ id: '1', v: 2 }) + expect(m.getEntity('user', '2')).toBeUndefined() + expect(m.getEntity('user', '3')).toEqual({ id: '3' }) + }) + + it('caps each type independently', () => { + const m = createMirror({ entityCap: 2 }) + m.applyEntityPatches([ + { type: 'user', id: 'u1', patch: { op: Op.Set, path: [], value: 1 } }, + { type: 'user', id: 'u2', patch: { op: Op.Set, path: [], value: 1 } }, + { type: 'user', id: 'u3', patch: { op: Op.Set, path: [], value: 1 } }, + { type: 'post', id: 'p1', patch: { op: Op.Set, path: [], value: 1 } }, + { type: 'post', id: 'p2', patch: { op: Op.Set, path: [], value: 1 } }, + ]) + expect(m.entities.get('user')!.size).toBe(2) + expect(m.entities.get('post')!.size).toBe(2) + expect(m.getEntity('user', 'u1')).toBeUndefined() // oldest user evicted + expect(m.getEntity('post', 'p1')).toBe(1) // posts within cap + }) + + it('cap of 0 (default) never evicts', () => { + const m = createMirror() + for (let i = 0; i < 100; i++) m.applyEntityPatches([set(String(i))]) + expect(m.entities.get('user')!.size).toBe(100) + }) + + it('eviction prunes the version ref of an unobserved entity', () => { + const m = createMirror({ entityCap: 1 }) + m.getEntity('user', '1') // create the version ref (no persistent reader) + m.applyEntityPatches([set('1')]) + expect(m.versions.has(entityKey('user', '1'))).toBe(true) + + m.applyEntityPatches([set('2')]) // evicts '1' and prunes its ref (nothing re-reads it) + expect(m.versions.has(entityKey('user', '1'))).toBe(false) + expect(m.entities.get('user')!.has('1')).toBe(false) + }) + + it('eviction re-runs a stale reader (reactivity preserved)', async () => { + const m = createMirror({ entityCap: 1 }) + let value: unknown + const scope = effectScope() + scope.run(() => { + watchEffect(() => { + value = m.getEntity('user', '1') + }) + }) + m.applyEntityPatches([set('1')]) + await nextTick() + expect(value).toEqual({ id: '1' }) + + m.applyEntityPatches([set('2')]) // evicts '1' + await nextTick() + expect(value).toBeUndefined() // reader re-ran on eviction + scope.stop() + }) +}) diff --git a/vue-sync-engine/lib/src/composables/useInfiniteQuery.ts b/vue-sync-engine/lib/src/composables/useInfiniteQuery.ts index 5a7d10c..147bfe8 100644 --- a/vue-sync-engine/lib/src/composables/useInfiniteQuery.ts +++ b/vue-sync-engine/lib/src/composables/useInfiniteQuery.ts @@ -1,4 +1,4 @@ -import { computed, onScopeDispose, watch, type ComputedRef, type MaybeRefOrGetter, toValue } from 'vue' +import { computed, onScopeDispose, shallowRef, watch, type ComputedRef, type MaybeRefOrGetter, toValue } from 'vue' import type { InfiniteQueryDef, QueryStatus } from '../core/types' import { Status } from '../core/flags' import { hashKey } from '../core/queryKey' @@ -26,7 +26,9 @@ export function useInfiniteQuery( const initial = toValue(args) let handle = engine.subscribeQuery(def.name, def.key(initial), initial) - let stateRef = engine.mirror.ensureQuery>(handle.subId) + // Track the active subId reactively and resolve via ensureQuery (see useQuery for rationale). + const subId = shallowRef(handle.subId) + const state = () => engine.mirror.ensureQuery>(subId.value).value if (!def.staticHash) { watch( @@ -35,7 +37,7 @@ export function useInfiniteQuery( const next = toValue(args) const prev = handle handle = engine.subscribeQuery(def.name, def.key(next), next) - stateRef = engine.mirror.ensureQuery>(handle.subId) + subId.value = handle.subId prev.release() }, ) @@ -44,11 +46,11 @@ export function useInfiniteQuery( onScopeDispose(() => handle.release()) return { - pages: computed(() => stateRef.value.data?.pages ?? []), - pageParams: computed(() => stateRef.value.data?.pageParams ?? []), - status: computed(() => stateRef.value.status), - error: computed(() => stateRef.value.error), - isLoading: computed(() => stateRef.value.status === Status.Pending), + pages: computed(() => state().data?.pages ?? []), + pageParams: computed(() => state().data?.pageParams ?? []), + status: computed(() => state().status), + error: computed(() => state().error), + isLoading: computed(() => state().status === Status.Pending), fetchNextPage: () => handle.fetchNextPage(), } } diff --git a/vue-sync-engine/lib/src/composables/useQuery.ts b/vue-sync-engine/lib/src/composables/useQuery.ts index ce91668..a03072f 100644 --- a/vue-sync-engine/lib/src/composables/useQuery.ts +++ b/vue-sync-engine/lib/src/composables/useQuery.ts @@ -1,4 +1,4 @@ -import { computed, onScopeDispose, watch, type ComputedRef, type MaybeRefOrGetter, toValue } from 'vue' +import { computed, onScopeDispose, shallowRef, watch, type ComputedRef, type MaybeRefOrGetter, toValue } from 'vue' import type { InfiniteQueryDef, QueryDef, QueryStatus } from '../core/types' import { Status } from '../core/flags' import { hashKey } from '../core/queryKey' @@ -21,7 +21,11 @@ export function useQuery( const initial = toValue(args) let currentHandle = engine.subscribeQuery(def.name, def.key(initial), initial) - let currentRef = engine.mirror.ensureQuery(currentHandle.subId) + // Track the active subId reactively (not the state ref itself — passing a ref into shallowRef + // unwraps it). Resolving through ensureQuery() inside each computed means the computed tracks + // both `subId` and the resolved ref, so it re-runs on both an args switch and a data update. + const subId = shallowRef(currentHandle.subId) + const state = () => engine.mirror.ensureQuery(subId.value).value if (!def.staticHash) { watch( @@ -30,7 +34,7 @@ export function useQuery( const next = toValue(args) const prev = currentHandle currentHandle = engine.subscribeQuery(def.name, def.key(next), next) - currentRef = engine.mirror.ensureQuery(currentHandle.subId) + subId.value = currentHandle.subId prev.release() }, ) @@ -39,11 +43,11 @@ export function useQuery( onScopeDispose(() => currentHandle.release()) return { - data: computed(() => currentRef.value.data), - status: computed(() => currentRef.value.status), - error: computed(() => currentRef.value.error), - isLoading: computed(() => currentRef.value.status === Status.Pending), - isSuccess: computed(() => currentRef.value.status === Status.Success), - isError: computed(() => currentRef.value.status === Status.Error), + data: computed(() => state().data), + status: computed(() => state().status), + error: computed(() => state().error), + isLoading: computed(() => state().status === Status.Pending), + isSuccess: computed(() => state().status === Status.Success), + isError: computed(() => state().status === Status.Error), } } diff --git a/vue-sync-engine/lib/src/core/types.ts b/vue-sync-engine/lib/src/core/types.ts index e66434f..6964f94 100644 --- a/vue-sync-engine/lib/src/core/types.ts +++ b/vue-sync-engine/lib/src/core/types.ts @@ -41,6 +41,8 @@ export interface InfiniteQueryDef TPageParam | null | undefined + /** Keep at most this many pages in memory; older pages are dropped as new ones load. 0/undefined = unlimited. */ + readonly maxPages?: number readonly fetch: (args: TArgs, ctx: FetchCtx & { pageParam: TPageParam }) => Promise readonly normalize?: (resp: TResp, args: TArgs, pageParam: TPageParam) => { entities?: Record>; result: TResult } readonly exec?: (args: TArgs, ctx: ExecCtx) => Promise @@ -85,6 +87,8 @@ export interface QuerySnapshot { error?: { message: string } updatedAt?: number entityRefs?: ReadonlyArray<{ type: string; id: EntityId }> + /** Per-page entityRef counts for infinite queries; lets page windowing survive hydration. */ + pageRefCounts?: ReadonlyArray } export interface QueuedMutation { diff --git a/vue-sync-engine/lib/src/createEngine.ts b/vue-sync-engine/lib/src/createEngine.ts index 10ef000..f079942 100644 --- a/vue-sync-engine/lib/src/createEngine.ts +++ b/vue-sync-engine/lib/src/createEngine.ts @@ -19,6 +19,10 @@ export interface WorkerBootstrapOptions { endpoint: ServerEndpoint defaultStaleTime?: number defaultGcTime?: number + /** Default page cap for infinite queries without their own `maxPages`. 0 = unlimited. */ + defaultMaxPages?: number + /** Reclaim worker-memory entities once no live query references them. Default false. */ + entityGc?: boolean } export function bootstrapWorker(opts: WorkerBootstrapOptions): void { @@ -33,16 +37,20 @@ export function bootstrapWorker(opts: WorkerBootstrapOptions): void { registry, defaultStaleTime: opts.defaultStaleTime, defaultGcTime: opts.defaultGcTime, + defaultMaxPages: opts.defaultMaxPages, + entityGc: opts.entityGc, }) } export interface TabEngineOptions { transport: Transport staleSubGcMs?: number + /** Max entities kept per type in the tab cache (LRU eviction). 0 = unlimited. */ + entityCap?: number } export function createTabEngine(opts: TabEngineOptions): TabRuntime { - const mirror = createMirror() + const mirror = createMirror({ entityCap: opts.entityCap }) return createTabRuntime({ transport: opts.transport, mirror, staleSubGcMs: opts.staleSubGcMs }) } @@ -53,6 +61,12 @@ export interface EngineOptions { storage?: StorageAdapter defaultStaleTime?: number defaultGcTime?: number + /** Default page cap for infinite queries without their own `maxPages`. 0 = unlimited. */ + defaultMaxPages?: number + /** Max entities kept per type in the tab cache (LRU eviction). 0 = unlimited. */ + entityCap?: number + /** Reclaim worker-memory entities once no live query references them. Default false. */ + entityGc?: boolean } export function createEngine(opts: EngineOptions): TabRuntime { @@ -66,8 +80,10 @@ export function createEngine(opts: EngineOptions): TabRuntime { endpoint: server, defaultStaleTime: opts.defaultStaleTime, defaultGcTime: opts.defaultGcTime, + defaultMaxPages: opts.defaultMaxPages, + entityGc: opts.entityGc, }) - return createTabEngine({ transport: client }) + return createTabEngine({ transport: client, entityCap: opts.entityCap }) } export interface InstallEngineOptions { diff --git a/vue-sync-engine/lib/src/tab/mirror.ts b/vue-sync-engine/lib/src/tab/mirror.ts index dbd77fb..118b177 100644 --- a/vue-sync-engine/lib/src/tab/mirror.ts +++ b/vue-sync-engine/lib/src/tab/mirror.ts @@ -2,6 +2,7 @@ import { shallowRef, triggerRef, type ShallowRef } from 'vue' import type { EntityId, EntityPatch, Patch, QueryStatus } from '../core/types' import { Op, Status } from '../core/flags' import { applyPatch } from '../core/patches' +import { entityKey } from '../core/queryKey' export interface QueryState { status: QueryStatus @@ -9,16 +10,29 @@ export interface QueryState { error: { message: string } | undefined } -export function createMirror() { +export interface MirrorOptions { + /** + * Max entities kept per type. When exceeded, the least-recently-used entity is evicted. + * 0 (default) = unlimited. Reads and writes bump recency; set this above your largest + * live working set so eviction only ever reclaims off-screen (orphaned) entities. + */ + entityCap?: number +} + +export function createMirror(opts?: MirrorOptions) { + const cap = opts?.entityCap ?? 0 const entities = new Map>() + // Per-entity version refs (keyed by `type id`) give fine-grained reactivity: + // a reader of one entity only re-runs when *that* entity changes, not when any + // sibling of the same type does. const versions = new Map>() const queries = new Map>() - function typeVersion(type: string): ShallowRef { - let v = versions.get(type) + function entityVersion(key: string): ShallowRef { + let v = versions.get(key) if (!v) { v = shallowRef(0) - versions.set(type, v) + versions.set(key, v) } return v } @@ -32,38 +46,97 @@ export function createMirror() { return b } + // Write a value, moving the key to the most-recently-used position when a cap is active. + // (Map.set on an existing key keeps its original position, so we delete first.) + function setVal(bucket: Map, id: EntityId, val: unknown): void { + if (cap !== 0) bucket.delete(id) + bucket.set(id, val) + } + function getEntity(type: string, id: EntityId): T | undefined { - typeVersion(type).value + // Lazily create + track this entity's version so that a later create/update/delete + // of exactly this entity re-runs the calling effect — even if it currently reads undefined. + entityVersion(entityKey(type, id)).value const b = entities.get(type) - return b === undefined ? undefined : (b.get(id) as T | undefined) + if (b === undefined) return undefined + const v = b.get(id) + if (cap !== 0 && v !== undefined) { + // LRU touch: a read marks the entity as recently used so it survives eviction. + b.delete(id) + b.set(id, v) + } + return v as T | undefined + } + + // Notify the entity's readers, then drop its version ref if the entity no longer exists. + // Readers re-run synchronously on the trigger, call getEntity, and lazily re-create a fresh + // ref (seeing undefined) — so pruning is safe and keeps `versions` from growing on churn. + // Refs for entities written-but-never-read are never created, so this is a no-op for them. + function bumpEntity(type: string, id: EntityId, key: string): void { + const v = versions.get(key) + if (v === undefined) return + triggerRef(v) + const b = entities.get(type) + if (b === undefined || !b.has(id)) versions.delete(key) + } + + // Evict least-recently-used entities (the front of the Map) until the type is within cap. + function evictOverflow(type: string): void { + const b = entities.get(type) + if (b === undefined || b.size <= cap) return + while (b.size > cap) { + const oldest = b.keys().next().value as EntityId + b.delete(oldest) + const key = entityKey(type, oldest) + const vref = versions.get(key) + if (vref !== undefined) { + triggerRef(vref) + versions.delete(key) + } + } } function applyEntityPatches(patches: EntityPatch[]): void { - if (patches.length === 0) return + const n = patches.length + if (n === 0) return + + // Fast path: a single patch (the common optimistic-update case) needs no Map. + if (n === 1) { + const p = patches[0] + const bucket = entityBucket(p.type) + const patch = p.patch + if (patch.op === Op.Delete && patch.path.length === 0) bucket.delete(p.id) + else setVal(bucket, p.id, applyPatch(bucket.get(p.id), patch)) + bumpEntity(p.type, p.id, entityKey(p.type, p.id)) + if (cap !== 0) evictOverflow(p.type) + return + } + let lastType = '' let bucket: Map | undefined - let touchedFirst: string | undefined - let touchedRest: Set | undefined - for (let i = 0; i < patches.length; i++) { + // Dedupe touched entities so one patched twice in a batch fires once; retain type+id + // so pruning can check current existence. + let touched: Map | undefined + let touchedTypes: Set | undefined + for (let i = 0; i < n; i++) { const p = patches[i] if (p.type !== lastType) { lastType = p.type bucket = entityBucket(lastType) - if (touchedFirst === undefined) touchedFirst = lastType - else if (lastType !== touchedFirst) { - if (touchedRest === undefined) touchedRest = new Set() - touchedRest.add(lastType) - } } const patch = p.patch if (patch.op === Op.Delete && patch.path.length === 0) { bucket!.delete(p.id) } else { - bucket!.set(p.id, applyPatch(bucket!.get(p.id), patch)) + setVal(bucket!, p.id, applyPatch(bucket!.get(p.id), patch)) } + const key = entityKey(p.type, p.id) + if (touched === undefined) touched = new Map() + if (!touched.has(key)) touched.set(key, { type: p.type, id: p.id }) + if (cap !== 0) (touchedTypes ??= new Set()).add(p.type) } - if (touchedFirst !== undefined) triggerRef(typeVersion(touchedFirst)) - if (touchedRest !== undefined) for (const t of touchedRest) triggerRef(typeVersion(t)) + if (touched !== undefined) for (const [key, { type, id }] of touched) bumpEntity(type, id, key) + if (touchedTypes !== undefined) for (const t of touchedTypes) evictOverflow(t) } function ensureQuery(subId: string): ShallowRef> { @@ -89,7 +162,7 @@ export function createMirror() { queries.delete(subId) } - return { entities, getEntity, applyEntityPatches, ensureQuery, applyQueryPatch, dropQuery } + return { entities, versions, getEntity, applyEntityPatches, ensureQuery, applyQueryPatch, dropQuery } } export type Mirror = ReturnType diff --git a/vue-sync-engine/lib/src/worker/mutationQueue.ts b/vue-sync-engine/lib/src/worker/mutationQueue.ts index 010d1fa..e75ea3f 100644 --- a/vue-sync-engine/lib/src/worker/mutationQueue.ts +++ b/vue-sync-engine/lib/src/worker/mutationQueue.ts @@ -9,6 +9,8 @@ export interface MutationQueueDeps { buildCtx: (forward: EntityPatch[], inverse: EntityPatch[]) => OptimisticCtx buildPostCtx: (post: EntityPatch[]) => OptimisticCtx invalidate: (def: MutationDef, input: unknown, resp: unknown) => void + pinEntities: (patches: ReadonlyArray) => void + unpinEntities: (patches: ReadonlyArray) => void isOnline: () => boolean onOnline: (cb: () => void) => () => void onResult: (mutId: string, ok: boolean, data?: unknown, error?: { message: string }) => void @@ -32,7 +34,9 @@ export function createMutationQueue(deps: MutationQueueDeps) { const persisted = await deps.storage.mutations.readAll() for (const m of persisted) { if (m.seq > seq) seq = m.seq - inflight.set(m.id, { queued: m, inverse: m.inversePatches ?? [] }) + const inverse = m.inversePatches ?? [] + inflight.set(m.id, { queued: m, inverse }) + deps.pinEntities(inverse) // protect restored optimistic entities until they settle } void drain() deps.onOnline(() => void drain()) @@ -52,6 +56,7 @@ export function createMutationQueue(deps: MutationQueueDeps) { if (def.optimistic) { def.optimistic(input, deps.buildCtx(forward, inverse)) if (forward.length) deps.emitEntityPatches(forward) + if (inverse.length) deps.pinEntities(inverse) // protect until the mutation settles } const queued: QueuedMutation = { @@ -105,6 +110,7 @@ export function createMutationQueue(deps: MutationQueueDeps) { deps.invalidate(def, entry.queued.input, resp) inflight.delete(entry.queued.id) await deps.storage.mutations.delete(entry.queued.id) + deps.unpinEntities(entry.inverse) deps.onResult(entry.queued.id, true, resp) } catch (err) { const networkLike = !deps.isOnline() || isNetworkError(err) @@ -124,6 +130,7 @@ export function createMutationQueue(deps: MutationQueueDeps) { } inflight.delete(entry.queued.id) await deps.storage.mutations.delete(entry.queued.id) + deps.unpinEntities(entry.inverse) deps.onResult(entry.queued.id, false, undefined, { message: (err as Error)?.message ?? String(err) }) } } diff --git a/vue-sync-engine/lib/src/worker/queryGraph.ts b/vue-sync-engine/lib/src/worker/queryGraph.ts index 04fb937..56dfb1e 100644 --- a/vue-sync-engine/lib/src/worker/queryGraph.ts +++ b/vue-sync-engine/lib/src/worker/queryGraph.ts @@ -1,7 +1,7 @@ import type { StorageAdapter } from '../adapters/storageAdapter' import type { EntityDef, EntityId, EntityPatch, InfiniteQueryDef, MutationDef, OptimisticCtx, QueryDef, QuerySnapshot, QueryStatus } from '../core/types' import { Op, Status, Msg, Kind } from '../core/flags' -import { hashKey } from '../core/queryKey' +import { hashKey, entityKey } from '../core/queryKey' import type { ServerEndpoint, ClientMsg } from '../transport/protocol' import { createMutationQueue } from './mutationQueue' import { DEV } from '../__dev' @@ -22,6 +22,9 @@ interface QueryNode { abort: AbortController | null gcTimer: ReturnType | null entityRefs: Array<{ type: string; id: EntityId }> + // Number of entityRefs contributed by each retained page (infinite queries only), + // so page windowing can drop the right slice of refs alongside the page. + pageRefCounts: number[] } interface Registry { @@ -36,6 +39,14 @@ export interface QueryGraphOptions { registry: Registry defaultStaleTime?: number defaultGcTime?: number + /** Default page cap for infinite queries that don't set their own `maxPages`. 0 = unlimited. */ + defaultMaxPages?: number + /** + * Reclaim worker-memory entities once no live query references them (and no in-flight + * mutation pins them). Uses exact reference counts from each node's entityRefs, so it + * only ever frees provably-orphaned entities. Default false. + */ + entityGc?: boolean isOnline?: () => boolean onOnline?: (cb: () => void) => () => void } @@ -44,6 +55,11 @@ export function createQueryGraph(opts: QueryGraphOptions) { const { storage, endpoint, registry } = opts const defaultStaleTime = opts.defaultStaleTime ?? 30_000 const defaultGcTime = opts.defaultGcTime ?? 5 * 60_000 + const defaultMaxPages = opts.defaultMaxPages ?? 0 + // Entity GC bookkeeping. When disabled, the maps are null and every retain/release/pin + // call short-circuits on the first line — zero overhead on the hot fetch path. + const entityRefCount = opts.entityGc ? new Map() : null + const entityPins = opts.entityGc ? new Map() : null const isOnline = opts.isOnline ?? (() => (typeof navigator !== 'undefined' ? navigator.onLine : true)) const onOnline = opts.onOnline ?? @@ -71,6 +87,71 @@ export function createQueryGraph(opts: QueryGraphOptions) { return entityBucket(type).get(id) } + function evictEntity(type: string, id: EntityId): void { + const b = entitiesInMemory.get(type) + if (b !== undefined) b.delete(id) + } + + type Ref = { type: string; id: EntityId } + + // Increment the query-reference count for each ref. Always called before releaseRefs so an + // entity present in both the old and new ref sets never transiently drops to 0. + function retainRefs(refs: ReadonlyArray): void { + if (entityRefCount === null) return + for (let i = 0; i < refs.length; i++) { + const k = entityKey(refs[i].type, refs[i].id) + entityRefCount.set(k, (entityRefCount.get(k) ?? 0) + 1) + } + } + + // Decrement counts; an entity that reaches 0 references and isn't pinned is freed immediately. + function releaseRefs(refs: ReadonlyArray): void { + if (entityRefCount === null) return + for (let i = 0; i < refs.length; i++) { + const r = refs[i] + const k = entityKey(r.type, r.id) + const c = (entityRefCount.get(k) ?? 0) - 1 + if (c <= 0) { + entityRefCount.delete(k) + if (entityPins === null || !entityPins.has(k)) evictEntity(r.type, r.id) + } else { + entityRefCount.set(k, c) + } + } + } + + // Atomically swap a node's referenced entities, retaining the new set before releasing the old. + function setNodeRefs(node: QueryNode, newRefs: Ref[]): void { + retainRefs(newRefs) + releaseRefs(node.entityRefs) + node.entityRefs = newRefs + } + + // Pins protect entities touched by an in-flight mutation from eviction until it settles. + function pinEntities(patches: ReadonlyArray<{ type: string; id: EntityId }>): void { + if (entityPins === null) return + for (let i = 0; i < patches.length; i++) { + const p = patches[i] + const k = entityKey(p.type, p.id) + entityPins.set(k, (entityPins.get(k) ?? 0) + 1) + } + } + + function unpinEntities(patches: ReadonlyArray<{ type: string; id: EntityId }>): void { + if (entityPins === null) return + for (let i = 0; i < patches.length; i++) { + const p = patches[i] + const k = entityKey(p.type, p.id) + const c = (entityPins.get(k) ?? 0) - 1 + if (c <= 0) { + entityPins.delete(k) + if (entityRefCount === null || !entityRefCount.has(k)) evictEntity(p.type, p.id) + } else { + entityPins.set(k, c) + } + } + } + function emitEntityPatches(patches: EntityPatch[]): Promise { if (patches.length === 0) return Promise.resolve() const writesByType = new Map>() @@ -142,6 +223,7 @@ export function createQueryGraph(opts: QueryGraphOptions) { abort: null, gcTimer: null, entityRefs: [], + pageRefCounts: [], } nodes.set(key, node) } else if (node.gcTimer !== null) { @@ -156,6 +238,8 @@ export function createQueryGraph(opts: QueryGraphOptions) { const gc = node.def.gcTime ?? defaultGcTime node.gcTimer = setTimeout(() => { if (node.subscribers.size === 0) { + releaseRefs(node.entityRefs) // free entities this node was the last to reference + node.entityRefs = [] nodes.delete(node.key) void storage.queries.delete(node.key) } @@ -187,7 +271,8 @@ export function createQueryGraph(opts: QueryGraphOptions) { return } if (patches.length > 0) endpoint.broadcast({ type: Msg.EntityPatch, patches }) - node.entityRefs = stored.entityRefs.slice() + setNodeRefs(node, stored.entityRefs.slice()) // node started empty; retains restored refs + if (stored.pageRefCounts) node.pageRefCounts = stored.pageRefCounts.slice() } node.result = stored.result node.status = Status.Success @@ -289,14 +374,43 @@ export function createQueryGraph(opts: QueryGraphOptions) { }) if (entities !== null) await emitEntityPatches(ingestEntities(entities, pageRefs)) if (isInfinite) { + const maxPages = (node.def as InfiniteQueryDef).maxPages ?? defaultMaxPages const prev = (node.result as { pages: unknown[]; pageParams: unknown[] } | undefined) ?? { pages: [], pageParams: [] } - node.result = append - ? { pages: [...prev.pages, pageResult], pageParams: [...prev.pageParams, effectivePageParam] } - : { pages: [pageResult], pageParams: [effectivePageParam] } - node.entityRefs = append ? node.entityRefs.concat(pageRefs) : pageRefs + if (append) { + // Incremental retain: only the new page's entities, never re-touching the window — + // keeps append O(page) rather than O(total) for long infinite lists. + retainRefs(pageRefs) + const pages = [...prev.pages, pageResult] + const pageParams = [...prev.pageParams, effectivePageParam] + let refs = node.entityRefs.concat(pageRefs) + // Counts stay aligned with pages unless they drifted (e.g. a hydrated snapshot + // without per-page counts). When aligned we can drop the exact ref slice. + const counts = node.pageRefCounts.length === prev.pages.length ? node.pageRefCounts.concat(pageRefs.length) : null + if (maxPages && pages.length > maxPages) { + const dropN = pages.length - maxPages + pages.splice(0, dropN) + pageParams.splice(0, dropN) + if (counts) { + let dropRefs = 0 + for (let i = 0; i < dropN; i++) dropRefs += counts[i] + counts.splice(0, dropN) + if (dropRefs > 0) { + releaseRefs(refs.slice(0, dropRefs)) + refs = refs.slice(dropRefs) + } + } + } + node.result = { pages, pageParams } + node.entityRefs = refs + node.pageRefCounts = counts ?? [] + } else { + node.result = { pages: [pageResult], pageParams: [effectivePageParam] } + setNodeRefs(node, pageRefs) + node.pageRefCounts = [pageRefs.length] + } } else { node.result = pageResult - node.entityRefs = pageRefs + setNodeRefs(node, pageRefs) } node.status = Status.Success node.updatedAt = Date.now() @@ -305,6 +419,7 @@ export function createQueryGraph(opts: QueryGraphOptions) { result: node.result, updatedAt: node.updatedAt, entityRefs: node.entityRefs, + pageRefCounts: isInfinite ? node.pageRefCounts : undefined, } await storage.queries.write([{ key: node.key, value: snap }]) pushSnapshotToSubscribers(node) @@ -463,6 +578,8 @@ export function createQueryGraph(opts: QueryGraphOptions) { buildCtx, buildPostCtx, invalidate, + pinEntities, + unpinEntities, isOnline, onOnline, onResult: (mutId, ok, data, error) => @@ -478,7 +595,7 @@ export function createQueryGraph(opts: QueryGraphOptions) { else if (msg.type === Msg.FetchNextPage) fetchNextPage(msg.subId) }) - return { nodes, subscribe, unsubscribe, fetchNextPage, queue } + return { nodes, entitiesInMemory, subscribe, unsubscribe, fetchNextPage, queue } } function shallowEqual(a: Record, b: Record): boolean {