Abstracts the modified check into the vector store

This commit is contained in:
stumpylog
2026-06-05 13:06:44 -07:00
parent 8533b99adf
commit e3ebe7cda1
3 changed files with 65 additions and 26 deletions
+1 -26
View File
@@ -1,4 +1,3 @@
import json
import logging
from collections.abc import Iterable
from contextlib import contextmanager
@@ -193,30 +192,6 @@ def get_rag_prompt_helper(
)
def _stored_modified_times(store: "PaperlessLanceVectorStore") -> dict[str, str]:
    """Return {document_id: stored_modified_isoformat} for incremental update.

    Every chunk row is read from the table, but only the first row seen for
    each document is parsed: all chunks for a document share the same
    ``modified`` timestamp, so one representative chunk is enough. Only the
    ``document_id`` and ``node_content`` columns are selected; the vector
    column is excluded to avoid unnecessary deserialization.

    Args:
        store: Vector store whose table holds the indexed chunks.

    Returns:
        Mapping of stringified document id to the ``modified`` value found in
        that document's JSON-decoded ``node_content``, or ``""`` when the key
        is absent. Empty when the table does not exist.
    """
    if not store.table_exists():
        return {}

    rows = (
        store.client.open_table(LLM_INDEX_TABLE)
        .search()
        .select(["document_id", "node_content"])
        .to_list()
    )
    result: dict[str, str] = {}
    for row in rows:
        doc_id = str(row["document_id"])
        if doc_id not in result:
            # Decode the JSON payload only for the first chunk of each document.
            meta = json.loads(row["node_content"])
            result[doc_id] = meta.get("modified", "")
    return result
def get_llm_index_compaction_retention() -> int:
    """Return the seconds of MVCC version history to keep during compaction."""
    # 1 hour: safe for in-flight readers, reclaims daily
    return 60 * 60
@@ -260,7 +235,7 @@ def update_llm_index(
store.add(nodes)
msg = "LLM index rebuilt successfully."
else:
existing = _stored_modified_times(store)
existing = store.get_modified_times()
changed = 0
for document in iter_wrapper(documents):
doc_id = str(document.id)
@@ -318,3 +318,49 @@ class TestPaperlessLanceVectorStoreMaintenance:
store: PaperlessLanceVectorStore,
) -> None:
store.ensure_document_id_scalar_index() # no table yet — must not raise
class TestGetModifiedTimes:
    """Tests for ``PaperlessLanceVectorStore.get_modified_times``."""

    @pytest.fixture
    def store(self, tmp_path: Path) -> PaperlessLanceVectorStore:
        """Provide a store rooted in the per-test temporary directory."""
        return PaperlessLanceVectorStore(uri=str(tmp_path / "idx"))

    def _node_with_modified(
        self,
        node_id: str,
        doc_id: str,
        modified: str,
    ) -> TextNode:
        """Build an embedded node whose metadata carries ``modified``."""
        node = TextNode(
            id_=node_id,
            text="text",
            metadata={"document_id": doc_id, "modified": modified},
        )
        node.embedding = [0.1] * DIM
        node.relationships = {
            NodeRelationship.SOURCE: RelatedNodeInfo(node_id=doc_id),
        }
        return node

    def test_empty_store_returns_empty_dict(
        self,
        store: PaperlessLanceVectorStore,
    ) -> None:
        """A store that has had nothing added reports no documents."""
        assert store.get_modified_times() == {}

    def test_returns_one_entry_per_document(
        self,
        store: PaperlessLanceVectorStore,
    ) -> None:
        """Several chunks of one document collapse to a single entry."""
        store.add(
            [
                self._node_with_modified("1-0", "1", "2024-01-01T00:00:00"),
                self._node_with_modified("1-1", "1", "2024-01-01T00:00:00"),
                self._node_with_modified("2-0", "2", "2024-06-01T00:00:00"),
            ],
        )
        result = store.get_modified_times()
        assert result == {
            "1": "2024-01-01T00:00:00",
            "2": "2024-06-01T00:00:00",
        }
+18
View File
@@ -274,6 +274,24 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
except Exception as e: # pragma: no cover - depends on data/dim
logger.warning("Skipping ANN index creation: %s", e)
def get_modified_times(self) -> dict[str, str]:
    """Return {document_id: stored_modified_isoformat} for all indexed documents.

    Every chunk row is read (``document_id`` and ``node_content`` columns
    only), but ``node_content`` is parsed just once per document: all chunks
    share the same ``modified`` value, so the first one seen is sufficient.

    Returns:
        Mapping of stringified document id to the ``modified`` value found in
        the JSON-decoded ``node_content``, or ``""`` when that key is absent.
        Empty when ``self._table`` is ``None``.
    """
    if self._table is None:
        return {}

    rows = self._table.search().select(["document_id", "node_content"]).to_list()
    result: dict[str, str] = {}
    for row in rows:
        doc_id = str(row["document_id"])
        if doc_id not in result:
            # Later chunks of an already-seen document are skipped unparsed.
            meta = json.loads(row["node_content"])
            result[doc_id] = meta.get("modified", "")
    return result
def ensure_document_id_scalar_index(self) -> None:
"""Create a scalar index on the filter column (never on the merge key
``id`` — see https://github.com/lancedb/lancedb/issues/3177).