Chore: Structured consume task return values (#12612)

This commit is contained in:
Trenton H
2026-04-20 13:19:54 -07:00
committed by GitHub
parent 7492cda794
commit 58789e5061
15 changed files with 125 additions and 82 deletions
-1
View File
@@ -167,7 +167,6 @@ class TaskAdmin(admin.ModelAdmin):
"wait_time_seconds",
"input_data",
"result_data",
"result_message",
)
+19 -5
View File
@@ -20,6 +20,7 @@ from rest_framework.reverse import reverse
from documents.classifier import load_classifier
from documents.data_models import ConsumableDocument
from documents.data_models import ConsumeFileSuccessResult
from documents.data_models import DocumentMetadataOverrides
from documents.file_handling import create_source_path_directory
from documents.file_handling import generate_filename
@@ -90,6 +91,15 @@ class ConsumerError(Exception):
pass
class ConsumeFileDuplicateError(ConsumerError):
"""Raised when a file is rejected because it duplicates an existing document."""
def __init__(self, message: str, duplicate_id: int, *, in_trash: bool) -> None:
super().__init__(message)
self.duplicate_id = duplicate_id
self.in_trash = in_trash
class ConsumerStatusShortMessage(StrEnum):
DOCUMENT_ALREADY_EXISTS = "document_already_exists"
DOCUMENT_ALREADY_EXISTS_IN_TRASH = "document_already_exists_in_trash"
@@ -395,7 +405,7 @@ class ConsumerPlugin(
exception=e,
)
def run(self) -> str:
def run(self) -> "ConsumeFileSuccessResult":
"""
Return the document object if it was successfully created.
"""
@@ -771,7 +781,7 @@ class ConsumerPlugin(
# Return the most up to date fields
document.refresh_from_db()
return f"Success. New document id {document.pk} created"
return ConsumeFileSuccessResult(document_id=document.pk)
def _parse_title_placeholders(self, title: str) -> str:
local_added = timezone.localtime(timezone.now())
@@ -1010,9 +1020,13 @@ class ConsumerPreflightPlugin(
)
failure_msg += " Note: existing document is in the trash."
self._fail(
status_msg,
failure_msg,
self._send_progress(100, 100, ProgressStatusOptions.FAILED, status_msg)
self.log.error(failure_msg)
in_trash = duplicates_in_trash.exists()
raise ConsumeFileDuplicateError(
f"{self.filename}: {failure_msg}",
duplicate.pk,
in_trash=in_trash,
)
def pre_check_directories(self) -> None:
+24
View File
@@ -2,6 +2,7 @@ import dataclasses
import datetime
from enum import IntEnum
from pathlib import Path
from typing import TypedDict
import magic
from guardian.shortcuts import get_groups_with_perms
@@ -184,3 +185,26 @@ class ConsumableDocument:
# Get the file type once at init
# Note this function isn't called when the object is unpickled
self.mime_type = magic.from_file(self.original_file, mime=True)
class ConsumeFileDuplicateResult(TypedDict):
"""Returned by consume_file when the file is rejected as a duplicate."""
duplicate_of: int
duplicate_in_trash: bool
class ConsumeFileSuccessResult(TypedDict):
"""Returned by consume_file when the document is created successfully."""
document_id: int
class ConsumeFileStoppedResult(TypedDict):
"""Returned by consume_file when a plugin raises StopConsumeTaskError.
Examples: barcode split dispatched child tasks, double-sided scan waiting
for the second half, workflow deleted the document during consumption.
"""
reason: str
@@ -170,15 +170,6 @@ class Migration(migrations.Migration):
verbose_name="Result Data",
),
),
(
"result_message",
models.TextField(
blank=True,
help_text="Human-readable result message",
null=True,
verbose_name="Result Message",
),
),
(
"acknowledged",
models.BooleanField(
-7
View File
@@ -790,13 +790,6 @@ class PaperlessTask(ModelWithOwner):
help_text=_("Structured result data from task execution"),
)
result_message = models.TextField(
null=True,
blank=True,
verbose_name=_("Result Message"),
help_text=_("Human-readable result message"),
)
# Acknowledgment
acknowledged = models.BooleanField(
default=False,
+16 -7
View File
@@ -2477,7 +2477,6 @@ class TaskSerializerV10(OwnedObjectSerializer):
"wait_time_seconds",
"input_data",
"result_data",
"result_message",
"related_document_ids",
"acknowledged",
"owner",
@@ -2504,12 +2503,8 @@ class TaskSerializerV9(serializers.ModelSerializer):
# v9 field: status -> uppercase Celery state strings
status = serializers.SerializerMethodField()
# v9 field: result -> result_message (with legacy format fallback)
result = serializers.CharField(
source="result_message",
read_only=True,
allow_null=True,
)
# v9 field: result -> derived from result_data
result = serializers.SerializerMethodField()
# v9 field: related_document -> first document ID from result_data
related_document = serializers.SerializerMethodField()
@@ -2541,6 +2536,20 @@ class TaskSerializerV9(serializers.ModelSerializer):
PaperlessTask.TaskType.LLM_INDEX: "llmindex_update",
}
def get_result(self, obj: PaperlessTask) -> str | None:
"""Reconstruct a human-readable result string from result_data for v9 clients."""
if not obj.result_data:
return None
if doc_id := obj.result_data.get("document_id"):
return f"Success. New document id {doc_id} created"
if reason := obj.result_data.get("reason"):
return reason
if dup_id := obj.result_data.get("duplicate_of"):
return f"Not consuming: It is a duplicate of document #{dup_id}"
if error := obj.result_data.get("error_message"):
return error
return None
def get_task_name(self, obj: PaperlessTask) -> str:
return self._TASK_TYPE_TO_V9_NAME.get(obj.task_type, obj.task_type)
+4 -27
View File
@@ -3,7 +3,6 @@ from __future__ import annotations
import datetime
import hashlib
import logging
import re as _re
import shutil
import traceback as _tb
from pathlib import Path
@@ -1101,25 +1100,6 @@ def _extract_owner_id(
return None # pragma: no cover
def _parse_consume_result(result: str) -> dict | None:
"""Parse a consume_file string result into a structured dict.
consume_file returns human-readable strings rather than dicts (e.g.
"Success. New document id 42 created" or "It is a duplicate of foo (#7)").
This function extracts the document ID or duplicate reference so the
result can be stored as structured data on the PaperlessTask record.
Returns None when the string does not match any known pattern.
"""
if match := _re.search(r"New document id (\d+) created", result):
return {"document_id": int(match.group(1))}
if match := _re.search(r"It is a duplicate of .* \(#(\d+)\)", result):
return {
"duplicate_of": int(match.group(1)),
"duplicate_in_trash": "existing document is in the trash" in result,
}
return None # pragma: no cover
@before_task_publish.connect
def before_task_publish_handler(
sender=None,
@@ -1196,8 +1176,7 @@ def task_postrun_handler(
Records task completion and result data for non-failure outcomes.
Skips FAILURE states entirely, since task_failure_handler fires first
and fully owns the failure path (status, date_done, duration,
result_data, result_message).
and fully owns the failure path (status, date_done, duration, result_data).
https://docs.celeryq.dev/en/stable/userguide/signals.html#task-postrun
"""
@@ -1236,10 +1215,9 @@ def task_postrun_handler(
if isinstance(retval, dict):
task_instance.result_data = retval
changed_fields.append("result_data")
elif isinstance(retval, str):
task_instance.result_message = retval
task_instance.result_data = _parse_consume_result(retval)
changed_fields.extend(["result_message", "result_data"])
if "duplicate_of" in retval:
task_instance.status = PaperlessTask.Status.FAILURE
changed_fields.append("status")
task_instance.save(update_fields=changed_fields)
except Exception: # pragma: no cover
@@ -1282,7 +1260,6 @@ def task_failure_handler(
update_fields: dict = {
"status": PaperlessTask.Status.FAILURE,
"result_data": result_data,
"result_message": str(exception) if exception else None,
"date_done": now,
}
+18 -1
View File
@@ -26,11 +26,15 @@ from documents.caching import clear_document_caches
from documents.classifier import DocumentClassifier
from documents.classifier import load_classifier
from documents.consumer import AsnCheckPlugin
from documents.consumer import ConsumeFileDuplicateError
from documents.consumer import ConsumerPlugin
from documents.consumer import ConsumerPreflightPlugin
from documents.consumer import WorkflowTriggerPlugin
from documents.consumer import should_produce_archive
from documents.data_models import ConsumableDocument
from documents.data_models import ConsumeFileDuplicateResult
from documents.data_models import ConsumeFileStoppedResult
from documents.data_models import ConsumeFileSuccessResult
from documents.data_models import DocumentMetadataOverrides
from documents.double_sided import CollatePlugin
from documents.file_handling import create_source_path_directory
@@ -121,6 +125,11 @@ def consume_file(
self: Task,
input_doc: ConsumableDocument,
overrides: DocumentMetadataOverrides | None = None,
) -> (
ConsumeFileSuccessResult
| ConsumeFileStoppedResult
| ConsumeFileDuplicateResult
| None
):
token = consume_task_id.set((self.request.id or "")[:8])
try:
@@ -153,6 +162,7 @@ def consume_file(
TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
):
tmp_dir = Path(tmp_dir)
msg = None
for plugin_class in plugins:
plugin_name = plugin_class.NAME
@@ -183,7 +193,14 @@ def consume_file(
except StopConsumeTaskError as e:
logger.info(f"{plugin_name} requested task exit: {e.message}")
return e.message
return ConsumeFileStoppedResult(reason=e.message)
except ConsumeFileDuplicateError as e:
logger.info(f"{plugin_name} rejected duplicate: {e}")
return ConsumeFileDuplicateResult(
duplicate_of=e.duplicate_id,
duplicate_in_trash=e.in_trash,
)
except Exception as e:
logger.exception(f"{plugin_name} failed: {e}")
-1
View File
@@ -78,5 +78,4 @@ class PaperlessTaskFactory(DjangoModelFactory):
status = PaperlessTask.Status.PENDING
input_data = factory.LazyFunction(dict)
result_data = None
result_message = None
acknowledged = False
+3 -3
View File
@@ -299,7 +299,7 @@ class TestSystemStatus(APITestCase):
task_type=PaperlessTask.TaskType.TRAIN_CLASSIFIER,
trigger_source=PaperlessTask.TriggerSource.SCHEDULED,
status=PaperlessTask.Status.FAILURE,
result_message="Classifier training failed",
result_data={"error_message": "Classifier training failed"},
)
self.client.force_login(self.user)
response = self.client.get(self.ENDPOINT)
@@ -360,7 +360,7 @@ class TestSystemStatus(APITestCase):
task_type=PaperlessTask.TaskType.SANITY_CHECK,
trigger_source=PaperlessTask.TriggerSource.SCHEDULED,
status=PaperlessTask.Status.FAILURE,
result_message="5 issues found.",
result_data={"error_message": "5 issues found."},
)
self.client.force_login(self.user)
response = self.client.get(self.ENDPOINT)
@@ -429,7 +429,7 @@ class TestSystemStatus(APITestCase):
task_type=PaperlessTask.TaskType.LLM_INDEX,
trigger_source=PaperlessTask.TriggerSource.SCHEDULED,
status=PaperlessTask.Status.FAILURE,
result_message="AI index update failed",
result_data={"error_message": "AI index update failed"},
)
self.client.force_login(self.user)
response = self.client.get(self.ENDPOINT)
+2 -2
View File
@@ -632,7 +632,7 @@ class TestBarcodeNewConsume(
),
overrides,
),
"Barcode splitting complete!",
{"reason": "Barcode splitting complete!"},
)
# 2 new document consume tasks created
self.assertEqual(self.consume_file_mock.call_count, 2)
@@ -1049,7 +1049,7 @@ class TestTagBarcode(DirectoriesMixin, SampleDirMixin, GetReaderPluginMixin, Tes
None,
)
self.assertEqual(result, "Barcode splitting complete!")
self.assertEqual(result, {"reason": "Barcode splitting complete!"})
documents = Document.objects.all().order_by("id")
self.assertEqual(documents.count(), 3)
+4 -4
View File
@@ -83,7 +83,7 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
dt.datetime.now(),
delta=dt.timedelta(seconds=5),
)
self.assertIn("Received odd numbered pages", msg)
self.assertIn("Received odd numbered pages", msg["reason"])
def test_collation(self) -> None:
"""
@@ -129,7 +129,7 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
)
msg = self.consume_file("double-sided-odd.pdf")
self.assertIsFile(self.staging_file)
self.assertIn("Received odd numbered pages", msg)
self.assertIn("Received odd numbered pages", msg["reason"])
def test_less_odd_pages_then_even_fails(self) -> None:
"""
@@ -212,7 +212,7 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
"""
msg = self.consume_file("simple.pdf", Path("..") / "simple.pdf")
self.assertIsNotFile(self.staging_file)
self.assertRegex(msg, r"Success. New document id \d+ created")
self.assertIsInstance(msg.get("document_id"), int)
def test_subdirectory_upload(self) -> None:
"""
@@ -252,4 +252,4 @@ class TestDoubleSided(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
"""
msg = self.consume_file("simple.pdf")
self.assertIsNotFile(self.staging_file)
self.assertRegex(msg, r"Success. New document id \d+ created")
self.assertIsInstance(msg.get("document_id"), int)
+19 -11
View File
@@ -238,30 +238,38 @@ class TestTaskPostrunHandler:
task.refresh_from_db()
assert task.status == PaperlessTask.Status.STARTED
def test_parses_legacy_new_document_string(self):
task = self._started_task()
def test_records_success_with_consume_result(self):
"""ConsumeFileSuccessResult dict is stored directly as result_data."""
from documents.data_models import ConsumeFileSuccessResult
task = self._started_task()
task_postrun_handler(
task_id=task.task_id,
retval="New document id 42 created",
retval=ConsumeFileSuccessResult(document_id=42),
state="SUCCESS",
)
task.refresh_from_db()
assert task.result_data["document_id"] == 42
assert task.result_message == "New document id 42 created"
assert task.result_data == {"document_id": 42}
def test_records_stopped_with_reason(self):
"""ConsumeFileStoppedResult dict is stored directly as result_data."""
from documents.data_models import ConsumeFileStoppedResult
def test_parses_duplicate_string(self):
"""Duplicate detection returns a string with SUCCESS state (StopConsumeTaskError is caught and returned, not raised)."""
task = self._started_task()
task_postrun_handler(
task_id=task.task_id,
retval="It is a duplicate of some document (#99).",
retval=ConsumeFileStoppedResult(reason="Barcode splitting complete!"),
state="SUCCESS",
)
task.refresh_from_db()
assert task.result_data["duplicate_of"] == 99
assert task.result_data["duplicate_in_trash"] is False
assert task.result_data == {"reason": "Barcode splitting complete!"}
def test_none_retval_stores_no_result_data(self):
"""None return value (non-consume tasks) leaves result_data untouched."""
task = self._started_task()
task_postrun_handler(task_id=task.task_id, retval=None, state="SUCCESS")
task.refresh_from_db()
assert task.result_data is None
def test_ignores_unknown_task_id(self):
+15 -3
View File
@@ -4685,7 +4685,11 @@ class SystemStatusView(PassUserMixin):
classifier_error = "No classifier training tasks found"
elif last_trained_task.status != PaperlessTask.Status.SUCCESS:
classifier_status = "ERROR"
classifier_error = last_trained_task.result_message
classifier_error = (
last_trained_task.result_data.get("error_message")
if last_trained_task.result_data
else None
)
classifier_last_trained = (
last_trained_task.date_done if last_trained_task else None
)
@@ -4705,7 +4709,11 @@ class SystemStatusView(PassUserMixin):
sanity_check_error = "No sanity check tasks found"
elif last_sanity_check.status != PaperlessTask.Status.SUCCESS:
sanity_check_status = "ERROR"
sanity_check_error = last_sanity_check.result_message
sanity_check_error = (
last_sanity_check.result_data.get("error_message")
if last_sanity_check.result_data
else None
)
sanity_check_last_run = (
last_sanity_check.date_done if last_sanity_check else None
)
@@ -4730,7 +4738,11 @@ class SystemStatusView(PassUserMixin):
llmindex_error = "No LLM index update tasks found"
elif last_llmindex_update.status == PaperlessTask.Status.FAILURE:
llmindex_status = "ERROR"
llmindex_error = last_llmindex_update.result_message
llmindex_error = (
last_llmindex_update.result_data.get("error_message")
if last_llmindex_update.result_data
else None
)
llmindex_last_modified = (
last_llmindex_update.date_done if last_llmindex_update else None
)
+1 -1
View File
@@ -239,7 +239,7 @@ def mailbox_login(mailbox: MailBox, account: MailAccount) -> None:
@shared_task
def apply_mail_action(
result: list[str],
result: list,
rule_id: int,
message_uid: str,
message_subject: str,