Files
parsedmarc/tests/test_elastic.py
T

891 lines
36 KiB
Python

"""Tests for parsedmarc.elastic
Mocks at the elasticsearch-dsl SDK boundary (connections.create_connection,
Index, Search, Document.save) so the tests verify the parsedmarc-side
transformation logic — document construction, index naming, deduplication
queries, error wrapping — without needing a running Elasticsearch cluster.
"""
import unittest
from unittest.mock import MagicMock, call, patch
import parsedmarc.elastic as elastic_module
from parsedmarc import InvalidFailureReport
from parsedmarc.elastic import (
AlreadySaved,
ElasticsearchError,
create_indexes,
migrate_indexes,
save_aggregate_report_to_elasticsearch,
save_failure_report_to_elasticsearch,
save_smtp_tls_report_to_elasticsearch,
set_hosts,
)
# ---------------------------------------------------------------------------
# Sample report fixtures
# ---------------------------------------------------------------------------
def _aggregate_report(**overrides):
base = {
"xml_schema": "draft",
"xml_namespace": None,
"report_metadata": {
"org_name": "TestOrg",
"org_email": "dmarc@example.com",
"org_extra_contact_info": None,
"report_id": "agg-1",
"begin_date": "2024-01-15 00:00:00",
"end_date": "2024-01-16 00:00:00",
"timespan_requires_normalization": False,
"original_timespan_seconds": 86400,
"errors": [],
"generator": "TestGen/1.0",
},
"policy_published": {
"domain": "example.com",
"adkim": "r",
"aspf": "r",
"p": "none",
"sp": "none",
"pct": None,
"fo": None,
"np": "reject",
"testing": "n",
"discovery_method": "treewalk",
},
"records": [
{
"interval_begin": "2024-01-15 00:00:00",
"interval_end": "2024-01-16 00:00:00",
"normalized_timespan": False,
"source": {
"ip_address": "192.0.2.1",
"country": "US",
"reverse_dns": None,
"base_domain": None,
"name": None,
"type": None,
"asn": 64496,
"as_name": "Example AS",
"as_domain": "example.net",
},
"count": 4,
"alignment": {"spf": True, "dkim": True, "dmarc": True},
"policy_evaluated": {
"disposition": "none",
"dkim": "pass",
"spf": "pass",
"policy_override_reasons": [
{"type": "local_policy", "comment": "approved"}
],
},
"identifiers": {
"header_from": "example.com",
"envelope_from": "example.com",
"envelope_to": "rcpt@example.com",
},
"auth_results": {
"dkim": [
{
"domain": "example.com",
"selector": "s",
"result": "pass",
"human_result": None,
}
],
"spf": [
{
"domain": "example.com",
"scope": "mfrom",
"result": "pass",
"human_result": None,
}
],
},
}
],
}
base.update(overrides)
return base
def _failure_report(**overrides):
base = {
"feedback_type": "auth-failure",
"user_agent": "test/1.0",
"version": "1",
"original_envelope_id": None,
"original_mail_from": "x@example.com",
"original_rcpt_to": None,
"arrival_date": "Thu, 1 Jan 2024 00:00:00 +0000",
"arrival_date_utc": "2024-01-01 00:00:00",
"authentication_results": None,
"delivery_result": "other",
"auth_failure": ["dmarc"],
"authentication_mechanisms": [],
"dkim_domain": None,
"reported_domain": "example.com",
"sample_headers_only": True,
"source": {
"ip_address": "192.0.2.5",
"country": "US",
"reverse_dns": None,
"base_domain": None,
"name": None,
"type": None,
"asn": 64496,
"as_name": "Example AS",
"as_domain": "example.net",
},
"sample": "raw",
"parsed_sample": {
"headers": {
# mailparser emits headers as [[display_name, address]]
# lists; an empty display becomes [["", address]].
"From": [["Sender Name", "sender@example.com"]],
"To": [["", "rcpt@example.com"]],
"Subject": "Test",
},
"subject": "Test",
"filename_safe_subject": "Test",
"body": "body",
"date": "Thu, 1 Jan 2024 00:00:00 +0000",
"to": [{"display_name": None, "address": "rcpt@example.com"}],
"reply_to": [],
"cc": [],
"bcc": [],
"attachments": [],
},
}
base.update(overrides)
return base
def _smtp_tls_report(**overrides):
base = {
"organization_name": "TestOrg",
"begin_date": "2024-02-03T00:00:00Z",
"end_date": "2024-02-04T00:00:00Z",
"contact_info": "tls@example.com",
"report_id": "tls-1",
"policies": [
{
"policy_domain": "example.com",
"policy_type": "sts",
"successful_session_count": 100,
"failed_session_count": 1,
"policy_strings": ["version: STSv1"],
"mx_host_patterns": ["*.example.com"],
"failure_details": [
{
"result_type": "certificate-expired",
"failed_session_count": 1,
"receiving_mx_hostname": "mx.example.com",
"sending_mta_ip": "10.0.0.1",
}
],
}
],
}
base.update(overrides)
return base
def _empty_search():
"""A Search() mock whose .execute() returns an empty hit list."""
search = MagicMock()
search.execute.return_value = []
return search
def _populated_search():
"""A Search() mock whose .execute() returns a non-empty hit list."""
search = MagicMock()
search.execute.return_value = [MagicMock()]
return search
# ---------------------------------------------------------------------------
# set_hosts: connection-parameter assembly
# ---------------------------------------------------------------------------
class TestSetHosts(unittest.TestCase):
"""Verify the conn_params dict handed to elasticsearch-dsl
matches each documented option. Each branch corresponds to a
real-world deployment shape (TLS, basic auth, API key, custom CA)."""
def test_single_host_string_normalized_to_list(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("https://es:9200")
kwargs = mock_conn.call_args.kwargs
self.assertEqual(kwargs["hosts"], ["https://es:9200"])
def test_host_list_preserved(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts(["es1:9200", "es2:9200"])
kwargs = mock_conn.call_args.kwargs
self.assertEqual(kwargs["hosts"], ["es1:9200", "es2:9200"])
def test_timeout_default_60s(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200")
self.assertEqual(mock_conn.call_args.kwargs["timeout"], 60.0)
def test_timeout_custom(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", timeout=30.0)
self.assertEqual(mock_conn.call_args.kwargs["timeout"], 30.0)
def test_use_ssl_enables_verify_by_default(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", use_ssl=True)
kwargs = mock_conn.call_args.kwargs
self.assertEqual(kwargs["use_ssl"], True)
self.assertEqual(kwargs["verify_certs"], True)
self.assertNotIn("ca_certs", kwargs)
def test_use_ssl_with_custom_ca(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", use_ssl=True, ssl_cert_path="/etc/ca.pem")
kwargs = mock_conn.call_args.kwargs
self.assertEqual(kwargs["ca_certs"], "/etc/ca.pem")
def test_skip_certificate_verification_sets_verify_false(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", use_ssl=True, skip_certificate_verification=True)
self.assertEqual(mock_conn.call_args.kwargs["verify_certs"], False)
def test_username_password_sets_http_auth(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", username="u", password="p")
self.assertEqual(mock_conn.call_args.kwargs["http_auth"], ("u", "p"))
def test_username_without_password_not_set(self):
"""Half-configured auth is suspicious enough not to send."""
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", username="u")
self.assertNotIn("http_auth", mock_conn.call_args.kwargs)
def test_api_key_set(self):
with patch("parsedmarc.elastic.connections.create_connection") as mock_conn:
set_hosts("es:9200", api_key="base64key==")
self.assertEqual(mock_conn.call_args.kwargs["api_key"], "base64key==")
# ---------------------------------------------------------------------------
# create_indexes
# ---------------------------------------------------------------------------
class TestCreateIndexes(unittest.TestCase):
def test_creates_missing_index_with_default_settings(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index = mock_index_cls.return_value
mock_index.exists.return_value = False
create_indexes(["dmarc_aggregate-2024-01-15"])
mock_index.settings.assert_called_once_with(
number_of_shards=1, number_of_replicas=0
)
mock_index.create.assert_called_once()
def test_creates_with_custom_settings(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index = mock_index_cls.return_value
mock_index.exists.return_value = False
create_indexes(
["idx"], settings={"number_of_shards": 3, "refresh_interval": "5s"}
)
mock_index.settings.assert_called_once_with(
number_of_shards=3, refresh_interval="5s"
)
def test_skips_existing_index(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index = mock_index_cls.return_value
mock_index.exists.return_value = True
create_indexes(["idx"])
mock_index.create.assert_not_called()
def test_wraps_sdk_error(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index_cls.return_value.exists.side_effect = RuntimeError(
"cluster down"
)
with self.assertRaises(ElasticsearchError) as ctx:
create_indexes(["idx"])
self.assertIn("cluster down", str(ctx.exception))
class TestCreateIndexesServerless(unittest.TestCase):
"""Serverless mode strips shard/replica keys but keeps everything else.
Elastic Cloud Serverless rejects ``number_of_shards`` and
``number_of_replicas`` with HTTP 400. Other settings like
``refresh_interval`` are accepted and must pass through unchanged.
"""
def setUp(self):
self._original = elastic_module._SERVERLESS
elastic_module._SERVERLESS = True
def tearDown(self):
elastic_module._SERVERLESS = self._original
def test_serverless_default_skips_settings_entirely(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index = mock_index_cls.return_value
mock_index.exists.return_value = False
create_indexes(["idx"])
mock_index.settings.assert_not_called()
mock_index.create.assert_called_once()
def test_serverless_filters_rejected_keys_and_passes_others_through(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index = mock_index_cls.return_value
mock_index.exists.return_value = False
create_indexes(
["idx"],
settings={
"number_of_shards": 3,
"number_of_replicas": 2,
"refresh_interval": "5s",
},
)
mock_index.settings.assert_called_once_with(refresh_interval="5s")
def test_serverless_skips_settings_when_only_rejected_keys(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index = mock_index_cls.return_value
mock_index.exists.return_value = False
create_indexes(
["idx"], settings={"number_of_shards": 3, "number_of_replicas": 2}
)
mock_index.settings.assert_not_called()
mock_index.create.assert_called_once()
# ---------------------------------------------------------------------------
# migrate_indexes
# ---------------------------------------------------------------------------
class TestMigrateIndexes(unittest.TestCase):
"""The legacy `published_policy.fo` field was mapped as `long` in
older indexes. migrate_indexes detects that and rebuilds the index
with the text/keyword shape. The branch is gnarly; a regression
would silently leave old data un-migrated."""
def test_no_indexes_is_noop(self):
migrate_indexes() # Should not raise
def test_skips_non_existent_index(self):
with patch("parsedmarc.elastic.Index") as mock_index_cls:
mock_index_cls.return_value.exists.return_value = False
migrate_indexes(aggregate_indexes=["missing"])
# exists() returned False — no field_mapping fetch.
mock_index_cls.return_value.get_field_mapping.assert_not_called()
def test_skips_when_doc_mapping_absent(self):
"""An index that has 'fo' but not under the 'doc' type
(e.g., empty index with default mapping) is left alone."""
with patch("parsedmarc.elastic.Index") as mock_index_cls:
idx = mock_index_cls.return_value
idx.exists.return_value = True
idx.get_field_mapping.return_value = {"some_key": {"mappings": {}}}
with patch("parsedmarc.elastic.reindex") as mock_reindex:
migrate_indexes(aggregate_indexes=["dmarc_aggregate-2023-01-01"])
mock_reindex.assert_not_called()
def test_migrates_when_fo_is_long(self):
"""The actual migration path: when fo is mapped as 'long',
a v2 index is created with the corrected mapping, data is
reindexed, and the old index is deleted."""
with (
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch("parsedmarc.elastic.reindex") as mock_reindex,
patch("parsedmarc.elastic.connections.get_connection") as mock_get_conn,
):
idx = mock_index_cls.return_value
idx.exists.return_value = True
idx.get_field_mapping.return_value = {
"dmarc_aggregate-2023-01-01": {
"mappings": {
"doc": {
"published_policy.fo": {"mapping": {"fo": {"type": "long"}}}
}
}
}
}
migrate_indexes(aggregate_indexes=["dmarc_aggregate-2023-01-01"])
# reindex called from old → new (v2) index.
mock_reindex.assert_called_once()
# connections.get_connection consulted to get the ES client.
mock_get_conn.assert_called_once()
def test_skips_when_fo_already_text(self):
with (
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch("parsedmarc.elastic.reindex") as mock_reindex,
):
idx = mock_index_cls.return_value
idx.exists.return_value = True
idx.get_field_mapping.return_value = {
"dmarc_aggregate-2024-01-01": {
"mappings": {
"doc": {
"published_policy.fo": {"mapping": {"fo": {"type": "text"}}}
}
}
}
}
migrate_indexes(aggregate_indexes=["dmarc_aggregate-2024-01-01"])
mock_reindex.assert_not_called()
# ---------------------------------------------------------------------------
# save_aggregate_report_to_elasticsearch
# ---------------------------------------------------------------------------
class TestSaveAggregateReport(unittest.TestCase):
"""The aggregate-report save fans out across multiple SDK calls:
Search (for dedup), Index.create (for the daily/monthly index),
Document.save. Each test patches the boundary it needs and
leaves the rest alone."""
def _patches(self, search_factory=_empty_search):
return [
patch("parsedmarc.elastic.Search", return_value=search_factory()),
patch(
"parsedmarc.elastic.Index",
return_value=MagicMock(exists=MagicMock(return_value=True)),
),
patch.object(elastic_module._AggregateReportDoc, "save"),
]
def test_save_emits_one_document_per_record(self):
report = _aggregate_report()
report["records"].append(report["records"][0].copy())
patches = self._patches()
with patches[0], patches[1], patches[2] as mock_save:
save_aggregate_report_to_elasticsearch(report)
# Two records → two saves.
self.assertEqual(mock_save.call_count, 2)
def test_already_saved_raises_when_search_returns_hit(self):
"""The dedup query is the only thing preventing
double-indexing on re-run. A regression would silently
re-save reports, inflating Kibana counts."""
with (
patch("parsedmarc.elastic.Search", return_value=_populated_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._AggregateReportDoc, "save") as mock_save,
):
with self.assertRaises(AlreadySaved):
save_aggregate_report_to_elasticsearch(_aggregate_report())
mock_save.assert_not_called()
def test_search_exception_wraps_to_elasticsearch_error(self):
bad_search = MagicMock()
bad_search.execute.side_effect = RuntimeError("network")
with (
patch("parsedmarc.elastic.Search", return_value=bad_search),
patch("parsedmarc.elastic.Index"),
):
with self.assertRaises(ElasticsearchError) as ctx:
save_aggregate_report_to_elasticsearch(_aggregate_report())
self.assertIn("network", str(ctx.exception))
def test_save_exception_wraps_to_elasticsearch_error(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(
elastic_module._AggregateReportDoc,
"save",
side_effect=RuntimeError("disk"),
),
):
with self.assertRaises(ElasticsearchError) as ctx:
save_aggregate_report_to_elasticsearch(_aggregate_report())
self.assertIn("disk", str(ctx.exception))
def test_index_name_uses_daily_format_by_default(self):
"""Index naming: dmarc_aggregate-YYYY-MM-DD by default."""
index_calls = []
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._AggregateReportDoc, "save"),
):
mock_index_cls.return_value.exists.return_value = True
save_aggregate_report_to_elasticsearch(_aggregate_report())
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("dmarc_aggregate-2024-01-15", index_calls)
def test_index_name_uses_monthly_format_when_flag_set(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._AggregateReportDoc, "save"),
):
mock_index_cls.return_value.exists.return_value = True
save_aggregate_report_to_elasticsearch(
_aggregate_report(), monthly_indexes=True
)
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("dmarc_aggregate-2024-01", index_calls)
def test_index_name_honours_suffix_and_prefix(self):
"""Prefix/suffix support multi-tenant setups where one ES
cluster serves several DMARC owners."""
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._AggregateReportDoc, "save"),
):
mock_index_cls.return_value.exists.return_value = True
save_aggregate_report_to_elasticsearch(
_aggregate_report(),
index_suffix="tenant_a",
index_prefix="customer1_",
)
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("customer1_dmarc_aggregate_tenant_a-2024-01-15", index_calls)
def test_dedup_search_pattern_uses_suffix_wildcard(self):
"""Existing-report search uses '*' so it matches both
daily and monthly index buckets."""
with (
patch("parsedmarc.elastic.Search") as mock_search_cls,
patch(
"parsedmarc.elastic.Index",
return_value=MagicMock(exists=MagicMock(return_value=True)),
),
patch.object(elastic_module._AggregateReportDoc, "save"),
):
mock_search_cls.return_value.execute.return_value = []
save_aggregate_report_to_elasticsearch(
_aggregate_report(), index_suffix="tenant_a", index_prefix="cust_"
)
# Search index pattern wraps prefix+name+suffix with trailing wildcard.
search_index = mock_search_cls.call_args.kwargs["index"]
self.assertIn("cust_dmarc_aggregate_tenant_a*", search_index)
# ---------------------------------------------------------------------------
# save_failure_report_to_elasticsearch
# ---------------------------------------------------------------------------
class TestSaveFailureReport(unittest.TestCase):
def test_save_emits_one_document(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
):
save_failure_report_to_elasticsearch(_failure_report())
mock_save.assert_called_once()
def test_already_saved_raises_on_dedup_hit(self):
"""Failure-report dedup uses arrival_date + From/To/Subject
from the parsed sample. A hit means we've already indexed
this exact failure sample."""
with (
patch("parsedmarc.elastic.Search", return_value=_populated_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
):
with self.assertRaises(AlreadySaved):
save_failure_report_to_elasticsearch(_failure_report())
mock_save.assert_not_called()
def test_save_exception_wraps_to_elasticsearch_error(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(
elastic_module._FailureReportDoc,
"save",
side_effect=RuntimeError("disk"),
),
):
with self.assertRaises(ElasticsearchError) as ctx:
save_failure_report_to_elasticsearch(_failure_report())
self.assertIn("disk", str(ctx.exception))
def test_keyerror_wraps_to_invalid_failure_report(self):
"""A malformed failure report (missing a required field) is
surfaced as InvalidFailureReport so the caller can route it
differently from infra errors."""
report = _failure_report()
del report["feedback_type"]
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save"),
):
with self.assertRaises(InvalidFailureReport):
save_failure_report_to_elasticsearch(report)
def test_index_dedup_pattern_searches_both_old_and_new_names(self):
"""The split-PR rename forensic→failure left existing data
in dmarc_forensic*; the dedup search must check both names
so re-runs don't double-index."""
with (
patch("parsedmarc.elastic.Search") as mock_search_cls,
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save"),
):
mock_search_cls.return_value.execute.return_value = []
save_failure_report_to_elasticsearch(_failure_report())
search_index = mock_search_cls.call_args.kwargs["index"]
self.assertIn("dmarc_failure*", search_index)
self.assertIn("dmarc_forensic*", search_index)
def test_index_name_uses_arrival_date_for_monthly_partition(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._FailureReportDoc, "save"),
):
save_failure_report_to_elasticsearch(
_failure_report(), monthly_indexes=True
)
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("dmarc_failure-2024-01", index_calls)
def test_failure_search_index_with_suffix_and_prefix(self):
"""When both suffix and prefix are set, the dedup search
pattern joins them onto BOTH dmarc_failure* and
dmarc_forensic* (the rename back-compat)."""
with (
patch("parsedmarc.elastic.Search") as mock_search_cls,
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save"),
):
mock_search_cls.return_value.execute.return_value = []
save_failure_report_to_elasticsearch(
_failure_report(),
index_suffix="tenant_a",
index_prefix="cust_",
)
search_index = mock_search_cls.call_args.kwargs["index"]
self.assertIn("cust_dmarc_failure_tenant_a*", search_index)
self.assertIn("cust_dmarc_forensic_tenant_a*", search_index)
def test_failure_index_honours_suffix_and_prefix(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._FailureReportDoc, "save"),
):
save_failure_report_to_elasticsearch(
_failure_report(),
index_suffix="tenant_a",
index_prefix="cust_",
)
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("cust_dmarc_failure_tenant_a-2024-01-01", index_calls)
def test_from_header_with_empty_display_name(self):
"""When the From display name is empty, the code uses the
address alone (covers the early-return branch in the
display-name handling)."""
report = _failure_report()
report["parsed_sample"]["headers"]["From"] = [["", "sender@example.com"]]
report["parsed_sample"]["headers"]["To"] = [["", "rcpt@example.com"]]
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
):
save_failure_report_to_elasticsearch(report)
mock_save.assert_called_once()
def test_to_header_with_non_empty_display_joins_with_brackets(self):
"""The other branch: non-empty display joins display+addr
with " <" and appends ">", e.g. 'RT <rcpt@example.com>'."""
report = _failure_report()
report["parsed_sample"]["headers"]["To"] = [["RT", "rcpt@example.com"]]
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
):
save_failure_report_to_elasticsearch(report)
mock_save.assert_called_once()
def test_sample_address_lists_indexed_for_reply_to_cc_bcc_attachments(self):
"""A failure report sample can carry reply_to / cc / bcc /
attachments. Each populates a nested InnerDoc on the sample —
if the add_* helpers regress, those nested docs would be
silently empty in Elasticsearch."""
report = _failure_report()
report["parsed_sample"]["reply_to"] = [
{"display_name": "RT", "address": "rt@example.com"}
]
report["parsed_sample"]["cc"] = [
{"display_name": "CC", "address": "cc@example.com"}
]
report["parsed_sample"]["bcc"] = [
{"display_name": "", "address": "bcc@example.com"}
]
report["parsed_sample"]["attachments"] = [
{
"filename": "a.pdf",
"mail_content_type": "application/pdf",
"sha256": "deadbeef",
}
]
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._FailureReportDoc, "save") as mock_save,
):
save_failure_report_to_elasticsearch(report)
mock_save.assert_called_once()
# ---------------------------------------------------------------------------
# save_smtp_tls_report_to_elasticsearch
# ---------------------------------------------------------------------------
class TestSaveSmtpTlsReport(unittest.TestCase):
def test_save_emits_one_document(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
):
save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())
mock_save.assert_called_once()
def test_already_saved_raises_on_dedup_hit(self):
with (
patch("parsedmarc.elastic.Search", return_value=_populated_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
):
with self.assertRaises(AlreadySaved):
save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())
mock_save.assert_not_called()
def test_search_exception_wraps_to_elasticsearch_error(self):
bad = MagicMock()
bad.execute.side_effect = RuntimeError("network")
with (
patch("parsedmarc.elastic.Search", return_value=bad),
patch("parsedmarc.elastic.Index"),
):
with self.assertRaises(ElasticsearchError):
save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())
def test_save_exception_wraps_to_elasticsearch_error(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(
elastic_module._SMTPTLSReportDoc,
"save",
side_effect=RuntimeError("disk"),
),
):
with self.assertRaises(ElasticsearchError):
save_smtp_tls_report_to_elasticsearch(_smtp_tls_report())
def test_index_name_uses_begin_date_for_monthly_partition(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._SMTPTLSReportDoc, "save"),
):
save_smtp_tls_report_to_elasticsearch(
_smtp_tls_report(), monthly_indexes=True
)
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("smtp_tls-2024-02", index_calls)
def test_index_name_honours_suffix_and_prefix(self):
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index") as mock_index_cls,
patch.object(elastic_module._SMTPTLSReportDoc, "save"),
):
save_smtp_tls_report_to_elasticsearch(
_smtp_tls_report(), index_suffix="t1", index_prefix="cust_"
)
index_calls = [c.args[0] for c in mock_index_cls.call_args_list]
self.assertIn("cust_smtp_tls_t1-2024-02-03", index_calls)
def test_policy_without_strings_or_mx_patterns(self):
"""policy_strings / mx_host_patterns are optional in the
report shape — verify the branch where they're absent."""
report = _smtp_tls_report()
for policy in report["policies"]:
policy.pop("policy_strings", None)
policy.pop("mx_host_patterns", None)
policy.pop("failure_details", None)
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
):
save_smtp_tls_report_to_elasticsearch(report)
mock_save.assert_called_once()
def test_failure_details_all_optional_fields_populated(self):
"""Exercise every optional field in failure_details so the
full set of `if "x" in failure_detail` branches runs."""
report = _smtp_tls_report()
report["policies"][0]["failure_details"] = [
{
"result_type": "certificate-expired",
"failed_session_count": 1,
"receiving_mx_hostname": "mx.example.com",
"additional_information_uri": "https://example.com/why",
"failure_reason_code": "ERR_CERT",
"ip_address": "10.0.0.5",
"receiving_ip": "10.0.0.2",
"receiving_mx_helo": "mx.helo.example.com",
"sending_mta_ip": "10.0.0.1",
}
]
with (
patch("parsedmarc.elastic.Search", return_value=_empty_search()),
patch("parsedmarc.elastic.Index"),
patch.object(elastic_module._SMTPTLSReportDoc, "save") as mock_save,
):
save_smtp_tls_report_to_elasticsearch(report)
mock_save.assert_called_once()
class TestBackwardCompatAlias(unittest.TestCase):
def test_save_forensic_alias_points_to_save_failure(self):
self.assertIs(
elastic_module.save_forensic_report_to_elasticsearch,
elastic_module.save_failure_report_to_elasticsearch,
)
def test_forensic_doc_alias_points_to_failure_doc(self):
self.assertIs(
elastic_module._ForensicReportDoc, elastic_module._FailureReportDoc
)
self.assertIs(
elastic_module._ForensicSampleDoc, elastic_module._FailureSampleDoc
)
# Silence unused-import lint in the test module preamble.
_ = call
if __name__ == "__main__":
unittest.main(verbosity=2)