Compare commits

..

2 Commits

Author SHA1 Message Date
Trenton H
c92abb8bb7 Handles the linter updates now 2026-02-27 15:06:10 -08:00
Trenton H
d8c01c78e6 Working to improve the retagger output 2026-02-27 14:57:10 -08:00
32 changed files with 1194 additions and 1968 deletions

View File

@@ -39,6 +39,3 @@ max_line_length = off
[Dockerfile*]
indent_style = space
[*.toml]
indent_style = space

View File

@@ -62,10 +62,6 @@ copies you created in the steps above.
## Updating Paperless {#updating}
!!! warning
Please review the [migration instructions](migration-v3.md) before upgrading Paperless-ngx to v3.0, it includes some breaking changes that require manual intervention before upgrading.
### Docker Route {#docker-updating}
If a new release of paperless-ngx is available, upgrading depends on how

View File

@@ -51,172 +51,137 @@ matcher.
### Database
By default, Paperless uses **SQLite** with a database stored at `data/db.sqlite3`.
For multi-user or higher-throughput deployments, **PostgreSQL** (recommended) or
**MariaDB** can be used instead by setting [`PAPERLESS_DBENGINE`](#PAPERLESS_DBENGINE)
and the relevant connection variables.
#### [`PAPERLESS_DBENGINE=<engine>`](#PAPERLESS_DBENGINE) {#PAPERLESS_DBENGINE}
: Specifies the database engine to use. Accepted values are `sqlite`, `postgresql`,
and `mariadb`.
Defaults to `sqlite` if not set.
PostgreSQL and MariaDB both require [`PAPERLESS_DBHOST`](#PAPERLESS_DBHOST) to be
set. SQLite does not use any other connection variables; the database file is always
located at `<PAPERLESS_DATA_DIR>/db.sqlite3`.
!!! warning
Using MariaDB comes with some caveats.
See [MySQL Caveats](advanced_usage.md#mysql-caveats).
To switch to **PostgreSQL** or **MariaDB**, set [`PAPERLESS_DBHOST`](#PAPERLESS_DBHOST) and optionally configure other
database-related environment variables.
#### [`PAPERLESS_DBHOST=<hostname>`](#PAPERLESS_DBHOST) {#PAPERLESS_DBHOST}
: Hostname of the PostgreSQL or MariaDB database server. Required when
`PAPERLESS_DBENGINE` is `postgresql` or `mariadb`.
: If unset, Paperless uses **SQLite** by default.
Set `PAPERLESS_DBHOST` to switch to PostgreSQL or MariaDB instead.
#### [`PAPERLESS_DBENGINE=<engine_name>`](#PAPERLESS_DBENGINE) {#PAPERLESS_DBENGINE}
: Optional. Specifies the database engine to use when connecting to a remote database.
Available options are `postgresql` and `mariadb`.
Defaults to `postgresql` if `PAPERLESS_DBHOST` is set.
!!! warning
Using MariaDB comes with some caveats. See [MySQL Caveats](advanced_usage.md#mysql-caveats).
#### [`PAPERLESS_DBPORT=<port>`](#PAPERLESS_DBPORT) {#PAPERLESS_DBPORT}
: Port to use when connecting to PostgreSQL or MariaDB.
Defaults to `5432` for PostgreSQL and `3306` for MariaDB.
Default is `5432` for PostgreSQL and `3306` for MariaDB.
#### [`PAPERLESS_DBNAME=<name>`](#PAPERLESS_DBNAME) {#PAPERLESS_DBNAME}
: Name of the PostgreSQL or MariaDB database to connect to.
: Name of the database to connect to when using PostgreSQL or MariaDB.
Defaults to `paperless`.
Defaults to "paperless".
#### [`PAPERLESS_DBUSER=<user>`](#PAPERLESS_DBUSER) {#PAPERLESS_DBUSER}
#### [`PAPERLESS_DBUSER=<name>`](#PAPERLESS_DBUSER) {#PAPERLESS_DBUSER}
: Username for authenticating with the PostgreSQL or MariaDB database.
Defaults to `paperless`.
Defaults to "paperless".
#### [`PAPERLESS_DBPASS=<password>`](#PAPERLESS_DBPASS) {#PAPERLESS_DBPASS}
: Password for the PostgreSQL or MariaDB database user.
Defaults to `paperless`.
Defaults to "paperless".
#### [`PAPERLESS_DB_OPTIONS=<options>`](#PAPERLESS_DB_OPTIONS) {#PAPERLESS_DB_OPTIONS}
#### [`PAPERLESS_DBSSLMODE=<mode>`](#PAPERLESS_DBSSLMODE) {#PAPERLESS_DBSSLMODE}
: Advanced database connection options as a semicolon-delimited key-value string.
Keys and values are separated by `=`. Dot-notation produces nested option
dictionaries; for example, `pool.max_size=20` sets
`OPTIONS["pool"]["max_size"] = 20`.
: SSL mode to use when connecting to PostgreSQL or MariaDB.
Options specified here are merged over the engine defaults. Unrecognised keys
are passed through to the underlying database driver without validation, so a
typo will be silently ignored rather than producing an error.
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
Refer to your database driver's documentation for the full set of accepted keys:
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode).
- PostgreSQL: [libpq connection parameters](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS)
- MariaDB: [MariaDB Connector/Python](https://mariadb.com/kb/en/mariadb-connector-python/)
- SQLite: [SQLite PRAGMA statements](https://www.sqlite.org/pragma.html)
*Note*: SSL mode values differ between PostgreSQL and MariaDB.
!!! note "PostgreSQL connection pooling"
Default is `prefer` for PostgreSQL and `PREFERRED` for MariaDB.
Pool size is controlled via `pool.min_size` and `pool.max_size`. When
configuring pooling, ensure your PostgreSQL `max_connections` is large enough
to handle all pool connections across all workers:
`(web_workers + celery_workers) * pool.max_size + safety_margin`.
#### [`PAPERLESS_DBSSLROOTCERT=<ca-path>`](#PAPERLESS_DBSSLROOTCERT) {#PAPERLESS_DBSSLROOTCERT}
**Examples:**
: Path to the SSL root certificate used to verify the database server.
```bash title="PostgreSQL: require SSL, set a custom CA certificate, and limit the pool size"
PAPERLESS_DB_OPTIONS="sslmode=require;sslrootcert=/certs/ca.pem;pool.max_size=5"
```
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
Changes the location of `root.crt`.
```bash title="MariaDB: require SSL with a custom CA certificate"
PAPERLESS_DB_OPTIONS="ssl_mode=REQUIRED;ssl.ca=/certs/ca.pem"
```
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-ca).
```bash title="SQLite: set a busy timeout of 30 seconds"
# PostgreSQL: set a connection timeout
PAPERLESS_DB_OPTIONS="connect_timeout=10"
```
Defaults to unset, using the standard location in the home directory.
#### ~~[`PAPERLESS_DBSSLMODE`](#PAPERLESS_DBSSLMODE)~~ {#PAPERLESS_DBSSLMODE}
#### [`PAPERLESS_DBSSLCERT=<client-cert-path>`](#PAPERLESS_DBSSLCERT) {#PAPERLESS_DBSSLCERT}
!!! failure "Removed in v3"
: Path to the client SSL certificate used when connecting securely.
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslmode=require"
```
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-cert).
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl_mode=REQUIRED"
```
Changes the location of `postgresql.crt`.
#### ~~[`PAPERLESS_DBSSLROOTCERT`](#PAPERLESS_DBSSLROOTCERT)~~ {#PAPERLESS_DBSSLROOTCERT}
Defaults to unset, using the standard location in the home directory.
!!! failure "Removed in v3"
#### [`PAPERLESS_DBSSLKEY=<client-cert-key>`](#PAPERLESS_DBSSLKEY) {#PAPERLESS_DBSSLKEY}
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
: Path to the client SSL private key used when connecting securely.
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslrootcert=/path/to/ca.pem"
```
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl.ca=/path/to/ca.pem"
```
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-key).
#### ~~[`PAPERLESS_DBSSLCERT`](#PAPERLESS_DBSSLCERT)~~ {#PAPERLESS_DBSSLCERT}
Changes the location of `postgresql.key`.
!!! failure "Removed in v3"
Defaults to unset, using the standard location in the home directory.
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
#### [`PAPERLESS_DB_TIMEOUT=<int>`](#PAPERLESS_DB_TIMEOUT) {#PAPERLESS_DB_TIMEOUT}
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslcert=/path/to/client.crt"
```
: Sets how long a database connection should wait before timing out.
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl.cert=/path/to/client.crt"
```
For SQLite, this sets how long to wait if the database is locked.
For PostgreSQL or MariaDB, this sets the connection timeout.
#### ~~[`PAPERLESS_DBSSLKEY`](#PAPERLESS_DBSSLKEY)~~ {#PAPERLESS_DBSSLKEY}
Defaults to unset, which uses Djangos built-in defaults.
!!! failure "Removed in v3"
#### [`PAPERLESS_DB_POOLSIZE=<int>`](#PAPERLESS_DB_POOLSIZE) {#PAPERLESS_DB_POOLSIZE}
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
: Defines the maximum number of database connections to keep in the pool.
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslkey=/path/to/client.key"
```
Only applies to PostgreSQL. This setting is ignored for other database engines.
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl.key=/path/to/client.key"
```
The value must be greater than or equal to 1 to be used.
Defaults to unset, which disables connection pooling.
#### ~~[`PAPERLESS_DB_TIMEOUT`](#PAPERLESS_DB_TIMEOUT)~~ {#PAPERLESS_DB_TIMEOUT}
!!! note
!!! failure "Removed in v3"
A pool of 8-10 connections per worker is typically sufficient.
If you encounter error messages such as `couldn't get a connection`
or database connection timeouts, you probably need to increase the pool size.
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
!!! warning
Make sure your PostgreSQL `max_connections` setting is large enough to handle the connection pools:
`(NB_PAPERLESS_WORKERS + NB_CELERY_WORKERS) × POOL_SIZE + SAFETY_MARGIN`. For example, with
4 Paperless workers and 2 Celery workers, and a pool size of 8:``(4 + 2) × 8 + 10 = 58`,
so `max_connections = 60` (or even more) is appropriate.
```bash title="SQLite"
PAPERLESS_DB_OPTIONS="timeout=30"
```
```bash title="PostgreSQL or MariaDB"
PAPERLESS_DB_OPTIONS="connect_timeout=30"
```
#### ~~[`PAPERLESS_DB_POOLSIZE`](#PAPERLESS_DB_POOLSIZE)~~ {#PAPERLESS_DB_POOLSIZE}
!!! failure "Removed in v3"
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
```bash
PAPERLESS_DB_OPTIONS="pool.max_size=10"
```
This assumes only Paperless-ngx connects to your PostgreSQL instance. If you have other applications,
you should increase `max_connections` accordingly.
#### [`PAPERLESS_DB_READ_CACHE_ENABLED=<bool>`](#PAPERLESS_DB_READ_CACHE_ENABLED) {#PAPERLESS_DB_READ_CACHE_ENABLED}

View File

@@ -48,58 +48,3 @@ The `CONSUMER_BARCODE_SCANNER` setting has been removed. zxing-cpp is now the on
reliability.
- The `libzbar0` / `libzbar-dev` system packages are no longer required and can be removed from any custom Docker
images or host installations.
## Database Engine
`PAPERLESS_DBENGINE` is now required to use PostgreSQL or MariaDB. Previously, the
engine was inferred from the presence of `PAPERLESS_DBHOST`, with `PAPERLESS_DBENGINE`
only needed to select MariaDB over PostgreSQL.
SQLite users require no changes, though they may explicitly set their engine if desired.
#### Action Required
PostgreSQL and MariaDB users must add `PAPERLESS_DBENGINE` to their environment:
```yaml
# v2 (PostgreSQL inferred from PAPERLESS_DBHOST)
PAPERLESS_DBHOST: postgres
# v3 (engine must be explicit)
PAPERLESS_DBENGINE: postgresql
PAPERLESS_DBHOST: postgres
```
See [`PAPERLESS_DBENGINE`](configuration.md#PAPERLESS_DBENGINE) for accepted values.
## Database Advanced Options
The individual SSL, timeout, and pooling variables have been removed in favor of a
single [`PAPERLESS_DB_OPTIONS`](configuration.md#PAPERLESS_DB_OPTIONS) string. This
consolidates a growing set of engine-specific variables into one place, and allows
any option supported by the underlying database driver to be set without requiring a
dedicated environment variable for each.
The removed variables and their replacements are:
| Removed Variable | Replacement in `PAPERLESS_DB_OPTIONS` |
| ------------------------- | ---------------------------------------------------------------------------- |
| `PAPERLESS_DBSSLMODE` | `sslmode=<value>` (PostgreSQL) or `ssl_mode=<value>` (MariaDB) |
| `PAPERLESS_DBSSLROOTCERT` | `sslrootcert=<path>` (PostgreSQL) or `ssl.ca=<path>` (MariaDB) |
| `PAPERLESS_DBSSLCERT` | `sslcert=<path>` (PostgreSQL) or `ssl.cert=<path>` (MariaDB) |
| `PAPERLESS_DBSSLKEY` | `sslkey=<path>` (PostgreSQL) or `ssl.key=<path>` (MariaDB) |
| `PAPERLESS_DB_POOLSIZE` | `pool.max_size=<value>` (PostgreSQL only) |
| `PAPERLESS_DB_TIMEOUT` | `timeout=<value>` (SQLite) or `connect_timeout=<value>` (PostgreSQL/MariaDB) |
The deprecated variables will continue to function for now but will be removed in a
future release. A deprecation warning is logged at startup for each deprecated variable
that is still set.
#### Action Required
Users with any of the deprecated variables set should migrate to `PAPERLESS_DB_OPTIONS`.
Multiple options are combined in a single value:
```bash
PAPERLESS_DB_OPTIONS="sslmode=require;sslrootcert=/certs/ca.pem;pool.max_size=10"
```

View File

@@ -504,7 +504,8 @@ installation. Keep these points in mind:
- Read the [changelog](changelog.md) and
take note of breaking changes.
- Decide whether to stay on SQLite or migrate to PostgreSQL.
Both work fine with
See [documentation](#sqlite_to_psql) for details on moving data
from SQLite to PostgreSQL. Both work fine with
Paperless. However, if you already have a database server running
for other services, you might as well use it for Paperless as well.
- The task scheduler of Paperless, which is used to execute periodic

View File

@@ -57,7 +57,7 @@
}
</div>
@for (version of versions; track version.id) {
<div class="dropdown-item border-top px-0" [class.pe-3]="versions.length === 1">
<div class="dropdown-item border-top px-0">
<div class="d-flex align-items-center w-100 py-2 version-item">
<div class="btn btn-link link-underline link-underline-opacity-0 d-flex align-items-center small text-start p-0 version-link"
(click)="selectVersion(version.id)"
@@ -88,7 +88,7 @@
@if (version.version_label) {
{{ version.version_label }}
} @else {
<ng-container i18n>Version</ng-container>&nbsp;{{ versions.length - $index }}&nbsp;<span class="text-muted small">(#{{ version.id }})</span>
<span i18n>Version</span>&nbsp;#{{ version.id }}
}
</span>
}

View File

@@ -7,6 +7,7 @@ Provides automatic progress bar and multiprocessing support with minimal boilerp
from __future__ import annotations
import os
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Sized
from concurrent.futures import ProcessPoolExecutor
@@ -23,6 +24,9 @@ from django.core.management import CommandError
from django.db.models import QuerySet
from django_rich.management import RichCommand
from rich.console import Console
from rich.console import Group
from rich.console import RenderableType
from rich.live import Live
from rich.progress import BarColumn
from rich.progress import MofNCompleteColumn
from rich.progress import Progress
@@ -32,9 +36,7 @@ from rich.progress import TimeElapsedColumn
from rich.progress import TimeRemainingColumn
if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Generator
from collections.abc import Iterable
from collections.abc import Sequence
from django.core.management import CommandParser
@@ -91,6 +93,23 @@ class PaperlessCommand(RichCommand):
for result in self.process_parallel(process_doc, ids):
if result.error:
self.console.print(f"[red]Failed: {result.error}[/red]")
class Command(PaperlessCommand):
help = "Import documents with live stats"
def handle(self, *args, **options):
stats = ImportStats()
def render_stats() -> Table:
... # build Rich Table from stats
for item in self.track_with_stats(
items,
description="Importing...",
stats_renderer=render_stats,
):
result = import_item(item)
stats.imported += 1
"""
supports_progress_bar: ClassVar[bool] = True
@@ -128,13 +147,11 @@ class PaperlessCommand(RichCommand):
This is called by Django's command infrastructure after argument parsing
but before handle(). We use it to set instance attributes from options.
"""
# Set progress bar state
if self.supports_progress_bar:
self.no_progress_bar = options.get("no_progress_bar", False)
else:
self.no_progress_bar = True
# Set multiprocessing state
if self.supports_multiprocessing:
self.process_count = options.get("processes", 1)
if self.process_count < 1:
@@ -144,9 +161,29 @@ class PaperlessCommand(RichCommand):
return super().execute(*args, **options)
@staticmethod
def _progress_columns() -> tuple[Any, ...]:
"""
Return the standard set of progress bar columns.
Extracted so both _create_progress (standalone) and track_with_stats
(inside Live) use identical column configuration without duplication.
"""
return (
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
)
def _create_progress(self, description: str) -> Progress:
"""
Create a configured Progress instance.
Create a standalone Progress instance with its own stderr Console.
Use this for track(). For track_with_stats(), Progress is created
directly inside a Live context instead.
Progress output is directed to stderr to match the convention that
progress bars are transient UI feedback, not command output. This
@@ -161,12 +198,7 @@ class PaperlessCommand(RichCommand):
A Progress instance configured with appropriate columns.
"""
return Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
*self._progress_columns(),
console=Console(stderr=True),
transient=False,
)
@@ -222,7 +254,6 @@ class PaperlessCommand(RichCommand):
yield from iterable
return
# Attempt to determine total if not provided
if total is None:
total = self._get_iterable_length(iterable)
@@ -232,6 +263,87 @@ class PaperlessCommand(RichCommand):
yield item
progress.advance(task_id)
def track_with_stats(
self,
iterable: Iterable[T],
*,
description: str = "Processing...",
stats_renderer: Callable[[], RenderableType],
total: int | None = None,
) -> Generator[T, None, None]:
"""
Iterate over items with a progress bar and a live-updating stats display.
The progress bar and stats renderable are combined in a single Live
context, so the stats panel re-renders in place below the progress bar
after each item is processed.
Respects --no-progress-bar flag. When disabled, yields items without
any display (stats are still updated by the caller's loop body, so
they will be accurate for any post-loop summary the caller prints).
Args:
iterable: The items to iterate over.
description: Text to display alongside the progress bar.
stats_renderer: Zero-argument callable that returns a Rich
renderable. Called after each item to refresh the display.
The caller typically closes over a mutable dataclass and
rebuilds a Table from it on each call.
total: Total number of items. If None, attempts to determine
automatically via .count() (for querysets) or len().
Yields:
Items from the iterable.
Example:
@dataclass
class Stats:
processed: int = 0
failed: int = 0
stats = Stats()
def render_stats() -> Table:
table = Table(box=None)
table.add_column("Processed")
table.add_column("Failed")
table.add_row(str(stats.processed), str(stats.failed))
return table
for item in self.track_with_stats(
items,
description="Importing...",
stats_renderer=render_stats,
):
try:
import_item(item)
stats.processed += 1
except Exception:
stats.failed += 1
"""
if self.no_progress_bar:
yield from iterable
return
if total is None:
total = self._get_iterable_length(iterable)
stderr_console = Console(stderr=True)
# Progress is created without its own console so Live controls rendering.
progress = Progress(*self._progress_columns())
task_id = progress.add_task(description, total=total)
with Live(
Group(progress, stats_renderer()),
console=stderr_console,
refresh_per_second=4,
) as live:
for item in iterable:
yield item
progress.advance(task_id)
live.update(Group(progress, stats_renderer()))
def process_parallel(
self,
fn: Callable[[T], R],
@@ -269,10 +381,8 @@ class PaperlessCommand(RichCommand):
total = len(items)
if self.process_count == 1:
# Sequential execution in main process - critical for testing
yield from self._process_sequential(fn, items, description, total)
else:
# Parallel execution with ProcessPoolExecutor
yield from self._process_parallel(fn, items, description, total)
def _process_sequential(
@@ -298,17 +408,14 @@ class PaperlessCommand(RichCommand):
total: int,
) -> Generator[ProcessResult[T, R], None, None]:
"""Process items in parallel using ProcessPoolExecutor."""
# Close database connections before forking - required for PostgreSQL
db.connections.close_all()
with self._create_progress(description) as progress:
task_id = progress.add_task(description, total=total)
with ProcessPoolExecutor(max_workers=self.process_count) as executor:
# Submit all tasks and map futures back to items
future_to_item = {executor.submit(fn, item): item for item in items}
# Yield results as they complete
for future in as_completed(future_to_item):
item = future_to_item[future]
try:

View File

@@ -1,4 +1,12 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from dataclasses import field
from typing import TYPE_CHECKING
from rich.table import Table
from rich.text import Text
from documents.classifier import load_classifier
from documents.management.commands.base import PaperlessCommand
@@ -8,9 +16,162 @@ from documents.signals.handlers import set_document_type
from documents.signals.handlers import set_storage_path
from documents.signals.handlers import set_tags
if TYPE_CHECKING:
from rich.console import RenderableType
from documents.models import Correspondent
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
logger = logging.getLogger("paperless.management.retagger")
@dataclass(slots=True)
class RetaggerStats:
"""Cumulative counters updated as the retagger processes documents.
Mutable by design -- fields are incremented in the processing loop.
slots=True reduces per-instance memory overhead and speeds attribute access.
"""
correspondents: int = 0
document_types: int = 0
tags_added: int = 0
tags_removed: int = 0
storage_paths: int = 0
documents_processed: int = 0
@dataclass(slots=True)
class DocumentSuggestion:
"""Buffered classifier suggestions for a single document (suggest mode only).
Mutable by design -- fields are assigned incrementally as each setter runs.
"""
document: Document
correspondent: Correspondent | None = None
document_type: DocumentType | None = None
tags_to_add: frozenset[Tag] = field(default_factory=frozenset)
tags_to_remove: frozenset[Tag] = field(default_factory=frozenset)
storage_path: StoragePath | None = None
@property
def has_suggestions(self) -> bool:
return bool(
self.correspondent is not None
or self.document_type is not None
or self.tags_to_add
or self.tags_to_remove
or self.storage_path is not None,
)
def _build_stats_table(stats: RetaggerStats, *, suggest: bool) -> Table:
"""
Build the live-updating stats table shown below the progress bar.
In suggest mode the labels read "would set / would add" to make clear
that nothing has been written to the database.
"""
table = Table(box=None, padding=(0, 2), show_header=True, header_style="bold")
table.add_column("Documents")
table.add_column("Correspondents")
table.add_column("Doc Types")
table.add_column("Tags (+)")
table.add_column("Tags (-)")
table.add_column("Storage Paths")
verb = "would set" if suggest else "set"
table.add_row(
str(stats.documents_processed),
f"{stats.correspondents} {verb}",
f"{stats.document_types} {verb}",
f"+{stats.tags_added}",
f"-{stats.tags_removed}",
f"{stats.storage_paths} {verb}",
)
return table
def _build_suggestion_table(
suggestions: list[DocumentSuggestion],
base_url: str | None,
) -> Table:
"""
Build the final suggestion table printed after the progress bar completes.
Only documents with at least one suggestion are included.
"""
table = Table(
title="Suggested Changes",
show_header=True,
header_style="bold cyan",
show_lines=True,
)
table.add_column("Document", style="bold", no_wrap=False, min_width=20)
table.add_column("Correspondent")
table.add_column("Doc Type")
table.add_column("Tags")
table.add_column("Storage Path")
for suggestion in suggestions:
if not suggestion.has_suggestions:
continue
doc = suggestion.document
if base_url:
doc_cell = Text()
doc_cell.append(str(doc))
doc_cell.append(f"\n{base_url}/documents/{doc.pk}", style="dim")
else:
doc_cell = Text(f"{doc} [{doc.pk}]")
tag_parts: list[str] = []
for tag in sorted(suggestion.tags_to_add, key=lambda t: t.name):
tag_parts.append(f"[green]+{tag.name}[/green]")
for tag in sorted(suggestion.tags_to_remove, key=lambda t: t.name):
tag_parts.append(f"[red]-{tag.name}[/red]")
tag_cell = Text.from_markup(", ".join(tag_parts)) if tag_parts else Text("-")
table.add_row(
doc_cell,
str(suggestion.correspondent) if suggestion.correspondent else "-",
str(suggestion.document_type) if suggestion.document_type else "-",
tag_cell,
str(suggestion.storage_path) if suggestion.storage_path else "-",
)
return table
def _build_summary_table(stats: RetaggerStats) -> Table:
"""Build the final applied-changes summary table."""
table = Table(
title="Retagger Summary",
show_header=True,
header_style="bold cyan",
)
table.add_column("Metric", style="bold")
table.add_column("Count", justify="right")
table.add_row("Documents processed", str(stats.documents_processed))
table.add_row("Correspondents set", str(stats.correspondents))
table.add_row("Document types set", str(stats.document_types))
table.add_row("Tags added", str(stats.tags_added))
table.add_row("Tags removed", str(stats.tags_removed))
table.add_row("Storage paths set", str(stats.storage_paths))
return table
class Command(PaperlessCommand):
help = (
"Using the current classification model, assigns correspondents, tags "
@@ -19,7 +180,7 @@ class Command(PaperlessCommand):
"modified) after their initial import."
)
def add_arguments(self, parser):
def add_arguments(self, parser) -> None:
super().add_arguments(parser)
parser.add_argument("-c", "--correspondent", default=False, action="store_true")
parser.add_argument("-T", "--tags", default=False, action="store_true")
@@ -31,9 +192,9 @@ class Command(PaperlessCommand):
default=False,
action="store_true",
help=(
"By default this command won't try to assign a correspondent "
"if more than one matches the document. Use this flag if "
"you'd rather it just pick the first one it finds."
"By default this command will not try to assign a correspondent "
"if more than one matches the document. Use this flag to pick "
"the first match instead."
),
)
parser.add_argument(
@@ -42,91 +203,133 @@ class Command(PaperlessCommand):
default=False,
action="store_true",
help=(
"If set, the document retagger will overwrite any previously "
"set correspondent, document and remove correspondents, types "
"and tags that do not match anymore due to changed rules."
"Overwrite any previously set correspondent, document type, and "
"remove tags that no longer match due to changed rules."
),
)
parser.add_argument(
"--suggest",
default=False,
action="store_true",
help="Return the suggestion, don't change anything.",
help="Show what would be changed without applying anything.",
)
parser.add_argument(
"--base-url",
help="The base URL to use to build the link to the documents.",
help="Base URL used to build document links in suggest output.",
)
parser.add_argument(
"--id-range",
help="A range of document ids on which the retagging should be applied.",
help="Restrict retagging to documents within this ID range (inclusive).",
nargs=2,
type=int,
)
def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
suggest: bool = options["suggest"]
overwrite: bool = options["overwrite"]
use_first: bool = options["use_first"]
base_url: str | None = options["base_url"]
do_correspondent: bool = options["correspondent"]
do_document_type: bool = options["document_type"]
do_tags: bool = options["tags"]
do_storage_path: bool = options["storage_path"]
if not any([do_correspondent, do_document_type, do_tags, do_storage_path]):
self.console.print(
"[yellow]No classifier targets specified. "
"Use -c, -T, -t, or -s to select what to retag.[/yellow]",
)
return
if options["inbox_only"]:
queryset = Document.objects.filter(tags__is_inbox_tag=True)
else:
queryset = Document.objects.all()
if options["id_range"]:
queryset = queryset.filter(
id__range=(options["id_range"][0], options["id_range"][1]),
)
lo, hi = options["id_range"]
queryset = queryset.filter(id__range=(lo, hi))
documents = queryset.distinct()
classifier = load_classifier()
for document in self.track(documents, description="Retagging..."):
if options["correspondent"]:
set_correspondent(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
stats = RetaggerStats()
suggestions: list[DocumentSuggestion] = []
if options["document_type"]:
set_document_type(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
def render_stats() -> RenderableType:
return _build_stats_table(stats, suggest=suggest)
if options["tags"]:
set_tags(
sender=None,
document=document,
classifier=classifier,
replace=options["overwrite"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
)
for document in self.track_with_stats(
documents,
description="Retagging...",
stats_renderer=render_stats,
):
suggestion = DocumentSuggestion(document=document)
if options["storage_path"]:
set_storage_path(
sender=None,
document=document,
if do_correspondent:
correspondent = set_correspondent(
None,
document,
classifier=classifier,
replace=options["overwrite"],
use_first=options["use_first"],
suggest=options["suggest"],
base_url=options["base_url"],
stdout=self.stdout,
style_func=self.style,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if correspondent is not None:
stats.correspondents += 1
suggestion.correspondent = correspondent
if do_document_type:
document_type = set_document_type(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if document_type is not None:
stats.document_types += 1
suggestion.document_type = document_type
if do_tags:
tags_to_add, tags_to_remove = set_tags(
None,
document,
classifier=classifier,
replace=overwrite,
dry_run=suggest,
)
stats.tags_added += len(tags_to_add)
stats.tags_removed += len(tags_to_remove)
suggestion.tags_to_add = frozenset(tags_to_add)
suggestion.tags_to_remove = frozenset(tags_to_remove)
if do_storage_path:
storage_path = set_storage_path(
None,
document,
classifier=classifier,
replace=overwrite,
use_first=use_first,
dry_run=suggest,
)
if storage_path is not None:
stats.storage_paths += 1
suggestion.storage_path = storage_path
stats.documents_processed += 1
if suggest:
suggestions.append(suggestion)
# Post-loop output
if suggest:
visible = [s for s in suggestions if s.has_suggestions]
if visible:
self.console.print(_build_suggestion_table(visible, base_url))
else:
self.console.print("[green]No changes suggested.[/green]")
else:
self.console.print(_build_summary_table(stats))

View File

@@ -4,6 +4,7 @@ import logging
import shutil
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from celery import shared_task
from celery import states
@@ -32,12 +33,14 @@ from documents.file_handling import create_source_path_directory
from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_filename
from documents.file_handling import generate_unique_filename
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import MatchingModel
from documents.models import DocumentType
from documents.models import PaperlessTask
from documents.models import SavedView
from documents.models import StoragePath
from documents.models import Tag
from documents.models import UiSettings
from documents.models import Workflow
@@ -81,47 +84,41 @@ def add_inbox_tags(sender, document: Document, logging_group=None, **kwargs) ->
document.add_nested_tags(inbox_tags)
def _suggestion_printer(
stdout,
style_func,
suggestion_type: str,
document: Document,
selected: MatchingModel,
base_url: str | None = None,
) -> None:
"""
Smaller helper to reduce duplication when just outputting suggestions to the console
"""
doc_str = str(document)
if base_url is not None:
stdout.write(style_func.SUCCESS(doc_str))
stdout.write(style_func.SUCCESS(f"{base_url}/documents/{document.pk}"))
else:
stdout.write(style_func.SUCCESS(f"{doc_str} [{document.pk}]"))
stdout.write(f"Suggest {suggestion_type}: {selected}")
def set_correspondent(
sender,
sender: object,
document: Document,
*,
logging_group=None,
logging_group: object = None,
classifier: DocumentClassifier | None = None,
replace=False,
use_first=True,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
replace: bool = False,
use_first: bool = True,
dry_run: bool = False,
**kwargs: Any,
) -> Correspondent | None:
"""
Assign a correspondent to a document based on classifier results.
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing correspondent assignment.
use_first: If True, pick the first match when multiple correspondents
match. If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The correspondent that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
if document.correspondent and not replace:
return
return None
potential_correspondents = matching.match_correspondents(document, classifier)
potential_count = len(potential_correspondents)
selected = potential_correspondents[0] if potential_correspondents else None
if potential_count > 1:
if use_first:
logger.debug(
@@ -135,49 +132,53 @@ def set_correspondent(
f"not assigning any correspondent",
extra={"group": logging_group},
)
return
return None
if selected or replace:
if suggest:
_suggestion_printer(
stdout,
style_func,
"correspondent",
document,
selected,
base_url,
)
else:
logger.info(
f"Assigning correspondent {selected} to {document}",
extra={"group": logging_group},
)
if (selected or replace) and not dry_run:
logger.info(
f"Assigning correspondent {selected} to {document}",
extra={"group": logging_group},
)
document.correspondent = selected
document.save(update_fields=("correspondent",))
document.correspondent = selected
document.save(update_fields=("correspondent",))
return selected
def set_document_type(
sender,
sender: object,
document: Document,
*,
logging_group=None,
logging_group: object = None,
classifier: DocumentClassifier | None = None,
replace=False,
use_first=True,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
replace: bool = False,
use_first: bool = True,
dry_run: bool = False,
**kwargs: Any,
) -> DocumentType | None:
"""
Assign a document type to a document based on classifier results.
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing document type assignment.
use_first: If True, pick the first match when multiple types match.
If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The document type that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
if document.document_type and not replace:
return
return None
potential_document_type = matching.match_document_types(document, classifier)
potential_count = len(potential_document_type)
selected = potential_document_type[0] if potential_document_type else None
potential_document_types = matching.match_document_types(document, classifier)
potential_count = len(potential_document_types)
selected = potential_document_types[0] if potential_document_types else None
if potential_count > 1:
if use_first:
@@ -192,42 +193,64 @@ def set_document_type(
f"not assigning any document type",
extra={"group": logging_group},
)
return
return None
if selected or replace:
if suggest:
_suggestion_printer(
stdout,
style_func,
"document type",
document,
selected,
base_url,
)
else:
logger.info(
f"Assigning document type {selected} to {document}",
extra={"group": logging_group},
)
if (selected or replace) and not dry_run:
logger.info(
f"Assigning document type {selected} to {document}",
extra={"group": logging_group},
)
document.document_type = selected
document.save(update_fields=("document_type",))
document.document_type = selected
document.save(update_fields=("document_type",))
return selected
def set_tags(
sender,
sender: object,
document: Document,
*,
logging_group=None,
logging_group: object = None,
classifier: DocumentClassifier | None = None,
replace=False,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
replace: bool = False,
dry_run: bool = False,
**kwargs: Any,
) -> tuple[set[Tag], set[Tag]]:
"""
Assign tags to a document based on classifier results.
When replace=True, existing auto-matched and rule-matched tags are removed
before applying the new set (inbox tags and manually-added tags are preserved).
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, remove existing classifier-managed tags before applying
new ones. Inbox tags and manually-added tags are always preserved.
dry_run: If True, compute what would change without saving anything.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
A two-tuple of (tags_added, tags_removed). In non-replace mode,
tags_removed is always an empty set. In dry_run mode, neither set
is applied to the database.
"""
# Compute which tags would be removed under replace mode.
# The filter mirrors the .delete() call below: keep inbox tags and
# manually-added tags (match="" and not auto-matched).
if replace:
tags_to_remove: set[Tag] = set(
document.tags.exclude(
is_inbox_tag=True,
).exclude(
Q(match="") & ~Q(matching_algorithm=Tag.MATCH_AUTO),
),
)
else:
tags_to_remove = set()
if replace and not dry_run:
Document.tags.through.objects.filter(document=document).exclude(
Q(tag__is_inbox_tag=True),
).exclude(
@@ -235,65 +258,53 @@ def set_tags(
).delete()
current_tags = set(document.tags.all())
matched_tags = matching.match_tags(document, classifier)
tags_to_add = set(matched_tags) - current_tags
relevant_tags = set(matched_tags) - current_tags
if suggest:
extra_tags = current_tags - set(matched_tags)
extra_tags = [
t for t in extra_tags if t.matching_algorithm == MatchingModel.MATCH_AUTO
]
if not relevant_tags and not extra_tags:
return
doc_str = style_func.SUCCESS(str(document))
if base_url:
stdout.write(doc_str)
stdout.write(f"{base_url}/documents/{document.pk}")
else:
stdout.write(doc_str + style_func.SUCCESS(f" [{document.pk}]"))
if relevant_tags:
stdout.write("Suggest tags: " + ", ".join([t.name for t in relevant_tags]))
if extra_tags:
stdout.write("Extra tags: " + ", ".join([t.name for t in extra_tags]))
else:
if not relevant_tags:
return
message = 'Tagging "{}" with "{}"'
if tags_to_add and not dry_run:
logger.info(
message.format(document, ", ".join([t.name for t in relevant_tags])),
f'Tagging "{document}" with "{", ".join(t.name for t in tags_to_add)}"',
extra={"group": logging_group},
)
document.add_nested_tags(tags_to_add)
document.add_nested_tags(relevant_tags)
return tags_to_add, tags_to_remove
def set_storage_path(
sender,
sender: object,
document: Document,
*,
logging_group=None,
logging_group: object = None,
classifier: DocumentClassifier | None = None,
replace=False,
use_first=True,
suggest=False,
base_url=None,
stdout=None,
style_func=None,
**kwargs,
) -> None:
replace: bool = False,
use_first: bool = True,
dry_run: bool = False,
**kwargs: Any,
) -> StoragePath | None:
"""
Assign a storage path to a document based on classifier results.
Args:
document: The document to classify.
logging_group: Optional logging group for structured log output.
classifier: The trained classifier. If None, only rule-based matching runs.
replace: If True, overwrite an existing storage path assignment.
use_first: If True, pick the first match when multiple paths match.
If False, skip assignment when multiple match.
dry_run: If True, compute and return the selection without saving.
**kwargs: Absorbed for Django signal compatibility (e.g. sender, signal).
Returns:
The storage path that was (or would be) assigned, or None if no match
was found or assignment was skipped.
"""
if document.storage_path and not replace:
return
return None
potential_storage_path = matching.match_storage_paths(
document,
classifier,
)
potential_count = len(potential_storage_path)
selected = potential_storage_path[0] if potential_storage_path else None
potential_storage_paths = matching.match_storage_paths(document, classifier)
potential_count = len(potential_storage_paths)
selected = potential_storage_paths[0] if potential_storage_paths else None
if potential_count > 1:
if use_first:
@@ -308,26 +319,17 @@ def set_storage_path(
f"not assigning any storage directory",
extra={"group": logging_group},
)
return
return None
if selected or replace:
if suggest:
_suggestion_printer(
stdout,
style_func,
"storage directory",
document,
selected,
base_url,
)
else:
logger.info(
f"Assigning storage path {selected} to {document}",
extra={"group": logging_group},
)
if (selected or replace) and not dry_run:
logger.info(
f"Assigning storage path {selected} to {document}",
extra={"group": logging_group},
)
document.storage_path = selected
document.save(update_fields=("storage_path",))
document.storage_path = selected
document.save(update_fields=("storage_path",))
return selected
# see empty_trash in documents/tasks.py for signal handling

View File

@@ -1,17 +1,65 @@
from factory import Faker
"""
Factory-boy factories for documents app models.
"""
from __future__ import annotations
import factory
from factory.django import DjangoModelFactory
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
class CorrespondentFactory(DjangoModelFactory):
class Meta:
model = Correspondent
name = Faker("name")
name = factory.Faker("company")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class DocumentTypeFactory(DjangoModelFactory):
class Meta:
model = DocumentType
name = factory.Faker("bs")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class TagFactory(DjangoModelFactory):
class Meta:
model = Tag
name = factory.Faker("word")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
is_inbox_tag = False
class StoragePathFactory(DjangoModelFactory):
class Meta:
model = StoragePath
name = factory.Faker("file_path", depth=2, extension="")
path = factory.LazyAttribute(lambda o: f"{o.name}/{{title}}")
match = ""
matching_algorithm = MatchingModel.MATCH_NONE
class DocumentFactory(DjangoModelFactory):
class Meta:
model = Document
title = factory.Faker("sentence", nb_words=4)
checksum = factory.Faker("md5")
content = factory.Faker("paragraph")
correspondent = None
document_type = None
storage_path = None

View File

@@ -21,16 +21,6 @@ class TestApiUiSettings(DirectoriesMixin, APITestCase):
self.test_user.save()
self.client.force_authenticate(user=self.test_user)
@override_settings(
APP_TITLE=None,
APP_LOGO=None,
AUDIT_LOG_ENABLED=True,
EMPTY_TRASH_DELAY=30,
ENABLE_UPDATE_CHECK="default",
EMAIL_ENABLED=False,
GMAIL_OAUTH_ENABLED=False,
OUTLOOK_OAUTH_ENABLED=False,
)
def test_api_get_ui_settings(self) -> None:
response = self.client.get(self.ENDPOINT, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)

View File

@@ -919,7 +919,6 @@ class TestTagBarcode(DirectoriesMixin, SampleDirMixin, GetReaderPluginMixin, Tes
@override_settings(
CONSUMER_ENABLE_TAG_BARCODE=True,
CONSUMER_TAG_BARCODE_MAPPING={"ASN(.*)": "\\g<1>"},
CONSUMER_ENABLE_ASN_BARCODE=False,
)
def test_scan_file_for_many_custom_tags(self) -> None:
"""

View File

@@ -329,14 +329,14 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
FILENAME_FORMAT="{added_year}-{added_month}-{added_day}",
)
def test_added_year_month_day(self) -> None:
d1 = timezone.make_aware(datetime.datetime(1232, 1, 9, 1, 1, 1))
d1 = timezone.make_aware(datetime.datetime(232, 1, 9, 1, 1, 1))
doc1 = Document.objects.create(
title="doc1",
mime_type="application/pdf",
added=d1,
)
self.assertEqual(generate_filename(doc1), Path("1232-01-09.pdf"))
self.assertEqual(generate_filename(doc1), Path("232-01-09.pdf"))
doc1.added = timezone.make_aware(datetime.datetime(2020, 11, 16, 1, 1, 1))

View File

@@ -140,7 +140,7 @@ class TestFuzzyMatchCommand(TestCase):
mime_type="application/pdf",
filename="final_test.pdf",
)
stdout, _ = self.call_command("--no-progress-bar", "--processes", "1")
stdout, _ = self.call_command("--no-progress-bar")
lines = [x.strip() for x in stdout.splitlines() if x.strip()]
self.assertEqual(len(lines), 3)
for line in lines:
@@ -183,12 +183,7 @@ class TestFuzzyMatchCommand(TestCase):
self.assertEqual(Document.objects.count(), 3)
stdout, _ = self.call_command(
"--delete",
"--no-progress-bar",
"--processes",
"1",
)
stdout, _ = self.call_command("--delete", "--no-progress-bar")
self.assertIn(
"The command is configured to delete documents. Use with caution",

View File

@@ -1,298 +1,358 @@
"""
Tests for the document_retagger management command.
"""
from __future__ import annotations
import pytest
from django.core.management import call_command
from django.core.management.base import CommandError
from django.test import TestCase
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
from documents.tests.factories import CorrespondentFactory
from documents.tests.factories import DocumentFactory
from documents.tests.factories import DocumentTypeFactory
from documents.tests.factories import StoragePathFactory
from documents.tests.factories import TagFactory
from documents.tests.utils import DirectoriesMixin
# ---------------------------------------------------------------------------
# Module-level type aliases
# ---------------------------------------------------------------------------
StoragePathTuple = tuple[StoragePath, StoragePath, StoragePath]
TagTuple = tuple[Tag, Tag, Tag, Tag, Tag]
CorrespondentTuple = tuple[Correspondent, Correspondent]
DocumentTypeTuple = tuple[DocumentType, DocumentType]
DocumentTuple = tuple[Document, Document, Document, Document]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def storage_paths(db) -> StoragePathTuple:
"""Three storage paths with varying match rules."""
sp1 = StoragePathFactory(
path="{created_data}/{title}",
match="auto document",
matching_algorithm=MatchingModel.MATCH_LITERAL,
)
sp2 = StoragePathFactory(
path="{title}",
match="^first|^unrelated",
matching_algorithm=MatchingModel.MATCH_REGEX,
)
sp3 = StoragePathFactory(
path="{title}",
match="^blah",
matching_algorithm=MatchingModel.MATCH_REGEX,
)
return sp1, sp2, sp3
@pytest.fixture()
def tags(db) -> TagTuple:
"""Tags covering the common matching scenarios."""
tag_first = TagFactory(match="first", matching_algorithm=Tag.MATCH_ANY)
tag_second = TagFactory(match="second", matching_algorithm=Tag.MATCH_ANY)
tag_inbox = TagFactory(is_inbox_tag=True)
tag_no_match = TagFactory()
tag_auto = TagFactory(matching_algorithm=Tag.MATCH_AUTO)
return tag_first, tag_second, tag_inbox, tag_no_match, tag_auto
@pytest.fixture()
def correspondents(db) -> CorrespondentTuple:
"""Two correspondents matching 'first' and 'second' content."""
c_first = CorrespondentFactory(
match="first",
matching_algorithm=MatchingModel.MATCH_ANY,
)
c_second = CorrespondentFactory(
match="second",
matching_algorithm=MatchingModel.MATCH_ANY,
)
return c_first, c_second
@pytest.fixture()
def document_types(db) -> DocumentTypeTuple:
"""Two document types matching 'first' and 'second' content."""
dt_first = DocumentTypeFactory(
match="first",
matching_algorithm=MatchingModel.MATCH_ANY,
)
dt_second = DocumentTypeFactory(
match="second",
matching_algorithm=MatchingModel.MATCH_ANY,
)
return dt_first, dt_second
@pytest.fixture()
def documents(storage_paths: StoragePathTuple, tags: TagTuple) -> DocumentTuple:
"""Four documents with varied content used across most retagger tests."""
_, _, sp3 = storage_paths
_, _, tag_inbox, tag_no_match, tag_auto = tags
d1 = DocumentFactory(checksum="A", title="A", content="first document")
d2 = DocumentFactory(checksum="B", title="B", content="second document")
d3 = DocumentFactory(
checksum="C",
title="C",
content="unrelated document",
storage_path=sp3,
)
d4 = DocumentFactory(checksum="D", title="D", content="auto document")
d3.tags.add(tag_inbox, tag_no_match)
d4.tags.add(tag_auto)
return d1, d2, d3, d4
def _get_docs() -> DocumentTuple:
return (
Document.objects.get(title="A"),
Document.objects.get(title="B"),
Document.objects.get(title="C"),
Document.objects.get(title="D"),
)
# ---------------------------------------------------------------------------
# Tag assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
class TestRetagger(DirectoriesMixin, TestCase):
def make_models(self) -> None:
self.sp1 = StoragePath.objects.create(
name="dummy a",
path="{created_data}/{title}",
match="auto document",
matching_algorithm=StoragePath.MATCH_LITERAL,
)
self.sp2 = StoragePath.objects.create(
name="dummy b",
path="{title}",
match="^first|^unrelated",
matching_algorithm=StoragePath.MATCH_REGEX,
)
self.sp3 = StoragePath.objects.create(
name="dummy c",
path="{title}",
match="^blah",
matching_algorithm=StoragePath.MATCH_REGEX,
)
self.d1 = Document.objects.create(
checksum="A",
title="A",
content="first document",
)
self.d2 = Document.objects.create(
checksum="B",
title="B",
content="second document",
)
self.d3 = Document.objects.create(
checksum="C",
title="C",
content="unrelated document",
storage_path=self.sp3,
)
self.d4 = Document.objects.create(
checksum="D",
title="D",
content="auto document",
)
self.tag_first = Tag.objects.create(
name="tag1",
match="first",
matching_algorithm=Tag.MATCH_ANY,
)
self.tag_second = Tag.objects.create(
name="tag2",
match="second",
matching_algorithm=Tag.MATCH_ANY,
)
self.tag_inbox = Tag.objects.create(name="test", is_inbox_tag=True)
self.tag_no_match = Tag.objects.create(name="test2")
self.tag_auto = Tag.objects.create(
name="tagauto",
matching_algorithm=Tag.MATCH_AUTO,
)
self.d3.tags.add(self.tag_inbox)
self.d3.tags.add(self.tag_no_match)
self.d4.tags.add(self.tag_auto)
self.correspondent_first = Correspondent.objects.create(
name="c1",
match="first",
matching_algorithm=Correspondent.MATCH_ANY,
)
self.correspondent_second = Correspondent.objects.create(
name="c2",
match="second",
matching_algorithm=Correspondent.MATCH_ANY,
)
self.doctype_first = DocumentType.objects.create(
name="dt1",
match="first",
matching_algorithm=DocumentType.MATCH_ANY,
)
self.doctype_second = DocumentType.objects.create(
name="dt2",
match="second",
matching_algorithm=DocumentType.MATCH_ANY,
)
def get_updated_docs(self):
return (
Document.objects.get(title="A"),
Document.objects.get(title="B"),
Document.objects.get(title="C"),
Document.objects.get(title="D"),
)
def setUp(self) -> None:
super().setUp()
self.make_models()
def test_add_tags(self) -> None:
@pytest.mark.django_db
class TestRetaggerTags(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_tags(self, tags: TagTuple) -> None:
tag_first, tag_second, *_ = tags
call_command("document_retagger", "--tags")
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertEqual(d_first.tags.count(), 1)
self.assertEqual(d_second.tags.count(), 1)
self.assertEqual(d_unrelated.tags.count(), 2)
self.assertEqual(d_auto.tags.count(), 1)
assert d_first.tags.count() == 1
assert d_second.tags.count() == 1
assert d_unrelated.tags.count() == 2
assert d_auto.tags.count() == 1
assert d_first.tags.first() == tag_first
assert d_second.tags.first() == tag_second
self.assertEqual(d_first.tags.first(), self.tag_first)
self.assertEqual(d_second.tags.first(), self.tag_second)
def test_add_type(self) -> None:
call_command("document_retagger", "--document_type")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertEqual(d_first.document_type, self.doctype_first)
self.assertEqual(d_second.document_type, self.doctype_second)
def test_add_correspondent(self) -> None:
call_command("document_retagger", "--correspondent")
d_first, d_second, _, _ = self.get_updated_docs()
self.assertEqual(d_first.correspondent, self.correspondent_first)
self.assertEqual(d_second.correspondent, self.correspondent_second)
def test_overwrite_preserve_inbox(self) -> None:
self.d1.tags.add(self.tag_second)
def test_overwrite_removes_stale_tags_and_preserves_inbox(
self,
documents: DocumentTuple,
tags: TagTuple,
) -> None:
d1, *_ = documents
tag_first, tag_second, tag_inbox, tag_no_match, _ = tags
d1.tags.add(tag_second)
call_command("document_retagger", "--tags", "--overwrite")
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertIsNotNone(Tag.objects.get(id=self.tag_second.id))
assert Tag.objects.filter(id=tag_second.id).exists()
assert list(d_first.tags.values_list("id", flat=True)) == [tag_first.id]
assert list(d_second.tags.values_list("id", flat=True)) == [tag_second.id]
assert set(d_unrelated.tags.values_list("id", flat=True)) == {
tag_inbox.id,
tag_no_match.id,
}
assert d_auto.tags.count() == 0
self.assertCountEqual(
[tag.id for tag in d_first.tags.all()],
[self.tag_first.id],
)
self.assertCountEqual(
[tag.id for tag in d_second.tags.all()],
[self.tag_second.id],
)
self.assertCountEqual(
[tag.id for tag in d_unrelated.tags.all()],
[self.tag_inbox.id, self.tag_no_match.id],
)
self.assertEqual(d_auto.tags.count(), 0)
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_tags(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--tags", "--suggest", *extra_args)
d_first, d_second, _, d_auto = _get_docs()
def test_add_tags_suggest(self) -> None:
call_command("document_retagger", "--tags", "--suggest")
d_first, d_second, _, d_auto = self.get_updated_docs()
assert d_first.tags.count() == 0
assert d_second.tags.count() == 0
assert d_auto.tags.count() == 1
self.assertEqual(d_first.tags.count(), 0)
self.assertEqual(d_second.tags.count(), 0)
self.assertEqual(d_auto.tags.count(), 1)
def test_add_type_suggest(self) -> None:
call_command("document_retagger", "--document_type", "--suggest")
d_first, d_second, _, _ = self.get_updated_docs()
# ---------------------------------------------------------------------------
# Document type assignment
# ---------------------------------------------------------------------------
self.assertIsNone(d_first.document_type)
self.assertIsNone(d_second.document_type)
def test_add_correspondent_suggest(self) -> None:
call_command("document_retagger", "--correspondent", "--suggest")
d_first, d_second, _, _ = self.get_updated_docs()
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerDocumentType(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_type(self, document_types: DocumentTypeTuple) -> None:
dt_first, dt_second = document_types
call_command("document_retagger", "--document_type")
d_first, d_second, _, _ = _get_docs()
self.assertIsNone(d_first.correspondent)
self.assertIsNone(d_second.correspondent)
assert d_first.document_type == dt_first
assert d_second.document_type == dt_second
def test_add_tags_suggest_url(self) -> None:
call_command(
"document_retagger",
"--tags",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, d_auto = self.get_updated_docs()
@pytest.mark.usefixtures("documents", "document_types")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_document_type(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--document_type", "--suggest", *extra_args)
d_first, d_second, _, _ = _get_docs()
self.assertEqual(d_first.tags.count(), 0)
self.assertEqual(d_second.tags.count(), 0)
self.assertEqual(d_auto.tags.count(), 1)
assert d_first.document_type is None
assert d_second.document_type is None
def test_add_type_suggest_url(self) -> None:
call_command(
"document_retagger",
"--document_type",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.document_type)
self.assertIsNone(d_second.document_type)
# ---------------------------------------------------------------------------
# Correspondent assignment
# ---------------------------------------------------------------------------
def test_add_correspondent_suggest_url(self) -> None:
call_command(
"document_retagger",
"--correspondent",
"--suggest",
"--base-url=http://localhost",
)
d_first, d_second, _, _ = self.get_updated_docs()
self.assertIsNone(d_first.correspondent)
self.assertIsNone(d_second.correspondent)
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerCorrespondent(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_correspondent(self, correspondents: CorrespondentTuple) -> None:
c_first, c_second = correspondents
call_command("document_retagger", "--correspondent")
d_first, d_second, _, _ = _get_docs()
def test_add_storage_path(self) -> None:
assert d_first.correspondent == c_first
assert d_second.correspondent == c_second
@pytest.mark.usefixtures("documents", "correspondents")
@pytest.mark.parametrize(
"extra_args",
[
pytest.param([], id="no_base_url"),
pytest.param(["--base-url=http://localhost"], id="with_base_url"),
],
)
def test_suggest_does_not_apply_correspondent(self, extra_args: list[str]) -> None:
call_command("document_retagger", "--correspondent", "--suggest", *extra_args)
d_first, d_second, _, _ = _get_docs()
assert d_first.correspondent is None
assert d_second.correspondent is None
# ---------------------------------------------------------------------------
# Storage path assignment
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerStoragePath(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_add_storage_path(self, storage_paths: StoragePathTuple) -> None:
"""
GIVEN:
- 2 storage paths with documents which match them
- 1 document which matches but has a storage path
WHEN:
- document retagger is called
THEN:
- Matching document's storage paths updated
- Non-matching documents have no storage path
- Existing storage patch left unchanged
GIVEN documents matching various storage path rules
WHEN document_retagger --storage_path is called
THEN matching documents get the correct path; existing path is unchanged
"""
call_command(
"document_retagger",
"--storage_path",
)
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
sp1, sp2, sp3 = storage_paths
call_command("document_retagger", "--storage_path")
d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertEqual(d_first.storage_path, self.sp2)
self.assertEqual(d_auto.storage_path, self.sp1)
self.assertIsNone(d_second.storage_path)
self.assertEqual(d_unrelated.storage_path, self.sp3)
assert d_first.storage_path == sp2
assert d_auto.storage_path == sp1
assert d_second.storage_path is None
assert d_unrelated.storage_path == sp3
def test_overwrite_storage_path(self) -> None:
@pytest.mark.usefixtures("documents")
def test_overwrite_storage_path(self, storage_paths: StoragePathTuple) -> None:
"""
GIVEN:
- 2 storage paths with documents which match them
- 1 document which matches but has a storage path
WHEN:
- document retagger is called with overwrite
THEN:
- Matching document's storage paths updated
- Non-matching documents have no storage path
- Existing storage patch overwritten
GIVEN a document with an existing storage path that matches a different rule
WHEN document_retagger --storage_path --overwrite is called
THEN the existing path is replaced by the newly matched path
"""
sp1, sp2, _ = storage_paths
call_command("document_retagger", "--storage_path", "--overwrite")
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
d_first, d_second, d_unrelated, d_auto = _get_docs()
self.assertEqual(d_first.storage_path, self.sp2)
self.assertEqual(d_auto.storage_path, self.sp1)
self.assertIsNone(d_second.storage_path)
self.assertEqual(d_unrelated.storage_path, self.sp2)
assert d_first.storage_path == sp2
assert d_auto.storage_path == sp1
assert d_second.storage_path is None
assert d_unrelated.storage_path == sp2
def test_id_range_parameter(self) -> None:
commandOutput = ""
Document.objects.create(
checksum="E",
title="E",
content="NOT the first document",
)
call_command("document_retagger", "--tags", "--id-range", "1", "2")
# The retagger shouldn`t apply the 'first' tag to our new document
self.assertEqual(Document.objects.filter(tags__id=self.tag_first.id).count(), 1)
try:
commandOutput = call_command("document_retagger", "--tags", "--id-range")
except CommandError:
# Just ignore the error
None
self.assertIn(commandOutput, "Error: argument --id-range: expected 2 arguments")
# ---------------------------------------------------------------------------
# ID range filtering
# ---------------------------------------------------------------------------
try:
commandOutput = call_command(
"document_retagger",
"--tags",
"--id-range",
"a",
"b",
)
except CommandError:
# Just ignore the error
None
self.assertIn(commandOutput, "error: argument --id-range: invalid int value:")
call_command("document_retagger", "--tags", "--id-range", "1", "9999")
# Now we should have 2 documents
self.assertEqual(Document.objects.filter(tags__id=self.tag_first.id).count(), 2)
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerIdRange(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
("id_range_args", "expected_count"),
[
pytest.param(["1", "2"], 1, id="narrow_range_limits_scope"),
pytest.param(["1", "9999"], 2, id="wide_range_tags_all_matches"),
],
)
def test_id_range_limits_scope(
self,
tags: TagTuple,
id_range_args: list[str],
expected_count: int,
) -> None:
DocumentFactory(content="NOT the first document")
call_command("document_retagger", "--tags", "--id-range", *id_range_args)
tag_first, *_ = tags
assert Document.objects.filter(tags__id=tag_first.id).count() == expected_count
@pytest.mark.usefixtures("documents")
@pytest.mark.parametrize(
"args",
[
pytest.param(["--tags", "--id-range"], id="missing_both_values"),
pytest.param(["--tags", "--id-range", "a", "b"], id="non_integer_values"),
],
)
def test_id_range_invalid_arguments_raise(self, args: list[str]) -> None:
with pytest.raises((CommandError, SystemExit)):
call_command("document_retagger", *args)
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
@pytest.mark.management
@pytest.mark.django_db
class TestRetaggerEdgeCases(DirectoriesMixin):
@pytest.mark.usefixtures("documents")
def test_no_targets_exits_cleanly(self) -> None:
"""Calling the retagger with no classifier targets should not raise."""
call_command("document_retagger")
@pytest.mark.usefixtures("documents")
def test_inbox_only_skips_non_inbox_documents(self) -> None:
"""--inbox-only must restrict processing to documents with an inbox tag."""
call_command("document_retagger", "--tags", "--inbox-only")
d_first, _, d_unrelated, _ = _get_docs()
assert d_first.tags.count() == 0
assert d_unrelated.tags.count() == 2

View File

@@ -33,11 +33,11 @@ from documents.plugins.helpers import ProgressStatusOptions
def setup_directories():
dirs = namedtuple("Dirs", ())
dirs.data_dir = Path(tempfile.mkdtemp()).resolve()
dirs.scratch_dir = Path(tempfile.mkdtemp()).resolve()
dirs.media_dir = Path(tempfile.mkdtemp()).resolve()
dirs.consumption_dir = Path(tempfile.mkdtemp()).resolve()
dirs.static_dir = Path(tempfile.mkdtemp()).resolve()
dirs.data_dir = Path(tempfile.mkdtemp())
dirs.scratch_dir = Path(tempfile.mkdtemp())
dirs.media_dir = Path(tempfile.mkdtemp())
dirs.consumption_dir = Path(tempfile.mkdtemp())
dirs.static_dir = Path(tempfile.mkdtemp())
dirs.index_dir = dirs.data_dir / "index"
dirs.originals_dir = dirs.media_dir / "documents" / "originals"
dirs.thumbnail_dir = dirs.media_dir / "documents" / "thumbnails"

View File

@@ -2,7 +2,7 @@ msgid ""
msgstr ""
"Project-Id-Version: paperless-ngx\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-02-27 22:38+0000\n"
"POT-Creation-Date: 2026-02-26 18:09+0000\n"
"PO-Revision-Date: 2022-02-17 04:17\n"
"Last-Translator: \n"
"Language-Team: English\n"
@@ -1856,151 +1856,151 @@ msgstr ""
msgid "paperless application settings"
msgstr ""
#: paperless/settings/__init__.py:746
#: paperless/settings.py:819
msgid "English (US)"
msgstr ""
#: paperless/settings/__init__.py:747
#: paperless/settings.py:820
msgid "Arabic"
msgstr ""
#: paperless/settings/__init__.py:748
#: paperless/settings.py:821
msgid "Afrikaans"
msgstr ""
#: paperless/settings/__init__.py:749
#: paperless/settings.py:822
msgid "Belarusian"
msgstr ""
#: paperless/settings/__init__.py:750
#: paperless/settings.py:823
msgid "Bulgarian"
msgstr ""
#: paperless/settings/__init__.py:751
#: paperless/settings.py:824
msgid "Catalan"
msgstr ""
#: paperless/settings/__init__.py:752
#: paperless/settings.py:825
msgid "Czech"
msgstr ""
#: paperless/settings/__init__.py:753
#: paperless/settings.py:826
msgid "Danish"
msgstr ""
#: paperless/settings/__init__.py:754
#: paperless/settings.py:827
msgid "German"
msgstr ""
#: paperless/settings/__init__.py:755
#: paperless/settings.py:828
msgid "Greek"
msgstr ""
#: paperless/settings/__init__.py:756
#: paperless/settings.py:829
msgid "English (GB)"
msgstr ""
#: paperless/settings/__init__.py:757
#: paperless/settings.py:830
msgid "Spanish"
msgstr ""
#: paperless/settings/__init__.py:758
#: paperless/settings.py:831
msgid "Persian"
msgstr ""
#: paperless/settings/__init__.py:759
#: paperless/settings.py:832
msgid "Finnish"
msgstr ""
#: paperless/settings/__init__.py:760
#: paperless/settings.py:833
msgid "French"
msgstr ""
#: paperless/settings/__init__.py:761
#: paperless/settings.py:834
msgid "Hungarian"
msgstr ""
#: paperless/settings/__init__.py:762
#: paperless/settings.py:835
msgid "Indonesian"
msgstr ""
#: paperless/settings/__init__.py:763
#: paperless/settings.py:836
msgid "Italian"
msgstr ""
#: paperless/settings/__init__.py:764
#: paperless/settings.py:837
msgid "Japanese"
msgstr ""
#: paperless/settings/__init__.py:765
#: paperless/settings.py:838
msgid "Korean"
msgstr ""
#: paperless/settings/__init__.py:766
#: paperless/settings.py:839
msgid "Luxembourgish"
msgstr ""
#: paperless/settings/__init__.py:767
#: paperless/settings.py:840
msgid "Norwegian"
msgstr ""
#: paperless/settings/__init__.py:768
#: paperless/settings.py:841
msgid "Dutch"
msgstr ""
#: paperless/settings/__init__.py:769
#: paperless/settings.py:842
msgid "Polish"
msgstr ""
#: paperless/settings/__init__.py:770
#: paperless/settings.py:843
msgid "Portuguese (Brazil)"
msgstr ""
#: paperless/settings/__init__.py:771
#: paperless/settings.py:844
msgid "Portuguese"
msgstr ""
#: paperless/settings/__init__.py:772
#: paperless/settings.py:845
msgid "Romanian"
msgstr ""
#: paperless/settings/__init__.py:773
#: paperless/settings.py:846
msgid "Russian"
msgstr ""
#: paperless/settings/__init__.py:774
#: paperless/settings.py:847
msgid "Slovak"
msgstr ""
#: paperless/settings/__init__.py:775
#: paperless/settings.py:848
msgid "Slovenian"
msgstr ""
#: paperless/settings/__init__.py:776
#: paperless/settings.py:849
msgid "Serbian"
msgstr ""
#: paperless/settings/__init__.py:777
#: paperless/settings.py:850
msgid "Swedish"
msgstr ""
#: paperless/settings/__init__.py:778
#: paperless/settings.py:851
msgid "Turkish"
msgstr ""
#: paperless/settings/__init__.py:779
#: paperless/settings.py:852
msgid "Ukrainian"
msgstr ""
#: paperless/settings/__init__.py:780
#: paperless/settings.py:853
msgid "Vietnamese"
msgstr ""
#: paperless/settings/__init__.py:781
#: paperless/settings.py:854
msgid "Chinese Simplified"
msgstr ""
#: paperless/settings/__init__.py:782
#: paperless/settings.py:855
msgid "Chinese Traditional"
msgstr ""

View File

@@ -202,43 +202,3 @@ def audit_log_check(app_configs, **kwargs):
)
return result
@register()
def check_deprecated_db_settings(
app_configs: object,
**kwargs: object,
) -> list[Warning]:
"""Check for deprecated database environment variables.
Detects legacy advanced options that should be migrated to
PAPERLESS_DB_OPTIONS. Returns one Warning per deprecated variable found.
"""
deprecated_vars: dict[str, str] = {
"PAPERLESS_DB_TIMEOUT": "timeout",
"PAPERLESS_DB_POOLSIZE": "pool.min_size / pool.max_size",
"PAPERLESS_DBSSLMODE": "sslmode",
"PAPERLESS_DBSSLROOTCERT": "sslrootcert",
"PAPERLESS_DBSSLCERT": "sslcert",
"PAPERLESS_DBSSLKEY": "sslkey",
}
warnings: list[Warning] = []
for var_name, db_option_key in deprecated_vars.items():
if not os.getenv(var_name):
continue
warnings.append(
Warning(
f"Deprecated environment variable: {var_name}",
hint=(
f"{var_name} is no longer supported and will be removed in v3.2. "
f"Set the equivalent option via PAPERLESS_DB_OPTIONS instead. "
f'Example: PAPERLESS_DB_OPTIONS=\'{{"{db_option_key}": "<value>"}}\'. '
"See https://docs.paperless-ngx.com/migration/ for the full reference."
),
id="paperless.W001",
),
)
return warnings

View File

@@ -17,8 +17,6 @@ from dateparser.languages.loader import LocaleDataLoader
from django.utils.translation import gettext_lazy as _
from dotenv import load_dotenv
from paperless.settings.custom import parse_db_settings
logger = logging.getLogger("paperless.settings")
# Tap paperless.conf if it's available
@@ -284,7 +282,7 @@ DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
# Directories #
###############################################################################
BASE_DIR: Path = Path(__file__).resolve().parent.parent.parent
BASE_DIR: Path = Path(__file__).resolve().parent.parent
STATIC_ROOT = __get_path("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
@@ -724,8 +722,83 @@ EMAIL_CERTIFICATE_FILE = __get_optional_path("PAPERLESS_EMAIL_CERTIFICATE_LOCATI
###############################################################################
# Database #
###############################################################################
def _parse_db_settings() -> dict:
databases = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": DATA_DIR / "db.sqlite3",
"OPTIONS": {},
},
}
if os.getenv("PAPERLESS_DBHOST"):
# Have sqlite available as a second option for management commands
# This is important when migrating to/from sqlite
databases["sqlite"] = databases["default"].copy()
DATABASES = parse_db_settings(DATA_DIR)
databases["default"] = {
"HOST": os.getenv("PAPERLESS_DBHOST"),
"NAME": os.getenv("PAPERLESS_DBNAME", "paperless"),
"USER": os.getenv("PAPERLESS_DBUSER", "paperless"),
"PASSWORD": os.getenv("PAPERLESS_DBPASS", "paperless"),
"OPTIONS": {},
}
if os.getenv("PAPERLESS_DBPORT"):
databases["default"]["PORT"] = os.getenv("PAPERLESS_DBPORT")
# Leave room for future extensibility
if os.getenv("PAPERLESS_DBENGINE") == "mariadb":
engine = "django.db.backends.mysql"
# Contrary to Postgres, Django does not natively support connection pooling for MariaDB.
# However, since MariaDB uses threads instead of forks, establishing connections is significantly faster
# compared to PostgreSQL, so the lack of pooling is not an issue
options = {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"ssl_mode": os.getenv("PAPERLESS_DBSSLMODE", "PREFERRED"),
"ssl": {
"ca": os.getenv("PAPERLESS_DBSSLROOTCERT", None),
"cert": os.getenv("PAPERLESS_DBSSLCERT", None),
"key": os.getenv("PAPERLESS_DBSSLKEY", None),
},
}
else: # Default to PostgresDB
engine = "django.db.backends.postgresql"
options = {
"sslmode": os.getenv("PAPERLESS_DBSSLMODE", "prefer"),
"sslrootcert": os.getenv("PAPERLESS_DBSSLROOTCERT", None),
"sslcert": os.getenv("PAPERLESS_DBSSLCERT", None),
"sslkey": os.getenv("PAPERLESS_DBSSLKEY", None),
}
if int(os.getenv("PAPERLESS_DB_POOLSIZE", 0)) > 0:
options.update(
{
"pool": {
"min_size": 1,
"max_size": int(os.getenv("PAPERLESS_DB_POOLSIZE")),
},
},
)
databases["default"]["ENGINE"] = engine
databases["default"]["OPTIONS"].update(options)
if os.getenv("PAPERLESS_DB_TIMEOUT") is not None:
if databases["default"]["ENGINE"] == "django.db.backends.sqlite3":
databases["default"]["OPTIONS"].update(
{"timeout": int(os.getenv("PAPERLESS_DB_TIMEOUT"))},
)
else:
databases["default"]["OPTIONS"].update(
{"connect_timeout": int(os.getenv("PAPERLESS_DB_TIMEOUT"))},
)
databases["sqlite"]["OPTIONS"].update(
{"timeout": int(os.getenv("PAPERLESS_DB_TIMEOUT"))},
)
return databases
DATABASES = _parse_db_settings()
if os.getenv("PAPERLESS_DBENGINE") == "mariadb":
# Silence Django error on old MariaDB versions.

View File

@@ -1,122 +0,0 @@
import os
from pathlib import Path
from typing import Any
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import parse_dict_from_str
def parse_db_settings(data_dir: Path) -> dict[str, dict[str, Any]]:
"""Parse database settings from environment variables.
Core connection variables (no deprecation):
- PAPERLESS_DBENGINE (sqlite/postgresql/mariadb)
- PAPERLESS_DBHOST, PAPERLESS_DBPORT
- PAPERLESS_DBNAME, PAPERLESS_DBUSER, PAPERLESS_DBPASS
Advanced options can be set via:
- Legacy individual env vars (deprecated in v3.0, removed in v3.2)
- PAPERLESS_DB_OPTIONS (recommended v3+ approach)
Args:
data_dir: The data directory path for SQLite database location.
Returns:
A databases dict suitable for Django DATABASES setting.
"""
try:
engine = get_choice_from_env(
"PAPERLESS_DBENGINE",
{"sqlite", "postgresql", "mariadb"},
default="sqlite",
)
except ValueError:
# MariaDB users already had to set PAPERLESS_DBENGINE, so it was picked up above
# SQLite users didn't need to set anything
engine = "postgresql" if "PAPERLESS_DBHOST" in os.environ else "sqlite"
db_config: dict[str, Any]
base_options: dict[str, Any]
match engine:
case "sqlite":
db_config = {
"ENGINE": "django.db.backends.sqlite3",
"NAME": str((data_dir / "db.sqlite3").resolve()),
}
base_options = {}
case "postgresql":
db_config = {
"ENGINE": "django.db.backends.postgresql",
"HOST": os.getenv("PAPERLESS_DBHOST"),
"NAME": os.getenv("PAPERLESS_DBNAME", "paperless"),
"USER": os.getenv("PAPERLESS_DBUSER", "paperless"),
"PASSWORD": os.getenv("PAPERLESS_DBPASS", "paperless"),
}
base_options = {
"sslmode": os.getenv("PAPERLESS_DBSSLMODE", "prefer"),
"sslrootcert": os.getenv("PAPERLESS_DBSSLROOTCERT"),
"sslcert": os.getenv("PAPERLESS_DBSSLCERT"),
"sslkey": os.getenv("PAPERLESS_DBSSLKEY"),
}
if (pool_size := get_int_from_env("PAPERLESS_DB_POOLSIZE")) is not None:
base_options["pool"] = {
"min_size": 1,
"max_size": pool_size,
}
case "mariadb":
db_config = {
"ENGINE": "django.db.backends.mysql",
"HOST": os.getenv("PAPERLESS_DBHOST"),
"NAME": os.getenv("PAPERLESS_DBNAME", "paperless"),
"USER": os.getenv("PAPERLESS_DBUSER", "paperless"),
"PASSWORD": os.getenv("PAPERLESS_DBPASS", "paperless"),
}
base_options = {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": os.getenv("PAPERLESS_DBSSLMODE", "PREFERRED"),
"ssl": {
"ca": os.getenv("PAPERLESS_DBSSLROOTCERT"),
"cert": os.getenv("PAPERLESS_DBSSLCERT"),
"key": os.getenv("PAPERLESS_DBSSLKEY"),
},
}
case _: # pragma: no cover
raise NotImplementedError(engine)
# Handle port setting for external databases
if (
engine in ("postgresql", "mariadb")
and (port := get_int_from_env("PAPERLESS_DBPORT")) is not None
):
db_config["PORT"] = port
# Handle timeout setting (common across all engines, different key names)
if (timeout := get_int_from_env("PAPERLESS_DB_TIMEOUT")) is not None:
timeout_key = "timeout" if engine == "sqlite" else "connect_timeout"
base_options[timeout_key] = timeout
# Apply PAPERLESS_DB_OPTIONS overrides
db_config["OPTIONS"] = parse_dict_from_str(
os.getenv("PAPERLESS_DB_OPTIONS"),
defaults=base_options,
separator=";",
type_map={
# SQLite options
"timeout": int,
# Postgres/MariaDB options
"connect_timeout": int,
"pool.min_size": int,
"pool.max_size": int,
},
)
return {"default": db_config}

View File

@@ -1,192 +0,0 @@
import copy
import os
from collections.abc import Callable
from collections.abc import Mapping
from pathlib import Path
from typing import Any
from typing import TypeVar
from typing import overload
T = TypeVar("T")
def str_to_bool(value: str) -> bool:
"""
Converts a string representation of truth to a boolean value.
Recognizes 'true', '1', 't', 'y', 'yes' as True, and
'false', '0', 'f', 'n', 'no' as False. Case-insensitive.
Args:
value: The string to convert.
Returns:
The boolean representation of the string.
Raises:
ValueError: If the string is not a recognized boolean value.
"""
val_lower = value.strip().lower()
if val_lower in ("true", "1", "t", "y", "yes"):
return True
elif val_lower in ("false", "0", "f", "n", "no"):
return False
raise ValueError(f"Cannot convert '{value}' to a boolean.")
@overload
def get_int_from_env(key: str) -> int | None: ...
@overload
def get_int_from_env(key: str, default: None) -> int | None: ...
@overload
def get_int_from_env(key: str, default: int) -> int: ...
def get_int_from_env(key: str, default: int | None = None) -> int | None:
"""
Return an integer value based on the environment variable.
If default is provided, returns that value when key is missing.
If default is None, returns None when key is missing.
"""
if key not in os.environ:
return default
return int(os.environ[key])
def parse_dict_from_str(
env_str: str | None,
defaults: dict[str, Any] | None = None,
type_map: Mapping[str, Callable[[str], Any]] | None = None,
separator: str = ",",
) -> dict[str, Any]:
"""
Parses a key-value string into a dictionary, applying defaults and casting types.
Supports nested keys via dot-notation, e.g.:
"database.host=localhost,database.port=5432"
Args:
env_str: The string from the environment variable (e.g., "port=9090,debug=true").
defaults: A dictionary of default values (can contain nested dicts).
type_map: A dictionary mapping keys (dot-notation allowed) to a type or a parsing
function (e.g., {'port': int, 'debug': bool, 'database.port': int}).
The special `bool` type triggers custom boolean parsing.
separator: The character used to separate key-value pairs. Defaults to ','.
Returns:
A dictionary with the parsed and correctly-typed settings.
Raises:
ValueError: If a value cannot be cast to its specified type.
"""
def _set_nested(d: dict, keys: list[str], value: Any) -> None:
"""Set a nested value, creating intermediate dicts as needed."""
cur = d
for k in keys[:-1]:
if k not in cur or not isinstance(cur[k], dict):
cur[k] = {}
cur = cur[k]
cur[keys[-1]] = value
def _get_nested(d: dict, keys: list[str]) -> Any:
"""Get nested value or raise KeyError if not present."""
cur = d
for k in keys:
if not isinstance(cur, dict) or k not in cur:
raise KeyError
cur = cur[k]
return cur
def _has_nested(d: dict, keys: list[str]) -> bool:
try:
_get_nested(d, keys)
return True
except KeyError:
return False
settings: dict[str, Any] = copy.deepcopy(defaults) if defaults else {}
_type_map = type_map if type_map else {}
if not env_str:
return settings
# Parse the environment string using the specified separator
pairs = [p.strip() for p in env_str.split(separator) if p.strip()]
for pair in pairs:
if "=" not in pair:
# ignore malformed pairs
continue
key, val = pair.split("=", 1)
key = key.strip()
val = val.strip()
if not key:
continue
parts = key.split(".")
_set_nested(settings, parts, val)
# Apply type casting to the updated settings (supports nested keys in type_map)
for key, caster in _type_map.items():
key_parts = key.split(".")
if _has_nested(settings, key_parts):
raw_val = _get_nested(settings, key_parts)
# Only cast if it's a string (i.e. from env parsing). If defaults already provided
# a different type we leave it as-is.
if isinstance(raw_val, str):
try:
if caster is bool:
parsed = str_to_bool(raw_val)
elif caster is Path:
parsed = Path(raw_val).resolve()
else:
parsed = caster(raw_val)
except (ValueError, TypeError) as e:
caster_name = getattr(caster, "__name__", repr(caster))
raise ValueError(
f"Error casting key '{key}' with value '{raw_val}' "
f"to type '{caster_name}'",
) from e
_set_nested(settings, key_parts, parsed)
return settings
def get_choice_from_env(
env_key: str,
choices: set[str],
default: str | None = None,
) -> str:
"""
Gets and validates an environment variable against a set of allowed choices.
Args:
env_key: The environment variable key to validate
choices: Set of valid choices for the environment variable
default: Optional default value if environment variable is not set
Returns:
The validated environment variable value
Raises:
ValueError: If the environment variable value is not in choices
or if no default is provided and env var is missing
"""
value = os.environ.get(env_key, default)
if value is None:
raise ValueError(
f"Environment variable '{env_key}' is required but not set.",
)
if value not in choices:
raise ValueError(
f"Environment variable '{env_key}' has invalid value '{value}'. "
f"Valid choices are: {', '.join(sorted(choices))}",
)
return value

View File

@@ -1,266 +0,0 @@
import os
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from paperless.settings.custom import parse_db_settings
class TestParseDbSettings:
"""Test suite for parse_db_settings function."""
@pytest.mark.parametrize(
("env_vars", "expected_database_settings"),
[
pytest.param(
{},
{
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": None, # Will be replaced with tmp_path
"OPTIONS": {},
},
},
id="default-sqlite",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "sqlite",
"PAPERLESS_DB_OPTIONS": "timeout=30",
},
{
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": None, # Will be replaced with tmp_path
"OPTIONS": {
"timeout": 30,
},
},
},
id="sqlite-with-timeout-override",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "localhost",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "localhost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"sslmode": "prefer",
"sslrootcert": None,
"sslcert": None,
"sslkey": None,
},
},
},
id="postgresql-defaults",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "paperless-db-host",
"PAPERLESS_DBPORT": "1111",
"PAPERLESS_DBNAME": "customdb",
"PAPERLESS_DBUSER": "customuser",
"PAPERLESS_DBPASS": "custompass",
"PAPERLESS_DB_OPTIONS": "pool.max_size=50;pool.min_size=2;sslmode=require",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "paperless-db-host",
"PORT": 1111,
"NAME": "customdb",
"USER": "customuser",
"PASSWORD": "custompass",
"OPTIONS": {
"sslmode": "require",
"sslrootcert": None,
"sslcert": None,
"sslkey": None,
"pool": {
"min_size": 2,
"max_size": 50,
},
},
},
},
id="postgresql-overrides",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "pghost",
"PAPERLESS_DB_POOLSIZE": "10",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "pghost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"sslmode": "prefer",
"sslrootcert": None,
"sslcert": None,
"sslkey": None,
"pool": {
"min_size": 1,
"max_size": 10,
},
},
},
},
id="postgresql-legacy-poolsize",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "pghost",
"PAPERLESS_DBSSLMODE": "require",
"PAPERLESS_DBSSLROOTCERT": "/certs/ca.crt",
"PAPERLESS_DB_TIMEOUT": "30",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "pghost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"sslmode": "require",
"sslrootcert": "/certs/ca.crt",
"sslcert": None,
"sslkey": None,
"connect_timeout": 30,
},
},
},
id="postgresql-legacy-ssl-and-timeout",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "mariadb",
"PAPERLESS_DBHOST": "localhost",
},
{
"default": {
"ENGINE": "django.db.backends.mysql",
"HOST": "localhost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": "PREFERRED",
"ssl": {
"ca": None,
"cert": None,
"key": None,
},
},
},
},
id="mariadb-defaults",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "mariadb",
"PAPERLESS_DBHOST": "paperless-mariadb-host",
"PAPERLESS_DBPORT": "5555",
"PAPERLESS_DBUSER": "my-cool-user",
"PAPERLESS_DBPASS": "my-secure-password",
"PAPERLESS_DB_OPTIONS": "ssl.ca=/path/to/ca.pem;ssl_mode=REQUIRED",
},
{
"default": {
"ENGINE": "django.db.backends.mysql",
"HOST": "paperless-mariadb-host",
"PORT": 5555,
"NAME": "paperless",
"USER": "my-cool-user",
"PASSWORD": "my-secure-password",
"OPTIONS": {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": "REQUIRED",
"ssl": {
"ca": "/path/to/ca.pem",
"cert": None,
"key": None,
},
},
},
},
id="mariadb-overrides",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "mariadb",
"PAPERLESS_DBHOST": "mariahost",
"PAPERLESS_DBSSLMODE": "REQUIRED",
"PAPERLESS_DBSSLROOTCERT": "/certs/ca.pem",
"PAPERLESS_DBSSLCERT": "/certs/client.pem",
"PAPERLESS_DBSSLKEY": "/certs/client.key",
"PAPERLESS_DB_TIMEOUT": "25",
},
{
"default": {
"ENGINE": "django.db.backends.mysql",
"HOST": "mariahost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": "REQUIRED",
"ssl": {
"ca": "/certs/ca.pem",
"cert": "/certs/client.pem",
"key": "/certs/client.key",
},
"connect_timeout": 25,
},
},
},
id="mariadb-legacy-ssl-and-timeout",
),
],
)
def test_parse_db_settings(
self,
tmp_path: Path,
mocker: MockerFixture,
env_vars: dict[str, str],
expected_database_settings: dict[str, dict],
) -> None:
"""Test various database configurations with defaults and overrides."""
# Clear environment and set test vars
mocker.patch.dict(os.environ, env_vars, clear=True)
# Update expected paths with actual tmp_path
if (
"default" in expected_database_settings
and expected_database_settings["default"]["NAME"] is None
):
expected_database_settings["default"]["NAME"] = str(
tmp_path / "db.sqlite3",
)
settings = parse_db_settings(tmp_path)
assert settings == expected_database_settings

