Files
paperless-ngx/src/paperless_ai/indexing.py
T
Trenton H 98dc191194 Fix: Lock AI index during reading and don't index documents many times during a bulk update (#12899)
* Fix: Move LLM index lock outside index dir and skip per-doc tasks on bulk update

Two concurrency bugs from #12893:

[P1] Lock file lived inside LLM_INDEX_DIR. A rebuild calls
shutil.rmtree(LLM_INDEX_DIR), deleting the lock while a worker still
held it. A second worker then acquired a fresh lock on the new path and
ran concurrently, defeating serialisation. Move the lock to
DATA_DIR/locks/llm_index.lock (a new settings constant LLM_INDEX_LOCK)
so rmtree cannot touch it. The locks/ dir is created at settings load
time, matching the existing pattern for LOGGING_DIR.

[P2] document_updated was connected to add_or_update_document_in_llm_index
in apps.py. bulk_update_documents() emits document_updated for every
document in the batch, queuing N per-document LLM tasks, and then also
calls update_llm_index(rebuild=False) once at the end. Pass
skip_ai_index=True when sending document_updated from the bulk path so
the handler skips the per-document enqueue; the existing batch call at
the end of bulk_update_documents is the only LLM update for that path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix: ghost vectors leave KeyError-prone nodes_dict entries after deletion

docstore.delete_document() removes a node from the docstore but leaves its
entry in index_struct.nodes_dict (the FAISS positional-id to node-UUID map).
A subsequent similarity query resolves the ghost position to the deleted UUID,
finds nothing in fetched_nodes_by_id, and raises KeyError inside
_insert_fetched_nodes_into_query_result.

Purge stale nodes_dict entries after each docstore deletion and re-sync the
mutated index_struct into the kvstore so persist() writes the updated mapping.
Dead FAISS vectors remain in the flat index until the next full rebuild
(IndexFlatL2 is append-only); add a try/except KeyError around
retriever.retrieve() as a defensive fallback for any residual ghost positions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix: acquire index lock in query_similar_documents

query_similar_documents() loaded the index and ran the FAISS retriever
without holding the file lock. All write paths (update_llm_index,
llm_index_add_or_update_document, llm_index_remove_document) hold
FileLock(_index_lock_path()), so a concurrent rebuild calling
shutil.rmtree(LLM_INDEX_DIR) while a read is mid-load produces an IOError
or corrupt partial state.

Wrap the load_or_build_index() call and all subsequent retriever work inside
FileLock. The early-return guards (vector_store_file_exists check, empty
allowed_document_ids) remain outside the lock; the DB query for the final
result set also stays outside.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix: skip LLM index enqueue on document_updated during version addition

When a document is consumed as a new version of an existing document, the
consumer fires document_consumption_finished (which triggers
add_or_update_document_in_llm_index) and then document_updated for the root
document. Both signals are connected to the same handler, so the root document
was enqueued for LLM indexing twice per version-addition event.

Pass skip_ai_index=True on the consumer's version-addition document_updated
send so the handler's existing guard suppresses the duplicate enqueue.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Test: bulk_update_documents must not enqueue per-doc LLM tasks

With AI enabled, bulk_update_documents() sends document_updated for every
document in the batch. The skip_ai_index=True kwarg (added in the P2 fix)
prevents add_or_update_document_in_llm_index from enqueuing a per-document
task for each one. Only the single update_llm_index call at the end should run.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Debug level log sure

* Update src/paperless_ai/indexing.py

Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>

* Apply suggestion from @shamoon

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
2026-06-02 10:46:29 -07:00

490 lines
17 KiB
Python

