Merge pull request #301 from nathanthorpe/graph_addition

Add support for Microsoft Graph API (Microsoft 365 mailboxes)
This commit is contained in:
Sean Whalen
2022-04-21 17:11:35 -04:00
committed by GitHub
9 changed files with 450 additions and 234 deletions

View File

@@ -29,7 +29,7 @@ Features
* Parses draft and 1.0 standard aggregate/rua reports
* Parses forensic/failure/ruf reports
* Can parse reports from an inbox over IMAP
* Can parse reports from an inbox over IMAP or Microsoft Graph
* Transparently handles gzip or zip compressed reports
* Consistent data structures
* Simple JSON and/or CSV output
@@ -147,6 +147,8 @@ For example
host = imap.example.com
user = dmarcresports@example.com
password = $uperSecure
[mailbox]
watch = True
[elasticsearch]
@@ -196,8 +198,16 @@ The full set of configuration options are:
.. note::
Setting this to a number larger than one can improve performance when processing thousands of files
- ``imap``
- ``mailbox``
- ``reports_folder`` - str: The mailbox folder where the incoming reports can be found (Default: INBOX)
- ``archive_folder`` - str: The mailbox folder to sort processed emails into (Default: Archive)
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive or poll MS Graph for new messages
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``imap``
- ``host`` - str: The IMAP server hostname or IP address
- ``port`` - int: The IMAP server port (Default: 993).
@@ -208,12 +218,16 @@ The full set of configuration options are:
- ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended)
- ``user`` - str: The IMAP user
- ``password`` - str: The IMAP password
- ``reports_folder`` - str: The IMAP folder where the incoming reports can be found (Default: INBOX)
- ``archive_folder`` - str: The IMAP folder to sort processed emails into (Default: Archive)
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
- ``test`` - bool: Do not move or delete messages
- ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set.
- ``msgraph``
- ``user`` - str: The M365 user
- ``password`` - str: The user password
- ``client_id`` - str: The app registration's client ID
- ``client_secret`` - str: The app registration's secret
- ``mailbox`` - str: The mailbox name. This defaults to the user that is logged in, but could be a shared mailbox if the user has access to the mailbox
.. note::
You must create an app registration in Azure AD and have an admin grant the Microsoft Graph `Mail.ReadWrite` (delegated) permission to the app.
- ``elasticsearch``
- ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. ``127.0.0.1:9200`` or ``https://user:secret@localhost``)

View File

