Chore: Converts all call sites and test asserts to use apply_async and headers (#12591)

This commit is contained in:
Trenton H
2026-04-20 11:40:04 -07:00
committed by GitHub
parent 733d873e34
commit fbf4e32646
24 changed files with 465 additions and 425 deletions
+94 -52
View File
@@ -22,6 +22,7 @@ from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import PaperlessTask
from documents.models import StoragePath
from documents.models import Tag
from documents.permissions import set_permissions_for_object
@@ -113,7 +114,10 @@ def set_correspondent(
affected_docs = list(qs.values_list("pk", flat=True))
qs.update(correspondent=correspondent)
bulk_update_documents.delay(document_ids=affected_docs)
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -132,8 +136,9 @@ def set_storage_path(doc_ids: list[int], storage_path: StoragePath) -> Literal["
affected_docs = list(qs.values_list("pk", flat=True))
qs.update(storage_path=storage_path)
bulk_update_documents.delay(
document_ids=affected_docs,
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -151,7 +156,10 @@ def set_document_type(doc_ids: list[int], document_type: DocumentType) -> Litera
affected_docs = list(qs.values_list("pk", flat=True))
qs.update(document_type=document_type)
bulk_update_documents.delay(document_ids=affected_docs)
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -177,7 +185,10 @@ def add_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
DocumentTagRelationship.objects.bulk_create(to_create)
if affected_docs:
bulk_update_documents.delay(document_ids=list(affected_docs))
bulk_update_documents.apply_async(
kwargs={"document_ids": list(affected_docs)},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -195,7 +206,10 @@ def remove_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
qs.delete()
if affected_docs:
bulk_update_documents.delay(document_ids=affected_docs)
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -254,7 +268,10 @@ def modify_tags(
)
if affected_docs:
bulk_update_documents.delay(document_ids=affected_docs)
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
except Exception as e:
logger.error(f"Error modifying tags: {e}")
return "ERROR"
@@ -326,7 +343,10 @@ def modify_custom_fields(
field_id__in=remove_custom_fields,
).hard_delete()
bulk_update_documents.delay(document_ids=affected_docs)
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -369,8 +389,9 @@ def delete(doc_ids: list[int]) -> Literal["OK"]:
def reprocess(doc_ids: list[int]) -> Literal["OK"]:
for document_id in doc_ids:
update_document_content_maybe_archive_file.delay(
document_id=document_id,
update_document_content_maybe_archive_file.apply_async(
kwargs={"document_id": document_id},
headers={"trigger_source": PaperlessTask.TriggerSource.MANUAL},
)
return "OK"
@@ -396,7 +417,10 @@ def set_permissions(
affected_docs = list(qs.values_list("pk", flat=True))
bulk_update_documents.delay(document_ids=affected_docs)
bulk_update_documents.apply_async(
kwargs={"document_ids": affected_docs},
headers={"trigger_source": PaperlessTask.TriggerSource.SYSTEM},
)
return "OK"
@@ -407,6 +431,7 @@ def rotate(
*,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
trigger_source: PaperlessTask.TriggerSource = PaperlessTask.TriggerSource.WEB_UI,
) -> Literal["OK"]:
logger.info(
f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
@@ -453,13 +478,16 @@ def rotate(
if user is not None:
overrides.actor_id = user.id
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
overrides,
consume_file.apply_async(
kwargs={
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
"overrides": overrides,
},
headers={"trigger_source": trigger_source},
)
logger.info(
f"Queued new rotated version for document {root_doc.id} by {degrees} degrees",
@@ -478,6 +506,7 @@ def merge(
archive_fallback: bool = False,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
trigger_source: PaperlessTask.TriggerSource = PaperlessTask.TriggerSource.WEB_UI,
) -> Literal["OK"]:
logger.info(
f"Attempting to merge {len(doc_ids)} documents into a single document.",
@@ -556,12 +585,12 @@ def merge(
logger.info("Adding merged document to the task queue.")
consume_task = consume_file.s(
ConsumableDocument(
input_doc=ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
),
overrides,
)
overrides=overrides,
).set(headers={"trigger_source": trigger_source})
if delete_originals:
backup = release_archive_serial_numbers(affected_docs)
@@ -577,7 +606,7 @@ def merge(
restore_archive_serial_numbers(backup)
raise
else:
consume_task.delay()
consume_task.apply_async()
return "OK"
@@ -589,6 +618,7 @@ def split(
delete_originals: bool = False,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
trigger_source: PaperlessTask.TriggerSource = PaperlessTask.TriggerSource.WEB_UI,
) -> Literal["OK"]:
logger.info(
f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
@@ -631,12 +661,12 @@ def split(
)
consume_tasks.append(
consume_file.s(
ConsumableDocument(
input_doc=ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
),
overrides,
),
overrides=overrides,
).set(headers={"trigger_source": trigger_source}),
)
if delete_originals:
@@ -669,6 +699,7 @@ def delete_pages(
*,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
trigger_source: PaperlessTask.TriggerSource = PaperlessTask.TriggerSource.WEB_UI,
) -> Literal["OK"]:
logger.info(
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
@@ -698,13 +729,16 @@ def delete_pages(
overrides = DocumentMetadataOverrides().from_document(root_doc)
if user is not None:
overrides.actor_id = user.id
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
overrides,
consume_file.apply_async(
kwargs={
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
"overrides": overrides,
},
headers={"trigger_source": trigger_source},
)
logger.info(
f"Queued new version for document {root_doc.id} after deleting pages {pages}",
@@ -724,6 +758,7 @@ def edit_pdf(
include_metadata: bool = True,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
trigger_source: PaperlessTask.TriggerSource = PaperlessTask.TriggerSource.WEB_UI,
) -> Literal["OK"]:
"""
Operations is a list of dictionaries describing the final PDF pages.
@@ -781,13 +816,16 @@ def edit_pdf(
if user is not None:
overrides.owner_id = user.id
overrides.actor_id = user.id
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
overrides,
consume_file.apply_async(
kwargs={
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
"overrides": overrides,
},
headers={"trigger_source": trigger_source},
)
else:
consume_tasks = []
@@ -812,12 +850,12 @@ def edit_pdf(
pdf.save(version_filepath)
consume_tasks.append(
consume_file.s(
ConsumableDocument(
input_doc=ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=version_filepath,
),
overrides,
),
overrides=overrides,
).set(headers={"trigger_source": trigger_source}),
)
if delete_original:
@@ -853,6 +891,7 @@ def remove_password(
include_metadata: bool = True,
source_mode: SourceMode = SourceModeChoices.LATEST_VERSION,
user: User | None = None,
trigger_source: PaperlessTask.TriggerSource = PaperlessTask.TriggerSource.WEB_UI,
) -> Literal["OK"]:
"""
Remove password protection from PDF documents.
@@ -887,13 +926,16 @@ def remove_password(
if user is not None:
overrides.owner_id = user.id
overrides.actor_id = user.id
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
overrides,
consume_file.apply_async(
kwargs={
"input_doc": ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
"overrides": overrides,
},
headers={"trigger_source": trigger_source},
)
else:
consume_tasks = []
@@ -908,12 +950,12 @@ def remove_password(
consume_tasks.append(
consume_file.s(
ConsumableDocument(
input_doc=ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
),
overrides,
),
overrides=overrides,
).set(headers={"trigger_source": trigger_source}),
)
if delete_original: