feat(ai): implement apply_structural_migrations for in-place schema changes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
stumpylog
2026-06-09 09:33:12 -07:00
parent aadf16e391
commit da2e1a6f96
2 changed files with 156 additions and 0 deletions
+127
View File
@@ -1,5 +1,6 @@
import json
from pathlib import Path
from typing import Any
import pytest
import pytest_mock
@@ -581,3 +582,129 @@ class TestMigrationRegistry:
store = self._store_at_version(uri, 1)
assert store.requires_reembed_migration() is True
class TestApplyStructuralMigrations:
@pytest.fixture
def uri(self, tmp_path: Path) -> str:
return str(tmp_path / "idx")
def _store_at_version(self, uri: str, version: int) -> PaperlessLanceVectorStore:
store = PaperlessLanceVectorStore(uri=uri)
store.add([_node("1-0", "1", "text", 0.1)])
store._write_schema_version(version)
return PaperlessLanceVectorStore(uri=uri)
def test_apply_structural_adds_column_via_lancedb(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
def _add_extra(table: Any) -> None:
table.add_columns({"extra": "CAST(NULL AS string)"})
m2 = Migration(
version=2,
description="add extra col",
requires_reembed=False,
apply=_add_extra,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
store = self._store_at_version(uri, 1)
applied = store.apply_structural_migrations()
assert len(applied) == 1
assert applied[0] == m2
# Column actually present in the table schema after reopen.
reopened = PaperlessLanceVectorStore(uri=uri)
field_names = [f.name for f in reopened._table.schema]
assert "extra" in field_names
def test_apply_structural_updates_version_file(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
m2 = Migration(
version=2,
description="add col",
requires_reembed=False,
apply=lambda t: t.add_columns({"c": "CAST(NULL AS string)"}),
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
store = self._store_at_version(uri, 1)
store.apply_structural_migrations()
assert store.stored_schema_version() == 2
def test_apply_structural_skips_reembed_migrations(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
from paperless_ai.vector_store import Migration
applied_versions: list[int] = []
m2 = Migration(
version=2,
description="structural",
requires_reembed=False,
apply=lambda t: (
applied_versions.append(2)
or t.add_columns({"c": "CAST(NULL AS string)"})
),
)
m3 = Migration(
version=3,
description="reembed",
requires_reembed=True,
apply=lambda t: applied_versions.append(3),
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2, m3])
store = self._store_at_version(uri, 1)
applied = store.apply_structural_migrations()
assert [m.version for m in applied] == [2]
assert 3 not in applied_versions
# Version advances only to the last structural migration applied.
assert store.stored_schema_version() == 2
def test_apply_structural_noop_at_current_version(self, uri: str) -> None:
store = self._store_at_version(uri, 1)
applied = store.apply_structural_migrations()
assert applied == []
def test_apply_structural_noop_when_no_table(self, uri: str) -> None:
store = PaperlessLanceVectorStore(uri=uri)
applied = store.apply_structural_migrations()
assert applied == []
def test_apply_structural_refreshes_table_reference(
self,
uri: str,
mocker: pytest_mock.MockerFixture,
) -> None:
"""After add_columns the in-memory table object must reflect the new schema."""
from paperless_ai.vector_store import Migration
m2 = Migration(
version=2,
description="add col",
requires_reembed=False,
apply=lambda t: t.add_columns({"extra": "CAST(NULL AS string)"}),
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
store = self._store_at_version(uri, 1)
store.apply_structural_migrations()
# The store's own _table reference (not a re-open) must see the new column.
field_names = [f.name for f in store._table.schema]
assert "extra" in field_names
+29
View File
@@ -175,6 +175,35 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
"""True when any pending migration requires a full re-embedding."""
return any(m.requires_reembed for m in self.pending_migrations())
def apply_structural_migrations(self) -> list[Migration]:
"""Apply all pending structural (non-reembed) migrations in version order.
Each applied migration's ``apply`` callable receives the live LanceDB table
object and should call ``add_columns``, ``alter_columns``, or ``drop_columns``
as needed. After all structural migrations run, the version file is updated
to the highest version applied and the in-memory table reference is refreshed.
Migrations with ``requires_reembed=True`` are skipped — the caller is
responsible for detecting them via ``requires_reembed_migration()`` and
triggering a full rebuild.
"""
if self._table is None:
return []
structural = [m for m in self.pending_migrations() if not m.requires_reembed]
if not structural:
return []
for migration in structural:
logger.info(
"Applying schema migration v%d: %s",
migration.version,
migration.description,
)
migration.apply(self._table)
# Refresh the in-memory table so subsequent operations see the new schema.
self._table = self._conn.open_table(self._table_name)
self._write_schema_version(structural[-1].version)
return structural
def config_mismatch(self, model_name: str) -> bool:
"""True when the stored model name differs from ``model_name``.