@@ -2,34 +2,33 @@
"""A Python package for parsing DMARC reports"""
import logging
import os
import shutil
import xml.parsers.expat as expat
import json
from datetime import datetime
from time import sleep
from collections import OrderedDict
from io import BytesIO, StringIO
from gzip import GzipFile
from socket import timeout
import zipfile
from csv import DictWriter
import re
from base64 import b64decode
import binascii
import email
import tempfile
import email.utils
import json
import logging
import mailbox
import os
import re
import shutil
import tempfile
import xml.parsers.expat as expat
import zipfile
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime
from gzip import GzipFile
from io import BytesIO, StringIO
from typing import Callable
import mailparser
from expiringdict import ExpiringDict
import xmltodict
from expiringdict import ExpiringDict
from lxml import etree
from mailsuite.imap import IMAPClient
from mailsuite.smtp import send_email
from imapclient.exceptions import IMAPClientError
from parsedmarc.mail import MailboxConnection
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
@@ -40,8 +39,8 @@ from base64 import urlsafe_b64decode
from parsedmarc.utils import get_base_domain, get_ip_address_info
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
from parsedmarc.utils import parse_email
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
__version__ = "7.1.1"
@@ -216,7 +215,7 @@ def _parse_report_record(record, ip_db_path=None, offline=False,
def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False,
nameservers=None, timeout=2.0,
parallel=False, server=None):
parallel=False, keep_alive=None):
"""Parses a DMARC XML report string and returns a consistent OrderedDict
Args:
@@ -227,7 +226,7 @@ def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False,
(Cloudflare's public DNS resolvers by default)
timeout (float): Sets the DNS timeout in seconds
parallel (bool): Parallel processing
server (IMAPClient): Connection object
keep_alive (callable): Keep alive function
Returns:
OrderedDict: The parsed aggregate DMARC report
@@ -321,9 +320,9 @@ def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False,
if type(report["record"]) == list:
for i in range(len(report["record"])):
if server is not None and i > 0 and i % 20 == 0:
logger.debug("Sending noop cmd")
server.noop()
if keep_alive is not None and i > 0 and i % 20 == 0:
logger.debug("Sending keepalive cmd")
keep_alive()
logger.debug("Processed {0}/{1}".format(
i, len(report["record"])))
report_record = _parse_report_record(report["record"][i],
@@ -410,7 +409,7 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None,
nameservers=None,
dns_timeout=2.0,
parallel=False,
server=None):
keep_alive=None):
"""Parses a file at the given path, a file-like object. or bytes as a
aggregate DMARC report
@@ -422,7 +421,7 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None,
(Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds
parallel (bool): Parallel processing
server (IMAPClient): Connection object
keep_alive (callable): Keep alive function
Returns:
OrderedDict: The parsed DMARC aggregate report
@@ -435,7 +434,7 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None,
nameservers=nameservers,
timeout=dns_timeout,
parallel=parallel,
server=server)
keep_alive=keep_alive)
def parsed_aggregate_reports_to_csv_rows(reports):
@@ -771,7 +770,7 @@ def parsed_forensic_reports_to_csv(reports):
def parse_report_email(input_, offline=False, ip_db_path=None,
nameservers=None, dns_timeout=2.0,
strip_attachment_payloads=False,
parallel=False, server=None):
parallel=False, keep_alive=None):
"""
Parses a DMARC report from an email
@@ -784,7 +783,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None,
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
parallel (bool): Parallel processing
server (IMAPClient): Connection object
keep_alive (callable): keep alive function
Returns:
OrderedDict:
@@ -852,7 +851,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None,
nameservers=ns,
dns_timeout=dns_timeout,
parallel=parallel,
server=server)
keep_alive=keep_alive)
result = OrderedDict([("report_type", "aggregate"),
("report", aggregate_report)])
return result
@@ -902,7 +901,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None,
def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
strip_attachment_payloads=False, ip_db_path=None,
offline=False, parallel=False, server=None):
offline=False, parallel=False, keep_alive=None):
"""Parses a DMARC aggregate or forensic file at the given path, a
file-like object. or bytes
@@ -916,7 +915,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
offline (bool): Do not make online queries for geolocation or DNS
parallel (bool): Parallel processing
server (IMAPClient): Connection object
keep_alive (callable): Keep alive function
Returns:
OrderedDict: The parsed DMARC report
@@ -938,7 +937,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
nameservers=nameservers,
dns_timeout=dns_timeout,
parallel=parallel,
server=server)
keep_alive=keep_alive)
results = OrderedDict([("report_type", "aggregate"),
("report", report)])
except InvalidAggregateReport:
@@ -951,7 +950,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
parallel=parallel,
server=server)
keep_alive=keep_alive)
except InvalidDMARCReport:
raise InvalidDMARCReport("Not a valid aggregate or forensic "
"report")
@@ -1016,71 +1015,37 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0,
("forensic_reports", forensic_reports)])
def get_imap_capabilities(server):
def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
reports_folder="INBOX",
archive_folder="Archive",
delete=False,
test=False,
ip_db_path=None,
offline=False,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
results=None,
batch_size=None,
create_folders=True):
"""
Returns a list of an IMAP server's capabilities
Fetches and parses DMARC reports from a mailbox
Args:
server (imapclient.IMAPClient): An instance of imapclient.IMAPClient
Returns (list): A list of capabilities
"""
capabilities = list(map(str, list(server.capabilities())))
for i in range(len(capabilities)):
capabilities[i] = str(capabilities[i]).replace("b'",
"").replace("'",
"")
logger.debug("IMAP server supports: {0}".format(capabilities))
return capabilities
def get_dmarc_reports_from_inbox(connection=None,
host=None,
user=None,
password=None,
port=None,
ssl=True,
verify=True,
timeout=30,
max_retries=4,
reports_folder="INBOX",
archive_folder="Archive",
delete=False,
test=False,
ip_db_path=None,
offline=False,
nameservers=None,
dns_timeout=6.0,
strip_attachment_payloads=False,
results=None,
batch_size=None):
"""
Fetches and parses DMARC reports from an inbox
Args:
connection: An IMAPClient connection to reuse
host: The mail server hostname or IP address
user: The mail server user
password: The mail server password
port: The mail server port
ssl (bool): Use SSL/TLS
verify (bool): Verify SSL/TLS certificate
timeout (float): IMAP timeout in seconds
max_retries (int): The maximum number of retries after a timeout
reports_folder: The IMAP folder where reports can be found
connection: A Mailbox connection object
reports_folder: The folder where reports can be found
archive_folder: The folder to move processed mail to
delete (bool): Delete messages after processing them
test (bool): Do not move or delete messages after processing them
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
offline (bool): Do not query onfline for geolocation or DNS
offline (bool): Do not query online for geolocation or DNS
nameservers (list): A list of DNS nameservers to query
dns_timeout (float): Set the DNS query timeout
strip_attachment_payloads (bool): Remove attachment payloads from
forensic report results
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
create_folders (bool): Whether to create the destination folders (not used in watch)
Returns:
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
@@ -1088,9 +1053,8 @@ def get_dmarc_reports_from_inbox(connection=None,
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
if connection is None and (user is None or password is None):
raise ValueError("Must supply a connection, or a username and "
"password")
if connection is None:
raise ValueError("Must supply a connection")
aggregate_reports = []
forensic_reports = []
@@ -1104,22 +1068,13 @@ def get_dmarc_reports_from_inbox(connection=None,
aggregate_reports = results["aggregate_reports"].copy()
forensic_reports = results["forensic_reports"].copy()
if connection:
server = connection
else:
server = IMAPClient(host, user, password, port=port,
ssl=ssl, verify=verify,
timeout=timeout,
max_retries=max_retries,
initial_folder=reports_folder)
if not test and create_folders:
connection.create_folder(archive_folder)
connection.create_folder(aggregate_reports_folder)
connection.create_folder(forensic_reports_folder)
connection.create_folder(invalid_reports_folder)
if not test:
server.create_folder(archive_folder)
server.create_folder(aggregate_reports_folder)
server.create_folder(forensic_reports_folder)
server.create_folder(invalid_reports_folder)
messages = server.search()
messages = connection.fetch_messages(reports_folder)
total_messages = len(messages)
logger.debug("Found {0} messages in {1}".format(len(messages),
reports_folder))
@@ -1136,16 +1091,15 @@ def get_dmarc_reports_from_inbox(connection=None,
logger.debug("Processing message {0} of {1}: UID {2}".format(
i+1, message_limit, msg_uid
))
msg_content = server.fetch_message(msg_uid, parse=False)
sa = strip_attachment_payloads
msg_content = connection.fetch_message(msg_uid)
try:
parsed_email = parse_report_email(msg_content,
nameservers=nameservers,
dns_timeout=dns_timeout,
ip_db_path=ip_db_path,
offline=offline,
strip_attachment_payloads=sa,
server=server)
strip_attachment_payloads=strip_attachment_payloads,
keep_alive=connection.keepalive)
if parsed_email["report_type"] == "aggregate":
aggregate_reports.append(parsed_email["report"])
aggregate_report_msg_uids.append(msg_uid)
@@ -1158,12 +1112,12 @@ def get_dmarc_reports_from_inbox(connection=None,
if delete:
logger.debug(
"Deleting message UID {0}".format(msg_uid))
server.delete_messages([msg_uid])
connection.delete_message(msg_uid)
else:
logger.debug(
"Moving message UID {0} to {1}".format(
msg_uid, invalid_reports_folder))
server.move_messages([msg_uid], invalid_reports_folder)
connection.move_message(msg_uid, invalid_reports_folder)
if not test:
if delete:
@@ -1177,12 +1131,12 @@ def get_dmarc_reports_from_inbox(connection=None,
"Deleting message {0} of {1}: UID {2}".format(
i + 1, number_of_processed_msgs, msg_uid))
try:
server.delete_messages([msg_uid])
connection.delete_message(msg_uid)
except Exception as e:
message = "Error deleting message UID"
e = "{0} {1}: " "{2}".format(message, msg_uid, e)
logger.error("IMAP error: {0}".format(e))
logger.error("Mailbox error: {0}".format(e))
else:
if len(aggregate_report_msg_uids) > 0:
log_message = "Moving aggregate report messages from"
@@ -1197,12 +1151,12 @@ def get_dmarc_reports_from_inbox(connection=None,
"Moving message {0} of {1}: UID {2}".format(
i+1, number_of_agg_report_msgs, msg_uid))
try:
server.move_messages([msg_uid],
aggregate_reports_folder)
connection.move_message(msg_uid,
aggregate_reports_folder)
except Exception as e:
message = "Error moving message UID"
e = "{0} {1}: {2}".format(message, msg_uid, e)
logger.error("IMAP error: {0}".format(e))
logger.error("Mailbox error: {0}".format(e))
if len(forensic_report_msg_uids) > 0:
message = "Moving forensic report messages from"
logger.debug(
@@ -1217,21 +1171,21 @@ def get_dmarc_reports_from_inbox(connection=None,
message,
i + 1, number_of_forensic_msgs, msg_uid))
try:
server.move_messages([msg_uid],
forensic_reports_folder)
connection.move_message(msg_uid,
forensic_reports_folder)
except Exception as e:
e = "Error moving message UID {0}: {1}".format(
msg_uid, e)
logger.error("IMAP error: {0}".format(e))
logger.error("Mailbox error: {0}".format(e))
results = OrderedDict([("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports)])
total_messages = len(server.search())
total_messages = len(connection.fetch_messages(reports_folder))
if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
results = get_dmarc_reports_from_inbox(
connection=server,
results = get_dmarc_reports_from_mailbox(
connection=connection,
reports_folder=reports_folder,
archive_folder=archive_folder,
delete=delete,
@@ -1452,29 +1406,25 @@ def get_dmarc_reports_from_gmail_api(credentials_file=".credentials",token_file=
return results
def watch_inbox(host, username, password, callback, port=None, ssl=True,
verify=True, reports_folder="INBOX",
def watch_inbox(mailbox_connection: MailboxConnection,
callback: Callable,
reports_folder="INBOX",
archive_folder="Archive", delete=False, test=False,
idle_timeout=30, ip_db_path=None,
check_timeout=30, ip_db_path=None,
offline=False, nameservers=None,
dns_timeout=6.0, strip_attachment_payloads=False,
batch_size=None):
"""
Use an IDLE IMAP connection to parse incoming emails, and pass the results
to a callback function
Watches the mailbox for new messages and sends the results to a callback function
Args:
host: The mail server hostname or IP address
username: The mail server username
password: The mail server password
mailbox_connection: The mailbox connection object
callback: The callback function to receive the parsing results
port: The mail server port
ssl (bool): Use SSL/TLS
verify (bool): Verify the TLS/SSL certificate
reports_folder: The IMAP folder where reports can be found
archive_folder: The folder to move processed mail to
delete (bool): Delete messages after processing them
test (bool): Do not move or delete messages after processing them
idle_timeout (int): Number of seconds to wait for a IMAP IDLE response
check_timeout (int): Number of seconds to wait for an IMAP IDLE response
or the number of seconds until the next mail check
ip_db_path (str): Path to a MMDB file from MaxMind or DBIP
offline (bool): Do not query online for geolocation or DNS
nameservers (list): A list of one or more nameservers to use
@@ -1484,36 +1434,23 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
forensic report samples with None
batch_size (int): Number of messages to read and process before saving
"""
sa = strip_attachment_payloads
def idle_callback(connection):
res = get_dmarc_reports_from_inbox(connection=connection,
reports_folder=reports_folder,
archive_folder=archive_folder,
delete=delete,
test=test,
ip_db_path=ip_db_path,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=sa,
batch_size=batch_size)
def check_callback(connection):
res = get_dmarc_reports_from_mailbox(connection=connection,
reports_folder=reports_folder,
archive_folder=archive_folder,
delete=delete,
test=test,
ip_db_path=ip_db_path,
offline=offline,
nameservers=nameservers,
dns_timeout=dns_timeout,
strip_attachment_payloads=strip_attachment_payloads,
batch_size=batch_size,
create_folders=False)
callback(res)
while True:
try:
IMAPClient(host=host, username=username, password=password,
port=port, ssl=ssl, verify=verify,
initial_folder=reports_folder,
idle_callback=idle_callback,
idle_timeout=idle_timeout)
except (timeout, IMAPClientError):
logger.warning("IMAP connection timeout. Reconnecting...")
sleep(5)
except Exception as e:
logger.warning("IMAP connection error. {0}. "
"Reconnecting...".format(e))
sleep(5)
mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
def save_output(results, output_directory="output",

View File

@@ -17,10 +17,13 @@ import sys
import time
from tqdm import tqdm
from parsedmarc import get_dmarc_reports_from_inbox, watch_inbox, \
from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \
parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \
splunk, save_output, email_results, ParserError, __version__, \
InvalidDMARCReport, s3, syslog, get_dmarc_reports_from_gmail_api
from parsedmarc.mail import IMAPConnection, MSGraphConnection
from parsedmarc.utils import is_mbox
logger = logging.getLogger("parsedmarc")
@@ -63,6 +66,7 @@ def _main():
output_str = "{0}\n".format(json.dumps(reports_,
ensure_ascii=False,
indent=2))
if not opts.silent:
print(output_str)
if opts.kafka_hosts:
@@ -252,6 +256,12 @@ def _main():
verbose=args.verbose,
save_aggregate=False,
save_forensic=False,
mailbox_reports_folder="INBOX",
mailbox_archive_folder="Archive",
mailbox_watch=False,
mailbox_delete=False,
mailbox_test=False,
mailbox_batch_size=None,
imap_host=None,
imap_skip_certificate_verification=False,
imap_ssl=True,
@@ -260,12 +270,11 @@ def _main():
imap_max_retries=4,
imap_user=None,
imap_password=None,
imap_reports_folder="INBOX",
imap_archive_folder="Archive",
imap_watch=False,
imap_delete=False,
imap_test=False,
imap_batch_size=None,
graph_user=None,
graph_password=None,
graph_client_id=None,
graph_client_secret=None,
graph_mailbox=None,
hec=None,
hec_token=None,
hec_index=None,
@@ -329,8 +338,7 @@ def _main():
if "offline" in general_config:
opts.offline = general_config.getboolean("offline")
if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = general_config[
"strip_attachment_payloads"]
opts.strip_attachment_payloads = general_config.getboolean("strip_attachment_payloads")
if "output" in general_config:
opts.output = general_config["output"]
if "aggregate_json_filename" in general_config:
@@ -369,6 +377,24 @@ def _main():
opts.ip_db_path = general_config["ip_db_path"]
else:
opts.ip_db_path = None
if "mailbox" in config.sections():
mailbox_config = config["mailbox"]
if "reports_folder" in mailbox_config:
opts.mailbox_reports_folder = mailbox_config["reports_folder"]
if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config:
opts.mailbox_watch = mailbox_config.getboolean("watch")
if "delete" in mailbox_config:
opts.mailbox_delete = mailbox_config.getboolean("delete")
if "test" in mailbox_config:
opts.mailbox_test = mailbox_config.getboolean("test")
if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
else:
opts.mailbox_batch_size = None
if "imap" in config.sections():
imap_config = config["imap"]
if "host" in imap_config:
@@ -402,20 +428,37 @@ def _main():
"imap config section")
exit(-1)
if "reports_folder" in imap_config:
opts.imap_reports_folder = imap_config["reports_folder"]
if "archive_folder" in imap_config:
opts.imap_archive_folder = imap_config["archive_folder"]
if "watch" in imap_config:
opts.imap_watch = imap_config.getboolean("watch")
if "delete" in imap_config:
opts.imap_delete = imap_config.getboolean("delete")
if "test" in imap_config:
opts.imap_test = imap_config.getboolean("test")
if "batch_size" in imap_config:
opts.imap_batch_size = imap_config.getint("batch_size")
if "msgraph" in config.sections():
graph_config = config["msgraph"]
if "user" in graph_config:
opts.graph_user = graph_config["user"]
else:
opts.imap_batch_size = None
logger.critical("user setting missing from the "
"msgraph config section")
exit(-1)
if "password" in graph_config:
opts.graph_password = graph_config["password"]
else:
logger.critical("password setting missing from the "
"msgraph config section")
exit(-1)
if "client_id" in graph_config:
opts.graph_client_id = graph_config["client_id"]
else:
logger.critical("client_id setting missing from the "
"msgraph config section")
exit(-1)
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
logger.critical("client_secret setting missing from the "
"msgraph config section")
exit(-1)
if "mailbox" in graph_config:
opts.graph_mailbox = graph_config["mailbox"]
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
@@ -498,7 +541,7 @@ def _main():
"kafka config section")
exit(-1)
if "ssl" in kafka_config:
opts.kafka_ssl = kafka_config["ssl"].getboolean()
opts.kafka_ssl = kafka_config.getboolean("ssl")
if "skip_certificate_verification" in kafka_config:
kafka_verify = kafka_config.getboolean(
"skip_certificate_verification")
@@ -523,7 +566,7 @@ def _main():
"smtp config section")
exit(-1)
if "port" in smtp_config:
opts.smtp_port = smtp_config["port"]
opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config:
opts.smtp_ssl = smtp_config.getboolean("ssl")
if "skip_certificate_verification" in smtp_config:
@@ -611,11 +654,12 @@ def _main():
'%(levelname)s - [%(filename)s:%(lineno)d] - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
if opts.imap_host is None and len(opts.file_path) == 0 and opts.gmail_api_credentials_file is None:
logger.error("You must supply input files, or an IMAP or Gmail configurationor")
if opts.imap_host is None and opts.graph_user is None and len(opts.file_path) == 0 and opts.gmail_api_credentials_file is None:
logger.error("You must supply input files or a mailbox connection")
exit(1)
logger.info("Starting dmarcparse")
logger.info("Starting parsedmarc")
if opts.save_aggregate or opts.save_forensic:
try:
@@ -717,16 +761,13 @@ def _main():
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
mailbox_connection = None
if opts.imap_host:
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error("IMAP user and password must be specified if"
"host is specified")
rf = opts.imap_reports_folder
af = opts.imap_archive_folder
ns = opts.nameservers
sa = opts.strip_attachment_payloads
ssl = True
verify = True
if opts.imap_skip_certificate_verification:
@@ -734,7 +775,8 @@ def _main():
verify = False
if opts.imap_ssl is False:
ssl = False
reports = get_dmarc_reports_from_inbox(
mailbox_connection = IMAPConnection(
host=opts.imap_host,
port=opts.imap_port,
ssl=ssl,
@@ -743,22 +785,47 @@ def _main():
max_retries=opts.imap_max_retries,
user=opts.imap_user,
password=opts.imap_password,
reports_folder=rf,
archive_folder=af,
)
except Exception as error:
logger.error("IMAP Error: {0}".format(error.__str__()))
exit(1)
if opts.graph_user:
try:
mailbox = opts.graph_mailbox or opts.graph_user
mailbox_connection = MSGraphConnection(
client_id=opts.graph_client_id,
client_secret=opts.graph_client_secret,
username=opts.graph_user,
password=opts.graph_password,
mailbox=mailbox
)
except Exception as error:
logger.error("MS Graph Error: {0}".format(error.__str__()))
exit(1)
if mailbox_connection:
try:
reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection,
delete=opts.mailbox_delete,
batch_size=opts.mailbox_batch_size,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
ip_db_path=opts.ip_db_path,
delete=opts.imap_delete,
offline=opts.offline,
nameservers=ns,
test=opts.imap_test,
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size
nameservers=opts.nameservers,
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
except Exception as error:
logger.error("IMAP Error: {0}".format(error.__str__()))
logger.error("Mailbox Error: {0}".format(error.__str__()))
exit(1)
if opts.gmail_api_credentials_file:
@@ -805,33 +872,21 @@ def _main():
logger.error("{0}".format(error.__str__()))
exit(1)
if opts.imap_host and opts.imap_watch:
if mailbox_connection and opts.mailbox_watch:
logger.info("Watching for email - Quit with ctrl-c")
ssl = True
verify = True
if opts.imap_skip_certificate_verification:
logger.debug("Skipping IMAP certificate verification")
verify = False
if opts.imap_ssl is False:
ssl = False
try:
sa = opts.strip_attachment_payloads
watch_inbox(
opts.imap_host,
opts.imap_user,
opts.imap_password,
process_reports,
port=opts.imap_port,
ssl=ssl,
verify=verify,
reports_folder=opts.imap_reports_folder,
archive_folder=opts.imap_archive_folder,
delete=opts.imap_delete,
test=opts.imap_test,
mailbox_connection=mailbox_connection,
callback=process_reports,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
delete=opts.mailbox_delete,
test=opts.mailbox_test,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=sa,
batch_size=opts.imap_batch_size,
strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=opts.mailbox_batch_size,
ip_db_path=opts.ip_db_path,
offline=opts.offline)
except FileExistsError as error:

View File

@@ -0,0 +1,3 @@
from parsedmarc.mail.mailbox_connection import MailboxConnection
from parsedmarc.mail.graph import MSGraphConnection
from parsedmarc.mail.imap import IMAPConnection

111
parsedmarc/mail/graph.py Normal file
View File

@@ -0,0 +1,111 @@
import logging
from functools import lru_cache
from time import sleep
from typing import List, Optional
from azure.identity import UsernamePasswordCredential
from msgraph.core import GraphClient
from parsedmarc.mail.mailbox_connection import MailboxConnection
logger = logging.getLogger("parsedmarc")
class MSGraphConnection(MailboxConnection):
    """MailboxConnection implementation backed by the Microsoft Graph API
    (Microsoft 365 mailboxes).

    Authenticates the user against Azure AD with the username/password
    (resource owner) credential flow and performs all mailbox operations
    through Graph REST calls under ``/users/{mailbox}``.
    """

    def __init__(self,
                 client_id: str,
                 username: str,
                 password: str,
                 client_secret: str,
                 mailbox: str):
        credential = UsernamePasswordCredential(
            client_id=client_id,
            client_credential=client_secret,
            disable_automatic_authentication=True,
            username=username,
            password=password
        )
        # Authenticate eagerly with the delegated Mail.ReadWrite scope so
        # a missing admin consent fails at startup rather than mid-run.
        credential.authenticate(scopes=['Mail.ReadWrite'])
        self._client = GraphClient(credential=credential)
        self.mailbox_name = mailbox
        # Folder path -> Graph folder ID cache. A per-instance dict is
        # used instead of functools.lru_cache on the lookup method, which
        # would key the cache on ``self`` and keep this instance (and its
        # credential) alive for the lifetime of the cache (ruff B019).
        self._folder_id_cache = {}

    def create_folder(self, folder_name: str):
        """Create a mail folder; ``/`` separates nested sub-folders.

        A 409 response (folder already exists) is logged and ignored.
        """
        sub_url = ''
        path_parts = folder_name.split('/')
        if len(path_parts) > 1:  # Folder is a subFolder
            parent_folder_id = None
            for folder in path_parts[:-1]:
                parent_folder_id = self._find_folder_id_with_parent(
                    folder, parent_folder_id)
            sub_url = f'/{parent_folder_id}/childFolders'
            folder_name = path_parts[-1]

        request_body = {
            'displayName': folder_name
        }
        resp = self._client.post(
            f'/users/{self.mailbox_name}/mailFolders{sub_url}',
            json=request_body)
        if resp.status_code == 409:
            logger.debug(f'Folder {folder_name} already exists, '
                         f'skipping creation')
        elif resp.status_code == 201:
            logger.debug(f'Created folder {folder_name}')
        else:
            logger.warning(
                f'Unknown response {resp.status_code} {resp.json()}')

    def fetch_messages(self, folder_name: str) -> List[str]:
        """ Returns a list of message UIDs in the specified folder """
        folder_id = self._find_folder_id_from_folder_path(folder_name)
        url = f'/users/{self.mailbox_name}/mailFolders/' \
              f'{folder_id}/messages?$select=id'
        result = self._client.get(url)
        emails = result.json()['value']
        return [email['id'] for email in emails]

    def fetch_message(self, message_id: str):
        """Return the raw MIME content of a message as text."""
        result = self._client.get(
            f'/users/{self.mailbox_name}/messages/{message_id}/$value')
        return result.text

    def delete_message(self, message_id: str):
        """Permanently delete a message.

        Raises:
            RuntimeWarning: If the Graph API does not return 204.
        """
        resp = self._client.delete(
            f'/users/{self.mailbox_name}/messages/{message_id}')
        if resp.status_code != 204:
            raise RuntimeWarning(f"Failed to delete message "
                                 f"{resp.status_code}: {resp.json()}")

    def move_message(self, message_id: str, folder_name: str):
        """Move a message into the named folder.

        Raises:
            RuntimeWarning: If the Graph API does not return 201.
        """
        folder_id = self._find_folder_id_from_folder_path(folder_name)
        request_body = {
            'destinationId': folder_id
        }
        resp = self._client.post(
            f'/users/{self.mailbox_name}/messages/{message_id}/move',
            json=request_body)
        if resp.status_code != 201:
            raise RuntimeWarning(f"Failed to move message "
                                 f"{resp.status_code}: {resp.json()}")

    def keepalive(self):
        # Graph is stateless HTTP; there is no connection to keep alive.
        pass

    def watch(self, check_callback, check_timeout):
        """ Checks the mailbox for new messages every n seconds"""
        while True:
            sleep(check_timeout)
            check_callback(self)

    def _find_folder_id_from_folder_path(self, folder_name: str) -> str:
        """Resolve a ``/``-separated folder path to a Graph folder ID,
        memoizing results in a per-instance cache."""
        cached_id = self._folder_id_cache.get(folder_name)
        if cached_id is not None:
            return cached_id
        parent_folder_id = None
        path_parts = folder_name.split('/')
        # Walk down the hierarchy; for a single-part path the loop body
        # never runs and the folder is looked up at the top level.
        for part in path_parts[:-1]:
            parent_folder_id = self._find_folder_id_with_parent(
                part, parent_folder_id)
        folder_id = self._find_folder_id_with_parent(
            path_parts[-1], parent_folder_id)
        self._folder_id_cache[folder_name] = folder_id
        return folder_id

    def _find_folder_id_with_parent(self,
                                    folder_name: str,
                                    parent_folder_id: Optional[str]):
        """Find a folder ID by display name, optionally under a parent
        folder's childFolders collection.

        Raises:
            RuntimeError: If no folder with that display name is found.
        """
        sub_url = ''
        if parent_folder_id is not None:
            sub_url = f'/{parent_folder_id}/childFolders'
        folders_resp = self._client.get(
            f'/users/{self.mailbox_name}/mailFolders{sub_url}')
        folders = folders_resp.json()['value']
        matched_folders = [folder for folder in folders
                           if folder['displayName'] == folder_name]
        if len(matched_folders) == 0:
            raise RuntimeError(f"folder {folder_name} not found")
        selected_folder = matched_folders[0]
        return selected_folder['id']

65
parsedmarc/mail/imap.py Normal file
View File

@@ -0,0 +1,65 @@
import logging
from time import sleep
from imapclient.exceptions import IMAPClientError
from mailsuite.imap import IMAPClient
from socket import timeout
from parsedmarc.mail.mailbox_connection import MailboxConnection
logger = logging.getLogger("parsedmarc")
class IMAPConnection(MailboxConnection):
    """MailboxConnection implementation backed by an IMAP server, using
    mailsuite's ``IMAPClient`` for the underlying protocol work."""

    def __init__(self,
                 host=None,
                 user=None,
                 password=None,
                 port=None,
                 ssl=True,
                 verify=True,
                 timeout=30,
                 max_retries=4):
        # Credentials and TLS settings are kept so watch() can open a
        # fresh IDLE connection later, independent of self._client.
        self._username = user
        self._password = password
        self._verify = verify
        self._client = IMAPClient(host, user, password, port=port,
                                  ssl=ssl, verify=verify,
                                  timeout=timeout,
                                  max_retries=max_retries)

    def create_folder(self, folder_name: str):
        """Create the named folder on the IMAP server."""
        self._client.create_folder(folder_name)

    def fetch_messages(self, reports_folder: str):
        """Select the folder, then return the UIDs of the messages in it."""
        self._client.select_folder(reports_folder)
        return self._client.search()

    def fetch_message(self, message_id):
        """Return the raw (unparsed) content of a message by UID."""
        return self._client.fetch_message(message_id, parse=False)

    def delete_message(self, message_id: str):
        """Delete a single message by UID."""
        self._client.delete_messages([message_id])

    def move_message(self, message_id: str, folder_name: str):
        """Move a single message by UID into the named folder."""
        self._client.move_messages([message_id], folder_name)

    def keepalive(self):
        # NOOP keeps long-running IMAP sessions from timing out.
        self._client.noop()

    def watch(self, check_callback, check_timeout):
        """ Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function"""
        # Loops forever, opening a fresh IDLE connection and reconnecting
        # after timeouts or errors with a short back-off.
        # NOTE(review): IMAPClient invokes idle_callback with the raw
        # IMAPClient instance, not this IMAPConnection wrapper — confirm
        # that check_callback accepts it, since callers built around
        # MailboxConnection (e.g. folder helpers) may expect the wrapper.
        while True:
            try:
                IMAPClient(host=self._client.host, username=self._username, password=self._password,
                           port=self._client.port, ssl=self._client.ssl, verify=self._verify,
                           idle_callback=check_callback,
                           idle_timeout=check_timeout)
            except (timeout, IMAPClientError):
                logger.warning("IMAP connection timeout. Reconnecting...")
                sleep(5)
            except Exception as e:
                logger.warning("IMAP connection error. {0}. "
                               "Reconnecting...".format(e))
                sleep(5)

