class Command(PaperlessCommand):
    help = "This command checks your document archive for issues."

    def _render_results(self, messages: SanityCheckMessages) -> None:
        """Render sanity check results as a Rich table (or a green panel if clean)."""
        nothing_found = not (
            messages.has_error or messages.has_warning or messages.has_info
        )
        if nothing_found:
            panel = Panel(
                "[green]No issues detected.[/green]",
                title="Sanity Check",
                border_style="green",
            )
            self.console.print(panel)
            return

        # Resolve titles for all affected documents in a single query.
        pks = [pk for pk in messages.document_pks() if pk is not None]
        titles: dict[int, str] = (
            dict(
                Document.global_objects.filter(pk__in=pks)
                .only("pk", "title")
                .values_list("pk", "title"),
            )
            if pks
            else {}
        )

        table = Table(
            title="Sanity Check Results",
            show_lines=True,
            title_style="bold",
        )
        table.add_column("Level", width=7, no_wrap=True)
        table.add_column("Document", min_width=20)
        table.add_column("Issue", ratio=1)

        for pk, entries in messages.iter_messages():
            # Messages keyed on None are global (not tied to one document).
            doc_label = (
                f"#{pk} {titles.get(pk, 'Unknown')}" if pk is not None else "(global)"
            )
            for entry in entries:
                style, level_label = _LEVEL_STYLE.get(entry["level"], ("dim", "INFO"))
                table.add_row(
                    Text(level_label, style=style),
                    Text(doc_label),
                    Text(str(entry["message"])),
                )

        self.console.print(table)
        self._print_summary(messages)

    def _print_summary(self, messages: SanityCheckMessages) -> None:
        """Print a one-line human-readable tally below the results table."""
        fragments: list[str] = []
        if messages.document_error_count:
            fragments.append(
                f"{messages.document_error_count} document(s) with [bold red]errors[/bold red]",
            )
        if messages.document_warning_count:
            fragments.append(
                f"{messages.document_warning_count} document(s) with [yellow]warnings[/yellow]",
            )
        if messages.document_info_count:
            fragments.append(f"{messages.document_info_count} document(s) with infos")
        if messages.global_warning_count:
            fragments.append(
                f"{messages.global_warning_count} global [yellow]warning(s)[/yellow]",
            )

        if not fragments:
            self.console.print("\nNo issues found.")
            return

        if len(fragments) == 1:
            summary = fragments[0]
        else:
            summary = ", ".join(fragments[:-1]) + " and " + fragments[-1]
        self.console.print(f"\nFound {summary}.")

    def handle(self, *args: Any, **options: Any) -> None:
        """Run the sanity check with a progress display and render the results."""
        messages = check_sanity(
            scheduled=False,
            iter_wrapper=lambda docs: self.track(
                docs,
                description="Checking documents...",
            ),
        )
        self._render_results(messages)
