paperless-ngx/src/paperless_ai/vector_store.py
Trenton H a020f64d08 Enhancement(beta): replace LanceDB vector store with sqlite-vec (#12990)
* Chore(beta): add sqlite-vec 0.1.9 dependency

Pinned exactly: the 0.1.9 wheels carry no baked SIMD flags (safe on
pre-AVX2 CPUs, the point of this migration); the 0.1.10 alphas bake
-mavx and would reintroduce the #12970 crash class.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Test(beta): port vector store tests to sqlite-vec backend

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Enhancement(beta): switch AI vector store from LanceDB to sqlite-vec

Fixes the non-AVX2 SIGILL class (#12970) at the root: lancedb is no
longer imported. sqlite-vec 0.1.9 wheels carry no baked SIMD, vec0
metadata columns give parameterized EQ/IN filtering, WAL preserves the
lock-free-reader model, and compact() rebuilds the table because vec0
DELETEs never reclaim space.
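The "parameterized EQ/IN filtering" mentioned above can be sketched with the standard library alone. This is a minimal illustration under several assumptions:

- A plain SQLite table stands in for the vec0 table, which needs the sqlite-vec extension.
- Filters are simplified to `(column, op, value)` tuples instead of llama-index `MetadataFilter` objects.
- `build_where` and `ALLOWED_COLUMNS` are hypothetical names.

Values are always bound as `?` parameters. Column names cannot be bound, so they are checked against an allowlist before being placed in the SQL text.

```python
import sqlite3
from typing import Any

# Hypothetical allowlist: identifiers cannot be bound as parameters, so only
# these names may ever be interpolated into the SQL text.
ALLOWED_COLUMNS = frozenset({"document_id", "modified"})


def build_where(
    filters: list[tuple[str, str, Any]], condition: str = "AND"
) -> tuple[str, list[str]]:
    """Build a parameterized WHERE fragment from (column, op, value) triples.

    op is "EQ" (value is a scalar) or "IN" (value is an iterable).
    """
    clauses: list[str] = []
    params: list[str] = []
    for column, op, value in filters:
        if column not in ALLOWED_COLUMNS:
            raise ValueError(f"unsupported column: {column}")
        if op == "IN":
            values = [str(v) for v in value]
            if not values:
                clauses.append("1 = 0")  # an empty IN list matches nothing
                continue
            clauses.append(f"{column} IN ({','.join('?' for _ in values)})")
            params.extend(values)
        elif op == "EQ":
            clauses.append(f"{column} = ?")
            params.append(str(value))
        else:
            raise ValueError(f"unsupported operator: {op}")
    if not clauses:
        return "", []
    return "(" + f" {condition} ".join(clauses) + ")", params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (id TEXT, document_id TEXT, modified TEXT)")
conn.executemany(
    "INSERT INTO chunks VALUES (?, ?, ?)",
    [("a", "1", "2024"), ("b", "2", "2024"), ("c", "3", "2025")],
)
where, params = build_where([("document_id", "IN", [1, 3])])
rows = conn.execute(
    "SELECT id FROM chunks WHERE " + where + " ORDER BY id", params
).fetchall()
matched = [r[0] for r in rows]  # ["a", "c"]
```

Values are stringified before binding because, as the source notes, every metadata column is stored as TEXT.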

Implementation notes vs. the Task 3A draft:
- compact() uses a file-swap approach (new db file + Path.replace) rather
  than ALTER TABLE RENAME, which does not cascade to shadow tables in
  sqlite-vec 0.1.9 (upstream limitation).
- Bloat is tracked via a cumulative total_inserts counter in index_meta
  because the _rowids shadow table does not accumulate deleted rows in
  0.1.9 (contrary to the design doc assumption from #54).
- None distances from the zero-vector cosine edge case are mapped to
  similarity 0.0 rather than raising TypeError.
- Test suite updated accordingly: _bloat_ratio reads index_meta instead
  of _rowids; seed collision in force-compact test fixed (seed=100.0).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Enhancement(beta): wire indexing pipeline to the sqlite-vec store

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Enhancement(beta): move filename/storage path/ASN to node metadata

Same treatment as title/tags/correspondent in #12944: excluded from
the embedded text, visible to the LLM via metadata prepend. Changes
embedded text for every document, so it ships inside the sqlite-vec
transition, whose forced rebuild re-embeds everything anyway.
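The "excluded from the embedded text, visible to the LLM via metadata prepend" split can be illustrated without llama-index. This is a minimal sketch, not the actual paperless code:

- The metadata key names and values are hypothetical.
- `render` is a hypothetical helper.
- In llama-index, nodes express the same idea through their `excluded_embed_metadata_keys` and `excluded_llm_metadata_keys` fields, but how paperless wires this up is not shown here.

```python
# Hypothetical chunk metadata; the key names are illustrative only.
metadata = {
    "title": "Invoice 2024-03",
    "filename": "invoice.pdf",
    "storage_path": "finance/invoices",
    "asn": "1234",
}
# Keys kept out of the embedded text but still shown to the LLM.
EMBED_EXCLUDED = {"title", "filename", "storage_path", "asn"}


def render(text: str, metadata: dict[str, str], excluded: set[str]) -> str:
    """Prepend 'key: value' lines for every metadata key not in `excluded`."""
    lines = [f"{key}: {value}" for key, value in metadata.items() if key not in excluded]
    if not lines:
        return text
    return "\n".join(lines) + "\n\n" + text


chunk = "Total due: 42.00 EUR"
embed_text = render(chunk, metadata, EMBED_EXCLUDED)  # what gets embedded
llm_text = render(chunk, metadata, set())  # what the LLM sees
```

Because the embedded string changes for every document, every stored vector becomes stale. That is why the change rides along with a forced rebuild.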

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Test(beta): cover legacy LanceDB index cleanup and forced rebuild

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Chore(beta): drop lancedb dependency

Fixes #12970: the package whose wheels SIGILL on non-AVX2 CPUs is no
longer installed at all.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Chore(beta): partial pyrefly cleanup on sqlite-vec vector store

- Add MetadataFilter import and isinstance guard in _build_where()
- Add query_embedding None guard in query()
- Fix dict.get() type-checker ambiguity in get_configured_model_name()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Chore(beta): drop automatic LanceDB index cleanup on startup

Leave legacy Lance directory removal to the user rather than deleting it
automatically on first run. Beta policy: user is expected to do a clean
re-embed anyway; no need for the system to silently delete their data.

Remove _cleanup_legacy_lance_index(), the forced-rebuild path that called
it, and the associated tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Chore(beta): ruff format pass on sqlite-vec AI files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Removes the benchmarking file

* Try to resolve or silence some semgrep findings. We're using raw SQL here, not an ORM, and the inputs are controlled by us, not by users

* Enhancement(beta): add schema migration machinery to sqlite-vec vector store

Adds versioned schema migration support modelled after PR #12968's LanceDB
approach, adapted for sqlite-vec's file-swap compaction pattern.

- SCHEMA_VERSION = 1 written to index_meta at table creation and preserved
  through compact()
- Migration dataclass with from_version, to_version, kind ("structural" or
  "re-embed"), description, and an optional apply(src, dst, dim) callable
- MIGRATIONS registry (empty at v1 baseline); add entries and bump
  SCHEMA_VERSION when the schema changes
- check_and_run_migrations(): structural migrations run via the same
  file-swap as compact() (no re-embed); re-embed migrations return True
  so the caller forces a full rebuild
- update_llm_index() calls check_and_run_migrations() under the write lock
  before any indexing work
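The migration-selection rules above can be shown in isolation. This is a minimal sketch under stated assumptions:

- The registry entries and `SCHEMA_VERSION = 3` are hypothetical (the real registry is empty at v1).
- `plan` is a hypothetical helper that only reports what `check_and_run_migrations()` would do.
- Pending migrations are ordered by `from_version`. Structural steps run in order, and the first re-embed migration ends the chain with a request for a full rebuild.

```python
from dataclasses import dataclass
from typing import Literal

SCHEMA_VERSION = 3  # hypothetical current version for this example


@dataclass
class Migration:
    from_version: int
    to_version: int
    kind: Literal["structural", "re-embed"]
    description: str


# Hypothetical registry, deliberately listed out of order.
MIGRATIONS = [
    Migration(2, 3, "re-embed", "change the embedded text"),
    Migration(1, 2, "structural", "add a metadata column"),
]


def plan(current: int) -> tuple[list[str], bool]:
    """Return (structural steps to run, whether a full rebuild is needed)."""
    if current >= SCHEMA_VERSION:
        return [], False
    pending = sorted(
        (m for m in MIGRATIONS if current <= m.from_version < SCHEMA_VERSION),
        key=lambda m: m.from_version,
    )
    steps: list[str] = []
    for m in pending:
        if m.kind == "re-embed":
            # Stop the chain: the caller rebuilds the table at SCHEMA_VERSION.
            return steps, True
        steps.append(f"v{m.from_version}->v{m.to_version}")
    return steps, False
```

A v1 store would run the structural v1 -> v2 step and then be told to rebuild. A v3 store is left alone.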

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Chore(beta): deduplicate vector store internals via helper methods

Extract three helpers to remove copy-paste between compact() and
_run_structural_migration():
- _meta_set_on(conn, key, value): static upsert into any connection's
  index_meta; _meta_set() now delegates to it
- _create_vec_table(conn, dim): CREATE VIRTUAL TABLE DDL (carries the
  nosemgrep annotation)
- _swap_in_compact(compact_path, db_path): close/replace/reconnect
  sequence used by both file-swap callers

Also normalises compact() error-path cleanup to unlink(missing_ok=True).
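The close/replace/reconnect sequence shared by `compact()` and `_run_structural_migration()` can be sketched with a plain SQLite table. This is a simplified illustration under stated assumptions:

- `open_db` and `compact_by_file_swap` are hypothetical names.
- A plain table stands in for vec0. Plain SQLite could simply `VACUUM`, so only the swap mechanics are shown here.
- Every connection to both files is closed before the rename, so no handle survives the swap.

```python
import sqlite3
import tempfile
from pathlib import Path


def open_db(path: Path) -> sqlite3.Connection:
    conn = sqlite3.connect(path, isolation_level=None)  # autocommit; explicit BEGIN below
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


def compact_by_file_swap(conn: sqlite3.Connection, db_path: Path) -> sqlite3.Connection:
    """Copy live rows into a fresh file, then swap it over db_path."""
    compact_path = db_path.with_name(db_path.name + ".compact")
    new_conn = open_db(compact_path)
    try:
        new_conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, body TEXT)")
        rows = conn.execute("SELECT id, body FROM docs").fetchall()
        new_conn.execute("BEGIN IMMEDIATE")
        new_conn.executemany("INSERT INTO docs VALUES (?, ?)", rows)
        new_conn.execute("COMMIT")
    except BaseException:
        # On failure, remove the partial copy and its WAL/SHM side files.
        new_conn.close()
        for suffix in ("", "-wal", "-shm"):
            Path(str(compact_path) + suffix).unlink(missing_ok=True)
        raise
    new_conn.close()  # closing the only connection checkpoints its WAL
    conn.close()  # no connection may stay open on the old file across the swap
    compact_path.replace(db_path)  # Path.replace wraps os.replace
    return open_db(db_path)


with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp) / "index.db"
    conn = open_db(db_path)
    conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, body TEXT)")
    conn.executemany(
        "INSERT INTO docs VALUES (?, ?)", [(str(i), "x" * 1000) for i in range(200)]
    )
    conn.execute("DELETE FROM docs WHERE CAST(id AS INTEGER) >= 10")
    conn = compact_by_file_swap(conn, db_path)
    live = conn.execute("SELECT count(*) FROM docs").fetchone()[0]
    leftover = (Path(tmp) / "index.db.compact").exists()
    conn.close()
```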

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Adds equality test and marks some defensive error handling as no cover

* Ensures a re-embed migration stops the migration chain, just in case

* Silence one semgrep finding that is kind of right, but not really

* Trims dead assignment

* Fix(beta): address Copilot review on sqlite-vec vector store

Three findings from the PR review:

- compact() failure cleanup now unlinks the temporary .compact-wal and
  .compact-shm files, matching _run_structural_migration(); previously
  only the main .compact file was removed.
- _build_where() fails closed (1 = 0) when filters are requested but none
  translate, instead of emitting "()" which is invalid SQL; filters scope
  document access, so an empty translation must match no rows.
- Drop the unused table_name constructor parameter (all SQL hardcodes
  DEFAULT_TABLE_NAME) and its callers in indexing.py.
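The fail-closed rule for `_build_where()` can be checked against a real SQLite parser. In this minimal sketch, `translate` is a hypothetical, simplified stand-in, and the `owner` filter key is invented to represent a filter that cannot be translated.

The sketch distinguishes three outcomes:

- No filters requested: no WHERE clause.
- Filters requested but untranslatable: `1 = 0`, which matches nothing.
- Otherwise: a parameterized clause.

It also confirms that the old `()` output is a syntax error.

```python
import sqlite3


def translate(filters: list[tuple[str, str]]) -> tuple[str, list[str]]:
    """Translate (column, value) EQ filters; unsupported columns are skipped."""
    supported = {"document_id"}
    clauses: list[str] = []
    params: list[str] = []
    for column, value in filters:
        if column not in supported:
            continue  # cannot be translated
        clauses.append(f"{column} = ?")
        params.append(value)
    if not filters:
        return "", []  # nothing requested: no WHERE clause at all
    if not clauses:
        return "1 = 0", []  # requested but untranslatable: match no rows
    return "(" + " AND ".join(clauses) + ")", params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (document_id TEXT)")
conn.executemany("INSERT INTO chunks VALUES (?)", [("1",), ("2",)])

where, params = translate([("owner", "alice")])
fail_closed_rows = conn.execute("SELECT * FROM chunks WHERE " + where, params).fetchall()

try:
    conn.execute("SELECT * FROM chunks WHERE ()")
    empty_parens_valid = True
except sqlite3.OperationalError:
    empty_parens_valid = False
```

Failing closed matters because these filters scope which documents a user may retrieve. An unrecognized filter must narrow the result to nothing, never widen it to everything.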

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Enhancement(beta): guard sqlite-vec compaction swap against concurrent readers

The compaction/migration file swap replaces the database via os.replace,
but the -wal/-shm files are keyed by path, not inode. A reader holding an
open connection across the swap leaves the old WAL aliased onto the new
file; a subsequent write then corrupts the database (reproduced via
PRAGMA integrity_check).

Add a cross-process read/write lock (filelock.ReadWriteLock) over the
index:

- read_store() holds it shared for the whole connection lifetime (and
  closes the connection on exit); concurrent readers do not block.
- compaction and the migration check run under an exclusive lock that
  drains readers, and skip with an info log on Timeout (maintenance op,
  retries next run).
- Normal writes are untouched: WAL gives reader/writer concurrency and
  LLM_INDEX_LOCK still serializes writers, so they never block readers.

load_or_build_index() now takes the store from the caller's read_store()
so the lock and connection span the whole retrieval; chat holds it across
the streamed response. Two new settings: LLM_INDEX_RWLOCK and
LLM_INDEX_COMPACTION_LOCK_TIMEOUT.
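The locking scheme above can be sketched in-process with `threading`. This is an analogue only: the commit uses the cross-process `filelock.ReadWriteLock`, whose API is not reproduced here. The sketch assumes:

- `ReadWriteLock`, `read`, `try_write` and `release_write` are hypothetical names.
- Readers share the lock, and a writer waits up to a timeout for readers to drain.
- On timeout the maintenance step is skipped, matching the "skip with an info log" behavior described above.

```python
import threading
from collections.abc import Iterator
from contextlib import contextmanager


class ReadWriteLock:
    """Many concurrent readers or one writer; writers give up after a timeout.

    Simplified sketch: no writer preference, so a steady stream of readers
    can starve a writer (acceptable for a retry-next-run maintenance job).
    """

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False

    @contextmanager
    def read(self) -> Iterator[None]:
        with self._cond:
            while self._writer:
                self._cond.wait()
            self._readers += 1
        try:
            yield
        finally:
            with self._cond:
                self._readers -= 1
                self._cond.notify_all()

    def try_write(self, timeout: float) -> bool:
        """Wait up to `timeout` seconds for readers to drain; True if acquired."""
        with self._cond:
            ok = self._cond.wait_for(
                lambda: not self._writer and self._readers == 0, timeout=timeout
            )
            if ok:
                self._writer = True
            return ok

    def release_write(self) -> None:
        with self._cond:
            self._writer = False
            self._cond.notify_all()


lock = ReadWriteLock()
with lock.read(), lock.read():  # two readers at once do not block each other
    blocked = not lock.try_write(timeout=0.05)  # compaction would skip and retry
acquired = lock.try_write(timeout=0.05)  # readers have drained
lock.release_write()
```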

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Ensures the store always cleans up SQLite connections for every operation, even on errors

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 13:20:41 -07:00

605 lines
24 KiB
Python

import json
import logging
import sqlite3
import struct
from collections.abc import Callable
from collections.abc import Iterator
from collections.abc import Sequence
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from types import TracebackType
from typing import Any
from typing import Literal

import sqlite_vec
from llama_index.core.bridge.pydantic import PrivateAttr
from llama_index.core.schema import BaseNode
from llama_index.core.vector_stores.types import BasePydanticVectorStore
from llama_index.core.vector_stores.types import FilterCondition
from llama_index.core.vector_stores.types import FilterOperator
from llama_index.core.vector_stores.types import MetadataFilter
from llama_index.core.vector_stores.types import MetadataFilters
from llama_index.core.vector_stores.types import VectorStoreQuery
from llama_index.core.vector_stores.types import VectorStoreQueryResult
from llama_index.core.vector_stores.utils import metadata_dict_to_node
from llama_index.core.vector_stores.utils import node_to_metadata_dict

logger = logging.getLogger("paperless_ai.vector_store")

DB_FILENAME = "llmindex.db"
DEFAULT_TABLE_NAME = "documents"

# Current schema version. Written to index_meta at table creation and bumped
# whenever a Migration is added to MIGRATIONS. check_and_run_migrations() uses
# this to decide which migrations to run on an existing store.
SCHEMA_VERSION = 1

# compact(): rebuild when the cumulative rowid count exceeds this multiple of
# the live row count. DELETEs on vec0 tables never reclaim space (upstream
# asg017/sqlite-vec#54), so per-document re-index churn grows the file until
# a rebuild copies the live rows into a fresh table.
COMPACT_BLOAT_RATIO = 2.0

# Filterable vec0 metadata columns. _build_where() only ever receives filter
# keys we construct ourselves, but allowlisting keeps SQL identifiers safe by
# construction.
_FILTER_COLUMNS = frozenset({"document_id", "modified"})


@dataclass
class Migration:
    """A schema migration for the sqlite-vec vector store.

    kind="structural": rows are copied into a new-schema file with no
    re-embedding needed. Supply ``apply(src_conn, dst_conn, dim)``, which
    must create the vec0 table in ``dst_conn``, copy all rows from
    ``src_conn``, and write ``dim`` / ``embed_model`` / ``total_inserts`` to
    ``dst_conn``'s ``index_meta``. ``schema_version`` is written by the
    migration runner after ``apply`` returns.

    kind="re-embed": the new schema requires fresh embeddings.
    ``check_and_run_migrations()`` returns True when it encounters one of
    these so the caller can force a full rebuild (which recreates the table
    at the current SCHEMA_VERSION).
    """

    from_version: int
    to_version: int
    kind: Literal["structural", "re-embed"]
    description: str
    apply: Callable[[sqlite3.Connection, sqlite3.Connection, int], None] | None = field(
        default=None,
        repr=False,
    )


# Registry of all schema migrations in order. Empty at v1 -- this is the
# baseline. Add entries here (and bump SCHEMA_VERSION) when the schema changes.
MIGRATIONS: list[Migration] = []


def _pack(embedding: Sequence[float]) -> bytes:
    return struct.pack(f"{len(embedding)}f", *embedding)


def _unpack(blob: bytes) -> list[float]:
    return list(struct.unpack(f"{len(blob) // 4}f", blob))


def _build_where(filters: MetadataFilters | None) -> tuple[str, list[str]]:
    """Translate the EQ / IN filters we use into a parameterized SQL clause
    on vec0 metadata columns. Returns ("", []) when there is nothing to filter.
    """
    if filters is None or not filters.filters:
        return "", []

    clauses: list[str] = []
    params: list[str] = []
    for f in filters.filters:
        # filters.filters is Union[MetadataFilter, ExactMatchFilter, MetadataFilters];
        # we only build MetadataFilter entries, so skip anything else at runtime.
        if not isinstance(f, MetadataFilter):
            continue
        if f.key not in _FILTER_COLUMNS:  # pragma: no cover - we build the keys
            raise NotImplementedError(f"Unsupported filter column: {f.key}")
        if f.operator == FilterOperator.IN:
            values = [str(v) for v in f.value]  # type: ignore[union-attr] # value is list when operator is IN
            if not values:  # pragma: no cover
                clauses.append("1 = 0")
                continue
            placeholders = ",".join("?" for _ in values)
            clauses.append(f"{f.key} IN ({placeholders})")
            params.extend(values)
        elif f.operator == FilterOperator.EQ:
            clauses.append(f"{f.key} = ?")
            params.append(str(f.value))
        else:  # pragma: no cover - we only ever build EQ/IN filters
            raise NotImplementedError(f"Unsupported filter operator: {f.operator}")

    if not clauses:
        # Filters were requested but none could be translated. Fail closed
        # rather than emit "()" (invalid SQL): filters scope document access,
        # so an empty translation must match no rows, never widen the scope.
        return "1 = 0", []

    joiner = " OR " if filters.condition == FilterCondition.OR else " AND "
    return "(" + joiner.join(clauses) + ")", params


class PaperlessSqliteVecVectorStore(BasePydanticVectorStore):
    """A llama-index vector store backed by a sqlite-vec vec0 table.

    Stores one row per node: the node id (TEXT primary key), its document id
    (metadata column, used for EQ/IN filtering and per-document delete), the
    document's modified timestamp, the embedding (float32, cosine metric), and
    the serialized node (text + metadata) as JSON in an auxiliary column.
    ``stores_text`` lets llama-index run off this store alone, with no
    separate docstore or index store.

    Everything lives in one SQLite database file (``DB_FILENAME``) inside the
    directory given as ``uri`` (kept as a directory for compatibility with the
    previous LanceDB layout). WAL mode allows readers in other processes to
    proceed while the (FileLock-serialized) writer holds a transaction.

    Implemented surface of ``BasePydanticVectorStore``
    ---------------------------------------------------
    Only the methods actively used by this codebase are implemented.
    ``delete_nodes`` and the ``node_ids`` lookup path of ``get_nodes`` are
    part of the llama-index interface contract and may be needed if a future
    retriever or extension invokes them; add them then, with tests.
    """

    stores_text: bool = True
    flat_metadata: bool = False

    _uri: str = PrivateAttr()
    _embed_model_name: str | None = PrivateAttr()
    _conn: Any = PrivateAttr()

    def __init__(
        self,
        uri: str,
        embed_model_name: str | None = None,
    ) -> None:
        super().__init__(stores_text=True, flat_metadata=False)
        self._uri = uri
        self._embed_model_name = embed_model_name
        self._conn = self._open_connection(str(Path(uri) / DB_FILENAME))

    @staticmethod
    def _open_connection(db_path: str) -> sqlite3.Connection:
        conn = sqlite3.connect(
            db_path,
            timeout=30,
            isolation_level=None,  # autocommit; explicit transactions below
        )
        conn.row_factory = sqlite3.Row
        conn.enable_load_extension(True)  # noqa: FBT003
        sqlite_vec.load(conn)
        conn.enable_load_extension(False)  # noqa: FBT003
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute(
            "CREATE TABLE IF NOT EXISTS index_meta (key TEXT PRIMARY KEY, value TEXT)",
        )
        return conn

    @property
    def client(self) -> Any:
        return self._conn

    def close(self) -> None:
        """Close the underlying SQLite connection (idempotent)."""
        self._conn.close()

    def __enter__(self) -> "PaperlessSqliteVecVectorStore":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Deterministically release the connection (and its WAL/SHM handles) so
        # it is never left open across a compaction/migration file swap.
        self.close()

    @contextmanager
    def _transaction(self) -> Iterator[None]:
        self._conn.execute("BEGIN IMMEDIATE")
        try:
            yield
        except BaseException:  # pragma: no cover
            self._conn.execute("ROLLBACK")
            raise
        else:
            self._conn.execute("COMMIT")

    def _meta_get(self, key: str) -> str | None:
        row = self._conn.execute(
            "SELECT value FROM index_meta WHERE key = ?",
            (key,),
        ).fetchone()
        return row["value"] if row else None

    @staticmethod
    def _meta_set_on(conn: sqlite3.Connection, key: str, value: str) -> None:
        conn.execute(
            "INSERT INTO index_meta (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            (key, value),
        )

    def _meta_set(self, key: str, value: str) -> None:
        self._meta_set_on(self._conn, key, value)

    def table_exists(self) -> bool:
        return (
            self._conn.execute(
                "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
                (DEFAULT_TABLE_NAME,),
            ).fetchone()
            is not None
        )

    def vector_dim(self) -> int | None:
        if not self.table_exists():
            return None
        value = self._meta_get("dim")
        return int(value) if value else None

    def drop_table(self) -> None:
        self._conn.execute("DROP TABLE IF EXISTS " + DEFAULT_TABLE_NAME)
        self._conn.execute("DELETE FROM index_meta")

    def stored_model_name(self) -> str | None:
        """Return the embedding model name recorded at table creation, or None."""
        if not self.table_exists():
            return None
        return self._meta_get("embed_model")

    def config_mismatch(self, model_name: str) -> bool:
        """True when the stored model name differs from ``model_name``.

        Returns False when no table exists or when the table predates
        model-name tracking; this conservative default avoids spurious rebuilds.
        """
        stored = self.stored_model_name()
        if stored is None:
            return False
        return stored != model_name

    @staticmethod
    def _create_vec_table(conn: sqlite3.Connection, dim: int) -> None:
        # document_id is deliberately a metadata column, NOT a partition key:
        # partition keys change KNN `k` to per-partition semantics under IN
        # filters (asg017/sqlite-vec#142); metadata columns give a correct
        # global top-k.
        conn.execute(  # nosemgrep: python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query
            "CREATE VIRTUAL TABLE "
            + DEFAULT_TABLE_NAME
            + " USING vec0("
            + "id TEXT PRIMARY KEY,"
            + " document_id TEXT,"
            + " modified TEXT,"
            + " +node_content TEXT,"
            + " embedding float["
            + str(int(dim))
            + "] distance_metric=cosine"
            + ")",
        )

    def _create_table(self, dim: int) -> None:
        self._create_vec_table(self._conn, dim)
        self._meta_set("dim", str(dim))
        self._meta_set("schema_version", str(SCHEMA_VERSION))
        if self._embed_model_name:
            self._meta_set("embed_model", self._embed_model_name)

    def _ensure_table(self, dim: int) -> None:
        if not self.table_exists():
            self._create_table(dim)

    def _row(self, node: BaseNode) -> tuple[str, str, str, str, bytes]:
        meta = node_to_metadata_dict(
            node,
            remove_text=False,
            flat_metadata=self.flat_metadata,
        )
        # vec0 metadata columns reject NULL (asg017/sqlite-vec#141): coerce
        # every value to a string, with "" as the absent sentinel.
        document_id = node.ref_doc_id or node.metadata.get("document_id")
        return (
            node.node_id,
            str(document_id or ""),
            str(node.metadata.get("modified") or ""),
            json.dumps(meta),
            _pack(node.get_embedding()),
        )

    _INSERT = (
        "INSERT INTO "
        + DEFAULT_TABLE_NAME
        + " (id, document_id, modified, node_content, embedding) VALUES (?, ?, ?, ?, ?)"
    )

    def _increment_total_inserts(self, count: int) -> None:
        """Increment the cumulative insert counter stored in index_meta.

        This counter never decreases (DELETEs do not decrement it) and is
        used by compact() to estimate the bloat ratio: when total_inserts /
        live_rows exceeds COMPACT_BLOAT_RATIO the table has accumulated
        enough deleted-but-not-freed rows to warrant a rebuild.
        """
        current = int(self._meta_get("total_inserts") or "0")
        self._meta_set("total_inserts", str(current + count))

    def add(self, nodes: Sequence[BaseNode], **add_kwargs: Any) -> list[str]:
        if not nodes:
            return []
        rows = [self._row(node) for node in nodes]
        with self._transaction():
            self._ensure_table(len(nodes[0].get_embedding()))
            self._conn.executemany(self._INSERT, rows)
            self._increment_total_inserts(len(rows))
        return [node.node_id for node in nodes]

    def upsert_document(self, document_id: str, nodes: list[BaseNode]) -> list[str]:
        """Atomically replace all stored chunks of ``document_id`` with ``nodes``.

        One transaction deletes the document's existing rows and inserts the
        new set (vec0's INSERT OR REPLACE is broken upstream, #259, so
        delete+insert it is). WAL readers in other processes see either the
        old or the new chunk set, never a partial state.
        """
        rows = [self._row(node) for node in nodes]
        with self._transaction():
            if nodes:
                self._ensure_table(len(nodes[0].get_embedding()))
            if self.table_exists():
                self._conn.execute(
                    "DELETE FROM " + DEFAULT_TABLE_NAME + " WHERE document_id = ?",
                    (str(document_id),),
                )
            if rows:
                self._conn.executemany(self._INSERT, rows)
                self._increment_total_inserts(len(rows))
        return [node.node_id for node in nodes]

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        if self.table_exists():
            with self._transaction():
                self._conn.execute(
                    "DELETE FROM " + DEFAULT_TABLE_NAME + " WHERE document_id = ?",
                    (str(ref_doc_id),),
                )

    def _rows_to_nodes(self, rows: list[sqlite3.Row]) -> list[BaseNode]:
        nodes: list[BaseNode] = []
        for row in rows:
            node = metadata_dict_to_node(json.loads(row["node_content"]))
            node.embedding = _unpack(row["embedding"])
            nodes.append(node)
        return nodes

    def get_nodes(
        self,
        node_ids: list[str] | None = None,
        filters: MetadataFilters | None = None,
        **kwargs: Any,
    ) -> list[BaseNode]:
        if node_ids is not None:  # pragma: no cover
            # node_ids lookup is not implemented; see class docstring.
            raise NotImplementedError(
                "PaperlessSqliteVecVectorStore does not support node_ids lookup",
            )
        if not self.table_exists():
            return []
        where, params = _build_where(filters)
        sql = "SELECT node_content, embedding FROM " + DEFAULT_TABLE_NAME
        if where:
            sql += " WHERE " + where
        return self._rows_to_nodes(self._conn.execute(sql, params).fetchall())

    def query(
        self,
        query: VectorStoreQuery,
        **kwargs: Any,
    ) -> VectorStoreQueryResult:
        if not self.table_exists():
            return VectorStoreQueryResult(nodes=[], similarities=[], ids=[])
        if query.query_embedding is None:  # pragma: no cover
            return VectorStoreQueryResult(nodes=[], similarities=[], ids=[])

        top_k = query.similarity_top_k if query.similarity_top_k is not None else 10
        where, params = _build_where(query.filters)
        sql = (
            "SELECT id, node_content, embedding, distance FROM "
            + DEFAULT_TABLE_NAME
            + " WHERE embedding MATCH ? AND k = ?"
        )
        if where:
            sql += " AND " + where
        rows = self._conn.execute(
            sql,
            [_pack(query.query_embedding), top_k, *params],
        ).fetchall()
        # vec0 returns rows sorted by ascending distance; slice defensively in
        # case future schema changes alter k semantics (e.g. partition keys
        # return k rows per partition).
        rows = rows[:top_k]

        nodes = self._rows_to_nodes(rows)
        # Cosine distance lies in [0, 2]; similarity = 1 - distance, so higher
        # is better. vec0 returns a None distance when the query embedding is
        # the zero vector (the cosine angle is undefined); treat that as
        # distance 1.0 (orthogonal), i.e. similarity 0.0, instead of raising
        # TypeError.
        sims = [
            1.0 - float(row["distance"] if row["distance"] is not None else 1.0)
            for row in rows
        ]
        ids = [row["id"] for row in rows]
        return VectorStoreQueryResult(nodes=nodes, similarities=sims, ids=ids)

    def get_modified_times(self) -> dict[str, str]:
        """Return {document_id: stored_modified_isoformat} for all indexed documents.

        All chunks of a document share the same ``modified`` value, so the
        first row seen per document is sufficient.
        """
        if not self.table_exists():
            return {}
        result: dict[str, str] = {}
        for row in self._conn.execute(
            "SELECT document_id, modified FROM " + DEFAULT_TABLE_NAME,
        ):
            doc_id = str(row["document_id"])
            if doc_id not in result:
                result[doc_id] = str(row["modified"] or "")
        return result

    def compact(self, *, force: bool = False) -> None:
        """Rebuild the database file to reclaim space left behind by DELETEs.

        vec0 DELETE only invalidates rows; the vector data stays in the file
        forever (asg017/sqlite-vec#54), and per-document re-indexing is a
        delete+insert. The cumulative insert counter in ``index_meta`` tracks
        total rows ever written; when that exceeds ``COMPACT_BLOAT_RATIO`` x
        the live row count (or when forced), live rows are copied into a fresh
        database file and swapped in via ``os.replace``.

        Note: ``ALTER TABLE ... RENAME TO`` on vec0 virtual tables does NOT
        rename the shadow tables (sqlite-vec upstream limitation), so
        an in-place rename-based rebuild is not safe. The file-swap approach
        is the maintainer-endorsed workaround (asg017/sqlite-vec#205).
        """
        if not self.table_exists():
            return
        live = self._conn.execute(
            "SELECT count(*) FROM " + DEFAULT_TABLE_NAME,
        ).fetchone()[0]
        total = int(self._meta_get("total_inserts") or str(live))
        if not force and total <= max(live, 1) * COMPACT_BLOAT_RATIO:
            return
        dim = self.vector_dim()
        if dim is None:  # pragma: no cover - dim is written at creation
            logger.warning("Skipping compact: no stored vector dimension")
            return

        logger.info(
            "Compacting LLM index (%d live rows, %d cumulative inserts)",
            live,
            total,
        )
        db_path = str(Path(self._uri) / DB_FILENAME)
        compact_path = db_path + ".compact"

        # Copy all live rows into a fresh database file.
        new_conn = self._open_connection(compact_path)
        try:
            self._create_vec_table(new_conn, dim)
            self._meta_set_on(new_conn, "dim", str(dim))
            for key in ("embed_model", "schema_version"):
                value = self._meta_get(key)
                if value is not None:
                    self._meta_set_on(new_conn, key, value)
            rows = self._conn.execute(
                "SELECT id, document_id, modified, node_content, embedding "
                "FROM " + DEFAULT_TABLE_NAME,
            ).fetchall()
            new_conn.execute("BEGIN IMMEDIATE")
            new_conn.executemany(
                self._INSERT,
                [
                    (
                        r["id"],
                        r["document_id"],
                        r["modified"],
                        r["node_content"],
                        bytes(r["embedding"]),
                    )
                    for r in rows
                ],
            )
            # Reset the cumulative counter: after compact, total_inserts == live.
            self._meta_set_on(new_conn, "total_inserts", str(live))
            new_conn.execute("COMMIT")
        except BaseException:
            new_conn.close()
            for p in [compact_path, compact_path + "-wal", compact_path + "-shm"]:
                Path(p).unlink(missing_ok=True)
            raise
        new_conn.close()
        self._swap_in_compact(compact_path, db_path)

    def _swap_in_compact(self, compact_path: str, db_path: str) -> None:
        """Atomically replace the live database with the compacted copy."""
        self._conn.close()
        for suffix in ["-wal", "-shm"]:
            stale = Path(compact_path + suffix)
            if stale.exists():  # pragma: no cover
                stale.unlink()
        Path(compact_path).replace(db_path)
        self._conn = self._open_connection(db_path)

    def check_and_run_migrations(self) -> bool:
        """Apply any pending schema migrations to the store.

        Structural migrations copy live rows into a new-schema file with no
        re-embedding. Re-embed migrations cannot be applied automatically;
        this method returns True when one is encountered so the caller can
        force a full rebuild (which recreates the table at SCHEMA_VERSION).

        Must be called under the write FileLock. No-op when the table does
        not exist or is already at SCHEMA_VERSION.
        """
        if not self.table_exists():
            return False
        raw = self._meta_get("schema_version")
        current = int(raw) if raw is not None else SCHEMA_VERSION
        if current >= SCHEMA_VERSION:
            return False

        pending = sorted(
            [m for m in MIGRATIONS if current <= m.from_version < SCHEMA_VERSION],
            key=lambda m: m.from_version,
        )
        for migration in pending:
            if migration.kind == "re-embed":
                logger.warning(
                    "LLM index schema v%d -> v%d requires re-embedding (%s); "
                    "forcing full rebuild.",
                    migration.from_version,
                    migration.to_version,
                    migration.description,
                )
                return True
            logger.info(
                "Running structural LLM index migration v%d -> v%d: %s",
                migration.from_version,
                migration.to_version,
                migration.description,
            )
            self._run_structural_migration(migration)
        return False

    def _run_structural_migration(self, migration: Migration) -> None:
        """Execute a structural migration using the same file-swap as compact()."""
        assert migration.apply is not None, "structural migration must have apply()"
        dim = self.vector_dim()
        if dim is None:  # pragma: no cover
            raise RuntimeError("Cannot migrate: no stored vector dimension")

        db_path = str(Path(self._uri) / DB_FILENAME)
        compact_path = db_path + ".compact"
        new_conn = self._open_connection(compact_path)
        try:
            migration.apply(self._conn, new_conn, dim)
            self._meta_set_on(new_conn, "schema_version", str(migration.to_version))
        except BaseException:  # pragma: no cover
            new_conn.close()
            for p in [compact_path, compact_path + "-wal", compact_path + "-shm"]:
                Path(p).unlink(missing_ok=True)
            raise
        new_conn.close()
        self._swap_in_compact(compact_path, db_path)
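The store keeps each embedding as a float32 blob via `_pack` / `_unpack`. The standalone copy below, with hypothetical names `pack` and `unpack`, shows the round trip and its one caveat.

The `"f"` struct format is a 4-byte C float in native byte order. Values that float32 cannot represent exactly come back slightly changed, so a stored embedding is close to, but not always identical to, the Python floats that were written.

```python
import struct
from collections.abc import Sequence


def pack(embedding: Sequence[float]) -> bytes:
    # "<n>f": n C floats (float32), native byte order, 4 bytes each.
    return struct.pack(f"{len(embedding)}f", *embedding)


def unpack(blob: bytes) -> list[float]:
    # The blob length determines the dimension: 4 bytes per component.
    return list(struct.unpack(f"{len(blob) // 4}f", blob))


vec = [0.5, -1.25, 3.0]  # exactly representable in float32
blob = pack(vec)
roundtrip = unpack(blob)
lossy = unpack(pack([0.1]))[0]  # float32 rounding of 0.1
```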