feat(ai): ANN index threshold, scalar index, and compaction

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
stumpylog
2026-06-02 14:36:22 -07:00
parent fa74bb77b3
commit 0421bfcf54
2 changed files with 132 additions and 0 deletions
@@ -218,3 +218,70 @@ class TestPaperlessLanceVectorStoreUpsert:
remaining = sorted(r["document_id"] for r in table.search().to_list())
assert "1" not in remaining
assert "2" in remaining
class TestPaperlessLanceVectorStoreMaintenance:
@pytest.fixture
def store(self, tmp_path: Path) -> PaperlessLanceVectorStore:
return PaperlessLanceVectorStore(uri=str(tmp_path / "idx"))
def test_maybe_create_ann_index_noop_below_threshold(
self,
store: PaperlessLanceVectorStore,
) -> None:
store.add([_node("1-0", "1", "a", 0.1)])
# Threshold far above row count -> no index attempted, no error.
store.maybe_create_ann_index(min_rows=1000)
# Still queryable.
result = store.query(
VectorStoreQuery(query_embedding=[0.1] * DIM, similarity_top_k=1),
)
assert len(result.nodes) == 1
def test_maybe_create_ann_index_non_divisible_dim_falls_back(
self,
store: PaperlessLanceVectorStore,
) -> None:
# DIM=8 is not divisible by the PQ default sub-vectors; must not raise
# and must leave the table queryable (IVF_FLAT fallback or skipped).
for i in range(40):
store.add([_node(f"1-{i}", "1", f"t{i}", float(i))])
store.maybe_create_ann_index(min_rows=10)
result = store.query(
VectorStoreQuery(query_embedding=[1.0] * DIM, similarity_top_k=3),
)
assert len(result.nodes) == 3
def test_compact_reduces_to_single_version(
self,
store: PaperlessLanceVectorStore,
) -> None:
for i in range(5):
store.add([_node(f"1-{i}", "1", f"t{i}", float(i))])
assert len(store.client.open_table("documents").list_versions()) > 1
store.compact(retention_seconds=0)
assert len(store.client.open_table("documents").list_versions()) == 1
def test_upsert_after_optimize_with_scalar_index(
self,
store: PaperlessLanceVectorStore,
) -> None:
store.add(
[
_node("1-0", "1", "old0", 0.1),
_node("1-1", "1", "old1", 0.2),
_node("1-2", "1", "old2", 0.3),
_node("2-0", "2", "keep", 0.9),
],
)
store.ensure_document_id_scalar_index()
store.compact(retention_seconds=0)
store.upsert_document("1", [_node("1-0", "1", "new0", 0.1)])
table = store.client.open_table("documents")
doc1 = sorted(
r["id"] for r in table.search().where("document_id = '1'").to_list()
)
assert doc1 == ["1-0"]
assert table.count_rows() == 2
+65
View File
@@ -20,6 +20,12 @@ logger = logging.getLogger("paperless_ai.vector_store")
DEFAULT_TABLE_NAME = "documents"
# Below this many chunks, LanceDB's exact (brute-force) search is sufficient and
# faster than building an ANN index (per LanceDB guidance, ~100K vectors).
ANN_INDEX_MIN_ROWS = 100_000
# IVF_PQ default; num_sub_vectors must evenly divide the embedding dimension.
ANN_PQ_SUB_VECTORS = 96
def _escape(value: str) -> str:
return str(value).replace("'", "''")
@@ -226,3 +232,62 @@ class PaperlessLanceVectorStore(BasePydanticVectorStore):
sims = [1.0 / (1.0 + float(row["_distance"])) for row in rows]
ids = [row["id"] for row in rows]
return VectorStoreQueryResult(nodes=nodes, similarities=sims, ids=ids)
def _has_vector_index(self) -> bool:
try:
return any(
"vector" in (getattr(idx, "columns", []) or [])
for idx in self._table.list_indices()
)
except Exception: # pragma: no cover - older lancedb without list_indices
return False
def maybe_create_ann_index(self, min_rows: int = ANN_INDEX_MIN_ROWS) -> None:
"""Best-effort: build an IVF index once the table is large enough.
IVF_PQ is used when ``num_sub_vectors`` divides the embedding dimension,
otherwise IVF_FLAT (no divisor constraint). Any failure is logged and
leaves the table on exact search, which is always correct.
"""
if self._table is None:
return
rows = self._table.count_rows()
if rows < min_rows or self._has_vector_index():
return
num_partitions = max(1, rows // 4096)
# Embedding dim from the schema's fixed-size list column.
dim = self._table.schema.field("vector").type.list_size
try:
if dim % ANN_PQ_SUB_VECTORS == 0:
self._table.create_index(
metric="l2",
num_partitions=num_partitions,
num_sub_vectors=ANN_PQ_SUB_VECTORS,
index_type="IVF_PQ",
)
else:
self._table.create_index(
metric="l2",
num_partitions=num_partitions,
index_type="IVF_FLAT",
)
except Exception as e: # pragma: no cover - depends on data/dim
logger.warning("Skipping ANN index creation: %s", e)
def ensure_document_id_scalar_index(self) -> None:
"""Create a scalar index on the filter column (never on the merge key
``id`` — see LanceDB #3177)."""
if self._table is None:
return
try:
self._table.create_scalar_index("document_id", replace=True)
except Exception as e: # pragma: no cover
logger.warning("Skipping document_id scalar index: %s", e)
def compact(self, retention_seconds: int) -> None:
"""Compact fragments and prune old MVCC versions in one call."""
if self._table is None:
return
from datetime import timedelta
self._table.optimize(cleanup_older_than=timedelta(seconds=retention_seconds))