Compare commits


6 Commits

Author SHA1 Message Date
shamoon
24a2cfd957 Change: use explicit doc creation instead of clone for versions (#12226) 2026-03-04 15:57:44 -08:00
GitHub Actions
7cf2ef6398 Auto translate strings 2026-03-04 23:29:54 +00:00
shamoon
df03207eef Fix: correct doc version filename handling (#12223) 2026-03-04 23:28:07 +00:00
dependabot[bot]
fa998ecd49 Bump django from 5.2.11 to 5.2.12 (#12249)
Bumps [django](https://github.com/django/django) from 5.2.11 to 5.2.12.
- [Commits](https://github.com/django/django/compare/5.2.11...5.2.12)

---
updated-dependencies:
- dependency-name: django
  dependency-version: 5.2.12
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-04 15:16:25 -08:00
Trenton H
1e21bcd26e Breaking: Drop support for Python 3.10 (#12234) 2026-03-04 15:03:33 -08:00
Trenton H
a9cb89c633 Enhancement: Improve exporter memory efficiency (#12236)
Phase 1 -- Eliminate JSON round-trip in document exporter

Replace json.loads(serializers.serialize("json", qs)) with
serializers.serialize("python", qs) to skip the intermediate
JSON string allocation and parse step. Use DjangoJSONEncoder
in check_and_write_json() to handle native Python types
(datetime, Decimal, UUID) the Python serializer returns.

Phase 2 -- Batched QuerySet serialization in document exporter

Add serialize_queryset_batched() helper that uses QuerySet.iterator()
and itertools.islice to stream records in configurable chunks, bounding
peak memory during serialization to batch_size * avg_record_size rather
than loading the entire QuerySet at once.
2026-03-04 14:54:20 -08:00
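
The two exporter phases in this message reduce to one streaming pattern. A condensed sketch (mirroring the serialize_queryset_batched() helper added in the exporter diff below; the 500-record default is the value this changeset uses):

    from itertools import islice

    from django.core import serializers


    def serialize_queryset_batched(queryset, *, batch_size=500):
        # QuerySet.iterator() streams rows from the database instead of
        # caching the entire result set; islice() groups them into
        # fixed-size chunks, so peak memory is bounded by roughly
        # batch_size records at a time.
        iterator = queryset.iterator(chunk_size=batch_size)
        while chunk := list(islice(iterator, batch_size)):
            # serialize("python", ...) yields dicts holding native values
            # (datetime, Decimal, UUID), skipping the intermediate JSON
            # string that serialize("json", ...) + json.loads() allocates.
            yield serializers.serialize("python", chunk)

Because the batches carry native Python types, any JSON written from them needs cls=DjangoJSONEncoder -- hence the Phase 1 change to check_and_write_json().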
35 changed files with 1646 additions and 2149 deletions

View File

@@ -14,10 +14,6 @@ component_management:
       # https://docs.codecov.com/docs/carryforward-flags
       flags:
         # Backend Python versions
-        backend-python-3.10:
-          paths:
-            - src/**
-          carryforward: true
         backend-python-3.11:
           paths:
             - src/**
@@ -26,6 +22,14 @@ flags:
           paths:
             - src/**
           carryforward: true
+        backend-python-3.13:
+          paths:
+            - src/**
+          carryforward: true
+        backend-python-3.14:
+          paths:
+            - src/**
+          carryforward: true
         # Frontend (shards merge into single flag)
         frontend-node-24.x:
           paths:
@@ -41,9 +45,10 @@ coverage:
     project:
       backend:
         flags:
-          - backend-python-3.10
           - backend-python-3.11
           - backend-python-3.12
+          - backend-python-3.13
+          - backend-python-3.14
         paths:
           - src/**
         # https://docs.codecov.com/docs/commit-status#threshold
@@ -59,9 +64,10 @@ coverage:
     patch:
       backend:
         flags:
-          - backend-python-3.10
           - backend-python-3.11
           - backend-python-3.12
+          - backend-python-3.13
+          - backend-python-3.14
         paths:
           - src/**
         target: 100%

View File

@@ -31,7 +31,7 @@ jobs:
     runs-on: ubuntu-24.04
     strategy:
       matrix:
-        python-version: ['3.10', '3.11', '3.12']
+        python-version: ['3.11', '3.12', '3.13', '3.14']
       fail-fast: false
     steps:
       - name: Checkout

View File

@@ -13,7 +13,9 @@ If you want to implement something big:

 ## Python

-Paperless supports python 3.10 - 3.12 at this time. We format Python code with [ruff](https://docs.astral.sh/ruff/formatter/).
+Paperless-ngx currently supports Python 3.11, 3.12, 3.13, and 3.14. As a policy, we aim to support at least the three most recent Python versions, and drop support for versions as they reach end-of-life. Older versions may be supported if dependencies permit, but this is not guaranteed.
+
+We format Python code with [ruff](https://docs.astral.sh/ruff/formatter/).

 ## Branches

View File

@@ -262,6 +262,10 @@ your files differently, you can do that by adjusting the
 or using [storage paths (see below)](#storage-paths). Paperless adds the
 correct file extension e.g. `.pdf`, `.jpg` automatically.

+When a document has file versions, each version uses the same naming rules and
+storage path resolution as any other document file, with an added version suffix
+such as `_v1`, `_v2`, etc.
+
 This variable allows you to configure the filename (folders are allowed)
 using placeholders. For example, configuring this to
@@ -353,6 +357,8 @@ If paperless detects that two documents share the same filename,
 paperless will automatically append `_01`, `_02`, etc to the filename.
 This happens if all the placeholders in a filename evaluate to the same
 value.
+For versioned files, this counter is appended after the version suffix
+(for example `statement_v2_01.pdf`).

 If there are any errors in the placeholders included in `PAPERLESS_FILENAME_FORMAT`,
 paperless will fall back to using the default naming scheme instead.
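
To make the two rules above concrete (an illustration, not text from the docs diff; `{created_year}` and `{title}` are standard placeholders): with `PAPERLESS_FILENAME_FORMAT` set to `{created_year}/{title}`, version 2 of a document titled "statement" created in 2026 renders as `2026/statement_v2.pdf`, and if another file already rendered to that exact name, the collision counter yields `2026/statement_v2_01.pdf`.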

View File

@@ -172,7 +172,7 @@ to enable polling and disable inotify. See [here](configuration.md#polling).
 #### Prerequisites

 - Paperless runs on Linux only, Windows is not supported.
-- Python 3 is required with versions 3.10 - 3.12 currently supported. Newer versions may work, but some dependencies may not be fully compatible.
+- Python 3.11, 3.12, 3.13, or 3.14 is required. As a policy, Paperless-ngx aims to support at least the three most recent Python versions and drops support for versions as they reach end-of-life. Newer versions may work, but some dependencies may not be fully compatible.

 #### Installation

View File

@@ -95,6 +95,7 @@ Think of versions as **file history** for a document.
 - Versions track the underlying file and extracted text content (OCR/text).
 - Metadata such as tags, correspondent, document type, storage path and custom fields stay on the "root" document.
+- Version files follow normal filename formatting (including storage paths) and add a `_vN` suffix (for example `_v1`, `_v2`).
 - By default, search and document content use the latest version.
 - In document detail, selecting a version switches the preview, file metadata and content (and download etc buttons) to that version.
 - Deleting a non-root version keeps metadata and falls back to the latest remaining version.

View File

@@ -3,10 +3,9 @@ name = "paperless-ngx"
 version = "2.20.10"
 description = "A community-supported supercharged document management system: scan, index and archive all your physical documents"
 readme = "README.md"
-requires-python = ">=3.10"
+requires-python = ">=3.11"
 classifiers = [
   "Programming Language :: Python :: 3 :: Only",
-  "Programming Language :: Python :: 3.10",
   "Programming Language :: Python :: 3.11",
   "Programming Language :: Python :: 3.12",
   "Programming Language :: Python :: 3.13",
@@ -177,7 +176,7 @@ torch = [
 ]

 [tool.ruff]
-target-version = "py310"
+target-version = "py311"
 line-length = 88
 src = [
   "src",

View File

@@ -1,5 +1,5 @@
+from datetime import UTC
 from datetime import datetime
-from datetime import timezone
 from typing import Any

 from django.conf import settings
@@ -139,7 +139,7 @@ def thumbnail_last_modified(request: Any, pk: int) -> datetime | None:
         # No cache, get the timestamp and cache the datetime
         last_modified = datetime.fromtimestamp(
             doc.thumbnail_path.stat().st_mtime,
-            tz=timezone.utc,
+            tz=UTC,
         )
         cache.set(doc_key, last_modified, CACHE_50_MINUTES)
         return last_modified

View File

@@ -2,7 +2,7 @@ import datetime
 import hashlib
 import os
 import tempfile
-from enum import Enum
+from enum import StrEnum
 from pathlib import Path
 from typing import TYPE_CHECKING
 from typing import Final
@@ -11,6 +11,7 @@ import magic
 from django.conf import settings
 from django.contrib.auth.models import User
 from django.db import transaction
+from django.db.models import Max
 from django.db.models import Q
 from django.utils import timezone
 from filelock import FileLock
@@ -82,7 +83,7 @@ class ConsumerError(Exception):
     pass


-class ConsumerStatusShortMessage(str, Enum):
+class ConsumerStatusShortMessage(StrEnum):
     DOCUMENT_ALREADY_EXISTS = "document_already_exists"
     DOCUMENT_ALREADY_EXISTS_IN_TRASH = "document_already_exists_in_trash"
     ASN_ALREADY_EXISTS = "asn_already_exists"
@@ -124,22 +125,6 @@ class ConsumerPluginMixin:
         self.filename = self.metadata.filename or self.input_doc.original_file.name

-        if input_doc.root_document_id:
-            self.log.debug(
-                f"Document root document id: {input_doc.root_document_id}",
-            )
-            root_document = Document.objects.get(pk=input_doc.root_document_id)
-            version_index = Document.objects.filter(root_document=root_document).count()
-            filename_path = Path(self.filename)
-            if filename_path.suffix:
-                self.filename = str(
-                    filename_path.with_name(
-                        f"{filename_path.stem}_v{version_index}{filename_path.suffix}",
-                    ),
-                )
-            else:
-                self.filename = f"{self.filename}_v{version_index}"
-
     def _send_progress(
         self,
         current_progress: int,
@@ -185,7 +170,7 @@ class ConsumerPlugin(
 ):
     logging_name = LOGGING_NAME

-    def _clone_root_into_version(
+    def _create_version_from_root(
         self,
         root_doc: Document,
         *,
@@ -194,30 +179,38 @@ class ConsumerPlugin(
         mime_type: str,
     ) -> Document:
         self.log.debug("Saving record for updated version to database")
-        version_doc = Document.objects.get(pk=root_doc.pk)
-        setattr(version_doc, "pk", None)
-        version_doc.root_document = root_doc
+        root_doc_frozen = Document.objects.select_for_update().get(pk=root_doc.pk)
+        next_version_index = (
+            Document.global_objects.filter(
+                root_document_id=root_doc_frozen.pk,
+            ).aggregate(
+                max_index=Max("version_index"),
+            )["max_index"]
+            or 0
+        )
         file_for_checksum = (
             self.unmodified_original
             if self.unmodified_original is not None
             else self.working_copy
         )
-        version_doc.checksum = hashlib.md5(
-            file_for_checksum.read_bytes(),
-        ).hexdigest()
-        version_doc.content = text or ""
-        version_doc.page_count = page_count
-        version_doc.mime_type = mime_type
-        version_doc.original_filename = self.filename
-        version_doc.storage_path = root_doc.storage_path
-        # Clear unique file path fields so they can be generated uniquely later
-        version_doc.filename = None
-        version_doc.archive_filename = None
-        version_doc.archive_checksum = None
+        version_doc = Document(
+            root_document=root_doc_frozen,
+            version_index=next_version_index + 1,
+            checksum=hashlib.md5(
+                file_for_checksum.read_bytes(),
+            ).hexdigest(),
+            content=text or "",
+            page_count=page_count,
+            mime_type=mime_type,
+            original_filename=self.filename,
+            owner_id=root_doc_frozen.owner_id,
+            created=root_doc_frozen.created,
+            title=root_doc_frozen.title,
+            added=timezone.now(),
+            modified=timezone.now(),
+        )
         if self.metadata.version_label is not None:
             version_doc.version_label = self.metadata.version_label
-        version_doc.added = timezone.now()
-        version_doc.modified = timezone.now()
         return version_doc

     def run_pre_consume_script(self) -> None:
@@ -543,7 +536,7 @@ class ConsumerPlugin(
                 root_doc = Document.objects.get(
                     pk=self.input_doc.root_document_id,
                 )
-                original_document = self._clone_root_into_version(
+                original_document = self._create_version_from_root(
                     root_doc,
                     text=text,
                     page_count=page_count,
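
A detail of this hunk that is easy to miss: the next index comes from Max("version_index") over global_objects (which still sees soft-deleted documents) rather than from a row count, so indices stay monotonic after deletions; the regression test further down deletes version 1 and expects the next upload to receive index 2. A condensed sketch of the allocation, assuming the Document model from this changeset:

    from django.db.models import Max

    from documents.models import Document  # import path assumed


    def next_version_index(root_pk: int) -> int:
        # Max() returns the highest index ever assigned (soft-deleted rows
        # included via global_objects). A count()-based scheme, like the one
        # in the removed ConsumerPluginMixin code above, could hand out a
        # previously used index after a deletion and collide with the new
        # unique constraint on (root_document, version_index).
        highest = Document.global_objects.filter(
            root_document_id=root_pk,
        ).aggregate(max_index=Max("version_index"))["max_index"]
        return (highest or 0) + 1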

View File

@@ -129,12 +129,19 @@ def generate_filename(
     archive_filename=False,
     use_format=True,
 ) -> Path:
+    # version docs use the root document for formatting, just with a suffix
+    context_doc = doc if doc.root_document_id is None else doc.root_document
+    version_suffix = (
+        f"_v{doc.version_index}"
+        if doc.root_document_id is not None and doc.version_index is not None
+        else ""
+    )
     base_path: Path | None = None

     # Determine the source of the format string
     if use_format:
-        if doc.storage_path is not None:
-            filename_format = doc.storage_path.path
+        if context_doc.storage_path is not None:
+            filename_format = context_doc.storage_path.path
         elif settings.FILENAME_FORMAT is not None:
             # Maybe convert old to new style
             filename_format = convert_format_str_to_template_format(
@@ -147,7 +154,7 @@ def generate_filename(
     # If we have one, render it
     if filename_format is not None:
-        rendered_path: str | None = format_filename(doc, filename_format)
+        rendered_path: str | None = format_filename(context_doc, filename_format)
         if rendered_path:
             base_path = Path(rendered_path)
@@ -161,7 +168,7 @@ def generate_filename(
         base_filename = base_path.name

         # Build the final filename with counter and filetype
-        final_filename = f"{base_filename}{counter_str}{filetype_str}"
+        final_filename = f"{base_filename}{version_suffix}{counter_str}{filetype_str}"

         # If we have a directory component, include it
         if str(directory) != ".":
@@ -170,7 +177,9 @@ def generate_filename(
             full_path = Path(final_filename)
     else:
         # No template, use document ID
-        final_filename = f"{doc.pk:07}{counter_str}{filetype_str}"
+        final_filename = (
+            f"{context_doc.pk:07}{version_suffix}{counter_str}{filetype_str}"
+        )
         full_path = Path(final_filename)

     return full_path

View File

@@ -5,10 +5,10 @@ import math
 import re
 from collections import Counter
 from contextlib import contextmanager
+from datetime import UTC
 from datetime import datetime
 from datetime import time
 from datetime import timedelta
-from datetime import timezone
 from shutil import rmtree
 from time import sleep
 from typing import TYPE_CHECKING
@@ -437,7 +437,7 @@ class ManualResults:
 class LocalDateParser(English):
     def reverse_timezone_offset(self, d):
         return (d.replace(tzinfo=django_timezone.get_current_timezone())).astimezone(
-            timezone.utc,
+            UTC,
         )

     def date_from(self, *args, **kwargs):
@@ -641,8 +641,8 @@ def rewrite_natural_date_keywords(query_string: str) -> str:
             end = datetime(local_now.year - 1, 12, 31, 23, 59, 59, tzinfo=tz)

         # Convert to UTC and format
-        start_str = start.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
-        end_str = end.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
+        start_str = start.astimezone(UTC).strftime("%Y%m%d%H%M%S")
+        end_str = end.astimezone(UTC).strftime("%Y%m%d%H%M%S")
         return f"{field}:[{start_str} TO {end_str}]"

     return re.sub(pattern, repl, query_string, flags=re.IGNORECASE)

View File

@@ -3,6 +3,8 @@ import json
 import os
 import shutil
 import tempfile
+from itertools import chain
+from itertools import islice
 from pathlib import Path
 from typing import TYPE_CHECKING
@@ -19,6 +21,7 @@ from django.contrib.contenttypes.models import ContentType
 from django.core import serializers
 from django.core.management.base import BaseCommand
 from django.core.management.base import CommandError
+from django.core.serializers.json import DjangoJSONEncoder
 from django.db import transaction
 from django.utils import timezone
 from filelock import FileLock
@@ -26,6 +29,8 @@ from guardian.models import GroupObjectPermission
 from guardian.models import UserObjectPermission

 if TYPE_CHECKING:
+    from collections.abc import Generator
+
     from django.db.models import QuerySet

 if settings.AUDIT_LOG_ENABLED:
@@ -60,6 +65,22 @@ from paperless_mail.models import MailAccount
 from paperless_mail.models import MailRule


+def serialize_queryset_batched(
+    queryset: "QuerySet",
+    *,
+    batch_size: int = 500,
+) -> "Generator[list[dict], None, None]":
+    """Yield batches of serialized records from a QuerySet.
+
+    Each batch is a list of dicts in Django's Python serialization format.
+    Uses QuerySet.iterator() to avoid loading the full queryset into memory,
+    and islice to collect chunk-sized batches serialized in a single call.
+    """
+    iterator = queryset.iterator(chunk_size=batch_size)
+    while chunk := list(islice(iterator, batch_size)):
+        yield serializers.serialize("python", chunk)
+
+
 class Command(CryptMixin, BaseCommand):
     help = (
         "Decrypt and rename all files in our collection into a given target "
@@ -186,6 +207,17 @@ class Command(CryptMixin, BaseCommand):
             help="If provided, is used to encrypt sensitive data in the export",
         )

+        parser.add_argument(
+            "--batch-size",
+            type=int,
+            default=500,
+            help=(
+                "Number of records to process per batch during serialization. "
+                "Lower values reduce peak memory usage; higher values improve "
+                "throughput. Default: 500."
+            ),
+        )
+
     def handle(self, *args, **options) -> None:
         self.target = Path(options["target"]).resolve()
         self.split_manifest: bool = options["split_manifest"]
@@ -200,6 +232,7 @@ class Command(CryptMixin, BaseCommand):
         self.data_only: bool = options["data_only"]
         self.no_progress_bar: bool = options["no_progress_bar"]
         self.passphrase: str | None = options.get("passphrase")
+        self.batch_size: int = options["batch_size"]

         self.files_in_export_dir: set[Path] = set()
         self.exported_files: set[str] = set()
@@ -294,8 +327,13 @@ class Command(CryptMixin, BaseCommand):

             # Build an overall manifest
             for key, object_query in manifest_key_to_object_query.items():
-                manifest_dict[key] = json.loads(
-                    serializers.serialize("json", object_query),
+                manifest_dict[key] = list(
+                    chain.from_iterable(
+                        serialize_queryset_batched(
+                            object_query,
+                            batch_size=self.batch_size,
+                        ),
+                    ),
                 )

             self.encrypt_secret_fields(manifest_dict)
@@ -512,14 +550,24 @@ class Command(CryptMixin, BaseCommand):
             self.files_in_export_dir.remove(target)
             if self.compare_json:
                 target_checksum = hashlib.md5(target.read_bytes()).hexdigest()
-                src_str = json.dumps(content, indent=2, ensure_ascii=False)
+                src_str = json.dumps(
+                    content,
+                    cls=DjangoJSONEncoder,
+                    indent=2,
+                    ensure_ascii=False,
+                )
                 src_checksum = hashlib.md5(src_str.encode("utf-8")).hexdigest()
                 if src_checksum == target_checksum:
                     perform_write = False

         if perform_write:
             target.write_text(
-                json.dumps(content, indent=2, ensure_ascii=False),
+                json.dumps(
+                    content,
+                    cls=DjangoJSONEncoder,
+                    indent=2,
+                    ensure_ascii=False,
+                ),
                 encoding="utf-8",
             )
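
Assuming this file is the document_exporter management command (the compare view omits file paths), the new flag would be invoked along these lines, trading export throughput for a lower memory ceiling:

    python3 manage.py document_exporter ../export --batch-size 100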

View File

@@ -0,0 +1,37 @@
+# Generated by Django 5.2.11 on 2026-03-02 17:48
+
+from django.conf import settings
+from django.db import migrations
+from django.db import models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("documents", "0015_savedview_visibility_to_ui_settings"),
+        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="document",
+            name="version_index",
+            field=models.PositiveIntegerField(
+                blank=True,
+                db_index=True,
+                help_text="Index of this version within the root document.",
+                null=True,
+                verbose_name="version index",
+            ),
+        ),
+        migrations.AddConstraint(
+            model_name="document",
+            constraint=models.UniqueConstraint(
+                condition=models.Q(
+                    ("root_document__isnull", False),
+                    ("version_index__isnull", False),
+                ),
+                fields=("root_document", "version_index"),
+                name="documents_document_root_version_index_uniq",
+            ),
+        ),
+    ]

View File

@@ -319,6 +319,14 @@ class Document(SoftDeleteModel, ModelWithOwner):  # type: ignore[django-manager-
         verbose_name=_("root document for this version"),
     )

+    version_index = models.PositiveIntegerField(
+        _("version index"),
+        blank=True,
+        null=True,
+        db_index=True,
+        help_text=_("Index of this version within the root document."),
+    )
+
     version_label = models.CharField(
         _("version label"),
         max_length=64,
@@ -331,6 +339,16 @@ class Document(SoftDeleteModel, ModelWithOwner):  # type: ignore[django-manager-
         ordering = ("-created",)
         verbose_name = _("document")
         verbose_name_plural = _("documents")
+        constraints = [
+            models.UniqueConstraint(
+                fields=["root_document", "version_index"],
+                condition=models.Q(
+                    root_document__isnull=False,
+                    version_index__isnull=False,
+                ),
+                name="documents_document_root_version_index_uniq",
+            ),
+        ]

     def __str__(self) -> str:
         created = self.created.isoformat()

View File

@@ -5,11 +5,7 @@ from abc import abstractmethod
 from collections.abc import Iterator
 from dataclasses import dataclass
 from types import TracebackType
+from typing import Self

-try:
-    from typing import Self
-except ImportError:
-    from typing_extensions import Self
-
 import dateparser

View File

@@ -9,7 +9,7 @@ if TYPE_CHECKING:
     from channels_redis.pubsub import RedisPubSubChannelLayer


-class ProgressStatusOptions(str, enum.Enum):
+class ProgressStatusOptions(enum.StrEnum):
     STARTED = "STARTED"
     WORKING = "WORKING"
     SUCCESS = "SUCCESS"

View File

@@ -620,6 +620,16 @@ def update_filename_and_move_files(
                 root=settings.ARCHIVE_DIR,
             )

+    # Keep version files in sync with root
+    if instance.root_document_id is None:
+        for version_doc in Document.objects.filter(root_document_id=instance.pk).only(
+            "pk",
+        ):
+            update_filename_and_move_files(
+                Document,
+                version_doc,
+            )
+

 @shared_task
 def process_cf_select_update(custom_field: CustomField) -> None:

View File

@@ -24,7 +24,7 @@ def base_config() -> DateParserConfig:
             12,
             0,
             0,
-            tzinfo=datetime.timezone.utc,
+            tzinfo=datetime.UTC,
         ),
         filename_date_order="YMD",
         content_date_order="DMY",
@@ -45,7 +45,7 @@ def config_with_ignore_dates() -> DateParserConfig:
             12,
             0,
             0,
-            tzinfo=datetime.timezone.utc,
+            tzinfo=datetime.UTC,
         ),
         filename_date_order="DMY",
         content_date_order="MDY",

View File

@@ -101,50 +101,50 @@ class TestFilterDate:
         [
             # Valid Dates
             pytest.param(
-                datetime.datetime(2024, 1, 10, tzinfo=datetime.timezone.utc),
-                datetime.datetime(2024, 1, 10, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2024, 1, 10, tzinfo=datetime.UTC),
+                datetime.datetime(2024, 1, 10, tzinfo=datetime.UTC),
                 id="valid_past_date",
             ),
             pytest.param(
-                datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.timezone.utc),
-                datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC),
+                datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC),
                 id="exactly_at_reference",
             ),
             pytest.param(
-                datetime.datetime(1901, 1, 1, tzinfo=datetime.timezone.utc),
-                datetime.datetime(1901, 1, 1, tzinfo=datetime.timezone.utc),
+                datetime.datetime(1901, 1, 1, tzinfo=datetime.UTC),
+                datetime.datetime(1901, 1, 1, tzinfo=datetime.UTC),
                 id="year_1901_valid",
             ),
             # Date is > reference_time
             pytest.param(
-                datetime.datetime(2024, 1, 16, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2024, 1, 16, tzinfo=datetime.UTC),
                 None,
                 id="future_date_day_after",
             ),
             # date.date() in ignore_dates
             pytest.param(
-                datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.UTC),
                 None,
                 id="ignored_date_midnight_jan1",
             ),
             pytest.param(
-                datetime.datetime(2024, 1, 1, 10, 30, 0, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2024, 1, 1, 10, 30, 0, tzinfo=datetime.UTC),
                 None,
                 id="ignored_date_midday_jan1",
             ),
             pytest.param(
-                datetime.datetime(2024, 12, 25, 15, 0, 0, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2024, 12, 25, 15, 0, 0, tzinfo=datetime.UTC),
                 None,
                 id="ignored_date_dec25_future",
             ),
             # date.year <= 1900
             pytest.param(
-                datetime.datetime(1899, 12, 31, tzinfo=datetime.timezone.utc),
+                datetime.datetime(1899, 12, 31, tzinfo=datetime.UTC),
                 None,
                 id="year_1899",
             ),
             pytest.param(
-                datetime.datetime(1900, 1, 1, tzinfo=datetime.timezone.utc),
+                datetime.datetime(1900, 1, 1, tzinfo=datetime.UTC),
                 None,
                 id="year_1900_boundary",
             ),
@@ -176,7 +176,7 @@ class TestFilterDate:
             1,
             12,
             0,
-            tzinfo=datetime.timezone.utc,
+            tzinfo=datetime.UTC,
         )
         another_ignored = datetime.datetime(
             2024,
@@ -184,7 +184,7 @@ class TestFilterDate:
             25,
             15,
             30,
-            tzinfo=datetime.timezone.utc,
+            tzinfo=datetime.UTC,
         )
         allowed_date = datetime.datetime(
             2024,
@@ -192,7 +192,7 @@ class TestFilterDate:
             2,
             12,
             0,
-            tzinfo=datetime.timezone.utc,
+            tzinfo=datetime.UTC,
         )

         assert parser._filter_date(ignored_date) is None
@@ -204,7 +204,7 @@ class TestFilterDate:
         regex_parser: RegexDateParserPlugin,
     ) -> None:
         """Should work with timezone-aware datetimes."""
-        date_utc = datetime.datetime(2024, 1, 10, 12, 0, tzinfo=datetime.timezone.utc)
+        date_utc = datetime.datetime(2024, 1, 10, 12, 0, tzinfo=datetime.UTC)

         result = regex_parser._filter_date(date_utc)
@@ -221,8 +221,8 @@ class TestRegexDateParser:
             "report-2023-12-25.txt",
             "Event recorded on 25/12/2022.",
             [
-                datetime.datetime(2023, 12, 25, tzinfo=datetime.timezone.utc),
-                datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2023, 12, 25, tzinfo=datetime.UTC),
+                datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC),
             ],
             id="filename-y-m-d_and_content-d-m-y",
         ),
@@ -230,8 +230,8 @@ class TestRegexDateParser:
             "img_2023.01.02.jpg",
             "Taken on 01/02/2023",
             [
-                datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc),
-                datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC),
+                datetime.datetime(2023, 2, 1, tzinfo=datetime.UTC),
             ],
             id="ambiguous-dates-respect-orders",
         ),
@@ -239,7 +239,7 @@ class TestRegexDateParser:
             "notes.txt",
             "bad date 99/99/9999 and 25/12/2022",
             [
-                datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc),
+                datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC),
             ],
             id="parse-exception-skips-bad-and-yields-good",
         ),
@@ -275,24 +275,24 @@ class TestRegexDateParser:
                 or "2023.12.25" in date_string
                 or "2023-12-25" in date_string
             ):
-                return datetime.datetime(2023, 12, 25, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2023, 12, 25, tzinfo=datetime.UTC)

             # content DMY 25/12/2022
             if "25/12/2022" in date_string or "25-12-2022" in date_string:
-                return datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC)

             # filename YMD 2023.01.02
             if "2023.01.02" in date_string or "2023-01-02" in date_string:
-                return datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC)

             # ambiguous 01/02/2023 -> respect DATE_ORDER setting
             if "01/02/2023" in date_string:
                 if date_order == "DMY":
-                    return datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc)
+                    return datetime.datetime(2023, 2, 1, tzinfo=datetime.UTC)
                 if date_order == "YMD":
-                    return datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc)
+                    return datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC)
                 # fallback
-                return datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2023, 2, 1, tzinfo=datetime.UTC)

             # simulate parse failure for malformed input
             if "99/99/9999" in date_string or "bad date" in date_string:
@@ -328,7 +328,7 @@ class TestRegexDateParser:
                 12,
                 0,
                 0,
-                tzinfo=datetime.timezone.utc,
+                tzinfo=datetime.UTC,
             ),
             filename_date_order="YMD",
             content_date_order="DMY",
@@ -344,13 +344,13 @@ class TestRegexDateParser:
         ) -> datetime.datetime | None:
             if "10/12/2023" in date_string or "10-12-2023" in date_string:
                 # ignored date
-                return datetime.datetime(2023, 12, 10, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2023, 12, 10, tzinfo=datetime.UTC)
             if "01/02/2024" in date_string or "01-02-2024" in date_string:
                 # future relative to reference_time -> filtered
-                return datetime.datetime(2024, 2, 1, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2024, 2, 1, tzinfo=datetime.UTC)
             if "05/01/2023" in date_string or "05-01-2023" in date_string:
                 # valid
-                return datetime.datetime(2023, 1, 5, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2023, 1, 5, tzinfo=datetime.UTC)
             return None

         mocker.patch(target, side_effect=fake_parse)
@@ -358,7 +358,7 @@ class TestRegexDateParser:
         content = "Ignored: 10/12/2023, Future: 01/02/2024, Keep: 05/01/2023"
         results = list(parser.parse("whatever.txt", content))

-        assert results == [datetime.datetime(2023, 1, 5, tzinfo=datetime.timezone.utc)]
+        assert results == [datetime.datetime(2023, 1, 5, tzinfo=datetime.UTC)]

     def test_parse_handles_no_matches_and_returns_empty_list(
         self,
@@ -392,7 +392,7 @@ class TestRegexDateParser:
                 12,
                 0,
                 0,
-                tzinfo=datetime.timezone.utc,
+                tzinfo=datetime.UTC,
             ),
             filename_date_order=None,
             content_date_order="DMY",
@@ -409,9 +409,9 @@ class TestRegexDateParser:
         ) -> datetime.datetime | None:
             # return distinct datetimes so we can tell which source was parsed
             if "25/12/2022" in date_string:
-                return datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC)
             if "2023-12-25" in date_string:
-                return datetime.datetime(2023, 12, 25, tzinfo=datetime.timezone.utc)
+                return datetime.datetime(2023, 12, 25, tzinfo=datetime.UTC)
             return None

         mock = mocker.patch(target, side_effect=fake_parse)
@@ -429,5 +429,5 @@ class TestRegexDateParser:
         assert "25/12/2022" in called_date_string
         # And the parser should have yielded the corresponding datetime
         assert results == [
-            datetime.datetime(2022, 12, 25, tzinfo=datetime.timezone.utc),
+            datetime.datetime(2022, 12, 25, tzinfo=datetime.UTC),
         ]

View File

@@ -726,6 +726,14 @@ class TestConsumer(
         self.assertIsNotNone(root_doc)
         assert root_doc is not None

+        root_storage_path = StoragePath.objects.create(
+            name="version-root-path",
+            path="root/{{title}}",
+        )
+        root_doc.storage_path = root_storage_path
+        root_doc.archive_serial_number = 42
+        root_doc.save()
+
         actor = User.objects.create_user(
             username="actor",
             email="actor@example.com",
@@ -762,7 +770,7 @@ class TestConsumer(
         )
         consumer.setup()
         try:
-            self.assertTrue(consumer.filename.endswith("_v0.pdf"))
+            self.assertEqual(consumer.filename, version_file.name)
             consumer.run()
         finally:
             consumer.cleanup()
@@ -772,8 +780,10 @@ class TestConsumer(
         version = versions.first()
         assert version is not None
         assert version.original_filename is not None
+        self.assertEqual(version.version_index, 1)
         self.assertEqual(version.version_label, "v2")
-        self.assertTrue(version.original_filename.endswith("_v0.pdf"))
+        self.assertIsNone(version.archive_serial_number)
+        self.assertEqual(version.original_filename, version_file.name)
         self.assertTrue(bool(version.content))

     @override_settings(AUDIT_LOG_ENABLED=True)
@@ -822,7 +832,7 @@ class TestConsumer(
         )
         consumer.setup()
         try:
-            self.assertEqual(consumer.filename, "valid_pdf_version-upload_v0")
+            self.assertEqual(consumer.filename, "valid_pdf_version-upload")
             consumer.run()
         finally:
             consumer.cleanup()
@@ -832,9 +842,67 @@ class TestConsumer(
         )
         self.assertIsNotNone(version)
         assert version is not None
-        self.assertEqual(version.original_filename, "valid_pdf_version-upload_v0")
+        self.assertEqual(version.version_index, 1)
+        self.assertEqual(version.original_filename, "valid_pdf_version-upload")
         self.assertTrue(bool(version.content))

+    @override_settings(AUDIT_LOG_ENABLED=True)
+    @mock.patch("documents.consumer.load_classifier")
+    def test_consume_version_index_monotonic_after_version_deletion(self, m) -> None:
+        m.return_value = MagicMock()
+
+        with self.get_consumer(self.get_test_file()) as consumer:
+            consumer.run()
+
+        root_doc = Document.objects.first()
+        self.assertIsNotNone(root_doc)
+        assert root_doc is not None
+
+        def consume_version(version_file: Path) -> Document:
+            status = DummyProgressManager(version_file.name, None)
+            overrides = DocumentMetadataOverrides()
+            doc = ConsumableDocument(
+                DocumentSource.ApiUpload,
+                original_file=version_file,
+                root_document_id=root_doc.pk,
+            )
+            preflight = ConsumerPreflightPlugin(
+                doc,
+                overrides,
+                status,  # type: ignore[arg-type]
+                self.dirs.scratch_dir,
+                "task-id",
+            )
+            preflight.setup()
+            preflight.run()
+            consumer = ConsumerPlugin(
+                doc,
+                overrides,
+                status,  # type: ignore[arg-type]
+                self.dirs.scratch_dir,
+                "task-id",
+            )
+            consumer.setup()
+            try:
+                consumer.run()
+            finally:
+                consumer.cleanup()
+            version = (
+                Document.objects.filter(root_document=root_doc).order_by("-id").first()
+            )
+            assert version is not None
+            return version
+
+        v1 = consume_version(self.get_test_file2())
+        self.assertEqual(v1.version_index, 1)
+
+        v1.delete()
+
+        # The next version should have version_index 2, even though version_index 1 was deleted
+        v2 = consume_version(self.get_test_file())
+        self.assertEqual(v2.version_index, 2)
+
     @mock.patch("documents.consumer.load_classifier")
     def testClassifyDocument(self, m) -> None:
         correspondent = Correspondent.objects.create(

View File

@@ -77,6 +77,58 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             settings.ORIGINALS_DIR / "test" / "test.pdf",
         )

+    @override_settings(FILENAME_FORMAT=None)
+    def test_root_storage_path_change_updates_version_files(self) -> None:
+        old_storage_path = StoragePath.objects.create(
+            name="old-path",
+            path="old/{{title}}",
+        )
+        new_storage_path = StoragePath.objects.create(
+            name="new-path",
+            path="new/{{title}}",
+        )
+        root_doc = Document.objects.create(
+            title="rootdoc",
+            mime_type="application/pdf",
+            checksum="root-checksum",
+            storage_path=old_storage_path,
+        )
+        version_doc = Document.objects.create(
+            title="version-title",
+            mime_type="application/pdf",
+            checksum="version-checksum",
+            root_document=root_doc,
+            version_index=1,
+        )
+
+        Document.objects.filter(pk=root_doc.pk).update(
+            filename=generate_filename(root_doc),
+        )
+        Document.objects.filter(pk=version_doc.pk).update(
+            filename=generate_filename(version_doc),
+        )
+        root_doc.refresh_from_db()
+        version_doc.refresh_from_db()
+
+        create_source_path_directory(root_doc.source_path)
+        Path(root_doc.source_path).touch()
+        create_source_path_directory(version_doc.source_path)
+        Path(version_doc.source_path).touch()
+
+        root_doc.storage_path = new_storage_path
+        root_doc.save()
+
+        root_doc.refresh_from_db()
+        version_doc.refresh_from_db()
+        self.assertEqual(root_doc.filename, "new/rootdoc.pdf")
+        self.assertEqual(version_doc.filename, "new/rootdoc_v1.pdf")
+        self.assertIsFile(root_doc.source_path)
+        self.assertIsFile(version_doc.source_path)
+        self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "rootdoc.pdf")
+        self.assertIsNotFile(settings.ORIGINALS_DIR / "old" / "rootdoc_v1.pdf")
+
     @override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
     def test_file_renaming_missing_permissions(self) -> None:
         document = Document()
@@ -336,7 +388,11 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             added=d1,
         )

-        self.assertEqual(generate_filename(doc1), Path("1232-01-09.pdf"))
+        # Account for 3.14 padding changes
+        expected_year: str = d1.strftime("%Y")
+        expected_filename: Path = Path(f"{expected_year}-01-09.pdf")
+
+        self.assertEqual(generate_filename(doc1), expected_filename)

         doc1.added = timezone.make_aware(datetime.datetime(2020, 11, 16, 1, 1, 1))
@@ -1222,6 +1278,94 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
             Path("logs.pdf"),
         )

+    @override_settings(FILENAME_FORMAT="{title}")
+    def test_version_index_suffix_for_template_filename(self) -> None:
+        root_doc = Document.objects.create(
+            title="the_doc",
+            mime_type="application/pdf",
+            checksum="root-checksum",
+        )
+        version_doc = Document.objects.create(
+            title="the_doc",
+            mime_type="application/pdf",
+            checksum="version-checksum",
+            root_document=root_doc,
+            version_index=1,
+        )
+
+        self.assertEqual(generate_filename(version_doc), Path("the_doc_v1.pdf"))
+        self.assertEqual(
+            generate_filename(version_doc, counter=1),
+            Path("the_doc_v1_01.pdf"),
+        )
+
+    @override_settings(FILENAME_FORMAT=None)
+    def test_version_index_suffix_for_default_filename(self) -> None:
+        root_doc = Document.objects.create(
+            title="root",
+            mime_type="text/plain",
+            checksum="root-checksum",
+        )
+        version_doc = Document.objects.create(
+            title="root",
+            mime_type="text/plain",
+            checksum="version-checksum",
+            root_document=root_doc,
+            version_index=2,
+        )
+
+        self.assertEqual(
+            generate_filename(version_doc),
+            Path(f"{root_doc.pk:07d}_v2.txt"),
+        )
+        self.assertEqual(
+            generate_filename(version_doc, archive_filename=True),
+            Path(f"{root_doc.pk:07d}_v2.pdf"),
+        )
+
+    @override_settings(FILENAME_FORMAT="{original_name}")
+    def test_version_index_suffix_with_original_name_placeholder(self) -> None:
+        root_doc = Document.objects.create(
+            title="root",
+            mime_type="application/pdf",
+            checksum="root-checksum",
+            original_filename="root-upload.pdf",
+        )
+        version_doc = Document.objects.create(
+            title="root",
+            mime_type="application/pdf",
+            checksum="version-checksum",
+            root_document=root_doc,
+            version_index=1,
+            original_filename="version-upload.pdf",
+        )
+
+        self.assertEqual(generate_filename(version_doc), Path("root-upload_v1.pdf"))
+
+    def test_version_index_suffix_with_storage_path(self) -> None:
+        storage_path = StoragePath.objects.create(
+            name="vtest",
+            path="folder/{{title}}",
+        )
+        root_doc = Document.objects.create(
+            title="storage_doc",
+            mime_type="application/pdf",
+            checksum="root-checksum",
+            storage_path=storage_path,
+        )
+        version_doc = Document.objects.create(
+            title="version_title_should_not_be_used",
+            mime_type="application/pdf",
+            checksum="version-checksum",
+            root_document=root_doc,
+            version_index=3,
+        )
+
+        self.assertEqual(
+            generate_filename(version_doc),
+            Path("folder/storage_doc_v3.pdf"),
+        )
+
     @override_settings(
         FILENAME_FORMAT="XX{correspondent}/{title}",
         FILENAME_FORMAT_REMOVE_NONE=True,

View File

@@ -21,7 +21,7 @@ class TestDateLocalization:
         14,
         30,
         5,
-        tzinfo=datetime.timezone.utc,
+        tzinfo=datetime.UTC,
     )
     TEST_DATETIME_STRING: str = "2023-10-26T14:30:05+00:00"

View File

@@ -4698,7 +4698,7 @@ class TestDateWorkflowLocalization(
         14,
         30,
         5,
-        tzinfo=datetime.timezone.utc,
+        tzinfo=datetime.UTC,
     )

     @pytest.mark.parametrize(

View File

@@ -1,7 +1,7 @@
 from __future__ import annotations

 from dataclasses import dataclass
-from enum import Enum
+from enum import StrEnum
 from typing import TYPE_CHECKING
 from typing import Any
@@ -11,7 +11,7 @@ if TYPE_CHECKING:
     from django.http import HttpRequest


-class VersionResolutionError(str, Enum):
+class VersionResolutionError(StrEnum):
     INVALID = "invalid"
     NOT_FOUND = "not_found"

File diff suppressed because it is too large

View File

@@ -6,25 +6,18 @@ import math
import multiprocessing import multiprocessing
import os import os
import tempfile import tempfile
from os import PathLike
from pathlib import Path from pathlib import Path
from typing import Final from typing import Final
from urllib.parse import urlparse from urllib.parse import urlparse
from celery.schedules import crontab
from compression_middleware.middleware import CompressionMiddleware from compression_middleware.middleware import CompressionMiddleware
from dateparser.languages.loader import LocaleDataLoader
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from dotenv import load_dotenv from dotenv import load_dotenv
from paperless.settings.custom import parse_beat_schedule
from paperless.settings.custom import parse_dateparser_languages
from paperless.settings.custom import parse_db_settings from paperless.settings.custom import parse_db_settings
from paperless.settings.custom import parse_hosting_settings
from paperless.settings.custom import parse_ignore_dates
from paperless.settings.custom import parse_redis_url
from paperless.settings.parsers import get_bool_from_env
from paperless.settings.parsers import get_float_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import get_list_from_env
from paperless.settings.parsers import get_path_from_env
logger = logging.getLogger("paperless.settings") logger = logging.getLogger("paperless.settings")
@@ -52,8 +45,239 @@ for path in [
os.environ["OMP_THREAD_LIMIT"] = "1" os.environ["OMP_THREAD_LIMIT"] = "1"
def __get_boolean(key: str, default: str = "NO") -> bool:
"""
Return a boolean value based on whatever the user has supplied in the
environment based on whether the value "looks like" it's True or not.
"""
return bool(os.getenv(key, default).lower() in ("yes", "y", "1", "t", "true"))
def __get_int(key: str, default: int) -> int:
"""
Return an integer value based on the environment variable or a default
"""
return int(os.getenv(key, default))
def __get_optional_int(key: str) -> int | None:
"""
Returns None if the environment key is not present, otherwise an integer
"""
if key in os.environ:
return __get_int(key, -1) # pragma: no cover
return None
def __get_float(key: str, default: float) -> float:
"""
Return an integer value based on the environment variable or a default
"""
return float(os.getenv(key, default))
def __get_path(
key: str,
default: PathLike | str,
) -> Path:
"""
Return a normalized, absolute path based on the environment variable or a default,
if provided
"""
if key in os.environ:
return Path(os.environ[key]).resolve()
return Path(default).resolve()
def __get_optional_path(key: str) -> Path | None:
"""
Returns None if the environment key is not present, otherwise a fully resolved Path
"""
if key in os.environ:
return __get_path(key, "")
return None
def __get_list(
key: str,
default: list[str] | None = None,
sep: str = ",",
) -> list[str]:
"""
Return a list of elements from the environment, as separated by the given
string, or the default if the key does not exist
"""
if key in os.environ:
return list(filter(None, os.environ[key].split(sep)))
elif default is not None:
return default
else:
return []
def _parse_redis_url(env_redis: str | None) -> tuple[str, str]:
"""
Gets the Redis information from the environment or a default and handles
converting from incompatible django_channels and celery formats.
Returns a tuple of (celery_url, channels_url)
"""
# Not set, return a compatible default
if env_redis is None:
return ("redis://localhost:6379", "redis://localhost:6379")
if "unix" in env_redis.lower():
# channels_redis socket format, looks like:
# "unix:///path/to/redis.sock"
_, path = env_redis.split(":", 1)
# Optionally setting a db number
if "?db=" in env_redis:
path, number = path.split("?db=")
return (f"redis+socket:{path}?virtual_host={number}", env_redis)
else:
return (f"redis+socket:{path}", env_redis)
elif "+socket" in env_redis.lower():
# celery socket style, looks like:
# "redis+socket:///path/to/redis.sock"
_, path = env_redis.split(":", 1)
if "?virtual_host=" in env_redis:
# Virtual host (aka db number)
path, number = path.split("?virtual_host=")
return (env_redis, f"unix:{path}?db={number}")
else:
return (env_redis, f"unix:{path}")
# Not a socket
return (env_redis, env_redis)
def _parse_beat_schedule() -> dict:
"""
Configures the scheduled tasks, according to default or
environment variables. Task expiration is configured so the task will
expire (and not run), shortly before the default frequency will put another
of the same task into the queue
https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
"""
schedule = {}
tasks = [
{
"name": "Check all e-mail accounts",
"env_key": "PAPERLESS_EMAIL_TASK_CRON",
# Default every ten minutes
"env_default": "*/10 * * * *",
"task": "paperless_mail.tasks.process_mail_accounts",
"options": {
# 1 minute before default schedule sends again
"expires": 9.0 * 60.0,
},
},
{
"name": "Train the classifier",
"env_key": "PAPERLESS_TRAIN_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.train_classifier",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Optimize the index",
"env_key": "PAPERLESS_INDEX_TASK_CRON",
# Default daily at midnight
"env_default": "0 0 * * *",
"task": "documents.tasks.index_optimize",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Perform sanity check",
"env_key": "PAPERLESS_SANITY_TASK_CRON",
# Default Sunday at 00:30
"env_default": "30 0 * * sun",
"task": "documents.tasks.sanity_check",
"options": {
# 1 hour before default schedule sends again
"expires": ((7.0 * 24.0) - 1.0) * 60.0 * 60.0,
},
},
{
"name": "Empty trash",
"env_key": "PAPERLESS_EMPTY_TRASH_TASK_CRON",
# Default daily at 01:00
"env_default": "0 1 * * *",
"task": "documents.tasks.empty_trash",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Check and run scheduled workflows",
"env_key": "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.check_scheduled_workflows",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Rebuild LLM index",
"env_key": "PAPERLESS_LLM_INDEX_TASK_CRON",
# Default daily at 02:10
"env_default": "10 2 * * *",
"task": "documents.tasks.llmindex_index",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Cleanup expired share link bundles",
"env_key": "PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON",
# Default daily at 02:00
"env_default": "0 2 * * *",
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
]
for task in tasks:
# Either get the environment setting or use the default
value = os.getenv(task["env_key"], task["env_default"])
# Don't add disabled tasks to the schedule
if value == "disable":
continue
# I find https://crontab.guru/ super helpful
# crontab(5) format
# - five time-and-date fields
# - separated by at least one blank
minute, hour, day_month, month, day_week = value.split(" ")
schedule[task["name"]] = {
"task": task["task"],
"schedule": crontab(minute, hour, day_week, day_month, month),
"options": task["options"],
}
return schedule
# NEVER RUN WITH DEBUG IN PRODUCTION.
-DEBUG = get_bool_from_env("PAPERLESS_DEBUG", "NO")
+DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
###############################################################################
@@ -62,21 +286,21 @@ DEBUG = get_bool_from_env("PAPERLESS_DEBUG", "NO")
BASE_DIR: Path = Path(__file__).resolve().parent.parent.parent
-STATIC_ROOT = get_path_from_env("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
+STATIC_ROOT = __get_path("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
-MEDIA_ROOT = get_path_from_env("PAPERLESS_MEDIA_ROOT", BASE_DIR.parent / "media")
+MEDIA_ROOT = __get_path("PAPERLESS_MEDIA_ROOT", BASE_DIR.parent / "media")
ORIGINALS_DIR = MEDIA_ROOT / "documents" / "originals"
ARCHIVE_DIR = MEDIA_ROOT / "documents" / "archive"
THUMBNAIL_DIR = MEDIA_ROOT / "documents" / "thumbnails"
SHARE_LINK_BUNDLE_DIR = MEDIA_ROOT / "documents" / "share_link_bundles"
-DATA_DIR = get_path_from_env("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data")
+DATA_DIR = __get_path("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data")
-NLTK_DIR = get_path_from_env("PAPERLESS_NLTK_DIR", "/usr/share/nltk_data")
+NLTK_DIR = __get_path("PAPERLESS_NLTK_DIR", "/usr/share/nltk_data")
# Check deprecated setting first
EMPTY_TRASH_DIR = (
-get_path_from_env("PAPERLESS_TRASH_DIR", os.getenv("PAPERLESS_EMPTY_TRASH_DIR"))
+__get_path("PAPERLESS_TRASH_DIR", os.getenv("PAPERLESS_EMPTY_TRASH_DIR"))
if os.getenv("PAPERLESS_TRASH_DIR") or os.getenv("PAPERLESS_EMPTY_TRASH_DIR")
else None
)
@@ -85,21 +309,21 @@ EMPTY_TRASH_DIR = (
# threads.
MEDIA_LOCK = MEDIA_ROOT / "media.lock"
INDEX_DIR = DATA_DIR / "index"
-MODEL_FILE = get_path_from_env(
+MODEL_FILE = __get_path(
"PAPERLESS_MODEL_FILE",
DATA_DIR / "classification_model.pickle",
)
LLM_INDEX_DIR = DATA_DIR / "llm_index"
-LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
+LOGGING_DIR = __get_path("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
-CONSUMPTION_DIR = get_path_from_env(
+CONSUMPTION_DIR = __get_path(
"PAPERLESS_CONSUMPTION_DIR",
BASE_DIR.parent / "consume",
)
# This will be created if it doesn't exist
-SCRATCH_DIR = get_path_from_env(
+SCRATCH_DIR = __get_path(
"PAPERLESS_SCRATCH_DIR",
Path(tempfile.gettempdir()) / "paperless",
)
@@ -108,7 +332,7 @@ SCRATCH_DIR = get_path_from_env(
# Application Definition #
###############################################################################
-env_apps = get_list_from_env("PAPERLESS_APPS")
+env_apps = __get_list("PAPERLESS_APPS")
INSTALLED_APPS = [
"whitenoise.runserver_nostatic",
@@ -181,7 +405,7 @@ MIDDLEWARE = [
]
# Optional to enable compression
-if get_bool_from_env("PAPERLESS_ENABLE_COMPRESSION", "yes"): # pragma: no cover
+if __get_boolean("PAPERLESS_ENABLE_COMPRESSION", "yes"): # pragma: no cover
MIDDLEWARE.insert(0, "compression_middleware.middleware.CompressionMiddleware")
# Workaround to not compress streaming responses (e.g. chat).
@@ -200,8 +424,20 @@ CompressionMiddleware.process_response = patched_process_response
ROOT_URLCONF = "paperless.urls"
def _parse_base_paths() -> tuple[str, str, str, str, str]:
script_name = os.getenv("PAPERLESS_FORCE_SCRIPT_NAME")
base_url = (script_name or "") + "/"
login_url = base_url + "accounts/login/"
login_redirect_url = base_url + "dashboard"
logout_redirect_url = os.getenv(
"PAPERLESS_LOGOUT_REDIRECT_URL",
login_url + "?loggedout=1",
)
return script_name, base_url, login_url, login_redirect_url, logout_redirect_url
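
[Editor's note — not part of the diff] What the helper above yields under a sub-path install; the values mirror the TestParseHostingSettings cases removed further down in this diff:

import os

os.environ["PAPERLESS_FORCE_SCRIPT_NAME"] = "/paperless"
script_name = os.getenv("PAPERLESS_FORCE_SCRIPT_NAME")
base_url = (script_name or "") + "/"                # "/paperless/"
login_url = base_url + "accounts/login/"            # "/paperless/accounts/login/"
login_redirect_url = base_url + "dashboard"         # "/paperless/dashboard"
logout_redirect_url = login_url + "?loggedout=1"    # default when not overridden
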
FORCE_SCRIPT_NAME, BASE_URL, LOGIN_URL, LOGIN_REDIRECT_URL, LOGOUT_REDIRECT_URL = (
-parse_hosting_settings()
+_parse_base_paths()
)
# DRF Spectacular settings
@@ -235,7 +471,7 @@ STORAGES = {
"default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
}
-_CELERY_REDIS_URL, _CHANNELS_REDIS_URL = parse_redis_url(
+_CELERY_REDIS_URL, _CHANNELS_REDIS_URL = _parse_redis_url(
os.getenv("PAPERLESS_REDIS", None),
)
_REDIS_KEY_PREFIX = os.getenv("PAPERLESS_REDIS_PREFIX", "")
@@ -284,8 +520,8 @@ EMAIL_PORT: Final[int] = int(os.getenv("PAPERLESS_EMAIL_PORT", 25))
EMAIL_HOST_USER: Final[str] = os.getenv("PAPERLESS_EMAIL_HOST_USER", "")
EMAIL_HOST_PASSWORD: Final[str] = os.getenv("PAPERLESS_EMAIL_HOST_PASSWORD", "")
DEFAULT_FROM_EMAIL: Final[str] = os.getenv("PAPERLESS_EMAIL_FROM", EMAIL_HOST_USER)
-EMAIL_USE_TLS: Final[bool] = get_bool_from_env("PAPERLESS_EMAIL_USE_TLS")
+EMAIL_USE_TLS: Final[bool] = __get_boolean("PAPERLESS_EMAIL_USE_TLS")
-EMAIL_USE_SSL: Final[bool] = get_bool_from_env("PAPERLESS_EMAIL_USE_SSL")
+EMAIL_USE_SSL: Final[bool] = __get_boolean("PAPERLESS_EMAIL_USE_SSL")
EMAIL_SUBJECT_PREFIX: Final[str] = "[Paperless-ngx] "
EMAIL_TIMEOUT = 30.0
EMAIL_ENABLED = EMAIL_HOST != "localhost" or EMAIL_HOST_USER != ""
@@ -310,22 +546,20 @@ ACCOUNT_DEFAULT_HTTP_PROTOCOL = os.getenv(
)
ACCOUNT_ADAPTER = "paperless.adapter.CustomAccountAdapter"
-ACCOUNT_ALLOW_SIGNUPS = get_bool_from_env("PAPERLESS_ACCOUNT_ALLOW_SIGNUPS")
+ACCOUNT_ALLOW_SIGNUPS = __get_boolean("PAPERLESS_ACCOUNT_ALLOW_SIGNUPS")
-ACCOUNT_DEFAULT_GROUPS = get_list_from_env("PAPERLESS_ACCOUNT_DEFAULT_GROUPS")
+ACCOUNT_DEFAULT_GROUPS = __get_list("PAPERLESS_ACCOUNT_DEFAULT_GROUPS")
SOCIALACCOUNT_ADAPTER = "paperless.adapter.CustomSocialAccountAdapter"
-SOCIALACCOUNT_ALLOW_SIGNUPS = get_bool_from_env(
+SOCIALACCOUNT_ALLOW_SIGNUPS = __get_boolean(
"PAPERLESS_SOCIALACCOUNT_ALLOW_SIGNUPS",
"yes",
)
-SOCIALACCOUNT_AUTO_SIGNUP = get_bool_from_env("PAPERLESS_SOCIAL_AUTO_SIGNUP")
+SOCIALACCOUNT_AUTO_SIGNUP = __get_boolean("PAPERLESS_SOCIAL_AUTO_SIGNUP")
SOCIALACCOUNT_PROVIDERS = json.loads(
os.getenv("PAPERLESS_SOCIALACCOUNT_PROVIDERS", "{}"),
)
-SOCIAL_ACCOUNT_DEFAULT_GROUPS = get_list_from_env(
-"PAPERLESS_SOCIAL_ACCOUNT_DEFAULT_GROUPS",
-)
-SOCIAL_ACCOUNT_SYNC_GROUPS = get_bool_from_env("PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS")
+SOCIAL_ACCOUNT_DEFAULT_GROUPS = __get_list("PAPERLESS_SOCIAL_ACCOUNT_DEFAULT_GROUPS")
+SOCIAL_ACCOUNT_SYNC_GROUPS = __get_boolean("PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS")
SOCIAL_ACCOUNT_SYNC_GROUPS_CLAIM: Final[str] = os.getenv(
"PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS_CLAIM",
"groups",
@@ -337,8 +571,8 @@ MFA_TOTP_ISSUER = "Paperless-ngx"
ACCOUNT_EMAIL_SUBJECT_PREFIX = "[Paperless-ngx] "
-DISABLE_REGULAR_LOGIN = get_bool_from_env("PAPERLESS_DISABLE_REGULAR_LOGIN")
+DISABLE_REGULAR_LOGIN = __get_boolean("PAPERLESS_DISABLE_REGULAR_LOGIN")
-REDIRECT_LOGIN_TO_SSO = get_bool_from_env("PAPERLESS_REDIRECT_LOGIN_TO_SSO")
+REDIRECT_LOGIN_TO_SSO = __get_boolean("PAPERLESS_REDIRECT_LOGIN_TO_SSO")
AUTO_LOGIN_USERNAME = os.getenv("PAPERLESS_AUTO_LOGIN_USERNAME")
@@ -351,15 +585,12 @@ ACCOUNT_EMAIL_VERIFICATION = (
)
)
-ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS = get_bool_from_env(
+ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS = __get_boolean(
"PAPERLESS_ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS",
"True",
)
-ACCOUNT_SESSION_REMEMBER = get_bool_from_env(
-"PAPERLESS_ACCOUNT_SESSION_REMEMBER",
-"True",
-)
+ACCOUNT_SESSION_REMEMBER = __get_boolean("PAPERLESS_ACCOUNT_SESSION_REMEMBER", "True")
SESSION_EXPIRE_AT_BROWSER_CLOSE = not ACCOUNT_SESSION_REMEMBER
SESSION_COOKIE_AGE = int(
os.getenv("PAPERLESS_SESSION_COOKIE_AGE", 60 * 60 * 24 * 7 * 3),
@@ -376,8 +607,8 @@ if AUTO_LOGIN_USERNAME:
def _parse_remote_user_settings() -> str:
global MIDDLEWARE, AUTHENTICATION_BACKENDS, REST_FRAMEWORK
-enable = get_bool_from_env("PAPERLESS_ENABLE_HTTP_REMOTE_USER")
+enable = __get_boolean("PAPERLESS_ENABLE_HTTP_REMOTE_USER")
-enable_api = get_bool_from_env("PAPERLESS_ENABLE_HTTP_REMOTE_USER_API")
+enable_api = __get_boolean("PAPERLESS_ENABLE_HTTP_REMOTE_USER_API")
if enable or enable_api:
MIDDLEWARE.append("paperless.auth.HttpRemoteUserMiddleware")
AUTHENTICATION_BACKENDS.insert(
@@ -405,16 +636,16 @@ HTTP_REMOTE_USER_HEADER_NAME = _parse_remote_user_settings()
X_FRAME_OPTIONS = "SAMEORIGIN"
# The next 3 settings can also be set using just PAPERLESS_URL
-CSRF_TRUSTED_ORIGINS = get_list_from_env("PAPERLESS_CSRF_TRUSTED_ORIGINS")
+CSRF_TRUSTED_ORIGINS = __get_list("PAPERLESS_CSRF_TRUSTED_ORIGINS")
if DEBUG:
# Allow access from the angular development server during debugging
CSRF_TRUSTED_ORIGINS.append("http://localhost:4200")
# We allow CORS from localhost:8000
-CORS_ALLOWED_ORIGINS = get_list_from_env(
+CORS_ALLOWED_ORIGINS = __get_list(
"PAPERLESS_CORS_ALLOWED_HOSTS",
-default=["http://localhost:8000"],
+["http://localhost:8000"],
)
if DEBUG:
@@ -427,7 +658,7 @@ CORS_EXPOSE_HEADERS = [
"Content-Disposition",
]
-ALLOWED_HOSTS = get_list_from_env("PAPERLESS_ALLOWED_HOSTS", default=["*"])
+ALLOWED_HOSTS = __get_list("PAPERLESS_ALLOWED_HOSTS", ["*"])
if ALLOWED_HOSTS != ["*"]:
# always allow localhost. Necessary e.g. for healthcheck in docker.
ALLOWED_HOSTS.append("localhost")
@@ -447,10 +678,10 @@ def _parse_paperless_url():
PAPERLESS_URL = _parse_paperless_url()
# For use with trusted proxies
-TRUSTED_PROXIES = get_list_from_env("PAPERLESS_TRUSTED_PROXIES")
+TRUSTED_PROXIES = __get_list("PAPERLESS_TRUSTED_PROXIES")
-USE_X_FORWARDED_HOST = get_bool_from_env("PAPERLESS_USE_X_FORWARD_HOST", "false")
+USE_X_FORWARDED_HOST = __get_boolean("PAPERLESS_USE_X_FORWARD_HOST", "false")
-USE_X_FORWARDED_PORT = get_bool_from_env("PAPERLESS_USE_X_FORWARD_PORT", "false")
+USE_X_FORWARDED_PORT = __get_boolean("PAPERLESS_USE_X_FORWARD_PORT", "false")
SECURE_PROXY_SSL_HEADER = (
tuple(json.loads(os.environ["PAPERLESS_PROXY_SSL_HEADER"]))
if "PAPERLESS_PROXY_SSL_HEADER" in os.environ
@@ -493,7 +724,7 @@ CSRF_COOKIE_NAME = f"{COOKIE_PREFIX}csrftoken"
SESSION_COOKIE_NAME = f"{COOKIE_PREFIX}sessionid"
LANGUAGE_COOKIE_NAME = f"{COOKIE_PREFIX}django_language"
-EMAIL_CERTIFICATE_FILE = get_path_from_env("PAPERLESS_EMAIL_CERTIFICATE_LOCATION")
+EMAIL_CERTIFICATE_FILE = __get_optional_path("PAPERLESS_EMAIL_CERTIFICATE_LOCATION")
###############################################################################
@@ -644,7 +875,7 @@ CELERY_BROKER_URL = _CELERY_REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
-CELERY_WORKER_CONCURRENCY: Final[int] = get_int_from_env("PAPERLESS_TASK_WORKERS", 1)
+CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1)
TASK_WORKERS = CELERY_WORKER_CONCURRENCY
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_WORKER_SEND_TASK_EVENTS = True
@@ -657,7 +888,7 @@ CELERY_BROKER_TRANSPORT_OPTIONS = {
}
CELERY_TASK_TRACK_STARTED = True
-CELERY_TASK_TIME_LIMIT: Final[int] = get_int_from_env("PAPERLESS_WORKER_TIMEOUT", 1800)
+CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
@@ -669,7 +900,7 @@ CELERY_TASK_SERIALIZER = "pickle"
CELERY_ACCEPT_CONTENT = ["application/json", "application/x-python-serialize"]
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-schedule
-CELERY_BEAT_SCHEDULE = parse_beat_schedule()
+CELERY_BEAT_SCHEDULE = _parse_beat_schedule()
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-schedule-filename
CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db")
@@ -677,14 +908,14 @@ CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db")
# Cachalot: Database read cache.
def _parse_cachalot_settings():
-ttl = get_int_from_env("PAPERLESS_READ_CACHE_TTL", 3600)
+ttl = __get_int("PAPERLESS_READ_CACHE_TTL", 3600)
ttl = min(ttl, 31536000) if ttl > 0 else 3600
-_, redis_url = parse_redis_url(
+_, redis_url = _parse_redis_url(
os.getenv("PAPERLESS_READ_CACHE_REDIS_URL", _CHANNELS_REDIS_URL),
)
result = {
"CACHALOT_CACHE": "read-cache",
-"CACHALOT_ENABLED": get_bool_from_env(
+"CACHALOT_ENABLED": __get_boolean(
"PAPERLESS_DB_READ_CACHE_ENABLED",
default="no",
),
@@ -769,9 +1000,9 @@ CONSUMER_POLLING_INTERVAL = float(os.getenv("PAPERLESS_CONSUMER_POLLING_INTERVAL
CONSUMER_STABILITY_DELAY = float(os.getenv("PAPERLESS_CONSUMER_STABILITY_DELAY", 5))
-CONSUMER_DELETE_DUPLICATES = get_bool_from_env("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
+CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
-CONSUMER_RECURSIVE = get_bool_from_env("PAPERLESS_CONSUMER_RECURSIVE")
+CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
# Ignore regex patterns, matched against filename only
CONSUMER_IGNORE_PATTERNS = list(
@@ -793,13 +1024,13 @@ CONSUMER_IGNORE_DIRS = list(
),
)
-CONSUMER_SUBDIRS_AS_TAGS = get_bool_from_env("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
+CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
-CONSUMER_ENABLE_BARCODES: Final[bool] = get_bool_from_env(
+CONSUMER_ENABLE_BARCODES: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_ENABLE_BARCODES",
)
-CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = get_bool_from_env(
+CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_BARCODE_TIFF_SUPPORT",
)
@@ -808,7 +1039,7 @@ CONSUMER_BARCODE_STRING: Final[str] = os.getenv(
"PATCHT",
)
-CONSUMER_ENABLE_ASN_BARCODE: Final[bool] = get_bool_from_env(
+CONSUMER_ENABLE_ASN_BARCODE: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_ENABLE_ASN_BARCODE",
)
@@ -817,26 +1048,23 @@ CONSUMER_ASN_BARCODE_PREFIX: Final[str] = os.getenv(
"ASN",
)
-CONSUMER_BARCODE_UPSCALE: Final[float] = get_float_from_env(
+CONSUMER_BARCODE_UPSCALE: Final[float] = __get_float(
"PAPERLESS_CONSUMER_BARCODE_UPSCALE",
0.0,
)
-CONSUMER_BARCODE_DPI: Final[int] = get_int_from_env(
-"PAPERLESS_CONSUMER_BARCODE_DPI",
-300,
-)
+CONSUMER_BARCODE_DPI: Final[int] = __get_int("PAPERLESS_CONSUMER_BARCODE_DPI", 300)
-CONSUMER_BARCODE_MAX_PAGES: Final[int] = get_int_from_env(
+CONSUMER_BARCODE_MAX_PAGES: Final[int] = __get_int(
"PAPERLESS_CONSUMER_BARCODE_MAX_PAGES",
0,
)
-CONSUMER_BARCODE_RETAIN_SPLIT_PAGES = get_bool_from_env(
+CONSUMER_BARCODE_RETAIN_SPLIT_PAGES = __get_boolean(
"PAPERLESS_CONSUMER_BARCODE_RETAIN_SPLIT_PAGES",
)
-CONSUMER_ENABLE_TAG_BARCODE: Final[bool] = get_bool_from_env(
+CONSUMER_ENABLE_TAG_BARCODE: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_ENABLE_TAG_BARCODE",
)
@@ -849,11 +1077,11 @@ CONSUMER_TAG_BARCODE_MAPPING = dict(
),
)
-CONSUMER_TAG_BARCODE_SPLIT: Final[bool] = get_bool_from_env(
+CONSUMER_TAG_BARCODE_SPLIT: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_TAG_BARCODE_SPLIT",
)
-CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED: Final[bool] = get_bool_from_env(
+CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED",
)
@@ -862,13 +1090,13 @@ CONSUMER_COLLATE_DOUBLE_SIDED_SUBDIR_NAME: Final[str] = os.getenv(
"double-sided",
)
-CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT: Final[bool] = get_bool_from_env(
+CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT: Final[bool] = __get_boolean(
"PAPERLESS_CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT",
)
CONSUMER_PDF_RECOVERABLE_MIME_TYPES = ("application/octet-stream",)
-OCR_PAGES = get_int_from_env("PAPERLESS_OCR_PAGES")
+OCR_PAGES = __get_optional_int("PAPERLESS_OCR_PAGES")
# The default language that tesseract will attempt to use when parsing
# documents. It should be a 3-letter language code consistent with ISO 639.
@@ -882,20 +1110,20 @@ OCR_MODE = os.getenv("PAPERLESS_OCR_MODE", "skip")
OCR_SKIP_ARCHIVE_FILE = os.getenv("PAPERLESS_OCR_SKIP_ARCHIVE_FILE", "never")
-OCR_IMAGE_DPI = get_int_from_env("PAPERLESS_OCR_IMAGE_DPI")
+OCR_IMAGE_DPI = __get_optional_int("PAPERLESS_OCR_IMAGE_DPI")
OCR_CLEAN = os.getenv("PAPERLESS_OCR_CLEAN", "clean")
-OCR_DESKEW: Final[bool] = get_bool_from_env("PAPERLESS_OCR_DESKEW", "true")
+OCR_DESKEW: Final[bool] = __get_boolean("PAPERLESS_OCR_DESKEW", "true")
-OCR_ROTATE_PAGES: Final[bool] = get_bool_from_env("PAPERLESS_OCR_ROTATE_PAGES", "true")
+OCR_ROTATE_PAGES: Final[bool] = __get_boolean("PAPERLESS_OCR_ROTATE_PAGES", "true")
-OCR_ROTATE_PAGES_THRESHOLD: Final[float] = get_float_from_env(
+OCR_ROTATE_PAGES_THRESHOLD: Final[float] = __get_float(
"PAPERLESS_OCR_ROTATE_PAGES_THRESHOLD",
12.0,
)
-OCR_MAX_IMAGE_PIXELS: Final[int | None] = get_int_from_env(
+OCR_MAX_IMAGE_PIXELS: Final[int | None] = __get_optional_int(
"PAPERLESS_OCR_MAX_IMAGE_PIXELS",
)
@@ -906,7 +1134,7 @@ OCR_COLOR_CONVERSION_STRATEGY = os.getenv(
OCR_USER_ARGS = os.getenv("PAPERLESS_OCR_USER_ARGS")
-MAX_IMAGE_PIXELS: Final[int | None] = get_int_from_env(
+MAX_IMAGE_PIXELS: Final[int | None] = __get_optional_int(
"PAPERLESS_MAX_IMAGE_PIXELS",
)
@@ -921,7 +1149,7 @@ CONVERT_MEMORY_LIMIT = os.getenv("PAPERLESS_CONVERT_MEMORY_LIMIT")
GS_BINARY = os.getenv("PAPERLESS_GS_BINARY", "gs")
# Fallback layout for .eml consumption
-EMAIL_PARSE_DEFAULT_LAYOUT = get_int_from_env(
+EMAIL_PARSE_DEFAULT_LAYOUT = __get_int(
"PAPERLESS_EMAIL_PARSE_DEFAULT_LAYOUT",
1, # MailRule.PdfLayout.TEXT_HTML but that can't be imported here
)
@@ -935,9 +1163,23 @@ DATE_ORDER = os.getenv("PAPERLESS_DATE_ORDER", "DMY")
FILENAME_DATE_ORDER = os.getenv("PAPERLESS_FILENAME_DATE_ORDER")
def _parse_dateparser_languages(languages: str | None):
language_list = languages.split("+") if languages else []
# There is an unfixed issue in zh-Hant and zh-Hans locales in the dateparser lib.
# See: https://github.com/scrapinghub/dateparser/issues/875
for index, language in enumerate(language_list):
if language.startswith("zh-") and "zh" not in language_list:
logger.warning(
f'Chinese locale detected: {language}. dateparser might fail to parse some dates with this locale, so Chinese ("zh") will be used as a fallback.',
)
language_list.append("zh")
return list(LocaleDataLoader().get_locale_map(locales=language_list))
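
[Editor's note — not part of the diff] The Chinese fallback above in isolation; the final LocaleDataLoader().get_locale_map() validation step (which requires dateparser) is omitted from this sketch:

languages = "en+zh-Hans"
language_list = languages.split("+")  # ["en", "zh-Hans"]
for language in list(language_list):
    if language.startswith("zh-") and "zh" not in language_list:
        language_list.append("zh")  # plain "zh" added as a fallback
assert language_list == ["en", "zh-Hans", "zh"]
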
# If not set, we will infer it at runtime
DATE_PARSER_LANGUAGES = (
-parse_dateparser_languages(
+_parse_dateparser_languages(
os.getenv("PAPERLESS_DATE_PARSER_LANGUAGES"),
)
if os.getenv("PAPERLESS_DATE_PARSER_LANGUAGES")
@@ -948,7 +1190,7 @@ DATE_PARSER_LANGUAGES = (
# Maximum number of dates taken from document start to end to show as suggestions for
# `created` date in the frontend. Duplicates are removed, which can result in
# fewer dates shown.
-NUMBER_OF_SUGGESTED_DATES = get_int_from_env("PAPERLESS_NUMBER_OF_SUGGESTED_DATES", 3)
+NUMBER_OF_SUGGESTED_DATES = __get_int("PAPERLESS_NUMBER_OF_SUGGESTED_DATES", 3)
# Specify the filename format for out files
FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
@@ -956,7 +1198,7 @@ FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
# If this is enabled, variables in filename format will resolve to
# empty-string instead of 'none'.
# Directories with 'empty names' are omitted, too.
-FILENAME_FORMAT_REMOVE_NONE = get_bool_from_env(
+FILENAME_FORMAT_REMOVE_NONE = __get_boolean(
"PAPERLESS_FILENAME_FORMAT_REMOVE_NONE",
"NO",
)
@@ -967,7 +1209,7 @@ THUMBNAIL_FONT_NAME = os.getenv(
)
# Tika settings
-TIKA_ENABLED = get_bool_from_env("PAPERLESS_TIKA_ENABLED", "NO")
+TIKA_ENABLED = __get_boolean("PAPERLESS_TIKA_ENABLED", "NO")
TIKA_ENDPOINT = os.getenv("PAPERLESS_TIKA_ENDPOINT", "http://localhost:9998")
TIKA_GOTENBERG_ENDPOINT = os.getenv(
"PAPERLESS_TIKA_GOTENBERG_ENDPOINT",
@@ -977,21 +1219,52 @@ TIKA_GOTENBERG_ENDPOINT = os.getenv(
if TIKA_ENABLED:
INSTALLED_APPS.append("paperless_tika.apps.PaperlessTikaConfig")
-AUDIT_LOG_ENABLED = get_bool_from_env("PAPERLESS_AUDIT_LOG_ENABLED", "true")
+AUDIT_LOG_ENABLED = __get_boolean("PAPERLESS_AUDIT_LOG_ENABLED", "true")
if AUDIT_LOG_ENABLED:
INSTALLED_APPS.append("auditlog")
MIDDLEWARE.append("auditlog.middleware.AuditlogMiddleware")
def _parse_ignore_dates(
env_ignore: str,
date_order: str = DATE_ORDER,
) -> set[datetime.datetime]:
"""
If the PAPERLESS_IGNORE_DATES environment variable is set, parse the
user provided string(s) into dates
Args:
env_ignore (str): The value of the environment variable, comma separated dates
date_order (str, optional): The format of the date strings.
Defaults to DATE_ORDER.
Returns:
Set[datetime.datetime]: The set of parsed date objects
"""
import dateparser
ignored_dates = set()
for s in env_ignore.split(","):
d = dateparser.parse(
s,
settings={
"DATE_ORDER": date_order,
},
)
if d:
ignored_dates.add(d.date())
return ignored_dates
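
[Editor's note — not part of the diff] A hedged usage example of the helper above, assuming dateparser is installed; the expected value matches the removed TestParseIgnoreDates cases near the end of this diff:

import datetime
import dateparser

d = dateparser.parse("11.01.10", settings={"DATE_ORDER": "DMY"})
assert d is not None
assert d.date() == datetime.date(2010, 1, 11)
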
# List dates that should be ignored when trying to parse date from document text
IGNORE_DATES: set[datetime.date] = set()
if os.getenv("PAPERLESS_IGNORE_DATES") is not None:
-IGNORE_DATES = parse_ignore_dates(os.getenv("PAPERLESS_IGNORE_DATES"), DATE_ORDER)
+IGNORE_DATES = _parse_ignore_dates(os.getenv("PAPERLESS_IGNORE_DATES"))
ENABLE_UPDATE_CHECK = os.getenv("PAPERLESS_ENABLE_UPDATE_CHECK", "default")
if ENABLE_UPDATE_CHECK != "default":
-ENABLE_UPDATE_CHECK = get_bool_from_env("PAPERLESS_ENABLE_UPDATE_CHECK")
+ENABLE_UPDATE_CHECK = __get_boolean("PAPERLESS_ENABLE_UPDATE_CHECK")
APP_TITLE = os.getenv("PAPERLESS_APP_TITLE", None)
APP_LOGO = os.getenv("PAPERLESS_APP_LOGO", None)
@@ -1036,7 +1309,7 @@ def _get_nltk_language_setting(ocr_lang: str) -> str | None:
return iso_code_to_nltk.get(ocr_lang)
-NLTK_ENABLED: Final[bool] = get_bool_from_env("PAPERLESS_ENABLE_NLTK", "yes")
+NLTK_ENABLED: Final[bool] = __get_boolean("PAPERLESS_ENABLE_NLTK", "yes")
NLTK_LANGUAGE: str | None = _get_nltk_language_setting(OCR_LANGUAGE)
@@ -1045,7 +1318,7 @@ NLTK_LANGUAGE: str | None = _get_nltk_language_setting(OCR_LANGUAGE)
###############################################################################
EMAIL_GNUPG_HOME: Final[str | None] = os.getenv("PAPERLESS_EMAIL_GNUPG_HOME")
-EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = get_bool_from_env(
+EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = __get_boolean(
"PAPERLESS_ENABLE_GPG_DECRYPTOR",
)
@@ -1053,7 +1326,7 @@ EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = get_bool_from_env(
###############################################################################
# Soft Delete #
###############################################################################
-EMPTY_TRASH_DELAY = max(get_int_from_env("PAPERLESS_EMPTY_TRASH_DELAY", 30), 1)
+EMPTY_TRASH_DELAY = max(__get_int("PAPERLESS_EMPTY_TRASH_DELAY", 30), 1)
###############################################################################
@@ -1078,17 +1351,21 @@ OUTLOOK_OAUTH_ENABLED = bool(
###############################################################################
# Webhooks
###############################################################################
-WEBHOOKS_ALLOWED_SCHEMES = {
-s.lower()
-for s in get_list_from_env(
-"PAPERLESS_WEBHOOKS_ALLOWED_SCHEMES",
-default=["http", "https"],
-)
-}
-WEBHOOKS_ALLOWED_PORTS = {
-int(p) for p in get_list_from_env("PAPERLESS_WEBHOOKS_ALLOWED_PORTS", default=[])
-}
-WEBHOOKS_ALLOW_INTERNAL_REQUESTS = get_bool_from_env(
+WEBHOOKS_ALLOWED_SCHEMES = set(
+s.lower()
+for s in __get_list(
+"PAPERLESS_WEBHOOKS_ALLOWED_SCHEMES",
+["http", "https"],
+)
+)
+WEBHOOKS_ALLOWED_PORTS = set(
+int(p)
+for p in __get_list(
+"PAPERLESS_WEBHOOKS_ALLOWED_PORTS",
+[],
+)
+)
+WEBHOOKS_ALLOW_INTERNAL_REQUESTS = __get_boolean(
"PAPERLESS_WEBHOOKS_ALLOW_INTERNAL_REQUESTS",
"true",
)
@@ -1103,7 +1380,7 @@ REMOTE_OCR_ENDPOINT = os.getenv("PAPERLESS_REMOTE_OCR_ENDPOINT")
################################################################################
# AI Settings #
################################################################################
-AI_ENABLED = get_bool_from_env("PAPERLESS_AI_ENABLED", "NO")
+AI_ENABLED = __get_boolean("PAPERLESS_AI_ENABLED", "NO")
LLM_EMBEDDING_BACKEND = os.getenv(
"PAPERLESS_AI_LLM_EMBEDDING_BACKEND",
) # "huggingface" or "openai"

View File

@@ -1,191 +1,11 @@
import datetime
import logging
import os
from pathlib import Path
from typing import Any
from celery.schedules import crontab
from dateparser.languages.loader import LocaleDataLoader
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import parse_dict_from_str
logger = logging.getLogger(__name__)
def parse_hosting_settings() -> tuple[str | None, str, str, str, str]:
script_name = os.getenv("PAPERLESS_FORCE_SCRIPT_NAME")
base_url = (script_name or "") + "/"
login_url = base_url + "accounts/login/"
login_redirect_url = base_url + "dashboard"
logout_redirect_url = os.getenv(
"PAPERLESS_LOGOUT_REDIRECT_URL",
login_url + "?loggedout=1",
)
return script_name, base_url, login_url, login_redirect_url, logout_redirect_url
def parse_redis_url(env_redis: str | None) -> tuple[str, str]:
"""
Gets the Redis information from the environment or a default and handles
converting from incompatible django_channels and celery formats.
Returns a tuple of (celery_url, channels_url)
"""
# Not set, return a compatible default
if env_redis is None:
return ("redis://localhost:6379", "redis://localhost:6379")
if "unix" in env_redis.lower():
# channels_redis socket format, looks like:
# "unix:///path/to/redis.sock"
_, path = env_redis.split(":", maxsplit=1)
# Optionally setting a db number
if "?db=" in env_redis:
path, number = path.split("?db=")
return (f"redis+socket:{path}?virtual_host={number}", env_redis)
else:
return (f"redis+socket:{path}", env_redis)
elif "+socket" in env_redis.lower():
# celery socket style, looks like:
# "redis+socket:///path/to/redis.sock"
_, path = env_redis.split(":", maxsplit=1)
if "?virtual_host=" in env_redis:
# Virtual host (aka db number)
path, number = path.split("?virtual_host=")
return (env_redis, f"unix:{path}?db={number}")
else:
return (env_redis, f"unix:{path}")
# Not a socket
return (env_redis, env_redis)
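
[Editor's note — not part of the diff] The socket translation this removed helper performed, restated from the regression tests deleted later in this diff; the import path is the pre-change one:

from paperless.settings.custom import parse_redis_url  # pre-change module

celery_url, channels_url = parse_redis_url("unix:///run/redis/redis.sock?db=10")
assert celery_url == "redis+socket:///run/redis/redis.sock?virtual_host=10"
assert channels_url == "unix:///run/redis/redis.sock?db=10"
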
def parse_beat_schedule() -> dict:
"""
Configures the scheduled tasks, according to default or
environment variables. Task expiration is configured so the task will
expire (and not run), shortly before the default frequency will put another
of the same task into the queue
https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
"""
schedule = {}
tasks = [
{
"name": "Check all e-mail accounts",
"env_key": "PAPERLESS_EMAIL_TASK_CRON",
# Default every ten minutes
"env_default": "*/10 * * * *",
"task": "paperless_mail.tasks.process_mail_accounts",
"options": {
# 1 minute before default schedule sends again
"expires": 9.0 * 60.0,
},
},
{
"name": "Train the classifier",
"env_key": "PAPERLESS_TRAIN_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.train_classifier",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Optimize the index",
"env_key": "PAPERLESS_INDEX_TASK_CRON",
# Default daily at midnight
"env_default": "0 0 * * *",
"task": "documents.tasks.index_optimize",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Perform sanity check",
"env_key": "PAPERLESS_SANITY_TASK_CRON",
# Default Sunday at 00:30
"env_default": "30 0 * * sun",
"task": "documents.tasks.sanity_check",
"options": {
# 1 hour before default schedule sends again
"expires": ((7.0 * 24.0) - 1.0) * 60.0 * 60.0,
},
},
{
"name": "Empty trash",
"env_key": "PAPERLESS_EMPTY_TRASH_TASK_CRON",
# Default daily at 01:00
"env_default": "0 1 * * *",
"task": "documents.tasks.empty_trash",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Check and run scheduled workflows",
"env_key": "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.check_scheduled_workflows",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Rebuild LLM index",
"env_key": "PAPERLESS_LLM_INDEX_TASK_CRON",
# Default daily at 02:10
"env_default": "10 2 * * *",
"task": "documents.tasks.llmindex_index",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Cleanup expired share link bundles",
"env_key": "PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON",
# Default daily at 02:00
"env_default": "0 2 * * *",
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
]
for task in tasks:
# Either get the environment setting or use the default
value = os.getenv(task["env_key"], task["env_default"])
# Don't add disabled tasks to the schedule
if value == "disable":
continue
# I find https://crontab.guru/ super helpful
# crontab(5) format
# - five time-and-date fields
# - separated by at least one blank
minute, hour, day_month, month, day_week = value.split(" ")
schedule[task["name"]] = {
"task": task["task"],
"schedule": crontab(minute, hour, day_week, day_month, month),
"options": task["options"],
}
return schedule
def parse_db_settings(data_dir: Path) -> dict[str, dict[str, Any]]:
"""Parse database settings from environment variables.
@@ -300,48 +120,3 @@ def parse_db_settings(data_dir: Path) -> dict[str, dict[str, Any]]:
)
return {"default": db_config}
def parse_dateparser_languages(languages: str | None) -> list[str]:
language_list = languages.split("+") if languages else []
# There is an unfixed issue in zh-Hant and zh-Hans locales in the dateparser lib.
# See: https://github.com/scrapinghub/dateparser/issues/875
for index, language in enumerate(language_list):
if language.startswith("zh-") and "zh" not in language_list:
logger.warning(
f"Chinese locale detected: {language}. dateparser might fail to parse"
f' some dates with this locale, so Chinese ("zh") will be used as a fallback.',
)
language_list.append("zh")
return list(LocaleDataLoader().get_locale_map(locales=language_list))
def parse_ignore_dates(
env_ignore: str,
date_order: str,
) -> set[datetime.date]:
"""
If the PAPERLESS_IGNORE_DATES environment variable is set, parse the
user provided string(s) into dates
Args:
env_ignore (str): The value of the environment variable, comma separated dates
date_order (str): The format of the date strings.
Returns:
set[datetime.date]: The set of parsed date objects
"""
import dateparser
ignored_dates = set()
for s in env_ignore.split(","):
d = dateparser.parse(
s,
settings={
"DATE_ORDER": date_order,
},
)
if d:
ignored_dates.add(d.date())
return ignored_dates

View File

@@ -156,108 +156,6 @@ def parse_dict_from_str(
return settings
def get_bool_from_env(key: str, default: str = "NO") -> bool:
"""
Return a boolean value based on whatever the user has supplied in the
environment based on whether the value "looks like" it's True or not.
"""
return str_to_bool(os.getenv(key, default))
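
[Editor's note — not part of the diff] Behavior of the removed helper, restated from the deleted TestGetBoolFromEnv cases below:

import os
from paperless.settings.parsers import get_bool_from_env  # pre-change import

os.environ["TEST_VAR"] = "true"
assert get_bool_from_env("TEST_VAR") is True      # truthy-looking string
os.environ.pop("MISSING_VAR", None)
assert get_bool_from_env("MISSING_VAR") is False  # falls back to default "NO"
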
@overload
def get_float_from_env(key: str) -> float | None: ...
@overload
def get_float_from_env(key: str, default: None) -> float | None: ...
@overload
def get_float_from_env(key: str, default: float) -> float: ...
def get_float_from_env(key: str, default: float | None = None) -> float | None:
"""
Return a float value based on the environment variable.
If default is provided, returns that value when key is missing.
If default is None, returns None when key is missing.
"""
if key not in os.environ:
return default
return float(os.environ[key])
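
[Editor's note — not part of the diff] The overloads above in practice, per the deleted TestGetFloatFromEnv cases below:

import os
from paperless.settings.parsers import get_float_from_env  # pre-change import

os.environ.pop("MISSING_VAR", None)
assert get_float_from_env("MISSING_VAR") is None
assert get_float_from_env("MISSING_VAR", default=3.14) == 3.14
os.environ["FLOAT_VAR"] = "1.5e2"  # scientific notation is accepted
assert get_float_from_env("FLOAT_VAR") == 150.0
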
@overload
def get_path_from_env(key: str) -> Path | None: ...
@overload
def get_path_from_env(key: str, default: None) -> Path | None: ...
@overload
def get_path_from_env(key: str, default: Path | str) -> Path: ...
def get_path_from_env(key: str, default: Path | str | None = None) -> Path | None:
"""
Return a Path object based on the environment variable.
If default is provided, returns that value when key is missing.
If default is None, returns None when key is missing.
"""
if key not in os.environ:
return default if default is None else Path(default).resolve()
return Path(os.environ[key]).resolve()
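
[Editor's note — not part of the diff] The removed path helper always resolves to an absolute Path, per the deleted TestGetPathFromEnv cases below:

import os
from paperless.settings.parsers import get_path_from_env  # pre-change import

os.environ["PATH_VAR"] = "relative/path"
p = get_path_from_env("PATH_VAR")
assert p is not None and p.is_absolute()  # resolved against the CWD
os.environ.pop("MISSING_VAR", None)
assert get_path_from_env("MISSING_VAR") is None
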
def get_list_from_env(
key: str,
separator: str = ",",
default: list[T] | None = None,
*,
strip_whitespace: bool = True,
remove_empty: bool = True,
required: bool = False,
) -> list[str] | list[T]:
"""
Get and parse a list from an environment variable or return a default.
Args:
key: Environment variable name
separator: Character(s) to split on (default: ',')
default: Default value to return if env var is not set or empty
strip_whitespace: Whether to strip whitespace from each element
remove_empty: Whether to remove empty strings from the result
required: If True, raise an error when the env var is missing and no default provided
Returns:
List of strings or list of type-cast values, or default if env var is empty/None
Raises:
ValueError: If required=True and env var is missing and there is no default
"""
# Get the environment variable value
env_value = os.environ.get(key)
# Handle required environment variables
if required and env_value is None and default is None:
raise ValueError(f"Required environment variable '{key}' is not set")
if env_value:
items = env_value.split(separator)
if strip_whitespace:
items = [item.strip() for item in items]
if remove_empty:
items = [item for item in items if item]
return items
elif default is not None:
return default
else:
return []
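
[Editor's note — not part of the diff] The removed list helper's defaults, matching the deleted TestGetListFromEnv cases below:

import os
from paperless.settings.parsers import get_list_from_env  # pre-change import

os.environ["LIST_VAR"] = "a, b ,,c"
assert get_list_from_env("LIST_VAR") == ["a", "b", "c"]  # stripped, empties dropped
os.environ.pop("MISSING_VAR", None)
assert get_list_from_env("MISSING_VAR", default=[1, 2]) == [1, 2]
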
def get_choice_from_env(
env_key: str,
choices: set[str],

View File

@@ -1,279 +1,10 @@
import datetime
import os
from pathlib import Path
from typing import Any
import pytest
from celery.schedules import crontab
from pytest_mock import MockerFixture
from paperless.settings.custom import parse_beat_schedule
from paperless.settings.custom import parse_dateparser_languages
from paperless.settings.custom import parse_db_settings
from paperless.settings.custom import parse_hosting_settings
from paperless.settings.custom import parse_ignore_dates
from paperless.settings.custom import parse_redis_url
class TestRedisSocketConversion:
@pytest.mark.parametrize(
("input_url", "expected"),
[
pytest.param(
None,
("redis://localhost:6379", "redis://localhost:6379"),
id="none_uses_default",
),
pytest.param(
"redis+socket:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
id="celery_style_socket",
),
pytest.param(
"unix:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
id="redis_py_style_socket",
),
pytest.param(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
"unix:///run/redis/redis.sock?db=5",
),
id="celery_style_socket_with_db",
),
pytest.param(
"unix:///run/redis/redis.sock?db=10",
(
"redis+socket:///run/redis/redis.sock?virtual_host=10",
"unix:///run/redis/redis.sock?db=10",
),
id="redis_py_style_socket_with_db",
),
pytest.param(
"redis://myredishost:6379",
("redis://myredishost:6379", "redis://myredishost:6379"),
id="host_with_port_unchanged",
),
# Credentials in unix:// URL contain multiple colons (user:password@)
# Regression test for https://github.com/paperless-ngx/paperless-ngx/pull/12239
pytest.param(
"unix://user:password@/run/redis/redis.sock",
(
"redis+socket://user:password@/run/redis/redis.sock",
"unix://user:password@/run/redis/redis.sock",
),
id="redis_py_style_socket_with_credentials",
),
pytest.param(
"redis+socket://user:password@/run/redis/redis.sock",
(
"redis+socket://user:password@/run/redis/redis.sock",
"unix://user:password@/run/redis/redis.sock",
),
id="celery_style_socket_with_credentials",
),
],
)
def test_redis_socket_parsing(
self,
input_url: str | None,
expected: tuple[str, str],
) -> None:
"""
GIVEN:
- Various Redis connection URI formats
WHEN:
- The URI is parsed
THEN:
- Socket based URIs are translated
- Non-socket URIs are unchanged
- None provided uses default
"""
result = parse_redis_url(input_url)
assert expected == result
class TestParseHostingSettings:
@pytest.mark.parametrize(
("env", "expected"),
[
pytest.param(
{},
(
None,
"/",
"/accounts/login/",
"/dashboard",
"/accounts/login/?loggedout=1",
),
id="no_env_vars",
),
pytest.param(
{"PAPERLESS_FORCE_SCRIPT_NAME": "/paperless"},
(
"/paperless",
"/paperless/",
"/paperless/accounts/login/",
"/paperless/dashboard",
"/paperless/accounts/login/?loggedout=1",
),
id="force_script_name_only",
),
pytest.param(
{
"PAPERLESS_FORCE_SCRIPT_NAME": "/docs",
"PAPERLESS_LOGOUT_REDIRECT_URL": "/custom/logout",
},
(
"/docs",
"/docs/",
"/docs/accounts/login/",
"/docs/dashboard",
"/custom/logout",
),
id="force_script_name_and_logout_redirect",
),
],
)
def test_parse_hosting_settings(
self,
mocker: MockerFixture,
env: dict[str, str],
expected: tuple[str | None, str, str, str, str],
) -> None:
"""Test parse_hosting_settings with various env configurations."""
mocker.patch.dict(os.environ, env, clear=True)
result = parse_hosting_settings()
assert result == expected
def make_expected_schedule(
overrides: dict[str, dict[str, Any]] | None = None,
disabled: set[str] | None = None,
) -> dict[str, Any]:
"""
Build the expected schedule with optional overrides and disabled tasks.
"""
mail_expire = 9.0 * 60.0
classifier_expire = 59.0 * 60.0
index_expire = 23.0 * 60.0 * 60.0
sanity_expire = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0
empty_trash_expire = 23.0 * 60.0 * 60.0
workflow_expire = 59.0 * 60.0
llm_index_expire = 23.0 * 60.0 * 60.0
share_link_cleanup_expire = 23.0 * 60.0 * 60.0
schedule: dict[str, Any] = {
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": mail_expire},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": classifier_expire},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": index_expire},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": sanity_expire},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": empty_trash_expire},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": workflow_expire},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute="10", hour="2"),
"options": {"expires": llm_index_expire},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour="2"),
"options": {"expires": share_link_cleanup_expire},
},
}
overrides = overrides or {}
disabled = disabled or set()
for key, val in overrides.items():
schedule[key] = {**schedule.get(key, {}), **val}
for key in disabled:
schedule.pop(key, None)
return schedule
class TestParseBeatSchedule:
@pytest.mark.parametrize(
("env", "expected"),
[
pytest.param({}, make_expected_schedule(), id="defaults"),
pytest.param(
{"PAPERLESS_EMAIL_TASK_CRON": "*/50 * * * mon"},
make_expected_schedule(
overrides={
"Check all e-mail accounts": {
"schedule": crontab(minute="*/50", day_of_week="mon"),
},
},
),
id="email-changed",
),
pytest.param(
{"PAPERLESS_INDEX_TASK_CRON": "disable"},
make_expected_schedule(disabled={"Optimize the index"}),
id="index-disabled",
),
pytest.param(
{
"PAPERLESS_EMAIL_TASK_CRON": "disable",
"PAPERLESS_TRAIN_TASK_CRON": "disable",
"PAPERLESS_SANITY_TASK_CRON": "disable",
"PAPERLESS_INDEX_TASK_CRON": "disable",
"PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable",
"PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable",
"PAPERLESS_LLM_INDEX_TASK_CRON": "disable",
"PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON": "disable",
},
{},
id="all-disabled",
),
],
)
def test_parse_beat_schedule(
self,
env: dict[str, str],
expected: dict[str, Any],
mocker: MockerFixture,
) -> None:
mocker.patch.dict(os.environ, env, clear=False)
schedule = parse_beat_schedule()
assert schedule == expected
class TestParseDbSettings:
@@ -533,85 +264,3 @@ class TestParseDbSettings:
settings = parse_db_settings(tmp_path)
assert settings == expected_database_settings
class TestParseIgnoreDates:
"""Tests the parsing of the PAPERLESS_IGNORE_DATES setting value."""
def test_no_ignore_dates_set(self) -> None:
"""
GIVEN:
- No ignore dates are set
THEN:
- No ignore dates are parsed
"""
assert parse_ignore_dates("", "YMD") == set()
@pytest.mark.parametrize(
("env_str", "date_format", "expected"),
[
pytest.param(
"1985-05-01",
"YMD",
{datetime.date(1985, 5, 1)},
id="single-ymd",
),
pytest.param(
"1985-05-01,1991-12-05",
"YMD",
{datetime.date(1985, 5, 1), datetime.date(1991, 12, 5)},
id="multiple-ymd",
),
pytest.param(
"2010-12-13",
"YMD",
{datetime.date(2010, 12, 13)},
id="single-ymd-2",
),
pytest.param(
"11.01.10",
"DMY",
{datetime.date(2010, 1, 11)},
id="single-dmy",
),
pytest.param(
"11.01.2001,15-06-1996",
"DMY",
{datetime.date(2001, 1, 11), datetime.date(1996, 6, 15)},
id="multiple-dmy",
),
],
)
def test_ignore_dates_parsed(
self,
env_str: str,
date_format: str,
expected: set[datetime.date],
) -> None:
"""
GIVEN:
- Ignore dates are set per certain inputs
THEN:
- All ignore dates are parsed
"""
assert parse_ignore_dates(env_str, date_format) == expected
@pytest.mark.parametrize(
("languages", "expected"),
[
("de", ["de"]),
("zh", ["zh"]),
("fr+en", ["fr", "en"]),
# Locales must be supported
("en-001+fr-CA", ["en-001", "fr-CA"]),
("en-001+fr", ["en-001", "fr"]),
# Special case for Chinese: variants seem to miss some dates,
# so we always add "zh" as a fallback.
("en+zh-Hans-HK", ["en", "zh-Hans-HK", "zh"]),
("en+zh-Hans", ["en", "zh-Hans", "zh"]),
("en+zh-Hans+zh-Hant", ["en", "zh-Hans", "zh-Hant", "zh"]),
],
)
def test_parse_dateparser_languages(languages: str, expected: list[str]) -> None:
assert sorted(parse_dateparser_languages(languages)) == sorted(expected)

View File

@@ -4,12 +4,8 @@ from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from paperless.settings.parsers import get_bool_from_env
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_float_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import get_list_from_env
from paperless.settings.parsers import get_path_from_env
from paperless.settings.parsers import parse_dict_from_str
from paperless.settings.parsers import str_to_bool
@@ -209,29 +205,6 @@ class TestParseDictFromString:
assert isinstance(result["database"]["port"], int)
class TestGetBoolFromEnv:
def test_existing_env_var(self, mocker):
"""Test that an existing environment variable is read and converted."""
mocker.patch.dict(os.environ, {"TEST_VAR": "true"})
assert get_bool_from_env("TEST_VAR") is True
def test_missing_env_var_uses_default_no(self, mocker):
"""Test that a missing environment variable uses default 'NO' and returns False."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_bool_from_env("MISSING_VAR") is False
def test_missing_env_var_with_explicit_default(self, mocker):
"""Test that a missing environment variable uses the provided default."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_bool_from_env("MISSING_VAR", default="yes") is True
def test_invalid_value_raises_error(self, mocker):
"""Test that an invalid value raises ValueError (delegates to str_to_bool)."""
mocker.patch.dict(os.environ, {"INVALID_VAR": "maybe"})
with pytest.raises(ValueError):
get_bool_from_env("INVALID_VAR")
class TestGetIntFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
@@ -286,199 +259,6 @@ class TestGetIntFromEnv:
get_int_from_env("INVALID_INT") get_int_from_env("INVALID_INT")
class TestGetFloatFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
[
pytest.param("3.14", 3.14, id="pi"),
pytest.param("42", 42.0, id="int_as_float"),
pytest.param("-2.5", -2.5, id="negative"),
pytest.param("0.0", 0.0, id="zero_float"),
pytest.param("0", 0.0, id="zero_int"),
pytest.param("1.5e2", 150.0, id="sci_positive"),
pytest.param("1e-3", 0.001, id="sci_negative"),
pytest.param("-1.23e4", -12300.0, id="sci_large"),
],
)
def test_existing_env_var_valid_floats(self, mocker, env_value, expected):
"""Test that existing environment variables with valid floats return correct values."""
mocker.patch.dict(os.environ, {"FLOAT_VAR": env_value})
assert get_float_from_env("FLOAT_VAR") == expected
@pytest.mark.parametrize(
("default", "expected"),
[
pytest.param(3.14, 3.14, id="pi_default"),
pytest.param(0.0, 0.0, id="zero_default"),
pytest.param(-2.5, -2.5, id="negative_default"),
pytest.param(None, None, id="none_default"),
],
)
def test_missing_env_var_with_defaults(self, mocker, default, expected):
"""Test that missing environment variables return provided defaults."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_float_from_env("MISSING_VAR", default=default) == expected
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_float_from_env("MISSING_VAR") is None
@pytest.mark.parametrize(
"invalid_value",
[
pytest.param("not_a_number", id="text"),
pytest.param("42.5.0", id="double_decimal"),
pytest.param("42a", id="alpha_suffix"),
pytest.param("", id="empty"),
pytest.param(" ", id="whitespace"),
pytest.param("true", id="boolean"),
pytest.param("1.2.3", id="triple_decimal"),
],
)
def test_invalid_float_values_raise_error(self, mocker, invalid_value):
"""Test that invalid float values raise ValueError."""
mocker.patch.dict(os.environ, {"INVALID_FLOAT": invalid_value})
with pytest.raises(ValueError):
get_float_from_env("INVALID_FLOAT")
class TestGetPathFromEnv:
@pytest.mark.parametrize(
"env_value",
[
pytest.param("/tmp/test", id="absolute"),
pytest.param("relative/path", id="relative"),
pytest.param("/path/with spaces/file.txt", id="spaces"),
pytest.param(".", id="current_dir"),
pytest.param("..", id="parent_dir"),
pytest.param("/", id="root"),
],
)
def test_existing_env_var_paths(self, mocker, env_value):
"""Test that existing environment variables with paths return resolved Path objects."""
mocker.patch.dict(os.environ, {"PATH_VAR": env_value})
result = get_path_from_env("PATH_VAR")
assert isinstance(result, Path)
assert result == Path(env_value).resolve()
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_path_from_env("MISSING_VAR") is None
def test_missing_env_var_with_none_default(self, mocker):
"""Test that missing environment variable with None default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_path_from_env("MISSING_VAR", default=None) is None
@pytest.mark.parametrize(
"default_path_str",
[
pytest.param("/default/path", id="absolute_default"),
pytest.param("relative/default", id="relative_default"),
pytest.param(".", id="current_default"),
],
)
def test_missing_env_var_with_path_defaults(self, mocker, default_path_str):
"""Test that missing environment variables return resolved default Path objects."""
mocker.patch.dict(os.environ, {}, clear=True)
default_path = Path(default_path_str)
result = get_path_from_env("MISSING_VAR", default=default_path)
assert isinstance(result, Path)
assert result == default_path.resolve()
def test_relative_paths_are_resolved(self, mocker):
"""Test that relative paths are properly resolved to absolute paths."""
mocker.patch.dict(os.environ, {"REL_PATH": "relative/path"})
result = get_path_from_env("REL_PATH")
assert result is not None
assert result.is_absolute()
class TestGetListFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
[
pytest.param("a,b,c", ["a", "b", "c"], id="basic_comma_separated"),
pytest.param("single", ["single"], id="single_element"),
pytest.param("", [], id="empty_string"),
pytest.param("a, b , c", ["a", "b", "c"], id="whitespace_trimmed"),
pytest.param("a,,b,c", ["a", "b", "c"], id="empty_elements_removed"),
],
)
def test_existing_env_var_basic_parsing(self, mocker, env_value, expected):
"""Test that existing environment variables are parsed correctly."""
mocker.patch.dict(os.environ, {"LIST_VAR": env_value})
result = get_list_from_env("LIST_VAR")
assert result == expected
@pytest.mark.parametrize(
("separator", "env_value", "expected"),
[
pytest.param("|", "a|b|c", ["a", "b", "c"], id="pipe_separator"),
pytest.param(":", "a:b:c", ["a", "b", "c"], id="colon_separator"),
pytest.param(";", "a;b;c", ["a", "b", "c"], id="semicolon_separator"),
],
)
def test_custom_separators(self, mocker, separator, env_value, expected):
"""Test that custom separators work correctly."""
mocker.patch.dict(os.environ, {"LIST_VAR": env_value})
result = get_list_from_env("LIST_VAR", separator=separator)
assert result == expected
@pytest.mark.parametrize(
("default", "expected"),
[
pytest.param(
["default1", "default2"],
["default1", "default2"],
id="string_list_default",
),
pytest.param([1, 2, 3], [1, 2, 3], id="int_list_default"),
pytest.param(None, [], id="none_default_returns_empty_list"),
],
)
def test_missing_env_var_with_defaults(self, mocker, default, expected):
"""Test that missing environment variables return provided defaults."""
mocker.patch.dict(os.environ, {}, clear=True)
result = get_list_from_env("MISSING_VAR", default=default)
assert result == expected
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns empty list."""
mocker.patch.dict(os.environ, {}, clear=True)
result = get_list_from_env("MISSING_VAR")
assert result == []
def test_required_env_var_missing_raises_error(self, mocker):
"""Test that missing required environment variable raises ValueError."""
mocker.patch.dict(os.environ, {}, clear=True)
with pytest.raises(
ValueError,
match="Required environment variable 'REQUIRED_VAR' is not set",
):
get_list_from_env("REQUIRED_VAR", required=True)
def test_required_env_var_with_default_does_not_raise(self, mocker):
"""Test that required environment variable with default does not raise error."""
mocker.patch.dict(os.environ, {}, clear=True)
result = get_list_from_env("REQUIRED_VAR", default=["default"], required=True)
assert result == ["default"]
def test_strip_whitespace_false(self, mocker):
"""Test that whitespace is preserved when strip_whitespace=False."""
mocker.patch.dict(os.environ, {"LIST_VAR": " a , b , c "})
result = get_list_from_env("LIST_VAR", strip_whitespace=False)
assert result == [" a ", " b ", " c "]
def test_remove_empty_false(self, mocker):
"""Test that empty elements are preserved when remove_empty=False."""
mocker.patch.dict(os.environ, {"LIST_VAR": "a,,b,,c"})
result = get_list_from_env("LIST_VAR", remove_empty=False)
assert result == ["a", "", "b", "", "c"]
class TestGetEnvChoice:
@pytest.fixture
def valid_choices(self) -> set[str]:
@@ -614,3 +394,21 @@ class TestGetEnvChoice:
result = get_choice_from_env("TEST_ENV", large_choices)
assert result == "option_50"
def test_different_env_keys(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test function works with different environment variable keys."""
test_cases = [
("DJANGO_ENV", "development"),
("DATABASE_BACKEND", "staging"),
("LOG_LEVEL", "production"),
("APP_MODE", "development"),
]
for env_key, env_value in test_cases:
mocker.patch.dict("os.environ", {env_key: env_value})
result = get_choice_from_env(env_key, valid_choices)
assert result == env_value
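# --- Illustrative sketch, not part of this diff: the lookup the visible
# --- TestGetEnvChoice cases exercise. Rejecting values outside the choice
# --- set is an assumption; the visible tests only cover valid values.
def get_choice_from_env_sketch(
    key: str,
    choices: set[str],
    default: str | None = None,
) -> str | None:
    value = os.environ.get(key)
    if value is None:
        return default
    if value not in choices:
        raise ValueError(f"Invalid value {value!r} for {key!r}")
    return value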

View File

@@ -1,56 +0,0 @@
import os
from unittest import TestCase
from unittest import mock
from paperless.settings import _parse_paperless_url
from paperless.settings import default_threads_per_worker
class TestThreadCalculation(TestCase):
def test_workers_threads(self) -> None:
"""
GIVEN:
- Certain CPU counts
WHEN:
- Threads per worker is calculated
THEN:
- Threads per worker less than or equal to CPU count
- At least 1 thread per worker
"""
default_workers = 1
for i in range(1, 64):
with mock.patch(
"paperless.settings.multiprocessing.cpu_count",
) as cpu_count:
cpu_count.return_value = i
default_threads = default_threads_per_worker(default_workers)
self.assertGreaterEqual(default_threads, 1)
self.assertLessEqual(default_workers * default_threads, i)
class TestPaperlessURLSettings(TestCase):
def test_paperless_url(self) -> None:
"""
GIVEN:
- PAPERLESS_URL is set
WHEN:
- The URL is parsed
THEN:
- The URL is returned and present in related settings
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_URL": "https://example.com",
},
):
url = _parse_paperless_url()
self.assertEqual("https://example.com", url)
from django.conf import settings
self.assertIn(url, settings.CSRF_TRUSTED_ORIGINS)
self.assertIn(url, settings.CORS_ALLOWED_ORIGINS)

View File

@@ -0,0 +1,482 @@
import datetime
import os
from unittest import TestCase
from unittest import mock
import pytest
from celery.schedules import crontab
from paperless.settings import _parse_base_paths
from paperless.settings import _parse_beat_schedule
from paperless.settings import _parse_dateparser_languages
from paperless.settings import _parse_ignore_dates
from paperless.settings import _parse_paperless_url
from paperless.settings import _parse_redis_url
from paperless.settings import default_threads_per_worker
class TestIgnoreDateParsing(TestCase):
"""
Tests the parsing of the PAPERLESS_IGNORE_DATES setting value
"""
def _parse_checker(self, test_cases) -> None:
"""
Helper function to check ignore date parsing.
Args:
test_cases: iterable of (env string, date order, expected date set) tuples
"""
for env_str, date_format, expected_date_set in test_cases:
self.assertSetEqual(
_parse_ignore_dates(env_str, date_format),
expected_date_set,
)
def test_no_ignore_dates_set(self) -> None:
"""
GIVEN:
- No ignore dates are set
THEN:
- No ignore dates are parsed
"""
self.assertSetEqual(_parse_ignore_dates(""), set())
def test_single_ignore_dates_set(self) -> None:
"""
GIVEN:
- Ignore dates are set per certain inputs
THEN:
- All ignore dates are parsed
"""
test_cases = [
("1985-05-01", "YMD", {datetime.date(1985, 5, 1)}),
(
"1985-05-01,1991-12-05",
"YMD",
{datetime.date(1985, 5, 1), datetime.date(1991, 12, 5)},
),
("2010-12-13", "YMD", {datetime.date(2010, 12, 13)}),
("11.01.10", "DMY", {datetime.date(2010, 1, 11)}),
(
"11.01.2001,15-06-1996",
"DMY",
{datetime.date(2001, 1, 11), datetime.date(1996, 6, 15)},
),
]
self._parse_checker(test_cases)
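# --- Illustrative sketch, not part of this diff: _parse_ignore_dates as the
# --- cases above describe it, assuming dateparser's DATE_ORDER setting does
# --- the format-specific work; the real function may differ in details.
def _parse_ignore_dates_sketch(env_value: str, date_order: str = "DMY") -> set[datetime.date]:
    import dateparser

    ignored = set()
    for candidate in env_value.split(","):
        candidate = candidate.strip()
        if not candidate:
            # An empty setting (or empty element) contributes no dates.
            continue
        parsed = dateparser.parse(candidate, settings={"DATE_ORDER": date_order})
        if parsed:
            # Only the date component matters for ignoring.
            ignored.add(parsed.date())
    return ignored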
class TestThreadCalculation(TestCase):
def test_workers_threads(self) -> None:
"""
GIVEN:
- Certain CPU counts
WHEN:
- Threads per worker is calculated
THEN:
- Threads per worker less than or equal to CPU count
- At least 1 thread per worker
"""
default_workers = 1
for i in range(1, 64):
with mock.patch(
"paperless.settings.multiprocessing.cpu_count",
) as cpu_count:
cpu_count.return_value = i
default_threads = default_threads_per_worker(default_workers)
self.assertGreaterEqual(default_threads, 1)
self.assertLessEqual(default_workers * default_threads, i)
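# --- Illustrative sketch, not part of this diff: a heuristic consistent with
# --- the invariants asserted above (at least one thread, and workers times
# --- threads never exceeding the CPU count for a single worker).
def default_threads_per_worker_sketch(task_workers: int) -> int:
    import multiprocessing

    # Share the CPUs among the workers, but always grant at least one thread.
    return max(multiprocessing.cpu_count() // task_workers, 1)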
class TestRedisSocketConversion(TestCase):
def test_redis_socket_parsing(self) -> None:
"""
GIVEN:
- Various Redis connection URI formats
WHEN:
- The URI is parsed
THEN:
- Socket based URIs are translated
- Non-socket URIs are unchanged
- None provided uses default
"""
for redis_url, expected in [
# Nothing is set
(None, ("redis://localhost:6379", "redis://localhost:6379")),
# celery style
(
"redis+socket:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
),
# redis-py / channels-redis style
(
"unix:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
),
# celery style with db
(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
"unix:///run/redis/redis.sock?db=5",
),
),
# redis-py / channels-redis style with db
(
"unix:///run/redis/redis.sock?db=10",
(
"redis+socket:///run/redis/redis.sock?virtual_host=10",
"unix:///run/redis/redis.sock?db=10",
),
),
# Just a host with a port
(
"redis://myredishost:6379",
("redis://myredishost:6379", "redis://myredishost:6379"),
),
]:
result = _parse_redis_url(redis_url)
self.assertTupleEqual(expected, result)
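# --- Illustrative sketch, not part of this diff: the URL translation the
# --- cases above describe, returning (celery_url, channels_url); the real
# --- _parse_redis_url may handle more schemes and edge cases.
def _parse_redis_url_sketch(url: str | None) -> tuple[str, str]:
    from urllib.parse import parse_qs, urlparse

    if url is None:
        return ("redis://localhost:6379", "redis://localhost:6379")
    parsed = urlparse(url)
    if parsed.scheme == "unix":
        # redis-py / channels-redis style: unix:///path?db=N
        db = parse_qs(parsed.query).get("db", [None])[0]
        suffix = f"?virtual_host={db}" if db else ""
        return (f"redis+socket://{parsed.path}{suffix}", url)
    if parsed.scheme == "redis+socket":
        # celery style: redis+socket:///path?virtual_host=N
        db = parse_qs(parsed.query).get("virtual_host", [None])[0]
        suffix = f"?db={db}" if db else ""
        return (url, f"unix://{parsed.path}{suffix}")
    # Plain TCP URLs are used unchanged for both celery and channels.
    return (url, url)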
class TestCeleryScheduleParsing(TestCase):
MAIL_EXPIRE_TIME = 9.0 * 60.0
CLASSIFIER_EXPIRE_TIME = 59.0 * 60.0
INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
SANITY_EXPIRE_TIME = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0
EMPTY_TRASH_EXPIRE_TIME = 23.0 * 60.0 * 60.0
RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME = 59.0 * 60.0
LLM_INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME = 23.0 * 60.0 * 60.0
def test_schedule_configuration_default(self) -> None:
"""
GIVEN:
- No configured task schedules
WHEN:
- The celery beat schedule is built
THEN:
- The default schedule is returned
"""
schedule = _parse_beat_schedule()
self.assertDictEqual(
{
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute=10, hour=2),
"options": {
"expires": self.LLM_INDEX_EXPIRE_TIME,
},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour=2),
"options": {
"expires": self.CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME,
},
},
},
schedule,
)
def test_schedule_configuration_changed(self) -> None:
"""
GIVEN:
- Email task is configured non-default
WHEN:
- The celery beat schedule is built
THEN:
- The email task is configured per environment
- The default schedule is returned for other tasks
"""
with mock.patch.dict(
os.environ,
{"PAPERLESS_EMAIL_TASK_CRON": "*/50 * * * mon"},
):
schedule = _parse_beat_schedule()
self.assertDictEqual(
{
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/50", day_of_week="mon"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute=10, hour=2),
"options": {
"expires": self.LLM_INDEX_EXPIRE_TIME,
},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour=2),
"options": {
"expires": self.CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME,
},
},
},
schedule,
)
def test_schedule_configuration_disabled(self) -> None:
"""
GIVEN:
- Search index task is disabled
WHEN:
- The celery beat schedule is built
THEN:
- The search index task is not present
- The default schedule is returned for other tasks
"""
with mock.patch.dict(os.environ, {"PAPERLESS_INDEX_TASK_CRON": "disable"}):
schedule = _parse_beat_schedule()
self.assertDictEqual(
{
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute=10, hour=2),
"options": {
"expires": self.LLM_INDEX_EXPIRE_TIME,
},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour=2),
"options": {
"expires": self.CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME,
},
},
},
schedule,
)
def test_schedule_configuration_disabled_all(self) -> None:
"""
GIVEN:
- All tasks are disabled
WHEN:
- The celery beat schedule is built
THEN:
- No tasks are scheduled
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_EMAIL_TASK_CRON": "disable",
"PAPERLESS_TRAIN_TASK_CRON": "disable",
"PAPERLESS_SANITY_TASK_CRON": "disable",
"PAPERLESS_INDEX_TASK_CRON": "disable",
"PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable",
"PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable",
"PAPERLESS_LLM_INDEX_TASK_CRON": "disable",
"PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON": "disable",
},
):
schedule = _parse_beat_schedule()
self.assertDictEqual(
{},
schedule,
)
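# --- Illustrative sketch, not part of this diff: how a PAPERLESS_*_TASK_CRON
# --- value plausibly becomes a celery schedule entry. The helper name and
# --- field handling are assumptions, but the literal "disable" dropping a
# --- task from the schedule matches the tests above.
def _cron_to_crontab_sketch(cron_expr: str) -> crontab | None:
    if cron_expr.lower() == "disable":
        return None  # the caller omits this task from the beat schedule
    minute, hour, day_of_month, month_of_year, day_of_week = cron_expr.split()
    return crontab(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month_of_year,
        day_of_week=day_of_week,
    )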
class TestPaperlessURLSettings(TestCase):
def test_paperless_url(self) -> None:
"""
GIVEN:
- PAPERLESS_URL is set
WHEN:
- The URL is parsed
THEN:
- The URL is returned and present in related settings
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_URL": "https://example.com",
},
):
url = _parse_paperless_url()
self.assertEqual("https://example.com", url)
from django.conf import settings
self.assertIn(url, settings.CSRF_TRUSTED_ORIGINS)
self.assertIn(url, settings.CORS_ALLOWED_ORIGINS)
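# --- Illustrative sketch, not part of this diff: the contract the test above
# --- checks, written over explicit origin lists; the real _parse_paperless_url
# --- works against Django's settings rather than taking them as arguments.
def _parse_paperless_url_sketch(
    csrf_trusted_origins: list[str],
    cors_allowed_origins: list[str],
) -> str | None:
    url = os.environ.get("PAPERLESS_URL")
    if url:
        # The configured URL must be trusted for CSRF and allowed for CORS.
        csrf_trusted_origins.append(url)
        cors_allowed_origins.append(url)
    return url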
class TestPathSettings(TestCase):
def test_default_paths(self) -> None:
"""
GIVEN:
- PAPERLESS_FORCE_SCRIPT_NAME is not set
WHEN:
- Settings are parsed
THEN:
- Paths are as expected
"""
base_paths = _parse_base_paths()
self.assertEqual(None, base_paths[0]) # FORCE_SCRIPT_NAME
self.assertEqual("/", base_paths[1]) # BASE_URL
self.assertEqual("/accounts/login/", base_paths[2]) # LOGIN_URL
self.assertEqual("/dashboard", base_paths[3]) # LOGIN_REDIRECT_URL
self.assertEqual(
"/accounts/login/?loggedout=1",
base_paths[4],
) # LOGOUT_REDIRECT_URL
@mock.patch("os.environ", {"PAPERLESS_FORCE_SCRIPT_NAME": "/paperless"})
def test_subpath(self) -> None:
"""
GIVEN:
- PAPERLESS_FORCE_SCRIPT_NAME is set
WHEN:
- Settings are parsed
THEN:
- The path is returned and present in related settings
"""
base_paths = _parse_base_paths()
self.assertEqual("/paperless", base_paths[0]) # FORCE_SCRIPT_NAME
self.assertEqual("/paperless/", base_paths[1]) # BASE_URL
self.assertEqual("/paperless/accounts/login/", base_paths[2]) # LOGIN_URL
self.assertEqual("/paperless/dashboard", base_paths[3]) # LOGIN_REDIRECT_URL
self.assertEqual(
"/paperless/accounts/login/?loggedout=1",
base_paths[4],
) # LOGOUT_REDIRECT_URL
@mock.patch(
"os.environ",
{
"PAPERLESS_FORCE_SCRIPT_NAME": "/paperless",
"PAPERLESS_LOGOUT_REDIRECT_URL": "/foobar/",
},
)
def test_subpath_with_explicit_logout_url(self) -> None:
"""
GIVEN:
- PAPERLESS_FORCE_SCRIPT_NAME is set and so is PAPERLESS_LOGOUT_REDIRECT_URL
WHEN:
- Settings are parsed
THEN:
- The correct logout redirect URL is returned
"""
base_paths = _parse_base_paths()
self.assertEqual("/paperless/", base_paths[1]) # BASE_URL
self.assertEqual("/foobar/", base_paths[4]) # LOGOUT_REDIRECT_URL
@pytest.mark.parametrize(
("languages", "expected"),
[
("de", ["de"]),
("zh", ["zh"]),
("fr+en", ["fr", "en"]),
# Locales must be supported
("en-001+fr-CA", ["en-001", "fr-CA"]),
("en-001+fr", ["en-001", "fr"]),
# Special case for Chinese: variants seem to miss some dates,
# so we always add "zh" as a fallback.
("en+zh-Hans-HK", ["en", "zh-Hans-HK", "zh"]),
("en+zh-Hans", ["en", "zh-Hans", "zh"]),
("en+zh-Hans+zh-Hant", ["en", "zh-Hans", "zh-Hant", "zh"]),
],
)
def test_parser_date_parser_languages(languages, expected) -> None:
assert sorted(_parse_dateparser_languages(languages)) == sorted(expected)
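# --- Illustrative sketch, not part of this diff: the split-and-fallback
# --- behavior the parametrized cases above pin down. The real
# --- _parse_dateparser_languages likely also validates that each locale is
# --- supported (per the comment in the cases), which is omitted here.
def _parse_dateparser_languages_sketch(value: str) -> list[str]:
    languages = value.split("+")
    # Chinese variants seem to miss some dates, so plain "zh" is appended
    # as a fallback whenever any zh-* locale is requested.
    if any(lang.startswith("zh-") for lang in languages) and "zh" not in languages:
        languages.append("zh")
    return languages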

uv.lock generated (741 changed lines)

File diff suppressed because it is too large