From 8b8307571a95ac14febf7541e890087bda81c3cc Mon Sep 17 00:00:00 2001 From: shamoon <4887959+shamoon@users.noreply.github.com> Date: Tue, 3 Mar 2026 13:19:56 -0800 Subject: [PATCH] Fix: enforce path limit for db filename fields (#12235) --- src/documents/consumer.py | 30 +++++++++++++++++++++-- src/documents/file_handling.py | 18 ++++++++------ src/documents/models.py | 7 +++--- src/documents/signals/handlers.py | 25 ++++++++++++++++--- src/documents/tests/test_consumer.py | 27 ++++++++++++++++++++ src/documents/tests/test_file_handling.py | 15 ++++++++++++ 6 files changed, 107 insertions(+), 15 deletions(-) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 86641a243..8700305a3 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -19,6 +19,7 @@ from documents.classifier import load_classifier from documents.data_models import ConsumableDocument from documents.data_models import DocumentMetadataOverrides from documents.file_handling import create_source_path_directory +from documents.file_handling import generate_filename from documents.file_handling import generate_unique_filename from documents.loggers import LoggingMixin from documents.models import Correspondent @@ -493,7 +494,19 @@ class ConsumerPlugin( # After everything is in the database, copy the files into # place. If this fails, we'll also rollback the transaction. with FileLock(settings.MEDIA_LOCK): - document.filename = generate_unique_filename(document) + generated_filename = generate_unique_filename(document) + if ( + len(str(generated_filename)) + > Document.MAX_STORED_FILENAME_LENGTH + ): + self.log.warning( + "Generated source filename exceeds db path limit, falling back to default naming", + ) + generated_filename = generate_filename( + document, + use_format=False, + ) + document.filename = generated_filename create_source_path_directory(document.source_path) self._write( @@ -511,10 +524,23 @@ class ConsumerPlugin( ) if archive_path and Path(archive_path).is_file(): - document.archive_filename = generate_unique_filename( + generated_archive_filename = generate_unique_filename( document, archive_filename=True, ) + if ( + len(str(generated_archive_filename)) + > Document.MAX_STORED_FILENAME_LENGTH + ): + self.log.warning( + "Generated archive filename exceeds db path limit, falling back to default naming", + ) + generated_archive_filename = generate_filename( + document, + archive_filename=True, + use_format=False, + ) + document.archive_filename = generated_archive_filename create_source_path_directory(document.archive_path) self._write( document.storage_type, diff --git a/src/documents/file_handling.py b/src/documents/file_handling.py index 48cd57311..3abcdd842 100644 --- a/src/documents/file_handling.py +++ b/src/documents/file_handling.py @@ -128,17 +128,21 @@ def generate_filename( counter=0, append_gpg=True, archive_filename=False, + use_format=True, ) -> Path: base_path: Path | None = None # Determine the source of the format string - if doc.storage_path is not None: - filename_format = doc.storage_path.path - elif settings.FILENAME_FORMAT is not None: - # Maybe convert old to new style - filename_format = convert_format_str_to_template_format( - settings.FILENAME_FORMAT, - ) + if use_format: + if doc.storage_path is not None: + filename_format = doc.storage_path.path + elif settings.FILENAME_FORMAT is not None: + # Maybe convert old to new style + filename_format = convert_format_str_to_template_format( + settings.FILENAME_FORMAT, + ) + else: + filename_format = None else: filename_format = None diff --git a/src/documents/models.py b/src/documents/models.py index c7c082a7b..3a1c393fa 100644 --- a/src/documents/models.py +++ b/src/documents/models.py @@ -160,6 +160,7 @@ class Document(SoftDeleteModel, ModelWithOwner): (STORAGE_TYPE_UNENCRYPTED, _("Unencrypted")), (STORAGE_TYPE_GPG, _("Encrypted with GNU Privacy Guard")), ) + MAX_STORED_FILENAME_LENGTH: Final[int] = 1024 correspondent = models.ForeignKey( Correspondent, @@ -267,7 +268,7 @@ class Document(SoftDeleteModel, ModelWithOwner): filename = models.FilePathField( _("filename"), - max_length=1024, + max_length=MAX_STORED_FILENAME_LENGTH, editable=False, default=None, unique=True, @@ -277,7 +278,7 @@ class Document(SoftDeleteModel, ModelWithOwner): archive_filename = models.FilePathField( _("archive filename"), - max_length=1024, + max_length=MAX_STORED_FILENAME_LENGTH, editable=False, default=None, unique=True, @@ -287,7 +288,7 @@ class Document(SoftDeleteModel, ModelWithOwner): original_filename = models.CharField( _("original filename"), - max_length=1024, + max_length=MAX_STORED_FILENAME_LENGTH, editable=False, default=None, unique=False, diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index d6edb523a..591d235bd 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -460,8 +460,22 @@ def update_filename_and_move_files( old_filename = instance.filename old_source_path = instance.source_path + move_original = False + + old_archive_filename = instance.archive_filename + old_archive_path = instance.archive_path + move_archive = False candidate_filename = generate_filename(instance) + if len(str(candidate_filename)) > Document.MAX_STORED_FILENAME_LENGTH: + msg = ( + f"Document {instance!s}: Generated filename exceeds db path " + f"limit ({len(str(candidate_filename))} > " + f"{Document.MAX_STORED_FILENAME_LENGTH}): {candidate_filename!s}" + ) + logger.warning(msg) + raise CannotMoveFilesException(msg) + candidate_source_path = ( settings.ORIGINALS_DIR / candidate_filename ).resolve() @@ -480,11 +494,16 @@ def update_filename_and_move_files( instance.filename = str(new_filename) move_original = old_filename != instance.filename - old_archive_filename = instance.archive_filename - old_archive_path = instance.archive_path - if instance.has_archive_version: archive_candidate = generate_filename(instance, archive_filename=True) + if len(str(archive_candidate)) > Document.MAX_STORED_FILENAME_LENGTH: + msg = ( + f"Document {instance!s}: Generated archive filename exceeds " + f"db path limit ({len(str(archive_candidate))} > " + f"{Document.MAX_STORED_FILENAME_LENGTH}): {archive_candidate!s}" + ) + logger.warning(msg) + raise CannotMoveFilesException(msg) archive_candidate_path = ( settings.ARCHIVE_DIR / archive_candidate ).resolve() diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index 6387b5e95..58435eac5 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -633,6 +633,33 @@ class TestConsumer( self._assert_first_last_send_progress() + @mock.patch("documents.consumer.generate_unique_filename") + def testFilenameHandlingFallsBackWhenGeneratedPathExceedsDbLimit(self, m): + m.side_effect = lambda doc, archive_filename=False: Path( + ("a" * 1100 + ".pdf") if not archive_filename else ("b" * 1100 + ".pdf"), + ) + + with self.get_consumer( + self.get_test_file(), + DocumentMetadataOverrides(title="new docs"), + ) as consumer: + consumer.run() + + document = Document.objects.first() + self.assertIsNotNone(document) + assert document is not None + + self.assertEqual(document.filename, f"{document.pk:07d}.pdf") + self.assertLessEqual(len(document.filename), 1024) + self.assertLessEqual( + len(document.archive_filename), + 1024, + ) + self.assertIsFile(document.source_path) + self.assertIsFile(document.archive_path) + + self._assert_first_last_send_progress() + @override_settings(FILENAME_FORMAT="{correspondent}/{title}") @mock.patch("documents.signals.handlers.generate_unique_filename") def testFilenameHandlingUnstableFormat(self, m): diff --git a/src/documents/tests/test_file_handling.py b/src/documents/tests/test_file_handling.py index 186483655..52f06cb41 100644 --- a/src/documents/tests/test_file_handling.py +++ b/src/documents/tests/test_file_handling.py @@ -1699,6 +1699,21 @@ class TestCustomFieldFilenameUpdates( self.assertTrue(Path(self.doc.source_path).is_file()) self.assertLessEqual(m.call_count, 1) + @override_settings(FILENAME_FORMAT=None) + def test_overlong_storage_path_keeps_existing_filename(self): + initial_filename = generate_filename(self.doc) + Document.objects.filter(pk=self.doc.pk).update(filename=str(initial_filename)) + self.doc.refresh_from_db() + Path(self.doc.source_path).parent.mkdir(parents=True, exist_ok=True) + Path(self.doc.source_path).touch() + + self.doc.storage_path = StoragePath.objects.create(path="a" * 1100) + self.doc.save() + + self.doc.refresh_from_db() + self.assertEqual(Path(self.doc.filename), initial_filename) + self.assertTrue(Path(self.doc.source_path).is_file()) + class TestPathDateLocalization: """