Splits into more sub functions for Sonar

This commit is contained in:
stumpylog
2026-05-04 14:18:27 -07:00
parent 58423ceb62
commit 26620afd62
@@ -313,6 +313,39 @@ class Command(CryptMixin, PaperlessCommand):
),
)
def _finalize_db_load(self, loaded_models: set[type]) -> None:
"""Verify referential integrity and reset auto-increment sequences."""
through_tables = {
field.remote_field.through._meta.db_table
for model in loaded_models
for field in model._meta.many_to_many
if field.remote_field.through._meta.auto_created
}
table_names = [m._meta.db_table for m in loaded_models] + list(through_tables)
if table_names:
connection.check_constraints(table_names=table_names)
if loaded_models:
sequence_sql = connection.ops.sequence_reset_sql(
no_style(),
list(loaded_models),
)
with connection.cursor() as cursor:
for sql in sequence_sql:
cursor.execute(sql)
def _import_error_context_message(self) -> str:
"""Return a diagnostic string explaining a DB import failure."""
if ( # pragma: no cover
self.version is not None and self.version != version.__full_version_str__
):
return ( # pragma: no cover
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}"
)
return "No version information present"
def load_data_to_database(self) -> None:
"""
Streams records from each manifest path and loads them into the database
@@ -383,48 +416,11 @@ class Command(CryptMixin, PaperlessCommand):
flush_all()
# Verify referential integrity now that all rows are inserted,
# including M2M through tables written by .set() above.
through_tables = {
field.remote_field.through._meta.db_table
for model in loaded_models
for field in model._meta.many_to_many
if field.remote_field.through._meta.auto_created
}
table_names = [m._meta.db_table for m in loaded_models] + list(
through_tables,
)
if table_names:
connection.check_constraints(table_names=table_names)
# Sequences must be reset after inserting rows with explicit PKs
# or the next auto-increment insert will collide with an existing PK.
if loaded_models:
sequence_sql = connection.ops.sequence_reset_sql(
no_style(),
list(loaded_models),
)
with connection.cursor() as cursor:
for sql in sequence_sql:
cursor.execute(sql)
self._finalize_db_load(loaded_models)
except (FieldDoesNotExist, DeserializationError, IntegrityError):
self.stdout.write(self.style.ERROR("Database import failed"))
if (
self.version is not None
and self.version != version.__full_version_str__
): # pragma: no cover
self.stdout.write(
self.style.ERROR(
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}",
),
)
else:
self.stdout.write(
self.style.ERROR("No version information present"),
)
self.stdout.write(self.style.ERROR(self._import_error_context_message()))
raise
def handle(self, *args, **options) -> None: