feat: compute produce_archive from ARCHIVE_FILE_GENERATION, pass to parser

Add _extract_text_for_archive_check() and _should_produce_archive() helper
functions to documents/consumer.py. These compute whether the parser should
produce a PDF/A archive based on the ARCHIVE_FILE_GENERATION setting (always/
never/auto), parser capabilities (can_produce_archive, requires_pdf_rendition),
MIME type, and pdftotext-based born-digital detection for auto mode.

Update the parse() call site to compute produce_archive and pass it as a
keyword argument. Add 10 unit tests in test_consumer_archive.py, and update
two existing consumer tests that asserted exact run_subprocess call counts,
since pdftotext is now also invoked during auto-mode archive detection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Trenton H
2026-03-26 14:27:42 -07:00
parent 6aedcf9026
commit dc74e2176f
3 changed files with 263 additions and 4 deletions

View File

@@ -1,5 +1,6 @@
import datetime
import hashlib
import logging
import os
import shutil
import tempfile
@@ -50,6 +51,8 @@ from documents.templating.workflows import parse_w_workflow_placeholders
from documents.utils import copy_basic_file_stats
from documents.utils import copy_file_with_basic_stats
from documents.utils import run_subprocess
from paperless.config import OcrConfig
from paperless.models import ArchiveFileGenerationChoices
from paperless.parsers import ParserContext
from paperless.parsers import ParserProtocol
from paperless.parsers.registry import get_parser_registry
@@ -105,6 +108,70 @@ class ConsumerStatusShortMessage(StrEnum):
FAILED = "failed"
# Threshold used by the ARCHIVE_FILE_GENERATION=auto check in
# _should_produce_archive(): a PDF whose extracted text does not exceed this
# length is treated as scanned, so an archive is requested for it.
_VALID_TEXT_LENGTH_FOR_ARCHIVE_CHECK = 50
def _extract_text_for_archive_check(path: Path) -> str | None:
    """Run ``pdftotext`` on *path* and return the extracted text.

    Used only for the ARCHIVE_FILE_GENERATION=auto born-digital detection.
    The check is deliberately best-effort: any exception raised while running
    the tool or reading its output is logged at debug level and reported to
    the caller as ``None`` instead of propagating.

    Args:
        path: The document to extract text from.

    Returns:
        The extracted text, or ``None`` if extraction failed or the output
        file was empty.
    """
    log = logging.getLogger(__name__)
    try:
        # The output file lives in a throwaway directory so nothing is left
        # behind regardless of how extraction ends.
        with tempfile.TemporaryDirectory() as tmpdir:
            out_path = Path(tmpdir) / "text.txt"
            run_subprocess(
                [
                    "pdftotext",
                    "-q",
                    "-layout",
                    "-enc",
                    "UTF-8",
                    str(path),
                    str(out_path),
                ],
                logger=log,
            )
            # errors="replace" keeps undecodable bytes from aborting the check.
            text = out_path.read_text(encoding="utf-8", errors="replace")
    except Exception:
        # Best-effort: never let the detection step fail the caller, but leave
        # a trace so a broken or missing pdftotext can be diagnosed.
        log.debug(
            "Text extraction for archive check failed for %s",
            path,
            exc_info=True,
        )
        return None
    # Normalise an empty result to None so callers have one "no text" value.
    return text or None
def _should_produce_archive(
    parser: "ParserProtocol",
    mime_type: str,
    working_copy: Path,
) -> bool:
    """Return True if the consumer should request a PDF/A archive from the parser.

    IMPORTANT: *parser* must be an instantiated parser, not the class.
    ``requires_pdf_rendition`` and ``can_produce_archive`` are instance
    ``@property`` methods — accessing them on the class returns the descriptor
    (always truthy).

    Decision order:

    1. ``parser.requires_pdf_rendition`` is truthy -> True.
    2. ``parser.can_produce_archive`` is falsy -> False.
    3. ``OcrConfig().archive_file_generation`` is ALWAYS -> True, NEVER -> False.
    4. Otherwise (auto): ``image/*`` -> True; ``application/pdf`` -> True only
       when little or no text can be extracted; any other MIME type -> False.

    Args:
        parser: Instantiated parser that will handle the document.
        mime_type: MIME type of the document being consumed.
        working_copy: Path to the document; only read in auto mode for PDFs.

    Returns:
        Whether ``produce_archive=True`` should be passed to the parser.
    """
    # Must produce a PDF so the frontend can display the original format at all.
    if parser.requires_pdf_rendition:
        return True

    # Parser cannot produce an archive (e.g. TextDocumentParser).
    if not parser.can_produce_archive:
        return False

    generation = OcrConfig().archive_file_generation
    if generation == ArchiveFileGenerationChoices.ALWAYS:
        return True
    if generation == ArchiveFileGenerationChoices.NEVER:
        return False

    # auto: produce archives for scanned/image documents; skip for born-digital PDFs.
    if mime_type.startswith("image/"):
        return True
    if mime_type != "application/pdf":
        return False

    text = _extract_text_for_archive_check(working_copy)
    if text is None:
        # No text (or extraction failed): treat the PDF as scanned.
        return True

    # Count only non-whitespace characters. pdftotext separates pages with
    # form feeds and -layout pads lines with spaces, so the raw length of a
    # long, text-free scan could exceed the threshold and be misclassified
    # as born-digital.
    meaningful_chars = sum(1 for char in text if not char.isspace())
    return meaningful_chars <= _VALID_TEXT_LENGTH_FOR_ARCHIVE_CHECK
