diff --git a/src/paperless/settings/__init__.py b/src/paperless/settings/__init__.py index 96b2279a7..a7a985a7d 100644 --- a/src/paperless/settings/__init__.py +++ b/src/paperless/settings/__init__.py @@ -97,8 +97,7 @@ MODEL_FILE = get_path_from_env( DATA_DIR / "classification_model.pickle", ) LLM_INDEX_DIR = DATA_DIR / "llm_index" -LLM_INDEX_LOCK = DATA_DIR / "locks" / "llm_index.lock" -(DATA_DIR / "locks").mkdir(parents=True, exist_ok=True) +LLM_INDEX_LOCK = LLM_INDEX_DIR / "index.lock" LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log") diff --git a/src/paperless_ai/chat.py b/src/paperless_ai/chat.py index 63e9267b0..be1da80f6 100644 --- a/src/paperless_ai/chat.py +++ b/src/paperless_ai/chat.py @@ -106,7 +106,7 @@ def _stream_chat_with_documents(query_str: str, documents: list[Document]): ) # No indexed content for these documents -> bail early (before touching the LLM). - if not index.vector_store.get_nodes(filters=filters): + if not index.vector_store.has_nodes(filters=filters): logger.warning("No nodes found for the given documents.") yield CHAT_NO_CONTENT_MESSAGE return diff --git a/src/paperless_ai/embedding.py b/src/paperless_ai/embedding.py index 8524e77bf..2695e9fb3 100644 --- a/src/paperless_ai/embedding.py +++ b/src/paperless_ai/embedding.py @@ -132,11 +132,6 @@ def get_embedding_dim() -> int: return dim -def current_embedding_dim() -> int: - """Embedding dimension for the configured model (probes if not cached).""" - return get_embedding_dim() - - def _normalize_llm_index_text(text: str) -> str: text = OCR_LEADER_REGEX.sub(" ", text) return HORIZONTAL_WHITESPACE_REGEX.sub(" ", text) diff --git a/src/paperless_ai/indexing.py b/src/paperless_ai/indexing.py index cd904f0d1..8c802433e 100644 --- a/src/paperless_ai/indexing.py +++ b/src/paperless_ai/indexing.py @@ -2,7 +2,6 @@ import json import logging from collections.abc import Iterable from datetime import timedelta -from pathlib import Path from typing import TYPE_CHECKING from django.conf import settings @@ -31,21 +30,11 @@ RAG_NUM_OUTPUT = 512 RAG_CHUNK_OVERLAP = 200 -def _index_lock_path() -> Path: - """Return the path used as the file lock for LanceDB index mutations. - - The lock file lives in DATA_DIR/locks/ (not inside LLM_INDEX_DIR) so that a - rebuild — which calls store.drop_table() — cannot interfere with another - worker that still holds the lock. - """ - return settings.LLM_INDEX_LOCK - - def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool: # NOTE: The check-then-enqueue sequence below is non-atomic (TOCTOU): two # concurrent workers can both observe no running task and both enqueue a # full rebuild. This is wasteful but not data-corrupting — update_llm_index - # is itself protected by _index_lock_path(), so only one rebuild runs at a + # is itself protected by settings.LLM_INDEX_LOCK, so only one rebuild runs at a # time and the second one is serialised after the first completes. from documents.tasks import llmindex_index @@ -73,11 +62,6 @@ def queue_llm_index_update_if_needed(*, rebuild: bool, reason: str) -> bool: def get_vector_store() -> "PaperlessLanceVectorStore": - """Open (or lazily create) the LanceDB-backed vector store. - - Imports ``vector_store`` lazily so that importing ``indexing`` (which - ``documents.tasks`` does at module top) never drags in lancedb/llama_index. - """ from paperless_ai.vector_store import PaperlessLanceVectorStore settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True) @@ -132,11 +116,10 @@ def build_document_node( def load_or_build_index(nodes=None): - """Load the VectorStoreIndex backed by the LanceDB store. + """Return a VectorStoreIndex backed by the vector store. - With ``stores_text=True`` the index runs off the vector store alone — no - docstore or index store. ``nodes`` is accepted for signature compatibility - but unused; the store is the source of truth. + ``nodes`` is accepted for signature compatibility but unused; the store + is the source of truth. """ import llama_index.core.settings as llama_settings from llama_index.core import VectorStoreIndex @@ -150,8 +133,8 @@ def load_or_build_index(nodes=None): ) -def vector_store_file_exists() -> bool: - """True when the LanceDB table exists.""" +def llm_index_exists() -> bool: + """True when the index table exists on disk.""" return get_vector_store().table_exists() @@ -161,9 +144,9 @@ def embedding_dim_mismatch() -> bool: stored = store.vector_dim() if stored is None: return False - from paperless_ai.embedding import current_embedding_dim + from paperless_ai.embedding import get_embedding_dim - return stored != current_embedding_dim() + return stored != get_embedding_dim() def get_rag_chunk_size() -> int: @@ -199,14 +182,28 @@ def get_rag_prompt_helper( ) -def _iter_existing_modified(store: "PaperlessLanceVectorStore") -> list[dict]: - """One representative row per document_id, for modified-time comparison.""" +def _stored_modified_times(store: "PaperlessLanceVectorStore") -> dict[str, str]: + """Return {document_id: stored_modified_isoformat} for incremental update. + + One representative chunk per document is enough to read the stored + ``modified`` timestamp (all chunks for a document share the same value). + Only the two scalar columns needed for comparison are fetched; the vector + column is excluded to avoid unnecessary deserialization. + """ if not store.table_exists(): - return [] - seen: dict[str, dict] = {} - for row in store.client.open_table(LLM_INDEX_TABLE).search().to_list(): - seen.setdefault(str(row["document_id"]), row) - return list(seen.values()) + return {} + result: dict[str, str] = {} + for row in ( + store.client.open_table(LLM_INDEX_TABLE) + .search() + .select(["document_id", "node_content"]) + .to_list() + ): + doc_id = str(row["document_id"]) + if doc_id not in result: + meta = json.loads(row["node_content"]) + result[doc_id] = meta.get("modified", "") + return result def get_llm_index_compaction_retention() -> int: @@ -222,52 +219,52 @@ def update_llm_index( """Rebuild or incrementally update the LLM index.""" from llama_index.core.schema import MetadataMode - if not rebuild and vector_store_file_exists() and embedding_dim_mismatch(): + if not rebuild and llm_index_exists() and embedding_dim_mismatch(): logger.warning("Embedding dimension changed; forcing LLM index rebuild.") rebuild = True documents = Document.objects.all() if not documents.exists(): logger.warning("No documents found to index.") - if not rebuild and not vector_store_file_exists(): + if not rebuild and not llm_index_exists(): return "No documents found to index." chunk_size = AIConfig().llm_embedding_chunk_size embed_model = get_embedding_model() - with FileLock(_index_lock_path()): - if rebuild or not vector_store_file_exists(): + with FileLock(settings.LLM_INDEX_LOCK): + if rebuild or not llm_index_exists(): (settings.LLM_INDEX_DIR / "meta.json").unlink(missing_ok=True) logger.info("Rebuilding LLM index.") store = get_vector_store() store.drop_table() for document in iter_wrapper(documents): nodes = build_document_node(document, chunk_size=chunk_size) - for node in nodes: - node.embedding = embed_model.get_text_embedding( - node.get_content(metadata_mode=MetadataMode.EMBED), - ) + texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in nodes] + for node, emb in zip( + nodes, + embed_model.get_text_embedding_batch(texts), + strict=True, + ): + node.embedding = emb store.add(nodes) msg = "LLM index rebuilt successfully." else: store = get_vector_store() - existing = { - str(row["document_id"]): json.loads(row["node_content"]) - for row in _iter_existing_modified(store) - } + existing = _stored_modified_times(store) changed = 0 for document in iter_wrapper(documents): doc_id = str(document.id) - node_meta = existing.get(doc_id) - if node_meta is not None: - stored_modified = node_meta.get("modified") - if stored_modified == document.modified.isoformat(): - continue + if existing.get(doc_id) == document.modified.isoformat(): + continue nodes = build_document_node(document, chunk_size=chunk_size) - for node in nodes: - node.embedding = embed_model.get_text_embedding( - node.get_content(metadata_mode=MetadataMode.EMBED), - ) + texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in nodes] + for node, emb in zip( + nodes, + embed_model.get_text_embedding_batch(texts), + strict=True, + ): + node.embedding = emb store.upsert_document(doc_id, nodes) changed += 1 msg = ( @@ -283,18 +280,21 @@ def update_llm_index( def llm_index_add_or_update_document(document: Document): - """Add or atomically replace a document's chunks in the LLM index.""" + """Add or atomically replace a document's chunks in the index.""" from llama_index.core.schema import MetadataMode new_nodes = build_document_node(document, chunk_size=get_rag_chunk_size()) + if new_nodes: + embed_model = get_embedding_model() + texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in new_nodes] + for node, emb in zip( + new_nodes, + embed_model.get_text_embedding_batch(texts), + strict=True, + ): + node.embedding = emb - embed_model = get_embedding_model() - for node in new_nodes: - node.embedding = embed_model.get_text_embedding( - node.get_content(metadata_mode=MetadataMode.EMBED), - ) - - with FileLock(_index_lock_path()): + with FileLock(settings.LLM_INDEX_LOCK): store = get_vector_store() store.upsert_document(str(document.id), new_nodes) store.ensure_document_id_scalar_index() @@ -302,7 +302,7 @@ def llm_index_add_or_update_document(document: Document): def llm_index_remove_document(document: Document): """Remove a document's chunks from the LLM index.""" - with FileLock(_index_lock_path()): + with FileLock(settings.LLM_INDEX_LOCK): store = get_vector_store() store.delete(str(document.id)) @@ -354,7 +354,7 @@ def query_similar_documents( if allowed_document_ids is not None and not allowed_document_ids: return [] - if not vector_store_file_exists(): + if not llm_index_exists(): queue_llm_index_update_if_needed( rebuild=False, reason="LLM index not found for similarity query.", diff --git a/src/paperless_ai/tests/conftest.py b/src/paperless_ai/tests/conftest.py index 6d54adada..6a7abf7ec 100644 --- a/src/paperless_ai/tests/conftest.py +++ b/src/paperless_ai/tests/conftest.py @@ -1,7 +1,7 @@ from pathlib import Path -from unittest.mock import patch import pytest +import pytest_mock from llama_index.core.base.embeddings.base import BaseEmbedding from pytest_django.fixtures import SettingsWrapper @@ -9,6 +9,7 @@ from pytest_django.fixtures import SettingsWrapper @pytest.fixture def temp_llm_index_dir(tmp_path: Path, settings: SettingsWrapper) -> Path: settings.LLM_INDEX_DIR = tmp_path + settings.LLM_INDEX_LOCK = tmp_path / "index.lock" return tmp_path @@ -27,14 +28,8 @@ class FakeEmbedding(BaseEmbedding): @pytest.fixture -def mock_embed_model(): +def mock_embed_model(mocker: pytest_mock.MockerFixture) -> pytest_mock.MockType: fake = FakeEmbedding() - with ( - patch("paperless_ai.indexing.get_embedding_model") as mock_index, - patch( - "paperless_ai.embedding.get_embedding_model", - ) as mock_embedding, - ): - mock_index.return_value = fake - mock_embedding.return_value = fake - yield mock_index + mocker.patch("paperless_ai.indexing.get_embedding_model", return_value=fake) + mocker.patch("paperless_ai.embedding.get_embedding_model", return_value=fake) + return fake diff --git a/src/paperless_ai/tests/test_ai_indexing.py b/src/paperless_ai/tests/test_ai_indexing.py index 503999b2c..1376b0ea5 100644 --- a/src/paperless_ai/tests/test_ai_indexing.py +++ b/src/paperless_ai/tests/test_ai_indexing.py @@ -287,7 +287,7 @@ def test_query_similar_documents( with ( patch("paperless_ai.indexing.load_or_build_index") as mock_load_or_build_index, patch( - "paperless_ai.indexing.vector_store_file_exists", + "paperless_ai.indexing.llm_index_exists", ) as mock_vector_store_exists, patch("llama_index.core.retrievers.VectorIndexRetriever") as mock_retriever_cls, patch("paperless_ai.indexing.Document.objects.filter") as mock_filter, @@ -330,7 +330,7 @@ def test_query_similar_documents_triggers_update_when_index_missing( ) -> None: with ( patch( - "paperless_ai.indexing.vector_store_file_exists", + "paperless_ai.indexing.llm_index_exists", return_value=False, ), patch( @@ -357,7 +357,7 @@ def test_query_similar_documents_empty_allow_list_fails_closed( ) -> None: with ( patch( - "paperless_ai.indexing.vector_store_file_exists", + "paperless_ai.indexing.llm_index_exists", return_value=True, ) as mock_vector_store_exists, patch("paperless_ai.indexing.load_or_build_index") as mock_load_or_build_index, diff --git a/src/paperless_ai/vector_store.py b/src/paperless_ai/vector_store.py index 573d424da..6fe3926ff 100644 --- a/src/paperless_ai/vector_store.py +++ b/src/paperless_ai/vector_store.py @@ -194,6 +194,11 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore): filters: MetadataFilters | None = None, **kwargs: Any, ) -> list[BaseNode]: + if node_ids is not None: + # node_ids lookup is not implemented; see class docstring. + raise NotImplementedError( + "PaperlessLanceVectorStore does not support node_ids lookup", + ) if self._table is None: return [] where = _build_where(filters) @@ -202,6 +207,16 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore): query = query.where(where) return self._rows_to_nodes(query.to_list()) + def has_nodes(self, filters: MetadataFilters | None = None) -> bool: + """Return True if at least one matching node exists (cheap existence check).""" + if self._table is None: + return False + where = _build_where(filters) + query = self._table.search() + if where: + query = query.where(where) + return len(query.limit(1).to_list()) > 0 + def query( self, query: VectorStoreQuery,