From 7bf2a9ff827ad1b65280481ee351c061c8568025 Mon Sep 17 00:00:00 2001
From: Trenton H <797416+stumpylog@users.noreply.github.com>
Date: Fri, 19 Jun 2026 08:40:16 -0700
Subject: [PATCH] Fix (beta): Stream chunks during compaction to prevent oom on smaller installs (#13014)

---
Reviewer note: the line structure of this patch was rebuilt from a copy whose
newlines and indentation had been collapsed. Tokens and +/- markers are
unchanged, but the indentation of context and added lines is inferred from
Python syntax -- TODO confirm against commit 7bf2a9ff (or regenerate the patch
with git format-patch) before applying.

 src/paperless_ai/tests/test_vector_store.py | 17 +++++++++
 src/paperless_ai/vector_store.py            | 39 +++++++++++++--------
 2 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/src/paperless_ai/tests/test_vector_store.py b/src/paperless_ai/tests/test_vector_store.py
index cf869becf..7fa4a5a6b 100644
--- a/src/paperless_ai/tests/test_vector_store.py
+++ b/src/paperless_ai/tests/test_vector_store.py
@@ -393,6 +393,23 @@ class TestCompact:
             for c in held:
                 c.close()
 
+    def test_force_compact_streams_rows_across_batches(
+        self,
+        store,
+        monkeypatch,
+    ) -> None:
+        """Rebuild must preserve every row when rows span multiple batches.
+
+        A tiny batch size forces several fetchmany()/executemany() cycles so a
+        regression in the streaming loop (dropped tail, off-by-one) surfaces.
+        """
+        monkeypatch.setattr("paperless_ai.vector_store.COMPACT_BATCH_SIZE", 3)
+        store.add([make_node(f"n{i}", "1", seed=float(i)) for i in range(10)])
+        store.compact(force=True)
+        ids = {n.node_id for n in store.get_nodes(filters=_in_filter(["1"]))}
+        assert ids == {f"n{i}" for i in range(10)}
+        assert self._bloat_ratio(store) == pytest.approx(1.0)
+
 
 class TestDbFile:
     def test_single_db_file_in_index_dir(self, store, tmp_path: Path) -> None:
diff --git a/src/paperless_ai/vector_store.py b/src/paperless_ai/vector_store.py
index 60fc3c988..d44583d22 100644
--- a/src/paperless_ai/vector_store.py
+++ b/src/paperless_ai/vector_store.py
@@ -42,6 +42,11 @@ SCHEMA_VERSION = 1
 # a rebuild copies the live rows into a fresh table.
 COMPACT_BLOAT_RATIO = 2.0
 
+# compact(): number of rows copied per executemany() when rebuilding the file.
+# Rows are streamed from the source cursor in batches of this size rather than
+# materialized all at once, keeping memory bounded regardless of index size.
+COMPACT_BATCH_SIZE = 500
+
 # Filterable vec0 metadata columns. _build_where() only ever receives filter
 # keys we construct ourselves, but allowlisting keeps SQL identifiers safe by
 # construction.
@@ -500,24 +505,28 @@ class PaperlessSqliteVecVectorStore(BasePydanticVectorStore):
                 value = self._meta_get(key)
                 if value is not None:
                     self._meta_set_on(new_conn, key, value)
-            rows = self._conn.execute(
+            src_cursor = self._conn.execute(
                 "SELECT id, document_id, modified, node_content, embedding "
                 "FROM " + DEFAULT_TABLE_NAME,
-            ).fetchall()
-            new_conn.execute("BEGIN IMMEDIATE")
-            new_conn.executemany(
-                self._INSERT,
-                [
-                    (
-                        r["id"],
-                        r["document_id"],
-                        r["modified"],
-                        r["node_content"],
-                        bytes(r["embedding"]),
-                    )
-                    for r in rows
-                ],
             )
+            new_conn.execute("BEGIN IMMEDIATE")
+            # Stream rows from the source cursor in batches instead of
+            # materializing the whole table in memory, so a large index does
+            # not cause an OOM during routine maintenance compactions.
+            while batch := src_cursor.fetchmany(COMPACT_BATCH_SIZE):
+                new_conn.executemany(
+                    self._INSERT,
+                    [
+                        (
+                            r["id"],
+                            r["document_id"],
+                            r["modified"],
+                            r["node_content"],
+                            bytes(r["embedding"]),
+                        )
+                        for r in batch
+                    ],
+                )
             # Reset the cumulative counter: after compact, total_inserts == live.
             self._meta_set_on(new_conn, "total_inserts", str(live))
             new_conn.execute("COMMIT")