Compare commits

...

1 Commits

Author SHA1 Message Date
Trenton H
cfed67fc0a Initial implementation of the new sanity checker 2026-02-26 16:18:13 -08:00
6 changed files with 856 additions and 306 deletions

View File

@@ -1,17 +1,97 @@
from django.core.management.base import BaseCommand
"""Management command to check the document archive for issues."""
from documents.management.commands.mixins import ProgressBarMixin
from __future__ import annotations
import logging
from typing import Any
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from documents.management.commands.base import PaperlessCommand
from documents.models import Document
from documents.sanity_checker import SanityCheckMessages
from documents.sanity_checker import check_sanity
_LEVEL_STYLE: dict[int, tuple[str, str]] = {
logging.ERROR: ("bold red", "ERROR"),
logging.WARNING: ("yellow", "WARN"),
logging.INFO: ("dim", "INFO"),
}
class Command(ProgressBarMixin, BaseCommand):
class Command(PaperlessCommand):
help = "This command checks your document archive for issues."
def add_arguments(self, parser):
self.add_argument_progress_bar_mixin(parser)
def _render_results(self, messages: SanityCheckMessages) -> None:
"""Render sanity check results as a Rich table."""
console = self.console
def handle(self, *args, **options):
self.handle_progress_bar_mixin(**options)
messages = check_sanity(progress=self.use_progress_bar, scheduled=False)
if len(messages) == 0:
console.print(
Panel(
"[green]No issues detected.[/green]",
title="Sanity Check",
border_style="green",
),
)
return
messages.log_messages()
# Build a lookup for document titles
doc_pks = [pk for pk in messages.document_pks() if pk is not None]
titles: dict[int, str] = {}
if doc_pks:
titles = dict(
Document.global_objects.filter(pk__in=doc_pks)
.only("pk", "title")
.values_list("pk", "title"),
)
table = Table(
title="Sanity Check Results",
show_lines=True,
title_style="bold",
)
table.add_column("Level", width=7, no_wrap=True)
table.add_column("Document", min_width=20)
table.add_column("Issue", ratio=1)
for doc_pk, doc_messages in messages.iter_messages():
if doc_pk is not None:
title = titles.get(doc_pk, "Unknown")
doc_label = f"#{doc_pk} {title}"
else:
doc_label = "(global)"
for msg in doc_messages:
style, label = _LEVEL_STYLE.get(
msg["level"],
("dim", "INFO"),
)
table.add_row(
Text(label, style=style),
doc_label,
msg["message"],
)
console.print(table)
# Summary line
parts: list[str] = []
if messages.has_error:
parts.append("[bold red]errors[/bold red]")
if messages.has_warning:
parts.append("[yellow]warnings[/yellow]")
summary = " and ".join(parts) if parts else "infos"
console.print(f"\nFound {len(messages)} document(s) with {summary}.")
def handle(self, *args: Any, **options: Any) -> None:
messages = check_sanity(
scheduled=False,
iter_wrapper=lambda docs: self.track(
docs,
description="Checking documents...",
),
)
self._render_results(messages)

View File

