diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index 8c802433e..31b7b7c5c 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -1,6 +1,7 @@ import json import logging from collections.abc import Iterable +from contextlib import contextmanager from datetime import timedelta from typing import TYPE_CHECKING @@ -71,6 +72,18 @@ def get_vector_store() -> "PaperlessLanceVectorStore": ) +@contextmanager +def write_store(): + """Acquire the write lock and yield the vector store. + + All mutating operations (upsert, delete, rebuild, compact) must go through + this context manager to serialise concurrent Celery writers. + Read paths use ``get_vector_store()`` directly — no lock needed. + """ + with FileLock(settings.LLM_INDEX_LOCK): + yield get_vector_store() + + def build_document_node( document: Document, *, @@ -232,11 +245,10 @@ def update_llm_index( chunk_size = AIConfig().llm_embedding_chunk_size embed_model = get_embedding_model() - with FileLock(settings.LLM_INDEX_LOCK): + with write_store() as store: if rebuild or not llm_index_exists(): (settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True) logger.info("Rebuilding LLM index.") - store = get_vector_store() store.drop_table() for document in iter_wrapper(documents): nodes = build_document_node(document, chunk_size=chunk_size) @@ -250,7 +262,6 @@ def update_llm_index( store.add(nodes) msg = "LLM index rebuilt successfully." else: - store = get_vector_store() existing = _stored_modified_times(store) changed = 0 for document in iter_wrapper(documents): @@ -294,16 +305,14 @@ def llm_index_add_or_update_document(document: Document): ): node.embedding = emb - with FileLock(settings.LLM_INDEX_LOCK): - store = get_vector_store() + with write_store() as store: store.upsert_document(str(document.id), new_nodes) store.ensure_document_id_scalar_index() def llm_index_remove_document(document: Document): """Remove a document's chunks from the LLM index.""" - with FileLock(settings.LLM_INDEX_LOCK): - store = get_vector_store() + with write_store() as store: store.delete(str(document.id))