View File

@@ -1,414 +0,0 @@
import os
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import parse_dict_from_str
from paperless.settings.parsers import str_to_bool
class TestStringToBool:
@pytest.mark.parametrize(
"true_value",
[
pytest.param("true", id="lowercase_true"),
pytest.param("1", id="digit_1"),
pytest.param("T", id="capital_T"),
pytest.param("y", id="lowercase_y"),
pytest.param("YES", id="uppercase_YES"),
pytest.param(" True ", id="whitespace_true"),
],
)
def test_true_conversion(self, true_value: str):
"""Test that various 'true' strings correctly evaluate to True."""
assert str_to_bool(true_value) is True
@pytest.mark.parametrize(
"false_value",
[
pytest.param("false", id="lowercase_false"),
pytest.param("0", id="digit_0"),
pytest.param("f", id="capital_f"),
pytest.param("N", id="capital_N"),
pytest.param("no", id="lowercase_no"),
pytest.param(" False ", id="whitespace_false"),
],
)
def test_false_conversion(self, false_value: str):
"""Test that various 'false' strings correctly evaluate to False."""
assert str_to_bool(false_value) is False
def test_invalid_conversion(self):
"""Test that an invalid string raises a ValueError."""
with pytest.raises(ValueError, match="Cannot convert 'maybe' to a boolean\\."):
str_to_bool("maybe")
class TestParseDictFromString:
def test_empty_and_none_input(self):
"""Test behavior with None or empty string input."""
assert parse_dict_from_str(None) == {}
assert parse_dict_from_str("") == {}
defaults = {"a": 1}
res = parse_dict_from_str(None, defaults=defaults)
assert res == defaults
# Ensure it returns a copy, not the original object
assert res is not defaults
def test_basic_parsing(self):
"""Test simple key-value parsing without defaults or types."""
env_str = "key1=val1, key2=val2"
expected = {"key1": "val1", "key2": "val2"}
assert parse_dict_from_str(env_str) == expected
def test_with_defaults(self):
"""Test that environment values override defaults correctly."""
defaults = {"host": "localhost", "port": 8000, "user": "default"}
env_str = "port=9090, host=db.example.com"
expected = {"host": "db.example.com", "port": "9090", "user": "default"}
result = parse_dict_from_str(env_str, defaults=defaults)
assert result == expected
def test_type_casting(self):
"""Test successful casting of values to specified types."""
env_str = "port=9090, debug=true, timeout=12.5, user=admin"
type_map = {"port": int, "debug": bool, "timeout": float}
expected = {"port": 9090, "debug": True, "timeout": 12.5, "user": "admin"}
result = parse_dict_from_str(env_str, type_map=type_map)
assert result == expected
def test_type_casting_with_defaults(self):
"""Test casting when values come from both defaults and env string."""
defaults = {"port": 8000, "debug": False, "retries": 3}
env_str = "port=9090, debug=true"
type_map = {"port": int, "debug": bool, "retries": int}
# The 'retries' value comes from defaults and is already an int,
# so it should not be processed by the caster.
expected = {"port": 9090, "debug": True, "retries": 3}
result = parse_dict_from_str(env_str, defaults=defaults, type_map=type_map)
assert result == expected
assert isinstance(result["retries"], int)
def test_path_casting(self, tmp_path: Path):
"""Test successful casting of a string to a resolved pathlib.Path object."""
# Create a dummy file to resolve against
test_file = tmp_path / "test_file.txt"
test_file.touch()
env_str = f"config_path={test_file}"
type_map = {"config_path": Path}
result = parse_dict_from_str(env_str, type_map=type_map)
# The result should be a resolved Path object
assert isinstance(result["config_path"], Path)
assert result["config_path"] == test_file.resolve()
def test_custom_separator(self):
"""Test parsing with a custom separator like a semicolon."""
env_str = "host=db; port=5432; user=test"
expected = {"host": "db", "port": "5432", "user": "test"}
result = parse_dict_from_str(env_str, separator=";")
assert result == expected
def test_edge_cases_in_string(self):
"""Test malformed strings to ensure robustness."""
# Malformed pair 'debug' is skipped, extra comma is ignored
env_str = "key=val,, debug, foo=bar"
expected = {"key": "val", "foo": "bar"}
assert parse_dict_from_str(env_str) == expected
# Value can contain the equals sign
env_str = "url=postgres://user:pass@host:5432/db"
expected = {"url": "postgres://user:pass@host:5432/db"}
assert parse_dict_from_str(env_str) == expected
def test_casting_error_handling(self):
"""Test that a ValueError is raised for invalid casting."""
env_str = "port=not-a-number"
type_map = {"port": int}
with pytest.raises(ValueError) as excinfo:
parse_dict_from_str(env_str, type_map=type_map)
assert "Error casting key 'port'" in str(excinfo.value)
assert "value 'not-a-number'" in str(excinfo.value)
assert "to type 'int'" in str(excinfo.value)
def test_bool_casting_error(self):
"""Test that an invalid boolean string raises a ValueError."""
env_str = "debug=maybe"
type_map = {"debug": bool}
with pytest.raises(ValueError, match="Error casting key 'debug'"):
parse_dict_from_str(env_str, type_map=type_map)
def test_nested_key_parsing_basic(self):
"""Basic nested key parsing using dot-notation."""
env_str = "database.host=db.example.com, database.port=5432, logging.level=INFO"
result = parse_dict_from_str(env_str)
assert result == {
"database": {"host": "db.example.com", "port": "5432"},
"logging": {"level": "INFO"},
}
def test_nested_overrides_defaults_and_deepcopy(self):
"""Nested env keys override defaults and defaults are deep-copied."""
defaults = {"database": {"host": "127.0.0.1", "port": 3306, "user": "default"}}
env_str = "database.host=db.example.com, debug=true"
result = parse_dict_from_str(
env_str,
defaults=defaults,
type_map={"debug": bool},
)
assert result["database"]["host"] == "db.example.com"
# Unchanged default preserved
assert result["database"]["port"] == 3306
assert result["database"]["user"] == "default"
# Default object was deep-copied (no same nested object identity)
assert result is not defaults
assert result["database"] is not defaults["database"]
def test_nested_type_casting(self):
"""Type casting for nested keys (dot-notation) should work."""
env_str = "database.host=db.example.com, database.port=5433, debug=false"
type_map = {"database.port": int, "debug": bool}
result = parse_dict_from_str(env_str, type_map=type_map)
assert result["database"]["host"] == "db.example.com"
assert result["database"]["port"] == 5433
assert isinstance(result["database"]["port"], int)
assert result["debug"] is False
assert isinstance(result["debug"], bool)
def test_nested_casting_error_message(self):
"""Error messages should include the full dotted key name on failure."""
env_str = "database.port=not-a-number"
type_map = {"database.port": int}
with pytest.raises(ValueError) as excinfo:
parse_dict_from_str(env_str, type_map=type_map)
msg = str(excinfo.value)
assert "Error casting key 'database.port'" in msg
assert "value 'not-a-number'" in msg
assert "to type 'int'" in msg
def test_type_map_does_not_recast_non_string_defaults(self):
"""If a default already provides a non-string value, the caster should skip it."""
defaults = {"database": {"port": 3306}}
type_map = {"database.port": int}
result = parse_dict_from_str(None, defaults=defaults, type_map=type_map)
assert result["database"]["port"] == 3306
assert isinstance(result["database"]["port"], int)
class TestGetIntFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
[
pytest.param("42", 42, id="positive"),
pytest.param("-10", -10, id="negative"),
pytest.param("0", 0, id="zero"),
pytest.param("999", 999, id="large_positive"),
pytest.param("-999", -999, id="large_negative"),
],
)
def test_existing_env_var_valid_ints(self, mocker, env_value, expected):
"""Test that existing environment variables with valid integers return correct values."""
mocker.patch.dict(os.environ, {"INT_VAR": env_value})
assert get_int_from_env("INT_VAR") == expected
@pytest.mark.parametrize(
("default", "expected"),
[
pytest.param(100, 100, id="positive_default"),
pytest.param(0, 0, id="zero_default"),
pytest.param(-50, -50, id="negative_default"),
pytest.param(None, None, id="none_default"),
],
)
def test_missing_env_var_with_defaults(self, mocker, default, expected):
"""Test that missing environment variables return provided defaults."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_int_from_env("MISSING_VAR", default=default) == expected
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_int_from_env("MISSING_VAR") is None
@pytest.mark.parametrize(
"invalid_value",
[
pytest.param("not_a_number", id="text"),
pytest.param("42.5", id="float"),
pytest.param("42a", id="alpha_suffix"),
pytest.param("", id="empty"),
pytest.param(" ", id="whitespace"),
pytest.param("true", id="boolean"),
pytest.param("1.0", id="decimal"),
],
)
def test_invalid_int_values_raise_error(self, mocker, invalid_value):
"""Test that invalid integer values raise ValueError."""
mocker.patch.dict(os.environ, {"INVALID_INT": invalid_value})
with pytest.raises(ValueError):
get_int_from_env("INVALID_INT")
class TestGetEnvChoice:
@pytest.fixture
def valid_choices(self) -> set[str]:
"""Fixture providing a set of valid environment choices."""
return {"development", "staging", "production"}
def test_returns_valid_env_value(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function returns the environment value when it's valid."""
mocker.patch.dict("os.environ", {"TEST_ENV": "development"})
result = get_choice_from_env("TEST_ENV", valid_choices)
assert result == "development"
def test_returns_default_when_env_not_set(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function returns default value when env var is not set."""
mocker.patch.dict("os.environ", {}, clear=True)
result = get_choice_from_env("TEST_ENV", valid_choices, default="staging")
assert result == "staging"
def test_raises_error_when_env_not_set_and_no_default(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function raises ValueError when env var is missing and no default."""
mocker.patch.dict("os.environ", {}, clear=True)
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices)
assert "Environment variable 'TEST_ENV' is required but not set" in str(
exc_info.value,
)
def test_raises_error_when_env_value_invalid(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function raises ValueError when env value is not in choices."""
mocker.patch.dict("os.environ", {"TEST_ENV": "invalid_value"})
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices)
error_msg = str(exc_info.value)
assert (
"Environment variable 'TEST_ENV' has invalid value 'invalid_value'"
in error_msg
)
assert "Valid choices are:" in error_msg
assert "development" in error_msg
assert "staging" in error_msg
assert "production" in error_msg
def test_raises_error_when_default_invalid(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function raises ValueError when default value is not in choices."""
mocker.patch.dict("os.environ", {}, clear=True)
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices, default="invalid_default")
error_msg = str(exc_info.value)
assert (
"Environment variable 'TEST_ENV' has invalid value 'invalid_default'"
in error_msg
)
def test_case_sensitive_validation(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that validation is case sensitive."""
mocker.patch.dict("os.environ", {"TEST_ENV": "DEVELOPMENT"})
with pytest.raises(ValueError):
get_choice_from_env("TEST_ENV", valid_choices)
def test_empty_string_env_value(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test behavior with empty string environment value."""
mocker.patch.dict("os.environ", {"TEST_ENV": ""})
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices)
assert "has invalid value ''" in str(exc_info.value)
def test_whitespace_env_value(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test behavior with whitespace-only environment value."""
mocker.patch.dict("os.environ", {"TEST_ENV": " development "})
with pytest.raises(ValueError):
get_choice_from_env("TEST_ENV", valid_choices)
def test_single_choice_set(self, mocker: MockerFixture) -> None:
"""Test function works correctly with single choice set."""
single_choice: set[str] = {"production"}
mocker.patch.dict("os.environ", {"TEST_ENV": "production"})
result = get_choice_from_env("TEST_ENV", single_choice)
assert result == "production"
def test_large_choice_set(self, mocker: MockerFixture) -> None:
"""Test function works correctly with large choice set."""
large_choices: set[str] = {f"option_{i}" for i in range(100)}
mocker.patch.dict("os.environ", {"TEST_ENV": "option_50"})
result = get_choice_from_env("TEST_ENV", large_choices)
assert result == "option_50"
def test_different_env_keys(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test function works with different environment variable keys."""
test_cases = [
("DJANGO_ENV", "development"),
("DATABASE_BACKEND", "staging"),
("LOG_LEVEL", "production"),
("APP_MODE", "development"),
]
for env_key, env_value in test_cases:
mocker.patch.dict("os.environ", {env_key: env_value})
result = get_choice_from_env(env_key, valid_choices)
assert result == env_value

View File

@@ -78,15 +78,11 @@ class TestCustomAccountAdapter(TestCase):
adapter = get_adapter()
# Test when PAPERLESS_URL is None
with override_settings(
PAPERLESS_URL=None,
ACCOUNT_DEFAULT_HTTP_PROTOCOL="https",
):
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
# Test when PAPERLESS_URL is not None
with override_settings(PAPERLESS_URL="https://bar.com"):

View File

@@ -2,17 +2,13 @@ import os
from pathlib import Path
from unittest import mock
import pytest
from django.core.checks import Warning
from django.test import TestCase
from django.test import override_settings
from pytest_mock import MockerFixture
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from paperless.checks import audit_log_check
from paperless.checks import binaries_check
from paperless.checks import check_deprecated_db_settings
from paperless.checks import debug_mode_check
from paperless.checks import paths_check
from paperless.checks import settings_values_check
@@ -241,157 +237,3 @@ class TestAuditLogChecks(TestCase):
("auditlog table was found but audit log is disabled."),
msg.msg,
)
DEPRECATED_VARS: dict[str, str] = {
"PAPERLESS_DB_TIMEOUT": "timeout",
"PAPERLESS_DB_POOLSIZE": "pool.min_size / pool.max_size",
"PAPERLESS_DBSSLMODE": "sslmode",
"PAPERLESS_DBSSLROOTCERT": "sslrootcert",
"PAPERLESS_DBSSLCERT": "sslcert",
"PAPERLESS_DBSSLKEY": "sslkey",
}
class TestDeprecatedDbSettings:
"""Test suite for the check_deprecated_db_settings system check."""
def test_no_deprecated_vars_returns_empty(
self,
mocker: MockerFixture,
) -> None:
"""No warnings when none of the deprecated vars are present."""
# clear=True ensures vars from the outer test environment do not leak in
mocker.patch.dict(os.environ, {}, clear=True)
result = check_deprecated_db_settings(None)
assert result == []
@pytest.mark.parametrize(
("env_var", "db_option_key"),
[
("PAPERLESS_DB_TIMEOUT", "timeout"),
("PAPERLESS_DB_POOLSIZE", "pool.min_size / pool.max_size"),
("PAPERLESS_DBSSLMODE", "sslmode"),
("PAPERLESS_DBSSLROOTCERT", "sslrootcert"),
("PAPERLESS_DBSSLCERT", "sslcert"),
("PAPERLESS_DBSSLKEY", "sslkey"),
],
ids=[
"db-timeout",
"db-poolsize",
"ssl-mode",
"ssl-rootcert",
"ssl-cert",
"ssl-key",
],
)
def test_single_deprecated_var_produces_one_warning(
self,
mocker: MockerFixture,
env_var: str,
db_option_key: str,
) -> None:
"""Each deprecated var in isolation produces exactly one warning."""
mocker.patch.dict(os.environ, {env_var: "some_value"}, clear=True)
result = check_deprecated_db_settings(None)
assert len(result) == 1
warning = result[0]
assert isinstance(warning, Warning)
assert warning.id == "paperless.W001"
assert env_var in warning.hint
assert db_option_key in warning.hint
def test_multiple_deprecated_vars_produce_one_warning_each(
self,
mocker: MockerFixture,
) -> None:
"""Each deprecated var present in the environment gets its own warning."""
set_vars = {
"PAPERLESS_DB_TIMEOUT": "30",
"PAPERLESS_DB_POOLSIZE": "10",
"PAPERLESS_DBSSLMODE": "require",
}
mocker.patch.dict(os.environ, set_vars, clear=True)
result = check_deprecated_db_settings(None)
assert len(result) == len(set_vars)
assert all(isinstance(w, Warning) for w in result)
assert all(w.id == "paperless.W001" for w in result)
all_hints = " ".join(w.hint for w in result)
for var_name in set_vars:
assert var_name in all_hints
def test_all_deprecated_vars_produces_one_warning_each(
self,
mocker: MockerFixture,
) -> None:
"""All deprecated vars set simultaneously produces one warning per var."""
all_vars = dict.fromkeys(DEPRECATED_VARS, "some_value")
mocker.patch.dict(os.environ, all_vars, clear=True)
result = check_deprecated_db_settings(None)
assert len(result) == len(DEPRECATED_VARS)
assert all(isinstance(w, Warning) for w in result)
assert all(w.id == "paperless.W001" for w in result)
def test_unset_vars_not_mentioned_in_warnings(
self,
mocker: MockerFixture,
) -> None:
"""Vars absent from the environment do not appear in any warning."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DB_TIMEOUT": "30"},
clear=True,
)
result = check_deprecated_db_settings(None)
assert len(result) == 1
assert "PAPERLESS_DB_TIMEOUT" in result[0].hint
unset_vars = [v for v in DEPRECATED_VARS if v != "PAPERLESS_DB_TIMEOUT"]
for var_name in unset_vars:
assert var_name not in result[0].hint
def test_empty_string_var_not_treated_as_set(
self,
mocker: MockerFixture,
) -> None:
"""A var set to an empty string is not flagged as a deprecated setting."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DB_TIMEOUT": ""},
clear=True,
)
result = check_deprecated_db_settings(None)
assert result == []
def test_warning_mentions_migration_target(
self,
mocker: MockerFixture,
) -> None:
"""Each warning hints at PAPERLESS_DB_OPTIONS as the migration target."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DBSSLMODE": "require"},
clear=True,
)
result = check_deprecated_db_settings(None)
assert len(result) == 1
assert "PAPERLESS_DB_OPTIONS" in result[0].hint
def test_warning_message_identifies_var(
self,
mocker: MockerFixture,
) -> None:
"""The warning message (not just the hint) identifies the offending var."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DBSSLCERT": "/path/to/cert.pem"},
clear=True,
)
result = check_deprecated_db_settings(None)
assert len(result) == 1
assert "PAPERLESS_DBSSLCERT" in result[0].msg

