mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-06-06 21:59:46 +00:00
refactor(ai): cleanup pass — naming, batched embedding, remove dead wrappers
- Rename vector_store_file_exists -> llm_index_exists (accurate now) - Rename _iter_existing_modified -> _stored_modified_times; project away vector column (cheap scan) and return dict[doc_id, modified_str] directly - Drop _index_lock_path() indirection; inline settings.LLM_INDEX_LOCK - Move LLM_INDEX_LOCK alongside the index dir (drop_table is safe; no rmtree) - Drop current_embedding_dim() redirect; callers use get_embedding_dim() - Drop lazy-import explanatory comments (constraint lives in CLAUDE.md) - Batch embedding calls via get_text_embedding_batch() in all three loops - get_nodes: raise NotImplementedError for node_ids (was silently ignored) - has_nodes(): cheap limit(1) existence check; chat.py uses it instead of get_nodes() which materialized all matching rows - conftest: use mocker fixture (pytest-mock) instead of bare patch; add LLM_INDEX_LOCK to temp_llm_index_dir override; type-annotate mock_embed_model Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -97,8 +97,7 @@ MODEL_FILE = get_path_from_env(
|
||||
DATA_DIR / "classification_model.pickle",
|
||||
)
|
||||
LLM_INDEX_DIR = DATA_DIR / "llm_index"
|
||||
LLM_INDEX_LOCK = DATA_DIR / "locks" / "llm_index.lock"
|
||||
(DATA_DIR / "locks").mkdir(parents=True, exist_ok=True)
|
||||
LLM_INDEX_LOCK = LLM_INDEX_DIR / "index.lock"
|
||||
|
||||
LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ def _stream_chat_with_documents(query_str: str, documents: list[Document]):
|
||||
)
|
||||
|
||||
# No indexed content for these documents -> bail early (before touching the LLM).
|
||||
if not index.vector_store.get_nodes(filters=filters):
|
||||
if not index.vector_store.has_nodes(filters=filters):
|
||||
logger.warning("No nodes found for the given documents.")
|
||||
yield CHAT_NO_CONTENT_MESSAGE
|
||||
return
|
||||
|
||||
@@ -132,11 +132,6 @@ def get_embedding_dim() -> int:
|
||||
return dim
|
||||
|
||||
|
||||
def current_embedding_dim() -> int:
|
||||
"""Embedding dimension for the configured model (probes if not cached)."""
|
||||
return get_embedding_dim()
|
||||
|
||||
|
||||
def _normalize_llm_index_text(text: str) -> str:
|
||||
text = OCR_LEADER_REGEX.sub(" ", text)
|
||||
return HORIZONTAL_WHITESPACE_REGEX.sub(" ", text)
|
||||
|
||||
@@ -2,7 +2,6 @@ import json
|
||||
import logging
|
||||
from collections.abc import Iterable
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from django.conf import settings
|
||||
@@ -31,21 +30,11 @@ RAG_NUM_OUTPUT = 512
|
||||
RAG_CHUNK_OVERLAP = 200
|
||||
|
||||
|
||||
def _index_lock_path() -> Path:
|
||||
"""Return the path used as the file lock for LanceDB index mutations.
|
||||
|
||||
The lock file lives in DATA_DIR/locks/ (not inside LLM_INDEX_DIR) so that a
|
||||
rebuild — which calls store.drop_table() — cannot interfere with another
|
||||
worker that still holds the lock.
|
||||
"""
|
||||
return settings.LLM_INDEX_LOCK
|
||||
|
||||
|
||||
def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool:
|
||||
# NOTE: The check-then-enqueue sequence below is non-atomic (TOCTOU): two
|
||||
# concurrent workers can both observe no running task and both enqueue a
|
||||
# full rebuild. This is wasteful but not data-corrupting — update_llm_index
|
||||
# is itself protected by _index_lock_path(), so only one rebuild runs at a
|
||||
# is itself protected by settings.LLM_INDEX_LOCK, so only one rebuild runs at a
|
||||
# time and the second one is serialised after the first completes.
|
||||
from documents.tasks import llmindex_index
|
||||
|
||||
@@ -73,11 +62,6 @@ def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool:
|
||||
|
||||
|
||||
def get_vector_store() -> "PaperlessLanceVectorStore":
|
||||
"""Open (or lazily create) the LanceDB-backed vector store.
|
||||
|
||||
Imports ``vector_store`` lazily so that importing ``indexing`` (which
|
||||
``documents.tasks`` does at module top) never drags in lancedb/llama_index.
|
||||
"""
|
||||
from paperless_ai.vector_store import PaperlessLanceVectorStore
|
||||
|
||||
settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
|
||||
@@ -132,11 +116,10 @@ def build_document_node(
|
||||
|
||||
|
||||
def load_or_build_index(nodes=None):
|
||||
"""Load the VectorStoreIndex backed by the LanceDB store.
|
||||
"""Return a VectorStoreIndex backed by the vector store.
|
||||
|
||||
With ``stores_text=True`` the index runs off the vector store alone — no
|
||||
docstore or index store. ``nodes`` is accepted for signature compatibility
|
||||
but unused; the store is the source of truth.
|
||||
``nodes`` is accepted for signature compatibility but unused; the store
|
||||
is the source of truth.
|
||||
"""
|
||||
import llama_index.core.settings as llama_settings
|
||||
from llama_index.core import VectorStoreIndex
|
||||
@@ -150,8 +133,8 @@ def load_or_build_index(nodes=None):
|
||||
)
|
||||
|
||||
|
||||
def vector_store_file_exists() -> bool:
|
||||
"""True when the LanceDB table exists."""
|
||||
def llm_index_exists() -> bool:
|
||||
"""True when the index table exists on disk."""
|
||||
return get_vector_store().table_exists()
|
||||
|
||||
|
||||
@@ -161,9 +144,9 @@ def embedding_dim_mismatch() -> bool:
|
||||
stored = store.vector_dim()
|
||||
if stored is None:
|
||||
return False
|
||||
from paperless_ai.embedding import current_embedding_dim
|
||||
from paperless_ai.embedding import get_embedding_dim
|
||||
|
||||
return stored != current_embedding_dim()
|
||||
return stored != get_embedding_dim()
|
||||
|
||||
|
||||
def get_rag_chunk_size() -> int:
|
||||
@@ -199,14 +182,28 @@ def get_rag_prompt_helper(
|
||||
)
|
||||
|
||||
|
||||
def _iter_existing_modified(store: "PaperlessLanceVectorStore") -> list[dict]:
|
||||
"""One representative row per document_id, for modified-time comparison."""
|
||||
def _stored_modified_times(store: "PaperlessLanceVectorStore") -> dict[str, str]:
|
||||
"""Return {document_id: stored_modified_isoformat} for incremental update.
|
||||
|
||||
One representative chunk per document is enough to read the stored
|
||||
``modified`` timestamp (all chunks for a document share the same value).
|
||||
Only the two scalar columns needed for comparison are fetched; the vector
|
||||
column is excluded to avoid unnecessary deserialization.
|
||||
"""
|
||||
if not store.table_exists():
|
||||
return []
|
||||
seen: dict[str, dict] = {}
|
||||
for row in store.client.open_table(LLM_INDEX_TABLE).search().to_list():
|
||||
seen.setdefault(str(row["document_id"]), row)
|
||||
return list(seen.values())
|
||||
return {}
|
||||
result: dict[str, str] = {}
|
||||
for row in (
|
||||
store.client.open_table(LLM_INDEX_TABLE)
|
||||
.search()
|
||||
.select(["document_id", "node_content"])
|
||||
.to_list()
|
||||
):
|
||||
doc_id = str(row["document_id"])
|
||||
if doc_id not in result:
|
||||
meta = json.loads(row["node_content"])
|
||||
result[doc_id] = meta.get("modified", "")
|
||||
return result
|
||||
|
||||
|
||||
def get_llm_index_compaction_retention() -> int:
|
||||
@@ -222,52 +219,52 @@ def update_llm_index(
|
||||
"""Rebuild or incrementally update the LLM index."""
|
||||
from llama_index.core.schema import MetadataMode
|
||||
|
||||
if not rebuild and vector_store_file_exists() and embedding_dim_mismatch():
|
||||
if not rebuild and llm_index_exists() and embedding_dim_mismatch():
|
||||
logger.warning("Embedding dimension changed; forcing LLM index rebuild.")
|
||||
rebuild = True
|
||||
|
||||
documents = Document.objects.all()
|
||||
if not documents.exists():
|
||||
logger.warning("No documents found to index.")
|
||||
if not rebuild and not vector_store_file_exists():
|
||||
if not rebuild and not llm_index_exists():
|
||||
return "No documents found to index."
|
||||
|
||||
chunk_size = AIConfig().llm_embedding_chunk_size
|
||||
embed_model = get_embedding_model()
|
||||
|
||||
with FileLock(_index_lock_path()):
|
||||
if rebuild or not vector_store_file_exists():
|
||||
with FileLock(settings.LLM_INDEX_LOCK):
|
||||
if rebuild or not llm_index_exists():
|
||||
(settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True)
|
||||
logger.info("Rebuilding LLM index.")
|
||||
store = get_vector_store()
|
||||
store.drop_table()
|
||||
for document in iter_wrapper(documents):
|
||||
nodes = build_document_node(document, chunk_size=chunk_size)
|
||||
for node in nodes:
|
||||
node.embedding = embed_model.get_text_embedding(
|
||||
node.get_content(metadata_mode=MetadataMode.EMBED),
|
||||
)
|
||||
texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in nodes]
|
||||
for node, emb in zip(
|
||||
nodes,
|
||||
embed_model.get_text_embedding_batch(texts),
|
||||
strict=True,
|
||||
):
|
||||
node.embedding = emb
|
||||
store.add(nodes)
|
||||
msg = "LLM index rebuilt successfully."
|
||||
else:
|
||||
store = get_vector_store()
|
||||
existing = {
|
||||
str(row["document_id"]): json.loads(row["node_content"])
|
||||
for row in _iter_existing_modified(store)
|
||||
}
|
||||
existing = _stored_modified_times(store)
|
||||
changed = 0
|
||||
for document in iter_wrapper(documents):
|
||||
doc_id = str(document.id)
|
||||
node_meta = existing.get(doc_id)
|
||||
if node_meta is not None:
|
||||
stored_modified = node_meta.get("modified")
|
||||
if stored_modified == document.modified.isoformat():
|
||||
continue
|
||||
if existing.get(doc_id) == document.modified.isoformat():
|
||||
continue
|
||||
nodes = build_document_node(document, chunk_size=chunk_size)
|
||||
for node in nodes:
|
||||
node.embedding = embed_model.get_text_embedding(
|
||||
node.get_content(metadata_mode=MetadataMode.EMBED),
|
||||
)
|
||||
texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in nodes]
|
||||
for node, emb in zip(
|
||||
nodes,
|
||||
embed_model.get_text_embedding_batch(texts),
|
||||
strict=True,
|
||||
):
|
||||
node.embedding = emb
|
||||
store.upsert_document(doc_id, nodes)
|
||||
changed += 1
|
||||
msg = (
|
||||
@@ -283,18 +280,21 @@ def update_llm_index(
|
||||
|
||||
|
||||
def llm_index_add_or_update_document(document: Document):
|
||||
"""Add or atomically replace a document's chunks in the LLM index."""
|
||||
"""Add or atomically replace a document's chunks in the index."""
|
||||
from llama_index.core.schema import MetadataMode
|
||||
|
||||
new_nodes = build_document_node(document, chunk_size=get_rag_chunk_size())
|
||||
if new_nodes:
|
||||
embed_model = get_embedding_model()
|
||||
texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in new_nodes]
|
||||
for node, emb in zip(
|
||||
new_nodes,
|
||||
embed_model.get_text_embedding_batch(texts),
|
||||
strict=True,
|
||||
):
|
||||
node.embedding = emb
|
||||
|
||||
embed_model = get_embedding_model()
|
||||
for node in new_nodes:
|
||||
node.embedding = embed_model.get_text_embedding(
|
||||
node.get_content(metadata_mode=MetadataMode.EMBED),
|
||||
)
|
||||
|
||||
with FileLock(_index_lock_path()):
|
||||
with FileLock(settings.LLM_INDEX_LOCK):
|
||||
store = get_vector_store()
|
||||
store.upsert_document(str(document.id), new_nodes)
|
||||
store.ensure_document_id_scalar_index()
|
||||
@@ -302,7 +302,7 @@ def llm_index_add_or_update_document(document: Document):
|
||||
|
||||
def llm_index_remove_document(document: Document):
|
||||
"""Remove a document's chunks from the LLM index."""
|
||||
with FileLock(_index_lock_path()):
|
||||
with FileLock(settings.LLM_INDEX_LOCK):
|
||||
store = get_vector_store()
|
||||
store.delete(str(document.id))
|
||||
|
||||
@@ -354,7 +354,7 @@ def query_similar_documents(
|
||||
if allowed_document_ids is not None and not allowed_document_ids:
|
||||
return []
|
||||
|
||||
if not vector_store_file_exists():
|
||||
if not llm_index_exists():
|
||||
queue_llm_index_update_if_needed(
|
||||
rebuild=False,
|
||||
reason="LLM index not found for similarity query.",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import pytest_mock
|
||||
from llama_index.core.base.embeddings.base import BaseEmbedding
|
||||
from pytest_django.fixtures import SettingsWrapper
|
||||
|
||||
@@ -9,6 +9,7 @@ from pytest_django.fixtures import SettingsWrapper
|
||||
@pytest.fixture
|
||||
def temp_llm_index_dir(tmp_path: Path, settings: SettingsWrapper) -> Path:
|
||||
settings.LLM_INDEX_DIR = tmp_path
|
||||
settings.LLM_INDEX_LOCK = tmp_path / "index.lock"
|
||||
return tmp_path
|
||||
|
||||
|
||||
@@ -27,14 +28,8 @@ class FakeEmbedding(BaseEmbedding):
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_embed_model():
|
||||
def mock_embed_model(mocker: pytest_mock.MockerFixture) -> pytest_mock.MockType:
|
||||
fake = FakeEmbedding()
|
||||
with (
|
||||
patch("paperless_ai.indexing.get_embedding_model") as mock_index,
|
||||
patch(
|
||||
"paperless_ai.embedding.get_embedding_model",
|
||||
) as mock_embedding,
|
||||
):
|
||||
mock_index.return_value = fake
|
||||
mock_embedding.return_value = fake
|
||||
yield mock_index
|
||||
mocker.patch("paperless_ai.indexing.get_embedding_model", return_value=fake)
|
||||
mocker.patch("paperless_ai.embedding.get_embedding_model", return_value=fake)
|
||||
return fake
|
||||
|
||||
@@ -287,7 +287,7 @@ def test_query_similar_documents(
|
||||
with (
|
||||
patch("paperless_ai.indexing.load_or_build_index") as mock_load_or_build_index,
|
||||
patch(
|
||||
"paperless_ai.indexing.vector_store_file_exists",
|
||||
"paperless_ai.indexing.llm_index_exists",
|
||||
) as mock_vector_store_exists,
|
||||
patch("llama_index.core.retrievers.VectorIndexRetriever") as mock_retriever_cls,
|
||||
patch("paperless_ai.indexing.Document.objects.filter") as mock_filter,
|
||||
@@ -330,7 +330,7 @@ def test_query_similar_documents_triggers_update_when_index_missing(
|
||||
) -> None:
|
||||
with (
|
||||
patch(
|
||||
"paperless_ai.indexing.vector_store_file_exists",
|
||||
"paperless_ai.indexing.llm_index_exists",
|
||||
return_value=False,
|
||||
),
|
||||
patch(
|
||||
@@ -357,7 +357,7 @@ def test_query_similar_documents_empty_allow_list_fails_closed(
|
||||
) -> None:
|
||||
with (
|
||||
patch(
|
||||
"paperless_ai.indexing.vector_store_file_exists",
|
||||
"paperless_ai.indexing.llm_index_exists",
|
||||
return_value=True,
|
||||
) as mock_vector_store_exists,
|
||||
patch("paperless_ai.indexing.load_or_build_index") as mock_load_or_build_index,
|
||||
|
||||
@@ -194,6 +194,11 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
|
||||
filters: MetadataFilters | None = None,
|
||||
**kwargs: Any,
|
||||
) -> list[BaseNode]:
|
||||
if node_ids is not None:
|
||||
# node_ids lookup is not implemented; see class docstring.
|
||||
raise NotImplementedError(
|
||||
"PaperlessLanceVectorStore does not support node_ids lookup",
|
||||
)
|
||||
if self._table is None:
|
||||
return []
|
||||
where = _build_where(filters)
|
||||
@@ -202,6 +207,16 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
|
||||
query = query.where(where)
|
||||
return self._rows_to_nodes(query.to_list())
|
||||
|
||||
def has_nodes(self, filters: MetadataFilters | None = None) -> bool:
|
||||
"""Return True if at least one matching node exists (cheap existence check)."""
|
||||
if self._table is None:
|
||||
return False
|
||||
where = _build_where(filters)
|
||||
query = self._table.search()
|
||||
if where:
|
||||
query = query.where(where)
|
||||
return len(query.limit(1).to_list()) > 0
|
||||
|
||||
def query(
|
||||
self,
|
||||
query: VectorStoreQuery,
|
||||
|
||||
Reference in New Issue
Block a user