mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-06-20 20:34:20 +00:00
Fix (beta): Stream chunks during compaction to prevent oom on smaller installs (#13014)
This commit is contained in:
@@ -393,6 +393,23 @@ class TestCompact:
|
||||
for c in held:
|
||||
c.close()
|
||||
|
||||
def test_force_compact_streams_rows_across_batches(
|
||||
self,
|
||||
store,
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
"""Rebuild must preserve every row when rows span multiple batches.
|
||||
|
||||
A tiny batch size forces several fetchmany()/executemany() cycles so a
|
||||
regression in the streaming loop (dropped tail, off-by-one) surfaces.
|
||||
"""
|
||||
monkeypatch.setattr("paperless_ai.vector_store.COMPACT_BATCH_SIZE", 3)
|
||||
store.add([make_node(f"n{i}", "1", seed=float(i)) for i in range(10)])
|
||||
store.compact(force=True)
|
||||
ids = {n.node_id for n in store.get_nodes(filters=_in_filter(["1"]))}
|
||||
assert ids == {f"n{i}" for i in range(10)}
|
||||
assert self._bloat_ratio(store) == pytest.approx(1.0)
|
||||
|
||||
|
||||
class TestDbFile:
|
||||
def test_single_db_file_in_index_dir(self, store, tmp_path: Path) -> None:
|
||||
|
||||
@@ -42,6 +42,11 @@ SCHEMA_VERSION = 1
|
||||
# a rebuild copies the live rows into a fresh table.
|
||||
COMPACT_BLOAT_RATIO = 2.0
|
||||
|
||||
# compact(): number of rows copied per executemany() when rebuilding the file.
|
||||
# Rows are streamed from the source cursor in batches of this size rather than
|
||||
# materialized all at once, keeping memory bounded regardless of index size.
|
||||
COMPACT_BATCH_SIZE = 500
|
||||
|
||||
# Filterable vec0 metadata columns. _build_where() only ever receives filter
|
||||
# keys we construct ourselves, but allowlisting keeps SQL identifiers safe by
|
||||
# construction.
|
||||
@@ -500,24 +505,28 @@ class PaperlessSqliteVecVectorStore(BasePydanticVectorStore):
|
||||
value = self._meta_get(key)
|
||||
if value is not None:
|
||||
self._meta_set_on(new_conn, key, value)
|
||||
rows = self._conn.execute(
|
||||
src_cursor = self._conn.execute(
|
||||
"SELECT id, document_id, modified, node_content, embedding "
|
||||
"FROM " + DEFAULT_TABLE_NAME,
|
||||
).fetchall()
|
||||
new_conn.execute("BEGIN IMMEDIATE")
|
||||
new_conn.executemany(
|
||||
self._INSERT,
|
||||
[
|
||||
(
|
||||
r["id"],
|
||||
r["document_id"],
|
||||
r["modified"],
|
||||
r["node_content"],
|
||||
bytes(r["embedding"]),
|
||||
)
|
||||
for r in rows
|
||||
],
|
||||
)
|
||||
new_conn.execute("BEGIN IMMEDIATE")
|
||||
# Stream rows from the source cursor in batches instead of
|
||||
# materializing the whole table in memory, so a large index does
|
||||
# not cause an OOM during routine maintenance compactions.
|
||||
while batch := src_cursor.fetchmany(COMPACT_BATCH_SIZE):
|
||||
new_conn.executemany(
|
||||
self._INSERT,
|
||||
[
|
||||
(
|
||||
r["id"],
|
||||
r["document_id"],
|
||||
r["modified"],
|
||||
r["node_content"],
|
||||
bytes(r["embedding"]),
|
||||
)
|
||||
for r in batch
|
||||
],
|
||||
)
|
||||
# Reset the cumulative counter: after compact, total_inserts == live.
|
||||
self._meta_set_on(new_conn, "total_inserts", str(live))
|
||||
new_conn.execute("COMMIT")
|
||||
|
||||
Reference in New Issue
Block a user