Compare commits


1 commit

Author: Trenton H
SHA1: d3bd374e83
Message: Instead of manual temporary directory management, use a context manager
Date: 2026-03-25 08:10:01 -07:00
2 changed files with 240 additions and 250 deletions
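
The change itself is a classic cleanup: rather than constructing a tempfile.TemporaryDirectory by hand and calling .cleanup() on every error and exit path, the consumer now wraps the whole working block in a with statement, so the scratch directory is removed automatically whether consumption succeeds or raises. A minimal sketch of the before/after pattern (consume_before, consume_after and do_work are placeholders for illustration, not paperless-ngx functions):

import tempfile
from pathlib import Path


def do_work(working_copy: Path) -> None:
    # Hypothetical stand-in for the real parse/store pipeline.
    working_copy.write_text("processed")


def consume_before(scratch_dir: Path, filename: str) -> None:
    # Before: the directory object is created manually and cleanup() has to be
    # called on every exit path (the old code had several such call sites).
    tempdir = tempfile.TemporaryDirectory(prefix="paperless-ngx", dir=scratch_dir)
    try:
        do_work(Path(tempdir.name) / filename)
    finally:
        tempdir.cleanup()


def consume_after(scratch_dir: Path, filename: str) -> None:
    # After: the context manager deletes the directory automatically,
    # on success and on exception alike.
    with tempfile.TemporaryDirectory(prefix="paperless-ngx", dir=scratch_dir) as tmpdir:
        do_work(Path(tmpdir) / filename)

That is why the explicit tempdir.cleanup() calls in the except handlers and the trailing finally: block disappear in the diff below: the with block now owns the directory's lifetime.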

View File

@@ -1,7 +1,6 @@
import datetime
import hashlib
import os
import shutil
import tempfile
from enum import StrEnum
from pathlib import Path
@@ -339,18 +338,15 @@ class ConsumerPlugin(
Return the document object if it was successfully created.
"""
tempdir = None
# Preflight has already run including progress update to 0%
self.log.info(f"Consuming {self.filename}")
try:
# Preflight has already run including progress update to 0%
self.log.info(f"Consuming {self.filename}")
# For the actual work, copy the file into a tempdir
tempdir = tempfile.TemporaryDirectory(
prefix="paperless-ngx",
dir=settings.SCRATCH_DIR,
)
self.working_copy = Path(tempdir.name) / Path(self.filename)
# For the actual work, copy the file into a tempdir
with tempfile.TemporaryDirectory(
prefix="paperless-ngx",
dir=settings.SCRATCH_DIR,
) as tmpdir:
self.working_copy = Path(tmpdir) / Path(self.filename)
copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy)
self.unmodified_original = None
@@ -382,7 +378,7 @@ class ConsumerPlugin(
self.log.debug(f"Detected mime type after qpdf: {mime_type}")
# Save the original file for later
self.unmodified_original = (
Path(tempdir.name) / Path("uo") / Path(self.filename)
Path(tmpdir) / Path("uo") / Path(self.filename)
)
self.unmodified_original.parent.mkdir(exist_ok=True)
copy_file_with_basic_stats(
@@ -401,7 +397,6 @@ class ConsumerPlugin(
)
)
if not parser_class:
tempdir.cleanup()
self._fail(
ConsumerStatusShortMessage.UNSUPPORTED_TYPE,
f"Unsupported mime type {mime_type}",
@@ -416,281 +411,276 @@ class ConsumerPlugin(
)
self.run_pre_consume_script()
except:
if tempdir:
tempdir.cleanup()
raise
# This doesn't parse the document yet, but gives us a parser.
with parser_class() as document_parser:
document_parser.configure(
ParserContext(mailrule_id=self.input_doc.mailrule_id),
)
self.log.debug(f"Parser: {document_parser.name} v{document_parser.version}")
# Parse the document. This may take some time.
text = None
date = None
thumbnail = None
archive_path = None
page_count = None
try:
self._send_progress(
20,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.PARSING_DOCUMENT,
# This doesn't parse the document yet, but gives us a parser.
with parser_class() as document_parser:
document_parser.configure(
ParserContext(mailrule_id=self.input_doc.mailrule_id),
)
self.log.debug(f"Parsing {self.filename}...")
document_parser.parse(self.working_copy, mime_type)
self.log.debug(f"Generating thumbnail for {self.filename}...")
self._send_progress(
70,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
self.log.debug(
f"Parser: {document_parser.name} v{document_parser.version}",
)
thumbnail = document_parser.get_thumbnail(self.working_copy, mime_type)
text = document_parser.get_text()
date = document_parser.get_date()
if date is None:
# Parse the document. This may take some time.
text = None
date = None
thumbnail = None
archive_path = None
page_count = None
try:
self._send_progress(
90,
20,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.PARSE_DATE,
ConsumerStatusShortMessage.PARSING_DOCUMENT,
)
with get_date_parser() as date_parser:
date = next(date_parser.parse(self.filename, text), None)
archive_path = document_parser.get_archive_path()
page_count = document_parser.get_page_count(
self.working_copy,
mime_type,
)
self.log.debug(f"Parsing {self.filename}...")
except ParseError as e:
if tempdir:
tempdir.cleanup()
self._fail(
str(e),
f"Error occurred while consuming document {self.filename}: {e}",
exc_info=True,
exception=e,
)
except Exception as e:
if tempdir:
tempdir.cleanup()
self._fail(
str(e),
f"Unexpected error while consuming document {self.filename}: {e}",
exc_info=True,
exception=e,
)
document_parser.parse(self.working_copy, mime_type)
# Prepare the document classifier.
self.log.debug(f"Generating thumbnail for {self.filename}...")
self._send_progress(
70,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
)
thumbnail = document_parser.get_thumbnail(
self.working_copy,
mime_type,
)
# TODO: I don't really like to do this here, but this way we avoid
# reloading the classifier multiple times, since there are multiple
# post-consume hooks that all require the classifier.
classifier = load_classifier()
self._send_progress(
95,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.SAVE_DOCUMENT,
)
# now that everything is done, we can start to store the document
# in the system. This will be a transaction and reasonably fast.
try:
with transaction.atomic():
# store the document.
if self.input_doc.root_document_id:
# If this is a new version of an existing document, we need
# to make sure we're not creating a new document, but updating
# the existing one.
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
text = document_parser.get_text()
date = document_parser.get_date()
if date is None:
self._send_progress(
90,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.PARSE_DATE,
)
original_document = self._create_version_from_root(
root_doc,
text=text,
page_count=page_count,
mime_type=mime_type,
)
actor = None
with get_date_parser() as date_parser:
date = next(date_parser.parse(self.filename, text), None)
archive_path = document_parser.get_archive_path()
page_count = document_parser.get_page_count(
self.working_copy,
mime_type,
)
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
if (
settings.AUDIT_LOG_ENABLED
and self.metadata.actor_id is not None
):
actor = User.objects.filter(
pk=self.metadata.actor_id,
).first()
if actor is not None:
from auditlog.context import ( # type: ignore[import-untyped]
set_actor,
)
except ParseError as e:
self._fail(
str(e),
f"Error occurred while consuming document {self.filename}: {e}",
exc_info=True,
exception=e,
)
except Exception as e:
self._fail(
str(e),
f"Unexpected error while consuming document {self.filename}: {e}",
exc_info=True,
exception=e,
)
with set_actor(actor):
# Prepare the document classifier.
# TODO: I don't really like to do this here, but this way we avoid
# reloading the classifier multiple times, since there are multiple
# post-consume hooks that all require the classifier.
classifier = load_classifier()
self._send_progress(
95,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.SAVE_DOCUMENT,
)
# now that everything is done, we can start to store the document
# in the system. This will be a transaction and reasonably fast.
try:
with transaction.atomic():
# store the document.
if self.input_doc.root_document_id:
# If this is a new version of an existing document, we need
# to make sure we're not creating a new document, but updating
# the existing one.
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
)
original_document = self._create_version_from_root(
root_doc,
text=text,
page_count=page_count,
mime_type=mime_type,
)
actor = None
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
if (
settings.AUDIT_LOG_ENABLED
and self.metadata.actor_id is not None
):
actor = User.objects.filter(
pk=self.metadata.actor_id,
).first()
if actor is not None:
from auditlog.context import ( # type: ignore[import-untyped]
set_actor,
)
with set_actor(actor):
original_document.save()
else:
original_document.save()
else:
original_document.save()
# Create a log entry for the version addition, if enabled
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import ( # type: ignore[import-untyped]
LogEntry,
)
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Added": ["None", original_document.id],
},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version added",
"version_id": original_document.id,
},
)
document = original_document
else:
original_document.save()
# Create a log entry for the version addition, if enabled
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import ( # type: ignore[import-untyped]
LogEntry,
document = self._store(
text=text,
date=date,
page_count=page_count,
mime_type=mime_type,
)
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Added": ["None", original_document.id],
},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version added",
"version_id": original_document.id,
},
)
document = original_document
else:
document = self._store(
text=text,
date=date,
page_count=page_count,
mime_type=mime_type,
)
# If we get here, it was successful. Proceed with post-consume
# hooks. If they fail, nothing will get changed.
# If we get here, it was successful. Proceed with post-consume
# hooks. If they fail, nothing will get changed.
document_consumption_finished.send(
sender=self.__class__,
document=document,
logging_group=self.logging_group,
classifier=classifier,
original_file=self.unmodified_original
if self.unmodified_original
else self.working_copy,
)
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
document,
use_format=False,
)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
self.unmodified_original
if self.unmodified_original is not None
document_consumption_finished.send(
sender=self.__class__,
document=document,
logging_group=self.logging_group,
classifier=classifier,
original_file=self.unmodified_original
if self.unmodified_original
else self.working_copy,
document.source_path,
)
self._write(
thumbnail,
document.thumbnail_path,
)
if archive_path and Path(archive_path).is_file():
generated_archive_filename = generate_unique_filename(
document,
archive_filename=True,
)
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_archive_filename))
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
generated_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = generated_archive_filename
create_source_path_directory(document.archive_path)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
archive_path,
document.archive_path,
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy,
document.source_path,
)
with Path(archive_path).open("rb") as f:
document.archive_checksum = hashlib.md5(
f.read(),
).hexdigest()
self._write(
thumbnail,
document.thumbnail_path,
)
# Don't save with the lock active. Saving will cause the file
# renaming logic to acquire the lock as well.
# This triggers things like file renaming
document.save()
if archive_path and Path(archive_path).is_file():
generated_archive_filename = generate_unique_filename(
document,
archive_filename=True,
)
if (
len(str(generated_archive_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = generated_archive_filename
create_source_path_directory(document.archive_path)
self._write(
archive_path,
document.archive_path,
)
if document.root_document_id:
document_updated.send(
sender=self.__class__,
document=document.root_document,
)
with Path(archive_path).open("rb") as f:
document.archive_checksum = hashlib.md5(
f.read(),
).hexdigest()
# Delete the file only if it was successfully consumed
self.log.debug(
f"Deleting original file {self.input_doc.original_file}",
)
self.input_doc.original_file.unlink()
self.log.debug(f"Deleting working copy {self.working_copy}")
self.working_copy.unlink()
if self.unmodified_original is not None: # pragma: no cover
# Don't save with the lock active. Saving will cause the file
# renaming logic to acquire the lock as well.
# This triggers things like file renaming
document.save()
if document.root_document_id:
document_updated.send(
sender=self.__class__,
document=document.root_document,
)
# Delete the file only if it was successfully consumed
self.log.debug(
f"Deleting unmodified original file {self.unmodified_original}",
f"Deleting original file {self.input_doc.original_file}",
)
self.unmodified_original.unlink()
self.input_doc.original_file.unlink()
self.log.debug(f"Deleting working copy {self.working_copy}")
self.working_copy.unlink()
if self.unmodified_original is not None: # pragma: no cover
self.log.debug(
f"Deleting unmodified original file {self.unmodified_original}",
)
self.unmodified_original.unlink()
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
shadow_file = (
Path(self.input_doc.original_file).parent
/ f"._{Path(self.input_doc.original_file).name}"
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
shadow_file = (
Path(self.input_doc.original_file).parent
/ f"._{Path(self.input_doc.original_file).name}"
)
if Path(shadow_file).is_file():
self.log.debug(f"Deleting shadow file {shadow_file}")
Path(shadow_file).unlink()
except Exception as e:
self._fail(
str(e),
f"The following error occurred while storing document "
f"{self.filename} after parsing: {e}",
exc_info=True,
exception=e,
)
if Path(shadow_file).is_file():
self.log.debug(f"Deleting shadow file {shadow_file}")
Path(shadow_file).unlink()
except Exception as e:
self._fail(
str(e),
f"The following error occurred while storing document "
f"{self.filename} after parsing: {e}",
exc_info=True,
exception=e,
)
finally:
tempdir.cleanup()
self.run_post_consume_script(document)
self.log.info(f"Document {document} consumption finished")
@@ -834,7 +824,7 @@ class ConsumerPlugin(
self.metadata.view_users is not None
or self.metadata.view_groups is not None
or self.metadata.change_users is not None
or self.metadata.change_groups is not None
or self.metadata.change_users is not None
):
permissions = {
"view": {
@@ -867,7 +857,7 @@ class ConsumerPlugin(
Path(source).open("rb") as read_file,
Path(target).open("wb") as write_file,
):
shutil.copyfileobj(read_file, write_file)
write_file.write(read_file.read())
# Attempt to copy file's original stats, but it's ok if we can't
try:
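
The last hunk in this file also appears to swap shutil.copyfileobj(read_file, write_file) for write_file.write(read_file.read()), which lines up with the import hunk at the top dropping one import (presumably import shutil) but means the whole source file is read into memory instead of being streamed in chunks. For comparison, a sketch of the two copy styles (function names are illustrative):

import shutil
from pathlib import Path


def copy_streaming(source: Path, target: Path) -> None:
    # Streams in fixed-size chunks; peak memory stays flat even for large files.
    with source.open("rb") as read_file, target.open("wb") as write_file:
        shutil.copyfileobj(read_file, write_file)


def copy_in_memory(source: Path, target: Path) -> None:
    # Reads the entire file first; simpler, but memory use grows with file size.
    with source.open("rb") as read_file, target.open("wb") as write_file:
        write_file.write(read_file.read())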

View File

@@ -53,8 +53,8 @@ from documents.models import Tag
from documents.models import WorkflowRun
from documents.models import WorkflowTrigger
from documents.plugins.base import ConsumeTaskPlugin
from documents.plugins.base import ProgressManager
from documents.plugins.base import StopConsumeTaskError
from documents.plugins.helpers import ProgressManager
from documents.plugins.helpers import ProgressStatusOptions
from documents.sanity_checker import SanityCheckFailedException
from documents.signals import document_updated
@@ -533,13 +533,13 @@ def check_scheduled_workflows() -> None:
id__in=matched_ids,
)
if documents.exists():
if documents.count() > 0:
documents = prefilter_documents_by_workflowtrigger(
documents,
trigger,
)
if documents.exists():
if documents.count() > 0:
logger.debug(
f"Found {documents.count()} documents for trigger {trigger}",
)
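
In the second file, the scheduled-workflow check toggles twice between documents.exists() and documents.count() > 0; with the diff markers stripped in this view it is hard to tell which side is old and which is new, but the ORM difference is worth noting: .exists() issues a LIMIT 1 style query and short-circuits, while .count() runs a full COUNT(*) even when only a yes/no answer is needed. A hedged sketch (documents stands for the queryset built in the task):

def has_matches(documents) -> bool:
    # SELECT ... LIMIT 1 under the hood; stops at the first matching row.
    return documents.exists()


def has_matches_via_count(documents) -> bool:
    # SELECT COUNT(*) followed by a comparison; more work for a boolean answer.
    return documents.count() > 0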