mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-06-04 20:59:44 +00:00
Fix: Lock AI index during reading and don't index documents many times during a bulk update (#12899)
* Fix: Move LLM index lock outside index dir and skip per-doc tasks on bulk update Two concurrency bugs from #12893: [P1] Lock file lived inside LLM_INDEX_DIR. A rebuild calls shutil.rmtree(LLM_INDEX_DIR), deleting the lock while a worker still held it. A second worker then acquired a fresh lock on the new path and ran concurrently, defeating serialisation. Move the lock to DATA_DIR/locks/llm_index.lock (a new settings constant LLM_INDEX_LOCK) so rmtree cannot touch it. The locks/ dir is created at settings load time, matching the existing pattern for LOGGING_DIR. [P2] document_updated was connected to add_or_update_document_in_llm_index in apps.py. bulk_update_documents() emits document_updated for every document in the batch, queuing N per-document LLM tasks, and then also calls update_llm_index(rebuild=False) once at the end. Pass skip_ai_index=True when sending document_updated from the bulk path so the handler skips the per-document enqueue; the existing batch call at the end of bulk_update_documents is the only LLM update for that path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix: ghost vectors leave KeyError-prone nodes_dict entries after deletion docstore.delete_document() removes a node from the docstore but leaves its entry in index_struct.nodes_dict (the FAISS positional-id to node-UUID map). A subsequent similarity query resolves the ghost position to the deleted UUID, finds nothing in fetched_nodes_by_id, and raises KeyError inside _insert_fetched_nodes_into_query_result. Purge stale nodes_dict entries after each docstore deletion and re-sync the mutated index_struct into the kvstore so persist() writes the updated mapping. Dead FAISS vectors remain in the flat index until the next full rebuild (IndexFlatL2 is append-only); add a try/except KeyError around retriever.retrieve() as a defensive fallback for any residual ghost positions. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix: acquire index lock in query_similar_documents query_similar_documents() loaded the index and ran the FAISS retriever without holding the file lock. All write paths (update_llm_index, llm_index_add_or_update_document, llm_index_remove_document) hold FileLock(_index_lock_path()), so a concurrent rebuild calling shutil.rmtree(LLM_INDEX_DIR) while a read is mid-load produces an IOError or corrupt partial state. Wrap the load_or_build_index() call and all subsequent retriever work inside FileLock. The early-return guards (vector_store_file_exists check, empty allowed_document_ids) remain outside the lock; the DB query for the final result set also stays outside. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix: skip LLM index enqueue on document_updated during version addition When a document is consumed as a new version of an existing document, the consumer fires document_consumption_finished (which triggers add_or_update_document_in_llm_index) and then document_updated for the root document. Both signals are connected to the same handler, so the root document was enqueued for LLM indexing twice per version-addition event. Pass skip_ai_index=True on the consumer's version-addition document_updated send so the handler's existing guard suppresses the duplicate enqueue. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Test: bulk_update_documents must not enqueue per-doc LLM tasks With AI enabled, bulk_update_documents() sends document_updated for every document in the batch. The skip_ai_index=True kwarg (added in the P2 fix) prevents add_or_update_document_in_llm_index from enqueuing a per-document task for each one. Only the single update_llm_index call at the end should run. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Debug level log sure * Update src/paperless_ai/indexing.py Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com> * Apply suggestion from @shamoon --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
This commit is contained in:
@@ -732,6 +732,7 @@ class ConsumerPlugin(
|
||||
document_updated.send(
|
||||
sender=self.__class__,
|
||||
document=document.root_document,
|
||||
skip_ai_index=True, # document_consumption_finished already enqueues the LLM update
|
||||
)
|
||||
|
||||
# Delete the file only if it was successfully consumed
|
||||
|
||||
@@ -1344,6 +1344,8 @@ def add_or_update_document_in_llm_index(sender, document, **kwargs):
|
||||
"""
|
||||
Add or update a document in the LLM index when it is created or updated.
|
||||
"""
|
||||
if kwargs.get("skip_ai_index"):
|
||||
return
|
||||
ai_config = AIConfig()
|
||||
if ai_config.llm_index_enabled:
|
||||
from documents.tasks import update_document_in_llm_index
|
||||
|
||||
@@ -319,6 +319,7 @@ def bulk_update_documents(document_ids) -> None:
|
||||
sender=None,
|
||||
document=doc,
|
||||
logging_group=uuid.uuid4(),
|
||||
skip_ai_index=True, # bulk path calls update_llm_index once below
|
||||
)
|
||||
post_save.send(Document, instance=doc, created=False)
|
||||
|
||||
|
||||
@@ -377,3 +377,30 @@ class TestAIIndex(DirectoriesMixin, TestCase):
|
||||
) as llm_index_remove_document:
|
||||
tasks.remove_document_from_llm_index(doc)
|
||||
llm_index_remove_document.assert_called_once_with(doc)
|
||||
|
||||
@override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="huggingface")
def test_bulk_update_does_not_enqueue_per_doc_llm_tasks(self) -> None:
    """A bulk update must not enqueue one LLM task per document.

    The bulk path calls update_llm_index once at the end, so additionally
    enqueuing update_document_in_llm_index for every document in the batch
    would only repeat the same work.
    """
    document_pks = []
    for n in range(3):
        created = Document.objects.create(
            title=f"doc{n}",
            content="content",
            checksum=f"checksum{n}",
        )
        document_pks.append(created.pk)

    # Patch both entry points on documents.tasks so neither does real work
    # and both record how they were used.
    per_doc_patch = mock.patch("documents.tasks.update_document_in_llm_index")
    batch_patch = mock.patch("documents.tasks.update_llm_index")
    with per_doc_patch as per_doc_task, batch_patch as batch_update:
        tasks.bulk_update_documents(document_pks)

    # No per-document task was enqueued ...
    self.assertEqual(per_doc_task.apply_async.call_count, 0)
    # ... and the batch update ran exactly once.
    batch_update.assert_called_once()