@@ -1,80 +1,141 @@
"""
Sanity checker for the Paperless-ngx document archive.
Verifies that all documents have valid files, correct checksums,
and consistent metadata. Reports orphaned files in the media directory.
Progress display is the caller's responsibility -- pass an ``iter_wrapper``
to wrap the document queryset (e.g., with a progress bar). The default
is an identity function that adds no overhead.
"""
from __future__ import annotations
import hashlib
import logging
import uuid
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Final
from typing import TypedDict
from typing import TypeVar
from celery import states
from django.conf import settings
from django.utils import timezone
from tqdm import tqdm
from documents.models import Document
from documents.models import PaperlessTask
from paperless.config import GeneralConfig
logger = logging.getLogger("paperless.sanity_checker")
_T = TypeVar("_T")
IterWrapper = Callable[[Iterable[_T]], Iterable[_T]]
class MessageEntry(TypedDict):
"""A single sanity check message with its severity level."""
level: int
message: str
def _identity(iterable: Iterable[_T]) -> Iterable[_T]:
"""Pass through an iterable unchanged (default iter_wrapper)."""
return iterable
class SanityCheckMessages:
def __init__(self) -> None:
self._messages: dict[int, list[dict]] = defaultdict(list)
self.has_error = False
self.has_warning = False
"""Collects sanity check messages grouped by document primary key.
def error(self, doc_pk, message) -> None:
Messages are categorized as error, warning, or info. ``None`` is used
as the key for messages not associated with a specific document
(e.g., orphaned files).
"""
def __init__(self) -> None:
self._messages: dict[int | None, list[MessageEntry]] = defaultdict(list)
self.has_error: bool = False
self.has_warning: bool = False
# -- Recording ----------------------------------------------------------
def error(self, doc_pk: int | None, message: str) -> None:
self._messages[doc_pk].append({"level": logging.ERROR, "message": message})
self.has_error = True
def warning(self, doc_pk, message) -> None:
def warning(self, doc_pk: int | None, message: str) -> None:
self._messages[doc_pk].append({"level": logging.WARNING, "message": message})
self.has_warning = True
def info(self, doc_pk, message) -> None:
def info(self, doc_pk: int | None, message: str) -> None:
self._messages[doc_pk].append({"level": logging.INFO, "message": message})
def log_messages(self) -> None:
logger = logging.getLogger("paperless.sanity_checker")
# -- Iteration / query --------------------------------------------------
if len(self._messages) == 0:
logger.info("Sanity checker detected no issues.")
else:
# Query once
all_docs = Document.global_objects.all()
def document_pks(self) -> list[int | None]:
"""Return all document PKs (including None for global messages)."""
return list(self._messages.keys())
for doc_pk in self._messages:
if doc_pk is not None:
doc = all_docs.get(pk=doc_pk)
logger.info(
f"Detected following issue(s) with document #{doc.pk},"
f" titled {doc.title}",
)
for msg in self._messages[doc_pk]:
logger.log(msg["level"], msg["message"])
def iter_messages(self) -> Iterator[tuple[int | None, list[MessageEntry]]]:
"""Iterate over (doc_pk, messages) pairs."""
yield from self._messages.items()
def __len__(self):
def __len__(self) -> int:
return len(self._messages)
def __getitem__(self, item):
def __getitem__(self, item: int | None) -> list[MessageEntry]:
return self._messages[item]
# -- Logging output (used by Celery task path) --------------------------
def log_messages(self) -> None:
"""Write all messages to the ``paperless.sanity_checker`` logger.
This is the output path for headless / Celery execution.
Management commands use Rich rendering instead.
"""
if len(self._messages) == 0:
logger.info("Sanity checker detected no issues.")
return
doc_pks = [pk for pk in self._messages if pk is not None]
titles: dict[int, str] = {}
if doc_pks:
titles = dict(
Document.global_objects.filter(pk__in=doc_pks)
.only("pk", "title")
.values_list("pk", "title"),
)
for doc_pk, entries in self._messages.items():
if doc_pk is not None:
title = titles.get(doc_pk, "Unknown")
logger.info(
"Detected following issue(s) with document #%s, titled %s",
doc_pk,
title,
)
for msg in entries:
logger.log(msg["level"], msg["message"])
class SanityCheckFailedException(Exception):
pass
def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages:
paperless_task = PaperlessTask.objects.create(
task_id=uuid.uuid4(),
type=PaperlessTask.TaskType.SCHEDULED_TASK
if scheduled
else PaperlessTask.TaskType.MANUAL_TASK,
task_name=PaperlessTask.TaskName.CHECK_SANITY,
status=states.STARTED,
date_created=timezone.now(),
date_started=timezone.now(),
)
messages = SanityCheckMessages()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _build_present_files() -> set[Path]:
"""Collect all files in MEDIA_ROOT, excluding directories and ignorable files."""
present_files = {
x.resolve()
for x in Path(settings.MEDIA_ROOT).glob("**/*")
@@ -82,95 +143,167 @@ def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages:
}
lockfile = Path(settings.MEDIA_LOCK).resolve()
if lockfile in present_files:
present_files.remove(lockfile)
present_files.discard(lockfile)
general_config = GeneralConfig()
app_logo = general_config.app_logo or settings.APP_LOGO
if app_logo:
logo_file = Path(settings.MEDIA_ROOT / Path(app_logo.lstrip("/"))).resolve()
if logo_file in present_files:
present_files.remove(logo_file)
present_files.discard(logo_file)
for doc in tqdm(Document.global_objects.all(), disable=not progress):
# Check sanity of the thumbnail
thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve()
if not thumbnail_path.exists() or not thumbnail_path.is_file():
messages.error(doc.pk, "Thumbnail of document does not exist.")
else:
if thumbnail_path in present_files:
present_files.remove(thumbnail_path)
try:
_ = thumbnail_path.read_bytes()
except OSError as e:
messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}")
return present_files
# Check sanity of the original file
# TODO: extract method
source_path: Final[Path] = Path(doc.source_path).resolve()
if not source_path.exists() or not source_path.is_file():
messages.error(doc.pk, "Original of document does not exist.")
else:
if source_path in present_files:
present_files.remove(source_path)
try:
checksum = hashlib.md5(source_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(doc.pk, f"Cannot read original file of document: {e}")
else:
if checksum != doc.checksum:
messages.error(
doc.pk,
"Checksum mismatch. "
f"Stored: {doc.checksum}, actual: {checksum}.",
)
# Check sanity of the archive file.
if doc.archive_checksum is not None and doc.archive_filename is None:
def _check_thumbnail(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Verify the thumbnail exists and is readable."""
thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve()
if not thumbnail_path.exists() or not thumbnail_path.is_file():
messages.error(doc.pk, "Thumbnail of document does not exist.")
return
present_files.discard(thumbnail_path)
try:
_ = thumbnail_path.read_bytes()
except OSError as e:
messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}")
def _check_original(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Verify the original file exists, is readable, and has matching checksum."""
source_path: Final[Path] = Path(doc.source_path).resolve()
if not source_path.exists() or not source_path.is_file():
messages.error(doc.pk, "Original of document does not exist.")
return
present_files.discard(source_path)
try:
checksum = hashlib.md5(source_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(doc.pk, f"Cannot read original file of document: {e}")
else:
if checksum != doc.checksum:
messages.error(
doc.pk,
"Document has an archive file checksum, but no archive filename.",
f"Checksum mismatch. Stored: {doc.checksum}, actual: {checksum}.",
)
elif doc.archive_checksum is None and doc.archive_filename is not None:
def _check_archive(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Verify archive file consistency: checksum/filename pairing and file integrity."""
if doc.archive_checksum is not None and doc.archive_filename is None:
messages.error(
doc.pk,
"Document has an archive file checksum, but no archive filename.",
)
elif doc.archive_checksum is None and doc.archive_filename is not None:
messages.error(
doc.pk,
"Document has an archive file, but its checksum is missing.",
)
elif doc.has_archive_version:
if TYPE_CHECKING:
assert isinstance(doc.archive_path, Path)
archive_path: Final[Path] = Path(doc.archive_path).resolve()
if not archive_path.exists() or not archive_path.is_file():
messages.error(doc.pk, "Archived version of document does not exist.")
return
present_files.discard(archive_path)
try:
checksum = hashlib.md5(archive_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(
doc.pk,
"Document has an archive file, but its checksum is missing.",
f"Cannot read archive file of document : {e}",
)
elif doc.has_archive_version:
archive_path: Final[Path] = Path(doc.archive_path).resolve()
if not archive_path.exists() or not archive_path.is_file():
messages.error(doc.pk, "Archived version of document does not exist.")
else:
if archive_path in present_files:
present_files.remove(archive_path)
try:
checksum = hashlib.md5(archive_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(
doc.pk,
f"Cannot read archive file of document : {e}",
)
else:
if checksum != doc.archive_checksum:
messages.error(
doc.pk,
"Checksum mismatch of archived document. "
f"Stored: {doc.archive_checksum}, "
f"actual: {checksum}.",
)
else:
if checksum != doc.archive_checksum:
messages.error(
doc.pk,
"Checksum mismatch of archived document. "
f"Stored: {doc.archive_checksum}, actual: {checksum}.",
)
# other document checks
if not doc.content:
messages.info(doc.pk, "Document contains no OCR data")
def _check_content(doc: Document, messages: SanityCheckMessages) -> None:
"""Flag documents with no OCR content."""
if not doc.content:
messages.info(doc.pk, "Document contains no OCR data")
def _check_document(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Run all checks for a single document."""
_check_thumbnail(doc, messages, present_files)
_check_original(doc, messages, present_files)
_check_archive(doc, messages, present_files)
_check_content(doc, messages)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def check_sanity(
*,
scheduled: bool = True,
iter_wrapper: IterWrapper[Document] = _identity,
) -> SanityCheckMessages:
"""Run a full sanity check on the document archive.
Args:
scheduled: Whether this is a scheduled (automatic) or manual check.
Controls the task type recorded in the database.
iter_wrapper: A callable that wraps the document iterable, e.g.,
for progress bar display. Defaults to identity (no wrapping).
Returns:
A SanityCheckMessages instance containing all detected issues.
"""
paperless_task = PaperlessTask.objects.create(
task_id=uuid.uuid4(),
type=(
PaperlessTask.TaskType.SCHEDULED_TASK
if scheduled
else PaperlessTask.TaskType.MANUAL_TASK
),
task_name=PaperlessTask.TaskName.CHECK_SANITY,
status=states.STARTED,
date_created=timezone.now(),
date_started=timezone.now(),
)
messages = SanityCheckMessages()
present_files = _build_present_files()
documents = Document.global_objects.all()
for doc in iter_wrapper(documents):
_check_document(doc, messages, present_files)
for extra_file in present_files:
messages.warning(None, f"Orphaned file in media dir: {extra_file}")
paperless_task.status = states.SUCCESS if not messages.has_error else states.FAILURE
# result is concatenated messages
paperless_task.result = f"{len(messages)} issues found."
if messages.has_error:
paperless_task.result += " Check logs for details."
paperless_task.date_done = timezone.now()
paperless_task.save(update_fields=["status", "result", "date_done"])
return messages

View File

@@ -1,10 +1,96 @@
import shutil
import zoneinfo
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
import filelock
import pytest
from django.contrib.auth import get_user_model
from pytest_django.fixtures import SettingsWrapper
from rest_framework.test import APIClient
from documents.tests.factories import DocumentFactory
if TYPE_CHECKING:
from documents.models import Document
@dataclass(frozen=True, slots=True)
class PaperlessDirs:
"""Standard Paperless-ngx directory layout for tests."""
media: Path
originals: Path
archive: Path
thumbnails: Path
@pytest.fixture(scope="session")
def samples_dir() -> Path:
"""Path to the shared test sample documents."""
return Path(__file__).parent / "samples" / "documents"
@pytest.fixture()
def paperless_dirs(tmp_path: Path) -> PaperlessDirs:
"""Create and return the directory structure for testing."""
media = tmp_path / "media"
dirs = PaperlessDirs(
media=media,
originals=media / "documents" / "originals",
archive=media / "documents" / "archive",
thumbnails=media / "documents" / "thumbnails",
)
for d in (dirs.originals, dirs.archive, dirs.thumbnails):
d.mkdir(parents=True)
return dirs
@pytest.fixture()
def _media_settings(paperless_dirs: PaperlessDirs, settings) -> None:
"""Configure Django settings to point at temp directories."""
settings.MEDIA_ROOT = paperless_dirs.media
settings.ORIGINALS_DIR = paperless_dirs.originals
settings.ARCHIVE_DIR = paperless_dirs.archive
settings.THUMBNAIL_DIR = paperless_dirs.thumbnails
settings.MEDIA_LOCK = paperless_dirs.media / "media.lock"
settings.IGNORABLE_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
settings.APP_LOGO = ""
@pytest.fixture()
def sample_doc(
paperless_dirs: PaperlessDirs,
_media_settings: None,
samples_dir: Path,
) -> "Document":
"""Create a document with valid files and matching checksums."""
with filelock.FileLock(paperless_dirs.media / "media.lock"):
shutil.copy(
samples_dir / "originals" / "0000001.pdf",
paperless_dirs.originals / "0000001.pdf",
)
shutil.copy(
samples_dir / "archive" / "0000001.pdf",
paperless_dirs.archive / "0000001.pdf",
)
shutil.copy(
samples_dir / "thumbnails" / "0000001.webp",
paperless_dirs.thumbnails / "0000001.webp",
)
return DocumentFactory(
title="test",
checksum="42995833e01aea9b3edee44bbfdd7ce1",
archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
content="test content",
pk=1,
filename="0000001.pdf",
mime_type="application/pdf",
archive_filename="0000001.pdf",
)
@pytest.fixture()
def settings_timezone(settings: SettingsWrapper) -> zoneinfo.ZoneInfo:

View File

@@ -0,0 +1,173 @@
"""Tests for the document_sanity_checker management command.
Verifies Rich rendering (table, panel, summary) and end-to-end CLI behavior.
"""
from __future__ import annotations
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from django.core.management import call_command
from rich.console import Console
from documents.management.commands.document_sanity_checker import Command
from documents.sanity_checker import SanityCheckMessages
from documents.tests.factories import DocumentFactory
if TYPE_CHECKING:
from documents.models import Document
from documents.tests.conftest import PaperlessDirs
def _render_to_string(messages: SanityCheckMessages) -> str:
"""Render command output to a plain string for assertion."""
buf = StringIO()
cmd = Command()
cmd.console = Console(file=buf, width=120, no_color=True)
cmd._render_results(messages)
return buf.getvalue()
# ---------------------------------------------------------------------------
# Rich rendering
# ---------------------------------------------------------------------------
class TestRenderResultsNoIssues:
"""No DB access needed -- renders an empty SanityCheckMessages."""
def test_shows_panel(self) -> None:
output = _render_to_string(SanityCheckMessages())
assert "No issues detected" in output
assert "Sanity Check" in output
@pytest.mark.django_db
class TestRenderResultsWithIssues:
def test_error_row(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "Original missing")
output = _render_to_string(msgs)
assert "Sanity Check Results" in output
assert "ERROR" in output
assert "Original missing" in output
assert f"#{sample_doc.pk}" in output
assert sample_doc.title in output
def test_warning_row(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.warning(sample_doc.pk, "Suspicious file")
output = _render_to_string(msgs)
assert "WARN" in output
assert "Suspicious file" in output
def test_info_row(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.info(sample_doc.pk, "No OCR data")
output = _render_to_string(msgs)
assert "INFO" in output
assert "No OCR data" in output
@pytest.mark.usefixtures("_media_settings")
def test_global_message(self) -> None:
msgs = SanityCheckMessages()
msgs.warning(None, "Orphaned file: /tmp/stray.pdf")
output = _render_to_string(msgs)
assert "(global)" in output
assert "Orphaned file" in output
def test_multiple_messages_same_doc(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "Thumbnail missing")
msgs.error(sample_doc.pk, "Checksum mismatch")
output = _render_to_string(msgs)
assert "Thumbnail missing" in output
assert "Checksum mismatch" in output
@pytest.mark.usefixtures("_media_settings")
def test_unknown_doc_pk(self) -> None:
msgs = SanityCheckMessages()
msgs.error(99999, "Ghost document")
output = _render_to_string(msgs)
assert "#99999" in output
assert "Unknown" in output
@pytest.mark.django_db
class TestRenderResultsSummary:
def test_errors_only(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "broken")
output = _render_to_string(msgs)
assert "errors" in output
assert "Found 1 document(s)" in output
def test_warnings_only(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.warning(sample_doc.pk, "odd")
output = _render_to_string(msgs)
assert "warnings" in output
def test_errors_and_warnings(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "broken")
msgs.warning(None, "orphan")
output = _render_to_string(msgs)
assert "errors" in output
assert "warnings" in output
assert "Found 2 document(s)" in output
def test_infos_only(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.info(sample_doc.pk, "no OCR")
output = _render_to_string(msgs)
assert "infos" in output
# ---------------------------------------------------------------------------
# End-to-end command execution
# ---------------------------------------------------------------------------
@pytest.mark.django_db
@pytest.mark.management
class TestDocumentSanityCheckerCommand:
def test_no_issues(self, sample_doc: Document) -> None:
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
assert "No issues detected" in out.getvalue()
@pytest.mark.usefixtures("_media_settings")
def test_no_issues_empty_archive(self) -> None:
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
assert "No issues detected" in out.getvalue()
def test_missing_original(self, sample_doc: Document) -> None:
Path(sample_doc.source_path).unlink()
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
output = out.getvalue()
assert "ERROR" in output
assert "Original of document does not exist" in output
@pytest.mark.usefixtures("_media_settings")
def test_checksum_mismatch(self, paperless_dirs: PaperlessDirs) -> None:
"""Lightweight document with zero-byte files triggers checksum mismatch."""
doc = DocumentFactory(
title="test",
content="test",
filename="test.pdf",
checksum="abc",
)
Path(doc.source_path).touch()
Path(doc.thumbnail_path).touch()
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
output = out.getvalue()
assert "ERROR" in output
assert "Checksum mismatch. Stored: abc, actual:" in output

View File

@@ -134,6 +134,7 @@ class TestRenamer(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
self.assertIsFile(doc2.archive_path)
@pytest.mark.management
class TestCreateClassifier(TestCase):
@mock.patch(
"documents.management.commands.document_create_classifier.train_classifier",
@@ -144,32 +145,6 @@ class TestCreateClassifier(TestCase):
m.assert_called_once()
@pytest.mark.management
class TestSanityChecker(DirectoriesMixin, TestCase):
def test_no_issues(self) -> None:
with self.assertLogs() as capture:
call_command("document_sanity_checker")
self.assertEqual(len(capture.output), 1)
self.assertIn("Sanity checker detected no issues.", capture.output[0])
def test_errors(self) -> None:
doc = Document.objects.create(
title="test",
content="test",
filename="test.pdf",
checksum="abc",
)
Path(doc.source_path).touch()
Path(doc.thumbnail_path).touch()
with self.assertLogs() as capture:
call_command("document_sanity_checker")
self.assertEqual(len(capture.output), 2)
self.assertIn("Checksum mismatch. Stored: abc, actual:", capture.output[1])
@pytest.mark.management
class TestConvertMariaDBUUID(TestCase):
@mock.patch("django.db.connection.schema_editor")

View File

@@ -1,192 +1,295 @@
import logging
import shutil
from pathlib import Path
"""Tests for the sanity checker module.
import filelock
from django.conf import settings
from django.test import TestCase
from django.test import override_settings
Tests exercise ``check_sanity`` as a whole, verifying document validation,
orphan detection, task recording, and the iter_wrapper contract.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from documents.models import Document
from documents.models import PaperlessTask
from documents.sanity_checker import check_sanity
from documents.tests.utils import DirectoriesMixin
if TYPE_CHECKING:
from collections.abc import Iterable
from documents.tests.conftest import PaperlessDirs
class TestSanityCheck(DirectoriesMixin, TestCase):
def make_test_data(self):
with filelock.FileLock(settings.MEDIA_LOCK):
# just make sure that the lockfile is present.
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000001.pdf"
),
Path(self.dirs.originals_dir) / "0000001.pdf",
)
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "archive"
/ "0000001.pdf"
),
Path(self.dirs.archive_dir) / "0000001.pdf",
)
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "thumbnails"
/ "0000001.webp"
),
Path(self.dirs.thumbnail_dir) / "0000001.webp",
)
@pytest.mark.django_db
class TestCheckSanityNoDocuments:
"""Sanity checks against an empty archive."""
return Document.objects.create(
title="test",
checksum="42995833e01aea9b3edee44bbfdd7ce1",
archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
content="test",
pk=1,
filename="0000001.pdf",
mime_type="application/pdf",
archive_filename="0000001.pdf",
)
def assertSanityError(self, doc: Document, messageRegex) -> None:
@pytest.mark.usefixtures("_media_settings")
def test_no_documents(self) -> None:
messages = check_sanity()
self.assertTrue(messages.has_error)
with self.assertLogs() as capture:
assert not messages.has_error
assert not messages.has_warning
assert len(messages) == 0
@pytest.mark.usefixtures("_media_settings")
def test_no_issues_logs_clean(self, caplog: pytest.LogCaptureFixture) -> None:
messages = check_sanity()
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
messages.log_messages()
self.assertEqual(
capture.records[0].message,
f"Detected following issue(s) with document #{doc.pk}, titled {doc.title}",
)
self.assertRegex(capture.records[1].message, messageRegex)
assert "Sanity checker detected no issues." in caplog.text
def test_no_issues(self) -> None:
self.make_test_data()
@pytest.mark.django_db
class TestCheckSanityHealthyDocument:
def test_no_errors(self, sample_doc: Document) -> None:
messages = check_sanity()
self.assertFalse(messages.has_error)
self.assertFalse(messages.has_warning)
with self.assertLogs() as capture:
messages.log_messages()
self.assertEqual(len(capture.output), 1)
self.assertEqual(capture.records[0].levelno, logging.INFO)
self.assertEqual(
capture.records[0].message,
"Sanity checker detected no issues.",
)
assert not messages.has_error
assert not messages.has_warning
assert len(messages) == 0
def test_no_docs(self) -> None:
self.assertEqual(len(check_sanity()), 0)
def test_success(self) -> None:
self.make_test_data()
self.assertEqual(len(check_sanity()), 0)
def test_no_thumbnail(self) -> None:
doc = self.make_test_data()
Path(doc.thumbnail_path).unlink()
self.assertSanityError(doc, "Thumbnail of document does not exist")
def test_thumbnail_no_access(self) -> None:
doc = self.make_test_data()
Path(doc.thumbnail_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read thumbnail file of document")
Path(doc.thumbnail_path).chmod(0o777)
def test_no_original(self) -> None:
doc = self.make_test_data()
Path(doc.source_path).unlink()
self.assertSanityError(doc, "Original of document does not exist.")
def test_original_no_access(self) -> None:
doc = self.make_test_data()
Path(doc.source_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read original file of document")
Path(doc.source_path).chmod(0o777)
def test_original_checksum_mismatch(self) -> None:
doc = self.make_test_data()
doc.checksum = "WOW"
doc.save()
self.assertSanityError(doc, "Checksum mismatch. Stored: WOW, actual: ")
def test_no_archive(self) -> None:
doc = self.make_test_data()
Path(doc.archive_path).unlink()
self.assertSanityError(doc, "Archived version of document does not exist.")
def test_archive_no_access(self) -> None:
doc = self.make_test_data()
Path(doc.archive_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read archive file of document")
Path(doc.archive_path).chmod(0o777)
def test_archive_checksum_mismatch(self) -> None:
doc = self.make_test_data()
doc.archive_checksum = "WOW"
doc.save()
self.assertSanityError(doc, "Checksum mismatch of archived document")
def test_empty_content(self) -> None:
doc = self.make_test_data()
doc.content = ""
doc.save()
@pytest.mark.django_db
class TestCheckSanityThumbnail:
def test_missing(self, sample_doc: Document) -> None:
Path(sample_doc.thumbnail_path).unlink()
messages = check_sanity()
self.assertFalse(messages.has_error)
self.assertFalse(messages.has_warning)
self.assertEqual(len(messages), 1)
self.assertRegex(
messages[doc.pk][0]["message"],
"Document contains no OCR data",
assert messages.has_error
assert any(
"Thumbnail of document does not exist" in m["message"]
for m in messages[sample_doc.pk]
)
def test_orphaned_file(self) -> None:
self.make_test_data()
Path(self.dirs.originals_dir, "orphaned").touch()
def test_unreadable(self, sample_doc: Document) -> None:
thumb = Path(sample_doc.thumbnail_path)
thumb.chmod(0o000)
try:
messages = check_sanity()
assert messages.has_error
assert any(
"Cannot read thumbnail" in m["message"] for m in messages[sample_doc.pk]
)
finally:
thumb.chmod(0o644)
@pytest.mark.django_db
class TestCheckSanityOriginal:
def test_missing(self, sample_doc: Document) -> None:
Path(sample_doc.source_path).unlink()
messages = check_sanity()
self.assertTrue(messages.has_warning)
self.assertRegex(
messages._messages[None][0]["message"],
"Orphaned file in media dir",
assert messages.has_error
assert any(
"Original of document does not exist" in m["message"]
for m in messages[sample_doc.pk]
)
@override_settings(
APP_LOGO="logo/logo.png",
def test_checksum_mismatch(self, sample_doc: Document) -> None:
sample_doc.checksum = "badhash"
sample_doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"Checksum mismatch" in m["message"] and "badhash" in m["message"]
for m in messages[sample_doc.pk]
)
def test_unreadable(self, sample_doc: Document) -> None:
src = Path(sample_doc.source_path)
src.chmod(0o000)
try:
messages = check_sanity()
assert messages.has_error
assert any(
"Cannot read original" in m["message"] for m in messages[sample_doc.pk]
)
finally:
src.chmod(0o644)
@pytest.mark.django_db
class TestCheckSanityArchive:
def test_checksum_without_filename(self, sample_doc: Document) -> None:
sample_doc.archive_filename = None
sample_doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"checksum, but no archive filename" in m["message"]
for m in messages[sample_doc.pk]
)
def test_filename_without_checksum(self, sample_doc: Document) -> None:
sample_doc.archive_checksum = None
sample_doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"checksum is missing" in m["message"] for m in messages[sample_doc.pk]
)
def test_missing_file(self, sample_doc: Document) -> None:
Path(sample_doc.archive_path).unlink()
messages = check_sanity()
assert messages.has_error
assert any(
"Archived version of document does not exist" in m["message"]
for m in messages[sample_doc.pk]
)
def test_checksum_mismatch(self, sample_doc: Document) -> None:
sample_doc.archive_checksum = "wronghash"
sample_doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"Checksum mismatch of archived document" in m["message"]
for m in messages[sample_doc.pk]
)
def test_unreadable(self, sample_doc: Document) -> None:
archive = Path(sample_doc.archive_path)
archive.chmod(0o000)
try:
messages = check_sanity()
assert messages.has_error
assert any(
"Cannot read archive" in m["message"] for m in messages[sample_doc.pk]
)
finally:
archive.chmod(0o644)
def test_no_archive_at_all(self, sample_doc: Document) -> None:
"""Document with neither archive checksum nor filename is valid."""
Path(sample_doc.archive_path).unlink()
sample_doc.archive_checksum = None
sample_doc.archive_filename = None
sample_doc.save()
messages = check_sanity()
assert not messages.has_error
@pytest.mark.django_db
class TestCheckSanityContent:
@pytest.mark.parametrize(
"content",
[
pytest.param("", id="empty-string"),
],
)
def test_ignore_logo(self) -> None:
self.make_test_data()
logo_dir = Path(self.dirs.media_dir, "logo")
logo_dir.mkdir(parents=True, exist_ok=True)
Path(self.dirs.media_dir, "logo", "logo.png").touch()
def test_no_content(self, sample_doc: Document, content: str) -> None:
sample_doc.content = content
sample_doc.save()
messages = check_sanity()
self.assertFalse(messages.has_warning)
assert not messages.has_error
assert not messages.has_warning
assert any("no OCR data" in m["message"] for m in messages[sample_doc.pk])
def test_ignore_ignorable_files(self) -> None:
self.make_test_data()
Path(self.dirs.media_dir, ".DS_Store").touch()
Path(self.dirs.media_dir, "desktop.ini").touch()
@pytest.mark.django_db
class TestCheckSanityOrphans:
def test_orphaned_file(
self,
sample_doc: Document,
paperless_dirs: PaperlessDirs,
) -> None:
(paperless_dirs.originals / "orphan.pdf").touch()
messages = check_sanity()
self.assertFalse(messages.has_warning)
assert messages.has_warning
assert any("Orphaned file" in m["message"] for m in messages[None])
def test_archive_filename_no_checksum(self) -> None:
doc = self.make_test_data()
doc.archive_checksum = None
doc.save()
self.assertSanityError(doc, "has an archive file, but its checksum is missing.")
@pytest.mark.usefixtures("_media_settings")
def test_ignorable_files_not_flagged(
self,
paperless_dirs: PaperlessDirs,
) -> None:
(paperless_dirs.media / ".DS_Store").touch()
(paperless_dirs.media / "desktop.ini").touch()
messages = check_sanity()
assert not messages.has_warning
def test_archive_checksum_no_filename(self) -> None:
doc = self.make_test_data()
doc.archive_filename = None
doc.save()
self.assertSanityError(
doc,
"has an archive file checksum, but no archive filename.",
)
@pytest.mark.django_db
class TestCheckSanityIterWrapper:
def test_wrapper_receives_documents(self, sample_doc: Document) -> None:
seen: list[Document] = []
def tracking(iterable: Iterable[Document]) -> Iterable[Document]:
for item in iterable:
seen.append(item)
yield item
check_sanity(iter_wrapper=tracking)
assert len(seen) == 1
assert seen[0].pk == sample_doc.pk
def test_default_works_without_wrapper(self, sample_doc: Document) -> None:
messages = check_sanity()
assert not messages.has_error
@pytest.mark.django_db
class TestCheckSanityTaskRecording:
@pytest.mark.parametrize(
("expected_type", "scheduled"),
[
pytest.param(PaperlessTask.TaskType.SCHEDULED_TASK, True, id="scheduled"),
pytest.param(PaperlessTask.TaskType.MANUAL_TASK, False, id="manual"),
],
)
@pytest.mark.usefixtures("_media_settings")
def test_task_type(self, expected_type: str, *, scheduled: bool) -> None:
check_sanity(scheduled=scheduled)
task = PaperlessTask.objects.latest("date_created")
assert task.task_name == PaperlessTask.TaskName.CHECK_SANITY
assert task.type == expected_type
def test_success_status(self, sample_doc: Document) -> None:
check_sanity()
task = PaperlessTask.objects.latest("date_created")
assert task.status == "SUCCESS"
def test_failure_status(self, sample_doc: Document) -> None:
Path(sample_doc.source_path).unlink()
check_sanity()
task = PaperlessTask.objects.latest("date_created")
assert task.status == "FAILURE"
assert "Check logs for details" in task.result
@pytest.mark.django_db
class TestCheckSanityLogMessages:
def test_logs_doc_issues(
self,
sample_doc: Document,
caplog: pytest.LogCaptureFixture,
) -> None:
Path(sample_doc.source_path).unlink()
messages = check_sanity()
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
messages.log_messages()
assert f"document #{sample_doc.pk}" in caplog.text
assert "Original of document does not exist" in caplog.text
def test_logs_global_issues(
self,
sample_doc: Document,
paperless_dirs: PaperlessDirs,
caplog: pytest.LogCaptureFixture,
) -> None:
(paperless_dirs.originals / "orphan.pdf").touch()
messages = check_sanity()
with caplog.at_level(logging.WARNING, logger="paperless.sanity_checker"):
messages.log_messages()
assert "Orphaned file" in caplog.text
@pytest.mark.usefixtures("_media_settings")
def test_logs_unknown_doc_pk(self, caplog: pytest.LogCaptureFixture) -> None:
"""A doc PK not in the DB logs 'Unknown' as the title."""
messages = check_sanity()
messages.error(99999, "Ghost document")
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
messages.log_messages()
assert "#99999" in caplog.text
assert "Unknown" in caplog.text