Compare commits

...

6 Commits

Author SHA1 Message Date
Trenton H
ab08d60b65 docs: drop Python 3.10, add 3.13 and 3.14 support 2026-03-03 11:44:18 -08:00
Trenton H
af95ee3876 Account for the Python 3.14 changes 2026-03-03 11:35:21 -08:00
Trenton H
b693636c7c Typo on 3.13 2026-03-03 11:34:40 -08:00
Trenton H
ae7f3df134 Upgrades minimum supported Python to 3.11 2026-03-03 11:34:40 -08:00
dependabot[bot]
9c0f112e94 docker(deps): Bump astral-sh/uv (#12191)
Bumps [astral-sh/uv](https://github.com/astral-sh/uv) from 0.10.5-python3.12-trixie-slim to 0.10.7-python3.12-trixie-slim.
- [Release notes](https://github.com/astral-sh/uv/releases)
- [Changelog](https://github.com/astral-sh/uv/blob/main/CHANGELOG.md)
- [Commits](https://github.com/astral-sh/uv/compare/0.10.5...0.10.7)

---
updated-dependencies:
- dependency-name: astral-sh/uv
  dependency-version: 0.10.7-python3.12-trixie-slim
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-03 07:56:35 -08:00
Trenton H
43406f44f2 Feature: Improve the retagger output using rich (#12194) 2026-03-03 07:14:59 -08:00
25 changed files with 1253 additions and 1261 deletions

View File

@@ -14,10 +14,6 @@ component_management:
# https://docs.codecov.com/docs/carryforward-flags # https://docs.codecov.com/docs/carryforward-flags
flags: flags:
# Backend Python versions # Backend Python versions
backend-python-3.10:
paths:
- src/**
carryforward: true
backend-python-3.11: backend-python-3.11:
paths: paths:
- src/** - src/**
@@ -26,6 +22,14 @@ flags:
paths: paths:
- src/** - src/**
carryforward: true carryforward: true
backend-python-3.13:
paths:
- src/**
carryforward: true
backend-python-3.14:
paths:
- src/**
carryforward: true
# Frontend (shards merge into single flag) # Frontend (shards merge into single flag)
frontend-node-24.x: frontend-node-24.x:
paths: paths:
@@ -41,9 +45,10 @@ coverage:
project: project:
backend: backend:
flags: flags:
- backend-python-3.10
- backend-python-3.11 - backend-python-3.11
- backend-python-3.12 - backend-python-3.12
- backend-python-3.13
- backend-python-3.14
paths: paths:
- src/** - src/**
# https://docs.codecov.com/docs/commit-status#threshold # https://docs.codecov.com/docs/commit-status#threshold
@@ -59,9 +64,10 @@ coverage:
patch: patch:
backend: backend:
flags: flags:
- backend-python-3.10
- backend-python-3.11 - backend-python-3.11
- backend-python-3.12 - backend-python-3.12
- backend-python-3.13
- backend-python-3.14
paths: paths:
- src/** - src/**
target: 100% target: 100%

View File

@@ -31,7 +31,7 @@ jobs:
runs-on: ubuntu-24.04 runs-on: ubuntu-24.04
strategy: strategy:
matrix: matrix:
python-version: ['3.10', '3.11', '3.12'] python-version: ['3.11', '3.12', '3.13', '3.14']
fail-fast: false fail-fast: false
steps: steps:
- name: Checkout - name: Checkout

View File

@@ -13,7 +13,9 @@ If you want to implement something big:
## Python ## Python
Paperless supports python 3.10 - 3.12 at this time. We format Python code with [ruff](https://docs.astral.sh/ruff/formatter/). Paperless-ngx currently supports Python 3.11, 3.12, 3.13, and 3.14. As a policy, we aim to support at least the three most recent Python versions, and drop support for versions as they reach end-of-life. Older versions may be supported if dependencies permit, but this is not guaranteed.
We format Python code with [ruff](https://docs.astral.sh/ruff/formatter/).
## Branches ## Branches

View File

@@ -30,7 +30,7 @@ RUN set -eux \
# Purpose: Installs s6-overlay and rootfs # Purpose: Installs s6-overlay and rootfs
# Comments: # Comments:
# - Don't leave anything extra in here either # - Don't leave anything extra in here either
FROM ghcr.io/astral-sh/uv:0.10.5-python3.12-trixie-slim AS s6-overlay-base FROM ghcr.io/astral-sh/uv:0.10.7-python3.12-trixie-slim AS s6-overlay-base
WORKDIR /usr/src/s6 WORKDIR /usr/src/s6

View File

@@ -172,7 +172,7 @@ to enable polling and disable inotify. See [here](configuration.md#polling).
#### Prerequisites #### Prerequisites
- Paperless runs on Linux only, Windows is not supported. - Paperless runs on Linux only, Windows is not supported.
- Python 3 is required with versions 3.10 - 3.12 currently supported. Newer versions may work, but some dependencies may not be fully compatible. - Python 3.11, 3.12, 3.13, or 3.14 is required. As a policy, Paperless-ngx aims to support at least the three most recent Python versions and drops support for versions as they reach end-of-life. Newer versions may work, but some dependencies may not be fully compatible.
#### Installation #### Installation

View File

@@ -3,10 +3,9 @@ name = "paperless-ngx"
version = "2.20.9" version = "2.20.9"
description = "A community-supported supercharged document management system: scan, index and archive all your physical documents" description = "A community-supported supercharged document management system: scan, index and archive all your physical documents"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.11"
classifiers = [ classifiers = [
"Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.13",
@@ -111,6 +110,7 @@ docs = [
testing = [ testing = [
"daphne", "daphne",
"factory-boy~=3.3.1", "factory-boy~=3.3.1",
"faker~=40.5.1",
"imagehash", "imagehash",
"pytest~=9.0.0", "pytest~=9.0.0",
"pytest-cov~=7.0.0", "pytest-cov~=7.0.0",
@@ -176,7 +176,7 @@ torch = [
] ]
[tool.ruff] [tool.ruff]
target-version = "py310" target-version = "py311"
line-length = 88 line-length = 88
src = [ src = [
"src", "src",

View File

@@ -1,5 +1,5 @@
from datetime import UTC
from datetime import datetime from datetime import datetime
from datetime import timezone
from typing import Any from typing import Any
from django.conf import settings from django.conf import settings
@@ -139,7 +139,7 @@ def thumbnail_last_modified(request: Any, pk: int) -> datetime | None:
# No cache, get the timestamp and cache the datetime # No cache, get the timestamp and cache the datetime
last_modified = datetime.fromtimestamp( last_modified = datetime.fromtimestamp(
doc.thumbnail_path.stat().st_mtime, doc.thumbnail_path.stat().st_mtime,
tz=timezone.utc, tz=UTC,
) )
cache.set(doc_key, last_modified, CACHE_50_MINUTES) cache.set(doc_key, last_modified, CACHE_50_MINUTES)
return last_modified return last_modified

View File

@@ -2,7 +2,7 @@ import datetime
import hashlib import hashlib
import os import os
import tempfile import tempfile
from enum import Enum from enum import StrEnum
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Final from typing import Final
@@ -81,7 +81,7 @@ class ConsumerError(Exception):
pass pass
class ConsumerStatusShortMessage(str, Enum): class ConsumerStatusShortMessage(StrEnum):
DOCUMENT_ALREADY_EXISTS = "document_already_exists" DOCUMENT_ALREADY_EXISTS = "document_already_exists"
DOCUMENT_ALREADY_EXISTS_IN_TRASH = "document_already_exists_in_trash" DOCUMENT_ALREADY_EXISTS_IN_TRASH = "document_already_exists_in_trash"
ASN_ALREADY_EXISTS = "asn_already_exists" ASN_ALREADY_EXISTS = "asn_already_exists"

View File

@@ -5,10 +5,10 @@ import math
import re import re
from collections import Counter from collections import Counter
from contextlib import contextmanager from contextlib import contextmanager
from datetime import UTC
from datetime import datetime from datetime import datetime
from datetime import time from datetime import time
from datetime import timedelta from datetime import timedelta
from datetime import timezone
from shutil import rmtree from shutil import rmtree
from time import sleep from time import sleep
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@@ -437,7 +437,7 @@ class ManualResults:
class LocalDateParser(English): class LocalDateParser(English):
def reverse_timezone_offset(self, d): def reverse_timezone_offset(self, d):
return (d.replace(tzinfo=django_timezone.get_current_timezone())).astimezone( return (d.replace(tzinfo=django_timezone.get_current_timezone())).astimezone(
timezone.utc, UTC,
) )
def date_from(self, *args, **kwargs): def date_from(self, *args, **kwargs):
@@ -641,8 +641,8 @@ def rewrite_natural_date_keywords(query_string: str) -> str:
end = datetime(local_now.year - 1, 12, 31, 23, 59, 59, tzinfo=tz) end = datetime(local_now.year - 1, 12, 31, 23, 59, 59, tzinfo=tz)
# Convert to UTC and format # Convert to UTC and format
start_str = start.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S") start_str = start.astimezone(UTC).strftime("%Y%m%d%H%M%S")
end_str = end.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S") end_str = end.astimezone(UTC).strftime("%Y%m%d%H%M%S")
return f"{field}:[{start_str} TO {end_str}]" return f"{field}:[{start_str} TO {end_str}]"
return re.sub(pattern, repl, query_string, flags=re.IGNORECASE) return re.sub(pattern, repl, query_string, flags=re.IGNORECASE)

View File

@@ -6,11 +6,14 @@ Provides automatic progress bar and multiprocessing support with minimal boilerp
from __future__ import annotations from __future__ import annotations
import logging
import os import os
from collections.abc import Callable
from collections.abc import Iterable from collections.abc import Iterable
from collections.abc import Sized from collections.abc import Sized
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed from concurrent.futures import as_completed
from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
@@ -22,7 +25,11 @@ from django import db
from django.core.management import CommandError from django.core.management import CommandError
from django.db.models import QuerySet from django.db.models import QuerySet
from django_rich.management import RichCommand from django_rich.management import RichCommand
from rich import box
from rich.console import Console from rich.console import Console
from rich.console import Group
from rich.console import RenderableType
from rich.live import Live
from rich.progress import BarColumn from rich.progress import BarColumn
from rich.progress import MofNCompleteColumn from rich.progress import MofNCompleteColumn
from rich.progress import Progress from rich.progress import Progress
@@ -30,11 +37,11 @@ from rich.progress import SpinnerColumn
from rich.progress import TextColumn from rich.progress import TextColumn
from rich.progress import TimeElapsedColumn from rich.progress import TimeElapsedColumn
from rich.progress import TimeRemainingColumn from rich.progress import TimeRemainingColumn
from rich.table import Table
from rich.text import Text
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Generator from collections.abc import Generator
from collections.abc import Iterable
from collections.abc import Sequence from collections.abc import Sequence
from django.core.management import CommandParser from django.core.management import CommandParser
@@ -43,6 +50,78 @@ T = TypeVar("T")
R = TypeVar("R") R = TypeVar("R")
@dataclass(slots=True, frozen=True)
class _BufferedRecord:
level: int
name: str
message: str
class BufferingLogHandler(logging.Handler):
"""Captures log records during a command run for deferred rendering.
Attach to a logger before a long operation and call ``render()``
afterwards to emit the buffered records via Rich, optionally filtered
by minimum level.
"""
def __init__(self) -> None:
super().__init__()
self._records: list[_BufferedRecord] = []
def emit(self, record: logging.LogRecord) -> None:
self._records.append(
_BufferedRecord(
level=record.levelno,
name=record.name,
message=self.format(record),
),
)
def render(
self,
console: Console,
*,
min_level: int = logging.DEBUG,
title: str = "Log Output",
) -> None:
records = [r for r in self._records if r.level >= min_level]
if not records:
return
table = Table(
title=title,
show_header=True,
header_style="bold",
show_lines=False,
box=box.SIMPLE,
)
table.add_column("Level", style="bold", width=8)
table.add_column("Logger", style="dim")
table.add_column("Message", no_wrap=False)
_level_styles: dict[int, str] = {
logging.DEBUG: "dim",
logging.INFO: "cyan",
logging.WARNING: "yellow",
logging.ERROR: "red",
logging.CRITICAL: "bold red",
}
for record in records:
style = _level_styles.get(record.level, "")
table.add_row(
Text(logging.getLevelName(record.level), style=style),
record.name,
record.message,
)
console.print(table)
def clear(self) -> None:
self._records.clear()
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class ProcessResult(Generic[T, R]): class ProcessResult(Generic[T, R]):
""" """
@@ -91,6 +170,23 @@ class PaperlessCommand(RichCommand):
for result in self.process_parallel(process_doc, ids): for result in self.process_parallel(process_doc, ids):
if result.error: if result.error:
self.console.print(f"[red]Failed: {result.error}[/red]") self.console.print(f"[red]Failed: {result.error}[/red]")
class Command(PaperlessCommand):
help = "Import documents with live stats"
def handle(self, *args, **options):
stats = ImportStats()
def render_stats() -> Table:
... # build Rich Table from stats
for item in self.track_with_stats(
items,
description="Importing...",
stats_renderer=render_stats,
):
result = import_item(item)
stats.imported += 1
""" """
supports_progress_bar: ClassVar[bool] = True supports_progress_bar: ClassVar[bool] = True
@@ -128,13 +224,11 @@ class PaperlessCommand(RichCommand):
This is called by Django's command infrastructure after argument parsing This is called by Django's command infrastructure after argument parsing
but before handle(). We use it to set instance attributes from options. but before handle(). We use it to set instance attributes from options.
""" """
# Set progress bar state
if self.supports_progress_bar: if self.supports_progress_bar:
self.no_progress_bar = options.get("no_progress_bar", False) self.no_progress_bar = options.get("no_progress_bar", False)
else: else:
self.no_progress_bar = True self.no_progress_bar = True
# Set multiprocessing state
if self.supports_multiprocessing: if self.supports_multiprocessing:
self.process_count = options.get("processes", 1) self.process_count = options.get("processes", 1)
if self.process_count < 1: if self.process_count < 1:
@@ -144,9 +238,69 @@ class PaperlessCommand(RichCommand):
return super().execute(*args, **options) return super().execute(*args, **options)
@contextmanager
def buffered_logging(
self,
*logger_names: str,
level: int = logging.DEBUG,
) -> Generator[BufferingLogHandler, None, None]:
"""Context manager that captures log output from named loggers.
Installs a ``BufferingLogHandler`` on each named logger for the
duration of the block, suppressing propagation to avoid interleaving
with the Rich live display. The handler is removed on exit regardless
of whether an exception occurred.
Usage::
with self.buffered_logging("paperless", "documents") as log_buf:
# ... run progress loop ...
if options["verbose"]:
log_buf.render(self.console)
"""
handler = BufferingLogHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
loggers: list[logging.Logger] = []
original_propagate: dict[str, bool] = {}
for name in logger_names:
log = logging.getLogger(name)
log.addHandler(handler)
original_propagate[name] = log.propagate
log.propagate = False
loggers.append(log)
try:
yield handler
finally:
for log in loggers:
log.removeHandler(handler)
log.propagate = original_propagate[log.name]
@staticmethod
def _progress_columns() -> tuple[Any, ...]:
"""
Return the standard set of progress bar columns.
Extracted so both _create_progress (standalone) and track_with_stats
(inside Live) use identical column configuration without duplication.
"""
return (
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
)
def _create_progress(self, description: str) -> Progress: def _create_progress(self, description: str) -> Progress:
""" """
Create a configured Progress instance. Create a standalone Progress instance with its own stderr Console.
Use this for track(). For track_with_stats(), Progress is created
directly inside a Live context instead.
Progress output is directed to stderr to match the convention that Progress output is directed to stderr to match the convention that
progress bars are transient UI feedback, not command output. This progress bars are transient UI feedback, not command output. This
@@ -161,12 +315,7 @@ class PaperlessCommand(RichCommand):
A Progress instance configured with appropriate columns. A Progress instance configured with appropriate columns.
""" """
return Progress( return Progress(
SpinnerColumn(), *self._progress_columns(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
console=Console(stderr=True), console=Console(stderr=True),
transient=False, transient=False,
) )
@@ -222,7 +371,6 @@ class PaperlessCommand(RichCommand):
yield from iterable yield from iterable
return return
# Attempt to determine total if not provided
if total is None: if total is None:
total = self._get_iterable_length(iterable) total = self._get_iterable_length(iterable)
@@ -232,6 +380,87 @@ class PaperlessCommand(RichCommand):
yield item yield item
progress.advance(task_id) progress.advance(task_id)
def track_with_stats(
self,
iterable: Iterable[T],
*,
description: str = "Processing...",
stats_renderer: Callable[[], RenderableType],
total: int | None = None,
) -> Generator[T, None, None]:
"""
Iterate over items with a progress bar and a live-updating stats display.
The progress bar and stats renderable are combined in a single Live
context, so the stats panel re-renders in place below the progress bar
after each item is processed.
Respects --no-progress-bar flag. When disabled, yields items without
any display (stats are still updated by the caller's loop body, so
they will be accurate for any post-loop summary the caller prints).
Args:
iterable: The items to iterate over.
description: Text to display alongside the progress bar.
stats_renderer: Zero-argument callable that returns a Rich
renderable. Called after each item to refresh the display.
The caller typically closes over a mutable dataclass and
rebuilds a Table from it on each call.
total: Total number of items. If None, attempts to determine
automatically via .count() (for querysets) or len().
Yields:
Items from the iterable.
Example:
@dataclass
class Stats:
processed: int = 0
failed: int = 0
stats = Stats()
def render_stats() -> Table:
table = Table(box=None)
table.add_column("Processed")
table.add_column("Failed")
table.add_row(str(stats.processed), str(stats.failed))
return table
for item in self.track_with_stats(
items,
description="Importing...",
stats_renderer=render_stats,
):
try:
import_item(item)
stats.processed += 1
except Exception:
stats.failed += 1
"""
if self.no_progress_bar:
yield from iterable
return
if total is None:
total = self._get_iterable_length(iterable)
stderr_console = Console(stderr=True)
# Progress is created without its own console so Live controls rendering.
progress = Progress(*self._progress_columns())
task_id = progress.add_task(description, total=total)
with Live(
Group(progress, stats_renderer()),
console=stderr_console,
refresh_per_second=4,
) as live:
for item in iterable:
yield item
progress.advance(task_id)
live.update(Group(progress, stats_renderer()))
def process_parallel( def process_parallel(
self, self,
fn: Callable[[T], R], fn: Callable[[T], R],
@@ -269,7 +498,7 @@ class PaperlessCommand(RichCommand):
total = len(items) total = len(items)
if self.process_count == 1: if self.process_count == 1:
# Sequential execution in main process - critical for testing # Sequential execution in main process - critical for testing, so we don't fork in fork, etc
yield from self._process_sequential(fn, items, description, total) yield from self._process_sequential(fn, items, description, total)
else: else:
# Parallel execution with ProcessPoolExecutor # Parallel execution with ProcessPoolExecutor
@@ -298,6 +527,7 @@ class PaperlessCommand(RichCommand):
total: int, total: int,
) -> Generator[ProcessResult[T, R], None, None]: ) -> Generator[ProcessResult[T, R], None, None]:
"""Process items in parallel using ProcessPoolExecutor.""" """Process items in parallel using ProcessPoolExecutor."""
# Close database connections before forking - required for PostgreSQL # Close database connections before forking - required for PostgreSQL
db.connections.close_all() db.connections.close_all()

View File

@@ -1,4 +1,12 @@
from __future__ import annotations
import logging import logging
from dataclasses import dataclass
from dataclasses import field
from typing import TYPE_CHECKING
from rich.table import Table
from rich.text import Text
from documents.classifier import load_classifier from documents.classifier import load_classifier
from documents.management.commands.base import PaperlessCommand from documents.management.commands.base import PaperlessCommand
@@ -8,9 +16,162 @@ from documents.signals.handlers import set_document_type
from documents.signals.handlers import set_storage_path from documents.signals.handlers import set_storage_path
from documents.signals.handlers import set_tags from documents.signals.handlers import set_tags
if TYPE_CHECKING:
from rich.console import RenderableType
from documents.models import Correspondent
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
logger = logging.getLogger("paperless.management.retagger") logger = logging.getLogger("paperless.management.retagger")
@dataclass(slots=True)
class RetaggerStats:
"""Cumulative counters updated as the retagger processes documents.
Mutable by design -- fields are incremented in the processing loop.
slots=True reduces per-instance memory overhead and speeds attribute access.
"""
correspondents: int = 0
document_types: int = 0
tags_added: int = 0
tags_removed: int = 0
storage_paths: int = 0
documents_processed: int = 0
@dataclass(slots=True)
class DocumentSuggestion:
"""Buffered classifier suggestions for a single document (suggest mode only).
Mutable by design -- fields are assigned incrementally as each setter runs.
"""
document: Document
correspondent: Correspondent | None = None
document_type: DocumentType | None = None
tags_to_add: frozenset[Tag] = field(default_factory=frozenset)
tags_to_remove: frozenset[Tag] = field(default_factory=frozenset)
storage_path: StoragePath | None = None
@property
def has_suggestions(self) -> bool:
return bool(
self.correspondent is not None
or self.document_type is not None
or self.tags_to_add
or self.tags_to_remove
or self.storage_path is not None,
)
def _build_stats_table(stats: RetaggerStats, *, suggest: bool) -> Table:
"""
Build the live-updating stats table shown below the progress bar.
In suggest mode the labels read "would set / would add" to make clear
that nothing has been written to the database.
"""
table = Table(box=None, padding=(0, 2), show_header=True, header_style="bold")
table.add_column("Documents")
table.add_column("Correspondents")
table.add_column("Doc Types")
table.add_column("Tags (+)")
table.add_column("Tags (-)")
table.add_column("Storage Paths")
verb = "would set" if suggest else "set"
table.add_row(
str(stats.documents_processed),
f"{stats.correspondents} {verb}",
f"{stats.document_types} {verb}",
f"+{stats.tags_added}",
f"-{stats.tags_removed}",
f"{stats.storage_paths} {verb}",
)
return table
def _build_suggestion_table(
suggestions: list[DocumentSuggestion],
base_url: str | None,
) -> Table:
"""
Build the final suggestion table printed after the progress bar completes.
Only documents with at least one suggestion are included.
"""
table = Table(
title="Suggested Changes",
show_header=True,
header_style="bold cyan",
show_lines=True,
)
table.add_column("Document", style="bold", no_wrap=False, min_width=20)
table.add_column("Correspondent")
table.add_column("Doc Type")
table.add_column("Tags")
table.add_column("Storage Path")
for suggestion in suggestions:
if not suggestion.has_suggestions:
continue
doc = suggestion.document
if base_url:
doc_cell = Text()
doc_cell.append(str(doc))
doc_cell.append(f"\n{base_url}/documents/{doc.pk}", style="dim")
else:
doc_cell = Text(f"{doc} [{doc.pk}]")
tag_parts: list[str] = []
for tag in sorted(suggestion.tags_to_add, key=lambda t: t.name):
tag_parts.append(f"[green]+{tag.name}[/green]")
for tag in sorted(suggestion.tags_to_remove, key=lambda t: t.name):
tag_parts.append(f"[red]-{tag.name}[/red]")
tag_cell = Text.from_markup(", ".join(tag_parts)) if tag_parts else Text("-")
table.add_row(
doc_cell,
str(suggestion.correspondent) if suggestion.correspondent else "-",
str(suggestion.document_type) if suggestion.document_type else "-",
tag_cell,
str(suggestion.storage_path) if suggestion.storage_path else "-",
)
return table
def _build_summary_table(stats: RetaggerStats) -> Table:
"""Build the final applied-changes summary table."""
table = Table(
title="Retagger Summary",
show_header=True,
header_style="bold cyan",
)
table.add_column("Metric", style="bold")
table.add_column("Count", justify="right")
table.add_row("Documents processed", str(stats.documents_processed))
table.add_row("Correspondents set", str(stats.correspondents))
table.add_row("Document types set", str(stats.document_types))
table.add_row("Tags added", str(stats.tags_added))
table.add_row("Tags removed", str(stats.tags_removed))
table.add_row("Storage paths set", str(stats.storage_paths))
return table
class Command(PaperlessCommand): class Command(PaperlessCommand):
help = ( help = (
"Using the current classification model, assigns correspondents, tags " "Using the current classification model, assigns correspondents, tags "
@@ -19,7 +180,7 @@ class Command(PaperlessCommand):
"modified) after their initial import." "modified) after their initial import."
) )
def add_arguments(self, parser): def add_arguments(self, parser) -> None:
super().add_arguments(parser) super().add_arguments(parser)
parser.add_argument("-c", "--correspondent", default=False, action="store_true") parser.add_argument("-c", "--correspondent", default=False, action="store_true")
parser.add_argument("-T", "--tags", default=False, action="store_true") parser.add_argument("-T", "--tags", default=False, action="store_true")
@@ -31,9 +192,9 @@ class Command(PaperlessCommand):
default=False, default=False,
action="store_true", action="store_true",
help=( help=(
"By default this command won't try to assign a correspondent " "By default this command will not try to assign a correspondent "
"if more than one matches the document. Use this flag if " "if more than one matches the document. Use this flag to pick "
"you'd rather it just pick the first one it finds." "the first match instead."
), ),
) )
parser.add_argument( parser.add_argument(
@@ -42,91 +203,140 @@ class Command(PaperlessCommand):
default=False, default=False,
action="store_true", action="store_true",
help=( help=(
"If set, the document retagger will overwrite any previously " "Overwrite any previously set correspondent, document type, and "
"set correspondent, document and remove correspondents, types " "remove tags that no longer match due to changed rules."
"and tags that do not match anymore due to changed rules."
), ),
) )
parser.add_argument( parser.add_argument(
"--suggest", "--suggest",
default=False, default=False,
action="store_true", action="store_true",
help="Return the suggestion, don't change anything.", help="Show what would be changed without applying anything.",
) )
parser.add_argument( parser.add_argument(
"--base-url", "--base-url",
help="The base URL to use to build the link to the documents.", help="Base URL used to build document links in suggest output.",
) )
parser.add_argument( parser.add_argument(
"--id-range", "--id-range",
help="A range of document ids on which the retagging should be applied.", help="Restrict retagging to documents within this ID range (inclusive).",
nargs=2, nargs=2,
type=int, type=int,
) )
def handle(self, *args, **options): def handle(self, *args, **options) -> None:
suggest: bool = options["suggest"]
overwrite: bool = options["overwrite"]
use_first: bool = options["use_first"]
base_url: str | None = options["base_url"]
do_correspondent: bool = options["correspondent"]
do_document_type: bool = options["document_type"]
do_tags: bool = options["tags"]
do_storage_path: bool = options["storage_path"]
if not any([do_correspondent, do_document_type, do_tags, do_storage_path]):
self.console.print(
"[yellow]No classifier targets specified. "
"Use -c, -T, -t, or -s to select what to retag.[/yellow]",
)
return
if options["inbox_only"]: if options["inbox_only"]:
queryset = Document.objects.filter(tags__is_inbox_tag=True) queryset = Document.objects.filter(tags__is_inbox_tag=True)
else: else:
queryset = Document.objects.all() queryset = Document.objects.all()
if options["id_range"]: if options["id_range"]:
queryset = queryset.filter( lo, hi = options["id_range"]
id__range=(options["id_range"][0], options["id_range"][1]), queryset = queryset.filter(id__range=(lo, hi))
)
documents = queryset.distinct() documents = queryset.distinct()
classifier = load_classifier() classifier = load_classifier()
for document in self.track(documents, description="Retagging..."): stats = RetaggerStats()
if options["correspondent"]: suggestions: list[DocumentSuggestion] = []
set_correspondent(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
if options["document_type"]: def render_stats() -> RenderableType:
set_document_type( return _build_stats_table(stats, suggest=suggest)
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
if options["tags"]: with self.buffered_logging(
set_tags( "paperless",
sender=None, "paperless.handlers",
document=document, "documents",
classifier=classifier, ) as log_buf:
replace=options["overwrite"], for document in self.track_with_stats(
suggest=options["suggest"], documents,
base_url=options["base_url"], description="Retagging...",
stdout=self.stdout, stats_renderer=render_stats,
style_func=self.style, ):
) suggestion = DocumentSuggestion(document=document)
if options["storage_path"]: if do_correspondent:
set_storage_path( correspondent = set_correspondent(
sender=None, None,
document=document, document,
classifier=classifier, classifier=classifier,
replace=options["overwrite"], replace=overwrite,
use_first=options["use_first"], use_first=use_first,
suggest=options["suggest"], dry_run=suggest,
base_url=options["base_url"], )
stdout=self.stdout, if correspondent is not None:
style_func=self.style, stats.correspondents += 1
) suggestion.correspondent = correspondent
if do_document_type:
document_type = set_document_type(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if document_type is not None:
stats.document_types += 1
suggestion.document_type = document_type
if do_tags:
tags_to_add, tags_to_remove = set_tags(
None,
document,
classifier=classifier,
replace=overwrite,
dry_run=suggest,
)
stats.tags_added += len(tags_to_add)
stats.tags_removed += len(tags_to_remove)
suggestion.tags_to_add = frozenset(tags_to_add)
suggestion.tags_to_remove = frozenset(tags_to_remove)
if do_storage_path:
storage_path = set_storage_path(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if storage_path is not None:
stats.storage_paths += 1
suggestion.storage_path = storage_path
stats.documents_processed += 1
if suggest:
suggestions.append(suggestion)
# Post-loop output
if suggest:
visible = [s for s in suggestions if s.has_suggestions]
if visible:
self.console.print(_build_suggestion_table(visible, base_url))
else:
self.console.print("[green]No changes suggested.[/green]")
else:
self.console.print(_build_summary_table(stats))
log_buf.render(self.console, min_level=logging.INFO, title="Retagger Log")

View File

@@ -75,7 +75,7 @@ class MatchingModel(ModelWithOwner):
is_insensitive = models.BooleanField(_("is insensitive"), default=True) is_insensitive = models.BooleanField(_("is insensitive"), default=True)
class Meta: class Meta(ModelWithOwner.Meta):
abstract = True abstract = True
ordering = ("name",) ordering = ("name",)
constraints = [ constraints = [

View File

@@ -9,7 +9,7 @@ from types import TracebackType
try: try:
from typing import Self from typing import Self
except ImportError: except ImportError:
from typing_extensions import Self from typing import Self
import dateparser import dateparser

View File

@@ -8,7 +8,7 @@ if TYPE_CHECKING:
from channels_redis.pubsub import RedisPubSubChannelLayer from channels_redis.pubsub import RedisPubSubChannelLayer
class ProgressStatusOptions(str, enum.Enum): class ProgressStatusOptions(enum.StrEnum):
STARTED = "STARTED" STARTED = "STARTED"
WORKING = "WORKING" WORKING = "WORKING"
SUCCESS = "SUCCESS" SUCCESS = "SUCCESS"

View File

@@ -4,6 +4,7 @@ import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any
from celery import shared_task from celery import shared_task
from celery import states from celery import states
@@ -32,12 +33,14 @@ from documents.file_handling import create_source_path_directory
from documents.file_handling import delete_empty_directories from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_filename from documents.file_handling import generate_filename
from documents.file_handling import generate_unique_filename from documents.file_handling import generate_unique_filename
from documents.models import Correspondent
from documents.models import CustomField from documents.models import CustomField
from documents.models import CustomFieldInstance from documents.models import CustomFieldInstance
from documents.models import Document from documents.models import Document
from documents.models import MatchingModel from documents.models import DocumentType
from documents.models import PaperlessTask from documents.models import PaperlessTask
from documents.models import SavedView from documents.models import SavedView
from documents.models import StoragePath
from documents.models import Tag from documents.models import Tag
from documents.models import UiSettings from documents.models import UiSettings
from documents.models import Workflow from documents.models import Workflow
@@ -81,47 +84,41 @@ def add_inbox_tags(sender, document: Document, logging_group=None, **kwargs) ->
document.add_nested_tags(inbox_tags) document.add_nested_tags(inbox_tags)
def _suggestion_printer(
stdout,
style_func,
suggestion_type: str,
document: Document,
selected: MatchingModel,
base_url: str | None = None,
) -> None:
"""
Smaller helper to reduce duplication when just outputting suggestions to the console
"""
doc_str = str(document)
if base_url is not None:
stdout.write(style_func.SUCCESS(doc_str))
stdout.write(style_func.SUCCESS(f"{base_url}/documents/{document.pk}"))
else:
stdout.write(style_func.SUCCESS(f"{doc_str} [{document.pk}]"))
stdout.write(f"Suggest {suggestion_type}: {selected}")
def set_correspondent( def set_correspondent(
sender, sender: object,
document: Document, document: Document,
*, *,
logging_group=None, logging_group: object = None,
classifier: DocumentClassifier | None = None, classifier: DocumentClassifier | None = None,
replace=False, replace: bool = False,
use_first=True, use_first: bool = True,
suggest=False, dry_run: bool = False,
base_url=None, **kwargs: Any,
stdout=None, ) -> Correspondent | None:
style_func=None, """
**kwargs, Assign a correspondent to a document based on classifier results.
) -> None:
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing correspondent assignment.
use_first: If True, pick the first match when multiple correspondents
match. If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The correspondent that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
if document.correspondent and not replace: if document.correspondent and not replace:
return return None
potential_correspondents = matching.match_correspondents(document, classifier) potential_correspondents = matching.match_correspondents(document, classifier)
potential_count = len(potential_correspondents) potential_count = len(potential_correspondents)
selected = potential_correspondents[0] if potential_correspondents else None selected = potential_correspondents[0] if potential_correspondents else None
if potential_count > 1: if potential_count > 1:
if use_first: if use_first:
logger.debug( logger.debug(
@@ -135,49 +132,53 @@ def set_correspondent(
f"not assigning any correspondent", f"not assigning any correspondent",
extra={"group": logging_group}, extra={"group": logging_group},
) )
return return None
if selected or replace: if (selected or replace) and not dry_run:
if suggest: logger.info(
_suggestion_printer( f"Assigning correspondent {selected} to {document}",
stdout, extra={"group": logging_group},
style_func, )
"correspondent", document.correspondent = selected
document, document.save(update_fields=("correspondent",))
selected,
base_url,
)
else:
logger.info(
f"Assigning correspondent {selected} to {document}",
extra={"group": logging_group},
)
document.correspondent = selected return selected
document.save(update_fields=("correspondent",))
def set_document_type( def set_document_type(
sender, sender: object,
document: Document, document: Document,
*, *,
logging_group=None, logging_group: object = None,
classifier: DocumentClassifier | None = None, classifier: DocumentClassifier | None = None,
replace=False, replace: bool = False,
use_first=True, use_first: bool = True,
suggest=False, dry_run: bool = False,
base_url=None, **kwargs: Any,
stdout=None, ) -> DocumentType | None:
style_func=None, """
**kwargs, Assign a document type to a document based on classifier results.
) -> None:
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing document type assignment.
use_first: If True, pick the first match when multiple types match.
If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The document type that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
if document.document_type and not replace: if document.document_type and not replace:
return return None
potential_document_type = matching.match_document_types(document, classifier) potential_document_types = matching.match_document_types(document, classifier)
potential_count = len(potential_document_types)
potential_count = len(potential_document_type) selected = potential_document_types[0] if potential_document_types else None
selected = potential_document_type[0] if potential_document_type else None
if potential_count > 1: if potential_count > 1:
if use_first: if use_first:
@@ -192,42 +193,64 @@ def set_document_type(
f"not assigning any document type", f"not assigning any document type",
extra={"group": logging_group}, extra={"group": logging_group},
) )
return return None
if selected or replace: if (selected or replace) and not dry_run:
if suggest: logger.info(
_suggestion_printer( f"Assigning document type {selected} to {document}",
stdout, extra={"group": logging_group},
style_func, )
"document type", document.document_type = selected
document, document.save(update_fields=("document_type",))
selected,
base_url,
)
else:
logger.info(
f"Assigning document type {selected} to {document}",
extra={"group": logging_group},
)
document.document_type = selected return selected
document.save(update_fields=("document_type",))
def set_tags( def set_tags(
sender, sender: object,
document: Document, document: Document,
*, *,
logging_group=None, logging_group: object = None,
classifier: DocumentClassifier | None = None, classifier: DocumentClassifier | None = None,
replace=False, replace: bool = False,
suggest=False, dry_run: bool = False,
base_url=None, **kwargs: Any,
stdout=None, ) -> tuple[set[Tag], set[Tag]]:
style_func=None, """
**kwargs, Assign tags to a document based on classifier results.
) -> None:
When replace=True, existing auto-matched and rule-matched tags are removed
before applying the new set (inbox tags and manually-added tags are preserved).
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, remove existing classifier-managed tags before applying
new ones. Inbox tags and manually-added tags are always preserved.
dry_run: If True, compute what would change without saving anything.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
A two-tuple of (tags_added, tags_removed). In non-replace mode,
tags_removed is always an empty set. In dry_run mode, neither set
is applied to the database.
"""
# Compute which tags would be removed under replace mode.
# The filter mirrors the .delete() call below: keep inbox tags and
# manually-added tags (match="" and not auto-matched).
if replace: if replace:
tags_to_remove: set[Tag] = set(
document.tags.exclude(
is_inbox_tag=True,
).exclude(
Q(match="") & ~Q(matching_algorithm=Tag.MATCH_AUTO),
),
)
else:
tags_to_remove = set()
if replace and not dry_run:
Document.tags.through.objects.filter(document=document).exclude( Document.tags.through.objects.filter(document=document).exclude(
Q(tag__is_inbox_tag=True), Q(tag__is_inbox_tag=True),
).exclude( ).exclude(
@@ -235,65 +258,53 @@ def set_tags(
).delete() ).delete()
current_tags = set(document.tags.all()) current_tags = set(document.tags.all())
matched_tags = matching.match_tags(document, classifier) matched_tags = matching.match_tags(document, classifier)
tags_to_add = set(matched_tags) - current_tags
relevant_tags = set(matched_tags) - current_tags if tags_to_add and not dry_run:
if suggest:
extra_tags = current_tags - set(matched_tags)
extra_tags = [
t for t in extra_tags if t.matching_algorithm == MatchingModel.MATCH_AUTO
]
if not relevant_tags and not extra_tags:
return
doc_str = style_func.SUCCESS(str(document))
if base_url:
stdout.write(doc_str)
stdout.write(f"{base_url}/documents/{document.pk}")
else:
stdout.write(doc_str + style_func.SUCCESS(f" [{document.pk}]"))
if relevant_tags:
stdout.write("Suggest tags: " + ", ".join([t.name for t in relevant_tags]))
if extra_tags:
stdout.write("Extra tags: " + ", ".join([t.name for t in extra_tags]))
else:
if not relevant_tags:
return
message = 'Tagging "{}" with "{}"'
logger.info( logger.info(
message.format(document, ", ".join([t.name for t in relevant_tags])), f'Tagging "{document}" with "{", ".join(t.name for t in tags_to_add)}"',
extra={"group": logging_group}, extra={"group": logging_group},
) )
document.add_nested_tags(tags_to_add)
document.add_nested_tags(relevant_tags) return tags_to_add, tags_to_remove
def set_storage_path( def set_storage_path(
sender, sender: object,
document: Document, document: Document,
*, *,
logging_group=None, logging_group: object = None,
classifier: DocumentClassifier | None = None, classifier: DocumentClassifier | None = None,
replace=False, replace: bool = False,
use_first=True, use_first: bool = True,
suggest=False, dry_run: bool = False,
base_url=None, **kwargs: Any,
stdout=None, ) -> StoragePath | None:
style_func=None, """
**kwargs, Assign a storage path to a document based on classifier results.
) -> None:
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing storage path assignment.
use_first: If True, pick the first match when multiple paths match.
If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The storage path that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
if document.storage_path and not replace: if document.storage_path and not replace:
return return None
potential_storage_path = matching.match_storage_paths( potential_storage_paths = matching.match_storage_paths(document, classifier)
document, potential_count = len(potential_storage_paths)
classifier, selected = potential_storage_paths[0] if potential_storage_paths else None
)
potential_count = len(potential_storage_path)
selected = potential_storage_path[0] if potential_storage_path else None
if potential_count > 1: if potential_count > 1:
if use_first: if use_first:
@@ -308,26 +319,17 @@ def set_storage_path(
f"not assigning any storage directory", f"not assigning any storage directory",
extra={"group": logging_group}, extra={"group": logging_group},
) )
return return None
if selected or replace: if (selected or replace) and not dry_run:
if suggest: logger.info(
_suggestion_printer( f"Assigning storage path {selected} to {document}",
stdout, extra={"group": logging_group},
style_func, )
"storage directory", document.storage_path = selected
document, document.save(update_fields=("storage_path",))
selected,
base_url,
)
else:
logger.info(
f"Assigning storage path {selected} to {document}",
extra={"group": logging_group},
)
document.storage_path = selected return selected
document.save(update_fields=("storage_path",))
# see empty_trash in documents/tasks.py for signal handling # see empty_trash in documents/tasks.py for signal handling

View File

@@ -114,3 +114,14 @@ def authenticated_rest_api_client(rest_api_client: APIClient):
user = UserModel.objects.create_user(username="testuser", password="password") user = UserModel.objects.create_user(username="testuser", password="password")
rest_api_client.force_authenticate(user=user) rest_api_client.force_authenticate(user=user)
yield rest_api_client yield rest_api_client
@pytest.fixture(scope="session", autouse=True)
def faker_session_locale():
"""Set Faker locale for reproducibility."""
return "en_US"
@pytest.fixture(scope="session", autouse=True)
def faker_seed():
return 12345

View File

@@ -24,7 +24,7 @@ def base_config() -> DateParserConfig:
12, 12,
0, 0,
0, 0,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
), ),
filename_date_order="YMD", filename_date_order="YMD",
content_date_order="DMY", content_date_order="DMY",
@@ -45,7 +45,7 @@ def config_with_ignore_dates() -> DateParserConfig:
12, 12,
0, 0,
0, 0,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
), ),
filename_date_order="DMY", filename_date_order="DMY",
content_date_order="MDY", content_date_order="MDY",

View File

@@ -101,50 +101,50 @@ class TestFilterDate:
[ [
# Valid Dates # Valid Dates
pytest.param( pytest.param(
datetime.datetime(2024, 1, 10, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 10, tzinfo=datetime.UTC),
datetime.datetime(2024, 1, 10, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 10, tzinfo=datetime.UTC),
id="valid_past_date", id="valid_past_date",
), ),
pytest.param( pytest.param(
datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC),
datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC),
id="exactly_at_reference", id="exactly_at_reference",
), ),
pytest.param( pytest.param(
datetime.datetime(1901, 1, 1, tzinfo=datetime.timezone.utc), datetime.datetime(1901, 1, 1, tzinfo=datetime.UTC),
datetime.datetime(1901, 1, 1, tzinfo=datetime.timezone.utc), datetime.datetime(1901, 1, 1, tzinfo=datetime.UTC),
id="year_1901_valid", id="year_1901_valid",
), ),
# Date is > reference_time # Date is > reference_time
pytest.param( pytest.param(
datetime.datetime(2024, 1, 16, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 16, tzinfo=datetime.UTC),
None, None,
id="future_date_day_after", id="future_date_day_after",
), ),
# date.date() in ignore_dates # date.date() in ignore_dates
pytest.param( pytest.param(
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.UTC),
None, None,
id="ignored_date_midnight_jan1", id="ignored_date_midnight_jan1",
), ),
pytest.param( pytest.param(
datetime.datetime(2024, 1, 1, 10, 30, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 1, 1, 10, 30, 0, tzinfo=datetime.UTC),
None, None,
id="ignored_date_midday_jan1", id="ignored_date_midday_jan1",
), ),
pytest.param( pytest.param(
datetime.datetime(2024, 12, 25, 15, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 12, 25, 15, 0, 0, tzinfo=datetime.UTC),
None, None,
id="ignored_date_dec25_future", id="ignored_date_dec25_future",
), ),
# date.year <= 1900 # date.year <= 1900
pytest.param( pytest.param(
datetime.datetime(1899, 12, 31, tzinfo=datetime.timezone.utc), datetime.datetime(1899, 12, 31, tzinfo=datetime.UTC),
None, None,
id="year_1899", id="year_1899",
), ),
pytest.param( pytest.param(
datetime.datetime(1900, 1, 1, tzinfo=datetime.timezone.utc), datetime.datetime(1900, 1, 1, tzinfo=datetime.UTC),
None, None,
id="year_1900_boundary", id="year_1900_boundary",
), ),
@@ -176,7 +176,7 @@ class TestFilterDate:
1, 1,
12, 12,
0, 0,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
) )
another_ignored = datetime.datetime( another_ignored = datetime.datetime(
2024, 2024,
@@ -184,7 +184,7 @@ class TestFilterDate:
25, 25,
15, 15,
30, 30,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
) )
allowed_date = datetime.datetime( allowed_date = datetime.datetime(
2024, 2024,
@@ -192,7 +192,7 @@ class TestFilterDate:
2, 2,
12, 12,
0, 0,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
) )
assert parser._filter_date(ignored_date) is None assert parser._filter_date(ignored_date) is None
@@ -204,7 +204,7 @@ class TestFilterDate:
regex_parser: RegexDateParserPlugin, regex_parser: RegexDateParserPlugin,
) -> None: ) -> None:
"""Should work with timezone-aware datetimes.""" """Should work with timezone-aware datetimes."""
date_utc = datetime.datetime(2024, 1, 10, 12, 0, tzinfo=datetime.timezone.utc) date_utc = datetime.datetime(2024, 1, 10, 12, 0, tzinfo=datetime.UTC)
result = regex_parser._filter_date(date_utc) result = regex_parser._filter_date(date_utc)
@@ -221,8 +221,8 @@ class TestRegexDateParser:
"report-2023-12-25.txt", "report-2023-12-25.txt",
"Event recorded on 25/12/2022.", "Event recorded on 25/12/2022.",
[ [
datetime.datetime(2023, 12, 25, tzinfo=datetime.timezone.utc), datetime.datetime(2023, 12, 25, tzinfo=datetime.UTC),
datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc), datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC),
], ],
id="filename-y-m-d_and_content-d-m-y", id="filename-y-m-d_and_content-d-m-y",
), ),
@@ -230,8 +230,8 @@ class TestRegexDateParser:
"img_2023.01.02.jpg", "img_2023.01.02.jpg",
"Taken on 01/02/2023", "Taken on 01/02/2023",
[ [
datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc), datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC),
datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc), datetime.datetime(2023, 2, 1, tzinfo=datetime.UTC),
], ],
id="ambiguous-dates-respect-orders", id="ambiguous-dates-respect-orders",
), ),
@@ -239,7 +239,7 @@ class TestRegexDateParser:
"notes.txt", "notes.txt",
"bad date 99/99/9999 and 25/12/2022", "bad date 99/99/9999 and 25/12/2022",
[ [
datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc), datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC),
], ],
id="parse-exception-skips-bad-and-yields-good", id="parse-exception-skips-bad-and-yields-good",
), ),
@@ -275,24 +275,24 @@ class TestRegexDateParser:
or "2023.12.25" in date_string or "2023.12.25" in date_string
or "2023-12-25" in date_string or "2023-12-25" in date_string
): ):
return datetime.datetime(2023, 12, 25, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 12, 25, tzinfo=datetime.UTC)
# content DMY 25/12/2022 # content DMY 25/12/2022
if "25/12/2022" in date_string or "25-12-2022" in date_string: if "25/12/2022" in date_string or "25-12-2022" in date_string:
return datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc) return datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC)
# filename YMD 2023.01.02 # filename YMD 2023.01.02
if "2023.01.02" in date_string or "2023-01-02" in date_string: if "2023.01.02" in date_string or "2023-01-02" in date_string:
return datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC)
# ambiguous 01/02/2023 -> respect DATE_ORDER setting # ambiguous 01/02/2023 -> respect DATE_ORDER setting
if "01/02/2023" in date_string: if "01/02/2023" in date_string:
if date_order == "DMY": if date_order == "DMY":
return datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 2, 1, tzinfo=datetime.UTC)
if date_order == "YMD": if date_order == "YMD":
return datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC)
# fallback # fallback
return datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 2, 1, tzinfo=datetime.UTC)
# simulate parse failure for malformed input # simulate parse failure for malformed input
if "99/99/9999" in date_string or "bad date" in date_string: if "99/99/9999" in date_string or "bad date" in date_string:
@@ -328,7 +328,7 @@ class TestRegexDateParser:
12, 12,
0, 0,
0, 0,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
), ),
filename_date_order="YMD", filename_date_order="YMD",
content_date_order="DMY", content_date_order="DMY",
@@ -344,13 +344,13 @@ class TestRegexDateParser:
) -> datetime.datetime | None: ) -> datetime.datetime | None:
if "10/12/2023" in date_string or "10-12-2023" in date_string: if "10/12/2023" in date_string or "10-12-2023" in date_string:
# ignored date # ignored date
return datetime.datetime(2023, 12, 10, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 12, 10, tzinfo=datetime.UTC)
if "01/02/2024" in date_string or "01-02-2024" in date_string: if "01/02/2024" in date_string or "01-02-2024" in date_string:
# future relative to reference_time -> filtered # future relative to reference_time -> filtered
return datetime.datetime(2024, 2, 1, tzinfo=datetime.timezone.utc) return datetime.datetime(2024, 2, 1, tzinfo=datetime.UTC)
if "05/01/2023" in date_string or "05-01-2023" in date_string: if "05/01/2023" in date_string or "05-01-2023" in date_string:
# valid # valid
return datetime.datetime(2023, 1, 5, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 1, 5, tzinfo=datetime.UTC)
return None return None
mocker.patch(target, side_effect=fake_parse) mocker.patch(target, side_effect=fake_parse)
@@ -358,7 +358,7 @@ class TestRegexDateParser:
content = "Ignored: 10/12/2023, Future: 01/02/2024, Keep: 05/01/2023" content = "Ignored: 10/12/2023, Future: 01/02/2024, Keep: 05/01/2023"
results = list(parser.parse("whatever.txt", content)) results = list(parser.parse("whatever.txt", content))
assert results == [datetime.datetime(2023, 1, 5, tzinfo=datetime.timezone.utc)] assert results == [datetime.datetime(2023, 1, 5, tzinfo=datetime.UTC)]
def test_parse_handles_no_matches_and_returns_empty_list( def test_parse_handles_no_matches_and_returns_empty_list(
self, self,
@@ -392,7 +392,7 @@ class TestRegexDateParser:
12, 12,
0, 0,
0, 0,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
), ),
filename_date_order=None, filename_date_order=None,
content_date_order="DMY", content_date_order="DMY",
@@ -409,9 +409,9 @@ class TestRegexDateParser:
) -> datetime.datetime | None: ) -> datetime.datetime | None:
# return distinct datetimes so we can tell which source was parsed # return distinct datetimes so we can tell which source was parsed
if "25/12/2022" in date_string: if "25/12/2022" in date_string:
return datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc) return datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC)
if "2023-12-25" in date_string: if "2023-12-25" in date_string:
return datetime.datetime(2023, 12, 25, tzinfo=datetime.timezone.utc) return datetime.datetime(2023, 12, 25, tzinfo=datetime.UTC)
return None return None
mock = mocker.patch(target, side_effect=fake_parse) mock = mocker.patch(target, side_effect=fake_parse)
@@ -429,5 +429,5 @@ class TestRegexDateParser:
assert "25/12/2022" in called_date_string assert "25/12/2022" in called_date_string
# And the parser should have yielded the corresponding datetime # And the parser should have yielded the corresponding datetime
assert results == [ assert results == [
datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc), datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC),
] ]

View File

@@ -1,17 +1,67 @@
from factory import Faker """
Factory-boy factories for documents app models.
"""
from __future__ import annotations
import factory
from factory.django import DjangoModelFactory from factory.django import DjangoModelFactory
from documents.models import Correspondent from documents.models import Correspondent
from documents.models import Document from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
class CorrespondentFactory(DjangoModelFactory): class CorrespondentFactory(DjangoModelFactory):
class Meta: class Meta:
model = Correspondent model = Correspondent
name = Faker("name") name = factory.Sequence(lambda n: f"{factory.Faker('company')} {n}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class DocumentTypeFactory(DjangoModelFactory):
class Meta:
model = DocumentType
name = factory.Sequence(lambda n: f"{factory.Faker('bs')} {n}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class TagFactory(DjangoModelFactory):
class Meta:
model = Tag
name = factory.Sequence(lambda n: f"{factory.Faker('word')} {n}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
is_inbox_tag = False
class StoragePathFactory(DjangoModelFactory):
class Meta:
model = StoragePath
name = factory.Sequence(
lambda n: f"{factory.Faker('file_path', depth=2, extension='')} {n}",
)
path = factory.LazyAttribute(lambda o: f"{o.name}/{{title}}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class DocumentFactory(DjangoModelFactory): class DocumentFactory(DjangoModelFactory):
class Meta: class Meta:
model = Document model = Document
title = factory.Faker("sentence", nb_words=4)
checksum = factory.Faker("md5")
content = factory.Faker("paragraph")
correspondent = None
document_type = None
storage_path = None

View File

@@ -336,7 +336,11 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
added=d1, added=d1,
) )
self.assertEqual(generate_filename(doc1), Path("1232-01-09.pdf")) # Account for 3.14 padding changes
expected_year: str = d1.strftime("%Y")
expected_filename: Path = Path(f"{expected_year}-01-09.pdf")
self.assertEqual(generate_filename(doc1), expected_filename)
doc1.added = timezone.make_aware(datetime.datetime(2020, 11, 16, 1, 1, 1)) doc1.added = timezone.make_aware(datetime.datetime(2020, 11, 16, 1, 1, 1))

View File

@@ -21,7 +21,7 @@ class TestDateLocalization:
14, 14,
30, 30,
5, 5,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
) )
TEST_DATETIME_STRING: str = "2023-10-26T14:30:05+00:00" TEST_DATETIME_STRING: str = "2023-10-26T14:30:05+00:00"

View File

@@ -1,298 +1,442 @@
"""
Tests for the document_retagger management command.
"""
from __future__ import annotations
import pytest import pytest
from django.core.management import call_command from django.core.management import call_command
from django.core.management.base import CommandError from django.core.management.base import CommandError
from django.test import TestCase
from documents.models import Correspondent from documents.models import Correspondent
from documents.models import Document from documents.models import Document
from documents.models import DocumentType from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath from documents.models import StoragePath
from documents.models import Tag from documents.models import Tag
from documents.tests.factories import CorrespondentFactory
from documents.tests.factories import DocumentFactory
from documents.tests.factories import DocumentTypeFactory
from documents.tests.factories import StoragePathFactory
from documents.tests.factories import TagFactory
from documents.tests.utils import DirectoriesMixin from documents.tests.utils import DirectoriesMixin
# ---------------------------------------------------------------------------
# Module-level type aliases
# ---------------------------------------------------------------------------
StoragePathTuple = tuple[StoragePath, StoragePath, StoragePath]
TagTuple = tuple[Tag, Tag, Tag, Tag, Tag]
CorrespondentTuple = tuple[Correspondent, Correspondent]
DocumentTypeTuple = tuple[DocumentType, DocumentType]
DocumentTuple = tuple[Document, Document, Document, Document]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def storage_paths(db) -> StoragePathTuple:
"""Three storage paths with varying match rules."""
sp1 = StoragePathFactory(
path="{created_data}/{title}",
match="auto document",
matching_algorithm=MatchingModel.MATCH_LITERAL,
)
sp2 = StoragePathFactory(
path="{title}",
match="^first|^unrelated",
matching_algorithm=MatchingModel.MATCH_REGEX,
)
sp3 = StoragePathFactory(
path="{title}",
match="^blah",
matching_algorithm=MatchingModel.MATCH_REGEX,
)
return sp1, sp2, sp3
@pytest.fixture()
def tags(db) -> TagTuple:
"""Tags covering the common matching scenarios."""
tag_first = TagFactory(match="first", matching_algorithm=Tag.MATCH_ANY)
tag_second = TagFactory(match="second", matching_algorithm=Tag.MATCH_ANY)
tag_inbox = TagFactory(is_inbox_tag=True)
tag_no_match = TagFactory()
tag_auto = TagFactory(matching_algorithm=Tag.MATCH_AUTO)
return tag_first, tag_second, tag_inbox, tag_no_match, tag_auto
@pytest.fixture()
def correspondents(db) -> CorrespondentTuple:
"""Two correspondents matching 'first' and 'second' content."""
c_first = CorrespondentFactory(
match="first",
matching_algorithm=MatchingModel.MATCH_ANY,
)
c_second = CorrespondentFactory(
match="second",
matching_algorithm=MatchingModel.MATCH_ANY,
)
return c_first, c_second
@pytest.fixture()
def document_types(db) -> DocumentTypeTuple:
"""Two document types matching 'first' and 'second' content."""
dt_first = DocumentTypeFactory(
match="first",
matching_algorithm=MatchingModel.MATCH_ANY,
)
dt_second = DocumentTypeFactory(
match="second",
matching_algorithm=MatchingModel.MATCH_ANY,
)
return dt_first, dt_second
@pytest.fixture()
def documents(storage_paths: StoragePathTuple, tags: TagTuple) -> DocumentTuple:
    """Four documents with varied content used across most retagger tests.

    Returns docs titled A-D: "first"/"second"/"unrelated"/"auto" content.
    Doc C starts with a storage path plus the inbox and no-match tags;
    doc D starts with the auto-matching tag.
    """
    unmatched_path = storage_paths[2]
    inbox_tag, no_match_tag, auto_tag = tags[2:]
    doc_a = DocumentFactory(checksum="A", title="A", content="first document")
    doc_b = DocumentFactory(checksum="B", title="B", content="second document")
    doc_c = DocumentFactory(
        checksum="C",
        title="C",
        content="unrelated document",
        storage_path=unmatched_path,
    )
    doc_d = DocumentFactory(checksum="D", title="D", content="auto document")
    doc_c.tags.add(inbox_tag, no_match_tag)
    doc_d.tags.add(auto_tag)
    return doc_a, doc_b, doc_c, doc_d
def _get_docs() -> DocumentTuple:
    """Re-fetch the four standard documents (titles A-D) from the database."""
    doc_a, doc_b, doc_c, doc_d = (
        Document.objects.get(title=title) for title in "ABCD"
    )
    return doc_a, doc_b, doc_c, doc_d
# ---------------------------------------------------------------------------
# Tag assignment
# ---------------------------------------------------------------------------
@pytest.mark.management @pytest.mark.management
class TestRetagger(DirectoriesMixin, TestCase): @pytest.mark.django_db
def make_models(self) -> None: class TestRetaggerTags(DirectoriesMixin):
self.sp1 = StoragePath.objects.create( @pytest.mark.usefixtures("documents")
name="dummy a", def test_add_tags(self, tags: TagTuple) -> None:
path="{created_data}/{title}", tag_first, tag_second, *_ = tags
match="auto document",
matching_algorithm=StoragePath.MATCH_LITERAL,
)
self.sp2 = StoragePath.objects.create(
name="dummy b",
path="{title}",
match="^first|^unrelated",
matching_algorithm=StoragePath.MATCH_REGEX,
)
self.sp3 = StoragePath.objects.create(
name="dummy c",
path="{title}",
match="^blah",
matching_algorithm=StoragePath.MATCH_REGEX,
)
self.d1 = Document.objects.create(
checksum="A",
title="A",
content="first document",
)
self.d2 = Document.objects.create(
checksum="B",
title="B",
content="second document",
)
self.d3 = Document.objects.create(
checksum="C",
title="C",
content="unrelated document",
storage_path=self.sp3,
)
self.d4 = Document.objects.create(
checksum="D",
title="D",
content="auto document",
)
self.tag_first = Tag.objects.create(
name="tag1",
match="first",
matching_algorithm=Tag.MATCH_ANY,
)
self.tag_second = Tag.objects.create(
name="tag2",
match="second",
matching_algorithm=Tag.MATCH_ANY,
)
self.tag_inbox = Tag.objects.create(name="test", is_inbox_tag=True)
self.tag_no_match = Tag.objects.create(name="test2")
self.tag_auto = Tag.objects.create(
name="tagauto",
matching_algorithm=Tag.MATCH_AUTO,
)
self.d3.tags.add(self.tag_inbox)
self.d3.tags.add(self.tag_no_match)
self.d4.tags.add(self.tag_auto)
self.correspondent_first = Correspondent.objects.create(
name="c1",
match="first",
matching_algorithm=Correspondent.MATCH_ANY,
)
self.correspondent_second = Correspondent.objects.create(
name="c2",
match="second",
matching_algorithm=Correspondent.MATCH_ANY,
)
self.doctype_first = DocumentType.objects.create(
name="dt1",
match="first",
matching_algorithm=DocumentType.MATCH_ANY,
)
self.doctype_second = DocumentType.objects.create(
name="dt2",
match="second",
matching_algorithm=DocumentType.MATCH_ANY,
)
def get_updated_docs(self):
return (
Document.objects.get(title="A"),
Document.objects.get(title="B"),
Document.objects.get(title="C"),
Document.objects.get(title="D"),
)
def setUp(self) -> None:
super().setUp()
self.make_models()
def test_add_tags(self) -> None:
call_command("document_retagger", "--tags") call_command("document_retagger", "--tags")
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs() d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertEqual(d_first.tags.count(), 1) assert d_first.tags.count() == 1
self.assertEqual(d_second.tags.count(), 1) assert d_second.tags.count() == 1
self.assertEqual(d_unrelated.tags.count(), 2) assert d_unrelated.tags.count() == 2
self.assertEqual(d_auto.tags.count(), 1) assert d_auto.tags.count() == 1
assert d_first.tags.first() == tag_first
assert d_second.tags.first() == tag_second
self.assertEqual(d_first.tags.first(), self.tag_first) def test_overwrite_removes_stale_tags_and_preserves_inbox(
self.assertEqual(d_second.tags.first(), self.tag_second) self,
documents: DocumentTuple,
def test_add_type(self) -> None: tags: TagTuple,
call_command("document_retagger", "--document_type") ) -> None:
d_first, d_second, _, _ = self.get_updated_docs() d1, *_ = documents
tag_first, tag_second, tag_inbox, tag_no_match, _ = tags
self.assertEqual(d_first.document_type, self.doctype_first) d1.tags.add(tag_second)
self.assertEqual(d_second.document_type, self.doctype_second)
def test_add_correspondent(self) -> None:
call_command("document_retagger", "--correspondent")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertEqual(d_first.correspondent, self.correspondent_first)
self.assertEqual(d_second.correspondent, self.correspondent_second)
def test_overwrite_preserve_inbox(self) -> None:
self.d1.tags.add(self.tag_second)
call_command("document_retagger", "--tags", "--overwrite") call_command("document_retagger", "--tags", "--overwrite")
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs() d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertIsNotNone(Tag.objects.get(id=self.tag_second.id)) assert Tag.objects.filter(id=tag_second.id).exists()
assert list(d_first.tags.values_list("id", flat=True)) == [tag_first.id]
assert list(d_second.tags.values_list("id", flat=True)) == [tag_second.id]
assert set(d_unrelated.tags.values_list("id", flat=True)) == {
tag_inbox.id,
tag_no_match.id,
}
assert d_auto.tags.count() == 0
self.assertCountEqual( @pytest.mark.usefixtures("documents")
[tag.id for tag in d_first.tags.all()], @pytest.mark.parametrize(
[self.tag_first.id], "extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_tags(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--tags", "--suggest", *extra_args)
d_first, d_second, _, d_auto = _get_docs()
assert d_first.tags.count() == 0
assert d_second.tags.count() == 0
assert d_auto.tags.count() == 1
# ---------------------------------------------------------------------------
# Document type assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerDocumentType(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_type(self, document_types: DocumentTypeTuple) -> None:
dt_first, dt_second = document_types
call_command("document_retagger", "--document_type")
d_first, d_second, _, _ = _get_docs()
assert d_first.document_type == dt_first
assert d_second.document_type == dt_second
@pytest.mark.usefixtures("documents", "document_types")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_document_type(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--document_type", "--suggest", *extra_args)
d_first, d_second, _, _ = _get_docs()
assert d_first.document_type is None
assert d_second.document_type is None
@pytest.mark.parametrize(
("use_first_flag", "expects_assignment"),
[
pytest.param(["--use-first"], True, id="use_first_assigns_first_match"),
pytest.param([], False, id="no_use_first_skips_ambiguous_match"),
],
)
def test_use_first_with_multiple_matches(
self,
use_first_flag: list[str],
*,
expects_assignment: bool,
) -> None:
DocumentTypeFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
) )
self.assertCountEqual( DocumentTypeFactory(
[tag.id for tag in d_second.tags.all()], match="ambiguous",
[self.tag_second.id], matching_algorithm=MatchingModel.MATCH_ANY,
) )
self.assertCountEqual( doc = DocumentFactory(content="ambiguous content")
[tag.id for tag in d_unrelated.tags.all()],
[self.tag_inbox.id, self.tag_no_match.id], call_command("document_retagger", "--document_type", *use_first_flag)
doc.refresh_from_db()
assert (doc.document_type is not None) is expects_assignment
# ---------------------------------------------------------------------------
# Correspondent assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerCorrespondent(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_correspondent(self, correspondents: CorrespondentTuple) -> None:
c_first, c_second = correspondents
call_command("document_retagger", "--correspondent")
d_first, d_second, _, _ = _get_docs()
assert d_first.correspondent == c_first
assert d_second.correspondent == c_second
@pytest.mark.usefixtures("documents", "correspondents")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_correspondent(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--correspondent", "--suggest", *extra_args)
d_first, d_second, _, _ = _get_docs()
assert d_first.correspondent is None
assert d_second.correspondent is None
@pytest.mark.parametrize(
("use_first_flag", "expects_assignment"),
[
pytest.param(["--use-first"], True, id="use_first_assigns_first_match"),
pytest.param([], False, id="no_use_first_skips_ambiguous_match"),
],
)
def test_use_first_with_multiple_matches(
self,
use_first_flag: list[str],
*,
expects_assignment: bool,
) -> None:
CorrespondentFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
) )
self.assertEqual(d_auto.tags.count(), 0) CorrespondentFactory(
match="ambiguous",
def test_add_tags_suggest(self) -> None: matching_algorithm=MatchingModel.MATCH_ANY,
call_command("document_retagger", "--tags", "--suggest")
d_first, d_second, _, d_auto = self.get_updated_docs()
self.assertEqual(d_first.tags.count(), 0)
self.assertEqual(d_second.tags.count(), 0)
self.assertEqual(d_auto.tags.count(), 1)
def test_add_type_suggest(self) -> None:
call_command("document_retagger", "--document_type", "--suggest")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.document_type)
self.assertIsNone(d_second.document_type)
def test_add_correspondent_suggest(self) -> None:
call_command("document_retagger", "--correspondent", "--suggest")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.correspondent)
self.assertIsNone(d_second.correspondent)
def test_add_tags_suggest_url(self) -> None:
call_command(
"document_retagger",
"--tags",
"--suggest",
"--base-url=http://localhost",
) )
d_first, d_second, _, d_auto = self.get_updated_docs() doc = DocumentFactory(content="ambiguous content")
self.assertEqual(d_first.tags.count(), 0) call_command("document_retagger", "--correspondent", *use_first_flag)
self.assertEqual(d_second.tags.count(), 0)
self.assertEqual(d_auto.tags.count(), 1)
def test_add_type_suggest_url(self) -> None: doc.refresh_from_db()
call_command( assert (doc.correspondent is not None) is expects_assignment
"document_retagger",
"--document_type",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.document_type)
self.assertIsNone(d_second.document_type)
def test_add_correspondent_suggest_url(self) -> None: # ---------------------------------------------------------------------------
call_command( # Storage path assignment
"document_retagger", # ---------------------------------------------------------------------------
"--correspondent",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.correspondent)
self.assertIsNone(d_second.correspondent)
def test_add_storage_path(self) -> None: @pytest.mark.management
@pytest.mark.django_db
class TestRetaggerStoragePath(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_storage_path(self, storage_paths: StoragePathTuple) -> None:
""" """
GIVEN: GIVEN documents matching various storage path rules
- 2 storage paths with documents which match them WHEN document_retagger --storage_path is called
- 1 document which matches but has a storage path THEN matching documents get the correct path; existing path is unchanged
WHEN:
- document retagger is called
THEN:
- Matching document's storage paths updated
- Non-matching documents have no storage path
- Existing storage patch left unchanged
""" """
call_command( sp1, sp2, sp3 = storage_paths
"document_retagger", call_command("document_retagger", "--storage_path")
"--storage_path", d_first, d_second, d_unrelated, d_auto = _get_docs()
)
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
self.assertEqual(d_first.storage_path, self.sp2) assert d_first.storage_path == sp2
self.assertEqual(d_auto.storage_path, self.sp1) assert d_auto.storage_path == sp1
self.assertIsNone(d_second.storage_path) assert d_second.storage_path is None
self.assertEqual(d_unrelated.storage_path, self.sp3) assert d_unrelated.storage_path == sp3
def test_overwrite_storage_path(self) -> None: @pytest.mark.usefixtures("documents")
def test_overwrite_storage_path(self, storage_paths: StoragePathTuple) -> None:
""" """
GIVEN: GIVEN a document with an existing storage path that matches a different rule
- 2 storage paths with documents which match them WHEN document_retagger --storage_path --overwrite is called
- 1 document which matches but has a storage path THEN the existing path is replaced by the newly matched path
WHEN:
- document retagger is called with overwrite
THEN:
- Matching document's storage paths updated
- Non-matching documents have no storage path
- Existing storage patch overwritten
""" """
sp1, sp2, _ = storage_paths
call_command("document_retagger", "--storage_path", "--overwrite") call_command("document_retagger", "--storage_path", "--overwrite")
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs() d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertEqual(d_first.storage_path, self.sp2) assert d_first.storage_path == sp2
self.assertEqual(d_auto.storage_path, self.sp1) assert d_auto.storage_path == sp1
self.assertIsNone(d_second.storage_path) assert d_second.storage_path is None
self.assertEqual(d_unrelated.storage_path, self.sp2) assert d_unrelated.storage_path == sp2
def test_id_range_parameter(self) -> None: @pytest.mark.parametrize(
commandOutput = "" ("use_first_flag", "expects_assignment"),
Document.objects.create( [
checksum="E", pytest.param(["--use-first"], True, id="use_first_assigns_first_match"),
title="E", pytest.param([], False, id="no_use_first_skips_ambiguous_match"),
content="NOT the first document", ],
)
def test_use_first_with_multiple_matches(
self,
use_first_flag: list[str],
*,
expects_assignment: bool,
) -> None:
StoragePathFactory(
match="ambiguous",
matching_algorithm=MatchingModel.MATCH_ANY,
) )
call_command("document_retagger", "--tags", "--id-range", "1", "2") StoragePathFactory(
# The retagger shouldn`t apply the 'first' tag to our new document match="ambiguous",
self.assertEqual(Document.objects.filter(tags__id=self.tag_first.id).count(), 1) matching_algorithm=MatchingModel.MATCH_ANY,
)
doc = DocumentFactory(content="ambiguous content")
try: call_command("document_retagger", "--storage_path", *use_first_flag)
commandOutput = call_command("document_retagger", "--tags", "--id-range")
except CommandError:
# Just ignore the error
None
self.assertIn(commandOutput, "Error: argument --id-range: expected 2 arguments")
try: doc.refresh_from_db()
commandOutput = call_command( assert (doc.storage_path is not None) is expects_assignment
"document_retagger",
"--tags",
"--id-range",
"a",
"b",
)
except CommandError:
# Just ignore the error
None
self.assertIn(commandOutput, "error: argument --id-range: invalid int value:")
call_command("document_retagger", "--tags", "--id-range", "1", "9999")
# Now we should have 2 documents # ---------------------------------------------------------------------------
self.assertEqual(Document.objects.filter(tags__id=self.tag_first.id).count(), 2) # ID range filtering
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerIdRange(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
("id_range_args", "expected_count"),
[
pytest.param(["1", "2"], 1, id="narrow_range_limits_scope"),
pytest.param(["1", "9999"], 2, id="wide_range_tags_all_matches"),
],
)
def test_id_range_limits_scope(
self,
tags: TagTuple,
id_range_args: list[str],
expected_count: int,
) -> None:
DocumentFactory(content="NOT the first document")
call_command("document_retagger", "--tags", "--id-range", *id_range_args)
tag_first, *_ = tags
assert Document.objects.filter(tags__id=tag_first.id).count() == expected_count
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
"args",
[
pytest.param(["--tags", "--id-range"], id="missing_both_values"),
pytest.param(["--tags", "--id-range", "a", "b"], id="non_integer_values"),
],
)
def test_id_range_invalid_arguments_raise(self, args: list[str]) -> None:
with pytest.raises((CommandError, SystemExit)):
call_command("document_retagger", *args)
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerEdgeCases(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_no_targets_exits_cleanly(self) -> None:
"""Calling the retagger with no classifier targets should not raise."""
call_command("document_retagger")
@pytest.mark.usefixtures("documents")
def test_inbox_only_skips_non_inbox_documents(self) -> None:
"""--inbox-only must restrict processing to documents with an inbox tag."""
call_command("document_retagger", "--tags", "--inbox-only")
d_first, _, d_unrelated, _ = _get_docs()
assert d_first.tags.count() == 0
assert d_unrelated.tags.count() == 2

View File

@@ -4666,7 +4666,7 @@ class TestDateWorkflowLocalization(
14, 14,
30, 30,
5, 5,
tzinfo=datetime.timezone.utc, tzinfo=datetime.UTC,
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(

View File

@@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import StrEnum
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
@@ -11,7 +11,7 @@ if TYPE_CHECKING:
from django.http import HttpRequest from django.http import HttpRequest
class VersionResolutionError(str, Enum): class VersionResolutionError(StrEnum):
INVALID = "invalid" INVALID = "invalid"
NOT_FOUND = "not_found" NOT_FOUND = "not_found"

745
uv.lock generated

File diff suppressed because it is too large Load Diff