refactor(ai): write_store() context manager wraps the FileLock

All mutating index operations (upsert, delete, rebuild, compact) now use
  with write_store() as store:
instead of explicit FileLock + get_vector_store() at each call site.
Read paths continue to use get_vector_store() directly (no lock needed).
Also type-annotates test fixture params throughout.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
stumpylog
2026-06-03 10:28:34 -07:00
parent 2c3c892dae
commit a9d339157f
+16 -7
View File
@@ -1,6 +1,7 @@
import json
import logging
from collections.abc import Iterable
from contextlib import contextmanager
from datetime import timedelta
from typing import TYPE_CHECKING
@@ -71,6 +72,18 @@ def get_vector_store() -> "PaperlessLanceVectorStore":
)
@contextmanager
def write_store():
"""Acquire the write lock and yield the vector store.
All mutating operations (upsert, delete, rebuild, compact) must go through
this context manager to serialise concurrent Celery writers.
Read paths use ``get_vector_store()`` directly — no lock needed.
"""
with FileLock(settings.LLM_INDEX_LOCK):
yield get_vector_store()
def build_document_node(
document: Document,
*,
@@ -232,11 +245,10 @@ def update_llm_index(
chunk_size = AIConfig().llm_embedding_chunk_size
embed_model = get_embedding_model()
with FileLock(settings.LLM_INDEX_LOCK):
with write_store() as store:
if rebuild or not llm_index_exists():
(settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True)
logger.info("Rebuilding LLM index.")
store = get_vector_store()
store.drop_table()
for document in iter_wrapper(documents):
nodes = build_document_node(document, chunk_size=chunk_size)
@@ -250,7 +262,6 @@ def update_llm_index(
store.add(nodes)
msg = "LLM index rebuilt successfully."
else:
store = get_vector_store()
existing = _stored_modified_times(store)
changed = 0
for document in iter_wrapper(documents):
@@ -294,16 +305,14 @@ def llm_index_add_or_update_document(document: Document):
):
node.embedding = emb
with FileLock(settings.LLM_INDEX_LOCK):
store = get_vector_store()
with write_store() as store:
store.upsert_document(str(document.id), new_nodes)
store.ensure_document_id_scalar_index()
def llm_index_remove_document(document: Document):
"""Remove a document's chunks from the LLM index."""
with FileLock(settings.LLM_INDEX_LOCK):
store = get_vector_store()
with write_store() as store:
store.delete(str(document.id))