From e3ebe7cda19bd884063d7ea18a239b9f50394342 Mon Sep 17 00:00:00 2001 From: stumpylog <797416+stumpylog@users.noreply.github.com> Date: Fri, 5 Jun 2026 13:06:44 -0700 Subject: [PATCH] Abstracts the modified check into the vector store --- src/paperless_ai/indexing.py | 27 +----------- src/paperless_ai/tests/test_vector_store.py | 46 +++++++++++++++++++++ src/paperless_ai/vector_store.py | 18 ++++++++ 3 files changed, 65 insertions(+), 26 deletions(-) diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index 20c790ff5..d8c14c1ac 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -1,4 +1,3 @@ -import json import logging from collections.abc import Iterable from contextlib import contextmanager @@ -193,30 +192,6 @@ def get_rag_prompt_helper( ) -def _stored_modified_times(store: "PaperlessLanceVectorStore") -> dict[str, str]: - """Return {document_id: stored_modified_isoformat} for incremental update. - - One representative chunk per document is enough to read the stored - ``modified`` timestamp (all chunks for a document share the same value). - Only the two scalar columns needed for comparison are fetched; the vector - column is excluded to avoid unnecessary deserialization. - """ - if not store.table_exists(): - return {} - result: dict[str, str] = {} - for row in ( - store.client.open_table(LLM_INDEX_TABLE) - .search() - .select(["document_id", "node_content"]) - .to_list() - ): - doc_id = str(row["document_id"]) - if doc_id not in result: - meta = json.loads(row["node_content"]) - result[doc_id] = meta.get("modified", "") - return result - - def get_llm_index_compaction_retention() -> int: """Seconds of MVCC version history to keep during compaction.""" return 60 * 60 # 1 hour: safe for in-flight readers, reclaims daily @@ -260,7 +235,7 @@ def update_llm_index( store.add(nodes) msg = "LLM index rebuilt successfully." else: - existing = _stored_modified_times(store) + existing = store.get_modified_times() changed = 0 for document in iter_wrapper(documents): doc_id = str(document.id) diff --git a/src/paperless_ai/tests/test_vector_store.py b/src/paperless_ai/tests/test_vector_store.py index 641cafc57..9ac7907a2 100644 --- a/src/paperless_ai/tests/test_vector_store.py +++ b/src/paperless_ai/tests/test_vector_store.py @@ -318,3 +318,49 @@ class TestPaperlessLanceVectorStoreMaintenance: store: PaperlessLanceVectorStore, ) -> None: store.ensure_document_id_scalar_index() # no table yet — must not raise + + +class TestGetModifiedTimes: + @pytest.fixture + def store(self, tmp_path: Path) -> PaperlessLanceVectorStore: + return PaperlessLanceVectorStore(uri=str(tmp_path / "idx")) + + def _node_with_modified( + self, + node_id: str, + doc_id: str, + modified: str, + ) -> TextNode: + node = TextNode( + id_=node_id, + text="text", + metadata={"document_id": doc_id, "modified": modified}, + ) + node.embedding = [0.1] * DIM + node.relationships = { + NodeRelationship.SOURCE: RelatedNodeInfo(node_id=doc_id), + } + return node + + def test_empty_store_returns_empty_dict( + self, + store: PaperlessLanceVectorStore, + ) -> None: + assert store.get_modified_times() == {} + + def test_returns_one_entry_per_document( + self, + store: PaperlessLanceVectorStore, + ) -> None: + store.add( + [ + self._node_with_modified("1-0", "1", "2024-01-01T00:00:00"), + self._node_with_modified("1-1", "1", "2024-01-01T00:00:00"), + self._node_with_modified("2-0", "2", "2024-06-01T00:00:00"), + ], + ) + result = store.get_modified_times() + assert result == { + "1": "2024-01-01T00:00:00", + "2": "2024-06-01T00:00:00", + } diff --git a/src/paperless_ai/vector_store.py b/src/paperless_ai/vector_store.py index 7ab6a4dbf..85e99d584 100644 --- a/src/paperless_ai/vector_store.py +++ b/src/paperless_ai/vector_store.py @@ -274,6 +274,24 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore): except Exception as e: # pragma: no cover - depends on data/dim logger.warning("Skipping ANN index creation: %s", e) + def get_modified_times(self) -> dict[str, str]: + """Return {document_id: stored_modified_isoformat} for all indexed documents. + + One representative chunk per document is fetched; all chunks share the + same ``modified`` value so the first one seen is sufficient. + """ + if self._table is None: + return {} + result: dict[str, str] = {} + for row in ( + self._table.search().select(["document_id", "node_content"]).to_list() + ): + doc_id = str(row["document_id"]) + if doc_id not in result: + meta = json.loads(row["node_content"]) + result[doc_id] = meta.get("modified", "") + return result + def ensure_document_id_scalar_index(self) -> None: """Create a scalar index on the filter column (never on the merge key ``id`` — see https://github.com/lancedb/lancedb/issues/3177).