diff --git a/src/paperless_ai/tests/test_vector_store.py b/src/paperless_ai/tests/test_vector_store.py index 9779848f5..6710e1502 100644 --- a/src/paperless_ai/tests/test_vector_store.py +++ b/src/paperless_ai/tests/test_vector_store.py @@ -1,5 +1,6 @@ import json from pathlib import Path +from typing import Any import pytest import pytest_mock @@ -581,3 +582,129 @@ class TestMigrationRegistry: store = self._store_at_version(uri, 1) assert store.requires_reembed_migration() is True + + +class TestApplyStructuralMigrations: + @pytest.fixture + def uri(self, tmp_path: Path) -> str: + return str(tmp_path / "idx") + + def _store_at_version(self, uri: str, version: int) -> PaperlessLanceVectorStore: + store = PaperlessLanceVectorStore(uri=uri) + store.add([_node("1-0", "1", "text", 0.1)]) + store._write_schema_version(version) + return PaperlessLanceVectorStore(uri=uri) + + def test_apply_structural_adds_column_via_lancedb( + self, + uri: str, + mocker: pytest_mock.MockerFixture, + ) -> None: + from paperless_ai.vector_store import Migration + + def _add_extra(table: Any) -> None: + table.add_columns({"extra": "CAST(NULL AS string)"}) + + m2 = Migration( + version=2, + description="add extra col", + requires_reembed=False, + apply=_add_extra, + ) + mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2]) + + store = self._store_at_version(uri, 1) + applied = store.apply_structural_migrations() + + assert len(applied) == 1 + assert applied[0] == m2 + # Column actually present in the table schema after reopen. + reopened = PaperlessLanceVectorStore(uri=uri) + field_names = [f.name for f in reopened._table.schema] + assert "extra" in field_names + + def test_apply_structural_updates_version_file( + self, + uri: str, + mocker: pytest_mock.MockerFixture, + ) -> None: + from paperless_ai.vector_store import Migration + + m2 = Migration( + version=2, + description="add col", + requires_reembed=False, + apply=lambda t: t.add_columns({"c": "CAST(NULL AS string)"}), + ) + mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2]) + + store = self._store_at_version(uri, 1) + store.apply_structural_migrations() + + assert store.stored_schema_version() == 2 + + def test_apply_structural_skips_reembed_migrations( + self, + uri: str, + mocker: pytest_mock.MockerFixture, + ) -> None: + from paperless_ai.vector_store import Migration + + applied_versions: list[int] = [] + m2 = Migration( + version=2, + description="structural", + requires_reembed=False, + apply=lambda t: ( + applied_versions.append(2) + or t.add_columns({"c": "CAST(NULL AS string)"}) + ), + ) + m3 = Migration( + version=3, + description="reembed", + requires_reembed=True, + apply=lambda t: applied_versions.append(3), + ) + mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2, m3]) + + store = self._store_at_version(uri, 1) + applied = store.apply_structural_migrations() + + assert [m.version for m in applied] == [2] + assert 3 not in applied_versions + # Version advances only to the last structural migration applied. + assert store.stored_schema_version() == 2 + + def test_apply_structural_noop_at_current_version(self, uri: str) -> None: + store = self._store_at_version(uri, 1) + applied = store.apply_structural_migrations() + assert applied == [] + + def test_apply_structural_noop_when_no_table(self, uri: str) -> None: + store = PaperlessLanceVectorStore(uri=uri) + applied = store.apply_structural_migrations() + assert applied == [] + + def test_apply_structural_refreshes_table_reference( + self, + uri: str, + mocker: pytest_mock.MockerFixture, + ) -> None: + """After add_columns the in-memory table object must reflect the new schema.""" + from paperless_ai.vector_store import Migration + + m2 = Migration( + version=2, + description="add col", + requires_reembed=False, + apply=lambda t: t.add_columns({"extra": "CAST(NULL AS string)"}), + ) + mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2]) + + store = self._store_at_version(uri, 1) + store.apply_structural_migrations() + + # The store's own _table reference (not a re-open) must see the new column. + field_names = [f.name for f in store._table.schema] + assert "extra" in field_names diff --git a/src/paperless_ai/vector_store.py b/src/paperless_ai/vector_store.py index cbcaa9548..fbefaaf8d 100644 --- a/src/paperless_ai/vector_store.py +++ b/src/paperless_ai/vector_store.py @@ -175,6 +175,35 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore): """True when any pending migration requires a full re-embedding.""" return any(m.requires_reembed for m in self.pending_migrations()) + def apply_structural_migrations(self) -> list[Migration]: + """Apply all pending structural (non-reembed) migrations in version order. + + Each applied migration's ``apply`` callable receives the live LanceDB table + object and should call ``add_columns``, ``alter_columns``, or ``drop_columns`` + as needed. After all structural migrations run, the version file is updated + to the highest version applied and the in-memory table reference is refreshed. + + Migrations with ``requires_reembed=True`` are skipped — the caller is + responsible for detecting them via ``requires_reembed_migration()`` and + triggering a full rebuild. + """ + if self._table is None: + return [] + structural = [m for m in self.pending_migrations() if not m.requires_reembed] + if not structural: + return [] + for migration in structural: + logger.info( + "Applying schema migration v%d: %s", + migration.version, + migration.description, + ) + migration.apply(self._table) + # Refresh the in-memory table so subsequent operations see the new schema. + self._table = self._conn.open_table(self._table_name) + self._write_schema_version(structural[-1].version) + return structural + def config_mismatch(self, model_name: str) -> bool: """True when the stored model name differs from ``model_name``.