Compare commits

..

4 Commits

21 changed files with 1137 additions and 1492 deletions

View File

@@ -30,7 +30,7 @@ RUN set -eux \
# Purpose: Installs s6-overlay and rootfs
# Comments:
# - Don't leave anything extra in here either
FROM ghcr.io/astral-sh/uv:0.10.7-python3.12-trixie-slim AS s6-overlay-base
FROM ghcr.io/astral-sh/uv:0.10.5-python3.12-trixie-slim AS s6-overlay-base
WORKDIR /usr/src/s6

View File

@@ -262,6 +262,10 @@ your files differently, you can do that by adjusting the
or using [storage paths (see below)](#storage-paths). Paperless adds the
correct file extension e.g. `.pdf`, `.jpg` automatically.
When a document has file versions, each version uses the same naming rules and
storage path resolution as any other document file, with an added version suffix
such as `_v1`, `_v2`, etc.
This variable allows you to configure the filename (folders are allowed)
using placeholders. For example, configuring this to
@@ -353,6 +357,8 @@ If paperless detects that two documents share the same filename,
paperless will automatically append `_01`, `_02`, etc to the filename.
This happens if all the placeholders in a filename evaluate to the same
value.
For versioned files, this counter is appended after the version suffix
(for example `statement_v2_01.pdf`).
If there are any errors in the placeholders included in `PAPERLESS_FILENAME_FORMAT`,
paperless will fall back to using the default naming scheme instead.

View File

@@ -95,6 +95,7 @@ Think of versions as **file history** for a document.
- Versions track the underlying file and extracted text content (OCR/text).
- Metadata such as tags, correspondent, document type, storage path and custom fields stay on the "root" document.
- Version files follow normal filename formatting (including storage paths) and add a `_vN` suffix (for example `_v1`, `_v2`).
- By default, search and document content use the latest version.
- In document detail, selecting a version switches the preview, file metadata and content (and download etc buttons) to that version.
- Deleting a non-root version keeps metadata and falls back to the latest remaining version.

View File

@@ -111,7 +111,6 @@ docs = [
testing = [
"daphne",
"factory-boy~=3.3.1",
"faker~=40.5.1",
"imagehash",
"pytest~=9.0.0",
"pytest-cov~=7.0.0",

View File

@@ -11,6 +11,7 @@ import magic
from django.conf import settings
from django.contrib.auth.models import User
from django.db import transaction
from django.db.models import Max
from django.db.models import Q
from django.utils import timezone
from filelock import FileLock
@@ -123,22 +124,6 @@ class ConsumerPluginMixin:
self.filename = self.metadata.filename or self.input_doc.original_file.name
if input_doc.root_document_id:
self.log.debug(
f"Document root document id: {input_doc.root_document_id}",
)
root_document = Document.objects.get(pk=input_doc.root_document_id)
version_index = Document.objects.filter(root_document=root_document).count()
filename_path = Path(self.filename)
if filename_path.suffix:
self.filename = str(
filename_path.with_name(
f"{filename_path.stem}_v{version_index}{filename_path.suffix}",
),
)
else:
self.filename = f"{self.filename}_v{version_index}"
def _send_progress(
self,
current_progress: int,
@@ -184,7 +169,7 @@ class ConsumerPlugin(
):
logging_name = LOGGING_NAME
def _clone_root_into_version(
def _create_version_from_root(
self,
root_doc: Document,
*,
@@ -193,30 +178,38 @@ class ConsumerPlugin(
mime_type: str,
) -> Document:
self.log.debug("Saving record for updated version to database")
version_doc = Document.objects.get(pk=root_doc.pk)
setattr(version_doc, "pk", None)
version_doc.root_document = root_doc
root_doc_frozen = Document.objects.select_for_update().get(pk=root_doc.pk)
next_version_index = (
Document.global_objects.filter(
root_document_id=root_doc_frozen.pk,
).aggregate(
max_index=Max("version_index"),
)["max_index"]
or 0
)
file_for_checksum = (
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy
)
version_doc.checksum = hashlib.md5(
file_for_checksum.read_bytes(),
).hexdigest()
version_doc.content = text or ""
version_doc.page_count = page_count
version_doc.mime_type = mime_type
version_doc.original_filename = self.filename
version_doc.storage_path = root_doc.storage_path
# Clear unique file path fields so they can be generated uniquely later
version_doc.filename = None
version_doc.archive_filename = None
version_doc.archive_checksum = None
version_doc = Document(
root_document=root_doc_frozen,
version_index=next_version_index + 1,
checksum=hashlib.md5(
file_for_checksum.read_bytes(),
).hexdigest(),
content=text or "",
page_count=page_count,
mime_type=mime_type,
original_filename=self.filename,
owner_id=root_doc_frozen.owner_id,
created=root_doc_frozen.created,
title=root_doc_frozen.title,
added=timezone.now(),
modified=timezone.now(),
)
if self.metadata.version_label is not None:
version_doc.version_label = self.metadata.version_label
version_doc.added = timezone.now()
version_doc.modified = timezone.now()
return version_doc
def run_pre_consume_script(self) -> None:
@@ -542,7 +535,7 @@ class ConsumerPlugin(
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
)
original_document = self._clone_root_into_version(
original_document = self._create_version_from_root(
root_doc,
text=text,
page_count=page_count,

View File

@@ -128,11 +128,18 @@ def generate_filename(
counter=0,
archive_filename=False,
) -> Path:
# version docs use the root document for formatting, just with a suffix
context_doc = doc if doc.root_document_id is None else doc.root_document
version_suffix = (
f"_v{doc.version_index}"
if doc.root_document_id is not None and doc.version_index is not None
else ""
)
base_path: Path | None = None
# Determine the source of the format string
if doc.storage_path is not None:
filename_format = doc.storage_path.path
if context_doc.storage_path is not None:
filename_format = context_doc.storage_path.path
elif settings.FILENAME_FORMAT is not None:
# Maybe convert old to new style
filename_format = convert_format_str_to_template_format(
@@ -143,7 +150,7 @@ def generate_filename(
# If we have one, render it
if filename_format is not None:
rendered_path: str | None = format_filename(doc, filename_format)
rendered_path: str | None = format_filename(context_doc, filename_format)
if rendered_path:
base_path = Path(rendered_path)
@@ -157,7 +164,7 @@ def generate_filename(
base_filename = base_path.name
# Build the final filename with counter and filetype
final_filename = f"{base_filename}{counter_str}{filetype_str}"
final_filename = f"{base_filename}{version_suffix}{counter_str}{filetype_str}"
# If we have a directory component, include it
if str(directory) != ".":
@@ -166,7 +173,9 @@ def generate_filename(
full_path = Path(final_filename)
else:
# No template, use document ID
final_filename = f"{doc.pk:07}{counter_str}{filetype_str}"
final_filename = (
f"{context_doc.pk:07}{version_suffix}{counter_str}{filetype_str}"
)
full_path = Path(final_filename)
return full_path

View File

@@ -6,14 +6,11 @@ Provides automatic progress bar and multiprocessing support with minimal boilerp
from __future__ import annotations
import logging
import os
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Sized
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import Any
@@ -25,11 +22,7 @@ from django import db
from django.core.management import CommandError
from django.db.models import QuerySet
from django_rich.management import RichCommand
from rich import box
from rich.console import Console
from rich.console import Group
from rich.console import RenderableType
from rich.live import Live
from rich.progress import BarColumn
from rich.progress import MofNCompleteColumn
from rich.progress import Progress
@@ -37,11 +30,11 @@ from rich.progress import SpinnerColumn
from rich.progress import TextColumn
from rich.progress import TimeElapsedColumn
from rich.progress import TimeRemainingColumn
from rich.table import Table
from rich.text import Text
if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Generator
from collections.abc import Iterable
from collections.abc import Sequence
from django.core.management import CommandParser
@@ -50,78 +43,6 @@ T = TypeVar("T")
R = TypeVar("R")
@dataclass(slots=True, frozen=True)
class _BufferedRecord:
level: int
name: str
message: str
class BufferingLogHandler(logging.Handler):
"""Captures log records during a command run for deferred rendering.
Attach to a logger before a long operation and call ``render()``
afterwards to emit the buffered records via Rich, optionally filtered
by minimum level.
"""
def __init__(self) -> None:
super().__init__()
self._records: list[_BufferedRecord] = []
def emit(self, record: logging.LogRecord) -> None:
self._records.append(
_BufferedRecord(
level=record.levelno,
name=record.name,
message=self.format(record),
),
)
def render(
self,
console: Console,
*,
min_level: int = logging.DEBUG,
title: str = "Log Output",
) -> None:
records = [r for r in self._records if r.level >= min_level]
if not records:
return
table = Table(
title=title,
show_header=True,
header_style="bold",
show_lines=False,
box=box.SIMPLE,
)
table.add_column("Level", style="bold", width=8)
table.add_column("Logger", style="dim")
table.add_column("Message", no_wrap=False)
_level_styles: dict[int, str] = {
logging.DEBUG: "dim",
logging.INFO: "cyan",
logging.WARNING: "yellow",
logging.ERROR: "red",
logging.CRITICAL: "bold red",
}
for record in records:
style = _level_styles.get(record.level, "")
table.add_row(
Text(logging.getLevelName(record.level), style=style),
record.name,
record.message,
)
console.print(table)
def clear(self) -> None:
self._records.clear()
@dataclass(frozen=True, slots=True)
class ProcessResult(Generic[T, R]):
"""
@@ -170,23 +91,6 @@ class PaperlessCommand(RichCommand):
for result in self.process_parallel(process_doc, ids):
if result.error:
self.console.print(f"[red]Failed: {result.error}[/red]")
class Command(PaperlessCommand):
help = "Import documents with live stats"
def handle(self, *args, **options):
stats = ImportStats()
def render_stats() -> Table:
... # build Rich Table from stats
for item in self.track_with_stats(
items,
description="Importing...",
stats_renderer=render_stats,
):
result = import_item(item)
stats.imported += 1
"""
supports_progress_bar: ClassVar[bool] = True
@@ -224,11 +128,13 @@ class PaperlessCommand(RichCommand):
This is called by Django's command infrastructure after argument parsing
but before handle(). We use it to set instance attributes from options.
"""
# Set progress bar state
if self.supports_progress_bar:
self.no_progress_bar = options.get("no_progress_bar", False)
else:
self.no_progress_bar = True
# Set multiprocessing state
if self.supports_multiprocessing:
self.process_count = options.get("processes", 1)
if self.process_count < 1:
@@ -238,69 +144,9 @@ class PaperlessCommand(RichCommand):
return super().execute(*args, **options)
@contextmanager
def buffered_logging(
self,
*logger_names: str,
level: int = logging.DEBUG,
) -> Generator[BufferingLogHandler, None, None]:
"""Context manager that captures log output from named loggers.
Installs a ``BufferingLogHandler`` on each named logger for the
duration of the block, suppressing propagation to avoid interleaving
with the Rich live display. The handler is removed on exit regardless
of whether an exception occurred.
Usage::
with self.buffered_logging("paperless", "documents") as log_buf:
# ... run progress loop ...
if options["verbose"]:
log_buf.render(self.console)
"""
handler = BufferingLogHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
loggers: list[logging.Logger] = []
original_propagate: dict[str, bool] = {}
for name in logger_names:
log = logging.getLogger(name)
log.addHandler(handler)
original_propagate[name] = log.propagate
log.propagate = False
loggers.append(log)
try:
yield handler
finally:
for log in loggers:
log.removeHandler(handler)
log.propagate = original_propagate[log.name]
@staticmethod
def _progress_columns() -> tuple[Any, ...]:
"""
Return the standard set of progress bar columns.
Extracted so both _create_progress (standalone) and track_with_stats
(inside Live) use identical column configuration without duplication.
"""
return (
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
)
def _create_progress(self, description: str) -> Progress:
"""
Create a standalone Progress instance with its own stderr Console.
Use this for track(). For track_with_stats(), Progress is created
directly inside a Live context instead.
Create a configured Progress instance.
Progress output is directed to stderr to match the convention that
progress bars are transient UI feedback, not command output. This
@@ -315,7 +161,12 @@ class PaperlessCommand(RichCommand):
A Progress instance configured with appropriate columns.
"""
return Progress(
*self._progress_columns(),
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
console=Console(stderr=True),
transient=False,
)
@@ -371,6 +222,7 @@ class PaperlessCommand(RichCommand):
yield from iterable
return
# Attempt to determine total if not provided
if total is None:
total = self._get_iterable_length(iterable)
@@ -380,87 +232,6 @@ class PaperlessCommand(RichCommand):
yield item
progress.advance(task_id)
def track_with_stats(
self,
iterable: Iterable[T],
*,
description: str = "Processing...",
stats_renderer: Callable[[], RenderableType],
total: int | None = None,
) -> Generator[T, None, None]:
"""
Iterate over items with a progress bar and a live-updating stats display.
The progress bar and stats renderable are combined in a single Live
context, so the stats panel re-renders in place below the progress bar
after each item is processed.
Respects --no-progress-bar flag. When disabled, yields items without
any display (stats are still updated by the caller's loop body, so
they will be accurate for any post-loop summary the caller prints).
Args:
iterable: The items to iterate over.
description: Text to display alongside the progress bar.
stats_renderer: Zero-argument callable that returns a Rich
renderable. Called after each item to refresh the display.
The caller typically closes over a mutable dataclass and
rebuilds a Table from it on each call.
total: Total number of items. If None, attempts to determine
automatically via .count() (for querysets) or len().
Yields:
Items from the iterable.
Example:
@dataclass
class Stats:
processed: int = 0
failed: int = 0
stats = Stats()
def render_stats() -> Table:
table = Table(box=None)
table.add_column("Processed")
table.add_column("Failed")
table.add_row(str(stats.processed), str(stats.failed))
return table
for item in self.track_with_stats(
items,
description="Importing...",
stats_renderer=render_stats,
):
try:
import_item(item)
stats.processed += 1
except Exception:
stats.failed += 1
"""
if self.no_progress_bar:
yield from iterable
return
if total is None:
total = self._get_iterable_length(iterable)
stderr_console = Console(stderr=True)
# Progress is created without its own console so Live controls rendering.
progress = Progress(*self._progress_columns())
task_id = progress.add_task(description, total=total)
with Live(
Group(progress, stats_renderer()),
console=stderr_console,
refresh_per_second=4,
) as live:
for item in iterable:
yield item
progress.advance(task_id)
live.update(Group(progress, stats_renderer()))
def process_parallel(
self,
fn: Callable[[T], R],
@@ -498,7 +269,7 @@ class PaperlessCommand(RichCommand):
total = len(items)
if self.process_count == 1:
# Sequential execution in main process - critical for testing, so we don't fork in fork, etc
# Sequential execution in main process - critical for testing
yield from self._process_sequential(fn, items, description, total)
else:
# Parallel execution with ProcessPoolExecutor
@@ -527,7 +298,6 @@ class PaperlessCommand(RichCommand):
total: int,
) -> Generator[ProcessResult[T, R], None, None]:
"""Process items in parallel using ProcessPoolExecutor."""
# Close database connections before forking - required for PostgreSQL
db.connections.close_all()

View File

@@ -1,12 +1,4 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from dataclasses import field
from typing import TYPE_CHECKING
from rich.table import Table
from rich.text import Text
from documents.classifier import load_classifier
from documents.management.commands.base import PaperlessCommand
@@ -16,162 +8,9 @@ from documents.signals.handlers import set_document_type
from documents.signals.handlers import set_storage_path
from documents.signals.handlers import set_tags
if TYPE_CHECKING:
from rich.console import RenderableType
from documents.models import Correspondent
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
logger = logging.getLogger("paperless.management.retagger")
@dataclass(slots=True)
class RetaggerStats:
"""Cumulative counters updated as the retagger processes documents.
Mutable by design -- fields are incremented in the processing loop.
slots=True reduces per-instance memory overhead and speeds attribute access.
"""
correspondents: int = 0
document_types: int = 0
tags_added: int = 0
tags_removed: int = 0
storage_paths: int = 0
documents_processed: int = 0
@dataclass(slots=True)
class DocumentSuggestion:
"""Buffered classifier suggestions for a single document (suggest mode only).
Mutable by design -- fields are assigned incrementally as each setter runs.
"""
document: Document
correspondent: Correspondent | None = None
document_type: DocumentType | None = None
tags_to_add: frozenset[Tag] = field(default_factory=frozenset)
tags_to_remove: frozenset[Tag] = field(default_factory=frozenset)
storage_path: StoragePath | None = None
@property
def has_suggestions(self) -> bool:
return bool(
self.correspondent is not None
or self.document_type is not None
or self.tags_to_add
or self.tags_to_remove
or self.storage_path is not None,
)
def _build_stats_table(stats: RetaggerStats, *, suggest: bool) -> Table:
"""
Build the live-updating stats table shown below the progress bar.
In suggest mode the labels read "would set / would add" to make clear
that nothing has been written to the database.
"""
table = Table(box=None, padding=(0, 2), show_header=True, header_style="bold")
table.add_column("Documents")
table.add_column("Correspondents")
table.add_column("Doc Types")
table.add_column("Tags (+)")
table.add_column("Tags (-)")
table.add_column("Storage Paths")
verb = "would set" if suggest else "set"
table.add_row(
str(stats.documents_processed),
f"{stats.correspondents} {verb}",
f"{stats.document_types} {verb}",
f"+{stats.tags_added}",
f"-{stats.tags_removed}",
f"{stats.storage_paths} {verb}",
)
return table
def _build_suggestion_table(
suggestions: list[DocumentSuggestion],
base_url: str | None,
) -> Table:
"""
Build the final suggestion table printed after the progress bar completes.
Only documents with at least one suggestion are included.
"""
table = Table(
title="Suggested Changes",
show_header=True,
header_style="bold cyan",
show_lines=True,
)
table.add_column("Document", style="bold", no_wrap=False, min_width=20)
table.add_column("Correspondent")
table.add_column("Doc Type")
table.add_column("Tags")
table.add_column("Storage Path")
for suggestion in suggestions:
if not suggestion.has_suggestions:
continue
doc = suggestion.document
if base_url:
doc_cell = Text()
doc_cell.append(str(doc))
doc_cell.append(f"\n{base_url}/documents/{doc.pk}", style="dim")
else:
doc_cell = Text(f"{doc} [{doc.pk}]")
tag_parts: list[str] = []
for tag in sorted(suggestion.tags_to_add, key=lambda t: t.name):
tag_parts.append(f"[green]+{tag.name}[/green]")
for tag in sorted(suggestion.tags_to_remove, key=lambda t: t.name):
tag_parts.append(f"[red]-{tag.name}[/red]")
tag_cell = Text.from_markup(", ".join(tag_parts)) if tag_parts else Text("-")
table.add_row(
doc_cell,
str(suggestion.correspondent) if suggestion.correspondent else "-",
str(suggestion.document_type) if suggestion.document_type else "-",
tag_cell,
str(suggestion.storage_path) if suggestion.storage_path else "-",
)
return table
def _build_summary_table(stats: RetaggerStats) -> Table:
"""Build the final applied-changes summary table."""
table = Table(
title="Retagger Summary",
show_header=True,
header_style="bold cyan",
)
table.add_column("Metric", style="bold")
table.add_column("Count", justify="right")
table.add_row("Documents processed", str(stats.documents_processed))
table.add_row("Correspondents set", str(stats.correspondents))
table.add_row("Document types set", str(stats.document_types))
table.add_row("Tags added", str(stats.tags_added))
table.add_row("Tags removed", str(stats.tags_removed))
table.add_row("Storage paths set", str(stats.storage_paths))
return table
class Command(PaperlessCommand):
help = (
"Using the current classification model, assigns correspondents, tags "
@@ -180,7 +19,7 @@ class Command(PaperlessCommand):
"modified) after their initial import."
)
def add_arguments(self, parser) -> None:
def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument("-c", "--correspondent", default=False, action="store_true")
parser.add_argument("-T", "--tags", default=False, action="store_true")
@@ -192,9 +31,9 @@ class Command(PaperlessCommand):
default=False,
action="store_true",
help=(
"By default this command will not try to assign a correspondent "
"if more than one matches the document. Use this flag to pick "
"the first match instead."
"By default this command won't try to assign a correspondent "
"if more than one matches the document. Use this flag if "
"you'd rather it just pick the first one it finds."
),
)
parser.add_argument(
@@ -203,140 +42,91 @@ class Command(PaperlessCommand):
default=False,
action="store_true",
help=(
"Overwrite any previously set correspondent, document type, and "
"remove tags that no longer match due to changed rules."
"If set, the document retagger will overwrite any previously "
"set correspondent, document and remove correspondents, types "
"and tags that do not match anymore due to changed rules."
),
)
parser.add_argument(
"--suggest",
default=False,
action="store_true",
help="Show what would be changed without applying anything.",
help="Return the suggestion, don't change anything.",
)
parser.add_argument(
"--base-url",
help="Base URL used to build document links in suggest output.",
help="The base URL to use to build the link to the documents.",
)
parser.add_argument(
"--id-range",
help="Restrict retagging to documents within this ID range (inclusive).",
help="A range of document ids on which the retagging should be applied.",
nargs=2,
type=int,
)
def handle(self, *args, **options) -> None:
suggest: bool = options["suggest"]
overwrite: bool = options["overwrite"]
use_first: bool = options["use_first"]
base_url: str | None = options["base_url"]
do_correspondent: bool = options["correspondent"]
do_document_type: bool = options["document_type"]
do_tags: bool = options["tags"]
do_storage_path: bool = options["storage_path"]
if not any([do_correspondent, do_document_type, do_tags, do_storage_path]):
self.console.print(
"[yellow]No classifier targets specified. "
"Use -c, -T, -t, or -s to select what to retag.[/yellow]",
)
return
def handle(self, *args, **options):
if options["inbox_only"]:
queryset = Document.objects.filter(tags__is_inbox_tag=True)
else:
queryset = Document.objects.all()
if options["id_range"]:
lo, hi = options["id_range"]
queryset = queryset.filter(id__range=(lo, hi))
queryset = queryset.filter(
id__range=(options["id_range"][0], options["id_range"][1]),
)
documents = queryset.distinct()
classifier = load_classifier()
stats = RetaggerStats()
suggestions: list[DocumentSuggestion] = []
for document in self.track(documents, description="Retagging..."):
if options["correspondent"]:
set_correspondent(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
def render_stats() -> RenderableType:
return _build_stats_table(stats, suggest=suggest)
if options["document_type"]:
set_document_type(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
with self.buffered_logging(
"paperless",
"paperless.handlers",
"documents",
) as log_buf:
for document in self.track_with_stats(
documents,
description="Retagging...",
stats_renderer=render_stats,
):
suggestion = DocumentSuggestion(document=document)
if options["tags"]:
set_tags(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
if do_correspondent:
correspondent = set_correspondent(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if correspondent is not None:
stats.correspondents += 1
suggestion.correspondent = correspondent
if do_document_type:
document_type = set_document_type(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if document_type is not None:
stats.document_types += 1
suggestion.document_type = document_type
if do_tags:
tags_to_add, tags_to_remove = set_tags(
None,
document,
classifier=classifier,
replace=overwrite,
dry_run=suggest,
)
stats.tags_added += len(tags_to_add)
stats.tags_removed += len(tags_to_remove)
suggestion.tags_to_add = frozenset(tags_to_add)
suggestion.tags_to_remove = frozenset(tags_to_remove)
if do_storage_path:
storage_path = set_storage_path(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if storage_path is not None:
stats.storage_paths += 1
suggestion.storage_path = storage_path
stats.documents_processed += 1
if suggest:
suggestions.append(suggestion)
# Post-loop output
if suggest:
visible = [s for s in suggestions if s.has_suggestions]
if visible:
self.console.print(_build_suggestion_table(visible, base_url))
else:
self.console.print("[green]No changes suggested.[/green]")
else:
self.console.print(_build_summary_table(stats))
log_buf.render(self.console, min_level=logging.INFO, title="Retagger Log")
if options["storage_path"]:
set_storage_path(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)

View File

@@ -0,0 +1,37 @@
# Generated by Django 5.2.11 on 2026-03-02 17:48
from django.conf import settings
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0013_document_root_document"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name="document",
name="version_index",
field=models.PositiveIntegerField(
blank=True,
db_index=True,
help_text="Index of this version within the root document.",
null=True,
verbose_name="version index",
),
),
migrations.AddConstraint(
model_name="document",
constraint=models.UniqueConstraint(
condition=models.Q(
("root_document__isnull", False),
("version_index__isnull", False),
),
fields=("root_document", "version_index"),
name="documents_document_root_version_index_uniq",
),
),
]

View File

@@ -75,7 +75,7 @@ class MatchingModel(ModelWithOwner):
is_insensitive = models.BooleanField(_("is insensitive"), default=True)
class Meta(ModelWithOwner.Meta):
class Meta:
abstract = True
ordering = ("name",)
constraints = [
@@ -317,6 +317,14 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
verbose_name=_("root document for this version"),
)
version_index = models.PositiveIntegerField(
_("version index"),
blank=True,
null=True,
db_index=True,
help_text=_("Index of this version within the root document."),
)
version_label = models.CharField(
_("version label"),
max_length=64,
@@ -329,6 +337,16 @@ class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-
ordering = ("-created",)
verbose_name = _("document")
verbose_name_plural = _("documents")
constraints = [
models.UniqueConstraint(
fields=["root_document", "version_index"],
condition=models.Q(
root_document__isnull=False,
version_index__isnull=False,
),
name="documents_document_root_version_index_uniq",
),
]
def __str__(self) -> str:
created = self.created.isoformat()

View File

@@ -4,7 +4,6 @@ import logging
import shutil
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from celery import shared_task
from celery import states
@@ -33,14 +32,12 @@ from documents.file_handling import create_source_path_directory
from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_filename
from documents.file_handling import generate_unique_filename
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import PaperlessTask
from documents.models import SavedView
from documents.models import StoragePath
from documents.models import Tag
from documents.models import UiSettings
from documents.models import Workflow
@@ -84,41 +81,47 @@ def add_inbox_tags(sender, document: Document, logging_group=None, **kwargs) ->
document.add_nested_tags(inbox_tags)
def _suggestion_printer(
stdout,
style_func,
suggestion_type: str,
document: Document,
selected: MatchingModel,
base_url: str | None = None,
) -> None:
"""
Smaller helper to reduce duplication when just outputting suggestions to the console
"""
doc_str = str(document)
if base_url is not None:
stdout.write(style_func.SUCCESS(doc_str))
stdout.write(style_func.SUCCESS(f"{base_url}/documents/{document.pk}"))
else:
stdout.write(style_func.SUCCESS(f"{doc_str} [{document.pk}]"))
stdout.write(f"Suggest {suggestion_type}: {selected}")
def set_correspondent(
sender: object,
sender,
document: Document,
*,
logging_group: object = None,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace: bool = False,
use_first: bool = True,
dry_run: bool = False,
**kwargs: Any,
) -> Correspondent | None:
"""
Assign a correspondent to a document based on classifier results.
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing correspondent assignment.
use_first: If True, pick the first match when multiple correspondents
match. If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The correspondent that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
replace=False,
use_first=True,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
if document.correspondent and not replace:
return None
return
potential_correspondents = matching.match_correspondents(document, classifier)
potential_count = len(potential_correspondents)
selected = potential_correspondents[0] if potential_correspondents else None
if potential_count > 1:
if use_first:
logger.debug(
@@ -132,53 +135,49 @@ def set_correspondent(
f"not assigning any correspondent",
extra={"group": logging_group},
)
return None
return
if (selected or replace) and not dry_run:
logger.info(
f"Assigning correspondent {selected} to {document}",
extra={"group": logging_group},
)
document.correspondent = selected
document.save(update_fields=("correspondent",))
if selected or replace:
if suggest:
_suggestion_printer(
stdout,
style_func,
"correspondent",
document,
selected,
base_url,
)
else:
logger.info(
f"Assigning correspondent {selected} to {document}",
extra={"group": logging_group},
)
return selected
document.correspondent = selected
document.save(update_fields=("correspondent",))
def set_document_type(
sender: object,
sender,
document: Document,
*,
logging_group: object = None,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace: bool = False,
use_first: bool = True,
dry_run: bool = False,
**kwargs: Any,
) -> DocumentType | None:
"""
Assign a document type to a document based on classifier results.
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing document type assignment.
use_first: If True, pick the first match when multiple types match.
If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The document type that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
replace=False,
use_first=True,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
if document.document_type and not replace:
return None
return
potential_document_types = matching.match_document_types(document, classifier)
potential_count = len(potential_document_types)
selected = potential_document_types[0] if potential_document_types else None
potential_document_type = matching.match_document_types(document, classifier)
potential_count = len(potential_document_type)
selected = potential_document_type[0] if potential_document_type else None
if potential_count > 1:
if use_first:
@@ -193,64 +192,42 @@ def set_document_type(
f"not assigning any document type",
extra={"group": logging_group},
)
return None
return
if (selected or replace) and not dry_run:
logger.info(
f"Assigning document type {selected} to {document}",
extra={"group": logging_group},
)
document.document_type = selected
document.save(update_fields=("document_type",))
if selected or replace:
if suggest:
_suggestion_printer(
stdout,
style_func,
"document type",
document,
selected,
base_url,
)
else:
logger.info(
f"Assigning document type {selected} to {document}",
extra={"group": logging_group},
)
return selected
document.document_type = selected
document.save(update_fields=("document_type",))
def set_tags(
sender: object,
sender,
document: Document,
*,
logging_group: object = None,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace: bool = False,
dry_run: bool = False,
**kwargs: Any,
) -> tuple[set[Tag], set[Tag]]:
"""
Assign tags to a document based on classifier results.
When replace=True, existing auto-matched and rule-matched tags are removed
before applying the new set (inbox tags and manually-added tags are preserved).
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, remove existing classifier-managed tags before applying
new ones. Inbox tags and manually-added tags are always preserved.
dry_run: If True, compute what would change without saving anything.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
A two-tuple of (tags_added, tags_removed). In non-replace mode,
tags_removed is always an empty set. In dry_run mode, neither set
is applied to the database.
"""
# Compute which tags would be removed under replace mode.
# The filter mirrors the .delete() call below: keep inbox tags and
# manually-added tags (match="" and not auto-matched).
replace=False,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
if replace:
tags_to_remove: set[Tag] = set(
document.tags.exclude(
is_inbox_tag=True,
).exclude(
Q(match="") & ~Q(matching_algorithm=Tag.MATCH_AUTO),
),
)
else:
tags_to_remove = set()
if replace and not dry_run:
Document.tags.through.objects.filter(document=document).exclude(
Q(tag__is_inbox_tag=True),
).exclude(
@@ -258,53 +235,65 @@ def set_tags(
).delete()
current_tags = set(document.tags.all())
matched_tags = matching.match_tags(document, classifier)
tags_to_add = set(matched_tags) - current_tags
if tags_to_add and not dry_run:
matched_tags = matching.match_tags(document, classifier)
relevant_tags = set(matched_tags) - current_tags
if suggest:
extra_tags = current_tags - set(matched_tags)
extra_tags = [
t for t in extra_tags if t.matching_algorithm == MatchingModel.MATCH_AUTO
]
if not relevant_tags and not extra_tags:
return
doc_str = style_func.SUCCESS(str(document))
if base_url:
stdout.write(doc_str)
stdout.write(f"{base_url}/documents/{document.pk}")
else:
stdout.write(doc_str + style_func.SUCCESS(f" [{document.pk}]"))
if relevant_tags:
stdout.write("Suggest tags: " + ", ".join([t.name for t in relevant_tags]))
if extra_tags:
stdout.write("Extra tags: " + ", ".join([t.name for t in extra_tags]))
else:
if not relevant_tags:
return
message = 'Tagging "{}" with "{}"'
logger.info(
f'Tagging "{document}" with "{", ".join(t.name for t in tags_to_add)}"',
message.format(document, ", ".join([t.name for t in relevant_tags])),
extra={"group": logging_group},
)
document.add_nested_tags(tags_to_add)
return tags_to_add, tags_to_remove
document.add_nested_tags(relevant_tags)
def set_storage_path(
sender: object,
sender,
document: Document,
*,
logging_group: object = None,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace: bool = False,
use_first: bool = True,
dry_run: bool = False,
**kwargs: Any,
) -> StoragePath | None:
"""
Assign a storage path to a document based on classifier results.
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing storage path assignment.
use_first: If True, pick the first match when multiple paths match.
If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The storage path that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
replace=False,
use_first=True,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
if document.storage_path and not replace:
return None
return
potential_storage_paths = matching.match_storage_paths(document, classifier)
potential_count = len(potential_storage_paths)
selected = potential_storage_paths[0] if potential_storage_paths else None
potential_storage_path = matching.match_storage_paths(
document,
classifier,
)
potential_count = len(potential_storage_path)
selected = potential_storage_path[0] if potential_storage_path else None
if potential_count > 1:
if use_first:
@@ -319,17 +308,26 @@ def set_storage_path(
f"not assigning any storage directory",
extra={"group": logging_group},
)
return None
return
if (selected or replace) and not dry_run:
logger.info(
f"Assigning storage path {selected} to {document}",
extra={"group": logging_group},
)
document.storage_path = selected
document.save(update_fields=("storage_path",))
if selected or replace:
if suggest:
_suggestion_printer(
stdout,
style_func,
"storage directory",
document,
selected,
base_url,
)
else:
logger.info(
f"Assigning storage path {selected} to {document}",
extra={"group": logging_group},
)
return selected
document.storage_path = selected
document.save(update_fields=("storage_path",))
# see empty_trash in documents/tasks.py for signal handling
@@ -598,6 +596,16 @@ def update_filename_and_move_files(
root=settings.ARCHIVE_DIR,
)
# Keep version files in sync with root
if instance.root_document_id is None:
for version_doc in Document.objects.filter(root_document_id=instance.pk).only(
"pk",
):
update_filename_and_move_files(
Document,
version_doc,
)
@shared_task
def process_cf_select_update(custom_field: CustomField) -> None:

View File

@@ -114,14 +114,3 @@ def authenticated_rest_api_client(rest_api_client: APIClient):
user = UserModel.objects.create_user(username="testuser", password="password")
rest_api_client.force_authenticate(user=user)
yield rest_api_client
@pytest.fixture(scope="session", autouse=True)
def faker_session_locale():
"""Set Faker locale for reproducibility."""
return "en_US"
@pytest.fixture(scope="session", autouse=True)
def faker_seed():
return 12345

View File

@@ -1,67 +1,17 @@
"""
Factory-boy factories for documents app models.
"""
from __future__ import annotations
import factory
from factory import Faker
from factory.django import DjangoModelFactory
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
class CorrespondentFactory(DjangoModelFactory):
class Meta:
model = Correspondent
name = factory.Sequence(lambda n: f"{factory.Faker('company')} {n}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class DocumentTypeFactory(DjangoModelFactory):
class Meta:
model = DocumentType
name = factory.Sequence(lambda n: f"{factory.Faker('bs')} {n}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class TagFactory(DjangoModelFactory):
class Meta:
model = Tag
name = factory.Sequence(lambda n: f"{factory.Faker('word')} {n}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
is_inbox_tag = False
class StoragePathFactory(DjangoModelFactory):
class Meta:
model = StoragePath
name = factory.Sequence(
lambda n: f"{factory.Faker('file_path', depth=2, extension='')} {n}",
)
path = factory.LazyAttribute(lambda o: f"{o.name}/{{title}}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
name = Faker("name")
class DocumentFactory(DjangoModelFactory):
class Meta:
model = Document
title = factory.Faker("sentence", nb_words=4)
checksum = factory.Faker("md5")
content = factory.Faker("paragraph")
correspondent = None
document_type = None
storage_path = None

View File

@@ -699,6 +699,14 @@ class TestConsumer(
self.assertIsNotNone(root_doc)
assert root_doc is not None
root_storage_path = StoragePath.objects.create(
name="version-root-path",
path="root/{{title}}",
)
root_doc.storage_path = root_storage_path
root_doc.archive_serial_number = 42
root_doc.save()
actor = User.objects.create_user(
username="actor",
email="actor@example.com",
@@ -735,7 +743,7 @@ class TestConsumer(
)
consumer.setup()
try:
self.assertTrue(consumer.filename.endswith("_v0.pdf"))
self.assertEqual(consumer.filename, version_file.name)
consumer.run()
finally:
consumer.cleanup()
@@ -745,8 +753,10 @@ class TestConsumer(
version = versions.first()
assert version is not None
assert version.original_filename is not None
self.assertEqual(version.version_index, 1)
self.assertEqual(version.version_label, "v2")
self.assertTrue(version.original_filename.endswith("_v0.pdf"))
self.assertIsNone(version.archive_serial_number)
self.assertEqual(version.original_filename, version_file.name)
self.assertTrue(bool(version.content))
@override_settings(AUDIT_LOG_ENABLED=True)
@@ -795,7 +805,7 @@ class TestConsumer(
)
consumer.setup()
try:
self.assertEqual(consumer.filename, "valid_pdf_version-upload_v0")
self.assertEqual(consumer.filename, "valid_pdf_version-upload")
consumer.run()
finally:
consumer.cleanup()
@@ -805,9 +815,67 @@ class TestConsumer(
)
self.assertIsNotNone(version)
assert version is not None
self.assertEqual(version.original_filename, "valid_pdf_version-upload_v0")
self.assertEqual(version.version_index, 1)
self.assertEqual(version.original_filename, "valid_pdf_version-upload")
self.assertTrue(bool(version.content))
@override_settings(AUDIT_LOG_ENABLED=True)
@mock.patch("documents.consumer.load_classifier")
def test_consume_version_index_monotonic_after_version_deletion(self, m) -> None:
m.return_value = MagicMock()
with self.get_consumer(self.get_test_file()) as consumer:
consumer.run()
root_doc = Document.objects.first()
self.assertIsNotNone(root_doc)
assert root_doc is not None
def consume_version(version_file: Path) -> Document:
status = DummyProgressManager(version_file.name, None)
overrides = DocumentMetadataOverrides()
doc = ConsumableDocument(
DocumentSource.ApiUpload,
original_file=version_file,
root_document_id=root_doc.pk,
)
preflight = ConsumerPreflightPlugin(
doc,
overrides,
status, # type: ignore[arg-type]
self.dirs.scratch_dir,
"task-id",
)
preflight.setup()
preflight.run()
consumer = ConsumerPlugin(
doc,
overrides,
status, # type: ignore[arg-type]
self.dirs.scratch_dir,
"task-id",
)
consumer.setup()
try:
consumer.run()
finally:
consumer.cleanup()
version = (
Document.objects.filter(root_document=root_doc).order_by("-id").first()
)
assert version is not None
return version
v1 = consume_version(self.get_test_file2())
self.assertEqual(v1.version_index, 1)
v1.delete()
# The next version should have version_index 2, even though version_index 1 was deleted
v2 = consume_version(self.get_test_file())
self.assertEqual(v2.version_index, 2)
@mock.patch("documents.consumer.load_classifier")
def testClassifyDocument(self, m) -> None:
correspondent = Correspondent.objects.create(

View File

@@ -77,6 +77,58 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
settings.ORIGINALS_DIR / "test" / "test.pdf",
)
@override_settings(FILENAME_FORMAT=None)
def test_root_storage_path_change_updates_version_files(self) -> None:
old_storage_path = StoragePath.objects.create(
name="old-path",
path="old/{{title}}",
)
new_storage_path = StoragePath.objects.create(
name="new-path",
path="new/{{title}}",
)
root_doc = Document.objects.create(
title="rootdoc",
mime_type="application/pdf",
checksum="root-checksum",
storage_path=old_storage_path,
)
version_doc = Document.objects.create(
title="version-title",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=1,
)
Document.objects.filter(pk=root_doc.pk).update(
filename=generate_filename(root_doc),
)
Document.objects.filter(pk=version_doc.pk).update(
filename=generate_filename(version_doc),
)
root_doc.refresh_from_db()
version_doc.refresh_from_db()
create_source_path_directory(root_doc.source_path)
Path(root_doc.source_path).touch()
create_source_path_directory(version_doc.source_path)
Path(version_doc.source_path).touch()
root_doc.storage_path = new_storage_path
root_doc.save()
root_doc.refresh_from_db()
version_doc.refresh_from_db()
self.assertEqual(root_doc.filename, "new/rootdoc.pdf")
self.assertEqual(version_doc.filename, "new/rootdoc_v1.pdf")
self.assertIsFile(root_doc.source_path)
self.assertIsFile(version_doc.source_path)
self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "rootdoc.pdf")
self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "rootdoc_v1.pdf")
@override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
def test_file_renaming_missing_permissions(self) -> None:
document = Document()
@@ -1222,6 +1274,94 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
Path("logs.pdf"),
)
@override_settings(FILENAME_FORMAT="{title}")
def test_version_index_suffix_for_template_filename(self) -> None:
root_doc = Document.objects.create(
title="the_doc",
mime_type="application/pdf",
checksum="root-checksum",
)
version_doc = Document.objects.create(
title="the_doc",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=1,
)
self.assertEqual(generate_filename(version_doc), Path("the_doc_v1.pdf"))
self.assertEqual(
generate_filename(version_doc, counter=1),
Path("the_doc_v1_01.pdf"),
)
@override_settings(FILENAME_FORMAT=None)
def test_version_index_suffix_for_default_filename(self) -> None:
root_doc = Document.objects.create(
title="root",
mime_type="text/plain",
checksum="root-checksum",
)
version_doc = Document.objects.create(
title="root",
mime_type="text/plain",
checksum="version-checksum",
root_document=root_doc,
version_index=2,
)
self.assertEqual(
generate_filename(version_doc),
Path(f"{root_doc.pk:07d}_v2.txt"),
)
self.assertEqual(
generate_filename(version_doc, archive_filename=True),
Path(f"{root_doc.pk:07d}_v2.pdf"),
)
@override_settings(FILENAME_FORMAT="{original_name}")
def test_version_index_suffix_with_original_name_placeholder(self) -> None:
root_doc = Document.objects.create(
title="root",
mime_type="application/pdf",
checksum="root-checksum",
original_filename="root-upload.pdf",
)
version_doc = Document.objects.create(
title="root",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=1,
original_filename="version-upload.pdf",
)
self.assertEqual(generate_filename(version_doc), Path("root-upload_v1.pdf"))
def test_version_index_suffix_with_storage_path(self) -> None:
storage_path = StoragePath.objects.create(
name="vtest",
path="folder/{{title}}",
)
root_doc = Document.objects.create(
title="storage_doc",
mime_type="application/pdf",
checksum="root-checksum",
storage_path=storage_path,
)
version_doc = Document.objects.create(
title="version_title_should_not_be_used",
mime_type="application/pdf",
checksum="version-checksum",
root_document=root_doc,
version_index=3,
)
self.assertEqual(
generate_filename(version_doc),
Path("folder/storage_doc_v3.pdf"),
)
@override_settings(
FILENAME_FORMAT="XX{correspondent}/{title}",
FILENAME_FORMAT_REMOVE_NONE=True,

View File

@@ -1,442 +1,298 @@
"""
Tests for the document_retagger management command.
"""
from __future__ import annotations
import pytest
from django.core.management import call_command
from django.core.management.base import CommandError
from django.test import TestCase
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
from documents.tests.factories import CorrespondentFactory
from documents.tests.factories import DocumentFactory
from documents.tests.factories import DocumentTypeFactory
from documents.tests.factories import StoragePathFactory
from documents.tests.factories import TagFactory
from documents.tests.utils import DirectoriesMixin
# ---------------------------------------------------------------------------
# Module-level type aliases
# ---------------------------------------------------------------------------
StoragePathTuple = tuple[StoragePath, StoragePath, StoragePath]
TagTuple = tuple[Tag, Tag, Tag, Tag, Tag]
CorrespondentTuple = tuple[Correspondent, Correspondent]
DocumentTypeTuple = tuple[DocumentType, DocumentType]
DocumentTuple = tuple[Document, Document, Document, Document]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def storage_paths(db) -> StoragePathTuple:
"""Three storage paths with varying match rules."""
sp1 = StoragePathFactory(
path="{created_data}/{title}",
match="auto document",
matching_algorithm=MatchingModel.MATCH_LITERAL,
)
sp2 = StoragePathFactory(
path="{title}",
match="^first|^unrelated",
matching_algorithm=MatchingModel.MATCH_REGEX,
)
sp3 = StoragePathFactory(
path="{title}",
match="^blah",
matching_algorithm=MatchingModel.MATCH_REGEX,
)
return sp1, sp2, sp3
@pytest.fixture()
def tags(db) -> TagTuple:
"""Tags covering the common matching scenarios."""
tag_first = TagFactory(match="first", matching_algorithm=Tag.MATCH_ANY)
tag_second = TagFactory(match="second", matching_algorithm=Tag.MATCH_ANY)
tag_inbox = TagFactory(is_inbox_tag=True)
tag_no_match = TagFactory()
tag_auto = TagFactory(matching_algorithm=Tag.MATCH_AUTO)
return tag_first, tag_second, tag_inbox, tag_no_match, tag_auto
@pytest.fixture()
def correspondents(db) -> CorrespondentTuple:
"""Two correspondents matching 'first' and 'second' content."""
c_first = CorrespondentFactory(
match="first",
matching_algorithm=MatchingModel.MATCH_ANY,
)
c_second = CorrespondentFactory(
match="second",
matching_algorithm=MatchingModel.MATCH_ANY,
)
return c_first, c_second
@pytest.fixture()
def document_types(db) -> DocumentTypeTuple:
"""Two document types matching 'first' and 'second' content."""
dt_first = DocumentTypeFactory(
match="first",
matching_algorithm=MatchingModel.MATCH_ANY,
)
dt_second = DocumentTypeFactory(
match="second",
matching_algorithm=MatchingModel.MATCH_ANY,
)
return dt_first, dt_second
@pytest.fixture()
def documents(storage_paths: StoragePathTuple, tags: TagTuple) -> DocumentTuple:
"""Four documents with varied content used across most retagger tests."""
_, _, sp3 = storage_paths
_, _, tag_inbox, tag_no_match, tag_auto = tags
d1 = DocumentFactory(checksum="A", title="A", content="first document")
d2 = DocumentFactory(checksum="B", title="B", content="second document")
d3 = DocumentFactory(
checksum="C",
title="C",
content="unrelated document",
storage_path=sp3,
)
d4 = DocumentFactory(checksum="D", title="D", content="auto document")
d3.tags.add(tag_inbox, tag_no_match)
d4.tags.add(tag_auto)
return d1, d2, d3, d4
def _get_docs() -> DocumentTuple:
return (
Document.objects.get(title="A"),
Document.objects.get(title="B"),
Document.objects.get(title="C"),
Document.objects.get(title="D"),
)
# ---------------------------------------------------------------------------
# Tag assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerTags(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_tags(self, tags: TagTuple) -> None:
tag_first, tag_second, *_ = tags
class TestRetagger(DirectoriesMixin, TestCase):
def make_models(self) -> None:
self.sp1 = StoragePath.objects.create(
name="dummy a",
path="{created_data}/{title}",
match="auto document",
matching_algorithm=StoragePath.MATCH_LITERAL,
)
self.sp2 = StoragePath.objects.create(
name="dummy b",
path="{title}",
match="^first|^unrelated",
matching_algorithm=StoragePath.MATCH_REGEX,
)
self.sp3 = StoragePath.objects.create(
name="dummy c",
path="{title}",
match="^blah",
matching_algorithm=StoragePath.MATCH_REGEX,
)
self.d1 = Document.objects.create(
checksum="A",
title="A",
content="first document",
)
self.d2 = Document.objects.create(
checksum="B",
title="B",
content="second document",
)
self.d3 = Document.objects.create(
checksum="C",
title="C",
content="unrelated document",
storage_path=self.sp3,
)
self.d4 = Document.objects.create(
checksum="D",
title="D",
content="auto document",
)
self.tag_first = Tag.objects.create(
name="tag1",
match="first",
matching_algorithm=Tag.MATCH_ANY,
)
self.tag_second = Tag.objects.create(
name="tag2",
match="second",
matching_algorithm=Tag.MATCH_ANY,
)
self.tag_inbox = Tag.objects.create(name="test", is_inbox_tag=True)
self.tag_no_match = Tag.objects.create(name="test2")
self.tag_auto = Tag.objects.create(
name="tagauto",
matching_algorithm=Tag.MATCH_AUTO,
)
self.d3.tags.add(self.tag_inbox)
self.d3.tags.add(self.tag_no_match)
self.d4.tags.add(self.tag_auto)
self.correspondent_first = Correspondent.objects.create(
name="c1",
match="first",
matching_algorithm=Correspondent.MATCH_ANY,
)
self.correspondent_second = Correspondent.objects.create(
name="c2",
match="second",
matching_algorithm=Correspondent.MATCH_ANY,
)
self.doctype_first = DocumentType.objects.create(
name="dt1",
match="first",
matching_algorithm=DocumentType.MATCH_ANY,
)
self.doctype_second = DocumentType.objects.create(
name="dt2",
match="second",
matching_algorithm=DocumentType.MATCH_ANY,
)
def get_updated_docs(self):
return (
Document.objects.get(title="A"),
Document.objects.get(title="B"),
Document.objects.get(title="C"),
Document.objects.get(title="D"),
)
def setUp(self) -> None:
super().setUp()
self.make_models()
def test_add_tags(self) -> None:
call_command("document_retagger", "--tags")
d_first, d_second, d_unrelated, d_auto = _get_docs()
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
assert d_first.tags.count() == 1
assert d_second.tags.count() == 1
assert d_unrelated.tags.count() == 2
assert d_auto.tags.count() == 1
assert d_first.tags.first() == tag_first
assert d_second.tags.first() == tag_second
self.assertEqual(d_first.tags.count(), 1)
self.assertEqual(d_second.tags.count(), 1)
self.assertEqual(d_unrelated.tags.count(), 2)
self.assertEqual(d_auto.tags.count(), 1)
def test_overwrite_removes_stale_tags_and_preserves_inbox(
self,
documents: DocumentTuple,
tags: TagTuple,
) -> None:
d1, *_ = documents
tag_first, tag_second, tag_inbox, tag_no_match, _ = tags
d1.tags.add(tag_second)
self.assertEqual(d_first.tags.first(), self.tag_first)
self.assertEqual(d_second.tags.first(), self.tag_second)
def test_add_type(self) -> None:
call_command("document_retagger", "--document_type")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertEqual(d_first.document_type, self.doctype_first)
self.assertEqual(d_second.document_type, self.doctype_second)
def test_add_correspondent(self) -> None:
call_command("document_retagger", "--correspondent")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertEqual(d_first.correspondent, self.correspondent_first)
self.assertEqual(d_second.correspondent, self.correspondent_second)
def test_overwrite_preserve_inbox(self) -> None:
self.d1.tags.add(self.tag_second)
call_command("document_retagger", "--tags", "--overwrite")
d_first, d_second, d_unrelated, d_auto = _get_docs()
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
assert Tag.objects.filter(id=tag_second.id).exists()
assert list(d_first.tags.values_list("id", flat=True)) == [tag_first.id]
assert list(d_second.tags.values_list("id", flat=True)) == [tag_second.id]
assert set(d_unrelated.tags.values_list("id", flat=True)) == {
tag_inbox.id,
tag_no_match.id,
}
assert d_auto.tags.count() == 0
self.assertIsNotNone(Tag.objects.get(id=self.tag_second.id))
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_tags(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--tags", "--suggest", *extra_args)
d_first, d_second, _, d_auto = _get_docs()
assert d_first.tags.count() == 0
assert d_second.tags.count() == 0
assert d_auto.tags.count() == 1
# ---------------------------------------------------------------------------
# Document type assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerDocumentType(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_type(self, document_types: DocumentTypeTuple) -> None:
dt_first, dt_second = document_types
call_command("document_retagger", "--document_type")
d_first, d_second, _, _ = _get_docs()
assert d_first.document_type == dt_first
assert d_second.document_type == dt_second
@pytest.mark.usefixtures("documents", "document_types")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_document_type(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--document_type", "--suggest", *extra_args)
d_first, d_second, _, _ = _get_docs()
assert d_first.document_type is None
assert d_second.document_type is None
@pytest.mark.parametrize(
("use_first_flag", "expects_assignment"),
[
pytest.param(["--use-first"], True, id="use_first_assigns_first_match"),
pytest.param([], False, id="no_use_first_skips_ambiguous_match"),
],
)
def test_use_first_with_multiple_matches(
self,
use_first_flag: list[str],
*,
expects_assignment: bool,
) -> None:
DocumentTypeFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
self.assertCountEqual(
[tag.id for tag in d_first.tags.all()],
[self.tag_first.id],
)
DocumentTypeFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
self.assertCountEqual(
[tag.id for tag in d_second.tags.all()],
[self.tag_second.id],
)
doc = DocumentFactory(content="ambiguous content")
call_command("document_retagger", "--document_type", *use_first_flag)
doc.refresh_from_db()
assert (doc.document_type is not None) is expects_assignment
# ---------------------------------------------------------------------------
# Correspondent assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerCorrespondent(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_correspondent(self, correspondents: CorrespondentTuple) -> None:
c_first, c_second = correspondents
call_command("document_retagger", "--correspondent")
d_first, d_second, _, _ = _get_docs()
assert d_first.correspondent == c_first
assert d_second.correspondent == c_second
@pytest.mark.usefixtures("documents", "correspondents")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_correspondent(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--correspondent", "--suggest", *extra_args)
d_first, d_second, _, _ = _get_docs()
assert d_first.correspondent is None
assert d_second.correspondent is None
@pytest.mark.parametrize(
("use_first_flag", "expects_assignment"),
[
pytest.param(["--use-first"], True, id="use_first_assigns_first_match"),
pytest.param([], False, id="no_use_first_skips_ambiguous_match"),
],
)
def test_use_first_with_multiple_matches(
self,
use_first_flag: list[str],
*,
expects_assignment: bool,
) -> None:
CorrespondentFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
self.assertCountEqual(
[tag.id for tag in d_unrelated.tags.all()],
[self.tag_inbox.id, self.tag_no_match.id],
)
CorrespondentFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
self.assertEqual(d_auto.tags.count(), 0)
def test_add_tags_suggest(self) -> None:
call_command("document_retagger", "--tags", "--suggest")
d_first, d_second, _, d_auto = self.get_updated_docs()
self.assertEqual(d_first.tags.count(), 0)
self.assertEqual(d_second.tags.count(), 0)
self.assertEqual(d_auto.tags.count(), 1)
def test_add_type_suggest(self) -> None:
call_command("document_retagger", "--document_type", "--suggest")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.document_type)
self.assertIsNone(d_second.document_type)
def test_add_correspondent_suggest(self) -> None:
call_command("document_retagger", "--correspondent", "--suggest")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.correspondent)
self.assertIsNone(d_second.correspondent)
def test_add_tags_suggest_url(self) -> None:
call_command(
"document_retagger",
"--tags",
"--suggest",
"--base-url=http://localhost",
)
doc = DocumentFactory(content="ambiguous content")
d_first, d_second, _, d_auto = self.get_updated_docs()
call_command("document_retagger", "--correspondent", *use_first_flag)
self.assertEqual(d_first.tags.count(), 0)
self.assertEqual(d_second.tags.count(), 0)
self.assertEqual(d_auto.tags.count(), 1)
doc.refresh_from_db()
assert (doc.correspondent is not None) is expects_assignment
def test_add_type_suggest_url(self) -> None:
call_command(
"document_retagger",
"--document_type",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.document_type)
self.assertIsNone(d_second.document_type)
# ---------------------------------------------------------------------------
# Storage path assignment
# ---------------------------------------------------------------------------
def test_add_correspondent_suggest_url(self) -> None:
call_command(
"document_retagger",
"--correspondent",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.correspondent)
self.assertIsNone(d_second.correspondent)
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerStoragePath(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_storage_path(self, storage_paths: StoragePathTuple) -> None:
def test_add_storage_path(self) -> None:
"""
GIVEN documents matching various storage path rules
WHEN document_retagger --storage_path is called
THEN matching documents get the correct path; existing path is unchanged
GIVEN:
- 2 storage paths with documents which match them
- 1 document which matches but has a storage path
WHEN:
- document retagger is called
THEN:
- Matching document's storage paths updated
- Non-matching documents have no storage path
- Existing storage patch left unchanged
"""
sp1, sp2, sp3 = storage_paths
call_command("document_retagger", "--storage_path")
d_first, d_second, d_unrelated, d_auto = _get_docs()
call_command(
"document_retagger",
"--storage_path",
)
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
assert d_first.storage_path == sp2
assert d_auto.storage_path == sp1
assert d_second.storage_path is None
assert d_unrelated.storage_path == sp3
self.assertEqual(d_first.storage_path, self.sp2)
self.assertEqual(d_auto.storage_path, self.sp1)
self.assertIsNone(d_second.storage_path)
self.assertEqual(d_unrelated.storage_path, self.sp3)
@pytest.mark.usefixtures("documents")
def test_overwrite_storage_path(self, storage_paths: StoragePathTuple) -> None:
def test_overwrite_storage_path(self) -> None:
"""
GIVEN a document with an existing storage path that matches a different rule
WHEN document_retagger --storage_path --overwrite is called
THEN the existing path is replaced by the newly matched path
GIVEN:
- 2 storage paths with documents which match them
- 1 document which matches but has a storage path
WHEN:
- document retagger is called with overwrite
THEN:
- Matching document's storage paths updated
- Non-matching documents have no storage path
- Existing storage patch overwritten
"""
sp1, sp2, _ = storage_paths
call_command("document_retagger", "--storage_path", "--overwrite")
d_first, d_second, d_unrelated, d_auto = _get_docs()
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
assert d_first.storage_path == sp2
assert d_auto.storage_path == sp1
assert d_second.storage_path is None
assert d_unrelated.storage_path == sp2
self.assertEqual(d_first.storage_path, self.sp2)
self.assertEqual(d_auto.storage_path, self.sp1)
self.assertIsNone(d_second.storage_path)
self.assertEqual(d_unrelated.storage_path, self.sp2)
@pytest.mark.parametrize(
("use_first_flag", "expects_assignment"),
[
pytest.param(["--use-first"], True, id="use_first_assigns_first_match"),
pytest.param([], False, id="no_use_first_skips_ambiguous_match"),
],
)
def test_use_first_with_multiple_matches(
self,
use_first_flag: list[str],
*,
expects_assignment: bool,
) -> None:
StoragePathFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
def test_id_range_parameter(self) -> None:
commandOutput = ""
Document.objects.create(
checksum="E",
title="E",
content="NOT the first document",
)
StoragePathFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
)
doc = DocumentFactory(content="ambiguous content")
call_command("document_retagger", "--tags", "--id-range", "1", "2")
# The retagger shouldn`t apply the 'first' tag to our new document
self.assertEqual(Document.objects.filter(tags__id=self.tag_first.id).count(), 1)
call_command("document_retagger", "--storage_path", *use_first_flag)
try:
commandOutput = call_command("document_retagger", "--tags", "--id-range")
except CommandError:
# Just ignore the error
None
self.assertIn(commandOutput, "Error: argument --id-range: expected 2 arguments")
doc.refresh_from_db()
assert (doc.storage_path is not None) is expects_assignment
try:
commandOutput = call_command(
"document_retagger",
"--tags",
"--id-range",
"a",
"b",
)
except CommandError:
# Just ignore the error
None
self.assertIn(commandOutput, "error: argument --id-range: invalid int value:")
# ---------------------------------------------------------------------------
# ID range filtering
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerIdRange(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
("id_range_args", "expected_count"),
[
pytest.param(["1", "2"], 1, id="narrow_range_limits_scope"),
pytest.param(["1", "9999"], 2, id="wide_range_tags_all_matches"),
],
)
def test_id_range_limits_scope(
self,
tags: TagTuple,
id_range_args: list[str],
expected_count: int,
) -> None:
DocumentFactory(content="NOT the first document")
call_command("document_retagger", "--tags", "--id-range", *id_range_args)
tag_first, *_ = tags
assert Document.objects.filter(tags__id=tag_first.id).count() == expected_count
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
"args",
[
pytest.param(["--tags", "--id-range"], id="missing_both_values"),
pytest.param(["--tags", "--id-range", "a", "b"], id="non_integer_values"),
],
)
def test_id_range_invalid_arguments_raise(self, args: list[str]) -> None:
with pytest.raises((CommandError, SystemExit)):
call_command("document_retagger", *args)
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerEdgeCases(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_no_targets_exits_cleanly(self) -> None:
"""Calling the retagger with no classifier targets should not raise."""
call_command("document_retagger")
@pytest.mark.usefixtures("documents")
def test_inbox_only_skips_non_inbox_documents(self) -> None:
"""--inbox-only must restrict processing to documents with an inbox tag."""
call_command("document_retagger", "--tags", "--inbox-only")
d_first, _, d_unrelated, _ = _get_docs()
assert d_first.tags.count() == 0
assert d_unrelated.tags.count() == 2
call_command("document_retagger", "--tags", "--id-range", "1", "9999")
# Now we should have 2 documents
self.assertEqual(Document.objects.filter(tags__id=self.tag_first.id).count(), 2)

View File

@@ -1,100 +1,107 @@
import logging
from unittest import mock
import pytest
from allauth.account.adapter import get_adapter
from allauth.core import context
from allauth.socialaccount.adapter import get_adapter as get_social_adapter
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.forms import ValidationError
from django.http import HttpRequest
from django.test import TestCase
from django.test import override_settings
from django.urls import reverse
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from rest_framework.authtoken.models import Token
from paperless.adapter import DrfTokenStrategy
@pytest.mark.django_db
class TestCustomAccountAdapter:
def test_is_open_for_signup(self, settings: SettingsWrapper) -> None:
class TestCustomAccountAdapter(TestCase):
def test_is_open_for_signup(self) -> None:
adapter = get_adapter()
# With no accounts, signups should be allowed
assert adapter.is_open_for_signup(None)
self.assertTrue(adapter.is_open_for_signup(None))
User.objects.create_user("testuser")
# Test when ACCOUNT_ALLOW_SIGNUPS is True
settings.ACCOUNT_ALLOW_SIGNUPS = True
assert adapter.is_open_for_signup(None)
self.assertTrue(adapter.is_open_for_signup(None))
# Test when ACCOUNT_ALLOW_SIGNUPS is False
settings.ACCOUNT_ALLOW_SIGNUPS = False
assert not adapter.is_open_for_signup(None)
self.assertFalse(adapter.is_open_for_signup(None))
def test_is_safe_url(self, settings: SettingsWrapper) -> None:
def test_is_safe_url(self) -> None:
request = HttpRequest()
request.get_host = lambda: "example.com"
request.get_host = mock.Mock(return_value="example.com")
with context.request_context(request):
adapter = get_adapter()
with override_settings(ALLOWED_HOSTS=["*"]):
# True because request host is same
url = "https://example.com"
self.assertTrue(adapter.is_safe_url(url))
settings.ALLOWED_HOSTS = ["*"]
# True because request host is same
assert adapter.is_safe_url("https://example.com")
url = "https://evil.com"
# False despite wildcard because request host is different
assert not adapter.is_safe_url("https://evil.com")
self.assertFalse(adapter.is_safe_url(url))
settings.ALLOWED_HOSTS = ["example.com"]
url = "https://example.com"
# True because request host is same
assert adapter.is_safe_url("https://example.com")
self.assertTrue(adapter.is_safe_url(url))
settings.ALLOWED_HOSTS = ["*", "example.com"]
url = "//evil.com"
# False because request host is not in allowed hosts
assert not adapter.is_safe_url("//evil.com")
self.assertFalse(adapter.is_safe_url(url))
def test_pre_authenticate(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
mocker.patch("allauth.core.internal.ratelimit.consume", return_value=True)
@mock.patch("allauth.core.internal.ratelimit.consume", return_value=True)
def test_pre_authenticate(self, mock_consume) -> None:
adapter = get_adapter()
request = HttpRequest()
request.get_host = lambda: "example.com"
request.get_host = mock.Mock(return_value="example.com")
settings.DISABLE_REGULAR_LOGIN = False
adapter.pre_authenticate(request)
settings.DISABLE_REGULAR_LOGIN = True
with pytest.raises(ValidationError):
with self.assertRaises(ValidationError):
adapter.pre_authenticate(request)
def test_get_reset_password_from_key_url(self, settings: SettingsWrapper) -> None:
def test_get_reset_password_from_key_url(self) -> None:
request = HttpRequest()
request.get_host = lambda: "foo.org"
request.get_host = mock.Mock(return_value="foo.org")
with context.request_context(request):
adapter = get_adapter()
settings.PAPERLESS_URL = None
settings.ACCOUNT_DEFAULT_HTTP_PROTOCOL = "https"
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
assert adapter.get_reset_password_from_key_url("UID-KEY") == expected_url
# Test when PAPERLESS_URL is None
with override_settings(
PAPERLESS_URL=None,
ACCOUNT_DEFAULT_HTTP_PROTOCOL="https",
):
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
settings.PAPERLESS_URL = "https://bar.com"
expected_url = f"https://bar.com{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
assert adapter.get_reset_password_from_key_url("UID-KEY") == expected_url
# Test when PAPERLESS_URL is not None
with override_settings(PAPERLESS_URL="https://bar.com"):
expected_url = f"https://bar.com{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
def test_save_user_adds_groups(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.ACCOUNT_DEFAULT_GROUPS = ["group1", "group2"]
@override_settings(ACCOUNT_DEFAULT_GROUPS=["group1", "group2"])
def test_save_user_adds_groups(self) -> None:
Group.objects.create(name="group1")
user = User.objects.create_user("testuser")
adapter = get_adapter()
form = mocker.MagicMock(
form = mock.Mock(
cleaned_data={
"username": "testuser",
"email": "user@example.com",
@@ -103,81 +110,88 @@ class TestCustomAccountAdapter:
user = adapter.save_user(HttpRequest(), user, form, commit=True)
assert user.groups.count() == 1
assert user.groups.filter(name="group1").exists()
assert not user.groups.filter(name="group2").exists()
self.assertEqual(user.groups.count(), 1)
self.assertTrue(user.groups.filter(name="group1").exists())
self.assertFalse(user.groups.filter(name="group2").exists())
def test_fresh_install_save_creates_superuser(self, mocker: MockerFixture) -> None:
def test_fresh_install_save_creates_superuser(self) -> None:
adapter = get_adapter()
form = mocker.MagicMock(
form = mock.Mock(
cleaned_data={
"username": "testuser",
"email": "user@paperless-ngx.com",
},
)
user = adapter.save_user(HttpRequest(), User(), form, commit=True)
assert user.is_superuser
self.assertTrue(user.is_superuser)
form = mocker.MagicMock(
# Next time, it should not create a superuser
form = mock.Mock(
cleaned_data={
"username": "testuser2",
"email": "user2@paperless-ngx.com",
},
)
user2 = adapter.save_user(HttpRequest(), User(), form, commit=True)
assert not user2.is_superuser
self.assertFalse(user2.is_superuser)
class TestCustomSocialAccountAdapter:
@pytest.mark.django_db
def test_is_open_for_signup(self, settings: SettingsWrapper) -> None:
class TestCustomSocialAccountAdapter(TestCase):
def test_is_open_for_signup(self) -> None:
adapter = get_social_adapter()
# Test when SOCIALACCOUNT_ALLOW_SIGNUPS is True
settings.SOCIALACCOUNT_ALLOW_SIGNUPS = True
assert adapter.is_open_for_signup(None, None)
self.assertTrue(adapter.is_open_for_signup(None, None))
# Test when SOCIALACCOUNT_ALLOW_SIGNUPS is False
settings.SOCIALACCOUNT_ALLOW_SIGNUPS = False
assert not adapter.is_open_for_signup(None, None)
self.assertFalse(adapter.is_open_for_signup(None, None))
def test_get_connect_redirect_url(self) -> None:
adapter = get_social_adapter()
assert adapter.get_connect_redirect_url(None, None) == reverse("base")
request = None
socialaccount = None
@pytest.mark.django_db
def test_save_user_adds_groups(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.SOCIAL_ACCOUNT_DEFAULT_GROUPS = ["group1", "group2"]
# Test the default URL
expected_url = reverse("base")
self.assertEqual(
adapter.get_connect_redirect_url(request, socialaccount),
expected_url,
)
@override_settings(SOCIAL_ACCOUNT_DEFAULT_GROUPS=["group1", "group2"])
def test_save_user_adds_groups(self) -> None:
Group.objects.create(name="group1")
adapter = get_social_adapter()
request = HttpRequest()
user = User.objects.create_user("testuser")
sociallogin = mocker.MagicMock(user=user)
sociallogin = mock.Mock(
user=user,
)
user = adapter.save_user(HttpRequest(), sociallogin, None)
user = adapter.save_user(request, sociallogin, None)
assert user.groups.count() == 1
assert user.groups.filter(name="group1").exists()
assert not user.groups.filter(name="group2").exists()
self.assertEqual(user.groups.count(), 1)
self.assertTrue(user.groups.filter(name="group1").exists())
self.assertFalse(user.groups.filter(name="group2").exists())
def test_error_logged_on_authentication_error(
self,
caplog: pytest.LogCaptureFixture,
) -> None:
def test_error_logged_on_authentication_error(self) -> None:
adapter = get_social_adapter()
with caplog.at_level(logging.INFO, logger="paperless.auth"):
request = HttpRequest()
with self.assertLogs("paperless.auth", level="INFO") as log_cm:
adapter.on_authentication_error(
HttpRequest(),
request,
provider="test-provider",
error="Error",
exception="Test authentication error",
)
assert any("Test authentication error" in msg for msg in caplog.messages)
self.assertTrue(
any("Test authentication error" in message for message in log_cm.output),
)
@pytest.mark.django_db
class TestDrfTokenStrategy:
class TestDrfTokenStrategy(TestCase):
def test_create_access_token_creates_new_token(self) -> None:
"""
GIVEN:
@@ -187,6 +201,7 @@ class TestDrfTokenStrategy:
THEN:
- A new token is created and its key is returned
"""
user = User.objects.create_user("testuser")
request = HttpRequest()
request.user = user
@@ -194,9 +209,13 @@ class TestDrfTokenStrategy:
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
assert token_key is not None
assert Token.objects.filter(user=user).exists()
assert token_key == Token.objects.get(user=user).key
# Verify a token was created
self.assertIsNotNone(token_key)
self.assertTrue(Token.objects.filter(user=user).exists())
# Verify the returned key matches the created token
token = Token.objects.get(user=user)
self.assertEqual(token_key, token.key)
def test_create_access_token_returns_existing_token(self) -> None:
"""
@@ -207,6 +226,7 @@ class TestDrfTokenStrategy:
THEN:
- The same token key is returned (no new token created)
"""
user = User.objects.create_user("testuser")
existing_token = Token.objects.create(user=user)
@@ -216,8 +236,11 @@ class TestDrfTokenStrategy:
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
assert token_key == existing_token.key
assert Token.objects.filter(user=user).count() == 1
# Verify the existing token key is returned
self.assertEqual(token_key, existing_token.key)
# Verify only one token exists (no duplicate created)
self.assertEqual(Token.objects.filter(user=user).count(), 1)
def test_create_access_token_returns_none_for_unauthenticated_user(self) -> None:
"""
@@ -228,11 +251,12 @@ class TestDrfTokenStrategy:
THEN:
- None is returned and no token is created
"""
request = HttpRequest()
request.user = AnonymousUser()
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
assert token_key is None
assert Token.objects.count() == 0
self.assertIsNone(token_key)
self.assertEqual(Token.objects.count(), 0)

View File

@@ -1,12 +1,15 @@
import os
from dataclasses import dataclass
from pathlib import Path
from unittest import mock
import pytest
from django.core.checks import Warning
from pytest_django.fixtures import SettingsWrapper
from django.test import TestCase
from django.test import override_settings
from pytest_mock import MockerFixture
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from paperless.checks import audit_log_check
from paperless.checks import binaries_check
from paperless.checks import check_deprecated_db_settings
@@ -15,84 +18,54 @@ from paperless.checks import paths_check
from paperless.checks import settings_values_check
@dataclass(frozen=True, slots=True)
class PaperlessTestDirs:
data_dir: Path
media_dir: Path
consumption_dir: Path
# TODO: consolidate with documents/tests/conftest.py PaperlessDirs/paperless_dirs
# once the paperless and documents test suites are ready to share fixtures.
@pytest.fixture()
def directories(tmp_path: Path, settings: SettingsWrapper) -> PaperlessTestDirs:
data_dir = tmp_path / "data"
media_dir = tmp_path / "media"
consumption_dir = tmp_path / "consumption"
for d in (data_dir, media_dir, consumption_dir):
d.mkdir()
settings.DATA_DIR = data_dir
settings.MEDIA_ROOT = media_dir
settings.CONSUMPTION_DIR = consumption_dir
return PaperlessTestDirs(
data_dir=data_dir,
media_dir=media_dir,
consumption_dir=consumption_dir,
)
class TestChecks:
class TestChecks(DirectoriesMixin, TestCase):
def test_binaries(self) -> None:
assert binaries_check(None) == []
self.assertEqual(binaries_check(None), [])
def test_binaries_fail(self, settings: SettingsWrapper) -> None:
settings.CONVERT_BINARY = "uuuhh"
assert len(binaries_check(None)) == 1
@override_settings(CONVERT_BINARY="uuuhh")
def test_binaries_fail(self) -> None:
self.assertEqual(len(binaries_check(None)), 1)
@pytest.mark.usefixtures("directories")
def test_paths_check(self) -> None:
assert paths_check(None) == []
self.assertEqual(paths_check(None), [])
def test_paths_check_dont_exist(self, settings: SettingsWrapper) -> None:
settings.MEDIA_ROOT = Path("uuh")
settings.DATA_DIR = Path("whatever")
settings.CONSUMPTION_DIR = Path("idontcare")
@override_settings(
MEDIA_ROOT=Path("uuh"),
DATA_DIR=Path("whatever"),
CONSUMPTION_DIR=Path("idontcare"),
)
def test_paths_check_dont_exist(self) -> None:
msgs = paths_check(None)
self.assertEqual(len(msgs), 3, str(msgs))
for msg in msgs:
self.assertTrue(msg.msg.endswith("is set but doesn't exist."))
def test_paths_check_no_access(self) -> None:
Path(self.dirs.data_dir).chmod(0o000)
Path(self.dirs.media_dir).chmod(0o000)
Path(self.dirs.consumption_dir).chmod(0o000)
self.addCleanup(os.chmod, self.dirs.data_dir, 0o777)
self.addCleanup(os.chmod, self.dirs.media_dir, 0o777)
self.addCleanup(os.chmod, self.dirs.consumption_dir, 0o777)
msgs = paths_check(None)
self.assertEqual(len(msgs), 3)
assert len(msgs) == 3, str(msgs)
for msg in msgs:
assert msg.msg.endswith("is set but doesn't exist.")
self.assertTrue(msg.msg.endswith("is not writeable"))
def test_paths_check_no_access(self, directories: PaperlessTestDirs) -> None:
directories.data_dir.chmod(0o000)
directories.media_dir.chmod(0o000)
directories.consumption_dir.chmod(0o000)
@override_settings(DEBUG=False)
def test_debug_disabled(self) -> None:
self.assertEqual(debug_mode_check(None), [])
try:
msgs = paths_check(None)
finally:
directories.data_dir.chmod(0o777)
directories.media_dir.chmod(0o777)
directories.consumption_dir.chmod(0o777)
assert len(msgs) == 3
for msg in msgs:
assert msg.msg.endswith("is not writeable")
def test_debug_disabled(self, settings: SettingsWrapper) -> None:
settings.DEBUG = False
assert debug_mode_check(None) == []
def test_debug_enabled(self, settings: SettingsWrapper) -> None:
settings.DEBUG = True
assert len(debug_mode_check(None)) == 1
@override_settings(DEBUG=True)
def test_debug_enabled(self) -> None:
self.assertEqual(len(debug_mode_check(None)), 1)
class TestSettingsChecksAgainstDefaults:
class TestSettingsChecksAgainstDefaults(DirectoriesMixin, TestCase):
def test_all_valid(self) -> None:
"""
GIVEN:
@@ -103,71 +76,104 @@ class TestSettingsChecksAgainstDefaults:
- No system check errors reported
"""
msgs = settings_values_check(None)
assert len(msgs) == 0
self.assertEqual(len(msgs), 0)
class TestOcrSettingsChecks:
@pytest.mark.parametrize(
("setting", "value", "expected_msg"),
[
pytest.param(
"OCR_OUTPUT_TYPE",
"notapdf",
'OCR output type "notapdf"',
id="invalid-output-type",
),
pytest.param(
"OCR_MODE",
"makeitso",
'OCR output mode "makeitso"',
id="invalid-mode",
),
pytest.param(
"OCR_MODE",
"skip_noarchive",
"deprecated",
id="deprecated-mode",
),
pytest.param(
"OCR_SKIP_ARCHIVE_FILE",
"invalid",
'OCR_SKIP_ARCHIVE_FILE setting "invalid"',
id="invalid-skip-archive-file",
),
pytest.param(
"OCR_CLEAN",
"cleanme",
'OCR clean mode "cleanme"',
id="invalid-clean",
),
],
)
def test_invalid_setting_produces_one_error(
self,
settings: SettingsWrapper,
setting: str,
value: str,
expected_msg: str,
) -> None:
class TestOcrSettingsChecks(DirectoriesMixin, TestCase):
@override_settings(OCR_OUTPUT_TYPE="notapdf")
def test_invalid_output_type(self) -> None:
"""
GIVEN:
- Default settings
- One OCR setting is set to an invalid value
- OCR output type is invalid
WHEN:
- Settings are validated
THEN:
- Exactly one system check error is reported containing the expected message
- system check error reported for OCR output type
"""
setattr(settings, setting, value)
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
assert len(msgs) == 1
assert expected_msg in msgs[0].msg
msg = msgs[0]
self.assertIn('OCR output type "notapdf"', msg.msg)
@override_settings(OCR_MODE="makeitso")
def test_invalid_ocr_type(self) -> None:
"""
GIVEN:
- Default settings
- OCR type is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR output mode "makeitso"', msg.msg)
@override_settings(OCR_MODE="skip_noarchive")
def test_deprecated_ocr_type(self) -> None:
"""
GIVEN:
- Default settings
- OCR type is deprecated
WHEN:
- Settings are validated
THEN:
- deprecation warning reported for OCR type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn("deprecated", msg.msg)
@override_settings(OCR_SKIP_ARCHIVE_FILE="invalid")
def test_invalid_ocr_skip_archive_file(self) -> None:
"""
GIVEN:
- Default settings
- OCR_SKIP_ARCHIVE_FILE is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR_SKIP_ARCHIVE_FILE
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR_SKIP_ARCHIVE_FILE setting "invalid"', msg.msg)
@override_settings(OCR_CLEAN="cleanme")
def test_invalid_ocr_clean(self) -> None:
"""
GIVEN:
- Default settings
- OCR cleaning type is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR cleaning type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR clean mode "cleanme"', msg.msg)
class TestTimezoneSettingsChecks:
def test_invalid_timezone(self, settings: SettingsWrapper) -> None:
class TestTimezoneSettingsChecks(DirectoriesMixin, TestCase):
@override_settings(TIME_ZONE="TheMoon\\MyCrater")
def test_invalid_timezone(self) -> None:
"""
GIVEN:
- Default settings
@@ -177,16 +183,17 @@ class TestTimezoneSettingsChecks:
THEN:
- system check error reported for timezone
"""
settings.TIME_ZONE = "TheMoon\\MyCrater"
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
assert len(msgs) == 1
assert 'Timezone "TheMoon\\MyCrater"' in msgs[0].msg
msg = msgs[0]
self.assertIn('Timezone "TheMoon\\MyCrater"', msg.msg)
class TestEmailCertSettingsChecks:
def test_not_valid_file(self, settings: SettingsWrapper) -> None:
class TestEmailCertSettingsChecks(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@override_settings(EMAIL_CERTIFICATE_FILE=Path("/tmp/not_actually_here.pem"))
def test_not_valid_file(self) -> None:
"""
GIVEN:
- Default settings
@@ -196,22 +203,19 @@ class TestEmailCertSettingsChecks:
THEN:
- system check error reported for email certificate
"""
cert_path = Path("/tmp/not_actually_here.pem")
assert not cert_path.is_file()
settings.EMAIL_CERTIFICATE_FILE = cert_path
self.assertIsNotFile("/tmp/not_actually_here.pem")
msgs = settings_values_check(None)
assert len(msgs) == 1
assert "Email cert /tmp/not_actually_here.pem is not a file" in msgs[0].msg
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn("Email cert /tmp/not_actually_here.pem is not a file", msg.msg)
class TestAuditLogChecks:
def test_was_enabled_once(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
class TestAuditLogChecks(TestCase):
def test_was_enabled_once(self) -> None:
"""
GIVEN:
- Audit log is not enabled
@@ -220,18 +224,23 @@ class TestAuditLogChecks:
THEN:
- system check error reported for disabling audit log
"""
settings.AUDIT_LOG_ENABLED = False
introspect_mock = mocker.MagicMock()
introspect_mock = mock.MagicMock()
introspect_mock.introspection.table_names.return_value = ["auditlog_logentry"]
mocker.patch.dict(
"paperless.checks.connections",
{"default": introspect_mock},
)
with override_settings(AUDIT_LOG_ENABLED=False):
with mock.patch.dict(
"paperless.checks.connections",
{"default": introspect_mock},
):
msgs = audit_log_check(None)
msgs = audit_log_check(None)
self.assertEqual(len(msgs), 1)
assert len(msgs) == 1
assert "auditlog table was found but audit log is disabled." in msgs[0].msg
msg = msgs[0]
self.assertIn(
("auditlog table was found but audit log is disabled."),
msg.msg,
)
DEPRECATED_VARS: dict[str, str] = {
@@ -260,16 +269,20 @@ class TestDeprecatedDbSettings:
@pytest.mark.parametrize(
("env_var", "db_option_key"),
[
pytest.param("PAPERLESS_DB_TIMEOUT", "timeout", id="db-timeout"),
pytest.param(
"PAPERLESS_DB_POOLSIZE",
"pool.min_size / pool.max_size",
id="db-poolsize",
),
pytest.param("PAPERLESS_DBSSLMODE", "sslmode", id="ssl-mode"),
pytest.param("PAPERLESS_DBSSLROOTCERT", "sslrootcert", id="ssl-rootcert"),
pytest.param("PAPERLESS_DBSSLCERT", "sslcert", id="ssl-cert"),
pytest.param("PAPERLESS_DBSSLKEY", "sslkey", id="ssl-key"),
("PAPERLESS_DB_TIMEOUT", "timeout"),
("PAPERLESS_DB_POOLSIZE", "pool.min_size / pool.max_size"),
("PAPERLESS_DBSSLMODE", "sslmode"),
("PAPERLESS_DBSSLROOTCERT", "sslrootcert"),
("PAPERLESS_DBSSLCERT", "sslcert"),
("PAPERLESS_DBSSLKEY", "sslkey"),
],
ids=[
"db-timeout",
"db-poolsize",
"ssl-mode",
"ssl-rootcert",
"ssl-cert",
"ssl-key",
],
)
def test_single_deprecated_var_produces_one_warning(

View File

@@ -9,50 +9,35 @@ from paperless.utils import ocr_to_dateparser_languages
@pytest.mark.parametrize(
("ocr_language", "expected"),
[
pytest.param("eng", ["en"], id="single-language"),
pytest.param("fra+ita+lao", ["fr", "it", "lo"], id="multiple-languages"),
pytest.param("fil", ["fil"], id="no-two-letter-equivalent"),
pytest.param(
"aze_cyrl+srp_latn",
["az-Cyrl", "sr-Latn"],
id="script-supported-by-dateparser",
),
pytest.param(
"deu_frak",
["de"],
id="script-not-supported-falls-back-to-language",
),
pytest.param(
"chi_tra+chi_sim",
["zh"],
id="chinese-variants-collapse-to-general",
),
pytest.param(
"eng+unsupported_language+por",
["en", "pt"],
id="unsupported-language-skipped",
),
pytest.param(
"unsupported1+unsupported2",
[],
id="all-unsupported-returns-empty",
),
pytest.param("eng+eng", ["en"], id="duplicates-deduplicated"),
pytest.param(
"ita_unknownscript",
["it"],
id="unknown-script-falls-back-to-language",
),
# One language
("eng", ["en"]),
# Multiple languages
("fra+ita+lao", ["fr", "it", "lo"]),
# Languages that don't have a two-letter equivalent
("fil", ["fil"]),
# Languages with a script part supported by dateparser
("aze_cyrl+srp_latn", ["az-Cyrl", "sr-Latn"]),
# Languages with a script part not supported by dateparser
# In this case, default to the language without script
("deu_frak", ["de"]),
# Traditional and simplified chinese don't have the same name in dateparser,
# so they're converted to the general chinese language
("chi_tra+chi_sim", ["zh"]),
# If a language is not supported by dateparser, fallback to the supported ones
("eng+unsupported_language+por", ["en", "pt"]),
# If no language is supported, fallback to default
("unsupported1+unsupported2", []),
# Duplicate languages, should not duplicate in result
("eng+eng", ["en"]),
# Language with script, but script is not mapped
("ita_unknownscript", ["it"]),
],
)
def test_ocr_to_dateparser_languages(ocr_language: str, expected: list[str]) -> None:
def test_ocr_to_dateparser_languages(ocr_language, expected):
assert sorted(ocr_to_dateparser_languages(ocr_language)) == sorted(expected)
def test_ocr_to_dateparser_languages_exception(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
def test_ocr_to_dateparser_languages_exception(monkeypatch, caplog):
# Patch LocaleDataLoader.get_locale_map to raise an exception
class DummyLoader:
def get_locale_map(self, locales=None):

View File

@@ -1,31 +1,24 @@
import tempfile
from pathlib import Path
from django.test import Client
from pytest_django.fixtures import SettingsWrapper
from django.test import override_settings
def test_favicon_view(
client: Client,
tmp_path: Path,
settings: SettingsWrapper,
) -> None:
favicon_path = tmp_path / "paperless" / "img" / "favicon.ico"
favicon_path.parent.mkdir(parents=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
def test_favicon_view(client):
with tempfile.TemporaryDirectory() as tmpdir:
static_dir = Path(tmpdir)
favicon_path = static_dir / "paperless" / "img" / "favicon.ico"
favicon_path.parent.mkdir(parents=True, exist_ok=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
settings.STATIC_ROOT = tmp_path
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
with override_settings(STATIC_ROOT=static_dir):
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
def test_favicon_view_missing_file(
client: Client,
tmp_path: Path,
settings: SettingsWrapper,
) -> None:
settings.STATIC_ROOT = tmp_path
response = client.get("/favicon.ico")
assert response.status_code == 404
def test_favicon_view_missing_file(client):
with override_settings(STATIC_ROOT=Path(tempfile.mkdtemp())):
response = client.get("/favicon.ico")
assert response.status_code == 404

10
uv.lock generated
View File

@@ -1342,11 +1342,11 @@ wheels = [
[[package]]
name = "faker"
version = "40.5.1"
version = "40.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/03/2a/96fff3edcb10f6505143448a4b91535f77b74865cec45be52690ee280443/faker-40.5.1.tar.gz", hash = "sha256:70222361cd82aa10cb86066d1a4e8f47f2bcdc919615c412045a69c4e6da0cd3", size = 1952684, upload-time = "2026-02-23T21:34:38.362Z" }
sdist = { url = "https://files.pythonhosted.org/packages/5e/77/1c3ff07b6739b9a1d23ca01ec0a90a309a33b78e345a3eb52f9ce9240e36/faker-40.1.2.tar.gz", hash = "sha256:b76a68163aa5f171d260fc24827a8349bc1db672f6a665359e8d0095e8135d30", size = 1949802, upload-time = "2026-01-13T20:51:49.917Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4d/a9/1eed4db92d0aec2f9bfdf1faae0ab0418b5e121dda5701f118a7a4f0cd6a/faker-40.5.1-py3-none-any.whl", hash = "sha256:c69640c1e13bad49b4bcebcbf1b52f9f1a872b6ea186c248ada34d798f1661bf", size = 1987053, upload-time = "2026-02-23T21:34:36.418Z" },
{ url = "https://files.pythonhosted.org/packages/46/ec/91a434c8a53d40c3598966621dea9c50512bec6ce8e76fa1751015e74cef/faker-40.1.2-py3-none-any.whl", hash = "sha256:93503165c165d330260e4379fd6dc07c94da90c611ed3191a0174d2ab9966a42", size = 1985633, upload-time = "2026-01-13T20:51:47.982Z" },
]
[[package]]
@@ -3121,7 +3121,6 @@ webserver = [
dev = [
{ name = "daphne", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "factory-boy", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "faker", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "imagehash", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "prek", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -3146,7 +3145,6 @@ lint = [
testing = [
{ name = "daphne", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "factory-boy", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "faker", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "imagehash", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest-cov", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -3259,7 +3257,6 @@ provides-extras = ["mariadb", "postgres", "webserver"]
dev = [
{ name = "daphne" },
{ name = "factory-boy", specifier = "~=3.3.1" },
{ name = "faker", specifier = "~=40.5.1" },
{ name = "imagehash" },
{ name = "prek", specifier = "~=0.3.0" },
{ name = "pytest", specifier = "~=9.0.0" },
@@ -3282,7 +3279,6 @@ lint = [
testing = [
{ name = "daphne" },
{ name = "factory-boy", specifier = "~=3.3.1" },
{ name = "faker", specifier = "~=40.5.1" },
{ name = "imagehash" },
{ name = "pytest", specifier = "~=9.0.0" },
{ name = "pytest-cov", specifier = "~=7.0.0" },