import logging
import shutil
from collections import defaultdict
from collections.abc import Iterable
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING
from django.conf import settings
from django.utils import timezone
from filelock import FileLock
from documents.models import Document
from documents.models import PaperlessTask
from documents.utils import IterWrapper
from documents.utils import identity
from paperless.config import AIConfig
from paperless_ai.embedding import build_llm_index_text
from paperless_ai.embedding import get_embedding_dim
from paperless_ai.embedding import get_embedding_model
if TYPE_CHECKING:
from llama_index.core import VectorStoreIndex
from llama_index.core.schema import BaseNode
logger = logging.getLogger("paperless_ai.indexing")
RAG_NUM_OUTPUT = 512
RAG_CHUNK_OVERLAP = 200
def _index_lock_path() -> Path:
"""Return the path used as the file lock for FAISS index mutations.
The lock file lives in DATA_DIR/locks/ (not inside LLM_INDEX_DIR) so that a
rebuild — which calls shutil.rmtree(LLM_INDEX_DIR) — cannot delete the lock
while another worker still holds it.
"""
return settings.LLM_INDEX_LOCK
def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool:
# NOTE: The check-then-enqueue sequence below is non-atomic (TOCTOU): two
# concurrent workers can both observe no running task and both enqueue a
# full rebuild. This is wasteful but not data-corrupting — update_llm_index
# is itself protected by _index_lock_path(), so only one rebuild runs at a
# time and the second one is serialised after the first completes.
from documents.tasks import llmindex_index
has_running = PaperlessTask.objects.filter(
task_type=PaperlessTask.TaskType.LLM_INDEX,
status__in=[PaperlessTask.Status.PENDING, PaperlessTask.Status.STARTED],
).exists()
has_recent = PaperlessTask.objects.filter(
task_type=PaperlessTask.TaskType.LLM_INDEX,
date_created__gte=(timezone.now() - timedelta(minutes=5)),
).exists()
if has_running or has_recent:
return False
llmindex_index.apply_async(
kwargs={"rebuild": rebuild},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
logger.warning(
"Queued LLM index update%s: %s",
" (rebuild)" if rebuild else "",
reason,
)
return True
def get_or_create_storage_context(*, rebuild=False):
"""
Loads or creates the StorageContext (vector store, docstore, index store).
If rebuild=True, deletes and recreates everything.
"""
if rebuild:
shutil.rmtree(settings.LLM_INDEX_DIR, ignore_errors=True)
settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
if rebuild or not settings.LLM_INDEX_DIR.exists():
import faiss
from llama_index.core import StorageContext
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.storage.index_store import SimpleIndexStore
from llama_index.vector_stores.faiss import FaissVectorStore
settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
embedding_dim = get_embedding_dim()
faiss_index = faiss.IndexFlatL2(embedding_dim)
vector_store = FaissVectorStore(faiss_index=faiss_index)
docstore = SimpleDocumentStore()
index_store = SimpleIndexStore()
else:
from llama_index.core import StorageContext
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.storage.index_store import SimpleIndexStore
from llama_index.vector_stores.faiss import FaissVectorStore
vector_store = FaissVectorStore.from_persist_dir(settings.LLM_INDEX_DIR)
docstore = SimpleDocumentStore.from_persist_dir(settings.LLM_INDEX_DIR)
index_store = SimpleIndexStore.from_persist_dir(settings.LLM_INDEX_DIR)
return StorageContext.from_defaults(
docstore=docstore,
index_store=index_store,
vector_store=vector_store,
persist_dir=settings.LLM_INDEX_DIR,
)
def build_document_node(
document: Document,
*,
chunk_size: int | None = None,
) -> list["BaseNode"]:
"""
Given a Document, returns parsed Nodes ready for indexing.
"""
text = build_llm_index_text(document)
metadata = {
"document_id": str(document.id),
"title": document.title,
"tags": [t.name for t in document.tags.all()],
"correspondent": document.correspondent.name
if document.correspondent
else None,
"document_type": document.document_type.name
if document.document_type
else None,
"created": document.created.isoformat() if document.created else None,
"added": document.added.isoformat() if document.added else None,
"modified": document.modified.isoformat(),
}
from llama_index.core import Document as LlamaDocument
from llama_index.core.node_parser import SimpleNodeParser
# Exclude all metadata keys from the embedding text — build_llm_index_text
# already encodes this info in the body, so prepending it again would double
# the token count and exceed embedding models with small context windows
# (e.g. nomic-embed-text via Ollama defaults to num_ctx=2048).
doc = LlamaDocument(
text=text,
metadata=metadata,
excluded_embed_metadata_keys=list(metadata.keys()),
)
chunk_size = chunk_size or get_rag_chunk_size()
parser = SimpleNodeParser(
chunk_size=chunk_size,
chunk_overlap=get_rag_chunk_overlap(chunk_size),
)
return parser.get_nodes_from_documents([doc])
def load_or_build_index(nodes=None):
"""
Load an existing VectorStoreIndex if present,
or build a new one using provided nodes if storage is empty.
"""
import llama_index.core.settings as llama_settings
from llama_index.core import VectorStoreIndex
from llama_index.core import load_index_from_storage
embed_model = get_embedding_model()
llama_settings.Settings.embed_model = embed_model
storage_context = get_or_create_storage_context()
try:
return load_index_from_storage(storage_context=storage_context)
except ValueError as e:
logger.warning("Failed to load index from storage: %s", e)
if not nodes:
queue_llm_index_update_if_needed(
rebuild=vector_store_file_exists(),
reason="LLM index missing or invalid while loading.",
)
logger.info("No nodes provided for index creation.")
raise
return VectorStoreIndex(
nodes=nodes,
storage_context=storage_context,
embed_model=embed_model,
)
def remove_document_docstore_nodes(document: Document, index: "VectorStoreIndex"):
"""
Removes existing documents from docstore for a given document from the index.
This is necessary because FAISS IndexFlatL2 is append-only.
"""
all_node_ids = list(index.docstore.docs.keys())
existing_nodes = [
node.node_id
for node in index.docstore.get_nodes(all_node_ids)
if node.metadata.get("document_id") == str(document.id)
]
for node_id in existing_nodes:
# Delete from docstore, FAISS IndexFlatL2 are append-only
index.docstore.delete_document(node_id)
# Also purge the FAISS position -> UUID mapping so subsequent similarity
# queries don't raise KeyError on ghost vector positions.
stale_keys = [
k for k, v in index.index_struct.nodes_dict.items() if v == node_id
]
for key in stale_keys:
del index.index_struct.nodes_dict[key]
# Re-sync the mutated index_struct so persist() writes the updated nodes_dict.
index.storage_context.index_store.add_index_struct(index.index_struct)
def vector_store_file_exists():
"""
Check if the vector store file exists in the LLM index directory.
"""
return Path(settings.LLM_INDEX_DIR / "default__vector_store.json").exists()
def get_rag_chunk_size() -> int:
return AIConfig().llm_embedding_chunk_size
def get_rag_context_size() -> int:
return AIConfig().llm_context_size
def get_rag_chunk_overlap(chunk_size: int | None = None) -> int:
chunk_size = chunk_size or get_rag_chunk_size()
return min(RAG_CHUNK_OVERLAP, chunk_size - 1)
def get_rag_prompt_helper(
*,
chunk_size: int | None = None,
context_size: int | None = None,
):
from llama_index.core.indices.prompt_helper import PromptHelper
if chunk_size is None or context_size is None:
config = AIConfig()
chunk_size = chunk_size or config.llm_embedding_chunk_size
context_size = context_size or config.llm_context_size
return PromptHelper(
context_window=context_size,
num_output=RAG_NUM_OUTPUT,
chunk_overlap_ratio=0.1,
chunk_size_limit=chunk_size,
)
def update_llm_index(
*,
iter_wrapper: IterWrapper[Document] = identity,
rebuild=False,
) -> str:
"""
Rebuild or update the LLM index.
"""
from llama_index.core import VectorStoreIndex
nodes = []
documents = Document.objects.all()
if not documents.exists():
logger.warning("No documents found to index.")
if not rebuild and not vector_store_file_exists():
return "No documents found to index."
config = AIConfig()
chunk_size = config.llm_embedding_chunk_size
with FileLock(_index_lock_path()):
if rebuild or not vector_store_file_exists():
# remove meta.json to force re-detection of embedding dim
(settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True)
# Rebuild index from scratch
logger.info("Rebuilding LLM index.")
import llama_index.core.settings as llama_settings
embed_model = get_embedding_model()
llama_settings.Settings.embed_model = embed_model
storage_context = get_or_create_storage_context(rebuild=True)
for document in iter_wrapper(documents):
document_nodes = build_document_node(document, chunk_size=chunk_size)
nodes.extend(document_nodes)
index = VectorStoreIndex(
nodes=nodes,
storage_context=storage_context,
embed_model=embed_model,
show_progress=False,
)
msg = "LLM index rebuilt successfully."
else:
# Update existing index
index = load_or_build_index()
existing_nodes: defaultdict[str, list] = defaultdict(list)
for node in index.docstore.docs.values():
doc_id = node.metadata.get("document_id")
if doc_id is not None:
existing_nodes[doc_id].append(node)
for document in iter_wrapper(documents):
doc_id = str(document.id)
document_modified = document.modified.isoformat()
if doc_id in existing_nodes:
doc_nodes = existing_nodes[doc_id]
node_modified = doc_nodes[0].metadata.get("modified")
if node_modified == document_modified:
continue
# Delete from docstore, FAISS IndexFlatL2 are append-only
for node in doc_nodes:
remove_document_docstore_nodes(document, index)
nodes.extend(build_document_node(document, chunk_size=chunk_size))
if nodes:
msg = "LLM index updated successfully."
logger.info(
"Updating %d nodes in LLM index.",
len(nodes),
)
index.insert_nodes(nodes)
else:
msg = "No changes detected in LLM index."
logger.info(msg)
index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
return msg
def llm_index_add_or_update_document(document: Document):
"""
Adds or updates a document in the LLM index.
If the document already exists, it will be replaced.
"""
new_nodes = build_document_node(document, chunk_size=get_rag_chunk_size())
if not new_nodes:
logger.warning(
"No indexable content for document %s; skipping LLM index update.",
document.pk,
)
return
with FileLock(_index_lock_path()):
index = load_or_build_index(nodes=new_nodes)
remove_document_docstore_nodes(document, index)
index.insert_nodes(new_nodes)
index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
def llm_index_remove_document(document: Document):
"""
Removes a document from the LLM index.
"""
with FileLock(_index_lock_path()):
index = load_or_build_index()
remove_document_docstore_nodes(document, index)
index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
def truncate_content(
content: str,
*,
chunk_size: int | None = None,
context_size: int | None = None,
) -> str:
from llama_index.core.prompts import PromptTemplate
from llama_index.core.text_splitter import TokenTextSplitter
if chunk_size is None or context_size is None:
config = AIConfig()
chunk_size = chunk_size or config.llm_embedding_chunk_size
context_size = context_size or config.llm_context_size
prompt_helper = get_rag_prompt_helper(
chunk_size=chunk_size,
context_size=context_size,
)
splitter = TokenTextSplitter(
separator=" ",
chunk_size=chunk_size,
chunk_overlap=get_rag_chunk_overlap(chunk_size),
)
content_chunks = splitter.split_text(content)
truncated_chunks = prompt_helper.truncate(
prompt=PromptTemplate(template="{content}"),
text_chunks=content_chunks,
padding=5,
)
return " ".join(truncated_chunks)
def normalize_document_ids(document_ids: Iterable[int | str] | None) -> set[str] | None:
if document_ids is None:
return None
return {str(document_id) for document_id in document_ids}
def query_similar_documents(
document: Document,
top_k: int = 5,
document_ids: Iterable[int | str] | None = None,
) -> list[Document]:
"""
Runs a similarity query and returns top-k similar Document objects.
"""
allowed_document_ids = normalize_document_ids(document_ids)
if allowed_document_ids is not None and not allowed_document_ids:
return []
if not vector_store_file_exists():
queue_llm_index_update_if_needed(
rebuild=False,
reason="LLM index not found for similarity query.",
)
return []
with FileLock(_index_lock_path()):
index = load_or_build_index()
# constrain only the node(s) that match the document IDs, if given
doc_node_ids = (
[
node.node_id
for node in index.docstore.docs.values()
if node.metadata.get("document_id") in allowed_document_ids
]
if allowed_document_ids is not None
else None
)
if doc_node_ids is not None and not doc_node_ids:
return []
from llama_index.core.retrievers import VectorIndexRetriever
retriever = VectorIndexRetriever(
index=index,
similarity_top_k=top_k,
doc_ids=doc_node_ids,
)
config = AIConfig()
query_text = truncate_content(
(document.title or "") + "\n" + (document.content or ""),
chunk_size=config.llm_embedding_chunk_size,
context_size=config.llm_context_size,
)
try:
results = retriever.retrieve(query_text)
except KeyError as e:
# Ghost FAISS positions remain after deletion because IndexFlatL2 is
# append-only. Treat them as absent and return no results.
logger.debug(
"Skipping LLM similarity query for document %s due to a stale "
"FAISS position with no docstore node: %s",
document.pk,
e,
)
return []
retrieved_document_ids: list[int] = []
for node in results:
document_id = node.metadata.get("document_id")
if document_id is None:
continue
normalized_document_id = str(document_id)
if (
allowed_document_ids is not None
and normalized_document_id not in allowed_document_ids
):
continue
try:
retrieved_document_ids.append(int(normalized_document_id))
except ValueError:
logger.warning(
"Skipping LLM index result with invalid document_id %r.",
document_id,
)
return list(Document.objects.filter(pk__in=retrieved_document_ids))