mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-03-26 10:52:46 +00:00
Compare commits
1 Commits
dev
...
fix/contex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3bd374e83 |
@@ -338,18 +338,15 @@ class ConsumerPlugin(
|
|||||||
Return the document object if it was successfully created.
|
Return the document object if it was successfully created.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
tempdir = None
|
# Preflight has already run including progress update to 0%
|
||||||
|
self.log.info(f"Consuming {self.filename}")
|
||||||
|
|
||||||
try:
|
# For the actual work, copy the file into a tempdir
|
||||||
# Preflight has already run including progress update to 0%
|
with tempfile.TemporaryDirectory(
|
||||||
self.log.info(f"Consuming {self.filename}")
|
prefix="paperless-ngx",
|
||||||
|
dir=settings.SCRATCH_DIR,
|
||||||
# For the actual work, copy the file into a tempdir
|
) as tmpdir:
|
||||||
tempdir = tempfile.TemporaryDirectory(
|
self.working_copy = Path(tmpdir) / Path(self.filename)
|
||||||
prefix="paperless-ngx",
|
|
||||||
dir=settings.SCRATCH_DIR,
|
|
||||||
)
|
|
||||||
self.working_copy = Path(tempdir.name) / Path(self.filename)
|
|
||||||
copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy)
|
copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy)
|
||||||
self.unmodified_original = None
|
self.unmodified_original = None
|
||||||
|
|
||||||
@@ -381,7 +378,7 @@ class ConsumerPlugin(
|
|||||||
self.log.debug(f"Detected mime type after qpdf: {mime_type}")
|
self.log.debug(f"Detected mime type after qpdf: {mime_type}")
|
||||||
# Save the original file for later
|
# Save the original file for later
|
||||||
self.unmodified_original = (
|
self.unmodified_original = (
|
||||||
Path(tempdir.name) / Path("uo") / Path(self.filename)
|
Path(tmpdir) / Path("uo") / Path(self.filename)
|
||||||
)
|
)
|
||||||
self.unmodified_original.parent.mkdir(exist_ok=True)
|
self.unmodified_original.parent.mkdir(exist_ok=True)
|
||||||
copy_file_with_basic_stats(
|
copy_file_with_basic_stats(
|
||||||
@@ -400,7 +397,6 @@ class ConsumerPlugin(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
if not parser_class:
|
if not parser_class:
|
||||||
tempdir.cleanup()
|
|
||||||
self._fail(
|
self._fail(
|
||||||
ConsumerStatusShortMessage.UNSUPPORTED_TYPE,
|
ConsumerStatusShortMessage.UNSUPPORTED_TYPE,
|
||||||
f"Unsupported mime type {mime_type}",
|
f"Unsupported mime type {mime_type}",
|
||||||
@@ -415,281 +411,276 @@ class ConsumerPlugin(
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.run_pre_consume_script()
|
self.run_pre_consume_script()
|
||||||
except:
|
|
||||||
if tempdir:
|
|
||||||
tempdir.cleanup()
|
|
||||||
raise
|
|
||||||
|
|
||||||
# This doesn't parse the document yet, but gives us a parser.
|
# This doesn't parse the document yet, but gives us a parser.
|
||||||
with parser_class() as document_parser:
|
with parser_class() as document_parser:
|
||||||
document_parser.configure(
|
document_parser.configure(
|
||||||
ParserContext(mailrule_id=self.input_doc.mailrule_id),
|
ParserContext(mailrule_id=self.input_doc.mailrule_id),
|
||||||
)
|
|
||||||
|
|
||||||
self.log.debug(f"Parser: {document_parser.name} v{document_parser.version}")
|
|
||||||
|
|
||||||
# Parse the document. This may take some time.
|
|
||||||
|
|
||||||
text = None
|
|
||||||
date = None
|
|
||||||
thumbnail = None
|
|
||||||
archive_path = None
|
|
||||||
page_count = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._send_progress(
|
|
||||||
20,
|
|
||||||
100,
|
|
||||||
ProgressStatusOptions.WORKING,
|
|
||||||
ConsumerStatusShortMessage.PARSING_DOCUMENT,
|
|
||||||
)
|
)
|
||||||
self.log.debug(f"Parsing {self.filename}...")
|
|
||||||
|
|
||||||
document_parser.parse(self.working_copy, mime_type)
|
self.log.debug(
|
||||||
|
f"Parser: {document_parser.name} v{document_parser.version}",
|
||||||
self.log.debug(f"Generating thumbnail for {self.filename}...")
|
|
||||||
self._send_progress(
|
|
||||||
70,
|
|
||||||
100,
|
|
||||||
ProgressStatusOptions.WORKING,
|
|
||||||
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
|
|
||||||
)
|
)
|
||||||
thumbnail = document_parser.get_thumbnail(self.working_copy, mime_type)
|
|
||||||
|
|
||||||
text = document_parser.get_text()
|
# Parse the document. This may take some time.
|
||||||
date = document_parser.get_date()
|
|
||||||
if date is None:
|
text = None
|
||||||
|
date = None
|
||||||
|
thumbnail = None
|
||||||
|
archive_path = None
|
||||||
|
page_count = None
|
||||||
|
|
||||||
|
try:
|
||||||
self._send_progress(
|
self._send_progress(
|
||||||
90,
|
20,
|
||||||
100,
|
100,
|
||||||
ProgressStatusOptions.WORKING,
|
ProgressStatusOptions.WORKING,
|
||||||
ConsumerStatusShortMessage.PARSE_DATE,
|
ConsumerStatusShortMessage.PARSING_DOCUMENT,
|
||||||
)
|
)
|
||||||
with get_date_parser() as date_parser:
|
self.log.debug(f"Parsing {self.filename}...")
|
||||||
date = next(date_parser.parse(self.filename, text), None)
|
|
||||||
archive_path = document_parser.get_archive_path()
|
|
||||||
page_count = document_parser.get_page_count(
|
|
||||||
self.working_copy,
|
|
||||||
mime_type,
|
|
||||||
)
|
|
||||||
|
|
||||||
except ParseError as e:
|
document_parser.parse(self.working_copy, mime_type)
|
||||||
if tempdir:
|
|
||||||
tempdir.cleanup()
|
|
||||||
self._fail(
|
|
||||||
str(e),
|
|
||||||
f"Error occurred while consuming document {self.filename}: {e}",
|
|
||||||
exc_info=True,
|
|
||||||
exception=e,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
if tempdir:
|
|
||||||
tempdir.cleanup()
|
|
||||||
self._fail(
|
|
||||||
str(e),
|
|
||||||
f"Unexpected error while consuming document {self.filename}: {e}",
|
|
||||||
exc_info=True,
|
|
||||||
exception=e,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Prepare the document classifier.
|
self.log.debug(f"Generating thumbnail for {self.filename}...")
|
||||||
|
self._send_progress(
|
||||||
|
70,
|
||||||
|
100,
|
||||||
|
ProgressStatusOptions.WORKING,
|
||||||
|
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
|
||||||
|
)
|
||||||
|
thumbnail = document_parser.get_thumbnail(
|
||||||
|
self.working_copy,
|
||||||
|
mime_type,
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: I don't really like to do this here, but this way we avoid
|
text = document_parser.get_text()
|
||||||
# reloading the classifier multiple times, since there are multiple
|
date = document_parser.get_date()
|
||||||
# post-consume hooks that all require the classifier.
|
if date is None:
|
||||||
|
self._send_progress(
|
||||||
classifier = load_classifier()
|
90,
|
||||||
|
100,
|
||||||
self._send_progress(
|
ProgressStatusOptions.WORKING,
|
||||||
95,
|
ConsumerStatusShortMessage.PARSE_DATE,
|
||||||
100,
|
|
||||||
ProgressStatusOptions.WORKING,
|
|
||||||
ConsumerStatusShortMessage.SAVE_DOCUMENT,
|
|
||||||
)
|
|
||||||
# now that everything is done, we can start to store the document
|
|
||||||
# in the system. This will be a transaction and reasonably fast.
|
|
||||||
try:
|
|
||||||
with transaction.atomic():
|
|
||||||
# store the document.
|
|
||||||
if self.input_doc.root_document_id:
|
|
||||||
# If this is a new version of an existing document, we need
|
|
||||||
# to make sure we're not creating a new document, but updating
|
|
||||||
# the existing one.
|
|
||||||
root_doc = Document.objects.get(
|
|
||||||
pk=self.input_doc.root_document_id,
|
|
||||||
)
|
)
|
||||||
original_document = self._create_version_from_root(
|
with get_date_parser() as date_parser:
|
||||||
root_doc,
|
date = next(date_parser.parse(self.filename, text), None)
|
||||||
text=text,
|
archive_path = document_parser.get_archive_path()
|
||||||
page_count=page_count,
|
page_count = document_parser.get_page_count(
|
||||||
mime_type=mime_type,
|
self.working_copy,
|
||||||
)
|
mime_type,
|
||||||
actor = None
|
)
|
||||||
|
|
||||||
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
|
except ParseError as e:
|
||||||
if (
|
self._fail(
|
||||||
settings.AUDIT_LOG_ENABLED
|
str(e),
|
||||||
and self.metadata.actor_id is not None
|
f"Error occurred while consuming document {self.filename}: {e}",
|
||||||
):
|
exc_info=True,
|
||||||
actor = User.objects.filter(
|
exception=e,
|
||||||
pk=self.metadata.actor_id,
|
)
|
||||||
).first()
|
except Exception as e:
|
||||||
if actor is not None:
|
self._fail(
|
||||||
from auditlog.context import ( # type: ignore[import-untyped]
|
str(e),
|
||||||
set_actor,
|
f"Unexpected error while consuming document {self.filename}: {e}",
|
||||||
)
|
exc_info=True,
|
||||||
|
exception=e,
|
||||||
|
)
|
||||||
|
|
||||||
with set_actor(actor):
|
# Prepare the document classifier.
|
||||||
|
|
||||||
|
# TODO: I don't really like to do this here, but this way we avoid
|
||||||
|
# reloading the classifier multiple times, since there are multiple
|
||||||
|
# post-consume hooks that all require the classifier.
|
||||||
|
|
||||||
|
classifier = load_classifier()
|
||||||
|
|
||||||
|
self._send_progress(
|
||||||
|
95,
|
||||||
|
100,
|
||||||
|
ProgressStatusOptions.WORKING,
|
||||||
|
ConsumerStatusShortMessage.SAVE_DOCUMENT,
|
||||||
|
)
|
||||||
|
# now that everything is done, we can start to store the document
|
||||||
|
# in the system. This will be a transaction and reasonably fast.
|
||||||
|
try:
|
||||||
|
with transaction.atomic():
|
||||||
|
# store the document.
|
||||||
|
if self.input_doc.root_document_id:
|
||||||
|
# If this is a new version of an existing document, we need
|
||||||
|
# to make sure we're not creating a new document, but updating
|
||||||
|
# the existing one.
|
||||||
|
root_doc = Document.objects.get(
|
||||||
|
pk=self.input_doc.root_document_id,
|
||||||
|
)
|
||||||
|
original_document = self._create_version_from_root(
|
||||||
|
root_doc,
|
||||||
|
text=text,
|
||||||
|
page_count=page_count,
|
||||||
|
mime_type=mime_type,
|
||||||
|
)
|
||||||
|
actor = None
|
||||||
|
|
||||||
|
# Save the new version, potentially creating an audit log entry for the version addition if enabled.
|
||||||
|
if (
|
||||||
|
settings.AUDIT_LOG_ENABLED
|
||||||
|
and self.metadata.actor_id is not None
|
||||||
|
):
|
||||||
|
actor = User.objects.filter(
|
||||||
|
pk=self.metadata.actor_id,
|
||||||
|
).first()
|
||||||
|
if actor is not None:
|
||||||
|
from auditlog.context import ( # type: ignore[import-untyped]
|
||||||
|
set_actor,
|
||||||
|
)
|
||||||
|
|
||||||
|
with set_actor(actor):
|
||||||
|
original_document.save()
|
||||||
|
else:
|
||||||
original_document.save()
|
original_document.save()
|
||||||
else:
|
else:
|
||||||
original_document.save()
|
original_document.save()
|
||||||
|
|
||||||
|
# Create a log entry for the version addition, if enabled
|
||||||
|
if settings.AUDIT_LOG_ENABLED:
|
||||||
|
from auditlog.models import ( # type: ignore[import-untyped]
|
||||||
|
LogEntry,
|
||||||
|
)
|
||||||
|
|
||||||
|
LogEntry.objects.log_create(
|
||||||
|
instance=root_doc,
|
||||||
|
changes={
|
||||||
|
"Version Added": ["None", original_document.id],
|
||||||
|
},
|
||||||
|
action=LogEntry.Action.UPDATE,
|
||||||
|
actor=actor,
|
||||||
|
additional_data={
|
||||||
|
"reason": "Version added",
|
||||||
|
"version_id": original_document.id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
document = original_document
|
||||||
else:
|
else:
|
||||||
original_document.save()
|
document = self._store(
|
||||||
|
text=text,
|
||||||
# Create a log entry for the version addition, if enabled
|
date=date,
|
||||||
if settings.AUDIT_LOG_ENABLED:
|
page_count=page_count,
|
||||||
from auditlog.models import ( # type: ignore[import-untyped]
|
mime_type=mime_type,
|
||||||
LogEntry,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
LogEntry.objects.log_create(
|
# If we get here, it was successful. Proceed with post-consume
|
||||||
instance=root_doc,
|
# hooks. If they fail, nothing will get changed.
|
||||||
changes={
|
|
||||||
"Version Added": ["None", original_document.id],
|
|
||||||
},
|
|
||||||
action=LogEntry.Action.UPDATE,
|
|
||||||
actor=actor,
|
|
||||||
additional_data={
|
|
||||||
"reason": "Version added",
|
|
||||||
"version_id": original_document.id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
document = original_document
|
|
||||||
else:
|
|
||||||
document = self._store(
|
|
||||||
text=text,
|
|
||||||
date=date,
|
|
||||||
page_count=page_count,
|
|
||||||
mime_type=mime_type,
|
|
||||||
)
|
|
||||||
|
|
||||||
# If we get here, it was successful. Proceed with post-consume
|
document_consumption_finished.send(
|
||||||
# hooks. If they fail, nothing will get changed.
|
sender=self.__class__,
|
||||||
|
document=document,
|
||||||
document_consumption_finished.send(
|
logging_group=self.logging_group,
|
||||||
sender=self.__class__,
|
classifier=classifier,
|
||||||
document=document,
|
original_file=self.unmodified_original
|
||||||
logging_group=self.logging_group,
|
if self.unmodified_original
|
||||||
classifier=classifier,
|
|
||||||
original_file=self.unmodified_original
|
|
||||||
if self.unmodified_original
|
|
||||||
else self.working_copy,
|
|
||||||
)
|
|
||||||
|
|
||||||
# After everything is in the database, copy the files into
|
|
||||||
# place. If this fails, we'll also rollback the transaction.
|
|
||||||
with FileLock(settings.MEDIA_LOCK):
|
|
||||||
generated_filename = generate_unique_filename(document)
|
|
||||||
if (
|
|
||||||
len(str(generated_filename))
|
|
||||||
> Document.MAX_STORED_FILENAME_LENGTH
|
|
||||||
):
|
|
||||||
self.log.warning(
|
|
||||||
"Generated source filename exceeds db path limit, falling back to default naming",
|
|
||||||
)
|
|
||||||
generated_filename = generate_filename(
|
|
||||||
document,
|
|
||||||
use_format=False,
|
|
||||||
)
|
|
||||||
document.filename = generated_filename
|
|
||||||
create_source_path_directory(document.source_path)
|
|
||||||
|
|
||||||
self._write(
|
|
||||||
self.unmodified_original
|
|
||||||
if self.unmodified_original is not None
|
|
||||||
else self.working_copy,
|
else self.working_copy,
|
||||||
document.source_path,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self._write(
|
# After everything is in the database, copy the files into
|
||||||
thumbnail,
|
# place. If this fails, we'll also rollback the transaction.
|
||||||
document.thumbnail_path,
|
with FileLock(settings.MEDIA_LOCK):
|
||||||
)
|
generated_filename = generate_unique_filename(document)
|
||||||
|
|
||||||
if archive_path and Path(archive_path).is_file():
|
|
||||||
generated_archive_filename = generate_unique_filename(
|
|
||||||
document,
|
|
||||||
archive_filename=True,
|
|
||||||
)
|
|
||||||
if (
|
if (
|
||||||
len(str(generated_archive_filename))
|
len(str(generated_filename))
|
||||||
> Document.MAX_STORED_FILENAME_LENGTH
|
> Document.MAX_STORED_FILENAME_LENGTH
|
||||||
):
|
):
|
||||||
self.log.warning(
|
self.log.warning(
|
||||||
"Generated archive filename exceeds db path limit, falling back to default naming",
|
"Generated source filename exceeds db path limit, falling back to default naming",
|
||||||
)
|
)
|
||||||
generated_archive_filename = generate_filename(
|
generated_filename = generate_filename(
|
||||||
document,
|
document,
|
||||||
archive_filename=True,
|
|
||||||
use_format=False,
|
use_format=False,
|
||||||
)
|
)
|
||||||
document.archive_filename = generated_archive_filename
|
document.filename = generated_filename
|
||||||
create_source_path_directory(document.archive_path)
|
create_source_path_directory(document.source_path)
|
||||||
|
|
||||||
self._write(
|
self._write(
|
||||||
archive_path,
|
self.unmodified_original
|
||||||
document.archive_path,
|
if self.unmodified_original is not None
|
||||||
|
else self.working_copy,
|
||||||
|
document.source_path,
|
||||||
)
|
)
|
||||||
|
|
||||||
with Path(archive_path).open("rb") as f:
|
self._write(
|
||||||
document.archive_checksum = hashlib.md5(
|
thumbnail,
|
||||||
f.read(),
|
document.thumbnail_path,
|
||||||
).hexdigest()
|
)
|
||||||
|
|
||||||
# Don't save with the lock active. Saving will cause the file
|
if archive_path and Path(archive_path).is_file():
|
||||||
# renaming logic to acquire the lock as well.
|
generated_archive_filename = generate_unique_filename(
|
||||||
# This triggers things like file renaming
|
document,
|
||||||
document.save()
|
archive_filename=True,
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
len(str(generated_archive_filename))
|
||||||
|
> Document.MAX_STORED_FILENAME_LENGTH
|
||||||
|
):
|
||||||
|
self.log.warning(
|
||||||
|
"Generated archive filename exceeds db path limit, falling back to default naming",
|
||||||
|
)
|
||||||
|
generated_archive_filename = generate_filename(
|
||||||
|
document,
|
||||||
|
archive_filename=True,
|
||||||
|
use_format=False,
|
||||||
|
)
|
||||||
|
document.archive_filename = generated_archive_filename
|
||||||
|
create_source_path_directory(document.archive_path)
|
||||||
|
self._write(
|
||||||
|
archive_path,
|
||||||
|
document.archive_path,
|
||||||
|
)
|
||||||
|
|
||||||
if document.root_document_id:
|
with Path(archive_path).open("rb") as f:
|
||||||
document_updated.send(
|
document.archive_checksum = hashlib.md5(
|
||||||
sender=self.__class__,
|
f.read(),
|
||||||
document=document.root_document,
|
).hexdigest()
|
||||||
)
|
|
||||||
|
|
||||||
# Delete the file only if it was successfully consumed
|
# Don't save with the lock active. Saving will cause the file
|
||||||
self.log.debug(
|
# renaming logic to acquire the lock as well.
|
||||||
f"Deleting original file {self.input_doc.original_file}",
|
# This triggers things like file renaming
|
||||||
)
|
document.save()
|
||||||
self.input_doc.original_file.unlink()
|
|
||||||
self.log.debug(f"Deleting working copy {self.working_copy}")
|
if document.root_document_id:
|
||||||
self.working_copy.unlink()
|
document_updated.send(
|
||||||
if self.unmodified_original is not None: # pragma: no cover
|
sender=self.__class__,
|
||||||
|
document=document.root_document,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Delete the file only if it was successfully consumed
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
f"Deleting unmodified original file {self.unmodified_original}",
|
f"Deleting original file {self.input_doc.original_file}",
|
||||||
)
|
)
|
||||||
self.unmodified_original.unlink()
|
self.input_doc.original_file.unlink()
|
||||||
|
self.log.debug(f"Deleting working copy {self.working_copy}")
|
||||||
|
self.working_copy.unlink()
|
||||||
|
if self.unmodified_original is not None: # pragma: no cover
|
||||||
|
self.log.debug(
|
||||||
|
f"Deleting unmodified original file {self.unmodified_original}",
|
||||||
|
)
|
||||||
|
self.unmodified_original.unlink()
|
||||||
|
|
||||||
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
|
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
|
||||||
shadow_file = (
|
shadow_file = (
|
||||||
Path(self.input_doc.original_file).parent
|
Path(self.input_doc.original_file).parent
|
||||||
/ f"._{Path(self.input_doc.original_file).name}"
|
/ f"._{Path(self.input_doc.original_file).name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if Path(shadow_file).is_file():
|
||||||
|
self.log.debug(f"Deleting shadow file {shadow_file}")
|
||||||
|
Path(shadow_file).unlink()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self._fail(
|
||||||
|
str(e),
|
||||||
|
f"The following error occurred while storing document "
|
||||||
|
f"{self.filename} after parsing: {e}",
|
||||||
|
exc_info=True,
|
||||||
|
exception=e,
|
||||||
)
|
)
|
||||||
|
|
||||||
if Path(shadow_file).is_file():
|
|
||||||
self.log.debug(f"Deleting shadow file {shadow_file}")
|
|
||||||
Path(shadow_file).unlink()
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self._fail(
|
|
||||||
str(e),
|
|
||||||
f"The following error occurred while storing document "
|
|
||||||
f"{self.filename} after parsing: {e}",
|
|
||||||
exc_info=True,
|
|
||||||
exception=e,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
tempdir.cleanup()
|
|
||||||
|
|
||||||
self.run_post_consume_script(document)
|
self.run_post_consume_script(document)
|
||||||
|
|
||||||
self.log.info(f"Document {document} consumption finished")
|
self.log.info(f"Document {document} consumption finished")
|
||||||
|
|||||||
Reference in New Issue
Block a user