From 98dc1911941354906dd8bfdcf03f5caf0b0d4e12 Mon Sep 17 00:00:00 2001 From: Trenton H <797416+stumpylog@users.noreply.github.com> Date: Tue, 2 Jun 2026 10:46:29 -0700 Subject: [PATCH] Fix: Lock AI index during reading and don't index documents many times during a bulk update (#12899) * Fix: Move LLM index lock outside index dir and skip per-doc tasks on bulk update Two concurrency bugs from #12893: [P1] Lock file lived inside LLM_INDEX_DIR. A rebuild calls shutil.rmtree(LLM_INDEX_DIR), deleting the lock while a worker still held it. A second worker then acquired a fresh lock on the new path and ran concurrently, defeating serialisation. Move the lock to DATA_DIR/locks/llm_index.lock (a new settings constant LLM_INDEX_LOCK) so rmtree cannot touch it. The locks/ dir is created at settings load time, matching the existing pattern for LOGGING_DIR. [P2] document_updated was connected to add_or_update_document_in_llm_index in apps.py. bulk_update_documents() emits document_updated for every document in the batch, queuing N per-document LLM tasks, and then also calls update_llm_index(rebuild=False) once at the end. Pass skip_ai_index=True when sending document_updated from the bulk path so the handler skips the per-document enqueue; the existing batch call at the end of bulk_update_documents is the only LLM update for that path. Co-Authored-By: Claude Sonnet 4.6 * Fix: ghost vectors leave KeyError-prone nodes_dict entries after deletion docstore.delete_document() removes a node from the docstore but leaves its entry in index_struct.nodes_dict (the FAISS positional-id to node-UUID map). A subsequent similarity query resolves the ghost position to the deleted UUID, finds nothing in fetched_nodes_by_id, and raises KeyError inside _insert_fetched_nodes_into_query_result. Purge stale nodes_dict entries after each docstore deletion and re-sync the mutated index_struct into the kvstore so persist() writes the updated mapping. Dead FAISS vectors remain in the flat index until the next full rebuild (IndexFlatL2 is append-only); add a try/except KeyError around retriever.retrieve() as a defensive fallback for any residual ghost positions. Co-Authored-By: Claude Sonnet 4.6 * Fix: acquire index lock in query_similar_documents query_similar_documents() loaded the index and ran the FAISS retriever without holding the file lock. All write paths (update_llm_index, llm_index_add_or_update_document, llm_index_remove_document) hold FileLock(_index_lock_path()), so a concurrent rebuild calling shutil.rmtree(LLM_INDEX_DIR) while a read is mid-load produces an IOError or corrupt partial state. Wrap the load_or_build_index() call and all subsequent retriever work inside FileLock. The early-return guards (vector_store_file_exists check, empty allowed_document_ids) remain outside the lock; the DB query for the final result set also stays outside. Co-Authored-By: Claude Sonnet 4.6 * Fix: skip LLM index enqueue on document_updated during version addition When a document is consumed as a new version of an existing document, the consumer fires document_consumption_finished (which triggers add_or_update_document_in_llm_index) and then document_updated for the root document. Both signals are connected to the same handler, so the root document was enqueued for LLM indexing twice per version-addition event. Pass skip_ai_index=True on the consumer's version-addition document_updated send so the handler's existing guard suppresses the duplicate enqueue. Co-Authored-By: Claude Sonnet 4.6 * Test: bulk_update_documents must not enqueue per-doc LLM tasks With AI enabled, bulk_update_documents() sends document_updated for every document in the batch. The skip_ai_index=True kwarg (added in the P2 fix) prevents add_or_update_document_in_llm_index from enqueuing a per-document task for each one. Only the single update_llm_index call at the end should run. Co-Authored-By: Claude Sonnet 4.6 * Debug level log sure * Update src/paperless_ai/indexing.py Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com> * Apply suggestion from @shamoon --------- Co-authored-by: Claude Sonnet 4.6 Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com> --- src/documents/consumer.py | 1 + src/documents/signals/handlers.py | 2 + src/documents/tasks.py | 1 + src/documents/tests/test_tasks.py | 27 +++++++ src/paperless/settings/__init__.py | 2 + src/paperless_ai/indexing.py | 84 ++++++++++++------- src/paperless_ai/tests/test_ai_indexing.py | 93 ++++++++++++++++++++++ 7 files changed, 181 insertions(+), 29 deletions(-) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 390ce3e66..f050c7416 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -732,6 +732,7 @@ class ConsumerPlugin( document_updated.send( sender=self.__class__, document=document.root_document, + skip_ai_index=True, # document_consumption_finished already enqueues the LLM update ) # Delete the file only if it was successfully consumed diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index 6f7628f5c..a34a3acf9 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -1344,6 +1344,8 @@ def add_or_update_document_in_llm_index(sender, document, **kwargs): """ Add or update a document in the LLM index when it is created or updated. """ + if kwargs.get("skip_ai_index"): + return ai_config = AIConfig() if ai_config.llm_index_enabled: from documents.tasks import update_document_in_llm_index diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 3005319b0..7961af381 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -319,6 +319,7 @@ def bulk_update_documents(document_ids) -> None: sender=None, document=doc, logging_group=uuid.uuid4(), + skip_ai_index=True, # bulk path calls update_llm_index once below ) post_save.send(Document, instance=doc, created=False) diff --git a/src/documents/tests/test_tasks.py b/src/documents/tests/test_tasks.py index fb06f7d60..ca57219ad 100644 --- a/src/documents/tests/test_tasks.py +++ b/src/documents/tests/test_tasks.py @@ -377,3 +377,30 @@ class TestAIIndex(DirectoriesMixin, TestCase): ) as llm_index_remove_document: tasks.remove_document_from_llm_index(doc) llm_index_remove_document.assert_called_once_with(doc) + + @override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="huggingface") + def test_bulk_update_does_not_enqueue_per_doc_llm_tasks(self) -> None: + """bulk_update_documents must not enqueue a per-document LLM task for each document. + + The bulk path calls update_llm_index once at the end; per-doc tasks would + be redundant work amplification. + """ + docs = [ + Document.objects.create( + title=f"doc{i}", + content="content", + checksum=f"checksum{i}", + ) + for i in range(3) + ] + with ( + mock.patch( + "documents.tasks.update_document_in_llm_index", + ) as update_document_in_llm_index, + mock.patch( + "documents.tasks.update_llm_index", + ) as update_llm_index, + ): + tasks.bulk_update_documents([doc.pk for doc in docs]) + self.assertEqual(update_document_in_llm_index.apply_async.call_count, 0) + update_llm_index.assert_called_once() diff --git a/src/paperless/settings/__init__.py b/src/paperless/settings/__init__.py index df3011eb2..8e47611c7 100644 --- a/src/paperless/settings/__init__.py +++ b/src/paperless/settings/__init__.py @@ -97,6 +97,8 @@ MODEL_FILE = get_path_from_env( DATA_DIR / "classification_model.pickle", ) LLM_INDEX_DIR = DATA_DIR / "llm_index" +LLM_INDEX_LOCK = DATA_DIR / "locks" / "llm_index.lock" +(DATA_DIR / "locks").mkdir(parents=True, exist_ok=True) LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log") diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index 33061d383..7ec1fdba3 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -31,8 +31,13 @@ RAG_CHUNK_OVERLAP = 200 def _index_lock_path() -> Path: - """Return the path used as the file lock for FAISS index mutations.""" - return settings.LLM_INDEX_DIR / "index.lock" + """Return the path used as the file lock for FAISS index mutations. + + The lock file lives in DATA_DIR/locks/ (not inside LLM_INDEX_DIR) so that a + rebuild — which calls shutil.rmtree(LLM_INDEX_DIR) — cannot delete the lock + while another worker still holds it. + """ + return settings.LLM_INDEX_LOCK def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool: @@ -193,6 +198,15 @@ def remove_document_docstore_nodes(document: Document, index: "VectorStoreIndex" for node_id in existing_nodes: # Delete from docstore, FAISS IndexFlatL2 are append-only index.docstore.delete_document(node_id) + # Also purge the FAISS position -> UUID mapping so subsequent similarity + # queries don't raise KeyError on ghost vector positions. + stale_keys = [ + k for k, v in index.index_struct.nodes_dict.items() if v == node_id + ] + for key in stale_keys: + del index.index_struct.nodes_dict[key] + # Re-sync the mutated index_struct so persist() writes the updated nodes_dict. + index.storage_context.index_store.add_index_struct(index.index_struct) def vector_store_file_exists(): @@ -300,7 +314,7 @@ def update_llm_index( # Delete from docstore, FAISS IndexFlatL2 are append-only for node in doc_nodes: - index.docstore.delete_document(node.node_id) + remove_document_docstore_nodes(document, index) nodes.extend(build_document_node(document, chunk_size=chunk_size)) @@ -410,36 +424,48 @@ def query_similar_documents( ) return [] - index = load_or_build_index() + with FileLock(_index_lock_path()): + index = load_or_build_index() - # constrain only the node(s) that match the document IDs, if given - doc_node_ids = ( - [ - node.node_id - for node in index.docstore.docs.values() - if node.metadata.get("document_id") in allowed_document_ids - ] - if allowed_document_ids is not None - else None - ) - if doc_node_ids is not None and not doc_node_ids: - return [] + # constrain only the node(s) that match the document IDs, if given + doc_node_ids = ( + [ + node.node_id + for node in index.docstore.docs.values() + if node.metadata.get("document_id") in allowed_document_ids + ] + if allowed_document_ids is not None + else None + ) + if doc_node_ids is not None and not doc_node_ids: + return [] - from llama_index.core.retrievers import VectorIndexRetriever + from llama_index.core.retrievers import VectorIndexRetriever - retriever = VectorIndexRetriever( - index=index, - similarity_top_k=top_k, - doc_ids=doc_node_ids, - ) + retriever = VectorIndexRetriever( + index=index, + similarity_top_k=top_k, + doc_ids=doc_node_ids, + ) - config = AIConfig() - query_text = truncate_content( - (document.title or "") + "\n" + (document.content or ""), - chunk_size=config.llm_embedding_chunk_size, - context_size=config.llm_context_size, - ) - results = retriever.retrieve(query_text) + config = AIConfig() + query_text = truncate_content( + (document.title or "") + "\n" + (document.content or ""), + chunk_size=config.llm_embedding_chunk_size, + context_size=config.llm_context_size, + ) + try: + results = retriever.retrieve(query_text) + except KeyError as e: + # Ghost FAISS positions remain after deletion because IndexFlatL2 is + # append-only. Treat them as absent and return no results. + logger.debug( + "Skipping LLM similarity query for document %s due to a stale " + "FAISS position with no docstore node: %s", + document.pk, + e, + ) + return [] retrieved_document_ids: list[int] = [] for node in results: diff --git a/src/paperless_ai/tests/test_ai_indexing.py b/src/paperless_ai/tests/test_ai_indexing.py index ee7fb4d67..339d75ead 100644 --- a/src/paperless_ai/tests/test_ai_indexing.py +++ b/src/paperless_ai/tests/test_ai_indexing.py @@ -13,6 +13,7 @@ from llama_index.core.base.embeddings.base import BaseEmbedding from documents.models import Document from documents.models import PaperlessTask +from documents.signals import document_consumption_finished from documents.signals import document_updated from documents.tests.factories import DocumentFactory from documents.tests.factories import PaperlessTaskFactory @@ -329,6 +330,26 @@ def test_remove_document_deletes_node_from_docstore( assert len(index.docstore.docs) == 0 +@pytest.mark.django_db +def test_query_after_remove_does_not_raise_key_error( + temp_llm_index_dir, + real_document, + mock_embed_model, +) -> None: + indexing.update_llm_index(rebuild=True) + + query_doc = Document.objects.create( + title="Query", + content="query content", + added=timezone.now(), + ) + + indexing.llm_index_remove_document(real_document) + + result = indexing.query_similar_documents(query_doc, top_k=5) + assert isinstance(result, list) + + @pytest.mark.django_db def test_update_llm_index_no_documents( temp_llm_index_dir, @@ -659,6 +680,27 @@ class TestDocumentUpdatedSignalTriggersLlmReindex: mock_task.apply_async.assert_called_once_with(kwargs={"document": doc}) + @pytest.mark.django_db + @override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="huggingface") + def test_version_addition_consumption_enqueues_llm_index_once( + self, + mocker: pytest_mock.MockerFixture, + ) -> None: + """When a new version is consumed, the root document must be enqueued exactly once.""" + mock_task = mocker.patch("documents.tasks.update_document_in_llm_index") + + root_doc = DocumentFactory() + document_consumption_finished.send( + sender=object, + document=root_doc, + logging_group=None, + classifier=None, + original_file=None, + ) + document_updated.send(sender=object, document=root_doc, skip_ai_index=True) + + assert mock_task.apply_async.call_count == 1 + @pytest.mark.django_db class TestLlmIndexAddOrUpdateDocumentEmptyContent: @@ -803,3 +845,54 @@ class TestLlmIndexLocking: mock_file_lock_cls.assert_called_once() mock_lock_instance.__enter__.assert_called_once() + + def test_query_similar_documents_acquires_lock( + self, + temp_llm_index_dir: Path, + mocker: pytest_mock.MockerFixture, + ) -> None: + """query_similar_documents must enter the file lock before loading the index.""" + call_order: list[str] = [] + + mock_lock_instance = MagicMock() + mock_lock_instance.__enter__ = MagicMock( + side_effect=lambda *_: call_order.append("lock_acquired"), + ) + mock_lock_instance.__exit__ = MagicMock(return_value=False) + + mock_file_lock_cls = mocker.patch( + "paperless_ai.indexing.FileLock", + return_value=mock_lock_instance, + ) + + mocker.patch( + "paperless_ai.indexing.vector_store_file_exists", + return_value=True, + ) + + mock_index = MagicMock() + mock_index.docstore.docs = {} + + mocker.patch( + "paperless_ai.indexing.load_or_build_index", + side_effect=lambda *_a, **_kw: ( + call_order.append("index_loaded") or mock_index + ), + ) + + mock_retriever = MagicMock() + mock_retriever.retrieve.return_value = [] + mocker.patch( + "llama_index.core.retrievers.VectorIndexRetriever", + return_value=mock_retriever, + ) + + mocker.patch("paperless_ai.indexing.truncate_content", return_value="") + + indexing.query_similar_documents(MagicMock(spec=Document)) + + mock_file_lock_cls.assert_called() + mock_lock_instance.__enter__.assert_called() + assert call_order.index("lock_acquired") < call_order.index("index_loaded"), ( + "Lock must be acquired before the index is loaded" + )