Compare commits

..

1 Commit

Author SHA1 Message Date
Trenton H
d3bd374e83 Instead of manual temporary directory management, use a context manager 2026-03-25 08:10:01 -07:00

View File

@@ -338,18 +338,15 @@ class ConsumerPlugin(
Return the document object if it was successfully created. Return the document object if it was successfully created.
""" """
tempdir = None # Preflight has already run including progress update to 0%
self.log.info(f"Consuming {self.filename}")
try: # For the actual work, copy the file into a tempdir
# Preflight has already run including progress update to 0% with tempfile.TemporaryDirectory(
self.log.info(f"Consuming {self.filename}") prefix="paperless-ngx",
dir=settings.SCRATCH_DIR,
# For the actual work, copy the file into a tempdir ) as tmpdir:
tempdir = tempfile.TemporaryDirectory( self.working_copy = Path(tmpdir) / Path(self.filename)
prefix="paperless-ngx",
dir=settings.SCRATCH_DIR,
)
self.working_copy = Path(tempdir.name) / Path(self.filename)
copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy) copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy)
self.unmodified_original = None self.unmodified_original = None
@@ -381,7 +378,7 @@ class ConsumerPlugin(
self.log.debug(f"Detected mime type after qpdf: {mime_type}") self.log.debug(f"Detected mime type after qpdf: {mime_type}")
# Save the original file for later # Save the original file for later
self.unmodified_original = ( self.unmodified_original = (
Path(tempdir.name) / Path("uo") / Path(self.filename) Path(tmpdir) / Path("uo") / Path(self.filename)
) )
self.unmodified_original.parent.mkdir(exist_ok=True) self.unmodified_original.parent.mkdir(exist_ok=True)
copy_file_with_basic_stats( copy_file_with_basic_stats(
@@ -400,7 +397,6 @@ class ConsumerPlugin(
) )
) )
if not parser_class: if not parser_class:
tempdir.cleanup()
self._fail( self._fail(
ConsumerStatusShortMessage.UNSUPPORTED_TYPE, ConsumerStatusShortMessage.UNSUPPORTED_TYPE,
f"Unsupported mime type {mime_type}", f"Unsupported mime type {mime_type}",
@@ -415,281 +411,276 @@ class ConsumerPlugin(
) )
self.run_pre_consume_script() self.run_pre_consume_script()
except:
if tempdir:
tempdir.cleanup()
raise
# This doesn't parse the document yet, but gives us a parser. # This doesn't parse the document yet, but gives us a parser.
with parser_class() as document_parser: with parser_class() as document_parser:
document_parser.configure( document_parser.configure(
ParserContext(mailrule_id=self.input_doc.mailrule_id), ParserContext(mailrule_id=self.input_doc.mailrule_id),
)
self.log.debug(f"Parser: {document_parser.name} v{document_parser.version}")
# Parse the document. This may take some time.
text = None
date = None
thumbnail = None
archive_path = None
page_count = None
try:
self._send_progress(
20,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.PARSING_DOCUMENT,
) )
self.log.debug(f"Parsing {self.filename}...")
document_parser.parse(self.working_copy, mime_type) self.log.debug(
f"Parser: {document_parser.name} v{document_parser.version}",
self.log.debug(f"Generating thumbnail for {self.filename}...")
self._send_progress(
70,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
) )
thumbnail = document_parser.get_thumbnail(self.working_copy, mime_type)
text = document_parser.get_text() # Parse the document. This may take some time.
date = document_parser.get_date()
if date is None: text = None
date = None
thumbnail = None
archive_path = None
page_count = None
try:
self._send_progress( self._send_progress(
90, 20,
100, 100,
ProgressStatusOptions.WORKING, ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.PARSE_DATE, ConsumerStatusShortMessage.PARSING_DOCUMENT,
) )
with get_date_parser() as date_parser: self.log.debug(f"Parsing {self.filename}...")
date = next(date_parser.parse(self.filename, text), None)
archive_path = document_parser.get_archive_path()
page_count = document_parser.get_page_count(
self.working_copy,
mime_type,
)
except ParseError as e: document_parser.parse(self.working_copy, mime_type)
if tempdir:
tempdir.cleanup()
self._fail(
str(e),
f"Error occurred while consuming document {self.filename}: {e}",
exc_info=True,
exception=e,
)
except Exception as e:
if tempdir:
tempdir.cleanup()
self._fail(
str(e),
f"Unexpected error while consuming document {self.filename}: {e}",
exc_info=True,
exception=e,
)
# Prepare the document classifier. self.log.debug(f"Generating thumbnail for {self.filename}...")
self._send_progress(
70,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
)
thumbnail = document_parser.get_thumbnail(
self.working_copy,
mime_type,
)
# TODO: I don't really like to do this here, but this way we avoid text = document_parser.get_text()
# reloading the classifier multiple times, since there are multiple date = document_parser.get_date()
# post-consume hooks that all require the classifier. if date is None:
self._send_progress(
classifier = load_classifier() 90,
100,
self._send_progress( ProgressStatusOptions.WORKING,
95, ConsumerStatusShortMessage.PARSE_DATE,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.SAVE_DOCUMENT,
)
# now that everything is done, we can start to store the document
# in the system. This will be a transaction and reasonably fast.
try:
with transaction.atomic():
# store the document.
if self.input_doc.root_document_id:
# If this is a new version of an existing document, we need
# to make sure we're not creating a new document, but updating
# the existing one.
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
) )
original_document = self._create_version_from_root( with get_date_parser() as date_parser:
root_doc, date = next(date_parser.parse(self.filename, text), None)
text=text, archive_path = document_parser.get_archive_path()
page_count=page_count, page_count = document_parser.get_page_count(
mime_type=mime_type, self.working_copy,
) mime_type,
actor = None )
# Save the new version, potentially creating an audit log entry for the version addition if enabled. except ParseError as e:
if ( self._fail(
settings.AUDIT_LOG_ENABLED str(e),
and self.metadata.actor_id is not None f"Error occurred while consuming document {self.filename}: {e}",
): exc_info=True,
actor = User.objects.filter( exception=e,
pk=self.metadata.actor_id, )
).first() except Exception as e:
if actor is not None: self._fail(
from auditlog.context import ( # type: ignore[import-untyped] str(e),
set_actor, f"Unexpected error while consuming document {self.filename}: {e}",
) exc_info=True,
exception=e,
)
with set_actor(actor): # Prepare the document classifier.
# TODO: I don't really like to do this here, but this way we avoid
# reloading the classifier multiple times, since there are multiple
# post-consume hooks that all require the classifier.
classifier = load_classifier()
self._send_progress(
95,
100,
ProgressStatusOptions.WORKING,
ConsumerStatusShortMessage.SAVE_DOCUMENT,
)
# now that everything is done, we can start to store the document
# in the system. This will be a transaction and reasonably fast.
try:
with transaction.atomic():
# store the document.
if self.input_doc.root_document_id:
# If this is a new version of an existing document, we need
# to make sure we're not creating a new document, but updating
# the existing one.
root_doc = Document.objects.get(
pk=self.input_doc.root_document_id,
)
original_document = self._create_version_from_root(
root_doc,
text=text,
page_count=page_count,
mime_type=mime_type,
)
actor = None
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
if (
settings.AUDIT_LOG_ENABLED
and self.metadata.actor_id is not None
):
actor = User.objects.filter(
pk=self.metadata.actor_id,
).first()
if actor is not None:
from auditlog.context import ( # type: ignore[import-untyped]
set_actor,
)
with set_actor(actor):
original_document.save()
else:
original_document.save() original_document.save()
else: else:
original_document.save() original_document.save()
# Create a log entry for the version addition, if enabled
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import ( # type: ignore[import-untyped]
LogEntry,
)
LogEntry.objects.log_create(
instance=root_doc,
changes={
"Version Added": ["None", original_document.id],
},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version added",
"version_id": original_document.id,
},
)
document = original_document
else: else:
original_document.save() document = self._store(
text=text,
# Create a log entry for the version addition, if enabled date=date,
if settings.AUDIT_LOG_ENABLED: page_count=page_count,
from auditlog.models import ( # type: ignore[import-untyped] mime_type=mime_type,
LogEntry,
) )
LogEntry.objects.log_create( # If we get here, it was successful. Proceed with post-consume
instance=root_doc, # hooks. If they fail, nothing will get changed.
changes={
"Version Added": ["None", original_document.id],
},
action=LogEntry.Action.UPDATE,
actor=actor,
additional_data={
"reason": "Version added",
"version_id": original_document.id,
},
)
document = original_document
else:
document = self._store(
text=text,
date=date,
page_count=page_count,
mime_type=mime_type,
)
# If we get here, it was successful. Proceed with post-consume document_consumption_finished.send(
# hooks. If they fail, nothing will get changed. sender=self.__class__,
document=document,
document_consumption_finished.send( logging_group=self.logging_group,
sender=self.__class__, classifier=classifier,
document=document, original_file=self.unmodified_original
logging_group=self.logging_group, if self.unmodified_original
classifier=classifier,
original_file=self.unmodified_original
if self.unmodified_original
else self.working_copy,
)
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
document,
use_format=False,
)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
self.unmodified_original
if self.unmodified_original is not None
else self.working_copy, else self.working_copy,
document.source_path,
) )
self._write( # After everything is in the database, copy the files into
thumbnail, # place. If this fails, we'll also rollback the transaction.
document.thumbnail_path, with FileLock(settings.MEDIA_LOCK):
) generated_filename = generate_unique_filename(document)
if archive_path and Path(archive_path).is_file():
generated_archive_filename = generate_unique_filename(
document,
archive_filename=True,
)
if ( if (
len(str(generated_archive_filename)) len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH > Document.MAX_STORED_FILENAME_LENGTH
): ):
self.log.warning( self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming", "Generated source filename exceeds db path limit, falling back to default naming",
) )
generated_archive_filename = generate_filename( generated_filename = generate_filename(
document, document,
archive_filename=True,
use_format=False, use_format=False,
) )
document.archive_filename = generated_archive_filename document.filename = generated_filename
create_source_path_directory(document.archive_path) create_source_path_directory(document.source_path)
self._write( self._write(
archive_path, self.unmodified_original
document.archive_path, if self.unmodified_original is not None
else self.working_copy,
document.source_path,
) )
with Path(archive_path).open("rb") as f: self._write(
document.archive_checksum = hashlib.md5( thumbnail,
f.read(), document.thumbnail_path,
).hexdigest() )
# Don't save with the lock active. Saving will cause the file if archive_path and Path(archive_path).is_file():
# renaming logic to acquire the lock as well. generated_archive_filename = generate_unique_filename(
# This triggers things like file renaming document,
document.save() archive_filename=True,
)
if (
len(str(generated_archive_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = generated_archive_filename
create_source_path_directory(document.archive_path)
self._write(
archive_path,
document.archive_path,
)
if document.root_document_id: with Path(archive_path).open("rb") as f:
document_updated.send( document.archive_checksum = hashlib.md5(
sender=self.__class__, f.read(),
document=document.root_document, ).hexdigest()
)
# Delete the file only if it was successfully consumed # Don't save with the lock active. Saving will cause the file
self.log.debug( # renaming logic to acquire the lock as well.
f"Deleting original file {self.input_doc.original_file}", # This triggers things like file renaming
) document.save()
self.input_doc.original_file.unlink()
self.log.debug(f"Deleting working copy {self.working_copy}") if document.root_document_id:
self.working_copy.unlink() document_updated.send(
if self.unmodified_original is not None: # pragma: no cover sender=self.__class__,
document=document.root_document,
)
# Delete the file only if it was successfully consumed
self.log.debug( self.log.debug(
f"Deleting unmodified original file {self.unmodified_original}", f"Deleting original file {self.input_doc.original_file}",
) )
self.unmodified_original.unlink() self.input_doc.original_file.unlink()
self.log.debug(f"Deleting working copy {self.working_copy}")
self.working_copy.unlink()
if self.unmodified_original is not None: # pragma: no cover
self.log.debug(
f"Deleting unmodified original file {self.unmodified_original}",
)
self.unmodified_original.unlink()
# https://github.com/jonaswinkler/paperless-ng/discussions/1037 # https://github.com/jonaswinkler/paperless-ng/discussions/1037
shadow_file = ( shadow_file = (
Path(self.input_doc.original_file).parent Path(self.input_doc.original_file).parent
/ f"._{Path(self.input_doc.original_file).name}" / f"._{Path(self.input_doc.original_file).name}"
)
if Path(shadow_file).is_file():
self.log.debug(f"Deleting shadow file {shadow_file}")
Path(shadow_file).unlink()
except Exception as e:
self._fail(
str(e),
f"The following error occurred while storing document "
f"{self.filename} after parsing: {e}",
exc_info=True,
exception=e,
) )
if Path(shadow_file).is_file():
self.log.debug(f"Deleting shadow file {shadow_file}")
Path(shadow_file).unlink()
except Exception as e:
self._fail(
str(e),
f"The following error occurred while storing document "
f"{self.filename} after parsing: {e}",
exc_info=True,
exception=e,
)
finally:
tempdir.cleanup()
self.run_post_consume_script(document) self.run_post_consume_script(document)
self.log.info(f"Document {document} consumption finished") self.log.info(f"Document {document} consumption finished")