From 58789e506145cdcd948c2ac8c6960bbedad084f3 Mon Sep 17 00:00:00 2001 From: Trenton H <797416+stumpylog@users.noreply.github.com> Date: Mon, 20 Apr 2026 13:19:54 -0700 Subject: [PATCH] Chore: Structured consume task return values (#12612) --- src/documents/admin.py | 1 - src/documents/consumer.py | 24 +++++++++++--- src/documents/data_models.py | 24 ++++++++++++++ .../migrations/0019_task_system_redesign.py | 9 ------ src/documents/models.py | 7 ----- src/documents/serialisers.py | 23 +++++++++----- src/documents/signals/handlers.py | 31 +++---------------- src/documents/tasks.py | 19 +++++++++++- src/documents/tests/factories.py | 1 - src/documents/tests/test_api_status.py | 6 ++-- src/documents/tests/test_barcodes.py | 4 +-- src/documents/tests/test_double_sided.py | 8 ++--- src/documents/tests/test_task_signals.py | 30 +++++++++++------- src/documents/views.py | 18 +++++++++-- src/paperless_mail/mail.py | 2 +- 15 files changed, 125 insertions(+), 82 deletions(-) diff --git a/src/documents/admin.py b/src/documents/admin.py index 3730160fb..d0a43ce9c 100644 --- a/src/documents/admin.py +++ b/src/documents/admin.py @@ -167,7 +167,6 @@ class TaskAdmin(admin.ModelAdmin): "wait_time_seconds", "input_data", "result_data", - "result_message", ) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 83bc832ce..390ce3e66 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -20,6 +20,7 @@ from rest_framework.reverse import reverse from documents.classifier import load_classifier from documents.data_models import ConsumableDocument +from documents.data_models import ConsumeFileSuccessResult from documents.data_models import DocumentMetadataOverrides from documents.file_handling import create_source_path_directory from documents.file_handling import generate_filename @@ -90,6 +91,15 @@ class ConsumerError(Exception): pass +class ConsumeFileDuplicateError(ConsumerError): + """Raised when a file is rejected because it duplicates an existing document.""" + + def __init__(self, message: str, duplicate_id: int, *, in_trash: bool) -> None: + super().__init__(message) + self.duplicate_id = duplicate_id + self.in_trash = in_trash + + class ConsumerStatusShortMessage(StrEnum): DOCUMENT_ALREADY_EXISTS = "document_already_exists" DOCUMENT_ALREADY_EXISTS_IN_TRASH = "document_already_exists_in_trash" @@ -395,7 +405,7 @@ class ConsumerPlugin( exception=e, ) - def run(self) -> str: + def run(self) -> "ConsumeFileSuccessResult": """ Return the document object if it was successfully created. """ @@ -771,7 +781,7 @@ class ConsumerPlugin( # Return the most up to date fields document.refresh_from_db() - return f"Success. New document id {document.pk} created" + return ConsumeFileSuccessResult(document_id=document.pk) def _parse_title_placeholders(self, title: str) -> str: local_added = timezone.localtime(timezone.now()) @@ -1010,9 +1020,13 @@ class ConsumerPreflightPlugin( ) failure_msg += " Note: existing document is in the trash." - self._fail( - status_msg, - failure_msg, + self._send_progress(100, 100, ProgressStatusOptions.FAILED, status_msg) + self.log.error(failure_msg) + in_trash = duplicates_in_trash.exists() + raise ConsumeFileDuplicateError( + f"{self.filename}: {failure_msg}", + duplicate.pk, + in_trash=in_trash, ) def pre_check_directories(self) -> None: diff --git a/src/documents/data_models.py b/src/documents/data_models.py index a2452a8af..6d9e3a187 100644 --- a/src/documents/data_models.py +++ b/src/documents/data_models.py @@ -2,6 +2,7 @@ import dataclasses import datetime from enum import IntEnum from pathlib import Path +from typing import TypedDict import magic from guardian.shortcuts import get_groups_with_perms @@ -184,3 +185,26 @@ class ConsumableDocument: # Get the file type once at init # Note this function isn't called when the object is unpickled self.mime_type = magic.from_file(self.original_file, mime=True) + + +class ConsumeFileDuplicateResult(TypedDict): + """Returned by consume_file when the file is rejected as a duplicate.""" + + duplicate_of: int + duplicate_in_trash: bool + + +class ConsumeFileSuccessResult(TypedDict): + """Returned by consume_file when the document is created successfully.""" + + document_id: int + + +class ConsumeFileStoppedResult(TypedDict): + """Returned by consume_file when a plugin raises StopConsumeTaskError. + + Examples: barcode split dispatched child tasks, double-sided scan waiting + for the second half, workflow deleted the document during consumption. + """ + + reason: str diff --git a/src/documents/migrations/0019_task_system_redesign.py b/src/documents/migrations/0019_task_system_redesign.py index 85ba489f1..7b7a77cf4 100644 --- a/src/documents/migrations/0019_task_system_redesign.py +++ b/src/documents/migrations/0019_task_system_redesign.py @@ -170,15 +170,6 @@ class Migration(migrations.Migration): verbose_name="Result Data", ), ), - ( - "result_message", - models.TextField( - blank=True, - help_text="Human-readable result message", - null=True, - verbose_name="Result Message", - ), - ), ( "acknowledged", models.BooleanField( diff --git a/src/documents/models.py b/src/documents/models.py index da51823ae..98202de0f 100644 --- a/src/documents/models.py +++ b/src/documents/models.py @@ -790,13 +790,6 @@ class PaperlessTask(ModelWithOwner): help_text=_("Structured result data from task execution"), ) - result_message = models.TextField( - null=True, - blank=True, - verbose_name=_("Result Message"), - help_text=_("Human-readable result message"), - ) - # Acknowledgment acknowledged = models.BooleanField( default=False, diff --git a/src/documents/serialisers.py b/src/documents/serialisers.py index 94dca2035..3cba0fafa 100644 --- a/src/documents/serialisers.py +++ b/src/documents/serialisers.py @@ -2477,7 +2477,6 @@ class TaskSerializerV10(OwnedObjectSerializer): "wait_time_seconds", "input_data", "result_data", - "result_message", "related_document_ids", "acknowledged", "owner", @@ -2504,12 +2503,8 @@ class TaskSerializerV9(serializers.ModelSerializer): # v9 field: status -> uppercase Celery state strings status = serializers.SerializerMethodField() - # v9 field: result -> result_message (with legacy format fallback) - result = serializers.CharField( - source="result_message", - read_only=True, - allow_null=True, - ) + # v9 field: result -> derived from result_data + result = serializers.SerializerMethodField() # v9 field: related_document -> first document ID from result_data related_document = serializers.SerializerMethodField() @@ -2541,6 +2536,20 @@ class TaskSerializerV9(serializers.ModelSerializer): PaperlessTask.TaskType.LLM_INDEX: "llmindex_update", } + def get_result(self, obj: PaperlessTask) -> str | None: + """Reconstruct a human-readable result string from result_data for v9 clients.""" + if not obj.result_data: + return None + if doc_id := obj.result_data.get("document_id"): + return f"Success. New document id {doc_id} created" + if reason := obj.result_data.get("reason"): + return reason + if dup_id := obj.result_data.get("duplicate_of"): + return f"Not consuming: It is a duplicate of document #{dup_id}" + if error := obj.result_data.get("error_message"): + return error + return None + def get_task_name(self, obj: PaperlessTask) -> str: return self._TASK_TYPE_TO_V9_NAME.get(obj.task_type, obj.task_type) diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index 952ce4df9..5b481b4ef 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -3,7 +3,6 @@ from __future__ import annotations import datetime import hashlib import logging -import re as _re import shutil import traceback as _tb from pathlib import Path @@ -1101,25 +1100,6 @@ def _extract_owner_id( return None # pragma: no cover -def _parse_consume_result(result: str) -> dict | None: - """Parse a consume_file string result into a structured dict. - - consume_file returns human-readable strings rather than dicts (e.g. - "Success. New document id 42 created" or "It is a duplicate of foo (#7)"). - This function extracts the document ID or duplicate reference so the - result can be stored as structured data on the PaperlessTask record. - Returns None when the string does not match any known pattern. - """ - if match := _re.search(r"New document id (\d+) created", result): - return {"document_id": int(match.group(1))} - if match := _re.search(r"It is a duplicate of .* \(#(\d+)\)", result): - return { - "duplicate_of": int(match.group(1)), - "duplicate_in_trash": "existing document is in the trash" in result, - } - return None # pragma: no cover - - @before_task_publish.connect def before_task_publish_handler( sender=None, @@ -1196,8 +1176,7 @@ def task_postrun_handler( Records task completion and result data for non-failure outcomes. Skips FAILURE states entirely, since task_failure_handler fires first - and fully owns the failure path (status, date_done, duration, - result_data, result_message). + and fully owns the failure path (status, date_done, duration, result_data). https://docs.celeryq.dev/en/stable/userguide/signals.html#task-postrun """ @@ -1236,10 +1215,9 @@ def task_postrun_handler( if isinstance(retval, dict): task_instance.result_data = retval changed_fields.append("result_data") - elif isinstance(retval, str): - task_instance.result_message = retval - task_instance.result_data = _parse_consume_result(retval) - changed_fields.extend(["result_message", "result_data"]) + if "duplicate_of" in retval: + task_instance.status = PaperlessTask.Status.FAILURE + changed_fields.append("status") task_instance.save(update_fields=changed_fields) except Exception: # pragma: no cover @@ -1282,7 +1260,6 @@ def task_failure_handler( update_fields: dict = { "status": PaperlessTask.Status.FAILURE, "result_data": result_data, - "result_message": str(exception) if exception else None, "date_done": now, } diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 3bb3ff40c..8f346e36c 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -26,11 +26,15 @@ from documents.caching import clear_document_caches from documents.classifier import DocumentClassifier from documents.classifier import load_classifier from documents.consumer import AsnCheckPlugin +from documents.consumer import ConsumeFileDuplicateError from documents.consumer import ConsumerPlugin from documents.consumer import ConsumerPreflightPlugin from documents.consumer import WorkflowTriggerPlugin from documents.consumer import should_produce_archive from documents.data_models import ConsumableDocument +from documents.data_models import ConsumeFileDuplicateResult +from documents.data_models import ConsumeFileStoppedResult +from documents.data_models import ConsumeFileSuccessResult from documents.data_models import DocumentMetadataOverrides from documents.double_sided import CollatePlugin from documents.file_handling import create_source_path_directory @@ -121,6 +125,11 @@ def consume_file( self: Task, input_doc: ConsumableDocument, overrides: DocumentMetadataOverrides | None = None, +) -> ( + ConsumeFileSuccessResult + | ConsumeFileStoppedResult + | ConsumeFileDuplicateResult + | None ): token = consume_task_id.set((self.request.id or "")[:8]) try: @@ -153,6 +162,7 @@ def consume_file( TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir, ): tmp_dir = Path(tmp_dir) + msg = None for plugin_class in plugins: plugin_name = plugin_class.NAME @@ -183,7 +193,14 @@ def consume_file( except StopConsumeTaskError as e: logger.info(f"{plugin_name} requested task exit: {e.message}") - return e.message + return ConsumeFileStoppedResult(reason=e.message) + + except ConsumeFileDuplicateError as e: + logger.info(f"{plugin_name} rejected duplicate: {e}") + return ConsumeFileDuplicateResult( + duplicate_of=e.duplicate_id, + duplicate_in_trash=e.in_trash, + ) except Exception as e: logger.exception(f"{plugin_name} failed: {e}") diff --git a/src/documents/tests/factories.py b/src/documents/tests/factories.py index 21d8bcb37..aaadd6d14 100644 --- a/src/documents/tests/factories.py +++ b/src/documents/tests/factories.py @@ -78,5 +78,4 @@ class PaperlessTaskFactory(DjangoModelFactory): status = PaperlessTask.Status.PENDING input_data = factory.LazyFunction(dict) result_data = None - result_message = None acknowledged = False diff --git a/src/documents/tests/test_api_status.py b/src/documents/tests/test_api_status.py index 69cfe2c34..ef94f8b33 100644 --- a/src/documents/tests/test_api_status.py +++ b/src/documents/tests/test_api_status.py @@ -299,7 +299,7 @@ class TestSystemStatus(APITestCase): task_type=PaperlessTask.TaskType.TRAIN_CLASSIFIER, trigger_source=PaperlessTask.TriggerSource.SCHEDULED, status=PaperlessTask.Status.FAILURE, - result_message="Classifier training failed", + result_data={"error_message": "Classifier training failed"}, ) self.client.force_login(self.user) response = self.client.get(self.ENDPOINT) @@ -360,7 +360,7 @@ class TestSystemStatus(APITestCase): task_type=PaperlessTask.TaskType.SANITY_CHECK, trigger_source=PaperlessTask.TriggerSource.SCHEDULED, status=PaperlessTask.Status.FAILURE, - result_message="5 issues found.", + result_data={"error_message": "5 issues found."}, ) self.client.force_login(self.user) response = self.client.get(self.ENDPOINT) @@ -429,7 +429,7 @@ class TestSystemStatus(APITestCase): task_type=PaperlessTask.TaskType.LLM_INDEX, trigger_source=PaperlessTask.TriggerSource.SCHEDULED, status=PaperlessTask.Status.FAILURE, - result_message="AI index update failed", + result_data={"error_message": "AI index update failed"}, ) self.client.force_login(self.user) response = self.client.get(self.ENDPOINT) diff --git a/src/documents/tests/test_barcodes.py b/src/documents/tests/test_barcodes.py index 0a8da4eb0..77b1f07bb 100644 --- a/src/documents/tests/test_barcodes.py +++ b/src/documents/tests/test_barcodes.py @@ -632,7 +632,7 @@ class TestBarcodeNewConsume( ), overrides, ), - "Barcode splitting complete!", + {"reason": "Barcode splitting complete!"}, ) # 2 new document consume tasks created self.assertEqual(self.consume_file_mock.call_count, 2) @@ -1049,7 +1049,7 @@ class TestTagBarcode(DirectoriesMixin, SampleDirMixin, GetReaderPluginMixin, Tes None, ) - self.assertEqual(result, "Barcode splitting complete!") + self.assertEqual(result, {"reason": "Barcode splitting complete!"}) documents = Document.objects.all().order_by("id") self.assertEqual(documents.count(), 3) diff --git a/src/documents/tests/test_double_sided.py b/src/documents/tests/test_double_sided.py index 9ae7ce63a..1189512df 100644 --- a/src/documents/tests/test_double_sided.py +++ b/src/documents/tests/test_double_sided.py @@ -83,7 +83,7 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase): dt.datetime.now(), delta=dt.timedelta(seconds=5), ) - self.assertIn("Received odd numbered pages", msg) + self.assertIn("Received odd numbered pages", msg["reason"]) def test_collation(self) -> None: """ @@ -129,7 +129,7 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase): ) msg = self.consume_file("double-sided-odd.pdf") self.assertIsFile(self.staging_file) - self.assertIn("Received odd numbered pages", msg) + self.assertIn("Received odd numbered pages", msg["reason"]) def test_less_odd_pages_then_even_fails(self) -> None: """ @@ -212,7 +212,7 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase): """ msg = self.consume_file("simple.pdf", Path("..") / "simple.pdf") self.assertIsNotFile(self.staging_file) - self.assertRegex(msg, r"Success. New document id \d+ created") + self.assertIsInstance(msg.get("document_id"), int) def test_subdirectory_upload(self) -> None: """ @@ -252,4 +252,4 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase): """ msg = self.consume_file("simple.pdf") self.assertIsNotFile(self.staging_file) - self.assertRegex(msg, r"Success. New document id \d+ created") + self.assertIsInstance(msg.get("document_id"), int) diff --git a/src/documents/tests/test_task_signals.py b/src/documents/tests/test_task_signals.py index 56d964822..02221743a 100644 --- a/src/documents/tests/test_task_signals.py +++ b/src/documents/tests/test_task_signals.py @@ -238,30 +238,38 @@ class TestTaskPostrunHandler: task.refresh_from_db() assert task.status == PaperlessTask.Status.STARTED - def test_parses_legacy_new_document_string(self): - task = self._started_task() + def test_records_success_with_consume_result(self): + """ConsumeFileSuccessResult dict is stored directly as result_data.""" + from documents.data_models import ConsumeFileSuccessResult + task = self._started_task() task_postrun_handler( task_id=task.task_id, - retval="New document id 42 created", + retval=ConsumeFileSuccessResult(document_id=42), state="SUCCESS", ) task.refresh_from_db() - assert task.result_data["document_id"] == 42 - assert task.result_message == "New document id 42 created" + assert task.result_data == {"document_id": 42} + + def test_records_stopped_with_reason(self): + """ConsumeFileStoppedResult dict is stored directly as result_data.""" + from documents.data_models import ConsumeFileStoppedResult - def test_parses_duplicate_string(self): - """Duplicate detection returns a string with SUCCESS state (StopConsumeTaskError is caught and returned, not raised).""" task = self._started_task() - task_postrun_handler( task_id=task.task_id, - retval="It is a duplicate of some document (#99).", + retval=ConsumeFileStoppedResult(reason="Barcode splitting complete!"), state="SUCCESS", ) task.refresh_from_db() - assert task.result_data["duplicate_of"] == 99 - assert task.result_data["duplicate_in_trash"] is False + assert task.result_data == {"reason": "Barcode splitting complete!"} + + def test_none_retval_stores_no_result_data(self): + """None return value (non-consume tasks) leaves result_data untouched.""" + task = self._started_task() + task_postrun_handler(task_id=task.task_id, retval=None, state="SUCCESS") + task.refresh_from_db() + assert task.result_data is None def test_ignores_unknown_task_id(self): diff --git a/src/documents/views.py b/src/documents/views.py index a8fd29a84..51b1abe96 100644 --- a/src/documents/views.py +++ b/src/documents/views.py @@ -4685,7 +4685,11 @@ class SystemStatusView(PassUserMixin): classifier_error = "No classifier training tasks found" elif last_trained_task.status != PaperlessTask.Status.SUCCESS: classifier_status = "ERROR" - classifier_error = last_trained_task.result_message + classifier_error = ( + last_trained_task.result_data.get("error_message") + if last_trained_task.result_data + else None + ) classifier_last_trained = ( last_trained_task.date_done if last_trained_task else None ) @@ -4705,7 +4709,11 @@ class SystemStatusView(PassUserMixin): sanity_check_error = "No sanity check tasks found" elif last_sanity_check.status != PaperlessTask.Status.SUCCESS: sanity_check_status = "ERROR" - sanity_check_error = last_sanity_check.result_message + sanity_check_error = ( + last_sanity_check.result_data.get("error_message") + if last_sanity_check.result_data + else None + ) sanity_check_last_run = ( last_sanity_check.date_done if last_sanity_check else None ) @@ -4730,7 +4738,11 @@ class SystemStatusView(PassUserMixin): llmindex_error = "No LLM index update tasks found" elif last_llmindex_update.status == PaperlessTask.Status.FAILURE: llmindex_status = "ERROR" - llmindex_error = last_llmindex_update.result_message + llmindex_error = ( + last_llmindex_update.result_data.get("error_message") + if last_llmindex_update.result_data + else None + ) llmindex_last_modified = ( last_llmindex_update.date_done if last_llmindex_update else None ) diff --git a/src/paperless_mail/mail.py b/src/paperless_mail/mail.py index 430bceb4f..d551cc8cd 100644 --- a/src/paperless_mail/mail.py +++ b/src/paperless_mail/mail.py @@ -239,7 +239,7 @@ def mailbox_login(mailbox: MailBox, account: MailAccount) -> None: @shared_task def apply_mail_action( - result: list[str], + result: list, rule_id: int, message_uid: str, message_subject: str,