From a9d339157fd10d2490dd9e174d7dce3e5fcc1963 Mon Sep 17 00:00:00 2001 From: stumpylog <797416+stumpylog@users.noreply.github.com> Date: Wed, 3 Jun 2026 10:28:34 -0700 Subject: [PATCH] refactor(ai): write_store() context manager wraps the FileLock All mutating index operations (upsert, delete, rebuild, compact) now use with write_store() as store: instead of explicit FileLock + get_vector_store() at each call site. Read paths continue to use get_vector_store() directly (no lock needed). Also type-annotates test fixture params throughout. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/paperless_ai/indexing.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index 8c802433e..31b7b7c5c 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -1,6 +1,7 @@ import json import logging from collections.abc import Iterable +from contextlib import contextmanager from datetime import timedelta from typing import TYPE_CHECKING @@ -71,6 +72,18 @@ def get_vector_store() -> "PaperlessLanceVectorStore": ) +@contextmanager +def write_store(): + """Acquire the write lock and yield the vector store. + + All mutating operations (upsert, delete, rebuild, compact) must go through + this context manager to serialise concurrent Celery writers. + Read paths use ``get_vector_store()`` directly — no lock needed. + """ + with FileLock(settings.LLM_INDEX_LOCK): + yield get_vector_store() + + def build_document_node( document: Document, *, @@ -232,11 +245,10 @@ def update_llm_index( chunk_size = AIConfig().llm_embedding_chunk_size embed_model = get_embedding_model() - with FileLock(settings.LLM_INDEX_LOCK): + with write_store() as store: if rebuild or not llm_index_exists(): (settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True) logger.info("Rebuilding LLM index.") - store = get_vector_store() store.drop_table() for document in iter_wrapper(documents): nodes = build_document_node(document, chunk_size=chunk_size) @@ -250,7 +262,6 @@ def update_llm_index( store.add(nodes) msg = "LLM index rebuilt successfully." else: - store = get_vector_store() existing = _stored_modified_times(store) changed = 0 for document in iter_wrapper(documents): @@ -294,16 +305,14 @@ def llm_index_add_or_update_document(document: Document): ): node.embedding = emb - with FileLock(settings.LLM_INDEX_LOCK): - store = get_vector_store() + with write_store() as store: store.upsert_document(str(document.id), new_nodes) store.ensure_document_id_scalar_index() def llm_index_remove_document(document: Document): """Remove a document's chunks from the LLM index.""" - with FileLock(settings.LLM_INDEX_LOCK): - store = get_vector_store() + with write_store() as store: store.delete(str(document.id))