View File

@@ -0,0 +1,28 @@
from abc import ABC
from typing import List
class MailboxConnection(ABC):
    """
    Interface for a mailbox connection
    """
    def create_folder(self, folder_name: str):
        """Create the named folder in the mailbox."""
        raise NotImplementedError

    def fetch_messages(self, reports_folder: str) -> List[str]:
        """Return the IDs of the messages in the given folder."""
        raise NotImplementedError

    def fetch_message(self, message_id) -> str:
        """Return the raw content of a single message by ID."""
        raise NotImplementedError

    def delete_message(self, message_id: str):
        """Delete a single message by ID."""
        raise NotImplementedError

    def move_message(self, message_id: str, folder_name: str):
        """Move a single message by ID into the named folder."""
        raise NotImplementedError

    def keepalive(self):
        """Keep the connection alive during long processing runs."""
        raise NotImplementedError

    def watch(self, check_callback, check_timeout):
        """Invoke check_callback when new mail arrives (or on a timer),
        using check_timeout as the wait/poll interval in seconds."""
        raise NotImplementedError

View File

@@ -10,7 +10,7 @@ xmltodict>=0.12.0
geoip2>=3.0.0
imapclient>=2.1.0
dateparser>=0.7.2
elasticsearch-dsl>=7.2.0,<v7.14.0
elasticsearch-dsl>=7.2.0,<7.14.0
kafka-python>=1.4.4
mailsuite>=1.6.1
nose>=1.3.7
@@ -29,9 +29,10 @@ sphinx_rtd_theme>=0.4.3
codecov>=2.0.15
lxml>=4.4.0
boto3>=1.16.63
msgraph-core>=0.2.2
azure-identity>=1.8.0
google-api-core>=2.4.0
google-api-python-client>=2.35.0
google-auth>=2.3.3
google-auth-httplib2>=0.1.0
google-auth-oauthlib>=0.4.6

View File

@@ -98,11 +98,13 @@ setup(
'requests>=2.22.0', 'imapclient>=2.1.0',
'dateparser>=0.7.2',
'mailsuite>=1.6.1',
'elasticsearch-dsl>=7.2.0,<8.0.0',
'elasticsearch-dsl>=7.2.0,<7.14.0',
'kafka-python>=1.4.4',
'tqdm>=4.31.1',
'lxml>=4.4.0',
'boto3>=1.16.63',
'msgraph-core>=0.2.2',
'azure-identity>=1.8.0',
'google-api-core>=2.4.0',
'google-api-python-client>=2.35.0',
'google-auth>=2.3.3',