feat(ai): add Migration registry and pending migration detection

This commit is contained in:
stumpylog
2026-06-09 09:24:15 -07:00
parent b0d09b0e2c
commit aadf16e391
2 changed files with 151 additions and 0 deletions
+115
View File
@@ -2,6 +2,7 @@ import json
from pathlib import Path
import pytest
import pytest_mock
from llama_index.core.schema import NodeRelationship
from llama_index.core.schema import RelatedNodeInfo
from llama_index.core.schema import TextNode
@@ -466,3 +467,117 @@ class TestSchemaVersioning:
version_file = Path(uri) / "schema_version.json"
assert json.loads(version_file.read_text())["version"] == CURRENT_SCHEMA_VERSION
class TestMigrationRegistry:
@pytest.fixture
def uri(self, tmp_path: Path) -> str:
return str(tmp_path / "idx")
def _store_at_version(self, uri: str, version: int) -> PaperlessLanceVectorStore:
"""Create a store with a table and then fake its on-disk version."""
store = PaperlessLanceVectorStore(uri=uri)
store.add([_node("1-0", "1", "text", 0.1)])
store._write_schema_version(version)
return PaperlessLanceVectorStore(uri=uri) # reopen to pick up written version
def test_pending_migrations_empty_at_current_version(self, uri: str) -> None:
from paperless_ai.vector_store import CURRENT_SCHEMA_VERSION
from paperless_ai.vector_store import Migration # noqa: F401
store = self._store_at_version(uri, CURRENT_SCHEMA_VERSION)
assert store.pending_migrations() == []
def test_pending_migrations_returns_migrations_above_stored_version(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
m2 = Migration(
version=2,
description="add col",
requires_reembed=False,
apply=lambda t: None,
)
m3 = Migration(
version=3,
description="reindex",
requires_reembed=True,
apply=lambda t: None,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2, m3])
store = self._store_at_version(uri, 1)
pending = store.pending_migrations()
assert pending == [m2, m3]
def test_pending_migrations_excludes_already_applied(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
m2 = Migration(
version=2,
description="add col",
requires_reembed=False,
apply=lambda t: None,
)
m3 = Migration(
version=3,
description="reindex",
requires_reembed=True,
apply=lambda t: None,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2, m3])
store = self._store_at_version(uri, 2)
pending = store.pending_migrations()
assert pending == [m3]
def test_pending_migrations_empty_when_no_table(self, uri: str) -> None:
store = PaperlessLanceVectorStore(uri=uri)
assert store.pending_migrations() == []
def test_requires_reembed_migration_false_when_none_pending(self, uri: str) -> None:
store = self._store_at_version(uri, 1)
assert store.requires_reembed_migration() is False
def test_requires_reembed_migration_false_when_only_structural_pending(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
m2 = Migration(
version=2,
description="add col",
requires_reembed=False,
apply=lambda t: None,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
store = self._store_at_version(uri, 1)
assert store.requires_reembed_migration() is False
def test_requires_reembed_migration_true_when_reembed_migration_pending(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
m2 = Migration(
version=2,
description="reindex",
requires_reembed=True,
apply=lambda t: None,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
store = self._store_at_version(uri, 1)
assert store.requires_reembed_migration() is True
+36
View File
@@ -1,6 +1,9 @@
import json
import logging
from collections.abc import Callable
from collections.abc import Sequence
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import Any
from typing import Final
@@ -23,6 +26,28 @@ logger = logging.getLogger("paperless_ai.vector_store")
DEFAULT_TABLE_NAME: Final = "documents"
CURRENT_SCHEMA_VERSION: Final[int] = 1
@dataclass(frozen=True)
class Migration:
version: int
description: str
requires_reembed: bool
apply: Callable[[Any], None] = field(compare=False, hash=False)
# Ordered list of schema migrations. Each entry upgrades the table to `version`.
# Structural migrations (requires_reembed=False) are applied in-place via LanceDB's
# add_columns/alter_columns/drop_columns APIs — no re-embedding needed.
# Migrations with requires_reembed=True cause a full rebuild on next index update,
# exactly like a model-name change does today.
#
# To add a migration:
# 1. Increment CURRENT_SCHEMA_VERSION.
# 2. Append a Migration entry here with the new version number.
# 3. For structural changes, call table.add_columns/alter_columns/drop_columns in apply().
# 4. For embedding-invalidating changes, set requires_reembed=True; apply() can be a no-op.
MIGRATIONS: list[Migration] = []
# Below this many chunks, LanceDB's exact (brute-force) search is sufficient and
# faster than building an ANN index (per LanceDB guidance, ~100K vectors).
ANN_INDEX_MIN_ROWS = 100_000
@@ -139,6 +164,17 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
self._schema_version_path.parent.mkdir(parents=True, exist_ok=True)
self._schema_version_path.write_text(json.dumps({"version": version}))
def pending_migrations(self) -> list[Migration]:
"""Return migrations not yet applied to this table, in version order."""
if self._table is None:
return []
current = self.stored_schema_version()
return [m for m in MIGRATIONS if m.version > current]
def requires_reembed_migration(self) -> bool:
"""True when any pending migration requires a full re-embedding."""
return any(m.requires_reembed for m in self.pending_migrations())
def config_mismatch(self, model_name: str) -> bool:
"""True when the stored model name differs from ``model_name``.