From 8f702b9bc291212c0a743d4c3ffa265c0d1f84ae Mon Sep 17 00:00:00 2001 From: zscholl Date: Fri, 15 Feb 2019 17:05:15 -0700 Subject: [PATCH 1/7] added parallelization option to cli parsing --- parsedmarc/__init__.py | 44 ++++++++++++++++---------- parsedmarc/cli.py | 72 ++++++++++++++++++++++++++++++++---------- parsedmarc/utils.py | 10 ++++-- 3 files changed, 90 insertions(+), 36 deletions(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index b6d0731..c73dde9 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -83,7 +83,7 @@ class InvalidForensicReport(InvalidDMARCReport): """Raised when an invalid DMARC forensic report is encountered""" -def _parse_report_record(record, nameservers=None, dns_timeout=2.0): +def _parse_report_record(record, nameservers=None, dns_timeout=2.0, parallel=False): """ Converts a record from a DMARC aggregate report into a more consistent format @@ -104,7 +104,8 @@ def _parse_report_record(record, nameservers=None, dns_timeout=2.0): new_record_source = get_ip_address_info(record["row"]["source_ip"], cache=IP_ADDRESS_CACHE, nameservers=nameservers, - timeout=dns_timeout) + timeout=dns_timeout, + parallel=parallel) new_record["source"] = new_record_source new_record["count"] = int(record["row"]["count"]) policy_evaluated = record["row"]["policy_evaluated"].copy() @@ -204,7 +205,7 @@ def _parse_report_record(record, nameservers=None, dns_timeout=2.0): return new_record -def parse_aggregate_report_xml(xml, nameservers=None, timeout=2.0): +def parse_aggregate_report_xml(xml, nameservers=None, timeout=2.0, parallel=False): """Parses a DMARC XML report string and returns a consistent OrderedDict Args: @@ -303,13 +304,15 @@ def parse_aggregate_report_xml(xml, nameservers=None, timeout=2.0): for record in report["record"]: report_record = _parse_report_record(record, nameservers=nameservers, - dns_timeout=timeout) + dns_timeout=timeout, + parallel=parallel) records.append(report_record) else: report_record = _parse_report_record(report["record"], nameservers=nameservers, - dns_timeout=timeout) + dns_timeout=timeout, + parallel=parallel) records.append(report_record) new_report["records"] = records @@ -375,7 +378,7 @@ def extract_xml(input_): return xml -def parse_aggregate_report_file(_input, nameservers=None, dns_timeout=2.0): +def parse_aggregate_report_file(_input, nameservers=None, dns_timeout=2.0, parallel=False): """Parses a file at the given path, a file-like object. or bytes as a aggregate DMARC report @@ -392,7 +395,8 @@ def parse_aggregate_report_file(_input, nameservers=None, dns_timeout=2.0): return parse_aggregate_report_xml(xml, nameservers=nameservers, - timeout=dns_timeout) + timeout=dns_timeout, + parallel=parallel) def parsed_aggregate_reports_to_csv(reports): @@ -507,7 +511,8 @@ def parsed_aggregate_reports_to_csv(reports): def parse_forensic_report(feedback_report, sample, msg_date, nameservers=None, dns_timeout=2.0, - strip_attachment_payloads=False): + strip_attachment_payloads=False, + parallel=False): """ Converts a DMARC forensic report and sample to a ``OrderedDict`` @@ -551,7 +556,8 @@ def parse_forensic_report(feedback_report, sample, msg_date, ip_address = parsed_report["source_ip"] parsed_report_source = get_ip_address_info(ip_address, nameservers=nameservers, - timeout=dns_timeout) + timeout=dns_timeout, + parallel=parallel) parsed_report["source"] = parsed_report_source del parsed_report["source_ip"] @@ -651,7 +657,7 @@ def parsed_forensic_reports_to_csv(reports): def parse_report_email(input_, nameservers=None, dns_timeout=2.0, - strip_attachment_payloads=False): + strip_attachment_payloads=False, parallel=False): """ Parses a DMARC report from an email @@ -722,7 +728,8 @@ def parse_report_email(input_, nameservers=None, dns_timeout=2.0, aggregate_report = parse_aggregate_report_file( payload, nameservers=ns, - dns_timeout=dns_timeout) + dns_timeout=dns_timeout, + parallel=parallel) result = OrderedDict([("report_type", "aggregate"), ("report", aggregate_report)]) return result @@ -749,13 +756,15 @@ def parse_report_email(input_, nameservers=None, dns_timeout=2.0, date, nameservers=nameservers, dns_timeout=dns_timeout, - strip_attachment_payloads=strip_attachment_payloads) + strip_attachment_payloads=strip_attachment_payloads, + parallel=parallel) except InvalidForensicReport as e: error = 'Message with subject "{0}" ' \ 'is not a valid ' \ 'forensic DMARC report: {1}'.format(subject, e) raise InvalidForensicReport(error) except Exception as e: + print("DEBUGGGING: {}".format(e)) raise InvalidForensicReport(e.__str__()) result = OrderedDict([("report_type", "forensic"), @@ -769,7 +778,7 @@ def parse_report_email(input_, nameservers=None, dns_timeout=2.0, def parse_report_file(input_, nameservers=None, dns_timeout=2.0, - strip_attachment_payloads=False): + strip_attachment_payloads=False, parallel=False): """Parses a DMARC aggregate or forensic file at the given path, a file-like object. or bytes @@ -794,7 +803,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, content = file_object.read() try: report = parse_aggregate_report_file(content, nameservers=nameservers, - dns_timeout=dns_timeout) + dns_timeout=dns_timeout, + parallel=parallel) results = OrderedDict([("report_type", "aggregate"), ("report", report)]) except InvalidAggregateReport: @@ -803,8 +813,10 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, results = parse_report_email(content, nameservers=nameservers, dns_timeout=dns_timeout, - strip_attachment_payloads=sa) - except InvalidDMARCReport: + strip_attachment_payloads=sa, + parallel=parallel) + except InvalidDMARCReport as e: + print("DEBUGGING: {}".format(e)) raise InvalidDMARCReport("Not a valid aggregate or forensic " "report") return results diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 14883a3..2af989c 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -11,19 +11,43 @@ import logging from collections import OrderedDict import json from ssl import CERT_NONE, create_default_context +from multiprocessing import Pool, Value +from itertools import repeat +from contextlib import contextmanager +import time +from tqdm import tqdm from parsedmarc import IMAPError, get_dmarc_reports_from_inbox, \ parse_report_file, elastic, kafkaclient, splunk, save_output, \ - watch_inbox, email_results, SMTPError, ParserError, __version__ + watch_inbox, email_results, SMTPError, ParserError, __version__, \ + InvalidDMARCReport logger = logging.getLogger("parsedmarc") - def _str_to_list(s): """Converts a comma separated string to a list""" _list = s.split(",") return list(map(lambda i: i.lstrip(), _list)) +def cli_parse(file_path, sa, nameservers, dns_timeout, parallel=False): + """Separated this function for multiprocessing""" + try: + file_results = parse_report_file(file_path, + nameservers=nameservers, + dns_timeout=dns_timeout, + strip_attachment_payloads=sa, + parallel=parallel) + except ParserError as error: + return (error, file_path) + finally: + global counter + with counter.get_lock(): + counter.value += 1 + return (file_results, file_path) + +def init(ctr): + global counter + counter = ctr def _main(): """Called when the module is executed""" @@ -134,7 +158,7 @@ def _main(): args = arg_parser.parse_args() opts = Namespace(file_path=args.file_path, - onfig_file=args.config_file, + config_file=args.config_file, strip_attachment_payloads=args.strip_attachment_payloads, output=args.output, nameservers=args.nameservers, @@ -211,6 +235,10 @@ def _main(): opts.silent = general_config.getboolean("silent") if "log_file" in general_config: opts.log_file = general_config["log_file"] + if "n_cpus" in general_config: + opts.n_cpus = general_config.getint("n_cpus") + if "chunksize" in general_config: + opts.chunksize = general_config.getint("chunksize") if "imap" in config.sections(): imap_config = config["imap"] if "host" in imap_config: @@ -360,21 +388,31 @@ def _main(): file_paths += glob(file_path) file_paths = list(set(file_paths)) - for file_path in file_paths: - try: - sa = opts.strip_attachment_payloads - file_results = parse_report_file(file_path, - nameservers=opts.nameservers, - dns_timeout=opts.dns_timeout, - strip_attachment_payloads=sa) - if file_results["report_type"] == "aggregate": - aggregate_reports.append(file_results["report"]) - elif file_results["report_type"] == "forensic": - forensic_reports.append(file_results["report"]) + counter = Value('i', 0) + pool = Pool(opts.n_cpus, initializer=init, initargs=(counter,)) + results = pool.starmap_async(cli_parse, zip(file_paths, + repeat(opts.strip_attachment_payloads), + repeat(opts.nameservers), + repeat(opts.dns_timeout), + repeat(opts.n_cpus >= 1)), opts.chunksize) + pbar = tqdm(total=len(file_paths)) + while not results.ready(): + pbar.update(counter.value - pbar.n) + time.sleep(0.1) + pbar.close() + results = results.get() + pool.close() + pool.join() - except ParserError as error: - logger.error("Failed to parse {0} - {1}".format(file_path, - error)) + for result in results: + if type(result[0]) is InvalidDMARCReport: + logger.error("Failed to parse {0} - {1}".format(result[1], + result[0])) + else: + if result[0]["report_type"] == "aggregate": + aggregate_reports.append(result[0]["report"]) + elif result[0]["report_type"] == "forensic": + forensic_reports.append(result[0]["report"]) if opts.imap_host: try: diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py index ffef046..4bea979 100644 --- a/parsedmarc/utils.py +++ b/parsedmarc/utils.py @@ -269,6 +269,7 @@ def get_ip_address_country(ip_address): Args: location (str): Local location for the database file """ + import pdb; pdb.set_trace() url = "https://geolite.maxmind.com/download/geoip/database/" \ "GeoLite2-Country.tar.gz" # Use a browser-like user agent string to bypass some proxy blocks @@ -314,7 +315,7 @@ def get_ip_address_country(ip_address): return country -def get_ip_address_info(ip_address, cache=None, nameservers=None, timeout=2.0): +def get_ip_address_info(ip_address, cache=None, nameservers=None, timeout=2.0, parallel=False): """ Returns reverse DNS and country information for the given IP address @@ -339,8 +340,11 @@ def get_ip_address_info(ip_address, cache=None, nameservers=None, timeout=2.0): reverse_dns = get_reverse_dns(ip_address, nameservers=nameservers, timeout=timeout) - country = get_ip_address_country(ip_address) - info["country"] = country + if not parallel: + country = get_ip_address_country(ip_address) + info["country"] = country + else: + info["country"] = None info["reverse_dns"] = reverse_dns info["base_domain"] = None if reverse_dns is not None: From baa544217faee4234cef189d0e48d206ce8e6934 Mon Sep 17 00:00:00 2001 From: zscholl Date: Fri, 15 Feb 2019 17:17:20 -0700 Subject: [PATCH 2/7] updated README --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index e827f45..2c4b185 100644 --- a/README.rst +++ b/README.rst @@ -138,6 +138,8 @@ The full set of configuration options are: - ``debug`` - bool: Print debugging messages - ``silent`` - bool: Only print errors (Default: True) - ``log_file`` - str: Write log messages to a file at this path + - ``n_cpus`` - str: Number of process to run in parallel when parsing in CLI mode (Default: 1) + - ``chunksize`` - str: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of filels - ``imap`` - ``host`` - str: The IMAP server hostname or IP address - ``port`` - int: The IMAP server port (Default: 993) From 884b3759e764d12084ca0488aa47655c3c0c6017 Mon Sep 17 00:00:00 2001 From: zscholl Date: Fri, 15 Feb 2019 17:19:31 -0700 Subject: [PATCH 3/7] updated requirements --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index 8e0ff1a..d91e138 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ flake8 sphinx sphinx_rtd_theme wheel +tqdm rstcheck>=3.3.1 pygments dnspython>=1.16.0 @@ -18,3 +19,4 @@ dateparser>=0.7.1 elasticsearch>=6.3.1 elasticsearch-dsl>=0.0.12 kafka-python>=1.4.4 +tqdm>=4.31.1 \ No newline at end of file From 6f7a3336235458097d56363aaab491dc0fdecde4 Mon Sep 17 00:00:00 2001 From: zscholl Date: Fri, 15 Feb 2019 17:20:51 -0700 Subject: [PATCH 4/7] fix typo --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 2c4b185..bf16f34 100644 --- a/README.rst +++ b/README.rst @@ -139,7 +139,7 @@ The full set of configuration options are: - ``silent`` - bool: Only print errors (Default: True) - ``log_file`` - str: Write log messages to a file at this path - ``n_cpus`` - str: Number of process to run in parallel when parsing in CLI mode (Default: 1) - - ``chunksize`` - str: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of filels + - ``chunksize`` - str: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files - ``imap`` - ``host`` - str: The IMAP server hostname or IP address - ``port`` - int: The IMAP server port (Default: 993) From 304074ade5679cf3ba4efd19aaa976a0b5bae437 Mon Sep 17 00:00:00 2001 From: zscholl Date: Fri, 15 Feb 2019 17:34:26 -0700 Subject: [PATCH 5/7] fix requirements.txt --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index d91e138..0e5977d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,5 +18,4 @@ mail-parser>=3.9.2 dateparser>=0.7.1 elasticsearch>=6.3.1 elasticsearch-dsl>=0.0.12 -kafka-python>=1.4.4 -tqdm>=4.31.1 \ No newline at end of file +kafka-python>=1.4.4 \ No newline at end of file From 8fc856d0e3abe0ac8dc1b224ff5122516539e649 Mon Sep 17 00:00:00 2001 From: zscholl Date: Wed, 20 Feb 2019 11:25:46 -0700 Subject: [PATCH 6/7] change n_cpus to n_procs. fix PEP8 issues. remove debugging statements. --- README.rst | 2 +- parsedmarc/__init__.py | 12 +++++++----- parsedmarc/cli.py | 26 +++++++++++++++----------- parsedmarc/utils.py | 4 ++-- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/README.rst b/README.rst index bf16f34..8685ee8 100644 --- a/README.rst +++ b/README.rst @@ -138,7 +138,7 @@ The full set of configuration options are: - ``debug`` - bool: Print debugging messages - ``silent`` - bool: Only print errors (Default: True) - ``log_file`` - str: Write log messages to a file at this path - - ``n_cpus`` - str: Number of process to run in parallel when parsing in CLI mode (Default: 1) + - ``n_procs`` - str: Number of process to run in parallel when parsing in CLI mode (Default: 1) - ``chunksize`` - str: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files - ``imap`` - ``host`` - str: The IMAP server hostname or IP address diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 2b3c93f..aab42e2 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -83,7 +83,8 @@ class InvalidForensicReport(InvalidDMARCReport): """Raised when an invalid DMARC forensic report is encountered""" -def _parse_report_record(record, nameservers=None, dns_timeout=2.0, parallel=False): +def _parse_report_record(record, nameservers=None, dns_timeout=2.0, + parallel=False): """ Converts a record from a DMARC aggregate report into a more consistent format @@ -207,7 +208,8 @@ def _parse_report_record(record, nameservers=None, dns_timeout=2.0, parallel=Fal return new_record -def parse_aggregate_report_xml(xml, nameservers=None, timeout=2.0, parallel=False): +def parse_aggregate_report_xml(xml, nameservers=None, timeout=2.0, + parallel=False): """Parses a DMARC XML report string and returns a consistent OrderedDict Args: @@ -380,7 +382,8 @@ def extract_xml(input_): return xml -def parse_aggregate_report_file(_input, nameservers=None, dns_timeout=2.0, parallel=False): +def parse_aggregate_report_file(_input, nameservers=None, dns_timeout=2.0, + parallel=False): """Parses a file at the given path, a file-like object. or bytes as a aggregate DMARC report @@ -817,8 +820,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, dns_timeout=dns_timeout, strip_attachment_payloads=sa, parallel=parallel) - except InvalidDMARCReport as e: - print("DEBUGGING: {}".format(e)) + except InvalidDMARCReport: raise InvalidDMARCReport("Not a valid aggregate or forensic " "report") return results diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 2af989c..401a7d7 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -13,7 +13,6 @@ import json from ssl import CERT_NONE, create_default_context from multiprocessing import Pool, Value from itertools import repeat -from contextlib import contextmanager import time from tqdm import tqdm @@ -24,11 +23,13 @@ from parsedmarc import IMAPError, get_dmarc_reports_from_inbox, \ logger = logging.getLogger("parsedmarc") + def _str_to_list(s): """Converts a comma separated string to a list""" _list = s.split(",") return list(map(lambda i: i.lstrip(), _list)) + def cli_parse(file_path, sa, nameservers, dns_timeout, parallel=False): """Separated this function for multiprocessing""" try: @@ -38,17 +39,19 @@ def cli_parse(file_path, sa, nameservers, dns_timeout, parallel=False): strip_attachment_payloads=sa, parallel=parallel) except ParserError as error: - return (error, file_path) + return (error, file_path) finally: global counter with counter.get_lock(): counter.value += 1 return (file_results, file_path) + def init(ctr): global counter counter = ctr + def _main(): """Called when the module is executed""" def process_reports(reports_): @@ -235,8 +238,8 @@ def _main(): opts.silent = general_config.getboolean("silent") if "log_file" in general_config: opts.log_file = general_config["log_file"] - if "n_cpus" in general_config: - opts.n_cpus = general_config.getint("n_cpus") + if "n_procs" in general_config: + opts.n_procs = general_config.getint("n_procs") if "chunksize" in general_config: opts.chunksize = general_config.getint("chunksize") if "imap" in config.sections(): @@ -389,12 +392,13 @@ def _main(): file_paths = list(set(file_paths)) counter = Value('i', 0) - pool = Pool(opts.n_cpus, initializer=init, initargs=(counter,)) - results = pool.starmap_async(cli_parse, zip(file_paths, - repeat(opts.strip_attachment_payloads), - repeat(opts.nameservers), - repeat(opts.dns_timeout), - repeat(opts.n_cpus >= 1)), opts.chunksize) + pool = Pool(opts.n_procs, initializer=init, initargs=(counter,)) + results = pool.starmap_async(cli_parse, + zip(file_paths, + repeat(opts.strip_attachment_payloads), + repeat(opts.nameservers), + repeat(opts.dns_timeout), + repeat(opts.n_procs >= 1)), opts.chunksize) pbar = tqdm(total=len(file_paths)) while not results.ready(): pbar.update(counter.value - pbar.n) @@ -402,7 +406,7 @@ def _main(): pbar.close() results = results.get() pool.close() - pool.join() + pool.join() for result in results: if type(result[0]) is InvalidDMARCReport: diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py index 11f1e9c..e86dfca 100644 --- a/parsedmarc/utils.py +++ b/parsedmarc/utils.py @@ -267,7 +267,6 @@ def get_ip_address_country(ip_address): Args: location (str): Local location for the database file """ - import pdb; pdb.set_trace() url = "https://geolite.maxmind.com/download/geoip/database/" \ "GeoLite2-Country.tar.gz" # Use a browser-like user agent string to bypass some proxy blocks @@ -324,7 +323,8 @@ def get_ip_address_country(ip_address): return country -def get_ip_address_info(ip_address, cache=None, nameservers=None, timeout=2.0, parallel=False): +def get_ip_address_info(ip_address, cache=None, nameservers=None, + timeout=2.0, parallel=False): """ Returns reverse DNS and country information for the given IP address From 182cc251fc4ea10c1e80ff20a03a1d2fc6ed4887 Mon Sep 17 00:00:00 2001 From: zscholl Date: Wed, 20 Feb 2019 15:14:50 -0700 Subject: [PATCH 7/7] fix another pep8. fix default param issue --- parsedmarc/cli.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 401a7d7..33331e3 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -205,7 +205,9 @@ def _main(): smtp_to=[], smtp_subject="parsedmarc report", smtp_message="Please see the attached DMARC results.", - log_file=args.log_file + log_file=args.log_file, + n_procs=1, + chunksize=1 ) args = arg_parser.parse_args() @@ -398,7 +400,8 @@ def _main(): repeat(opts.strip_attachment_payloads), repeat(opts.nameservers), repeat(opts.dns_timeout), - repeat(opts.n_procs >= 1)), opts.chunksize) + repeat(opts.n_procs >= 1)), + opts.chunksize) pbar = tqdm(total=len(file_paths)) while not results.ready(): pbar.update(counter.value - pbar.n)