Compare commits

..

11 Commits

Author SHA1 Message Date
Trenton H
0887203d45 feat(profiling): add workflow trigger matching profiling
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 14:55:25 -07:00
Trenton H
ea14c0b06f fix(profiling): use sha256 for sanity corpus checksums
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 14:50:21 -07:00
Trenton H
a8dc332abb feat(profiling): add sanity checker profiling
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 14:42:00 -07:00
Trenton H
e64b9a4cfd feat(profiling): add matching pipeline profiling
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 14:30:33 -07:00
Trenton H
6ba1acd7d3 fix(profiling): fix stale docstring and add module_db docstring in doclist test 2026-04-11 14:19:51 -07:00
Trenton H
d006b79fd1 feat(profiling): add document list API and selection_data profiling
Adds test_doclist_profile.py with 8 profiling tests covering the
/api/documents/ list path (ORM ordering, page sizes, single-doc detail,
cProfile) and _get_selection_data_for_queryset in isolation and via API.
Also registers the 'profiling' pytest marker in pyproject.toml.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 14:13:17 -07:00
Trenton H
24b754b44c fix(profiling): fix stale run paths in docstrings and consolidate profiling imports 2026-04-11 13:57:00 -07:00
Trenton H
a1a3520a8c refactor(profiling): use shared profile_cpu in backend search test
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 13:46:43 -07:00
Trenton H
23449cda17 refactor(profiling): use shared profile_cpu/measure_memory in classifier test
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 13:44:57 -07:00
Trenton H
ca3f5665ba fix(profiling): correct docstring import path and add Callable type annotation 2026-04-11 13:29:48 -07:00
Trenton H
9aa0914c3f feat(profiling): add profile_cpu and measure_memory shared helpers
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 13:22:43 -07:00
10 changed files with 2342 additions and 5 deletions

