Compare commits

...

1 Commits

Author SHA1 Message Date
shamoon
8b8307571a Fix: enforce path limit for db filename fields (#12235) 2026-03-03 13:19:56 -08:00
6 changed files with 107 additions and 15 deletions

View File

@@ -19,6 +19,7 @@ from documents.classifier import load_classifier
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.file_handling import create_source_path_directory
from documents.file_handling import generate_filename
from documents.file_handling import generate_unique_filename
from documents.loggers import LoggingMixin
from documents.models import Correspondent
@@ -493,7 +494,19 @@ class ConsumerPlugin(
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
document.filename = generate_unique_filename(document)
generated_filename = generate_unique_filename(document)
if (
len(str(generated_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated source filename exceeds db path limit, falling back to default naming",
)
generated_filename = generate_filename(
document,
use_format=False,
)
document.filename = generated_filename
create_source_path_directory(document.source_path)
self._write(
@@ -511,10 +524,23 @@ class ConsumerPlugin(
)
if archive_path and Path(archive_path).is_file():
document.archive_filename = generate_unique_filename(
generated_archive_filename = generate_unique_filename(
document,
archive_filename=True,
)
if (
len(str(generated_archive_filename))
> Document.MAX_STORED_FILENAME_LENGTH
):
self.log.warning(
"Generated archive filename exceeds db path limit, falling back to default naming",
)
generated_archive_filename = generate_filename(
document,
archive_filename=True,
use_format=False,
)
document.archive_filename = generated_archive_filename
create_source_path_directory(document.archive_path)
self._write(
document.storage_type,

View File

@@ -128,17 +128,21 @@ def generate_filename(
counter=0,
append_gpg=True,
archive_filename=False,
use_format=True,
) -> Path:
base_path: Path | None = None
# Determine the source of the format string
if doc.storage_path is not None:
filename_format = doc.storage_path.path
elif settings.FILENAME_FORMAT is not None:
# Maybe convert old to new style
filename_format = convert_format_str_to_template_format(
settings.FILENAME_FORMAT,
)
if use_format:
if doc.storage_path is not None:
filename_format = doc.storage_path.path
elif settings.FILENAME_FORMAT is not None:
# Maybe convert old to new style
filename_format = convert_format_str_to_template_format(
settings.FILENAME_FORMAT,
)
else:
filename_format = None
else:
filename_format = None

View File

@@ -160,6 +160,7 @@ class Document(SoftDeleteModel, ModelWithOwner):
(STORAGE_TYPE_UNENCRYPTED, _("Unencrypted")),
(STORAGE_TYPE_GPG, _("Encrypted with GNU Privacy Guard")),
)
MAX_STORED_FILENAME_LENGTH: Final[int] = 1024
correspondent = models.ForeignKey(
Correspondent,
@@ -267,7 +268,7 @@ class Document(SoftDeleteModel, ModelWithOwner):
filename = models.FilePathField(
_("filename"),
max_length=1024,
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
unique=True,
@@ -277,7 +278,7 @@ class Document(SoftDeleteModel, ModelWithOwner):
archive_filename = models.FilePathField(
_("archive filename"),
max_length=1024,
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
unique=True,
@@ -287,7 +288,7 @@ class Document(SoftDeleteModel, ModelWithOwner):
original_filename = models.CharField(
_("original filename"),
max_length=1024,
max_length=MAX_STORED_FILENAME_LENGTH,
editable=False,
default=None,
unique=False,

View File

@@ -460,8 +460,22 @@ def update_filename_and_move_files(
old_filename = instance.filename
old_source_path = instance.source_path
move_original = False
old_archive_filename = instance.archive_filename
old_archive_path = instance.archive_path
move_archive = False
candidate_filename = generate_filename(instance)
if len(str(candidate_filename)) > Document.MAX_STORED_FILENAME_LENGTH:
msg = (
f"Document {instance!s}: Generated filename exceeds db path "
f"limit ({len(str(candidate_filename))} > "
f"{Document.MAX_STORED_FILENAME_LENGTH}): {candidate_filename!s}"
)
logger.warning(msg)
raise CannotMoveFilesException(msg)
candidate_source_path = (
settings.ORIGINALS_DIR / candidate_filename
).resolve()
@@ -480,11 +494,16 @@ def update_filename_and_move_files(
instance.filename = str(new_filename)
move_original = old_filename != instance.filename
old_archive_filename = instance.archive_filename
old_archive_path = instance.archive_path
if instance.has_archive_version:
archive_candidate = generate_filename(instance, archive_filename=True)
if len(str(archive_candidate)) > Document.MAX_STORED_FILENAME_LENGTH:
msg = (
f"Document {instance!s}: Generated archive filename exceeds "
f"db path limit ({len(str(archive_candidate))} > "
f"{Document.MAX_STORED_FILENAME_LENGTH}): {archive_candidate!s}"
)
logger.warning(msg)
raise CannotMoveFilesException(msg)
archive_candidate_path = (
settings.ARCHIVE_DIR / archive_candidate
).resolve()

View File

@@ -633,6 +633,33 @@ class TestConsumer(
self._assert_first_last_send_progress()
@mock.patch("documents.consumer.generate_unique_filename")
def testFilenameHandlingFallsBackWhenGeneratedPathExceedsDbLimit(self, m):
m.side_effect = lambda doc, archive_filename=False: Path(
("a" * 1100 + ".pdf") if not archive_filename else ("b" * 1100 + ".pdf"),
)
with self.get_consumer(
self.get_test_file(),
DocumentMetadataOverrides(title="new docs"),
) as consumer:
consumer.run()
document = Document.objects.first()
self.assertIsNotNone(document)
assert document is not None
self.assertEqual(document.filename, f"{document.pk:07d}.pdf")
self.assertLessEqual(len(document.filename), 1024)
self.assertLessEqual(
len(document.archive_filename),
1024,
)
self.assertIsFile(document.source_path)
self.assertIsFile(document.archive_path)
self._assert_first_last_send_progress()
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
@mock.patch("documents.signals.handlers.generate_unique_filename")
def testFilenameHandlingUnstableFormat(self, m):

View File

@@ -1699,6 +1699,21 @@ class TestCustomFieldFilenameUpdates(
self.assertTrue(Path(self.doc.source_path).is_file())
self.assertLessEqual(m.call_count, 1)
@override_settings(FILENAME_FORMAT=None)
def test_overlong_storage_path_keeps_existing_filename(self):
initial_filename = generate_filename(self.doc)
Document.objects.filter(pk=self.doc.pk).update(filename=str(initial_filename))
self.doc.refresh_from_db()
Path(self.doc.source_path).parent.mkdir(parents=True, exist_ok=True)
Path(self.doc.source_path).touch()
self.doc.storage_path = StoragePath.objects.create(path="a" * 1100)
self.doc.save()
self.doc.refresh_from_db()
self.assertEqual(Path(self.doc.filename), initial_filename)
self.assertTrue(Path(self.doc.source_path).is_file())
class TestPathDateLocalization:
"""