View File

@@ -9,6 +9,7 @@ from celery.schedules import crontab
from paperless.settings import _parse_base_paths
from paperless.settings import _parse_beat_schedule
from paperless.settings import _parse_dateparser_languages
from paperless.settings import _parse_db_settings
from paperless.settings import _parse_ignore_dates
from paperless.settings import _parse_paperless_url
from paperless.settings import _parse_redis_url
@@ -377,6 +378,64 @@ class TestCeleryScheduleParsing(TestCase):
)
class TestDBSettings(TestCase):
def test_db_timeout_with_sqlite(self) -> None:
"""
GIVEN:
- PAPERLESS_DB_TIMEOUT is set
WHEN:
- Settings are parsed
THEN:
- PAPERLESS_DB_TIMEOUT set for sqlite
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_DB_TIMEOUT": "10",
},
):
databases = _parse_db_settings()
self.assertDictEqual(
{
"timeout": 10.0,
},
databases["default"]["OPTIONS"],
)
def test_db_timeout_with_not_sqlite(self) -> None:
"""
GIVEN:
- PAPERLESS_DB_TIMEOUT is set but db is not sqlite
WHEN:
- Settings are parsed
THEN:
- PAPERLESS_DB_TIMEOUT set correctly in non-sqlite db & for fallback sqlite db
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_DBHOST": "127.0.0.1",
"PAPERLESS_DB_TIMEOUT": "10",
},
):
databases = _parse_db_settings()
self.assertDictEqual(
databases["default"]["OPTIONS"],
databases["default"]["OPTIONS"]
| {
"connect_timeout": 10.0,
},
)
self.assertDictEqual(
{
"timeout": 10.0,
},
databases["sqlite"]["OPTIONS"],
)
class TestPaperlessURLSettings(TestCase):
def test_paperless_url(self) -> None:
"""

View File

@@ -1,7 +1,7 @@
import tempfile
from pathlib import Path
from django.test import override_settings
from django.conf import settings
def test_favicon_view(client):
@@ -11,14 +11,15 @@ def test_favicon_view(client):
favicon_path.parent.mkdir(parents=True, exist_ok=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
with override_settings(STATIC_ROOT=static_dir):
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
settings.STATIC_ROOT = static_dir
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
def test_favicon_view_missing_file(client):
with override_settings(STATIC_ROOT=Path(tempfile.mkdtemp())):
response = client.get("/favicon.ico")
assert response.status_code == 404
settings.STATIC_ROOT = Path(tempfile.mkdtemp())
response = client.get("/favicon.ico")
assert response.status_code == 404

View File

@@ -5,7 +5,6 @@ from pathlib import Path
from bleach import clean
from bleach import linkify
from django.conf import settings
from django.utils import timezone
from django.utils.timezone import is_naive
from django.utils.timezone import make_aware
from gotenberg_client import GotenbergClient
@@ -333,9 +332,7 @@ class MailDocumentParser(DocumentParser):
if data["attachments"]:
data["attachments_label"] = "Attachments"
data["date"] = clean_html(
timezone.localtime(mail.date).strftime("%Y-%m-%d %H:%M"),
)
data["date"] = clean_html(mail.date.astimezone().strftime("%Y-%m-%d %H:%M"))
data["content"] = clean_html(mail.text.strip())
from django.template.loader import render_to_string

View File

@@ -6,7 +6,6 @@ from unittest import mock
import httpx
import pytest
from django.test.html import parse_html
from django.utils import timezone
from pytest_django.fixtures import SettingsWrapper
from pytest_httpx import HTTPXMock
from pytest_mock import MockerFixture
@@ -635,14 +634,13 @@ class TestParser:
THEN:
- Resulting HTML is as expected
"""
with timezone.override("UTC"):
mail = mail_parser.parse_file_to_message(html_email_file)
html_file = mail_parser.mail_to_html(mail)
mail = mail_parser.parse_file_to_message(html_email_file)
html_file = mail_parser.mail_to_html(mail)
expected_html = parse_html(html_email_html_file.read_text())
actual_html = parse_html(html_file.read_text())
expected_html = parse_html(html_email_html_file.read_text())
actual_html = parse_html(html_file.read_text())
assert expected_html == actual_html
assert expected_html == actual_html
def test_generate_pdf_from_mail(
self,

View File

@@ -1,6 +1,5 @@
import shutil
import tempfile
import unicodedata
import uuid
from pathlib import Path
from unittest import mock
@@ -848,18 +847,8 @@ class TestParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
"application/pdf",
)
# OCR output for RTL text varies across platforms/versions due to
# bidi controls and presentation forms; normalize before assertion.
normalized_text = "".join(
char
for char in unicodedata.normalize("NFKC", parser.get_text())
if unicodedata.category(char) != "Cf" and not char.isspace()
)
self.assertIn("ةرازو", normalized_text)
self.assertTrue(
any(token in normalized_text for token in ("ةیلخادلا", "الاخليد")),
)
# Copied from the PDF to here. Don't even look at it
self.assertIn("ةﯾﻠﺧﺎدﻻ ةرازو", parser.get_text())
@mock.patch("ocrmypdf.ocr")
def test_gs_rendering_error(self, m) -> None:

View File

@@ -18,10 +18,7 @@ nav = [
"setup.md",
"usage.md",
"configuration.md",
{ Administration = [
"administration.md",
{ "v3 Migration Guide" = "migration-v3.md" },
] },
"administration.md",
"advanced_usage.md",
"api.md",
"development.md",