From e84a76783980b9909e24e7068bc476d27b4135ed Mon Sep 17 00:00:00 2001 From: stumpylog <797416+stumpylog@users.noreply.github.com> Date: Tue, 16 Jun 2026 10:48:46 -0700 Subject: [PATCH] Docs: revise ingestion staging spec per critical review Co-Authored-By: Claude Opus 4.8 --- ...16-ingestion-staging-unification-design.md | 195 ++++++++++++++---- 1 file changed, 154 insertions(+), 41 deletions(-) diff --git a/docs/superpowers/specs/2026-06-16-ingestion-staging-unification-design.md b/docs/superpowers/specs/2026-06-16-ingestion-staging-unification-design.md index 5b26ec669..e355530cb 100644 --- a/docs/superpowers/specs/2026-06-16-ingestion-staging-unification-design.md +++ b/docs/superpowers/specs/2026-06-16-ingestion-staging-unification-design.md @@ -2,7 +2,7 @@ **Date:** 2026-06-16 **Branch base:** `dev` -**Status:** Approved design, pending implementation plan +**Status:** Approved design (revised per critical review), pending implementation plan ## Problem @@ -26,13 +26,15 @@ The duplication causes three concrete problems: mapping. That mapping is even re-implemented a second time as `_SOURCE_TO_TRIGGER` inside `barcodes.py:198`. -2. **A temp-directory leak from split staging/cleanup ownership.** Staged sources - (mail, API, version, barcode-child) create a temp **directory** via `mkdtemp` - and write the input into it, but nothing ever removes that directory: - `ConsumerPlugin` only unlinks the input **file**, and only on the success path - (`consumer.py`). Every mail/API ingest therefore orphans an empty - `SCRATCH_DIR/paperless-*/` directory forever, and on failure the input file - leaks entirely. +2. **A scratch leak from split staging/cleanup ownership.** Staged sources create + scratch input under `SCRATCH_DIR` that nothing ever fully removes: + `ConsumerPlugin` unlinks only the input **file**, and only on the success path + (`consumer.py:742`). The exact leak shape varies by site — mail attachments and + API/version use `mkdtemp` + a file inside, so the **directory** is orphaned + (empty after success, dir-with-file on failure); the mail `.eml` path uses + `mkstemp` (`mail.py:~955`), so it leaks a **file** directly in `SCRATCH_DIR` on + failure. Either way there is no owner that removes the staged input on every + terminal path. 3. **Three test seams for one operation.** `ConsumeTaskMixin` patches `documents.tasks.consume_file.apply_async` (`tests/utils.py:249`); the @@ -71,6 +73,12 @@ In scope: Out of scope (explicitly — confirmed during exploration): +- **`bulk_edit.py`'s 8 dispatch sites (phase 2).** Bulk merge/split/version + (`bulk_edit.py:485,588,661,727,811,844,938,961`) also build `ConsumableDocument`s + and dispatch `consume_file`. They are deferred to a follow-up plan that adopts + the seam this refactor establishes. Consequence: until phase 2, the "single + canonical seam" is partial — those paths still call `consume_file` directly. The + spec states this rather than implying full unification. - **New poller sources (S3/SFTP/webhook).** They need infrastructure that does not exist (a scheduling/registration framework, per-source credential/config models, a generic already-seen dedup table, new `DocumentSource`/`TriggerSource` enum @@ -100,7 +108,13 @@ Settled during brainstorming: task's working artifacts live under a single directory, removed by one `rmtree`. This also folds away `ConsumerPlugin`'s redundant second temp dir. 4. **One canonical dispatch seam: `enqueue_consumption`.** Tests patch it in one - place. + place — **but only because** of two implementation constraints the plan must + enforce: (a) sites call it **module-qualified** (`ingest.enqueue_consumption(...)`, + not a bare imported name), so a single `documents.ingest.enqueue_consumption` + patch intercepts every site; (b) `build_consume_signature` passes + `input_doc`/`overrides` as **keyword** args, so `Signature.kwargs` keeps the + shape mail tests already assert on. Without both, the "one patch point" claim is + false. ## Architecture @@ -187,30 +201,79 @@ One tree per document; one cleanup. The split re-enqueue produces each child via `stage_document` + `build_consume_signature` using `SOURCE_TO_TRIGGER`, removing the sixth -hand-rolled site and the `_SOURCE_TO_TRIGGER` duplicate. Each child gets its own -work_root; the parent's work_root is cleaned when the parent task stops. +hand-rolled site and the `_SOURCE_TO_TRIGGER` duplicate. **This is a +restructuring, not a swap:** today all children share a single `mkdtemp` dir +(`barcodes.py:188-194`, deliberately separate from the parent's `base_temp_dir`). +Each child must instead get its **own** work_root, because each child is a +separate `consume_file` task whose `finally` will `rmtree` its `staging_dir` — a +shared dir would let one child delete siblings' not-yet-consumed files. The +children already copy their split file out of the parent tree +(`copy_file_with_basic_stats`, `barcodes.py:211`), so the parent's work_root is +independently cleanable when the parent stops. -### Call-site refactor (the five external sites) +### Mail ownership boundary (the batch case — `mail.py`) -Each collapses to: `with stage_document(...) as staged: staged.write(...); -overrides = DocumentMetadataOverrides(...source-specific...); -enqueue_consumption(staged.input_doc, overrides); staged.release()`. Folder source -has no payload to stage (the file is already in `CONSUMPTION_DIR`), so it builds a -`ConsumableDocument(..., staging_dir=None)` and calls `enqueue_consumption` -directly without `stage_document`. Mail builds each signature with -`build_consume_signature`, appends to its chord list, dispatches the chord -unchanged, then `release()`s each staged document. +Mail is the one source that does **not** dispatch per file: `_handle_message` +collects N attachment signatures (and optionally the `.eml` signature), then +`queue_consumption_tasks` wraps them in a single `chord(...).delay()` _after_ the +loop (`mail.py:919`). A per-file `release()` is therefore wrong — if `release()` +ran per attachment and the later chord dispatch threw, every staged file would be +orphaned, reopening the leak. **The ownership boundary is the whole message:** + +```python +def _handle_message(...): + with contextlib.ExitStack() as staging_stack: + consume_tasks = [] + for att in attachments: # and the .eml branch + staged = staging_stack.enter_context(stage_document(MailFetch, name=...)) + staged.write(att.payload) + consume_tasks.append(build_consume_signature(staged.input_doc, overrides)) + queue_consumption_tasks(consume_tasks, rule, message) # chord(...).delay() + for staged in staged_docs: + staged.release() # only after the chord is dispatched + # ExitStack __exit__: any un-released staged doc → rmtree (covers a chord-dispatch failure) +``` + +`queue_consumption_tasks` itself is unchanged. `build_consume_signature` **must +pass `input_doc`/`overrides` as keyword args** (`consume_file.s(input_doc=..., +overrides=...)`) so the resulting `Signature.kwargs` keeps the shape mail tests +assert on (`test_mail.py:365-366`). + +### Call-site refactor (the external sites) + +Folder/API/version collapse to: `with stage_document(...) as staged: +staged.write(...); overrides = DocumentMetadataOverrides(...source-specific...); +ingest.enqueue_consumption(staged.input_doc, overrides); staged.release()`. Folder +source has no payload to stage (the file is already in `CONSUMPTION_DIR`), so it +builds a `ConsumableDocument(..., staging_dir=None)` and calls +`ingest.enqueue_consumption` directly without `stage_document`. Mail uses the +`ExitStack` pattern above. + +**Call style is module-qualified.** Sites do `from documents import ingest` and +call `ingest.enqueue_consumption(...)` / `ingest.build_consume_signature(...)` — +_not_ a bare imported name. This is what makes a single patch target +(`documents.ingest.enqueue_consumption`) intercept every site; a direct +`from documents.ingest import enqueue_consumption` would bind the name per-module +and force per-module patching (the existing `from documents.tasks import +consume_file` style is exactly why tests today need multiple patch targets). ## Data flow ``` -enqueue site (synchronous) +folder / API / version site (synchronous, single dispatch) with stage_document(source, name=...) as staged: # mkdtemp work_root, write input overrides = DocumentMetadataOverrides(... per-source ...) - result = enqueue_consumption(staged.input_doc, overrides) # folder/API/version - # or: sig = build_consume_signature(...); chord.append(sig) # mail + result = ingest.enqueue_consumption(staged.input_doc, overrides) staged.release() # ownership → task # __exit__: rmtree(work_root) ONLY if release() never ran (pre-dispatch failure) + # (folder source: no stage_document; ConsumableDocument(staging_dir=None) + enqueue_consumption) + +mail site (synchronous, BATCH dispatch — see "Mail ownership boundary") + with ExitStack() as staging_stack: # owns ALL of the message's staged docs + build N signatures via ingest.build_consume_signature(... keyword args ...) + queue_consumption_tasks(...) # one chord(...).delay() + release() every staged doc # only after the chord dispatches + # __exit__: rmtree any un-released work_root (a chord-dispatch failure cleans the whole batch) consume_file task (async, later) work_root = input_doc.staging_dir or TemporaryDirectory(SCRATCH_DIR) @@ -223,15 +286,32 @@ consume_file task (async, later) ## Error handling & edges -- **Double-sided collation** preserves the first half across tasks: it stops with - `StopConsumeTaskError` to await the second half, but copies that content out to - its own staging file (`double_sided.py:74`) **before** the stop. Removing the - parent `work_root` afterward is therefore safe. The plan MUST verify this - copy-out-precedes-stop ordering, since it is load-bearing for the cleanup rule. +- **Double-sided collation — safe, but outside the work_root model.** It stops + with `StopConsumeTaskError` to await the second half, and preserves that half by + **`shutil.move(pdf_file, staging)`** to `SCRATCH_DIR/` + (`double_sided.py:~134`) — a _move_, to a location _outside_ any work_root, + performed **before** the stop. So `rmtree`-ing the parent work_root afterward is + safe (the half already left the tree). Two consequences the plan must honor: + (a) the preserved staging file lives in `SCRATCH_DIR`, is **never** covered by + the per-document cleanup, and is cleaned by the second-half collate + (`staging.unlink()`) or timeout as today — the "one root" framing does not + extend to it; (b) the plan must verify the move-precedes-stop ordering, since it + is load-bearing for the cleanup rule. +- **`ConsumerPlugin`'s own cleanup becomes partly redundant.** On success it + unlinks `original_file` and `working_copy` (`consumer.py:742/744`), both of + which now live inside work_root that the task `finally` `rmtree`s. The redundant + unlinks are harmless but the plan should remove them for clarity, while keeping + the qpdf `--replace-input` recovery (`unmodified_original`, `consumer.py:452+`) + working when `working_copy` lives under work_root. - **Folder source is intrinsically asymmetric** — its original lives in the watched dir, not a work_root. The "one root" model fully applies to staged sources; folder gets in-place-original (cleaned by `ConsumerPlugin` on success) plus an isolated per-task working root. This is correct, not a gap. +- **`staging_dir is None` must be a strict no-op.** Many integration tests call + the real `consume_file` with hand-built `ConsumableDocument`s that never set + `staging_dir` (`test_workflows.py`, `test_barcodes.py`, `test_double_sided.py`). + The new work_root/`finally` logic must reduce to exactly today's behavior when + `staging_dir is None`, or those currently-passing tests regress. - **Duplicate/stop are not failures.** The worker `finally` cleans `work_root` on every terminal path, but a future quarantine feature (below) would relocate the input only on a genuine exception, never on `ConsumeFileDuplicateError` or @@ -251,17 +331,40 @@ New `src/documents/tests/` unit tests for `ingest.py` (pytest-style classes, - `consume_file` removes `staging_dir` on success, on `StopConsumeTaskError`, on duplicate, and on exception; and does nothing destructive when `staging_dir` is `None` (folder source) beyond today's behavior. +- The `trigger_source` header survives `Signature.set(headers=...).apply_async()` + **and** chord dispatch (a guard against chord wrapping dropping per-signature + headers — the one path where header propagation could silently break). -Existing tests — the single-seam payoff and its migration cost: +Existing tests — the migration is centralized but **not** trivial: -- `ConsumeTaskMixin` (`tests/utils.py:249`) repoints its patch from - `documents.tasks.consume_file.apply_async` to the new `enqueue_consumption` - seam, and its assert helper inspects `(input_doc, overrides)` directly. Its ~40 - API tests then pass unchanged through the mixin. +- `ConsumeTaskMixin` (`tests/utils.py:242-280`): repoint the patch from + `documents.tasks.consume_file.apply_async` to `documents.ingest.enqueue_consumption`, + **and rewrite both assert helpers** — they currently read the raw `apply_async` + shape `call_args.kwargs["kwargs"]["input_doc"]` (`assert_queue_consumption_task_call_args` + at :259 and `get_all_consume_task_call_args` at :267). With the seam called + positionally as `enqueue_consumption(input_doc, overrides)`, those become + `call_args.args[0]/[1]`. This is concentrated in the mixin, so its ~15 helper + call sites in `test_api_documents.py` + 1 in `test_barcodes.py` pass once the + helpers are fixed — but it is a helper rewrite, not a one-line change. - The consumer-folder tests (`test_management_consumer.py`, ~15 methods) repoint - `mock_consume_file_delay` from the module-local `consume_file` to the seam. -- Mail tests that patch `queue_consumption_tasks` stay untouched (the chord - boundary is preserved). + `mock_consume_file_delay` to the seam. +- `test_api_document_versions.py` (3 tests) patches the **module-local** + `documents.views.consume_file` — repoint to `documents.views`-qualified usage or + the central seam. +- Real-task integration tests that build `ConsumableDocument`s by hand and call + `consume_file` directly (`test_workflows.py` ~15, `test_barcodes.py` ~5, + `test_double_sided.py` ~9) exercise the `staging_dir is None` path; they should + stay green **iff** that path is a strict no-op (see Error handling). +- Mail tests that patch `queue_consumption_tasks` stay untouched **only if** + `build_consume_signature` uses keyword args (above); otherwise their assertions + on `Signature.kwargs` (`test_mail.py`, `test_mail_nfc.py`, `test_preprocessor.py`, + ~15 methods) break. + +**Realistic blast radius: ~70–90 in-scope test methods** route through the +changed seams (the export-style "one patch point" still holds, but the helper +rewrite + keyword-arg + module-qualified constraints are what make it true). This +excludes `bulk_edit.py`'s ~35 tests, which are deferred with their migration to +the bulk-edit phase-2 plan. ## Enabled future work (not built here) @@ -284,8 +387,18 @@ if it is wanted. - **Cleanup must run on all terminal paths.** The worker `finally` must cover success, `StopConsumeTaskError`, `ConsumeFileDuplicateError`, and unexpected exceptions, or the leak reappears. Covered by the `consume_file` tests above. -- **Test-seam migration churn.** ~55 existing tests route through the two patch - points being repointed. Mitigation: the changes are concentrated in - `ConsumeTaskMixin` and the consumer test fixture, not spread across every test. -- **Double-sided ordering.** The copy-out-before-stop assumption must be verified - in the plan before relying on it for cleanup. +- **Test-seam migration churn.** ~70–90 in-scope test methods route through the + changed seams. Mitigation: concentrated in `ConsumeTaskMixin` (helper rewrite) + and a few fixtures — but it is a helper rewrite plus a keyword-arg and a + module-qualified-call contract, not a one-line repoint. The plan must encode all + three constraints or the "single patch point" promise is false. +- **Mail batch ownership.** The `ExitStack` boundary (release all only after the + chord dispatches; rmtree-all on dispatch failure) is load-bearing; getting it + per-attachment instead reopens the leak for the whole message. +- **Double-sided ordering.** The move-precedes-stop assumption + (`shutil.move` to `SCRATCH_DIR` at `double_sided.py:~134`) must be verified in + the plan before relying on it for cleanup. +- **`bulk_edit.py` is deferred, not done.** Until the phase-2 plan migrates its 8 + dispatch sites, the "single canonical seam" is partial: bulk merge/split/version + still call `consume_file` directly. The spec states this honestly rather than + implying full unification.