Prefix consume-task log lines with a short task correlation ID.

* src/documents/tasks.py: consume_file() stores the first 8 characters of
  self.request.id in the consume_task_id ContextVar and resets it in a
  finally block; the existing body is re-indented under the new try.
* src/paperless/logging.py (new): consume_task_id ContextVar and
  ConsumeTaskFormatter, which prepends "[<id>] " to the message when the
  ContextVar is non-empty.
* src/paperless/settings/__init__.py: the "verbose" formatter is built from
  ConsumeTaskFormatter.
* src/paperless/tests/test_logging.py (new): tests for both prefix cases.

Review note: validate=False and its comment were dropped from
ConsumeTaskFormatter.__init__. logging's init-time validation checks only the
syntax of the "{"-style format string, not whether a field is a standard
LogRecord attribute, so {task_prefix} passes with validation enabled. The
blob id on that file's index line predates this change.

diff --git a/src/documents/tasks.py b/src/documents/tasks.py
index ae65a5fbe..c40b1ff3f 100644
--- a/src/documents/tasks.py
+++ b/src/documents/tasks.py
@@ -61,6 +61,7 @@ from documents.utils import compute_checksum
 from documents.utils import identity
 from documents.workflows.utils import get_workflows_for_trigger
 from paperless.config import AIConfig
+from paperless.logging import consume_task_id
 from paperless.parsers import ParserContext
 from paperless.parsers.registry import get_parser_registry
 from paperless_ai.indexing import llm_index_add_or_update_document
@@ -147,76 +148,85 @@ def consume_file(
     input_doc: ConsumableDocument,
     overrides: DocumentMetadataOverrides | None = None,
 ):
-    # Default no overrides
-    if overrides is None:
-        overrides = DocumentMetadataOverrides()
+    token = consume_task_id.set((self.request.id or "")[:8])
+    try:
+        # Default no overrides
+        if overrides is None:
+            overrides = DocumentMetadataOverrides()
 
-    plugins: list[type[ConsumeTaskPlugin]] = (
-        [
-            ConsumerPreflightPlugin,
-            ConsumerPlugin,
-        ]
-        if input_doc.root_document_id is not None
-        else [
-            ConsumerPreflightPlugin,
-            AsnCheckPlugin,
-            CollatePlugin,
-            BarcodePlugin,
-            AsnCheckPlugin,  # Re-run ASN check after barcode reading
-            WorkflowTriggerPlugin,
-            ConsumerPlugin,
-        ]
-    )
+        plugins: list[type[ConsumeTaskPlugin]] = (
+            [
+                ConsumerPreflightPlugin,
+                ConsumerPlugin,
+            ]
+            if input_doc.root_document_id is not None
+            else [
+                ConsumerPreflightPlugin,
+                AsnCheckPlugin,
+                CollatePlugin,
+                BarcodePlugin,
+                AsnCheckPlugin,  # Re-run ASN check after barcode reading
+                WorkflowTriggerPlugin,
+                ConsumerPlugin,
+            ]
+        )
 
-    with (
-        ProgressManager(
-            overrides.filename or input_doc.original_file.name,
-            self.request.id,
-        ) as status_mgr,
-        TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
-    ):
-        tmp_dir = Path(tmp_dir)
-        for plugin_class in plugins:
-            plugin_name = plugin_class.NAME
-
-            plugin = plugin_class(
-                input_doc,
-                overrides,
-                status_mgr,
-                tmp_dir,
+        with (
+            ProgressManager(
+                overrides.filename or input_doc.original_file.name,
                 self.request.id,
-            )
+            ) as status_mgr,
+            TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
+        ):
+            tmp_dir = Path(tmp_dir)
+            for plugin_class in plugins:
+                plugin_name = plugin_class.NAME
 
-            if not plugin.able_to_run:
-                logger.debug(f"Skipping plugin {plugin_name}")
-                continue
+                plugin = plugin_class(
+                    input_doc,
+                    overrides,
+                    status_mgr,
+                    tmp_dir,
+                    self.request.id,
+                )
 
-            try:
-                logger.debug(f"Executing plugin {plugin_name}")
-                plugin.setup()
+                if not plugin.able_to_run:
+                    logger.debug(f"Skipping plugin {plugin_name}")
+                    continue
 
-                msg = plugin.run()
+                try:
+                    logger.debug(f"Executing plugin {plugin_name}")
+                    plugin.setup()
 
-                if msg is not None:
-                    logger.info(f"{plugin_name} completed with: {msg}")
-                else:
-                    logger.info(f"{plugin_name} completed with no message")
+                    msg = plugin.run()
 
-                overrides = plugin.metadata
+                    if msg is not None:
+                        logger.info(f"{plugin_name} completed with: {msg}")
+                    else:
+                        logger.info(f"{plugin_name} completed with no message")
 
-            except StopConsumeTaskError as e:
-                logger.info(f"{plugin_name} requested task exit: {e.message}")
-                return e.message
+                    overrides = plugin.metadata
 
-            except Exception as e:
-                logger.exception(f"{plugin_name} failed: {e}")
-                status_mgr.send_progress(ProgressStatusOptions.FAILED, f"{e}", 100, 100)
-                raise
+                except StopConsumeTaskError as e:
+                    logger.info(f"{plugin_name} requested task exit: {e.message}")
+                    return e.message
 
-            finally:
-                plugin.cleanup()
+                except Exception as e:
+                    logger.exception(f"{plugin_name} failed: {e}")
+                    status_mgr.send_progress(
+                        ProgressStatusOptions.FAILED,
+                        f"{e}",
+                        100,
+                        100,
+                    )
+                    raise
 
-    return msg
+                finally:
+                    plugin.cleanup()
+
+        return msg
+    finally:
+        consume_task_id.reset(token)
 
 
 @shared_task
diff --git a/src/paperless/logging.py b/src/paperless/logging.py
new file mode 100644
index 000000000..ce2eff4fc
--- /dev/null
+++ b/src/paperless/logging.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+import logging
+from contextvars import ContextVar
+
+consume_task_id: ContextVar[str] = ContextVar("consume_task_id", default="")
+
+
+class ConsumeTaskFormatter(logging.Formatter):
+    """
+    Logging formatter that prepends a short task correlation ID to messages
+    emitted during document consumption.
+
+    The ID is the first 8 characters of the Celery task UUID, set via the
+    ``consume_task_id`` ContextVar at the entry of ``consume_file``. When
+    the ContextVar is empty (any log outside a consume task) no prefix is
+    added and the output is identical to the standard verbose format.
+    """
+
+    def __init__(self) -> None:
+        # {task_prefix} is not a standard LogRecord attribute; format() sets
+        # it on every record before delegating to the base class. Init-time
+        # validation only checks "{"-style syntax, so it stays enabled.
+        super().__init__(
+            fmt="[{asctime}] [{levelname}] [{name}] {task_prefix}{message}",
+            style="{",
+        )
+
+    def format(self, record: logging.LogRecord) -> str:
+        """Set ``record.task_prefix`` from the ContextVar, then format."""
+        task_id = consume_task_id.get()
+        record.task_prefix = f"[{task_id}] " if task_id else ""
+        return super().format(record)
diff --git a/src/paperless/settings/__init__.py b/src/paperless/settings/__init__.py
index 964295020..fac1391b3 100644
--- a/src/paperless/settings/__init__.py
+++ b/src/paperless/settings/__init__.py
@@ -592,8 +592,7 @@ LOGGING = {
     "disable_existing_loggers": False,
     "formatters": {
         "verbose": {
-            "format": "[{asctime}] [{levelname}] [{name}] {message}",
-            "style": "{",
+            "()": "paperless.logging.ConsumeTaskFormatter",
         },
         "simple": {
             "format": "{levelname} {message}",
diff --git a/src/paperless/tests/test_logging.py b/src/paperless/tests/test_logging.py
new file mode 100644
index 000000000..dbd36c7d0
--- /dev/null
+++ b/src/paperless/tests/test_logging.py
@@ -0,0 +1,34 @@
+import logging
+
+from paperless.logging import ConsumeTaskFormatter
+from paperless.logging import consume_task_id
+
+
+def _make_record(msg: str = "Test message") -> logging.LogRecord:
+    return logging.LogRecord(
+        name="paperless.consumer",
+        level=logging.INFO,
+        pathname="",
+        lineno=0,
+        msg=msg,
+        args=(),
+        exc_info=None,
+    )
+
+
+def test_formatter_includes_task_id_when_set():
+    token = consume_task_id.set("a8098c1a")
+    try:
+        formatter = ConsumeTaskFormatter()
+        output = formatter.format(_make_record())
+        assert "[a8098c1a] Test message" in output
+    finally:
+        consume_task_id.reset(token)
+
+
+def test_formatter_omits_prefix_when_no_task_id():
+    # ContextVar default is "" — no task active
+    formatter = ConsumeTaskFormatter()
+    output = formatter.format(_make_record())
+    assert "[] " not in output
+    assert "Test message" in output