150
profiling.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Temporary profiling utilities for comparing implementations.
Usage in a management command or shell::
from profiling import profile_block, profile_cpu, measure_memory
with profile_block("new check_sanity"):
messages = check_sanity()
with profile_block("old check_sanity"):
messages = check_sanity_old()
Drop this file when done.
"""
from __future__ import annotations
import tracemalloc
from collections.abc import Callable # noqa: TC003
from collections.abc import Generator # noqa: TC003
from contextlib import contextmanager
from time import perf_counter
from typing import Any
from django.db import connection
from django.db import reset_queries
from django.test.utils import override_settings
@contextmanager
def profile_block(label: str = "block") -> Generator[None, None, None]:
    """Profile memory, wall time, and DB queries for a code block.

    Prints a summary to stdout on exit. Requires no external packages.
    Enables DEBUG temporarily to capture Django's query log.

    Args:
        label: Human-readable label printed in the report header.

    Yields:
        None. The body of the ``with`` statement is what gets measured.
    """
    tracemalloc.start()
    snapshot_before = tracemalloc.take_snapshot()
    try:
        with override_settings(DEBUG=True):
            reset_queries()
            start = perf_counter()
            yield
            elapsed = perf_counter() - start
            queries = list(connection.queries)
        snapshot_after = tracemalloc.take_snapshot()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing: tracemalloc is process-global, and leaving it
        # running after an exception in the profiled block would skew every
        # later measurement in the same process.
        tracemalloc.stop()
    # Compare snapshots for top allocations (skipped if the block raised).
    stats = snapshot_after.compare_to(snapshot_before, "lineno")
    query_time = sum(float(q["time"]) for q in queries)
    mem_diff = sum(s.size_diff for s in stats)
    print(f"\n{'=' * 60}")  # noqa: T201
    print(f" Profile: {label}")  # noqa: T201
    print(f"{'=' * 60}")  # noqa: T201
    print(f" Wall time: {elapsed:.4f}s")  # noqa: T201
    print(f" Queries: {len(queries)} ({query_time:.4f}s)")  # noqa: T201
    print(f" Memory delta: {mem_diff / 1024:.1f} KiB")  # noqa: T201
    print(f" Peak memory: {peak / 1024:.1f} KiB")  # noqa: T201
    print("\n Top 5 allocations:")  # noqa: T201
    for stat in stats[:5]:
        print(f" {stat}")  # noqa: T201
    print(f"{'=' * 60}\n")  # noqa: T201
def profile_cpu(
    fn: Callable[[], Any],
    *,
    label: str,
    top: int = 30,
    sort: str = "cumtime",
) -> tuple[Any, float]:
    """Run *fn()* under cProfile, print stats, return (result, elapsed_s).

    The profiler is always disabled afterwards, even if *fn* raises —
    ``Profile.enable()`` installs a process-wide profiling hook, and leaving
    it active would slow down and pollute everything that runs later.

    Args:
        fn: Zero-argument callable to profile.
        label: Human-readable label printed in the header.
        top: Number of cProfile rows to print.
        sort: cProfile sort key (default: cumulative time).

    Returns:
        ``(result, elapsed_s)`` where *result* is the return value of *fn()*.
    """
    import cProfile
    import io
    import pstats

    pr = cProfile.Profile()
    t0 = perf_counter()
    pr.enable()
    try:
        result = fn()
    finally:
        # Uninstall the global profiler hook no matter how fn() exits.
        pr.disable()
    elapsed = perf_counter() - t0
    buf = io.StringIO()
    ps = pstats.Stats(pr, stream=buf).sort_stats(sort)
    ps.print_stats(top)
    print(f"\n{'=' * 72}")  # noqa: T201
    print(f" {label}")  # noqa: T201
    print(f" wall time: {elapsed * 1000:.1f} ms")  # noqa: T201
    print(f"{'=' * 72}")  # noqa: T201
    print(buf.getvalue())  # noqa: T201
    return result, elapsed
def measure_memory(fn: Callable[[], Any], *, label: str) -> tuple[Any, float, float]:
    """Run *fn()* under tracemalloc, print allocation report.

    Tracing is always stopped afterwards, even if *fn* raises —
    tracemalloc is process-global and leaving it running would both slow
    execution and distort every later measurement.

    Args:
        fn: Zero-argument callable to profile.
        label: Human-readable label printed in the header.

    Returns:
        ``(result, peak_kib, delta_kib)``.
    """
    tracemalloc.start()
    snapshot_before = tracemalloc.take_snapshot()
    t0 = perf_counter()
    try:
        result = fn()
        elapsed = perf_counter() - t0
        snapshot_after = tracemalloc.take_snapshot()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Guarantee cleanup of the global tracer on every exit path.
        tracemalloc.stop()
    stats = snapshot_after.compare_to(snapshot_before, "lineno")
    delta_kib = sum(s.size_diff for s in stats) / 1024
    print(f"\n{'=' * 72}")  # noqa: T201
    print(f" [memory] {label}")  # noqa: T201
    print(f" wall time: {elapsed * 1000:.1f} ms")  # noqa: T201
    print(f" memory delta: {delta_kib:+.1f} KiB")  # noqa: T201
    print(f" peak traced: {peak / 1024:.1f} KiB")  # noqa: T201
    print(f"{'=' * 72}")  # noqa: T201
    print(" Top allocation sites (by size_diff):")  # noqa: T201
    for stat in stats[:20]:
        if stat.size_diff != 0:
            print(  # noqa: T201
                f" {stat.size_diff / 1024:+8.1f} KiB {stat.traceback.format()[0]}",
            )
    return result, peak / 1024, delta_kib

View File

@@ -24,7 +24,7 @@ dependencies = [
"dateparser~=1.2",
# WARNING: django does not use semver.
# Only patch versions are guaranteed to not introduce breaking changes.
"django~=5.2.13",
"django~=5.2.10",
"django-allauth[mfa,socialaccount]~=65.15.0",
"django-auditlog~=3.4.1",
"django-cachalot~=2.9.0",
@@ -312,6 +312,7 @@ markers = [
"date_parsing: Tests which cover date parsing from content or filename",
"management: Tests which cover management commands/functionality",
"search: Tests for the Tantivy search backend",
"profiling: Performance profiling tests — print measurements, no assertions",
]
[tool.pytest_env]

346
test_backend_profile.py Normal file
View File

@@ -0,0 +1,346 @@
# ruff: noqa: T201
"""
cProfile-based search pipeline profiling with a 20k-document dataset.
Run with:
uv run pytest ../test_backend_profile.py \
-m profiling --override-ini="addopts=" -s -v
Each scenario prints:
- Wall time for the operation
- cProfile stats sorted by cumulative time (top 25 callers)
This is a developer tool, not a correctness test. Nothing here should
fail unless the code is broken.
"""
from __future__ import annotations
import random
import time
from typing import TYPE_CHECKING
import pytest
from profiling import profile_cpu
from documents.models import Document
from documents.search._backend import TantivyBackend
from documents.search._backend import reset_backend
if TYPE_CHECKING:
from pathlib import Path
# transaction=False (default): tests roll back, but the module-scoped fixture
# commits its data outside the test transaction so it remains visible throughout.
pytestmark = [pytest.mark.profiling, pytest.mark.django_db]
# ---------------------------------------------------------------------------
# Dataset constants
# ---------------------------------------------------------------------------
NUM_DOCS = 20_000
SEED = 42
# Terms and their approximate match rates across the corpus.
# "rechnung" -> ~70% of docs (~14 000)
# "mahnung" -> ~20% of docs (~4 000)
# "kontonummer" -> ~5% of docs (~1 000)
# "rarewort" -> ~1% of docs (~200)
COMMON_TERM = "rechnung"
MEDIUM_TERM = "mahnung"
RARE_TERM = "kontonummer"
VERY_RARE_TERM = "rarewort"
PAGE_SIZE = 25
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_FILLER_WORDS = [
"dokument", # codespell:ignore
"seite",
"datum",
"betrag",
"nummer",
"konto",
"firma",
"vertrag",
"lieferant",
"bestellung",
"steuer",
"mwst",
"leistung",
"auftrag",
"zahlung",
]
def _build_content(rng: random.Random) -> str:
"""Return a short paragraph with terms embedded at the desired rates."""
words = rng.choices(_FILLER_WORDS, k=15)
if rng.random() < 0.70:
words.append(COMMON_TERM)
if rng.random() < 0.20:
words.append(MEDIUM_TERM)
if rng.random() < 0.05:
words.append(RARE_TERM)
if rng.random() < 0.01:
words.append(VERY_RARE_TERM)
rng.shuffle(words)
return " ".join(words)
def _time(fn, *, label: str, runs: int = 3):
"""Run *fn()* several times and report min/avg/max wall time (no cProfile)."""
times = []
result = None
for _ in range(runs):
t0 = time.perf_counter()
result = fn()
times.append(time.perf_counter() - t0)
mn, avg, mx = min(times), sum(times) / len(times), max(times)
print(
f" {label}: min={mn * 1000:.1f}ms avg={avg * 1000:.1f}ms max={mx * 1000:.1f}ms (n={runs})",
)
return result
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def module_db(django_db_setup, django_db_blocker):
    """Unlock the DB for the whole module (module-scoped).

    pytest-django blocks DB access outside its own fixtures; unblocking for
    the lifetime of the module lets the module-scoped corpus fixture create
    data once and share it across every test here.
    """
    with django_db_blocker.unblock():
        yield
@pytest.fixture(scope="module")
def large_backend(tmp_path_factory, module_db) -> TantivyBackend:
    """
    Build a 20 000-document DB + on-disk Tantivy index, shared across all
    profiling scenarios in this module. Teardown deletes all documents.
    """
    index_path: Path = tmp_path_factory.mktemp("tantivy_profile")
    # ---- 1. Bulk-create Document rows ----------------------------------------
    rng = random.Random(SEED)
    docs = [
        Document(
            title=f"Document {i:05d}",
            content=_build_content(rng),
            checksum=f"{i:064x}",  # unique dummy checksum, zero-padded hex
            pk=i + 1,  # explicit pks keep document ids deterministic
        )
        for i in range(NUM_DOCS)
    ]
    t0 = time.perf_counter()
    Document.objects.bulk_create(docs, batch_size=1_000)
    db_time = time.perf_counter() - t0
    print(f"\n[setup] bulk_create {NUM_DOCS} docs: {db_time:.2f}s")
    # ---- 2. Build Tantivy index -----------------------------------------------
    backend = TantivyBackend(path=index_path)
    backend.open()
    t0 = time.perf_counter()
    with backend.batch_update() as batch:
        # iterator() streams rows so all 20k ORM objects are never held at once
        for doc in Document.objects.iterator(chunk_size=500):
            batch.add_or_update(doc)
    idx_time = time.perf_counter() - t0
    print(f"[setup] index {NUM_DOCS} docs: {idx_time:.2f}s")
    # ---- 3. Report corpus stats -----------------------------------------------
    for term in (COMMON_TERM, MEDIUM_TERM, RARE_TERM, VERY_RARE_TERM):
        count = len(backend.search_ids(term, user=None))
        print(f"[setup] '{term}' -> {count} hits")
    yield backend
    # ---- Teardown ------------------------------------------------------------
    backend.close()
    reset_backend()
    Document.objects.all().delete()
# ---------------------------------------------------------------------------
# Profiling tests — each scenario is a separate function so pytest can run
# them individually or all together with -m profiling.
# ---------------------------------------------------------------------------
class TestSearchIdsProfile:
    """Profile backend.search_ids() — pure Tantivy, no DB.

    Three scenarios vary only in expected result-set size, so the profiles
    show how ID collection scales with hit count.
    """

    def test_search_ids_large(self, large_backend: TantivyBackend):
        """~14 000 hits: how long does Tantivy take to collect all IDs?"""
        profile_cpu(
            lambda: large_backend.search_ids(COMMON_TERM, user=None),
            label=f"search_ids('{COMMON_TERM}') [large result set ~14k]",
        )

    def test_search_ids_medium(self, large_backend: TantivyBackend):
        """~4 000 hits."""
        profile_cpu(
            lambda: large_backend.search_ids(MEDIUM_TERM, user=None),
            label=f"search_ids('{MEDIUM_TERM}') [medium result set ~4k]",
        )

    def test_search_ids_rare(self, large_backend: TantivyBackend):
        """~1 000 hits."""
        profile_cpu(
            lambda: large_backend.search_ids(RARE_TERM, user=None),
            label=f"search_ids('{RARE_TERM}') [rare result set ~1k]",
        )
class TestIntersectAndOrderProfile:
    """
    Profile the DB intersection step: filter(pk__in=search_ids).
    This is the 'intersect_and_order' logic from views.py.
    """

    def test_intersect_large(self, large_backend: TantivyBackend):
        """Intersect 14k Tantivy IDs with all 20k ORM-visible docs."""
        all_ids = large_backend.search_ids(COMMON_TERM, user=None)
        qs = Document.objects.all()
        print(f"\n Tantivy returned {len(all_ids)} IDs")
        # list(...) forces query execution inside the profiled callable
        profile_cpu(
            lambda: list(qs.filter(pk__in=all_ids).values_list("pk", flat=True)),
            label=f"filter(pk__in={len(all_ids)} ids) [large, use_tantivy_sort=True path]",
        )
        # Also time it a few times to get stable numbers
        print()
        _time(
            lambda: list(qs.filter(pk__in=all_ids).values_list("pk", flat=True)),
            label=f"filter(pk__in={len(all_ids)}) repeated",
        )

    def test_intersect_rare(self, large_backend: TantivyBackend):
        """Intersect ~1k Tantivy IDs — the happy path."""
        all_ids = large_backend.search_ids(RARE_TERM, user=None)
        qs = Document.objects.all()
        print(f"\n Tantivy returned {len(all_ids)} IDs")
        profile_cpu(
            lambda: list(qs.filter(pk__in=all_ids).values_list("pk", flat=True)),
            label=f"filter(pk__in={len(all_ids)} ids) [rare, use_tantivy_sort=True path]",
        )
class TestHighlightHitsProfile:
    """Profile backend.highlight_hits() — per-doc Tantivy lookups with BM25 scoring."""

    def test_highlight_page1(self, large_backend: TantivyBackend):
        """25-doc highlight for page 1 (rank_start=1)."""
        all_ids = large_backend.search_ids(COMMON_TERM, user=None)
        page_ids = all_ids[:PAGE_SIZE]
        profile_cpu(
            lambda: large_backend.highlight_hits(
                COMMON_TERM,
                page_ids,
                rank_start=1,
            ),
            label=f"highlight_hits page 1 (ids {all_ids[0]}..{all_ids[PAGE_SIZE - 1]})",
        )

    def test_highlight_page_middle(self, large_backend: TantivyBackend):
        """25-doc highlight for a mid-corpus page (rank_start=page_offset+1)."""
        all_ids = large_backend.search_ids(COMMON_TERM, user=None)
        # Take a 25-id window from the middle of the result set
        mid = len(all_ids) // 2
        page_ids = all_ids[mid : mid + PAGE_SIZE]
        page_offset = mid
        profile_cpu(
            lambda: large_backend.highlight_hits(
                COMMON_TERM,
                page_ids,
                rank_start=page_offset + 1,
            ),
            label=f"highlight_hits page ~{mid // PAGE_SIZE} (offset {page_offset})",
        )

    def test_highlight_repeated(self, large_backend: TantivyBackend):
        """Multiple runs of page-1 highlight to see variance."""
        all_ids = large_backend.search_ids(COMMON_TERM, user=None)
        page_ids = all_ids[:PAGE_SIZE]
        print()
        _time(
            lambda: large_backend.highlight_hits(COMMON_TERM, page_ids, rank_start=1),
            label="highlight_hits page 1",
            runs=5,
        )
class TestFullPipelineProfile:
    """
    Profile the combined pipeline as it runs in views.py:
    search_ids -> filter(pk__in) -> highlight_hits
    """

    def _run_pipeline(
        self,
        backend: TantivyBackend,
        term: str,
        page: int = 1,
    ):
        """Mirror the view logic: Tantivy relevance order is preserved by
        re-filtering all_ids against the set of ORM-visible pks, then one
        page is sliced out and highlighted."""
        all_ids = backend.search_ids(term, user=None)
        qs = Document.objects.all()
        visible_ids = set(qs.filter(pk__in=all_ids).values_list("pk", flat=True))
        # Keep Tantivy's ranking; drop ids not visible through the ORM
        ordered_ids = [i for i in all_ids if i in visible_ids]
        page_offset = (page - 1) * PAGE_SIZE
        page_ids = ordered_ids[page_offset : page_offset + PAGE_SIZE]
        hits = backend.highlight_hits(
            term,
            page_ids,
            rank_start=page_offset + 1,
        )
        return ordered_ids, hits

    def test_pipeline_large_page1(self, large_backend: TantivyBackend):
        """Full pipeline: large result set, page 1."""
        # profile_cpu returns (result, elapsed); [0] keeps the pipeline result
        ordered_ids, hits = profile_cpu(
            lambda: self._run_pipeline(large_backend, COMMON_TERM, page=1),
            label=f"full pipeline '{COMMON_TERM}' page 1",
        )[0]
        print(f" -> {len(ordered_ids)} total results, {len(hits)} hits on page")

    def test_pipeline_large_page5(self, large_backend: TantivyBackend):
        """Full pipeline: large result set, page 5."""
        ordered_ids, hits = profile_cpu(
            lambda: self._run_pipeline(large_backend, COMMON_TERM, page=5),
            label=f"full pipeline '{COMMON_TERM}' page 5",
        )[0]
        print(f" -> {len(ordered_ids)} total results, {len(hits)} hits on page")

    def test_pipeline_rare(self, large_backend: TantivyBackend):
        """Full pipeline: rare term, page 1 (fast path)."""
        ordered_ids, hits = profile_cpu(
            lambda: self._run_pipeline(large_backend, RARE_TERM, page=1),
            label=f"full pipeline '{RARE_TERM}' page 1",
        )[0]
        print(f" -> {len(ordered_ids)} total results, {len(hits)} hits on page")

    def test_pipeline_repeated(self, large_backend: TantivyBackend):
        """Repeated runs to get stable timing (no cProfile overhead)."""
        print()
        for term, label in [
            (COMMON_TERM, f"'{COMMON_TERM}' (large)"),
            (MEDIUM_TERM, f"'{MEDIUM_TERM}' (medium)"),
            (RARE_TERM, f"'{RARE_TERM}' (rare)"),
        ]:
            # t=term binds the loop variable early (late-binding closure fix)
            _time(
                lambda t=term: self._run_pipeline(large_backend, t, page=1),
                label=f"full pipeline {label} page 1",
                runs=3,
            )

605
test_classifier_profile.py Normal file
View File

@@ -0,0 +1,605 @@
# ruff: noqa: T201
"""
cProfile + tracemalloc classifier profiling test.
Run with:
uv run pytest ../test_classifier_profile.py \
-m profiling --override-ini="addopts=" -s -v
Corpus: 5 000 documents, 40 correspondents (25 AUTO), 25 doc types (15 AUTO),
50 tags (30 AUTO), 20 storage paths (12 AUTO).
Document content is generated with Faker for realistic base text, with a
per-label fingerprint injected so the MLP has a real learning signal.
Scenarios:
- train() full corpus — memory and CPU profiles
- second train() no-op path — shows cost of the skip check
- save()/load() round-trip — model file size and memory cost
- _update_data_vectorizer_hash() isolated hash overhead
- predict_*() four independent calls per document — the 4x redundant
vectorization path used by the signal handlers
- _vectorize() cache-miss vs cache-hit breakdown
Memory: tracemalloc (delta + peak + top-20 allocation sites).
CPU: cProfile sorted by cumulative time (top 30).
"""
from __future__ import annotations
import random
import time
from typing import TYPE_CHECKING
import pytest
from django.test import override_settings
from faker import Faker
from profiling import measure_memory
from profiling import profile_cpu
from documents.classifier import DocumentClassifier
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
if TYPE_CHECKING:
from pathlib import Path
pytestmark = [pytest.mark.profiling, pytest.mark.django_db]
# ---------------------------------------------------------------------------
# Corpus parameters
# ---------------------------------------------------------------------------
NUM_DOCS = 5_000
NUM_CORRESPONDENTS = 40 # first 25 are MATCH_AUTO
NUM_DOC_TYPES = 25 # first 15 are MATCH_AUTO
NUM_TAGS = 50 # first 30 are MATCH_AUTO
NUM_STORAGE_PATHS = 20 # first 12 are MATCH_AUTO
NUM_AUTO_CORRESPONDENTS = 25
NUM_AUTO_DOC_TYPES = 15
NUM_AUTO_TAGS = 30
NUM_AUTO_STORAGE_PATHS = 12
SEED = 42
# ---------------------------------------------------------------------------
# Content generation
# ---------------------------------------------------------------------------
def _make_label_fingerprint(
    fake: Faker,
    label_seed: int,
    n_words: int = 6,
) -> list[str]:
    """
    Generate a small set of unique-looking words to use as the learning
    fingerprint for a label. Each label gets its own seeded Faker so the
    fingerprints are distinct and reproducible.

    NOTE(review): the *fake* parameter is unused — a fresh per-label-seeded
    Faker is built below instead. Kept for call-site compatibility; consider
    removing it here and at the callers.
    """
    per_label_fake = Faker()
    per_label_fake.seed_instance(label_seed)
    # Draw lowercase word() tokens until n_words distinct ones are collected
    words: list[str] = []
    while len(words) < n_words:
        w = per_label_fake.word().lower()
        if w not in words:
            words.append(w)
    return words
def _build_fingerprints(
    num_correspondents: int,
    num_doc_types: int,
    num_tags: int,
    num_paths: int,
) -> tuple[list[list[str]], list[list[str]], list[list[str]], list[list[str]]]:
    """Pre-generate per-label fingerprints. Expensive once, free to reuse.

    Returns four parallel lists of fingerprints, in the order
    (correspondents, document types, tags, storage paths).
    """
    fake = Faker()
    # Use deterministic seeds offset by type so fingerprints don't collide
    # (1000s = correspondents, 2000s = doc types, 3000s = tags, 4000s = paths)
    corr_fps = [
        _make_label_fingerprint(fake, 1_000 + i) for i in range(num_correspondents)
    ]
    dtype_fps = [_make_label_fingerprint(fake, 2_000 + i) for i in range(num_doc_types)]
    tag_fps = [_make_label_fingerprint(fake, 3_000 + i) for i in range(num_tags)]
    path_fps = [_make_label_fingerprint(fake, 4_000 + i) for i in range(num_paths)]
    return corr_fps, dtype_fps, tag_fps, path_fps
def _build_content(
fake: Faker,
corr_fp: list[str] | None,
dtype_fp: list[str] | None,
tag_fps: list[list[str]],
path_fp: list[str] | None,
) -> str:
"""
Combine a Faker paragraph (realistic base text) with per-label
fingerprint words so the classifier has a genuine learning signal.
"""
# 3-sentence paragraph provides realistic vocabulary
base = fake.paragraph(nb_sentences=3)
extras: list[str] = []
if corr_fp:
extras.extend(corr_fp)
if dtype_fp:
extras.extend(dtype_fp)
for fp in tag_fps:
extras.extend(fp)
if path_fp:
extras.extend(path_fp)
if extras:
return base + " " + " ".join(extras)
return base
# ---------------------------------------------------------------------------
# Module-scoped corpus fixture
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def module_db(django_db_setup, django_db_blocker):
    """Unlock the DB for the whole module (module-scoped).

    Lets the module-scoped corpus fixture below create rows once and share
    them across all profiling tests in this file.
    """
    with django_db_blocker.unblock():
        yield
@pytest.fixture(scope="module")
def classifier_corpus(tmp_path_factory, module_db):
    """
    Build the full 5 000-document corpus once for all profiling tests.

    Label objects are created individually (small number), documents are
    bulk-inserted, and tag M2M rows go through the through-table.

    Yields a dict with the model path and a sample content string for
    prediction tests. All rows are deleted on teardown.
    """
    model_path: Path = tmp_path_factory.mktemp("cls_profile") / "model.pickle"
    with override_settings(MODEL_FILE=model_path):
        fake = Faker()
        Faker.seed(SEED)  # global Faker seed -> deterministic corpus text
        rng = random.Random(SEED)
        # Pre-generate fingerprints for all labels
        print("\n[setup] Generating label fingerprints...")
        corr_fps, dtype_fps, tag_fps, path_fps = _build_fingerprints(
            NUM_CORRESPONDENTS,
            NUM_DOC_TYPES,
            NUM_TAGS,
            NUM_STORAGE_PATHS,
        )
        # -----------------------------------------------------------------
        # 1. Create label objects
        # -----------------------------------------------------------------
        # The first NUM_AUTO_* of each label type are MATCH_AUTO; the rest
        # are MATCH_NONE.
        print(f"[setup] Creating {NUM_CORRESPONDENTS} correspondents...")
        correspondents: list[Correspondent] = []
        for i in range(NUM_CORRESPONDENTS):
            algo = (
                MatchingModel.MATCH_AUTO
                if i < NUM_AUTO_CORRESPONDENTS
                else MatchingModel.MATCH_NONE
            )
            correspondents.append(
                Correspondent.objects.create(
                    name=fake.company(),
                    matching_algorithm=algo,
                ),
            )
        print(f"[setup] Creating {NUM_DOC_TYPES} document types...")
        doc_types: list[DocumentType] = []
        for i in range(NUM_DOC_TYPES):
            algo = (
                MatchingModel.MATCH_AUTO
                if i < NUM_AUTO_DOC_TYPES
                else MatchingModel.MATCH_NONE
            )
            doc_types.append(
                DocumentType.objects.create(
                    name=fake.bs()[:64],
                    matching_algorithm=algo,
                ),
            )
        print(f"[setup] Creating {NUM_TAGS} tags...")
        tags: list[Tag] = []
        for i in range(NUM_TAGS):
            algo = (
                MatchingModel.MATCH_AUTO
                if i < NUM_AUTO_TAGS
                else MatchingModel.MATCH_NONE
            )
            tags.append(
                Tag.objects.create(
                    name=f"{fake.word()} {i}",
                    matching_algorithm=algo,
                    is_inbox_tag=False,
                ),
            )
        print(f"[setup] Creating {NUM_STORAGE_PATHS} storage paths...")
        storage_paths: list[StoragePath] = []
        for i in range(NUM_STORAGE_PATHS):
            algo = (
                MatchingModel.MATCH_AUTO
                if i < NUM_AUTO_STORAGE_PATHS
                else MatchingModel.MATCH_NONE
            )
            storage_paths.append(
                StoragePath.objects.create(
                    name=fake.word(),
                    path=f"{fake.word()}/{fake.word()}/{{title}}",
                    matching_algorithm=algo,
                ),
            )
        # -----------------------------------------------------------------
        # 2. Build document rows and M2M assignments
        # -----------------------------------------------------------------
        print(f"[setup] Building {NUM_DOCS} document rows...")
        doc_rows: list[Document] = []
        doc_tag_map: list[tuple[int, int]] = []  # (doc_position, tag_index)
        for i in range(NUM_DOCS):
            # ~80% get a correspondent and a doc type, ~70% a storage path
            corr_idx = (
                rng.randrange(NUM_CORRESPONDENTS) if rng.random() < 0.80 else None
            )
            dt_idx = rng.randrange(NUM_DOC_TYPES) if rng.random() < 0.80 else None
            sp_idx = rng.randrange(NUM_STORAGE_PATHS) if rng.random() < 0.70 else None
            # 1-4 tags; most documents get at least one
            n_tags = rng.randint(1, 4) if rng.random() < 0.85 else 0
            assigned_tag_indices = rng.sample(range(NUM_TAGS), min(n_tags, NUM_TAGS))
            content = _build_content(
                fake,
                corr_fp=corr_fps[corr_idx] if corr_idx is not None else None,
                dtype_fp=dtype_fps[dt_idx] if dt_idx is not None else None,
                tag_fps=[tag_fps[ti] for ti in assigned_tag_indices],
                path_fp=path_fps[sp_idx] if sp_idx is not None else None,
            )
            doc_rows.append(
                Document(
                    title=fake.sentence(nb_words=5),
                    content=content,
                    checksum=f"{i:064x}",  # unique dummy checksum
                    correspondent=correspondents[corr_idx]
                    if corr_idx is not None
                    else None,
                    document_type=doc_types[dt_idx] if dt_idx is not None else None,
                    storage_path=storage_paths[sp_idx] if sp_idx is not None else None,
                ),
            )
            for ti in assigned_tag_indices:
                doc_tag_map.append((i, ti))
        t0 = time.perf_counter()
        Document.objects.bulk_create(doc_rows, batch_size=500)
        print(
            f"[setup] bulk_create {NUM_DOCS} documents: {time.perf_counter() - t0:.2f}s",
        )
        # -----------------------------------------------------------------
        # 3. Bulk-create M2M through-table rows
        # -----------------------------------------------------------------
        # Tag links are inserted directly via the auto-generated through
        # model, keyed by each document's position in creation order.
        created_docs = list(Document.objects.order_by("pk"))
        through_rows = [
            Document.tags.through(
                document_id=created_docs[pos].pk,
                tag_id=tags[ti].pk,
            )
            for pos, ti in doc_tag_map
            if pos < len(created_docs)
        ]
        t0 = time.perf_counter()
        Document.tags.through.objects.bulk_create(
            through_rows,
            batch_size=1_000,
            ignore_conflicts=True,
        )
        print(
            f"[setup] bulk_create {len(through_rows)} tag M2M rows: "
            f"{time.perf_counter() - t0:.2f}s",
        )
        # Sample content for prediction tests
        sample_content = _build_content(
            fake,
            corr_fp=corr_fps[0],
            dtype_fp=dtype_fps[0],
            tag_fps=[tag_fps[0], tag_fps[1], tag_fps[5]],
            path_fp=path_fps[0],
        )
        yield {
            "model_path": model_path,
            "sample_content": sample_content,
        }
        # Teardown
        print("\n[teardown] Removing corpus...")
        Document.objects.all().delete()
        Correspondent.objects.all().delete()
        DocumentType.objects.all().delete()
        Tag.objects.all().delete()
        StoragePath.objects.all().delete()
# ---------------------------------------------------------------------------
# Training profiles
# ---------------------------------------------------------------------------
class TestClassifierTrainingProfile:
    """Profile DocumentClassifier.train() on the full corpus.

    Each test uses its own model path under tmp_path so no test ever
    reloads a model saved by a previous one.
    """

    def test_train_memory(self, classifier_corpus, tmp_path):
        """
        Peak memory allocated during train().

        tracemalloc reports the delta and top allocation sites.
        """
        model_path = tmp_path / "model.pickle"
        with override_settings(MODEL_FILE=model_path):
            classifier = DocumentClassifier()
            result, _, _ = measure_memory(
                classifier.train,
                label=(
                    f"train() [{NUM_DOCS} docs | "
                    f"{NUM_CORRESPONDENTS} correspondents ({NUM_AUTO_CORRESPONDENTS} AUTO) | "
                    f"{NUM_DOC_TYPES} doc types ({NUM_AUTO_DOC_TYPES} AUTO) | "
                    f"{NUM_TAGS} tags ({NUM_AUTO_TAGS} AUTO) | "
                    f"{NUM_STORAGE_PATHS} paths ({NUM_AUTO_STORAGE_PATHS} AUTO)]"
                ),
            )
            assert result is True, "train() must return True on first run"
            print("\n Classifiers trained:")
            print(
                f" tags_classifier: {classifier.tags_classifier is not None}",
            )
            print(
                f" correspondent_classifier: {classifier.correspondent_classifier is not None}",
            )
            print(
                f" document_type_classifier: {classifier.document_type_classifier is not None}",
            )
            print(
                f" storage_path_classifier: {classifier.storage_path_classifier is not None}",
            )
            if classifier.data_vectorizer is not None:
                vocab_size = len(classifier.data_vectorizer.vocabulary_)
                print(f" vocabulary size: {vocab_size} terms")

    def test_train_cpu(self, classifier_corpus, tmp_path):
        """
        CPU profile of train() — shows time spent in DB queries,
        CountVectorizer.fit_transform(), and four MLPClassifier.fit() calls.
        """
        model_path = tmp_path / "model_cpu.pickle"
        with override_settings(MODEL_FILE=model_path):
            classifier = DocumentClassifier()
            profile_cpu(
                classifier.train,
                label=f"train() [{NUM_DOCS} docs]",
                top=30,
            )

    def test_train_second_call_noop(self, classifier_corpus, tmp_path):
        """
        No-op path: second train() on unchanged data should return False.

        Still queries the DB to build the hash — shown here as the remaining cost.
        """
        model_path = tmp_path / "model_noop.pickle"
        with override_settings(MODEL_FILE=model_path):
            classifier = DocumentClassifier()
            # First call pays the full fit; only timed, not profiled
            t0 = time.perf_counter()
            classifier.train()
            first_ms = (time.perf_counter() - t0) * 1000
            result, second_elapsed = profile_cpu(
                classifier.train,
                label="train() second call (no-op — same data unchanged)",
                top=20,
            )
            assert result is False, "second train() should skip and return False"
            print(f"\n First train: {first_ms:.1f} ms (full fit)")
            print(f" Second train: {second_elapsed * 1000:.1f} ms (skip)")
            print(f" Speedup: {first_ms / (second_elapsed * 1000):.1f}x")

    def test_vectorizer_hash_cost(self, classifier_corpus, tmp_path):
        """
        Isolate _update_data_vectorizer_hash() — pickles the entire
        CountVectorizer just to SHA256 it. Called at both save and load.
        """
        import pickle

        model_path = tmp_path / "model_hash.pickle"
        with override_settings(MODEL_FILE=model_path):
            classifier = DocumentClassifier()
            classifier.train()
            profile_cpu(
                classifier._update_data_vectorizer_hash,
                label="_update_data_vectorizer_hash() [pickle.dumps vectorizer + sha256]",
                top=10,
            )
            pickled_size = len(pickle.dumps(classifier.data_vectorizer))
            vocab_size = len(classifier.data_vectorizer.vocabulary_)
            print(f"\n Vocabulary size: {vocab_size} terms")
            print(f" Pickled vectorizer: {pickled_size / 1024:.1f} KiB")

    def test_save_load_roundtrip(self, classifier_corpus, tmp_path):
        """
        Profile save() and load() — model file size directly reflects how
        much memory the classifier occupies on disk (and roughly in RAM).
        """
        model_path = tmp_path / "model_saveload.pickle"
        with override_settings(MODEL_FILE=model_path):
            classifier = DocumentClassifier()
            classifier.train()
            _, save_peak, _ = measure_memory(
                classifier.save,
                label="save() [pickle.dumps + HMAC + atomic rename]",
            )
            file_size_kib = model_path.stat().st_size / 1024
            print(f"\n Model file size: {file_size_kib:.1f} KiB")
            # Load into a fresh instance to measure the cold-load cost
            classifier2 = DocumentClassifier()
            _, load_peak, _ = measure_memory(
                classifier2.load,
                label="load() [read file + verify HMAC + pickle.loads]",
            )
            print("\n Summary:")
            print(f" Model file size: {file_size_kib:.1f} KiB")
            print(f" Save peak memory: {save_peak:.1f} KiB")
            print(f" Load peak memory: {load_peak:.1f} KiB")
# ---------------------------------------------------------------------------
# Prediction profiles
# ---------------------------------------------------------------------------
class TestClassifierPredictionProfile:
    """
    Profile the four predict_*() methods — specifically the redundant
    per-call vectorization overhead from the signal handler pattern.
    """

    @pytest.fixture(autouse=True)
    def trained_classifier(self, classifier_corpus, tmp_path):
        """Train a classifier once per test against a throwaway MODEL_FILE."""
        model_path = tmp_path / "model_pred.pickle"
        # enable()/disable() keeps the settings override active across the
        # whole test body (a `with` block would end when the fixture yields).
        self._ctx = override_settings(MODEL_FILE=model_path)
        self._ctx.enable()
        self.classifier = DocumentClassifier()
        self.classifier.train()
        self.content = classifier_corpus["sample_content"]
        yield
        self._ctx.disable()

    def test_predict_all_four_separately_cpu(self):
        """
        Profile all four predict_*() calls in the order the signal handlers
        fire them. Call 1 is a cache miss; calls 2-4 hit the locmem cache
        but still pay sha256 + pickle.loads each time.
        """
        from django.core.cache import caches

        # Start cold so call 1 is guaranteed to be a miss.
        caches["read-cache"].clear()
        content = self.content
        print(f"\n Content length: {len(content)} chars")
        calls = [
            ("predict_correspondent", self.classifier.predict_correspondent),
            ("predict_document_type", self.classifier.predict_document_type),
            ("predict_tags", self.classifier.predict_tags),
            ("predict_storage_path", self.classifier.predict_storage_path),
        ]
        timings: list[tuple[str, float]] = []
        for name, fn in calls:
            # fn is bound as a default arg to avoid the late-binding
            # closure pitfall inside the loop.
            _, elapsed = profile_cpu(
                lambda f=fn: f(content),
                label=f"{name}() [call {len(timings) + 1}/4]",
                top=15,
            )
            timings.append((name, elapsed * 1000))
        print("\n Per-call timings (sequential, locmem cache):")
        for name, ms in timings:
            print(f" {name:<32s} {ms:8.3f} ms")
        print(f" {'TOTAL':<32s} {sum(t for _, t in timings):8.3f} ms")

    def test_predict_all_four_memory(self):
        """
        Memory allocated for the full four-prediction sequence, both cold
        and warm, to show pickle serialization allocation per call.
        """
        from django.core.cache import caches

        content = self.content
        calls = [
            self.classifier.predict_correspondent,
            self.classifier.predict_document_type,
            self.classifier.predict_tags,
            self.classifier.predict_storage_path,
        ]
        caches["read-cache"].clear()
        measure_memory(
            lambda: [fn(content) for fn in calls],
            label="all four predict_*() [cache COLD — first call misses]",
        )
        # Second pass runs against the cache populated by the first pass.
        measure_memory(
            lambda: [fn(content) for fn in calls],
            label="all four predict_*() [cache WARM — all calls hit]",
        )

    def test_vectorize_cache_miss_vs_hit(self):
        """
        Isolate the cost of a cache miss (sha256 + transform + pickle.dumps)
        vs a cache hit (sha256 + pickle.loads).
        """
        from django.core.cache import caches

        read_cache = caches["read-cache"]
        content = self.content
        read_cache.clear()
        _, miss_elapsed = profile_cpu(
            lambda: self.classifier._vectorize(content),
            label="_vectorize() [MISS: sha256 + transform + pickle.dumps]",
            top=15,
        )
        # Identical call immediately after — now served from the cache.
        _, hit_elapsed = profile_cpu(
            lambda: self.classifier._vectorize(content),
            label="_vectorize() [HIT: sha256 + pickle.loads]",
            top=15,
        )
        print(f"\n Cache miss: {miss_elapsed * 1000:.3f} ms")
        print(f" Cache hit: {hit_elapsed * 1000:.3f} ms")
        print(f" Hit is {miss_elapsed / hit_elapsed:.1f}x faster than miss")

    def test_content_hash_overhead(self):
        """
        Micro-benchmark the sha256 of the content string — paid on every
        _vectorize() call regardless of cache state, including x4 per doc.
        """
        import hashlib

        content = self.content
        encoded = content.encode()
        runs = 5_000
        t0 = time.perf_counter()
        for _ in range(runs):
            hashlib.sha256(encoded).hexdigest()
        # Average microseconds per hashing call over all runs.
        us_per_call = (time.perf_counter() - t0) / runs * 1_000_000
        print(f"\n Content: {len(content)} chars / {len(encoded)} bytes")
        print(f" sha256 cost per call: {us_per_call:.2f} us (avg over {runs} runs)")
        print(f" x4 calls per document: {us_per_call * 4:.2f} us total overhead")

293
test_doclist_profile.py Normal file
View File

@@ -0,0 +1,293 @@
"""
Document list API profiling — no search, pure ORM path.
Run with:
uv run pytest ../test_doclist_profile.py \
-m profiling --override-ini="addopts=" -s -v
Corpus: 5 000 documents, 30 correspondents, 20 doc types, 80 tags,
~500 notes (10 %), 10 custom fields with instances on ~50 % of docs.
Scenarios
---------
TestDocListProfile
- test_list_default_ordering GET /api/documents/ created desc, page 1, page_size=25
- test_list_title_ordering same with ordering=title
- test_list_page_size_comparison page_size=10 / 25 / 100 in sequence
- test_list_detail_fields GET /api/documents/{id}/ — single document serializer cost
- test_list_cpu_profile cProfile of one list request
TestSelectionDataProfile
- test_selection_data_unfiltered _get_selection_data_for_queryset(all docs) in isolation
- test_selection_data_via_api GET /api/documents/?include_selection_data=true
- test_selection_data_filtered filtered vs unfiltered COUNT query comparison
"""
from __future__ import annotations
import datetime
import random
import time
import pytest
from django.contrib.auth.models import User
from faker import Faker
from profiling import profile_block
from profiling import profile_cpu
from rest_framework.test import APIClient
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import Note
from documents.models import Tag
from documents.views import DocumentViewSet
pytestmark = [pytest.mark.profiling, pytest.mark.django_db]
# ---------------------------------------------------------------------------
# Corpus parameters
# ---------------------------------------------------------------------------
NUM_DOCS = 5_000
NUM_CORRESPONDENTS = 30
NUM_DOC_TYPES = 20
NUM_TAGS = 80
NOTE_FRACTION = 0.10
CUSTOM_FIELD_COUNT = 10
CUSTOM_FIELD_FRACTION = 0.50
PAGE_SIZE = 25
SEED = 42
# ---------------------------------------------------------------------------
# Module-scoped corpus fixture
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def module_db(django_db_setup, django_db_blocker):
    """Unlock the DB for the whole module (module-scoped)."""
    # pytest-django normally restricts DB access to function-scoped tests;
    # unblocking here lets the module-scoped corpus fixture create rows.
    with django_db_blocker.unblock():
        yield
@pytest.fixture(scope="module")
def doclist_corpus(module_db):
    """
    Build a 5 000-document corpus with tags, notes, custom fields, correspondents,
    and doc types. All objects are deleted on teardown.

    Yields a dict with the superuser "owner", the "first_doc_pk" for detail
    requests, and the created "tags" list for filtered queries.
    """
    # Seed both Faker and the local RNG so the corpus is deterministic.
    fake = Faker()
    Faker.seed(SEED)
    rng = random.Random(SEED)
    print(f"\n[setup] Creating {NUM_CORRESPONDENTS} correspondents...")  # noqa: T201
    correspondents = [
        Correspondent.objects.create(name=f"dlcorp-{i}-{fake.company()}"[:128])
        for i in range(NUM_CORRESPONDENTS)
    ]
    print(f"[setup] Creating {NUM_DOC_TYPES} doc types...")  # noqa: T201
    doc_types = [
        DocumentType.objects.create(name=f"dltype-{i}-{fake.word()}"[:128])
        for i in range(NUM_DOC_TYPES)
    ]
    print(f"[setup] Creating {NUM_TAGS} tags...")  # noqa: T201
    tags = [
        Tag.objects.create(name=f"dltag-{i}-{fake.word()}"[:100])
        for i in range(NUM_TAGS)
    ]
    print(f"[setup] Creating {CUSTOM_FIELD_COUNT} custom fields...")  # noqa: T201
    custom_fields = [
        CustomField.objects.create(
            name=f"Field {i}",
            data_type=CustomField.FieldDataType.STRING,
        )
        for i in range(CUSTOM_FIELD_COUNT)
    ]
    note_user = User.objects.create_user(username="doclistnoteuser", password="x")
    owner = User.objects.create_superuser(username="doclistowner", password="admin")
    print(f"[setup] Building {NUM_DOCS} document rows...")  # noqa: T201
    base_date = datetime.date(2018, 1, 1)
    raw_docs = []
    for i in range(NUM_DOCS):
        # Spread created dates over ~6 years.
        day_offset = rng.randint(0, 6 * 365)
        raw_docs.append(
            Document(
                title=fake.sentence(nb_words=rng.randint(3, 8)).rstrip("."),
                content="\n\n".join(
                    fake.paragraph(nb_sentences=rng.randint(2, 5))
                    for _ in range(rng.randint(1, 3))
                ),
                checksum=f"DL{i:07d}",
                # Padding with None leaves ~1 in 7 docs without a
                # correspondent and ~1 in 6 without a document type.
                correspondent=rng.choice(correspondents + [None] * 5),
                document_type=rng.choice(doc_types + [None] * 4),
                created=base_date + datetime.timedelta(days=day_offset),
                owner=owner if rng.random() < 0.8 else None,
            ),
        )
    t0 = time.perf_counter()
    documents = Document.objects.bulk_create(raw_docs)
    print(f"[setup] bulk_create {NUM_DOCS} docs: {time.perf_counter() - t0:.2f}s")  # noqa: T201
    t0 = time.perf_counter()
    # M2M rows cannot go through bulk_create; assign 0-5 tags per document.
    for doc in documents:
        k = rng.randint(0, 5)
        if k:
            doc.tags.add(*rng.sample(tags, k))
    print(f"[setup] tag M2M assignments: {time.perf_counter() - t0:.2f}s")  # noqa: T201
    # Notes on a 10% sample of documents.
    note_docs = rng.sample(documents, int(NUM_DOCS * NOTE_FRACTION))
    Note.objects.bulk_create(
        [
            Note(
                document=doc,
                note=fake.sentence(nb_words=rng.randint(4, 15)),
                user=note_user,
            )
            for doc in note_docs
        ],
    )
    # One custom field instance on a 50% sample of documents.
    cf_docs = rng.sample(documents, int(NUM_DOCS * CUSTOM_FIELD_FRACTION))
    CustomFieldInstance.objects.bulk_create(
        [
            CustomFieldInstance(
                document=doc,
                field=rng.choice(custom_fields),
                value_text=fake.word(),
            )
            for doc in cf_docs
        ],
    )
    first_doc_pk = documents[0].pk
    yield {"owner": owner, "first_doc_pk": first_doc_pk, "tags": tags}
    print("\n[teardown] Removing doclist corpus...")  # noqa: T201
    Document.objects.all().delete()
    Correspondent.objects.all().delete()
    DocumentType.objects.all().delete()
    Tag.objects.all().delete()
    CustomField.objects.all().delete()
    User.objects.filter(username__in=["doclistnoteuser", "doclistowner"]).delete()
# ---------------------------------------------------------------------------
# TestDocListProfile
# ---------------------------------------------------------------------------
class TestDocListProfile:
    """Profile GET /api/documents/ — pure ORM path, no Tantivy."""

    @pytest.fixture(autouse=True)
    def _client(self, doclist_corpus):
        """Authenticate an API client as the corpus owner before each test."""
        self.client = APIClient()
        self.client.force_authenticate(user=doclist_corpus["owner"])
        self.first_doc_pk = doclist_corpus["first_doc_pk"]

    def test_list_default_ordering(self):
        """GET /api/documents/ default ordering (-created), page 1, page_size=25."""
        url = f"/api/documents/?page=1&page_size={PAGE_SIZE}"
        with profile_block(
            f"GET /api/documents/ default ordering [page_size={PAGE_SIZE}]",
        ):
            resp = self.client.get(url)
        assert resp.status_code == 200

    def test_list_title_ordering(self):
        """GET /api/documents/ ordered by title — tests ORM sort path."""
        url = f"/api/documents/?ordering=title&page=1&page_size={PAGE_SIZE}"
        with profile_block(
            f"GET /api/documents/?ordering=title [page_size={PAGE_SIZE}]",
        ):
            resp = self.client.get(url)
        assert resp.status_code == 200

    def test_list_page_size_comparison(self):
        """Compare serializer cost at page_size=10, 25, 100."""
        for size in (10, 25, 100):
            with profile_block(f"GET /api/documents/ [page_size={size}]"):
                resp = self.client.get(f"/api/documents/?page=1&page_size={size}")
            assert resp.status_code == 200

    def test_list_detail_fields(self):
        """GET /api/documents/{id}/ — per-doc serializer cost with all relations."""
        doc_pk = self.first_doc_pk
        with profile_block(f"GET /api/documents/{doc_pk}/ — single doc serializer"):
            resp = self.client.get(f"/api/documents/{doc_pk}/")
        assert resp.status_code == 200

    def test_list_cpu_profile(self):
        """cProfile of one list request — surfaces hot frames in serializer."""
        url = f"/api/documents/?page=1&page_size={PAGE_SIZE}"
        profile_cpu(
            lambda: self.client.get(url),
            label=f"GET /api/documents/ cProfile [page_size={PAGE_SIZE}]",
            top=30,
        )
# ---------------------------------------------------------------------------
# TestSelectionDataProfile
# ---------------------------------------------------------------------------
class TestSelectionDataProfile:
    """Profile _get_selection_data_for_queryset — the 5+ COUNT queries per request."""

    @pytest.fixture(autouse=True)
    def _setup(self, doclist_corpus):
        """Authenticate as the corpus owner and keep the tag list for filtering."""
        owner = doclist_corpus["owner"]
        self.client = APIClient()
        self.client.force_authenticate(user=owner)
        self.tags = doclist_corpus["tags"]

    def test_selection_data_unfiltered(self):
        """Call _get_selection_data_for_queryset(all docs) directly — COUNT queries in isolation."""
        # A bare viewset instance is enough: only the helper method is used,
        # no request/response cycle.
        viewset = DocumentViewSet()
        qs = Document.objects.all()
        with profile_block("_get_selection_data_for_queryset(all docs) — direct call"):
            viewset._get_selection_data_for_queryset(qs)

    def test_selection_data_via_api(self):
        """Full API round-trip with include_selection_data=true."""
        with profile_block(
            f"GET /api/documents/?include_selection_data=true [page_size={PAGE_SIZE}]",
        ):
            response = self.client.get(
                f"/api/documents/?page=1&page_size={PAGE_SIZE}&include_selection_data=true",
            )
        assert response.status_code == 200
        assert "selection_data" in response.data

    def test_selection_data_filtered(self):
        """selection_data on a tag-filtered queryset — filtered COUNT vs unfiltered."""
        tag = self.tags[0]
        viewset = DocumentViewSet()
        filtered_qs = Document.objects.filter(tags=tag)
        unfiltered_qs = Document.objects.all()
        print(f"\n Tag '{tag.name}' matches {filtered_qs.count()} docs")  # noqa: T201
        with profile_block("_get_selection_data_for_queryset(unfiltered)"):
            viewset._get_selection_data_for_queryset(unfiltered_qs)
        with profile_block("_get_selection_data_for_queryset(filtered by tag)"):
            viewset._get_selection_data_for_queryset(filtered_qs)

284
test_matching_profile.py Normal file
View File

@@ -0,0 +1,284 @@
"""
Matching pipeline profiling.
Run with:
uv run pytest ../test_matching_profile.py \
-m profiling --override-ini="addopts=" -s -v
Corpus: 1 document + 50 correspondents, 100 tags, 25 doc types, 20 storage
paths. Labels are spread across all seven matching algorithms
(NONE, ANY, ALL, LITERAL, REGEX, FUZZY, AUTO).
Classifier is passed as None -- MATCH_AUTO models skip prediction gracefully,
which is correct for isolating the ORM query and Python-side evaluation cost.
Scenarios
---------
TestMatchingPipelineProfile
- test_match_correspondents 50 correspondents, algorithm mix
- test_match_tags 100 tags
- test_match_document_types 25 doc types
- test_match_storage_paths 20 storage paths
- test_full_match_sequence all four in order (cumulative consumption cost)
- test_algorithm_breakdown each MATCH_* algorithm in isolation
"""
from __future__ import annotations
import random
import pytest
from faker import Faker
from profiling import profile_block
from documents.matching import match_correspondents
from documents.matching import match_document_types
from documents.matching import match_storage_paths
from documents.matching import match_tags
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
pytestmark = [pytest.mark.profiling, pytest.mark.django_db]
NUM_CORRESPONDENTS = 50
NUM_TAGS = 100
NUM_DOC_TYPES = 25
NUM_STORAGE_PATHS = 20
SEED = 42
# Algorithm distribution across labels (cycles through in order)
_ALGORITHMS = [
MatchingModel.MATCH_NONE,
MatchingModel.MATCH_ANY,
MatchingModel.MATCH_ALL,
MatchingModel.MATCH_LITERAL,
MatchingModel.MATCH_REGEX,
MatchingModel.MATCH_FUZZY,
MatchingModel.MATCH_AUTO,
]
def _algo(i: int) -> int:
    """Return the matching algorithm for label index *i*, cycling through _ALGORITHMS."""
    cycle = len(_ALGORITHMS)
    return _ALGORITHMS[i % cycle]
# ---------------------------------------------------------------------------
# Module-scoped corpus fixture
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def module_db(django_db_setup, django_db_blocker):
    """Unlock the DB for the whole module (module-scoped)."""
    # pytest-django normally restricts DB access to function-scoped tests;
    # unblocking here lets the module-scoped corpus fixture create rows.
    with django_db_blocker.unblock():
        yield
@pytest.fixture(scope="module")
def matching_corpus(module_db):
    """
    1 document with realistic content + dense matching model sets.
    Classifier=None so MATCH_AUTO models are simply skipped.

    Yields a dict with the single profiled document under "doc"; all
    created objects are deleted on teardown.
    """
    fake = Faker()
    Faker.seed(SEED)
    random.seed(SEED)

    def _match_text(algo: int) -> str:
        """Build the match pattern for *algo*.

        Mirrors the previously-duplicated inline logic exactly — including
        drawing a second fake.word() for REGEX — so the seeded Faker call
        sequence (and therefore the generated corpus) is unchanged.
        """
        text = (
            fake.word()
            if algo not in (MatchingModel.MATCH_NONE, MatchingModel.MATCH_AUTO)
            else ""
        )
        if algo == MatchingModel.MATCH_REGEX:
            text = r"\b" + fake.word() + r"\b"
        return text

    # ---- matching models ---------------------------------------------------
    print(f"\n[setup] Creating {NUM_CORRESPONDENTS} correspondents...")  # noqa: T201
    correspondents = []
    for i in range(NUM_CORRESPONDENTS):
        algo = _algo(i)
        # match_text is computed before create() so the fake.word()/
        # fake.company() call order matches the original corpus.
        match_text = _match_text(algo)
        correspondents.append(
            Correspondent.objects.create(
                name=f"mcorp-{i}-{fake.company()}"[:128],
                matching_algorithm=algo,
                match=match_text,
            ),
        )
    print(f"[setup] Creating {NUM_TAGS} tags...")  # noqa: T201
    tags = []
    for i in range(NUM_TAGS):
        algo = _algo(i)
        match_text = _match_text(algo)
        tags.append(
            Tag.objects.create(
                name=f"mtag-{i}-{fake.word()}"[:100],
                matching_algorithm=algo,
                match=match_text,
            ),
        )
    print(f"[setup] Creating {NUM_DOC_TYPES} doc types...")  # noqa: T201
    doc_types = []
    for i in range(NUM_DOC_TYPES):
        algo = _algo(i)
        match_text = _match_text(algo)
        doc_types.append(
            DocumentType.objects.create(
                name=f"mtype-{i}-{fake.word()}"[:128],
                matching_algorithm=algo,
                match=match_text,
            ),
        )
    print(f"[setup] Creating {NUM_STORAGE_PATHS} storage paths...")  # noqa: T201
    storage_paths = []
    for i in range(NUM_STORAGE_PATHS):
        algo = _algo(i)
        match_text = _match_text(algo)
        storage_paths.append(
            StoragePath.objects.create(
                name=f"mpath-{i}-{fake.word()}",
                path=f"{fake.word()}/{{title}}",
                matching_algorithm=algo,
                match=match_text,
            ),
        )
    # ---- document with diverse content ------------------------------------
    doc = Document.objects.create(
        title="quarterly invoice payment tax financial statement",
        content=" ".join(fake.paragraph(nb_sentences=5) for _ in range(3)),
        checksum="MATCHPROF0001",
    )
    print(f"[setup] Document pk={doc.pk}, content length={len(doc.content)} chars")  # noqa: T201
    print(  # noqa: T201
        f" Correspondents: {NUM_CORRESPONDENTS} "
        f"({sum(1 for c in correspondents if c.matching_algorithm == MatchingModel.MATCH_AUTO)} AUTO)",
    )
    print(  # noqa: T201
        f" Tags: {NUM_TAGS} "
        f"({sum(1 for t in tags if t.matching_algorithm == MatchingModel.MATCH_AUTO)} AUTO)",
    )
    yield {"doc": doc}
    # Teardown
    print("\n[teardown] Removing matching corpus...")  # noqa: T201
    Document.objects.all().delete()
    Correspondent.objects.all().delete()
    Tag.objects.all().delete()
    DocumentType.objects.all().delete()
    StoragePath.objects.all().delete()
# ---------------------------------------------------------------------------
# TestMatchingPipelineProfile
# ---------------------------------------------------------------------------
class TestMatchingPipelineProfile:
    """Profile the matching functions called per document during consumption."""

    @pytest.fixture(autouse=True)
    def _setup(self, matching_corpus):
        """Grab the single profiled document from the module-scoped corpus."""
        self.doc = matching_corpus["doc"]

    def test_match_correspondents(self):
        """50 correspondents, algorithm mix. Query count + time."""
        with profile_block(
            f"match_correspondents() [{NUM_CORRESPONDENTS} correspondents, mixed algorithms]",
        ):
            result = match_correspondents(self.doc, classifier=None)
        print(f" -> {len(result)} matched")  # noqa: T201

    def test_match_tags(self):
        """100 tags -- densest set in real installs."""
        with profile_block(f"match_tags() [{NUM_TAGS} tags, mixed algorithms]"):
            result = match_tags(self.doc, classifier=None)
        print(f" -> {len(result)} matched")  # noqa: T201

    def test_match_document_types(self):
        """25 doc types."""
        with profile_block(
            f"match_document_types() [{NUM_DOC_TYPES} types, mixed algorithms]",
        ):
            result = match_document_types(self.doc, classifier=None)
        print(f" -> {len(result)} matched")  # noqa: T201

    def test_match_storage_paths(self):
        """20 storage paths."""
        with profile_block(
            f"match_storage_paths() [{NUM_STORAGE_PATHS} paths, mixed algorithms]",
        ):
            result = match_storage_paths(self.doc, classifier=None)
        print(f" -> {len(result)} matched")  # noqa: T201

    def test_full_match_sequence(self):
        """All four match_*() calls in order -- cumulative cost per document consumed."""
        with profile_block(
            "full match sequence: correspondents + doc_types + tags + storage_paths",
        ):
            match_correspondents(self.doc, classifier=None)
            match_document_types(self.doc, classifier=None)
            match_tags(self.doc, classifier=None)
            match_storage_paths(self.doc, classifier=None)

    def test_algorithm_breakdown(self):
        """Create one correspondent per algorithm and time each independently."""
        import time

        from documents.matching import matches

        fake = Faker()
        # NOTE(review): MATCH_AUTO is deliberately absent from this
        # breakdown (no classifier is involved in this micro-benchmark).
        algo_names = {
            MatchingModel.MATCH_NONE: "MATCH_NONE",
            MatchingModel.MATCH_ANY: "MATCH_ANY",
            MatchingModel.MATCH_ALL: "MATCH_ALL",
            MatchingModel.MATCH_LITERAL: "MATCH_LITERAL",
            MatchingModel.MATCH_REGEX: "MATCH_REGEX",
            MatchingModel.MATCH_FUZZY: "MATCH_FUZZY",
        }
        doc = self.doc
        print()  # noqa: T201
        for algo, name in algo_names.items():
            match_text = fake.word() if algo != MatchingModel.MATCH_NONE else ""
            if algo == MatchingModel.MATCH_REGEX:
                match_text = r"\b" + fake.word() + r"\b"
            # Unsaved instance — used purely as a parameter holder for matches().
            model = Correspondent(
                name=f"algo-test-{name}",
                matching_algorithm=algo,
                match=match_text,
            )
            # Time 1000 iterations to get stable microsecond readings
            runs = 1_000
            t0 = time.perf_counter()
            for _ in range(runs):
                matches(model, doc)
            us_per_call = (time.perf_counter() - t0) / runs * 1_000_000
            print(  # noqa: T201
                f" {name:<20s} {us_per_call:8.2f} us/call (match={match_text[:20]!r})",
            )

154
test_sanity_profile.py Normal file
View File

@@ -0,0 +1,154 @@
"""
Sanity checker profiling.
Run with:
uv run pytest ../test_sanity_profile.py \
-m profiling --override-ini="addopts=" -s -v
Corpus: 2 000 documents with stub files (original + archive + thumbnail)
created in a temp MEDIA_ROOT.
Scenarios
---------
TestSanityCheckerProfile
- test_sanity_full_corpus full check_sanity() -- cProfile + tracemalloc
- test_sanity_query_pattern profile_block summary: query count + time
"""
from __future__ import annotations
import hashlib
import time
import pytest
from django.test import override_settings
from profiling import measure_memory
from profiling import profile_block
from profiling import profile_cpu
from documents.models import Document
from documents.sanity_checker import check_sanity
pytestmark = [pytest.mark.profiling, pytest.mark.django_db]
NUM_DOCS = 2_000
SEED = 42
# ---------------------------------------------------------------------------
# Module-scoped fixture: temp directories + corpus
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def module_db(django_db_setup, django_db_blocker):
    """Unlock the DB for the whole module (module-scoped)."""
    # pytest-django normally restricts DB access to function-scoped tests;
    # unblocking here lets the module-scoped corpus fixture create rows.
    with django_db_blocker.unblock():
        yield
@pytest.fixture(scope="module")
def sanity_corpus(tmp_path_factory, module_db):
    """
    Build a 2 000-document corpus with backing files in a temp MEDIA_ROOT.
    Originals and archives contain the real document content, so the stored
    sha256 checksums actually verify; thumbnails are 1-byte stubs. This
    gives the sanity checker's file-existence and checksum checks real targets.
    """
    media = tmp_path_factory.mktemp("sanity_media")
    originals_dir = media / "documents" / "originals"
    archive_dir = media / "documents" / "archive"
    thumb_dir = media / "documents" / "thumbnails"
    for d in (originals_dir, archive_dir, thumb_dir):
        d.mkdir(parents=True)
    # Use override_settings as a context manager for the whole fixture lifetime
    settings_ctx = override_settings(
        MEDIA_ROOT=media,
        ORIGINALS_DIR=originals_dir,
        ARCHIVE_DIR=archive_dir,
        THUMBNAIL_DIR=thumb_dir,
        MEDIA_LOCK=media / "media.lock",
    )
    settings_ctx.enable()
    print(f"\n[setup] Creating {NUM_DOCS} documents with stub files...")  # noqa: T201
    t0 = time.perf_counter()
    docs = []
    for i in range(NUM_DOCS):
        content = f"document content for doc {i}"
        # checksum matches the bytes written below, so checksum checks pass.
        checksum = hashlib.sha256(content.encode()).hexdigest()
        orig_filename = f"{i:07d}.pdf"
        arch_filename = f"{i:07d}.pdf"
        orig_path = originals_dir / orig_filename
        arch_path = archive_dir / arch_filename
        orig_path.write_bytes(content.encode())
        arch_path.write_bytes(content.encode())
        docs.append(
            Document(
                title=f"Document {i:05d}",
                content=content,
                checksum=checksum,
                archive_checksum=checksum,
                filename=orig_filename,
                archive_filename=arch_filename,
                mime_type="application/pdf",
            ),
        )
    created = Document.objects.bulk_create(docs, batch_size=500)
    # Thumbnails use doc.pk, so create them after bulk_create assigns pks
    for doc in created:
        thumb_path = thumb_dir / f"{doc.pk:07d}.webp"
        thumb_path.write_bytes(b"\x00")  # minimal thumbnail stub
    print(  # noqa: T201
        f"[setup] bulk_create + file creation: {time.perf_counter() - t0:.2f}s",
    )
    yield {"media": media}
    # Teardown
    print("\n[teardown] Removing sanity corpus...")  # noqa: T201
    Document.objects.all().delete()
    settings_ctx.disable()
# ---------------------------------------------------------------------------
# TestSanityCheckerProfile
# ---------------------------------------------------------------------------
class TestSanityCheckerProfile:
    """Profile check_sanity() on a realistic corpus with real files."""

    @pytest.fixture(autouse=True)
    def _setup(self, sanity_corpus):
        """Expose the temp media root created by the module corpus fixture."""
        self.media = sanity_corpus["media"]

    def test_sanity_full_corpus(self):
        """Full check_sanity() -- cProfile surfaces hot frames, tracemalloc shows peak."""

        def run_check():
            return check_sanity(scheduled=False)

        _, elapsed = profile_cpu(
            run_check,
            label=f"check_sanity() [{NUM_DOCS} docs, real files]",
            top=25,
        )
        _, peak_kib, delta_kib = measure_memory(
            run_check,
            label=f"check_sanity() [{NUM_DOCS} docs] -- memory",
        )
        print("\n Summary:")  # noqa: T201
        print(f" Wall time (CPU profile run): {elapsed * 1000:.1f} ms")  # noqa: T201
        print(f" Peak memory (second run): {peak_kib:.1f} KiB")  # noqa: T201
        print(f" Memory delta: {delta_kib:+.1f} KiB")  # noqa: T201

    def test_sanity_query_pattern(self):
        """profile_block view: query count + query time + wall time in one summary."""
        label = f"check_sanity() [{NUM_DOCS} docs] -- query count"
        with profile_block(label):
            check_sanity(scheduled=False)

273
test_search_profiling.py Normal file
View File

@@ -0,0 +1,273 @@
"""
Search performance profiling tests.
Run explicitly — excluded from the normal test suite:
uv run pytest -m profiling -s -p no:xdist --override-ini="addopts=" -v
The ``-s`` flag is required to see profile_block() output.
The ``-p no:xdist`` flag disables parallel execution for accurate measurements.
Corpus: 5 000 documents generated deterministically from a fixed Faker seed,
with realistic variety: 30 correspondents, 15 document types, 50 tags, ~500
notes spread across ~10 % of documents.
"""
from __future__ import annotations
import random
import pytest
from django.contrib.auth.models import User
from faker import Faker
from profiling import profile_block
from rest_framework.test import APIClient
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import Note
from documents.models import Tag
from documents.search import get_backend
from documents.search import reset_backend
from documents.search._backend import SearchMode
pytestmark = [pytest.mark.profiling, pytest.mark.search, pytest.mark.django_db]
# ---------------------------------------------------------------------------
# Corpus parameters
# ---------------------------------------------------------------------------
DOC_COUNT = 5_000
SEED = 42
NUM_CORRESPONDENTS = 30
NUM_DOC_TYPES = 15
NUM_TAGS = 50
NOTE_FRACTION = 0.10 # ~500 documents get a note
PAGE_SIZE = 25
def _build_corpus(rng: random.Random, fake: Faker) -> None:
    """
    Insert the full corpus into the database and index it.

    Uses bulk_create for the Document rows (fast) then handles the M2M tag
    relationships and notes individually. Indexes the full corpus with a
    single backend.rebuild() call.

    Both *rng* and *fake* are expected to be pre-seeded by the caller so the
    corpus is deterministic across runs.
    """
    import datetime

    # ---- lookup objects -------------------------------------------------
    correspondents = [
        Correspondent.objects.create(name=f"profcorp-{i}-{fake.company()}"[:128])
        for i in range(NUM_CORRESPONDENTS)
    ]
    doc_types = [
        DocumentType.objects.create(name=f"proftype-{i}-{fake.word()}"[:128])
        for i in range(NUM_DOC_TYPES)
    ]
    tags = [
        Tag.objects.create(name=f"proftag-{i}-{fake.word()}"[:100])
        for i in range(NUM_TAGS)
    ]
    note_user = User.objects.create_user(username="profnoteuser", password="x")
    # ---- bulk-create documents ------------------------------------------
    base_date = datetime.date(2018, 1, 1)
    raw_docs = []
    for i in range(DOC_COUNT):
        # Spread created dates over ~6 years.
        day_offset = rng.randint(0, 6 * 365)
        created = base_date + datetime.timedelta(days=day_offset)
        raw_docs.append(
            Document(
                title=fake.sentence(nb_words=rng.randint(3, 9)).rstrip("."),
                content="\n\n".join(
                    fake.paragraph(nb_sentences=rng.randint(3, 7))
                    for _ in range(rng.randint(2, 5))
                ),
                checksum=f"PROF{i:07d}",
                # None padding leaves some docs without a correspondent or
                # document type, as in real installs.
                correspondent=rng.choice(correspondents + [None] * 8),
                document_type=rng.choice(doc_types + [None] * 4),
                created=created,
            ),
        )
    documents = Document.objects.bulk_create(raw_docs)
    # ---- tags (M2M, post-bulk) ------------------------------------------
    for doc in documents:
        k = rng.randint(0, 5)
        if k:
            doc.tags.add(*rng.sample(tags, k))
    # ---- notes on ~10 % of docs -----------------------------------------
    note_docs = rng.sample(documents, int(DOC_COUNT * NOTE_FRACTION))
    for doc in note_docs:
        Note.objects.create(
            document=doc,
            note=fake.sentence(nb_words=rng.randint(6, 20)),
            user=note_user,
        )
    # ---- build Tantivy index --------------------------------------------
    backend = get_backend()
    # select_related/prefetch_related pull every relation the indexer reads
    # in a handful of queries instead of per-document lookups.
    qs = Document.objects.select_related(
        "correspondent",
        "document_type",
        "storage_path",
        "owner",
    ).prefetch_related("tags", "notes__user", "custom_fields__field")
    backend.rebuild(qs)
class TestSearchProfiling:
"""
Performance profiling for the Tantivy search backend and DRF API layer.
Each test builds a fresh 5 000-document corpus, exercises one hot path,
and prints profile_block() measurements to stdout. No correctness
assertions — the goal is to surface hot spots and track regressions.
"""
@pytest.fixture(autouse=True)
def _setup(self, tmp_path, settings):
index_dir = tmp_path / "index"
index_dir.mkdir()
settings.INDEX_DIR = index_dir
reset_backend()
rng = random.Random(SEED)
fake = Faker()
Faker.seed(SEED)
self.user = User.objects.create_superuser(
username="profiler",
password="admin",
)
self.client = APIClient()
self.client.force_authenticate(user=self.user)
_build_corpus(rng, fake)
yield
reset_backend()
# -- 1. Backend: search_ids relevance ---------------------------------
def test_profile_search_ids_relevance(self):
"""Profile: search_ids() with relevance ordering across several queries."""
backend = get_backend()
queries = [
"invoice payment",
"annual report",
"bank statement",
"contract agreement",
"receipt",
]
with profile_block(f"search_ids — relevance ({len(queries)} queries)"):
for q in queries:
backend.search_ids(q, user=None)
# -- 2. Backend: search_ids with Tantivy-native sort ------------------
def test_profile_search_ids_sorted(self):
"""Profile: search_ids() sorted by a Tantivy fast field (created)."""
backend = get_backend()
with profile_block("search_ids — sorted by created (asc + desc)"):
backend.search_ids(
"the",
user=None,
sort_field="created",
sort_reverse=False,
)
backend.search_ids(
"the",
user=None,
sort_field="created",
sort_reverse=True,
)
# -- 3. Backend: highlight_hits for a page of 25 ----------------------
def test_profile_highlight_hits(self):
"""Profile: highlight_hits() for a 25-document page."""
backend = get_backend()
all_ids = backend.search_ids("report", user=None)
page_ids = all_ids[:PAGE_SIZE]
with profile_block(f"highlight_hits — {len(page_ids)} docs"):
backend.highlight_hits("report", page_ids)
# -- 4. Backend: autocomplete -----------------------------------------
def test_profile_autocomplete(self):
"""Profile: autocomplete() with eight common prefixes."""
backend = get_backend()
prefixes = ["inv", "pay", "con", "rep", "sta", "acc", "doc", "fin"]
with profile_block(f"autocomplete — {len(prefixes)} prefixes"):
for prefix in prefixes:
backend.autocomplete(prefix, limit=10)
# -- 5. Backend: simple-mode search (TEXT and TITLE) ------------------
def test_profile_search_ids_simple_modes(self):
"""Profile: search_ids() in TEXT and TITLE simple-search modes."""
backend = get_backend()
queries = ["invoice 2023", "annual report", "bank statement"]
with profile_block(
f"search_ids — TEXT + TITLE modes ({len(queries)} queries each)",
):
for q in queries:
backend.search_ids(q, user=None, search_mode=SearchMode.TEXT)
backend.search_ids(q, user=None, search_mode=SearchMode.TITLE)
# -- 6. API: full round-trip, relevance + page 1 ----------------------
def test_profile_api_relevance_search(self):
"""Profile: full API search round-trip, relevance order, page 1."""
with profile_block(
f"API /documents/?query=… relevance (page 1, page_size={PAGE_SIZE})",
):
response = self.client.get(
f"/api/documents/?query=invoice+payment&page=1&page_size={PAGE_SIZE}",
)
assert response.status_code == 200
# -- 7. API: full round-trip, ORM-ordered (title) ---------------------
def test_profile_api_orm_sorted_search(self):
"""Profile: full API search round-trip with ORM-delegated sort (title)."""
with profile_block("API /documents/?query=…&ordering=title"):
response = self.client.get(
f"/api/documents/?query=report&ordering=title&page=1&page_size={PAGE_SIZE}",
)
assert response.status_code == 200
# -- 8. API: full round-trip, score sort ------------------------------
def test_profile_api_score_sort(self):
    """Profile: full API search with ordering=-score (relevance, preserve order)."""
    url = f"/api/documents/?query=statement&ordering=-score&page=1&page_size={PAGE_SIZE}"
    with profile_block("API /documents/?query=…&ordering=-score"):
        resp = self.client.get(url)
    assert resp.status_code == 200
# -- 9. API: full round-trip, with selection_data ---------------------
def test_profile_api_with_selection_data(self):
    """Profile: full API search including include_selection_data=true."""
    url = (
        f"/api/documents/?query=contract&page=1&page_size={PAGE_SIZE}"
        "&include_selection_data=true"
    )
    with profile_block("API /documents/?query=…&include_selection_data=true"):
        resp = self.client.get(url)
    assert resp.status_code == 200
    assert "selection_data" in resp.data
# -- 10. API: paginated (page 2) --------------------------------------
def test_profile_api_page_2(self):
    """Profile: full API search, page 2 — exercises page offset arithmetic."""
    url = f"/api/documents/?query=the&page=2&page_size={PAGE_SIZE}"
    label = f"API /documents/?query=…&page=2&page_size={PAGE_SIZE}"
    with profile_block(label):
        resp = self.client.get(url)
    assert resp.status_code == 200

231
test_workflow_profile.py Normal file
View File

@@ -0,0 +1,231 @@
"""
Workflow trigger matching profiling.
Run with:
uv run pytest ../test_workflow_profile.py \
-m profiling --override-ini="addopts=" -s -v
Corpus: 500 documents + correspondents + tags + sets of WorkflowTrigger
objects at 5 and 20 count to allow scaling comparisons.
Scenarios
---------
TestWorkflowMatchingProfile
- test_existing_document_5_workflows existing_document_matches_workflow x 5 triggers
- test_existing_document_20_workflows same x 20 triggers
- test_workflow_prefilter prefilter_documents_by_workflowtrigger on 500 docs
- test_trigger_type_comparison compare DOCUMENT_ADDED vs DOCUMENT_UPDATED overhead
"""
from __future__ import annotations
import random
import time
import pytest
from faker import Faker
from profiling import profile_block
from documents.matching import existing_document_matches_workflow
from documents.matching import prefilter_documents_by_workflowtrigger
from documents.models import Correspondent
from documents.models import Document
from documents.models import Tag
from documents.models import Workflow
from documents.models import WorkflowAction
from documents.models import WorkflowTrigger
pytestmark = [pytest.mark.profiling, pytest.mark.django_db]
NUM_DOCS = 500
NUM_CORRESPONDENTS = 10
NUM_TAGS = 20
SEED = 42
# ---------------------------------------------------------------------------
# Module-scoped fixture
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def module_db(django_db_setup, django_db_blocker):
    """Keep the database unblocked for every test in this module."""
    # Module scope: one unblock spans all tests that depend on this fixture.
    unblocked = django_db_blocker.unblock()
    with unblocked:
        yield
@pytest.fixture(scope="module")
def workflow_corpus(module_db):
    """
    Build the profiling corpus: 500 documents plus correspondents, tags,
    and sets of workflow triggers at 5 and 20 count to allow scaling
    comparisons.

    Yields a dict with:
      - "doc":            one sample Document to match triggers against
      - "triggers_5":     5 DOCUMENT_UPDATED triggers
      - "triggers_20":    20 DOCUMENT_UPDATED triggers
      - "triggers_added": 5 DOCUMENT_ADDED triggers

    Module-scoped; all rows are deleted in the teardown after the yield.
    """
    fake = Faker()
    # Seed both Faker and the local RNG so the generated corpus is
    # reproducible run-to-run (stable profiling comparisons).
    Faker.seed(SEED)
    rng = random.Random(SEED)
    # ---- lookup objects ---------------------------------------------------
    print("\n[setup] Creating lookup objects...")  # noqa: T201
    correspondents = [
        Correspondent.objects.create(name=f"wfcorp-{i}-{fake.company()}"[:128])
        for i in range(NUM_CORRESPONDENTS)
    ]
    tags = [
        Tag.objects.create(name=f"wftag-{i}-{fake.word()}"[:100])
        for i in range(NUM_TAGS)
    ]
    # ---- documents --------------------------------------------------------
    print(f"[setup] Building {NUM_DOCS} documents...")  # noqa: T201
    raw_docs = []
    for i in range(NUM_DOCS):
        raw_docs.append(
            Document(
                title=fake.sentence(nb_words=4).rstrip("."),
                content=fake.paragraph(nb_sentences=3),
                # Counter-based value keeps checksums unique without hashing.
                checksum=f"WF{i:07d}",
                # Padding with three Nones leaves ~23% (3 of 13 choices) of
                # docs without a correspondent.
                correspondent=rng.choice(correspondents + [None] * 3),
            ),
        )
    # One batched INSERT keeps setup fast for 500 rows.
    documents = Document.objects.bulk_create(raw_docs, batch_size=500)
    for doc in documents:
        # 0-3 random tags per document; k == 0 means the doc stays untagged.
        k = rng.randint(0, 3)
        if k:
            doc.tags.add(*rng.sample(tags, k))
    sample_doc = documents[0]
    print(f"[setup] Sample doc pk={sample_doc.pk}")  # noqa: T201
    # ---- build triggers at scale 5 and 20 --------------------------------
    # Mutable cell so the nested helper can hand out globally unique
    # workflow names across its multiple invocations below.
    _wf_counter = [0]

    def _make_triggers(n: int, trigger_type: int) -> list[WorkflowTrigger]:
        # Create n triggers of the given type, each wrapped in its own
        # Workflow with a bare ASSIGNMENT action (no fields set).
        triggers = []
        for i in range(n):
            # Alternate between no filter and a correspondent filter
            corr = correspondents[i % NUM_CORRESPONDENTS] if i % 3 == 0 else None
            trigger = WorkflowTrigger.objects.create(
                type=trigger_type,
                filter_has_correspondent=corr,
            )
            action = WorkflowAction.objects.create(
                type=WorkflowAction.WorkflowActionType.ASSIGNMENT,
            )
            idx = _wf_counter[0]
            _wf_counter[0] += 1
            wf = Workflow.objects.create(name=f"wf-profile-{idx}")
            wf.triggers.add(trigger)
            wf.actions.add(action)
            triggers.append(trigger)
        return triggers

    print("[setup] Creating workflow triggers...")  # noqa: T201
    triggers_5 = _make_triggers(5, WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED)
    triggers_20 = _make_triggers(
        20,
        WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
    )
    triggers_added = _make_triggers(
        5,
        WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
    )
    yield {
        "doc": sample_doc,
        "triggers_5": triggers_5,
        "triggers_20": triggers_20,
        "triggers_added": triggers_added,
    }
    # Teardown: wipe every model this fixture touched. NOTE(review): these
    # are table-wide deletes, which assumes no other module shares the DB
    # within this session — confirm if the suite ever runs mixed modules.
    print("\n[teardown] Removing workflow corpus...")  # noqa: T201
    Workflow.objects.all().delete()
    WorkflowTrigger.objects.all().delete()
    WorkflowAction.objects.all().delete()
    Document.objects.all().delete()
    Correspondent.objects.all().delete()
    Tag.objects.all().delete()
# ---------------------------------------------------------------------------
# TestWorkflowMatchingProfile
# ---------------------------------------------------------------------------
class TestWorkflowMatchingProfile:
    """Profile workflow trigger evaluation per document save."""

    @pytest.fixture(autouse=True)
    def _setup(self, workflow_corpus):
        """Pull the shared corpus pieces onto the test instance."""
        self.doc = workflow_corpus["doc"]
        for key in ("triggers_5", "triggers_20", "triggers_added"):
            setattr(self, key, workflow_corpus[key])

    def test_existing_document_5_workflows(self):
        """existing_document_matches_workflow x 5 DOCUMENT_UPDATED triggers."""
        document = self.doc
        trigger_set = self.triggers_5
        label = f"existing_document_matches_workflow [{len(trigger_set)} triggers]"
        with profile_block(label):
            for candidate in trigger_set:
                existing_document_matches_workflow(document, candidate)

    def test_existing_document_20_workflows(self):
        """existing_document_matches_workflow x 20 triggers -- shows linear scaling."""
        document = self.doc
        trigger_set = self.triggers_20
        label = f"existing_document_matches_workflow [{len(trigger_set)} triggers]"
        with profile_block(label):
            for candidate in trigger_set:
                existing_document_matches_workflow(document, candidate)
        # Time each call on its own to expose the per-trigger overhead.
        timings = []
        for candidate in trigger_set:
            started = time.perf_counter()
            existing_document_matches_workflow(document, candidate)
            timings.append((time.perf_counter() - started) * 1_000_000)
        avg_us = sum(timings) / len(timings)
        print(f"\n Per-trigger avg: {avg_us:.1f} us (n={len(timings)})")  # noqa: T201

    def test_workflow_prefilter(self):
        """prefilter_documents_by_workflowtrigger on 500 docs -- tag + correspondent filters."""
        qs = Document.objects.all()
        print(f"\n Corpus: {qs.count()} documents")  # noqa: T201
        for trigger in self.triggers_20[:3]:
            label = (
                f"prefilter_documents_by_workflowtrigger "
                f"[corr={trigger.filter_has_correspondent_id}]"
            )
            with profile_block(label):
                filtered = prefilter_documents_by_workflowtrigger(qs, trigger)
                # Force queryset evaluation inside the profiled block.
                count = filtered.count()
            print(f" -> {count} docs passed filter")  # noqa: T201

    def test_trigger_type_comparison(self):
        """Compare per-call overhead of DOCUMENT_UPDATED vs DOCUMENT_ADDED."""
        doc = self.doc
        runs = 200
        scenarios = (
            ("DOCUMENT_UPDATED", self.triggers_5),
            ("DOCUMENT_ADDED", self.triggers_added),
        )
        for label, triggers in scenarios:
            started = time.perf_counter()
            for _ in range(runs):
                for trigger in triggers:
                    existing_document_matches_workflow(doc, trigger)
            total_calls = runs * len(triggers)
            us_per_call = (time.perf_counter() - started) / total_calls * 1_000_000
            print(  # noqa: T201
                f" {label:<22s} {us_per_call:.2f} us/call "
                f"({total_calls} calls, {len(triggers)} triggers)",
            )

8
uv.lock generated
View File

@@ -875,15 +875,15 @@ wheels = [
[[package]]
name = "django"
version = "5.2.13"
version = "5.2.12"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "asgiref", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "sqlparse", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/1f/c5/c69e338eb2959f641045802e5ea87ca4bf5ac90c5fd08953ca10742fad51/django-5.2.13.tar.gz", hash = "sha256:a31589db5188d074c63f0945c3888fad104627dfcc236fb2b97f71f89da33bc4", size = 10890368, upload-time = "2026-04-07T14:02:15.072Z" }
sdist = { url = "https://files.pythonhosted.org/packages/bd/55/b9445fc0695b03746f355c05b2eecc54c34e05198c686f4fc4406b722b52/django-5.2.12.tar.gz", hash = "sha256:6b809af7165c73eff5ce1c87fdae75d4da6520d6667f86401ecf55b681eb1eeb", size = 10860574, upload-time = "2026-03-03T13:56:05.509Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/59/b1/51ab36b2eefcf8cdb9338c7188668a157e29e30306bfc98a379704c9e10d/django-5.2.13-py3-none-any.whl", hash = "sha256:5788fce61da23788a8ce6f02583765ab060d396720924789f97fa42119d37f7a", size = 8310982, upload-time = "2026-04-07T14:02:08.883Z" },
{ url = "https://files.pythonhosted.org/packages/4e/32/4b144e125678efccf5d5b61581de1c4088d6b0286e46096e3b8de0d556c8/django-5.2.12-py3-none-any.whl", hash = "sha256:4853482f395c3a151937f6991272540fcbf531464f254a347bf7c89f53c8cff7", size = 8310245, upload-time = "2026-03-03T13:56:01.174Z" },
]
[[package]]
@@ -3014,7 +3014,7 @@ requires-dist = [
{ name = "channels-redis", specifier = "~=4.2" },
{ name = "concurrent-log-handler", specifier = "~=0.9.25" },
{ name = "dateparser", specifier = "~=1.2" },
{ name = "django", specifier = "~=5.2.13" },
{ name = "django", specifier = "~=5.2.10" },
{ name = "django-allauth", extras = ["mfa", "socialaccount"], specifier = "~=65.15.0" },
{ name = "django-auditlog", specifier = "~=3.4.1" },
{ name = "django-cachalot", specifier = "~=2.9.0" },