diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 390ce3e66..f050c7416 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -732,6 +732,7 @@ class ConsumerPlugin( document_updated.send( sender=self.__class__, document=document.root_document, + skip_ai_index=True, # document_consumption_finished already enqueues the LLM update ) # Delete the file only if it was successfully consumed diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index 6f7628f5c..a34a3acf9 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -1344,6 +1344,8 @@ def add_or_update_document_in_llm_index(sender, document, **kwargs): """ Add or update a document in the LLM index when it is created or updated. """ + if kwargs.get("skip_ai_index"): + return ai_config = AIConfig() if ai_config.llm_index_enabled: from documents.tasks import update_document_in_llm_index diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 3005319b0..7961af381 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -319,6 +319,7 @@ def bulk_update_documents(document_ids) -> None: sender=None, document=doc, logging_group=uuid.uuid4(), + skip_ai_index=True, # bulk path calls update_llm_index once below ) post_save.send(Document, instance=doc, created=False) diff --git a/src/documents/tests/test_tasks.py b/src/documents/tests/test_tasks.py index fb06f7d60..ca57219ad 100644 --- a/src/documents/tests/test_tasks.py +++ b/src/documents/tests/test_tasks.py @@ -377,3 +377,30 @@ class TestAIIndex(DirectoriesMixin, TestCase): ) as llm_index_remove_document: tasks.remove_document_from_llm_index(doc) llm_index_remove_document.assert_called_once_with(doc) + + @override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="huggingface") + def test_bulk_update_does_not_enqueue_per_doc_llm_tasks(self) -> None: + """bulk_update_documents must not enqueue a per-document LLM task for each document. + + The bulk path calls update_llm_index once at the end; per-doc tasks would + be redundant work amplification. + """ + docs = [ + Document.objects.create( + title=f"doc{i}", + content="content", + checksum=f"checksum{i}", + ) + for i in range(3) + ] + with ( + mock.patch( + "documents.tasks.update_document_in_llm_index", + ) as update_document_in_llm_index, + mock.patch( + "documents.tasks.update_llm_index", + ) as update_llm_index, + ): + tasks.bulk_update_documents([doc.pk for doc in docs]) + self.assertEqual(update_document_in_llm_index.apply_async.call_count, 0) + update_llm_index.assert_called_once() diff --git a/src/paperless/settings/__init__.py b/src/paperless/settings/__init__.py index df3011eb2..8e47611c7 100644 --- a/src/paperless/settings/__init__.py +++ b/src/paperless/settings/__init__.py @@ -97,6 +97,8 @@ MODEL_FILE = get_path_from_env( DATA_DIR / "classification_model.pickle", ) LLM_INDEX_DIR = DATA_DIR / "llm_index" +LLM_INDEX_LOCK = DATA_DIR / "locks" / "llm_index.lock" +(DATA_DIR / "locks").mkdir(parents=True, exist_ok=True) LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log") diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index 33061d383..7ec1fdba3 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -31,8 +31,13 @@ RAG_CHUNK_OVERLAP = 200 def _index_lock_path() -> Path: - """Return the path used as the file lock for FAISS index mutations.""" - return settings.LLM_INDEX_DIR / "index.lock" + """Return the path used as the file lock for FAISS index mutations. + + The lock file lives in DATA_DIR/locks/ (not inside LLM_INDEX_DIR) so that a + rebuild — which calls shutil.rmtree(LLM_INDEX_DIR) — cannot delete the lock + while another worker still holds it. + """ + return settings.LLM_INDEX_LOCK def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool: @@ -193,6 +198,15 @@ def remove_document_docstore_nodes(document: Document, index: "VectorStoreIndex" for node_id in existing_nodes: # Delete from docstore, FAISS IndexFlatL2 are append-only index.docstore.delete_document(node_id) + # Also purge the FAISS position -> UUID mapping so subsequent similarity + # queries don't raise KeyError on ghost vector positions. + stale_keys = [ + k for k, v in index.index_struct.nodes_dict.items() if v == node_id + ] + for key in stale_keys: + del index.index_struct.nodes_dict[key] + # Re-sync the mutated index_struct so persist() writes the updated nodes_dict. + index.storage_context.index_store.add_index_struct(index.index_struct) def vector_store_file_exists(): @@ -300,7 +314,7 @@ def update_llm_index( # Delete from docstore, FAISS IndexFlatL2 are append-only for node in doc_nodes: - index.docstore.delete_document(node.node_id) + remove_document_docstore_nodes(document, index) nodes.extend(build_document_node(document, chunk_size=chunk_size)) @@ -410,36 +424,48 @@ def query_similar_documents( ) return [] - index = load_or_build_index() + with FileLock(_index_lock_path()): + index = load_or_build_index() - # constrain only the node(s) that match the document IDs, if given - doc_node_ids = ( - [ - node.node_id - for node in index.docstore.docs.values() - if node.metadata.get("document_id") in allowed_document_ids - ] - if allowed_document_ids is not None - else None - ) - if doc_node_ids is not None and not doc_node_ids: - return [] + # constrain only the node(s) that match the document IDs, if given + doc_node_ids = ( + [ + node.node_id + for node in index.docstore.docs.values() + if node.metadata.get("document_id") in allowed_document_ids + ] + if allowed_document_ids is not None + else None + ) + if doc_node_ids is not None and not doc_node_ids: + return [] - from llama_index.core.retrievers import VectorIndexRetriever + from llama_index.core.retrievers import VectorIndexRetriever - retriever = VectorIndexRetriever( - index=index, - similarity_top_k=top_k, - doc_ids=doc_node_ids, - ) + retriever = VectorIndexRetriever( + index=index, + similarity_top_k=top_k, + doc_ids=doc_node_ids, + ) - config = AIConfig() - query_text = truncate_content( - (document.title or "") + "\n" + (document.content or ""), - chunk_size=config.llm_embedding_chunk_size, - context_size=config.llm_context_size, - ) - results = retriever.retrieve(query_text) + config = AIConfig() + query_text = truncate_content( + (document.title or "") + "\n" + (document.content or ""), + chunk_size=config.llm_embedding_chunk_size, + context_size=config.llm_context_size, + ) + try: + results = retriever.retrieve(query_text) + except KeyError as e: + # Ghost FAISS positions remain after deletion because IndexFlatL2 is + # append-only. Treat them as absent and return no results. + logger.debug( + "Skipping LLM similarity query for document %s due to a stale " + "FAISS position with no docstore node: %s", + document.pk, + e, + ) + return [] retrieved_document_ids: list[int] = [] for node in results: diff --git a/src/paperless_ai/tests/test_ai_indexing.py b/src/paperless_ai/tests/test_ai_indexing.py index ee7fb4d67..339d75ead 100644 --- a/src/paperless_ai/tests/test_ai_indexing.py +++ b/src/paperless_ai/tests/test_ai_indexing.py @@ -13,6 +13,7 @@ from llama_index.core.base.embeddings.base import BaseEmbedding from documents.models import Document from documents.models import PaperlessTask +from documents.signals import document_consumption_finished from documents.signals import document_updated from documents.tests.factories import DocumentFactory from documents.tests.factories import PaperlessTaskFactory @@ -329,6 +330,26 @@ def test_remove_document_deletes_node_from_docstore( assert len(index.docstore.docs) == 0 +@pytest.mark.django_db +def test_query_after_remove_does_not_raise_key_error( + temp_llm_index_dir, + real_document, + mock_embed_model, +) -> None: + indexing.update_llm_index(rebuild=True) + + query_doc = Document.objects.create( + title="Query", + content="query content", + added=timezone.now(), + ) + + indexing.llm_index_remove_document(real_document) + + result = indexing.query_similar_documents(query_doc, top_k=5) + assert isinstance(result, list) + + @pytest.mark.django_db def test_update_llm_index_no_documents( temp_llm_index_dir, @@ -659,6 +680,27 @@ class TestDocumentUpdatedSignalTriggersLlmReindex: mock_task.apply_async.assert_called_once_with(kwargs={"document": doc}) + @pytest.mark.django_db + @override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="huggingface") + def test_version_addition_consumption_enqueues_llm_index_once( + self, + mocker: pytest_mock.MockerFixture, + ) -> None: + """When a new version is consumed, the root document must be enqueued exactly once.""" + mock_task = mocker.patch("documents.tasks.update_document_in_llm_index") + + root_doc = DocumentFactory() + document_consumption_finished.send( + sender=object, + document=root_doc, + logging_group=None, + classifier=None, + original_file=None, + ) + document_updated.send(sender=object, document=root_doc, skip_ai_index=True) + + assert mock_task.apply_async.call_count == 1 + @pytest.mark.django_db class TestLlmIndexAddOrUpdateDocumentEmptyContent: @@ -803,3 +845,54 @@ class TestLlmIndexLocking: mock_file_lock_cls.assert_called_once() mock_lock_instance.__enter__.assert_called_once() + + def test_query_similar_documents_acquires_lock( + self, + temp_llm_index_dir: Path, + mocker: pytest_mock.MockerFixture, + ) -> None: + """query_similar_documents must enter the file lock before loading the index.""" + call_order: list[str] = [] + + mock_lock_instance = MagicMock() + mock_lock_instance.__enter__ = MagicMock( + side_effect=lambda *_: call_order.append("lock_acquired"), + ) + mock_lock_instance.__exit__ = MagicMock(return_value=False) + + mock_file_lock_cls = mocker.patch( + "paperless_ai.indexing.FileLock", + return_value=mock_lock_instance, + ) + + mocker.patch( + "paperless_ai.indexing.vector_store_file_exists", + return_value=True, + ) + + mock_index = MagicMock() + mock_index.docstore.docs = {} + + mocker.patch( + "paperless_ai.indexing.load_or_build_index", + side_effect=lambda *_a, **_kw: ( + call_order.append("index_loaded") or mock_index + ), + ) + + mock_retriever = MagicMock() + mock_retriever.retrieve.return_value = [] + mocker.patch( + "llama_index.core.retrievers.VectorIndexRetriever", + return_value=mock_retriever, + ) + + mocker.patch("paperless_ai.indexing.truncate_content", return_value="") + + indexing.query_similar_documents(MagicMock(spec=Document)) + + mock_file_lock_cls.assert_called() + mock_lock_instance.__enter__.assert_called() + assert call_order.index("lock_acquired") < call_order.index("index_loaded"), ( + "Lock must be acquired before the index is loaded" + )