class ConsumerPluginMixin:
if TYPE_CHECKING:
from logging import Logger
@@ -440,7 +507,16 @@ class ConsumerPlugin(
)
self.log.debug(f"Parsing {self.filename}...")
document_parser.parse(self.working_copy, mime_type)
should_produce_archive = _should_produce_archive(
document_parser,
mime_type,
self.working_copy,
)
document_parser.parse(
self.working_copy,
mime_type,
produce_archive=should_produce_archive,
)
self.log.debug(f"Generating thumbnail for {self.filename}...")
self._send_progress(

View File

@@ -1126,6 +1126,7 @@ class TestConsumer(
mock_mail_parser_parse.assert_called_once_with(
consumer.working_copy,
"message/rfc822",
produce_archive=True,
)
@@ -1273,7 +1274,14 @@ class PreConsumeTestCase(DirectoriesMixin, GetConsumerMixin, TestCase):
def test_no_pre_consume_script(self, m) -> None:
with self.get_consumer(self.test_file) as c:
c.run()
m.assert_not_called()
# Verify no pre-consume script subprocess was invoked
# (run_subprocess may still be called by _extract_text_for_archive_check)
script_calls = [
call
for call in m.call_args_list
if call.args and call.args[0] and call.args[0][0] not in ("pdftotext",)
]
self.assertEqual(script_calls, [])
@mock.patch("documents.consumer.run_subprocess")
@override_settings(PRE_CONSUME_SCRIPT="does-not-exist")
@@ -1289,9 +1297,16 @@ class PreConsumeTestCase(DirectoriesMixin, GetConsumerMixin, TestCase):
with self.get_consumer(self.test_file) as c:
c.run()
m.assert_called_once()
self.assertTrue(m.called)
args, _ = m.call_args
# Find the call that invoked the pre-consume script
# (run_subprocess may also be called by _extract_text_for_archive_check)
script_call = next(
call
for call in m.call_args_list
if call.args and call.args[0] and call.args[0][0] == script.name
)
args, _ = script_call
command = args[0]
environment = args[1]

View File

@@ -0,0 +1,168 @@
"""Tests for _should_produce_archive() and _extract_text_for_archive_check()."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from django.test import override_settings
from documents.consumer import _should_produce_archive
def _parser_instance(
    *,
    can_produce: bool = True,
    requires_rendition: bool = False,
) -> MagicMock:
    """Build a stand-in parser instance exposing the two capability flags.

    ``can_produce`` becomes ``can_produce_archive`` and ``requires_rendition``
    becomes ``requires_pdf_rendition`` on the returned mock.
    """
    stub = MagicMock()
    stub.configure_mock(
        can_produce_archive=can_produce,
        requires_pdf_rendition=requires_rendition,
    )
    return stub
@pytest.fixture()
def null_app_config(mocker) -> MagicMock:
    """Mock ApplicationConfiguration whose listed fields are all None.

    With every field unset, the configuration is expected to fall back to the
    Django settings.
    """
    unset_fields = (
        "output_type",
        "pages",
        "language",
        "mode",
        "archive_file_generation",
        "image_dpi",
        "unpaper_clean",
        "deskew",
        "rotate_pages",
        "rotate_pages_threshold",
        "max_image_pixels",
        "color_conversion_strategy",
        "user_args",
    )
    # dict.fromkeys maps every name to None, matching explicit ``name=None`` kwargs.
    return mocker.MagicMock(**dict.fromkeys(unset_fields))
@pytest.fixture(autouse=True)
def patch_app_config(mocker, null_app_config):
    """Make ``BaseConfig._get_config_instance`` return the null config.

    Autouse, so it applies to every test in this module.
    """
    target = "paperless.config.BaseConfig._get_config_instance"
    mocker.patch(target, return_value=null_app_config)
class TestShouldProduceArchive:
    """Exercise each decision branch of ``_should_produce_archive``."""

    # Patch target for the pdftotext helper consulted in auto mode.
    _EXTRACT_TARGET = "documents.consumer._extract_text_for_archive_check"

    @override_settings(ARCHIVE_FILE_GENERATION="never")
    def test_never_setting_returns_false(self) -> None:
        """never: a capable parser is still told not to produce an archive."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        document = Path("/tmp/doc.pdf")
        assert _should_produce_archive(stub, "application/pdf", document) is False

    @override_settings(ARCHIVE_FILE_GENERATION="always")
    def test_always_setting_returns_true(self) -> None:
        """always: a capable parser is told to produce an archive."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        document = Path("/tmp/doc.pdf")
        assert _should_produce_archive(stub, "application/pdf", document) is True

    @override_settings(ARCHIVE_FILE_GENERATION="never")
    def test_requires_pdf_rendition_overrides_never(self) -> None:
        """requires_pdf_rendition=True forces archive even when setting is never."""
        stub = _parser_instance(can_produce=True, requires_rendition=True)
        document = Path("/tmp/doc.pdf")
        assert _should_produce_archive(stub, "application/pdf", document) is True

    @override_settings(ARCHIVE_FILE_GENERATION="always")
    def test_cannot_produce_archive_overrides_always(self) -> None:
        """can_produce_archive=False prevents archive even when setting is always."""
        stub = _parser_instance(can_produce=False, requires_rendition=False)
        document = Path("/tmp/doc.txt")
        assert _should_produce_archive(stub, "text/plain", document) is False

    @override_settings(ARCHIVE_FILE_GENERATION="auto")
    def test_auto_image_returns_true(self) -> None:
        """auto mode: image/* MIME types always produce archive (scanned doc)."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        document = Path("/tmp/scan.tiff")
        assert _should_produce_archive(stub, "image/tiff", document) is True

    @override_settings(ARCHIVE_FILE_GENERATION="auto")
    def test_auto_born_digital_pdf_returns_false(self) -> None:
        """auto mode: PDF with substantial text (born-digital) skips archive."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        long_text = "This is a born-digital PDF with lots of text content. " * 10
        with patch(self._EXTRACT_TARGET, return_value=long_text):
            decision = _should_produce_archive(
                stub,
                "application/pdf",
                Path("/tmp/doc.pdf"),
            )
        assert decision is False

    @override_settings(ARCHIVE_FILE_GENERATION="auto")
    def test_auto_scanned_pdf_no_text_returns_true(self) -> None:
        """auto mode: PDF where pdftotext returns None (scanned) produces archive."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        with patch(self._EXTRACT_TARGET, return_value=None):
            decision = _should_produce_archive(
                stub,
                "application/pdf",
                Path("/tmp/scan.pdf"),
            )
        assert decision is True

    @override_settings(ARCHIVE_FILE_GENERATION="auto")
    def test_auto_pdf_short_text_returns_true(self) -> None:
        """auto mode: PDF with very short text (<=50 chars) is treated as scanned."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        with patch(self._EXTRACT_TARGET, return_value="tiny"):
            decision = _should_produce_archive(
                stub,
                "application/pdf",
                Path("/tmp/scan.pdf"),
            )
        assert decision is True

    @override_settings(ARCHIVE_FILE_GENERATION="auto")
    def test_auto_non_pdf_non_image_returns_false(self) -> None:
        """auto mode: other MIME types (e.g. email) don't produce archive by default."""
        stub = _parser_instance(can_produce=True, requires_rendition=False)
        document = Path("/tmp/email.eml")
        assert _should_produce_archive(stub, "message/rfc822", document) is False

    @override_settings(ARCHIVE_FILE_GENERATION="always")
    def test_requires_rendition_with_can_produce_false_returns_true(self) -> None:
        """requires_pdf_rendition=True always wins, even if can_produce_archive=False."""
        stub = _parser_instance(can_produce=False, requires_rendition=True)
        document = Path("/tmp/doc.pdf")
        assert _should_produce_archive(stub, "application/pdf", document) is True