diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py index 38c0044..d99d911 100644 --- a/parsedmarc/constants.py +++ b/parsedmarc/constants.py @@ -1,3 +1,3 @@ -__version__ = "9.3.0" +__version__ = "10.0.0" USER_AGENT = f"parsedmarc/{__version__}" diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 6193548..3b8f4e0 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -13,6 +13,7 @@ from elasticsearch_dsl import ( InnerDoc, Integer, Ip, + Keyword, Nested, Object, Search, @@ -21,7 +22,7 @@ from elasticsearch_dsl import ( ) from elasticsearch_dsl.search import Q -from parsedmarc import InvalidForensicReport +from parsedmarc import InvalidFailureReport from parsedmarc.log import logger from parsedmarc.utils import human_timestamp_to_datetime @@ -90,6 +91,10 @@ class _AggregateReportDoc(Document): envelope_to = Text() dkim_results = Nested(_DKIMResult) spf_results = Nested(_SPFResult) + np = Keyword() + testing = Keyword() + discovery_method = Keyword() + generator = Text() def add_policy_override(self, type_: str, comment: str): self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue] @@ -120,7 +125,7 @@ class _EmailAttachmentDoc(Document): sha256 = Text() -class _ForensicSampleDoc(InnerDoc): +class _FailureSampleDoc(InnerDoc): raw = Text() headers = Object() headers_only = Boolean() @@ -157,9 +162,9 @@ class _ForensicSampleDoc(InnerDoc): ) # pyright: ignore[reportCallIssue] -class _ForensicReportDoc(Document): +class _FailureReportDoc(Document): class Index: - name = "dmarc_forensic" + name = "dmarc_failure" feedback_type = Text() user_agent = Text() @@ -177,7 +182,7 @@ class _ForensicReportDoc(Document): source_auth_failures = Text() dkim_domain = Text() original_rcpt_to = Text() - sample = Object(_ForensicSampleDoc) + sample = Object(_FailureSampleDoc) class _SMTPTLSFailureDetailsDoc(InnerDoc): @@ -327,20 +332,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None): def migrate_indexes( aggregate_indexes: Optional[list[str]] = None, - forensic_indexes: Optional[list[str]] = None, + failure_indexes: Optional[list[str]] = None, ): """ Updates index mappings Args: aggregate_indexes (list): A list of aggregate index names - forensic_indexes (list): A list of forensic index names + failure_indexes (list): A list of failure index names """ version = 2 if aggregate_indexes is None: aggregate_indexes = [] - if forensic_indexes is None: - forensic_indexes = [] + if failure_indexes is None: + failure_indexes = [] for aggregate_index_name in aggregate_indexes: if not Index(aggregate_index_name).exists(): continue @@ -370,7 +375,7 @@ def migrate_indexes( reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType] Index(aggregate_index_name).delete() - for forensic_index in forensic_indexes: + for failure_index in failure_indexes: pass @@ -386,7 +391,7 @@ def save_aggregate_report_to_elasticsearch( Saves a parsed DMARC aggregate report to Elasticsearch Args: - aggregate_report (dict): A parsed forensic report + aggregate_report (dict): A parsed aggregate report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes @@ -535,8 +540,8 @@ def save_aggregate_report_to_elasticsearch( raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__())) -def save_forensic_report_to_elasticsearch( - forensic_report: dict[str, Any], +def save_failure_report_to_elasticsearch( + failure_report: dict[str, Any], index_suffix: Optional[Any] = None, index_prefix: Optional[str] = None, monthly_indexes: Optional[bool] = False, @@ -544,10 +549,10 @@ def save_forensic_report_to_elasticsearch( number_of_replicas: int = 0, ): """ - Saves a parsed DMARC forensic report to Elasticsearch + Saves a parsed DMARC failure report to Elasticsearch Args: - forensic_report (dict): A parsed forensic report + failure_report (dict): A parsed failure report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily @@ -560,24 +565,24 @@ def save_forensic_report_to_elasticsearch( AlreadySaved """ - logger.info("Saving forensic report to Elasticsearch") - forensic_report = forensic_report.copy() + logger.info("Saving failure report to Elasticsearch") + failure_report = failure_report.copy() sample_date = None - if forensic_report["parsed_sample"]["date"] is not None: - sample_date = forensic_report["parsed_sample"]["date"] + if failure_report["parsed_sample"]["date"] is not None: + sample_date = failure_report["parsed_sample"]["date"] sample_date = human_timestamp_to_datetime(sample_date) - original_headers = forensic_report["parsed_sample"]["headers"] + original_headers = failure_report["parsed_sample"]["headers"] headers: dict[str, Any] = {} for original_header in original_headers: headers[original_header.lower()] = original_headers[original_header] - arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"]) + arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"]) arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000) if index_suffix is not None: - search_index = "dmarc_forensic_{0}*".format(index_suffix) + search_index = "dmarc_failure_{0}*".format(index_suffix) else: - search_index = "dmarc_forensic*" + search_index = "dmarc_failure*" if index_prefix is not None: search_index = "{0}{1}".format(index_prefix, search_index) search = Search(index=search_index) @@ -620,64 +625,64 @@ def save_forensic_report_to_elasticsearch( if len(existing) > 0: raise AlreadySaved( - "A forensic sample to {0} from {1} " + "A failure sample to {0} from {1} " "with a subject of {2} and arrival date of {3} " "already exists in " "Elasticsearch".format( - to_, from_, subject, forensic_report["arrival_date_utc"] + to_, from_, subject, failure_report["arrival_date_utc"] ) ) - parsed_sample = forensic_report["parsed_sample"] - sample = _ForensicSampleDoc( - raw=forensic_report["sample"], + parsed_sample = failure_report["parsed_sample"] + sample = _FailureSampleDoc( + raw=failure_report["sample"], headers=headers, - headers_only=forensic_report["sample_headers_only"], + headers_only=failure_report["sample_headers_only"], date=sample_date, - subject=forensic_report["parsed_sample"]["subject"], + subject=failure_report["parsed_sample"]["subject"], filename_safe_subject=parsed_sample["filename_safe_subject"], - body=forensic_report["parsed_sample"]["body"], + body=failure_report["parsed_sample"]["body"], ) - for address in forensic_report["parsed_sample"]["to"]: + for address in failure_report["parsed_sample"]["to"]: sample.add_to(display_name=address["display_name"], address=address["address"]) - for address in forensic_report["parsed_sample"]["reply_to"]: + for address in failure_report["parsed_sample"]["reply_to"]: sample.add_reply_to( display_name=address["display_name"], address=address["address"] ) - for address in forensic_report["parsed_sample"]["cc"]: + for address in failure_report["parsed_sample"]["cc"]: sample.add_cc(display_name=address["display_name"], address=address["address"]) - for address in forensic_report["parsed_sample"]["bcc"]: + for address in failure_report["parsed_sample"]["bcc"]: sample.add_bcc(display_name=address["display_name"], address=address["address"]) - for attachment in forensic_report["parsed_sample"]["attachments"]: + for attachment in failure_report["parsed_sample"]["attachments"]: sample.add_attachment( filename=attachment["filename"], content_type=attachment["mail_content_type"], sha256=attachment["sha256"], ) try: - forensic_doc = _ForensicReportDoc( - feedback_type=forensic_report["feedback_type"], - user_agent=forensic_report["user_agent"], - version=forensic_report["version"], - original_mail_from=forensic_report["original_mail_from"], + failure_doc = _FailureReportDoc( + feedback_type=failure_report["feedback_type"], + user_agent=failure_report["user_agent"], + version=failure_report["version"], + original_mail_from=failure_report["original_mail_from"], arrival_date=arrival_date_epoch_milliseconds, - domain=forensic_report["reported_domain"], - original_envelope_id=forensic_report["original_envelope_id"], - authentication_results=forensic_report["authentication_results"], - delivery_results=forensic_report["delivery_result"], - source_ip_address=forensic_report["source"]["ip_address"], - source_country=forensic_report["source"]["country"], - source_reverse_dns=forensic_report["source"]["reverse_dns"], - source_base_domain=forensic_report["source"]["base_domain"], - authentication_mechanisms=forensic_report["authentication_mechanisms"], - auth_failure=forensic_report["auth_failure"], - dkim_domain=forensic_report["dkim_domain"], - original_rcpt_to=forensic_report["original_rcpt_to"], + domain=failure_report["reported_domain"], + original_envelope_id=failure_report["original_envelope_id"], + authentication_results=failure_report["authentication_results"], + delivery_results=failure_report["delivery_result"], + source_ip_address=failure_report["source"]["ip_address"], + source_country=failure_report["source"]["country"], + source_reverse_dns=failure_report["source"]["reverse_dns"], + source_base_domain=failure_report["source"]["base_domain"], + authentication_mechanisms=failure_report["authentication_mechanisms"], + auth_failure=failure_report["auth_failure"], + dkim_domain=failure_report["dkim_domain"], + original_rcpt_to=failure_report["original_rcpt_to"], sample=sample, ) - index = "dmarc_forensic" + index = "dmarc_failure" if index_suffix: index = "{0}_{1}".format(index, index_suffix) if index_prefix: @@ -691,14 +696,14 @@ def save_forensic_report_to_elasticsearch( number_of_shards=number_of_shards, number_of_replicas=number_of_replicas ) create_indexes([index], index_settings) - forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + failure_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] try: - forensic_doc.save() + failure_doc.save() except Exception as e: raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__())) except KeyError as e: - raise InvalidForensicReport( - "Forensic report missing required field: {0}".format(e.__str__()) + raise InvalidFailureReport( + "Failure report missing required field: {0}".format(e.__str__()) ) @@ -851,3 +856,9 @@ def save_smtp_tls_report_to_elasticsearch( smtp_tls_doc.save() except Exception as e: raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__())) + + +# Backward-compatible aliases +_ForensicSampleDoc = _FailureSampleDoc +_ForensicReportDoc = _FailureReportDoc +save_forensic_report_to_elasticsearch = save_failure_report_to_elasticsearch diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py index 2ac5a5a..a0dfd93 100644 --- a/parsedmarc/gelf.py +++ b/parsedmarc/gelf.py @@ -9,10 +9,12 @@ from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler from parsedmarc import ( parsed_aggregate_reports_to_csv_rows, - parsed_forensic_reports_to_csv_rows, + parsed_failure_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows, ) -from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport +from typing import Any + +from parsedmarc.types import AggregateReport, SMTPTLSReport log_context_data = threading.local() @@ -57,11 +59,11 @@ class GelfClient(object): log_context_data.parsedmarc = None - def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]): - rows = parsed_forensic_reports_to_csv_rows(forensic_reports) + def save_failure_report_to_gelf(self, failure_reports: list[dict[str, Any]]): + rows = parsed_failure_reports_to_csv_rows(failure_reports) for row in rows: log_context_data.parsedmarc = row - self.logger.info("parsedmarc forensic report") + self.logger.info("parsedmarc failure report") def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport): rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) @@ -73,3 +75,7 @@ class GelfClient(object): """Remove and close the GELF handler, releasing its connection.""" self.logger.removeHandler(self.handler) self.handler.close() + + +# Backward-compatible aliases +GelfClient.save_forensic_report_to_gelf = GelfClient.save_failure_report_to_gelf diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 227e102..b15dfb5 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -143,31 +143,31 @@ class KafkaClient(object): except Exception as e: raise KafkaError("Kafka error: {0}".format(e.__str__())) - def save_forensic_reports_to_kafka( + def save_failure_reports_to_kafka( self, - forensic_reports: Union[dict[str, Any], list[dict[str, Any]]], - forensic_topic: str, + failure_reports: Union[dict[str, Any], list[dict[str, Any]]], + failure_topic: str, ): """ - Saves forensic DMARC reports to Kafka, sends individual + Saves failure DMARC reports to Kafka, sends individual records (slices) since Kafka requires messages to be <= 1MB by default. Args: - forensic_reports (list): A list of forensic report dicts + failure_reports (list): A list of failure report dicts to save to Kafka - forensic_topic (str): The name of the Kafka topic + failure_topic (str): The name of the Kafka topic """ - if isinstance(forensic_reports, dict): - forensic_reports = [forensic_reports] + if isinstance(failure_reports, dict): + failure_reports = [failure_reports] - if len(forensic_reports) < 1: + if len(failure_reports) < 1: return try: - logger.debug("Saving forensic reports to Kafka") - self.producer.send(forensic_topic, forensic_reports) + logger.debug("Saving failure reports to Kafka") + self.producer.send(failure_topic, failure_reports) except UnknownTopicOrPartitionError: raise KafkaError("Kafka error: Unknown topic or partition on broker") except Exception as e: @@ -188,7 +188,7 @@ class KafkaClient(object): by default. Args: - smtp_tls_reports (list): A list of forensic report dicts + smtp_tls_reports (list): A list of SMTP TLS report dicts to save to Kafka smtp_tls_topic (str): The name of the Kafka topic @@ -200,7 +200,7 @@ class KafkaClient(object): return try: - logger.debug("Saving forensic reports to Kafka") + logger.debug("Saving SMTP TLS reports to Kafka") self.producer.send(smtp_tls_topic, smtp_tls_reports) except UnknownTopicOrPartitionError: raise KafkaError("Kafka error: Unknown topic or partition on broker") @@ -210,3 +210,7 @@ class KafkaClient(object): self.producer.flush() except Exception as e: raise KafkaError("Kafka error: {0}".format(e.__str__())) + + +# Backward-compatible aliases +KafkaClient.save_forensic_reports_to_kafka = KafkaClient.save_failure_reports_to_kafka diff --git a/parsedmarc/loganalytics.py b/parsedmarc/loganalytics.py index 10a941b..079d316 100644 --- a/parsedmarc/loganalytics.py +++ b/parsedmarc/loganalytics.py @@ -38,9 +38,9 @@ class LogAnalyticsConfig: The Stream name where the Aggregate DMARC reports need to be pushed. - dcr_forensic_stream (str): + dcr_failure_stream (str): The Stream name where - the Forensic DMARC reports + the Failure DMARC reports need to be pushed. dcr_smtp_tls_stream (str): The Stream name where @@ -56,7 +56,7 @@ class LogAnalyticsConfig: dce: str, dcr_immutable_id: str, dcr_aggregate_stream: str, - dcr_forensic_stream: str, + dcr_failure_stream: str, dcr_smtp_tls_stream: str, ): self.client_id = client_id @@ -65,7 +65,7 @@ class LogAnalyticsConfig: self.dce = dce self.dcr_immutable_id = dcr_immutable_id self.dcr_aggregate_stream = dcr_aggregate_stream - self.dcr_forensic_stream = dcr_forensic_stream + self.dcr_failure_stream = dcr_failure_stream self.dcr_smtp_tls_stream = dcr_smtp_tls_stream @@ -84,7 +84,7 @@ class LogAnalyticsClient(object): dce: str, dcr_immutable_id: str, dcr_aggregate_stream: str, - dcr_forensic_stream: str, + dcr_failure_stream: str, dcr_smtp_tls_stream: str, ): self.conf = LogAnalyticsConfig( @@ -94,7 +94,7 @@ class LogAnalyticsClient(object): dce=dce, dcr_immutable_id=dcr_immutable_id, dcr_aggregate_stream=dcr_aggregate_stream, - dcr_forensic_stream=dcr_forensic_stream, + dcr_failure_stream=dcr_failure_stream, dcr_smtp_tls_stream=dcr_smtp_tls_stream, ) if ( @@ -135,7 +135,7 @@ class LogAnalyticsClient(object): self, results: dict[str, Any], save_aggregate: bool, - save_forensic: bool, + save_failure: bool, save_smtp_tls: bool, ): """ @@ -146,13 +146,13 @@ class LogAnalyticsClient(object): Args: results (list): - The DMARC reports (Aggregate & Forensic) + The DMARC reports (Aggregate & Failure) save_aggregate (bool): Whether Aggregate reports can be saved into Log Analytics - save_forensic (bool): - Whether Forensic reports can be saved into Log Analytics + save_failure (bool): + Whether Failure reports can be saved into Log Analytics save_smtp_tls (bool): - Whether Forensic reports can be saved into Log Analytics + Whether Failure reports can be saved into Log Analytics """ conf = self.conf credential = ClientSecretCredential( @@ -173,16 +173,16 @@ class LogAnalyticsClient(object): ) logger.info("Successfully pushed aggregate reports.") if ( - results["forensic_reports"] - and conf.dcr_forensic_stream - and len(results["forensic_reports"]) > 0 - and save_forensic + results["failure_reports"] + and conf.dcr_failure_stream + and len(results["failure_reports"]) > 0 + and save_failure ): - logger.info("Publishing forensic reports.") + logger.info("Publishing failure reports.") self.publish_json( - results["forensic_reports"], logs_client, conf.dcr_forensic_stream + results["failure_reports"], logs_client, conf.dcr_failure_stream ) - logger.info("Successfully pushed forensic reports.") + logger.info("Successfully pushed failure reports.") if ( results["smtp_tls_reports"] and conf.dcr_smtp_tls_stream diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 5b46d6c..fd17645 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -14,6 +14,7 @@ from opensearchpy import ( InnerDoc, Integer, Ip, + Keyword, Nested, Object, Q, @@ -24,7 +25,7 @@ from opensearchpy import ( ) from opensearchpy.helpers import reindex -from parsedmarc import InvalidForensicReport +from parsedmarc import InvalidFailureReport from parsedmarc.log import logger from parsedmarc.utils import human_timestamp_to_datetime @@ -93,6 +94,10 @@ class _AggregateReportDoc(Document): envelope_to = Text() dkim_results = Nested(_DKIMResult) spf_results = Nested(_SPFResult) + np = Keyword() + testing = Keyword() + discovery_method = Keyword() + generator = Text() def add_policy_override(self, type_: str, comment: str): self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) @@ -123,7 +128,7 @@ class _EmailAttachmentDoc(Document): sha256 = Text() -class _ForensicSampleDoc(InnerDoc): +class _FailureSampleDoc(InnerDoc): raw = Text() headers = Object() headers_only = Boolean() @@ -160,9 +165,9 @@ class _ForensicSampleDoc(InnerDoc): ) -class _ForensicReportDoc(Document): +class _FailureReportDoc(Document): class Index: - name = "dmarc_forensic" + name = "dmarc_failure" feedback_type = Text() user_agent = Text() @@ -180,7 +185,7 @@ class _ForensicReportDoc(Document): source_auth_failures = Text() dkim_domain = Text() original_rcpt_to = Text() - sample = Object(_ForensicSampleDoc) + sample = Object(_FailureSampleDoc) class _SMTPTLSFailureDetailsDoc(InnerDoc): @@ -356,20 +361,20 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None): def migrate_indexes( aggregate_indexes: Optional[list[str]] = None, - forensic_indexes: Optional[list[str]] = None, + failure_indexes: Optional[list[str]] = None, ): """ Updates index mappings Args: aggregate_indexes (list): A list of aggregate index names - forensic_indexes (list): A list of forensic index names + failure_indexes (list): A list of failure index names """ version = 2 if aggregate_indexes is None: aggregate_indexes = [] - if forensic_indexes is None: - forensic_indexes = [] + if failure_indexes is None: + failure_indexes = [] for aggregate_index_name in aggregate_indexes: if not Index(aggregate_index_name).exists(): continue @@ -399,7 +404,7 @@ def migrate_indexes( reindex(connections.get_connection(), aggregate_index_name, new_index_name) Index(aggregate_index_name).delete() - for forensic_index in forensic_indexes: + for failure_index in failure_indexes: pass @@ -415,7 +420,7 @@ def save_aggregate_report_to_opensearch( Saves a parsed DMARC aggregate report to OpenSearch Args: - aggregate_report (dict): A parsed forensic report + aggregate_report (dict): A parsed aggregate report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily indexes @@ -564,8 +569,8 @@ def save_aggregate_report_to_opensearch( raise OpenSearchError("OpenSearch error: {0}".format(e.__str__())) -def save_forensic_report_to_opensearch( - forensic_report: dict[str, Any], +def save_failure_report_to_opensearch( + failure_report: dict[str, Any], index_suffix: Optional[str] = None, index_prefix: Optional[str] = None, monthly_indexes: bool = False, @@ -573,10 +578,10 @@ def save_forensic_report_to_opensearch( number_of_replicas: int = 0, ): """ - Saves a parsed DMARC forensic report to OpenSearch + Saves a parsed DMARC failure report to OpenSearch Args: - forensic_report (dict): A parsed forensic report + failure_report (dict): A parsed failure report index_suffix (str): The suffix of the name of the index to save to index_prefix (str): The prefix of the name of the index to save to monthly_indexes (bool): Use monthly indexes instead of daily @@ -589,24 +594,24 @@ def save_forensic_report_to_opensearch( AlreadySaved """ - logger.info("Saving forensic report to OpenSearch") - forensic_report = forensic_report.copy() + logger.info("Saving failure report to OpenSearch") + failure_report = failure_report.copy() sample_date = None - if forensic_report["parsed_sample"]["date"] is not None: - sample_date = forensic_report["parsed_sample"]["date"] + if failure_report["parsed_sample"]["date"] is not None: + sample_date = failure_report["parsed_sample"]["date"] sample_date = human_timestamp_to_datetime(sample_date) - original_headers = forensic_report["parsed_sample"]["headers"] + original_headers = failure_report["parsed_sample"]["headers"] headers: dict[str, Any] = {} for original_header in original_headers: headers[original_header.lower()] = original_headers[original_header] - arrival_date = human_timestamp_to_datetime(forensic_report["arrival_date_utc"]) + arrival_date = human_timestamp_to_datetime(failure_report["arrival_date_utc"]) arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000) if index_suffix is not None: - search_index = "dmarc_forensic_{0}*".format(index_suffix) + search_index = "dmarc_failure_{0}*".format(index_suffix) else: - search_index = "dmarc_forensic*" + search_index = "dmarc_failure*" if index_prefix is not None: search_index = "{0}{1}".format(index_prefix, search_index) search = Search(index=search_index) @@ -649,64 +654,64 @@ def save_forensic_report_to_opensearch( if len(existing) > 0: raise AlreadySaved( - "A forensic sample to {0} from {1} " + "A failure sample to {0} from {1} " "with a subject of {2} and arrival date of {3} " "already exists in " "OpenSearch".format( - to_, from_, subject, forensic_report["arrival_date_utc"] + to_, from_, subject, failure_report["arrival_date_utc"] ) ) - parsed_sample = forensic_report["parsed_sample"] - sample = _ForensicSampleDoc( - raw=forensic_report["sample"], + parsed_sample = failure_report["parsed_sample"] + sample = _FailureSampleDoc( + raw=failure_report["sample"], headers=headers, - headers_only=forensic_report["sample_headers_only"], + headers_only=failure_report["sample_headers_only"], date=sample_date, - subject=forensic_report["parsed_sample"]["subject"], + subject=failure_report["parsed_sample"]["subject"], filename_safe_subject=parsed_sample["filename_safe_subject"], - body=forensic_report["parsed_sample"]["body"], + body=failure_report["parsed_sample"]["body"], ) - for address in forensic_report["parsed_sample"]["to"]: + for address in failure_report["parsed_sample"]["to"]: sample.add_to(display_name=address["display_name"], address=address["address"]) - for address in forensic_report["parsed_sample"]["reply_to"]: + for address in failure_report["parsed_sample"]["reply_to"]: sample.add_reply_to( display_name=address["display_name"], address=address["address"] ) - for address in forensic_report["parsed_sample"]["cc"]: + for address in failure_report["parsed_sample"]["cc"]: sample.add_cc(display_name=address["display_name"], address=address["address"]) - for address in forensic_report["parsed_sample"]["bcc"]: + for address in failure_report["parsed_sample"]["bcc"]: sample.add_bcc(display_name=address["display_name"], address=address["address"]) - for attachment in forensic_report["parsed_sample"]["attachments"]: + for attachment in failure_report["parsed_sample"]["attachments"]: sample.add_attachment( filename=attachment["filename"], content_type=attachment["mail_content_type"], sha256=attachment["sha256"], ) try: - forensic_doc = _ForensicReportDoc( - feedback_type=forensic_report["feedback_type"], - user_agent=forensic_report["user_agent"], - version=forensic_report["version"], - original_mail_from=forensic_report["original_mail_from"], + failure_doc = _FailureReportDoc( + feedback_type=failure_report["feedback_type"], + user_agent=failure_report["user_agent"], + version=failure_report["version"], + original_mail_from=failure_report["original_mail_from"], arrival_date=arrival_date_epoch_milliseconds, - domain=forensic_report["reported_domain"], - original_envelope_id=forensic_report["original_envelope_id"], - authentication_results=forensic_report["authentication_results"], - delivery_results=forensic_report["delivery_result"], - source_ip_address=forensic_report["source"]["ip_address"], - source_country=forensic_report["source"]["country"], - source_reverse_dns=forensic_report["source"]["reverse_dns"], - source_base_domain=forensic_report["source"]["base_domain"], - authentication_mechanisms=forensic_report["authentication_mechanisms"], - auth_failure=forensic_report["auth_failure"], - dkim_domain=forensic_report["dkim_domain"], - original_rcpt_to=forensic_report["original_rcpt_to"], + domain=failure_report["reported_domain"], + original_envelope_id=failure_report["original_envelope_id"], + authentication_results=failure_report["authentication_results"], + delivery_results=failure_report["delivery_result"], + source_ip_address=failure_report["source"]["ip_address"], + source_country=failure_report["source"]["country"], + source_reverse_dns=failure_report["source"]["reverse_dns"], + source_base_domain=failure_report["source"]["base_domain"], + authentication_mechanisms=failure_report["authentication_mechanisms"], + auth_failure=failure_report["auth_failure"], + dkim_domain=failure_report["dkim_domain"], + original_rcpt_to=failure_report["original_rcpt_to"], sample=sample, ) - index = "dmarc_forensic" + index = "dmarc_failure" if index_suffix: index = "{0}_{1}".format(index, index_suffix) if index_prefix: @@ -720,14 +725,14 @@ def save_forensic_report_to_opensearch( number_of_shards=number_of_shards, number_of_replicas=number_of_replicas ) create_indexes([index], index_settings) - forensic_doc.meta.index = index + failure_doc.meta.index = index try: - forensic_doc.save() + failure_doc.save() except Exception as e: raise OpenSearchError("OpenSearch error: {0}".format(e.__str__())) except KeyError as e: - raise InvalidForensicReport( - "Forensic report missing required field: {0}".format(e.__str__()) + raise InvalidFailureReport( + "Failure report missing required field: {0}".format(e.__str__()) ) @@ -880,3 +885,9 @@ def save_smtp_tls_report_to_opensearch( smtp_tls_doc.save() except Exception as e: raise OpenSearchError("OpenSearch error: {0}".format(e.__str__())) + + +# Backward-compatible aliases +_ForensicSampleDoc = _FailureSampleDoc +_ForensicReportDoc = _FailureReportDoc +save_forensic_report_to_opensearch = save_failure_report_to_opensearch diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py index 99e03b3..09cd8c4 100644 --- a/parsedmarc/s3.py +++ b/parsedmarc/s3.py @@ -56,8 +56,8 @@ class S3Client(object): def save_aggregate_report_to_s3(self, report: dict[str, Any]): self.save_report_to_s3(report, "aggregate") - def save_forensic_report_to_s3(self, report: dict[str, Any]): - self.save_report_to_s3(report, "forensic") + def save_failure_report_to_s3(self, report: dict[str, Any]): + self.save_report_to_s3(report, "failure") def save_smtp_tls_report_to_s3(self, report: dict[str, Any]): self.save_report_to_s3(report, "smtp_tls") @@ -101,3 +101,7 @@ class S3Client(object): self.s3.meta.client.close() except Exception: pass + + +# Backward-compatible aliases +S3Client.save_forensic_report_to_s3 = S3Client.save_failure_report_to_s3 diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py index f96e000..f800c41 100644 --- a/parsedmarc/splunk.py +++ b/parsedmarc/splunk.py @@ -134,28 +134,28 @@ class HECClient(object): if response["code"] != 0: raise SplunkError(response["text"]) - def save_forensic_reports_to_splunk( + def save_failure_reports_to_splunk( self, - forensic_reports: Union[list[dict[str, Any]], dict[str, Any]], + failure_reports: Union[list[dict[str, Any]], dict[str, Any]], ): """ - Saves forensic DMARC reports to Splunk + Saves failure DMARC reports to Splunk Args: - forensic_reports (list): A list of forensic report dictionaries + failure_reports (list): A list of failure report dictionaries to save in Splunk """ - logger.debug("Saving forensic reports to Splunk") - if isinstance(forensic_reports, dict): - forensic_reports = [forensic_reports] + logger.debug("Saving failure reports to Splunk") + if isinstance(failure_reports, dict): + failure_reports = [failure_reports] - if len(forensic_reports) < 1: + if len(failure_reports) < 1: return json_str = "" - for report in forensic_reports: + for report in failure_reports: data = self._common_data.copy() - data["sourcetype"] = "dmarc:forensic" + data["sourcetype"] = "dmarc_failure" timestamp = human_timestamp_to_unix_timestamp(report["arrival_date_utc"]) data["time"] = timestamp data["event"] = report.copy() @@ -211,3 +211,7 @@ class HECClient(object): def close(self): """Close the underlying HTTP session.""" self.session.close() + + +# Backward-compatible aliases +HECClient.save_forensic_reports_to_splunk = HECClient.save_failure_reports_to_splunk diff --git a/parsedmarc/syslog.py b/parsedmarc/syslog.py index ec8e757..883987a 100644 --- a/parsedmarc/syslog.py +++ b/parsedmarc/syslog.py @@ -13,7 +13,7 @@ from typing import Any, Optional from parsedmarc import ( parsed_aggregate_reports_to_csv_rows, - parsed_forensic_reports_to_csv_rows, + parsed_failure_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows, ) @@ -170,8 +170,8 @@ class SyslogClient(object): for row in rows: self.logger.info(json.dumps(row)) - def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]): - rows = parsed_forensic_reports_to_csv_rows(forensic_reports) + def save_failure_report_to_syslog(self, failure_reports: list[dict[str, Any]]): + rows = parsed_failure_reports_to_csv_rows(failure_reports) for row in rows: self.logger.info(json.dumps(row)) @@ -184,3 +184,7 @@ class SyslogClient(object): """Remove and close the syslog handler, releasing its socket.""" self.logger.removeHandler(self.log_handler) self.log_handler.close() + + +# Backward-compatible aliases +SyslogClient.save_forensic_report_to_syslog = SyslogClient.save_failure_report_to_syslog diff --git a/parsedmarc/webhook.py b/parsedmarc/webhook.py index 9b6f66f..c122a72 100644 --- a/parsedmarc/webhook.py +++ b/parsedmarc/webhook.py @@ -16,7 +16,7 @@ class WebhookClient(object): def __init__( self, aggregate_url: str, - forensic_url: str, + failure_url: str, smtp_tls_url: str, timeout: Optional[int] = 60, ): @@ -24,12 +24,12 @@ class WebhookClient(object): Initializes the WebhookClient Args: aggregate_url (str): The aggregate report webhook url - forensic_url (str): The forensic report webhook url + failure_url (str): The failure report webhook url smtp_tls_url (str): The smtp_tls report webhook url timeout (int): The timeout to use when calling the webhooks """ self.aggregate_url = aggregate_url - self.forensic_url = forensic_url + self.failure_url = failure_url self.smtp_tls_url = smtp_tls_url self.timeout = timeout self.session = requests.Session() @@ -38,9 +38,9 @@ class WebhookClient(object): "Content-Type": "application/json", } - def save_forensic_report_to_webhook(self, report: str): + def save_failure_report_to_webhook(self, report: str): try: - self._send_to_webhook(self.forensic_url, report) + self._send_to_webhook(self.failure_url, report) except Exception as error_: logger.error("Webhook Error: {0}".format(error_.__str__())) @@ -67,3 +67,9 @@ class WebhookClient(object): def close(self): """Close the underlying HTTP session.""" self.session.close() + + +# Backward-compatible aliases +WebhookClient.save_forensic_report_to_webhook = ( + WebhookClient.save_failure_report_to_webhook +)