From a7fb20713b1ec25e1f845f59a3c098634c5ee7ae Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Thu, 27 Sep 2018 12:01:48 -0400 Subject: [PATCH] 4.1.0 --- CHANGELOG.md | 8 ++++++++ parsedmarc/__init__.py | 4 +++- parsedmarc/cli.py | 29 ++++++++++++++++++++++++++--- parsedmarc/elastic.py | 41 ++++++++++++++++++++++++++++------------- setup.py | 2 +- 5 files changed, 66 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b1ace8..3667377 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +4.1.0 +----- + +- Add options for Elasticsearch prefixes and suffixes +- If an aggregate report has the invalid `disposition` value `pass`, change +it to `none` + + 4.0.2 ----- diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 70d7692..3baf515 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -44,7 +44,7 @@ import imapclient.exceptions import dateparser import mailparser -__version__ = "4.0.2" +__version__ = "4.1.0" logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -362,6 +362,8 @@ def _parse_report_record(record, nameservers=None, timeout=2.0): ]) if "disposition" in policy_evaluated: new_policy_evaluated["disposition"] = policy_evaluated["disposition"] + if new_policy_evaluated["disposition"].strip().lower() == "pass": + new_policy_evaluated["disposition"] = "none" if "dkim" in policy_evaluated: new_policy_evaluated["dkim"] = policy_evaluated["dkim"] if "spf" in policy_evaluated: diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index ddb678f..236406b 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -29,7 +29,8 @@ def _main(): for report in reports_["aggregate_reports"]: try: if args.elasticsearch_host: - elastic.save_aggregate_report_to_elasticsearch(report) + elastic.save_aggregate_report_to_elasticsearch( + report, index=es_aggregate_index) except elastic.AlreadySaved as warning: logger.warning(warning.__str__()) except ElasticsearchException as error_: @@ -47,7 +48,8 @@ def _main(): for report in reports_["forensic_reports"]: try: if args.elasticsearch_host: - elastic.save_forensic_report_to_elasticsearch(report) + elastic.save_forensic_report_to_elasticsearch( + report, index=es_forensic_index) except elastic.AlreadySaved as warning: logger.warning(warning.__str__()) except ElasticsearchException as error_: @@ -94,6 +96,14 @@ def _main(): help="A list of one or more Elasticsearch " "hostnames or URLs to use (e.g. " "localhost:9200)") + arg_parser.add_argument("--elasticsearch-index-prefix", + help="Prefix to add in front of the " + "dmarc_aggregate and dmarc_forensic " + "Elasticsearch index names, joined by _") + arg_parser.add_argument("--elasticsearch-index-suffix", + help="Append this suffix to the " + "dmarc_aggregate and dmarc_forensic " + "Elasticsearch index names, joined by _") arg_parser.add_argument("--hec", help="URL to a Splunk HTTP Event " "Collector (HEC)") arg_parser.add_argument("--hec-token", help="The authorization token for " @@ -162,13 +172,26 @@ def _main(): arg_parser.print_help() exit(1) + es_aggregate_index = "dmarc_aggregate" + es_forensic_index = "dmarc_forensic" + + if args.elasticsearch_index_prefix: + prefix = args.elasticsearch_index_prefix + es_aggregate_index = "{0}_{1}".format(prefix, es_aggregate_index) + es_forensic_index = "{0}_{1}".format(prefix, es_forensic_index) + + if args.elasticsearch_index_suffix: + suffix = args.elasticsearch_index_suffix + es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) + es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) + if args.save_aggregate or args.save_forensic: if args.elasticsearch_host is None and args.hec is None: args.elasticsearch_host = ["localhost:9200"] try: if args.elasticsearch_host: elastic.set_hosts(args.elasticsearch_host) - elastic.create_indexes() + elastic.create_indexes([es_aggregate_index, es_forensic_index]) except ElasticsearchException as error: logger.error("Elasticsearch Error: {0}".format(error.__str__())) exit(1) diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 58f2a9f..1e06e80 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -7,9 +7,6 @@ from elasticsearch_dsl.search import Q from elasticsearch_dsl import connections, Object, Document, Index, Nested, \ InnerDoc, Integer, Text, Boolean, DateRange, Ip, Date -aggregate_index = Index("dmarc_aggregate") -forensic_index = Index("dmarc_forensic") - class _PolicyOverride(InnerDoc): type = Text() @@ -172,20 +169,33 @@ def set_hosts(hosts): connections.create_connection(hosts=hosts, timeout=20) -def create_indexes(): - """Creates the required indexes""" - if not aggregate_index.exists(): - aggregate_index.create() - if not forensic_index.exists(): - forensic_index.create() +def create_indexes(names=None, settings=None): + """ + Create Elasticsearch indexes + + Args: + names (list): A list of index names + ["dmarc_aggregate", "dmarc_forensic"] by default + settings (dict): Index settings + + """ + if names is None: + names = ["dmarc_aggregate", "dmarc_forensic"] + for name in names: + index = Index(name) + if not index.exists(): + index.put_settings(settings) + index.create() -def save_aggregate_report_to_elasticsearch(aggregate_report): +def save_aggregate_report_to_elasticsearch(aggregate_report, + index="dmarc_aggregate"): """ Saves a parsed DMARC aggregate report to ElasticSearch Args: aggregate_report (OrderedDict): A parsed forensic report + index (str): The name of the index to save to Raises: AlreadySaved @@ -210,7 +220,7 @@ def save_aggregate_report_to_elasticsearch(aggregate_report): begin_date_query = Q(dict(match=dict(date_range=begin_date))) end_date_query = Q(dict(match=dict(date_range=end_date))) - search = aggregate_index.search() + search = Index(index).search() search.query = org_name_query & report_id_query & domain_query & \ begin_date_query & end_date_query @@ -270,15 +280,19 @@ def save_aggregate_report_to_elasticsearch(aggregate_report): agg_doc.add_spf_result(domain=spf_result["domain"], scope=spf_result["scope"], result=spf_result["result"]) + + agg_doc.meta.index = index agg_doc.save() -def save_forensic_report_to_elasticsearch(forensic_report): +def save_forensic_report_to_elasticsearch(forensic_report, + index="dmarc_forensic"): """ Saves a parsed DMARC forensic report to ElasticSearch Args: forensic_report (OrderedDict): A parsed forensic report + index (str): The name of the index to save to Raises: AlreadySaved @@ -295,7 +309,7 @@ def save_forensic_report_to_elasticsearch(forensic_report): arrival_date_human = forensic_report["arrival_date_utc"] arrival_date = parsedmarc.human_timestamp_to_datetime(arrival_date_human) - search = forensic_index.search() + search = Index(index).search() from_query = {"match": {"sample.headers.from": headers["from"]}} subject_query = {"match": {"sample.headers.subject": headers["subject"]}} arrival_query = {"match": {"sample.headers.arrival_date": arrival_date}} @@ -364,4 +378,5 @@ def save_forensic_report_to_elasticsearch(forensic_report): sample=sample ) + forensic_doc.meta.index = index forensic_doc.save() diff --git a/setup.py b/setup.py index fa38571..923e8af 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ from setuptools import setup from codecs import open from os import path -__version__ = "4.0.2" +__version__ = "4.1.0" description = "A Python package and CLI for parsing aggregate and " \ "forensic DMARC reports"