mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-02-27 05:46:27 +00:00
Compare commits
1 Commits
dev
...
feature-sa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cfed67fc0a |
@@ -1,17 +1,97 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
"""Management command to check the document archive for issues."""
|
||||
|
||||
from documents.management.commands.mixins import ProgressBarMixin
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
|
||||
from documents.management.commands.base import PaperlessCommand
|
||||
from documents.models import Document
|
||||
from documents.sanity_checker import SanityCheckMessages
|
||||
from documents.sanity_checker import check_sanity
|
||||
|
||||
_LEVEL_STYLE: dict[int, tuple[str, str]] = {
|
||||
logging.ERROR: ("bold red", "ERROR"),
|
||||
logging.WARNING: ("yellow", "WARN"),
|
||||
logging.INFO: ("dim", "INFO"),
|
||||
}
|
||||
|
||||
class Command(ProgressBarMixin, BaseCommand):
|
||||
|
||||
class Command(PaperlessCommand):
|
||||
help = "This command checks your document archive for issues."
|
||||
|
||||
def add_arguments(self, parser):
|
||||
self.add_argument_progress_bar_mixin(parser)
|
||||
def _render_results(self, messages: SanityCheckMessages) -> None:
|
||||
"""Render sanity check results as a Rich table."""
|
||||
console = self.console
|
||||
|
||||
def handle(self, *args, **options):
|
||||
self.handle_progress_bar_mixin(**options)
|
||||
messages = check_sanity(progress=self.use_progress_bar, scheduled=False)
|
||||
if len(messages) == 0:
|
||||
console.print(
|
||||
Panel(
|
||||
"[green]No issues detected.[/green]",
|
||||
title="Sanity Check",
|
||||
border_style="green",
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
messages.log_messages()
|
||||
# Build a lookup for document titles
|
||||
doc_pks = [pk for pk in messages.document_pks() if pk is not None]
|
||||
titles: dict[int, str] = {}
|
||||
if doc_pks:
|
||||
titles = dict(
|
||||
Document.global_objects.filter(pk__in=doc_pks)
|
||||
.only("pk", "title")
|
||||
.values_list("pk", "title"),
|
||||
)
|
||||
|
||||
table = Table(
|
||||
title="Sanity Check Results",
|
||||
show_lines=True,
|
||||
title_style="bold",
|
||||
)
|
||||
table.add_column("Level", width=7, no_wrap=True)
|
||||
table.add_column("Document", min_width=20)
|
||||
table.add_column("Issue", ratio=1)
|
||||
|
||||
for doc_pk, doc_messages in messages.iter_messages():
|
||||
if doc_pk is not None:
|
||||
title = titles.get(doc_pk, "Unknown")
|
||||
doc_label = f"#{doc_pk} {title}"
|
||||
else:
|
||||
doc_label = "(global)"
|
||||
|
||||
for msg in doc_messages:
|
||||
style, label = _LEVEL_STYLE.get(
|
||||
msg["level"],
|
||||
("dim", "INFO"),
|
||||
)
|
||||
table.add_row(
|
||||
Text(label, style=style),
|
||||
doc_label,
|
||||
msg["message"],
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
# Summary line
|
||||
parts: list[str] = []
|
||||
if messages.has_error:
|
||||
parts.append("[bold red]errors[/bold red]")
|
||||
if messages.has_warning:
|
||||
parts.append("[yellow]warnings[/yellow]")
|
||||
summary = " and ".join(parts) if parts else "infos"
|
||||
console.print(f"\nFound {len(messages)} document(s) with {summary}.")
|
||||
|
||||
def handle(self, *args: Any, **options: Any) -> None:
|
||||
messages = check_sanity(
|
||||
scheduled=False,
|
||||
iter_wrapper=lambda docs: self.track(
|
||||
docs,
|
||||
description="Checking documents...",
|
||||
),
|
||||
)
|
||||
self._render_results(messages)
|
||||
|
||||
@@ -1,80 +1,141 @@
|
||||
"""
|
||||
Sanity checker for the Paperless-ngx document archive.
|
||||
|
||||
Verifies that all documents have valid files, correct checksums,
|
||||
and consistent metadata. Reports orphaned files in the media directory.
|
||||
|
||||
Progress display is the caller's responsibility -- pass an ``iter_wrapper``
|
||||
to wrap the document queryset (e.g., with a progress bar). The default
|
||||
is an identity function that adds no overhead.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Final
|
||||
from typing import TypedDict
|
||||
from typing import TypeVar
|
||||
|
||||
from celery import states
|
||||
from django.conf import settings
|
||||
from django.utils import timezone
|
||||
from tqdm import tqdm
|
||||
|
||||
from documents.models import Document
|
||||
from documents.models import PaperlessTask
|
||||
from paperless.config import GeneralConfig
|
||||
|
||||
logger = logging.getLogger("paperless.sanity_checker")
|
||||
|
||||
_T = TypeVar("_T")
|
||||
IterWrapper = Callable[[Iterable[_T]], Iterable[_T]]
|
||||
|
||||
|
||||
class MessageEntry(TypedDict):
|
||||
"""A single sanity check message with its severity level."""
|
||||
|
||||
level: int
|
||||
message: str
|
||||
|
||||
|
||||
def _identity(iterable: Iterable[_T]) -> Iterable[_T]:
|
||||
"""Pass through an iterable unchanged (default iter_wrapper)."""
|
||||
return iterable
|
||||
|
||||
|
||||
class SanityCheckMessages:
|
||||
def __init__(self) -> None:
|
||||
self._messages: dict[int, list[dict]] = defaultdict(list)
|
||||
self.has_error = False
|
||||
self.has_warning = False
|
||||
"""Collects sanity check messages grouped by document primary key.
|
||||
|
||||
def error(self, doc_pk, message) -> None:
|
||||
Messages are categorized as error, warning, or info. ``None`` is used
|
||||
as the key for messages not associated with a specific document
|
||||
(e.g., orphaned files).
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._messages: dict[int | None, list[MessageEntry]] = defaultdict(list)
|
||||
self.has_error: bool = False
|
||||
self.has_warning: bool = False
|
||||
|
||||
# -- Recording ----------------------------------------------------------
|
||||
|
||||
def error(self, doc_pk: int | None, message: str) -> None:
|
||||
self._messages[doc_pk].append({"level": logging.ERROR, "message": message})
|
||||
self.has_error = True
|
||||
|
||||
def warning(self, doc_pk, message) -> None:
|
||||
def warning(self, doc_pk: int | None, message: str) -> None:
|
||||
self._messages[doc_pk].append({"level": logging.WARNING, "message": message})
|
||||
self.has_warning = True
|
||||
|
||||
def info(self, doc_pk, message) -> None:
|
||||
def info(self, doc_pk: int | None, message: str) -> None:
|
||||
self._messages[doc_pk].append({"level": logging.INFO, "message": message})
|
||||
|
||||
def log_messages(self) -> None:
|
||||
logger = logging.getLogger("paperless.sanity_checker")
|
||||
# -- Iteration / query --------------------------------------------------
|
||||
|
||||
if len(self._messages) == 0:
|
||||
logger.info("Sanity checker detected no issues.")
|
||||
else:
|
||||
# Query once
|
||||
all_docs = Document.global_objects.all()
|
||||
def document_pks(self) -> list[int | None]:
|
||||
"""Return all document PKs (including None for global messages)."""
|
||||
return list(self._messages.keys())
|
||||
|
||||
for doc_pk in self._messages:
|
||||
if doc_pk is not None:
|
||||
doc = all_docs.get(pk=doc_pk)
|
||||
logger.info(
|
||||
f"Detected following issue(s) with document #{doc.pk},"
|
||||
f" titled {doc.title}",
|
||||
)
|
||||
for msg in self._messages[doc_pk]:
|
||||
logger.log(msg["level"], msg["message"])
|
||||
def iter_messages(self) -> Iterator[tuple[int | None, list[MessageEntry]]]:
|
||||
"""Iterate over (doc_pk, messages) pairs."""
|
||||
yield from self._messages.items()
|
||||
|
||||
def __len__(self):
|
||||
def __len__(self) -> int:
|
||||
return len(self._messages)
|
||||
|
||||
def __getitem__(self, item):
|
||||
def __getitem__(self, item: int | None) -> list[MessageEntry]:
|
||||
return self._messages[item]
|
||||
|
||||
# -- Logging output (used by Celery task path) --------------------------
|
||||
|
||||
def log_messages(self) -> None:
|
||||
"""Write all messages to the ``paperless.sanity_checker`` logger.
|
||||
|
||||
This is the output path for headless / Celery execution.
|
||||
Management commands use Rich rendering instead.
|
||||
"""
|
||||
if len(self._messages) == 0:
|
||||
logger.info("Sanity checker detected no issues.")
|
||||
return
|
||||
|
||||
doc_pks = [pk for pk in self._messages if pk is not None]
|
||||
titles: dict[int, str] = {}
|
||||
if doc_pks:
|
||||
titles = dict(
|
||||
Document.global_objects.filter(pk__in=doc_pks)
|
||||
.only("pk", "title")
|
||||
.values_list("pk", "title"),
|
||||
)
|
||||
|
||||
for doc_pk, entries in self._messages.items():
|
||||
if doc_pk is not None:
|
||||
title = titles.get(doc_pk, "Unknown")
|
||||
logger.info(
|
||||
"Detected following issue(s) with document #%s, titled %s",
|
||||
doc_pk,
|
||||
title,
|
||||
)
|
||||
for msg in entries:
|
||||
logger.log(msg["level"], msg["message"])
|
||||
|
||||
|
||||
class SanityCheckFailedException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages:
|
||||
paperless_task = PaperlessTask.objects.create(
|
||||
task_id=uuid.uuid4(),
|
||||
type=PaperlessTask.TaskType.SCHEDULED_TASK
|
||||
if scheduled
|
||||
else PaperlessTask.TaskType.MANUAL_TASK,
|
||||
task_name=PaperlessTask.TaskName.CHECK_SANITY,
|
||||
status=states.STARTED,
|
||||
date_created=timezone.now(),
|
||||
date_started=timezone.now(),
|
||||
)
|
||||
messages = SanityCheckMessages()
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_present_files() -> set[Path]:
|
||||
"""Collect all files in MEDIA_ROOT, excluding directories and ignorable files."""
|
||||
present_files = {
|
||||
x.resolve()
|
||||
for x in Path(settings.MEDIA_ROOT).glob("**/*")
|
||||
@@ -82,95 +143,167 @@ def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages:
|
||||
}
|
||||
|
||||
lockfile = Path(settings.MEDIA_LOCK).resolve()
|
||||
if lockfile in present_files:
|
||||
present_files.remove(lockfile)
|
||||
present_files.discard(lockfile)
|
||||
|
||||
general_config = GeneralConfig()
|
||||
app_logo = general_config.app_logo or settings.APP_LOGO
|
||||
if app_logo:
|
||||
logo_file = Path(settings.MEDIA_ROOT / Path(app_logo.lstrip("/"))).resolve()
|
||||
if logo_file in present_files:
|
||||
present_files.remove(logo_file)
|
||||
present_files.discard(logo_file)
|
||||
|
||||
for doc in tqdm(Document.global_objects.all(), disable=not progress):
|
||||
# Check sanity of the thumbnail
|
||||
thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve()
|
||||
if not thumbnail_path.exists() or not thumbnail_path.is_file():
|
||||
messages.error(doc.pk, "Thumbnail of document does not exist.")
|
||||
else:
|
||||
if thumbnail_path in present_files:
|
||||
present_files.remove(thumbnail_path)
|
||||
try:
|
||||
_ = thumbnail_path.read_bytes()
|
||||
except OSError as e:
|
||||
messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}")
|
||||
return present_files
|
||||
|
||||
# Check sanity of the original file
|
||||
# TODO: extract method
|
||||
source_path: Final[Path] = Path(doc.source_path).resolve()
|
||||
if not source_path.exists() or not source_path.is_file():
|
||||
messages.error(doc.pk, "Original of document does not exist.")
|
||||
else:
|
||||
if source_path in present_files:
|
||||
present_files.remove(source_path)
|
||||
try:
|
||||
checksum = hashlib.md5(source_path.read_bytes()).hexdigest()
|
||||
except OSError as e:
|
||||
messages.error(doc.pk, f"Cannot read original file of document: {e}")
|
||||
else:
|
||||
if checksum != doc.checksum:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Checksum mismatch. "
|
||||
f"Stored: {doc.checksum}, actual: {checksum}.",
|
||||
)
|
||||
|
||||
# Check sanity of the archive file.
|
||||
if doc.archive_checksum is not None and doc.archive_filename is None:
|
||||
def _check_thumbnail(
|
||||
doc: Document,
|
||||
messages: SanityCheckMessages,
|
||||
present_files: set[Path],
|
||||
) -> None:
|
||||
"""Verify the thumbnail exists and is readable."""
|
||||
thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve()
|
||||
if not thumbnail_path.exists() or not thumbnail_path.is_file():
|
||||
messages.error(doc.pk, "Thumbnail of document does not exist.")
|
||||
return
|
||||
|
||||
present_files.discard(thumbnail_path)
|
||||
try:
|
||||
_ = thumbnail_path.read_bytes()
|
||||
except OSError as e:
|
||||
messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}")
|
||||
|
||||
|
||||
def _check_original(
|
||||
doc: Document,
|
||||
messages: SanityCheckMessages,
|
||||
present_files: set[Path],
|
||||
) -> None:
|
||||
"""Verify the original file exists, is readable, and has matching checksum."""
|
||||
source_path: Final[Path] = Path(doc.source_path).resolve()
|
||||
if not source_path.exists() or not source_path.is_file():
|
||||
messages.error(doc.pk, "Original of document does not exist.")
|
||||
return
|
||||
|
||||
present_files.discard(source_path)
|
||||
try:
|
||||
checksum = hashlib.md5(source_path.read_bytes()).hexdigest()
|
||||
except OSError as e:
|
||||
messages.error(doc.pk, f"Cannot read original file of document: {e}")
|
||||
else:
|
||||
if checksum != doc.checksum:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Document has an archive file checksum, but no archive filename.",
|
||||
f"Checksum mismatch. Stored: {doc.checksum}, actual: {checksum}.",
|
||||
)
|
||||
elif doc.archive_checksum is None and doc.archive_filename is not None:
|
||||
|
||||
|
||||
def _check_archive(
|
||||
doc: Document,
|
||||
messages: SanityCheckMessages,
|
||||
present_files: set[Path],
|
||||
) -> None:
|
||||
"""Verify archive file consistency: checksum/filename pairing and file integrity."""
|
||||
if doc.archive_checksum is not None and doc.archive_filename is None:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Document has an archive file checksum, but no archive filename.",
|
||||
)
|
||||
elif doc.archive_checksum is None and doc.archive_filename is not None:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Document has an archive file, but its checksum is missing.",
|
||||
)
|
||||
elif doc.has_archive_version:
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(doc.archive_path, Path)
|
||||
archive_path: Final[Path] = Path(doc.archive_path).resolve()
|
||||
if not archive_path.exists() or not archive_path.is_file():
|
||||
messages.error(doc.pk, "Archived version of document does not exist.")
|
||||
return
|
||||
|
||||
present_files.discard(archive_path)
|
||||
try:
|
||||
checksum = hashlib.md5(archive_path.read_bytes()).hexdigest()
|
||||
except OSError as e:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Document has an archive file, but its checksum is missing.",
|
||||
f"Cannot read archive file of document : {e}",
|
||||
)
|
||||
elif doc.has_archive_version:
|
||||
archive_path: Final[Path] = Path(doc.archive_path).resolve()
|
||||
if not archive_path.exists() or not archive_path.is_file():
|
||||
messages.error(doc.pk, "Archived version of document does not exist.")
|
||||
else:
|
||||
if archive_path in present_files:
|
||||
present_files.remove(archive_path)
|
||||
try:
|
||||
checksum = hashlib.md5(archive_path.read_bytes()).hexdigest()
|
||||
except OSError as e:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
f"Cannot read archive file of document : {e}",
|
||||
)
|
||||
else:
|
||||
if checksum != doc.archive_checksum:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Checksum mismatch of archived document. "
|
||||
f"Stored: {doc.archive_checksum}, "
|
||||
f"actual: {checksum}.",
|
||||
)
|
||||
else:
|
||||
if checksum != doc.archive_checksum:
|
||||
messages.error(
|
||||
doc.pk,
|
||||
"Checksum mismatch of archived document. "
|
||||
f"Stored: {doc.archive_checksum}, actual: {checksum}.",
|
||||
)
|
||||
|
||||
# other document checks
|
||||
if not doc.content:
|
||||
messages.info(doc.pk, "Document contains no OCR data")
|
||||
|
||||
def _check_content(doc: Document, messages: SanityCheckMessages) -> None:
|
||||
"""Flag documents with no OCR content."""
|
||||
if not doc.content:
|
||||
messages.info(doc.pk, "Document contains no OCR data")
|
||||
|
||||
|
||||
def _check_document(
|
||||
doc: Document,
|
||||
messages: SanityCheckMessages,
|
||||
present_files: set[Path],
|
||||
) -> None:
|
||||
"""Run all checks for a single document."""
|
||||
_check_thumbnail(doc, messages, present_files)
|
||||
_check_original(doc, messages, present_files)
|
||||
_check_archive(doc, messages, present_files)
|
||||
_check_content(doc, messages)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def check_sanity(
|
||||
*,
|
||||
scheduled: bool = True,
|
||||
iter_wrapper: IterWrapper[Document] = _identity,
|
||||
) -> SanityCheckMessages:
|
||||
"""Run a full sanity check on the document archive.
|
||||
|
||||
Args:
|
||||
scheduled: Whether this is a scheduled (automatic) or manual check.
|
||||
Controls the task type recorded in the database.
|
||||
iter_wrapper: A callable that wraps the document iterable, e.g.,
|
||||
for progress bar display. Defaults to identity (no wrapping).
|
||||
|
||||
Returns:
|
||||
A SanityCheckMessages instance containing all detected issues.
|
||||
"""
|
||||
paperless_task = PaperlessTask.objects.create(
|
||||
task_id=uuid.uuid4(),
|
||||
type=(
|
||||
PaperlessTask.TaskType.SCHEDULED_TASK
|
||||
if scheduled
|
||||
else PaperlessTask.TaskType.MANUAL_TASK
|
||||
),
|
||||
task_name=PaperlessTask.TaskName.CHECK_SANITY,
|
||||
status=states.STARTED,
|
||||
date_created=timezone.now(),
|
||||
date_started=timezone.now(),
|
||||
)
|
||||
|
||||
messages = SanityCheckMessages()
|
||||
present_files = _build_present_files()
|
||||
|
||||
documents = Document.global_objects.all()
|
||||
for doc in iter_wrapper(documents):
|
||||
_check_document(doc, messages, present_files)
|
||||
|
||||
for extra_file in present_files:
|
||||
messages.warning(None, f"Orphaned file in media dir: {extra_file}")
|
||||
|
||||
paperless_task.status = states.SUCCESS if not messages.has_error else states.FAILURE
|
||||
# result is concatenated messages
|
||||
paperless_task.result = f"{len(messages)} issues found."
|
||||
if messages.has_error:
|
||||
paperless_task.result += " Check logs for details."
|
||||
paperless_task.date_done = timezone.now()
|
||||
paperless_task.save(update_fields=["status", "result", "date_done"])
|
||||
|
||||
return messages
|
||||
|
||||
@@ -1,10 +1,96 @@
|
||||
import shutil
|
||||
import zoneinfo
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import filelock
|
||||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
from pytest_django.fixtures import SettingsWrapper
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from documents.tests.factories import DocumentFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from documents.models import Document
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class PaperlessDirs:
|
||||
"""Standard Paperless-ngx directory layout for tests."""
|
||||
|
||||
media: Path
|
||||
originals: Path
|
||||
archive: Path
|
||||
thumbnails: Path
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def samples_dir() -> Path:
|
||||
"""Path to the shared test sample documents."""
|
||||
return Path(__file__).parent / "samples" / "documents"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def paperless_dirs(tmp_path: Path) -> PaperlessDirs:
|
||||
"""Create and return the directory structure for testing."""
|
||||
media = tmp_path / "media"
|
||||
dirs = PaperlessDirs(
|
||||
media=media,
|
||||
originals=media / "documents" / "originals",
|
||||
archive=media / "documents" / "archive",
|
||||
thumbnails=media / "documents" / "thumbnails",
|
||||
)
|
||||
for d in (dirs.originals, dirs.archive, dirs.thumbnails):
|
||||
d.mkdir(parents=True)
|
||||
return dirs
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def _media_settings(paperless_dirs: PaperlessDirs, settings) -> None:
|
||||
"""Configure Django settings to point at temp directories."""
|
||||
settings.MEDIA_ROOT = paperless_dirs.media
|
||||
settings.ORIGINALS_DIR = paperless_dirs.originals
|
||||
settings.ARCHIVE_DIR = paperless_dirs.archive
|
||||
settings.THUMBNAIL_DIR = paperless_dirs.thumbnails
|
||||
settings.MEDIA_LOCK = paperless_dirs.media / "media.lock"
|
||||
settings.IGNORABLE_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
|
||||
settings.APP_LOGO = ""
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def sample_doc(
|
||||
paperless_dirs: PaperlessDirs,
|
||||
_media_settings: None,
|
||||
samples_dir: Path,
|
||||
) -> "Document":
|
||||
"""Create a document with valid files and matching checksums."""
|
||||
with filelock.FileLock(paperless_dirs.media / "media.lock"):
|
||||
shutil.copy(
|
||||
samples_dir / "originals" / "0000001.pdf",
|
||||
paperless_dirs.originals / "0000001.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
samples_dir / "archive" / "0000001.pdf",
|
||||
paperless_dirs.archive / "0000001.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
samples_dir / "thumbnails" / "0000001.webp",
|
||||
paperless_dirs.thumbnails / "0000001.webp",
|
||||
)
|
||||
|
||||
return DocumentFactory(
|
||||
title="test",
|
||||
checksum="42995833e01aea9b3edee44bbfdd7ce1",
|
||||
archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
|
||||
content="test content",
|
||||
pk=1,
|
||||
filename="0000001.pdf",
|
||||
mime_type="application/pdf",
|
||||
archive_filename="0000001.pdf",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def settings_timezone(settings: SettingsWrapper) -> zoneinfo.ZoneInfo:
|
||||
|
||||
173
src/documents/tests/management/test_management_sanity_checker.py
Normal file
173
src/documents/tests/management/test_management_sanity_checker.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""Tests for the document_sanity_checker management command.
|
||||
|
||||
Verifies Rich rendering (table, panel, summary) and end-to-end CLI behavior.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
from rich.console import Console
|
||||
|
||||
from documents.management.commands.document_sanity_checker import Command
|
||||
from documents.sanity_checker import SanityCheckMessages
|
||||
from documents.tests.factories import DocumentFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from documents.models import Document
|
||||
from documents.tests.conftest import PaperlessDirs
|
||||
|
||||
|
||||
def _render_to_string(messages: SanityCheckMessages) -> str:
|
||||
"""Render command output to a plain string for assertion."""
|
||||
buf = StringIO()
|
||||
cmd = Command()
|
||||
cmd.console = Console(file=buf, width=120, no_color=True)
|
||||
cmd._render_results(messages)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rich rendering
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRenderResultsNoIssues:
|
||||
"""No DB access needed -- renders an empty SanityCheckMessages."""
|
||||
|
||||
def test_shows_panel(self) -> None:
|
||||
output = _render_to_string(SanityCheckMessages())
|
||||
assert "No issues detected" in output
|
||||
assert "Sanity Check" in output
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestRenderResultsWithIssues:
|
||||
def test_error_row(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.error(sample_doc.pk, "Original missing")
|
||||
output = _render_to_string(msgs)
|
||||
assert "Sanity Check Results" in output
|
||||
assert "ERROR" in output
|
||||
assert "Original missing" in output
|
||||
assert f"#{sample_doc.pk}" in output
|
||||
assert sample_doc.title in output
|
||||
|
||||
def test_warning_row(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.warning(sample_doc.pk, "Suspicious file")
|
||||
output = _render_to_string(msgs)
|
||||
assert "WARN" in output
|
||||
assert "Suspicious file" in output
|
||||
|
||||
def test_info_row(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.info(sample_doc.pk, "No OCR data")
|
||||
output = _render_to_string(msgs)
|
||||
assert "INFO" in output
|
||||
assert "No OCR data" in output
|
||||
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_global_message(self) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.warning(None, "Orphaned file: /tmp/stray.pdf")
|
||||
output = _render_to_string(msgs)
|
||||
assert "(global)" in output
|
||||
assert "Orphaned file" in output
|
||||
|
||||
def test_multiple_messages_same_doc(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.error(sample_doc.pk, "Thumbnail missing")
|
||||
msgs.error(sample_doc.pk, "Checksum mismatch")
|
||||
output = _render_to_string(msgs)
|
||||
assert "Thumbnail missing" in output
|
||||
assert "Checksum mismatch" in output
|
||||
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_unknown_doc_pk(self) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.error(99999, "Ghost document")
|
||||
output = _render_to_string(msgs)
|
||||
assert "#99999" in output
|
||||
assert "Unknown" in output
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestRenderResultsSummary:
|
||||
def test_errors_only(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.error(sample_doc.pk, "broken")
|
||||
output = _render_to_string(msgs)
|
||||
assert "errors" in output
|
||||
assert "Found 1 document(s)" in output
|
||||
|
||||
def test_warnings_only(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.warning(sample_doc.pk, "odd")
|
||||
output = _render_to_string(msgs)
|
||||
assert "warnings" in output
|
||||
|
||||
def test_errors_and_warnings(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.error(sample_doc.pk, "broken")
|
||||
msgs.warning(None, "orphan")
|
||||
output = _render_to_string(msgs)
|
||||
assert "errors" in output
|
||||
assert "warnings" in output
|
||||
assert "Found 2 document(s)" in output
|
||||
|
||||
def test_infos_only(self, sample_doc: Document) -> None:
|
||||
msgs = SanityCheckMessages()
|
||||
msgs.info(sample_doc.pk, "no OCR")
|
||||
output = _render_to_string(msgs)
|
||||
assert "infos" in output
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# End-to-end command execution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.management
|
||||
class TestDocumentSanityCheckerCommand:
|
||||
def test_no_issues(self, sample_doc: Document) -> None:
|
||||
out = StringIO()
|
||||
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
|
||||
assert "No issues detected" in out.getvalue()
|
||||
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_no_issues_empty_archive(self) -> None:
|
||||
out = StringIO()
|
||||
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
|
||||
assert "No issues detected" in out.getvalue()
|
||||
|
||||
def test_missing_original(self, sample_doc: Document) -> None:
|
||||
Path(sample_doc.source_path).unlink()
|
||||
out = StringIO()
|
||||
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
|
||||
output = out.getvalue()
|
||||
assert "ERROR" in output
|
||||
assert "Original of document does not exist" in output
|
||||
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_checksum_mismatch(self, paperless_dirs: PaperlessDirs) -> None:
|
||||
"""Lightweight document with zero-byte files triggers checksum mismatch."""
|
||||
doc = DocumentFactory(
|
||||
title="test",
|
||||
content="test",
|
||||
filename="test.pdf",
|
||||
checksum="abc",
|
||||
)
|
||||
Path(doc.source_path).touch()
|
||||
Path(doc.thumbnail_path).touch()
|
||||
|
||||
out = StringIO()
|
||||
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
|
||||
output = out.getvalue()
|
||||
assert "ERROR" in output
|
||||
assert "Checksum mismatch. Stored: abc, actual:" in output
|
||||
@@ -134,6 +134,7 @@ class TestRenamer(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
self.assertIsFile(doc2.archive_path)
|
||||
|
||||
|
||||
@pytest.mark.management
|
||||
class TestCreateClassifier(TestCase):
|
||||
@mock.patch(
|
||||
"documents.management.commands.document_create_classifier.train_classifier",
|
||||
@@ -144,32 +145,6 @@ class TestCreateClassifier(TestCase):
|
||||
m.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.management
|
||||
class TestSanityChecker(DirectoriesMixin, TestCase):
|
||||
def test_no_issues(self) -> None:
|
||||
with self.assertLogs() as capture:
|
||||
call_command("document_sanity_checker")
|
||||
|
||||
self.assertEqual(len(capture.output), 1)
|
||||
self.assertIn("Sanity checker detected no issues.", capture.output[0])
|
||||
|
||||
def test_errors(self) -> None:
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
content="test",
|
||||
filename="test.pdf",
|
||||
checksum="abc",
|
||||
)
|
||||
Path(doc.source_path).touch()
|
||||
Path(doc.thumbnail_path).touch()
|
||||
|
||||
with self.assertLogs() as capture:
|
||||
call_command("document_sanity_checker")
|
||||
|
||||
self.assertEqual(len(capture.output), 2)
|
||||
self.assertIn("Checksum mismatch. Stored: abc, actual:", capture.output[1])
|
||||
|
||||
|
||||
@pytest.mark.management
|
||||
class TestConvertMariaDBUUID(TestCase):
|
||||
@mock.patch("django.db.connection.schema_editor")
|
||||
|
||||
@@ -1,192 +1,295 @@
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
"""Tests for the sanity checker module.
|
||||
|
||||
import filelock
|
||||
from django.conf import settings
|
||||
from django.test import TestCase
|
||||
from django.test import override_settings
|
||||
Tests exercise ``check_sanity`` as a whole, verifying document validation,
|
||||
orphan detection, task recording, and the iter_wrapper contract.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
|
||||
from documents.models import Document
|
||||
from documents.models import PaperlessTask
|
||||
from documents.sanity_checker import check_sanity
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
from documents.tests.conftest import PaperlessDirs
|
||||
|
||||
|
||||
class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
def make_test_data(self):
|
||||
with filelock.FileLock(settings.MEDIA_LOCK):
|
||||
# just make sure that the lockfile is present.
|
||||
shutil.copy(
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000001.pdf"
|
||||
),
|
||||
Path(self.dirs.originals_dir) / "0000001.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "archive"
|
||||
/ "0000001.pdf"
|
||||
),
|
||||
Path(self.dirs.archive_dir) / "0000001.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "thumbnails"
|
||||
/ "0000001.webp"
|
||||
),
|
||||
Path(self.dirs.thumbnail_dir) / "0000001.webp",
|
||||
)
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityNoDocuments:
|
||||
"""Sanity checks against an empty archive."""
|
||||
|
||||
return Document.objects.create(
|
||||
title="test",
|
||||
checksum="42995833e01aea9b3edee44bbfdd7ce1",
|
||||
archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
|
||||
content="test",
|
||||
pk=1,
|
||||
filename="0000001.pdf",
|
||||
mime_type="application/pdf",
|
||||
archive_filename="0000001.pdf",
|
||||
)
|
||||
|
||||
def assertSanityError(self, doc: Document, messageRegex) -> None:
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_no_documents(self) -> None:
|
||||
messages = check_sanity()
|
||||
self.assertTrue(messages.has_error)
|
||||
with self.assertLogs() as capture:
|
||||
assert not messages.has_error
|
||||
assert not messages.has_warning
|
||||
assert len(messages) == 0
|
||||
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_no_issues_logs_clean(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
messages = check_sanity()
|
||||
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
|
||||
messages.log_messages()
|
||||
self.assertEqual(
|
||||
capture.records[0].message,
|
||||
f"Detected following issue(s) with document #{doc.pk}, titled {doc.title}",
|
||||
)
|
||||
self.assertRegex(capture.records[1].message, messageRegex)
|
||||
assert "Sanity checker detected no issues." in caplog.text
|
||||
|
||||
def test_no_issues(self) -> None:
|
||||
self.make_test_data()
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityHealthyDocument:
|
||||
def test_no_errors(self, sample_doc: Document) -> None:
|
||||
messages = check_sanity()
|
||||
self.assertFalse(messages.has_error)
|
||||
self.assertFalse(messages.has_warning)
|
||||
with self.assertLogs() as capture:
|
||||
messages.log_messages()
|
||||
self.assertEqual(len(capture.output), 1)
|
||||
self.assertEqual(capture.records[0].levelno, logging.INFO)
|
||||
self.assertEqual(
|
||||
capture.records[0].message,
|
||||
"Sanity checker detected no issues.",
|
||||
)
|
||||
assert not messages.has_error
|
||||
assert not messages.has_warning
|
||||
assert len(messages) == 0
|
||||
|
||||
def test_no_docs(self) -> None:
|
||||
self.assertEqual(len(check_sanity()), 0)
|
||||
|
||||
def test_success(self) -> None:
|
||||
self.make_test_data()
|
||||
self.assertEqual(len(check_sanity()), 0)
|
||||
|
||||
def test_no_thumbnail(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
Path(doc.thumbnail_path).unlink()
|
||||
self.assertSanityError(doc, "Thumbnail of document does not exist")
|
||||
|
||||
def test_thumbnail_no_access(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
Path(doc.thumbnail_path).chmod(0o000)
|
||||
self.assertSanityError(doc, "Cannot read thumbnail file of document")
|
||||
Path(doc.thumbnail_path).chmod(0o777)
|
||||
|
||||
def test_no_original(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
Path(doc.source_path).unlink()
|
||||
self.assertSanityError(doc, "Original of document does not exist.")
|
||||
|
||||
def test_original_no_access(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
Path(doc.source_path).chmod(0o000)
|
||||
self.assertSanityError(doc, "Cannot read original file of document")
|
||||
Path(doc.source_path).chmod(0o777)
|
||||
|
||||
def test_original_checksum_mismatch(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
doc.checksum = "WOW"
|
||||
doc.save()
|
||||
self.assertSanityError(doc, "Checksum mismatch. Stored: WOW, actual: ")
|
||||
|
||||
def test_no_archive(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
Path(doc.archive_path).unlink()
|
||||
self.assertSanityError(doc, "Archived version of document does not exist.")
|
||||
|
||||
def test_archive_no_access(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
Path(doc.archive_path).chmod(0o000)
|
||||
self.assertSanityError(doc, "Cannot read archive file of document")
|
||||
Path(doc.archive_path).chmod(0o777)
|
||||
|
||||
def test_archive_checksum_mismatch(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
doc.archive_checksum = "WOW"
|
||||
doc.save()
|
||||
self.assertSanityError(doc, "Checksum mismatch of archived document")
|
||||
|
||||
def test_empty_content(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
doc.content = ""
|
||||
doc.save()
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityThumbnail:
|
||||
def test_missing(self, sample_doc: Document) -> None:
|
||||
Path(sample_doc.thumbnail_path).unlink()
|
||||
messages = check_sanity()
|
||||
self.assertFalse(messages.has_error)
|
||||
self.assertFalse(messages.has_warning)
|
||||
self.assertEqual(len(messages), 1)
|
||||
self.assertRegex(
|
||||
messages[doc.pk][0]["message"],
|
||||
"Document contains no OCR data",
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Thumbnail of document does not exist" in m["message"]
|
||||
for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
def test_orphaned_file(self) -> None:
|
||||
self.make_test_data()
|
||||
Path(self.dirs.originals_dir, "orphaned").touch()
|
||||
def test_unreadable(self, sample_doc: Document) -> None:
|
||||
thumb = Path(sample_doc.thumbnail_path)
|
||||
thumb.chmod(0o000)
|
||||
try:
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Cannot read thumbnail" in m["message"] for m in messages[sample_doc.pk]
|
||||
)
|
||||
finally:
|
||||
thumb.chmod(0o644)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityOriginal:
|
||||
def test_missing(self, sample_doc: Document) -> None:
|
||||
Path(sample_doc.source_path).unlink()
|
||||
messages = check_sanity()
|
||||
self.assertTrue(messages.has_warning)
|
||||
self.assertRegex(
|
||||
messages._messages[None][0]["message"],
|
||||
"Orphaned file in media dir",
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Original of document does not exist" in m["message"]
|
||||
for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
@override_settings(
|
||||
APP_LOGO="logo/logo.png",
|
||||
def test_checksum_mismatch(self, sample_doc: Document) -> None:
|
||||
sample_doc.checksum = "badhash"
|
||||
sample_doc.save()
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Checksum mismatch" in m["message"] and "badhash" in m["message"]
|
||||
for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
def test_unreadable(self, sample_doc: Document) -> None:
|
||||
src = Path(sample_doc.source_path)
|
||||
src.chmod(0o000)
|
||||
try:
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Cannot read original" in m["message"] for m in messages[sample_doc.pk]
|
||||
)
|
||||
finally:
|
||||
src.chmod(0o644)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityArchive:
|
||||
def test_checksum_without_filename(self, sample_doc: Document) -> None:
|
||||
sample_doc.archive_filename = None
|
||||
sample_doc.save()
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"checksum, but no archive filename" in m["message"]
|
||||
for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
def test_filename_without_checksum(self, sample_doc: Document) -> None:
|
||||
sample_doc.archive_checksum = None
|
||||
sample_doc.save()
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"checksum is missing" in m["message"] for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
def test_missing_file(self, sample_doc: Document) -> None:
|
||||
Path(sample_doc.archive_path).unlink()
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Archived version of document does not exist" in m["message"]
|
||||
for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
def test_checksum_mismatch(self, sample_doc: Document) -> None:
|
||||
sample_doc.archive_checksum = "wronghash"
|
||||
sample_doc.save()
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Checksum mismatch of archived document" in m["message"]
|
||||
for m in messages[sample_doc.pk]
|
||||
)
|
||||
|
||||
def test_unreadable(self, sample_doc: Document) -> None:
|
||||
archive = Path(sample_doc.archive_path)
|
||||
archive.chmod(0o000)
|
||||
try:
|
||||
messages = check_sanity()
|
||||
assert messages.has_error
|
||||
assert any(
|
||||
"Cannot read archive" in m["message"] for m in messages[sample_doc.pk]
|
||||
)
|
||||
finally:
|
||||
archive.chmod(0o644)
|
||||
|
||||
def test_no_archive_at_all(self, sample_doc: Document) -> None:
|
||||
"""Document with neither archive checksum nor filename is valid."""
|
||||
Path(sample_doc.archive_path).unlink()
|
||||
sample_doc.archive_checksum = None
|
||||
sample_doc.archive_filename = None
|
||||
sample_doc.save()
|
||||
messages = check_sanity()
|
||||
assert not messages.has_error
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityContent:
|
||||
@pytest.mark.parametrize(
|
||||
"content",
|
||||
[
|
||||
pytest.param("", id="empty-string"),
|
||||
],
|
||||
)
|
||||
def test_ignore_logo(self) -> None:
|
||||
self.make_test_data()
|
||||
logo_dir = Path(self.dirs.media_dir, "logo")
|
||||
logo_dir.mkdir(parents=True, exist_ok=True)
|
||||
Path(self.dirs.media_dir, "logo", "logo.png").touch()
|
||||
def test_no_content(self, sample_doc: Document, content: str) -> None:
|
||||
sample_doc.content = content
|
||||
sample_doc.save()
|
||||
messages = check_sanity()
|
||||
self.assertFalse(messages.has_warning)
|
||||
assert not messages.has_error
|
||||
assert not messages.has_warning
|
||||
assert any("no OCR data" in m["message"] for m in messages[sample_doc.pk])
|
||||
|
||||
def test_ignore_ignorable_files(self) -> None:
|
||||
self.make_test_data()
|
||||
Path(self.dirs.media_dir, ".DS_Store").touch()
|
||||
Path(self.dirs.media_dir, "desktop.ini").touch()
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityOrphans:
|
||||
def test_orphaned_file(
|
||||
self,
|
||||
sample_doc: Document,
|
||||
paperless_dirs: PaperlessDirs,
|
||||
) -> None:
|
||||
(paperless_dirs.originals / "orphan.pdf").touch()
|
||||
messages = check_sanity()
|
||||
self.assertFalse(messages.has_warning)
|
||||
assert messages.has_warning
|
||||
assert any("Orphaned file" in m["message"] for m in messages[None])
|
||||
|
||||
def test_archive_filename_no_checksum(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
doc.archive_checksum = None
|
||||
doc.save()
|
||||
self.assertSanityError(doc, "has an archive file, but its checksum is missing.")
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_ignorable_files_not_flagged(
|
||||
self,
|
||||
paperless_dirs: PaperlessDirs,
|
||||
) -> None:
|
||||
(paperless_dirs.media / ".DS_Store").touch()
|
||||
(paperless_dirs.media / "desktop.ini").touch()
|
||||
messages = check_sanity()
|
||||
assert not messages.has_warning
|
||||
|
||||
def test_archive_checksum_no_filename(self) -> None:
|
||||
doc = self.make_test_data()
|
||||
doc.archive_filename = None
|
||||
doc.save()
|
||||
self.assertSanityError(
|
||||
doc,
|
||||
"has an archive file checksum, but no archive filename.",
|
||||
)
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityIterWrapper:
|
||||
def test_wrapper_receives_documents(self, sample_doc: Document) -> None:
|
||||
seen: list[Document] = []
|
||||
|
||||
def tracking(iterable: Iterable[Document]) -> Iterable[Document]:
|
||||
for item in iterable:
|
||||
seen.append(item)
|
||||
yield item
|
||||
|
||||
check_sanity(iter_wrapper=tracking)
|
||||
assert len(seen) == 1
|
||||
assert seen[0].pk == sample_doc.pk
|
||||
|
||||
def test_default_works_without_wrapper(self, sample_doc: Document) -> None:
|
||||
messages = check_sanity()
|
||||
assert not messages.has_error
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityTaskRecording:
|
||||
@pytest.mark.parametrize(
|
||||
("expected_type", "scheduled"),
|
||||
[
|
||||
pytest.param(PaperlessTask.TaskType.SCHEDULED_TASK, True, id="scheduled"),
|
||||
pytest.param(PaperlessTask.TaskType.MANUAL_TASK, False, id="manual"),
|
||||
],
|
||||
)
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_task_type(self, expected_type: str, *, scheduled: bool) -> None:
|
||||
check_sanity(scheduled=scheduled)
|
||||
task = PaperlessTask.objects.latest("date_created")
|
||||
assert task.task_name == PaperlessTask.TaskName.CHECK_SANITY
|
||||
assert task.type == expected_type
|
||||
|
||||
def test_success_status(self, sample_doc: Document) -> None:
|
||||
check_sanity()
|
||||
task = PaperlessTask.objects.latest("date_created")
|
||||
assert task.status == "SUCCESS"
|
||||
|
||||
def test_failure_status(self, sample_doc: Document) -> None:
|
||||
Path(sample_doc.source_path).unlink()
|
||||
check_sanity()
|
||||
task = PaperlessTask.objects.latest("date_created")
|
||||
assert task.status == "FAILURE"
|
||||
assert "Check logs for details" in task.result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestCheckSanityLogMessages:
|
||||
def test_logs_doc_issues(
|
||||
self,
|
||||
sample_doc: Document,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
Path(sample_doc.source_path).unlink()
|
||||
messages = check_sanity()
|
||||
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
|
||||
messages.log_messages()
|
||||
assert f"document #{sample_doc.pk}" in caplog.text
|
||||
assert "Original of document does not exist" in caplog.text
|
||||
|
||||
def test_logs_global_issues(
|
||||
self,
|
||||
sample_doc: Document,
|
||||
paperless_dirs: PaperlessDirs,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
(paperless_dirs.originals / "orphan.pdf").touch()
|
||||
messages = check_sanity()
|
||||
with caplog.at_level(logging.WARNING, logger="paperless.sanity_checker"):
|
||||
messages.log_messages()
|
||||
assert "Orphaned file" in caplog.text
|
||||
|
||||
@pytest.mark.usefixtures("_media_settings")
|
||||
def test_logs_unknown_doc_pk(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""A doc PK not in the DB logs 'Unknown' as the title."""
|
||||
messages = check_sanity()
|
||||
messages.error(99999, "Ghost document")
|
||||
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
|
||||
messages.log_messages()
|
||||
assert "#99999" in caplog.text
|
||||
assert "Unknown" in caplog.text
|
||||
|
||||
Reference in New Issue
Block a user