diff --git a/docs/source/api.md b/docs/source/api.md
index f631b5f..4562983 100644
--- a/docs/source/api.md
+++ b/docs/source/api.md
@@ -14,6 +14,14 @@
    :members:
 ```
 
+## parsedmarc.opensearch
+
+```{eval-rst}
+.. automodule:: parsedmarc.opensearch
+   :members:
+```
+
+
 ## parsedmarc.splunk
 
 ```{eval-rst}
diff --git a/docs/source/index.md b/docs/source/index.md
index dc4dbfb..aee0802 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -26,7 +26,7 @@ Thanks to all [contributors]!
 ```
 
 `parsedmarc` is a Python module and CLI utility for parsing DMARC reports.
-When used with Elasticsearch and Kibana (or Splunk), it works as a self-hosted
+When used with Elasticsearch and Kibana (or Splunk), or with OpenSearch and Grafana, it works as a self-hosted
 open source alternative to commercial DMARC report processing services such
 as Agari Brand Protection, Dmarcian, OnDMARC, ProofPoint Email Fraud Defense,
 and Valimail.
@@ -40,7 +40,7 @@ and Valimail.
 - Consistent data structures
 - Simple JSON and/or CSV output
 - Optionally email the results
-- Optionally send the results to Elasticsearch and/or Splunk, for use with
+- Optionally send the results to Elasticsearch/OpenSearch and/or Splunk, for use with
   premade dashboards
 - Optionally send reports to Apache Kafka
 
@@ -52,6 +52,7 @@ installation
 usage
 output
 elasticsearch
+opensearch
 kibana
 splunk
 davmail
diff --git a/docs/source/opensearch.md b/docs/source/opensearch.md
new file mode 100644
index 0000000..627a25b
--- /dev/null
+++ b/docs/source/opensearch.md
@@ -0,0 +1,14 @@
+# OpenSearch and Grafana
+
+To set up visual dashboards of DMARC data, install OpenSearch and Grafana.
+
+## Installation
+
+- OpenSearch: https://opensearch.org/docs/latest/install-and-configure/install-opensearch/index/
+- Grafana: https://grafana.com/docs/grafana/latest/setup-grafana/installation/
+
+## Records retention
+
+Starting in version 5.0.0, `parsedmarc` stores data in a separate
+index for each day to make it easy to comply with records
+retention regulations such as GDPR.
diff --git a/docs/source/usage.md b/docs/source/usage.md
index cc0e035..98f3aaf 100644
--- a/docs/source/usage.md
+++ b/docs/source/usage.md
@@ -82,6 +82,10 @@ delete = False
 hosts = 127.0.0.1:9200
 ssl = False
 
+[opensearch]
+hosts = https://admin:admin@127.0.0.1:9200
+ssl = True
+
 [splunk_hec]
 url = https://splunkhec.example.com
 token = HECTokenGoesHere
@@ -238,6 +242,28 @@ The full set of configuration options are:
     creating the index (Default: `1`)
   - `number_of_replicas` - int: The number of replicas to use when
     creating the index (Default: `0`)
+- `opensearch`
+  - `hosts` - str: A comma separated list of hostnames and ports
+    or URLs (e.g. `127.0.0.1:9200` or
+    `https://user:secret@localhost`)
+
+    :::{note}
+    Special characters in the username or password must be
+    [URL encoded].
+ ::: + - `user` - str: Basic auth username + - `password` - str: Basic auth password + - `apiKey` - str: API key + - `ssl` - bool: Use an encrypted SSL/TLS connection + (Default: `True`) + - `timeout` - float: Timeout in seconds (Default: 60) + - `cert_path` - str: Path to a trusted certificates + - `index_suffix` - str: A suffix to apply to the index names + - `monthly_indexes` - bool: Use monthly indexes instead of daily indexes + - `number_of_shards` - int: The number of shards to use when + creating the index (Default: `1`) + - `number_of_replicas` - int: The number of replicas to use when + creating the index (Default: `0`) - `splunk_hec` - `url` - str: The URL of the Splunk HTTP Events Collector (HEC) - `token` - str: The HEC token diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 0f01bbd..d1c505c 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -18,7 +18,7 @@ import time from tqdm import tqdm from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \ - parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \ + parse_report_file, get_dmarc_reports_from_mbox, elastic, opensearch, kafkaclient, \ splunk, save_output, email_results, ParserError, __version__, \ InvalidDMARCReport, s3, syslog, loganalytics @@ -106,6 +106,27 @@ def _main(): except Exception as error_: logger.error("Elasticsearch exception error: {}".format( error_.__str__())) + + try: + if opts.opensearch_hosts: + shards = opts.opensearch_number_of_shards + replicas = opts.opensearch_number_of_replicas + opensearch.save_aggregate_report_to_opensearch( + report, + index_suffix=opts.opensearch_index_suffix, + monthly_indexes=opts.opensearch_monthly_indexes, + number_of_shards=shards, + number_of_replicas=replicas + ) + except opensearch.AlreadySaved as warning: + logger.warning(warning.__str__()) + except opensearch.OpenSearchError as error_: + logger.error("OpenSearch Error: {0}".format( + error_.__str__())) + except Exception as error_: + logger.error("OpenSearch exception error: {}".format( + error_.__str__())) + try: if opts.kafka_hosts: kafka_client.save_aggregate_reports_to_kafka( @@ -113,16 +134,19 @@ def _main(): except Exception as error_: logger.error("Kafka Error: {0}".format( error_.__str__())) + try: if opts.s3_bucket: s3_client.save_aggregate_report_to_s3(report) except Exception as error_: logger.error("S3 Error: {0}".format(error_.__str__())) + try: if opts.syslog_server: syslog_client.save_aggregate_report_to_syslog(report) except Exception as error_: logger.error("Syslog Error: {0}".format(error_.__str__())) + if opts.hec: try: aggregate_reports_ = reports_["aggregate_reports"] @@ -131,6 +155,7 @@ def _main(): aggregate_reports_) except splunk.SplunkError as e: logger.error("Splunk HEC error: {0}".format(e.__str__())) + if opts.save_forensic: for report in reports_["forensic_reports"]: try: @@ -150,6 +175,25 @@ def _main(): error_.__str__())) except InvalidDMARCReport as error_: logger.error(error_.__str__()) + + try: + shards = opts.opensearch_number_of_shards + replicas = opts.opensearch_number_of_replicas + if opts.opensearch_hosts: + opensearch.save_forensic_report_to_opensearch( + report, + index_suffix=opts.opensearch_index_suffix, + monthly_indexes=opts.opensearch_monthly_indexes, + number_of_shards=shards, + number_of_replicas=replicas) + except opensearch.AlreadySaved as warning: + logger.warning(warning.__str__()) + except opensearch.OpenSearchError as error_: + logger.error("OpenSearch Error: {0}".format( + error_.__str__())) + except InvalidDMARCReport 
as error_: + logger.error(error_.__str__()) + try: if opts.kafka_hosts: kafka_client.save_forensic_reports_to_kafka( @@ -157,16 +201,19 @@ def _main(): except Exception as error_: logger.error("Kafka Error: {0}".format( error_.__str__())) + try: if opts.s3_bucket: s3_client.save_forensic_report_to_s3(report) except Exception as error_: logger.error("S3 Error: {0}".format(error_.__str__())) + try: if opts.syslog_server: syslog_client.save_forensic_report_to_syslog(report) except Exception as error_: logger.error("Syslog Error: {0}".format(error_.__str__())) + if opts.hec: try: forensic_reports_ = reports_["forensic_reports"] @@ -175,6 +222,7 @@ def _main(): forensic_reports_) except splunk.SplunkError as e: logger.error("Splunk HEC error: {0}".format(e.__str__())) + if opts.save_smtp_tls: for report in reports_["smtp_tls_reports"]: try: @@ -194,6 +242,25 @@ def _main(): error_.__str__())) except InvalidDMARCReport as error_: logger.error(error_.__str__()) + + try: + shards = opts.opensearch_number_of_shards + replicas = opts.opensearch_number_of_replicas + if opts.opensearch_hosts: + opensearch.save_smtp_tls_report_to_opensearch( + report, + index_suffix=opts.opensearch_index_suffix, + monthly_indexes=opts.opensearch_monthly_indexes, + number_of_shards=shards, + number_of_replicas=replicas) + except opensearch.AlreadySaved as warning: + logger.warning(warning.__str__()) + except opensearch.OpenSearchError as error_: + logger.error("OpenSearch Error: {0}".format( + error_.__str__())) + except InvalidDMARCReport as error_: + logger.error(error_.__str__()) + try: if opts.kafka_hosts: kafka_client.save_smtp_tls_reports_to_kafka( @@ -201,16 +268,19 @@ def _main(): except Exception as error_: logger.error("Kafka Error: {0}".format( error_.__str__())) + try: if opts.s3_bucket: s3_client.save_smtp_tls_report_to_s3(report) except Exception as error_: logger.error("S3 Error: {0}".format(error_.__str__())) + try: if opts.syslog_server: syslog_client.save_smtp_tls_report_to_syslog(report) except Exception as error_: logger.error("Syslog Error: {0}".format(error_.__str__())) + if opts.hec: try: smtp_tls_reports_ = reports_["smtp_tls_reports"] @@ -219,6 +289,7 @@ def _main(): smtp_tls_reports_) except splunk.SplunkError as e: logger.error("Splunk HEC error: {0}".format(e.__str__())) + if opts.la_dce: try: la_client = loganalytics.LogAnalyticsClient( @@ -366,6 +437,17 @@ def _main(): elasticsearch_username=None, elasticsearch_password=None, elasticsearch_apiKey=None, + opensearch_hosts=None, + opensearch_timeout=60, + opensearch_number_of_shards=1, + opensearch_number_of_replicas=0, + opensearch_index_suffix=None, + opensearch_ssl=True, + opensearch_ssl_cert_path=None, + opensearch_monthly_indexes=False, + opensearch_username=None, + opensearch_password=None, + opensearch_apiKey=None, kafka_hosts=None, kafka_username=None, kafka_password=None, @@ -674,6 +756,49 @@ def _main(): if "apiKey" in elasticsearch_config: opts.elasticsearch_apiKey = elasticsearch_config[ "apiKey"] + + if "opensearch" in config: + opensearch_config = config["opensearch"] + if "hosts" in opensearch_config: + opts.opensearch_hosts = _str_to_list(opensearch_config[ + "hosts"]) + else: + logger.critical("hosts setting missing from the " + "opensearch config section") + exit(-1) + if "timeout" in opensearch_config: + timeout = opensearch_config.getfloat("timeout") + opts.opensearch_timeout = timeout + if "number_of_shards" in opensearch_config: + number_of_shards = opensearch_config.getint( + "number_of_shards") + 
opts.opensearch_number_of_shards = number_of_shards + if "number_of_replicas" in opensearch_config: + number_of_replicas = opensearch_config.getint( + "number_of_replicas") + opts.opensearch_number_of_replicas = number_of_replicas + if "index_suffix" in opensearch_config: + opts.opensearch_index_suffix = opensearch_config[ + "index_suffix"] + if "monthly_indexes" in opensearch_config: + monthly = opensearch_config.getboolean("monthly_indexes") + opts.opensearch_monthly_indexes = monthly + if "ssl" in opensearch_config: + opts.opensearch_ssl = opensearch_config.getboolean( + "ssl") + if "cert_path" in opensearch_config: + opts.opensearch_ssl_cert_path = opensearch_config[ + "cert_path"] + if "user" in opensearch_config: + opts.opensearch_username = opensearch_config[ + "user"] + if "password" in opensearch_config: + opts.opensearch_password = opensearch_config[ + "password"] + if "apiKey" in opensearch_config: + opts.opensearch_apiKey = opensearch_config[ + "apiKey"] + if "splunk_hec" in config.sections(): hec_config = config["splunk_hec"] if "url" in hec_config: @@ -697,6 +822,7 @@ def _main(): if "skip_certificate_verification" in hec_config: opts.hec_skip_certificate_verification = hec_config[ "skip_certificate_verification"] + if "kafka" in config.sections(): kafka_config = config["kafka"] if "hosts" in kafka_config: @@ -739,6 +865,7 @@ def _main(): else: logger.critical("forensic_topic setting missing from the " "splunk_hec config section") + if "smtp" in config.sections(): smtp_config = config["smtp"] if "host" in smtp_config: @@ -783,6 +910,7 @@ def _main(): opts.smtp_attachment = smtp_config["attachment"] if "message" in smtp_config: opts.smtp_message = smtp_config["message"] + if "s3" in config.sections(): s3_config = config["s3"] if "bucket" in s3_config: @@ -840,6 +968,7 @@ def _main(): if "oauth2_port" in gmail_api_config: opts.gmail_api_oauth2_port = \ gmail_api_config.get("oauth2_port", 8080) + if "log_analytics" in config.sections(): log_analytics_config = config["log_analytics"] opts.la_client_id = \ @@ -917,6 +1046,33 @@ def _main(): logger.exception("Elasticsearch Error") exit(1) + try: + if opts.opensearch_hosts: + os_aggregate_index = "dmarc_aggregate" + os_forensic_index = "dmarc_forensic" + os_smtp_tls_index = "smtp_tls" + if opts.opensearch_index_suffix: + suffix = opts.opensearch_index_suffix + os_aggregate_index = "{0}_{1}".format( + os_aggregate_index, suffix) + os_forensic_index = "{0}_{1}".format( + os_forensic_index, suffix) + os_smtp_tls_index = "{0}_{1}".format( + os_smtp_tls_index, suffix + ) + opensearch.set_hosts(opts.opensearch_hosts, + opts.opensearch_ssl, + opts.opensearch_ssl_cert_path, + opts.opensearch_username, + opts.opensearch_password, + opts.opensearch_apiKey, + timeout=opts.opensearch_timeout) + opensearch.migrate_indexes(aggregate_indexes=[os_aggregate_index], + forensic_indexes=[os_forensic_index]) + except opensearch.OpenSearchError: + logger.exception("OpenSearch Error") + exit(1) + if opts.s3_bucket: try: s3_client = s3.S3Client( diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 977531e..fc9c1ef 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -349,7 +349,7 @@ def save_aggregate_report_to_elasticsearch(aggregate_report, number_of_shards=1, number_of_replicas=0): """ - Saves a parsed DMARC aggregate report to ElasticSearch + Saves a parsed DMARC aggregate report to Elasticsearch Args: aggregate_report (OrderedDict): A parsed forensic report @@ -484,7 +484,7 @@ def 
save_forensic_report_to_elasticsearch(forensic_report, number_of_shards=1, number_of_replicas=0): """ - Saves a parsed DMARC forensic report to ElasticSearch + Saves a parsed DMARC forensic report to Elasticsearch Args: forensic_report (OrderedDict): A parsed forensic report @@ -627,7 +627,7 @@ def save_smtp_tls_report_to_elasticsearch(report, number_of_shards=1, number_of_replicas=0): """ - Saves a parsed SMTP TLS report to elasticSearch + Saves a parsed SMTP TLS report to Elasticsearch Args: report (OrderedDict): A parsed SMTP TLS report diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py new file mode 100644 index 0000000..3b1f3eb --- /dev/null +++ b/parsedmarc/opensearch.py @@ -0,0 +1,744 @@ +# -*- coding: utf-8 -*- + +from collections import OrderedDict + +from opensearchpy import Q, connections, Object, Document, Index, Nested, \ + InnerDoc, Integer, Text, Boolean, Ip, Date, Search +from opensearchpy.helpers import reindex + +from parsedmarc.log import logger +from parsedmarc.utils import human_timestamp_to_datetime +from parsedmarc import InvalidForensicReport + + +class OpenSearchError(Exception): + """Raised when an OpenSearch error occurs""" + + +class _PolicyOverride(InnerDoc): + type = Text() + comment = Text() + + +class _PublishedPolicy(InnerDoc): + domain = Text() + adkim = Text() + aspf = Text() + p = Text() + sp = Text() + pct = Integer() + fo = Text() + + +class _DKIMResult(InnerDoc): + domain = Text() + selector = Text() + result = Text() + + +class _SPFResult(InnerDoc): + domain = Text() + scope = Text() + results = Text() + + +class _AggregateReportDoc(Document): + class Index: + name = "dmarc_aggregate" + + xml_schema = Text() + org_name = Text() + org_email = Text() + org_extra_contact_info = Text() + report_id = Text() + date_range = Date() + date_begin = Date() + date_end = Date() + errors = Text() + published_policy = Object(_PublishedPolicy) + source_ip_address = Ip() + source_country = Text() + source_reverse_dns = Text() + source_Base_domain = Text() + message_count = Integer + disposition = Text() + dkim_aligned = Boolean() + spf_aligned = Boolean() + passed_dmarc = Boolean() + policy_overrides = Nested(_PolicyOverride) + header_from = Text() + envelope_from = Text() + envelope_to = Text() + dkim_results = Nested(_DKIMResult) + spf_results = Nested(_SPFResult) + + def add_policy_override(self, type_, comment): + self.policy_overrides.append(_PolicyOverride(type=type_, + comment=comment)) + + def add_dkim_result(self, domain, selector, result): + self.dkim_results.append(_DKIMResult(domain=domain, + selector=selector, + result=result)) + + def add_spf_result(self, domain, scope, result): + self.spf_results.append(_SPFResult(domain=domain, + scope=scope, + result=result)) + + def save(self, ** kwargs): + self.passed_dmarc = False + self.passed_dmarc = self.spf_aligned or self.dkim_aligned + + return super().save(** kwargs) + + +class _EmailAddressDoc(InnerDoc): + display_name = Text() + address = Text() + + +class _EmailAttachmentDoc(Document): + filename = Text() + content_type = Text() + sha256 = Text() + + +class _ForensicSampleDoc(InnerDoc): + raw = Text() + headers = Object() + headers_only = Boolean() + to = Nested(_EmailAddressDoc) + subject = Text() + filename_safe_subject = Text() + _from = Object(_EmailAddressDoc) + date = Date() + reply_to = Nested(_EmailAddressDoc) + cc = Nested(_EmailAddressDoc) + bcc = Nested(_EmailAddressDoc) + body = Text() + attachments = Nested(_EmailAttachmentDoc) + + def add_to(self, display_name, address): + 
self.to.append(_EmailAddressDoc(display_name=display_name, + address=address)) + + def add_reply_to(self, display_name, address): + self.reply_to.append(_EmailAddressDoc(display_name=display_name, + address=address)) + + def add_cc(self, display_name, address): + self.cc.append(_EmailAddressDoc(display_name=display_name, + address=address)) + + def add_bcc(self, display_name, address): + self.bcc.append(_EmailAddressDoc(display_name=display_name, + address=address)) + + def add_attachment(self, filename, content_type, sha256): + self.attachments.append(_EmailAttachmentDoc(filename=filename, + content_type=content_type, sha256=sha256)) + + +class _ForensicReportDoc(Document): + class Index: + name = "dmarc_forensic" + + feedback_type = Text() + user_agent = Text() + version = Text() + original_mail_from = Text() + arrival_date = Date() + domain = Text() + original_envelope_id = Text() + authentication_results = Text() + delivery_results = Text() + source_ip_address = Ip() + source_country = Text() + source_reverse_dns = Text() + source_authentication_mechanisms = Text() + source_auth_failures = Text() + dkim_domain = Text() + original_rcpt_to = Text() + sample = Object(_ForensicSampleDoc) + + +class _SMTPTLSFailureDetailsDoc(InnerDoc): + result_type = Text() + sending_mta_ip = Ip() + receiving_mx_helo = Text() + receiving_ip = Ip() + failed_session_count = Integer() + additional_information_uri = Text() + failure_reason_code = Text() + + +class _SMTPTLSPolicyDoc(InnerDoc): + policy_domain = Text() + policy_type = Text() + policy_strings = Text() + mx_host_patterns = Text() + successful_session_count = Integer() + failed_session_count = Integer() + failure_details = Nested(_SMTPTLSFailureDetailsDoc) + + def add_failure_details(self, result_type, ip_address, + receiving_ip, + receiving_mx_helo, + failed_session_count, + receiving_mx_hostname=None, + additional_information_uri=None, + failure_reason_code=None): + self.failure_details.append( + result_type=result_type, + ip_address=ip_address, + receiving_mx_hostname=receiving_mx_hostname, + receiving_mx_helo=receiving_mx_helo, + receiving_ip=receiving_ip, + failed_session_count=failed_session_count, + additional_information=additional_information_uri, + failure_reason_code=failure_reason_code + ) + + +class _SMTPTLSFailureReportDoc(Document): + + class Index: + name = "smtp_tls" + + organization_name = Text() + date_range = Date() + date_begin = Date() + date_end = Date() + contact_info = Text() + report_id = Text() + policies = Nested(_SMTPTLSPolicyDoc) + + def add_policy(self, policy_type, policy_domain, + successful_session_count, + failed_session_count, + policy_string=None, + mx_host_patterns=None, + failure_details=None): + self.policies.append(policy_type=policy_type, + policy_domain=policy_domain, + successful_session_count=successful_session_count, + failed_session_count=failed_session_count, + policy_string=policy_string, + mx_host_patterns=mx_host_patterns, + failure_details=failure_details) + + +class AlreadySaved(ValueError): + """Raised when a report to be saved matches an existing report""" + + +def set_hosts(hosts, use_ssl=False, ssl_cert_path=None, + username=None, password=None, apiKey=None, timeout=60.0): + """ + Sets the OpenSearch hosts to use + + Args: + hosts (str|list): A single hostname or URL, or list of hostnames or URLs + use_ssl (bool): Use an HTTPS connection to the server + ssl_cert_path (str): Path to the certificate chain + username (str): The username to use for authentication + password (str): The password to 
use for authentication + apiKey (str): The Base64 encoded API key to use for authentication + timeout (float): Timeout in seconds + """ + if not isinstance(hosts, list): + hosts = [hosts] + conn_params = { + "hosts": hosts, + "timeout": timeout + } + if use_ssl: + conn_params['use_ssl'] = True + if ssl_cert_path: + conn_params['verify_certs'] = True + conn_params['ca_certs'] = ssl_cert_path + else: + conn_params['verify_certs'] = False + if username: + conn_params['http_auth'] = (username+":"+password) + if apiKey: + conn_params['api_key'] = apiKey + connections.create_connection(**conn_params) + + +def create_indexes(names, settings=None): + """ + Create OpenSearch indexes + + Args: + names (list): A list of index names + settings (dict): Index settings + + """ + for name in names: + index = Index(name) + try: + if not index.exists(): + logger.debug("Creating OpenSearch index: {0}".format(name)) + if settings is None: + index.settings(number_of_shards=1, + number_of_replicas=0) + else: + index.settings(**settings) + index.create() + except Exception as e: + raise OpenSearchError( + "OpenSearch error: {0}".format(e.__str__())) + + +def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): + """ + Updates index mappings + + Args: + aggregate_indexes (list): A list of aggregate index names + forensic_indexes (list): A list of forensic index names + """ + version = 2 + if aggregate_indexes is None: + aggregate_indexes = [] + if forensic_indexes is None: + forensic_indexes = [] + for aggregate_index_name in aggregate_indexes: + if not Index(aggregate_index_name).exists(): + continue + aggregate_index = Index(aggregate_index_name) + doc = "doc" + fo_field = "published_policy.fo" + fo = "fo" + fo_mapping = aggregate_index.get_field_mapping(fields=[fo_field]) + fo_mapping = fo_mapping[list(fo_mapping.keys())[0]]["mappings"] + if doc not in fo_mapping: + continue + + fo_mapping = fo_mapping[doc][fo_field]["mapping"][fo] + fo_type = fo_mapping["type"] + if fo_type == "long": + new_index_name = "{0}-v{1}".format(aggregate_index_name, version) + body = {"properties": {"published_policy.fo": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + Index(new_index_name).create() + Index(new_index_name).put_mapping(doc_type=doc, body=body) + reindex(connections.get_connection(), aggregate_index_name, + new_index_name) + Index(aggregate_index_name).delete() + + for forensic_index in forensic_indexes: + pass + + +def save_aggregate_report_to_opensearch(aggregate_report, + index_suffix=None, + monthly_indexes=False, + number_of_shards=1, + number_of_replicas=0): + """ + Saves a parsed DMARC aggregate report to OpenSearch + + Args: + aggregate_report (OrderedDict): A parsed forensic report + index_suffix (str): The suffix of the name of the index to save to + monthly_indexes (bool): Use monthly indexes instead of daily indexes + number_of_shards (int): The number of shards to use in the index + number_of_replicas (int): The number of replicas to use in the index + + Raises: + AlreadySaved + """ + logger.info("Saving aggregate report to OpenSearch") + aggregate_report = aggregate_report.copy() + metadata = aggregate_report["report_metadata"] + org_name = metadata["org_name"] + report_id = metadata["report_id"] + domain = aggregate_report["policy_published"]["domain"] + begin_date = human_timestamp_to_datetime(metadata["begin_date"], + to_utc=True) + end_date = human_timestamp_to_datetime(metadata["end_date"], + to_utc=True) + begin_date_human 
= begin_date.strftime("%Y-%m-%d %H:%M:%SZ") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") + if monthly_indexes: + index_date = begin_date.strftime("%Y-%m") + else: + index_date = begin_date.strftime("%Y-%m-%d") + aggregate_report["begin_date"] = begin_date + aggregate_report["end_date"] = end_date + date_range = [aggregate_report["begin_date"], + aggregate_report["end_date"]] + + org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) + report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) + domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) + begin_date_query = Q(dict(match=dict(date_begin=begin_date))) + end_date_query = Q(dict(match=dict(date_end=end_date))) + + if index_suffix is not None: + search = Search(index="dmarc_aggregate_{0}*".format(index_suffix)) + else: + search = Search(index="dmarc_aggregate*") + query = org_name_query & report_id_query & domain_query + query = query & begin_date_query & end_date_query + search.query = query + + try: + existing = search.execute() + except Exception as error_: + raise OpenSearchError("OpenSearch's search for existing report \ + error: {}".format(error_.__str__())) + + if len(existing) > 0: + raise AlreadySaved("An aggregate report ID {0} from {1} about {2} " + "with a date range of {3} UTC to {4} UTC already " + "exists in " + "OpenSearch".format(report_id, + org_name, + domain, + begin_date_human, + end_date_human)) + published_policy = _PublishedPolicy( + domain=aggregate_report["policy_published"]["domain"], + adkim=aggregate_report["policy_published"]["adkim"], + aspf=aggregate_report["policy_published"]["aspf"], + p=aggregate_report["policy_published"]["p"], + sp=aggregate_report["policy_published"]["sp"], + pct=aggregate_report["policy_published"]["pct"], + fo=aggregate_report["policy_published"]["fo"] + ) + + for record in aggregate_report["records"]: + agg_doc = _AggregateReportDoc( + xml_schema=aggregate_report["xml_schema"], + org_name=metadata["org_name"], + org_email=metadata["org_email"], + org_extra_contact_info=metadata["org_extra_contact_info"], + report_id=metadata["report_id"], + date_range=date_range, + date_begin=aggregate_report["begin_date"], + date_end=aggregate_report["end_date"], + errors=metadata["errors"], + published_policy=published_policy, + source_ip_address=record["source"]["ip_address"], + source_country=record["source"]["country"], + source_reverse_dns=record["source"]["reverse_dns"], + source_base_domain=record["source"]["base_domain"], + message_count=record["count"], + disposition=record["policy_evaluated"]["disposition"], + dkim_aligned=record["policy_evaluated"]["dkim"] is not None and + record["policy_evaluated"]["dkim"].lower() == "pass", + spf_aligned=record["policy_evaluated"]["spf"] is not None and + record["policy_evaluated"]["spf"].lower() == "pass", + header_from=record["identifiers"]["header_from"], + envelope_from=record["identifiers"]["envelope_from"], + envelope_to=record["identifiers"]["envelope_to"] + ) + + for override in record["policy_evaluated"]["policy_override_reasons"]: + agg_doc.add_policy_override(type_=override["type"], + comment=override["comment"]) + + for dkim_result in record["auth_results"]["dkim"]: + agg_doc.add_dkim_result(domain=dkim_result["domain"], + selector=dkim_result["selector"], + result=dkim_result["result"]) + + for spf_result in record["auth_results"]["spf"]: + agg_doc.add_spf_result(domain=spf_result["domain"], + scope=spf_result["scope"], + result=spf_result["result"]) + + index = "dmarc_aggregate" + if 
index_suffix: + index = "{0}_{1}".format(index, index_suffix) + index = "{0}-{1}".format(index, index_date) + index_settings = dict(number_of_shards=number_of_shards, + number_of_replicas=number_of_replicas) + create_indexes([index], index_settings) + agg_doc.meta.index = index + + try: + agg_doc.save() + except Exception as e: + raise OpenSearchError( + "OpenSearch error: {0}".format(e.__str__())) + + +def save_forensic_report_to_opensearch(forensic_report, + index_suffix=None, + monthly_indexes=False, + number_of_shards=1, + number_of_replicas=0): + """ + Saves a parsed DMARC forensic report to OpenSearch + + Args: + forensic_report (OrderedDict): A parsed forensic report + index_suffix (str): The suffix of the name of the index to save to + monthly_indexes (bool): Use monthly indexes instead of daily + indexes + number_of_shards (int): The number of shards to use in the index + number_of_replicas (int): The number of replicas to use in the + index + + Raises: + AlreadySaved + + """ + logger.info("Saving forensic report to OpenSearch") + forensic_report = forensic_report.copy() + sample_date = None + if forensic_report["parsed_sample"]["date"] is not None: + sample_date = forensic_report["parsed_sample"]["date"] + sample_date = human_timestamp_to_datetime(sample_date) + original_headers = forensic_report["parsed_sample"]["headers"] + headers = OrderedDict() + for original_header in original_headers: + headers[original_header.lower()] = original_headers[original_header] + + arrival_date_human = forensic_report["arrival_date_utc"] + arrival_date = human_timestamp_to_datetime(arrival_date_human) + + if index_suffix is not None: + search = Search(index="dmarc_forensic_{0}*".format(index_suffix)) + else: + search = Search(index="dmarc_forensic*") + arrival_query = {"match": {"arrival_date": arrival_date}} + q = Q(arrival_query) + + from_ = None + to_ = None + subject = None + if "from" in headers: + from_ = headers["from"] + from_query = {"match_phrase": {"sample.headers.from": from_}} + q = q & Q(from_query) + if "to" in headers: + to_ = headers["to"] + to_query = {"match_phrase": {"sample.headers.to": to_}} + q = q & Q(to_query) + if "subject" in headers: + subject = headers["subject"] + subject_query = {"match_phrase": {"sample.headers.subject": subject}} + q = q & Q(subject_query) + + search.query = q + existing = search.execute() + + if len(existing) > 0: + raise AlreadySaved("A forensic sample to {0} from {1} " + "with a subject of {2} and arrival date of {3} " + "already exists in " + "OpenSearch".format(to_, + from_, + subject, + arrival_date_human + )) + + parsed_sample = forensic_report["parsed_sample"] + sample = _ForensicSampleDoc( + raw=forensic_report["sample"], + headers=headers, + headers_only=forensic_report["sample_headers_only"], + date=sample_date, + subject=forensic_report["parsed_sample"]["subject"], + filename_safe_subject=parsed_sample["filename_safe_subject"], + body=forensic_report["parsed_sample"]["body"] + ) + + for address in forensic_report["parsed_sample"]["to"]: + sample.add_to(display_name=address["display_name"], + address=address["address"]) + for address in forensic_report["parsed_sample"]["reply_to"]: + sample.add_reply_to(display_name=address["display_name"], + address=address["address"]) + for address in forensic_report["parsed_sample"]["cc"]: + sample.add_cc(display_name=address["display_name"], + address=address["address"]) + for address in forensic_report["parsed_sample"]["bcc"]: + sample.add_bcc(display_name=address["display_name"], + 
address=address["address"]) + for attachment in forensic_report["parsed_sample"]["attachments"]: + sample.add_attachment(filename=attachment["filename"], + content_type=attachment["mail_content_type"], + sha256=attachment["sha256"]) + try: + forensic_doc = _ForensicReportDoc( + feedback_type=forensic_report["feedback_type"], + user_agent=forensic_report["user_agent"], + version=forensic_report["version"], + original_mail_from=forensic_report["original_mail_from"], + arrival_date=arrival_date, + domain=forensic_report["reported_domain"], + original_envelope_id=forensic_report["original_envelope_id"], + authentication_results=forensic_report["authentication_results"], + delivery_results=forensic_report["delivery_result"], + source_ip_address=forensic_report["source"]["ip_address"], + source_country=forensic_report["source"]["country"], + source_reverse_dns=forensic_report["source"]["reverse_dns"], + source_base_domain=forensic_report["source"]["base_domain"], + authentication_mechanisms=forensic_report[ + "authentication_mechanisms"], + auth_failure=forensic_report["auth_failure"], + dkim_domain=forensic_report["dkim_domain"], + original_rcpt_to=forensic_report["original_rcpt_to"], + sample=sample + ) + + index = "dmarc_forensic" + if index_suffix: + index = "{0}_{1}".format(index, index_suffix) + if monthly_indexes: + index_date = arrival_date.strftime("%Y-%m") + else: + index_date = arrival_date.strftime("%Y-%m-%d") + index = "{0}-{1}".format(index, index_date) + index_settings = dict(number_of_shards=number_of_shards, + number_of_replicas=number_of_replicas) + create_indexes([index], index_settings) + forensic_doc.meta.index = index + try: + forensic_doc.save() + except Exception as e: + raise OpenSearchError( + "OpenSearch error: {0}".format(e.__str__())) + except KeyError as e: + raise InvalidForensicReport( + "Forensic report missing required field: {0}".format(e.__str__())) + + +def save_smtp_tls_report_to_opensearch(report, + index_suffix=None, + monthly_indexes=False, + number_of_shards=1, + number_of_replicas=0): + """ + Saves a parsed SMTP TLS report to OpenSearch + + Args: + report (OrderedDict): A parsed SMTP TLS report + index_suffix (str): The suffix of the name of the index to save to + monthly_indexes (bool): Use monthly indexes instead of daily indexes + number_of_shards (int): The number of shards to use in the index + number_of_replicas (int): The number of replicas to use in the index + + Raises: + AlreadySaved + """ + logger.info("Saving aggregate report to OpenSearch") + org_name = report["org_name"] + report_id = report["report_id"] + begin_date = human_timestamp_to_datetime(report["begin_date"], + to_utc=True) + end_date = human_timestamp_to_datetime(report["end_date"], + to_utc=True) + begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") + if monthly_indexes: + index_date = begin_date.strftime("%Y-%m") + else: + index_date = begin_date.strftime("%Y-%m-%d") + report["begin_date"] = begin_date + report["end_date"] = end_date + + org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) + report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) + begin_date_query = Q(dict(match=dict(date_begin=begin_date))) + end_date_query = Q(dict(match=dict(date_end=end_date))) + + if index_suffix is not None: + search = Search(index="smtp_tls_{0}*".format(index_suffix)) + else: + search = Search(index="smtp_tls") + query = org_name_query & report_id_query + query = query & begin_date_query & 
end_date_query + search.query = query + + try: + existing = search.execute() + except Exception as error_: + raise OpenSearchError("OpenSearch's search for existing report \ + error: {}".format(error_.__str__())) + + if len(existing) > 0: + raise AlreadySaved(f"An SMTP TLS report ID {report_id} from " + f" {org_name} with a date range of " + f"{begin_date_human} UTC to " + f"{end_date_human} UTC already " + "exists in OpenSearch") + + index = "smtp_tls" + if index_suffix: + index = "{0}_{1}".format(index, index_suffix) + index = "{0}-{1}".format(index, index_date) + index_settings = dict(number_of_shards=number_of_shards, + number_of_replicas=number_of_replicas) + + smtp_tls_doc = _SMTPTLSFailureReportDoc( + organization_name=report["organization_name"], + date_range=[report["date_begin"], report["date_end"]], + date_begin=report["date_begin"], + date_end=report["date_end"], + contact_info=report["contact_info"], + report_id=report["report_id"] + ) + + for policy in report['policies']: + policy_strings = None + mx_host_patterns = None + if "policy_strings" in policy: + policy_strings = policy["policy_strings"] + if "mx_host_patterns" in policy: + mx_host_patterns = policy["mx_host_patterns"] + policy_doc = _SMTPTLSPolicyDoc( + policy_domain=policy["policy_domain"], + policy_type=policy["policy_type"], + policy_string=policy_strings, + mx_host_patterns=mx_host_patterns + ) + if "failure_details" in policy: + failure_details = policy["failure_details"] + receiving_mx_hostname = None + additional_information_uri = None + failure_reason_code = None + if "receiving_mx_hostname" in failure_details: + receiving_mx_hostname = failure_details[ + "receiving_mx_hostname"] + if "additional_information_uri" in failure_details: + additional_information_uri = failure_details[ + "additional_information_uri"] + if "failure_reason_code" in failure_details: + failure_reason_code = failure_details["failure_reason_code"] + policy_doc.add_failure_details( + result_type=failure_details["result_type"], + ip_address=failure_details["ip_address"], + receiving_ip=failure_details["receiving_ip"], + receiving_mx_helo=failure_details["receiving_mx_helo"], + failed_session_count=failure_details["failed_session_count"], + receiving_mx_hostname=receiving_mx_hostname, + additional_information_uri=additional_information_uri, + failure_reason_code=failure_reason_code + ) + smtp_tls_doc.policies.append(policy_doc) + + create_indexes([index], index_settings) + smtp_tls_doc.meta.index = index + + try: + smtp_tls_doc.save() + except Exception as e: + raise OpenSearchError( + "OpenSearch error: {0}".format(e.__str__())) diff --git a/pyproject.toml b/pyproject.toml index e509597..b6ff05a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "lxml>=4.4.0", "mailsuite>=1.6.1", "msgraph-core==0.2.2", + "opensearch-py>=2.4.2,<=3.0.0", "publicsuffixlist>=0.10.0", "requests>=2.22.0", "tqdm>=4.31.1", diff --git a/requirements.txt b/requirements.txt index 7ebb670..aaf360d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ imapclient>=2.1.0 dateparser>=1.1.1 elasticsearch<7.14.0 elasticsearch-dsl>=7.4.0 +opensearch-py>=2.4.2,<=3.0.0 kafka-python>=1.4.4 mailsuite>=1.6.1 nose>=1.3.7
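
The new `parsedmarc.opensearch` module mirrors the existing `parsedmarc.elastic` API, so it can also be driven directly from Python rather than through `cli.py`. Below is a minimal sketch (not part of this patch) of saving a single aggregate report with it, assuming the patch is installed; the host, credentials, and report path are placeholders, and the `report_type`/`report` keys on `parse_report_file()`'s return value are an assumption about the existing parser API, not something this patch defines.

```python
from parsedmarc import parse_report_file
from parsedmarc import opensearch

# Register the OpenSearch connection once per process, mirroring what
# _main() does when opts.opensearch_hosts is set.
opensearch.set_hosts(
    "https://127.0.0.1:9200",  # placeholder host; a list of hosts also works
    use_ssl=True,
    username="admin",          # placeholder credentials
    password="admin",
    timeout=60.0,
)

# Assumption: parse_report_file() returns a dict with "report_type" and
# "report" keys; substitute however the report was parsed in your code.
result = parse_report_file("aggregate_report.xml")  # placeholder path

if result["report_type"] == "aggregate":
    try:
        opensearch.save_aggregate_report_to_opensearch(
            result["report"],
            monthly_indexes=False,  # daily indexes, as the docs describe
            number_of_shards=1,
            number_of_replicas=0,
        )
    except opensearch.AlreadySaved as warning:
        # Duplicate reports raise AlreadySaved instead of being re-indexed
        print(warning)
    except opensearch.OpenSearchError as error:
        print("OpenSearch error: {0}".format(error))
```

`save_forensic_report_to_opensearch()` and `save_smtp_tls_report_to_opensearch()` follow the same pattern, and `index_suffix` can be passed to any of them to keep each environment's data in its own set of indexes.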