Merge branch 'domainaware:master' into master

This commit is contained in:
Matthäus Wander
2021-08-22 10:27:07 +02:00
committed by GitHub
11 changed files with 370 additions and 90 deletions
+18
View File
@@ -1,6 +1,24 @@
Changelog
=========
7.0.1
-----
- Fix startup error (PR #254)
7.0.0
-----
- Fix issue #221: Crash when handling invalid reports without root node (PR #248)
- Use UTC datetime objects for Elasticsearch output (PR #245)
- Fix issues #219, #155, and #103: IMAP connections break on large emails (PR #241)
- Add support for saving reports to S3 buckets (PR #223)
- Pass `offline` parameter to `wait_inbox()` (PR #216)
- Add more details to logging (PR #220)
- Add options customizing the names of output files (Modifications based on PR #225)
- Wait for 5 seconds before attempting to reconnect to an IMAP server (PR #217)
- Add option to process messages in batches (PR #222)
6.12.0
------
+44 -13
View File
@@ -58,17 +58,20 @@ CLI help
::
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads]
[-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]]
[-t DNS_TIMEOUT] [--offline] [-s] [--debug]
[--log-file LOG_FILE] [-v]
[file_path [file_path ...]]
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME]
[--forensic-json-filename FORENSIC_JSON_FILENAME]
[--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline]
[-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v]
[file_path ...]
Parses DMARC reports
positional arguments:
file_path one or more paths to aggregate or forensic report
files or emails
files, emails, or mbox files'
optional arguments:
-h, --help show this help message and exit
@@ -78,18 +81,27 @@ CLI help
remove attachment payloads from forensic report output
-o OUTPUT, --output OUTPUT
write output files to the given directory
--aggregate-json-filename AGGREGATE_JSON_FILENAME
filename for the aggregate JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the aggregate CSV output file
--forensic-csv-filename FORENSIC_CSV_FILENAME
filename for the forensic CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query (default is Cloudflare's
nameservers)
nameservers to query
-t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT
number of seconds to wait for an answer from DNS
(default: 2.0)
--offline do not make online queries for geolocation or DNS
-s, --silent only print errors and warnings
--verbose more verbose output
--debug print debugging information
--log-file LOG_FILE output logging to a file
-v, --version show program's version number and exit
.. note::
In ``parsedmarc`` 6.0.0, most CLI options were moved to a configuration file, described below.
@@ -128,13 +140,19 @@ For example
token = HECTokenGoesHere
index = email
[s3]
bucket = my-bucket
path = parsedmarc
The full set of configuration options are:
- ``general``
- ``save_aggregate`` - bool: Save aggregate report data to the Elasticsearch and/or Splunk
- ``save_forensic`` - bool: Save forensic report data to the Elasticsearch and/or Splunk
- ``save_aggregate`` - bool: Save aggregate report data to Elasticsearch, Splunk and/or S3
- ``save_forensic`` - bool: Save forensic report data to Elasticsearch, Splunk and/or S3
- ``strip_attachment_payloads`` - bool: Remove attachment payloads from results
- ``output`` - str: Directory to place JSON and CSV files in
- ``aggregate_json_filename`` - str: filename for the aggregate JSON output file
- ``forensic_json_filename`` - str: filename for the forensic JSON output file
- ``offline`` - bool: Do not use online queries for geolocation or DNS
- ``nameservers`` - str: A comma separated list of DNS resolvers (Default: `Cloudflare's public resolvers`_)
- ``dns_timeout`` - float: DNS timeout period
@@ -142,10 +160,18 @@ The full set of configuration options are:
- ``silent`` - bool: Only print errors (Default: True)
- ``log_file`` - str: Write log messages to a file at this path
- ``n_procs`` - int: Number of process to run in parallel when parsing in CLI mode (Default: 1)
- ``chunk_size`` - int: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files
- ``chunk_size`` - int: Number of files to give to each process when running in parallel.
.. note::
Setting this to a number larger than one can improve performance when processing thousands of files
- ``imap``
- ``host`` - str: The IMAP server hostname or IP address
- ``port`` - int: The IMAP server port (Default: 993)
- ``port`` - int: The IMAP server port (Default: 993).
.. note::
If your host recommends another port, still try 993
- ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True)
- ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended)
- ``user`` - str: The IMAP user
@@ -155,11 +181,13 @@ The full set of configuration options are:
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)
.. note::
.. note::
Special characters in the username or password must be `URL encoded`_.
- ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True)
- ``cert_path`` - str: Path to a trusted certificates
- ``index_suffix`` - str: A suffix to apply to the index names
@@ -191,6 +219,9 @@ The full set of configuration options are:
- ``subject`` - str: The Subject header to use in the email (Default: parsedmarc report)
- ``attachment`` - str: The ZIP attachment filenames
- ``message`` - str: The email message (Default: Please see the attached parsedmarc report.)
- ``s3``
- ``bucket`` - str: The S3 bucket name
- ``path`` - int: The path to upload reports to (Default: /)
.. warning::
+4
View File
@@ -18,3 +18,7 @@ ssl = False
url = https://splunkhec.example.com
token = HECTokenGoesHere
index = email
[s3]
bucket = my-bucket
path = parsedmarc
+64 -39
View File
@@ -62,36 +62,48 @@ CLI help
::
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads]
[-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]]
[-t DNS_TIMEOUT] [--offline] [-s] [--debug]
[--log-file LOG_FILE] [-v]
[file_path [file_path ...]]
usage: parsedmarc [-h] [-c CONFIG_FILE] [--strip-attachment-payloads] [-o OUTPUT]
[--aggregate-json-filename AGGREGATE_JSON_FILENAME]
[--forensic-json-filename FORENSIC_JSON_FILENAME]
[--aggregate-csv-filename AGGREGATE_CSV_FILENAME]
[--forensic-csv-filename FORENSIC_CSV_FILENAME]
[-n NAMESERVERS [NAMESERVERS ...]] [-t DNS_TIMEOUT] [--offline]
[-s] [--verbose] [--debug] [--log-file LOG_FILE] [-v]
[file_path ...]
Parses DMARC reports
Parses DMARC reports
positional arguments:
file_path one or more paths to aggregate or forensic report
files or emails
positional arguments:
file_path one or more paths to aggregate or forensic report
files, emails, or mbox files'
optional arguments:
-h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied)
--strip-attachment-payloads
remove attachment payloads from forensic report output
-o OUTPUT, --output OUTPUT
write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query
-t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT
number of seconds to wait for an answer from DNS
(default: 2.0)
--offline do not make online queries for geolocation or DNS
-s, --silent only print errors and warnings
--debug print debugging information
--log-file LOG_FILE output logging to a file
-v, --version show program's version number and exit
optional arguments:
-h, --help show this help message and exit
-c CONFIG_FILE, --config-file CONFIG_FILE
a path to a configuration file (--silent implied)
--strip-attachment-payloads
remove attachment payloads from forensic report output
-o OUTPUT, --output OUTPUT
write output files to the given directory
--aggregate-json-filename AGGREGATE_JSON_FILENAME
filename for the aggregate JSON output file
--forensic-json-filename FORENSIC_JSON_FILENAME
filename for the forensic JSON output file
--aggregate-csv-filename AGGREGATE_CSV_FILENAME
filename for the aggregate CSV output file
--forensic-csv-filename FORENSIC_CSV_FILENAME
filename for the forensic CSV output file
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query
-t DNS_TIMEOUT, --dns_timeout DNS_TIMEOUT
number of seconds to wait for an answer from DNS
(default: 2.0)
--offline do not make online queries for geolocation or DNS
-s, --silent only print errors and warnings
--verbose more verbose output
--debug print debugging information
--log-file LOG_FILE output logging to a file
-v, --version show program's version number and exit
.. note::
@@ -132,13 +144,19 @@ For example
token = HECTokenGoesHere
index = email
[s3]
bucket = my-bucket
path = parsedmarc
The full set of configuration options are:
- ``general``
- ``save_aggregate`` - bool: Save aggregate report data to the Elasticsearch and/or Splunk
- ``save_forensic`` - bool: Save forensic report data to the Elasticsearch and/or Splunk
- ``save_aggregate`` - bool: Save aggregate report data to Elasticsearch, Splunk and/or S3
- ``save_forensic`` - bool: Save forensic report data to Elasticsearch, Splunk and/or S3
- ``strip_attachment_payloads`` - bool: Remove attachment payloads from results
- ``output`` - str: Directory to place JSON and CSV files in
- ``aggregate_json_filename`` - str: filename for the aggregate JSON output file
- ``forensic_json_filename`` - str: filename for the forensic JSON output file
- ``offline`` - bool: Do not use online queries for geolocation or DNS
- ``nameservers`` - str: A comma separated list of DNS resolvers (Default: `Cloudflare's public resolvers`_)
- ``dns_timeout`` - float: DNS timeout period
@@ -146,31 +164,36 @@ The full set of configuration options are:
- ``silent`` - bool: Only print errors (Default: True)
- ``log_file`` - str: Write log messages to a file at this path
- ``n_procs`` - int: Number of process to run in parallel when parsing in CLI mode (Default: 1)
- ``chunk_size`` - int: Number of files to give to each process when running in parallel. Setting this to a number larger than one can improve performance when processing thousands of files
- ``chunk_size`` - int: Number of files to give to each process when running in parallel.
.. note::
Setting this to a number larger than one can improve performance when processing thousands of files
- ``imap``
- ``host`` - str: The IMAP server hostname or IP address
- ``port`` - int: The IMAP server port (Default: 993)
- ``port`` - int: The IMAP server port (Default: 993).
.. note::
If your host recommends another port, still try 993
- ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True)
- ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended)
- ``timeout`` - float: Timeout in seconds to wait for an IMAP operation to complete (Default: 30)
- ``max_retries`` - int: The maximum number of retries after a timeout
- ``user`` - str: The IMAP user
- ``password`` - str: The IMAP password (escape ``%`` with a second ``%``)
- ``password`` - str: The IMAP password
- ``reports_folder`` - str: The IMAP folder where the incoming reports can be found (Default: INBOX)
- ``archive_folder`` - str: The IMAP folder to sort processed emails into (Default: Archive)
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)
.. note::
.. note::
Special characters in the username or password must be `URL encoded`_.
- ``ssl`` - bool: Use an encrypted SSL/TLS connection (Default: True)
- ``user`` - str: Basic auth username
- ``password`` - str: Basic auth password
- ``cert_path`` - str: Path to a trusted certificates
- ``timeout`` - float: Timeout in seconds (Default: 60)
- ``index_suffix`` - str: A suffix to apply to the index names
- ``monthly_indexes`` - bool: Use monthly indexes instead of daily indexes
- ``number_of_shards`` - int: The number of shards to use when creating the index (Default: 1)
@@ -200,7 +223,9 @@ The full set of configuration options are:
- ``subject`` - str: The Subject header to use in the email (Default: parsedmarc report)
- ``attachment`` - str: The ZIP attachment filenames
- ``message`` - str: The email message (Default: Please see the attached parsedmarc report.)
- ``s3``
- ``bucket`` - str: The S3 bucket name
- ``path`` - int: The path to upload reports to (Default: /)
.. warning::
+77 -26
View File
@@ -8,6 +8,7 @@ import shutil
import xml.parsers.expat as expat
import json
from datetime import datetime
from time import sleep
from collections import OrderedDict
from io import BytesIO, StringIO
from gzip import GzipFile
@@ -35,7 +36,7 @@ from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
from parsedmarc.utils import parse_email
__version__ = "6.12.0"
__version__ = "7.0.1"
logging.basicConfig(
format='%(levelname)8s:%(filename)s:%(lineno)d:'
@@ -203,7 +204,7 @@ def _parse_report_record(record, offline=False, nameservers=None,
def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
timeout=2.0, parallel=False):
timeout=2.0, parallel=False, server=None):
"""Parses a DMARC XML report string and returns a consistent OrderedDict
Args:
@@ -213,6 +214,7 @@ def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
(Cloudflare's public DNS resolvers by default)
timeout (float): Sets the DNS timeout in seconds
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict: The parsed aggregate DMARC report
@@ -305,8 +307,13 @@ def parse_aggregate_report_xml(xml, offline=False, nameservers=None,
new_report["policy_published"] = new_policy_published
if type(report["record"]) == list:
for record in report["record"]:
report_record = _parse_report_record(record,
for i in range(len(report["record"])):
if server is not None and i > 0 and i % 20 == 0:
logger.debug("Sending noop cmd")
server.noop()
logger.debug("Processed {0}/{1}".format(
i, len(report["record"])))
report_record = _parse_report_record(report["record"][i],
offline=offline,
nameservers=nameservers,
dns_timeout=timeout,
@@ -386,7 +393,8 @@ def extract_xml(input_):
def parse_aggregate_report_file(_input, offline=False, nameservers=None,
dns_timeout=2.0,
parallel=False):
parallel=False,
server=None):
"""Parses a file at the given path, a file-like object. or bytes as a
aggregate DMARC report
@@ -397,6 +405,7 @@ def parse_aggregate_report_file(_input, offline=False, nameservers=None,
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict: The parsed DMARC aggregate report
@@ -407,7 +416,8 @@ def parse_aggregate_report_file(_input, offline=False, nameservers=None,
offline=offline,
nameservers=nameservers,
timeout=dns_timeout,
parallel=parallel)
parallel=parallel,
server=server)
def parsed_aggregate_reports_to_csv_rows(reports):
@@ -739,7 +749,7 @@ def parsed_forensic_reports_to_csv(reports):
def parse_report_email(input_, offline=False, nameservers=None,
dns_timeout=2.0, strip_attachment_payloads=False,
parallel=False):
parallel=False, server=None):
"""
Parses a DMARC report from an email
@@ -751,6 +761,7 @@ def parse_report_email(input_, offline=False, nameservers=None,
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict:
@@ -777,6 +788,8 @@ def parse_report_email(input_, offline=False, nameservers=None,
subject = None
feedback_report = None
sample = None
if "From" in msg_headers:
logger.info("Parsing mail from {0}".format(msg_headers["From"]))
if "Subject" in msg_headers:
subject = msg_headers["Subject"]
for part in msg.walk():
@@ -814,7 +827,8 @@ def parse_report_email(input_, offline=False, nameservers=None,
offline=offline,
nameservers=ns,
dns_timeout=dns_timeout,
parallel=parallel)
parallel=parallel,
server=server)
result = OrderedDict([("report_type", "aggregate"),
("report", aggregate_report)])
return result
@@ -864,7 +878,7 @@ def parse_report_email(input_, offline=False, nameservers=None,
def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
strip_attachment_payloads=False,
offline=False, parallel=False):
offline=False, parallel=False, server=None):
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object. or bytes
@@ -877,6 +891,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
forensic report results
offline (bool): Do not make online queries for geolocation or DNS
parallel (bool): Parallel processing
server (IMAPClient): Connection object
Returns:
OrderedDict: The parsed DMARC report
@@ -896,7 +911,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
parallel=parallel)
parallel=parallel,
server=server)
results = OrderedDict([("report_type", "aggregate"),
("report", report)])
except InvalidAggregateReport:
@@ -907,7 +923,8 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
parallel=parallel)
parallel=parallel,
server=server)
except InvalidDMARCReport:
raise InvalidDMARCReport("Not a valid aggregate or forensic "
"report")
@@ -944,7 +961,7 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0,
input_))
for i in range(len(message_keys)):
message_key = message_keys[i]
logger.debug("Processing message {0} of {1}".format(
logger.info("Processing message {0} of {1}".format(
i+1, total_messages
))
msg_content = mbox.get_string(message_key)
@@ -1005,7 +1022,8 @@ def get_dmarc_reports_from_inbox(connection=None,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
results=None):
results=None,
batch_size=None):
"""
Fetches and parses DMARC reports from an inbox
@@ -1029,6 +1047,7 @@ def get_dmarc_reports_from_inbox(connection=None,
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1071,11 +1090,18 @@ def get_dmarc_reports_from_inbox(connection=None,
total_messages = len(messages)
logger.debug("Found {0} messages in {1}".format(len(messages),
reports_folder))
for i in range(len(messages)):
if batch_size:
message_limit = min(total_messages, batch_size)
else:
message_limit = total_messages
logger.debug("Processing {0} messages".format(message_limit))
for i in range(message_limit):
msg_uid = messages[i]
logger.debug("Processing message {0} of {1}: UID {2}".format(
i+1, total_messages, msg_uid
i+1, message_limit, msg_uid
))
msg_content = server.fetch_message(msg_uid, parse=False)
sa = strip_attachment_payloads
@@ -1084,7 +1110,8 @@ def get_dmarc_reports_from_inbox(connection=None,
nameservers=nameservers,
dns_timeout=dns_timeout,
offline=offline,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
server=server)
if parsed_email["report_type"] == "aggregate":
aggregate_reports.append(parsed_email["report"])
aggregate_report_msg_uids.append(msg_uid)
@@ -1167,7 +1194,7 @@ def get_dmarc_reports_from_inbox(connection=None,
total_messages = len(server.search())
if not test and total_messages > 0:
if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
results = get_dmarc_reports_from_inbox(
connection=server,
@@ -1189,7 +1216,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
verify=True, reports_folder="INBOX",
archive_folder="Archive", delete=False, test=False,
idle_timeout=30, offline=False, nameservers=None,
dns_timeout=6.0, strip_attachment_payloads=False):
dns_timeout=6.0, strip_attachment_payloads=False,
batch_size=None):
"""
Use an IDLE IMAP connection to parse incoming emails, and pass the results
to a callback function
@@ -1212,6 +1240,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
"""
sa = strip_attachment_payloads
@@ -1224,7 +1253,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
batch_size=batch_size)
callback(res)
while True:
@@ -1236,15 +1266,28 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
idle_timeout=idle_timeout)
except (timeout, IMAPClientError):
logger.warning("IMAP connection timeout. Reconnecting...")
sleep(5)
except Exception as e:
logger.warning("IMAP connection error. {0}. "
"Reconnecting...".format(e))
sleep(5)
def save_output(results, output_directory="output"):
def save_output(results, output_directory="output",
aggregate_json_filename="aggregate.json",
forensic_json_filename="forensic.json",
aggregate_csv_filename="aggregate.csv",
forensic_csv_filename="forensic.csv"):
"""
Save report data in the given directory
Args:
results (OrderedDict): Parsing results
output_directory: The patch to the directory to save in
output_directory (str): The patch to the directory to save in
aggregate_json_filename (str): Filename for the aggregate JSON file
forensic_json_filename (str): Filename for the forensic JSON file
aggregate_csv_filename (str): Filename for the aggregate CSV file
forensic_csv_filename (str): Filename for the forensic CSV file
"""
aggregate_reports = results["aggregate_reports"]
@@ -1256,22 +1299,30 @@ def save_output(results, output_directory="output"):
else:
os.makedirs(output_directory)
with open("{0}".format(os.path.join(output_directory, "aggregate.json")),
with open("{0}"
.format(os.path.join(output_directory,
aggregate_json_filename)),
"w", newline="\n", encoding="utf-8") as agg_json:
agg_json.write(json.dumps(aggregate_reports, ensure_ascii=False,
indent=2))
with open("{0}".format(os.path.join(output_directory, "aggregate.csv")),
with open("{0}"
.format(os.path.join(output_directory,
aggregate_csv_filename)),
"w", newline="\n", encoding="utf-8") as agg_csv:
csv = parsed_aggregate_reports_to_csv(aggregate_reports)
agg_csv.write(csv)
with open("{0}".format(os.path.join(output_directory, "forensic.json")),
with open("{0}"
.format(os.path.join(output_directory,
forensic_json_filename)),
"w", newline="\n", encoding="utf-8") as for_json:
for_json.write(json.dumps(forensic_reports, ensure_ascii=False,
indent=2))
with open("{0}".format(os.path.join(output_directory, "forensic.csv")),
with open("{0}"
.format(os.path.join(output_directory,
forensic_csv_filename)),
"w", newline="\n", encoding="utf-8") as for_csv:
csv = parsed_forensic_reports_to_csv(forensic_reports)
for_csv.write(csv)
+84 -5
View File
@@ -19,7 +19,7 @@ from tqdm import tqdm
from parsedmarc import get_dmarc_reports_from_inbox, watch_inbox, \
parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \
splunk, save_output, email_results, ParserError, __version__, \
InvalidDMARCReport
InvalidDMARCReport, s3
from parsedmarc.utils import is_mbox
logger = logging.getLogger("parsedmarc")
@@ -79,6 +79,14 @@ def _main():
)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
if opts.s3_bucket:
try:
s3_client = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
if opts.save_aggregate:
for report in reports_["aggregate_reports"]:
try:
@@ -104,6 +112,11 @@ def _main():
except Exception as error_:
logger.error("Kafka Error: {0}".format(
error_.__str__()))
try:
if opts.s3_bucket:
s3_client.save_aggregate_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
if opts.hec:
try:
aggregate_reports_ = reports_["aggregate_reports"]
@@ -138,6 +151,11 @@ def _main():
except Exception as error_:
logger.error("Kafka Error: {0}".format(
error_.__str__()))
try:
if opts.s3_bucket:
s3_client.save_forensic_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
if opts.hec:
try:
forensic_reports_ = reports_["forensic_reports"]
@@ -160,6 +178,18 @@ def _main():
help=strip_attachment_help, action="store_true")
arg_parser.add_argument("-o", "--output",
help="write output files to the given directory")
arg_parser.add_argument("--aggregate-json-filename",
help="filename for the aggregate JSON output file",
default="aggregate.json")
arg_parser.add_argument("--forensic-json-filename",
help="filename for the forensic JSON output file",
default="forensic.json")
arg_parser.add_argument("--aggregate-csv-filename",
help="filename for the aggregate CSV output file",
default="aggregate.csv")
arg_parser.add_argument("--forensic-csv-filename",
help="filename for the forensic CSV output file",
default="forensic.csv")
arg_parser.add_argument("-n", "--nameservers", nargs="+",
help="nameservers to query")
arg_parser.add_argument("-t", "--dns_timeout",
@@ -185,11 +215,16 @@ def _main():
forensic_reports = []
args = arg_parser.parse_args()
opts = Namespace(file_path=args.file_path,
config_file=args.config_file,
offline=args.offline,
strip_attachment_payloads=args.strip_attachment_payloads,
output=args.output,
aggregate_csv_filename=args.aggregate_csv_filename,
aggregate_json_filename=args.aggregate_json_filename,
forensic_csv_filename=args.forensic_csv_filename,
forensic_json_filename=args.forensic_json_filename,
nameservers=args.nameservers,
silent=args.silent,
dns_timeout=args.dns_timeout,
@@ -210,6 +245,7 @@ def _main():
imap_watch=False,
imap_delete=False,
imap_test=False,
imap_batch_size=None,
hec=None,
hec_token=None,
hec_index=None,
@@ -241,6 +277,8 @@ def _main():
smtp_to=[],
smtp_subject="parsedmarc report",
smtp_message="Please see the attached DMARC results.",
s3_bucket=None,
s3_path=None,
log_file=args.log_file,
n_procs=1,
chunk_size=1
@@ -264,6 +302,18 @@ def _main():
"strip_attachment_payloads"]
if "output" in general_config:
opts.output = general_config["output"]
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config[
"aggregate_json_filename"]
if "forensic_json_filename" in general_config:
opts.forensic_json_filename = general_config[
"forensic_json_filename"]
if "aggregate_csv_filename" in general_config:
opts.aggregate_csv_filename = general_config[
"aggregate_csv_filename"]
if "forensic_csv_filename" in general_config:
opts.forensic_csv_filename = general_config[
"forensic_csv_filename"]
if "nameservers" in general_config:
opts.nameservers = _str_to_list(general_config["nameservers"])
if "dns_timeout" in general_config:
@@ -327,6 +377,10 @@ def _main():
opts.imap_delete = imap_config.getboolean("delete")
if "test" in imap_config:
opts.imap_test = imap_config.getboolean("test")
if "batch_size" in imap_config:
opts.imap_batch_size = imap_config.getint("batch_size")
else:
opts.imap_batch_size = None
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
@@ -469,6 +523,22 @@ def _main():
opts.smtp_attachment = smtp_config["attachment"]
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
if "s3" in config.sections():
s3_config = config["s3"]
if "bucket" in s3_config:
opts.s3_bucket = s3_config["bucket"]
else:
logger.critical("bucket setting missing from the "
"s3 config section")
exit(-1)
if "path" in s3_config:
opts.s3_path = s3_config["path"]
if opts.s3_path.startswith("/"):
opts.s3_path = opts.s3_path[1:]
if opts.s3_path.endswith("/"):
opts.s3_path = opts.s3_path[:-1]
else:
opts.s3_path = ""
logging.basicConfig(level=logging.WARNING)
logger.setLevel(logging.WARNING)
@@ -490,6 +560,8 @@ def _main():
logger.error("You must supply input files, or an IMAP configuration")
exit(1)
logger.info("Starting dmarcparse")
if opts.save_aggregate or opts.save_forensic:
try:
if opts.elasticsearch_hosts:
@@ -613,8 +685,9 @@ def _main():
offline=opts.offline,
nameservers=ns,
test=opts.imap_test,
strip_attachment_payloads=sa
)
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
@@ -627,7 +700,11 @@ def _main():
("forensic_reports", forensic_reports)])
if opts.output:
save_output(results, output_directory=opts.output)
save_output(results, output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename,
forensic_json_filename=opts.forensic_json_filename,
aggregate_csv_filename=opts.aggregate_csv_filename,
forensic_csv_filename=opts.forensic_csv_filename)
process_reports(results)
@@ -670,7 +747,9 @@ def _main():
test=opts.imap_test,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=sa)
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size,
offline=opts.offline)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
exit(1)
+6 -4
View File
@@ -295,14 +295,16 @@ def save_aggregate_report_to_elasticsearch(aggregate_report,
Raises:
AlreadySaved
"""
logger.debug("Saving aggregate report to Elasticsearch")
logger.info("Saving aggregate report to Elasticsearch")
aggregate_report = aggregate_report.copy()
metadata = aggregate_report["report_metadata"]
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
begin_date = human_timestamp_to_datetime(metadata["begin_date"],
to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"],
to_utc=True)
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
@@ -426,7 +428,7 @@ def save_forensic_report_to_elasticsearch(forensic_report,
AlreadySaved
"""
logger.debug("Saving forensic report to Elasticsearch")
logger.info("Saving forensic report to Elasticsearch")
forensic_report = forensic_report.copy()
sample_date = None
if forensic_report["parsed_sample"]["date"] is not None:
+68
View File
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import logging
import json
import boto3
from parsedmarc.utils import human_timestamp_to_datetime
logger = logging.getLogger("parsedmarc")
class S3Client(object):
"""A client for a Amazon S3"""
def __init__(self, bucket_name, bucket_path):
"""
Initializes the S3Client
Args:
bucket_name (str): The S3 Bucket
bucket_path (str): The path to save reports
"""
self.bucket_name = bucket_name
self.bucket_path = bucket_path
self.metadata_keys = [
"org_name",
"org_email",
"report_id",
"begin_date",
"end_date",
]
self.s3 = boto3.resource('s3')
self.bucket = self.s3.Bucket(self.bucket_name)
def save_aggregate_report_to_s3(self, report):
self.save_report_to_s3(report, 'aggregate')
def save_forensic_report_to_s3(self, report):
self.save_report_to_s3(report, 'forensic')
def save_report_to_s3(self, report, report_type):
report_date = human_timestamp_to_datetime(
report["report_metadata"]["begin_date"]
)
report_id = report["report_metadata"]["report_id"]
path_template = "{0}/{1}/year={2}/month={3:02d}/day={4:02d}/{5}.json"
object_path = path_template.format(
self.bucket_path,
report_type,
report_date.year,
report_date.month,
report_date.day,
report_id
)
logger.debug("Saving {0} report to s3://{1}/{2}".format(
report_type,
self.bucket_name,
object_path))
object_metadata = {
k: v
for k, v in report["report_metadata"].items()
if k in self.metadata_keys
}
self.bucket.put_object(
Body=json.dumps(report),
Key=object_path,
Metadata=object_metadata
)
+2 -2
View File
@@ -157,7 +157,7 @@ def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0):
if record_type == "TXT":
resource_records = list(map(
lambda r: r.strings,
resolver.query(domain, record_type, lifetime=timeout)))
resolver.resolve(domain, record_type, lifetime=timeout)))
_resource_record = [
resource_record[0][:0].join(resource_record)
for resource_record in resource_records if resource_record]
@@ -165,7 +165,7 @@ def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0):
else:
records = list(map(
lambda r: r.to_text().replace('"', '').rstrip("."),
resolver.query(domain, record_type, lifetime=timeout)))
resolver.resolve(domain, record_type, lifetime=timeout)))
if cache:
cache[cache_key] = records
+1
View File
@@ -27,3 +27,4 @@ sphinx_rtd_theme>=0.4.3
wheel>=0.33.6
codecov>=2.0.15
lxml>=4.4.0
boto3>=1.16.63
+2 -1
View File
@@ -98,7 +98,8 @@ setup(
'elasticsearch-dsl>=7.2.0,<8.0.0',
'kafka-python>=1.4.4',
'tqdm>=4.31.1',
'lxml>=4.4.0'
'lxml>=4.4.0',
'boto3>=1.16.63'
],
entry_points={