diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 809d6c647..64622dc74 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -338,18 +338,15 @@ class ConsumerPlugin( Return the document object if it was successfully created. """ - tempdir = None + # Preflight has already run including progress update to 0% + self.log.info(f"Consuming {self.filename}") - try: - # Preflight has already run including progress update to 0% - self.log.info(f"Consuming {self.filename}") - - # For the actual work, copy the file into a tempdir - tempdir = tempfile.TemporaryDirectory( - prefix="paperless-ngx", - dir=settings.SCRATCH_DIR, - ) - self.working_copy = Path(tempdir.name) / Path(self.filename) + # For the actual work, copy the file into a tempdir + with tempfile.TemporaryDirectory( + prefix="paperless-ngx", + dir=settings.SCRATCH_DIR, + ) as tmpdir: + self.working_copy = Path(tmpdir) / Path(self.filename) copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy) self.unmodified_original = None @@ -381,7 +378,7 @@ class ConsumerPlugin( self.log.debug(f"Detected mime type after qpdf: {mime_type}") # Save the original file for later self.unmodified_original = ( - Path(tempdir.name) / Path("uo") / Path(self.filename) + Path(tmpdir) / Path("uo") / Path(self.filename) ) self.unmodified_original.parent.mkdir(exist_ok=True) copy_file_with_basic_stats( @@ -400,7 +397,6 @@ class ConsumerPlugin( ) ) if not parser_class: - tempdir.cleanup() self._fail( ConsumerStatusShortMessage.UNSUPPORTED_TYPE, f"Unsupported mime type {mime_type}", @@ -415,281 +411,276 @@ class ConsumerPlugin( ) self.run_pre_consume_script() - except: - if tempdir: - tempdir.cleanup() - raise - # This doesn't parse the document yet, but gives us a parser. - with parser_class() as document_parser: - document_parser.configure( - ParserContext(mailrule_id=self.input_doc.mailrule_id), - ) - - self.log.debug(f"Parser: {document_parser.name} v{document_parser.version}") - - # Parse the document. This may take some time. - - text = None - date = None - thumbnail = None - archive_path = None - page_count = None - - try: - self._send_progress( - 20, - 100, - ProgressStatusOptions.WORKING, - ConsumerStatusShortMessage.PARSING_DOCUMENT, + # This doesn't parse the document yet, but gives us a parser. + with parser_class() as document_parser: + document_parser.configure( + ParserContext(mailrule_id=self.input_doc.mailrule_id), ) - self.log.debug(f"Parsing {self.filename}...") - document_parser.parse(self.working_copy, mime_type) - - self.log.debug(f"Generating thumbnail for {self.filename}...") - self._send_progress( - 70, - 100, - ProgressStatusOptions.WORKING, - ConsumerStatusShortMessage.GENERATING_THUMBNAIL, + self.log.debug( + f"Parser: {document_parser.name} v{document_parser.version}", ) - thumbnail = document_parser.get_thumbnail(self.working_copy, mime_type) - text = document_parser.get_text() - date = document_parser.get_date() - if date is None: + # Parse the document. This may take some time. + + text = None + date = None + thumbnail = None + archive_path = None + page_count = None + + try: self._send_progress( - 90, + 20, 100, ProgressStatusOptions.WORKING, - ConsumerStatusShortMessage.PARSE_DATE, + ConsumerStatusShortMessage.PARSING_DOCUMENT, ) - with get_date_parser() as date_parser: - date = next(date_parser.parse(self.filename, text), None) - archive_path = document_parser.get_archive_path() - page_count = document_parser.get_page_count( - self.working_copy, - mime_type, - ) + self.log.debug(f"Parsing {self.filename}...") - except ParseError as e: - if tempdir: - tempdir.cleanup() - self._fail( - str(e), - f"Error occurred while consuming document {self.filename}: {e}", - exc_info=True, - exception=e, - ) - except Exception as e: - if tempdir: - tempdir.cleanup() - self._fail( - str(e), - f"Unexpected error while consuming document {self.filename}: {e}", - exc_info=True, - exception=e, - ) + document_parser.parse(self.working_copy, mime_type) - # Prepare the document classifier. + self.log.debug(f"Generating thumbnail for {self.filename}...") + self._send_progress( + 70, + 100, + ProgressStatusOptions.WORKING, + ConsumerStatusShortMessage.GENERATING_THUMBNAIL, + ) + thumbnail = document_parser.get_thumbnail( + self.working_copy, + mime_type, + ) - # TODO: I don't really like to do this here, but this way we avoid - # reloading the classifier multiple times, since there are multiple - # post-consume hooks that all require the classifier. - - classifier = load_classifier() - - self._send_progress( - 95, - 100, - ProgressStatusOptions.WORKING, - ConsumerStatusShortMessage.SAVE_DOCUMENT, - ) - # now that everything is done, we can start to store the document - # in the system. This will be a transaction and reasonably fast. - try: - with transaction.atomic(): - # store the document. - if self.input_doc.root_document_id: - # If this is a new version of an existing document, we need - # to make sure we're not creating a new document, but updating - # the existing one. - root_doc = Document.objects.get( - pk=self.input_doc.root_document_id, + text = document_parser.get_text() + date = document_parser.get_date() + if date is None: + self._send_progress( + 90, + 100, + ProgressStatusOptions.WORKING, + ConsumerStatusShortMessage.PARSE_DATE, ) - original_document = self._create_version_from_root( - root_doc, - text=text, - page_count=page_count, - mime_type=mime_type, - ) - actor = None + with get_date_parser() as date_parser: + date = next(date_parser.parse(self.filename, text), None) + archive_path = document_parser.get_archive_path() + page_count = document_parser.get_page_count( + self.working_copy, + mime_type, + ) - # Save the new version, potentially creating an audit log entry for the version addition if enabled. - if ( - settings.AUDIT_LOG_ENABLED - and self.metadata.actor_id is not None - ): - actor = User.objects.filter( - pk=self.metadata.actor_id, - ).first() - if actor is not None: - from auditlog.context import ( # type: ignore[import-untyped] - set_actor, - ) + except ParseError as e: + self._fail( + str(e), + f"Error occurred while consuming document {self.filename}: {e}", + exc_info=True, + exception=e, + ) + except Exception as e: + self._fail( + str(e), + f"Unexpected error while consuming document {self.filename}: {e}", + exc_info=True, + exception=e, + ) - with set_actor(actor): + # Prepare the document classifier. + + # TODO: I don't really like to do this here, but this way we avoid + # reloading the classifier multiple times, since there are multiple + # post-consume hooks that all require the classifier. + + classifier = load_classifier() + + self._send_progress( + 95, + 100, + ProgressStatusOptions.WORKING, + ConsumerStatusShortMessage.SAVE_DOCUMENT, + ) + # now that everything is done, we can start to store the document + # in the system. This will be a transaction and reasonably fast. + try: + with transaction.atomic(): + # store the document. + if self.input_doc.root_document_id: + # If this is a new version of an existing document, we need + # to make sure we're not creating a new document, but updating + # the existing one. + root_doc = Document.objects.get( + pk=self.input_doc.root_document_id, + ) + original_document = self._create_version_from_root( + root_doc, + text=text, + page_count=page_count, + mime_type=mime_type, + ) + actor = None + + # Save the new version, potentially creating an audit log entry for the version addition if enabled. + if ( + settings.AUDIT_LOG_ENABLED + and self.metadata.actor_id is not None + ): + actor = User.objects.filter( + pk=self.metadata.actor_id, + ).first() + if actor is not None: + from auditlog.context import ( # type: ignore[import-untyped] + set_actor, + ) + + with set_actor(actor): + original_document.save() + else: original_document.save() else: original_document.save() + + # Create a log entry for the version addition, if enabled + if settings.AUDIT_LOG_ENABLED: + from auditlog.models import ( # type: ignore[import-untyped] + LogEntry, + ) + + LogEntry.objects.log_create( + instance=root_doc, + changes={ + "Version Added": ["None", original_document.id], + }, + action=LogEntry.Action.UPDATE, + actor=actor, + additional_data={ + "reason": "Version added", + "version_id": original_document.id, + }, + ) + document = original_document else: - original_document.save() - - # Create a log entry for the version addition, if enabled - if settings.AUDIT_LOG_ENABLED: - from auditlog.models import ( # type: ignore[import-untyped] - LogEntry, + document = self._store( + text=text, + date=date, + page_count=page_count, + mime_type=mime_type, ) - LogEntry.objects.log_create( - instance=root_doc, - changes={ - "Version Added": ["None", original_document.id], - }, - action=LogEntry.Action.UPDATE, - actor=actor, - additional_data={ - "reason": "Version added", - "version_id": original_document.id, - }, - ) - document = original_document - else: - document = self._store( - text=text, - date=date, - page_count=page_count, - mime_type=mime_type, - ) + # If we get here, it was successful. Proceed with post-consume + # hooks. If they fail, nothing will get changed. - # If we get here, it was successful. Proceed with post-consume - # hooks. If they fail, nothing will get changed. - - document_consumption_finished.send( - sender=self.__class__, - document=document, - logging_group=self.logging_group, - classifier=classifier, - original_file=self.unmodified_original - if self.unmodified_original - else self.working_copy, - ) - - # After everything is in the database, copy the files into - # place. If this fails, we'll also rollback the transaction. - with FileLock(settings.MEDIA_LOCK): - generated_filename = generate_unique_filename(document) - if ( - len(str(generated_filename)) - > Document.MAX_STORED_FILENAME_LENGTH - ): - self.log.warning( - "Generated source filename exceeds db path limit, falling back to default naming", - ) - generated_filename = generate_filename( - document, - use_format=False, - ) - document.filename = generated_filename - create_source_path_directory(document.source_path) - - self._write( - self.unmodified_original - if self.unmodified_original is not None + document_consumption_finished.send( + sender=self.__class__, + document=document, + logging_group=self.logging_group, + classifier=classifier, + original_file=self.unmodified_original + if self.unmodified_original else self.working_copy, - document.source_path, ) - self._write( - thumbnail, - document.thumbnail_path, - ) - - if archive_path and Path(archive_path).is_file(): - generated_archive_filename = generate_unique_filename( - document, - archive_filename=True, - ) + # After everything is in the database, copy the files into + # place. If this fails, we'll also rollback the transaction. + with FileLock(settings.MEDIA_LOCK): + generated_filename = generate_unique_filename(document) if ( - len(str(generated_archive_filename)) + len(str(generated_filename)) > Document.MAX_STORED_FILENAME_LENGTH ): self.log.warning( - "Generated archive filename exceeds db path limit, falling back to default naming", + "Generated source filename exceeds db path limit, falling back to default naming", ) - generated_archive_filename = generate_filename( + generated_filename = generate_filename( document, - archive_filename=True, use_format=False, ) - document.archive_filename = generated_archive_filename - create_source_path_directory(document.archive_path) + document.filename = generated_filename + create_source_path_directory(document.source_path) + self._write( - archive_path, - document.archive_path, + self.unmodified_original + if self.unmodified_original is not None + else self.working_copy, + document.source_path, ) - with Path(archive_path).open("rb") as f: - document.archive_checksum = hashlib.md5( - f.read(), - ).hexdigest() + self._write( + thumbnail, + document.thumbnail_path, + ) - # Don't save with the lock active. Saving will cause the file - # renaming logic to acquire the lock as well. - # This triggers things like file renaming - document.save() + if archive_path and Path(archive_path).is_file(): + generated_archive_filename = generate_unique_filename( + document, + archive_filename=True, + ) + if ( + len(str(generated_archive_filename)) + > Document.MAX_STORED_FILENAME_LENGTH + ): + self.log.warning( + "Generated archive filename exceeds db path limit, falling back to default naming", + ) + generated_archive_filename = generate_filename( + document, + archive_filename=True, + use_format=False, + ) + document.archive_filename = generated_archive_filename + create_source_path_directory(document.archive_path) + self._write( + archive_path, + document.archive_path, + ) - if document.root_document_id: - document_updated.send( - sender=self.__class__, - document=document.root_document, - ) + with Path(archive_path).open("rb") as f: + document.archive_checksum = hashlib.md5( + f.read(), + ).hexdigest() - # Delete the file only if it was successfully consumed - self.log.debug( - f"Deleting original file {self.input_doc.original_file}", - ) - self.input_doc.original_file.unlink() - self.log.debug(f"Deleting working copy {self.working_copy}") - self.working_copy.unlink() - if self.unmodified_original is not None: # pragma: no cover + # Don't save with the lock active. Saving will cause the file + # renaming logic to acquire the lock as well. + # This triggers things like file renaming + document.save() + + if document.root_document_id: + document_updated.send( + sender=self.__class__, + document=document.root_document, + ) + + # Delete the file only if it was successfully consumed self.log.debug( - f"Deleting unmodified original file {self.unmodified_original}", + f"Deleting original file {self.input_doc.original_file}", ) - self.unmodified_original.unlink() + self.input_doc.original_file.unlink() + self.log.debug(f"Deleting working copy {self.working_copy}") + self.working_copy.unlink() + if self.unmodified_original is not None: # pragma: no cover + self.log.debug( + f"Deleting unmodified original file {self.unmodified_original}", + ) + self.unmodified_original.unlink() - # https://github.com/jonaswinkler/paperless-ng/discussions/1037 - shadow_file = ( - Path(self.input_doc.original_file).parent - / f"._{Path(self.input_doc.original_file).name}" + # https://github.com/jonaswinkler/paperless-ng/discussions/1037 + shadow_file = ( + Path(self.input_doc.original_file).parent + / f"._{Path(self.input_doc.original_file).name}" + ) + + if Path(shadow_file).is_file(): + self.log.debug(f"Deleting shadow file {shadow_file}") + Path(shadow_file).unlink() + + except Exception as e: + self._fail( + str(e), + f"The following error occurred while storing document " + f"{self.filename} after parsing: {e}", + exc_info=True, + exception=e, ) - if Path(shadow_file).is_file(): - self.log.debug(f"Deleting shadow file {shadow_file}") - Path(shadow_file).unlink() - - except Exception as e: - self._fail( - str(e), - f"The following error occurred while storing document " - f"{self.filename} after parsing: {e}", - exc_info=True, - exception=e, - ) - finally: - tempdir.cleanup() - self.run_post_consume_script(document) self.log.info(f"Document {document} consumption finished")