fix(ai): fix four code review findings in schema migration feature

- apply_structural_migrations: stop collecting at first reembed boundary
  so the version counter never jumps past a pending reembed migration
- stored_schema_version: return 0 on missing file so pre-versioning tables
  correctly pick up all pending migrations on next update_llm_index
- Move migration integration tests into TestUpdateLlmIndexSchemaMigrations class
- Convert bare unittest.mock.patch context managers to mocker.patch in new tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
stumpylog
2026-06-09 10:11:41 -07:00
parent dcac0cb473
commit 1fb5747724
3 changed files with 85 additions and 90 deletions
+63 -64
View File
@@ -1,5 +1,6 @@
import json
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
@@ -216,84 +217,82 @@ def test_update_llm_index_rebuilds_on_model_name_change(
@pytest.mark.django_db
def test_update_llm_index_applies_structural_migration_without_rebuild(
temp_llm_index_dir: Path,
real_document: Document,
mock_embed_model: FakeEmbedding,
mocker: pytest_mock.MockerFixture,
) -> None:
"""Structural migrations are applied in-place; no full rebuild (drop) occurs."""
column_added: list[bool] = []
class TestUpdateLlmIndexSchemaMigrations:
def test_applies_structural_migration_without_rebuild(
self,
temp_llm_index_dir: Path,
real_document: Document,
mock_embed_model: FakeEmbedding,
mocker: pytest_mock.MockerFixture,
) -> None:
"""Structural migrations are applied in-place; no full rebuild (drop) occurs."""
column_added: list[bool] = []
def _add_extra(table) -> None:
table.add_columns({"extra": "CAST(NULL AS string)"})
column_added.append(True)
def _add_extra(table: Any) -> None:
table.add_columns({"extra": "CAST(NULL AS string)"})
column_added.append(True)
# Build the initial index at version 1 (the real CURRENT_SCHEMA_VERSION; no patches needed).
with patch("documents.models.Document.objects.all") as mock_all:
mock_queryset = MagicMock()
mock_queryset.exists.return_value = True
mock_queryset.__iter__.return_value = iter([real_document])
mock_all.return_value = mock_queryset
def _make_queryset() -> MagicMock:
qs = MagicMock()
qs.exists.return_value = True
qs.__iter__.return_value = iter([real_document])
return qs
mocker.patch(
"documents.models.Document.objects.all",
side_effect=_make_queryset,
)
indexing.update_llm_index(rebuild=True)
# Simulate a new v2 structural migration being introduced after the initial index was built.
m2 = Migration(
version=2,
description="add extra col",
requires_reembed=False,
apply=_add_extra,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
mocker.patch("paperless_ai.vector_store.CURRENT_SCHEMA_VERSION", 2)
drop_spy = mocker.spy(PaperlessLanceVectorStore, "drop_table")
m2 = Migration(
version=2,
description="add extra col",
requires_reembed=False,
apply=_add_extra,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
mocker.patch("paperless_ai.vector_store.CURRENT_SCHEMA_VERSION", 2)
drop_spy = mocker.spy(PaperlessLanceVectorStore, "drop_table")
with patch("documents.models.Document.objects.all") as mock_all:
mock_queryset = MagicMock()
mock_queryset.exists.return_value = True
mock_queryset.__iter__.return_value = iter([real_document])
mock_all.return_value = mock_queryset
indexing.update_llm_index(rebuild=False)
assert column_added, "Structural migration apply() was not called"
drop_spy.assert_not_called()
assert column_added, "Structural migration apply() was not called"
drop_spy.assert_not_called()
def test_forces_rebuild_on_reembed_migration(
self,
temp_llm_index_dir: Path,
real_document: Document,
mock_embed_model: FakeEmbedding,
mocker: pytest_mock.MockerFixture,
) -> None:
"""A pending reembed migration causes a full drop+rebuild on next update."""
@pytest.mark.django_db
def test_update_llm_index_forces_rebuild_on_reembed_migration(
temp_llm_index_dir: Path,
real_document: Document,
mock_embed_model: FakeEmbedding,
mocker: pytest_mock.MockerFixture,
) -> None:
"""A pending reembed migration causes a full drop+rebuild on next update."""
# Build the initial index at version 1 (the real CURRENT_SCHEMA_VERSION; no patches needed).
with patch("documents.models.Document.objects.all") as mock_all:
mock_queryset = MagicMock()
mock_queryset.exists.return_value = True
mock_queryset.__iter__.return_value = iter([real_document])
mock_all.return_value = mock_queryset
def _make_queryset() -> MagicMock:
qs = MagicMock()
qs.exists.return_value = True
qs.__iter__.return_value = iter([real_document])
return qs
mocker.patch(
"documents.models.Document.objects.all",
side_effect=_make_queryset,
)
indexing.update_llm_index(rebuild=True)
# Simulate a reembed migration at v2 being introduced after the initial index was built.
m2 = Migration(
version=2,
description="requires reembed",
requires_reembed=True,
apply=lambda t: None,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
mocker.patch("paperless_ai.vector_store.CURRENT_SCHEMA_VERSION", 2)
drop_spy = mocker.spy(PaperlessLanceVectorStore, "drop_table")
m2 = Migration(
version=2,
description="requires reembed",
requires_reembed=True,
apply=lambda t: None,
)
mocker.patch("paperless_ai.vector_store.MIGRATIONS", [m2])
mocker.patch("paperless_ai.vector_store.CURRENT_SCHEMA_VERSION", 2)
drop_spy = mocker.spy(PaperlessLanceVectorStore, "drop_table")
with patch("documents.models.Document.objects.all") as mock_all:
mock_queryset = MagicMock()
mock_queryset.exists.return_value = True
mock_queryset.__iter__.return_value = iter([real_document])
mock_all.return_value = mock_queryset
indexing.update_llm_index(rebuild=False)
drop_spy.assert_called()
drop_spy.assert_called()
@pytest.mark.django_db
+2 -2
View File
@@ -436,7 +436,7 @@ class TestSchemaVersioning:
assert version_file.exists()
assert json.loads(version_file.read_text())["version"] == CURRENT_SCHEMA_VERSION
def test_stored_schema_version_returns_current_when_file_missing(
def test_stored_schema_version_returns_zero_when_file_missing(
self,
uri: str,
) -> None:
@@ -446,7 +446,7 @@ class TestSchemaVersioning:
(Path(uri) / "schema_version.json").unlink()
reopened = PaperlessLanceVectorStore(uri=uri)
assert reopened.stored_schema_version() == CURRENT_SCHEMA_VERSION
assert reopened.stored_schema_version() == 0
def test_stored_schema_version_persists_after_reopen(self, uri: str) -> None:
+20 -24
View File
@@ -35,17 +35,10 @@ class Migration:
apply: Callable[[Any], None] = field(compare=False, hash=False)
# Ordered list of schema migrations. Each entry upgrades the table to `version`.
# Structural migrations (requires_reembed=False) are applied in-place via LanceDB's
# add_columns/alter_columns/drop_columns APIs — no re-embedding needed.
# Migrations with requires_reembed=True cause a full rebuild on next index update,
# exactly like a model-name change does today.
#
# To add a migration:
# 1. Increment CURRENT_SCHEMA_VERSION.
# 2. Append a Migration entry here with the new version number.
# 3. For structural changes, call table.add_columns/alter_columns/drop_columns in apply().
# 4. For embedding-invalidating changes, set requires_reembed=True; apply() can be a no-op.
# Ordered schema migrations, each upgrading the table to its `version`. To add one,
# bump CURRENT_SCHEMA_VERSION and append a Migration. Structural migrations
# (requires_reembed=False) run in-place via the table's add/alter/drop_columns in
# apply(); requires_reembed=True forces a full rebuild and apply() can be a no-op.
MIGRATIONS: list[Migration] = []
# Below this many chunks, brute-force exact search is fast enough that building
@@ -150,15 +143,16 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
return Path(self._uri) / "schema_version.json"
def stored_schema_version(self) -> int:
"""Return the schema version recorded on disk, or CURRENT_SCHEMA_VERSION if missing.
"""Return the schema version recorded on disk, or 0 if the file is missing.
Missing means either the table predates versioning or was just created and the
write hasn't happened yet — treat conservatively as already current.
0 causes all registered migrations to be treated as pending, which is safe
for tables that predate versioning. Fresh tables always have the file written
by _ensure_table, so they never fall through to this default.
"""
try:
return int(json.loads(self._schema_version_path.read_text())["version"])
except (FileNotFoundError, KeyError, ValueError):
return CURRENT_SCHEMA_VERSION
return 0
def _write_schema_version(self, version: int) -> None:
self._schema_version_path.parent.mkdir(parents=True, exist_ok=True)
@@ -178,18 +172,20 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
def apply_structural_migrations(self) -> list[Migration]:
"""Apply all pending structural (non-reembed) migrations in version order.
Each applied migration's ``apply`` callable receives the live LanceDB table
object and should call ``add_columns``, ``alter_columns``, or ``drop_columns``
as needed. After all structural migrations run, the version file is updated
to the highest version applied and the in-memory table reference is refreshed.
Migrations with ``requires_reembed=True`` are skipped — the caller is
responsible for detecting them via ``requires_reembed_migration()`` and
triggering a full rebuild.
Each migration's ``apply`` callable receives the live LanceDB table and runs
in-place. Migrations with ``requires_reembed=True`` are skipped — the caller
detects them via ``requires_reembed_migration()`` and rebuilds instead.
"""
if self._table is None:
return []
structural = [m for m in self.pending_migrations() if not m.requires_reembed]
# Stop at the first reembed boundary so the version counter never jumps past
# a pending reembed migration, which would cause requires_reembed_migration()
# to return False and silently skip the rebuild.
structural: list[Migration] = []
for m in self.pending_migrations():
if m.requires_reembed:
break
structural.append(m)
if not structural:
return []
for migration in structural: