Feature: Update consumer logging to include task ID for log correlation (#12510)

This commit is contained in:
Trenton H
2026-04-03 13:31:40 -07:00
committed by GitHub
parent 64debc87a5
commit f32ad98d8e
4 changed files with 136 additions and 60 deletions

View File

@@ -61,6 +61,7 @@ from documents.utils import compute_checksum
from documents.utils import identity
from documents.workflows.utils import get_workflows_for_trigger
from paperless.config import AIConfig
from paperless.logging import consume_task_id
from paperless.parsers import ParserContext
from paperless.parsers.registry import get_parser_registry
from paperless_ai.indexing import llm_index_add_or_update_document
@@ -147,76 +148,85 @@ def consume_file(
input_doc: ConsumableDocument,
overrides: DocumentMetadataOverrides | None = None,
):
# Default no overrides
if overrides is None:
overrides = DocumentMetadataOverrides()
token = consume_task_id.set((self.request.id or "")[:8])
try:
# Default no overrides
if overrides is None:
overrides = DocumentMetadataOverrides()
plugins: list[type[ConsumeTaskPlugin]] = (
[
ConsumerPreflightPlugin,
ConsumerPlugin,
]
if input_doc.root_document_id is not None
else [
ConsumerPreflightPlugin,
AsnCheckPlugin,
CollatePlugin,
BarcodePlugin,
AsnCheckPlugin, # Re-run ASN check after barcode reading
WorkflowTriggerPlugin,
ConsumerPlugin,
]
)
plugins: list[type[ConsumeTaskPlugin]] = (
[
ConsumerPreflightPlugin,
ConsumerPlugin,
]
if input_doc.root_document_id is not None
else [
ConsumerPreflightPlugin,
AsnCheckPlugin,
CollatePlugin,
BarcodePlugin,
AsnCheckPlugin, # Re-run ASN check after barcode reading
WorkflowTriggerPlugin,
ConsumerPlugin,
]
)
with (
ProgressManager(
overrides.filename or input_doc.original_file.name,
self.request.id,
) as status_mgr,
TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
):
tmp_dir = Path(tmp_dir)
for plugin_class in plugins:
plugin_name = plugin_class.NAME
plugin = plugin_class(
input_doc,
overrides,
status_mgr,
tmp_dir,
with (
ProgressManager(
overrides.filename or input_doc.original_file.name,
self.request.id,
)
) as status_mgr,
TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
):
tmp_dir = Path(tmp_dir)
for plugin_class in plugins:
plugin_name = plugin_class.NAME
if not plugin.able_to_run:
logger.debug(f"Skipping plugin {plugin_name}")
continue
plugin = plugin_class(
input_doc,
overrides,
status_mgr,
tmp_dir,
self.request.id,
)
try:
logger.debug(f"Executing plugin {plugin_name}")
plugin.setup()
if not plugin.able_to_run:
logger.debug(f"Skipping plugin {plugin_name}")
continue
msg = plugin.run()
try:
logger.debug(f"Executing plugin {plugin_name}")
plugin.setup()
if msg is not None:
logger.info(f"{plugin_name} completed with: {msg}")
else:
logger.info(f"{plugin_name} completed with no message")
msg = plugin.run()
overrides = plugin.metadata
if msg is not None:
logger.info(f"{plugin_name} completed with: {msg}")
else:
logger.info(f"{plugin_name} completed with no message")
except StopConsumeTaskError as e:
logger.info(f"{plugin_name} requested task exit: {e.message}")
return e.message
overrides = plugin.metadata
except Exception as e:
logger.exception(f"{plugin_name} failed: {e}")
status_mgr.send_progress(ProgressStatusOptions.FAILED, f"{e}", 100, 100)
raise
except StopConsumeTaskError as e:
logger.info(f"{plugin_name} requested task exit: {e.message}")
return e.message
finally:
plugin.cleanup()
except Exception as e:
logger.exception(f"{plugin_name} failed: {e}")
status_mgr.send_progress(
ProgressStatusOptions.FAILED,
f"{e}",
100,
100,
)
raise
return msg
finally:
plugin.cleanup()
return msg
finally:
consume_task_id.reset(token)
@shared_task

33
src/paperless/logging.py Normal file
View File

@@ -0,0 +1,33 @@
from __future__ import annotations
import logging
from contextvars import ContextVar
consume_task_id: ContextVar[str] = ContextVar("consume_task_id", default="")


class ConsumeTaskFormatter(logging.Formatter):
    """
    Formatter that prefixes each message with a short correlation ID for
    the document-consumption task currently running.

    The ID (the first 8 characters of the Celery task UUID) is published
    through the ``consume_task_id`` ContextVar at the entry of
    ``consume_file``.  Outside a consume task the ContextVar holds its
    default ``""``, no prefix is added, and the output is identical to the
    standard verbose format.
    """

    def __init__(self) -> None:
        # {task_prefix} is not a standard LogRecord attribute, so the
        # init-time format-string validation would reject it; skip it.
        # Runtime safety comes from format() always setting
        # record.task_prefix before delegating to super().format().
        super().__init__(
            fmt="[{asctime}] [{levelname}] [{name}] {task_prefix}{message}",
            style="{",
            validate=False,
        )

    def format(self, record: logging.LogRecord) -> str:
        current_id = consume_task_id.get()
        record.task_prefix = f"[{current_id}] " if current_id else ""
        return super().format(record)

View File

@@ -592,8 +592,7 @@ LOGGING = {
"disable_existing_loggers": False,
"formatters": {
"verbose": {
"format": "[{asctime}] [{levelname}] [{name}] {message}",
"style": "{",
"()": "paperless.logging.ConsumeTaskFormatter",
},
"simple": {
"format": "{levelname} {message}",

View File

@@ -0,0 +1,34 @@
import logging
from paperless.logging import ConsumeTaskFormatter
from paperless.logging import consume_task_id
def _make_record(msg: str = "Test message") -> logging.LogRecord:
return logging.LogRecord(
name="paperless.consumer",
level=logging.INFO,
pathname="",
lineno=0,
msg=msg,
args=(),
exc_info=None,
)
def test_formatter_includes_task_id_when_set():
    """With a task ID active, the formatted output carries the prefix."""
    reset_token = consume_task_id.set("a8098c1a")
    try:
        rendered = ConsumeTaskFormatter().format(_make_record())
        assert "[a8098c1a] Test message" in rendered
    finally:
        # Always restore the ContextVar so later tests see no task active.
        consume_task_id.reset(reset_token)
def test_formatter_omits_prefix_when_no_task_id():
    """Without an active task, output matches the plain verbose format."""
    # ContextVar default is "" — no task active, so no "[<id>] " prefix.
    rendered = ConsumeTaskFormatter().format(_make_record())
    assert "[] " not in rendered
    assert "Test message" in rendered