Files
paperless-ngx/src/documents/models.py
T
Trenton H 8e67828bd7 Feature: Redesign the task system (#12584)
* feat(tasks): replace PaperlessTask model with structured redesign

Drop the old string-based PaperlessTask table and recreate it with
Status/TaskType/TriggerSource enums, JSONField result storage, and
duration tracking fields. Update all call sites to use the new API.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): rewrite signal handlers to track all task types

Replace the old consume_file-only handler with a full rewrite that tracks
6 task types (consume_file, train_classifier, sanity_check, index_optimize,
llm_index, mail_fetch) with proper trigger source detection, input data
extraction, legacy result string parsing, duration/wait time recording,
and structured error capture on failure.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): add traceback and revoked state coverage to signal tests

* refactor(tasks): remove manual PaperlessTask creation and scheduled/auto params

All task records are now created exclusively via Celery signals (Task 2).
Removed PaperlessTask creation/update from train_classifier, sanity_check,
llmindex_index, and check_sanity. Removed scheduled= and auto= parameters
from all 7 call sites. Updated apply_async callers to use trigger_source
headers instead. Exceptions now propagate naturally from task functions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): auto-inject trigger_source=scheduled header for all beat tasks

Inject `headers: {"trigger_source": "scheduled"}` into every Celery beat
schedule entry so signal handlers can identify scheduler-originated tasks
without per-task instrumentation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): update serializer, filter, and viewset with v9 backwards compat

- Replace TasksViewSerializer/RunTaskViewSerializer with TaskSerializerV10
  (new field names), TaskSerializerV9 (v9 compat), TaskSummarySerializer,
  and RunTaskSerializer
- Add AcknowledgeTasksViewSerializer unchanged (kept existing validation)
- Expand PaperlessTaskFilterSet with MultipleChoiceFilter for task_type,
  trigger_source, status; add is_complete, date_created_after/before filters
- Replace TasksViewSet.get_serializer_class() to branch on request.version
- Add get_queryset() v9 compat for task_name/type query params
- Add acknowledge_all, summary, active actions to TasksViewSet
- Rewrite run action to use apply_async with trigger_source header
- Add timedelta import to views.py; add MultipleChoiceFilter/DateTimeFilter
  to filters.py imports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): add read_only_fields to TaskSerializerV9, enforce admin via permission_classes on run action

* test(tasks): rewrite API task tests for redesigned model and v9 compat

Replaces the old Django TestCase-based tests with pytest-style classes using
PaperlessTaskFactory. Covers v10 field names, v9 backwards-compat field
mapping, filtering, ordering, acknowledge, acknowledge_all, summary, active,
and run endpoints. Also adds PaperlessTaskFactory to factories.py and fixes
a redundant source= kwarg in TaskSerializerV10.related_document_ids.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): fix two spec gaps in task API test suite

Move test_list_is_owner_aware to TestGetTasksV10 (it tests GET /api/tasks/,
not acknowledge). Add test_related_document_ids_includes_duplicate_of to
cover the duplicate_of path in the related_document_ids property.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): address code quality review findings

Remove trivial field-existence tests per project conventions. Fix
potentially flaky ordering test to use explicit date_created values.
Add is_complete=false filter test, v9 type filter input direction test,
and tighten TestActive second test to target REVOKED specifically.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): update TaskAdmin for redesigned model

Add date_created, duration_seconds to list_display; add trigger_source
to list_filter; add input_data, duration_seconds, wait_time_seconds to
readonly_fields.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): update Angular types and service for task redesign

Replace PaperlessTaskName/PaperlessTaskType/PaperlessTaskStatus enums
with new PaperlessTaskType, PaperlessTaskTriggerSource, PaperlessTaskStatus
enums. Update PaperlessTask interface to new field names (task_type,
trigger_source, input_data, result_message, related_document_ids).
Update TasksService to filter by task_type instead of task_name.
Update tasks component and system-status-dialog to use new field names.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore(tasks): remove django-celery-results

PaperlessTask now tracks all task results via Celery signals. The
django-celery-results DB backend was write-only -- nothing reads
from it. Drop the package and add a migration to clean up the
orphaned tables.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: fix remaining tests broken by task system redesign

Update all tests that created PaperlessTask objects with old field names
to use PaperlessTaskFactory and new field names (task_type, trigger_source,
status, result_message). Use apply_async instead of delay where mocked.
Drop TestCheckSanityTaskRecording — tests PaperlessTask creation that was
intentionally removed from check_sanity().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): improve test_api_tasks.py structure and add api marker

- Move admin_client, v9_client, user_client fixtures to conftest.py so
  they can be reused by other API tests; all three now build on the
  rest_api_client fixture instead of creating APIClient() directly
- Move regular_user fixture to conftest.py (was already done, now also
  used by the new client fixtures)
- Add docstrings to every test method describing the behaviour under test
- Move timedelta/timezone imports to module level
- Register 'api' pytest marker in pyproject.toml and apply pytestmark to
  the entire file so all 40 tests are selectable via -m api

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(tasks): simplify task tracking code after redesign

- Extract COMPLETE_STATUSES as a class constant on PaperlessTask,
  eliminating the repeated status tuple across models.py, views.py (3×),
  and filters.py
- Extract _CELERY_STATE_TO_STATUS as a module-level constant instead of
  rebuilding the dict on every task_postrun
- Extract _V9_TYPE_TO_TRIGGER_SOURCE and _RUNNABLE_TASKS as class
  constants on TasksViewSet instead of rebuilding on every request
- Extract _TRIGGER_SOURCE_TO_V9_TYPE as a class constant on
  TaskSerializerV9 instead of rebuilding per serialized object
- Extract _get_consume_args helper to deduplicate identical arg
  extraction logic in _extract_input_data, _determine_trigger_source,
  and _extract_owner_id
- Move inline imports (re, traceback) and Avg to module level
- Fix _DOCUMENT_SOURCE_TO_TRIGGER type annotation key type to
  DocumentSource instead of Any
- Remove redundant truthiness checks in SystemStatusView branches
  already guarded by an is-None check

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(tasks): add docstrings and rename _parse_legacy_result

- Add docstrings to _extract_input_data, _determine_trigger_source,
  _extract_owner_id explaining what each helper does and why
- Rename _parse_legacy_result -> _parse_consume_result: the function
  parses current consume_file string outputs (consumer.py returns
  "New document id N created" and "It is a duplicate of X (#N)"),
  not legacy data; the old name was misleading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(tasks): extend and harden the task system redesign

- TaskType: add EMPTY_TRASH, CHECK_WORKFLOWS, CLEANUP_SHARE_LINKS;
  remove INDEX_REBUILD (no backing task — beat schedule uses index_optimize)
- TRACKED_TASKS: wire up all nine task types including the three new ones
  and llmindex_index / process_mail_accounts
- Add task_revoked_handler so cancelled/expired tasks are marked REVOKED
- Fix double-write: task_postrun_handler no longer overwrites result_data
  when status is already FAILURE (task_failure_handler owns that write)
