parsedmarc/tests/test_elastic.py
Sean Whalen b7b8383fa4 Expand honest test coverage from 59% to 83%; fix two latent bugs (#775)
* Expand honest test coverage from 59% to 83%; fix two latent bugs

271 new tests across the output modules, ES/OS clients, CLI config
parsing, and the top-level parsing surface. Coverage measured against
shipped code only (see [tool.coverage.run] source = ["parsedmarc"]
omit = ["*/parsedmarc/resources/maps/*.py"] in pyproject.toml).

Per-module results:

  s3.py             38% → 100%   (also fixes SMTP-TLS-to-S3 bug below)
  gelf.py           40% → 100%
  syslog.py         46% → 100%
  kafkaclient.py    34% → 100%
  splunk.py         24% → 100%
  loganalytics.py   56% → 100%
  webhook.py        78% → 100%   (also removes redundant try/except)
  elastic.py        36% →  99%
  opensearch.py     40% →  99%
  cli.py            52% →  69%
  __init__.py       74% →  76%   (also fixes append_json bug below)
  utils.py          84% (unchanged in this PR)
  TOTAL             59% →  83%

The remaining 17% is honest. The biggest unreached blocks are
_main() in cli.py and the watch-mode mailbox iteration in __init__.py,
both of which would require either standing up live subsystems (real
Elasticsearch, real IMAP) or mocking deep enough that the test would
verify the mock rather than the code. The PR-A AGENTS.md guidance —
"if 90% requires faking it, ship 85% honestly" — applies here.

Changes made while writing tests (items 1 and 2 are the two bug fixes;
item 3 removes dead code):

1. parsedmarc/s3.py — SMTP-TLS-to-S3 was completely broken.
   save_report_to_s3 unconditionally read report["report_metadata"]
   when building S3 object metadata, but RFC 8460 §4.3 SMTP TLS
   reports are flat (no report_metadata sub-object). The CLI's
   surrounding try/except silently swallowed the KeyError, so every
   SMTP-TLS report quietly failed to upload. Also fixes a related
   issue: parse_smtp_tls_report_json stores begin_date as the raw
   ISO-8601 string from the report (per the SMTPTLSReport TypedDict
   and RFC 8460 §4.3), but the S3 code path assumed a datetime
   with .year / .month / .day attributes. Both fixed; the broken
   metadata-extraction branch now uses the flat-report fields, and
   the date branch normalizes via human_timestamp_to_datetime.

2. parsedmarc/__init__.py — append_json corrupted JSON output files
   on the second write. The original implementation opened files in
   "a+" mode, then seek()ed backwards to overwrite the trailing "]"
   with ",\n" before appending more elements. Python's docs are
   explicit (https://docs.python.org/3/library/functions.html#open):
   on POSIX, writes in "a"/"a+" mode always go to EOF regardless of
   seek() position. The result was that the second call produced
   [...]\n],\n[...] -style corrupted output instead of a single
   merged array. Replaced with a read-merge-write pattern: load the
   existing array (if any), append the new elements, rewrite the
   whole file. The CSV cousin append_csv was not affected — it
   doesn't seek backwards.

3. parsedmarc/webhook.py — removed redundant try/except blocks in
   save_aggregate_report_to_webhook / save_failure_report_to_webhook
   / save_smtp_tls_report_to_webhook. _send_to_webhook already
   catches every Exception itself, so the outer except blocks were
   unreachable dead code (covered nothing, defended against nothing,
   and inflated the source-line count without testing value).
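
The read-merge-write pattern from item 2 can be sketched as follows. This is
a minimal illustration, not the shipped parsedmarc code: the name
`append_json_sketch`, its signature, and the `demo` helper are hypothetical.
The fallback to an empty array when the existing file is missing, corrupt, or
not a JSON array follows the behaviour described in this commit message.

```python
import json
import os
import tempfile


def append_json_sketch(path, new_items):
    """Merge new_items into the JSON array stored at path (hypothetical helper)."""
    existing = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
        # Only a list root can be extended; anything else is discarded.
        if isinstance(loaded, list):
            existing = loaded
    except (json.JSONDecodeError, OSError):
        # Missing, unreadable, or corrupt file: start from an empty array.
        existing = []
    # Rewrite the whole file in "w" mode; no seek() tricks in append mode.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(existing + list(new_items), f, indent=2)


def demo():
    """Write twice to the same file and return the parsed result."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "out.json")
        append_json_sketch(path, [{"id": 1}])
        append_json_sketch(path, [{"id": 2}])
        with open(path, encoding="utf-8") as f:
            return json.load(f)
```

Calling `demo()` writes to the same file twice and reads back a single merged
array, which is the property the seek-based version failed to provide.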

Testing approach: mocks at SDK boundaries (boto3 resource, kafka
producer, requests session, opensearch/elasticsearch Document/Search,
azure LogsIngestionClient). Tests verify the parsedmarc-side
transformation logic — document/event construction, index/topic
naming, dedup queries, error wrapping — rather than asserting on
mock invocations as a proxy for behaviour. Where a branch is
defensive against a caller that doesn't exist in the codebase, the
test is omitted (commented in code rather than hidden behind a
pragma).
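
As a rough sketch of that approach: mock the SDK object, run the code under
test, then decode the payload that was handed to the SDK and assert on its
contents. The function `post_summary`, its payload shape, and the URL are
hypothetical and exist only for this illustration; they are not parsedmarc
APIs.

```python
import json
from unittest.mock import MagicMock


def post_summary(session, url, report):
    """Hypothetical output function: summarize a report and POST it as JSON."""
    payload = {
        "org_name": report["report_metadata"]["org_name"],
        "record_count": len(report["records"]),
    }
    session.post(url, data=json.dumps(payload), timeout=60)


def captured_payload():
    """Run post_summary against a mocked session and decode what was sent."""
    session = MagicMock()  # stands in for requests.Session at the SDK boundary
    report = {"report_metadata": {"org_name": "TestOrg"}, "records": [{}, {}]}
    post_summary(session, "https://hooks.example.com/dmarc", report)
    # Parse the body handed to the SDK instead of only checking call counts.
    return json.loads(session.post.call_args.kwargs["data"])
```

A test built this way fails if the transformation logic changes, whereas a
bare `session.post.assert_called_once()` would keep passing.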

547 tests total (was 276), all passing. ruff check + format clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Document the two bug fixes from this PR in the 10.0.0 changelog

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Document testing standards in AGENTS.md

Adds a "Testing standards" section covering the principles applied in
PR-A (split) and PR-B (coverage expansion):

- Coverage measures shipped code only — don't reintroduce tests/* to
  the scope, don't expand omit, don't use # pragma: no cover.
- Honest tests assert on observable behaviour, not "the mock was called".
  Mock at SDK boundaries; parse the payload that gets sent.
- "If 90% requires faking it, ship 85% honestly" — coverage is a tool,
  not a goal. PR-B's deliberate stops at cli.py 69% and __init__.py 76%
  are the documented precedent for when to halt.
- Verify bug claims against the relevant RFC, internal types, installed
  SDK source, or upstream docs before changing code. Cite the source in
  the commit message and test docstring (RFC 8460 §4.3 and the Python
  open() docs for #775's two bug fixes are the pattern to follow).
- Bugs found while writing tests are fixed in the same PR; the test
  doubles as the regression guard.
- File layout (tests/test_<module>.py) is non-negotiable; module-level
  test loggers need fresh-handler setup so test ordering doesn't break
  assertLogs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Cover the corrupt-file fallback in append_json

Codecov flagged 2 missing patch-coverage lines on PR #775: the
except (json.JSONDecodeError, OSError) branch in append_json, which
falls back to overwriting when the existing file isn't a parseable
JSON array. Two new tests in tests/test_init.py:TestAppendJson
exercise both paths:

- test_corrupt_existing_file_is_overwritten_cleanly: existing file
  contains invalid JSON; append_json overwrites with the new array.
- test_existing_file_with_non_list_root_is_overwritten: existing
  file parses as {"foo": ...} (dict, not list); the isinstance guard
  rejects it and we overwrite cleanly.

Patch coverage now 100% on the bug fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 20:35:22 -04:00


"""Tests for parsedmarc.elastic
Mocks at the elasticsearch-dsl SDK boundary (connections.create_connection,
Index, Search, Document.save) so the tests verify the parsedmarc-side
transformation logic — document construction, index naming, deduplication
queries, error wrapping — without needing a running Elasticsearch cluster.
"""

import unittest
from unittest.mock import MagicMock, call, patch

import parsedmarc.elastic as elastic_module
from parsedmarc import InvalidFailureReport
from parsedmarc.elastic import (
    AlreadySaved,
    ElasticsearchError,
    create_indexes,
    migrate_indexes,
    save_aggregate_report_to_elasticsearch,
    save_failure_report_to_elasticsearch,
    save_smtp_tls_report_to_elasticsearch,
    set_hosts,
)

# ---------------------------------------------------------------------------
# Sample report fixtures
# ---------------------------------------------------------------------------


def _aggregate_report(**overrides):
    base = {
        "xml_schema": "draft",
        "xml_namespace": None,
        "report_metadata": {
            "org_name": "TestOrg",
            "org_email": "dmarc@example.com",
            "org_extra_contact_info": None,
            "report_id": "agg-1",
            "begin_date": "2024-01-15 00:00:00",
            "end_date": "2024-01-16 00:00:00",
            "timespan_requires_normalization": False,
            "original_timespan_seconds": 86400,
            "errors": [],
            "generator": "TestGen/1.0",
        },
        "policy_published": {
            "domain": "example.com",
            "adkim": "r",
            "aspf": "r",
            "p": "none",
            "sp": "none",
            "pct": None,
            "fo": None,
            "np": "reject",
            "testing": "n",
            "discovery_method": "treewalk",
        },
        "records": [
            {
                "interval_begin": "2024-01-15 00:00:00",
                "interval_end": "2024-01-16 00:00:00",
                "normalized_timespan": False,
                "source": {
                    "ip_address": "192.0.2.1",
                    "country": "US",
                    "reverse_dns": None,
                    "base_domain": None,
                    "name": None,
                    "type": None,
                    "asn": 64496,
                    "as_name": "Example AS",
                    "as_domain": "example.net",
                },
                "count": 4,
                "alignment": {"spf": True, "dkim": True, "dmarc": True},
                "policy_evaluated": {
                    "disposition": "none",
                    "dkim": "pass",
                    "spf": "pass",
                    "policy_override_reasons": [
                        {"type": "local_policy", "comment": "approved"}
                    ],
                },
                "identifiers": {
                    "header_from": "example.com",
                    "envelope_from": "example.com",
                    "envelope_to": "rcpt@example.com",
                },
                "auth_results": {
                    "dkim": [
                        {
                            "domain": "example.com",
                            "selector": "s",
                            "result": "pass",
                            "human_result": None,
                        }
                    ],
                    "spf": [
                        {
                            "domain": "example.com",
                            "scope": "mfrom",
                            "result": "pass",
                            "human_result": None,
                        }
                    ],
                },
            }
        ],
    }
    base.update(overrides)
    return base


def _failure_report(**overrides):
    base = {
        "feedback_type": "auth-failure",
        "user_agent": "test/1.0",
        "version": "1",
        "original_envelope_id": None,
        "original_mail_from": "x@example.com",
        "original_rcpt_to": None,
        # 1 Jan 2024 fell on a Monday.
        "arrival_date": "Mon, 1 Jan 2024 00:00:00 +0000",
        "arrival_date_utc": "2024-01-01 00:00:00",
        "authentication_results": None,
        "delivery_result": "other",
        "auth_failure": ["dmarc"],
        "authentication_mechanisms": [],
        "dkim_domain": None,
        "reported_domain": "example.com",
        "sample_headers_only": True,
        "source": {
            "ip_address": "192.0.2.5",
            "country": "US",
            "reverse_dns": None,
            "base_domain": None,
            "name": None,
            "type": None,
            "asn": 64496,
            "as_name": "Example AS",
            "as_domain": "example.net",
        },
        "sample": "raw",
        "parsed_sample": {
            "headers": {
                # mailparser emits headers as [[display_name, address]]
                # lists; an empty display becomes [["", address]].
                "From": [["Sender Name", "sender@example.com"]],
                "To": [["", "rcpt@example.com"]],
                "Subject": "Test",
            },
            "subject": "Test",
            "filename_safe_subject": "Test",
            "body": "body",
            "date": "Mon, 1 Jan 2024 00:00:00 +0000",
            "to": [{"display_name": None, "address": "rcpt@example.com"}],
            "reply_to": [],
            "cc": [],
            "bcc": [],
            "attachments": [],
        },
    }
    base.update(overrides)
    return base


def _smtp_tls_report(**overrides):
    base = {
        "organization_name": "TestOrg",
        "begin_date": "2024-02-03T00:00:00Z",
        "end_date": "2024-02-04T00:00:00Z",
        "contact_info": "tls@example.com",
        "report_id": "tls-1",
        "policies": [
            {
                "policy_domain": "example.com",
                "policy_type": "sts",
                "successful_session_count": 100,
                "failed_session_count": 1,
                "policy_strings": ["version: STSv1"],
                "mx_host_patterns": ["*.example.com"],
                "failure_details": [
                    {
                        "result_type": "certificate-expired",
                        "failed_session_count": 1,
                        "receiving_mx_hostname": "mx.example.com",
                        "sending_mta_ip": "10.0.0.1",
                    }
                ],
            }
        ],
    }
    base.update(overrides)
    return base


def _empty_search():
    """A Search() mock whose .execute() returns an empty hit list."""
    search = MagicMock()
    search.execute.return_value = []
    return search


def _populated_search():
    """A Search() mock whose .execute() returns a non-empty hit list."""
    search = MagicMock()
    search.execute.return_value = [MagicMock()]
    return search


# ---------------------------------------------------------------------------
# set_hosts: connection-parameter assembly
# ---------------------------------------------------------------------------


class TestSetHosts(unittest.TestCase):
    """Verify the conn_params dict handed to elasticsearch-dsl
    matches each documented option. Each branch corresponds to a
    real-world deployment shape (TLS, basic auth, API key, custom CA)."""

    def test_single_host_string_normalized_to_list(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("https://es:9200")
        kwargs = mock_conn.call_args.kwargs
        self.assertEqual(kwargs["hosts"], ["https://es:9200"])

    def test_host_list_preserved(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts(["es1:9200", "es2:9200"])
        kwargs = mock_conn.call_args.kwargs
        self.assertEqual(kwargs["hosts"], ["es1:9200", "es2:9200"])

    def test_timeout_default_60s(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200")
        self.assertEqual(mock_conn.call_args.kwargs["timeout"], 60.0)

    def test_timeout_custom(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", timeout=30.0)
        self.assertEqual(mock_conn.call_args.kwargs["timeout"], 30.0)

    def test_use_ssl_enables_verify_by_default(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", use_ssl=True)
        kwargs = mock_conn.call_args.kwargs
        self.assertEqual(kwargs["use_ssl"], True)
        self.assertEqual(kwargs["verify_certs"], True)
        self.assertNotIn("ca_certs", kwargs)

    def test_use_ssl_with_custom_ca(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", use_ssl=True, ssl_cert_path="/etc/ca.pem")
        kwargs = mock_conn.call_args.kwargs
        self.assertEqual(kwargs["ca_certs"], "/etc/ca.pem")

    def test_skip_certificate_verification_sets_verify_false(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", use_ssl=True, skip_certificate_verification=True)
        self.assertEqual(mock_conn.call_args.kwargs["verify_certs"], False)

    def test_username_password_sets_http_auth(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", username="u", password="p")
        self.assertEqual(mock_conn.call_args.kwargs["http_auth"], ("u", "p"))

    def test_username_without_password_not_set(self):
        """Half-configured auth is suspicious enough not to send."""
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", username="u")
        self.assertNotIn("http_auth", mock_conn.call_args.kwargs)

    def test_api_key_set(self):
        with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
            set_hosts("es:9200", api_key="base64key==")
        self.assertEqual(mock_conn.call_args.kwargs["api_key"], "base64key==")


# ---------------------------------------------------------------------------
# create_indexes
# ---------------------------------------------------------------------------


class TestCreateIndexes(unittest.TestCase):
    def test_creates_missing_index_with_default_settings(self):
        with patch("parsedmarc.elastic.Index") as mock_index_cls:
            mock_index = mock_index_cls.return_value
            mock_index.exists.return_value = False
            create_indexes(["dmarc_aggregate-2024-01-15"])
        mock_index.settings.assert_called_once_with(
            number_of_shards=1, number_of_replicas=0
        )
        mock_index.create.assert_called_once()

    def test_creates_with_custom_settings(self):
        with patch("parsedmarc.elastic.Index") as mock_index_cls:
            mock_index = mock_index_cls.return_value
            mock_index.exists.return_value = False
            create_indexes(
                ["idx"], settings={"number_of_shards": 3, "refresh_interval": "5s"}
            )
        mock_index.settings.assert_called_once_with(
            number_of_shards=3, refresh_interval="5s"
        )

    def test_skips_existing_index(self):
        with patch("parsedmarc.elastic.Index") as mock_index_cls:
            mock_index = mock_index_cls.return_value
            mock_index.exists.return_value = True
            create_indexes(["idx"])
        mock_index.create.assert_not_called()

    def test_wraps_sdk_error(self):
        with patch("parsedmarc.elastic.Index") as mock_index_cls:
            mock_index_cls.return_value.exists.side_effect = RuntimeError(
                "cluster down"
            )
            with self.assertRaises(ElasticsearchError) as ctx:
                create_indexes(["idx"])
        self.assertIn("cluster down", str(ctx.exception))


# ---------------------------------------------------------------------------
# migrate_indexes
# ---------------------------------------------------------------------------


class TestMigrateIndexes(unittest.TestCase):
    """The legacy `published_policy.fo` field was mapped as `long` in
    older indexes. migrate_indexes detects that and rebuilds the index
    with the text/keyword shape. The branch is gnarly; a regression
    would silently leave old data un-migrated."""

    def test_no_indexes_is_noop(self):
        migrate_indexes()  # Should not raise

    def test_skips_non_existent_index(self):
        with patch("parsedmarc.elastic.Index") as mock_index_cls:
            mock_index_cls.return_value.exists.return_value = False
            migrate_indexes(aggregate_indexes=["missing"])
        # exists() returned False — no field_mapping fetch.
        mock_index_cls.return_value.get_field_mapping.assert_not_called()

    def test_skips_when_doc_mapping_absent(self):
        """An index that has 'fo' but not under the 'doc' type
        (e.g., empty index with default mapping) is left alone."""
        with patch("parsedmarc.elastic.Index") as mock_index_cls:
            idx = mock_index_cls.return_value
            idx.exists.return_value = True
            idx.get_field_mapping.return_value = {"some_key": {"mappings": {}}}
            with patch("parsedmarc.elastic.reindex") as mock_reindex:
                migrate_indexes(aggregate_indexes=["dmarc_aggregate-2023-01-01"])
        mock_reindex.assert_not_called()

    def test_migrates_when_fo_is_long(self):
        """The actual migration path: when fo is mapped as 'long',
        a v2 index is created with the corrected mapping, data is
        reindexed, and the old index is deleted."""
        with (
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch("parsedmarc.elastic.reindex") as mock_reindex,
            patch("parsedmarc.elastic.connections.get_connection") as mock_get_conn,
        ):
            idx = mock_index_cls.return_value
            idx.exists.return_value = True
            idx.get_field_mapping.return_value = {
                "dmarc_aggregate-2023-01-01": {
                    "mappings": {
                        "doc": {
                            "published_policy.fo": {"mapping": {"fo": {"type": "long"}}}
                        }
                    }
                }
            }
            migrate_indexes(aggregate_indexes=["dmarc_aggregate-2023-01-01"])
        # reindex called from old → new (v2) index.
        mock_reindex.assert_called_once()
        # connections.get_connection consulted to get the ES client.
        mock_get_conn.assert_called_once()

    def test_skips_when_fo_already_text(self):
        with (
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch("parsedmarc.elastic.reindex") as mock_reindex,
        ):
            idx = mock_index_cls.return_value
            idx.exists.return_value = True
            idx.get_field_mapping.return_value = {
                "dmarc_aggregate-2024-01-01": {
                    "mappings": {
                        "doc": {
                            "published_policy.fo": {"mapping": {"fo": {"type": "text"}}}
                        }
                    }
                }
            }
            migrate_indexes(aggregate_indexes=["dmarc_aggregate-2024-01-01"])
        mock_reindex.assert_not_called()


# ---------------------------------------------------------------------------
# save_aggregate_report_to_elasticsearch
# ---------------------------------------------------------------------------


class TestSaveAggregateReport(unittest.TestCase):
    """The aggregate-report save fans out across multiple SDK calls:
    Search (for dedup), Index.create (for the daily/monthly index),
    Document.save. Each test patches the boundary it needs and
    leaves the rest alone."""

    def _patches(self, search_factory=_empty_search):
        return [
            patch("parsedmarc.elastic.Search", return_value=search_factory()),
            patch(
                "parsedmarc.elastic.Index",
                return_value=MagicMock(exists=MagicMock(return_value=True)),
            ),
            patch.object(elastic_module._AggregateReportDoc, "save"),
        ]

    def test_save_emits_one_document_per_record(self):
        report = _aggregate_report()
        report["records"].append(report["records"][0].copy())
        patches = self._patches()
        with patches[0], patches[1], patches[2] as mock_save:
            save_aggregate_report_to_elasticsearch(report)
        # Two records → two saves.
        self.assertEqual(mock_save.call_count, 2)

    def test_already_saved_raises_when_search_returns_hit(self):
        """The dedup query is the only thing preventing
        double-indexing on re-run. A regression would silently
        re-save reports, inflating Kibana counts."""
        with (
            patch("parsedmarc.elastic.Search", return_value=_populated_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._AggregateReportDoc, "save") as mock_save,
        ):
            with self.assertRaises(AlreadySaved):
                save_aggregate_report_to_elasticsearch(_aggregate_report())
        mock_save.assert_not_called()

    def test_search_exception_wraps_to_elasticsearch_error(self):
        bad_search = MagicMock()
        bad_search.execute.side_effect = RuntimeError("network")
        with (
            patch("parsedmarc.elastic.Search", return_value=bad_search),
            patch("parsedmarc.elastic.Index"),
        ):
            with self.assertRaises(ElasticsearchError) as ctx:
                save_aggregate_report_to_elasticsearch(_aggregate_report())
        self.assertIn("network", str(ctx.exception))

    def test_save_exception_wraps_to_elasticsearch_error(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(
                elastic_module._AggregateReportDoc,
                "save",
                side_effect=RuntimeError("disk"),
            ),
        ):
            with self.assertRaises(ElasticsearchError) as ctx:
                save_aggregate_report_to_elasticsearch(_aggregate_report())
        self.assertIn("disk", str(ctx.exception))

    def test_index_name_uses_daily_format_by_default(self):
        """Index naming: dmarc_aggregate-YYYY-MM-DD by default."""
        index_calls = []
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._AggregateReportDoc, "save"),
        ):
            mock_index_cls.return_value.exists.return_value = True
            save_aggregate_report_to_elasticsearch(_aggregate_report())
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("dmarc_aggregate-2024-01-15", index_calls)

    def test_index_name_uses_monthly_format_when_flag_set(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._AggregateReportDoc, "save"),
        ):
            mock_index_cls.return_value.exists.return_value = True
            save_aggregate_report_to_elasticsearch(
                _aggregate_report(), monthly_indexes=True
            )
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("dmarc_aggregate-2024-01", index_calls)

    def test_index_name_honours_suffix_and_prefix(self):
        """Prefix/suffix support multi-tenant setups where one ES
        cluster serves several DMARC owners."""
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._AggregateReportDoc, "save"),
        ):
            mock_index_cls.return_value.exists.return_value = True
            save_aggregate_report_to_elasticsearch(
                _aggregate_report(),
                index_suffix="tenant_a",
                index_prefix="customer1_",
            )
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("customer1_dmarc_aggregate_tenant_a-2024-01-15", index_calls)

    def test_dedup_search_pattern_uses_suffix_wildcard(self):
        """Existing-report search uses '*' so it matches both
        daily and monthly index buckets."""
        with (
            patch("parsedmarc.elastic.Search") as mock_search_cls,
            patch(
                "parsedmarc.elastic.Index",
                return_value=MagicMock(exists=MagicMock(return_value=True)),
            ),
            patch.object(elastic_module._AggregateReportDoc, "save"),
        ):
            mock_search_cls.return_value.execute.return_value = []
            save_aggregate_report_to_elasticsearch(
                _aggregate_report(), index_suffix="tenant_a", index_prefix="cust_"
            )
        # Search index pattern wraps prefix+name+suffix with trailing wildcard.
        search_index = mock_search_cls.call_args.kwargs["index"]
        self.assertIn("cust_dmarc_aggregate_tenant_a*", search_index)


# ---------------------------------------------------------------------------
# save_failure_report_to_elasticsearch
# ---------------------------------------------------------------------------


class TestSaveFailureReport(unittest.TestCase):
    def test_save_emits_one_document(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
        ):
            save_failure_report_to_elasticsearch(_failure_report())
        mock_save.assert_called_once()

    def test_already_saved_raises_on_dedup_hit(self):
        """Failure-report dedup uses arrival_date + From/To/Subject
        from the parsed sample. A hit means we've already indexed
        this exact failure sample."""
        with (
            patch("parsedmarc.elastic.Search", return_value=_populated_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
        ):
            with self.assertRaises(AlreadySaved):
                save_failure_report_to_elasticsearch(_failure_report())
        mock_save.assert_not_called()

    def test_save_exception_wraps_to_elasticsearch_error(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(
                elastic_module._FailureReportDoc,
                "save",
                side_effect=RuntimeError("disk"),
            ),
        ):
            with self.assertRaises(ElasticsearchError) as ctx:
                save_failure_report_to_elasticsearch(_failure_report())
        self.assertIn("disk", str(ctx.exception))

    def test_keyerror_wraps_to_invalid_failure_report(self):
        """A malformed failure report (missing a required field) is
        surfaced as InvalidFailureReport so the caller can route it
        differently from infra errors."""
        report = _failure_report()
        del report["feedback_type"]
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save"),
        ):
            with self.assertRaises(InvalidFailureReport):
                save_failure_report_to_elasticsearch(report)

    def test_index_dedup_pattern_searches_both_old_and_new_names(self):
        """The split-PR rename forensic→failure left existing data
        in dmarc_forensic*; the dedup search must check both names
        so re-runs don't double-index."""
        with (
            patch("parsedmarc.elastic.Search") as mock_search_cls,
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save"),
        ):
            mock_search_cls.return_value.execute.return_value = []
            save_failure_report_to_elasticsearch(_failure_report())
        search_index = mock_search_cls.call_args.kwargs["index"]
        self.assertIn("dmarc_failure*", search_index)
        self.assertIn("dmarc_forensic*", search_index)

    def test_index_name_uses_arrival_date_for_monthly_partition(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._FailureReportDoc, "save"),
        ):
            save_failure_report_to_elasticsearch(
                _failure_report(), monthly_indexes=True
            )
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("dmarc_failure-2024-01", index_calls)

    def test_failure_search_index_with_suffix_and_prefix(self):
        """When both suffix and prefix are set, the dedup search
        pattern joins them onto BOTH dmarc_failure* and
        dmarc_forensic* (the rename back-compat)."""
        with (
            patch("parsedmarc.elastic.Search") as mock_search_cls,
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save"),
        ):
            mock_search_cls.return_value.execute.return_value = []
            save_failure_report_to_elasticsearch(
                _failure_report(),
                index_suffix="tenant_a",
                index_prefix="cust_",
            )
        search_index = mock_search_cls.call_args.kwargs["index"]
        self.assertIn("cust_dmarc_failure_tenant_a*", search_index)
        self.assertIn("cust_dmarc_forensic_tenant_a*", search_index)

    def test_failure_index_honours_suffix_and_prefix(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._FailureReportDoc, "save"),
        ):
            save_failure_report_to_elasticsearch(
                _failure_report(),
                index_suffix="tenant_a",
                index_prefix="cust_",
            )
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("cust_dmarc_failure_tenant_a-2024-01-01", index_calls)

    def test_from_header_with_empty_display_name(self):
        """When the From display name is empty, the code uses the
        address alone (covers the early-return branch in the
        display-name handling)."""
        report = _failure_report()
        report["parsed_sample"]["headers"]["From"] = [["", "sender@example.com"]]
        report["parsed_sample"]["headers"]["To"] = [["", "rcpt@example.com"]]
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
        ):
            save_failure_report_to_elasticsearch(report)
        mock_save.assert_called_once()

    def test_to_header_with_non_empty_display_joins_with_brackets(self):
        """The other branch: non-empty display joins display+addr
        with " <" and appends ">", e.g. 'RT <rcpt@example.com>'."""
        report = _failure_report()
        report["parsed_sample"]["headers"]["To"] = [["RT", "rcpt@example.com"]]
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
        ):
            save_failure_report_to_elasticsearch(report)
        mock_save.assert_called_once()

    def test_sample_address_lists_indexed_for_reply_to_cc_bcc_attachments(self):
        """A failure report sample can carry reply_to / cc / bcc /
        attachments. Each populates a nested InnerDoc on the sample —
        if the add_* helpers regress, those nested docs would be
        silently empty in Elasticsearch."""
        report = _failure_report()
        report["parsed_sample"]["reply_to"] = [
            {"display_name": "RT", "address": "rt@example.com"}
        ]
        report["parsed_sample"]["cc"] = [
            {"display_name": "CC", "address": "cc@example.com"}
        ]
        report["parsed_sample"]["bcc"] = [
            {"display_name": "", "address": "bcc@example.com"}
        ]
        report["parsed_sample"]["attachments"] = [
            {
                "filename": "a.pdf",
                "mail_content_type": "application/pdf",
                "sha256": "deadbeef",
            }
        ]
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
        ):
            save_failure_report_to_elasticsearch(report)
        mock_save.assert_called_once()


# ---------------------------------------------------------------------------
# save_smtp_tls_report_to_elasticsearch
# ---------------------------------------------------------------------------


class TestSaveSmtpTlsReport(unittest.TestCase):
    def test_save_emits_one_document(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
        ):
            save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())
        mock_save.assert_called_once()

    def test_already_saved_raises_on_dedup_hit(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_populated_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
        ):
            with self.assertRaises(AlreadySaved):
                save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())
        mock_save.assert_not_called()

    def test_search_exception_wraps_to_elasticsearch_error(self):
        bad = MagicMock()
        bad.execute.side_effect = RuntimeError("network")
        with (
            patch("parsedmarc.elastic.Search", return_value=bad),
            patch("parsedmarc.elastic.Index"),
        ):
            with self.assertRaises(ElasticsearchError):
                save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())

    def test_save_exception_wraps_to_elasticsearch_error(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(
                elastic_module._SMTPTLSReportDoc,
                "save",
                side_effect=RuntimeError("disk"),
            ),
        ):
            with self.assertRaises(ElasticsearchError):
                save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())

    def test_index_name_uses_begin_date_for_monthly_partition(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._SMTPTLSReportDoc, "save"),
        ):
            save_smtp_tls_report_to_elasticsearch(
                _smtp_tls_report(), monthly_indexes=True
            )
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("smtp_tls-2024-02", index_calls)

    def test_index_name_honours_suffix_and_prefix(self):
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index") as mock_index_cls,
            patch.object(elastic_module._SMTPTLSReportDoc, "save"),
        ):
            save_smtp_tls_report_to_elasticsearch(
                _smtp_tls_report(), index_suffix="t1", index_prefix="cust_"
            )
        index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
        self.assertIn("cust_smtp_tls_t1-2024-02-03", index_calls)

    def test_policy_without_strings_or_mx_patterns(self):
        """policy_strings / mx_host_patterns are optional in the
        report shape — verify the branch where they're absent."""
        report = _smtp_tls_report()
        for policy in report["policies"]:
            policy.pop("policy_strings", None)
            policy.pop("mx_host_patterns", None)
            policy.pop("failure_details", None)
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
        ):
            save_smtp_tls_report_to_elasticsearch(report)
        mock_save.assert_called_once()

    def test_failure_details_all_optional_fields_populated(self):
        """Exercise every optional field in failure_details so the
        full set of `if "x" in failure_detail` branches runs."""
        report = _smtp_tls_report()
        report["policies"][0]["failure_details"] = [
            {
                "result_type": "certificate-expired",
                "failed_session_count": 1,
                "receiving_mx_hostname": "mx.example.com",
                "additional_information_uri": "https://example.com/why",
                "failure_reason_code": "ERR_CERT",
                "ip_address": "10.0.0.5",
                "receiving_ip": "10.0.0.2",
                "receiving_mx_helo": "mx.helo.example.com",
                "sending_mta_ip": "10.0.0.1",
            }
        ]
        with (
            patch("parsedmarc.elastic.Search", return_value=_empty_search()),
            patch("parsedmarc.elastic.Index"),
            patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
        ):
            save_smtp_tls_report_to_elasticsearch(report)
        mock_save.assert_called_once()


class TestBackwardCompatAlias(unittest.TestCase):
    def test_save_forensic_alias_points_to_save_failure(self):
        self.assertIs(
            elastic_module.save_forensic_report_to_elasticsearch,
            elastic_module.save_failure_report_to_elasticsearch,
        )

    def test_forensic_doc_alias_points_to_failure_doc(self):
        self.assertIs(
            elastic_module._ForensicReportDoc, elastic_module._FailureReportDoc
        )
        self.assertIs(
            elastic_module._ForensicSampleDoc, elastic_module._FailureSampleDoc
        )


# Silence unused-import lint in the test module preamble.
_ = call


if __name__ == "__main__":
    unittest.main(verbosity=2)