From e9c3f04b8bc6ae09d9198dd3f033274140974351 Mon Sep 17 00:00:00 2001 From: stumpylog <797416+stumpylog@users.noreply.github.com> Date: Tue, 2 Jun 2026 15:37:22 -0700 Subject: [PATCH] refactor(ai): build the index from the LanceDB store alone (lazy import) Replace get_or_create_storage_context with get_vector_store() (lazy import of paperless_ai.vector_store inside the function), rewrite load_or_build_index to use VectorStoreIndex.from_vector_store, and rewrite vector_store_file_exists to use store.table_exists(). Add LLM_INDEX_TABLE constant and TYPE_CHECKING-only import of PaperlessLanceVectorStore. Delete remove_document_docstore_nodes and rewire llm_index_add_or_update_document, llm_index_remove_document, and update_llm_index to use upsert_document/delete/drop_table on the LanceDB store. Serialize tags list as JSON string to satisfy flat_metadata validation. Add test_get_vector_store_roundtrip, test_add_then_remove_document, test_update_shrinks_chunks_without_orphans, and the subprocess lazy-import guard. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/paperless_ai/indexing.py | 267 +++++++------------- src/paperless_ai/tests/test_ai_indexing.py | 47 ++++ src/paperless_ai/tests/test_lazy_imports.py | 22 ++ 3 files changed, 164 insertions(+), 172 deletions(-) create mode 100644 src/paperless_ai/tests/test_lazy_imports.py diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index cf68ebc53..9252ad852 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -1,6 +1,5 @@ +import json import logging -import shutil -from collections import defaultdict from collections.abc import Iterable from datetime import timedelta from pathlib import Path @@ -16,16 +15,18 @@ from documents.utils import IterWrapper from documents.utils import identity from paperless.config import AIConfig from paperless_ai.embedding import build_llm_index_text -from paperless_ai.embedding import get_embedding_dim from paperless_ai.embedding import get_embedding_model if TYPE_CHECKING: - from llama_index.core import VectorStoreIndex from llama_index.core.schema import BaseNode + from paperless_ai.vector_store import PaperlessLanceVectorStore + logger = logging.getLogger("paperless_ai.indexing") +LLM_INDEX_TABLE = "documents" + RAG_NUM_OUTPUT = 512 RAG_CHUNK_OVERLAP = 200 @@ -71,43 +72,18 @@ def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool: return True -def get_or_create_storage_context(*, rebuild=False): +def get_vector_store() -> "PaperlessLanceVectorStore": + """Open (or lazily create) the LanceDB-backed vector store. + + Imports ``vector_store`` lazily so that importing ``indexing`` (which + ``documents.tasks`` does at module top) never drags in lancedb/llama_index. """ - Loads or creates the StorageContext (vector store, docstore, index store). - If rebuild=True, deletes and recreates everything. - """ - if rebuild: - shutil.rmtree(settings.LLM_INDEX_DIR, ignore_errors=True) - settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True) + from paperless_ai.vector_store import PaperlessLanceVectorStore - if rebuild or not settings.LLM_INDEX_DIR.exists(): - import faiss - from llama_index.core import StorageContext - from llama_index.core.storage.docstore import SimpleDocumentStore - from llama_index.core.storage.index_store import SimpleIndexStore - from llama_index.vector_stores.faiss import FaissVectorStore - - settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True) - embedding_dim = get_embedding_dim() - faiss_index = faiss.IndexFlatL2(embedding_dim) - vector_store = FaissVectorStore(faiss_index=faiss_index) - docstore = SimpleDocumentStore() - index_store = SimpleIndexStore() - else: - from llama_index.core import StorageContext - from llama_index.core.storage.docstore import SimpleDocumentStore - from llama_index.core.storage.index_store import SimpleIndexStore - from llama_index.vector_stores.faiss import FaissVectorStore - - vector_store = FaissVectorStore.from_persist_dir(settings.LLM_INDEX_DIR) - docstore = SimpleDocumentStore.from_persist_dir(settings.LLM_INDEX_DIR) - index_store = SimpleIndexStore.from_persist_dir(settings.LLM_INDEX_DIR) - - return StorageContext.from_defaults( - docstore=docstore, - index_store=index_store, - vector_store=vector_store, - persist_dir=settings.LLM_INDEX_DIR, + settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True) + return PaperlessLanceVectorStore( + uri=str(settings.LLM_INDEX_DIR), + table_name=LLM_INDEX_TABLE, ) @@ -123,7 +99,7 @@ def build_document_node( metadata = { "document_id": str(document.id), "title": document.title, - "tags": [t.name for t in document.tags.all()], + "tags": json.dumps([t.name for t in document.tags.all()]), "correspondent": document.correspondent.name if document.correspondent else None, @@ -156,65 +132,27 @@ def build_document_node( def load_or_build_index(nodes=None): - """ - Load an existing VectorStoreIndex if present, - or build a new one using provided nodes if storage is empty. + """Load the VectorStoreIndex backed by the LanceDB store. + + With ``stores_text=True`` the index runs off the vector store alone — no + docstore or index store. ``nodes`` is accepted for signature compatibility + but unused; the store is the source of truth. """ import llama_index.core.settings as llama_settings from llama_index.core import VectorStoreIndex - from llama_index.core import load_index_from_storage embed_model = get_embedding_model() llama_settings.Settings.embed_model = embed_model - storage_context = get_or_create_storage_context() - try: - return load_index_from_storage(storage_context=storage_context) - except ValueError as e: - logger.warning("Failed to load index from storage: %s", e) - if not nodes: - queue_llm_index_update_if_needed( - rebuild=vector_store_file_exists(), - reason="LLM index missing or invalid while loading.", - ) - logger.info("No nodes provided for index creation.") - raise - return VectorStoreIndex( - nodes=nodes, - storage_context=storage_context, - embed_model=embed_model, - ) + vector_store = get_vector_store() + return VectorStoreIndex.from_vector_store( + vector_store=vector_store, + embed_model=embed_model, + ) -def remove_document_docstore_nodes(document: Document, index: "VectorStoreIndex"): - """ - Removes existing documents from docstore for a given document from the index. - This is necessary because FAISS IndexFlatL2 is append-only. - """ - all_node_ids = list(index.docstore.docs.keys()) - existing_nodes = [ - node.node_id - for node in index.docstore.get_nodes(all_node_ids) - if node.metadata.get("document_id") == str(document.id) - ] - for node_id in existing_nodes: - # Delete from docstore, FAISS IndexFlatL2 are append-only - index.docstore.delete_document(node_id) - # Also purge the FAISS position -> UUID mapping so subsequent similarity - # queries don't raise KeyError on ghost vector positions. - stale_keys = [ - k for k, v in index.index_struct.nodes_dict.items() if v == node_id - ] - for key in stale_keys: - del index.index_struct.nodes_dict[key] - # Re-sync the mutated index_struct so persist() writes the updated nodes_dict. - index.storage_context.index_store.add_index_struct(index.index_struct) - - -def vector_store_file_exists(): - """ - Check if the vector store file exists in the LLM index directory. - """ - return Path(settings.LLM_INDEX_DIR / "default__vector_store.json").exists() +def vector_store_file_exists() -> bool: + """True when the LanceDB table exists.""" + return get_vector_store().table_exists() def get_rag_chunk_size() -> int: @@ -250,17 +188,28 @@ def get_rag_prompt_helper( ) +def _iter_existing_modified(store: "PaperlessLanceVectorStore") -> list[dict]: + """One representative row per document_id, for modified-time comparison.""" + if LLM_INDEX_TABLE not in store.client.table_names(): + return [] + seen: dict[str, dict] = {} + for row in store.client.open_table(LLM_INDEX_TABLE).search().to_list(): + seen.setdefault(str(row["document_id"]), row) + return list(seen.values()) + + +def get_llm_index_compaction_retention() -> int: + """Seconds of MVCC version history to keep during compaction.""" + return 60 * 60 # 1 hour: safe for in-flight readers, reclaims daily + + def update_llm_index( *, iter_wrapper: IterWrapper[Document] = identity, rebuild=False, ) -> str: - """ - Rebuild or update the LLM index. - """ - from llama_index.core import VectorStoreIndex - - nodes = [] + """Rebuild or incrementally update the LLM index.""" + from llama_index.core.schema import MetadataMode documents = Document.objects.all() if not documents.exists(): @@ -268,105 +217,79 @@ def update_llm_index( if not rebuild and not vector_store_file_exists(): return "No documents found to index." - config = AIConfig() - chunk_size = config.llm_embedding_chunk_size + chunk_size = AIConfig().llm_embedding_chunk_size + embed_model = get_embedding_model() with FileLock(_index_lock_path()): if rebuild or not vector_store_file_exists(): - # remove meta.json to force re-detection of embedding dim (settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True) - # Rebuild index from scratch logger.info("Rebuilding LLM index.") - import llama_index.core.settings as llama_settings - - embed_model = get_embedding_model() - llama_settings.Settings.embed_model = embed_model - storage_context = get_or_create_storage_context(rebuild=True) + store = get_vector_store() + store.drop_table() for document in iter_wrapper(documents): - document_nodes = build_document_node(document, chunk_size=chunk_size) - nodes.extend(document_nodes) - - index = VectorStoreIndex( - nodes=nodes, - storage_context=storage_context, - embed_model=embed_model, - show_progress=False, - ) + nodes = build_document_node(document, chunk_size=chunk_size) + for node in nodes: + node.embedding = embed_model.get_text_embedding( + node.get_content(metadata_mode=MetadataMode.EMBED), + ) + store.add(nodes) msg = "LLM index rebuilt successfully." else: - # Update existing index - index = load_or_build_index() - existing_nodes: defaultdict[str, list] = defaultdict(list) - for node in index.docstore.docs.values(): - doc_id = node.metadata.get("document_id") - if doc_id is not None: - existing_nodes[doc_id].append(node) - + store = get_vector_store() + existing = { + str(row["document_id"]): json.loads(row["node_content"]) + for row in _iter_existing_modified(store) + } + changed = 0 for document in iter_wrapper(documents): doc_id = str(document.id) - document_modified = document.modified.isoformat() - - if doc_id in existing_nodes: - doc_nodes = existing_nodes[doc_id] - node_modified = doc_nodes[0].metadata.get("modified") - - if node_modified == document_modified: + node_meta = existing.get(doc_id) + if node_meta is not None: + stored_modified = node_meta.get("modified") + if stored_modified == document.modified.isoformat(): continue + nodes = build_document_node(document, chunk_size=chunk_size) + for node in nodes: + node.embedding = embed_model.get_text_embedding( + node.get_content(metadata_mode=MetadataMode.EMBED), + ) + store.upsert_document(doc_id, nodes) + changed += 1 + msg = ( + "LLM index updated successfully." + if changed + else "No changes detected in LLM index." + ) - # Delete from docstore, FAISS IndexFlatL2 are append-only - for node in doc_nodes: - remove_document_docstore_nodes(document, index) - - nodes.extend(build_document_node(document, chunk_size=chunk_size)) - - if nodes: - msg = "LLM index updated successfully." - logger.info( - "Updating %d nodes in LLM index.", - len(nodes), - ) - index.insert_nodes(nodes) - else: - msg = "No changes detected in LLM index." - logger.info(msg) - - index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR) + store.ensure_document_id_scalar_index() + store.maybe_create_ann_index() + store.compact(retention_seconds=get_llm_index_compaction_retention()) return msg def llm_index_add_or_update_document(document: Document): - """ - Adds or updates a document in the LLM index. - If the document already exists, it will be replaced. - """ + """Add or atomically replace a document's chunks in the LLM index.""" + from llama_index.core.schema import MetadataMode + new_nodes = build_document_node(document, chunk_size=get_rag_chunk_size()) - if not new_nodes: - logger.warning( - "No indexable content for document %s; skipping LLM index update.", - document.pk, + + embed_model = get_embedding_model() + for node in new_nodes: + node.embedding = embed_model.get_text_embedding( + node.get_content(metadata_mode=MetadataMode.EMBED), ) - return with FileLock(_index_lock_path()): - index = load_or_build_index(nodes=new_nodes) - - remove_document_docstore_nodes(document, index) - - index.insert_nodes(new_nodes) - - index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR) + store = get_vector_store() + store.upsert_document(str(document.id), new_nodes) + store.ensure_document_id_scalar_index() def llm_index_remove_document(document: Document): - """ - Removes a document from the LLM index. - """ + """Remove a document's chunks from the LLM index.""" with FileLock(_index_lock_path()): - index = load_or_build_index() - - remove_document_docstore_nodes(document, index) - - index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR) + store = get_vector_store() + store.delete(str(document.id)) def truncate_content( diff --git a/src/paperless_ai/tests/test_ai_indexing.py b/src/paperless_ai/tests/test_ai_indexing.py index c1ddbb7d2..b922b488d 100644 --- a/src/paperless_ai/tests/test_ai_indexing.py +++ b/src/paperless_ai/tests/test_ai_indexing.py @@ -908,3 +908,50 @@ class TestLlmIndexLocking: assert call_order.index("lock_acquired") < call_order.index("index_loaded"), ( "Lock must be acquired before the index is loaded" ) + + +@pytest.mark.django_db +def test_get_vector_store_roundtrip( + temp_llm_index_dir, + mock_embed_model, +) -> None: + from paperless_ai.vector_store import PaperlessLanceVectorStore + + store = indexing.get_vector_store() + assert isinstance(store, PaperlessLanceVectorStore) + + +@pytest.mark.django_db +def test_add_then_remove_document( + temp_llm_index_dir, + mock_embed_model, + real_document, +) -> None: + indexing.llm_index_add_or_update_document(real_document) + store = indexing.get_vector_store() + table = store.client.open_table(indexing.LLM_INDEX_TABLE) + assert table.count_rows() >= 1 + + indexing.llm_index_remove_document(real_document) + assert store.client.open_table(indexing.LLM_INDEX_TABLE).count_rows() == 0 + + +@pytest.mark.django_db +def test_update_shrinks_chunks_without_orphans( + temp_llm_index_dir, + mock_embed_model, + real_document, +) -> None: + real_document.content = "word " * 4000 # many chunks + real_document.save() + indexing.llm_index_add_or_update_document(real_document) + store = indexing.get_vector_store() + big = store.client.open_table(indexing.LLM_INDEX_TABLE).count_rows() + + real_document.content = "short" # one chunk + real_document.save() + indexing.llm_index_add_or_update_document(real_document) + + rows = store.client.open_table(indexing.LLM_INDEX_TABLE).count_rows() + assert rows < big + assert rows >= 1 diff --git a/src/paperless_ai/tests/test_lazy_imports.py b/src/paperless_ai/tests/test_lazy_imports.py new file mode 100644 index 000000000..12089308a --- /dev/null +++ b/src/paperless_ai/tests/test_lazy_imports.py @@ -0,0 +1,22 @@ +import subprocess +import sys + + +class TestLazyAiImports: + def test_importing_tasks_does_not_load_ai_libraries(self) -> None: + code = ( + "import os, django, sys\n" + "os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'paperless.settings')\n" + "django.setup()\n" + "import documents.tasks # noqa: F401\n" + "leaked = [m for m in ('lancedb', 'pyarrow', 'llama_index') " + "if m in sys.modules]\n" + "assert not leaked, f'AI libraries leaked into the light path: {leaked}'\n" + ) + result = subprocess.run( + [sys.executable, "-c", code], + capture_output=True, + text=True, + cwd="src", + ) + assert result.returncode == 0, result.stdout + result.stderr