diff --git a/CHANGELOG.md b/CHANGELOG.md index e4e8506..8cd66b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,24 @@ Changelog ========= +7.0.1 +----- + +- Fix startup error (PR #254) + +7.0.0 +----- + +- Fix issue #221: Crash when handling invalid reports without root node (PR #248) +- Use UTC datetime objects for Elasticsearch output (PR #245) +- Fix issues #219, #155, and #103: IMAP connections break on large emails (PR #241) +- Add support for saving reports to S3 buckets (PR #223) +- Pass `offline` parameter to `wait_inbox()` (PR #216) +- Add more details to logging (PR #220) +- Add options customizing the names of output files (Modifications based on PR #225) +- Wait for 5 seconds before attempting to reconnect to an IMAP server (PR #217) +- Add option to process messages in batches (PR #222) + 6.12.0 ------ diff --git a/README.rst b/README.rst index b028445..3d5684e 100644 --- a/README.rst +++ b/README.rst @@ -58,17 +58,20 @@ CLI help :: - usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] - [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]] - [-t DNS_TIMEOUT] [--offline] [-s] [--debug] - [--log-file LOG_FILE] [-v] - [file_path [file_path ...]] + usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT] + [--aggregate-json-filename AGGREGATE_JSON_FILENAME] + [--forensic-json-filename FORENSIC_JSON_FILENAME] + [--aggregate-csv-filename AGGREGATE_CSV_FILENAME] + [--forensic-csv-filename FORENSIC_CSV_FILENAME] + [-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] + [-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v] + [file_path ...] 
Parses DMARC reports positional arguments: file_path one or more paths to aggregate or forensic report - files or emails + files, emails, or mbox files' optional arguments: -h, --help show this help message and exit @@ -78,18 +81,27 @@ CLI help remove attachment payloads from forensic report output -o OUTPUT, --output OUTPUT write output files to the given directory + --aggregate-json-filename AGGREGATE_JSON_FILENAME + filename for the aggregate JSON output file + --forensic-json-filename FORENSIC_JSON_FILENAME + filename for the forensic JSON output file + --aggregate-csv-filename AGGREGATE_CSV_FILENAME + filename for the aggregate CSV output file + --forensic-csv-filename FORENSIC_CSV_FILENAME + filename for the forensic CSV output file -n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...] - nameservers to query (default is Cloudflare's - nameservers) + nameservers to query -t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT number of seconds to wait for an answer from DNS (default: 2.0) --offline do not make online queries for geolocation or DNS -s, --silent only print errors and warnings + --verbose more verbose output --debug print debugging information --log-file LOG_FILE output logging to a file -v, --version show program's version number and exit + .. note:: In ``parsedmarc`` 6.0.0, most CLI options were moved to a configuration file, described below. 
@@ -128,13 +140,19 @@ For example token = HECTokenGoesHere index = email + [s3] + bucket = my-bucket + path = parsedmarc + The full set of configuration options are: - ``general`` - - ``save_aggregate`` - bool: Save aggregate report data to the Elasticsearch and/or Splunk - - ``save_forensic`` - bool: Save forensic report data to the Elasticsearch and/or Splunk + - ``save_aggregate`` - bool: Save aggregate report data to Elasticsearch, Splunk and/or S3 + - ``save_forensic`` - bool: Save forensic report data to Elasticsearch, Splunk and/or S3 - ``strip_attachment_payloads`` - bool: Remove attachment payloads from results - ``output`` - str: Directory to place JSON and CSV files in + - ``aggregate_json_filename`` - str: filename for the aggregate JSON output file + - ``forensic_json_filename`` - str: filename for the forensic JSON output file - ``offline`` - bool: Do not use online queries for geolocation or DNS - ``nameservers`` - str: A comma separated list of DNS resolvers (Default: `Cloudflare's public resolvers`_) - ``dns_timeout`` - float: DNS timeout period @@ -142,10 +160,18 @@ The full set of configuration options are: - ``silent`` - bool: Only print errors (Default: True) - ``log_file`` - str: Write log messages to a file at this path - ``n_procs`` - int: Number of process to run in parallel when parsing in CLI mode (Default: 1) - - ``chunk_size`` - int: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files + - ``chunk_size`` - int: Number of files to give to each process when running in parallel. + + .. note:: + Setting this to a number larger than one can improve performance when processing thousands of files - ``imap`` + - ``host`` - str: The IMAP server hostname or IP address - - ``port`` - int: The IMAP server port (Default: 993) + - ``port`` - int: The IMAP server port (Default: 993). + + .. 
note:: + If your host recommends another port, still try 993 + - ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True) - ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended) - ``user`` - str: The IMAP user @@ -155,11 +181,13 @@ The full set of configuration options are: - ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive - ``delete`` - bool: Delete messages after processing them, instead of archiving them - ``test`` - bool: Do not move or delete messages + - ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set. - ``elasticsearch`` - ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``) - .. note:: + .. note:: Special characters in the username or password must be `URL encoded`_. + - ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True) - ``cert_path`` - str: Path to a trusted certificates - ``index_suffix`` - str: A suffix to apply to the index names @@ -191,6 +219,9 @@ The full set of configuration options are: - ``subject`` - str: The Subject header to use in the email (Default: parsedmarc report) - ``attachment`` - str: The ZIP attachment filenames - ``message`` - str: The email message (Default: Please see the attached parsedmarc report.) +- ``s3`` + - ``bucket`` - str: The S3 bucket name + - ``path`` - int: The path to upload reports to (Default: /) .. 
warning:: diff --git a/docs/example.ini b/docs/example.ini index a27a670..efa56b3 100644 --- a/docs/example.ini +++ b/docs/example.ini @@ -18,3 +18,7 @@ ssl = False url = https://splunkhec.example.com token = HECTokenGoesHere index = email + +[s3] +bucket = my-bucket +path = parsedmarc \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 449f048..fa9cd62 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -62,36 +62,48 @@ CLI help :: - usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] - [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]] - [-t DNS_TIMEOUT] [--offline] [-s] [--debug] - [--log-file LOG_FILE] [-v] - [file_path [file_path ...]] + usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT] + [--aggregate-json-filename AGGREGATE_JSON_FILENAME] + [--forensic-json-filename FORENSIC_JSON_FILENAME] + [--aggregate-csv-filename AGGREGATE_CSV_FILENAME] + [--forensic-csv-filename FORENSIC_CSV_FILENAME] + [-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline] + [-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v] + [file_path ...] - Parses DMARC reports + Parses DMARC reports - positional arguments: - file_path one or more paths to aggregate or forensic report - files or emails + positional arguments: + file_path one or more paths to aggregate or forensic report + files, emails, or mbox files' - optional arguments: - -h, --help show this help message and exit - -c CONFIG_FILE, --config-file CONFIG_FILE - a path to a configuration file (--silent implied) - --strip-attachment-payloads - remove attachment payloads from forensic report output - -o OUTPUT, --output OUTPUT - write output files to the given directory - -n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...] 
- nameservers to query - -t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT - number of seconds to wait for an answer from DNS - (default: 2.0) - --offline do not make online queries for geolocation or DNS - -s, --silent only print errors and warnings - --debug print debugging information - --log-file LOG_FILE output logging to a file - -v, --version show program's version number and exit + optional arguments: + -h, --help show this help message and exit + -c CONFIG_FILE, --config-file CONFIG_FILE + a path to a configuration file (--silent implied) + --strip-attachment-payloads + remove attachment payloads from forensic report output + -o OUTPUT, --output OUTPUT + write output files to the given directory + --aggregate-json-filename AGGREGATE_JSON_FILENAME + filename for the aggregate JSON output file + --forensic-json-filename FORENSIC_JSON_FILENAME + filename for the forensic JSON output file + --aggregate-csv-filename AGGREGATE_CSV_FILENAME + filename for the aggregate CSV output file + --forensic-csv-filename FORENSIC_CSV_FILENAME + filename for the forensic CSV output file + -n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...] + nameservers to query + -t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT + number of seconds to wait for an answer from DNS + (default: 2.0) + --offline do not make online queries for geolocation or DNS + -s, --silent only print errors and warnings + --verbose more verbose output + --debug print debugging information + --log-file LOG_FILE output logging to a file + -v, --version show program's version number and exit .. 
note:: @@ -132,13 +144,19 @@ For example token = HECTokenGoesHere index = email + [s3] + bucket = my-bucket + path = parsedmarc + The full set of configuration options are: - ``general`` - - ``save_aggregate`` - bool: Save aggregate report data to the Elasticsearch and/or Splunk - - ``save_forensic`` - bool: Save forensic report data to the Elasticsearch and/or Splunk + - ``save_aggregate`` - bool: Save aggregate report data to Elasticsearch, Splunk and/or S3 + - ``save_forensic`` - bool: Save forensic report data to Elasticsearch, Splunk and/or S3 - ``strip_attachment_payloads`` - bool: Remove attachment payloads from results - ``output`` - str: Directory to place JSON and CSV files in + - ``aggregate_json_filename`` - str: filename for the aggregate JSON output file + - ``forensic_json_filename`` - str: filename for the forensic JSON output file - ``offline`` - bool: Do not use online queries for geolocation or DNS - ``nameservers`` - str: A comma separated list of DNS resolvers (Default: `Cloudflare's public resolvers`_) - ``dns_timeout`` - float: DNS timeout period @@ -146,31 +164,36 @@ The full set of configuration options are: - ``silent`` - bool: Only print errors (Default: True) - ``log_file`` - str: Write log messages to a file at this path - ``n_procs`` - int: Number of process to run in parallel when parsing in CLI mode (Default: 1) - - ``chunk_size`` - int: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files + - ``chunk_size`` - int: Number of files to give to each process when running in parallel. + + .. note:: + Setting this to a number larger than one can improve performance when processing thousands of files - ``imap`` + - ``host`` - str: The IMAP server hostname or IP address - - ``port`` - int: The IMAP server port (Default: 993) + - ``port`` - int: The IMAP server port (Default: 993). + + .. 
note:: + If your host recommends another port, still try 993 + - ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True) - ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended) - - ``timeout`` - float: Timeout in seconds to wait for an IMAP operation to complete (Default: 30) - - ``max_retries`` - int: The maximum number of retries after a timeout - ``user`` - str: The IMAP user - - ``password`` - str: The IMAP password (escape ``%`` with a second ``%``) + - ``password`` - str: The IMAP password - ``reports_folder`` - str: The IMAP folder where the incoming reports can be found (Default: INBOX) - ``archive_folder`` - str: The IMAP folder to sort processed emails into (Default: Archive) - ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive - ``delete`` - bool: Delete messages after processing them, instead of archiving them - ``test`` - bool: Do not move or delete messages + - ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set. - ``elasticsearch`` - ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``) - .. note:: + .. note:: Special characters in the username or password must be `URL encoded`_. 
+ - ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True) - - ``user`` - str: Basic auth username - - ``password`` - str: Basic auth password - ``cert_path`` - str: Path to a trusted certificates - - ``timeout`` - float: Timeout in seconds (Default: 60) - ``index_suffix`` - str: A suffix to apply to the index names - ``monthly_indexes`` - bool: Use monthly indexes instead of daily indexes - ``number_of_shards`` - int: The number of shards to use when creating the index (Default: 1) @@ -200,7 +223,9 @@ The full set of configuration options are: - ``subject`` - str: The Subject header to use in the email (Default: parsedmarc report) - ``attachment`` - str: The ZIP attachment filenames - ``message`` - str: The email message (Default: Please see the attached parsedmarc report.) - +- ``s3`` + - ``bucket`` - str: The S3 bucket name + - ``path`` - str: The path to upload reports to (Default: /) .. warning:: diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index c78cfc8..2750819 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -8,6 +8,7 @@ import shutil import xml.parsers.expat as expat import json from datetime import datetime +from time import sleep from collections import OrderedDict from io import BytesIO, StringIO from gzip import GzipFile @@ -35,7 +36,7 @@ from parsedmarc.utils import is_outlook_msg, convert_outlook_msg from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime from parsedmarc.utils import parse_email -__version__ = "6.12.0" +__version__ = "7.0.1" logging.basicConfig( format='%(levelname)8s:%(filename)s:%(lineno)d:' @@ -203,7 +204,7 @@ def _parse_report_record(record, offline=False, nameservers=None, def parse_aggregate_report_xml(xml, offline=False, nameservers=None, - timeout=2.0, parallel=False): + timeout=2.0, parallel=False, server=None): """Parses a DMARC XML report string and returns a consistent OrderedDict Args: @@ -213,6 +214,7 @@ def parse_aggregate_report_xml(xml,
offline=False, nameservers=None, (Cloudflare's public DNS resolvers by default) timeout (float): Sets the DNS timeout in seconds parallel (bool): Parallel processing + server (IMAPClient): Connection object Returns: OrderedDict: The parsed aggregate DMARC report @@ -305,8 +307,13 @@ def parse_aggregate_report_xml(xml, offline=False, nameservers=None, new_report["policy_published"] = new_policy_published if type(report["record"]) == list: - for record in report["record"]: - report_record = _parse_report_record(record, + for i in range(len(report["record"])): + if server is not None and i > 0 and i % 20 == 0: + logger.debug("Sending noop cmd") + server.noop() + logger.debug("Processed {0}/{1}".format( + i, len(report["record"]))) + report_record = _parse_report_record(report["record"][i], offline=offline, nameservers=nameservers, dns_timeout=timeout, @@ -386,7 +393,8 @@ def extract_xml(input_): def parse_aggregate_report_file(_input, offline=False, nameservers=None, dns_timeout=2.0, - parallel=False): + parallel=False, + server=None): """Parses a file at the given path, a file-like object. 
or bytes as a aggregate DMARC report @@ -397,6 +405,7 @@ def parse_aggregate_report_file(_input, offline=False, nameservers=None, (Cloudflare's public DNS resolvers by default) dns_timeout (float): Sets the DNS timeout in seconds parallel (bool): Parallel processing + server (IMAPClient): Connection object Returns: OrderedDict: The parsed DMARC aggregate report @@ -407,7 +416,8 @@ def parse_aggregate_report_file(_input, offline=False, nameservers=None, offline=offline, nameservers=nameservers, timeout=dns_timeout, - parallel=parallel) + parallel=parallel, + server=server) def parsed_aggregate_reports_to_csv_rows(reports): @@ -739,7 +749,7 @@ def parsed_forensic_reports_to_csv(reports): def parse_report_email(input_, offline=False, nameservers=None, dns_timeout=2.0, strip_attachment_payloads=False, - parallel=False): + parallel=False, server=None): """ Parses a DMARC report from an email @@ -751,6 +761,7 @@ def parse_report_email(input_, offline=False, nameservers=None, strip_attachment_payloads (bool): Remove attachment payloads from forensic report results parallel (bool): Parallel processing + server (IMAPClient): Connection object Returns: OrderedDict: @@ -777,6 +788,8 @@ def parse_report_email(input_, offline=False, nameservers=None, subject = None feedback_report = None sample = None + if "From" in msg_headers: + logger.info("Parsing mail from {0}".format(msg_headers["From"])) if "Subject" in msg_headers: subject = msg_headers["Subject"] for part in msg.walk(): @@ -814,7 +827,8 @@ def parse_report_email(input_, offline=False, nameservers=None, offline=offline, nameservers=ns, dns_timeout=dns_timeout, - parallel=parallel) + parallel=parallel, + server=server) result = OrderedDict([("report_type", "aggregate"), ("report", aggregate_report)]) return result @@ -864,7 +878,7 @@ def parse_report_email(input_, offline=False, nameservers=None, def parse_report_file(input_, nameservers=None, dns_timeout=2.0, strip_attachment_payloads=False, - offline=False, 
parallel=False): + offline=False, parallel=False, server=None): """Parses a DMARC aggregate or forensic file at the given path, a file-like object. or bytes @@ -877,6 +891,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, forensic report results offline (bool): Do not make online queries for geolocation or DNS parallel (bool): Parallel processing + server (IMAPClient): Connection object Returns: OrderedDict: The parsed DMARC report @@ -896,7 +911,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, offline=offline, nameservers=nameservers, dns_timeout=dns_timeout, - parallel=parallel) + parallel=parallel, + server=server) results = OrderedDict([("report_type", "aggregate"), ("report", report)]) except InvalidAggregateReport: @@ -907,7 +923,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, nameservers=nameservers, dns_timeout=dns_timeout, strip_attachment_payloads=sa, - parallel=parallel) + parallel=parallel, + server=server) except InvalidDMARCReport: raise InvalidDMARCReport("Not a valid aggregate or forensic " "report") @@ -944,7 +961,7 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0, input_)) for i in range(len(message_keys)): message_key = message_keys[i] - logger.debug("Processing message {0} of {1}".format( + logger.info("Processing message {0} of {1}".format( i+1, total_messages )) msg_content = mbox.get_string(message_key) @@ -1005,7 +1022,8 @@ def get_dmarc_reports_from_inbox(connection=None, nameservers=None, dns_timeout=6.0, strip_attachment_payloads=False, - results=None): + results=None, + batch_size=None): """ Fetches and parses DMARC reports from an inbox @@ -1029,6 +1047,7 @@ def get_dmarc_reports_from_inbox(connection=None, strip_attachment_payloads (bool): Remove attachment payloads from forensic report results results (dict): Results from the previous run + batch_size (int): Number of messages to read and process before saving Returns: OrderedDict: Lists of 
``aggregate_reports`` and ``forensic_reports`` @@ -1071,11 +1090,18 @@ def get_dmarc_reports_from_inbox(connection=None, total_messages = len(messages) logger.debug("Found {0} messages in {1}".format(len(messages), reports_folder)) - for i in range(len(messages)): + + if batch_size: + message_limit = min(total_messages, batch_size) + else: + message_limit = total_messages + + logger.debug("Processing {0} messages".format(message_limit)) + + for i in range(message_limit): msg_uid = messages[i] logger.debug("Processing message {0} of {1}: UID {2}".format( - i+1, total_messages, msg_uid - + i+1, message_limit, msg_uid )) msg_content = server.fetch_message(msg_uid, parse=False) sa = strip_attachment_payloads @@ -1084,7 +1110,8 @@ def get_dmarc_reports_from_inbox(connection=None, nameservers=nameservers, dns_timeout=dns_timeout, offline=offline, - strip_attachment_payloads=sa) + strip_attachment_payloads=sa, + server=server) if parsed_email["report_type"] == "aggregate": aggregate_reports.append(parsed_email["report"]) aggregate_report_msg_uids.append(msg_uid) @@ -1167,7 +1194,7 @@ def get_dmarc_reports_from_inbox(connection=None, total_messages = len(server.search()) - if not test and total_messages > 0: + if not test and not batch_size and total_messages > 0: # Process emails that came in during the last run results = get_dmarc_reports_from_inbox( connection=server, @@ -1189,7 +1216,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, verify=True, reports_folder="INBOX", archive_folder="Archive", delete=False, test=False, idle_timeout=30, offline=False, nameservers=None, - dns_timeout=6.0, strip_attachment_payloads=False): + dns_timeout=6.0, strip_attachment_payloads=False, + batch_size=None): """ Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function @@ -1212,6 +1240,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, dns_timeout (float): Set the DNS query timeout 
strip_attachment_payloads (bool): Replace attachment payloads in forensic report samples with None + batch_size (int): Number of messages to read and process before saving """ sa = strip_attachment_payloads @@ -1224,7 +1253,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, offline=offline, nameservers=nameservers, dns_timeout=dns_timeout, - strip_attachment_payloads=sa) + strip_attachment_payloads=sa, + batch_size=batch_size) callback(res) while True: @@ -1236,15 +1266,28 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, idle_timeout=idle_timeout) except (timeout, IMAPClientError): logger.warning("IMAP connection timeout. Reconnecting...") + sleep(5) + except Exception as e: + logger.warning("IMAP connection error. {0}. " + "Reconnecting...".format(e)) + sleep(5) -def save_output(results, output_directory="output"): +def save_output(results, output_directory="output", + aggregate_json_filename="aggregate.json", + forensic_json_filename="forensic.json", + aggregate_csv_filename="aggregate.csv", + forensic_csv_filename="forensic.csv"): """ Save report data in the given directory Args: results (OrderedDict): Parsing results - output_directory: The patch to the directory to save in + output_directory (str): The patch to the directory to save in + aggregate_json_filename (str): Filename for the aggregate JSON file + forensic_json_filename (str): Filename for the forensic JSON file + aggregate_csv_filename (str): Filename for the aggregate CSV file + forensic_csv_filename (str): Filename for the forensic CSV file """ aggregate_reports = results["aggregate_reports"] @@ -1256,22 +1299,30 @@ def save_output(results, output_directory="output"): else: os.makedirs(output_directory) - with open("{0}".format(os.path.join(output_directory, "aggregate.json")), + with open("{0}" + .format(os.path.join(output_directory, + aggregate_json_filename)), "w", newline="\n", encoding="utf-8") as agg_json: 
agg_json.write(json.dumps(aggregate_reports, ensure_ascii=False, indent=2)) - with open("{0}".format(os.path.join(output_directory, "aggregate.csv")), + with open("{0}" + .format(os.path.join(output_directory, + aggregate_csv_filename)), "w", newline="\n", encoding="utf-8") as agg_csv: csv = parsed_aggregate_reports_to_csv(aggregate_reports) agg_csv.write(csv) - with open("{0}".format(os.path.join(output_directory, "forensic.json")), + with open("{0}" + .format(os.path.join(output_directory, + forensic_json_filename)), "w", newline="\n", encoding="utf-8") as for_json: for_json.write(json.dumps(forensic_reports, ensure_ascii=False, indent=2)) - with open("{0}".format(os.path.join(output_directory, "forensic.csv")), + with open("{0}" + .format(os.path.join(output_directory, + forensic_csv_filename)), "w", newline="\n", encoding="utf-8") as for_csv: csv = parsed_forensic_reports_to_csv(forensic_reports) for_csv.write(csv) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index e9119e0..be2026a 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -19,7 +19,7 @@ from tqdm import tqdm from parsedmarc import get_dmarc_reports_from_inbox, watch_inbox, \ parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \ splunk, save_output, email_results, ParserError, __version__, \ - InvalidDMARCReport + InvalidDMARCReport, s3 from parsedmarc.utils import is_mbox logger = logging.getLogger("parsedmarc") @@ -79,6 +79,14 @@ def _main(): ) except Exception as error_: logger.error("Kafka Error: {0}".format(error_.__str__())) + if opts.s3_bucket: + try: + s3_client = s3.S3Client( + bucket_name=opts.s3_bucket, + bucket_path=opts.s3_path, + ) + except Exception as error_: + logger.error("S3 Error: {0}".format(error_.__str__())) if opts.save_aggregate: for report in reports_["aggregate_reports"]: try: @@ -104,6 +112,11 @@ def _main(): except Exception as error_: logger.error("Kafka Error: {0}".format( error_.__str__())) + try: + if opts.s3_bucket: + 
s3_client.save_aggregate_report_to_s3(report) + except Exception as error_: + logger.error("S3 Error: {0}".format(error_.__str__())) if opts.hec: try: aggregate_reports_ = reports_["aggregate_reports"] @@ -138,6 +151,11 @@ def _main(): except Exception as error_: logger.error("Kafka Error: {0}".format( error_.__str__())) + try: + if opts.s3_bucket: + s3_client.save_forensic_report_to_s3(report) + except Exception as error_: + logger.error("S3 Error: {0}".format(error_.__str__())) if opts.hec: try: forensic_reports_ = reports_["forensic_reports"] @@ -160,6 +178,18 @@ def _main(): help=strip_attachment_help, action="store_true") arg_parser.add_argument("-o", "--output", help="write output files to the given directory") + arg_parser.add_argument("--aggregate-json-filename", + help="filename for the aggregate JSON output file", + default="aggregate.json") + arg_parser.add_argument("--forensic-json-filename", + help="filename for the forensic JSON output file", + default="forensic.json") + arg_parser.add_argument("--aggregate-csv-filename", + help="filename for the aggregate CSV output file", + default="aggregate.csv") + arg_parser.add_argument("--forensic-csv-filename", + help="filename for the forensic CSV output file", + default="forensic.csv") arg_parser.add_argument("-n", "--nameservers", nargs="+", help="nameservers to query") arg_parser.add_argument("-t", "--dns_timeout", @@ -185,11 +215,16 @@ def _main(): forensic_reports = [] args = arg_parser.parse_args() + opts = Namespace(file_path=args.file_path, config_file=args.config_file, offline=args.offline, strip_attachment_payloads=args.strip_attachment_payloads, output=args.output, + aggregate_csv_filename=args.aggregate_csv_filename, + aggregate_json_filename=args.aggregate_json_filename, + forensic_csv_filename=args.forensic_csv_filename, + forensic_json_filename=args.forensic_json_filename, nameservers=args.nameservers, silent=args.silent, dns_timeout=args.dns_timeout, @@ -210,6 +245,7 @@ def _main(): 
imap_watch=False, imap_delete=False, imap_test=False, + imap_batch_size=None, hec=None, hec_token=None, hec_index=None, @@ -241,6 +277,8 @@ def _main(): smtp_to=[], smtp_subject="parsedmarc report", smtp_message="Please see the attached DMARC results.", + s3_bucket=None, + s3_path=None, log_file=args.log_file, n_procs=1, chunk_size=1 @@ -264,6 +302,18 @@ def _main(): "strip_attachment_payloads"] if "output" in general_config: opts.output = general_config["output"] + if "aggregate_json_filename" in general_config: + opts.aggregate_json_filename = general_config[ + "aggregate_json_filename"] + if "forensic_json_filename" in general_config: + opts.forensic_json_filename = general_config[ + "forensic_json_filename"] + if "aggregate_csv_filename" in general_config: + opts.aggregate_csv_filename = general_config[ + "aggregate_csv_filename"] + if "forensic_csv_filename" in general_config: + opts.forensic_csv_filename = general_config[ + "forensic_csv_filename"] if "nameservers" in general_config: opts.nameservers = _str_to_list(general_config["nameservers"]) if "dns_timeout" in general_config: @@ -327,6 +377,10 @@ def _main(): opts.imap_delete = imap_config.getboolean("delete") if "test" in imap_config: opts.imap_test = imap_config.getboolean("test") + if "batch_size" in imap_config: + opts.imap_batch_size = imap_config.getint("batch_size") + else: + opts.imap_batch_size = None if "elasticsearch" in config: elasticsearch_config = config["elasticsearch"] if "hosts" in elasticsearch_config: @@ -469,6 +523,22 @@ def _main(): opts.smtp_attachment = smtp_config["attachment"] if "message" in smtp_config: opts.smtp_message = smtp_config["message"] + if "s3" in config.sections(): + s3_config = config["s3"] + if "bucket" in s3_config: + opts.s3_bucket = s3_config["bucket"] + else: + logger.critical("bucket setting missing from the " + "s3 config section") + exit(-1) + if "path" in s3_config: + opts.s3_path = s3_config["path"] + if opts.s3_path.startswith("/"): + opts.s3_path = 
opts.s3_path[1:] + if opts.s3_path.endswith("/"): + opts.s3_path = opts.s3_path[:-1] + else: + opts.s3_path = "" logging.basicConfig(level=logging.WARNING) logger.setLevel(logging.WARNING) @@ -490,6 +560,8 @@ def _main(): logger.error("You must supply input files, or an IMAP configuration") exit(1) + logger.info("Starting dmarcparse") + if opts.save_aggregate or opts.save_forensic: try: if opts.elasticsearch_hosts: @@ -613,8 +685,9 @@ def _main(): offline=opts.offline, nameservers=ns, test=opts.imap_test, - strip_attachment_payloads=sa - ) + strip_attachment_payloads=sa, + batch_size=opts.imap_batch_size + ) aggregate_reports += reports["aggregate_reports"] forensic_reports += reports["forensic_reports"] @@ -627,7 +700,11 @@ def _main(): ("forensic_reports", forensic_reports)]) if opts.output: - save_output(results, output_directory=opts.output) + save_output(results, output_directory=opts.output, + aggregate_json_filename=opts.aggregate_json_filename, + forensic_json_filename=opts.forensic_json_filename, + aggregate_csv_filename=opts.aggregate_csv_filename, + forensic_csv_filename=opts.forensic_csv_filename) process_reports(results) @@ -670,7 +747,9 @@ def _main(): test=opts.imap_test, nameservers=opts.nameservers, dns_timeout=opts.dns_timeout, - strip_attachment_payloads=sa) + strip_attachment_payloads=sa, + batch_size=opts.imap_batch_size, + offline=opts.offline) except FileExistsError as error: logger.error("{0}".format(error.__str__())) exit(1) diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 32cdea4..7085983 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -295,14 +295,16 @@ def save_aggregate_report_to_elasticsearch(aggregate_report, Raises: AlreadySaved """ - logger.debug("Saving aggregate report to Elasticsearch") + logger.info("Saving aggregate report to Elasticsearch") aggregate_report = aggregate_report.copy() metadata = aggregate_report["report_metadata"] org_name = metadata["org_name"] report_id = metadata["report_id"] 
domain = aggregate_report["policy_published"]["domain"] - begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True) - end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True) + begin_date = human_timestamp_to_datetime(metadata["begin_date"], + to_utc=True) + end_date = human_timestamp_to_datetime(metadata["end_date"], + to_utc=True) begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") if monthly_indexes: @@ -426,7 +428,7 @@ def save_forensic_report_to_elasticsearch(forensic_report, AlreadySaved """ - logger.debug("Saving forensic report to Elasticsearch") + logger.info("Saving forensic report to Elasticsearch") forensic_report = forensic_report.copy() sample_date = None if forensic_report["parsed_sample"]["date"] is not None: diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py new file mode 100644 index 0000000..41910ed --- /dev/null +++ b/parsedmarc/s3.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- + +import logging +import json +import boto3 + +from parsedmarc.utils import human_timestamp_to_datetime + +logger = logging.getLogger("parsedmarc") + + +class S3Client(object): + """A client for Amazon S3""" + + def __init__(self, bucket_name, bucket_path): + """ + Initializes the S3Client + Args: + bucket_name (str): The S3 Bucket + bucket_path (str): The path to save reports + """ + self.bucket_name = bucket_name + self.bucket_path = bucket_path + self.metadata_keys = [ + "org_name", + "org_email", + "report_id", + "begin_date", + "end_date", + ] + + self.s3 = boto3.resource('s3') + self.bucket = self.s3.Bucket(self.bucket_name) + + def save_aggregate_report_to_s3(self, report): + self.save_report_to_s3(report, 'aggregate') + + def save_forensic_report_to_s3(self, report): + self.save_report_to_s3(report, 'forensic') + + def save_report_to_s3(self, report, report_type): + report_date = human_timestamp_to_datetime( + report["report_metadata"]["begin_date"] + ) +
report_id = report["report_metadata"]["report_id"] + path_template = "{0}/{1}/year={2}/month={3:02d}/day={4:02d}/{5}.json" + object_path = path_template.format( + self.bucket_path, + report_type, + report_date.year, + report_date.month, + report_date.day, + report_id + ) + logger.debug("Saving {0} report to s3://{1}/{2}".format( + report_type, + self.bucket_name, + object_path)) + object_metadata = { + k: v + for k, v in report["report_metadata"].items() + if k in self.metadata_keys + } + self.bucket.put_object( + Body=json.dumps(report), + Key=object_path, + Metadata=object_metadata + ) diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py index dbf525f..6b5f980 100644 --- a/parsedmarc/utils.py +++ b/parsedmarc/utils.py @@ -157,7 +157,7 @@ def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0): if record_type == "TXT": resource_records = list(map( lambda r: r.strings, - resolver.query(domain, record_type, lifetime=timeout))) + resolver.resolve(domain, record_type, lifetime=timeout))) _resource_record = [ resource_record[0][:0].join(resource_record) for resource_record in resource_records if resource_record] @@ -165,7 +165,7 @@ def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0): else: records = list(map( lambda r: r.to_text().replace('"', '').rstrip("."), - resolver.query(domain, record_type, lifetime=timeout))) + resolver.resolve(domain, record_type, lifetime=timeout))) if cache: cache[cache_key] = records diff --git a/requirements.txt b/requirements.txt index 4200a90..9ef3fab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,4 @@ sphinx_rtd_theme>=0.4.3 wheel>=0.33.6 codecov>=2.0.15 lxml>=4.4.0 +boto3>=1.16.63 \ No newline at end of file diff --git a/setup.py b/setup.py index 6faaf91..6929893 100644 --- a/setup.py +++ b/setup.py @@ -98,7 +98,8 @@ setup( 'elasticsearch-dsl>=7.2.0,<8.0.0', 'kafka-python>=1.4.4', 'tqdm>=4.31.1', - 'lxml>=4.4.0' + 'lxml>=4.4.0', + 'boto3>=1.16.63' ], entry_points={