+""" + +from __future__ import annotations + import hashlib import logging import uuid from collections import defaultdict +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator from pathlib import Path +from typing import TYPE_CHECKING from typing import Final +from typing import TypedDict +from typing import TypeVar from celery import states from django.conf import settings from django.utils import timezone -from tqdm import tqdm from documents.models import Document from documents.models import PaperlessTask from paperless.config import GeneralConfig +logger = logging.getLogger("paperless.sanity_checker") + +_T = TypeVar("_T") +IterWrapper = Callable[[Iterable[_T]], Iterable[_T]] + + +class MessageEntry(TypedDict): + """A single sanity check message with its severity level.""" + + level: int + message: str + + +def _identity(iterable: Iterable[_T]) -> Iterable[_T]: + """Pass through an iterable unchanged (default iter_wrapper).""" + return iterable + class SanityCheckMessages: - def __init__(self) -> None: - self._messages: dict[int, list[dict]] = defaultdict(list) - self.has_error = False - self.has_warning = False + """Collects sanity check messages grouped by document primary key. - def error(self, doc_pk, message) -> None: + Messages are categorized as error, warning, or info. ``None`` is used + as the key for messages not associated with a specific document + (e.g., orphaned files). 
+ """ + + def __init__(self) -> None: + self._messages: dict[int | None, list[MessageEntry]] = defaultdict(list) + self.has_error: bool = False + self.has_warning: bool = False + self.has_info: bool = False + self.document_count: int = 0 + self.document_error_count: int = 0 + self.document_warning_count: int = 0 + self.document_info_count: int = 0 + self.global_warning_count: int = 0 + + # -- Recording ---------------------------------------------------------- + + def error(self, doc_pk: int | None, message: str) -> None: self._messages[doc_pk].append({"level": logging.ERROR, "message": message}) self.has_error = True + if doc_pk is not None: + self.document_count += 1 + self.document_error_count += 1 - def warning(self, doc_pk, message) -> None: + def warning(self, doc_pk: int | None, message: str) -> None: self._messages[doc_pk].append({"level": logging.WARNING, "message": message}) self.has_warning = True - def info(self, doc_pk, message) -> None: + if doc_pk is not None: + self.document_count += 1 + self.document_warning_count += 1 + else: + # This is the only type of global message we do right now + self.global_warning_count += 1 + + def info(self, doc_pk: int | None, message: str) -> None: self._messages[doc_pk].append({"level": logging.INFO, "message": message}) + self.has_info = True + + if doc_pk is not None: + self.document_count += 1 + self.document_info_count += 1 + + # -- Iteration / query -------------------------------------------------- + + def document_pks(self) -> list[int | None]: + """Return all document PKs (including None for global messages).""" + return list(self._messages.keys()) + + def iter_messages(self) -> Iterator[tuple[int | None, list[MessageEntry]]]: + """Iterate over (doc_pk, messages) pairs.""" + yield from self._messages.items() + + def __getitem__(self, item: int | None) -> list[MessageEntry]: + return self._messages[item] + + # -- Summarize Helpers -------------------------------------------------- + + @property + def 
has_global_issues(self) -> bool: + return None in self._messages + + @property + def total_issue_count(self) -> int: + """Total number of error and warning messages across all documents and global.""" + return ( + self.document_error_count + + self.document_warning_count + + self.global_warning_count + ) + + # -- Logging output (used by Celery task path) -------------------------- def log_messages(self) -> None: - logger = logging.getLogger("paperless.sanity_checker") + """Write all messages to the ``paperless.sanity_checker`` logger. + This is the output path for headless / Celery execution. + Management commands use Rich rendering instead. + """ if len(self._messages) == 0: logger.info("Sanity checker detected no issues.") - else: - # Query once - all_docs = Document.global_objects.all() + return - for doc_pk in self._messages: - if doc_pk is not None: - doc = all_docs.get(pk=doc_pk) - logger.info( - f"Detected following issue(s) with document #{doc.pk}," - f" titled {doc.title}", - ) - for msg in self._messages[doc_pk]: - logger.log(msg["level"], msg["message"]) + doc_pks = [pk for pk in self._messages if pk is not None] + titles: dict[int, str] = {} + if doc_pks: + titles = dict( + Document.global_objects.filter(pk__in=doc_pks) + .only("pk", "title") + .values_list("pk", "title"), + ) - def __len__(self): - return len(self._messages) - - def __getitem__(self, item): - return self._messages[item] + for doc_pk, entries in self._messages.items(): + if doc_pk is not None: + title = titles.get(doc_pk, "Unknown") + logger.info( + "Detected following issue(s) with document #%s, titled %s", + doc_pk, + title, + ) + for msg in entries: + logger.log(msg["level"], msg["message"]) class SanityCheckFailedException(Exception): pass -def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages: - paperless_task = PaperlessTask.objects.create( - task_id=uuid.uuid4(), - type=PaperlessTask.TaskType.SCHEDULED_TASK - if scheduled - else 
PaperlessTask.TaskType.MANUAL_TASK, - task_name=PaperlessTask.TaskName.CHECK_SANITY, - status=states.STARTED, - date_created=timezone.now(), - date_started=timezone.now(), - ) - messages = SanityCheckMessages() +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _build_present_files() -> set[Path]: + """Collect all files in MEDIA_ROOT, excluding directories and ignorable files.""" present_files = { x.resolve() for x in Path(settings.MEDIA_ROOT).glob("**/*") @@ -82,95 +176,178 @@ def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages: } lockfile = Path(settings.MEDIA_LOCK).resolve() - if lockfile in present_files: - present_files.remove(lockfile) + present_files.discard(lockfile) general_config = GeneralConfig() app_logo = general_config.app_logo or settings.APP_LOGO if app_logo: logo_file = Path(settings.MEDIA_ROOT / Path(app_logo.lstrip("/"))).resolve() - if logo_file in present_files: - present_files.remove(logo_file) + present_files.discard(logo_file) - for doc in tqdm(Document.global_objects.all(), disable=not progress): - # Check sanity of the thumbnail - thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve() - if not thumbnail_path.exists() or not thumbnail_path.is_file(): - messages.error(doc.pk, "Thumbnail of document does not exist.") - else: - if thumbnail_path in present_files: - present_files.remove(thumbnail_path) - try: - _ = thumbnail_path.read_bytes() - except OSError as e: - messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}") + return present_files - # Check sanity of the original file - # TODO: extract method - source_path: Final[Path] = Path(doc.source_path).resolve() - if not source_path.exists() or not source_path.is_file(): - messages.error(doc.pk, "Original of document does not exist.") - else: - if source_path in present_files: - present_files.remove(source_path) 
def _check_thumbnail(
    doc: Document,
    messages: SanityCheckMessages,
    present_files: set[Path],
) -> None:
    """Verify the thumbnail exists and is readable."""
    thumb = Path(doc.thumbnail_path).resolve()
    if not (thumb.exists() and thumb.is_file()):
        messages.error(doc.pk, "Thumbnail of document does not exist.")
        return

    # A file we know about is not an orphan.
    present_files.discard(thumb)
    try:
        thumb.read_bytes()
    except OSError as exc:
        messages.error(doc.pk, f"Cannot read thumbnail file of document: {exc}")


def _check_original(
    doc: Document,
    messages: SanityCheckMessages,
    present_files: set[Path],
) -> None:
    """Verify the original file exists, is readable, and has matching checksum."""
    original = Path(doc.source_path).resolve()
    if not (original.exists() and original.is_file()):
        messages.error(doc.pk, "Original of document does not exist.")
        return

    present_files.discard(original)
    try:
        actual = hashlib.md5(original.read_bytes()).hexdigest()
    except OSError as exc:
        messages.error(doc.pk, f"Cannot read original file of document: {exc}")
        return

    if actual != doc.checksum:
        messages.error(
            doc.pk,
            f"Checksum mismatch. Stored: {doc.checksum}, actual: {actual}.",
        )


def _check_archive(
    doc: Document,
    messages: SanityCheckMessages,
    present_files: set[Path],
) -> None:
    """Verify archive file consistency: checksum/filename pairing and file integrity."""
    has_checksum = doc.archive_checksum is not None
    has_filename = doc.archive_filename is not None

    if has_checksum and not has_filename:
        messages.error(
            doc.pk,
            "Document has an archive file checksum, but no archive filename.",
        )
        return
    if has_filename and not has_checksum:
        messages.error(
            doc.pk,
            "Document has an archive file, but its checksum is missing.",
        )
        return
    if not doc.has_archive_version:
        return

    if TYPE_CHECKING:
        assert isinstance(doc.archive_path, Path)
    archived = Path(doc.archive_path).resolve()
    if not (archived.exists() and archived.is_file()):
        messages.error(doc.pk, "Archived version of document does not exist.")
        return

    present_files.discard(archived)
    try:
        actual = hashlib.md5(archived.read_bytes()).hexdigest()
    except OSError as exc:
        messages.error(doc.pk, f"Cannot read archive file of document: {exc}")
        return

    if actual != doc.archive_checksum:
        messages.error(
            doc.pk,
            "Checksum mismatch of archived document. "
            f"Stored: {doc.archive_checksum}, actual: {actual}.",
        )


def _check_content(doc: Document, messages: SanityCheckMessages) -> None:
    """Flag documents with no OCR content."""
    if not doc.content:
        messages.info(doc.pk, "Document contains no OCR data")


def _check_document(
    doc: Document,
    messages: SanityCheckMessages,
    present_files: set[Path],
) -> None:
    """Run all checks for a single document."""
    for file_check in (_check_thumbnail, _check_original, _check_archive):
        file_check(doc, messages, present_files)
    _check_content(doc, messages)


def _summarize(messages: SanityCheckMessages) -> str:
    """Build the short result string stored on the PaperlessTask record."""
    if messages.total_issue_count == 0:
        return "No issues found."

    parts: list[str] = []
    if messages.document_error_count:
        parts.append(f"{messages.document_error_count} document(s) with errors")
    if messages.document_warning_count:
        parts.append(f"{messages.document_warning_count} document(s) with warnings")
    if messages.global_warning_count:
        parts.append(f"{messages.global_warning_count} global warning(s)")

    result = ", ".join(parts) + " found."
    if messages.has_error:
        result += " Check logs for details."
    return result


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------


def check_sanity(
    *,
    scheduled: bool = True,
    iter_wrapper: IterWrapper[Document] = _identity,
) -> SanityCheckMessages:
    """Run a full sanity check on the document archive.

    Args:
        scheduled: Whether this is a scheduled (automatic) or manual check.
            Controls the task type recorded in the database.
        iter_wrapper: A callable that wraps the document iterable, e.g.,
            for progress bar display. Defaults to identity (no wrapping).

    Returns:
        A SanityCheckMessages instance containing all detected issues.
    """
    task_type = (
        PaperlessTask.TaskType.SCHEDULED_TASK
        if scheduled
        else PaperlessTask.TaskType.MANUAL_TASK
    )
    task_record = PaperlessTask.objects.create(
        task_id=uuid.uuid4(),
        type=task_type,
        task_name=PaperlessTask.TaskName.CHECK_SANITY,
        status=states.STARTED,
        date_created=timezone.now(),
        date_started=timezone.now(),
    )

    messages = SanityCheckMessages()
    unaccounted = _build_present_files()

    # Each per-document check removes the files it recognises from
    # `unaccounted`; whatever remains afterwards has no owning document
    # and is reported as orphaned.
    for doc in iter_wrapper(Document.global_objects.all()):
        _check_document(doc, messages, unaccounted)

    for stray in unaccounted:
        messages.warning(None, f"Orphaned file in media dir: {stray}")

    task_record.status = states.FAILURE if messages.has_error else states.SUCCESS
    task_record.result = _summarize(messages)
    task_record.date_done = timezone.now()
    task_record.save(update_fields=["status", "result", "date_done"])

    return messages
@shared_task
def sanity_check(*, scheduled=True, raise_on_error=True):
    """Run the archive sanity check and return a short summary string.

    Detailed findings are written to the ``paperless.sanity_checker``
    logger via ``log_messages()`` -- the returned string (stored as the
    Celery task result) only carries the counts.

    Raises:
        SanityCheckFailedException: when errors were found and
            *raise_on_error* is True.
    """
    messages = sanity_checker.check_sanity(scheduled=scheduled)

    if not messages.has_error and not messages.has_warning and not messages.has_info:
        return "No issues detected."

    # Emit the individual findings to the log; the summary below points the
    # user at them ("Check logs for details."), so they must actually be there.
    messages.log_messages()

    parts: list[str] = []
    if messages.document_error_count:
        parts.append(f"{messages.document_error_count} document(s) with errors")
    if messages.document_warning_count:
        parts.append(f"{messages.document_warning_count} document(s) with warnings")
    if messages.document_info_count:
        parts.append(f"{messages.document_info_count} document(s) with infos")
    if messages.global_warning_count:
        parts.append(f"{messages.global_warning_count} global warning(s)")

    summary = ", ".join(parts) + " found."

    if messages.has_error:
        message = summary + " Check logs for details."
        if raise_on_error:
            raise SanityCheckFailedException(message)
        return message

    return summary
@dataclass(frozen=True, slots=True)
class PaperlessDirs:
    """Standard Paperless-ngx directory layout for tests."""

    media: Path
    originals: Path
    archive: Path
    thumbnails: Path


@pytest.fixture(scope="session")
def samples_dir() -> Path:
    """Path to the shared test sample documents."""
    return Path(__file__).parent / "samples" / "documents"


@pytest.fixture()
def paperless_dirs(tmp_path: Path) -> PaperlessDirs:
    """Create and return the directory structure for testing."""
    media_root = tmp_path / "media"
    layout = PaperlessDirs(
        media=media_root,
        originals=media_root / "documents" / "originals",
        archive=media_root / "documents" / "archive",
        thumbnails=media_root / "documents" / "thumbnails",
    )
    for directory in (layout.originals, layout.archive, layout.thumbnails):
        directory.mkdir(parents=True)
    return layout


@pytest.fixture()
def _media_settings(paperless_dirs: PaperlessDirs, settings) -> None:
    """Configure Django settings to point at temp directories."""
    settings.MEDIA_ROOT = paperless_dirs.media
    settings.ORIGINALS_DIR = paperless_dirs.originals
    settings.ARCHIVE_DIR = paperless_dirs.archive
    settings.THUMBNAIL_DIR = paperless_dirs.thumbnails
    settings.MEDIA_LOCK = paperless_dirs.media / "media.lock"
    settings.IGNORABLE_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
    settings.APP_LOGO = ""
@pytest.fixture()
def sample_doc(
    paperless_dirs: PaperlessDirs,
    _media_settings: None,
    samples_dir: Path,
) -> "Document":
    """Create a document with valid files and matching checksums."""
    # Copy the sample original/archive/thumbnail into place while holding
    # the media lock, mirroring how the consumer writes files.
    with filelock.FileLock(paperless_dirs.media / "media.lock"):
        copies = (
            (samples_dir / "originals" / "0000001.pdf", paperless_dirs.originals),
            (samples_dir / "archive" / "0000001.pdf", paperless_dirs.archive),
            (samples_dir / "thumbnails" / "0000001.webp", paperless_dirs.thumbnails),
        )
        for source, target_dir in copies:
            shutil.copy(source, target_dir / source.name)

    # Checksums below match the sample files copied above.
    return DocumentFactory(
        title="test",
        checksum="42995833e01aea9b3edee44bbfdd7ce1",
        archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
        content="test content",
        pk=1,
        filename="0000001.pdf",
        mime_type="application/pdf",
        archive_filename="0000001.pdf",
    )
+""" + +from __future__ import annotations + +from io import StringIO +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest +from django.core.management import call_command +from rich.console import Console + +from documents.management.commands.document_sanity_checker import Command +from documents.sanity_checker import SanityCheckMessages +from documents.tests.factories import DocumentFactory + +if TYPE_CHECKING: + from documents.models import Document + from documents.tests.conftest import PaperlessDirs + + +def _render_to_string(messages: SanityCheckMessages) -> str: + """Render command output to a plain string for assertion.""" + buf = StringIO() + cmd = Command() + cmd.console = Console(file=buf, width=120, no_color=True) + cmd._render_results(messages) + return buf.getvalue() + + +# --------------------------------------------------------------------------- +# Rich rendering +# --------------------------------------------------------------------------- + + +class TestRenderResultsNoIssues: + """No DB access needed -- renders an empty SanityCheckMessages.""" + + def test_shows_panel(self) -> None: + output = _render_to_string(SanityCheckMessages()) + assert "No issues detected" in output + assert "Sanity Check" in output + + +@pytest.mark.django_db +class TestRenderResultsWithIssues: + def test_error_row(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.error(sample_doc.pk, "Original missing") + output = _render_to_string(msgs) + assert "Sanity Check Results" in output + assert "ERROR" in output + assert "Original missing" in output + assert f"#{sample_doc.pk}" in output + assert sample_doc.title in output + + def test_warning_row(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.warning(sample_doc.pk, "Suspicious file") + output = _render_to_string(msgs) + assert "WARN" in output + assert "Suspicious file" in output + + def test_info_row(self, sample_doc: Document) -> None: + msgs = 
SanityCheckMessages() + msgs.info(sample_doc.pk, "No OCR data") + output = _render_to_string(msgs) + assert "INFO" in output + assert "No OCR data" in output + + @pytest.mark.usefixtures("_media_settings") + def test_global_message(self) -> None: + msgs = SanityCheckMessages() + msgs.warning(None, "Orphaned file: /tmp/stray.pdf") + output = _render_to_string(msgs) + assert "(global)" in output + assert "Orphaned file" in output + + def test_multiple_messages_same_doc(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.error(sample_doc.pk, "Thumbnail missing") + msgs.error(sample_doc.pk, "Checksum mismatch") + output = _render_to_string(msgs) + assert "Thumbnail missing" in output + assert "Checksum mismatch" in output + + @pytest.mark.usefixtures("_media_settings") + def test_unknown_doc_pk(self) -> None: + msgs = SanityCheckMessages() + msgs.error(99999, "Ghost document") + output = _render_to_string(msgs) + assert "#99999" in output + assert "Unknown" in output + + +@pytest.mark.django_db +class TestRenderResultsSummary: + def test_errors_only(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.error(sample_doc.pk, "broken") + output = _render_to_string(msgs) + assert "1 document(s) with" in output + assert "errors" in output + + def test_warnings_only(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.warning(sample_doc.pk, "odd") + output = _render_to_string(msgs) + assert "1 document(s) with" in output + assert "warnings" in output + + def test_infos_only(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.info(sample_doc.pk, "no OCR") + output = _render_to_string(msgs) + assert "1 document(s) with infos" in output + + def test_empty_messages(self) -> None: + msgs = SanityCheckMessages() + output = _render_to_string(msgs) + assert "No issues detected." 
in output + + def test_document_errors_and_global_warnings(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.error(sample_doc.pk, "broken") + msgs.warning(None, "orphan") + output = _render_to_string(msgs) + assert "1 document(s) with" in output + assert "errors" in output + assert "1 global warning(s)" in output + assert "2 document(s)" not in output + + def test_global_warnings_only(self) -> None: + msgs = SanityCheckMessages() + msgs.warning(None, "extra file") + output = _render_to_string(msgs) + assert "1 global warning(s)" in output + assert "document(s) with" not in output + + def test_all_levels_combined(self, sample_doc: Document) -> None: + msgs = SanityCheckMessages() + msgs.error(sample_doc.pk, "broken") + msgs.warning(sample_doc.pk, "odd") + msgs.info(sample_doc.pk, "fyi") + msgs.warning(None, "extra file") + output = _render_to_string(msgs) + assert "1 document(s) with errors" in output + assert "1 document(s) with warnings" in output + assert "1 document(s) with infos" in output + assert "1 global warning(s)" in output + + +# --------------------------------------------------------------------------- +# End-to-end command execution +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +@pytest.mark.management +class TestDocumentSanityCheckerCommand: + def test_no_issues(self, sample_doc: Document) -> None: + out = StringIO() + call_command("document_sanity_checker", "--no-progress-bar", stdout=out) + assert "No issues detected" in out.getvalue() + + def test_missing_original(self, sample_doc: Document) -> None: + Path(sample_doc.source_path).unlink() + out = StringIO() + call_command("document_sanity_checker", "--no-progress-bar", stdout=out) + output = out.getvalue() + assert "ERROR" in output + assert "Original of document does not exist" in output + + @pytest.mark.usefixtures("_media_settings") + def test_checksum_mismatch(self, paperless_dirs: PaperlessDirs) -> None: + 
"""Lightweight document with zero-byte files triggers checksum mismatch.""" + doc = DocumentFactory( + title="test", + content="test", + filename="test.pdf", + checksum="abc", + ) + Path(doc.source_path).touch() + Path(doc.thumbnail_path).touch() + + out = StringIO() + call_command("document_sanity_checker", "--no-progress-bar", stdout=out) + output = out.getvalue() + assert "ERROR" in output + assert "Checksum mismatch. Stored: abc, actual:" in output diff --git a/src/documents/tests/test_management.py b/src/documents/tests/test_management.py index 074e8039a..03959a85b 100644 --- a/src/documents/tests/test_management.py +++ b/src/documents/tests/test_management.py @@ -134,6 +134,7 @@ class TestRenamer(DirectoriesMixin, FileSystemAssertsMixin, TestCase): self.assertIsFile(doc2.archive_path) +@pytest.mark.management class TestCreateClassifier(TestCase): @mock.patch( "documents.management.commands.document_create_classifier.train_classifier", @@ -144,32 +145,6 @@ class TestCreateClassifier(TestCase): m.assert_called_once() -@pytest.mark.management -class TestSanityChecker(DirectoriesMixin, TestCase): - def test_no_issues(self) -> None: - with self.assertLogs() as capture: - call_command("document_sanity_checker") - - self.assertEqual(len(capture.output), 1) - self.assertIn("Sanity checker detected no issues.", capture.output[0]) - - def test_errors(self) -> None: - doc = Document.objects.create( - title="test", - content="test", - filename="test.pdf", - checksum="abc", - ) - Path(doc.source_path).touch() - Path(doc.thumbnail_path).touch() - - with self.assertLogs() as capture: - call_command("document_sanity_checker") - - self.assertEqual(len(capture.output), 2) - self.assertIn("Checksum mismatch. 
Stored: abc, actual:", capture.output[1]) - - @pytest.mark.management class TestConvertMariaDBUUID(TestCase): @mock.patch("django.db.connection.schema_editor") diff --git a/src/documents/tests/test_management_exporter.py b/src/documents/tests/test_management_exporter.py index e6daf5991..2d17aaec6 100644 --- a/src/documents/tests/test_management_exporter.py +++ b/src/documents/tests/test_management_exporter.py @@ -288,7 +288,7 @@ class TestExportImport( self.assertEqual(Permission.objects.count(), num_permission_objects) messages = check_sanity() # everything is alright after the test - self.assertEqual(len(messages), 0) + self.assertEqual(messages.total_issue_count, 0) def test_exporter_with_filename_format(self) -> None: shutil.rmtree(Path(self.dirs.media_dir) / "documents") diff --git a/src/documents/tests/test_sanity_check.py b/src/documents/tests/test_sanity_check.py index 415b0967f..e62c17303 100644 --- a/src/documents/tests/test_sanity_check.py +++ b/src/documents/tests/test_sanity_check.py @@ -1,192 +1,295 @@ -import logging -import shutil -from pathlib import Path +"""Tests for the sanity checker module. -import filelock -from django.conf import settings -from django.test import TestCase -from django.test import override_settings +Tests exercise ``check_sanity`` as a whole, verifying document validation, +orphan detection, task recording, and the iter_wrapper contract. 
+""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest from documents.models import Document +from documents.models import PaperlessTask from documents.sanity_checker import check_sanity -from documents.tests.utils import DirectoriesMixin + +if TYPE_CHECKING: + from collections.abc import Iterable + + from documents.tests.conftest import PaperlessDirs -class TestSanityCheck(DirectoriesMixin, TestCase): - def make_test_data(self): - with filelock.FileLock(settings.MEDIA_LOCK): - # just make sure that the lockfile is present. - shutil.copy( - ( - Path(__file__).parent - / "samples" - / "documents" - / "originals" - / "0000001.pdf" - ), - Path(self.dirs.originals_dir) / "0000001.pdf", - ) - shutil.copy( - ( - Path(__file__).parent - / "samples" - / "documents" - / "archive" - / "0000001.pdf" - ), - Path(self.dirs.archive_dir) / "0000001.pdf", - ) - shutil.copy( - ( - Path(__file__).parent - / "samples" - / "documents" - / "thumbnails" - / "0000001.webp" - ), - Path(self.dirs.thumbnail_dir) / "0000001.webp", - ) +@pytest.mark.django_db +class TestCheckSanityNoDocuments: + """Sanity checks against an empty archive.""" - return Document.objects.create( - title="test", - checksum="42995833e01aea9b3edee44bbfdd7ce1", - archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b", - content="test", - pk=1, - filename="0000001.pdf", - mime_type="application/pdf", - archive_filename="0000001.pdf", - ) - - def assertSanityError(self, doc: Document, messageRegex) -> None: + @pytest.mark.usefixtures("_media_settings") + def test_no_documents(self) -> None: messages = check_sanity() - self.assertTrue(messages.has_error) - with self.assertLogs() as capture: + assert not messages.has_error + assert not messages.has_warning + assert messages.total_issue_count == 0 + + @pytest.mark.usefixtures("_media_settings") + def test_no_issues_logs_clean(self, caplog: pytest.LogCaptureFixture) -> None: + messages = 
check_sanity() + with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"): messages.log_messages() - self.assertEqual( - capture.records[0].message, - f"Detected following issue(s) with document #{doc.pk}, titled {doc.title}", - ) - self.assertRegex(capture.records[1].message, messageRegex) + assert "Sanity checker detected no issues." in caplog.text - def test_no_issues(self) -> None: - self.make_test_data() + +@pytest.mark.django_db +class TestCheckSanityHealthyDocument: + def test_no_errors(self, sample_doc: Document) -> None: messages = check_sanity() - self.assertFalse(messages.has_error) - self.assertFalse(messages.has_warning) - with self.assertLogs() as capture: - messages.log_messages() - self.assertEqual(len(capture.output), 1) - self.assertEqual(capture.records[0].levelno, logging.INFO) - self.assertEqual( - capture.records[0].message, - "Sanity checker detected no issues.", - ) + assert not messages.has_error + assert not messages.has_warning + assert messages.total_issue_count == 0 - def test_no_docs(self) -> None: - self.assertEqual(len(check_sanity()), 0) - def test_success(self) -> None: - self.make_test_data() - self.assertEqual(len(check_sanity()), 0) - - def test_no_thumbnail(self) -> None: - doc = self.make_test_data() - Path(doc.thumbnail_path).unlink() - self.assertSanityError(doc, "Thumbnail of document does not exist") - - def test_thumbnail_no_access(self) -> None: - doc = self.make_test_data() - Path(doc.thumbnail_path).chmod(0o000) - self.assertSanityError(doc, "Cannot read thumbnail file of document") - Path(doc.thumbnail_path).chmod(0o777) - - def test_no_original(self) -> None: - doc = self.make_test_data() - Path(doc.source_path).unlink() - self.assertSanityError(doc, "Original of document does not exist.") - - def test_original_no_access(self) -> None: - doc = self.make_test_data() - Path(doc.source_path).chmod(0o000) - self.assertSanityError(doc, "Cannot read original file of document") - 
Path(doc.source_path).chmod(0o777) - - def test_original_checksum_mismatch(self) -> None: - doc = self.make_test_data() - doc.checksum = "WOW" - doc.save() - self.assertSanityError(doc, "Checksum mismatch. Stored: WOW, actual: ") - - def test_no_archive(self) -> None: - doc = self.make_test_data() - Path(doc.archive_path).unlink() - self.assertSanityError(doc, "Archived version of document does not exist.") - - def test_archive_no_access(self) -> None: - doc = self.make_test_data() - Path(doc.archive_path).chmod(0o000) - self.assertSanityError(doc, "Cannot read archive file of document") - Path(doc.archive_path).chmod(0o777) - - def test_archive_checksum_mismatch(self) -> None: - doc = self.make_test_data() - doc.archive_checksum = "WOW" - doc.save() - self.assertSanityError(doc, "Checksum mismatch of archived document") - - def test_empty_content(self) -> None: - doc = self.make_test_data() - doc.content = "" - doc.save() +@pytest.mark.django_db +class TestCheckSanityThumbnail: + def test_missing(self, sample_doc: Document) -> None: + Path(sample_doc.thumbnail_path).unlink() messages = check_sanity() - self.assertFalse(messages.has_error) - self.assertFalse(messages.has_warning) - self.assertEqual(len(messages), 1) - self.assertRegex( - messages[doc.pk][0]["message"], - "Document contains no OCR data", + assert messages.has_error + assert any( + "Thumbnail of document does not exist" in m["message"] + for m in messages[sample_doc.pk] ) - def test_orphaned_file(self) -> None: - self.make_test_data() - Path(self.dirs.originals_dir, "orphaned").touch() + def test_unreadable(self, sample_doc: Document) -> None: + thumb = Path(sample_doc.thumbnail_path) + thumb.chmod(0o000) + try: + messages = check_sanity() + assert messages.has_error + assert any( + "Cannot read thumbnail" in m["message"] for m in messages[sample_doc.pk] + ) + finally: + thumb.chmod(0o644) + + +@pytest.mark.django_db +class TestCheckSanityOriginal: + def test_missing(self, sample_doc: Document) -> 
None: + Path(sample_doc.source_path).unlink() messages = check_sanity() - self.assertTrue(messages.has_warning) - self.assertRegex( - messages._messages[None][0]["message"], - "Orphaned file in media dir", + assert messages.has_error + assert any( + "Original of document does not exist" in m["message"] + for m in messages[sample_doc.pk] ) - @override_settings( - APP_LOGO="logo/logo.png", + def test_checksum_mismatch(self, sample_doc: Document) -> None: + sample_doc.checksum = "badhash" + sample_doc.save() + messages = check_sanity() + assert messages.has_error + assert any( + "Checksum mismatch" in m["message"] and "badhash" in m["message"] + for m in messages[sample_doc.pk] + ) + + def test_unreadable(self, sample_doc: Document) -> None: + src = Path(sample_doc.source_path) + src.chmod(0o000) + try: + messages = check_sanity() + assert messages.has_error + assert any( + "Cannot read original" in m["message"] for m in messages[sample_doc.pk] + ) + finally: + src.chmod(0o644) + + +@pytest.mark.django_db +class TestCheckSanityArchive: + def test_checksum_without_filename(self, sample_doc: Document) -> None: + sample_doc.archive_filename = None + sample_doc.save() + messages = check_sanity() + assert messages.has_error + assert any( + "checksum, but no archive filename" in m["message"] + for m in messages[sample_doc.pk] + ) + + def test_filename_without_checksum(self, sample_doc: Document) -> None: + sample_doc.archive_checksum = None + sample_doc.save() + messages = check_sanity() + assert messages.has_error + assert any( + "checksum is missing" in m["message"] for m in messages[sample_doc.pk] + ) + + def test_missing_file(self, sample_doc: Document) -> None: + Path(sample_doc.archive_path).unlink() + messages = check_sanity() + assert messages.has_error + assert any( + "Archived version of document does not exist" in m["message"] + for m in messages[sample_doc.pk] + ) + + def test_checksum_mismatch(self, sample_doc: Document) -> None: + sample_doc.archive_checksum = 
"wronghash" + sample_doc.save() + messages = check_sanity() + assert messages.has_error + assert any( + "Checksum mismatch of archived document" in m["message"] + for m in messages[sample_doc.pk] + ) + + def test_unreadable(self, sample_doc: Document) -> None: + archive = Path(sample_doc.archive_path) + archive.chmod(0o000) + try: + messages = check_sanity() + assert messages.has_error + assert any( + "Cannot read archive" in m["message"] for m in messages[sample_doc.pk] + ) + finally: + archive.chmod(0o644) + + def test_no_archive_at_all(self, sample_doc: Document) -> None: + """Document with neither archive checksum nor filename is valid.""" + Path(sample_doc.archive_path).unlink() + sample_doc.archive_checksum = None + sample_doc.archive_filename = None + sample_doc.save() + messages = check_sanity() + assert not messages.has_error + + +@pytest.mark.django_db +class TestCheckSanityContent: + @pytest.mark.parametrize( + "content", + [ + pytest.param("", id="empty-string"), + ], ) - def test_ignore_logo(self) -> None: - self.make_test_data() - logo_dir = Path(self.dirs.media_dir, "logo") - logo_dir.mkdir(parents=True, exist_ok=True) - Path(self.dirs.media_dir, "logo", "logo.png").touch() + def test_no_content(self, sample_doc: Document, content: str) -> None: + sample_doc.content = content + sample_doc.save() messages = check_sanity() - self.assertFalse(messages.has_warning) + assert not messages.has_error + assert not messages.has_warning + assert any("no OCR data" in m["message"] for m in messages[sample_doc.pk]) - def test_ignore_ignorable_files(self) -> None: - self.make_test_data() - Path(self.dirs.media_dir, ".DS_Store").touch() - Path(self.dirs.media_dir, "desktop.ini").touch() + +@pytest.mark.django_db +class TestCheckSanityOrphans: + def test_orphaned_file( + self, + sample_doc: Document, + paperless_dirs: PaperlessDirs, + ) -> None: + (paperless_dirs.originals / "orphan.pdf").touch() messages = check_sanity() - self.assertFalse(messages.has_warning) + 
assert messages.has_warning + assert any("Orphaned file" in m["message"] for m in messages[None]) - def test_archive_filename_no_checksum(self) -> None: - doc = self.make_test_data() - doc.archive_checksum = None - doc.save() - self.assertSanityError(doc, "has an archive file, but its checksum is missing.") + @pytest.mark.usefixtures("_media_settings") + def test_ignorable_files_not_flagged( + self, + paperless_dirs: PaperlessDirs, + ) -> None: + (paperless_dirs.media / ".DS_Store").touch() + (paperless_dirs.media / "desktop.ini").touch() + messages = check_sanity() + assert not messages.has_warning - def test_archive_checksum_no_filename(self) -> None: - doc = self.make_test_data() - doc.archive_filename = None - doc.save() - self.assertSanityError( - doc, - "has an archive file checksum, but no archive filename.", - ) + +@pytest.mark.django_db +class TestCheckSanityIterWrapper: + def test_wrapper_receives_documents(self, sample_doc: Document) -> None: + seen: list[Document] = [] + + def tracking(iterable: Iterable[Document]) -> Iterable[Document]: + for item in iterable: + seen.append(item) + yield item + + check_sanity(iter_wrapper=tracking) + assert len(seen) == 1 + assert seen[0].pk == sample_doc.pk + + def test_default_works_without_wrapper(self, sample_doc: Document) -> None: + messages = check_sanity() + assert not messages.has_error + + +@pytest.mark.django_db +class TestCheckSanityTaskRecording: + @pytest.mark.parametrize( + ("expected_type", "scheduled"), + [ + pytest.param(PaperlessTask.TaskType.SCHEDULED_TASK, True, id="scheduled"), + pytest.param(PaperlessTask.TaskType.MANUAL_TASK, False, id="manual"), + ], + ) + @pytest.mark.usefixtures("_media_settings") + def test_task_type(self, expected_type: str, *, scheduled: bool) -> None: + check_sanity(scheduled=scheduled) + task = PaperlessTask.objects.latest("date_created") + assert task.task_name == PaperlessTask.TaskName.CHECK_SANITY + assert task.type == expected_type + + def test_success_status(self, 
sample_doc: Document) -> None: + check_sanity() + task = PaperlessTask.objects.latest("date_created") + assert task.status == "SUCCESS" + + def test_failure_status(self, sample_doc: Document) -> None: + Path(sample_doc.source_path).unlink() + check_sanity() + task = PaperlessTask.objects.latest("date_created") + assert task.status == "FAILURE" + assert "Check logs for details" in task.result + + +@pytest.mark.django_db +class TestCheckSanityLogMessages: + def test_logs_doc_issues( + self, + sample_doc: Document, + caplog: pytest.LogCaptureFixture, + ) -> None: + Path(sample_doc.source_path).unlink() + messages = check_sanity() + with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"): + messages.log_messages() + assert f"document #{sample_doc.pk}" in caplog.text + assert "Original of document does not exist" in caplog.text + + def test_logs_global_issues( + self, + sample_doc: Document, + paperless_dirs: PaperlessDirs, + caplog: pytest.LogCaptureFixture, + ) -> None: + (paperless_dirs.originals / "orphan.pdf").touch() + messages = check_sanity() + with caplog.at_level(logging.WARNING, logger="paperless.sanity_checker"): + messages.log_messages() + assert "Orphaned file" in caplog.text + + @pytest.mark.usefixtures("_media_settings") + def test_logs_unknown_doc_pk(self, caplog: pytest.LogCaptureFixture) -> None: + """A doc PK not in the DB logs 'Unknown' as the title.""" + messages = check_sanity() + messages.error(99999, "Ghost document") + with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"): + messages.log_messages() + assert "#99999" in caplog.text + assert "Unknown" in caplog.text diff --git a/src/documents/tests/test_tasks.py b/src/documents/tests/test_tasks.py index 4647c19ba..37f1e6fed 100644 --- a/src/documents/tests/test_tasks.py +++ b/src/documents/tests/test_tasks.py @@ -3,6 +3,7 @@ from datetime import timedelta from pathlib import Path from unittest import mock +import pytest from celery import states from django.conf 
import settings from django.test import TestCase @@ -105,55 +106,83 @@ class TestClassifier(DirectoriesMixin, FileSystemAssertsMixin, TestCase): self.assertNotEqual(mtime2, mtime3) -class TestSanityCheck(DirectoriesMixin, TestCase): - @mock.patch("documents.tasks.sanity_checker.check_sanity") - def test_sanity_check_success(self, m) -> None: - m.return_value = SanityCheckMessages() - self.assertEqual(tasks.sanity_check(), "No issues detected.") - m.assert_called_once() +@pytest.mark.django_db +class TestSanityCheck: + @pytest.fixture + def mock_check_sanity(self, mocker) -> mock.MagicMock: + return mocker.patch("documents.tasks.sanity_checker.check_sanity") - @mock.patch("documents.tasks.sanity_checker.check_sanity") - def test_sanity_check_error(self, m) -> None: - messages = SanityCheckMessages() - messages.error(None, "Some error") - m.return_value = messages - self.assertRaises(SanityCheckFailedException, tasks.sanity_check) - m.assert_called_once() + def test_sanity_check_success(self, mock_check_sanity: mock.MagicMock) -> None: + mock_check_sanity.return_value = SanityCheckMessages() + assert tasks.sanity_check() == "No issues detected." 
+ mock_check_sanity.assert_called_once() - @mock.patch("documents.tasks.sanity_checker.check_sanity") - def test_sanity_check_error_no_raise(self, m) -> None: + def test_sanity_check_error_raises( + self, + mock_check_sanity: mock.MagicMock, + sample_doc: Document, + ) -> None: messages = SanityCheckMessages() - messages.error(None, "Some error") - m.return_value = messages - # No exception should be raised + messages.error(sample_doc.pk, "some error") + mock_check_sanity.return_value = messages + with pytest.raises(SanityCheckFailedException): + tasks.sanity_check() + mock_check_sanity.assert_called_once() + + def test_sanity_check_error_no_raise( + self, + mock_check_sanity: mock.MagicMock, + sample_doc: Document, + ) -> None: + messages = SanityCheckMessages() + messages.error(sample_doc.pk, "some error") + mock_check_sanity.return_value = messages result = tasks.sanity_check(raise_on_error=False) - self.assertEqual( - result, - "Sanity check exited with errors. See log.", - ) - m.assert_called_once() + assert "1 document(s) with errors" in result + assert "Check logs for details." in result + mock_check_sanity.assert_called_once() - @mock.patch("documents.tasks.sanity_checker.check_sanity") - def test_sanity_check_warning(self, m) -> None: + def test_sanity_check_warning_only( + self, + mock_check_sanity: mock.MagicMock, + ) -> None: messages = SanityCheckMessages() - messages.warning(None, "Some warning") - m.return_value = messages - self.assertEqual( - tasks.sanity_check(), - "Sanity check exited with warnings. See log.", - ) - m.assert_called_once() + messages.warning(None, "extra file") + mock_check_sanity.return_value = messages + result = tasks.sanity_check() + assert result == "1 global warning(s) found." 
+ mock_check_sanity.assert_called_once() - @mock.patch("documents.tasks.sanity_checker.check_sanity") - def test_sanity_check_info(self, m) -> None: + def test_sanity_check_info_only( + self, + mock_check_sanity: mock.MagicMock, + sample_doc: Document, + ) -> None: messages = SanityCheckMessages() - messages.info(None, "Some info") - m.return_value = messages - self.assertEqual( - tasks.sanity_check(), - "Sanity check exited with infos. See log.", - ) - m.assert_called_once() + messages.info(sample_doc.pk, "some info") + mock_check_sanity.return_value = messages + result = tasks.sanity_check() + assert result == "1 document(s) with infos found." + mock_check_sanity.assert_called_once() + + def test_sanity_check_errors_warnings_and_infos( + self, + mock_check_sanity: mock.MagicMock, + sample_doc: Document, + ) -> None: + messages = SanityCheckMessages() + messages.error(sample_doc.pk, "broken") + messages.warning(sample_doc.pk, "odd") + messages.info(sample_doc.pk, "fyi") + messages.warning(None, "extra file") + mock_check_sanity.return_value = messages + result = tasks.sanity_check(raise_on_error=False) + assert "1 document(s) with errors" in result + assert "1 document(s) with warnings" in result + assert "1 document(s) with infos" in result + assert "1 global warning(s)" in result + assert "Check logs for details." in result + mock_check_sanity.assert_called_once() class TestBulkUpdate(DirectoriesMixin, TestCase):