Docs: revise ingestion staging spec per critical review

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
stumpylog
2026-06-16 10:48:46 -07:00
parent 2d7d0d17c9
commit e84a767839
@@ -2,7 +2,7 @@
**Date:** 2026-06-16
**Branch base:** `dev`
**Status:** Approved design, pending implementation plan
**Status:** Approved design (revised per critical review), pending implementation plan
## Problem
@@ -26,13 +26,15 @@ The duplication causes three concrete problems:
mapping. That mapping is even re-implemented a second time as
`_SOURCE_TO_TRIGGER` inside `barcodes.py:198`.
2. **A temp-directory leak from split staging/cleanup ownership.** Staged sources
(mail, API, version, barcode-child) create a temp **directory** via `mkdtemp`
and write the input into it, but nothing ever removes that directory:
`ConsumerPlugin` only unlinks the input **file**, and only on the success path
(`consumer.py`). Every mail/API ingest therefore orphans an empty
`SCRATCH_DIR/paperless-*/` directory forever, and on failure the input file
leaks entirely.
2. **A scratch leak from split staging/cleanup ownership.** Staged sources create
scratch input under `SCRATCH_DIR` that nothing ever fully removes:
`ConsumerPlugin` unlinks only the input **file**, and only on the success path
(`consumer.py:742`). The exact leak shape varies by site — mail attachments and
API/version use `mkdtemp` + a file inside, so the **directory** is orphaned
(empty after success, dir-with-file on failure); the mail `.eml` path uses
`mkstemp` (`mail.py:~955`), so it leaks a **file** directly in `SCRATCH_DIR` on
failure. Either way there is no owner that removes the staged input on every
terminal path.
3. **Three test seams for one operation.** `ConsumeTaskMixin` patches
`documents.tasks.consume_file.apply_async` (`tests/utils.py:249`); the
@@ -71,6 +73,12 @@ In scope:
Out of scope (explicitly — confirmed during exploration):
- **`bulk_edit.py`'s 8 dispatch sites (phase 2).** Bulk merge/split/version
(`bulk_edit.py:485,588,661,727,811,844,938,961`) also build `ConsumableDocument`s
and dispatch `consume_file`. They are deferred to a follow-up plan that adopts
the seam this refactor establishes. Consequence: until phase 2, the "single
canonical seam" is partial — those paths still call `consume_file` directly. The
spec states this rather than implying full unification.
- **New poller sources (S3/SFTP/webhook).** They need infrastructure that does not
exist (a scheduling/registration framework, per-source credential/config models,
a generic already-seen dedup table, new `DocumentSource`/`TriggerSource` enum
@@ -100,7 +108,13 @@ Settled during brainstorming:
task's working artifacts live under a single directory, removed by one
`rmtree`. This also folds away `ConsumerPlugin`'s redundant second temp dir.
4. **One canonical dispatch seam: `enqueue_consumption`.** Tests patch it in one
place.
place**but only because** of two implementation constraints the plan must
enforce: (a) sites call it **module-qualified** (`ingest.enqueue_consumption(...)`,
not a bare imported name), so a single `documents.ingest.enqueue_consumption`
patch intercepts every site; (b) `build_consume_signature` passes
`input_doc`/`overrides` as **keyword** args, so `Signature.kwargs` keeps the
shape mail tests already assert on. Without both, the "one patch point" claim is
false.
## Architecture
@@ -187,30 +201,79 @@ One tree per document; one cleanup.
The split re-enqueue produces each child via `stage_document` +
`build_consume_signature` using `SOURCE_TO_TRIGGER`, removing the sixth
hand-rolled site and the `_SOURCE_TO_TRIGGER` duplicate. Each child gets its own
work_root; the parent's work_root is cleaned when the parent task stops.
hand-rolled site and the `_SOURCE_TO_TRIGGER` duplicate. **This is a
restructuring, not a swap:** today all children share a single `mkdtemp` dir
(`barcodes.py:188-194`, deliberately separate from the parent's `base_temp_dir`).
Each child must instead get its **own** work_root, because each child is a
separate `consume_file` task whose `finally` will `rmtree` its `staging_dir` — a
shared dir would let one child delete siblings' not-yet-consumed files. The
children already copy their split file out of the parent tree
(`copy_file_with_basic_stats`, `barcodes.py:211`), so the parent's work_root is
independently cleanable when the parent stops.
### Call-site refactor (the five external sites)
### Mail ownership boundary (the batch case — `mail.py`)
Each collapses to: `with stage_document(...) as staged: staged.write(...);
overrides = DocumentMetadataOverrides(...source-specific...);
enqueue_consumption(staged.input_doc, overrides); staged.release()`. Folder source
has no payload to stage (the file is already in `CONSUMPTION_DIR`), so it builds a
`ConsumableDocument(..., staging_dir=None)` and calls `enqueue_consumption`
directly without `stage_document`. Mail builds each signature with
`build_consume_signature`, appends to its chord list, dispatches the chord
unchanged, then `release()`s each staged document.
Mail is the one source that does **not** dispatch per file: `_handle_message`
collects N attachment signatures (and optionally the `.eml` signature), then
`queue_consumption_tasks` wraps them in a single `chord(...).delay()` _after_ the
loop (`mail.py:919`). A per-file `release()` is therefore wrong — if `release()`
ran per attachment and the later chord dispatch threw, every staged file would be
orphaned, reopening the leak. **The ownership boundary is the whole message:**
```python
def _handle_message(...):
with contextlib.ExitStack() as staging_stack:
consume_tasks = []
for att in attachments: # and the .eml branch
staged = staging_stack.enter_context(stage_document(MailFetch, name=...))
staged.write(att.payload)
consume_tasks.append(build_consume_signature(staged.input_doc, overrides))
queue_consumption_tasks(consume_tasks, rule, message) # chord(...).delay()
for staged in staged_docs:
staged.release() # only after the chord is dispatched
# ExitStack __exit__: any un-released staged doc → rmtree (covers a chord-dispatch failure)
```
`queue_consumption_tasks` itself is unchanged. `build_consume_signature` **must
pass `input_doc`/`overrides` as keyword args** (`consume_file.s(input_doc=...,
overrides=...)`) so the resulting `Signature.kwargs` keeps the shape mail tests
assert on (`test_mail.py:365-366`).
### Call-site refactor (the external sites)
Folder/API/version collapse to: `with stage_document(...) as staged:
staged.write(...); overrides = DocumentMetadataOverrides(...source-specific...);
ingest.enqueue_consumption(staged.input_doc, overrides); staged.release()`. Folder
source has no payload to stage (the file is already in `CONSUMPTION_DIR`), so it
builds a `ConsumableDocument(..., staging_dir=None)` and calls
`ingest.enqueue_consumption` directly without `stage_document`. Mail uses the
`ExitStack` pattern above.
**Call style is module-qualified.** Sites do `from documents import ingest` and
call `ingest.enqueue_consumption(...)` / `ingest.build_consume_signature(...)`
_not_ a bare imported name. This is what makes a single patch target
(`documents.ingest.enqueue_consumption`) intercept every site; a direct
`from documents.ingest import enqueue_consumption` would bind the name per-module
and force per-module patching (the existing `from documents.tasks import
consume_file` style is exactly why tests today need multiple patch targets).
## Data flow
```
enqueue site (synchronous)
folder / API / version site (synchronous, single dispatch)
with stage_document(source, name=...) as staged: # mkdtemp work_root, write input
overrides = DocumentMetadataOverrides(... per-source ...)
result = enqueue_consumption(staged.input_doc, overrides) # folder/API/version
# or: sig = build_consume_signature(...); chord.append(sig) # mail
result = ingest.enqueue_consumption(staged.input_doc, overrides)
staged.release() # ownership → task
# __exit__: rmtree(work_root) ONLY if release() never ran (pre-dispatch failure)
# (folder source: no stage_document; ConsumableDocument(staging_dir=None) + enqueue_consumption)
mail site (synchronous, BATCH dispatch — see "Mail ownership boundary")
with ExitStack() as staging_stack: # owns ALL of the message's staged docs
build N signatures via ingest.build_consume_signature(... keyword args ...)
queue_consumption_tasks(...) # one chord(...).delay()
release() every staged doc # only after the chord dispatches
# __exit__: rmtree any un-released work_root (a chord-dispatch failure cleans the whole batch)
consume_file task (async, later)
work_root = input_doc.staging_dir or TemporaryDirectory(SCRATCH_DIR)
@@ -223,15 +286,32 @@ consume_file task (async, later)
## Error handling & edges
- **Double-sided collation** preserves the first half across tasks: it stops with
`StopConsumeTaskError` to await the second half, but copies that content out to
its own staging file (`double_sided.py:74`) **before** the stop. Removing the
parent `work_root` afterward is therefore safe. The plan MUST verify this
copy-out-precedes-stop ordering, since it is load-bearing for the cleanup rule.
- **Double-sided collation — safe, but outside the work_root model.** It stops
with `StopConsumeTaskError` to await the second half, and preserves that half by
**`shutil.move(pdf_file, staging)`** to `SCRATCH_DIR/<staging-name>`
(`double_sided.py:~134`) — a _move_, to a location _outside_ any work_root,
performed **before** the stop. So `rmtree`-ing the parent work_root afterward is
safe (the half already left the tree). Two consequences the plan must honor:
(a) the preserved staging file lives in `SCRATCH_DIR`, is **never** covered by
the per-document cleanup, and is cleaned by the second-half collate
(`staging.unlink()`) or timeout as today — the "one root" framing does not
extend to it; (b) the plan must verify the move-precedes-stop ordering, since it
is load-bearing for the cleanup rule.
- **`ConsumerPlugin`'s own cleanup becomes partly redundant.** On success it
unlinks `original_file` and `working_copy` (`consumer.py:742/744`), both of
which now live inside work_root that the task `finally` `rmtree`s. The redundant
unlinks are harmless but the plan should remove them for clarity, while keeping
the qpdf `--replace-input` recovery (`unmodified_original`, `consumer.py:452+`)
working when `working_copy` lives under work_root.
- **Folder source is intrinsically asymmetric** — its original lives in the
watched dir, not a work_root. The "one root" model fully applies to staged
sources; folder gets in-place-original (cleaned by `ConsumerPlugin` on success)
plus an isolated per-task working root. This is correct, not a gap.
- **`staging_dir is None` must be a strict no-op.** Many integration tests call
the real `consume_file` with hand-built `ConsumableDocument`s that never set
`staging_dir` (`test_workflows.py`, `test_barcodes.py`, `test_double_sided.py`).
The new work_root/`finally` logic must reduce to exactly today's behavior when
`staging_dir is None`, or those currently-passing tests regress.
- **Duplicate/stop are not failures.** The worker `finally` cleans `work_root` on
every terminal path, but a future quarantine feature (below) would relocate the
input only on a genuine exception, never on `ConsumeFileDuplicateError` or
@@ -251,17 +331,40 @@ New `src/documents/tests/` unit tests for `ingest.py` (pytest-style classes,
- `consume_file` removes `staging_dir` on success, on `StopConsumeTaskError`, on
duplicate, and on exception; and does nothing destructive when `staging_dir`
is `None` (folder source) beyond today's behavior.
- The `trigger_source` header survives `Signature.set(headers=...).apply_async()`
**and** chord dispatch (a guard against chord wrapping dropping per-signature
headers — the one path where header propagation could silently break).
Existing tests — the single-seam payoff and its migration cost:
Existing tests — the migration is centralized but **not** trivial:
- `ConsumeTaskMixin` (`tests/utils.py:249`) repoints its patch from
`documents.tasks.consume_file.apply_async` to the new `enqueue_consumption`
seam, and its assert helper inspects `(input_doc, overrides)` directly. Its ~40
API tests then pass unchanged through the mixin.
- `ConsumeTaskMixin` (`tests/utils.py:242-280`): repoint the patch from
`documents.tasks.consume_file.apply_async` to `documents.ingest.enqueue_consumption`,
**and rewrite both assert helpers** — they currently read the raw `apply_async`
shape `call_args.kwargs["kwargs"]["input_doc"]` (`assert_queue_consumption_task_call_args`
at :259 and `get_all_consume_task_call_args` at :267). With the seam called
positionally as `enqueue_consumption(input_doc, overrides)`, those become
`call_args.args[0]/[1]`. This is concentrated in the mixin, so its ~15 helper
call sites in `test_api_documents.py` + 1 in `test_barcodes.py` pass once the
helpers are fixed — but it is a helper rewrite, not a one-line change.
- The consumer-folder tests (`test_management_consumer.py`, ~15 methods) repoint
`mock_consume_file_delay` from the module-local `consume_file` to the seam.
- Mail tests that patch `queue_consumption_tasks` stay untouched (the chord
boundary is preserved).
`mock_consume_file_delay` to the seam.
- `test_api_document_versions.py` (3 tests) patches the **module-local**
`documents.views.consume_file` — repoint to `documents.views`-qualified usage or
the central seam.
- Real-task integration tests that build `ConsumableDocument`s by hand and call
`consume_file` directly (`test_workflows.py` ~15, `test_barcodes.py` ~5,
`test_double_sided.py` ~9) exercise the `staging_dir is None` path; they should
stay green **iff** that path is a strict no-op (see Error handling).
- Mail tests that patch `queue_consumption_tasks` stay untouched **only if**
`build_consume_signature` uses keyword args (above); otherwise their assertions
on `Signature.kwargs` (`test_mail.py`, `test_mail_nfc.py`, `test_preprocessor.py`,
~15 methods) break.
**Realistic blast radius: ~7090 in-scope test methods** route through the
changed seams (the export-style "one patch point" still holds, but the helper
rewrite + keyword-arg + module-qualified constraints are what make it true). This
excludes `bulk_edit.py`'s ~35 tests, which are deferred with their migration to
the bulk-edit phase-2 plan.
## Enabled future work (not built here)
@@ -284,8 +387,18 @@ if it is wanted.
- **Cleanup must run on all terminal paths.** The worker `finally` must cover
success, `StopConsumeTaskError`, `ConsumeFileDuplicateError`, and unexpected
exceptions, or the leak reappears. Covered by the `consume_file` tests above.
- **Test-seam migration churn.** ~55 existing tests route through the two patch
points being repointed. Mitigation: the changes are concentrated in
`ConsumeTaskMixin` and the consumer test fixture, not spread across every test.
- **Double-sided ordering.** The copy-out-before-stop assumption must be verified
in the plan before relying on it for cleanup.
- **Test-seam migration churn.** ~7090 in-scope test methods route through the
changed seams. Mitigation: concentrated in `ConsumeTaskMixin` (helper rewrite)
and a few fixtures — but it is a helper rewrite plus a keyword-arg and a
module-qualified-call contract, not a one-line repoint. The plan must encode all
three constraints or the "single patch point" promise is false.
- **Mail batch ownership.** The `ExitStack` boundary (release all only after the
chord dispatches; rmtree-all on dispatch failure) is load-bearing; getting it
per-attachment instead reopens the leak for the whole message.
- **Double-sided ordering.** The move-precedes-stop assumption
(`shutil.move` to `SCRATCH_DIR` at `double_sided.py:~134`) must be verified in
the plan before relying on it for cleanup.
- **`bulk_edit.py` is deferred, not done.** Until the phase-2 plan migrates its 8
dispatch sites, the "single canonical seam" is partial: bulk merge/split/version
still call `consume_file` directly. The spec states this honestly rather than
implying full unification.