diff --git a/docker-compose.yml b/docker-compose.yml index fd45ceb..922e91c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,3 +28,30 @@ services: interval: 10s timeout: 10s retries: 24 + + opensearch: + image: opensearchproject/opensearch:2.18.0 + environment: + - network.host=127.0.0.1 + - http.host=0.0.0.0 + - node.name=opensearch + - discovery.type=single-node + - cluster.name=parsedmarc-cluster + - discovery.seed_hosts=opensearch + - bootstrap.memory_lock=true + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD} + ports: + - 127.0.0.1:9201:9200 + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s -XGET http://localhost:9201/_cluster/health?pretty | grep status | grep -q '\\(green\\|yellow\\)'" + ] + interval: 10s + timeout: 10s + retries: 24 diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 6bb4136..43ffff8 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -202,13 +202,15 @@ class _SMTPTLSPolicyDoc(InnerDoc): receiving_ip, receiving_mx_helo, failed_session_count, + sending_mta_ip=None, receiving_mx_hostname=None, additional_information_uri=None, failure_reason_code=None, ): - self.failure_details.append( + _details = _SMTPTLSFailureDetailsDoc( result_type=result_type, ip_address=ip_address, + sending_mta_ip=sending_mta_ip, receiving_mx_hostname=receiving_mx_hostname, receiving_mx_helo=receiving_mx_helo, receiving_ip=receiving_ip, @@ -216,9 +218,10 @@ class _SMTPTLSPolicyDoc(InnerDoc): additional_information=additional_information_uri, failure_reason_code=failure_reason_code, ) + self.failure_details.append(_details) -class _SMTPTLSFailureReportDoc(Document): +class _SMTPTLSReportDoc(Document): class Index: name = "smtp_tls" @@ -499,6 +502,7 @@ def save_aggregate_report_to_opensearch( index = "{0}_{1}".format(index, index_suffix) if index_prefix: index = "{0}{1}".format(index_prefix, index) + index = "{0}-{1}".format(index, index_date) index_settings = dict( number_of_shards=number_of_shards, number_of_replicas=number_of_replicas @@ -685,7 +689,7 @@ def save_smtp_tls_report_to_opensearch( AlreadySaved """ logger.info("Saving aggregate report to OpenSearch") - org_name = report["org_name"] + org_name = report["organization_name"] report_id = report["report_id"] begin_date = human_timestamp_to_datetime(report["begin_date"], to_utc=True) end_date = human_timestamp_to_datetime(report["end_date"], to_utc=True) @@ -741,11 +745,11 @@ def save_smtp_tls_report_to_opensearch( number_of_shards=number_of_shards, number_of_replicas=number_of_replicas ) - smtp_tls_doc = _SMTPTLSFailureReportDoc( - organization_name=report["organization_name"], - date_range=[report["date_begin"], report["date_end"]], - date_begin=report["date_begin"], - date_end=report["date_end"], + smtp_tls_doc = _SMTPTLSReportDoc( + org_name=report["organization_name"], + date_range=[report["begin_date"], report["end_date"]], + date_begin=report["begin_date"], + date_end=report["end_date"], contact_info=report["contact_info"], report_id=report["report_id"], ) @@ -760,32 +764,48 @@ def save_smtp_tls_report_to_opensearch( policy_doc = _SMTPTLSPolicyDoc( policy_domain=policy["policy_domain"], policy_type=policy["policy_type"], + succesful_session_count=policy["successful_session_count"], + failed_session_count=policy["failed_session_count"], policy_string=policy_strings, mx_host_patterns=mx_host_patterns, ) if "failure_details" in policy: - failure_details = policy["failure_details"] - receiving_mx_hostname = None - additional_information_uri = None - failure_reason_code = None - if "receiving_mx_hostname" in failure_details: - receiving_mx_hostname = failure_details["receiving_mx_hostname"] - if "additional_information_uri" in failure_details: - additional_information_uri = failure_details[ - "additional_information_uri" - ] - if "failure_reason_code" in failure_details: - failure_reason_code = failure_details["failure_reason_code"] - policy_doc.add_failure_details( - result_type=failure_details["result_type"], - ip_address=failure_details["ip_address"], - receiving_ip=failure_details["receiving_ip"], - receiving_mx_helo=failure_details["receiving_mx_helo"], - failed_session_count=failure_details["failed_session_count"], - receiving_mx_hostname=receiving_mx_hostname, - additional_information_uri=additional_information_uri, - failure_reason_code=failure_reason_code, - ) + for failure_detail in policy["failure_details"]: + receiving_mx_hostname = None + additional_information_uri = None + failure_reason_code = None + ip_address = None + receiving_ip = None + receiving_mx_helo = None + sending_mta_ip = None + + if "receiving_mx_hostname" in failure_detail: + receiving_mx_hostname = failure_detail["receiving_mx_hostname"] + if "additional_information_uri" in failure_detail: + additional_information_uri = failure_detail[ + "additional_information_uri" + ] + if "failure_reason_code" in failure_detail: + failure_reason_code = failure_detail["failure_reason_code"] + if "ip_address" in failure_detail: + ip_address = failure_detail["ip_address"] + if "receiving_ip" in failure_detail: + receiving_ip = failure_detail["receiving_ip"] + if "receiving_mx_helo" in failure_detail: + receiving_mx_helo = failure_detail["receiving_mx_helo"] + if "sending_mta_ip" in failure_detail: + sending_mta_ip = failure_detail["sending_mta_ip"] + policy_doc.add_failure_details( + result_type=failure_detail["result_type"], + ip_address=ip_address, + receiving_ip=receiving_ip, + receiving_mx_helo=receiving_mx_helo, + failed_session_count=failure_detail["failed_session_count"], + sending_mta_ip=sending_mta_ip, + receiving_mx_hostname=receiving_mx_hostname, + additional_information_uri=additional_information_uri, + failure_reason_code=failure_reason_code, + ) smtp_tls_doc.policies.append(policy_doc) create_indexes([index], index_settings)