- v9 serialiser: map EMAIL_CONSUME and FOLDER_CONSUME to AUTO_TASK
- views: scope task list to owner for regular users, admins see all;
  validate ?days= query param and return 400 on bad input
- tests: add test_list_admin_sees_all_tasks; rename/fix
  test_parses_duplicate_string (duplicates produce SUCCESS, not FAILURE);
  use PaperlessTaskFactory in modified tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): fix MAIL_FETCH null input_data and postrun double-query

- _extract_input_data: return {} instead of {"account_ids": None} when
  process_mail_accounts is called without an explicit account list (the
  normal beat-scheduled path); add test to cover this path
- task_postrun_handler: replace filter().first() + filter().update() with
  get() + save(update_fields=[...]) — single fetch, single write,
  consistent with task_prerun_handler

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): add queryset stub to satisfy drf-spectacular schema generation

TasksViewSet.get_queryset() accesses request.user, which drf-spectacular
cannot provide during static schema generation.  Adding a class-level
queryset = PaperlessTask.objects.none() gives spectacular a model to
introspect without invoking get_queryset(), eliminating both warnings
and the test_valid_schema failure.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): fill coverage gaps in task system

- test_task_signals: add TestTaskRevokedHandler (marks REVOKED, ignores
  None request, ignores unknown id); switch existing direct
  PaperlessTask.objects.create calls to PaperlessTaskFactory; import
  pytest_mock and use MockerFixture typing on mocker params
- test_api_tasks: add test_rejects_invalid_days_param to TestSummary
- tasks.service.spec: add dismissAllTasks test (POST acknowledge_all +
  reload)
- models: add pragma: no cover to __str__, is_complete, and
  related_document_ids (trivial delegates, covered indirectly)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Well, that was a bad push.

* Fixes v9 API compatability with testing coverage

* fix(tasks): restore INDEX_OPTIMIZE enum and remove no-op run button

INDEX_OPTIMIZE was dropped from the TaskType enum but still referenced
in _RUNNABLE_TASKS (views.py) and the frontend system-status-dialog,
causing an AttributeError at import time. Restore the enum value in the
model and migration so the serializer accepts it, but remove it from
_RUNNABLE_TASKS since index_optimize is a Tantivy no-op. Remove the
frontend "Run Task" button for index optimization accordingly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tasks): v9 type filter now matches all equivalent trigger sources

The v9 ?type= query param mapped each value to a single TriggerSource,
but the serializer maps multiple sources to the same v9 type value.
A task serialized as "auto_task" would not appear when filtering by
?type=auto_task if its trigger_source was email_consume or
folder_consume. Same issue for "manual_task" missing web_ui and
api_upload sources. Changed to trigger_source__in with the full set
of sources for each v9 type value.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tasks): give task_failure_handler full ownership of FAILURE path

task_postrun_handler now early-returns for FAILURE states instead of
redundantly writing status and date_done. task_failure_handler now
computes duration_seconds and wait_time_seconds so failed tasks get
complete timing data. This eliminates a wasted .get() + .save() round
trip on every failed task and gives each handler a clean, non-overlapping
responsibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tasks): resolve trigger_source header via TriggerSource enum lookup

Replace two hardcoded string comparisons ("scheduled", "system") with a
single TriggerSource(header_source) lookup so the enum values are the
single source of truth. Any valid TriggerSource DB value passed in the
header is accepted; invalid values fall through to the document-source /
MANUAL logic. Update tests to pass enum values in headers rather than raw
strings, and add a test for the invalid-header fallback path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(tasks): use TriggerSource enum values at all apply_async call sites

Replace raw strings ("system", "manual") with PaperlessTask.TriggerSource
enum values in the three callers that can import models. The settings
file remains a raw string (models cannot be imported at settings load
time) with a comment pointing to the enum value it must match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(tasks): parametrize repetitive test cases in task test files

test_api_tasks.py:
- Collapse six trigger_source->v9-type tests into one parametrized test,
  adding the previously untested API_UPLOAD case
- Collapse three task_name mapping tests (two remaps + pass-through)
  into one parametrized test
- Collapse two acknowledge_all status tests into one parametrized test
- Collapse two run-endpoint 400 tests into one parametrized test
- Update run/ assertions to use TriggerSource enum values

test_task_signals.py:
- Collapse three trigger_source header tests into one parametrized test
- Collapse two DocumentSource->TriggerSource mapping tests into one
  parametrized test
- Collapse two prerun ignore-invalid-id tests into one parametrized test

All parametrize cases use pytest.param with descriptive ids.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Handle JSON serialization for datetime and Path.  Further restrist the v9 permissions as Copilot suggests

* That should fix the generated schema/browser

* Use XSerializer for the schema

* A few more basic cases I see no value in covering

* Drops the migration related stuff too.  Just in case we want it again or it confuses people

* fix: annotate tasks_summary_retrieve as array of TaskSummarySerializer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: annotate tasks_active_retrieve as array of TaskSerializerV10

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Restore task running to superuser only

* Removes the acknowledge/dismiss all stuff

* Aligns v10 and v9 task permissions with each other

* Short blurb just to warn users about the tasks being cleared

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 09:28:41 -07:00

1894 lines
52 KiB
Python

import datetime
from pathlib import Path
from typing import Final
import pathvalidate
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator
from django.core.validators import MinValueValidator
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from multiselectfield import MultiSelectField
from treenode.models import TreeNodeModel
if settings.AUDIT_LOG_ENABLED:
from auditlog.registry import auditlog
from django.db.models import Case
from django.db.models import PositiveIntegerField
from django.db.models.functions import Cast
from django.db.models.functions import Length
from django.db.models.functions import Substr
from django_softdelete.models import SoftDeleteModel
from documents.data_models import DocumentSource
from documents.parsers import get_default_file_extension
class ModelWithOwner(models.Model):
owner = models.ForeignKey(
User,
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
verbose_name=_("owner"),
)
class Meta:
abstract = True
class MatchingModel(ModelWithOwner):
MATCH_NONE = 0
MATCH_ANY = 1
MATCH_ALL = 2
MATCH_LITERAL = 3
MATCH_REGEX = 4
MATCH_FUZZY = 5
MATCH_AUTO = 6
MATCHING_ALGORITHMS = (
(MATCH_NONE, _("None")),
(MATCH_ANY, _("Any word")),
(MATCH_ALL, _("All words")),
(MATCH_LITERAL, _("Exact match")),
(MATCH_REGEX, _("Regular expression")),
(MATCH_FUZZY, _("Fuzzy word")),
(MATCH_AUTO, _("Automatic")),
)
name = models.CharField(_("name"), max_length=128)
match = models.CharField(_("match"), max_length=256, blank=True)
matching_algorithm = models.PositiveSmallIntegerField(
_("matching algorithm"),
choices=MATCHING_ALGORITHMS,
default=MATCH_ANY,
)
is_insensitive = models.BooleanField(_("is insensitive"), default=True)
class Meta(ModelWithOwner.Meta):
abstract = True
ordering = ("name",)
constraints = [
models.UniqueConstraint(
fields=["name", "owner"],
name="%(app_label)s_%(class)s_unique_name_owner",
),
models.UniqueConstraint(
name="%(app_label)s_%(class)s_name_uniq",
fields=["name"],
condition=models.Q(owner__isnull=True),
),
]
def __str__(self):
return self.name
class Correspondent(MatchingModel):
class Meta(MatchingModel.Meta):
verbose_name = _("correspondent")
verbose_name_plural = _("correspondents")
class Tag(MatchingModel, TreeNodeModel):
color = models.CharField(_("color"), max_length=7, default="#a6cee3")
# Maximum allowed nesting depth for tags (root = 1, max depth = 5)
MAX_NESTING_DEPTH: Final[int] = 5
is_inbox_tag = models.BooleanField(
_("is inbox tag"),
default=False,
help_text=_(
"Marks this tag as an inbox tag: All newly consumed "
"documents will be tagged with inbox tags.",
),
)
class Meta(MatchingModel.Meta, TreeNodeModel.Meta):
verbose_name = _("tag")
verbose_name_plural = _("tags")
def clean(self) -> None:
# Prevent self-parenting and assigning a descendant as parent
parent = self.get_parent()
if parent == self:
raise ValidationError({"parent": _("Cannot set itself as parent.")})
if parent and self.pk is not None and self.is_ancestor_of(parent):
raise ValidationError({"parent": _("Cannot set parent to a descendant.")})
# Enforce maximum nesting depth
new_parent_depth = 0
if parent:
new_parent_depth = parent.get_ancestors_count() + 1
height = 0 if self.pk is None else self.get_depth()
deepest_new_depth = (new_parent_depth + 1) + height
if deepest_new_depth > self.MAX_NESTING_DEPTH:
raise ValidationError({"parent": _("Maximum nesting depth exceeded.")})
return super().clean()
class DocumentType(MatchingModel):
class Meta(MatchingModel.Meta):
verbose_name = _("document type")
verbose_name_plural = _("document types")
class StoragePath(MatchingModel):
path = models.TextField(
_("path"),
)
class Meta(MatchingModel.Meta):
verbose_name = _("storage path")
verbose_name_plural = _("storage paths")
class Document(SoftDeleteModel, ModelWithOwner): # type: ignore[django-manager-missing]
MAX_STORED_FILENAME_LENGTH: Final[int] = 1024
correspondent = models.ForeignKey(
Correspondent,
blank=True,
null=True,
related_name="documents",
on_delete=models.SET_NULL,
verbose_name=_("correspondent"),
)
storage_path = models.ForeignKey(
StoragePath,
blank=True,
null=True,
related_name="documents",
on_delete=models.SET_NULL,
verbose_name=_("storage path"),
)
title = models.CharField(_("title"), max_length=128, blank=True, db_index=True)
document_type = models.ForeignKey(
DocumentType,
blank=True,
null=True,
related_name="documents",
on_delete=models.SET_NULL,
verbose_name=_("document type"),
)
content = models.TextField(
_("content"),
blank=True,
help_text=_(
"The raw, text-only data of the document. This field is "
"primarily used for searching.",
),
)
content_length = models.GeneratedField(
expression=Length("content"),
output_field=PositiveIntegerField(default=0),
db_persist=True,
null=False,
serialize=False,
help_text="Length of the content field in characters. Automatically maintained by the database for faster statistics computation.",
)
mime_type = models.CharField(_("mime type"), max_length=256, editable=False)
tags = models.ManyToManyField(
Tag,
related_name="documents",
blank=True,
verbose_name=_("tags"),
)
checksum = models.CharField(
_("checksum"),
max_length=64,
editable=False,
help_text=_("The checksum of the original document."),
)
archive_checksum = models.CharField(
_("archive checksum"),
max_length=64,
editable=False,
blank=True,
null=True,
help_text=_("The checksum of the archived document."),
)
page_count = models.PositiveIntegerField(
_("page count"),
blank=False,
null=True,
unique=False,
db_index=False,
validators=[MinValueValidator(1)],
help_text=_(
"The number of pages of the document.",
),
)
created = models.DateField(
_("created"),
default=datetime.date.today,
db_index=True,
)
modified = models.DateTimeField(
_("modified"),
auto_now=True,
editable=False,
db_index=True,
)
added = models.DateTimeField(
_("added"),
default=timezone.now,
editable=False,
db_index=True,
)
filename = models.FilePathField(
_("filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
unique=True,
null=True,
help_text=_("Current filename in storage"),
)
archive_filename = models.FilePathField(
_("archive filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
unique=True,
null=True,
help_text=_("Current archive filename in storage"),
)
original_filename = models.CharField(
_("original filename"),
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
unique=False,
null=True,
help_text=_("The original name of the file when it was uploaded"),
)
ARCHIVE_SERIAL_NUMBER_MIN: Final[int] = 0
ARCHIVE_SERIAL_NUMBER_MAX: Final[int] = 0xFF_FF_FF_FF
archive_serial_number = models.PositiveIntegerField(
_("archive serial number"),
blank=True,
null=True,
unique=True,
db_index=True,
validators=[
MaxValueValidator(ARCHIVE_SERIAL_NUMBER_MAX),
MinValueValidator(ARCHIVE_SERIAL_NUMBER_MIN),
],
help_text=_(
"The position of this document in your physical document archive.",
),
)
root_document = models.ForeignKey(
"self",
blank=True,
null=True,
related_name="versions",
on_delete=models.CASCADE,
verbose_name=_("root document for this version"),
)
version_index = models.PositiveIntegerField(
_("version index"),
blank=True,
null=True,
db_index=True,
help_text=_("Index of this version within the root document."),
)
version_label = models.CharField(
_("version label"),
max_length=64,
blank=True,
null=True,
help_text=_("Optional short label for a document version."),
)
class Meta:
ordering = ("-created",)
verbose_name = _("document")
verbose_name_plural = _("documents")
constraints = [
models.UniqueConstraint(
fields=["root_document", "version_index"],
condition=models.Q(
root_document__isnull=False,
version_index__isnull=False,
),
name="documents_document_root_version_index_uniq",
),
]
def __str__(self) -> str:
created = self.created.isoformat()
res = f"{created}"
if self.correspondent:
res += f" {self.correspondent}"
if self.title:
res += f" {self.title}"
return res
def get_effective_content(self) -> str | None:
"""
Returns the effective content for the document.
For root documents, this is the latest version's content when available.
For version documents, this is always the document's own content.
If the queryset already annotated ``effective_content``, that value is used.
"""
if hasattr(self, "effective_content"):
return getattr(self, "effective_content")
if self.root_document_id is not None or self.pk is None:
return self.content
prefetched_cache = getattr(self, "_prefetched_objects_cache", None)
prefetched_versions = (
prefetched_cache.get("versions")
if isinstance(prefetched_cache, dict)
else None
)
if prefetched_versions is not None:
# Empty list means prefetch ran and found no versions — use own content.
if not prefetched_versions:
return self.content
latest_prefetched = max(prefetched_versions, key=lambda doc: doc.id)
return latest_prefetched.content
latest_version_content = (
Document.objects.filter(root_document=self)
.order_by("-id")
.values_list("content", flat=True)
.first()
)
return (
latest_version_content
if latest_version_content is not None
else self.content
)
@property
def suggestion_content(self):
"""
Returns the document text used to generate suggestions.
If the document content length exceeds a specified limit,
the text is cropped to include the start and end segments.
Otherwise, the full content is returned.
This improves processing speed for large documents while keeping
enough context for accurate suggestions.
"""
effective_content = self.get_effective_content()
if not effective_content or len(effective_content) <= 1200000:
return effective_content
else:
# Use 80% from the start and 20% from the end
# to preserve both opening and closing context.
head_len = 800000
tail_len = 200000
return " ".join(
(
effective_content[:head_len],
effective_content[-tail_len:],
),
)
@property
def source_path(self) -> Path:
fname = str(self.filename) if self.filename else f"{self.pk:07}{self.file_type}"
return (settings.ORIGINALS_DIR / Path(fname)).resolve()
@property
def source_file(self):
return Path(self.source_path).open("rb")
@property
def has_archive_version(self) -> bool:
return self.archive_filename is not None
@property
def archive_path(self) -> Path | None:
if self.has_archive_version:
return (settings.ARCHIVE_DIR / Path(str(self.archive_filename))).resolve()
else:
return None
@property
def archive_file(self):
return Path(self.archive_path).open("rb")
def get_public_filename(self, *, archive=False, counter=0, suffix=None) -> str:
"""
Returns a sanitized filename for the document, not including any paths.
"""
result = str(self)
if counter:
result += f"_{counter:02}"
if suffix:
result += suffix
if archive:
result += ".pdf"
else:
result += self.file_type
return pathvalidate.sanitize_filename(result, replacement_text="-")
@property
def file_type(self):
return get_default_file_extension(self.mime_type)
@property
def thumbnail_path(self) -> Path:
webp_file_name = f"{self.pk:07}.webp"
webp_file_path = settings.THUMBNAIL_DIR / Path(webp_file_name)
return webp_file_path.resolve()
@property
def thumbnail_file(self):
return Path(self.thumbnail_path).open("rb")
@property
def created_date(self):
return self.created
def add_nested_tags(self, tags) -> None:
tag_ids = set()
for tag in tags:
tag_ids.add(tag.id)
tag_ids.update(tag.get_ancestors_pks())
tags_to_add = self.tags.model.objects.filter(id__in=tag_ids)
self.tags.add(*tags_to_add)
def delete(
self,
*args,
**kwargs,
):
# If deleting a root document, move all its versions to trash as well.
if self.root_document_id is None:
Document.objects.filter(root_document=self).delete()
return super().delete(
*args,
**kwargs,
)
class SavedView(ModelWithOwner):
class DisplayMode(models.TextChoices):
TABLE = ("table", _("Table"))
SMALL_CARDS = ("smallCards", _("Small Cards"))
LARGE_CARDS = ("largeCards", _("Large Cards"))
class DisplayFields(models.TextChoices):
TITLE = ("title", _("Title"))
CREATED = ("created", _("Created"))
ADDED = ("added", _("Added"))
TAGS = ("tag"), _("Tags")
CORRESPONDENT = ("correspondent", _("Correspondent"))
DOCUMENT_TYPE = ("documenttype", _("Document Type"))
STORAGE_PATH = ("storagepath", _("Storage Path"))
NOTES = ("note", _("Note"))
OWNER = ("owner", _("Owner"))
SHARED = ("shared", _("Shared"))
ASN = ("asn", _("ASN"))
PAGE_COUNT = ("pagecount", _("Pages"))
CUSTOM_FIELD = ("custom_field_%d", ("Custom Field"))
name = models.CharField(_("name"), max_length=128)
sort_field = models.CharField(
_("sort field"),
max_length=128,
null=True,
blank=True,
)
sort_reverse = models.BooleanField(_("sort reverse"), default=False)
page_size = models.PositiveIntegerField(
_("View page size"),
null=True,
blank=True,
validators=[MinValueValidator(1)],
)
display_mode = models.CharField(
max_length=128,
verbose_name=_("View display mode"),
choices=DisplayMode.choices,
null=True,
blank=True,
)
display_fields = models.JSONField(
verbose_name=_("Document display fields"),
null=True,
blank=True,
)
class Meta:
ordering = ("name",)
verbose_name = _("saved view")
verbose_name_plural = _("saved views")
def __str__(self):
return f"SavedView {self.name}"
class SavedViewFilterRule(models.Model):
RULE_TYPES = [
(0, _("title contains")),
(1, _("content contains")),
(2, _("ASN is")),
(3, _("correspondent is")),
(4, _("document type is")),
(5, _("is in inbox")),
(6, _("has tag")),
(7, _("has any tag")),
(8, _("created before")),
(9, _("created after")),
(10, _("created year is")),
(11, _("created month is")),
(12, _("created day is")),
(13, _("added before")),
(14, _("added after")),
(15, _("modified before")),
(16, _("modified after")),
(17, _("does not have tag")),
(18, _("does not have ASN")),
(19, _("title or content contains")),
(20, _("fulltext query")),
(21, _("more like this")),
(22, _("has tags in")),
(23, _("ASN greater than")),
(24, _("ASN less than")),
(25, _("storage path is")),
(26, _("has correspondent in")),
(27, _("does not have correspondent in")),
(28, _("has document type in")),
(29, _("does not have document type in")),
(30, _("has storage path in")),
(31, _("does not have storage path in")),
(32, _("owner is")),
(33, _("has owner in")),
(34, _("does not have owner")),
(35, _("does not have owner in")),
(36, _("has custom field value")),
(37, _("is shared by me")),
(38, _("has custom fields")),
(39, _("has custom field in")),
(40, _("does not have custom field in")),
(41, _("does not have custom field")),
(42, _("custom fields query")),
(43, _("created to")),
(44, _("created from")),
(45, _("added to")),
(46, _("added from")),
(47, _("mime type is")),
(48, _("simple title search")),
(49, _("simple text search")),
]
saved_view = models.ForeignKey(
SavedView,
on_delete=models.CASCADE,
related_name="filter_rules",
verbose_name=_("saved view"),
)
rule_type = models.PositiveSmallIntegerField(_("rule type"), choices=RULE_TYPES)
value = models.CharField(_("value"), max_length=255, blank=True, null=True)
class Meta:
verbose_name = _("filter rule")
verbose_name_plural = _("filter rules")
def __str__(self) -> str:
return f"SavedViewFilterRule: {self.rule_type} : {self.value}"
# Extending User Model Using a One-To-One Link
class UiSettings(models.Model):
user = models.OneToOneField(
User,
on_delete=models.CASCADE,
related_name="ui_settings",
)
settings = models.JSONField(null=True)
def __str__(self):
return self.user.username
class PaperlessTask(ModelWithOwner):
"""
Tracks background task execution for user visibility and debugging.
State transitions:
PENDING -> STARTED -> SUCCESS
PENDING -> STARTED -> FAILURE
PENDING -> REVOKED (if cancelled before starting)
"""
class Status(models.TextChoices):
PENDING = "pending", _("Pending")
STARTED = "started", _("Started")
SUCCESS = "success", _("Success")
FAILURE = "failure", _("Failure")
REVOKED = "revoked", _("Revoked")
class TaskType(models.TextChoices):
CONSUME_FILE = "consume_file", _("Consume File")
TRAIN_CLASSIFIER = "train_classifier", _("Train Classifier")
SANITY_CHECK = "sanity_check", _("Sanity Check")
INDEX_OPTIMIZE = "index_optimize", _("Index Optimize")
MAIL_FETCH = "mail_fetch", _("Mail Fetch")
LLM_INDEX = "llm_index", _("LLM Index")
EMPTY_TRASH = "empty_trash", _("Empty Trash")
CHECK_WORKFLOWS = "check_workflows", _("Check Workflows")
BULK_UPDATE = "bulk_update", _("Bulk Update")
REPROCESS_DOCUMENT = "reprocess_document", _("Reprocess Document")
BUILD_SHARE_LINK = "build_share_link", _("Build Share Link")
BULK_DELETE = "bulk_delete", _("Bulk Delete")
COMPLETE_STATUSES = (
Status.SUCCESS,
Status.FAILURE,
Status.REVOKED,
)
class TriggerSource(models.TextChoices):
SCHEDULED = "scheduled", _("Scheduled") # Celery beat
WEB_UI = "web_ui", _("Web UI") # Document uploaded via web
API_UPLOAD = "api_upload", _("API Upload") # Document uploaded via API
FOLDER_CONSUME = "folder_consume", _("Folder Consume") # Consume folder
EMAIL_CONSUME = "email_consume", _("Email Consume") # Email attachment
SYSTEM = "system", _("System") # Auto-triggered (self-heal, config side-effect)
MANUAL = "manual", _("Manual") # User explicitly ran via /api/tasks/run/
# Identification
task_id = models.CharField(
max_length=72,
unique=True,
verbose_name=_("Task ID"),
help_text=_("Celery task ID"),
)
task_type = models.CharField(
max_length=50,
choices=TaskType.choices,
verbose_name=_("Task Type"),
help_text=_("The kind of work being performed"),
db_index=True,
)
trigger_source = models.CharField(
max_length=50,
choices=TriggerSource.choices,
verbose_name=_("Trigger Source"),
help_text=_("What initiated this task"),
db_index=True,
)
# State tracking
status = models.CharField(
max_length=30,
choices=Status.choices,
default=Status.PENDING,
verbose_name=_("Status"),
db_index=True,
)
# Timestamps
date_created = models.DateTimeField(
default=timezone.now,
verbose_name=_("Created"),
db_index=True,
)
date_started = models.DateTimeField(
null=True,
blank=True,
verbose_name=_("Started"),
)
date_done = models.DateTimeField(
null=True,
blank=True,
verbose_name=_("Completed"),
db_index=True,
)
# Duration fields -- populated by task_postrun signal handler
duration_seconds = models.FloatField(
null=True,
blank=True,
verbose_name=_("Duration (seconds)"),
help_text=_("Elapsed time from start to completion"),
)
wait_time_seconds = models.FloatField(
null=True,
blank=True,
verbose_name=_("Wait Time (seconds)"),
help_text=_("Time from task creation to worker pickup"),
)
# Input/Output data
input_data = models.JSONField(
default=dict,
blank=True,
verbose_name=_("Input Data"),
help_text=_("Structured input parameters for the task"),
)
result_data = models.JSONField(
null=True,
blank=True,
verbose_name=_("Result Data"),
help_text=_("Structured result data from task execution"),
)
result_message = models.TextField(
null=True,
blank=True,
verbose_name=_("Result Message"),
help_text=_("Human-readable result message"),
)
# Acknowledgment
acknowledged = models.BooleanField(
default=False,
verbose_name=_("Acknowledged"),
db_index=True,
)
class Meta:
verbose_name = _("Task")
verbose_name_plural = _("Tasks")
ordering = ["-date_created"]
indexes = [
models.Index(fields=["status", "date_created"]),
models.Index(fields=["task_type", "status"]),
models.Index(fields=["owner", "acknowledged", "date_created"]),
]
def __str__(self) -> str: # pragma: no cover
return f"{self.get_task_type_display()} [{self.task_id[:8]}]"
@property
def is_complete(self) -> bool: # pragma: no cover
return self.status in self.COMPLETE_STATUSES
@property
def related_document_ids(self) -> list[int]: # pragma: no cover
if not self.result_data:
return []
if doc_id := self.result_data.get("document_id"):
return [doc_id]
if dup_id := self.result_data.get("duplicate_of"):
return [dup_id]
return []
class Note(SoftDeleteModel):
note = models.TextField(
_("content"),
blank=True,
help_text=_("Note for the document"),
)
created = models.DateTimeField(
_("created"),
default=timezone.now,
db_index=True,
)
document = models.ForeignKey(
Document,
blank=True,
null=True,
related_name="notes",
on_delete=models.CASCADE,
verbose_name=_("document"),
)
user = models.ForeignKey(
User,
blank=True,
null=True,
related_name="notes",
on_delete=models.SET_NULL,
verbose_name=_("user"),
)
class Meta:
ordering = ("created",)
verbose_name = _("note")
verbose_name_plural = _("notes")
def __str__(self):
return self.note
class ShareLink(SoftDeleteModel):
class FileVersion(models.TextChoices):
ARCHIVE = ("archive", _("Archive"))
ORIGINAL = ("original", _("Original"))
created = models.DateTimeField(
_("created"),
default=timezone.now,
db_index=True,
blank=True,
editable=False,
)
expiration = models.DateTimeField(
_("expiration"),
blank=True,
null=True,
db_index=True,
)
slug = models.SlugField(
_("slug"),
db_index=True,
unique=True,
blank=True,
editable=False,
)
document = models.ForeignKey(
Document,
blank=True,
related_name="share_links",
on_delete=models.CASCADE,
verbose_name=_("document"),
)
file_version = models.CharField(
max_length=50,
choices=FileVersion.choices,
default=FileVersion.ARCHIVE,
)
owner = models.ForeignKey(
User,
blank=True,
null=True,
related_name="share_links",
on_delete=models.SET_NULL,
verbose_name=_("owner"),
)
class Meta:
ordering = ("created",)
verbose_name = _("share link")
verbose_name_plural = _("share links")
def __str__(self):
return f"Share Link for {self.document.title}"
class ShareLinkBundle(models.Model):
class Status(models.TextChoices):
PENDING = ("pending", _("Pending"))
PROCESSING = ("processing", _("Processing"))
READY = ("ready", _("Ready"))
FAILED = ("failed", _("Failed"))
created = models.DateTimeField(
_("created"),
default=timezone.now,
db_index=True,
blank=True,
editable=False,
)
expiration = models.DateTimeField(
_("expiration"),
blank=True,
null=True,
db_index=True,
)
slug = models.SlugField(
_("slug"),
db_index=True,
unique=True,
blank=True,
editable=False,
)
owner = models.ForeignKey(
User,
blank=True,
null=True,
related_name="share_link_bundles",
on_delete=models.SET_NULL,
verbose_name=_("owner"),
)
file_version = models.CharField(
max_length=50,
choices=ShareLink.FileVersion.choices,
default=ShareLink.FileVersion.ARCHIVE,
)
status = models.CharField(
max_length=50,
choices=Status.choices,
default=Status.PENDING,
)
size_bytes = models.PositiveIntegerField(
_("size (bytes)"),
blank=True,
null=True,
)
last_error = models.JSONField(
_("last error"),
blank=True,
null=True,
default=None,
)
file_path = models.CharField(
_("file path"),
max_length=512,
blank=True,
)
built_at = models.DateTimeField(
_("built at"),
null=True,
blank=True,
)
documents = models.ManyToManyField(
"documents.Document",
related_name="share_link_bundles",
verbose_name=_("documents"),
)
class Meta:
ordering = ("-created",)
verbose_name = _("share link bundle")
verbose_name_plural = _("share link bundles")
def __str__(self):
return _("Share link bundle %(slug)s") % {"slug": self.slug}
@property
def absolute_file_path(self) -> Path | None:
if not self.file_path:
return None
return (settings.SHARE_LINK_BUNDLE_DIR / Path(self.file_path)).resolve()
def remove_file(self) -> None:
if self.absolute_file_path is not None and self.absolute_file_path.exists():
try:
self.absolute_file_path.unlink()
except OSError:
pass
def delete(self, using=None, *, keep_parents=False):
self.remove_file()
return super().delete(using=using, keep_parents=keep_parents)
class CustomField(models.Model):
"""
Defines the name and type of a custom field
"""
class FieldDataType(models.TextChoices):
STRING = ("string", _("String"))
URL = ("url", _("URL"))
DATE = ("date", _("Date"))
BOOL = ("boolean"), _("Boolean")
INT = ("integer", _("Integer"))
FLOAT = ("float", _("Float"))
MONETARY = ("monetary", _("Monetary"))
DOCUMENTLINK = ("documentlink", _("Document Link"))
SELECT = ("select", _("Select"))
LONG_TEXT = ("longtext", _("Long Text"))
created = models.DateTimeField(
_("created"),
default=timezone.now,
db_index=True,
editable=False,
)
name = models.CharField(max_length=128)
data_type = models.CharField(
_("data type"),
max_length=50,
choices=FieldDataType.choices,
editable=False,
)
extra_data = models.JSONField(
_("extra data"),
null=True,
blank=True,
help_text=_(
"Extra data for the custom field, such as select options",
),
)
class Meta:
ordering = ("created",)
verbose_name = _("custom field")
verbose_name_plural = _("custom fields")
constraints = [
models.UniqueConstraint(
fields=["name"],
name="%(app_label)s_%(class)s_unique_name",
),
]
def __str__(self) -> str:
return f"{self.name} : {self.data_type}"
class CustomFieldInstance(SoftDeleteModel):
"""
A single instance of a field, attached to a CustomField for the name and type
and attached to a single Document to be metadata for it
"""
TYPE_TO_DATA_STORE_NAME_MAP = {
CustomField.FieldDataType.STRING: "value_text",
CustomField.FieldDataType.URL: "value_url",
CustomField.FieldDataType.DATE: "value_date",
CustomField.FieldDataType.BOOL: "value_bool",
CustomField.FieldDataType.INT: "value_int",
CustomField.FieldDataType.FLOAT: "value_float",
CustomField.FieldDataType.MONETARY: "value_monetary",
CustomField.FieldDataType.DOCUMENTLINK: "value_document_ids",
CustomField.FieldDataType.SELECT: "value_select",
CustomField.FieldDataType.LONG_TEXT: "value_long_text",
}
created = models.DateTimeField(
_("created"),
default=timezone.now,
db_index=True,
editable=False,
)
document = models.ForeignKey(
Document,
blank=False,
null=False,
on_delete=models.CASCADE,
related_name="custom_fields",
editable=False,
)
field = models.ForeignKey(
CustomField,
blank=False,
null=False,
on_delete=models.CASCADE,
related_name="fields",
editable=False,
)
# Actual data storage
value_text = models.CharField(max_length=128, null=True)
value_bool = models.BooleanField(null=True)
value_url = models.URLField(null=True)
value_date = models.DateField(null=True)
value_int = models.IntegerField(null=True)
value_float = models.FloatField(null=True)
value_monetary = models.CharField(null=True, max_length=128)
value_monetary_amount = models.GeneratedField(
expression=Case(
# If the value starts with a number and no currency symbol, use the whole string
models.When(
value_monetary__regex=r"^\d+",
then=Cast(
Substr("value_monetary", 1),
output_field=models.DecimalField(decimal_places=2, max_digits=65),
),
),
# If the value starts with a 3-char currency symbol, use the rest of the string
default=Cast(
Substr("value_monetary", 4),
output_field=models.DecimalField(decimal_places=2, max_digits=65),
),
output_field=models.DecimalField(decimal_places=2, max_digits=65),
),
output_field=models.DecimalField(decimal_places=2, max_digits=65),
db_persist=True,
)
value_document_ids = models.JSONField(null=True)
value_select = models.CharField(null=True, max_length=16)
value_long_text = models.TextField(null=True)
class Meta:
ordering = ("created",)
verbose_name = _("custom field instance")
verbose_name_plural = _("custom field instances")
constraints = [
models.UniqueConstraint(
fields=["document", "field"],
name="%(app_label)s_%(class)s_unique_document_field",
),
]
def __str__(self) -> str:
return str(self.field.name) + f" : {self.value_for_search}"
@classmethod
def get_value_field_name(cls, data_type: CustomField.FieldDataType):
try:
return cls.TYPE_TO_DATA_STORE_NAME_MAP[data_type]
except KeyError: # pragma: no cover
raise NotImplementedError(data_type)
@property
def value(self):
"""
Based on the data type, access the actual value the instance stores
A little shorthand/quick way to get what is actually here
"""
value_field_name = self.get_value_field_name(self.field.data_type)
return getattr(self, value_field_name)
@property
def value_for_search(self) -> str | None:
"""
Return the value suitable for full-text indexing and display, or None
if the value is unset.
For SELECT fields, resolves the human-readable label rather than the
opaque option ID stored in value_select.
"""
if self.value is None:
return None
if self.field.data_type == CustomField.FieldDataType.SELECT:
options = (self.field.extra_data or {}).get("select_options", [])
return next(
(o["label"] for o in options if o.get("id") == self.value),
None,
)
return str(self.value)
if settings.AUDIT_LOG_ENABLED:
auditlog.register(
Document,
m2m_fields={"tags"},
exclude_fields=["content_length", "modified"],
)
auditlog.register(Correspondent)
auditlog.register(Tag)
auditlog.register(DocumentType)
auditlog.register(Note)
auditlog.register(CustomField)
auditlog.register(CustomFieldInstance)
class WorkflowTrigger(models.Model):
class WorkflowTriggerMatching(models.IntegerChoices):
# No auto matching
NONE = MatchingModel.MATCH_NONE, _("None")
ANY = MatchingModel.MATCH_ANY, _("Any word")
ALL = MatchingModel.MATCH_ALL, _("All words")
LITERAL = MatchingModel.MATCH_LITERAL, _("Exact match")
REGEX = MatchingModel.MATCH_REGEX, _("Regular expression")
FUZZY = MatchingModel.MATCH_FUZZY, _("Fuzzy word")
class WorkflowTriggerType(models.IntegerChoices):
CONSUMPTION = 1, _("Consumption Started")
DOCUMENT_ADDED = 2, _("Document Added")
DOCUMENT_UPDATED = 3, _("Document Updated")
SCHEDULED = 4, _("Scheduled")
class DocumentSourceChoices(models.IntegerChoices):
CONSUME_FOLDER = DocumentSource.ConsumeFolder.value, _("Consume Folder")
API_UPLOAD = DocumentSource.ApiUpload.value, _("Api Upload")
MAIL_FETCH = DocumentSource.MailFetch.value, _("Mail Fetch")
WEB_UI = DocumentSource.WebUI.value, _("Web UI")
class ScheduleDateField(models.TextChoices):
ADDED = "added", _("Added")
CREATED = "created", _("Created")
MODIFIED = "modified", _("Modified")
CUSTOM_FIELD = "custom_field", _("Custom Field")
type = models.PositiveSmallIntegerField(
_("Workflow Trigger Type"),
choices=WorkflowTriggerType.choices,
default=WorkflowTriggerType.CONSUMPTION,
)
sources = MultiSelectField(
max_length=7,
choices=DocumentSourceChoices.choices,
default=f"{DocumentSource.ConsumeFolder},{DocumentSource.ApiUpload},{DocumentSource.MailFetch},{DocumentSource.WebUI}",
)
filter_path = models.CharField(
_("filter path"),
max_length=256,
null=True,
blank=True,
help_text=_(
"Only consume documents with a path that matches "
"this if specified. Wildcards specified as * are "
"allowed. Case insensitive.",
),
)
filter_filename = models.CharField(
_("filter filename"),
max_length=256,
null=True,
blank=True,
help_text=_(
"Only consume documents which entirely match this "
"filename if specified. Wildcards such as *.pdf or "
"*invoice* are allowed. Case insensitive.",
),
)
filter_mailrule = models.ForeignKey(
"paperless_mail.MailRule",
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("filter documents from this mail rule"),
)
match = models.CharField(_("match"), max_length=256, blank=True)
matching_algorithm = models.PositiveSmallIntegerField(
_("matching algorithm"),
choices=WorkflowTriggerMatching.choices,
default=WorkflowTriggerMatching.NONE,
)
is_insensitive = models.BooleanField(_("is insensitive"), default=True)
filter_has_tags = models.ManyToManyField(
Tag,
blank=True,
verbose_name=_("has these tag(s)"),
)
filter_has_all_tags = models.ManyToManyField(
Tag,
blank=True,
related_name="workflowtriggers_has_all",
verbose_name=_("has all of these tag(s)"),
)
filter_has_not_tags = models.ManyToManyField(
Tag,
blank=True,
related_name="workflowtriggers_has_not",
verbose_name=_("does not have these tag(s)"),
)
filter_has_document_type = models.ForeignKey(
DocumentType,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("has this document type"),
)
filter_has_any_document_types = models.ManyToManyField(
DocumentType,
blank=True,
related_name="workflowtriggers_has_any_document_type",
verbose_name=_("has one of these document types"),
)
filter_has_not_document_types = models.ManyToManyField(
DocumentType,
blank=True,
related_name="workflowtriggers_has_not_document_type",
verbose_name=_("does not have these document type(s)"),
)
filter_has_correspondent = models.ForeignKey(
Correspondent,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("has this correspondent"),
)
filter_has_not_correspondents = models.ManyToManyField(
Correspondent,
blank=True,
related_name="workflowtriggers_has_not_correspondent",
verbose_name=_("does not have these correspondent(s)"),
)
filter_has_any_correspondents = models.ManyToManyField(
Correspondent,
blank=True,
related_name="workflowtriggers_has_any_correspondent",
verbose_name=_("has one of these correspondents"),
)
filter_has_storage_path = models.ForeignKey(
StoragePath,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("has this storage path"),
)
filter_has_any_storage_paths = models.ManyToManyField(
StoragePath,
blank=True,
related_name="workflowtriggers_has_any_storage_path",
verbose_name=_("has one of these storage paths"),
)
filter_has_not_storage_paths = models.ManyToManyField(
StoragePath,
blank=True,
related_name="workflowtriggers_has_not_storage_path",
verbose_name=_("does not have these storage path(s)"),
)
filter_custom_field_query = models.TextField(
_("filter custom field query"),
null=True,
blank=True,
help_text=_("JSON-encoded custom field query expression."),
)
schedule_offset_days = models.SmallIntegerField(
_("schedule offset days"),
default=0,
help_text=_(
"The number of days to offset the schedule trigger by.",
),
)
schedule_is_recurring = models.BooleanField(
_("schedule is recurring"),
default=False,
help_text=_(
"If the schedule should be recurring.",
),
)
schedule_recurring_interval_days = models.PositiveSmallIntegerField(
_("schedule recurring delay in days"),
default=1,
validators=[MinValueValidator(1)],
help_text=_(
"The number of days between recurring schedule triggers.",
),
)
schedule_date_field = models.CharField(
_("schedule date field"),
max_length=20,
choices=ScheduleDateField.choices,
default=ScheduleDateField.ADDED,
help_text=_(
"The field to check for a schedule trigger.",
),
)
schedule_date_custom_field = models.ForeignKey(
CustomField,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("schedule date custom field"),
)
class Meta:
verbose_name = _("workflow trigger")
verbose_name_plural = _("workflow triggers")
def __str__(self):
return f"WorkflowTrigger {self.pk}"
class WorkflowActionEmail(models.Model):
subject = models.CharField(
_("email subject"),
max_length=256,
null=False,
help_text=_(
"The subject of the email, can include some placeholders, "
"see documentation.",
),
)
body = models.TextField(
_("email body"),
null=False,
help_text=_(
"The body (message) of the email, can include some placeholders, "
"see documentation.",
),
)
to = models.TextField(
_("emails to"),
null=False,
help_text=_(
"The destination email addresses, comma separated.",
),
)
include_document = models.BooleanField(
default=False,
verbose_name=_("include document in email"),
)
def __str__(self):
return f"Workflow Email Action {self.pk}"
class WorkflowActionWebhook(models.Model):
# We dont use the built-in URLField because it is not flexible enough
# validation is handled in the serializer
url = models.CharField(
_("webhook url"),
null=False,
max_length=256,
help_text=_("The destination URL for the notification."),
)
use_params = models.BooleanField(
default=True,
verbose_name=_("use parameters"),
)
as_json = models.BooleanField(
default=False,
verbose_name=_("send as JSON"),
)
params = models.JSONField(
_("webhook parameters"),
null=True,
blank=True,
help_text=_("The parameters to send with the webhook URL if body not used."),
)
body = models.TextField(
_("webhook body"),
null=True,
blank=True,
help_text=_("The body to send with the webhook URL if parameters not used."),
)
headers = models.JSONField(
_("webhook headers"),
null=True,
blank=True,
help_text=_("The headers to send with the webhook URL."),
)
include_document = models.BooleanField(
default=False,
verbose_name=_("include document in webhook"),
)
def __str__(self):
return f"Workflow Webhook Action {self.pk}"
class WorkflowAction(models.Model):
class WorkflowActionType(models.IntegerChoices):
ASSIGNMENT = (
1,
_("Assignment"),
)
REMOVAL = (
2,
_("Removal"),
)
EMAIL = (
3,
_("Email"),
)
WEBHOOK = (
4,
_("Webhook"),
)
PASSWORD_REMOVAL = (
5,
_("Password removal"),
)
MOVE_TO_TRASH = (
6,
_("Move to trash"),
)
type = models.PositiveSmallIntegerField(
_("Workflow Action Type"),
choices=WorkflowActionType.choices,
default=WorkflowActionType.ASSIGNMENT,
)
order = models.PositiveSmallIntegerField(_("order"), default=0)
assign_title = models.TextField(
_("assign title"),
null=True,
blank=True,
help_text=_(
"Assign a document title, must be a Jinja2 template, see documentation.",
),
)
assign_tags = models.ManyToManyField(
Tag,
blank=True,
related_name="+",
verbose_name=_("assign this tag"),
)
assign_document_type = models.ForeignKey(
DocumentType,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="+",
verbose_name=_("assign this document type"),
)
assign_correspondent = models.ForeignKey(
Correspondent,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="+",
verbose_name=_("assign this correspondent"),
)
assign_storage_path = models.ForeignKey(
StoragePath,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="+",
verbose_name=_("assign this storage path"),
)
assign_owner = models.ForeignKey(
User,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="+",
verbose_name=_("assign this owner"),
)
assign_view_users = models.ManyToManyField(
User,
blank=True,
related_name="+",
verbose_name=_("grant view permissions to these users"),
)
assign_view_groups = models.ManyToManyField(
Group,
blank=True,
related_name="+",
verbose_name=_("grant view permissions to these groups"),
)
assign_change_users = models.ManyToManyField(
User,
blank=True,
related_name="+",
verbose_name=_("grant change permissions to these users"),
)
assign_change_groups = models.ManyToManyField(
Group,
blank=True,
related_name="+",
verbose_name=_("grant change permissions to these groups"),
)
assign_custom_fields = models.ManyToManyField(
CustomField,
blank=True,
related_name="+",
verbose_name=_("assign these custom fields"),
)
assign_custom_fields_values = models.JSONField(
_("custom field values"),
null=True,
blank=True,
help_text=_(
"Optional values to assign to the custom fields.",
),
default=dict,
)
remove_tags = models.ManyToManyField(
Tag,
blank=True,
related_name="+",
verbose_name=_("remove these tag(s)"),
)
remove_all_tags = models.BooleanField(
default=False,
verbose_name=_("remove all tags"),
)
remove_document_types = models.ManyToManyField(
DocumentType,
blank=True,
related_name="+",
verbose_name=_("remove these document type(s)"),
)
remove_all_document_types = models.BooleanField(
default=False,
verbose_name=_("remove all document types"),
)
remove_correspondents = models.ManyToManyField(
Correspondent,
blank=True,
related_name="+",
verbose_name=_("remove these correspondent(s)"),
)
remove_all_correspondents = models.BooleanField(
default=False,
verbose_name=_("remove all correspondents"),
)
remove_storage_paths = models.ManyToManyField(
StoragePath,
blank=True,
related_name="+",
verbose_name=_("remove these storage path(s)"),
)
remove_all_storage_paths = models.BooleanField(
default=False,
verbose_name=_("remove all storage paths"),
)
remove_owners = models.ManyToManyField(
User,
blank=True,
related_name="+",
verbose_name=_("remove these owner(s)"),
)
remove_all_owners = models.BooleanField(
default=False,
verbose_name=_("remove all owners"),
)
remove_view_users = models.ManyToManyField(
User,
blank=True,
related_name="+",
verbose_name=_("remove view permissions for these users"),
)
remove_view_groups = models.ManyToManyField(
Group,
blank=True,
related_name="+",
verbose_name=_("remove view permissions for these groups"),
)
remove_change_users = models.ManyToManyField(
User,
blank=True,
related_name="+",
verbose_name=_("remove change permissions for these users"),
)
remove_change_groups = models.ManyToManyField(
Group,
blank=True,
related_name="+",
verbose_name=_("remove change permissions for these groups"),
)
remove_all_permissions = models.BooleanField(
default=False,
verbose_name=_("remove all permissions"),
)
remove_custom_fields = models.ManyToManyField(
CustomField,
blank=True,
related_name="+",
verbose_name=_("remove these custom fields"),
)
remove_all_custom_fields = models.BooleanField(
default=False,
verbose_name=_("remove all custom fields"),
)
email = models.ForeignKey(
WorkflowActionEmail,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="action",
verbose_name=_("email"),
)
webhook = models.ForeignKey(
WorkflowActionWebhook,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="action",
verbose_name=_("webhook"),
)
passwords = models.JSONField(
_("passwords"),
null=True,
blank=True,
help_text=_(
"Passwords to try when removing PDF protection. Separate with commas or new lines.",
),
)
class Meta:
verbose_name = _("workflow action")
verbose_name_plural = _("workflow actions")
def __str__(self):
return f"WorkflowAction {self.pk}"
class Workflow(models.Model):
name = models.CharField(_("name"), max_length=256, unique=True)
order = models.SmallIntegerField(_("order"), default=0)
triggers = models.ManyToManyField(
WorkflowTrigger,
related_name="workflows",
blank=False,
verbose_name=_("triggers"),
)
actions = models.ManyToManyField(
WorkflowAction,
related_name="workflows",
blank=False,
verbose_name=_("actions"),
)
enabled = models.BooleanField(_("enabled"), default=True)
def __str__(self):
return f"Workflow: {self.name}"
class WorkflowRun(SoftDeleteModel):
workflow = models.ForeignKey(
Workflow,
on_delete=models.CASCADE,
related_name="runs",
verbose_name=_("workflow"),
)
type = models.PositiveSmallIntegerField(
_("workflow trigger type"),
choices=WorkflowTrigger.WorkflowTriggerType.choices,
null=True,
)
document = models.ForeignKey(
Document,
null=True,
on_delete=models.CASCADE,
related_name="workflow_runs",
verbose_name=_("document"),
)
run_at = models.DateTimeField(
_("date run"),
default=timezone.now,
db_index=True,
)
class Meta:
verbose_name = _("workflow run")
verbose_name_plural = _("workflow runs")
def __str__(self) -> str:
return f"WorkflowRun of {self.workflow} at {self.run_at} on {self.document}"