mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-06-06 13:49:44 +00:00
refactor(ai): build the index from the LanceDB store alone (lazy import)
Replace get_or_create_storage_context with get_vector_store() (lazy import of paperless_ai.vector_store inside the function), rewrite load_or_build_index to use VectorStoreIndex.from_vector_store, and rewrite vector_store_file_exists to use store.table_exists(). Add LLM_INDEX_TABLE constant and TYPE_CHECKING-only import of PaperlessLanceVectorStore. Delete remove_document_docstore_nodes and rewire llm_index_add_or_update_document, llm_index_remove_document, and update_llm_index to use upsert_document/delete/drop_table on the LanceDB store. Serialize tags list as JSON string to satisfy flat_metadata validation. Add test_get_vector_store_roundtrip, test_add_then_remove_document, test_update_shrinks_chunks_without_orphans, and the subprocess lazy-import guard. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+95
-172
@@ -1,6 +1,5 @@
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
from collections import defaultdict
|
||||
from collections.abc import Iterable
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
@@ -16,16 +15,18 @@ from documents.utils import IterWrapper
|
||||
from documents.utils import identity
|
||||
from paperless.config import AIConfig
|
||||
from paperless_ai.embedding import build_llm_index_text
|
||||
from paperless_ai.embedding import get_embedding_dim
|
||||
from paperless_ai.embedding import get_embedding_model
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from llama_index.core import VectorStoreIndex
|
||||
from llama_index.core.schema import BaseNode
|
||||
|
||||
from paperless_ai.vector_store import PaperlessLanceVectorStore
|
||||
|
||||
|
||||
logger = logging.getLogger("paperless_ai.indexing")
|
||||
|
||||
LLM_INDEX_TABLE = "documents"
|
||||
|
||||
RAG_NUM_OUTPUT = 512
|
||||
RAG_CHUNK_OVERLAP = 200
|
||||
|
||||
@@ -71,43 +72,18 @@ def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def get_or_create_storage_context(*, rebuild=False):
|
||||
def get_vector_store() -> "PaperlessLanceVectorStore":
|
||||
"""Open (or lazily create) the LanceDB-backed vector store.
|
||||
|
||||
Imports ``vector_store`` lazily so that importing ``indexing`` (which
|
||||
``documents.tasks`` does at module top) never drags in lancedb/llama_index.
|
||||
"""
|
||||
Loads or creates the StorageContext (vector store, docstore, index store).
|
||||
If rebuild=True, deletes and recreates everything.
|
||||
"""
|
||||
if rebuild:
|
||||
shutil.rmtree(settings.LLM_INDEX_DIR, ignore_errors=True)
|
||||
settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
|
||||
from paperless_ai.vector_store import PaperlessLanceVectorStore
|
||||
|
||||
if rebuild or not settings.LLM_INDEX_DIR.exists():
|
||||
import faiss
|
||||
from llama_index.core import StorageContext
|
||||
from llama_index.core.storage.docstore import SimpleDocumentStore
|
||||
from llama_index.core.storage.index_store import SimpleIndexStore
|
||||
from llama_index.vector_stores.faiss import FaissVectorStore
|
||||
|
||||
settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
|
||||
embedding_dim = get_embedding_dim()
|
||||
faiss_index = faiss.IndexFlatL2(embedding_dim)
|
||||
vector_store = FaissVectorStore(faiss_index=faiss_index)
|
||||
docstore = SimpleDocumentStore()
|
||||
index_store = SimpleIndexStore()
|
||||
else:
|
||||
from llama_index.core import StorageContext
|
||||
from llama_index.core.storage.docstore import SimpleDocumentStore
|
||||
from llama_index.core.storage.index_store import SimpleIndexStore
|
||||
from llama_index.vector_stores.faiss import FaissVectorStore
|
||||
|
||||
vector_store = FaissVectorStore.from_persist_dir(settings.LLM_INDEX_DIR)
|
||||
docstore = SimpleDocumentStore.from_persist_dir(settings.LLM_INDEX_DIR)
|
||||
index_store = SimpleIndexStore.from_persist_dir(settings.LLM_INDEX_DIR)
|
||||
|
||||
return StorageContext.from_defaults(
|
||||
docstore=docstore,
|
||||
index_store=index_store,
|
||||
vector_store=vector_store,
|
||||
persist_dir=settings.LLM_INDEX_DIR,
|
||||
settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
|
||||
return PaperlessLanceVectorStore(
|
||||
uri=str(settings.LLM_INDEX_DIR),
|
||||
table_name=LLM_INDEX_TABLE,
|
||||
)
|
||||
|
||||
|
||||
@@ -123,7 +99,7 @@ def build_document_node(
|
||||
metadata = {
|
||||
"document_id": str(document.id),
|
||||
"title": document.title,
|
||||
"tags": [t.name for t in document.tags.all()],
|
||||
"tags": json.dumps([t.name for t in document.tags.all()]),
|
||||
"correspondent": document.correspondent.name
|
||||
if document.correspondent
|
||||
else None,
|
||||
@@ -156,65 +132,27 @@ def build_document_node(
|
||||
|
||||
|
||||
def load_or_build_index(nodes=None):
|
||||
"""
|
||||
Load an existing VectorStoreIndex if present,
|
||||
or build a new one using provided nodes if storage is empty.
|
||||
"""Load the VectorStoreIndex backed by the LanceDB store.
|
||||
|
||||
With ``stores_text=True`` the index runs off the vector store alone — no
|
||||
docstore or index store. ``nodes`` is accepted for signature compatibility
|
||||
but unused; the store is the source of truth.
|
||||
"""
|
||||
import llama_index.core.settings as llama_settings
|
||||
from llama_index.core import VectorStoreIndex
|
||||
from llama_index.core import load_index_from_storage
|
||||
|
||||
embed_model = get_embedding_model()
|
||||
llama_settings.Settings.embed_model = embed_model
|
||||
storage_context = get_or_create_storage_context()
|
||||
try:
|
||||
return load_index_from_storage(storage_context=storage_context)
|
||||
except ValueError as e:
|
||||
logger.warning("Failed to load index from storage: %s", e)
|
||||
if not nodes:
|
||||
queue_llm_index_update_if_needed(
|
||||
rebuild=vector_store_file_exists(),
|
||||
reason="LLM index missing or invalid while loading.",
|
||||
)
|
||||
logger.info("No nodes provided for index creation.")
|
||||
raise
|
||||
return VectorStoreIndex(
|
||||
nodes=nodes,
|
||||
storage_context=storage_context,
|
||||
embed_model=embed_model,
|
||||
)
|
||||
vector_store = get_vector_store()
|
||||
return VectorStoreIndex.from_vector_store(
|
||||
vector_store=vector_store,
|
||||
embed_model=embed_model,
|
||||
)
|
||||
|
||||
|
||||
def remove_document_docstore_nodes(document: Document, index: "VectorStoreIndex"):
|
||||
"""
|
||||
Removes existing documents from docstore for a given document from the index.
|
||||
This is necessary because FAISS IndexFlatL2 is append-only.
|
||||
"""
|
||||
all_node_ids = list(index.docstore.docs.keys())
|
||||
existing_nodes = [
|
||||
node.node_id
|
||||
for node in index.docstore.get_nodes(all_node_ids)
|
||||
if node.metadata.get("document_id") == str(document.id)
|
||||
]
|
||||
for node_id in existing_nodes:
|
||||
# Delete from docstore, FAISS IndexFlatL2 are append-only
|
||||
index.docstore.delete_document(node_id)
|
||||
# Also purge the FAISS position -> UUID mapping so subsequent similarity
|
||||
# queries don't raise KeyError on ghost vector positions.
|
||||
stale_keys = [
|
||||
k for k, v in index.index_struct.nodes_dict.items() if v == node_id
|
||||
]
|
||||
for key in stale_keys:
|
||||
del index.index_struct.nodes_dict[key]
|
||||
# Re-sync the mutated index_struct so persist() writes the updated nodes_dict.
|
||||
index.storage_context.index_store.add_index_struct(index.index_struct)
|
||||
|
||||
|
||||
def vector_store_file_exists():
|
||||
"""
|
||||
Check if the vector store file exists in the LLM index directory.
|
||||
"""
|
||||
return Path(settings.LLM_INDEX_DIR / "default__vector_store.json").exists()
|
||||
def vector_store_file_exists() -> bool:
|
||||
"""True when the LanceDB table exists."""
|
||||
return get_vector_store().table_exists()
|
||||
|
||||
|
||||
def get_rag_chunk_size() -> int:
|
||||
@@ -250,17 +188,28 @@ def get_rag_prompt_helper(
|
||||
)
|
||||
|
||||
|
||||
def _iter_existing_modified(store: "PaperlessLanceVectorStore") -> list[dict]:
|
||||
"""One representative row per document_id, for modified-time comparison."""
|
||||
if LLM_INDEX_TABLE not in store.client.table_names():
|
||||
return []
|
||||
seen: dict[str, dict] = {}
|
||||
for row in store.client.open_table(LLM_INDEX_TABLE).search().to_list():
|
||||
seen.setdefault(str(row["document_id"]), row)
|
||||
return list(seen.values())
|
||||
|
||||
|
||||
def get_llm_index_compaction_retention() -> int:
|
||||
"""Seconds of MVCC version history to keep during compaction."""
|
||||
return 60 * 60 # 1 hour: safe for in-flight readers, reclaims daily
|
||||
|
||||
|
||||
def update_llm_index(
|
||||
*,
|
||||
iter_wrapper: IterWrapper[Document] = identity,
|
||||
rebuild=False,
|
||||
) -> str:
|
||||
"""
|
||||
Rebuild or update the LLM index.
|
||||
"""
|
||||
from llama_index.core import VectorStoreIndex
|
||||
|
||||
nodes = []
|
||||
"""Rebuild or incrementally update the LLM index."""
|
||||
from llama_index.core.schema import MetadataMode
|
||||
|
||||
documents = Document.objects.all()
|
||||
if not documents.exists():
|
||||
@@ -268,105 +217,79 @@ def update_llm_index(
|
||||
if not rebuild and not vector_store_file_exists():
|
||||
return "No documents found to index."
|
||||
|
||||
config = AIConfig()
|
||||
chunk_size = config.llm_embedding_chunk_size
|
||||
chunk_size = AIConfig().llm_embedding_chunk_size
|
||||
embed_model = get_embedding_model()
|
||||
|
||||
with FileLock(_index_lock_path()):
|
||||
if rebuild or not vector_store_file_exists():
|
||||
# remove meta.json to force re-detection of embedding dim
|
||||
(settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True)
|
||||
# Rebuild index from scratch
|
||||
logger.info("Rebuilding LLM index.")
|
||||
import llama_index.core.settings as llama_settings
|
||||
|
||||
embed_model = get_embedding_model()
|
||||
llama_settings.Settings.embed_model = embed_model
|
||||
storage_context = get_or_create_storage_context(rebuild=True)
|
||||
store = get_vector_store()
|
||||
store.drop_table()
|
||||
for document in iter_wrapper(documents):
|
||||
document_nodes = build_document_node(document, chunk_size=chunk_size)
|
||||
nodes.extend(document_nodes)
|
||||
|
||||
index = VectorStoreIndex(
|
||||
nodes=nodes,
|
||||
storage_context=storage_context,
|
||||
embed_model=embed_model,
|
||||
show_progress=False,
|
||||
)
|
||||
nodes = build_document_node(document, chunk_size=chunk_size)
|
||||
for node in nodes:
|
||||
node.embedding = embed_model.get_text_embedding(
|
||||
node.get_content(metadata_mode=MetadataMode.EMBED),
|
||||
)
|
||||
store.add(nodes)
|
||||
msg = "LLM index rebuilt successfully."
|
||||
else:
|
||||
# Update existing index
|
||||
index = load_or_build_index()
|
||||
existing_nodes: defaultdict[str, list] = defaultdict(list)
|
||||
for node in index.docstore.docs.values():
|
||||
doc_id = node.metadata.get("document_id")
|
||||
if doc_id is not None:
|
||||
existing_nodes[doc_id].append(node)
|
||||
|
||||
store = get_vector_store()
|
||||
existing = {
|
||||
str(row["document_id"]): json.loads(row["node_content"])
|
||||
for row in _iter_existing_modified(store)
|
||||
}
|
||||
changed = 0
|
||||
for document in iter_wrapper(documents):
|
||||
doc_id = str(document.id)
|
||||
document_modified = document.modified.isoformat()
|
||||
|
||||
if doc_id in existing_nodes:
|
||||
doc_nodes = existing_nodes[doc_id]
|
||||
node_modified = doc_nodes[0].metadata.get("modified")
|
||||
|
||||
if node_modified == document_modified:
|
||||
node_meta = existing.get(doc_id)
|
||||
if node_meta is not None:
|
||||
stored_modified = node_meta.get("modified")
|
||||
if stored_modified == document.modified.isoformat():
|
||||
continue
|
||||
nodes = build_document_node(document, chunk_size=chunk_size)
|
||||
for node in nodes:
|
||||
node.embedding = embed_model.get_text_embedding(
|
||||
node.get_content(metadata_mode=MetadataMode.EMBED),
|
||||
)
|
||||
store.upsert_document(doc_id, nodes)
|
||||
changed += 1
|
||||
msg = (
|
||||
"LLM index updated successfully."
|
||||
if changed
|
||||
else "No changes detected in LLM index."
|
||||
)
|
||||
|
||||
# Delete from docstore, FAISS IndexFlatL2 are append-only
|
||||
for node in doc_nodes:
|
||||
remove_document_docstore_nodes(document, index)
|
||||
|
||||
nodes.extend(build_document_node(document, chunk_size=chunk_size))
|
||||
|
||||
if nodes:
|
||||
msg = "LLM index updated successfully."
|
||||
logger.info(
|
||||
"Updating %d nodes in LLM index.",
|
||||
len(nodes),
|
||||
)
|
||||
index.insert_nodes(nodes)
|
||||
else:
|
||||
msg = "No changes detected in LLM index."
|
||||
logger.info(msg)
|
||||
|
||||
index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
|
||||
store.ensure_document_id_scalar_index()
|
||||
store.maybe_create_ann_index()
|
||||
store.compact(retention_seconds=get_llm_index_compaction_retention())
|
||||
return msg
|
||||
|
||||
|
||||
def llm_index_add_or_update_document(document: Document):
|
||||
"""
|
||||
Adds or updates a document in the LLM index.
|
||||
If the document already exists, it will be replaced.
|
||||
"""
|
||||
"""Add or atomically replace a document's chunks in the LLM index."""
|
||||
from llama_index.core.schema import MetadataMode
|
||||
|
||||
new_nodes = build_document_node(document, chunk_size=get_rag_chunk_size())
|
||||
if not new_nodes:
|
||||
logger.warning(
|
||||
"No indexable content for document %s; skipping LLM index update.",
|
||||
document.pk,
|
||||
|
||||
embed_model = get_embedding_model()
|
||||
for node in new_nodes:
|
||||
node.embedding = embed_model.get_text_embedding(
|
||||
node.get_content(metadata_mode=MetadataMode.EMBED),
|
||||
)
|
||||
return
|
||||
|
||||
with FileLock(_index_lock_path()):
|
||||
index = load_or_build_index(nodes=new_nodes)
|
||||
|
||||
remove_document_docstore_nodes(document, index)
|
||||
|
||||
index.insert_nodes(new_nodes)
|
||||
|
||||
index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
|
||||
store = get_vector_store()
|
||||
store.upsert_document(str(document.id), new_nodes)
|
||||
store.ensure_document_id_scalar_index()
|
||||
|
||||
|
||||
def llm_index_remove_document(document: Document):
|
||||
"""
|
||||
Removes a document from the LLM index.
|
||||
"""
|
||||
"""Remove a document's chunks from the LLM index."""
|
||||
with FileLock(_index_lock_path()):
|
||||
index = load_or_build_index()
|
||||
|
||||
remove_document_docstore_nodes(document, index)
|
||||
|
||||
index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
|
||||
store = get_vector_store()
|
||||
store.delete(str(document.id))
|
||||
|
||||
|
||||
def truncate_content(
|
||||
|
||||
@@ -908,3 +908,50 @@ class TestLlmIndexLocking:
|
||||
assert call_order.index("lock_acquired") < call_order.index("index_loaded"), (
|
||||
"Lock must be acquired before the index is loaded"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_get_vector_store_roundtrip(
|
||||
temp_llm_index_dir,
|
||||
mock_embed_model,
|
||||
) -> None:
|
||||
from paperless_ai.vector_store import PaperlessLanceVectorStore
|
||||
|
||||
store = indexing.get_vector_store()
|
||||
assert isinstance(store, PaperlessLanceVectorStore)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_add_then_remove_document(
|
||||
temp_llm_index_dir,
|
||||
mock_embed_model,
|
||||
real_document,
|
||||
) -> None:
|
||||
indexing.llm_index_add_or_update_document(real_document)
|
||||
store = indexing.get_vector_store()
|
||||
table = store.client.open_table(indexing.LLM_INDEX_TABLE)
|
||||
assert table.count_rows() >= 1
|
||||
|
||||
indexing.llm_index_remove_document(real_document)
|
||||
assert store.client.open_table(indexing.LLM_INDEX_TABLE).count_rows() == 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_shrinks_chunks_without_orphans(
|
||||
temp_llm_index_dir,
|
||||
mock_embed_model,
|
||||
real_document,
|
||||
) -> None:
|
||||
real_document.content = "word " * 4000 # many chunks
|
||||
real_document.save()
|
||||
indexing.llm_index_add_or_update_document(real_document)
|
||||
store = indexing.get_vector_store()
|
||||
big = store.client.open_table(indexing.LLM_INDEX_TABLE).count_rows()
|
||||
|
||||
real_document.content = "short" # one chunk
|
||||
real_document.save()
|
||||
indexing.llm_index_add_or_update_document(real_document)
|
||||
|
||||
rows = store.client.open_table(indexing.LLM_INDEX_TABLE).count_rows()
|
||||
assert rows < big
|
||||
assert rows >= 1
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
class TestLazyAiImports:
|
||||
def test_importing_tasks_does_not_load_ai_libraries(self) -> None:
|
||||
code = (
|
||||
"import os, django, sys\n"
|
||||
"os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'paperless.settings')\n"
|
||||
"django.setup()\n"
|
||||
"import documents.tasks # noqa: F401\n"
|
||||
"leaked = [m for m in ('lancedb', 'pyarrow', 'llama_index') "
|
||||
"if m in sys.modules]\n"
|
||||
"assert not leaked, f'AI libraries leaked into the light path: {leaked}'\n"
|
||||
)
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
cwd="src",
|
||||
)
|
||||
assert result.returncode == 0, result.stdout + result.stderr
|
||||
Reference in New Issue
Block a user