class TestMigrationRegistry:
    """Tests for ``pending_migrations`` and ``requires_reembed_migration``.

    Tests that need pending migrations replace the module-level ``MIGRATIONS``
    registry via ``mocker.patch``; the remaining tests open the store at
    ``CURRENT_SCHEMA_VERSION`` (or with no table at all), so none of them
    depend on which real migrations happen to be registered.
    """

    @pytest.fixture
    def uri(self, tmp_path: Path) -> str:
        """Return an index location inside pytest's per-test temp directory."""
        return str(tmp_path / "idx")

    @staticmethod
    def _migration(version: int, *, requires_reembed: bool):
        """Build a no-op ``Migration`` that upgrades the table to ``version``."""
        # Imported lazily, like the other vector_store names used in this class.
        from paperless_ai.vector_store import Migration

        return Migration(
            version=version,
            description="reindex" if requires_reembed else "add col",
            requires_reembed=requires_reembed,
            apply=lambda t: None,
        )

    def _store_at_version(self, uri: str, version: int) -> PaperlessLanceVectorStore:
        """Create a store with a table and then fake its on-disk version."""
        store = PaperlessLanceVectorStore(uri=uri)
        store.add([_node("1-0", "1", "text", 0.1)])
        store._write_schema_version(version)
        return PaperlessLanceVectorStore(uri=uri)  # reopen to pick up written version

    def test_pending_migrations_empty_at_current_version(self, uri: str) -> None:
        from paperless_ai.vector_store import CURRENT_SCHEMA_VERSION

        store = self._store_at_version(uri, CURRENT_SCHEMA_VERSION)
        assert store.pending_migrations() == []

    def test_pending_migrations_returns_migrations_above_stored_version(
        self,
        uri: str,
        mocker: pytest_mock.MockerFixture,
    ) -> None:
        m2 = self._migration(2, requires_reembed=False)
        m3 = self._migration(3, requires_reembed=True)
        mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2, m3])

        store = self._store_at_version(uri, 1)
        assert store.pending_migrations() == [m2, m3]

    def test_pending_migrations_excludes_already_applied(
        self,
        uri: str,
        mocker: pytest_mock.MockerFixture,
    ) -> None:
        m2 = self._migration(2, requires_reembed=False)
        m3 = self._migration(3, requires_reembed=True)
        mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2, m3])

        store = self._store_at_version(uri, 2)
        assert store.pending_migrations() == [m3]

    def test_pending_migrations_empty_when_no_table(self, uri: str) -> None:
        store = PaperlessLanceVectorStore(uri=uri)
        assert store.pending_migrations() == []

    def test_requires_reembed_migration_false_when_none_pending(self, uri: str) -> None:
        from paperless_ai.vector_store import CURRENT_SCHEMA_VERSION

        # Use the current version rather than a literal so this keeps passing
        # once real migrations are appended to the registry.
        store = self._store_at_version(uri, CURRENT_SCHEMA_VERSION)
        assert store.requires_reembed_migration() is False

    def test_requires_reembed_migration_false_when_only_structural_pending(
        self,
        uri: str,
        mocker: pytest_mock.MockerFixture,
    ) -> None:
        m2 = self._migration(2, requires_reembed=False)
        mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])

        store = self._store_at_version(uri, 1)
        assert store.requires_reembed_migration() is False

    def test_requires_reembed_migration_true_when_reembed_migration_pending(
        self,
        uri: str,
        mocker: pytest_mock.MockerFixture,
    ) -> None:
        m2 = self._migration(2, requires_reembed=True)
        mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])

        store = self._store_at_version(uri, 1)
        assert store.requires_reembed_migration() is True
@dataclass(frozen=True)
class Migration:
    """One step in the vector-store schema history.

    Instances are immutable. Equality and hashing consider ``version``,
    ``description`` and ``requires_reembed`` only; ``apply`` is excluded, so
    two entries that differ only in their callable compare equal.
    """

    # Schema version the table is at once this migration has been applied.
    version: int
    # Short human-readable summary of the change.
    description: str
    # True when the change invalidates existing embeddings.
    requires_reembed: bool
    # Callable that performs the change; it is handed the table to modify.
    # compare=False also keeps it out of the generated __hash__, so no
    # explicit hash= argument is needed.
    apply: Callable[[Any], None] = field(compare=False)


# Ordered list of schema migrations. Each entry upgrades the table to `version`.
# Structural migrations (requires_reembed=False) are applied in-place via LanceDB's
# add_columns/alter_columns/drop_columns APIs — no re-embedding needed.
# Migrations with requires_reembed=True cause a full rebuild on next index update,
# exactly like a model-name change does today.
#
# To add a migration:
#   1. Increment CURRENT_SCHEMA_VERSION.
#   2. Append a Migration entry here with the new version number.
#   3. For structural changes, call table.add_columns/alter_columns/drop_columns in apply().
#   4. For embedding-invalidating changes, set requires_reembed=True; apply() can be a no-op.
MIGRATIONS: Final[list[Migration]] = []
def pending_migrations(self) -> list[Migration]:
    """Return migrations not yet applied to this table, in version order.

    A migration is pending when its ``version`` is greater than the value
    returned by ``stored_schema_version()``. A store with no table has
    nothing to migrate, so the result is empty in that case.
    """
    if self._table is None:
        return []
    current = self.stored_schema_version()
    # Sort explicitly so the documented ordering holds even if a registry
    # entry was added out of sequence.
    return sorted(
        (m for m in MIGRATIONS if m.version > current),
        key=lambda m: m.version,
    )


def requires_reembed_migration(self) -> bool:
    """True when any pending migration requires a full re-embedding."""
    return any(m.requires_reembed for m in self.pending_migrations())