Compare commits

..

5 Commits

Author SHA1 Message Date
Trenton H
19fd2de16b Fixes the default value provided to the list parser 2026-03-04 14:56:38 -08:00
Trenton H
afbf28c975 Chore: Move settings-related test files into settings/ subfolder
Move test_settings.py, test_db_cache.py, and test_remote_user.py to
paperless/tests/settings/ as they all directly test settings parsing
functions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 14:47:47 -08:00
Trenton H
3ec1a33cbc Chore: Remove duplicate bool conversion tests from TestGetBoolFromEnv
TestStringToBool already covers str_to_bool exhaustively. TestGetBoolFromEnv
now only tests the env var lookup concerns (present, missing, default).
Also remove TestGetEnvChoice.test_different_env_keys which duplicated
test_returns_valid_env_value.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 14:47:47 -08:00
Trenton H
8787d403f2 Chore: Split test_settings.py and add regression tests for Redis URL parsing
- Migrate tests for custom parsers to test_custom_parsers.py
- Add TestParseIgnoreDates and test_parse_dateparser_languages
- Add regression test cases for Redis URLs with credentials (PR #12239)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 14:47:47 -08:00
Trenton H
c6a40191f6 Chore: Refactor settings parsers into dedicated modules
- Add get_bool_from_env, get_float_from_env, get_path_from_env,
  get_list_from_env to parsers.py (ported from paperless-ngx-next)
- Move _parse_redis_url, _parse_beat_schedule, _parse_base_paths out
  of settings/__init__.py into custom.py as public functions
- Replace all inline __get_* helpers in __init__.py with the
  dedicated parser functions
- Include maxsplit=1 fix in parse_redis_url for socket paths with
  multiple colons
- Add comprehensive tests for all new parser functions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 14:47:47 -08:00
14 changed files with 1376 additions and 1232 deletions

View File

@@ -3,8 +3,6 @@ import json
import os
import shutil
import tempfile
from itertools import chain
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING
@@ -21,7 +19,6 @@ from django.contrib.contenttypes.models import ContentType
from django.core import serializers
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.utils import timezone
from filelock import FileLock
@@ -29,8 +26,6 @@ from guardian.models import GroupObjectPermission
from guardian.models import UserObjectPermission
if TYPE_CHECKING:
from collections.abc import Generator
from django.db.models import QuerySet
if settings.AUDIT_LOG_ENABLED:
@@ -65,22 +60,6 @@ from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
def serialize_queryset_batched(
queryset: "QuerySet",
*,
batch_size: int = 500,
) -> "Generator[list[dict], None, None]":
"""Yield batches of serialized records from a QuerySet.
Each batch is a list of dicts in Django's Python serialization format.
Uses QuerySet.iterator() to avoid loading the full queryset into memory,
and islice to collect chunk-sized batches serialized in a single call.
"""
iterator = queryset.iterator(chunk_size=batch_size)
while chunk := list(islice(iterator, batch_size)):
yield serializers.serialize("python", chunk)
class Command(CryptMixin, BaseCommand):
help = (
"Decrypt and rename all files in our collection into a given target "
@@ -207,17 +186,6 @@ class Command(CryptMixin, BaseCommand):
help="If provided, is used to encrypt sensitive data in the export",
)
parser.add_argument(
"--batch-size",
type=int,
default=500,
help=(
"Number of records to process per batch during serialization. "
"Lower values reduce peak memory usage; higher values improve "
"throughput. Default: 500."
),
)
def handle(self, *args, **options) -> None:
self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"]
@@ -232,7 +200,6 @@ class Command(CryptMixin, BaseCommand):
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
self.passphrase: str | None = options.get("passphrase")
self.batch_size: int = options["batch_size"]
self.files_in_export_dir: set[Path] = set()
self.exported_files: set[str] = set()
@@ -327,13 +294,8 @@ class Command(CryptMixin, BaseCommand):
# Build an overall manifest
for key, object_query in manifest_key_to_object_query.items():
manifest_dict[key] = list(
chain.from_iterable(
serialize_queryset_batched(
object_query,
batch_size=self.batch_size,
),
),
manifest_dict[key] = json.loads(
serializers.serialize("json", object_query),
)
self.encrypt_secret_fields(manifest_dict)
@@ -550,24 +512,14 @@ class Command(CryptMixin, BaseCommand):
self.files_in_export_dir.remove(target)
if self.compare_json:
target_checksum = hashlib.md5(target.read_bytes()).hexdigest()
src_str = json.dumps(
content,
cls=DjangoJSONEncoder,
indent=2,
ensure_ascii=False,
)
src_str = json.dumps(content, indent=2, ensure_ascii=False)
src_checksum = hashlib.md5(src_str.encode("utf-8")).hexdigest()
if src_checksum == target_checksum:
perform_write = False
if perform_write:
target.write_text(
json.dumps(
content,
cls=DjangoJSONEncoder,
indent=2,
ensure_ascii=False,
),
json.dumps(content, indent=2, ensure_ascii=False),
encoding="utf-8",
)

View File

@@ -6,18 +6,25 @@ import math
import multiprocessing
import os
import tempfile
from os import PathLike
from pathlib import Path
from typing import Final
from urllib.parse import urlparse
from celery.schedules import crontab
from compression_middleware.middleware import CompressionMiddleware
from dateparser.languages.loader import LocaleDataLoader
from django.utils.translation import gettext_lazy as _
from dotenv import load_dotenv
from paperless.settings.custom import parse_beat_schedule
from paperless.settings.custom import parse_dateparser_languages
from paperless.settings.custom import parse_db_settings
from paperless.settings.custom import parse_hosting_settings
from paperless.settings.custom import parse_ignore_dates
from paperless.settings.custom import parse_redis_url
from paperless.settings.parsers import get_bool_from_env
from paperless.settings.parsers import get_float_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import get_list_from_env
from paperless.settings.parsers import get_path_from_env
logger = logging.getLogger("paperless.settings")
@@ -45,239 +52,8 @@ for path in [
os.environ["OMP_THREAD_LIMIT"] = "1"
def __get_boolean(key: str, default: str = "NO") -> bool:
"""
Return a boolean value based on whatever the user has supplied in the
environment based on whether the value "looks like" it's True or not.
"""
return bool(os.getenv(key, default).lower() in ("yes", "y", "1", "t", "true"))
def __get_int(key: str, default: int) -> int:
"""
Return an integer value based on the environment variable or a default
"""
return int(os.getenv(key, default))
def __get_optional_int(key: str) -> int | None:
"""
Returns None if the environment key is not present, otherwise an integer
"""
if key in os.environ:
return __get_int(key, -1) # pragma: no cover
return None
def __get_float(key: str, default: float) -> float:
"""
Return an integer value based on the environment variable or a default
"""
return float(os.getenv(key, default))
def __get_path(
key: str,
default: PathLike | str,
) -> Path:
"""
Return a normalized, absolute path based on the environment variable or a default,
if provided
"""
if key in os.environ:
return Path(os.environ[key]).resolve()
return Path(default).resolve()
def __get_optional_path(key: str) -> Path | None:
"""
Returns None if the environment key is not present, otherwise a fully resolved Path
"""
if key in os.environ:
return __get_path(key, "")
return None
def __get_list(
key: str,
default: list[str] | None = None,
sep: str = ",",
) -> list[str]:
"""
Return a list of elements from the environment, as separated by the given
string, or the default if the key does not exist
"""
if key in os.environ:
return list(filter(None, os.environ[key].split(sep)))
elif default is not None:
return default
else:
return []
def _parse_redis_url(env_redis: str | None) -> tuple[str, str]:
"""
Gets the Redis information from the environment or a default and handles
converting from incompatible django_channels and celery formats.
Returns a tuple of (celery_url, channels_url)
"""
# Not set, return a compatible default
if env_redis is None:
return ("redis://localhost:6379", "redis://localhost:6379")
if "unix" in env_redis.lower():
# channels_redis socket format, looks like:
# "unix:///path/to/redis.sock"
_, path = env_redis.split(":", 1)
# Optionally setting a db number
if "?db=" in env_redis:
path, number = path.split("?db=")
return (f"redis+socket:{path}?virtual_host={number}", env_redis)
else:
return (f"redis+socket:{path}", env_redis)
elif "+socket" in env_redis.lower():
# celery socket style, looks like:
# "redis+socket:///path/to/redis.sock"
_, path = env_redis.split(":", 1)
if "?virtual_host=" in env_redis:
# Virtual host (aka db number)
path, number = path.split("?virtual_host=")
return (env_redis, f"unix:{path}?db={number}")
else:
return (env_redis, f"unix:{path}")
# Not a socket
return (env_redis, env_redis)
def _parse_beat_schedule() -> dict:
"""
Configures the scheduled tasks, according to default or
environment variables. Task expiration is configured so the task will
expire (and not run), shortly before the default frequency will put another
of the same task into the queue
https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
"""
schedule = {}
tasks = [
{
"name": "Check all e-mail accounts",
"env_key": "PAPERLESS_EMAIL_TASK_CRON",
# Default every ten minutes
"env_default": "*/10 * * * *",
"task": "paperless_mail.tasks.process_mail_accounts",
"options": {
# 1 minute before default schedule sends again
"expires": 9.0 * 60.0,
},
},
{
"name": "Train the classifier",
"env_key": "PAPERLESS_TRAIN_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.train_classifier",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Optimize the index",
"env_key": "PAPERLESS_INDEX_TASK_CRON",
# Default daily at midnight
"env_default": "0 0 * * *",
"task": "documents.tasks.index_optimize",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Perform sanity check",
"env_key": "PAPERLESS_SANITY_TASK_CRON",
# Default Sunday at 00:30
"env_default": "30 0 * * sun",
"task": "documents.tasks.sanity_check",
"options": {
# 1 hour before default schedule sends again
"expires": ((7.0 * 24.0) - 1.0) * 60.0 * 60.0,
},
},
{
"name": "Empty trash",
"env_key": "PAPERLESS_EMPTY_TRASH_TASK_CRON",
# Default daily at 01:00
"env_default": "0 1 * * *",
"task": "documents.tasks.empty_trash",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Check and run scheduled workflows",
"env_key": "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.check_scheduled_workflows",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Rebuild LLM index",
"env_key": "PAPERLESS_LLM_INDEX_TASK_CRON",
# Default daily at 02:10
"env_default": "10 2 * * *",
"task": "documents.tasks.llmindex_index",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Cleanup expired share link bundles",
"env_key": "PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON",
# Default daily at 02:00
"env_default": "0 2 * * *",
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
]
for task in tasks:
# Either get the environment setting or use the default
value = os.getenv(task["env_key"], task["env_default"])
# Don't add disabled tasks to the schedule
if value == "disable":
continue
# I find https://crontab.guru/ super helpful
# crontab(5) format
# - five time-and-date fields
# - separated by at least one blank
minute, hour, day_month, month, day_week = value.split(" ")
schedule[task["name"]] = {
"task": task["task"],
"schedule": crontab(minute, hour, day_week, day_month, month),
"options": task["options"],
}
return schedule
# NEVER RUN WITH DEBUG IN PRODUCTION.
DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
DEBUG = get_bool_from_env("PAPERLESS_DEBUG", "NO")
###############################################################################
@@ -286,21 +62,21 @@ DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
BASE_DIR: Path = Path(__file__).resolve().parent.parent.parent
STATIC_ROOT = __get_path("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
STATIC_ROOT = get_path_from_env("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
MEDIA_ROOT = __get_path("PAPERLESS_MEDIA_ROOT", BASE_DIR.parent / "media")
MEDIA_ROOT = get_path_from_env("PAPERLESS_MEDIA_ROOT", BASE_DIR.parent / "media")
ORIGINALS_DIR = MEDIA_ROOT / "documents" / "originals"
ARCHIVE_DIR = MEDIA_ROOT / "documents" / "archive"
THUMBNAIL_DIR = MEDIA_ROOT / "documents" / "thumbnails"
SHARE_LINK_BUNDLE_DIR = MEDIA_ROOT / "documents" / "share_link_bundles"
DATA_DIR = __get_path("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data")
DATA_DIR = get_path_from_env("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data")
NLTK_DIR = __get_path("PAPERLESS_NLTK_DIR", "/usr/share/nltk_data")
NLTK_DIR = get_path_from_env("PAPERLESS_NLTK_DIR", "/usr/share/nltk_data")
# Check deprecated setting first
EMPTY_TRASH_DIR = (
__get_path("PAPERLESS_TRASH_DIR", os.getenv("PAPERLESS_EMPTY_TRASH_DIR"))
get_path_from_env("PAPERLESS_TRASH_DIR", os.getenv("PAPERLESS_EMPTY_TRASH_DIR"))
if os.getenv("PAPERLESS_TRASH_DIR") or os.getenv("PAPERLESS_EMPTY_TRASH_DIR")
else None
)
@@ -309,21 +85,21 @@ EMPTY_TRASH_DIR = (
# threads.
MEDIA_LOCK = MEDIA_ROOT / "media.lock"
INDEX_DIR = DATA_DIR / "index"
MODEL_FILE = __get_path(
MODEL_FILE = get_path_from_env(
"PAPERLESS_MODEL_FILE",
DATA_DIR / "classification_model.pickle",
)
LLM_INDEX_DIR = DATA_DIR / "llm_index"
LOGGING_DIR = __get_path("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
CONSUMPTION_DIR = __get_path(
CONSUMPTION_DIR = get_path_from_env(
"PAPERLESS_CONSUMPTION_DIR",
BASE_DIR.parent / "consume",
)
# This will be created if it doesn't exist
SCRATCH_DIR = __get_path(
SCRATCH_DIR = get_path_from_env(
"PAPERLESS_SCRATCH_DIR",
Path(tempfile.gettempdir()) / "paperless",
)
@@ -332,7 +108,7 @@ SCRATCH_DIR = __get_path(
# Application Definition #
###############################################################################
env_apps = __get_list("PAPERLESS_APPS")
env_apps = get_list_from_env("PAPERLESS_APPS")
INSTALLED_APPS = [
"whitenoise.runserver_nostatic",
@@ -405,7 +181,7 @@ MIDDLEWARE = [
]
# Optional to enable compression
if __get_boolean("PAPERLESS_ENABLE_COMPRESSION", "yes"): # pragma: no cover
if get_bool_from_env("PAPERLESS_ENABLE_COMPRESSION", "yes"): # pragma: no cover
MIDDLEWARE.insert(0, "compression_middleware.middleware.CompressionMiddleware")
# Workaround to not compress streaming responses (e.g. chat).
@@ -424,20 +200,8 @@ CompressionMiddleware.process_response = patched_process_response
ROOT_URLCONF = "paperless.urls"
def _parse_base_paths() -> tuple[str, str, str, str, str]:
script_name = os.getenv("PAPERLESS_FORCE_SCRIPT_NAME")
base_url = (script_name or "") + "/"
login_url = base_url + "accounts/login/"
login_redirect_url = base_url + "dashboard"
logout_redirect_url = os.getenv(
"PAPERLESS_LOGOUT_REDIRECT_URL",
login_url + "?loggedout=1",
)
return script_name, base_url, login_url, login_redirect_url, logout_redirect_url
FORCE_SCRIPT_NAME, BASE_URL, LOGIN_URL, LOGIN_REDIRECT_URL, LOGOUT_REDIRECT_URL = (
_parse_base_paths()
parse_hosting_settings()
)
# DRF Spectacular settings
@@ -471,7 +235,7 @@ STORAGES = {
"default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
}
_CELERY_REDIS_URL, _CHANNELS_REDIS_URL = _parse_redis_url(
_CELERY_REDIS_URL, _CHANNELS_REDIS_URL = parse_redis_url(
os.getenv("PAPERLESS_REDIS", None),
)
_REDIS_KEY_PREFIX = os.getenv("PAPERLESS_REDIS_PREFIX", "")
@@ -520,8 +284,8 @@ EMAIL_PORT: Final[int] = int(os.getenv("PAPERLESS_EMAIL_PORT", 25))
EMAIL_HOST_USER: Final[str] = os.getenv("PAPERLESS_EMAIL_HOST_USER", "")
EMAIL_HOST_PASSWORD: Final[str] = os.getenv("PAPERLESS_EMAIL_HOST_PASSWORD", "")
DEFAULT_FROM_EMAIL: Final[str] = os.getenv("PAPERLESS_EMAIL_FROM", EMAIL_HOST_USER)
EMAIL_USE_TLS: Final[bool] = __get_boolean("PAPERLESS_EMAIL_USE_TLS")
EMAIL_USE_SSL: Final[bool] = __get_boolean("PAPERLESS_EMAIL_USE_SSL")
EMAIL_USE_TLS: Final[bool] = get_bool_from_env("PAPERLESS_EMAIL_USE_TLS")
EMAIL_USE_SSL: Final[bool] = get_bool_from_env("PAPERLESS_EMAIL_USE_SSL")
EMAIL_SUBJECT_PREFIX: Final[str] = "[Paperless-ngx] "
EMAIL_TIMEOUT = 30.0
EMAIL_ENABLED = EMAIL_HOST != "localhost" or EMAIL_HOST_USER != ""
@@ -546,20 +310,22 @@ ACCOUNT_DEFAULT_HTTP_PROTOCOL = os.getenv(
)
ACCOUNT_ADAPTER = "paperless.adapter.CustomAccountAdapter"
ACCOUNT_ALLOW_SIGNUPS = __get_boolean("PAPERLESS_ACCOUNT_ALLOW_SIGNUPS")
ACCOUNT_DEFAULT_GROUPS = __get_list("PAPERLESS_ACCOUNT_DEFAULT_GROUPS")
ACCOUNT_ALLOW_SIGNUPS = get_bool_from_env("PAPERLESS_ACCOUNT_ALLOW_SIGNUPS")
ACCOUNT_DEFAULT_GROUPS = get_list_from_env("PAPERLESS_ACCOUNT_DEFAULT_GROUPS")
SOCIALACCOUNT_ADAPTER = "paperless.adapter.CustomSocialAccountAdapter"
SOCIALACCOUNT_ALLOW_SIGNUPS = __get_boolean(
SOCIALACCOUNT_ALLOW_SIGNUPS = get_bool_from_env(
"PAPERLESS_SOCIALACCOUNT_ALLOW_SIGNUPS",
"yes",
)
SOCIALACCOUNT_AUTO_SIGNUP = __get_boolean("PAPERLESS_SOCIAL_AUTO_SIGNUP")
SOCIALACCOUNT_AUTO_SIGNUP = get_bool_from_env("PAPERLESS_SOCIAL_AUTO_SIGNUP")
SOCIALACCOUNT_PROVIDERS = json.loads(
os.getenv("PAPERLESS_SOCIALACCOUNT_PROVIDERS", "{}"),
)
SOCIAL_ACCOUNT_DEFAULT_GROUPS = __get_list("PAPERLESS_SOCIAL_ACCOUNT_DEFAULT_GROUPS")
SOCIAL_ACCOUNT_SYNC_GROUPS = __get_boolean("PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS")
SOCIAL_ACCOUNT_DEFAULT_GROUPS = get_list_from_env(
"PAPERLESS_SOCIAL_ACCOUNT_DEFAULT_GROUPS",
)
SOCIAL_ACCOUNT_SYNC_GROUPS = get_bool_from_env("PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS")
SOCIAL_ACCOUNT_SYNC_GROUPS_CLAIM: Final[str] = os.getenv(
"PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS_CLAIM",
"groups",
@@ -571,8 +337,8 @@ MFA_TOTP_ISSUER = "Paperless-ngx"
ACCOUNT_EMAIL_SUBJECT_PREFIX = "[Paperless-ngx] "
DISABLE_REGULAR_LOGIN = __get_boolean("PAPERLESS_DISABLE_REGULAR_LOGIN")
REDIRECT_LOGIN_TO_SSO = __get_boolean("PAPERLESS_REDIRECT_LOGIN_TO_SSO")
DISABLE_REGULAR_LOGIN = get_bool_from_env("PAPERLESS_DISABLE_REGULAR_LOGIN")
REDIRECT_LOGIN_TO_SSO = get_bool_from_env("PAPERLESS_REDIRECT_LOGIN_TO_SSO")
AUTO_LOGIN_USERNAME = os.getenv("PAPERLESS_AUTO_LOGIN_USERNAME")
@@ -585,12 +351,15 @@ ACCOUNT_EMAIL_VERIFICATION = (
)
)
ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS = __get_boolean(
ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS = get_bool_from_env(
"PAPERLESS_ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS",
"True",
)
ACCOUNT_SESSION_REMEMBER = __get_boolean("PAPERLESS_ACCOUNT_SESSION_REMEMBER", "True")
ACCOUNT_SESSION_REMEMBER = get_bool_from_env(
"PAPERLESS_ACCOUNT_SESSION_REMEMBER",
"True",
)
SESSION_EXPIRE_AT_BROWSER_CLOSE = not ACCOUNT_SESSION_REMEMBER
SESSION_COOKIE_AGE = int(
os.getenv("PAPERLESS_SESSION_COOKIE_AGE", 60 * 60 * 24 * 7 * 3),
@@ -607,8 +376,8 @@ if AUTO_LOGIN_USERNAME:
def _parse_remote_user_settings() -> str:
global MIDDLEWARE, AUTHENTICATION_BACKENDS, REST_FRAMEWORK
enable = __get_boolean("PAPERLESS_ENABLE_HTTP_REMOTE_USER")
enable_api = __get_boolean("PAPERLESS_ENABLE_HTTP_REMOTE_USER_API")
enable = get_bool_from_env("PAPERLESS_ENABLE_HTTP_REMOTE_USER")
enable_api = get_bool_from_env("PAPERLESS_ENABLE_HTTP_REMOTE_USER_API")
if enable or enable_api:
MIDDLEWARE.append("paperless.auth.HttpRemoteUserMiddleware")
AUTHENTICATION_BACKENDS.insert(
@@ -636,16 +405,16 @@ HTTP_REMOTE_USER_HEADER_NAME = _parse_remote_user_settings()
X_FRAME_OPTIONS = "SAMEORIGIN"
# The next 3 settings can also be set using just PAPERLESS_URL
CSRF_TRUSTED_ORIGINS = __get_list("PAPERLESS_CSRF_TRUSTED_ORIGINS")
CSRF_TRUSTED_ORIGINS = get_list_from_env("PAPERLESS_CSRF_TRUSTED_ORIGINS")
if DEBUG:
# Allow access from the angular development server during debugging
CSRF_TRUSTED_ORIGINS.append("http://localhost:4200")
# We allow CORS from localhost:8000
CORS_ALLOWED_ORIGINS = __get_list(
CORS_ALLOWED_ORIGINS = get_list_from_env(
"PAPERLESS_CORS_ALLOWED_HOSTS",
["http://localhost:8000"],
default=["http://localhost:8000"],
)
if DEBUG:
@@ -658,7 +427,7 @@ CORS_EXPOSE_HEADERS = [
"Content-Disposition",
]
ALLOWED_HOSTS = __get_list("PAPERLESS_ALLOWED_HOSTS", ["*"])
ALLOWED_HOSTS = get_list_from_env("PAPERLESS_ALLOWED_HOSTS", default=["*"])
if ALLOWED_HOSTS != ["*"]:
# always allow localhost. Necessary e.g. for healthcheck in docker.
ALLOWED_HOSTS.append("localhost")
@@ -678,10 +447,10 @@ def _parse_paperless_url():
PAPERLESS_URL = _parse_paperless_url()
# For use with trusted proxies
TRUSTED_PROXIES = __get_list("PAPERLESS_TRUSTED_PROXIES")
TRUSTED_PROXIES = get_list_from_env("PAPERLESS_TRUSTED_PROXIES")
USE_X_FORWARDED_HOST = __get_boolean("PAPERLESS_USE_X_FORWARD_HOST", "false")
USE_X_FORWARDED_PORT = __get_boolean("PAPERLESS_USE_X_FORWARD_PORT", "false")
USE_X_FORWARDED_HOST = get_bool_from_env("PAPERLESS_USE_X_FORWARD_HOST", "false")
USE_X_FORWARDED_PORT = get_bool_from_env("PAPERLESS_USE_X_FORWARD_PORT", "false")
SECURE_PROXY_SSL_HEADER = (
tuple(json.loads(os.environ["PAPERLESS_PROXY_SSL_HEADER"]))
if "PAPERLESS_PROXY_SSL_HEADER" in os.environ
@@ -724,7 +493,7 @@ CSRF_COOKIE_NAME = f"{COOKIE_PREFIX}csrftoken"
SESSION_COOKIE_NAME = f"{COOKIE_PREFIX}sessionid"
LANGUAGE_COOKIE_NAME = f"{COOKIE_PREFIX}django_language"
EMAIL_CERTIFICATE_FILE = __get_optional_path("PAPERLESS_EMAIL_CERTIFICATE_LOCATION")
EMAIL_CERTIFICATE_FILE = get_path_from_env("PAPERLESS_EMAIL_CERTIFICATE_LOCATION")
###############################################################################
@@ -875,7 +644,7 @@ CELERY_BROKER_URL = _CELERY_REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1)
CELERY_WORKER_CONCURRENCY: Final[int] = get_int_from_env("PAPERLESS_TASK_WORKERS", 1)
TASK_WORKERS = CELERY_WORKER_CONCURRENCY
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_WORKER_SEND_TASK_EVENTS = True
@@ -888,7 +657,7 @@ CELERY_BROKER_TRANSPORT_OPTIONS = {
}
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
CELERY_TASK_TIME_LIMIT: Final[int] = get_int_from_env("PAPERLESS_WORKER_TIMEOUT", 1800)
CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
@@ -900,7 +669,7 @@ CELERY_TASK_SERIALIZER = "pickle"
CELERY_ACCEPT_CONTENT = ["application/json", "application/x-python-serialize"]
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-schedule
CELERY_BEAT_SCHEDULE = _parse_beat_schedule()
CELERY_BEAT_SCHEDULE = parse_beat_schedule()
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-schedule-filename
CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db")
@@ -908,14 +677,14 @@ CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db")
# Cachalot: Database read cache.
def _parse_cachalot_settings():
ttl = __get_int("PAPERLESS_READ_CACHE_TTL", 3600)
ttl = get_int_from_env("PAPERLESS_READ_CACHE_TTL", 3600)
ttl = min(ttl, 31536000) if ttl > 0 else 3600
_, redis_url = _parse_redis_url(
_, redis_url = parse_redis_url(
os.getenv("PAPERLESS_READ_CACHE_REDIS_URL", _CHANNELS_REDIS_URL),
)
result = {
"CACHALOT_CACHE": "read-cache",
"CACHALOT_ENABLED": __get_boolean(
"CACHALOT_ENABLED": get_bool_from_env(
"PAPERLESS_DB_READ_CACHE_ENABLED",
default="no",
),
@@ -1000,9 +769,9 @@ CONSUMER_POLLING_INTERVAL = float(os.getenv("PAPERLESS_CONSUMER_POLLING_INTERVAL
CONSUMER_STABILITY_DELAY = float(os.getenv("PAPERLESS_CONSUMER_STABILITY_DELAY", 5))
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
CONSUMER_DELETE_DUPLICATES = get_bool_from_env("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
CONSUMER_RECURSIVE = get_bool_from_env("PAPERLESS_CONSUMER_RECURSIVE")
# Ignore regex patterns, matched against filename only
CONSUMER_IGNORE_PATTERNS = list(
@@ -1024,13 +793,13 @@ CONSUMER_IGNORE_DIRS = list(
),
)
CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
CONSUMER_SUBDIRS_AS_TAGS = get_bool_from_env("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
CONSUMER_ENABLE_BARCODES: Final[bool] = __get_boolean(
CONSUMER_ENABLE_BARCODES: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_ENABLE_BARCODES",
)
CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = __get_boolean(
CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_BARCODE_TIFF_SUPPORT",
)
@@ -1039,7 +808,7 @@ CONSUMER_BARCODE_STRING: Final[str] = os.getenv(
"PATCHT",
)
CONSUMER_ENABLE_ASN_BARCODE: Final[bool] = __get_boolean(
CONSUMER_ENABLE_ASN_BARCODE: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_ENABLE_ASN_BARCODE",
)
@@ -1048,23 +817,26 @@ CONSUMER_ASN_BARCODE_PREFIX: Final[str] = os.getenv(
"ASN",
)
CONSUMER_BARCODE_UPSCALE: Final[float] = __get_float(
CONSUMER_BARCODE_UPSCALE: Final[float] = get_float_from_env(
"PAPERLESS_CONSUMER_BARCODE_UPSCALE",
0.0,
)
CONSUMER_BARCODE_DPI: Final[int] = __get_int("PAPERLESS_CONSUMER_BARCODE_DPI", 300)
CONSUMER_BARCODE_DPI: Final[int] = get_int_from_env(
"PAPERLESS_CONSUMER_BARCODE_DPI",
300,
)
CONSUMER_BARCODE_MAX_PAGES: Final[int] = __get_int(
CONSUMER_BARCODE_MAX_PAGES: Final[int] = get_int_from_env(
"PAPERLESS_CONSUMER_BARCODE_MAX_PAGES",
0,
)
CONSUMER_BARCODE_RETAIN_SPLIT_PAGES = __get_boolean(
CONSUMER_BARCODE_RETAIN_SPLIT_PAGES = get_bool_from_env(
"PAPERLESS_CONSUMER_BARCODE_RETAIN_SPLIT_PAGES",
)
CONSUMER_ENABLE_TAG_BARCODE: Final[bool] = __get_boolean(
CONSUMER_ENABLE_TAG_BARCODE: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_ENABLE_TAG_BARCODE",
)
@@ -1077,11 +849,11 @@ CONSUMER_TAG_BARCODE_MAPPING = dict(
),
)
CONSUMER_TAG_BARCODE_SPLIT: Final[bool] = __get_boolean(
CONSUMER_TAG_BARCODE_SPLIT: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_TAG_BARCODE_SPLIT",
)
CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED: Final[bool] = __get_boolean(
CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED",
)
@@ -1090,13 +862,13 @@ CONSUMER_COLLATE_DOUBLE_SIDED_SUBDIR_NAME: Final[str] = os.getenv(
"double-sided",
)
CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT: Final[bool] = __get_boolean(
CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT: Final[bool] = get_bool_from_env(
"PAPERLESS_CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT",
)
CONSUMER_PDF_RECOVERABLE_MIME_TYPES = ("application/octet-stream",)
OCR_PAGES = __get_optional_int("PAPERLESS_OCR_PAGES")
OCR_PAGES = get_int_from_env("PAPERLESS_OCR_PAGES")
# The default language that tesseract will attempt to use when parsing
# documents. It should be a 3-letter language code consistent with ISO 639.
@@ -1110,20 +882,20 @@ OCR_MODE = os.getenv("PAPERLESS_OCR_MODE", "skip")
OCR_SKIP_ARCHIVE_FILE = os.getenv("PAPERLESS_OCR_SKIP_ARCHIVE_FILE", "never")
OCR_IMAGE_DPI = __get_optional_int("PAPERLESS_OCR_IMAGE_DPI")
OCR_IMAGE_DPI = get_int_from_env("PAPERLESS_OCR_IMAGE_DPI")
OCR_CLEAN = os.getenv("PAPERLESS_OCR_CLEAN", "clean")
OCR_DESKEW: Final[bool] = __get_boolean("PAPERLESS_OCR_DESKEW", "true")
OCR_DESKEW: Final[bool] = get_bool_from_env("PAPERLESS_OCR_DESKEW", "true")
OCR_ROTATE_PAGES: Final[bool] = __get_boolean("PAPERLESS_OCR_ROTATE_PAGES", "true")
OCR_ROTATE_PAGES: Final[bool] = get_bool_from_env("PAPERLESS_OCR_ROTATE_PAGES", "true")
OCR_ROTATE_PAGES_THRESHOLD: Final[float] = __get_float(
OCR_ROTATE_PAGES_THRESHOLD: Final[float] = get_float_from_env(
"PAPERLESS_OCR_ROTATE_PAGES_THRESHOLD",
12.0,
)
OCR_MAX_IMAGE_PIXELS: Final[int | None] = __get_optional_int(
OCR_MAX_IMAGE_PIXELS: Final[int | None] = get_int_from_env(
"PAPERLESS_OCR_MAX_IMAGE_PIXELS",
)
@@ -1134,7 +906,7 @@ OCR_COLOR_CONVERSION_STRATEGY = os.getenv(
OCR_USER_ARGS = os.getenv("PAPERLESS_OCR_USER_ARGS")
MAX_IMAGE_PIXELS: Final[int | None] = __get_optional_int(
MAX_IMAGE_PIXELS: Final[int | None] = get_int_from_env(
"PAPERLESS_MAX_IMAGE_PIXELS",
)
@@ -1149,7 +921,7 @@ CONVERT_MEMORY_LIMIT = os.getenv("PAPERLESS_CONVERT_MEMORY_LIMIT")
GS_BINARY = os.getenv("PAPERLESS_GS_BINARY", "gs")
# Fallback layout for .eml consumption
EMAIL_PARSE_DEFAULT_LAYOUT = __get_int(
EMAIL_PARSE_DEFAULT_LAYOUT = get_int_from_env(
"PAPERLESS_EMAIL_PARSE_DEFAULT_LAYOUT",
1, # MailRule.PdfLayout.TEXT_HTML but that can't be imported here
)
@@ -1163,23 +935,9 @@ DATE_ORDER = os.getenv("PAPERLESS_DATE_ORDER", "DMY")
FILENAME_DATE_ORDER = os.getenv("PAPERLESS_FILENAME_DATE_ORDER")
def _parse_dateparser_languages(languages: str | None):
language_list = languages.split("+") if languages else []
# There is an unfixed issue in zh-Hant and zh-Hans locales in the dateparser lib.
# See: https://github.com/scrapinghub/dateparser/issues/875
for index, language in enumerate(language_list):
if language.startswith("zh-") and "zh" not in language_list:
logger.warning(
f'Chinese locale detected: {language}. dateparser might fail to parse some dates with this locale, so Chinese ("zh") will be used as a fallback.',
)
language_list.append("zh")
return list(LocaleDataLoader().get_locale_map(locales=language_list))
# If not set, we will infer it at runtime
DATE_PARSER_LANGUAGES = (
_parse_dateparser_languages(
parse_dateparser_languages(
os.getenv("PAPERLESS_DATE_PARSER_LANGUAGES"),
)
if os.getenv("PAPERLESS_DATE_PARSER_LANGUAGES")
@@ -1190,7 +948,7 @@ DATE_PARSER_LANGUAGES = (
# Maximum number of dates taken from document start to end to show as suggestions for
# `created` date in the frontend. Duplicates are removed, which can result in
# fewer dates shown.
NUMBER_OF_SUGGESTED_DATES = __get_int("PAPERLESS_NUMBER_OF_SUGGESTED_DATES", 3)
NUMBER_OF_SUGGESTED_DATES = get_int_from_env("PAPERLESS_NUMBER_OF_SUGGESTED_DATES", 3)
# Specify the filename format for out files
FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
@@ -1198,7 +956,7 @@ FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
# If this is enabled, variables in filename format will resolve to
# empty-string instead of 'none'.
# Directories with 'empty names' are omitted, too.
FILENAME_FORMAT_REMOVE_NONE = __get_boolean(
FILENAME_FORMAT_REMOVE_NONE = get_bool_from_env(
"PAPERLESS_FILENAME_FORMAT_REMOVE_NONE",
"NO",
)
@@ -1209,7 +967,7 @@ THUMBNAIL_FONT_NAME = os.getenv(
)
# Tika settings
TIKA_ENABLED = __get_boolean("PAPERLESS_TIKA_ENABLED", "NO")
TIKA_ENABLED = get_bool_from_env("PAPERLESS_TIKA_ENABLED", "NO")
TIKA_ENDPOINT = os.getenv("PAPERLESS_TIKA_ENDPOINT", "http://localhost:9998")
TIKA_GOTENBERG_ENDPOINT = os.getenv(
"PAPERLESS_TIKA_GOTENBERG_ENDPOINT",
@@ -1219,52 +977,21 @@ TIKA_GOTENBERG_ENDPOINT = os.getenv(
if TIKA_ENABLED:
INSTALLED_APPS.append("paperless_tika.apps.PaperlessTikaConfig")
AUDIT_LOG_ENABLED = __get_boolean("PAPERLESS_AUDIT_LOG_ENABLED", "true")
AUDIT_LOG_ENABLED = get_bool_from_env("PAPERLESS_AUDIT_LOG_ENABLED", "true")
if AUDIT_LOG_ENABLED:
INSTALLED_APPS.append("auditlog")
MIDDLEWARE.append("auditlog.middleware.AuditlogMiddleware")
def _parse_ignore_dates(
env_ignore: str,
date_order: str = DATE_ORDER,
) -> set[datetime.datetime]:
"""
If the PAPERLESS_IGNORE_DATES environment variable is set, parse the
user provided string(s) into dates
Args:
env_ignore (str): The value of the environment variable, comma separated dates
date_order (str, optional): The format of the date strings.
Defaults to DATE_ORDER.
Returns:
Set[datetime.datetime]: The set of parsed date objects
"""
import dateparser
ignored_dates = set()
for s in env_ignore.split(","):
d = dateparser.parse(
s,
settings={
"DATE_ORDER": date_order,
},
)
if d:
ignored_dates.add(d.date())
return ignored_dates
# List dates that should be ignored when trying to parse date from document text
IGNORE_DATES: set[datetime.date] = set()
if os.getenv("PAPERLESS_IGNORE_DATES") is not None:
IGNORE_DATES = _parse_ignore_dates(os.getenv("PAPERLESS_IGNORE_DATES"))
IGNORE_DATES = parse_ignore_dates(os.getenv("PAPERLESS_IGNORE_DATES"), DATE_ORDER)
ENABLE_UPDATE_CHECK = os.getenv("PAPERLESS_ENABLE_UPDATE_CHECK", "default")
if ENABLE_UPDATE_CHECK != "default":
ENABLE_UPDATE_CHECK = __get_boolean("PAPERLESS_ENABLE_UPDATE_CHECK")
ENABLE_UPDATE_CHECK = get_bool_from_env("PAPERLESS_ENABLE_UPDATE_CHECK")
APP_TITLE = os.getenv("PAPERLESS_APP_TITLE", None)
APP_LOGO = os.getenv("PAPERLESS_APP_LOGO", None)
@@ -1309,7 +1036,7 @@ def _get_nltk_language_setting(ocr_lang: str) -> str | None:
return iso_code_to_nltk.get(ocr_lang)
NLTK_ENABLED: Final[bool] = __get_boolean("PAPERLESS_ENABLE_NLTK", "yes")
NLTK_ENABLED: Final[bool] = get_bool_from_env("PAPERLESS_ENABLE_NLTK", "yes")
NLTK_LANGUAGE: str | None = _get_nltk_language_setting(OCR_LANGUAGE)
@@ -1318,7 +1045,7 @@ NLTK_LANGUAGE: str | None = _get_nltk_language_setting(OCR_LANGUAGE)
###############################################################################
EMAIL_GNUPG_HOME: Final[str | None] = os.getenv("PAPERLESS_EMAIL_GNUPG_HOME")
EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = __get_boolean(
EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = get_bool_from_env(
"PAPERLESS_ENABLE_GPG_DECRYPTOR",
)
@@ -1326,7 +1053,7 @@ EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = __get_boolean(
###############################################################################
# Soft Delete #
###############################################################################
EMPTY_TRASH_DELAY = max(__get_int("PAPERLESS_EMPTY_TRASH_DELAY", 30), 1)
EMPTY_TRASH_DELAY = max(get_int_from_env("PAPERLESS_EMPTY_TRASH_DELAY", 30), 1)
###############################################################################
@@ -1351,21 +1078,17 @@ OUTLOOK_OAUTH_ENABLED = bool(
###############################################################################
# Webhooks
###############################################################################
WEBHOOKS_ALLOWED_SCHEMES = set(
WEBHOOKS_ALLOWED_SCHEMES = {
s.lower()
for s in __get_list(
for s in get_list_from_env(
"PAPERLESS_WEBHOOKS_ALLOWED_SCHEMES",
["http", "https"],
default=["http", "https"],
)
)
WEBHOOKS_ALLOWED_PORTS = set(
int(p)
for p in __get_list(
"PAPERLESS_WEBHOOKS_ALLOWED_PORTS",
[],
)
)
WEBHOOKS_ALLOW_INTERNAL_REQUESTS = __get_boolean(
}
WEBHOOKS_ALLOWED_PORTS = {
int(p) for p in get_list_from_env("PAPERLESS_WEBHOOKS_ALLOWED_PORTS", default=[])
}
WEBHOOKS_ALLOW_INTERNAL_REQUESTS = get_bool_from_env(
"PAPERLESS_WEBHOOKS_ALLOW_INTERNAL_REQUESTS",
"true",
)
@@ -1380,7 +1103,7 @@ REMOTE_OCR_ENDPOINT = os.getenv("PAPERLESS_REMOTE_OCR_ENDPOINT")
################################################################################
# AI Settings #
################################################################################
AI_ENABLED = __get_boolean("PAPERLESS_AI_ENABLED", "NO")
AI_ENABLED = get_bool_from_env("PAPERLESS_AI_ENABLED", "NO")
LLM_EMBEDDING_BACKEND = os.getenv(
"PAPERLESS_AI_LLM_EMBEDDING_BACKEND",
) # "huggingface" or "openai"

View File

@@ -1,11 +1,191 @@
import datetime
import logging
import os
from pathlib import Path
from typing import Any
from celery.schedules import crontab
from dateparser.languages.loader import LocaleDataLoader
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import parse_dict_from_str
logger = logging.getLogger(__name__)
def parse_hosting_settings() -> tuple[str | None, str, str, str, str]:
script_name = os.getenv("PAPERLESS_FORCE_SCRIPT_NAME")
base_url = (script_name or "") + "/"
login_url = base_url + "accounts/login/"
login_redirect_url = base_url + "dashboard"
logout_redirect_url = os.getenv(
"PAPERLESS_LOGOUT_REDIRECT_URL",
login_url + "?loggedout=1",
)
return script_name, base_url, login_url, login_redirect_url, logout_redirect_url
def parse_redis_url(env_redis: str | None) -> tuple[str, str]:
"""
Gets the Redis information from the environment or a default and handles
converting from incompatible django_channels and celery formats.
Returns a tuple of (celery_url, channels_url)
"""
# Not set, return a compatible default
if env_redis is None:
return ("redis://localhost:6379", "redis://localhost:6379")
if "unix" in env_redis.lower():
# channels_redis socket format, looks like:
# "unix:///path/to/redis.sock"
_, path = env_redis.split(":", maxsplit=1)
# Optionally setting a db number
if "?db=" in env_redis:
path, number = path.split("?db=")
return (f"redis+socket:{path}?virtual_host={number}", env_redis)
else:
return (f"redis+socket:{path}", env_redis)
elif "+socket" in env_redis.lower():
# celery socket style, looks like:
# "redis+socket:///path/to/redis.sock"
_, path = env_redis.split(":", maxsplit=1)
if "?virtual_host=" in env_redis:
# Virtual host (aka db number)
path, number = path.split("?virtual_host=")
return (env_redis, f"unix:{path}?db={number}")
else:
return (env_redis, f"unix:{path}")
# Not a socket
return (env_redis, env_redis)
def parse_beat_schedule() -> dict:
"""
Configures the scheduled tasks, according to default or
environment variables. Task expiration is configured so the task will
expire (and not run), shortly before the default frequency will put another
of the same task into the queue
https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
"""
schedule = {}
tasks = [
{
"name": "Check all e-mail accounts",
"env_key": "PAPERLESS_EMAIL_TASK_CRON",
# Default every ten minutes
"env_default": "*/10 * * * *",
"task": "paperless_mail.tasks.process_mail_accounts",
"options": {
# 1 minute before default schedule sends again
"expires": 9.0 * 60.0,
},
},
{
"name": "Train the classifier",
"env_key": "PAPERLESS_TRAIN_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.train_classifier",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Optimize the index",
"env_key": "PAPERLESS_INDEX_TASK_CRON",
# Default daily at midnight
"env_default": "0 0 * * *",
"task": "documents.tasks.index_optimize",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Perform sanity check",
"env_key": "PAPERLESS_SANITY_TASK_CRON",
# Default Sunday at 00:30
"env_default": "30 0 * * sun",
"task": "documents.tasks.sanity_check",
"options": {
# 1 hour before default schedule sends again
"expires": ((7.0 * 24.0) - 1.0) * 60.0 * 60.0,
},
},
{
"name": "Empty trash",
"env_key": "PAPERLESS_EMPTY_TRASH_TASK_CRON",
# Default daily at 01:00
"env_default": "0 1 * * *",
"task": "documents.tasks.empty_trash",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Check and run scheduled workflows",
"env_key": "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.check_scheduled_workflows",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0 * 60.0,
},
},
{
"name": "Rebuild LLM index",
"env_key": "PAPERLESS_LLM_INDEX_TASK_CRON",
# Default daily at 02:10
"env_default": "10 2 * * *",
"task": "documents.tasks.llmindex_index",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Cleanup expired share link bundles",
"env_key": "PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON",
# Default daily at 02:00
"env_default": "0 2 * * *",
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0 * 60.0 * 60.0,
},
},
]
for task in tasks:
# Either get the environment setting or use the default
value = os.getenv(task["env_key"], task["env_default"])
# Don't add disabled tasks to the schedule
if value == "disable":
continue
# I find https://crontab.guru/ super helpful
# crontab(5) format
# - five time-and-date fields
# - separated by at least one blank
minute, hour, day_month, month, day_week = value.split(" ")
schedule[task["name"]] = {
"task": task["task"],
"schedule": crontab(minute, hour, day_week, day_month, month),
"options": task["options"],
}
return schedule
def parse_db_settings(data_dir: Path) -> dict[str, dict[str, Any]]:
"""Parse database settings from environment variables.
@@ -120,3 +300,48 @@ def parse_db_settings(data_dir: Path) -> dict[str, dict[str, Any]]:
)
return {"default": db_config}
def parse_dateparser_languages(languages: str | None) -> list[str]:
language_list = languages.split("+") if languages else []
# There is an unfixed issue in zh-Hant and zh-Hans locales in the dateparser lib.
# See: https://github.com/scrapinghub/dateparser/issues/875
for index, language in enumerate(language_list):
if language.startswith("zh-") and "zh" not in language_list:
logger.warning(
f"Chinese locale detected: {language}. dateparser might fail to parse"
f' some dates with this locale, so Chinese ("zh") will be used as a fallback.',
)
language_list.append("zh")
return list(LocaleDataLoader().get_locale_map(locales=language_list))
def parse_ignore_dates(
env_ignore: str,
date_order: str,
) -> set[datetime.date]:
"""
If the PAPERLESS_IGNORE_DATES environment variable is set, parse the
user provided string(s) into dates
Args:
env_ignore (str): The value of the environment variable, comma separated dates
date_order (str): The format of the date strings.
Returns:
set[datetime.date]: The set of parsed date objects
"""
import dateparser
ignored_dates = set()
for s in env_ignore.split(","):
d = dateparser.parse(
s,
settings={
"DATE_ORDER": date_order,
},
)
if d:
ignored_dates.add(d.date())
return ignored_dates

View File

@@ -156,6 +156,108 @@ def parse_dict_from_str(
return settings
def get_bool_from_env(key: str, default: str = "NO") -> bool:
"""
Return a boolean value based on whatever the user has supplied in the
environment based on whether the value "looks like" it's True or not.
"""
return str_to_bool(os.getenv(key, default))
@overload
def get_float_from_env(key: str) -> float | None: ...
@overload
def get_float_from_env(key: str, default: None) -> float | None: ...
@overload
def get_float_from_env(key: str, default: float) -> float: ...
def get_float_from_env(key: str, default: float | None = None) -> float | None:
"""
Return a float value based on the environment variable.
If default is provided, returns that value when key is missing.
If default is None, returns None when key is missing.
"""
if key not in os.environ:
return default
return float(os.environ[key])
@overload
def get_path_from_env(key: str) -> Path | None: ...
@overload
def get_path_from_env(key: str, default: None) -> Path | None: ...
@overload
def get_path_from_env(key: str, default: Path | str) -> Path: ...
def get_path_from_env(key: str, default: Path | str | None = None) -> Path | None:
"""
Return a Path object based on the environment variable.
If default is provided, returns that value when key is missing.
If default is None, returns None when key is missing.
"""
if key not in os.environ:
return default if default is None else Path(default).resolve()
return Path(os.environ[key]).resolve()
def get_list_from_env(
key: str,
separator: str = ",",
default: list[T] | None = None,
*,
strip_whitespace: bool = True,
remove_empty: bool = True,
required: bool = False,
) -> list[str] | list[T]:
"""
Get and parse a list from an environment variable or return a default.
Args:
key: Environment variable name
separator: Character(s) to split on (default: ',')
default: Default value to return if env var is not set or empty
strip_whitespace: Whether to strip whitespace from each element
remove_empty: Whether to remove empty strings from the result
required: If True, raise an error when the env var is missing and no default provided
Returns:
List of strings or list of type-cast values, or default if env var is empty/None
Raises:
ValueError: If required=True and env var is missing and there is no default
"""
# Get the environment variable value
env_value = os.environ.get(key)
# Handle required environment variables
if required and env_value is None and default is None:
raise ValueError(f"Required environment variable '{key}' is not set")
if env_value:
items = env_value.split(separator)
if strip_whitespace:
items = [item.strip() for item in items]
if remove_empty:
items = [item for item in items if item]
return items
elif default is not None:
return default
else:
return []
def get_choice_from_env(
env_key: str,
choices: set[str],

View File

@@ -1,10 +1,279 @@
import datetime
import os
from pathlib import Path
from typing import Any
import pytest
from celery.schedules import crontab
from pytest_mock import MockerFixture
from paperless.settings.custom import parse_beat_schedule
from paperless.settings.custom import parse_dateparser_languages
from paperless.settings.custom import parse_db_settings
from paperless.settings.custom import parse_hosting_settings
from paperless.settings.custom import parse_ignore_dates
from paperless.settings.custom import parse_redis_url
class TestRedisSocketConversion:
@pytest.mark.parametrize(
("input_url", "expected"),
[
pytest.param(
None,
("redis://localhost:6379", "redis://localhost:6379"),
id="none_uses_default",
),
pytest.param(
"redis+socket:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
id="celery_style_socket",
),
pytest.param(
"unix:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
id="redis_py_style_socket",
),
pytest.param(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
"unix:///run/redis/redis.sock?db=5",
),
id="celery_style_socket_with_db",
),
pytest.param(
"unix:///run/redis/redis.sock?db=10",
(
"redis+socket:///run/redis/redis.sock?virtual_host=10",
"unix:///run/redis/redis.sock?db=10",
),
id="redis_py_style_socket_with_db",
),
pytest.param(
"redis://myredishost:6379",
("redis://myredishost:6379", "redis://myredishost:6379"),
id="host_with_port_unchanged",
),
# Credentials in unix:// URL contain multiple colons (user:password@)
# Regression test for https://github.com/paperless-ngx/paperless-ngx/pull/12239
pytest.param(
"unix://user:password@/run/redis/redis.sock",
(
"redis+socket://user:password@/run/redis/redis.sock",
"unix://user:password@/run/redis/redis.sock",
),
id="redis_py_style_socket_with_credentials",
),
pytest.param(
"redis+socket://user:password@/run/redis/redis.sock",
(
"redis+socket://user:password@/run/redis/redis.sock",
"unix://user:password@/run/redis/redis.sock",
),
id="celery_style_socket_with_credentials",
),
],
)
def test_redis_socket_parsing(
self,
input_url: str | None,
expected: tuple[str, str],
) -> None:
"""
GIVEN:
- Various Redis connection URI formats
WHEN:
- The URI is parsed
THEN:
- Socket based URIs are translated
- Non-socket URIs are unchanged
- None provided uses default
"""
result = parse_redis_url(input_url)
assert expected == result
class TestParseHostingSettings:
@pytest.mark.parametrize(
("env", "expected"),
[
pytest.param(
{},
(
None,
"/",
"/accounts/login/",
"/dashboard",
"/accounts/login/?loggedout=1",
),
id="no_env_vars",
),
pytest.param(
{"PAPERLESS_FORCE_SCRIPT_NAME": "/paperless"},
(
"/paperless",
"/paperless/",
"/paperless/accounts/login/",
"/paperless/dashboard",
"/paperless/accounts/login/?loggedout=1",
),
id="force_script_name_only",
),
pytest.param(
{
"PAPERLESS_FORCE_SCRIPT_NAME": "/docs",
"PAPERLESS_LOGOUT_REDIRECT_URL": "/custom/logout",
},
(
"/docs",
"/docs/",
"/docs/accounts/login/",
"/docs/dashboard",
"/custom/logout",
),
id="force_script_name_and_logout_redirect",
),
],
)
def test_parse_hosting_settings(
self,
mocker: MockerFixture,
env: dict[str, str],
expected: tuple[str | None, str, str, str, str],
) -> None:
"""Test parse_hosting_settings with various env configurations."""
mocker.patch.dict(os.environ, env, clear=True)
result = parse_hosting_settings()
assert result == expected
def make_expected_schedule(
overrides: dict[str, dict[str, Any]] | None = None,
disabled: set[str] | None = None,
) -> dict[str, Any]:
"""
Build the expected schedule with optional overrides and disabled tasks.
"""
mail_expire = 9.0 * 60.0
classifier_expire = 59.0 * 60.0
index_expire = 23.0 * 60.0 * 60.0
sanity_expire = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0
empty_trash_expire = 23.0 * 60.0 * 60.0
workflow_expire = 59.0 * 60.0
llm_index_expire = 23.0 * 60.0 * 60.0
share_link_cleanup_expire = 23.0 * 60.0 * 60.0
schedule: dict[str, Any] = {
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": mail_expire},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": classifier_expire},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": index_expire},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": sanity_expire},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": empty_trash_expire},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": workflow_expire},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute="10", hour="2"),
"options": {"expires": llm_index_expire},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour="2"),
"options": {"expires": share_link_cleanup_expire},
},
}
overrides = overrides or {}
disabled = disabled or set()
for key, val in overrides.items():
schedule[key] = {**schedule.get(key, {}), **val}
for key in disabled:
schedule.pop(key, None)
return schedule
class TestParseBeatSchedule:
@pytest.mark.parametrize(
("env", "expected"),
[
pytest.param({}, make_expected_schedule(), id="defaults"),
pytest.param(
{"PAPERLESS_EMAIL_TASK_CRON": "*/50 * * * mon"},
make_expected_schedule(
overrides={
"Check all e-mail accounts": {
"schedule": crontab(minute="*/50", day_of_week="mon"),
},
},
),
id="email-changed",
),
pytest.param(
{"PAPERLESS_INDEX_TASK_CRON": "disable"},
make_expected_schedule(disabled={"Optimize the index"}),
id="index-disabled",
),
pytest.param(
{
"PAPERLESS_EMAIL_TASK_CRON": "disable",
"PAPERLESS_TRAIN_TASK_CRON": "disable",
"PAPERLESS_SANITY_TASK_CRON": "disable",
"PAPERLESS_INDEX_TASK_CRON": "disable",
"PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable",
"PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable",
"PAPERLESS_LLM_INDEX_TASK_CRON": "disable",
"PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON": "disable",
},
{},
id="all-disabled",
),
],
)
def test_parse_beat_schedule(
self,
env: dict[str, str],
expected: dict[str, Any],
mocker: MockerFixture,
) -> None:
mocker.patch.dict(os.environ, env, clear=False)
schedule = parse_beat_schedule()
assert schedule == expected
class TestParseDbSettings:
@@ -264,3 +533,85 @@ class TestParseDbSettings:
settings = parse_db_settings(tmp_path)
assert settings == expected_database_settings
class TestParseIgnoreDates:
"""Tests the parsing of the PAPERLESS_IGNORE_DATES setting value."""
def test_no_ignore_dates_set(self) -> None:
"""
GIVEN:
- No ignore dates are set
THEN:
- No ignore dates are parsed
"""
assert parse_ignore_dates("", "YMD") == set()
@pytest.mark.parametrize(
("env_str", "date_format", "expected"),
[
pytest.param(
"1985-05-01",
"YMD",
{datetime.date(1985, 5, 1)},
id="single-ymd",
),
pytest.param(
"1985-05-01,1991-12-05",
"YMD",
{datetime.date(1985, 5, 1), datetime.date(1991, 12, 5)},
id="multiple-ymd",
),
pytest.param(
"2010-12-13",
"YMD",
{datetime.date(2010, 12, 13)},
id="single-ymd-2",
),
pytest.param(
"11.01.10",
"DMY",
{datetime.date(2010, 1, 11)},
id="single-dmy",
),
pytest.param(
"11.01.2001,15-06-1996",
"DMY",
{datetime.date(2001, 1, 11), datetime.date(1996, 6, 15)},
id="multiple-dmy",
),
],
)
def test_ignore_dates_parsed(
self,
env_str: str,
date_format: str,
expected: set[datetime.date],
) -> None:
"""
GIVEN:
- Ignore dates are set per certain inputs
THEN:
- All ignore dates are parsed
"""
assert parse_ignore_dates(env_str, date_format) == expected
@pytest.mark.parametrize(
("languages", "expected"),
[
("de", ["de"]),
("zh", ["zh"]),
("fr+en", ["fr", "en"]),
# Locales must be supported
("en-001+fr-CA", ["en-001", "fr-CA"]),
("en-001+fr", ["en-001", "fr"]),
# Special case for Chinese: variants seem to miss some dates,
# so we always add "zh" as a fallback.
("en+zh-Hans-HK", ["en", "zh-Hans-HK", "zh"]),
("en+zh-Hans", ["en", "zh-Hans", "zh"]),
("en+zh-Hans+zh-Hant", ["en", "zh-Hans", "zh-Hant", "zh"]),
],
)
def test_parse_dateparser_languages(languages: str, expected: list[str]) -> None:
assert sorted(parse_dateparser_languages(languages)) == sorted(expected)

View File

@@ -4,8 +4,12 @@ from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from paperless.settings.parsers import get_bool_from_env
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_float_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import get_list_from_env
from paperless.settings.parsers import get_path_from_env
from paperless.settings.parsers import parse_dict_from_str
from paperless.settings.parsers import str_to_bool
@@ -205,6 +209,29 @@ class TestParseDictFromString:
assert isinstance(result["database"]["port"], int)
class TestGetBoolFromEnv:
def test_existing_env_var(self, mocker):
"""Test that an existing environment variable is read and converted."""
mocker.patch.dict(os.environ, {"TEST_VAR": "true"})
assert get_bool_from_env("TEST_VAR") is True
def test_missing_env_var_uses_default_no(self, mocker):
"""Test that a missing environment variable uses default 'NO' and returns False."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_bool_from_env("MISSING_VAR") is False
def test_missing_env_var_with_explicit_default(self, mocker):
"""Test that a missing environment variable uses the provided default."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_bool_from_env("MISSING_VAR", default="yes") is True
def test_invalid_value_raises_error(self, mocker):
"""Test that an invalid value raises ValueError (delegates to str_to_bool)."""
mocker.patch.dict(os.environ, {"INVALID_VAR": "maybe"})
with pytest.raises(ValueError):
get_bool_from_env("INVALID_VAR")
class TestGetIntFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
@@ -259,6 +286,199 @@ class TestGetIntFromEnv:
get_int_from_env("INVALID_INT")
class TestGetFloatFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
[
pytest.param("3.14", 3.14, id="pi"),
pytest.param("42", 42.0, id="int_as_float"),
pytest.param("-2.5", -2.5, id="negative"),
pytest.param("0.0", 0.0, id="zero_float"),
pytest.param("0", 0.0, id="zero_int"),
pytest.param("1.5e2", 150.0, id="sci_positive"),
pytest.param("1e-3", 0.001, id="sci_negative"),
pytest.param("-1.23e4", -12300.0, id="sci_large"),
],
)
def test_existing_env_var_valid_floats(self, mocker, env_value, expected):
"""Test that existing environment variables with valid floats return correct values."""
mocker.patch.dict(os.environ, {"FLOAT_VAR": env_value})
assert get_float_from_env("FLOAT_VAR") == expected
@pytest.mark.parametrize(
("default", "expected"),
[
pytest.param(3.14, 3.14, id="pi_default"),
pytest.param(0.0, 0.0, id="zero_default"),
pytest.param(-2.5, -2.5, id="negative_default"),
pytest.param(None, None, id="none_default"),
],
)
def test_missing_env_var_with_defaults(self, mocker, default, expected):
"""Test that missing environment variables return provided defaults."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_float_from_env("MISSING_VAR", default=default) == expected
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_float_from_env("MISSING_VAR") is None
@pytest.mark.parametrize(
"invalid_value",
[
pytest.param("not_a_number", id="text"),
pytest.param("42.5.0", id="double_decimal"),
pytest.param("42a", id="alpha_suffix"),
pytest.param("", id="empty"),
pytest.param(" ", id="whitespace"),
pytest.param("true", id="boolean"),
pytest.param("1.2.3", id="triple_decimal"),
],
)
def test_invalid_float_values_raise_error(self, mocker, invalid_value):
"""Test that invalid float values raise ValueError."""
mocker.patch.dict(os.environ, {"INVALID_FLOAT": invalid_value})
with pytest.raises(ValueError):
get_float_from_env("INVALID_FLOAT")
class TestGetPathFromEnv:
@pytest.mark.parametrize(
"env_value",
[
pytest.param("/tmp/test", id="absolute"),
pytest.param("relative/path", id="relative"),
pytest.param("/path/with spaces/file.txt", id="spaces"),
pytest.param(".", id="current_dir"),
pytest.param("..", id="parent_dir"),
pytest.param("/", id="root"),
],
)
def test_existing_env_var_paths(self, mocker, env_value):
"""Test that existing environment variables with paths return resolved Path objects."""
mocker.patch.dict(os.environ, {"PATH_VAR": env_value})
result = get_path_from_env("PATH_VAR")
assert isinstance(result, Path)
assert result == Path(env_value).resolve()
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_path_from_env("MISSING_VAR") is None
def test_missing_env_var_with_none_default(self, mocker):
"""Test that missing environment variable with None default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_path_from_env("MISSING_VAR", default=None) is None
@pytest.mark.parametrize(
"default_path_str",
[
pytest.param("/default/path", id="absolute_default"),
pytest.param("relative/default", id="relative_default"),
pytest.param(".", id="current_default"),
],
)
def test_missing_env_var_with_path_defaults(self, mocker, default_path_str):
"""Test that missing environment variables return resolved default Path objects."""
mocker.patch.dict(os.environ, {}, clear=True)
default_path = Path(default_path_str)
result = get_path_from_env("MISSING_VAR", default=default_path)
assert isinstance(result, Path)
assert result == default_path.resolve()
def test_relative_paths_are_resolved(self, mocker):
"""Test that relative paths are properly resolved to absolute paths."""
mocker.patch.dict(os.environ, {"REL_PATH": "relative/path"})
result = get_path_from_env("REL_PATH")
assert result is not None
assert result.is_absolute()
class TestGetListFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
[
pytest.param("a,b,c", ["a", "b", "c"], id="basic_comma_separated"),
pytest.param("single", ["single"], id="single_element"),
pytest.param("", [], id="empty_string"),
pytest.param("a, b , c", ["a", "b", "c"], id="whitespace_trimmed"),
pytest.param("a,,b,c", ["a", "b", "c"], id="empty_elements_removed"),
],
)
def test_existing_env_var_basic_parsing(self, mocker, env_value, expected):
"""Test that existing environment variables are parsed correctly."""
mocker.patch.dict(os.environ, {"LIST_VAR": env_value})
result = get_list_from_env("LIST_VAR")
assert result == expected
@pytest.mark.parametrize(
("separator", "env_value", "expected"),
[
pytest.param("|", "a|b|c", ["a", "b", "c"], id="pipe_separator"),
pytest.param(":", "a:b:c", ["a", "b", "c"], id="colon_separator"),
pytest.param(";", "a;b;c", ["a", "b", "c"], id="semicolon_separator"),
],
)
def test_custom_separators(self, mocker, separator, env_value, expected):
"""Test that custom separators work correctly."""
mocker.patch.dict(os.environ, {"LIST_VAR": env_value})
result = get_list_from_env("LIST_VAR", separator=separator)
assert result == expected
@pytest.mark.parametrize(
("default", "expected"),
[
pytest.param(
["default1", "default2"],
["default1", "default2"],
id="string_list_default",
),
pytest.param([1, 2, 3], [1, 2, 3], id="int_list_default"),
pytest.param(None, [], id="none_default_returns_empty_list"),
],
)
def test_missing_env_var_with_defaults(self, mocker, default, expected):
"""Test that missing environment variables return provided defaults."""
mocker.patch.dict(os.environ, {}, clear=True)
result = get_list_from_env("MISSING_VAR", default=default)
assert result == expected
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns empty list."""
mocker.patch.dict(os.environ, {}, clear=True)
result = get_list_from_env("MISSING_VAR")
assert result == []
def test_required_env_var_missing_raises_error(self, mocker):
"""Test that missing required environment variable raises ValueError."""
mocker.patch.dict(os.environ, {}, clear=True)
with pytest.raises(
ValueError,
match="Required environment variable 'REQUIRED_VAR' is not set",
):
get_list_from_env("REQUIRED_VAR", required=True)
def test_required_env_var_with_default_does_not_raise(self, mocker):
"""Test that required environment variable with default does not raise error."""
mocker.patch.dict(os.environ, {}, clear=True)
result = get_list_from_env("REQUIRED_VAR", default=["default"], required=True)
assert result == ["default"]
def test_strip_whitespace_false(self, mocker):
"""Test that whitespace is preserved when strip_whitespace=False."""
mocker.patch.dict(os.environ, {"LIST_VAR": " a , b , c "})
result = get_list_from_env("LIST_VAR", strip_whitespace=False)
assert result == [" a ", " b ", " c "]
def test_remove_empty_false(self, mocker):
"""Test that empty elements are preserved when remove_empty=False."""
mocker.patch.dict(os.environ, {"LIST_VAR": "a,,b,,c"})
result = get_list_from_env("LIST_VAR", remove_empty=False)
assert result == ["a", "", "b", "", "c"]
class TestGetEnvChoice:
@pytest.fixture
def valid_choices(self) -> set[str]:
@@ -394,21 +614,3 @@ class TestGetEnvChoice:
result = get_choice_from_env("TEST_ENV", large_choices)
assert result == "option_50"
def test_different_env_keys(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test function works with different environment variable keys."""
test_cases = [
("DJANGO_ENV", "development"),
("DATABASE_BACKEND", "staging"),
("LOG_LEVEL", "production"),
("APP_MODE", "development"),
]
for env_key, env_value in test_cases:
mocker.patch.dict("os.environ", {env_key: env_value})
result = get_choice_from_env(env_key, valid_choices)
assert result == env_value

View File

@@ -0,0 +1,56 @@
import os
from unittest import TestCase
from unittest import mock
from paperless.settings import _parse_paperless_url
from paperless.settings import default_threads_per_worker
class TestThreadCalculation(TestCase):
def test_workers_threads(self) -> None:
"""
GIVEN:
- Certain CPU counts
WHEN:
- Threads per worker is calculated
THEN:
- Threads per worker less than or equal to CPU count
- At least 1 thread per worker
"""
default_workers = 1
for i in range(1, 64):
with mock.patch(
"paperless.settings.multiprocessing.cpu_count",
) as cpu_count:
cpu_count.return_value = i
default_threads = default_threads_per_worker(default_workers)
self.assertGreaterEqual(default_threads, 1)
self.assertLessEqual(default_workers * default_threads, i)
class TestPaperlessURLSettings(TestCase):
def test_paperless_url(self) -> None:
"""
GIVEN:
- PAPERLESS_URL is set
WHEN:
- The URL is parsed
THEN:
- The URL is returned and present in related settings
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_URL": "https://example.com",
},
):
url = _parse_paperless_url()
self.assertEqual("https://example.com", url)
from django.conf import settings
self.assertIn(url, settings.CSRF_TRUSTED_ORIGINS)
self.assertIn(url, settings.CORS_ALLOWED_ORIGINS)

View File

@@ -1,100 +1,107 @@
import logging
from unittest import mock
import pytest
from allauth.account.adapter import get_adapter
from allauth.core import context
from allauth.socialaccount.adapter import get_adapter as get_social_adapter
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.forms import ValidationError
from django.http import HttpRequest
from django.test import TestCase
from django.test import override_settings
from django.urls import reverse
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from rest_framework.authtoken.models import Token
from paperless.adapter import DrfTokenStrategy
@pytest.mark.django_db
class TestCustomAccountAdapter:
def test_is_open_for_signup(self, settings: SettingsWrapper) -> None:
class TestCustomAccountAdapter(TestCase):
def test_is_open_for_signup(self) -> None:
adapter = get_adapter()
# With no accounts, signups should be allowed
assert adapter.is_open_for_signup(None)
self.assertTrue(adapter.is_open_for_signup(None))
User.objects.create_user("testuser")
# Test when ACCOUNT_ALLOW_SIGNUPS is True
settings.ACCOUNT_ALLOW_SIGNUPS = True
assert adapter.is_open_for_signup(None)
self.assertTrue(adapter.is_open_for_signup(None))
# Test when ACCOUNT_ALLOW_SIGNUPS is False
settings.ACCOUNT_ALLOW_SIGNUPS = False
assert not adapter.is_open_for_signup(None)
self.assertFalse(adapter.is_open_for_signup(None))
def test_is_safe_url(self, settings: SettingsWrapper) -> None:
def test_is_safe_url(self) -> None:
request = HttpRequest()
request.get_host = lambda: "example.com"
request.get_host = mock.Mock(return_value="example.com")
with context.request_context(request):
adapter = get_adapter()
with override_settings(ALLOWED_HOSTS=["*"]):
# True because request host is same
url = "https://example.com"
self.assertTrue(adapter.is_safe_url(url))
settings.ALLOWED_HOSTS = ["*"]
# True because request host is same
assert adapter.is_safe_url("https://example.com")
url = "https://evil.com"
# False despite wildcard because request host is different
assert not adapter.is_safe_url("https://evil.com")
self.assertFalse(adapter.is_safe_url(url))
settings.ALLOWED_HOSTS = ["example.com"]
url = "https://example.com"
# True because request host is same
assert adapter.is_safe_url("https://example.com")
self.assertTrue(adapter.is_safe_url(url))
settings.ALLOWED_HOSTS = ["*", "example.com"]
url = "//evil.com"
# False because request host is not in allowed hosts
assert not adapter.is_safe_url("//evil.com")
self.assertFalse(adapter.is_safe_url(url))
def test_pre_authenticate(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
mocker.patch("allauth.core.internal.ratelimit.consume", return_value=True)
@mock.patch("allauth.core.internal.ratelimit.consume", return_value=True)
def test_pre_authenticate(self, mock_consume) -> None:
adapter = get_adapter()
request = HttpRequest()
request.get_host = lambda: "example.com"
request.get_host = mock.Mock(return_value="example.com")
settings.DISABLE_REGULAR_LOGIN = False
adapter.pre_authenticate(request)
settings.DISABLE_REGULAR_LOGIN = True
with pytest.raises(ValidationError):
with self.assertRaises(ValidationError):
adapter.pre_authenticate(request)
def test_get_reset_password_from_key_url(self, settings: SettingsWrapper) -> None:
def test_get_reset_password_from_key_url(self) -> None:
request = HttpRequest()
request.get_host = lambda: "foo.org"
request.get_host = mock.Mock(return_value="foo.org")
with context.request_context(request):
adapter = get_adapter()
settings.PAPERLESS_URL = None
settings.ACCOUNT_DEFAULT_HTTP_PROTOCOL = "https"
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
assert adapter.get_reset_password_from_key_url("UID-KEY") == expected_url
# Test when PAPERLESS_URL is None
with override_settings(
PAPERLESS_URL=None,
ACCOUNT_DEFAULT_HTTP_PROTOCOL="https",
):
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
settings.PAPERLESS_URL = "https://bar.com"
expected_url = f"https://bar.com{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
assert adapter.get_reset_password_from_key_url("UID-KEY") == expected_url
# Test when PAPERLESS_URL is not None
with override_settings(PAPERLESS_URL="https://bar.com"):
expected_url = f"https://bar.com{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
def test_save_user_adds_groups(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.ACCOUNT_DEFAULT_GROUPS = ["group1", "group2"]
@override_settings(ACCOUNT_DEFAULT_GROUPS=["group1", "group2"])
def test_save_user_adds_groups(self) -> None:
Group.objects.create(name="group1")
user = User.objects.create_user("testuser")
adapter = get_adapter()
form = mocker.MagicMock(
form = mock.Mock(
cleaned_data={
"username": "testuser",
"email": "user@example.com",
@@ -103,81 +110,88 @@ class TestCustomAccountAdapter:
user = adapter.save_user(HttpRequest(), user, form, commit=True)
assert user.groups.count() == 1
assert user.groups.filter(name="group1").exists()
assert not user.groups.filter(name="group2").exists()
self.assertEqual(user.groups.count(), 1)
self.assertTrue(user.groups.filter(name="group1").exists())
self.assertFalse(user.groups.filter(name="group2").exists())
def test_fresh_install_save_creates_superuser(self, mocker: MockerFixture) -> None:
def test_fresh_install_save_creates_superuser(self) -> None:
adapter = get_adapter()
form = mocker.MagicMock(
form = mock.Mock(
cleaned_data={
"username": "testuser",
"email": "user@paperless-ngx.com",
},
)
user = adapter.save_user(HttpRequest(), User(), form, commit=True)
assert user.is_superuser
self.assertTrue(user.is_superuser)
form = mocker.MagicMock(
# Next time, it should not create a superuser
form = mock.Mock(
cleaned_data={
"username": "testuser2",
"email": "user2@paperless-ngx.com",
},
)
user2 = adapter.save_user(HttpRequest(), User(), form, commit=True)
assert not user2.is_superuser
self.assertFalse(user2.is_superuser)
class TestCustomSocialAccountAdapter:
@pytest.mark.django_db
def test_is_open_for_signup(self, settings: SettingsWrapper) -> None:
class TestCustomSocialAccountAdapter(TestCase):
def test_is_open_for_signup(self) -> None:
adapter = get_social_adapter()
# Test when SOCIALACCOUNT_ALLOW_SIGNUPS is True
settings.SOCIALACCOUNT_ALLOW_SIGNUPS = True
assert adapter.is_open_for_signup(None, None)
self.assertTrue(adapter.is_open_for_signup(None, None))
# Test when SOCIALACCOUNT_ALLOW_SIGNUPS is False
settings.SOCIALACCOUNT_ALLOW_SIGNUPS = False
assert not adapter.is_open_for_signup(None, None)
self.assertFalse(adapter.is_open_for_signup(None, None))
def test_get_connect_redirect_url(self) -> None:
adapter = get_social_adapter()
assert adapter.get_connect_redirect_url(None, None) == reverse("base")
request = None
socialaccount = None
@pytest.mark.django_db
def test_save_user_adds_groups(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.SOCIAL_ACCOUNT_DEFAULT_GROUPS = ["group1", "group2"]
# Test the default URL
expected_url = reverse("base")
self.assertEqual(
adapter.get_connect_redirect_url(request, socialaccount),
expected_url,
)
@override_settings(SOCIAL_ACCOUNT_DEFAULT_GROUPS=["group1", "group2"])
def test_save_user_adds_groups(self) -> None:
Group.objects.create(name="group1")
adapter = get_social_adapter()
request = HttpRequest()
user = User.objects.create_user("testuser")
sociallogin = mocker.MagicMock(user=user)
sociallogin = mock.Mock(
user=user,
)
user = adapter.save_user(HttpRequest(), sociallogin, None)
user = adapter.save_user(request, sociallogin, None)
assert user.groups.count() == 1
assert user.groups.filter(name="group1").exists()
assert not user.groups.filter(name="group2").exists()
self.assertEqual(user.groups.count(), 1)
self.assertTrue(user.groups.filter(name="group1").exists())
self.assertFalse(user.groups.filter(name="group2").exists())
def test_error_logged_on_authentication_error(
self,
caplog: pytest.LogCaptureFixture,
) -> None:
def test_error_logged_on_authentication_error(self) -> None:
adapter = get_social_adapter()
with caplog.at_level(logging.INFO, logger="paperless.auth"):
request = HttpRequest()
with self.assertLogs("paperless.auth", level="INFO") as log_cm:
adapter.on_authentication_error(
HttpRequest(),
request,
provider="test-provider",
error="Error",
exception="Test authentication error",
)
assert any("Test authentication error" in msg for msg in caplog.messages)
self.assertTrue(
any("Test authentication error" in message for message in log_cm.output),
)
@pytest.mark.django_db
class TestDrfTokenStrategy:
class TestDrfTokenStrategy(TestCase):
def test_create_access_token_creates_new_token(self) -> None:
"""
GIVEN:
@@ -187,6 +201,7 @@ class TestDrfTokenStrategy:
THEN:
- A new token is created and its key is returned
"""
user = User.objects.create_user("testuser")
request = HttpRequest()
request.user = user
@@ -194,9 +209,13 @@ class TestDrfTokenStrategy:
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
assert token_key is not None
assert Token.objects.filter(user=user).exists()
assert token_key == Token.objects.get(user=user).key
# Verify a token was created
self.assertIsNotNone(token_key)
self.assertTrue(Token.objects.filter(user=user).exists())
# Verify the returned key matches the created token
token = Token.objects.get(user=user)
self.assertEqual(token_key, token.key)
def test_create_access_token_returns_existing_token(self) -> None:
"""
@@ -207,6 +226,7 @@ class TestDrfTokenStrategy:
THEN:
- The same token key is returned (no new token created)
"""
user = User.objects.create_user("testuser")
existing_token = Token.objects.create(user=user)
@@ -216,8 +236,11 @@ class TestDrfTokenStrategy:
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
assert token_key == existing_token.key
assert Token.objects.filter(user=user).count() == 1
# Verify the existing token key is returned
self.assertEqual(token_key, existing_token.key)
# Verify only one token exists (no duplicate created)
self.assertEqual(Token.objects.filter(user=user).count(), 1)
def test_create_access_token_returns_none_for_unauthenticated_user(self) -> None:
"""
@@ -228,11 +251,12 @@ class TestDrfTokenStrategy:
THEN:
- None is returned and no token is created
"""
request = HttpRequest()
request.user = AnonymousUser()
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
assert token_key is None
assert Token.objects.count() == 0
self.assertIsNone(token_key)
self.assertEqual(Token.objects.count(), 0)

View File

@@ -1,13 +1,16 @@
import os
from dataclasses import dataclass
from pathlib import Path
from unittest import mock
import pytest
from django.core.checks import Error
from django.core.checks import Warning
from pytest_django.fixtures import SettingsWrapper
from django.test import TestCase
from django.test import override_settings
from pytest_mock import MockerFixture
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from paperless.checks import audit_log_check
from paperless.checks import binaries_check
from paperless.checks import check_deprecated_db_settings
@@ -17,84 +20,54 @@ from paperless.checks import paths_check
from paperless.checks import settings_values_check
@dataclass(frozen=True, slots=True)
class PaperlessTestDirs:
data_dir: Path
media_dir: Path
consumption_dir: Path
# TODO: consolidate with documents/tests/conftest.py PaperlessDirs/paperless_dirs
# once the paperless and documents test suites are ready to share fixtures.
@pytest.fixture()
def directories(tmp_path: Path, settings: SettingsWrapper) -> PaperlessTestDirs:
data_dir = tmp_path / "data"
media_dir = tmp_path / "media"
consumption_dir = tmp_path / "consumption"
for d in (data_dir, media_dir, consumption_dir):
d.mkdir()
settings.DATA_DIR = data_dir
settings.MEDIA_ROOT = media_dir
settings.CONSUMPTION_DIR = consumption_dir
return PaperlessTestDirs(
data_dir=data_dir,
media_dir=media_dir,
consumption_dir=consumption_dir,
)
class TestChecks:
class TestChecks(DirectoriesMixin, TestCase):
def test_binaries(self) -> None:
assert binaries_check(None) == []
self.assertEqual(binaries_check(None), [])
def test_binaries_fail(self, settings: SettingsWrapper) -> None:
settings.CONVERT_BINARY = "uuuhh"
assert len(binaries_check(None)) == 1
@override_settings(CONVERT_BINARY="uuuhh")
def test_binaries_fail(self) -> None:
self.assertEqual(len(binaries_check(None)), 1)
@pytest.mark.usefixtures("directories")
def test_paths_check(self) -> None:
assert paths_check(None) == []
self.assertEqual(paths_check(None), [])
def test_paths_check_dont_exist(self, settings: SettingsWrapper) -> None:
settings.MEDIA_ROOT = Path("uuh")
settings.DATA_DIR = Path("whatever")
settings.CONSUMPTION_DIR = Path("idontcare")
@override_settings(
MEDIA_ROOT=Path("uuh"),
DATA_DIR=Path("whatever"),
CONSUMPTION_DIR=Path("idontcare"),
)
def test_paths_check_dont_exist(self) -> None:
msgs = paths_check(None)
self.assertEqual(len(msgs), 3, str(msgs))
for msg in msgs:
self.assertTrue(msg.msg.endswith("is set but doesn't exist."))
def test_paths_check_no_access(self) -> None:
Path(self.dirs.data_dir).chmod(0o000)
Path(self.dirs.media_dir).chmod(0o000)
Path(self.dirs.consumption_dir).chmod(0o000)
self.addCleanup(os.chmod, self.dirs.data_dir, 0o777)
self.addCleanup(os.chmod, self.dirs.media_dir, 0o777)
self.addCleanup(os.chmod, self.dirs.consumption_dir, 0o777)
msgs = paths_check(None)
self.assertEqual(len(msgs), 3)
assert len(msgs) == 3, str(msgs)
for msg in msgs:
assert msg.msg.endswith("is set but doesn't exist.")
self.assertTrue(msg.msg.endswith("is not writeable"))
def test_paths_check_no_access(self, directories: PaperlessTestDirs) -> None:
directories.data_dir.chmod(0o000)
directories.media_dir.chmod(0o000)
directories.consumption_dir.chmod(0o000)
@override_settings(DEBUG=False)
def test_debug_disabled(self) -> None:
self.assertEqual(debug_mode_check(None), [])
try:
msgs = paths_check(None)
finally:
directories.data_dir.chmod(0o777)
directories.media_dir.chmod(0o777)
directories.consumption_dir.chmod(0o777)
assert len(msgs) == 3
for msg in msgs:
assert msg.msg.endswith("is not writeable")
def test_debug_disabled(self, settings: SettingsWrapper) -> None:
settings.DEBUG = False
assert debug_mode_check(None) == []
def test_debug_enabled(self, settings: SettingsWrapper) -> None:
settings.DEBUG = True
assert len(debug_mode_check(None)) == 1
@override_settings(DEBUG=True)
def test_debug_enabled(self) -> None:
self.assertEqual(len(debug_mode_check(None)), 1)
class TestSettingsChecksAgainstDefaults:
class TestSettingsChecksAgainstDefaults(DirectoriesMixin, TestCase):
def test_all_valid(self) -> None:
"""
GIVEN:
@@ -105,71 +78,104 @@ class TestSettingsChecksAgainstDefaults:
- No system check errors reported
"""
msgs = settings_values_check(None)
assert len(msgs) == 0
self.assertEqual(len(msgs), 0)
class TestOcrSettingsChecks:
@pytest.mark.parametrize(
("setting", "value", "expected_msg"),
[
pytest.param(
"OCR_OUTPUT_TYPE",
"notapdf",
'OCR output type "notapdf"',
id="invalid-output-type",
),
pytest.param(
"OCR_MODE",
"makeitso",
'OCR output mode "makeitso"',
id="invalid-mode",
),
pytest.param(
"OCR_MODE",
"skip_noarchive",
"deprecated",
id="deprecated-mode",
),
pytest.param(
"OCR_SKIP_ARCHIVE_FILE",
"invalid",
'OCR_SKIP_ARCHIVE_FILE setting "invalid"',
id="invalid-skip-archive-file",
),
pytest.param(
"OCR_CLEAN",
"cleanme",
'OCR clean mode "cleanme"',
id="invalid-clean",
),
],
)
def test_invalid_setting_produces_one_error(
self,
settings: SettingsWrapper,
setting: str,
value: str,
expected_msg: str,
) -> None:
class TestOcrSettingsChecks(DirectoriesMixin, TestCase):
@override_settings(OCR_OUTPUT_TYPE="notapdf")
def test_invalid_output_type(self) -> None:
"""
GIVEN:
- Default settings
- One OCR setting is set to an invalid value
- OCR output type is invalid
WHEN:
- Settings are validated
THEN:
- Exactly one system check error is reported containing the expected message
- system check error reported for OCR output type
"""
setattr(settings, setting, value)
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
assert len(msgs) == 1
assert expected_msg in msgs[0].msg
msg = msgs[0]
self.assertIn('OCR output type "notapdf"', msg.msg)
@override_settings(OCR_MODE="makeitso")
def test_invalid_ocr_type(self) -> None:
"""
GIVEN:
- Default settings
- OCR type is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR output mode "makeitso"', msg.msg)
@override_settings(OCR_MODE="skip_noarchive")
def test_deprecated_ocr_type(self) -> None:
"""
GIVEN:
- Default settings
- OCR type is deprecated
WHEN:
- Settings are validated
THEN:
- deprecation warning reported for OCR type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn("deprecated", msg.msg)
@override_settings(OCR_SKIP_ARCHIVE_FILE="invalid")
def test_invalid_ocr_skip_archive_file(self) -> None:
"""
GIVEN:
- Default settings
- OCR_SKIP_ARCHIVE_FILE is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR_SKIP_ARCHIVE_FILE
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR_SKIP_ARCHIVE_FILE setting "invalid"', msg.msg)
@override_settings(OCR_CLEAN="cleanme")
def test_invalid_ocr_clean(self) -> None:
"""
GIVEN:
- Default settings
- OCR cleaning type is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR cleaning type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR clean mode "cleanme"', msg.msg)
class TestTimezoneSettingsChecks:
def test_invalid_timezone(self, settings: SettingsWrapper) -> None:
class TestTimezoneSettingsChecks(DirectoriesMixin, TestCase):
@override_settings(TIME_ZONE="TheMoon\\MyCrater")
def test_invalid_timezone(self) -> None:
"""
GIVEN:
- Default settings
@@ -179,16 +185,17 @@ class TestTimezoneSettingsChecks:
THEN:
- system check error reported for timezone
"""
settings.TIME_ZONE = "TheMoon\\MyCrater"
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
assert len(msgs) == 1
assert 'Timezone "TheMoon\\MyCrater"' in msgs[0].msg
msg = msgs[0]
self.assertIn('Timezone "TheMoon\\MyCrater"', msg.msg)
class TestEmailCertSettingsChecks:
def test_not_valid_file(self, settings: SettingsWrapper) -> None:
class TestEmailCertSettingsChecks(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@override_settings(EMAIL_CERTIFICATE_FILE=Path("/tmp/not_actually_here.pem"))
def test_not_valid_file(self) -> None:
"""
GIVEN:
- Default settings
@@ -198,22 +205,19 @@ class TestEmailCertSettingsChecks:
THEN:
- system check error reported for email certificate
"""
cert_path = Path("/tmp/not_actually_here.pem")
assert not cert_path.is_file()
settings.EMAIL_CERTIFICATE_FILE = cert_path
self.assertIsNotFile("/tmp/not_actually_here.pem")
msgs = settings_values_check(None)
assert len(msgs) == 1
assert "Email cert /tmp/not_actually_here.pem is not a file" in msgs[0].msg
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn("Email cert /tmp/not_actually_here.pem is not a file", msg.msg)
class TestAuditLogChecks:
def test_was_enabled_once(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
class TestAuditLogChecks(TestCase):
def test_was_enabled_once(self) -> None:
"""
GIVEN:
- Audit log is not enabled
@@ -222,18 +226,23 @@ class TestAuditLogChecks:
THEN:
- system check error reported for disabling audit log
"""
settings.AUDIT_LOG_ENABLED = False
introspect_mock = mocker.MagicMock()
introspect_mock = mock.MagicMock()
introspect_mock.introspection.table_names.return_value = ["auditlog_logentry"]
mocker.patch.dict(
"paperless.checks.connections",
{"default": introspect_mock},
)
with override_settings(AUDIT_LOG_ENABLED=False):
with mock.patch.dict(
"paperless.checks.connections",
{"default": introspect_mock},
):
msgs = audit_log_check(None)
msgs = audit_log_check(None)
self.assertEqual(len(msgs), 1)
assert len(msgs) == 1
assert "auditlog table was found but audit log is disabled." in msgs[0].msg
msg = msgs[0]
self.assertIn(
("auditlog table was found but audit log is disabled."),
msg.msg,
)
DEPRECATED_VARS: dict[str, str] = {
@@ -262,16 +271,20 @@ class TestDeprecatedDbSettings:
@pytest.mark.parametrize(
("env_var", "db_option_key"),
[
pytest.param("PAPERLESS_DB_TIMEOUT", "timeout", id="db-timeout"),
pytest.param(
"PAPERLESS_DB_POOLSIZE",
"pool.min_size / pool.max_size",
id="db-poolsize",
),
pytest.param("PAPERLESS_DBSSLMODE", "sslmode", id="ssl-mode"),
pytest.param("PAPERLESS_DBSSLROOTCERT", "sslrootcert", id="ssl-rootcert"),
pytest.param("PAPERLESS_DBSSLCERT", "sslcert", id="ssl-cert"),
pytest.param("PAPERLESS_DBSSLKEY", "sslkey", id="ssl-key"),
("PAPERLESS_DB_TIMEOUT", "timeout"),
("PAPERLESS_DB_POOLSIZE", "pool.min_size / pool.max_size"),
("PAPERLESS_DBSSLMODE", "sslmode"),
("PAPERLESS_DBSSLROOTCERT", "sslrootcert"),
("PAPERLESS_DBSSLCERT", "sslcert"),
("PAPERLESS_DBSSLKEY", "sslkey"),
],
ids=[
"db-timeout",
"db-poolsize",
"ssl-mode",
"ssl-rootcert",
"ssl-cert",
"ssl-key",
],
)
def test_single_deprecated_var_produces_one_warning(

View File

@@ -1,482 +0,0 @@
import datetime
import os
from unittest import TestCase
from unittest import mock
import pytest
from celery.schedules import crontab
from paperless.settings import _parse_base_paths
from paperless.settings import _parse_beat_schedule
from paperless.settings import _parse_dateparser_languages
from paperless.settings import _parse_ignore_dates
from paperless.settings import _parse_paperless_url
from paperless.settings import _parse_redis_url
from paperless.settings import default_threads_per_worker
class TestIgnoreDateParsing(TestCase):
"""
Tests the parsing of the PAPERLESS_IGNORE_DATES setting value
"""
def _parse_checker(self, test_cases) -> None:
"""
Helper function to check ignore date parsing
Args:
test_cases (_type_): _description_
"""
for env_str, date_format, expected_date_set in test_cases:
self.assertSetEqual(
_parse_ignore_dates(env_str, date_format),
expected_date_set,
)
def test_no_ignore_dates_set(self) -> None:
"""
GIVEN:
- No ignore dates are set
THEN:
- No ignore dates are parsed
"""
self.assertSetEqual(_parse_ignore_dates(""), set())
def test_single_ignore_dates_set(self) -> None:
"""
GIVEN:
- Ignore dates are set per certain inputs
THEN:
- All ignore dates are parsed
"""
test_cases = [
("1985-05-01", "YMD", {datetime.date(1985, 5, 1)}),
(
"1985-05-01,1991-12-05",
"YMD",
{datetime.date(1985, 5, 1), datetime.date(1991, 12, 5)},
),
("2010-12-13", "YMD", {datetime.date(2010, 12, 13)}),
("11.01.10", "DMY", {datetime.date(2010, 1, 11)}),
(
"11.01.2001,15-06-1996",
"DMY",
{datetime.date(2001, 1, 11), datetime.date(1996, 6, 15)},
),
]
self._parse_checker(test_cases)
class TestThreadCalculation(TestCase):
def test_workers_threads(self) -> None:
"""
GIVEN:
- Certain CPU counts
WHEN:
- Threads per worker is calculated
THEN:
- Threads per worker less than or equal to CPU count
- At least 1 thread per worker
"""
default_workers = 1
for i in range(1, 64):
with mock.patch(
"paperless.settings.multiprocessing.cpu_count",
) as cpu_count:
cpu_count.return_value = i
default_threads = default_threads_per_worker(default_workers)
self.assertGreaterEqual(default_threads, 1)
self.assertLessEqual(default_workers * default_threads, i)
class TestRedisSocketConversion(TestCase):
def test_redis_socket_parsing(self) -> None:
"""
GIVEN:
- Various Redis connection URI formats
WHEN:
- The URI is parsed
THEN:
- Socket based URIs are translated
- Non-socket URIs are unchanged
- None provided uses default
"""
for input, expected in [
# Nothing is set
(None, ("redis://localhost:6379", "redis://localhost:6379")),
# celery style
(
"redis+socket:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
),
# redis-py / channels-redis style
(
"unix:///run/redis/redis.sock",
(
"redis+socket:///run/redis/redis.sock",
"unix:///run/redis/redis.sock",
),
),
# celery style with db
(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
(
"redis+socket:///run/redis/redis.sock?virtual_host=5",
"unix:///run/redis/redis.sock?db=5",
),
),
# redis-py / channels-redis style with db
(
"unix:///run/redis/redis.sock?db=10",
(
"redis+socket:///run/redis/redis.sock?virtual_host=10",
"unix:///run/redis/redis.sock?db=10",
),
),
# Just a host with a port
(
"redis://myredishost:6379",
("redis://myredishost:6379", "redis://myredishost:6379"),
),
]:
result = _parse_redis_url(input)
self.assertTupleEqual(expected, result)
class TestCeleryScheduleParsing(TestCase):
MAIL_EXPIRE_TIME = 9.0 * 60.0
CLASSIFIER_EXPIRE_TIME = 59.0 * 60.0
INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
SANITY_EXPIRE_TIME = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0
EMPTY_TRASH_EXPIRE_TIME = 23.0 * 60.0 * 60.0
RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME = 59.0 * 60.0
LLM_INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME = 23.0 * 60.0 * 60.0
def test_schedule_configuration_default(self) -> None:
"""
GIVEN:
- No configured task schedules
WHEN:
- The celery beat schedule is built
THEN:
- The default schedule is returned
"""
schedule = _parse_beat_schedule()
self.assertDictEqual(
{
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute=10, hour=2),
"options": {
"expires": self.LLM_INDEX_EXPIRE_TIME,
},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour=2),
"options": {
"expires": self.CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME,
},
},
},
schedule,
)
def test_schedule_configuration_changed(self) -> None:
"""
GIVEN:
- Email task is configured non-default
WHEN:
- The celery beat schedule is built
THEN:
- The email task is configured per environment
- The default schedule is returned for other tasks
"""
with mock.patch.dict(
os.environ,
{"PAPERLESS_EMAIL_TASK_CRON": "*/50 * * * mon"},
):
schedule = _parse_beat_schedule()
self.assertDictEqual(
{
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/50", day_of_week="mon"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute=10, hour=2),
"options": {
"expires": self.LLM_INDEX_EXPIRE_TIME,
},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour=2),
"options": {
"expires": self.CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME,
},
},
},
schedule,
)
def test_schedule_configuration_disabled(self) -> None:
"""
GIVEN:
- Search index task is disabled
WHEN:
- The celery beat schedule is built
THEN:
- The search index task is not present
- The default schedule is returned for other tasks
"""
with mock.patch.dict(os.environ, {"PAPERLESS_INDEX_TASK_CRON": "disable"}):
schedule = _parse_beat_schedule()
self.assertDictEqual(
{
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
"Empty trash": {
"task": "documents.tasks.empty_trash",
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
"Rebuild LLM index": {
"task": "documents.tasks.llmindex_index",
"schedule": crontab(minute=10, hour=2),
"options": {
"expires": self.LLM_INDEX_EXPIRE_TIME,
},
},
"Cleanup expired share link bundles": {
"task": "documents.tasks.cleanup_expired_share_link_bundles",
"schedule": crontab(minute=0, hour=2),
"options": {
"expires": self.CLEANUP_EXPIRED_SHARE_BUNDLES_EXPIRE_TIME,
},
},
},
schedule,
)
def test_schedule_configuration_disabled_all(self) -> None:
"""
GIVEN:
- All tasks are disabled
WHEN:
- The celery beat schedule is built
THEN:
- No tasks are scheduled
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_EMAIL_TASK_CRON": "disable",
"PAPERLESS_TRAIN_TASK_CRON": "disable",
"PAPERLESS_SANITY_TASK_CRON": "disable",
"PAPERLESS_INDEX_TASK_CRON": "disable",
"PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable",
"PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable",
"PAPERLESS_LLM_INDEX_TASK_CRON": "disable",
"PAPERLESS_SHARE_LINK_BUNDLE_CLEANUP_CRON": "disable",
},
):
schedule = _parse_beat_schedule()
self.assertDictEqual(
{},
schedule,
)
class TestPaperlessURLSettings(TestCase):
def test_paperless_url(self) -> None:
"""
GIVEN:
- PAPERLESS_URL is set
WHEN:
- The URL is parsed
THEN:
- The URL is returned and present in related settings
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_URL": "https://example.com",
},
):
url = _parse_paperless_url()
self.assertEqual("https://example.com", url)
from django.conf import settings
self.assertIn(url, settings.CSRF_TRUSTED_ORIGINS)
self.assertIn(url, settings.CORS_ALLOWED_ORIGINS)
class TestPathSettings(TestCase):
def test_default_paths(self) -> None:
"""
GIVEN:
- PAPERLESS_FORCE_SCRIPT_NAME is not set
WHEN:
- Settings are parsed
THEN:
- Paths are as expected
"""
base_paths = _parse_base_paths()
self.assertEqual(None, base_paths[0]) # FORCE_SCRIPT_NAME
self.assertEqual("/", base_paths[1]) # BASE_URL
self.assertEqual("/accounts/login/", base_paths[2]) # LOGIN_URL
self.assertEqual("/dashboard", base_paths[3]) # LOGIN_REDIRECT_URL
self.assertEqual(
"/accounts/login/?loggedout=1",
base_paths[4],
) # LOGOUT_REDIRECT_URL
@mock.patch("os.environ", {"PAPERLESS_FORCE_SCRIPT_NAME": "/paperless"})
def test_subpath(self) -> None:
"""
GIVEN:
- PAPERLESS_FORCE_SCRIPT_NAME is set
WHEN:
- Settings are parsed
THEN:
- The path is returned and present in related settings
"""
base_paths = _parse_base_paths()
self.assertEqual("/paperless", base_paths[0]) # FORCE_SCRIPT_NAME
self.assertEqual("/paperless/", base_paths[1]) # BASE_URL
self.assertEqual("/paperless/accounts/login/", base_paths[2]) # LOGIN_URL
self.assertEqual("/paperless/dashboard", base_paths[3]) # LOGIN_REDIRECT_URL
self.assertEqual(
"/paperless/accounts/login/?loggedout=1",
base_paths[4],
) # LOGOUT_REDIRECT_URL
@mock.patch(
"os.environ",
{
"PAPERLESS_FORCE_SCRIPT_NAME": "/paperless",
"PAPERLESS_LOGOUT_REDIRECT_URL": "/foobar/",
},
)
def test_subpath_with_explicit_logout_url(self) -> None:
"""
GIVEN:
- PAPERLESS_FORCE_SCRIPT_NAME is set and so is PAPERLESS_LOGOUT_REDIRECT_URL
WHEN:
- Settings are parsed
THEN:
- The correct logout redirect URL is returned
"""
base_paths = _parse_base_paths()
self.assertEqual("/paperless/", base_paths[1]) # BASE_URL
self.assertEqual("/foobar/", base_paths[4]) # LOGOUT_REDIRECT_URL
@pytest.mark.parametrize(
("languages", "expected"),
[
("de", ["de"]),
("zh", ["zh"]),
("fr+en", ["fr", "en"]),
# Locales must be supported
("en-001+fr-CA", ["en-001", "fr-CA"]),
("en-001+fr", ["en-001", "fr"]),
# Special case for Chinese: variants seem to miss some dates,
# so we always add "zh" as a fallback.
("en+zh-Hans-HK", ["en", "zh-Hans-HK", "zh"]),
("en+zh-Hans", ["en", "zh-Hans", "zh"]),
("en+zh-Hans+zh-Hant", ["en", "zh-Hans", "zh-Hant", "zh"]),
],
)
def test_parser_date_parser_languages(languages, expected) -> None:
assert sorted(_parse_dateparser_languages(languages)) == sorted(expected)

View File

@@ -9,50 +9,35 @@ from paperless.utils import ocr_to_dateparser_languages
@pytest.mark.parametrize(
("ocr_language", "expected"),
[
pytest.param("eng", ["en"], id="single-language"),
pytest.param("fra+ita+lao", ["fr", "it", "lo"], id="multiple-languages"),
pytest.param("fil", ["fil"], id="no-two-letter-equivalent"),
pytest.param(
"aze_cyrl+srp_latn",
["az-Cyrl", "sr-Latn"],
id="script-supported-by-dateparser",
),
pytest.param(
"deu_frak",
["de"],
id="script-not-supported-falls-back-to-language",
),
pytest.param(
"chi_tra+chi_sim",
["zh"],
id="chinese-variants-collapse-to-general",
),
pytest.param(
"eng+unsupported_language+por",
["en", "pt"],
id="unsupported-language-skipped",
),
pytest.param(
"unsupported1+unsupported2",
[],
id="all-unsupported-returns-empty",
),
pytest.param("eng+eng", ["en"], id="duplicates-deduplicated"),
pytest.param(
"ita_unknownscript",
["it"],
id="unknown-script-falls-back-to-language",
),
# One language
("eng", ["en"]),
# Multiple languages
("fra+ita+lao", ["fr", "it", "lo"]),
# Languages that don't have a two-letter equivalent
("fil", ["fil"]),
# Languages with a script part supported by dateparser
("aze_cyrl+srp_latn", ["az-Cyrl", "sr-Latn"]),
# Languages with a script part not supported by dateparser
# In this case, default to the language without script
("deu_frak", ["de"]),
# Traditional and simplified chinese don't have the same name in dateparser,
# so they're converted to the general chinese language
("chi_tra+chi_sim", ["zh"]),
# If a language is not supported by dateparser, fallback to the supported ones
("eng+unsupported_language+por", ["en", "pt"]),
# If no language is supported, fallback to default
("unsupported1+unsupported2", []),
# Duplicate languages, should not duplicate in result
("eng+eng", ["en"]),
# Language with script, but script is not mapped
("ita_unknownscript", ["it"]),
],
)
def test_ocr_to_dateparser_languages(ocr_language: str, expected: list[str]) -> None:
def test_ocr_to_dateparser_languages(ocr_language, expected):
assert sorted(ocr_to_dateparser_languages(ocr_language)) == sorted(expected)
def test_ocr_to_dateparser_languages_exception(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
def test_ocr_to_dateparser_languages_exception(monkeypatch, caplog):
# Patch LocaleDataLoader.get_locale_map to raise an exception
class DummyLoader:
def get_locale_map(self, locales=None):

View File

@@ -1,31 +1,24 @@
import tempfile
from pathlib import Path
from django.test import Client
from pytest_django.fixtures import SettingsWrapper
from django.test import override_settings
def test_favicon_view(
client: Client,
tmp_path: Path,
settings: SettingsWrapper,
) -> None:
favicon_path = tmp_path / "paperless" / "img" / "favicon.ico"
favicon_path.parent.mkdir(parents=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
def test_favicon_view(client):
with tempfile.TemporaryDirectory() as tmpdir:
static_dir = Path(tmpdir)
favicon_path = static_dir / "paperless" / "img" / "favicon.ico"
favicon_path.parent.mkdir(parents=True, exist_ok=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
settings.STATIC_ROOT = tmp_path
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
with override_settings(STATIC_ROOT=static_dir):
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
def test_favicon_view_missing_file(
client: Client,
tmp_path: Path,
settings: SettingsWrapper,
) -> None:
settings.STATIC_ROOT = tmp_path
response = client.get("/favicon.ico")
assert response.status_code == 404
def test_favicon_view_missing_file(client):
with override_settings(STATIC_ROOT=Path(tempfile.mkdtemp())):
response = client.get("/favicon.ico")
assert response.status_code == 404