Rename forensic to failure in output/integration modules

Rename all 'forensic' references to 'failure' in the output modules:
- elastic.py, opensearch.py, splunk.py, kafkaclient.py, syslog.py,
  gelf.py, webhook.py, loganalytics.py, s3.py

Changes include:
- Function/method names: save_forensic_* → save_failure_*
- Variable/parameter names: forensic_* → failure_*
- Class names: _ForensicReportDoc → _FailureReportDoc,
  _ForensicSampleDoc → _FailureSampleDoc
- Index/topic names: dmarc_forensic → dmarc_failure;
  Splunk sourcetype: dmarc:forensic → dmarc_failure
- Log messages and docstrings updated
- Import statements updated to use new names from core module
- Backward-compatible aliases added at end of each file
- DMARCbis aggregate fields added to elastic.py and opensearch.py:
  np (Keyword), testing (Keyword), discovery_method (Keyword),
  generator (Text)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-02-20 20:46:44 +00:00
committed by Sean Whalen
parent 4a138ae8f7
commit 00721fdabb
10 changed files with 223 additions and 173 deletions

View File

@@ -1,3 +1,3 @@
__version__ = "9.3.0"
__version__ = "10.0.0"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -13,6 +13,7 @@ from elasticsearch_dsl import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Search,
@@ -21,7 +22,7 @@ from elasticsearch_dsl import (
)
from elasticsearch_dsl.search import Q
from parsedmarc import InvalidForensicReport
from parsedmarc import InvalidFailureReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -90,6 +91,10 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
@@ -120,7 +125,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _ForensicSampleDoc(InnerDoc):
class _FailureSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -157,9 +162,9 @@ class _ForensicSampleDoc(InnerDoc):
) # pyright: ignore[reportCallIssue]
class _ForensicReportDoc(Document):
class _FailureReportDoc(Document):
class Index:
name = "dmarc_forensic"
name = "dmarc_failure"
feedback_type = Text()
user_agent = Text()
@@ -177,7 +182,7 @@ class _ForensicReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_ForensicSampleDoc)
sample = Object(_FailureSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -327,20 +332,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
forensic_indexes (list): A list of forensic index names
failure_indexes (list): A list of failure index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if forensic_indexes is None:
forensic_indexes = []
if failure_indexes is None:
failure_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -370,7 +375,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes:
for failure_index in failure_indexes:
pass
@@ -386,7 +391,7 @@ def save_aggregate_report_to_elasticsearch(
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (dict): A parsed forensic report
aggregate_report (dict): A parsed aggregate report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -535,8 +540,8 @@ def save_aggregate_report_to_elasticsearch(
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
def save_forensic_report_to_elasticsearch(
forensic_report: dict[str, Any],
def save_failure_report_to_elasticsearch(
failure_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
@@ -544,10 +549,10 @@ def save_forensic_report_to_elasticsearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC forensic report to Elasticsearch
Saves a parsed DMARC failure report to Elasticsearch
Args:
forensic_report (dict): A parsed forensic report
failure_report (dict): A parsed failure report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -560,24 +565,24 @@ def save_forensic_report_to_elasticsearch(
AlreadySaved
"""
logger.info("Saving forensic report to Elasticsearch")
forensic_report = forensic_report.copy()
logger.info("Saving failure report to Elasticsearch")
failure_report = failure_report.copy()
sample_date = None
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
original_headers = failure_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_failure_{0}*".format(index_suffix)
else:
search_index = "dmarc_forensic*"
search_index = "dmarc_failure*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
@@ -620,64 +625,64 @@ def save_forensic_report_to_elasticsearch(
if len(existing) > 0:
raise AlreadySaved(
"A forensic sample to {0} from {1} "
"A failure sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"Elasticsearch".format(
to_, from_, subject, forensic_report["arrival_date_utc"]
to_, from_, subject, failure_report["arrival_date_utc"]
)
)
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
headers=headers,
headers_only=forensic_report["sample_headers_only"],
headers_only=failure_report["sample_headers_only"],
date=sample_date,
subject=forensic_report["parsed_sample"]["subject"],
subject=failure_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=forensic_report["parsed_sample"]["body"],
body=failure_report["parsed_sample"]["body"],
)
for address in forensic_report["parsed_sample"]["to"]:
for address in failure_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["reply_to"]:
for address in failure_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in forensic_report["parsed_sample"]["cc"]:
for address in failure_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["bcc"]:
for address in failure_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in forensic_report["parsed_sample"]["attachments"]:
for attachment in failure_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_forensic"
index = "dmarc_failure"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -691,14 +696,14 @@ def save_forensic_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
failure_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
try:
forensic_doc.save()
failure_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
)
@@ -851,3 +856,9 @@ def save_smtp_tls_report_to_elasticsearch(
smtp_tls_doc.save()
except Exception as e:
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_elasticsearch = save_failure_report_to_elasticsearch

View File

@@ -9,10 +9,12 @@ from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport
from typing import Any
from parsedmarc.types import AggregateReport, SMTPTLSReport
log_context_data = threading.local()
@@ -57,11 +59,11 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
def save_failure_report_to_gelf(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
self.logger.info("parsedmarc failure report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
@@ -73,3 +75,7 @@ class GelfClient(object):
"""Remove and close the GELF handler, releasing its connection."""
self.logger.removeHandler(self.handler)
self.handler.close()
# Backward-compatible aliases
GelfClient.save_forensic_report_to_gelf = GelfClient.save_failure_report_to_gelf

View File

@@ -143,31 +143,31 @@ class KafkaClient(object):
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_forensic_reports_to_kafka(
def save_failure_reports_to_kafka(
self,
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
failure_reports: Union[dict[str, Any], list[dict[str, Any]]],
failure_topic: str,
):
"""
Saves forensic DMARC reports to Kafka, sends individual
Saves failure DMARC reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
by default.
Args:
forensic_reports (list): A list of forensic report dicts
failure_reports (list): A list of failure report dicts
to save to Kafka
forensic_topic (str): The name of the Kafka topic
failure_topic (str): The name of the Kafka topic
"""
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
if len(forensic_reports) < 1:
if len(failure_reports) < 1:
return
try:
logger.debug("Saving forensic reports to Kafka")
self.producer.send(forensic_topic, forensic_reports)
logger.debug("Saving failure reports to Kafka")
self.producer.send(failure_topic, failure_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
except Exception as e:
@@ -188,7 +188,7 @@ class KafkaClient(object):
by default.
Args:
smtp_tls_reports (list): A list of forensic report dicts
smtp_tls_reports (list): A list of SMTP TLS report dicts
to save to Kafka
smtp_tls_topic (str): The name of the Kafka topic
@@ -200,7 +200,7 @@ class KafkaClient(object):
return
try:
logger.debug("Saving forensic reports to Kafka")
logger.debug("Saving SMTP TLS reports to Kafka")
self.producer.send(smtp_tls_topic, smtp_tls_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Kafka error: Unknown topic or partition on broker")
@@ -210,3 +210,7 @@ class KafkaClient(object):
self.producer.flush()
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
# Backward-compatible aliases
KafkaClient.save_forensic_reports_to_kafka = KafkaClient.save_failure_reports_to_kafka

View File

@@ -38,9 +38,9 @@ class LogAnalyticsConfig:
The Stream name where
the Aggregate DMARC reports
need to be pushed.
dcr_forensic_stream (str):
dcr_failure_stream (str):
The Stream name where
the Forensic DMARC reports
the Failure DMARC reports
need to be pushed.
dcr_smtp_tls_stream (str):
The Stream name where
@@ -56,7 +56,7 @@ class LogAnalyticsConfig:
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_forensic_stream: str,
dcr_failure_stream: str,
dcr_smtp_tls_stream: str,
):
self.client_id = client_id
@@ -65,7 +65,7 @@ class LogAnalyticsConfig:
self.dce = dce
self.dcr_immutable_id = dcr_immutable_id
self.dcr_aggregate_stream = dcr_aggregate_stream
self.dcr_forensic_stream = dcr_forensic_stream
self.dcr_failure_stream = dcr_failure_stream
self.dcr_smtp_tls_stream = dcr_smtp_tls_stream
@@ -84,7 +84,7 @@ class LogAnalyticsClient(object):
dce: str,
dcr_immutable_id: str,
dcr_aggregate_stream: str,
dcr_forensic_stream: str,
dcr_failure_stream: str,
dcr_smtp_tls_stream: str,
):
self.conf = LogAnalyticsConfig(
@@ -94,7 +94,7 @@ class LogAnalyticsClient(object):
dce=dce,
dcr_immutable_id=dcr_immutable_id,
dcr_aggregate_stream=dcr_aggregate_stream,
dcr_forensic_stream=dcr_forensic_stream,
dcr_failure_stream=dcr_failure_stream,
dcr_smtp_tls_stream=dcr_smtp_tls_stream,
)
if (
@@ -135,7 +135,7 @@ class LogAnalyticsClient(object):
self,
results: dict[str, Any],
save_aggregate: bool,
save_forensic: bool,
save_failure: bool,
save_smtp_tls: bool,
):
"""
@@ -146,13 +146,13 @@ class LogAnalyticsClient(object):
Args:
results (list):
The DMARC reports (Aggregate & Forensic)
The DMARC reports (Aggregate & Failure)
save_aggregate (bool):
Whether Aggregate reports can be saved into Log Analytics
save_forensic (bool):
Whether Forensic reports can be saved into Log Analytics
save_failure (bool):
Whether Failure reports can be saved into Log Analytics
save_smtp_tls (bool):
Whether Forensic reports can be saved into Log Analytics
Whether SMTP TLS reports can be saved into Log Analytics
"""
conf = self.conf
credential = ClientSecretCredential(
@@ -173,16 +173,16 @@ class LogAnalyticsClient(object):
)
logger.info("Successfully pushed aggregate reports.")
if (
results["forensic_reports"]
and conf.dcr_forensic_stream
and len(results["forensic_reports"]) > 0
and save_forensic
results["failure_reports"]
and conf.dcr_failure_stream
and len(results["failure_reports"]) > 0
and save_failure
):
logger.info("Publishing forensic reports.")
logger.info("Publishing failure reports.")
self.publish_json(
results["forensic_reports"], logs_client, conf.dcr_forensic_stream
results["failure_reports"], logs_client, conf.dcr_failure_stream
)
logger.info("Successfully pushed forensic reports.")
logger.info("Successfully pushed failure reports.")
if (
results["smtp_tls_reports"]
and conf.dcr_smtp_tls_stream

View File

@@ -14,6 +14,7 @@ from opensearchpy import (
InnerDoc,
Integer,
Ip,
Keyword,
Nested,
Object,
Q,
@@ -24,7 +25,7 @@ from opensearchpy import (
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc import InvalidFailureReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
@@ -93,6 +94,10 @@ class _AggregateReportDoc(Document):
envelope_to = Text()
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
np = Keyword()
testing = Keyword()
discovery_method = Keyword()
generator = Text()
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
@@ -123,7 +128,7 @@ class _EmailAttachmentDoc(Document):
sha256 = Text()
class _ForensicSampleDoc(InnerDoc):
class _FailureSampleDoc(InnerDoc):
raw = Text()
headers = Object()
headers_only = Boolean()
@@ -160,9 +165,9 @@ class _ForensicSampleDoc(InnerDoc):
)
class _ForensicReportDoc(Document):
class _FailureReportDoc(Document):
class Index:
name = "dmarc_forensic"
name = "dmarc_failure"
feedback_type = Text()
user_agent = Text()
@@ -180,7 +185,7 @@ class _ForensicReportDoc(Document):
source_auth_failures = Text()
dkim_domain = Text()
original_rcpt_to = Text()
sample = Object(_ForensicSampleDoc)
sample = Object(_FailureSampleDoc)
class _SMTPTLSFailureDetailsDoc(InnerDoc):
@@ -356,20 +361,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
failure_indexes: Optional[list[str]] = None,
):
"""
Updates index mappings
Args:
aggregate_indexes (list): A list of aggregate index names
forensic_indexes (list): A list of forensic index names
failure_indexes (list): A list of failure index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if forensic_indexes is None:
forensic_indexes = []
if failure_indexes is None:
failure_indexes = []
for aggregate_index_name in aggregate_indexes:
if not Index(aggregate_index_name).exists():
continue
@@ -399,7 +404,7 @@ def migrate_indexes(
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes:
for failure_index in failure_indexes:
pass
@@ -415,7 +420,7 @@ def save_aggregate_report_to_opensearch(
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (dict): A parsed forensic report
aggregate_report (dict): A parsed aggregate report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -564,8 +569,8 @@ def save_aggregate_report_to_opensearch(
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any],
def save_failure_report_to_opensearch(
failure_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
@@ -573,10 +578,10 @@ def save_forensic_report_to_opensearch(
number_of_replicas: int = 0,
):
"""
Saves a parsed DMARC forensic report to OpenSearch
Saves a parsed DMARC failure report to OpenSearch
Args:
forensic_report (dict): A parsed forensic report
failure_report (dict): A parsed failure report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -589,24 +594,24 @@ def save_forensic_report_to_opensearch(
AlreadySaved
"""
logger.info("Saving forensic report to OpenSearch")
forensic_report = forensic_report.copy()
logger.info("Saving failure report to OpenSearch")
failure_report = failure_report.copy()
sample_date = None
if forensic_report["parsed_sample"]["date"] is not None:
sample_date = forensic_report["parsed_sample"]["date"]
if failure_report["parsed_sample"]["date"] is not None:
sample_date = failure_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
original_headers = failure_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"])
arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"])
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_forensic_{0}*".format(index_suffix)
search_index = "dmarc_failure_{0}*".format(index_suffix)
else:
search_index = "dmarc_forensic*"
search_index = "dmarc_failure*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
@@ -649,64 +654,64 @@ def save_forensic_report_to_opensearch(
if len(existing) > 0:
raise AlreadySaved(
"A forensic sample to {0} from {1} "
"A failure sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"OpenSearch".format(
to_, from_, subject, forensic_report["arrival_date_utc"]
to_, from_, subject, failure_report["arrival_date_utc"]
)
)
parsed_sample = forensic_report["parsed_sample"]
sample = _ForensicSampleDoc(
raw=forensic_report["sample"],
parsed_sample = failure_report["parsed_sample"]
sample = _FailureSampleDoc(
raw=failure_report["sample"],
headers=headers,
headers_only=forensic_report["sample_headers_only"],
headers_only=failure_report["sample_headers_only"],
date=sample_date,
subject=forensic_report["parsed_sample"]["subject"],
subject=failure_report["parsed_sample"]["subject"],
filename_safe_subject=parsed_sample["filename_safe_subject"],
body=forensic_report["parsed_sample"]["body"],
body=failure_report["parsed_sample"]["body"],
)
for address in forensic_report["parsed_sample"]["to"]:
for address in failure_report["parsed_sample"]["to"]:
sample.add_to(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["reply_to"]:
for address in failure_report["parsed_sample"]["reply_to"]:
sample.add_reply_to(
display_name=address["display_name"], address=address["address"]
)
for address in forensic_report["parsed_sample"]["cc"]:
for address in failure_report["parsed_sample"]["cc"]:
sample.add_cc(display_name=address["display_name"], address=address["address"])
for address in forensic_report["parsed_sample"]["bcc"]:
for address in failure_report["parsed_sample"]["bcc"]:
sample.add_bcc(display_name=address["display_name"], address=address["address"])
for attachment in forensic_report["parsed_sample"]["attachments"]:
for attachment in failure_report["parsed_sample"]["attachments"]:
sample.add_attachment(
filename=attachment["filename"],
content_type=attachment["mail_content_type"],
sha256=attachment["sha256"],
)
try:
forensic_doc = _ForensicReportDoc(
feedback_type=forensic_report["feedback_type"],
user_agent=forensic_report["user_agent"],
version=forensic_report["version"],
original_mail_from=forensic_report["original_mail_from"],
failure_doc = _FailureReportDoc(
feedback_type=failure_report["feedback_type"],
user_agent=failure_report["user_agent"],
version=failure_report["version"],
original_mail_from=failure_report["original_mail_from"],
arrival_date=arrival_date_epoch_milliseconds,
domain=forensic_report["reported_domain"],
original_envelope_id=forensic_report["original_envelope_id"],
authentication_results=forensic_report["authentication_results"],
delivery_results=forensic_report["delivery_result"],
source_ip_address=forensic_report["source"]["ip_address"],
source_country=forensic_report["source"]["country"],
source_reverse_dns=forensic_report["source"]["reverse_dns"],
source_base_domain=forensic_report["source"]["base_domain"],
authentication_mechanisms=forensic_report["authentication_mechanisms"],
auth_failure=forensic_report["auth_failure"],
dkim_domain=forensic_report["dkim_domain"],
original_rcpt_to=forensic_report["original_rcpt_to"],
domain=failure_report["reported_domain"],
original_envelope_id=failure_report["original_envelope_id"],
authentication_results=failure_report["authentication_results"],
delivery_results=failure_report["delivery_result"],
source_ip_address=failure_report["source"]["ip_address"],
source_country=failure_report["source"]["country"],
source_reverse_dns=failure_report["source"]["reverse_dns"],
source_base_domain=failure_report["source"]["base_domain"],
authentication_mechanisms=failure_report["authentication_mechanisms"],
auth_failure=failure_report["auth_failure"],
dkim_domain=failure_report["dkim_domain"],
original_rcpt_to=failure_report["original_rcpt_to"],
sample=sample,
)
index = "dmarc_forensic"
index = "dmarc_failure"
if index_suffix:
index = "{0}_{1}".format(index, index_suffix)
if index_prefix:
@@ -720,14 +725,14 @@ def save_forensic_report_to_opensearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
forensic_doc.meta.index = index
failure_doc.meta.index = index
try:
forensic_doc.save()
failure_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
except KeyError as e:
raise InvalidForensicReport(
"Forensic report missing required field: {0}".format(e.__str__())
raise InvalidFailureReport(
"Failure report missing required field: {0}".format(e.__str__())
)
@@ -880,3 +885,9 @@ def save_smtp_tls_report_to_opensearch(
smtp_tls_doc.save()
except Exception as e:
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
# Backward-compatible aliases
_ForensicSampleDoc = _FailureSampleDoc
_ForensicReportDoc = _FailureReportDoc
save_forensic_report_to_opensearch = save_failure_report_to_opensearch

View File

@@ -56,8 +56,8 @@ class S3Client(object):
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "forensic")
def save_failure_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "failure")
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "smtp_tls")
@@ -101,3 +101,7 @@ class S3Client(object):
self.s3.meta.client.close()
except Exception:
pass
# Backward-compatible aliases
S3Client.save_forensic_report_to_s3 = S3Client.save_failure_report_to_s3

View File

@@ -134,28 +134,28 @@ class HECClient(object):
if response["code"] != 0:
raise SplunkError(response["text"])
def save_forensic_reports_to_splunk(
def save_failure_reports_to_splunk(
self,
forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
failure_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
"""
Saves forensic DMARC reports to Splunk
Saves failure DMARC reports to Splunk
Args:
forensic_reports (list): A list of forensic report dictionaries
failure_reports (list): A list of failure report dictionaries
to save in Splunk
"""
logger.debug("Saving forensic reports to Splunk")
if isinstance(forensic_reports, dict):
forensic_reports = [forensic_reports]
logger.debug("Saving failure reports to Splunk")
if isinstance(failure_reports, dict):
failure_reports = [failure_reports]
if len(forensic_reports) < 1:
if len(failure_reports) < 1:
return
json_str = ""
for report in forensic_reports:
for report in failure_reports:
data = self._common_data.copy()
data["sourcetype"] = "dmarc:forensic"
data["sourcetype"] = "dmarc_failure"
timestamp = human_timestamp_to_unix_timestamp(report["arrival_date_utc"])
data["time"] = timestamp
data["event"] = report.copy()
@@ -211,3 +211,7 @@ class HECClient(object):
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
# Backward-compatible aliases
HECClient.save_forensic_reports_to_splunk = HECClient.save_failure_reports_to_splunk

View File

@@ -13,7 +13,7 @@ from typing import Any, Optional
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_failure_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
@@ -170,8 +170,8 @@ class SyslogClient(object):
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
def save_failure_report_to_syslog(self, failure_reports: list[dict[str, Any]]):
rows = parsed_failure_reports_to_csv_rows(failure_reports)
for row in rows:
self.logger.info(json.dumps(row))
@@ -184,3 +184,7 @@ class SyslogClient(object):
"""Remove and close the syslog handler, releasing its socket."""
self.logger.removeHandler(self.log_handler)
self.log_handler.close()
# Backward-compatible aliases
SyslogClient.save_forensic_report_to_syslog = SyslogClient.save_failure_report_to_syslog

View File

@@ -16,7 +16,7 @@ class WebhookClient(object):
def __init__(
self,
aggregate_url: str,
forensic_url: str,
failure_url: str,
smtp_tls_url: str,
timeout: Optional[int] = 60,
):
@@ -24,12 +24,12 @@ class WebhookClient(object):
Initializes the WebhookClient
Args:
aggregate_url (str): The aggregate report webhook url
forensic_url (str): The forensic report webhook url
failure_url (str): The failure report webhook url
smtp_tls_url (str): The smtp_tls report webhook url
timeout (int): The timeout to use when calling the webhooks
"""
self.aggregate_url = aggregate_url
self.forensic_url = forensic_url
self.failure_url = failure_url
self.smtp_tls_url = smtp_tls_url
self.timeout = timeout
self.session = requests.Session()
@@ -38,9 +38,9 @@ class WebhookClient(object):
"Content-Type": "application/json",
}
def save_forensic_report_to_webhook(self, report: str):
def save_failure_report_to_webhook(self, report: str):
try:
self._send_to_webhook(self.forensic_url, report)
self._send_to_webhook(self.failure_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
@@ -67,3 +67,9 @@ class WebhookClient(object):
def close(self):
"""Close the underlying HTTP session."""
self.session.close()
# Backward-compatible aliases
WebhookClient.save_forensic_report_to_webhook = (
WebhookClient.save_failure_report_to_webhook
)