|
||||
|
||||
@@ -97,6 +97,8 @@ MODEL_FILE = get_path_from_env(
|
||||
DATA_DIR / "classification_model.pickle",
|
||||
)
|
||||
LLM_INDEX_DIR = DATA_DIR / "llm_index"
# The lock file gets its own directory next to (not inside) LLM_INDEX_DIR, so
# removing the index directory cannot take the lock file with it.
LLM_INDEX_LOCK = DATA_DIR / "locks" / "llm_index.lock"
# Create the lock directory at settings load time.
LLM_INDEX_LOCK.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
|
||||
|
||||
|
||||
@@ -31,8 +31,13 @@ RAG_CHUNK_OVERLAP = 200
|
||||
|
||||
|
||||
def _index_lock_path() -> Path:
    """Return the path of the file lock that guards the LLM (FAISS) index.

    The lock file is taken from ``settings.LLM_INDEX_LOCK`` and lives in
    DATA_DIR/locks/ (not inside LLM_INDEX_DIR) so that a rebuild — which calls
    shutil.rmtree(LLM_INDEX_DIR) — cannot delete the lock while another worker
    still holds it.

    Returns:
        The configured lock file path.
    """
    return settings.LLM_INDEX_LOCK
|
||||
|
||||
|
||||
def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool:
|
||||
@@ -193,6 +198,15 @@ def remove_document_docstore_nodes(document: Document, index: "VectorStoreIndex"
|
||||
for node_id in existing_nodes:
|
||||
# Delete from docstore, FAISS IndexFlatL2 are append-only
|
||||
index.docstore.delete_document(node_id)
|
||||
# Also purge the FAISS position -> UUID mapping so subsequent similarity
|
||||
# queries don't raise KeyError on ghost vector positions.
|
||||
stale_keys = [
|
||||
k for k, v in index.index_struct.nodes_dict.items() if v == node_id
|
||||
]
|
||||
for key in stale_keys:
|
||||
del index.index_struct.nodes_dict[key]
|
||||
# Re-sync the mutated index_struct so persist() writes the updated nodes_dict.
|
||||
index.storage_context.index_store.add_index_struct(index.index_struct)
|
||||
|
||||
|
||||
def vector_store_file_exists():
|
||||
@@ -300,7 +314,7 @@ def update_llm_index(
|
||||
|
||||
# Delete from docstore, FAISS IndexFlatL2 are append-only
|
||||
for node in doc_nodes:
|
||||
index.docstore.delete_document(node.node_id)
|
||||
remove_document_docstore_nodes(document, index)
|
||||
|
||||
nodes.extend(build_document_node(document, chunk_size=chunk_size))
|
||||
|
||||
@@ -410,36 +424,48 @@ def query_similar_documents(
|
||||
)
|
||||
return []
|
||||
|
||||
index = load_or_build_index()
|
||||
with FileLock(_index_lock_path()):
|
||||
index = load_or_build_index()
|
||||
|
||||
# constrain only the node(s) that match the document IDs, if given
|
||||
doc_node_ids = (
|
||||
[
|
||||
node.node_id
|
||||
for node in index.docstore.docs.values()
|
||||
if node.metadata.get("document_id") in allowed_document_ids
|
||||
]
|
||||
if allowed_document_ids is not None
|
||||
else None
|
||||
)
|
||||
if doc_node_ids is not None and not doc_node_ids:
|
||||
return []
|
||||
# constrain only the node(s) that match the document IDs, if given
|
||||
doc_node_ids = (
|
||||
[
|
||||
node.node_id
|
||||
for node in index.docstore.docs.values()
|
||||
if node.metadata.get("document_id") in allowed_document_ids
|
||||
]
|
||||
if allowed_document_ids is not None
|
||||
else None
|
||||
)
|
||||
if doc_node_ids is not None and not doc_node_ids:
|
||||
return []
|
||||
|
||||
from llama_index.core.retrievers import VectorIndexRetriever
|
||||
from llama_index.core.retrievers import VectorIndexRetriever
|
||||
|
||||
retriever = VectorIndexRetriever(
|
||||
index=index,
|
||||
similarity_top_k=top_k,
|
||||
doc_ids=doc_node_ids,
|
||||
)
|
||||
retriever = VectorIndexRetriever(
|
||||
index=index,
|
||||
similarity_top_k=top_k,
|
||||
doc_ids=doc_node_ids,
|
||||
)
|
||||
|
||||
config = AIConfig()
|
||||
query_text = truncate_content(
|
||||
(document.title or "") + "\n" + (document.content or ""),
|
||||
chunk_size=config.llm_embedding_chunk_size,
|
||||
context_size=config.llm_context_size,
|
||||
)
|
||||
results = retriever.retrieve(query_text)
|
||||
config = AIConfig()
|
||||
query_text = truncate_content(
|
||||
(document.title or "") + "\n" + (document.content or ""),
|
||||
chunk_size=config.llm_embedding_chunk_size,
|
||||
context_size=config.llm_context_size,
|
||||
)
|
||||
try:
|
||||
results = retriever.retrieve(query_text)
|
||||
except KeyError as e:
|
||||
# Ghost FAISS positions remain after deletion because IndexFlatL2 is
|
||||
# append-only. Treat them as absent and return no results.
|
||||
logger.debug(
|
||||
"Skipping LLM similarity query for document %s due to a stale "
|
||||
"FAISS position with no docstore node: %s",
|
||||
document.pk,
|
||||
e,
|
||||
)
|
||||
return []
|
||||
|
||||
retrieved_document_ids: list[int] = []
|
||||
for node in results:
|
||||
|
||||
@@ -13,6 +13,7 @@ from llama_index.core.base.embeddings.base import BaseEmbedding
|
||||
|
||||
from documents.models import Document
|
||||
from documents.models import PaperlessTask
|
||||
from documents.signals import document_consumption_finished
|
||||
from documents.signals import document_updated
|
||||
from documents.tests.factories import DocumentFactory
|
||||
from documents.tests.factories import PaperlessTaskFactory
|
||||
@@ -329,6 +330,26 @@ def test_remove_document_deletes_node_from_docstore(
|
||||
assert len(index.docstore.docs) == 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
def test_query_after_remove_does_not_raise_key_error(
    temp_llm_index_dir,
    real_document,
    mock_embed_model,
) -> None:
    """A similarity query issued after a document removal must return a list.

    Removing a document from the index must not make a later
    query_similar_documents call raise KeyError.
    """
    # Build the index before the query document exists.
    indexing.update_llm_index(rebuild=True)

    probe = Document.objects.create(
        title="Query",
        content="query content",
        added=timezone.now(),
    )

    indexing.llm_index_remove_document(real_document)

    similar = indexing.query_similar_documents(probe, top_k=5)
    assert isinstance(similar, list)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_llm_index_no_documents(
|
||||
temp_llm_index_dir,
|
||||
@@ -659,6 +680,27 @@ class TestDocumentUpdatedSignalTriggersLlmReindex:
|
||||
|
||||
mock_task.apply_async.assert_called_once_with(kwargs={"document": doc})
|
||||
|
||||
@pytest.mark.django_db
@override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="huggingface")
def test_version_addition_consumption_enqueues_llm_index_once(
    self,
    mocker: pytest_mock.MockerFixture,
) -> None:
    """Consuming a new version must enqueue the root document exactly once.

    The signal pair of a version addition is replayed here:
    document_consumption_finished followed by document_updated carrying
    skip_ai_index=True. Together they must yield a single apply_async call.
    """
    llm_task = mocker.patch("documents.tasks.update_document_in_llm_index")
    root = DocumentFactory()

    consumption_kwargs = {
        "document": root,
        "logging_group": None,
        "classifier": None,
        "original_file": None,
    }
    document_consumption_finished.send(sender=object, **consumption_kwargs)
    # The follow-up update signal opts out of LLM indexing.
    document_updated.send(sender=object, document=root, skip_ai_index=True)

    assert llm_task.apply_async.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestLlmIndexAddOrUpdateDocumentEmptyContent:
|
||||
@@ -803,3 +845,54 @@ class TestLlmIndexLocking:
|
||||
|
||||
mock_file_lock_cls.assert_called_once()
|
||||
mock_lock_instance.__enter__.assert_called_once()
|
||||
|
||||
def test_query_similar_documents_acquires_lock(
    self,
    temp_llm_index_dir: Path,
    mocker: pytest_mock.MockerFixture,
) -> None:
    """query_similar_documents must enter the file lock before loading the index."""
    events: list[str] = []

    def record_lock_enter(*_args):
        # Returns None, exactly like list.append does.
        events.append("lock_acquired")

    fake_lock = MagicMock()
    fake_lock.__enter__ = MagicMock(side_effect=record_lock_enter)
    fake_lock.__exit__ = MagicMock(return_value=False)
    file_lock_cls = mocker.patch(
        "paperless_ai.indexing.FileLock",
        return_value=fake_lock,
    )

    # Pretend a persisted vector store is present.
    mocker.patch(
        "paperless_ai.indexing.vector_store_file_exists",
        return_value=True,
    )

    fake_index = MagicMock()
    fake_index.docstore.docs = {}

    def record_index_load(*_args, **_kwargs):
        events.append("index_loaded")
        return fake_index

    mocker.patch(
        "paperless_ai.indexing.load_or_build_index",
        side_effect=record_index_load,
    )

    # Stub the retriever so no real similarity search is attempted.
    fake_retriever = MagicMock()
    fake_retriever.retrieve.return_value = []
    mocker.patch(
        "llama_index.core.retrievers.VectorIndexRetriever",
        return_value=fake_retriever,
    )
    mocker.patch("paperless_ai.indexing.truncate_content", return_value="")

    indexing.query_similar_documents(MagicMock(spec=Document))

    file_lock_cls.assert_called()
    fake_lock.__enter__.assert_called()
    lock_position = events.index("lock_acquired")
    load_position = events.index("index_loaded")
    assert lock_position < load_position, (
        "Lock must be acquired before the index is loaded"
    )
|
||||
|
||||
Reference in New Issue
Block a user