This commit is contained in:
Sean Whalen
2018-03-04 11:22:24 -05:00
parent c8d7bc703e
commit 05d49222c6
7 changed files with 848 additions and 113 deletions

.gitignore vendored (2 changes)

@@ -111,6 +111,8 @@ ENV/
*.json
*.csv
*.xls*
*.eml
*.msg
# LibreOffice lock files
.~*


@@ -1,3 +1,15 @@
2.0.0
-----
### New features
- Parse forensic reports
- Parse reports from IMAP inbox
### Changes
- Command line output is always a JSON object containing the lists
`aggregate_reports` and `forensic_reports`
- `-o`/`--output` option is now a path to an output directory, instead of an
output file
1.0.5
-----
- Prefix public suffix and GeoIP2 database filenames with `.`


@@ -4,15 +4,17 @@ parsedmarc
|Build Status|
``parsedmarc`` is a Python module and CLI utility for parsing aggregate DMARC reports.
``parsedmarc`` is a Python module and CLI utility for parsing DMARC reports.
Features
========
* Parses draft and 1.0 standard aggregate reports
* Parses forensic reports
* Can parse reports from an inbox over IMAP
* Transparently handles gzip or zip compressed reports
* Consistent data structures
* Simple JSON or CSV output
* Simple JSON and/or CSV output
* Python 2 and 3 support
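The transparent compression handling works by sniffing the leading magic bytes of the payload rather than trusting file extensions. A minimal, self-contained sketch of that idea (the helper name and sample data are illustrative, not part of the parsedmarc API):

```python
import gzip
import io

def sniff_report(data: bytes) -> str:
    """Guess how a report payload is packaged by its magic bytes."""
    if data.startswith(b"\x1f\x8b"):    # gzip magic number
        return "gzip"
    if data.startswith(b"PK"):          # zip local file header
        return "zip"
    if data.lstrip().startswith(b"<"):  # bare XML aggregate report
        return "xml"
    return "unknown"

# Round-trip: gzip-compress a tiny XML fragment and sniff it back
xml = b"<feedback></feedback>"
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
    gz.write(xml)
print(sniff_report(buf.getvalue()))  # gzip
print(sniff_report(xml))             # xml
```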
CLI help
@@ -20,11 +22,12 @@ CLI help
::
usage: parsedmarc.py [-h] [-f FORMAT] [-o OUTPUT]
[-n NAMESERVER [NAMESERVER ...]] [-t TIMEOUT] [-v]
file_path [file_path ...]
usage: parsedmarc [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]]
[-t TIMEOUT] [-H HOST] [-U USERNAME] [-p PASSWORD]
[-a ARCHIVE_FOLDER] [-d] [-i] [-T] [-v]
[file_path [file_path ...]]
Parses aggregate DMARC reports
Parses DMARC reports
positional arguments:
file_path one or more paths of aggregate report files
@@ -32,21 +35,29 @@ CLI help
optional arguments:
-h, --help show this help message and exit
-f FORMAT, --format FORMAT
specify JSON or CSV output format
-o OUTPUT, --output OUTPUT
output to a file path rather than printing to the
screen
-n NAMESERVER [NAMESERVER ...], --nameserver NAMESERVER [NAMESERVER ...]
Write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query
-t TIMEOUT, --timeout TIMEOUT
number of seconds to wait for an answer from DNS
(default 6.0)
-H HOST, --host HOST IMAP hostname or IP address
-U USERNAME, --username USERNAME
IMAP username
-p PASSWORD, --password PASSWORD
IMAP password
-a ARCHIVE_FOLDER, --archive-folder ARCHIVE_FOLDER
Specifies the IMAP folder to move messages to after
processing them (default: Archive)
-d, --delete Delete the reports after processing them
-i, --idle Use an IMAP IDLE connection to process reports as they
arrive in the inbox
-T, --test Do not move or delete IMAP messages
-v, --version show program's version number and exit
Sample output
=============
Sample aggregate report output
==============================
Here are the results from parsing the `example <https://dmarc.org/wiki/FAQ#I_need_to_implement_aggregate_reports.2C_what_do_they_look_like.3F>`_
report from the dmarc.org wiki. It's actually an older draft of the 1.0
@@ -54,7 +65,8 @@ report schema standardized in
`RFC 7489 Appendix C <https://tools.ietf.org/html/rfc7489#appendix-C>`_.
This draft schema is still in wide use.
``parsedmarc`` produces consistent, normalized output, regardless of the report schema.
``parsedmarc`` produces consistent, normalized output, regardless of the report
schema.
JSON
----
@@ -129,16 +141,19 @@ CSV
xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,count,disposition,dkim_alignment,spf_alignment,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-27 20:00:00,2012-04-28 19:59:59,,example.com,r,r,none,none,100,0,72.150.241.94,US,adsl-72-150-241-94.shv.bellsouth.net,bellsouth.net,2,none,fail,pass,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
What about forensic DMARC reports?
==================================
Forensic DMARC reports are emails with an attached email sample that failed a
DMARC check. You can parse them with any email message parser, such as
`mail-parser <https://pypi.python.org/pypi/mail-parser/>`_.
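The ``message/feedback-report`` part of such an email is just ``Field: value`` lines, so the fields can be pulled out with a single multiline regex; the pattern below is the one parsedmarc 2.0.0 itself uses, while the sample report text is made up for illustration:

```python
import re
from collections import OrderedDict

# Same pattern parsedmarc 2.0.0 uses for feedback reports
feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)

report = """Feedback-Type: auth-failure
User-Agent: SomeGateway/1.0
Auth-Failure: dmarc
Source-IP: 192.0.2.1
Reported-Domain: example.com"""

parsed = OrderedDict()
for key, value in feedback_report_regex.findall(report):
    # Normalize header names into snake_case keys
    parsed[key.lower().replace("-", "_")] = value

print(parsed["feedback_type"])    # auth-failure
print(parsed["reported_domain"])  # example.com
```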
Sample forensic report output
=============================
Very few recipients send forensic reports, and even those who do will often
provide only the message headers, and not the message's content, for privacy
reasons.
I don't have a sample I can share for privacy reasons. If you have a sample
forensic report that you can share publicly, please contact me!
Bug reports
===========
Please report bugs on the GitHub issue tracker
https://github.com/domainaware/parsedmarc/issues
Installation
============
@@ -152,9 +167,11 @@ On Debian or Ubuntu systems, run:
$ sudo apt-get install python3-pip
Python 3 installers for Windows and macOS can be found at https://www.python.org/downloads/
Python 3 installers for Windows and macOS can be found at
https://www.python.org/downloads/
To install or upgrade to the latest stable release of ``parsedmarc`` on macOS or Linux, run
To install or upgrade to the latest stable release of ``parsedmarc`` on
macOS or Linux, run
.. code-block:: bash
@@ -169,7 +186,20 @@ Or, install the latest development release directly from GitHub:
.. note::
On Windows, ``pip3`` is ``pip``, even with Python 3. So on Windows, simply
substitute ``pip`` as an administrator in place of ``sudo pip3``, in the above commands.
substitute ``pip`` as an administrator in place of ``sudo pip3`` in the
above commands.
Optional dependencies
---------------------
If you would like to be able to parse emails saved from Microsoft Outlook
(i.e. OLE .msg files), install ``msgconvert``:
On Debian or Ubuntu systems, run:
.. code-block:: bash
$ sudo apt-get install libemail-outlook-message-perl
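Whether ``msgconvert`` is needed is decided by checking for the OLE compound-file signature at the start of the payload; a self-contained sketch of that check (mirroring the logic in parsedmarc 2.0.0):

```python
def is_outlook_msg(suspect_bytes: bytes) -> bool:
    """Outlook .msg files are OLE compound files, which always begin
    with the 8-byte signature D0 CF 11 E0 A1 B1 1A E1."""
    return suspect_bytes.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1")

print(is_outlook_msg(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1" + b"\x00" * 8))  # True
print(is_outlook_msg(b"Received: from mail.example.com"))                 # False
```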


@@ -9,15 +9,17 @@ Welcome to parsedmarc's documentation!
|Build Status|
``parsedmarc`` is a Python module and CLI utility for parsing aggregate DMARC reports.
``parsedmarc`` is a Python module and CLI utility for parsing DMARC reports.
Features
========
* Parses draft and 1.0 standard aggregate reports
* Parses forensic reports
* Can parse reports from an inbox over IMAP
* Transparently handles gzip or zip compressed reports
* Consistent data structures
* Simple JSON or CSV output
* Simple JSON and/or CSV output
* Python 2 and 3 support
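The "JSON and/or CSV output" comes from flattening each report's nested ``source`` object into ``source_*`` columns before handing rows to ``csv.DictWriter``; a minimal sketch with made-up values (the row shape loosely follows the parsed-report structure, not the exact API):

```python
import csv
import io

row = {
    "source": {"ip_address": "192.0.2.1", "country": "US",
               "reverse_dns": "mail.example.com",
               "base_domain": "example.com"},
    "count": 2,
}

# Flatten the nested source object into source_* columns
flat = {"source_{0}".format(k): v for k, v in row["source"].items()}
flat["count"] = row["count"]

out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=sorted(flat))
writer.writeheader()
writer.writerow(flat)
print(out.getvalue())
```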
CLI help
@@ -25,11 +27,12 @@ CLI help
::
usage: parsedmarc.py [-h] [-f FORMAT] [-o OUTPUT]
[-n NAMESERVER [NAMESERVER ...]] [-t TIMEOUT] [-v]
file_path [file_path ...]
usage: parsedmarc [-h] [-o OUTPUT] [-n NAMESERVERS [NAMESERVERS ...]]
[-t TIMEOUT] [-H HOST] [-U USERNAME] [-p PASSWORD]
[-a ARCHIVE_FOLDER] [-d] [-i] [-T] [-v]
[file_path [file_path ...]]
Parses aggregate DMARC reports
Parses DMARC reports
positional arguments:
file_path one or more paths of aggregate report files
@@ -37,21 +40,29 @@ CLI help
optional arguments:
-h, --help show this help message and exit
-f FORMAT, --format FORMAT
specify JSON or CSV output format
-o OUTPUT, --output OUTPUT
output to a file path rather than printing to the
screen
-n NAMESERVER [NAMESERVER ...], --nameserver NAMESERVER [NAMESERVER ...]
Write output files to the given directory
-n NAMESERVERS [NAMESERVERS ...], --nameservers NAMESERVERS [NAMESERVERS ...]
nameservers to query
-t TIMEOUT, --timeout TIMEOUT
number of seconds to wait for an answer from DNS
(default 6.0)
-H HOST, --host HOST IMAP hostname or IP address
-U USERNAME, --username USERNAME
IMAP username
-p PASSWORD, --password PASSWORD
IMAP password
-a ARCHIVE_FOLDER, --archive-folder ARCHIVE_FOLDER
Specifies the IMAP folder to move messages to after
processing them (default: Archive)
-d, --delete Delete the reports after processing them
-i, --idle Use an IMAP IDLE connection to process reports as they
arrive in the inbox
-T, --test Do not move or delete IMAP messages
-v, --version show program's version number and exit
Sample output
=============
Sample aggregate report output
==============================
Here are the results from parsing the `example <https://dmarc.org/wiki/FAQ#I_need_to_implement_aggregate_reports.2C_what_do_they_look_like.3F>`_
report from the dmarc.org wiki. It's actually an older draft of the 1.0
@@ -59,7 +70,8 @@ report schema standardized in
`RFC 7489 Appendix C <https://tools.ietf.org/html/rfc7489#appendix-C>`_.
This draft schema is still in wide use.
``parsedmarc`` produces consistent, normalized output, regardless of the report schema.
``parsedmarc`` produces consistent, normalized output, regardless of the report
schema.
JSON
----
@@ -134,16 +146,12 @@ CSV
xml_schema,org_name,org_email,org_extra_contact_info,report_id,begin_date,end_date,errors,domain,adkim,aspf,p,sp,pct,fo,source_ip_address,source_country,source_reverse_dns,source_base_domain,count,disposition,dkim_alignment,spf_alignment,policy_override_reasons,policy_override_comments,envelope_from,header_from,envelope_to,dkim_domains,dkim_selectors,dkim_results,spf_domains,spf_scopes,spf_results
draft,acme.com,noreply-dmarc-support@acme.com,http://acme.com/dmarc/support,9391651994964116463,2012-04-27 20:00:00,2012-04-28 19:59:59,,example.com,r,r,none,none,100,0,72.150.241.94,US,adsl-72-150-241-94.shv.bellsouth.net,bellsouth.net,2,none,fail,pass,,,example.com,example.com,,example.com,none,fail,example.com,mfrom,pass
What about forensic DMARC reports?
==================================
Forensic DMARC reports are emails with an attached email sample that failed a
DMARC check. You can parse them with any email message parser, such as
`mail-parser <https://pypi.python.org/pypi/mail-parser/>`_.
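Message headers in forensic samples are often RFC 2047 MIME-encoded; the standard library's ``email.header`` module can decode them, much as ``parsedmarc`` does internally (the helper name here is illustrative):

```python
import email.header

def decode_mime_header(header: str) -> str:
    """Decode an RFC 2047 encoded header into a plain string."""
    parts = []
    for value, charset in email.header.decode_header(header):
        if isinstance(value, bytes):
            # Fall back to ASCII when no charset is declared
            value = value.decode(charset or "ascii", errors="replace")
        parts.append(value)
    return "".join(parts)

print(decode_mime_header("=?utf-8?B?UsOpc3Vtw6k=?="))  # Résumé
```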
Sample forensic report output
=============================
Very few recipients send forensic reports, and even those who do will often
provide only the message headers, and not the message's content, for privacy
reasons.
I don't have a sample I can share for privacy reasons. If you have a sample
forensic report that you can share publicly, please contact me!
Bug reports
===========
@@ -164,9 +172,11 @@ On Debian or Ubuntu systems, run:
$ sudo apt-get install python3-pip
Python 3 installers for Windows and macOS can be found at https://www.python.org/downloads/
Python 3 installers for Windows and macOS can be found at
https://www.python.org/downloads/
To install or upgrade to the latest stable release of ``parsedmarc`` on macOS or Linux, run
To install or upgrade to the latest stable release of ``parsedmarc`` on
macOS or Linux, run
.. code-block:: bash
@@ -181,7 +191,20 @@ Or, install the latest development release directly from GitHub:
.. note::
On Windows, ``pip3`` is ``pip``, even with Python 3. So on Windows, simply
substitute ``pip`` as an administrator in place of ``sudo pip3``, in the above commands.
substitute ``pip`` as an administrator in place of ``sudo pip3`` in the
above commands.
Optional dependencies
---------------------
If you would like to be able to parse emails saved from Microsoft Outlook
(i.e. OLE .msg files), install ``msgconvert``:
On Debian or Ubuntu systems, run:
.. code-block:: bash
$ sudo apt-get install libemail-outlook-message-perl
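When forensic samples are written to disk, each sample's filename is derived from the message subject, so characters that are invalid in filenames are stripped first. The sketch below mirrors the sanitization in parsedmarc 2.0.0:

```python
def get_filename_safe_subject(subject):
    """Strip characters that are unsafe in filenames from a subject."""
    invalid_filename_chars = ['\\', '/', ':', '"', '*', '?', '|', '\n', '\r']
    if subject is None:
        subject = "No Subject"
    for char in invalid_filename_chars:
        subject = subject.replace(char, "")
    # Trailing dots are also problematic on some filesystems
    return subject.rstrip(".")

print(get_filename_safe_subject('Re: "urgent"/invoice?.'))  # Re urgentinvoice
```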
API
===


@@ -1,13 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""A Python module and CLI for parsing aggregate DMARC reports"""
"""A Python module and CLI for parsing DMARC reports"""
from __future__ import unicode_literals, print_function, absolute_import
import logging
from sys import version_info
from os import path, stat
import os
import json
from datetime import datetime
from collections import OrderedDict
@@ -17,9 +17,16 @@ from gzip import GzipFile
import tarfile
from zipfile import ZipFile
from csv import DictWriter
import re
from base64 import b64decode
import shutil
from argparse import ArgumentParser
from glob import glob
import email
import tempfile
import subprocess
import socket
from time import sleep
import publicsuffix
import xmltodict
@@ -29,11 +36,15 @@ import dns.exception
from requests import get
import geoip2.database
import geoip2.errors
import imapclient
import imapclient.exceptions
import dateparser
import mailparser
__version__ = "1.0.5"
__version__ = "2.0.0"
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
logger.setLevel(logging.INFO)
# Python 2 compatibility hack
@@ -41,10 +52,29 @@ if version_info[0] >= 3:
unicode = str
class InvalidAggregateReport(Exception):
feedback_report_regex = re.compile(r"^([\w\-]+): (.+)$", re.MULTILINE)
class ParserError(RuntimeError):
"""Raised whenever the parser fails for some reason"""
class IMAPError(RuntimeError):
"""Raised when an IMAP error occurs"""
class InvalidDMARCReport(ParserError):
"""Raised when an invalid DMARC report is encountered"""
class InvalidAggregateReport(InvalidDMARCReport):
"""Raised when an invalid DMARC aggregate report is encountered"""
class InvalidForensicReport(InvalidDMARCReport):
"""Raised when an invalid DMARC forensic report is encountered"""
def _get_base_domain(domain):
"""
Gets the base domain name for the given domain
@@ -72,11 +102,11 @@ def _get_base_domain(domain):
return publicsuffix.PublicSuffixList(fresh_psl)
if not path.exists(psl_path):
if not os.path.exists(psl_path):
psl = download_psl()
else:
psl_age = datetime.now() - datetime.fromtimestamp(
stat(psl_path).st_mtime)
os.stat(psl_path).st_mtime)
if psl_age > timedelta(hours=24):
psl = download_psl()
else:
@@ -210,16 +240,16 @@ def _get_ip_address_country(ip_address):
db_path = ""
for system_path in system_paths:
if path.exists(system_path):
if os.path.exists(system_path):
db_path = system_path
break
if db_path == "":
if not path.exists(db_filename):
if not os.path.exists(db_filename):
download_country_database(db_filename)
else:
db_age = datetime.now() - datetime.fromtimestamp(
stat(db_filename).st_mtime)
os.stat(db_filename).st_mtime)
if db_age > timedelta(days=60):
shutil.rmtree(db_path)
download_country_database()
@@ -237,6 +267,36 @@ def _get_ip_address_country(ip_address):
return country
def _get_ip_address_info(ip_address, nameservers=None, timeout=6.0):
"""
Returns reverse DNS and country information for the given IP address
Args:
ip_address: The IP address to check
nameservers (list): A list of one or more nameservers to use
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: ``ip_address``, ``reverse_dns``
"""
ip_address = ip_address.lower()
info = OrderedDict()
info["ip_address"] = ip_address
reverse_dns = _get_reverse_dns(ip_address,
nameservers=nameservers,
timeout=timeout)
country = _get_ip_address_country(ip_address)
info["country"] = country
info["reverse_dns"] = reverse_dns
info["base_domain"] = None
if reverse_dns is not None:
base_domain = _get_base_domain(reverse_dns)
info["base_domain"] = base_domain
return info
def _parse_report_record(record, nameservers=None, timeout=6.0):
"""
Converts a record from a DMARC aggregate report into a more consistent
@@ -252,18 +312,9 @@ def _parse_report_record(record, nameservers=None, timeout=6.0):
"""
record = record.copy()
new_record = OrderedDict()
new_record["source"] = OrderedDict()
new_record["source"]["ip_address"] = record["row"]["source_ip"]
reverse_dns = _get_reverse_dns(new_record["source"]["ip_address"],
nameservers=nameservers,
timeout=timeout)
country = _get_ip_address_country(new_record["source"]["ip_address"])
new_record["source"]["country"] = country
new_record["source"]["reverse_dns"] = reverse_dns
new_record["source"]["base_domain"] = None
if new_record["source"]["reverse_dns"] is not None:
base_domain = _get_base_domain(new_record["source"]["reverse_dns"])
new_record["source"]["base_domain"] = base_domain
new_record["source"] = _get_ip_address_info(record["row"]["source_ip"],
nameservers=nameservers,
timeout=timeout)
new_record["count"] = int(record["row"]["count"])
policy_evaluated = record["row"]["policy_evaluated"].copy()
new_policy_evaluated = OrderedDict([("disposition", "none"),
@@ -428,24 +479,24 @@ def parse_aggregate_report_xml(xml, nameservers=None, timeout=6.0):
"{0}".format(error.__str__()))
def parse_aggregate_report_file(_input, nameservers=None, timeout=6.0):
def parse_aggregate_report_file(input_, nameservers=None, timeout=6.0):
"""Parses a file at the given path, a file-like object. or bytes as a
aggregate DMARC report
Args:
_input: A path to a file, a file like object, or bytes
input_: A path to a file, a file like object, or bytes
nameservers (list): A list of one or more nameservers to use
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: The parsed DMARC aggregate report
"""
if type(_input) == str or type(_input) == unicode:
file_object = open(_input, "rb")
elif type(_input) == bytes:
file_object = BytesIO(_input)
if type(input_) == str or type(input_) == unicode:
file_object = open(input_, "rb")
elif type(input_) == bytes:
file_object = BytesIO(input_)
else:
file_object = _input
file_object = input_
try:
header = file_object.read(6)
file_object.seek(0)
@@ -574,61 +625,674 @@ def parsed_aggregate_report_to_csv(_input):
return csv_file_object.getvalue()
def parse_forensic_report(feedback_report, sample, sample_headers_only,
nameservers=None, timeout=6.0):
"""
Converts a DMARC forensic report and sample to a ``OrderedDict``
Args:
feedback_report: A message's feedback report as a string
sample: The RFC 822 headers or RFC 822 message sample
sample_headers_only (bool): Set true if the sample is only headers
nameservers (list): A list of one or more nameservers to use
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: A parsed report and sample
"""
def convert_address(original_address):
if original_address[0] == "":
display_name = None
else:
display_name = original_address[0]
address = original_address[1]
return OrderedDict([("display_name", display_name),
("address", address)])
def get_filename_safe_subject(_subject):
"""
Converts a message subject to a string that is safe for a filename
Args:
_subject: A message subject
Returns:
A string safe for a filename
"""
invalid_filename_chars = ['\\', '/', ':', '"', '*', '?', '|', '\n',
'\r']
if _subject is None:
_subject = "No Subject"
for char in invalid_filename_chars:
_subject = _subject.replace(char, "")
_subject = _subject.rstrip(".")
return _subject
try:
parsed_report = OrderedDict()
report_values = feedback_report_regex.findall(feedback_report)
for report_value in report_values:
key = report_value[0].lower().replace("-", "_")
parsed_report[key] = report_value[1]
if key == "arrival_date":
arrival_utc = dateparser.parse(parsed_report["arrival_date"],
settings={"TO_TIMEZONE": "UTC"})
arrival_utc = arrival_utc.strftime("%Y-%m-%d %H:%M:%S")
parsed_report["arrival_date_utc"] = arrival_utc
ip_address = parsed_report["source_ip"]
parsed_report["source"] = _get_ip_address_info(ip_address,
nameservers=nameservers,
timeout=timeout)
del parsed_report["source_ip"]
if "identity_alignment" not in parsed_report:
parsed_report["authentication_mechanisms"] = []
elif parsed_report["identity_alignment"] == "none":
parsed_report["authentication_mechanisms"] = []
del parsed_report["identity_alignment"]
else:
auth_mechanisms = parsed_report["identity_alignment"]
auth_mechanisms = auth_mechanisms.split(",")
parsed_report["authentication_mechanisms"] = auth_mechanisms
del parsed_report["identity_alignment"]
if "auth_failure" not in parsed_report:
parsed_report["auth_failure"] = "dmarc"
auth_failure = parsed_report["auth_failure"].split(",")
parsed_report["auth_failure"] = auth_failure
optional_fields = ["original_envelope_id", "dkim_domain",
"original_mail_from", "original_rcpt_to"]
for optional_field in optional_fields:
if optional_field not in parsed_report:
parsed_report[optional_field] = None
parsed_mail = mailparser.parse_from_string(sample)
parsed_headers = json.loads(parsed_mail.headers_json)
parsed_message = json.loads(parsed_mail.mail_json)
parsed_sample = OrderedDict([("headers", parsed_headers)])
for key in parsed_message:
parsed_sample[key] = parsed_message[key]
parsed_sample["date"] = parsed_sample["date"].replace("T", " ")
if "received" in parsed_message:
for received in parsed_message["received"]:
if "date_utc" in received:
received["date_utc"] = received["date_utc"].replace("T",
" ")
parsed_sample["from"] = convert_address(parsed_sample["from"][0])
if "reply_to" in parsed_sample:
parsed_sample["reply_to"] = list(map(lambda x: convert_address(x),
parsed_sample["reply_to"]))
else:
parsed_sample["reply_to"] = []
parsed_sample["to"] = list(map(lambda x: convert_address(x),
parsed_sample["to"]))
if "cc" in parsed_sample:
parsed_sample["cc"] = list(map(lambda x: convert_address(x),
parsed_sample["cc"]))
else:
parsed_sample["cc"] = []
if "bcc" in parsed_sample:
parsed_sample["bcc"] = list(map(lambda x: convert_address(x),
parsed_sample["bcc"]))
else:
parsed_sample["bcc"] = []
if "delivered_to" in parsed_sample:
parsed_sample["delivered_to"] = list(
map(lambda x: convert_address(x),
parsed_sample["delivered_to"])
)
if "attachments" not in parsed_sample:
parsed_sample["attachments"] = []
if "subject" not in parsed_sample:
parsed_sample["subject"] = None
parsed_sample["filename_safe_subject"] = get_filename_safe_subject(
parsed_sample["subject"])
if "body" not in parsed_sample:
parsed_sample["body"] = None
if sample_headers_only and parsed_sample["has_defects"]:
del parsed_sample["defects"]
del parsed_sample["defects_categories"]
del parsed_sample["has_defects"]
parsed_report["sample_headers_only"] = sample_headers_only
parsed_report["sample"] = sample
parsed_report["parsed_sample"] = parsed_sample
return parsed_report
except KeyError as error:
raise InvalidForensicReport("Missing value: {0}".format(
error.__str__()))
def parsed_dmarc_forensic_reports_to_csv(report):
fields = ["feedback_type", "user_agent", "version", "original_envelope_id",
"original_mail_from", "original_rcpt_to", "arrival_date",
"arrival_date_utc", "subject", "message_id",
"authentication_results", "dkim_domain", "source_ip_address",
"source_country", "source_reverse_dns", "source_base_domain",
"delivery_result", "auth_failure", "reported_domain",
"authentication_mechanisms", "sample_headers_only"]
csv_file = StringIO()
csv_writer = DictWriter(csv_file, fieldnames=fields)
csv_writer.writeheader()
for row in report:
row = row.copy()
row["source_ip_address"] = row["source"]["ip_address"]
row["source_reverse_dns"] = row["source"]["reverse_dns"]
row["source_base_domain"] = row["source"]["base_domain"]
row["source_country"] = row["source"]["country"]
del row["source"]
row["subject"] = row["parsed_sample"]["subject"]
row["auth_failure"] = ",".join(row["auth_failure"])
authentication_mechanisms = row["authentication_mechanisms"]
row["authentication_mechanisms"] = ",".join(
authentication_mechanisms)
del row["sample"]
del row["parsed_sample"]
csv_writer.writerow(row)
return csv_file.getvalue()
def parse_report_email(input_, nameservers=None, timeout=6.0):
"""
Parses a DMARC report from an email
Args:
input_: An emailed DMARC report in RFC 822 format, as bytes or a string
nameservers (list): A list of one or more nameservers to use
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict:
* ``report_type``: ``aggregate`` or ``forensic``
* ``report``: The parsed report
"""
def is_outlook_msg(suspect_bytes):
"""Checks if the given content is a Outlook msg OLE file"""
return suspect_bytes.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1")
def convert_outlook_msg(msg_bytes):
"""
Uses the ``msgconvert`` Perl utility to convert an Outlook MSG file to
standard RFC 822 format
Args:
msg_bytes (bytes): the content of the .msg file
Returns:
An RFC 822 string
"""
if not is_outlook_msg(msg_bytes):
raise ValueError("The supplied bytes are not an Outlook MSG file")
orig_dir = os.getcwd()
tmp_dir = tempfile.mkdtemp()
os.chdir(tmp_dir)
with open("sample.msg", "wb") as msg_file:
msg_file.write(msg_bytes)
try:
subprocess.check_call(["msgconvert", "sample.msg"])
eml_path = "sample.eml"
with open(eml_path, "rb") as eml_file:
rfc822 = eml_file.read()
except FileNotFoundError as e:
raise RuntimeError(
"Error running msgconvert. Please ensure it is installed\n"
"sudo apt install libemail-outlook-message-perl\n"
"https://github.com/mvz/email-outlook-message-perl\n\n"
"{0}".format(e))
finally:
os.chdir(orig_dir)
shutil.rmtree(tmp_dir)
return rfc822
def decode_header(header):
"""Decodes a RFC 822 email header"""
header = header.replace("\r", "").replace("\n", "")
decoded_header = email.header.decode_header(header)
header = ""
for header_part in decoded_header:
if type(header_part[0]) == bytes:
encoding = header_part[1] or "ascii"
header_part = header_part[0].decode(encoding=encoding,
errors="replace")
else:
header_part = header_part[0]
header += header_part
header = header.replace("\r", " ").replace("\n", " ")
return header
if type(input_) == bytes:
if is_outlook_msg(input_):
input_ = convert_outlook_msg(input_)
else:
input_ = input_.decode("utf-8", errors="replace")
result = None
msg = email.message_from_string(input_)
subject = None
feedback_report = None
sample_headers_only = False
sample = None
if "subject" in msg:
subject = decode_header(msg["subject"])
for part in msg.walk():
filename = part.get_filename()
if filename is None:
filename = ""
filename = filename.lower()
content_type = part.get_content_type()
payload = part.get_payload()
if type(payload) == list:
payload = payload[0].__str__()
if content_type == "message/feedback-report":
feedback_report = payload
elif content_type == "text/rfc822-headers":
sample = payload
sample_headers_only = True
elif content_type == "message/rfc822":
sample = payload
sample_headers_only = False
if feedback_report and sample:
forensic_report = parse_forensic_report(feedback_report,
sample,
sample_headers_only,
nameservers=nameservers,
timeout=timeout)
result = OrderedDict([("report_type", "forensic"),
("report", forensic_report)])
if filename.endswith(".gz") or filename.endswith(".zip"):
ns = nameservers
xml = b64decode(part.get_payload())
aggregate_report = parse_aggregate_report_file(xml,
nameservers=ns,
timeout=timeout)
result = OrderedDict([("report_type", "aggregate"),
("report", aggregate_report)])
if result is None:
error = 'Message with subject "{0}" is ' \
'not a valid DMARC report'.format(subject)
raise InvalidDMARCReport(error)
return result
def parse_report_file(input_, nameservers=None, timeout=6.0):
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object, or bytes
Args:
input_: A path to a file, a file like object, or bytes
nameservers (list): A list of one or more nameservers to use
timeout (float): Sets the DNS timeout in seconds
Returns:
OrderedDict: The parsed DMARC report
"""
if type(input_) == str or type(input_) == unicode:
file_object = open(input_, "rb")
elif type(input_) == bytes:
file_object = BytesIO(input_)
else:
file_object = input_
content = file_object.read()
try:
report = parse_aggregate_report_file(content, nameservers=nameservers,
timeout=timeout)
results = OrderedDict([("report_type", "aggregate"),
("report", report)])
except InvalidAggregateReport:
try:
results = parse_report_email(content,
nameservers=nameservers,
timeout=timeout)
except InvalidDMARCReport:
raise InvalidDMARCReport("Not a valid aggregate or forensic "
"report")
return results
def get_dmarc_reports_from_inbox(host, username, password,
archive_folder="Archive",
delete=False, test=False,
nameservers=None,
dns_timeout=6.0):
"""
Fetches and parses DMARC reports from an inbox
Args:
host: The mail server hostname or IP address
username: The mail server username
password: The mail server password
archive_folder: The folder to move processed mail to
delete (bool): Delete messages after processing them
test (bool): Do not move or delete messages after processing them
nameservers (list): A list of DNS nameservers to query
dns_timeout (float): Set the DNS query timeout
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
"""
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
aggregate_reports = []
forensic_reports = []
aggregate_report_msg_uids = []
forensic_report_msg_uids = []
aggregate_reports_folder = bytes("{0}/Aggregate".format(archive_folder),
encoding="utf-8")
forensic_reports_folder = bytes("{0}/Forensic".format(archive_folder),
encoding="utf-8")
archive_folder = bytes(archive_folder, encoding="utf-8")
try:
server = imapclient.IMAPClient(host, use_uid=True)
server.login(username, password)
server.select_folder(b'INBOX')
if not server.folder_exists(archive_folder):
server.create_folder(archive_folder)
if not server.folder_exists(aggregate_reports_folder):
server.create_folder(aggregate_reports_folder)
if not server.folder_exists(forensic_reports_folder):
server.create_folder(forensic_reports_folder)
messages = server.search()
for message_uid in messages:
raw_msg = server.fetch(message_uid,
[b'RFC822'])[message_uid][b'RFC822']
msg_content = raw_msg.decode("utf-8", errors="replace")
try:
parsed_email = parse_report_email(msg_content,
nameservers=nameservers,
timeout=dns_timeout)
if parsed_email["report_type"] == "aggregate":
aggregate_reports.append(parsed_email["report"])
aggregate_report_msg_uids.append(message_uid)
elif parsed_email["report_type"] == "forensic":
forensic_reports.append(parsed_email["report"])
forensic_report_msg_uids.append(message_uid)
except InvalidDMARCReport as error:
logger.warning(error.__str__())
if not test:
if delete:
processed_messages = aggregate_report_msg_uids + \
forensic_report_msg_uids
server.add_flags(processed_messages, [imapclient.DELETED])
server.expunge()
else:
if len(aggregate_report_msg_uids) > 0:
server.move(aggregate_report_msg_uids,
aggregate_reports_folder)
if len(forensic_report_msg_uids) > 0:
server.move(forensic_report_msg_uids,
forensic_reports_folder)
results = OrderedDict([("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports)])
return results
except imapclient.exceptions.IMAPClientError as error:
error = error.__str__().lstrip("b'").rstrip("'").rstrip(".")
raise IMAPError(error)
except socket.gaierror:
raise IMAPError("DNS resolution failed")
except TimeoutError:
raise IMAPError("The connection timed out")
def save_output(results, output_directory="output"):
"""
Save report data in the given directory
Args:
results (OrderedDict): Parsing results
output_directory: The path to the directory to save in
"""
aggregate_reports = results["aggregate_reports"]
forensic_reports = results["forensic_reports"]
if os.path.exists(output_directory):
if not os.path.isdir(output_directory):
raise ValueError("{0} is not a directory".format(output_directory))
else:
os.makedirs(output_directory)
with open("{0}".format(os.path.join(output_directory, "aggregate.json")),
"w", newline="\n", encoding="utf-8") as agg_json:
agg_json.write(json.dumps(aggregate_reports, ensure_ascii=False,
indent=2))
with open("{0}".format(os.path.join(output_directory, "aggregate.csv")),
"w", newline="\n", encoding="utf-8") as agg_csv:
csv = parsed_aggregate_report_to_csv(aggregate_reports)
agg_csv.write(csv)
with open("{0}".format(os.path.join(output_directory, "forensic.json")),
"w", newline="\n", encoding="utf-8") as for_json:
for_json.write(json.dumps(forensic_reports, ensure_ascii=False,
indent=2))
with open("{0}".format(os.path.join(output_directory, "forensic.csv")),
"w", newline="\n", encoding="utf-8") as for_csv:
csv = parsed_dmarc_forensic_reports_to_csv(forensic_reports)
for_csv.write(csv)
samples_directory = os.path.join(output_directory, "samples")
if not os.path.exists(samples_directory):
os.makedirs(samples_directory)
sample_filenames = []
for forensic_report in forensic_reports:
sample = forensic_report["sample"]
message_count = 0
parsed_sample = forensic_report["parsed_sample"]
subject = parsed_sample["filename_safe_subject"]
filename = subject
while filename in sample_filenames:
message_count += 1
filename = "{0} ({1})".format(subject, message_count)
sample_filenames.append(filename)
filename = "{0}.eml".format(filename)
path = os.path.join(samples_directory, filename)
with open(path, "w", newline="\n", encoding="utf-8") as sample_file:
sample_file.write(sample)
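The sample-filename deduplication loop at the end of `save_output` can be expressed as a standalone helper (a sketch with a hypothetical name): the first duplicate of a subject gets the suffix ` (1)`, the next ` (2)`, and so on, before the `.eml` extension is added.

```python
def unique_sample_filename(subject, taken):
    """Return a unique '<subject>[ (n)].eml' name, recording it in `taken`."""
    filename = subject
    message_count = 0
    while filename in taken:
        message_count += 1
        filename = "{0} ({1})".format(subject, message_count)
    taken.append(filename)
    return "{0}.eml".format(filename)
```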
def watch_inbox(host, username, password, callback, archive_folder="Archive",
delete=False, test=False, wait=30, nameservers=None,
dns_timeout=6.0):
"""
Use an IDLE IMAP connection to parse incoming emails, and pass the results
to a callback function
Args:
host: The mail server hostname or IP address
username: The mail server username
password: The mail server password
callback: The callback function to receive the parsing results
archive_folder: The folder to move processed mail to
delete (bool): Delete messages after processing them
test (bool): Do not move or delete messages after processing them
wait (int): Number of seconds to wait for an IMAP IDLE response
nameservers (list): A list of DNS nameservers to query
dns_timeout (float): Set the DNS query timeout
"""
af = archive_folder
ns = nameservers
dt = dns_timeout
server = imapclient.IMAPClient(host)
server.login(username, password)
server.select_folder(b'INBOX')
# Start IDLE mode
server.idle()
while True:
try:
responses = server.idle_check(timeout=wait)
if responses is not None:
for response in responses:
if response[1] == b'RECENT' and response[0] > 0:
res = get_dmarc_reports_from_inbox(host, username,
password,
archive_folder=af,
delete=delete,
test=test,
nameservers=ns,
dns_timeout=dt)
callback(res)
except imapclient.exceptions.IMAPClientError as error:
error = error.__str__().lstrip("b'").rstrip("'").rstrip(".")
raise IMAPError(error)
except socket.gaierror:
raise IMAPError("DNS resolution failed")
except TimeoutError:
raise IMAPError("The connection timed out")
except KeyboardInterrupt:
break
server.idle_done()
logger.info("\nIDLE mode done")
server.logout()
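The IDLE response filtering inside the loop above can be sketched as a standalone predicate (hypothetical name). `imapclient`'s `idle_check()` returns tuples such as `(1, b'RECENT')`, and only a `RECENT` event with a positive count should trigger a new fetch:

```python
def has_new_mail(responses):
    """Return True if an IDLE response set signals newly arrived messages."""
    if responses is None:
        return False
    return any(r[1] == b'RECENT' and r[0] > 0 for r in responses)
```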
def _main():
"""Called when the module is executed"""
def print_results(results_):
"""
Print results in human readable format
Args:
results_ (OrderedDict): Parsing results
"""
print(json.dumps(results_, ensure_ascii=False, indent=2), "\n")
arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument("file_path", nargs="*",
help="one or more paths of aggregate report "
"files (compressed or uncompressed)")
arg_parser.add_argument("-o", "--output",
help="Write output files to the given directory")
arg_parser.add_argument("-n", "--nameservers", nargs="+",
help="nameservers to query")
arg_parser.add_argument("-t", "--timeout",
help="number of seconds to wait for an answer "
"from DNS (default 6.0)",
type=float,
default=6.0)
arg_parser.add_argument("-H", "--host", help="IMAP hostname or IP address")
arg_parser.add_argument("-U", "--username", help="IMAP username")
arg_parser.add_argument("-p", "--password", help="IMAP password")
arg_parser.add_argument("-a", "--archive-folder",
help="Specifies the IMAP folder to move "
"messages to after processing them "
"(default: Archive)",
default="Archive")
arg_parser.add_argument("-d", "--delete",
help="Delete the reports after processing them",
action="store_true", default=False)
arg_parser.add_argument("-i", "--idle", action="store_true",
help="Use an IMAP IDLE connection to process "
"reports as they arrive in the inbox")
arg_parser.add_argument("-T", "--test",
help="Do not move or delete IMAP messages",
action="store_true", default=False)
arg_parser.add_argument("-v", "--version", action="version",
version=__version__)
aggregate_reports = []
forensic_reports = []
args = arg_parser.parse_args()
file_paths = []
for file_path in args.file_path:
file_paths += glob(file_path)
file_paths = list(set(file_paths))
for file_path in file_paths:
try:
file_results = parse_report_file(file_path,
nameservers=args.nameservers,
timeout=args.timeout)
if file_results["report_type"] == "aggregate":
aggregate_reports.append(file_results["report"])
elif file_results["report_type"] == "forensic":
forensic_reports.append(file_results["report"])
except ParserError as error:
logger.error("Failed to parse {0} - {1}".format(file_path,
error))
if args.host:
try:
if args.username is None or args.password is None:
logger.error("username and password must be specified if "
"host is specified")
exit(1)
af = args.archive_folder
reports = get_dmarc_reports_from_inbox(args.host,
args.username,
args.password,
archive_folder=af,
delete=args.delete,
test=args.test)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
except IMAPError as error:
logger.error("IMAP Error: {0}".format(error.__str__()))
exit(1)
results = OrderedDict([("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports)])
if args.output:
save_output(results, output_directory=args.output)
print_results(results)
if args.host and args.idle:
sleep(2)
logger.warning("The IMAP Connection is now in IDLE mode. "
"Send yourself an email, or quit with ^c")
try:
watch_inbox(args.host, args.username, args.password, print_results,
archive_folder=args.archive_folder, delete=args.delete,
test=args.test, nameservers=args.nameservers,
dns_timeout=args.timeout)
except IMAPError as error:
logger.error("IMAP Error: {0}".format(error.__str__()))
exit(1)
if __name__ == "__main__":
_main()


@@ -3,6 +3,9 @@ requests
publicsuffix
xmltodict
geoip2
dnspython
imapclient
dateparser
flake8
sphinx
sphinx_rtd_theme


@@ -92,7 +92,8 @@ setup(
# your project is installed. For an analysis of "install_requires" vs pip's
# requirements files see:
# https://packaging.python.org/en/latest/requirements.html
install_requires=['dnspython', 'publicsuffix', 'xmltodict', 'geoip2'],
install_requires=['dnspython', 'publicsuffix', 'xmltodict', 'geoip2',
'imapclient', 'dateparser'],
entry_points={
'console_scripts': ['parsedmarc=parsedmarc:_main'],