Merge pull request #14 from domainaware/4.0

4.0
This commit is contained in:
Sean Whalen
2018-09-26 13:15:59 -04:00
committed by GitHub
7 changed files with 503 additions and 163 deletions

View File

@@ -1,3 +1,20 @@
4.0.0
-----
- Add support for sending DMARC reports to a Splunk HTTP Events
Collector (HEC)
- Use a browser-like `User-Agent` when downloading the Public Suffix List and
GeoIP DB to avoid being blocked by security proxies
- Reduce default DNS timeout to 2.0 seconds
- Add alignment booleans to JSON output
- Fix `.msg` parsing CLI exception when `msgconvert` is not found in the
system path
- Add `--outgoing-port` and `--outgoing-ssl` options
- Fall back to plain text SMTP if `--outgoing-ssl` is not used and `STARTTLS`
is not supported by the server
- Always use `\n` as the newline when generating CSVs
- Workaround for random Exchange/Office365 `Server Unavailable` IMAP errors
3.9.7
-----
@@ -24,7 +41,7 @@ downloads
3.9.3
-----
- Fix crash when forensic recorts are missing `Arrival-Date`
- Fix crash when forensic reports are missing `Arrival-Date`
3.9.2
-----

View File

@@ -38,16 +38,45 @@ CLI help
::
usage: parsedmarc [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]]
[-t TIMEOUT] [-H HOST] [-u USER] [-p PASSWORD]
[-r REPORTS_FOLDER] [-a ARCHIVE_FOLDER] [-d]
[-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]]
[--save-aggregate] [--save-forensic] [-O OUTGOING_HOST]
[-U OUTGOING_USER] [-P OUTGOING_PASSWORD] [-F OUTGOING_FROM]
[-T OUTGOING_TO [OUTGOING_TO ...]] [-S OUTGOING_SUBJECT]
[-A OUTGOING_ATTACHMENT] [-M OUTGOING_MESSAGE] [-w] [--test]
[-s] [--debug] [-v]
[file_path [file_path ...]]
usage: cli.py [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]] [-t TIMEOUT]
[-H HOST] [-u USER] [-p PASSWORD] [-r REPORTS_FOLDER]
[-a ARCHIVE_FOLDER] [-d]
[-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]] [--hec HEC]
[--hec-key HEC_KEY] [--hec-index HEC_INDEX] [--save-aggregate]
[--save-forensic] [-O OUTGOING_HOST] [-U OUTGOING_USER]
[-P OUTGOING_PASSWORD] [--outgoing-port OUTGOING_PORT]
[--outgoing-SSL OUTGOING_SSL] [-F OUTGOING_FROM]
[-T OUTGOING_TO [OUTGOING_TO ...]] [-S OUTGOING_SUBJECT]
[-A OUTGOING_ATTACHMENT] [-M OUTGOING_MESSAGE] [-w] [--test]
[-s] [--debug] [-v]
[file_path [file_path ...]]
usage: cli.py [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]] [-t TIMEOUT]
[-H HOST] [-u USER] [-p PASSWORD] [-r REPORTS_FOLDER]
[-a ARCHIVE_FOLDER] [-d]
[-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]] [--hec HEC]
[--hec-token HEC_TOKEN] [--hec-index HEC_INDEX]
[--save-aggregate] [--save-forensic] [-O OUTGOING_HOST]
[-U OUTGOING_USER] [-P OUTGOING_PASSWORD]
[--outgoing-port OUTGOING_PORT] [--outgoing-SSL OUTGOING_SSL]
[-F OUTGOING_FROM] [-T OUTGOING_TO [OUTGOING_TO ...]]
[-S OUTGOING_SUBJECT] [-A OUTGOING_ATTACHMENT]
[-M OUTGOING_MESSAGE] [-w] [--test] [-s] [--debug] [-v]
[file_path [file_path ...]]
usage: cli.py [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]] [-t TIMEOUT]
[-H HOST] [-u USER] [-p PASSWORD] [-r REPORTS_FOLDER]
[-a ARCHIVE_FOLDER] [-d]
[-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]] [--hec HEC]
[--hec-token HEC_TOKEN] [--hec-index HEC_INDEX]
[--hec-skip-certificate-verification] [--save-aggregate]
[--save-forensic] [-O OUTGOING_HOST] [-U OUTGOING_USER]
[-P OUTGOING_PASSWORD] [--outgoing-port OUTGOING_PORT]
[--outgoing-SSL OUTGOING_SSL] [-F OUTGOING_FROM]
[-T OUTGOING_TO [OUTGOING_TO ...]] [-S OUTGOING_SUBJECT]
[-A OUTGOING_ATTACHMENT] [-M OUTGOING_MESSAGE] [-w] [--test]
[-s] [--debug] [-v]
[file_path [file_path ...]]
Parses DMARC reports
@@ -55,52 +84,66 @@ CLI help
file_path one or more paths to aggregate or forensic report
files or emails
optional arguments:
-h, --help show this help message and exit
-o OUTPUT, --output OUTPUT
Write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query (Default 8.8.8.8 4.4.4.4)
-t TIMEOUT, --timeout TIMEOUT
number of seconds to wait for an answer from DNS
(default 6.0)
-H HOST, --host HOST IMAP hostname or IP address
-u USER, --user USER IMAP user
-p PASSWORD, --password PASSWORD
IMAP password
-r REPORTS_FOLDER, --reports-folder REPORTS_FOLDER
The IMAP folder containing the reports Default: INBOX
-a ARCHIVE_FOLDER, --archive-folder ARCHIVE_FOLDER
Specifies the IMAP folder to move messages to after
processing them Default: Archive
-d, --delete Delete the reports after processing them
-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]], --elasticsearch-host [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]
A list of one or more Elasticsearch hostnames or URLs
to use (Default localhost:9200)
--save-aggregate Save aggregate reports to Elasticsearch
--save-forensic Save forensic reports to Elasticsearch
-O OUTGOING_HOST, --outgoing-host OUTGOING_HOST
Email the results using this host
-U OUTGOING_USER, --outgoing-user OUTGOING_USER
Email the results using this user
-P OUTGOING_PASSWORD, --outgoing-password OUTGOING_PASSWORD
Email the results using this password
-F OUTGOING_FROM, --outgoing-from OUTGOING_FROM
Email the results using this from address
-T OUTGOING_TO [OUTGOING_TO ...], --outgoing-to OUTGOING_TO [OUTGOING_TO ...]
Email the results to these addresses
-S OUTGOING_SUBJECT, --outgoing-subject OUTGOING_SUBJECT
Email the results using this subject
-A OUTGOING_ATTACHMENT, --outgoing-attachment OUTGOING_ATTACHMENT
Email the results using this filename
-M OUTGOING_MESSAGE, --outgoing-message OUTGOING_MESSAGE
Email the results using this message
-w, --watch Use an IMAP IDLE connection to process reports as they
arrive in the inbox
--test Do not move or delete IMAP messages
-s, --silent Only print errors
--debug Print debugging information
-v, --version show program's version number and exit
optional arguments:
-h, --help show this help message and exit
-o OUTPUT, --output OUTPUT
Write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query (Default is Cloudflare's)
-t TIMEOUT, --timeout TIMEOUT
number of seconds to wait for an answer from DNS
(default 2.0)
-H HOST, --host HOST IMAP hostname or IP address
-u USER, --user USER IMAP user
-p PASSWORD, --password PASSWORD
IMAP password
-r REPORTS_FOLDER, --reports-folder REPORTS_FOLDER
The IMAP folder containing the reports Default: INBOX
-a ARCHIVE_FOLDER, --archive-folder ARCHIVE_FOLDER
Specifies the IMAP folder to move messages to after
processing them Default: Archive
-d, --delete Delete the reports after processing them
-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]], --elasticsearch-host [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]
A list of one or more Elasticsearch hostnames or URLs
to use (e.g. localhost:9200)
--hec HEC URL to a Splunk HTTP Event Collector (HEC)
--hec-token HEC_TOKEN
The authorization token for a Splunk HTTP event
collector (HEC)
--hec-index HEC_INDEX
The index to use when sending events to the Splunk
HTTP Events
--hec-skip-certificate-verification
Skip certificate verification for Splunk HEC
--save-aggregate Save aggregate reports to search indexes
--save-forensic Save forensic reports to search indexes
-O OUTGOING_HOST, --outgoing-host OUTGOING_HOST
Email the results using this host
-U OUTGOING_USER, --outgoing-user OUTGOING_USER
Email the results using this user
-P OUTGOING_PASSWORD, --outgoing-password OUTGOING_PASSWORD
Email the results using this password
--outgoing-port OUTGOING_PORT
Email the results using this port
--outgoing-ssl OUTGOING_SSL
Use SSL/TLS instead of STARTTLS (more secure, and
required by some providers, like Gmail)
-F OUTGOING_FROM, --outgoing-from OUTGOING_FROM
Email the results using this from address
-T OUTGOING_TO [OUTGOING_TO ...], --outgoing-to OUTGOING_TO [OUTGOING_TO ...]
Email the results to these addresses
-S OUTGOING_SUBJECT, --outgoing-subject OUTGOING_SUBJECT
Email the results using this subject
-A OUTGOING_ATTACHMENT, --outgoing-attachment OUTGOING_ATTACHMENT
Email the results using this filename
-M OUTGOING_MESSAGE, --outgoing-message OUTGOING_MESSAGE
Email the results using this message
-w, --watch Use an IMAP IDLE connection to process reports as they
arrive in the inbox
--test Do not move or delete IMAP messages
-s, --silent Only print errors
--debug Print debugging information
-v, --version show program's version number and exit
SPF and DMARC record validation
===============================
@@ -154,6 +197,11 @@ JSON
"base_domain": "bellsouth.net"
},
"count": 2,
"alignment": {
"spf": true,
"dkim": false,
"dmarc": true
},
"policy_evaluated": {
"disposition": "none",
"dkim": "fail",

View File

@@ -45,69 +45,88 @@ CLI help
::
usage: parsedmarc [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]]
[-t TIMEOUT] [-H HOST] [-u USER] [-p PASSWORD]
[-r REPORTS_FOLDER] [-a ARCHIVE_FOLDER] [-d]
[-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]]
[--save-aggregate] [--save-forensic] [-O OUTGOING_HOST]
[-U OUTGOING_USER] [-P OUTGOING_PASSWORD] [-F OUTGOING_FROM]
[-T OUTGOING_TO [OUTGOING_TO ...]] [-S OUTGOING_SUBJECT]
[-A OUTGOING_ATTACHMENT] [-M OUTGOING_MESSAGE] [-w] [--test]
[-s] [--debug] [-v]
[file_path [file_path ...]]
usage: cli.py [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]] [-t TIMEOUT]
[-H HOST] [-u USER] [-p PASSWORD] [-r REPORTS_FOLDER]
[-a ARCHIVE_FOLDER] [-d]
[-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]] [--hec HEC]
[--hec-token HEC_TOKEN] [--hec-index HEC_INDEX]
[--hec-skip-certificate-verification] [--save-aggregate]
[--save-forensic] [-O OUTGOING_HOST] [-U OUTGOING_USER]
[-P OUTGOING_PASSWORD] [--outgoing-port OUTGOING_PORT]
[--outgoing-SSL OUTGOING_SSL] [-F OUTGOING_FROM]
[-T OUTGOING_TO [OUTGOING_TO ...]] [-S OUTGOING_SUBJECT]
[-A OUTGOING_ATTACHMENT] [-M OUTGOING_MESSAGE] [-w] [--test]
[-s] [--debug] [-v]
[file_path [file_path ...]]
Parses DMARC reports
Parses DMARC reports
positional arguments:
file_path one or more paths to aggregate or forensic report
files or emails
optional arguments:
-h, --help show this help message and exit
-o OUTPUT, --output OUTPUT
Write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query (Default is Cloudflare's)
-t TIMEOUT, --timeout TIMEOUT
number of seconds to wait for an answer from DNS
(default 2.0)
-H HOST, --host HOST IMAP hostname or IP address
-u USER, --user USER IMAP user
-p PASSWORD, --password PASSWORD
IMAP password
-r REPORTS_FOLDER, --reports-folder REPORTS_FOLDER
The IMAP folder containing the reports Default: INBOX
-a ARCHIVE_FOLDER, --archive-folder ARCHIVE_FOLDER
Specifies the IMAP folder to move messages to after
processing them Default: Archive
-d, --delete Delete the reports after processing them
-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]], --elasticsearch-host [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]
A list of one or more Elasticsearch hostnames or URLs
to use (e.g. localhost:9200)
--hec HEC URL to a Splunk HTTP Event Collector (HEC)
--hec-token HEC_TOKEN
The authorization token for a Splunk HTTP event
collector (HEC)
--hec-index HEC_INDEX
The index to use when sending events to the Splunk
HTTP Events
--hec-skip-certificate-verification
Skip certificate verification for Splunk HEC
--save-aggregate Save aggregate reports to search indexes
--save-forensic Save forensic reports to search indexes
-O OUTGOING_HOST, --outgoing-host OUTGOING_HOST
Email the results using this host
-U OUTGOING_USER, --outgoing-user OUTGOING_USER
Email the results using this user
-P OUTGOING_PASSWORD, --outgoing-password OUTGOING_PASSWORD
Email the results using this password
--outgoing-port OUTGOING_PORT
Email the results using this port
--outgoing-ssl OUTGOING_SSL
Use SSL/TLS instead of STARTTLS (more secure, and
required by some providers, like Gmail)
-F OUTGOING_FROM, --outgoing-from OUTGOING_FROM
Email the results using this from address
-T OUTGOING_TO [OUTGOING_TO ...], --outgoing-to OUTGOING_TO [OUTGOING_TO ...]
Email the results to these addresses
-S OUTGOING_SUBJECT, --outgoing-subject OUTGOING_SUBJECT
Email the results using this subject
-A OUTGOING_ATTACHMENT, --outgoing-attachment OUTGOING_ATTACHMENT
Email the results using this filename
-M OUTGOING_MESSAGE, --outgoing-message OUTGOING_MESSAGE
Email the results using this message
-w, --watch Use an IMAP IDLE connection to process reports as they
arrive in the inbox
--test Do not move or delete IMAP messages
-s, --silent Only print errors
--debug Print debugging information
-v, --version show program's version number and exit
positional arguments:
file_path one or more paths to aggregate or forensic report
files or emails
optional arguments:
-h, --help show this help message and exit
-o OUTPUT, --output OUTPUT
Write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query (Default is Cloudflare's)
-t TIMEOUT, --timeout TIMEOUT
number of seconds to wait for an answer from DNS
(default 6.0)
-H HOST, --host HOST IMAP hostname or IP address
-u USER, --user USER IMAP user
-p PASSWORD, --password PASSWORD
IMAP password
-r REPORTS_FOLDER, --reports-folder REPORTS_FOLDER
The IMAP folder containing the reports Default: INBOX
-a ARCHIVE_FOLDER, --archive-folder ARCHIVE_FOLDER
Specifies the IMAP folder to move messages to after
processing them Default: Archive
-d, --delete Delete the reports after processing them
-E [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]], --elasticsearch-host [ELASTICSEARCH_HOST [ELASTICSEARCH_HOST ...]]
A list of one or more Elasticsearch hostnames or URLs
to use (Default localhost:9200)
--save-aggregate Save aggregate reports to Elasticsearch
--save-forensic Save forensic reports to Elasticsearch
-O OUTGOING_HOST, --outgoing-host OUTGOING_HOST
Email the results using this host
-U OUTGOING_USER, --outgoing-user OUTGOING_USER
Email the results using this user
-P OUTGOING_PASSWORD, --outgoing-password OUTGOING_PASSWORD
Email the results using this password
-F OUTGOING_FROM, --outgoing-from OUTGOING_FROM
Email the results using this from address
-T OUTGOING_TO [OUTGOING_TO ...], --outgoing-to OUTGOING_TO [OUTGOING_TO ...]
Email the results to these addresses
-S OUTGOING_SUBJECT, --outgoing-subject OUTGOING_SUBJECT
Email the results using this subject
-A OUTGOING_ATTACHMENT, --outgoing-attachment OUTGOING_ATTACHMENT
Email the results using this filename
-M OUTGOING_MESSAGE, --outgoing-message OUTGOING_MESSAGE
Email the results using this message
-w, --watch Use an IMAP IDLE connection to process reports as they
arrive in the inbox
--test Do not move or delete IMAP messages
-s, --silent Only print errors
--debug Print debugging information
-v, --version show program's version number and exit
SPF and DMARC record validation
===============================
@@ -162,6 +181,11 @@ JSON
"base_domain": "bellsouth.net"
},
"count": 2,
"alignment": {
"spf": true,
"dkim": false,
"dmarc": true
},
"policy_evaluated": {
"disposition": "none",
"dkim": "fail",
@@ -622,12 +646,6 @@ Then, enable the service
You must also run the above commands whenever you edit
``parsedmarc.service``.
Use this command to check the status of the service:
.. code-block:: bash
sudo service parsedmarc status
.. warning::
Always restart the service every time you upgrade to a new version of
@@ -637,6 +655,23 @@ Use this command to check the status of the service:
sudo service parsedmarc restart
To check the status of the service, run:
.. code-block:: bash
service parsedmarc status
.. note::
In the event of a crash, systemd will restart the service after 10 minutes,
but the `service parsedmarc status` command will only show the logs for the
current process. To view the logs for previous runs as well as the
current process (newest to oldest), run:
.. code-block:: bash
journalctl -u parsedmarc.service -r
Using the Kibana dashboards
===========================

View File

@@ -4,6 +4,7 @@
import logging
import os
import platform
import xml.parsers.expat as expat
import json
from datetime import datetime
@@ -30,12 +31,12 @@ import smtplib
import ssl
import time
import requests
import publicsuffix
import xmltodict
import dns.reversename
import dns.resolver
import dns.exception
from requests import get
import geoip2.database
import geoip2.errors
import imapclient
@@ -43,7 +44,7 @@ import imapclient.exceptions
import dateparser
import mailparser
__version__ = "3.9.7"
__version__ = "4.0.0"
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -55,6 +56,13 @@ MAGIC_GZIP = b"\x1F\x8B"
MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"
# Browser-like User-Agent, e.g. "Mozilla/5.0 (Linux 4.15.0) parsedmarc/4.0.0",
# used when downloading the PSL and GeoIP DB to avoid security-proxy blocks.
# Fixed placeholders: "{0}" was garbled to "(0", which dropped
# platform.system() from the string and emitted a literal "((0".
USER_AGENT = "Mozilla/5.0 ({0} {1}) parsedmarc/{2}".format(
    platform.system(),
    platform.release(),
    __version__
)
class ParserError(RuntimeError):
"""Raised whenever the parser fails for some reason"""
@@ -100,7 +108,10 @@ def _get_base_domain(domain):
psl_path = ".public_suffix_list.dat"
def download_psl():
fresh_psl = publicsuffix.fetch().read()
url = "https://publicsuffix.org/list/public_suffix_list.dat"
# Use a browser-like user agent string to bypass some proxy blocks
headers = {"User-Agent": USER_AGENT}
fresh_psl = requests.get(url, headers=headers).text
with open(psl_path, "w", encoding="utf-8") as fresh_psl_file:
fresh_psl_file.write(fresh_psl)
@@ -121,7 +132,7 @@ def _get_base_domain(domain):
return psl.get_public_suffix(domain)
def _query_dns(domain, record_type, nameservers=None, timeout=6.0):
def _query_dns(domain, record_type, nameservers=None, timeout=2.0):
"""
Queries DNS
@@ -149,7 +160,7 @@ def _query_dns(domain, record_type, nameservers=None, timeout=6.0):
resolver.query(domain, record_type, tcp=True)))
def _get_reverse_dns(ip_address, nameservers=None, timeout=6.0):
def _get_reverse_dns(ip_address, nameservers=None, timeout=2.0):
"""
Resolves an IP address to a hostname using a reverse DNS query
@@ -214,6 +225,19 @@ def human_timestamp_to_datetime(human_timestamp):
return datetime.strptime(human_timestamp, "%Y-%m-%d %H:%M:%S")
def human_timestamp_to_timestamp(human_timestamp):
    """
    Converts a human-readable timestamp into a UNIX timestamp

    Args:
        human_timestamp (str): A timestamp in ``YYYY-MM-DD HH:MM:SS`` format

    Returns:
        float: The converted UNIX timestamp (seconds since the epoch)
    """
    # Delegates parsing to human_timestamp_to_datetime, then converts the
    # resulting datetime to a POSIX timestamp
    return human_timestamp_to_datetime(human_timestamp).timestamp()
def _get_ip_address_country(ip_address):
"""
Uses the MaxMind Geolite2 Country database to return the ISO code for the
@@ -235,8 +259,11 @@ def _get_ip_address_country(ip_address):
"""
url = "https://geolite.maxmind.com/download/geoip/database/" \
"GeoLite2-Country.tar.gz"
# Use a browser-like user agent string to bypass some proxy blocks
headers = {"User-Agent": USER_AGENT}
original_filename = "GeoLite2-Country.mmdb"
tar_file = tarfile.open(fileobj=BytesIO(get(url).content), mode="r:gz")
tar_bytes = requests.get(url, headers=headers).content
tar_file = tarfile.open(fileobj=BytesIO(tar_bytes), mode="r:gz")
tar_dir = tar_file.getnames()[0]
tar_path = "{0}/{1}".format(tar_dir, original_filename)
tar_file.extract(tar_path)
@@ -274,7 +301,7 @@ def _get_ip_address_country(ip_address):
return country
def _get_ip_address_info(ip_address, nameservers=None, timeout=6.0):
def _get_ip_address_info(ip_address, nameservers=None, timeout=2.0):
"""
Returns reverse DNS and country information for the given IP address
@@ -305,7 +332,7 @@ def _get_ip_address_info(ip_address, nameservers=None, timeout=6.0):
return info
def _parse_report_record(record, nameservers=None, timeout=6.0):
def _parse_report_record(record, nameservers=None, timeout=2.0):
"""
Converts a record from a DMARC aggregate report into a more consistent
format
@@ -340,6 +367,13 @@ def _parse_report_record(record, nameservers=None, timeout=6.0):
if "spf" in policy_evaluated:
new_policy_evaluated["spf"] = policy_evaluated["spf"]
reasons = []
spf_aligned = policy_evaluated["spf"] == "pass"
dkim_aligned = policy_evaluated["dkim"] == "pass"
dmarc_aligned = spf_aligned or dkim_aligned
new_record["alignment"] = dict()
new_record["alignment"]["spf"] = spf_aligned
new_record["alignment"]["dkim"] = dkim_aligned
new_record["alignment"]["dmarc"] = dmarc_aligned
if "reason" in policy_evaluated:
if type(policy_evaluated["reason"]) == list:
reasons = policy_evaluated["reason"]
@@ -408,7 +442,7 @@ def _parse_report_record(record, nameservers=None, timeout=6.0):
return new_record
def parse_aggregate_report_xml(xml, nameservers=None, timeout=6.0):
def parse_aggregate_report_xml(xml, nameservers=None, timeout=2.0):
"""Parses a DMARC XML report string and returns a consistent OrderedDict
Args:
@@ -558,7 +592,7 @@ def extract_xml(input_):
return xml
def parse_aggregate_report_file(_input, nameservers=None, timeout=6.0):
def parse_aggregate_report_file(_input, nameservers=None, timeout=2.0):
"""Parses a file at the given path, a file-like object. or bytes as a
aggregate DMARC report
@@ -603,7 +637,7 @@ def parsed_aggregate_reports_to_csv(reports):
"envelope_to", "dkim_domains", "dkim_selectors", "dkim_results",
"spf_domains", "spf_scopes", "spf_results"]
csv_file_object = StringIO()
csv_file_object = StringIO(newline="\n")
writer = DictWriter(csv_file_object, fields)
writer.writeheader()
@@ -687,7 +721,7 @@ def parsed_aggregate_reports_to_csv(reports):
def parse_forensic_report(feedback_report, sample, sample_headers_only,
nameservers=None, timeout=6.0):
nameservers=None, timeout=2.0):
"""
Converts a DMARC forensic report and sample to a ``OrderedDict``
@@ -894,7 +928,7 @@ def parsed_forensic_reports_to_csv(reports):
return csv_file.getvalue()
def parse_report_email(input_, nameservers=None, timeout=6.0):
def parse_report_email(input_, nameservers=None, timeout=2.0):
"""
Parses a DMARC report from an email
@@ -937,10 +971,7 @@ def parse_report_email(input_, nameservers=None, timeout=6.0):
with open(eml_path, "rb") as eml_file:
rfc822 = eml_file.read()
except FileNotFoundError:
raise FileNotFoundError(
"msgconvert not found. Please ensure it is installed\n"
"sudo apt install libemail-outlook-message-perl\n"
"https://github.com/mvz/email-outlook-message-perl")
raise ParserError("msgconvert utility not found")
finally:
os.chdir(orig_dir)
shutil.rmtree(tmp_dir)
@@ -1048,7 +1079,7 @@ def parse_report_email(input_, nameservers=None, timeout=6.0):
return result
def parse_report_file(input_, nameservers=None, timeout=6.0):
def parse_report_file(input_, nameservers=None, timeout=2.0):
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object. or bytes
@@ -1369,7 +1400,7 @@ def get_report_zip(results):
return storage.getvalue()
def email_results(results, host, mail_from, mail_to, port=0, starttls=True,
def email_results(results, host, mail_from, mail_to, port=0,
use_ssl=False, user=None, password=None, subject=None,
attachment_filename=None, message=None, ssl_context=None):
"""
@@ -1381,7 +1412,6 @@ def email_results(results, host, mail_from, mail_to, port=0, starttls=True,
mail_from: The value of the message from header
mail_to : A list of addresses to mail to
port (int): Port to use
starttls (bool): use STARTTLS
use_ssl (bool): Require a SSL connection from the start
user: An optional username
password: An optional password
@@ -1420,13 +1450,18 @@ def email_results(results, host, mail_from, mail_to, port=0, starttls=True,
ssl_context = ssl.create_default_context()
if use_ssl:
server = smtplib.SMTP_SSL(host, port=port, context=ssl_context)
server.helo()
server.connect(host, port)
server.ehlo_or_helo_if_needed()
else:
server = smtplib.SMTP(host, port=port)
server.ehlo()
if starttls:
server.connect(host, port)
server.ehlo_or_helo_if_needed()
if server.has_extn("starttls"):
server.starttls(context=ssl_context)
server.helo()
server.ehlo()
else:
logger.warning("SMTP server does not support STARTTLS. "
"Proceeding in plain text!")
if user and password:
server.login(user, password)
server.sendmail(mail_from, mail_to, msg.as_string())
@@ -1489,8 +1524,28 @@ def watch_inbox(host, username, password, callback, reports_folder="INBOX",
server.idle()
except imapclient.exceptions.IMAPClientError as error:
error = error.__str__().lstrip("b'").rstrip("'").rstrip(".")
raise IMAPError(error)
error = error.__str__().replace("b'", "").replace("'", "")
# Workaround for random Exchange/Office365 IMAP errors
if "Server Unavailable. 15" in error:
logger.debug("IMAP error: {0}".format(error))
logger.debug("Reconnecting watcher")
server = imapclient.IMAPClient(host)
server.login(username, password)
server.select_folder(rf)
idle_start_time = time.monotonic()
ms = "MOVE" in get_imap_capabilities(server)
res = get_dmarc_reports_from_inbox(connection=server,
move_supported=ms,
reports_folder=rf,
archive_folder=af,
delete=delete,
test=test,
nameservers=ns,
dns_timeout=dt)
callback(res)
server.idle()
else:
raise IMAPError(error)
except socket.gaierror:
raise IMAPError("DNS resolution failed")
except ConnectionRefusedError:
@@ -1509,6 +1564,7 @@ def watch_inbox(host, username, password, callback, reports_folder="INBOX",
logger.debug("IMAP error: Broken pipe")
logger.debug("Reconnecting watcher")
server = imapclient.IMAPClient(host)
server.login(username, password)
server.select_folder(rf)
idle_start_time = time.monotonic()
ms = "MOVE" in get_imap_capabilities(server)
@@ -1569,6 +1625,7 @@ def watch_inbox(host, username, password, callback, reports_folder="INBOX",
logger.debug("IMAP error: Broken pipe")
logger.debug("Reconnecting watcher")
server = imapclient.IMAPClient(host)
server.login(username, password)
server.select_folder(rf)
idle_start_time = time.monotonic()
res = get_dmarc_reports_from_inbox(connection=server,

View File

@@ -13,8 +13,8 @@ import json
from elasticsearch.exceptions import ElasticsearchException
from parsedmarc import logger, IMAPError, get_dmarc_reports_from_inbox, \
parse_report_file, elastic, save_output, watch_inbox, email_results, \
SMTPError, ParserError, __version__
parse_report_file, elastic, splunk, save_output, watch_inbox, \
email_results, SMTPError, ParserError, __version__
def _main():
@@ -28,22 +28,38 @@ def _main():
if args.save_aggregate:
for report in reports_["aggregate_reports"]:
try:
elastic.save_aggregate_report_to_elasticsearch(report)
if args.elasticsearch_host:
elastic.save_aggregate_report_to_elasticsearch(report)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except ElasticsearchException as error_:
logger.error("Elasticsearch Error: {0}".format(
error_.__str__()))
exit(1)
if args.hec:
try:
aggregate_reports_ = reports_["aggregate_reports"]
hec_client.save_aggregate_reports_to_splunk(
aggregate_reports_)
except splunk.SplunkError as e:
logger.error("Splunk HEC error: {0}".format(e.__str__()))
if args.save_forensic:
for report in reports_["forensic_reports"]:
try:
elastic.save_forensic_report_to_elasticsearch(report)
if args.elasticsearch_host:
elastic.save_forensic_report_to_elasticsearch(report)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except ElasticsearchException as error_:
logger.error("Elasticsearch Error: {0}".format(
error_.__str__()))
if args.hec:
try:
forensic_reports_ = reports_["forensic_reports"]
hec_client.save_forensic_reports_to_splunk(
forensic_reports_)
except splunk.SplunkError as e:
logger.error("Splunk HEC error: {0}".format(e.__str__()))
arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument("file_path", nargs="*",
@@ -56,7 +72,7 @@ def _main():
"(Default is Cloudflare's)")
arg_parser.add_argument("-t", "--timeout",
help="number of seconds to wait for an answer "
"from DNS (default 6.0)",
"from DNS (default 2.0)",
type=float,
default=6.0)
arg_parser.add_argument("-H", "--host", help="IMAP hostname or IP address")
@@ -76,21 +92,39 @@ def _main():
arg_parser.add_argument("-E", "--elasticsearch-host", nargs="*",
help="A list of one or more Elasticsearch "
"hostnames or URLs to use (Default "
"localhost:9200)",
default=["localhost:9200"])
"hostnames or URLs to use (e.g. "
"localhost:9200)")
arg_parser.add_argument("--hec", help="URL to a Splunk HTTP Event "
"Collector (HEC)")
arg_parser.add_argument("--hec-token", help="The authorization token for "
"a Splunk "
"HTTP event collector (HEC)")
arg_parser.add_argument("--hec-index", help="The index to use when "
"sending events to the "
"Splunk HTTP Events")
arg_parser.add_argument("--hec-skip-certificate-verification",
action="store_true",
default=False,
help="Skip certificate verification for Splunk "
"HEC")
arg_parser.add_argument("--save-aggregate", action="store_true",
default=False,
help="Save aggregate reports to Elasticsearch")
help="Save aggregate reports to search indexes")
arg_parser.add_argument("--save-forensic", action="store_true",
default=False,
help="Save forensic reports to Elasticsearch")
help="Save forensic reports to search indexes")
arg_parser.add_argument("-O", "--outgoing-host",
help="Email the results using this host")
arg_parser.add_argument("-U", "--outgoing-user",
help="Email the results using this user")
arg_parser.add_argument("-P", "--outgoing-password",
help="Email the results using this password")
arg_parser.add_argument("--outgoing-port",
help="Email the results using this port")
arg_parser.add_argument("--outgoing-ssl",
help="Use SSL/TLS instead of STARTTLS (more "
"secure, and required by some providers, "
"like Gmail)")
arg_parser.add_argument("-F", "--outgoing-from",
help="Email the results using this from address")
arg_parser.add_argument("-T", "--outgoing-to", nargs="+",
@@ -129,13 +163,29 @@ def _main():
exit(1)
if args.save_aggregate or args.save_forensic:
if args.elasticsearch_host is None and args.hec is None:
args.elasticsearch_host = ["localhost:9200"]
try:
elastic.set_hosts(args.elasticsearch_host)
elastic.create_indexes()
if args.elasticsearch_host:
elastic.set_hosts(args.elasticsearch_host)
elastic.create_indexes()
except ElasticsearchException as error:
logger.error("Elasticsearch Error: {0}".format(error.__str__()))
exit(1)
if args.hec:
if args.hec_token is None or args.hec_index is None:
logger.error("HEC token and HEC index are required when "
"using HEC URL")
exit(1)
verify = True
if args.hec_skip_certificate_verification:
verify = False
hec_client = splunk.HECClient(args.hec, args.hec_token,
args.hec_index,
verify=verify)
file_paths = []
for file_path in args.file_path:
file_paths += glob(file_path)
@@ -196,7 +246,8 @@ def _main():
try:
email_results(results, args.outgoing_host, args.outgoing_from,
args.outgoing_to, user=args.outgoing_user,
args.outgoing_to, use_ssl=args.outgoing_ssl,
user=args.outgoing_user,
password=args.outgoing_password,
subject=args.outgoing_subject)
except SMTPError as error:

132
parsedmarc/splunk.py Normal file
View File

@@ -0,0 +1,132 @@
from urllib.parse import urlparse
import socket
import json
import requests
from parsedmarc import __version__
class SplunkError(RuntimeError):
    """Raised when a Splunk API error occurs (a failed HEC request or an
    error response returned by the collector)"""
class HECClient(object):
    """A client for a Splunk HTTP Events Collector (HEC)"""

    # http://docs.splunk.com/Documentation/Splunk/latest/Data/AboutHEC
    # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector

    def __init__(self, url, access_token, index,
                 source="parsedmarc", verify=True):
        """
        Initializes the HECClient

        Args:
            url (str): The URL of the HEC
            access_token (str): The HEC access token, with or without a
            leading ``Splunk `` scheme prefix
            index (str): The name of the index
            source (str): The source name
            verify (bool): Verify SSL certificates
        """
        url = urlparse(url)
        self.url = "{0}://{1}/services/collector/event/1.0".format(url.scheme,
                                                                   url.netloc)
        # Strip an optional "Splunk " prefix from the supplied token.
        # NOTE: str.lstrip("Splunk ") would strip *characters* (any of
        # S, p, l, u, n, k, space), silently corrupting tokens that begin
        # with one of those characters — so remove the prefix explicitly.
        if access_token.startswith("Splunk "):
            access_token = access_token[len("Splunk "):]
        self.access_token = access_token
        self.index = index
        self.host = socket.getfqdn()
        self.source = source
        self.session = requests.Session()
        self.session.verify = verify
        # Fields sent with every event
        self._common_data = dict(host=self.host, source=self.source,
                                 index=self.index)

        self.session.headers = {
            "User-Agent": "parsedmarc/{0}".format(__version__),
            "Authorization": "Splunk {0}".format(self.access_token)
        }

    def _post_events(self, json_str):
        """Posts newline-delimited JSON events to the HEC

        Args:
            json_str (str): One JSON-encoded event per line

        Raises:
            SplunkError: If the request fails, or Splunk returns a
            non-zero response code
        """
        try:
            response = self.session.post(self.url, data=json_str).json()
        except Exception as e:
            raise SplunkError(e.__str__())
        if response["code"] != 0:
            raise SplunkError(response["text"])

    def save_aggregate_reports_to_splunk(self, aggregate_reports):
        """
        Saves aggregate DMARC reports to Splunk

        Args:
            aggregate_reports: A list of aggregate report dictionaries
            to save in Splunk (a single dictionary is also accepted)

        Raises:
            SplunkError: If the HEC request fails
        """
        if type(aggregate_reports) == dict:
            aggregate_reports = [aggregate_reports]

        if len(aggregate_reports) < 1:
            return

        data = self._common_data.copy()
        json_str = ""
        for report in aggregate_reports:
            for record in report["records"]:
                # Flatten each record into a single event that also carries
                # the report metadata and the published policy
                new_report = dict()
                for metadata in report["report_metadata"]:
                    new_report[metadata] = report["report_metadata"][metadata]
                new_report["policy_published"] = report["policy_published"]
                new_report["source_ip_address"] = record["source"][
                    "ip_address"]
                new_report["source_country"] = record["source"]["country"]
                new_report["source_reverse_dns"] = record["source"][
                    "reverse_dns"]
                new_report["source_base_domain"] = record["source"][
                    "base_domain"]
                new_report["message_count"] = record["count"]
                new_report["disposition"] = record["policy_evaluated"][
                    "disposition"
                ]
                new_report["spf_aligned"] = record["alignment"]["spf"]
                new_report["dkim_aligned"] = record["alignment"]["dkim"]
                new_report["passed_dmarc"] = record["alignment"]["dmarc"]
                new_report["header_from"] = record["identifiers"][
                    "header_from"]
                new_report["envelope_from"] = record["identifiers"][
                    "envelope_from"]
                if "dkim" in record["auth_results"]:
                    new_report["dkim_results"] = record["auth_results"][
                        "dkim"]
                if "spf" in record["auth_results"]:
                    new_report["spf_results"] = record["auth_results"][
                        "spf"]

                data["sourcetype"] = "dmarc:aggregate"
                data["event"] = new_report.copy()
                json_str += "{0}\n".format(json.dumps(data))

        self._post_events(json_str)

    def save_forensic_reports_to_splunk(self, forensic_reports):
        """
        Saves forensic DMARC reports to Splunk

        Args:
            forensic_reports (list): A list of forensic report dictionaries
            to save in Splunk (a single dictionary is also accepted)

        Raises:
            SplunkError: If the HEC request fails
        """
        if type(forensic_reports) == dict:
            forensic_reports = [forensic_reports]

        if len(forensic_reports) < 1:
            return

        json_str = ""
        for report in forensic_reports:
            data = self._common_data.copy()
            data["sourcetype"] = "dmarc:forensic"
            data["event"] = report.copy()
            json_str += "{0}\n".format(json.dumps(data))

        self._post_events(json_str)

View File

@@ -14,7 +14,7 @@ from setuptools import setup
from codecs import open
from os import path
__version__ = "3.9.7"
__version__ = "4.0.0"
description = "A Python package and CLI for parsing aggregate and " \
"forensic DMARC reports"