Files
paperless-ngx/src/documents/management/commands/mixins.py
T
Trenton H e30676f889 Feature: Migrate import/export to rich progress (#12260)
* Refactor: migrate exporter/importer from tqdm to PaperlessCommand.track()

Replace direct tqdm usage in document_exporter and document_importer with
the PaperlessCommand base class and its track() method, which is backed by
Rich and handles --no-progress-bar automatically. Also removes the unused
ProgressBarMixin from mixins.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Refactor: add explicit supports_progress_bar and supports_multiprocessing to all PaperlessCommand subclasses

Each management command now explicitly declares both class attributes
rather than relying on defaults, making intent unambiguous at a glance.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 08:59:17 -07:00

140 lines
4.8 KiB
Python

import base64
import os
from typing import TypedDict
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.core.management import CommandError
from documents.settings import EXPORTER_CRYPTO_ALGO_NAME
from documents.settings import EXPORTER_CRYPTO_KEY_ITERATIONS_NAME
from documents.settings import EXPORTER_CRYPTO_KEY_SIZE_NAME
from documents.settings import EXPORTER_CRYPTO_SALT_NAME
from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME
class CryptFields(TypedDict):
exporter_key: str
model_name: str
fields: list[str]
class CryptMixin:
"""
Fully based on:
https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet
To encrypt:
1. Call setup_crypto providing the user provided passphrase
2. Call encrypt_string with a value
3. Store the returned hexadecimal representation of the value
To decrypt:
1. Load the required parameters:
a. key iterations
b. key size
c. key algorithm
2. Call setup_crypto providing the user provided passphrase and stored salt
3. Call decrypt_string with a value
4. Use the returned value
"""
# This matches to Django's default for now
# https://github.com/django/django/blob/adae61942/django/contrib/auth/hashers.py#L315
# Set the defaults to be used during export
# During import, these are overridden from the loaded values to ensure decryption is possible
key_iterations = 1_000_000
salt_size = 16
key_size = 32
kdf_algorithm = "pbkdf2_sha256"
CRYPT_FIELDS: list[CryptFields] = [
{
"exporter_key": "mail_accounts",
"model_name": "paperless_mail.mailaccount",
"fields": [
"password",
"refresh_token",
],
},
{
"exporter_key": "social_tokens",
"model_name": "socialaccount.socialtoken",
"fields": [
"token",
"token_secret",
],
},
]
# O(1) lookup for per-record encryption; derived from CRYPT_FIELDS at class definition time
CRYPT_FIELDS_BY_MODEL: dict[str, list[str]] = {
cfg["model_name"]: cfg["fields"] for cfg in CRYPT_FIELDS
}
def get_crypt_params(self) -> dict[str, dict[str, str | int]]:
return {
EXPORTER_CRYPTO_SETTINGS_NAME: {
EXPORTER_CRYPTO_ALGO_NAME: self.kdf_algorithm,
EXPORTER_CRYPTO_KEY_ITERATIONS_NAME: self.key_iterations,
EXPORTER_CRYPTO_KEY_SIZE_NAME: self.key_size,
EXPORTER_CRYPTO_SALT_NAME: self.salt,
},
}
def load_crypt_params(self, metadata: dict) -> None:
# Load up the values for setting up decryption
self.kdf_algorithm: str = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
EXPORTER_CRYPTO_ALGO_NAME
]
self.key_iterations: int = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
EXPORTER_CRYPTO_KEY_ITERATIONS_NAME
]
self.key_size: int = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
EXPORTER_CRYPTO_KEY_SIZE_NAME
]
self.salt: str = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
EXPORTER_CRYPTO_SALT_NAME
]
def setup_crypto(self, *, passphrase: str, salt: str | None = None) -> None:
"""
Constructs a class for encryption or decryption using the specified passphrase and salt
Salt is assumed to be a hexadecimal representation of a cryptographically secure random byte string.
If not provided, it will be derived from the system secure random
"""
self.salt = salt or os.urandom(self.salt_size).hex()
# Derive the KDF based on loaded settings
if self.kdf_algorithm == "pbkdf2_sha256":
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=self.key_size,
salt=bytes.fromhex(self.salt),
iterations=self.key_iterations,
)
else: # pragma: no cover
raise CommandError(
f"{self.kdf_algorithm} is an unknown key derivation function",
)
key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
self.fernet = Fernet(key)
def encrypt_string(self, *, value: str) -> str:
"""
Given a string value, encrypts it and returns the hexadecimal representation of the encrypted token
"""
return self.fernet.encrypt(value.encode("utf-8")).hex()
def decrypt_string(self, *, value: str) -> str:
"""
Given a string value, decrypts it and returns the original value of the field
"""
return self.fernet.decrypt(bytes.fromhex(value)).decode("utf-8")