Compare commits

..

2 Commits

Author SHA1 Message Date
Trenton H
57ea7a716b Chore: convert test_adapter.py to pytest style 2026-03-03 13:45:01 -08:00
Trenton H
ba7b538398 Chore: convert paperless unit tests to pytest style 2026-03-03 13:27:20 -08:00
18 changed files with 459 additions and 1170 deletions

View File

@@ -305,7 +305,6 @@ markers = [
"greenmail: Tests requiring Greenmail service",
"date_parsing: Tests which cover date parsing from content or filename",
"management: Tests which cover management commands/functionality",
"profiling: Benchmarks that profile and compare implementation performance",
]
[tool.pytest_env]

View File

@@ -1238,8 +1238,8 @@
<context context-type="linenumber">82</context>
</context-group>
</trans-unit>
<trans-unit id="7860582931776068318" datatype="html">
<source>Add document version</source>
<trans-unit id="8035757452478567832" datatype="html">
<source>Update existing document</source>
<context-group purpose="location">
<context context-type="sourcefile">src/app/components/admin/settings/settings.component.html</context>
<context context-type="linenumber">280</context>
@@ -8411,8 +8411,8 @@
<context context-type="linenumber">832</context>
</context-group>
</trans-unit>
<trans-unit id="5203024009814367559" datatype="html">
<source>This operation will add rotated versions of the <x id="PH" equiv-text="this.list.selected.size"/> document(s).</source>
<trans-unit id="6390006284731990222" datatype="html">
<source>This operation will permanently rotate the original version of <x id="PH" equiv-text="this.list.selected.size"/> document(s).</source>
<context-group purpose="location">
<context context-type="sourcefile">src/app/components/document-list/bulk-editor/bulk-editor.component.ts</context>
<context context-type="linenumber">833</context>

View File

@@ -277,7 +277,7 @@
<div class="col">
<select class="form-select" formControlName="pdfEditorDefaultEditMode">
<option [ngValue]="PdfEditorEditMode.Create" i18n>Create new document(s)</option>
<option [ngValue]="PdfEditorEditMode.Update" i18n>Add document version</option>
<option [ngValue]="PdfEditorEditMode.Update" i18n>Update existing document</option>
</select>
</div>
</div>

View File

@@ -84,7 +84,7 @@
<input type="radio" class="btn-check" [(ngModel)]="editMode" [value]="PdfEditorEditMode.Update" id="editModeUpdate" name="editmode" [disabled]="hasSplit()">
<label for="editModeUpdate" class="btn btn-outline-primary btn-sm">
<i-bs name="pencil"></i-bs>
<span class="form-check-label ms-2" i18n>Add document version</span>
<span class="form-check-label ms-2" i18n>Update existing document</span>
</label>
</div>
@if (editMode === PdfEditorEditMode.Create) {

View File

@@ -830,7 +830,7 @@ export class BulkEditorComponent
})
const rotateDialog = modal.componentInstance as RotateConfirmDialogComponent
rotateDialog.title = $localize`Rotate confirm`
rotateDialog.messageBold = $localize`This operation will add rotated versions of the ${this.list.selected.size} document(s).`
rotateDialog.messageBold = $localize`This operation will permanently rotate the original version of ${this.list.selected.size} document(s).`
rotateDialog.btnClass = 'btn-danger'
rotateDialog.btnCaption = $localize`Proceed`
rotateDialog.documentID = Array.from(this.list.selected)[0]

View File

@@ -3,8 +3,6 @@ import json
import os
import shutil
import tempfile
from itertools import chain
from itertools import islice
from pathlib import Path
from typing import TYPE_CHECKING
@@ -21,7 +19,6 @@ from django.contrib.contenttypes.models import ContentType
from django.core import serializers
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.utils import timezone
from filelock import FileLock
@@ -29,8 +26,6 @@ from guardian.models import GroupObjectPermission
from guardian.models import UserObjectPermission
if TYPE_CHECKING:
from collections.abc import Generator
from django.db.models import QuerySet
if settings.AUDIT_LOG_ENABLED:
@@ -65,22 +60,6 @@ from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
def serialize_queryset_batched(
queryset: "QuerySet",
*,
batch_size: int = 500,
) -> "Generator[list[dict], None, None]":
"""Yield batches of serialized records from a QuerySet.
Each batch is a list of dicts in Django's Python serialization format.
Uses QuerySet.iterator() to avoid loading the full queryset into memory,
and islice to collect chunk-sized batches serialized in a single call.
"""
iterator = queryset.iterator(chunk_size=batch_size)
while chunk := list(islice(iterator, batch_size)):
yield serializers.serialize("python", chunk)
class Command(CryptMixin, BaseCommand):
help = (
"Decrypt and rename all files in our collection into a given target "
@@ -207,17 +186,6 @@ class Command(CryptMixin, BaseCommand):
help="If provided, is used to encrypt sensitive data in the export",
)
parser.add_argument(
"--batch-size",
type=int,
default=500,
help=(
"Number of records to process per batch during serialization. "
"Lower values reduce peak memory usage; higher values improve "
"throughput. Default: 500."
),
)
def handle(self, *args, **options) -> None:
self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"]
@@ -232,7 +200,6 @@ class Command(CryptMixin, BaseCommand):
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
self.passphrase: str | None = options.get("passphrase")
self.batch_size: int = options["batch_size"]
self.files_in_export_dir: set[Path] = set()
self.exported_files: set[str] = set()
@@ -327,13 +294,8 @@ class Command(CryptMixin, BaseCommand):
# Build an overall manifest
for key, object_query in manifest_key_to_object_query.items():
manifest_dict[key] = list(
chain.from_iterable(
serialize_queryset_batched(
object_query,
batch_size=self.batch_size,
),
),
manifest_dict[key] = json.loads(
serializers.serialize("json", object_query),
)
self.encrypt_secret_fields(manifest_dict)
@@ -550,24 +512,14 @@ class Command(CryptMixin, BaseCommand):
self.files_in_export_dir.remove(target)
if self.compare_json:
target_checksum = hashlib.md5(target.read_bytes()).hexdigest()
src_str = json.dumps(
content,
cls=DjangoJSONEncoder,
indent=2,
ensure_ascii=False,
)
src_str = json.dumps(content, indent=2, ensure_ascii=False)
src_checksum = hashlib.md5(src_str.encode("utf-8")).hexdigest()
if src_checksum == target_checksum:
perform_write = False
if perform_write:
target.write_text(
json.dumps(
content,
cls=DjangoJSONEncoder,
indent=2,
ensure_ascii=False,
),
json.dumps(content, indent=2, ensure_ascii=False),
encoding="utf-8",
)

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.2.11 on 2026-03-03 16:27
# Generated by Django 5.2.7 on 2026-01-15 22:08
import datetime
@@ -21,207 +21,6 @@ class Migration(migrations.Migration):
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
replaces = [
("documents", "0001_initial"),
("documents", "0002_auto_20151226_1316"),
("documents", "0003_sender"),
("documents", "0004_auto_20160114_1844"),
(
"documents",
"0004_auto_20160114_1844_squashed_0011_auto_20160303_1929",
),
("documents", "0005_auto_20160123_0313"),
("documents", "0006_auto_20160123_0430"),
("documents", "0007_auto_20160126_2114"),
("documents", "0008_document_file_type"),
("documents", "0009_auto_20160214_0040"),
("documents", "0010_log"),
("documents", "0011_auto_20160303_1929"),
("documents", "0012_auto_20160305_0040"),
("documents", "0013_auto_20160325_2111"),
("documents", "0014_document_checksum"),
("documents", "0015_add_insensitive_to_match"),
(
"documents",
"0015_add_insensitive_to_match_squashed_0018_auto_20170715_1712",
),
("documents", "0016_auto_20170325_1558"),
("documents", "0017_auto_20170512_0507"),
("documents", "0018_auto_20170715_1712"),
("documents", "0019_add_consumer_user"),
("documents", "0020_document_added"),
("documents", "0021_document_storage_type"),
("documents", "0022_auto_20181007_1420"),
("documents", "0023_document_current_filename"),
("documents", "1000_update_paperless_all"),
("documents", "1001_auto_20201109_1636"),
("documents", "1002_auto_20201111_1105"),
("documents", "1003_mime_types"),
("documents", "1004_sanity_check_schedule"),
("documents", "1005_checksums"),
("documents", "1006_auto_20201208_2209"),
(
"documents",
"1006_auto_20201208_2209_squashed_1011_auto_20210101_2340",
),
("documents", "1007_savedview_savedviewfilterrule"),
("documents", "1008_auto_20201216_1736"),
("documents", "1009_auto_20201216_2005"),
("documents", "1010_auto_20210101_2159"),
("documents", "1011_auto_20210101_2340"),
("documents", "1012_fix_archive_files"),
("documents", "1013_migrate_tag_colour"),
("documents", "1014_auto_20210228_1614"),
("documents", "1015_remove_null_characters"),
("documents", "1016_auto_20210317_1351"),
(
"documents",
"1016_auto_20210317_1351_squashed_1020_merge_20220518_1839",
),
("documents", "1017_alter_savedviewfilterrule_rule_type"),
("documents", "1018_alter_savedviewfilterrule_value"),
("documents", "1019_storagepath_document_storage_path"),
("documents", "1019_uisettings"),
("documents", "1020_merge_20220518_1839"),
("documents", "1021_webp_thumbnail_conversion"),
("documents", "1022_paperlesstask"),
(
"documents",
"1022_paperlesstask_squashed_1036_alter_savedviewfilterrule_rule_type",
),
("documents", "1023_add_comments"),
("documents", "1024_document_original_filename"),
("documents", "1025_alter_savedviewfilterrule_rule_type"),
("documents", "1026_transition_to_celery"),
(
"documents",
"1027_remove_paperlesstask_attempted_task_and_more",
),
(
"documents",
"1028_remove_paperlesstask_task_args_and_more",
),
("documents", "1029_alter_document_archive_serial_number"),
("documents", "1030_alter_paperlesstask_task_file_name"),
(
"documents",
"1031_remove_savedview_user_correspondent_owner_and_more",
),
(
"documents",
"1032_alter_correspondent_matching_algorithm_and_more",
),
(
"documents",
"1033_alter_documenttype_options_alter_tag_options_and_more",
),
("documents", "1034_alter_savedviewfilterrule_rule_type"),
("documents", "1035_rename_comment_note"),
("documents", "1036_alter_savedviewfilterrule_rule_type"),
("documents", "1037_webp_encrypted_thumbnail_conversion"),
("documents", "1038_sharelink"),
("documents", "1039_consumptiontemplate"),
(
"documents",
"1040_customfield_customfieldinstance_and_more",
),
("documents", "1041_alter_consumptiontemplate_sources"),
(
"documents",
"1042_consumptiontemplate_assign_custom_fields_and_more",
),
("documents", "1043_alter_savedviewfilterrule_rule_type"),
(
"documents",
"1044_workflow_workflowaction_workflowtrigger_and_more",
),
(
"documents",
"1045_alter_customfieldinstance_value_monetary",
),
(
"documents",
"1045_alter_customfieldinstance_value_monetary_squashed_1049_document_deleted_at_document_restored_at",
),
(
"documents",
"1046_workflowaction_remove_all_correspondents_and_more",
),
("documents", "1047_savedview_display_mode_and_more"),
("documents", "1048_alter_savedviewfilterrule_rule_type"),
(
"documents",
"1049_document_deleted_at_document_restored_at",
),
("documents", "1050_customfield_extra_data_and_more"),
(
"documents",
"1051_alter_correspondent_owner_alter_document_owner_and_more",
),
("documents", "1052_document_transaction_id"),
("documents", "1053_document_page_count"),
(
"documents",
"1054_customfieldinstance_value_monetary_amount_and_more",
),
("documents", "1055_alter_storagepath_path"),
(
"documents",
"1056_customfieldinstance_deleted_at_and_more",
),
("documents", "1057_paperlesstask_owner"),
(
"documents",
"1058_workflowtrigger_schedule_date_custom_field_and_more",
),
(
"documents",
"1059_workflowactionemail_workflowactionwebhook_and_more",
),
(
"documents",
"1060_alter_customfieldinstance_value_select",
),
("documents", "1061_workflowactionwebhook_as_json"),
("documents", "1062_alter_savedviewfilterrule_rule_type"),
(
"documents",
"1063_paperlesstask_type_alter_paperlesstask_task_name_and_more",
),
("documents", "1064_delete_log"),
(
"documents",
"1065_workflowaction_assign_custom_fields_values",
),
(
"documents",
"1066_alter_workflowtrigger_schedule_offset_days",
),
("documents", "1067_alter_document_created"),
("documents", "1068_alter_document_created"),
(
"documents",
"1069_workflowtrigger_filter_has_storage_path_and_more",
),
(
"documents",
"1070_customfieldinstance_value_long_text_and_more",
),
(
"documents",
"1071_tag_tn_ancestors_count_tag_tn_ancestors_pks_and_more",
),
(
"documents",
"1072_workflowtrigger_filter_custom_field_query_and_more",
),
("documents", "1073_migrate_workflow_title_jinja"),
(
"documents",
"1074_workflowrun_deleted_at_workflowrun_restored_at_and_more",
),
]
operations = [
migrations.CreateModel(
name="WorkflowActionEmail",
@@ -386,6 +185,70 @@ class Migration(migrations.Migration):
"abstract": False,
},
),
migrations.CreateModel(
name="CustomField",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"created",
models.DateTimeField(
db_index=True,
default=django.utils.timezone.now,
editable=False,
verbose_name="created",
),
),
("name", models.CharField(max_length=128)),
(
"data_type",
models.CharField(
choices=[
("string", "String"),
("url", "URL"),
("date", "Date"),
("boolean", "Boolean"),
("integer", "Integer"),
("float", "Float"),
("monetary", "Monetary"),
("documentlink", "Document Link"),
("select", "Select"),
("longtext", "Long Text"),
],
editable=False,
max_length=50,
verbose_name="data type",
),
),
(
"extra_data",
models.JSONField(
blank=True,
help_text="Extra data for the custom field, such as select options",
null=True,
verbose_name="extra data",
),
),
],
options={
"verbose_name": "custom field",
"verbose_name_plural": "custom fields",
"ordering": ("created",),
"constraints": [
models.UniqueConstraint(
fields=("name",),
name="documents_customfield_unique_name",
),
],
},
),
migrations.CreateModel(
name="DocumentType",
fields=[
@@ -870,6 +733,17 @@ class Migration(migrations.Migration):
verbose_name="correspondent",
),
),
(
"owner",
models.ForeignKey(
blank=True,
default=None,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
verbose_name="owner",
),
),
(
"document_type",
models.ForeignKey(
@@ -893,14 +767,12 @@ class Migration(migrations.Migration):
),
),
(
"owner",
models.ForeignKey(
"tags",
models.ManyToManyField(
blank=True,
default=None,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
verbose_name="owner",
related_name="documents",
to="documents.tag",
verbose_name="tags",
),
),
],
@@ -910,140 +782,6 @@ class Migration(migrations.Migration):
"ordering": ("-created",),
},
),
migrations.AddField(
model_name="document",
name="tags",
field=models.ManyToManyField(
blank=True,
related_name="documents",
to="documents.tag",
verbose_name="tags",
),
),
migrations.CreateModel(
name="Note",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("deleted_at", models.DateTimeField(blank=True, null=True)),
("restored_at", models.DateTimeField(blank=True, null=True)),
("transaction_id", models.UUIDField(blank=True, null=True)),
(
"note",
models.TextField(
blank=True,
help_text="Note for the document",
verbose_name="content",
),
),
(
"created",
models.DateTimeField(
db_index=True,
default=django.utils.timezone.now,
verbose_name="created",
),
),
(
"document",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="notes",
to="documents.document",
verbose_name="document",
),
),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="notes",
to=settings.AUTH_USER_MODEL,
verbose_name="user",
),
),
],
options={
"verbose_name": "note",
"verbose_name_plural": "notes",
"ordering": ("created",),
},
),
migrations.CreateModel(
name="CustomField",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"created",
models.DateTimeField(
db_index=True,
default=django.utils.timezone.now,
editable=False,
verbose_name="created",
),
),
("name", models.CharField(max_length=128)),
(
"data_type",
models.CharField(
choices=[
("string", "String"),
("url", "URL"),
("date", "Date"),
("boolean", "Boolean"),
("integer", "Integer"),
("float", "Float"),
("monetary", "Monetary"),
("documentlink", "Document Link"),
("select", "Select"),
("longtext", "Long Text"),
],
editable=False,
max_length=50,
verbose_name="data type",
),
),
(
"extra_data",
models.JSONField(
blank=True,
help_text="Extra data for the custom field, such as select options",
null=True,
verbose_name="extra data",
),
),
],
options={
"verbose_name": "custom field",
"verbose_name_plural": "custom fields",
"ordering": ("created",),
"constraints": [
models.UniqueConstraint(
fields=("name",),
name="documents_customfield_unique_name",
),
],
},
),
migrations.CreateModel(
name="CustomFieldInstance",
fields=[
@@ -1142,6 +880,66 @@ class Migration(migrations.Migration):
"ordering": ("created",),
},
),
migrations.CreateModel(
name="Note",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("deleted_at", models.DateTimeField(blank=True, null=True)),
("restored_at", models.DateTimeField(blank=True, null=True)),
("transaction_id", models.UUIDField(blank=True, null=True)),
(
"note",
models.TextField(
blank=True,
help_text="Note for the document",
verbose_name="content",
),
),
(
"created",
models.DateTimeField(
db_index=True,
default=django.utils.timezone.now,
verbose_name="created",
),
),
(
"document",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="notes",
to="documents.document",
verbose_name="document",
),
),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="notes",
to=settings.AUTH_USER_MODEL,
verbose_name="user",
),
),
],
options={
"verbose_name": "note",
"verbose_name_plural": "notes",
"ordering": ("created",),
},
),
migrations.CreateModel(
name="PaperlessTask",
fields=[
@@ -1188,6 +986,7 @@ class Migration(migrations.Migration):
("train_classifier", "Train Classifier"),
("check_sanity", "Check Sanity"),
("index_optimize", "Index Optimize"),
("llmindex_update", "LLM Index Update"),
],
help_text="Name of the task that was run",
max_length=255,
@@ -1581,7 +1380,6 @@ class Migration(migrations.Migration):
verbose_name="Workflow Action Type",
),
),
("order", models.PositiveIntegerField(default=0, verbose_name="order")),
(
"assign_title",
models.TextField(

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.2.11 on 2026-03-03 16:27
# Generated by Django 5.2.9 on 2026-01-20 18:46
import django.db.models.deletion
from django.db import migrations
@@ -9,14 +9,8 @@ class Migration(migrations.Migration):
initial = True
dependencies = [
("documents", "0001_squashed"),
("paperless_mail", "0001_squashed"),
]
# This migration needs a "replaces", but it doesn't matter which.
# Chose the last 2.20.x migration
replaces = [
("documents", "1075_workflowaction_order"),
("documents", "0001_initial"),
("paperless_mail", "0001_initial"),
]
operations = [

View File

@@ -6,7 +6,7 @@ from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0002_squashed"),
("documents", "0002_initial"),
]
operations = [

View File

@@ -1,30 +0,0 @@
# Generated by Django 5.2.11 on 2026-03-03 16:42
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "0013_document_root_document"),
]
operations = [
migrations.AlterField(
model_name="paperlesstask",
name="task_name",
field=models.CharField(
choices=[
("consume_file", "Consume File"),
("train_classifier", "Train Classifier"),
("check_sanity", "Check Sanity"),
("index_optimize", "Index Optimize"),
("llmindex_update", "LLM Index Update"),
],
help_text="Name of the task that was run",
max_length=255,
null=True,
verbose_name="Task Name",
),
),
]

View File

@@ -1,71 +0,0 @@
"""
Temporary profiling utilities for comparing implementations.
Usage in a management command or shell::
from documents.profiling import profile_block
with profile_block("new check_sanity"):
messages = check_sanity()
with profile_block("old check_sanity"):
messages = check_sanity_old()
Drop this file when done.
"""
from __future__ import annotations
import tracemalloc
from contextlib import contextmanager
from time import perf_counter
from typing import TYPE_CHECKING
from django.db import connection
from django.db import reset_queries
from django.test.utils import override_settings
if TYPE_CHECKING:
from collections.abc import Generator
@contextmanager
def profile_block(label: str = "block") -> Generator[None, None, None]:
"""Profile memory, wall time, and DB queries for a code block.
Prints a summary to stdout on exit. Requires no external packages.
Enables DEBUG temporarily to capture Django's query log.
"""
tracemalloc.start()
snapshot_before = tracemalloc.take_snapshot()
with override_settings(DEBUG=True):
reset_queries()
start = perf_counter()
yield
elapsed = perf_counter() - start
queries = list(connection.queries)
snapshot_after = tracemalloc.take_snapshot()
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Compare snapshots for top allocations
stats = snapshot_after.compare_to(snapshot_before, "lineno")
query_time = sum(float(q["time"]) for q in queries)
mem_diff = sum(s.size_diff for s in stats)
print(f"\n{'=' * 60}") # noqa: T201
print(f" Profile: {label}") # noqa: T201
print(f"{'=' * 60}") # noqa: T201
print(f" Wall time: {elapsed:.4f}s") # noqa: T201
print(f" Queries: {len(queries)} ({query_time:.4f}s)") # noqa: T201
print(f" Memory delta: {mem_diff / 1024:.1f} KiB") # noqa: T201
print(f" Peak memory: {peak / 1024:.1f} KiB") # noqa: T201
print("\n Top 5 allocations:") # noqa: T201
for stat in stats[:5]:
print(f" {stat}") # noqa: T201
print(f"{'=' * 60}\n") # noqa: T201

View File

@@ -204,61 +204,6 @@ def audit_log_check(app_configs, **kwargs):
return result
@register()
def check_v3_minimum_upgrade_version(
app_configs: object,
**kwargs: object,
) -> list[Error]:
"""Enforce that upgrades to v3 must start from v2.20.9.
v3 squashes all prior migrations into 0001_squashed and 0002_squashed.
If a user skips v2.20.9, the data migration in 1075_workflowaction_order
never runs and the squash may apply schema changes against an incomplete
database state.
"""
from django.db import DatabaseError
from django.db import OperationalError
try:
all_tables = connections["default"].introspection.table_names()
if "django_migrations" not in all_tables:
return []
with connections["default"].cursor() as cursor:
cursor.execute(
"SELECT name FROM django_migrations WHERE app = %s",
["documents"],
)
applied: set[str] = {row[0] for row in cursor.fetchall()}
if not applied:
return []
# Already in a valid v3 state
if {"0001_squashed", "0002_squashed"} & applied:
return []
# On v2.20.9 exactly — squash will pick up cleanly from here
if "1075_workflowaction_order" in applied:
return []
except (DatabaseError, OperationalError):
return []
return [
Error(
"Cannot upgrade to Paperless-ngx v3 from this version.",
hint=(
"Upgrading to v3 can only be performed from v2.20.9."
"Please upgrade to v2.20.9, run migrations, then upgrade to v3."
"See https://docs.paperless-ngx.com/setup/#upgrading for details."
),
id="paperless.E002",
),
]
@register()
def check_deprecated_db_settings(
app_configs: object,

View File

@@ -1,107 +1,100 @@
from unittest import mock
import logging
import pytest
from allauth.account.adapter import get_adapter
from allauth.core import context
from allauth.socialaccount.adapter import get_adapter as get_social_adapter
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.forms import ValidationError
from django.http import HttpRequest
from django.test import TestCase
from django.test import override_settings
from django.urls import reverse
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from rest_framework.authtoken.models import Token
from paperless.adapter import DrfTokenStrategy
class TestCustomAccountAdapter(TestCase):
def test_is_open_for_signup(self) -> None:
@pytest.mark.django_db
class TestCustomAccountAdapter:
def test_is_open_for_signup(self, settings: SettingsWrapper) -> None:
adapter = get_adapter()
# With no accounts, signups should be allowed
self.assertTrue(adapter.is_open_for_signup(None))
assert adapter.is_open_for_signup(None)
User.objects.create_user("testuser")
# Test when ACCOUNT_ALLOW_SIGNUPS is True
settings.ACCOUNT_ALLOW_SIGNUPS = True
self.assertTrue(adapter.is_open_for_signup(None))
assert adapter.is_open_for_signup(None)
# Test when ACCOUNT_ALLOW_SIGNUPS is False
settings.ACCOUNT_ALLOW_SIGNUPS = False
self.assertFalse(adapter.is_open_for_signup(None))
assert not adapter.is_open_for_signup(None)
def test_is_safe_url(self) -> None:
def test_is_safe_url(self, settings: SettingsWrapper) -> None:
request = HttpRequest()
request.get_host = mock.Mock(return_value="example.com")
request.get_host = lambda: "example.com"
with context.request_context(request):
adapter = get_adapter()
with override_settings(ALLOWED_HOSTS=["*"]):
# True because request host is same
url = "https://example.com"
self.assertTrue(adapter.is_safe_url(url))
url = "https://evil.com"
settings.ALLOWED_HOSTS = ["*"]
# True because request host is same
assert adapter.is_safe_url("https://example.com")
# False despite wildcard because request host is different
self.assertFalse(adapter.is_safe_url(url))
assert not adapter.is_safe_url("https://evil.com")
settings.ALLOWED_HOSTS = ["example.com"]
url = "https://example.com"
# True because request host is same
self.assertTrue(adapter.is_safe_url(url))
assert adapter.is_safe_url("https://example.com")
settings.ALLOWED_HOSTS = ["*", "example.com"]
url = "//evil.com"
# False because request host is not in allowed hosts
self.assertFalse(adapter.is_safe_url(url))
assert not adapter.is_safe_url("//evil.com")
@mock.patch("allauth.core.internal.ratelimit.consume", return_value=True)
def test_pre_authenticate(self, mock_consume) -> None:
def test_pre_authenticate(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
mocker.patch("allauth.core.internal.ratelimit.consume", return_value=True)
adapter = get_adapter()
request = HttpRequest()
request.get_host = mock.Mock(return_value="example.com")
request.get_host = lambda: "example.com"
settings.DISABLE_REGULAR_LOGIN = False
adapter.pre_authenticate(request)
settings.DISABLE_REGULAR_LOGIN = True
with self.assertRaises(ValidationError):
with pytest.raises(ValidationError):
adapter.pre_authenticate(request)
def test_get_reset_password_from_key_url(self) -> None:
def test_get_reset_password_from_key_url(self, settings: SettingsWrapper) -> None:
request = HttpRequest()
request.get_host = mock.Mock(return_value="foo.org")
request.get_host = lambda: "foo.org"
with context.request_context(request):
adapter = get_adapter()
# Test when PAPERLESS_URL is None
with override_settings(
PAPERLESS_URL=None,
ACCOUNT_DEFAULT_HTTP_PROTOCOL="https",
):
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
settings.PAPERLESS_URL = None
settings.ACCOUNT_DEFAULT_HTTP_PROTOCOL = "https"
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
assert adapter.get_reset_password_from_key_url("UID-KEY") == expected_url
# Test when PAPERLESS_URL is not None
with override_settings(PAPERLESS_URL="https://bar.com"):
expected_url = f"https://bar.com{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
settings.PAPERLESS_URL = "https://bar.com"
expected_url = f"https://bar.com{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
assert adapter.get_reset_password_from_key_url("UID-KEY") == expected_url
@override_settings(ACCOUNT_DEFAULT_GROUPS=["group1", "group2"])
def test_save_user_adds_groups(self) -> None:
def test_save_user_adds_groups(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.ACCOUNT_DEFAULT_GROUPS = ["group1", "group2"]
Group.objects.create(name="group1")
user = User.objects.create_user("testuser")
adapter = get_adapter()
form = mock.Mock(
form = mocker.MagicMock(
cleaned_data={
"username": "testuser",
"email": "user@example.com",
@@ -110,88 +103,81 @@ class TestCustomAccountAdapter(TestCase):
user = adapter.save_user(HttpRequest(), user, form, commit=True)
self.assertEqual(user.groups.count(), 1)
self.assertTrue(user.groups.filter(name="group1").exists())
self.assertFalse(user.groups.filter(name="group2").exists())
assert user.groups.count() == 1
assert user.groups.filter(name="group1").exists()
assert not user.groups.filter(name="group2").exists()
def test_fresh_install_save_creates_superuser(self) -> None:
def test_fresh_install_save_creates_superuser(self, mocker: MockerFixture) -> None:
adapter = get_adapter()
form = mock.Mock(
form = mocker.MagicMock(
cleaned_data={
"username": "testuser",
"email": "user@paperless-ngx.com",
},
)
user = adapter.save_user(HttpRequest(), User(), form, commit=True)
self.assertTrue(user.is_superuser)
assert user.is_superuser
# Next time, it should not create a superuser
form = mock.Mock(
form = mocker.MagicMock(
cleaned_data={
"username": "testuser2",
"email": "user2@paperless-ngx.com",
},
)
user2 = adapter.save_user(HttpRequest(), User(), form, commit=True)
self.assertFalse(user2.is_superuser)
assert not user2.is_superuser
class TestCustomSocialAccountAdapter(TestCase):
def test_is_open_for_signup(self) -> None:
class TestCustomSocialAccountAdapter:
@pytest.mark.django_db
def test_is_open_for_signup(self, settings: SettingsWrapper) -> None:
adapter = get_social_adapter()
# Test when SOCIALACCOUNT_ALLOW_SIGNUPS is True
settings.SOCIALACCOUNT_ALLOW_SIGNUPS = True
self.assertTrue(adapter.is_open_for_signup(None, None))
assert adapter.is_open_for_signup(None, None)
# Test when SOCIALACCOUNT_ALLOW_SIGNUPS is False
settings.SOCIALACCOUNT_ALLOW_SIGNUPS = False
self.assertFalse(adapter.is_open_for_signup(None, None))
assert not adapter.is_open_for_signup(None, None)
def test_get_connect_redirect_url(self) -> None:
adapter = get_social_adapter()
request = None
socialaccount = None
assert adapter.get_connect_redirect_url(None, None) == reverse("base")
# Test the default URL
expected_url = reverse("base")
self.assertEqual(
adapter.get_connect_redirect_url(request, socialaccount),
expected_url,
)
@override_settings(SOCIAL_ACCOUNT_DEFAULT_GROUPS=["group1", "group2"])
def test_save_user_adds_groups(self) -> None:
@pytest.mark.django_db
def test_save_user_adds_groups(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
settings.SOCIAL_ACCOUNT_DEFAULT_GROUPS = ["group1", "group2"]
Group.objects.create(name="group1")
adapter = get_social_adapter()
request = HttpRequest()
user = User.objects.create_user("testuser")
sociallogin = mock.Mock(
user=user,
)
sociallogin = mocker.MagicMock(user=user)
user = adapter.save_user(request, sociallogin, None)
user = adapter.save_user(HttpRequest(), sociallogin, None)
self.assertEqual(user.groups.count(), 1)
self.assertTrue(user.groups.filter(name="group1").exists())
self.assertFalse(user.groups.filter(name="group2").exists())
assert user.groups.count() == 1
assert user.groups.filter(name="group1").exists()
assert not user.groups.filter(name="group2").exists()
def test_error_logged_on_authentication_error(self) -> None:
def test_error_logged_on_authentication_error(
self,
caplog: pytest.LogCaptureFixture,
) -> None:
adapter = get_social_adapter()
request = HttpRequest()
with self.assertLogs("paperless.auth", level="INFO") as log_cm:
with caplog.at_level(logging.INFO, logger="paperless.auth"):
adapter.on_authentication_error(
request,
HttpRequest(),
provider="test-provider",
error="Error",
exception="Test authentication error",
)
self.assertTrue(
any("Test authentication error" in message for message in log_cm.output),
)
assert any("Test authentication error" in msg for msg in caplog.messages)
class TestDrfTokenStrategy(TestCase):
@pytest.mark.django_db
class TestDrfTokenStrategy:
def test_create_access_token_creates_new_token(self) -> None:
"""
GIVEN:
@@ -201,7 +187,6 @@ class TestDrfTokenStrategy(TestCase):
THEN:
- A new token is created and its key is returned
"""
user = User.objects.create_user("testuser")
request = HttpRequest()
request.user = user
@@ -209,13 +194,9 @@ class TestDrfTokenStrategy(TestCase):
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
# Verify a token was created
self.assertIsNotNone(token_key)
self.assertTrue(Token.objects.filter(user=user).exists())
# Verify the returned key matches the created token
token = Token.objects.get(user=user)
self.assertEqual(token_key, token.key)
assert token_key is not None
assert Token.objects.filter(user=user).exists()
assert token_key == Token.objects.get(user=user).key
def test_create_access_token_returns_existing_token(self) -> None:
"""
@@ -226,7 +207,6 @@ class TestDrfTokenStrategy(TestCase):
THEN:
- The same token key is returned (no new token created)
"""
user = User.objects.create_user("testuser")
existing_token = Token.objects.create(user=user)
@@ -236,11 +216,8 @@ class TestDrfTokenStrategy(TestCase):
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
# Verify the existing token key is returned
self.assertEqual(token_key, existing_token.key)
# Verify only one token exists (no duplicate created)
self.assertEqual(Token.objects.filter(user=user).count(), 1)
assert token_key == existing_token.key
assert Token.objects.filter(user=user).count() == 1
def test_create_access_token_returns_none_for_unauthenticated_user(self) -> None:
"""
@@ -251,12 +228,11 @@ class TestDrfTokenStrategy(TestCase):
THEN:
- None is returned and no token is created
"""
request = HttpRequest()
request.user = AnonymousUser()
strategy = DrfTokenStrategy()
token_key = strategy.create_access_token(request)
self.assertIsNone(token_key)
self.assertEqual(Token.objects.count(), 0)
assert token_key is None
assert Token.objects.count() == 0

View File

@@ -1,73 +1,98 @@
import os
from dataclasses import dataclass
from pathlib import Path
from unittest import mock
import pytest
from django.core.checks import Error
from django.core.checks import Warning
from django.test import TestCase
from django.test import override_settings
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from paperless.checks import audit_log_check
from paperless.checks import binaries_check
from paperless.checks import check_deprecated_db_settings
from paperless.checks import check_v3_minimum_upgrade_version
from paperless.checks import debug_mode_check
from paperless.checks import paths_check
from paperless.checks import settings_values_check
class TestChecks(DirectoriesMixin, TestCase):
def test_binaries(self) -> None:
self.assertEqual(binaries_check(None), [])
@dataclass(frozen=True, slots=True)
class PaperlessTestDirs:
data_dir: Path
media_dir: Path
consumption_dir: Path
@override_settings(CONVERT_BINARY="uuuhh")
def test_binaries_fail(self) -> None:
self.assertEqual(len(binaries_check(None)), 1)
def test_paths_check(self) -> None:
self.assertEqual(paths_check(None), [])
# TODO: consolidate with documents/tests/conftest.py PaperlessDirs/paperless_dirs
# once the paperless and documents test suites are ready to share fixtures.
@pytest.fixture()
def directories(tmp_path: Path, settings: SettingsWrapper) -> PaperlessTestDirs:
data_dir = tmp_path / "data"
media_dir = tmp_path / "media"
consumption_dir = tmp_path / "consumption"
@override_settings(
MEDIA_ROOT=Path("uuh"),
DATA_DIR=Path("whatever"),
CONSUMPTION_DIR=Path("idontcare"),
for d in (data_dir, media_dir, consumption_dir):
d.mkdir()
settings.DATA_DIR = data_dir
settings.MEDIA_ROOT = media_dir
settings.CONSUMPTION_DIR = consumption_dir
return PaperlessTestDirs(
data_dir=data_dir,
media_dir=media_dir,
consumption_dir=consumption_dir,
)
def test_paths_check_dont_exist(self) -> None:
msgs = paths_check(None)
self.assertEqual(len(msgs), 3, str(msgs))
for msg in msgs:
self.assertTrue(msg.msg.endswith("is set but doesn't exist."))
def test_paths_check_no_access(self) -> None:
Path(self.dirs.data_dir).chmod(0o000)
Path(self.dirs.media_dir).chmod(0o000)
Path(self.dirs.consumption_dir).chmod(0o000)
class TestChecks:
def test_binaries(self) -> None:
assert binaries_check(None) == []
self.addCleanup(os.chmod, self.dirs.data_dir, 0o777)
self.addCleanup(os.chmod, self.dirs.media_dir, 0o777)
self.addCleanup(os.chmod, self.dirs.consumption_dir, 0o777)
def test_binaries_fail(self, settings: SettingsWrapper) -> None:
settings.CONVERT_BINARY = "uuuhh"
assert len(binaries_check(None)) == 1
@pytest.mark.usefixtures("directories")
def test_paths_check(self) -> None:
assert paths_check(None) == []
def test_paths_check_dont_exist(self, settings: SettingsWrapper) -> None:
settings.MEDIA_ROOT = Path("uuh")
settings.DATA_DIR = Path("whatever")
settings.CONSUMPTION_DIR = Path("idontcare")
msgs = paths_check(None)
self.assertEqual(len(msgs), 3)
assert len(msgs) == 3, str(msgs)
for msg in msgs:
self.assertTrue(msg.msg.endswith("is not writeable"))
assert msg.msg.endswith("is set but doesn't exist.")
@override_settings(DEBUG=False)
def test_debug_disabled(self) -> None:
self.assertEqual(debug_mode_check(None), [])
def test_paths_check_no_access(self, directories: PaperlessTestDirs) -> None:
directories.data_dir.chmod(0o000)
directories.media_dir.chmod(0o000)
directories.consumption_dir.chmod(0o000)
@override_settings(DEBUG=True)
def test_debug_enabled(self) -> None:
self.assertEqual(len(debug_mode_check(None)), 1)
try:
msgs = paths_check(None)
finally:
directories.data_dir.chmod(0o777)
directories.media_dir.chmod(0o777)
directories.consumption_dir.chmod(0o777)
assert len(msgs) == 3
for msg in msgs:
assert msg.msg.endswith("is not writeable")
def test_debug_disabled(self, settings: SettingsWrapper) -> None:
settings.DEBUG = False
assert debug_mode_check(None) == []
def test_debug_enabled(self, settings: SettingsWrapper) -> None:
settings.DEBUG = True
assert len(debug_mode_check(None)) == 1
class TestSettingsChecksAgainstDefaults(DirectoriesMixin, TestCase):
class TestSettingsChecksAgainstDefaults:
def test_all_valid(self) -> None:
"""
GIVEN:
@@ -78,104 +103,71 @@ class TestSettingsChecksAgainstDefaults(DirectoriesMixin, TestCase):
- No system check errors reported
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 0)
assert len(msgs) == 0
class TestOcrSettingsChecks(DirectoriesMixin, TestCase):
@override_settings(OCR_OUTPUT_TYPE="notapdf")
def test_invalid_output_type(self) -> None:
class TestOcrSettingsChecks:
@pytest.mark.parametrize(
("setting", "value", "expected_msg"),
[
pytest.param(
"OCR_OUTPUT_TYPE",
"notapdf",
'OCR output type "notapdf"',
id="invalid-output-type",
),
pytest.param(
"OCR_MODE",
"makeitso",
'OCR output mode "makeitso"',
id="invalid-mode",
),
pytest.param(
"OCR_MODE",
"skip_noarchive",
"deprecated",
id="deprecated-mode",
),
pytest.param(
"OCR_SKIP_ARCHIVE_FILE",
"invalid",
'OCR_SKIP_ARCHIVE_FILE setting "invalid"',
id="invalid-skip-archive-file",
),
pytest.param(
"OCR_CLEAN",
"cleanme",
'OCR clean mode "cleanme"',
id="invalid-clean",
),
],
)
def test_invalid_setting_produces_one_error(
self,
settings: SettingsWrapper,
setting: str,
value: str,
expected_msg: str,
) -> None:
"""
GIVEN:
- Default settings
- OCR output type is invalid
- One OCR setting is set to an invalid value
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR output type
- Exactly one system check error is reported containing the expected message
"""
setattr(settings, setting, value)
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR output type "notapdf"', msg.msg)
@override_settings(OCR_MODE="makeitso")
def test_invalid_ocr_type(self) -> None:
"""
GIVEN:
- Default settings
- OCR type is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR output mode "makeitso"', msg.msg)
@override_settings(OCR_MODE="skip_noarchive")
def test_deprecated_ocr_type(self) -> None:
"""
GIVEN:
- Default settings
- OCR type is deprecated
WHEN:
- Settings are validated
THEN:
- deprecation warning reported for OCR type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn("deprecated", msg.msg)
@override_settings(OCR_SKIP_ARCHIVE_FILE="invalid")
def test_invalid_ocr_skip_archive_file(self) -> None:
"""
GIVEN:
- Default settings
- OCR_SKIP_ARCHIVE_FILE is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR_SKIP_ARCHIVE_FILE
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR_SKIP_ARCHIVE_FILE setting "invalid"', msg.msg)
@override_settings(OCR_CLEAN="cleanme")
def test_invalid_ocr_clean(self) -> None:
"""
GIVEN:
- Default settings
- OCR cleaning type is invalid
WHEN:
- Settings are validated
THEN:
- system check error reported for OCR cleaning type
"""
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('OCR clean mode "cleanme"', msg.msg)
assert len(msgs) == 1
assert expected_msg in msgs[0].msg
class TestTimezoneSettingsChecks(DirectoriesMixin, TestCase):
@override_settings(TIME_ZONE="TheMoon\\MyCrater")
def test_invalid_timezone(self) -> None:
class TestTimezoneSettingsChecks:
def test_invalid_timezone(self, settings: SettingsWrapper) -> None:
"""
GIVEN:
- Default settings
@@ -185,17 +177,16 @@ class TestTimezoneSettingsChecks(DirectoriesMixin, TestCase):
THEN:
- system check error reported for timezone
"""
settings.TIME_ZONE = "TheMoon\\MyCrater"
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn('Timezone "TheMoon\\MyCrater"', msg.msg)
assert len(msgs) == 1
assert 'Timezone "TheMoon\\MyCrater"' in msgs[0].msg
class TestEmailCertSettingsChecks(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@override_settings(EMAIL_CERTIFICATE_FILE=Path("/tmp/not_actually_here.pem"))
def test_not_valid_file(self) -> None:
class TestEmailCertSettingsChecks:
def test_not_valid_file(self, settings: SettingsWrapper) -> None:
"""
GIVEN:
- Default settings
@@ -205,19 +196,22 @@ class TestEmailCertSettingsChecks(DirectoriesMixin, FileSystemAssertsMixin, Test
THEN:
- system check error reported for email certificate
"""
self.assertIsNotFile("/tmp/not_actually_here.pem")
cert_path = Path("/tmp/not_actually_here.pem")
assert not cert_path.is_file()
settings.EMAIL_CERTIFICATE_FILE = cert_path
msgs = settings_values_check(None)
self.assertEqual(len(msgs), 1)
msg = msgs[0]
self.assertIn("Email cert /tmp/not_actually_here.pem is not a file", msg.msg)
assert len(msgs) == 1
assert "Email cert /tmp/not_actually_here.pem is not a file" in msgs[0].msg
class TestAuditLogChecks(TestCase):
def test_was_enabled_once(self) -> None:
class TestAuditLogChecks:
def test_was_enabled_once(
self,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
"""
GIVEN:
- Audit log is not enabled
@@ -226,23 +220,18 @@ class TestAuditLogChecks(TestCase):
THEN:
- system check error reported for disabling audit log
"""
introspect_mock = mock.MagicMock()
settings.AUDIT_LOG_ENABLED = False
introspect_mock = mocker.MagicMock()
introspect_mock.introspection.table_names.return_value = ["auditlog_logentry"]
with override_settings(AUDIT_LOG_ENABLED=False):
with mock.patch.dict(
"paperless.checks.connections",
{"default": introspect_mock},
):
msgs = audit_log_check(None)
mocker.patch.dict(
"paperless.checks.connections",
{"default": introspect_mock},
)
self.assertEqual(len(msgs), 1)
msgs = audit_log_check(None)
msg = msgs[0]
self.assertIn(
("auditlog table was found but audit log is disabled."),
msg.msg,
)
assert len(msgs) == 1
assert "auditlog table was found but audit log is disabled." in msgs[0].msg
DEPRECATED_VARS: dict[str, str] = {
@@ -271,20 +260,16 @@ class TestDeprecatedDbSettings:
@pytest.mark.parametrize(
("env_var", "db_option_key"),
[
("PAPERLESS_DB_TIMEOUT", "timeout"),
("PAPERLESS_DB_POOLSIZE", "pool.min_size / pool.max_size"),
("PAPERLESS_DBSSLMODE", "sslmode"),
("PAPERLESS_DBSSLROOTCERT", "sslrootcert"),
("PAPERLESS_DBSSLCERT", "sslcert"),
("PAPERLESS_DBSSLKEY", "sslkey"),
],
ids=[
"db-timeout",
"db-poolsize",
"ssl-mode",
"ssl-rootcert",
"ssl-cert",
"ssl-key",
pytest.param("PAPERLESS_DB_TIMEOUT", "timeout", id="db-timeout"),
pytest.param(
"PAPERLESS_DB_POOLSIZE",
"pool.min_size / pool.max_size",
id="db-poolsize",
),
pytest.param("PAPERLESS_DBSSLMODE", "sslmode", id="ssl-mode"),
pytest.param("PAPERLESS_DBSSLROOTCERT", "sslrootcert", id="ssl-rootcert"),
pytest.param("PAPERLESS_DBSSLCERT", "sslcert", id="ssl-cert"),
pytest.param("PAPERLESS_DBSSLKEY", "sslkey", id="ssl-key"),
],
)
def test_single_deprecated_var_produces_one_warning(
@@ -397,240 +382,3 @@ class TestDeprecatedDbSettings:
assert len(result) == 1
assert "PAPERLESS_DBSSLCERT" in result[0].msg
class TestV3MinimumUpgradeVersionCheck:
"""Test suite for check_v3_minimum_upgrade_version system check."""
@pytest.fixture
def build_conn_mock(self, mocker: MockerFixture):
"""Factory fixture that builds a connections['default'] mock.
Usage::
conn = build_conn_mock(tables=["django_migrations"], applied=["1075_..."])
"""
def _build(tables: list[str], applied: list[str]) -> mock.MagicMock:
conn = mocker.MagicMock()
conn.introspection.table_names.return_value = tables
cursor = conn.cursor.return_value.__enter__.return_value
cursor.fetchall.return_value = [(name,) for name in applied]
return conn
return _build
def test_no_migrations_table_fresh_install(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- No django_migrations table exists in the database
WHEN:
- The v3 upgrade check runs
THEN:
- No errors are reported (fresh install, nothing to enforce)
"""
mocker.patch.dict(
"paperless.checks.connections",
{"default": build_conn_mock([], [])},
)
assert check_v3_minimum_upgrade_version(None) == []
def test_no_documents_migrations_fresh_install(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- django_migrations table exists but has no documents app rows
WHEN:
- The v3 upgrade check runs
THEN:
- No errors are reported (fresh install, nothing to enforce)
"""
mocker.patch.dict(
"paperless.checks.connections",
{"default": build_conn_mock(["django_migrations"], [])},
)
assert check_v3_minimum_upgrade_version(None) == []
def test_v3_state_with_0001_squashed(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- 0001_squashed is recorded in django_migrations
WHEN:
- The v3 upgrade check runs
THEN:
- No errors are reported (DB is already in a valid v3 state)
"""
mocker.patch.dict(
"paperless.checks.connections",
{
"default": build_conn_mock(
["django_migrations"],
["0001_squashed", "0002_squashed", "0003_workflowaction_order"],
),
},
)
assert check_v3_minimum_upgrade_version(None) == []
def test_v3_state_with_0002_squashed_only(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- Only 0002_squashed is recorded in django_migrations
WHEN:
- The v3 upgrade check runs
THEN:
- No errors are reported (0002_squashed alone confirms a valid v3 state)
"""
mocker.patch.dict(
"paperless.checks.connections",
{"default": build_conn_mock(["django_migrations"], ["0002_squashed"])},
)
assert check_v3_minimum_upgrade_version(None) == []
def test_v2_20_9_state_ready_to_upgrade(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- 1075_workflowaction_order (the last v2.20.9 migration) is in the DB
WHEN:
- The v3 upgrade check runs
THEN:
- No errors are reported (squash will pick up cleanly from this state)
"""
mocker.patch.dict(
"paperless.checks.connections",
{
"default": build_conn_mock(
["django_migrations"],
[
"1074_workflowrun_deleted_at_workflowrun_restored_at_and_more",
"1075_workflowaction_order",
],
),
},
)
assert check_v3_minimum_upgrade_version(None) == []
def test_v2_20_8_raises_error(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- 1074 (last v2.20.8 migration) is applied but 1075 is not
WHEN:
- The v3 upgrade check runs
THEN:
- An Error with id paperless.E002 is returned
"""
mocker.patch.dict(
"paperless.checks.connections",
{
"default": build_conn_mock(
["django_migrations"],
["1074_workflowrun_deleted_at_workflowrun_restored_at_and_more"],
),
},
)
result = check_v3_minimum_upgrade_version(None)
assert len(result) == 1
assert isinstance(result[0], Error)
assert result[0].id == "paperless.E002"
def test_very_old_version_raises_error(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- Only old migrations (well below v2.20.9) are applied
WHEN:
- The v3 upgrade check runs
THEN:
- An Error with id paperless.E002 is returned
"""
mocker.patch.dict(
"paperless.checks.connections",
{
"default": build_conn_mock(
["django_migrations"],
["1000_update_paperless_all", "1022_paperlesstask"],
),
},
)
result = check_v3_minimum_upgrade_version(None)
assert len(result) == 1
assert isinstance(result[0], Error)
assert result[0].id == "paperless.E002"
def test_error_hint_mentions_v2_20_9(
self,
mocker: MockerFixture,
build_conn_mock,
) -> None:
"""
GIVEN:
- DB is on an old v2 version (pre-v2.20.9)
WHEN:
- The v3 upgrade check runs
THEN:
- The error hint explicitly references v2.20.9 so users know what to do
"""
mocker.patch.dict(
"paperless.checks.connections",
{"default": build_conn_mock(["django_migrations"], ["1022_paperlesstask"])},
)
result = check_v3_minimum_upgrade_version(None)
assert len(result) == 1
assert "v2.20.9" in result[0].hint
def test_db_error_is_swallowed(self, mocker: MockerFixture) -> None:
"""
GIVEN:
- A DatabaseError is raised when querying the DB
WHEN:
- The v3 upgrade check runs
THEN:
- No exception propagates and an empty list is returned
"""
from django.db import DatabaseError
conn = mocker.MagicMock()
conn.introspection.table_names.side_effect = DatabaseError("connection refused")
mocker.patch.dict("paperless.checks.connections", {"default": conn})
assert check_v3_minimum_upgrade_version(None) == []
def test_operational_error_is_swallowed(self, mocker: MockerFixture) -> None:
"""
GIVEN:
- An OperationalError is raised when querying the DB
WHEN:
- The v3 upgrade check runs
THEN:
- No exception propagates and an empty list is returned
"""
from django.db import OperationalError
conn = mocker.MagicMock()
conn.introspection.table_names.side_effect = OperationalError("DB unavailable")
mocker.patch.dict("paperless.checks.connections", {"default": conn})
assert check_v3_minimum_upgrade_version(None) == []

View File

@@ -9,35 +9,50 @@ from paperless.utils import ocr_to_dateparser_languages
@pytest.mark.parametrize(
("ocr_language", "expected"),
[
# One language
("eng", ["en"]),
# Multiple languages
("fra+ita+lao", ["fr", "it", "lo"]),
# Languages that don't have a two-letter equivalent
("fil", ["fil"]),
# Languages with a script part supported by dateparser
("aze_cyrl+srp_latn", ["az-Cyrl", "sr-Latn"]),
# Languages with a script part not supported by dateparser
# In this case, default to the language without script
("deu_frak", ["de"]),
# Traditional and simplified chinese don't have the same name in dateparser,
# so they're converted to the general chinese language
("chi_tra+chi_sim", ["zh"]),
# If a language is not supported by dateparser, fallback to the supported ones
("eng+unsupported_language+por", ["en", "pt"]),
# If no language is supported, fallback to default
("unsupported1+unsupported2", []),
# Duplicate languages, should not duplicate in result
("eng+eng", ["en"]),
# Language with script, but script is not mapped
("ita_unknownscript", ["it"]),
pytest.param("eng", ["en"], id="single-language"),
pytest.param("fra+ita+lao", ["fr", "it", "lo"], id="multiple-languages"),
pytest.param("fil", ["fil"], id="no-two-letter-equivalent"),
pytest.param(
"aze_cyrl+srp_latn",
["az-Cyrl", "sr-Latn"],
id="script-supported-by-dateparser",
),
pytest.param(
"deu_frak",
["de"],
id="script-not-supported-falls-back-to-language",
),
pytest.param(
"chi_tra+chi_sim",
["zh"],
id="chinese-variants-collapse-to-general",
),
pytest.param(
"eng+unsupported_language+por",
["en", "pt"],
id="unsupported-language-skipped",
),
pytest.param(
"unsupported1+unsupported2",
[],
id="all-unsupported-returns-empty",
),
pytest.param("eng+eng", ["en"], id="duplicates-deduplicated"),
pytest.param(
"ita_unknownscript",
["it"],
id="unknown-script-falls-back-to-language",
),
],
)
def test_ocr_to_dateparser_languages(ocr_language, expected):
def test_ocr_to_dateparser_languages(ocr_language: str, expected: list[str]) -> None:
assert sorted(ocr_to_dateparser_languages(ocr_language)) == sorted(expected)
def test_ocr_to_dateparser_languages_exception(monkeypatch, caplog):
def test_ocr_to_dateparser_languages_exception(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
# Patch LocaleDataLoader.get_locale_map to raise an exception
class DummyLoader:
def get_locale_map(self, locales=None):

View File

@@ -1,24 +1,31 @@
import tempfile
from pathlib import Path
from django.test import override_settings
from django.test import Client
from pytest_django.fixtures import SettingsWrapper
def test_favicon_view(client):
with tempfile.TemporaryDirectory() as tmpdir:
static_dir = Path(tmpdir)
favicon_path = static_dir / "paperless" / "img" / "favicon.ico"
favicon_path.parent.mkdir(parents=True, exist_ok=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
def test_favicon_view(
client: Client,
tmp_path: Path,
settings: SettingsWrapper,
) -> None:
favicon_path = tmp_path / "paperless" / "img" / "favicon.ico"
favicon_path.parent.mkdir(parents=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
with override_settings(STATIC_ROOT=static_dir):
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
settings.STATIC_ROOT = tmp_path
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
def test_favicon_view_missing_file(client):
with override_settings(STATIC_ROOT=Path(tempfile.mkdtemp())):
response = client.get("/favicon.ico")
assert response.status_code == 404
def test_favicon_view_missing_file(
client: Client,
tmp_path: Path,
settings: SettingsWrapper,
) -> None:
settings.STATIC_ROOT = tmp_path
response = client.get("/favicon.ico")
assert response.status_code == 404

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.2.11 on 2026-03-03 16:27
# Generated by Django 5.2.9 on 2026-01-20 18:46
import django.db.models.deletion
import django.utils.timezone
@@ -15,50 +15,6 @@ class Migration(migrations.Migration):
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
replaces = [
("paperless_mail", "0001_initial"),
("paperless_mail", "0001_initial_squashed_0009_mailrule_assign_tags"),
("paperless_mail", "0002_auto_20201117_1334"),
("paperless_mail", "0003_auto_20201118_1940"),
("paperless_mail", "0004_mailrule_order"),
("paperless_mail", "0005_help_texts"),
("paperless_mail", "0006_auto_20210101_2340"),
("paperless_mail", "0007_auto_20210106_0138"),
("paperless_mail", "0008_auto_20210516_0940"),
("paperless_mail", "0009_alter_mailrule_action_alter_mailrule_folder"),
("paperless_mail", "0009_mailrule_assign_tags"),
("paperless_mail", "0010_auto_20220311_1602"),
("paperless_mail", "0011_remove_mailrule_assign_tag"),
(
"paperless_mail",
"0011_remove_mailrule_assign_tag_squashed_0024_alter_mailrule_name_and_more",
),
("paperless_mail", "0012_alter_mailrule_assign_tags"),
("paperless_mail", "0013_merge_20220412_1051"),
("paperless_mail", "0014_alter_mailrule_action"),
("paperless_mail", "0015_alter_mailrule_action"),
("paperless_mail", "0016_mailrule_consumption_scope"),
("paperless_mail", "0017_mailaccount_owner_mailrule_owner"),
("paperless_mail", "0018_processedmail"),
("paperless_mail", "0019_mailrule_filter_to"),
("paperless_mail", "0020_mailaccount_is_token"),
("paperless_mail", "0021_alter_mailaccount_password"),
("paperless_mail", "0022_mailrule_assign_owner_from_rule_and_more"),
("paperless_mail", "0023_remove_mailrule_filter_attachment_filename_and_more"),
("paperless_mail", "0024_alter_mailrule_name_and_more"),
(
"paperless_mail",
"0025_alter_mailaccount_owner_alter_mailrule_owner_and_more",
),
("paperless_mail", "0026_mailrule_enabled"),
(
"paperless_mail",
"0027_mailaccount_expiration_mailaccount_account_type_and_more",
),
("paperless_mail", "0028_alter_mailaccount_password_and_more"),
("paperless_mail", "0029_mailrule_pdf_layout"),
]
operations = [
migrations.CreateModel(
name="MailAccount",

View File

@@ -6,7 +6,7 @@ from django.db import models
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0001_squashed"),
("paperless_mail", "0001_initial"),
]
operations = [