From 88c8af8334acb9c8d76ae240198400c1d391fe6a Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Sun, 3 Apr 2022 16:57:31 -0700 Subject: [PATCH 1/6] Implement getting messages from an Office 365 mailbox using MS Graph --- parsedmarc/__init__.py | 178 ++++++++++---------------- parsedmarc/cli.py | 147 ++++++++++++++------- parsedmarc/mail/__init__.py | 3 + parsedmarc/mail/graph.py | 104 +++++++++++++++ parsedmarc/mail/imap.py | 43 +++++++ parsedmarc/mail/mailbox_connection.py | 25 ++++ requirements.txt | 2 + 7 files changed, 346 insertions(+), 156 deletions(-) create mode 100644 parsedmarc/mail/__init__.py create mode 100644 parsedmarc/mail/graph.py create mode 100644 parsedmarc/mail/imap.py create mode 100644 parsedmarc/mail/mailbox_connection.py diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 75fc02a..b92bf21 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -31,6 +31,7 @@ from mailsuite.imap import IMAPClient from mailsuite.smtp import send_email from imapclient.exceptions import IMAPClientError +from parsedmarc.mail import MailboxConnection from parsedmarc.utils import get_base_domain, get_ip_address_info from parsedmarc.utils import is_outlook_msg, convert_outlook_msg from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime @@ -207,7 +208,7 @@ def _parse_report_record(record, ip_db_path=None, offline=False, def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False, nameservers=None, timeout=2.0, - parallel=False, server=None): + parallel=False, keep_alive=None): """Parses a DMARC XML report string and returns a consistent OrderedDict Args: @@ -218,7 +219,7 @@ def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False, (Cloudflare's public DNS resolvers by default) timeout (float): Sets the DNS timeout in seconds parallel (bool): Parallel processing - server (IMAPClient): Connection object + keep_alive (callable): Keep alive function Returns: OrderedDict: The parsed aggregate DMARC report 
@@ -312,9 +313,9 @@ def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False, if type(report["record"]) == list: for i in range(len(report["record"])): - if server is not None and i > 0 and i % 20 == 0: - logger.debug("Sending noop cmd") - server.noop() + if keep_alive is not None and i > 0 and i % 20 == 0: + logger.debug("Sending keepalive cmd") + keep_alive() logger.debug("Processed {0}/{1}".format( i, len(report["record"]))) report_record = _parse_report_record(report["record"][i], @@ -401,7 +402,7 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None, nameservers=None, dns_timeout=2.0, parallel=False, - server=None): + keep_alive=None): """Parses a file at the given path, a file-like object. or bytes as a aggregate DMARC report @@ -413,7 +414,7 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None, (Cloudflare's public DNS resolvers by default) dns_timeout (float): Sets the DNS timeout in seconds parallel (bool): Parallel processing - server (IMAPClient): Connection object + keep_alive (callable): Keep alive function Returns: OrderedDict: The parsed DMARC aggregate report @@ -426,7 +427,7 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None, nameservers=nameservers, timeout=dns_timeout, parallel=parallel, - server=server) + keep_alive=keep_alive) def parsed_aggregate_reports_to_csv_rows(reports): @@ -762,7 +763,7 @@ def parsed_forensic_reports_to_csv(reports): def parse_report_email(input_, offline=False, ip_db_path=None, nameservers=None, dns_timeout=2.0, strip_attachment_payloads=False, - parallel=False, server=None): + parallel=False, keep_alive=None): """ Parses a DMARC report from an email @@ -775,7 +776,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None, strip_attachment_payloads (bool): Remove attachment payloads from forensic report results parallel (bool): Parallel processing - server (IMAPClient): Connection object + keep_alive (callable): keep alive function 
Returns: OrderedDict: @@ -843,7 +844,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None, nameservers=ns, dns_timeout=dns_timeout, parallel=parallel, - server=server) + keep_alive=keep_alive) result = OrderedDict([("report_type", "aggregate"), ("report", aggregate_report)]) return result @@ -893,7 +894,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None, def parse_report_file(input_, nameservers=None, dns_timeout=2.0, strip_attachment_payloads=False, ip_db_path=None, - offline=False, parallel=False, server=None): + offline=False, parallel=False, keep_alive=None): """Parses a DMARC aggregate or forensic file at the given path, a file-like object. or bytes @@ -907,7 +908,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, ip_db_path (str): Path to a MMDB file from MaxMind or DBIP offline (bool): Do not make online queries for geolocation or DNS parallel (bool): Parallel processing - server (IMAPClient): Connection object + keep_alive (callable): Keep alive function Returns: OrderedDict: The parsed DMARC report @@ -929,7 +930,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, nameservers=nameservers, dns_timeout=dns_timeout, parallel=parallel, - server=server) + keep_alive=keep_alive) results = OrderedDict([("report_type", "aggregate"), ("report", report)]) except InvalidAggregateReport: @@ -942,7 +943,7 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, dns_timeout=dns_timeout, strip_attachment_payloads=sa, parallel=parallel, - server=server) + keep_alive=keep_alive) except InvalidDMARCReport: raise InvalidDMARCReport("Not a valid aggregate or forensic " "report") @@ -1007,65 +1008,29 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0, ("forensic_reports", forensic_reports)]) -def get_imap_capabilities(server): +def get_dmarc_reports_from_mailbox(connection: MailboxConnection, + reports_folder="INBOX", + archive_folder="Archive", + delete=False, + test=False, 
+ ip_db_path=None, + offline=False, + nameservers=None, + dns_timeout=6.0, + strip_attachment_payloads=False, + results=None, + batch_size=None): """ - Returns a list of an IMAP server's capabilities + Fetches and parses DMARC reports from a mailbox Args: - server (imapclient.IMAPClient): An instance of imapclient.IMAPClient - - Returns (list): A list of capabilities - """ - - capabilities = list(map(str, list(server.capabilities()))) - for i in range(len(capabilities)): - capabilities[i] = str(capabilities[i]).replace("b'", - "").replace("'", - "") - logger.debug("IMAP server supports: {0}".format(capabilities)) - - return capabilities - - -def get_dmarc_reports_from_inbox(connection=None, - host=None, - user=None, - password=None, - port=None, - ssl=True, - verify=True, - timeout=30, - max_retries=4, - reports_folder="INBOX", - archive_folder="Archive", - delete=False, - test=False, - ip_db_path=None, - offline=False, - nameservers=None, - dns_timeout=6.0, - strip_attachment_payloads=False, - results=None, - batch_size=None): - """ - Fetches and parses DMARC reports from an inbox - - Args: - connection: An IMAPClient connection to reuse - host: The mail server hostname or IP address - user: The mail server user - password: The mail server password - port: The mail server port - ssl (bool): Use SSL/TLS - verify (bool): Verify SSL/TLS certificate - timeout (float): IMAP timeout in seconds - max_retries (int): The maximum number of retries after a timeout - reports_folder: The IMAP folder where reports can be found + connection: A Mailbox connection object + reports_folder: The folder where reports can be found archive_folder: The folder to move processed mail to delete (bool): Delete messages after processing them test (bool): Do not move or delete messages after processing them ip_db_path (str): Path to a MMDB file from MaxMind or DBIP - offline (bool): Do not query onfline for geolocation or DNS + offline (bool): Do not query online for geolocation or DNS 
nameservers (list): A list of DNS nameservers to query dns_timeout (float): Set the DNS query timeout strip_attachment_payloads (bool): Remove attachment payloads from @@ -1079,9 +1044,8 @@ def get_dmarc_reports_from_inbox(connection=None, if delete and test: raise ValueError("delete and test options are mutually exclusive") - if connection is None and (user is None or password is None): - raise ValueError("Must supply a connection, or a username and " - "password") + if connection is None: + raise ValueError("Must supply a connection") aggregate_reports = [] forensic_reports = [] @@ -1095,22 +1059,13 @@ def get_dmarc_reports_from_inbox(connection=None, aggregate_reports = results["aggregate_reports"].copy() forensic_reports = results["forensic_reports"].copy() - if connection: - server = connection - else: - server = IMAPClient(host, user, password, port=port, - ssl=ssl, verify=verify, - timeout=timeout, - max_retries=max_retries, - initial_folder=reports_folder) - if not test: - server.create_folder(archive_folder) - server.create_folder(aggregate_reports_folder) - server.create_folder(forensic_reports_folder) - server.create_folder(invalid_reports_folder) + connection.create_folder(archive_folder) + connection.create_folder(aggregate_reports_folder) + connection.create_folder(forensic_reports_folder) + connection.create_folder(invalid_reports_folder) - messages = server.search() + messages = connection.fetch_messages(batch_size, reports_folder) total_messages = len(messages) logger.debug("Found {0} messages in {1}".format(len(messages), reports_folder)) @@ -1127,16 +1082,15 @@ def get_dmarc_reports_from_inbox(connection=None, logger.debug("Processing message {0} of {1}: UID {2}".format( i+1, message_limit, msg_uid )) - msg_content = server.fetch_message(msg_uid, parse=False) - sa = strip_attachment_payloads + msg_content = connection.fetch_message(msg_uid) try: parsed_email = parse_report_email(msg_content, nameservers=nameservers, dns_timeout=dns_timeout, 
ip_db_path=ip_db_path, offline=offline, - strip_attachment_payloads=sa, - server=server) + strip_attachment_payloads=strip_attachment_payloads, + keep_alive=connection.keepalive) if parsed_email["report_type"] == "aggregate": aggregate_reports.append(parsed_email["report"]) aggregate_report_msg_uids.append(msg_uid) @@ -1149,12 +1103,12 @@ def get_dmarc_reports_from_inbox(connection=None, if delete: logger.debug( "Deleting message UID {0}".format(msg_uid)) - server.delete_messages([msg_uid]) + connection.delete_message(msg_uid) else: logger.debug( "Moving message UID {0} to {1}".format( msg_uid, invalid_reports_folder)) - server.move_messages([msg_uid], invalid_reports_folder) + connection.move_message(msg_uid, invalid_reports_folder) if not test: if delete: @@ -1168,12 +1122,12 @@ def get_dmarc_reports_from_inbox(connection=None, "Deleting message {0} of {1}: UID {2}".format( i + 1, number_of_processed_msgs, msg_uid)) try: - server.delete_messages([msg_uid]) + connection.delete_message(msg_uid) except Exception as e: message = "Error deleting message UID" e = "{0} {1}: " "{2}".format(message, msg_uid, e) - logger.error("IMAP error: {0}".format(e)) + logger.error("Mailbox error: {0}".format(e)) else: if len(aggregate_report_msg_uids) > 0: log_message = "Moving aggregate report messages from" @@ -1188,12 +1142,12 @@ def get_dmarc_reports_from_inbox(connection=None, "Moving message {0} of {1}: UID {2}".format( i+1, number_of_agg_report_msgs, msg_uid)) try: - server.move_messages([msg_uid], - aggregate_reports_folder) + connection.move_message(msg_uid, + aggregate_reports_folder) except Exception as e: message = "Error moving message UID" e = "{0} {1}: {2}".format(message, msg_uid, e) - logger.error("IMAP error: {0}".format(e)) + logger.error("Mailbox error: {0}".format(e)) if len(forensic_report_msg_uids) > 0: message = "Moving forensic report messages from" logger.debug( @@ -1208,21 +1162,21 @@ def get_dmarc_reports_from_inbox(connection=None, message, i + 1, 
number_of_forensic_msgs, msg_uid)) try: - server.move_messages([msg_uid], - forensic_reports_folder) + connection.move_message(msg_uid, + forensic_reports_folder) except Exception as e: e = "Error moving message UID {0}: {1}".format( msg_uid, e) - logger.error("IMAP error: {0}".format(e)) + logger.error("Mailbox error: {0}".format(e)) results = OrderedDict([("aggregate_reports", aggregate_reports), ("forensic_reports", forensic_reports)]) - total_messages = len(server.search()) + total_messages = len(connection.fetch_messages(batch_size, reports_folder)) if not test and not batch_size and total_messages > 0: # Process emails that came in during the last run - results = get_dmarc_reports_from_inbox( - connection=server, + results = get_dmarc_reports_from_mailbox( + connection=connection, reports_folder=reports_folder, archive_folder=archive_folder, delete=delete, @@ -1273,17 +1227,17 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, sa = strip_attachment_payloads def idle_callback(connection): - res = get_dmarc_reports_from_inbox(connection=connection, - reports_folder=reports_folder, - archive_folder=archive_folder, - delete=delete, - test=test, - ip_db_path=ip_db_path, - offline=offline, - nameservers=nameservers, - dns_timeout=dns_timeout, - strip_attachment_payloads=sa, - batch_size=batch_size) + res = get_dmarc_reports_from_mailbox(connection=connection, + reports_folder=reports_folder, + archive_folder=archive_folder, + delete=delete, + test=test, + ip_db_path=ip_db_path, + offline=offline, + nameservers=nameservers, + dns_timeout=dns_timeout, + strip_attachment_payloads=sa, + batch_size=batch_size) callback(res) while True: diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 1418e80..72b8032 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -17,10 +17,11 @@ import sys import time from tqdm import tqdm -from parsedmarc import get_dmarc_reports_from_inbox, watch_inbox, \ +from parsedmarc import 
get_dmarc_reports_from_mailbox, watch_inbox, \ parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \ splunk, save_output, email_results, ParserError, __version__, \ InvalidDMARCReport, s3, syslog +from parsedmarc.mail import IMAPConnection, MSGraphConnection from parsedmarc.utils import is_mbox logger = logging.getLogger("parsedmarc") @@ -63,6 +64,7 @@ def _main(): output_str = "{0}\n".format(json.dumps(reports_, ensure_ascii=False, indent=2)) + if not opts.silent: print(output_str) if opts.kafka_hosts: @@ -252,6 +254,12 @@ def _main(): verbose=args.verbose, save_aggregate=False, save_forensic=False, + mailbox_reports_folder="INBOX", + mailbox_archive_folder="Archive", + mailbox_watch=False, + mailbox_delete=False, + mailbox_test=False, + mailbox_batch_size=None, imap_host=None, imap_skip_certificate_verification=False, imap_ssl=True, @@ -260,12 +268,11 @@ def _main(): imap_max_retries=4, imap_user=None, imap_password=None, - imap_reports_folder="INBOX", - imap_archive_folder="Archive", - imap_watch=False, - imap_delete=False, - imap_test=False, - imap_batch_size=None, + graph_user=None, + graph_password=None, + graph_client_id=None, + graph_client_secret=None, + graph_mailbox=None, hec=None, hec_token=None, hec_index=None, @@ -320,8 +327,7 @@ def _main(): if "offline" in general_config: opts.offline = general_config.getboolean("offline") if "strip_attachment_payloads" in general_config: - opts.strip_attachment_payloads = general_config[ - "strip_attachment_payloads"] + opts.strip_attachment_payloads = general_config.getboolean("strip_attachment_payloads") if "output" in general_config: opts.output = general_config["output"] if "aggregate_json_filename" in general_config: @@ -360,6 +366,24 @@ def _main(): opts.ip_db_path = general_config["ip_db_path"] else: opts.ip_db_path = None + + if "mailbox" in config.sections(): + mailbox_config = config["mailbox"] + if "reports_folder" in mailbox_config: + opts.mailbox_reports_folder = 
mailbox_config["reports_folder"] + if "archive_folder" in mailbox_config: + opts.mailbox_archive_folder = mailbox_config["archive_folder"] + if "watch" in mailbox_config: + opts.mailbox_watch = mailbox_config.getboolean("watch") + if "delete" in mailbox_config: + opts.mailbox_delete = mailbox_config.getboolean("delete") + if "test" in mailbox_config: + opts.mailbox_test = mailbox_config.getboolean("test") + if "batch_size" in mailbox_config: + opts.mailbox_batch_size = mailbox_config.getint("batch_size") + else: + opts.mailbox_batch_size = None + if "imap" in config.sections(): imap_config = config["imap"] if "host" in imap_config: @@ -393,20 +417,37 @@ def _main(): "imap config section") exit(-1) - if "reports_folder" in imap_config: - opts.imap_reports_folder = imap_config["reports_folder"] - if "archive_folder" in imap_config: - opts.imap_archive_folder = imap_config["archive_folder"] - if "watch" in imap_config: - opts.imap_watch = imap_config.getboolean("watch") - if "delete" in imap_config: - opts.imap_delete = imap_config.getboolean("delete") - if "test" in imap_config: - opts.imap_test = imap_config.getboolean("test") - if "batch_size" in imap_config: - opts.imap_batch_size = imap_config.getint("batch_size") + if "msgraph" in config.sections(): + graph_config = config["msgraph"] + if "user" in graph_config: + opts.graph_user = graph_config["user"] else: - opts.imap_batch_size = None + logger.critical("user setting missing from the " + "msgraph config section") + exit(-1) + if "password" in graph_config: + opts.graph_password = graph_config["password"] + else: + logger.critical("password setting missing from the " + "msgraph config section") + exit(-1) + + if "client_id" in graph_config: + opts.graph_client_id = graph_config["client_id"] + else: + logger.critical("client_id setting missing from the " + "msgraph config section") + exit(-1) + + if "client_secret" in graph_config: + opts.graph_client_secret = graph_config["client_secret"] + else: + 
logger.critical("client_secret setting missing from the " + "msgraph config section") + exit(-1) + if "mailbox" in graph_config: + opts.graph_mailbox = graph_config["mailbox"] + if "elasticsearch" in config: elasticsearch_config = config["elasticsearch"] if "hosts" in elasticsearch_config: @@ -489,7 +530,7 @@ def _main(): "kafka config section") exit(-1) if "ssl" in kafka_config: - opts.kafka_ssl = kafka_config["ssl"].getboolean() + opts.kafka_ssl = kafka_config.getboolean("ssl") if "skip_certificate_verification" in kafka_config: kafka_verify = kafka_config.getboolean( "skip_certificate_verification") @@ -514,7 +555,7 @@ def _main(): "smtp config section") exit(-1) if "port" in smtp_config: - opts.smtp_port = smtp_config["port"] + opts.smtp_port = smtp_config.getint("port") if "ssl" in smtp_config: opts.smtp_ssl = smtp_config.getboolean("ssl") if "skip_certificate_verification" in smtp_config: @@ -594,8 +635,8 @@ def _main(): '%(levelname)s - [%(filename)s:%(lineno)d] - %(message)s') fh.setFormatter(formatter) logger.addHandler(fh) - if opts.imap_host is None and len(opts.file_path) == 0: - logger.error("You must supply input files, or an IMAP configuration") + if opts.imap_host is None and opts.graph_user is None and len(opts.file_path) == 0: + logger.error("You must supply input files, or a mailbox connection") exit(1) logger.info("Starting dmarcparse") @@ -700,16 +741,13 @@ def _main(): aggregate_reports += reports["aggregate_reports"] forensic_reports += reports["forensic_reports"] + mailbox_connection = None if opts.imap_host: try: if opts.imap_user is None or opts.imap_password is None: logger.error("IMAP user and password must be specified if" "host is specified") - rf = opts.imap_reports_folder - af = opts.imap_archive_folder - ns = opts.nameservers - sa = opts.strip_attachment_payloads ssl = True verify = True if opts.imap_skip_certificate_verification: @@ -717,7 +755,8 @@ def _main(): verify = False if opts.imap_ssl is False: ssl = False - reports = 
get_dmarc_reports_from_inbox( + + mailbox_connection = IMAPConnection( host=opts.imap_host, port=opts.imap_port, ssl=ssl, @@ -726,24 +765,44 @@ def _main(): max_retries=opts.imap_max_retries, user=opts.imap_user, password=opts.imap_password, - reports_folder=rf, - archive_folder=af, - ip_db_path=opts.ip_db_path, - delete=opts.imap_delete, - offline=opts.offline, - nameservers=ns, - test=opts.imap_test, - strip_attachment_payloads=sa, - batch_size=opts.imap_batch_size ) - aggregate_reports += reports["aggregate_reports"] - forensic_reports += reports["forensic_reports"] - except Exception as error: logger.error("IMAP Error: {0}".format(error.__str__())) exit(1) + if opts.graph_user: + try: + mailbox = opts.graph_mailbox or opts.graph_user + mailbox_connection = MSGraphConnection( + client_id=opts.graph_client_id, + client_secret=opts.graph_client_secret, + username=opts.graph_user, + password=opts.graph_password, + mailbox=mailbox + ) + + except Exception as error: + logger.error("MS Graph Error: {0}".format(error.__str__())) + exit(1) + + if mailbox_connection: + reports = get_dmarc_reports_from_mailbox( + connection=mailbox_connection, + delete=opts.mailbox_delete, + batch_size=opts.mailbox_batch_size, + reports_folder=opts.mailbox_reports_folder, + archive_folder=opts.mailbox_archive_folder, + ip_db_path=opts.ip_db_path, + offline=opts.offline, + nameservers=opts.nameservers, + test=opts.mailbox_test, + strip_attachment_payloads=opts.strip_attachment_payloads, + ) + + aggregate_reports += reports["aggregate_reports"] + forensic_reports += reports["forensic_reports"] + results = OrderedDict([("aggregate_reports", aggregate_reports), ("forensic_reports", forensic_reports)]) @@ -770,7 +829,7 @@ def _main(): logger.error("{0}".format(error.__str__())) exit(1) - if opts.imap_host and opts.imap_watch: + if mailbox_connection and opts.mailbox_watch: logger.info("Watching for email - Quit with ctrl-c") ssl = True verify = True diff --git a/parsedmarc/mail/__init__.py 
b/parsedmarc/mail/__init__.py new file mode 100644 index 0000000..87ef360 --- /dev/null +++ b/parsedmarc/mail/__init__.py @@ -0,0 +1,3 @@ +from parsedmarc.mail.mailbox_connection import MailboxConnection +from parsedmarc.mail.graph import MSGraphConnection +from parsedmarc.mail.imap import IMAPConnection diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py new file mode 100644 index 0000000..b58740c --- /dev/null +++ b/parsedmarc/mail/graph.py @@ -0,0 +1,104 @@ +import logging +from functools import lru_cache +from typing import List, Optional + +from azure.identity import UsernamePasswordCredential +from msgraph.core import GraphClient + +from parsedmarc.mail.mailbox_connection import MailboxConnection + +logger = logging.getLogger("parsedmarc") + + +class MSGraphConnection(MailboxConnection): + def __init__(self, + client_id: str, + username: str, + password: str, + client_secret: str, + mailbox: str): + credential = UsernamePasswordCredential( + client_id=client_id, + client_credential=client_secret, + disable_automatic_authentication=True, + username=username, + password=password + ) + credential.authenticate(scopes=['Mail.ReadWrite']) + self._client = GraphClient(credential=credential) + self.mailbox_name = mailbox + + def create_folder(self, folder_name: str): + sub_url = '' + path_parts = folder_name.split('/') + if len(path_parts) > 1: # Folder is a subFolder + parent_folder_id = None + for folder in path_parts[:-1]: + parent_folder_id = self._find_folder_id_with_parent(folder, parent_folder_id) + sub_url = f'/{parent_folder_id}/childFolders' + folder_name = path_parts[-1] + + request_body = { + 'displayName': folder_name + } + resp = self._client.post(f'/users/{self.mailbox_name}/mailFolders{sub_url}', json=request_body) + if resp.status_code == 409: + logger.debug(f'Folder {folder_name} already exists, skipping creation') + elif resp.status_code == 201: + logger.debug(f'Created folder {folder_name}') + else: + logger.warning(f'Unknown response 
{resp.status_code} {resp.json()}') + + def fetch_messages(self, batch_size: int, folder_name: str) -> List[str]: + """ Returns a list of message UIDs in the specified folder """ + folder_id = self._find_folder_id_from_folder_path(folder_name) + result = self._client.get(f'/users/{self.mailbox_name}/mailFolders/{folder_id}/messages?$select=id') + emails = result.json()['value'] + return [email['id'] for email in emails] + + def fetch_message(self, message_id: str): + result = self._client.get(f'/users/{self.mailbox_name}/messages/{message_id}/$value') + return result.text + + def delete_message(self, message_id: str): + resp = self._client.delete(f'/users/{self.mailbox_name}/messages/{message_id}') + if resp.status_code != 204: + raise RuntimeWarning(f"Failed to delete message {resp.status_code}: {resp.json()}") + + def move_message(self, message_id: str, folder_name: str): + folder_id = self._find_folder_id_from_folder_path(folder_name) + request_body = { + 'destinationId': folder_id + } + resp = self._client.post(f'/users/{self.mailbox_name}/messages/{message_id}/move', json=request_body) + if resp.status_code != 201: + raise RuntimeWarning(f"Failed to move message {resp.status_code}: {resp.json()}") + + def keepalive(self): + # Not needed + pass + + @lru_cache + def _find_folder_id_from_folder_path(self, folder_name: str) -> str: + path_parts = folder_name.split('/') + parent_folder_id = None + if len(path_parts) > 1: + for folder in path_parts[:-1]: + folder_id = self._find_folder_id_with_parent(folder, parent_folder_id) + parent_folder_id = folder_id + return self._find_folder_id_with_parent(path_parts[-1], parent_folder_id) + else: + return self._find_folder_id_with_parent(folder_name, None) + + def _find_folder_id_with_parent(self, folder_name: str, parent_folder_id: Optional[str]): + sub_url = '' + if parent_folder_id is not None: + sub_url = f'/{parent_folder_id}/childFolders' + folders_resp = 
self._client.get(f'/users/{self.mailbox_name}/mailFolders{sub_url}') + folders = folders_resp.json()['value'] + matched_folders = [folder for folder in folders if folder['displayName'] == folder_name] + if len(matched_folders) == 0: + raise RuntimeError(f"folder {folder_name} not found") + selected_folder = matched_folders[0] + return selected_folder['id'] + diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py new file mode 100644 index 0000000..4d00e2c --- /dev/null +++ b/parsedmarc/mail/imap.py @@ -0,0 +1,43 @@ +import logging + +from mailsuite.imap import IMAPClient + +from parsedmarc.mail.mailbox_connection import MailboxConnection + + +logger = logging.getLogger("parsedmarc") + + +class IMAPConnection(MailboxConnection): + def __init__(self, + host=None, + user=None, + password=None, + port=None, + ssl=True, + verify=True, + timeout=30, + max_retries=4): + self._client = IMAPClient(host, user, password, port=port, + ssl=ssl, verify=verify, + timeout=timeout, + max_retries=max_retries) + + def create_folder(self, folder_name: str): + self._client.create_folder(folder_name) + + def fetch_messages(self, batch_size, reports_folder: str): + self._client.select_folder(reports_folder) + return self._client.search() + + def fetch_message(self, message_id): + return self._client.fetch_message(message_id, parse=False) + + def delete_message(self, message_id: str): + self._client.delete_messages([message_id]) + + def move_message(self, message_id: str, folder_name: str): + self._client.move_messages([message_id], folder_name) + + def keepalive(self): + self._client.noop() diff --git a/parsedmarc/mail/mailbox_connection.py b/parsedmarc/mail/mailbox_connection.py new file mode 100644 index 0000000..717360c --- /dev/null +++ b/parsedmarc/mail/mailbox_connection.py @@ -0,0 +1,25 @@ +from abc import ABC +from typing import List + + +class MailboxConnection(ABC): + """ + Interface for a mailbox connection + """ + def create_folder(self, folder_name: str): + raise 
NotImplementedError + + def fetch_messages(self, batch_size: int, reports_folder: str) -> List[str]: + raise NotImplementedError + + def fetch_message(self, message_id) -> str: + raise NotImplementedError + + def delete_message(self, message_id: str): + raise NotImplementedError + + def move_message(self, message_id: str, folder_name: str): + raise NotImplementedError + + def keepalive(self): + raise NotImplementedError diff --git a/requirements.txt b/requirements.txt index d098312..b30d8dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,5 @@ sphinx_rtd_theme>=0.4.3 codecov>=2.0.15 lxml>=4.4.0 boto3>=1.16.63 +msgraph-core>=0.2.2 +azure-identity>=1.8.0 From 1f865ae566febd7db53b6904c4e392ad43945ab2 Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Sun, 3 Apr 2022 17:42:11 -0700 Subject: [PATCH 2/6] implement mailbox watch --- parsedmarc/__init__.py | 86 +++++++++++---------------- parsedmarc/cli.py | 30 +++------- parsedmarc/mail/graph.py | 7 +++ parsedmarc/mail/imap.py | 22 +++++++ parsedmarc/mail/mailbox_connection.py | 3 + 5 files changed, 75 insertions(+), 73 deletions(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index b92bf21..5558346 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -2,40 +2,37 @@ """A Python package for parsing DMARC reports""" -import logging -import os -import shutil -import xml.parsers.expat as expat -import json -from datetime import datetime -from time import sleep -from collections import OrderedDict -from io import BytesIO, StringIO -from gzip import GzipFile -from socket import timeout -import zipfile -from csv import DictWriter -import re -from base64 import b64decode import binascii import email -import tempfile import email.utils +import json +import logging import mailbox +import os +import re +import shutil +import tempfile +import xml.parsers.expat as expat +import zipfile +from base64 import b64decode +from collections import OrderedDict +from csv import DictWriter +from 
datetime import datetime +from gzip import GzipFile +from io import BytesIO, StringIO +from typing import Callable import mailparser -from expiringdict import ExpiringDict import xmltodict +from expiringdict import ExpiringDict from lxml import etree -from mailsuite.imap import IMAPClient from mailsuite.smtp import send_email -from imapclient.exceptions import IMAPClientError from parsedmarc.mail import MailboxConnection from parsedmarc.utils import get_base_domain, get_ip_address_info from parsedmarc.utils import is_outlook_msg, convert_outlook_msg -from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime from parsedmarc.utils import parse_email +from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime __version__ = "7.1.1" @@ -1019,7 +1016,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, dns_timeout=6.0, strip_attachment_payloads=False, results=None, - batch_size=None): + batch_size=None, + create_folders=True): """ Fetches and parses DMARC reports from a mailbox @@ -1037,6 +1035,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, forensic report results results (dict): Results from the previous run batch_size (int): Number of messages to read and process before saving + create_folders (bool): Whether to create the destination folders (not used in watch) Returns: OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports`` @@ -1059,7 +1058,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, aggregate_reports = results["aggregate_reports"].copy() forensic_reports = results["forensic_reports"].copy() - if not test: + if not test and create_folders: connection.create_folder(archive_folder) connection.create_folder(aggregate_reports_folder) connection.create_folder(forensic_reports_folder) @@ -1192,29 +1191,25 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, return results -def watch_inbox(host, username, password, callback, port=None, 
ssl=True, - verify=True, reports_folder="INBOX", +def watch_inbox(mailbox_connection: MailboxConnection, + callback: Callable, + reports_folder="INBOX", archive_folder="Archive", delete=False, test=False, - idle_timeout=30, ip_db_path=None, + check_timeout=30, ip_db_path=None, offline=False, nameservers=None, dns_timeout=6.0, strip_attachment_payloads=False, batch_size=None): """ - Use an IDLE IMAP connection to parse incoming emails, and pass the results - to a callback function + Watches the mailbox for new messages and sends the results to a callback function Args: - host: The mail server hostname or IP address - username: The mail server username - password: The mail server password + mailbox_connection: The mailbox connection object callback: The callback function to receive the parsing results - port: The mail server port - ssl (bool): Use SSL/TLS - verify (bool): Verify the TLS/SSL certificate reports_folder: The IMAP folder where reports can be found archive_folder: The folder to move processed mail to delete (bool): Delete messages after processing them test (bool): Do not move or delete messages after processing them - idle_timeout (int): Number of seconds to wait for a IMAP IDLE response + check_timeout (int): Number of seconds to wait for a IMAP IDLE response + or the number of seconds until the next mail check ip_db_path (str): Path to a MMDB file from MaxMind or DBIP offline (bool): Do not query online for geolocation or DNS nameservers (list): A list of one or more nameservers to use @@ -1224,9 +1219,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, forensic report samples with None batch_size (int): Number of messages to read and process before saving """ - sa = strip_attachment_payloads - def idle_callback(connection): + def check_callback(connection): res = get_dmarc_reports_from_mailbox(connection=connection, reports_folder=reports_folder, archive_folder=archive_folder, @@ -1236,24 +1230,12 @@ def watch_inbox(host, 
username, password, callback, port=None, ssl=True, offline=offline, nameservers=nameservers, dns_timeout=dns_timeout, - strip_attachment_payloads=sa, - batch_size=batch_size) + strip_attachment_payloads=strip_attachment_payloads, + batch_size=batch_size, + create_folders=False) callback(res) - while True: - try: - IMAPClient(host=host, username=username, password=password, - port=port, ssl=ssl, verify=verify, - initial_folder=reports_folder, - idle_callback=idle_callback, - idle_timeout=idle_timeout) - except (timeout, IMAPClientError): - logger.warning("IMAP connection timeout. Reconnecting...") - sleep(5) - except Exception as e: - logger.warning("IMAP connection error. {0}. " - "Reconnecting...".format(e)) - sleep(5) + mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout) def save_output(results, output_directory="output", diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 72b8032..4f07f15 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -831,31 +831,19 @@ def _main(): if mailbox_connection and opts.mailbox_watch: logger.info("Watching for email - Quit with ctrl-c") - ssl = True - verify = True - if opts.imap_skip_certificate_verification: - logger.debug("Skipping IMAP certificate verification") - verify = False - if opts.imap_ssl is False: - ssl = False + try: - sa = opts.strip_attachment_payloads watch_inbox( - opts.imap_host, - opts.imap_user, - opts.imap_password, - process_reports, - port=opts.imap_port, - ssl=ssl, - verify=verify, - reports_folder=opts.imap_reports_folder, - archive_folder=opts.imap_archive_folder, - delete=opts.imap_delete, - test=opts.imap_test, + mailbox_connection=mailbox_connection, + callback=process_reports, + reports_folder=opts.mailbox_reports_folder, + archive_folder=opts.mailbox_archive_folder, + delete=opts.mailbox_delete, + test=opts.mailbox_test, nameservers=opts.nameservers, dns_timeout=opts.dns_timeout, - strip_attachment_payloads=sa, - batch_size=opts.imap_batch_size, + 
strip_attachment_payloads=opts.strip_attachment_payloads, + batch_size=opts.mailbox_batch_size, ip_db_path=opts.ip_db_path, offline=opts.offline) except FileExistsError as error: diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py index b58740c..ba12f25 100644 --- a/parsedmarc/mail/graph.py +++ b/parsedmarc/mail/graph.py @@ -1,5 +1,6 @@ import logging from functools import lru_cache +from time import sleep from typing import List, Optional from azure.identity import UsernamePasswordCredential @@ -78,6 +79,12 @@ class MSGraphConnection(MailboxConnection): # Not needed pass + def watch(self, check_callback, check_timeout): + """ Checks the mailbox for new messages every n seconds""" + while True: + sleep(check_timeout) + check_callback(self) + @lru_cache def _find_folder_id_from_folder_path(self, folder_name: str) -> str: path_parts = folder_name.split('/') diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py index 4d00e2c..732cef3 100644 --- a/parsedmarc/mail/imap.py +++ b/parsedmarc/mail/imap.py @@ -1,6 +1,9 @@ import logging +from time import sleep +from imapclient.exceptions import IMAPClientError from mailsuite.imap import IMAPClient +from socket import timeout from parsedmarc.mail.mailbox_connection import MailboxConnection @@ -18,6 +21,9 @@ class IMAPConnection(MailboxConnection): verify=True, timeout=30, max_retries=4): + self._username = user + self._password = password + self._verify = verify self._client = IMAPClient(host, user, password, port=port, ssl=ssl, verify=verify, timeout=timeout, @@ -41,3 +47,19 @@ class IMAPConnection(MailboxConnection): def keepalive(self): self._client.noop() + + def watch(self, check_callback, check_timeout): + """ Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function""" + while True: + try: + IMAPClient(host=self._client.host, username=self._username, password=self._password, + port=self._client.port, ssl=self._client.ssl, verify=self._verify, + 
idle_callback=check_callback, + idle_timeout=check_timeout) + except (timeout, IMAPClientError): + logger.warning("IMAP connection timeout. Reconnecting...") + sleep(5) + except Exception as e: + logger.warning("IMAP connection error. {0}. " + "Reconnecting...".format(e)) + sleep(5) diff --git a/parsedmarc/mail/mailbox_connection.py b/parsedmarc/mail/mailbox_connection.py index 717360c..f5750f3 100644 --- a/parsedmarc/mail/mailbox_connection.py +++ b/parsedmarc/mail/mailbox_connection.py @@ -23,3 +23,6 @@ class MailboxConnection(ABC): def keepalive(self): raise NotImplementedError + + def watch(self, check_callback, check_timeout): + raise NotImplementedError From 59a39b15091436216da341a74e9964042aef6d8b Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Sun, 3 Apr 2022 17:54:40 -0700 Subject: [PATCH 3/6] update readme --- README.rst | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/README.rst b/README.rst index ab7b96f..868a02b 100644 --- a/README.rst +++ b/README.rst @@ -147,6 +147,8 @@ For example host = imap.example.com user = dmarcresports@example.com password = $uperSecure + + [mailbox] watch = True [elasticsearch] @@ -187,8 +189,16 @@ The full set of configuration options are: .. note:: Setting this to a number larger than one can improve performance when processing thousands of files -- ``imap`` +- ``mailbox`` + - ``reports_folder`` - str: The mailbox folder where the incoming reports can be found (Default: INBOX) + - ``archive_folder`` - str: The mailbox folder to sort processed emails into (Default: Archive) + - ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive or poll MS Graph for new messages + - ``delete`` - bool: Delete messages after processing them, instead of archiving them + - ``test`` - bool: Do not move or delete messages + - ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set. 
+ +- ``imap`` - ``host`` - str: The IMAP server hostname or IP address - ``port`` - int: The IMAP server port (Default: 993). @@ -199,12 +209,16 @@ The full set of configuration options are: - ``skip_certificate_verification`` - bool: Skip certificate verification (not recommended) - ``user`` - str: The IMAP user - ``password`` - str: The IMAP password - - ``reports_folder`` - str: The IMAP folder where the incoming reports can be found (Default: INBOX) - - ``archive_folder`` - str: The IMAP folder to sort processed emails into (Default: Archive) - - ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive - - ``delete`` - bool: Delete messages after processing them, instead of archiving them - - ``test`` - bool: Do not move or delete messages - - ``batch_size`` - int: Number of messages to read and process before saving. Defaults to all messages if not set. + +- ``msgraph`` + - ``user`` - str: The M365 user + - ``password`` - str: The user password + - ``client_id`` - str: The app registration's client ID + - ``client_secret`` - str: The app registration's secret + - ``mailbox`` - str: The mailbox name. This defaults to the user that is logged in, but could be a shared mailbox if the user has access to the mailbox + .. note:: + You must create an app registration in Azure AD and have an admin grant the Microsoft Graph ``Mail.ReadWrite`` (delegated) permission to the app. + - ``elasticsearch`` - ``hosts`` - str: A comma separated list of hostnames and ports or URLs (e.g. 
``127.0.0.1:9200`` or ``https://user:secret@localhost``) From db1d3443fdfb52177461d6a056f5e200d6ceeada Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Sun, 3 Apr 2022 17:56:39 -0700 Subject: [PATCH 4/6] update setup.py --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 785ba97..e577963 100644 --- a/setup.py +++ b/setup.py @@ -102,7 +102,9 @@ setup( 'kafka-python>=1.4.4', 'tqdm>=4.31.1', 'lxml>=4.4.0', - 'boto3>=1.16.63' + 'boto3>=1.16.63', + 'msgraph-core>=0.2.2', + 'azure-identity>=1.8.0' ], entry_points={ From 445e3cdf9d7731892cf1a7a68917c0e5abbff4fc Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Sun, 3 Apr 2022 21:49:27 -0700 Subject: [PATCH 5/6] add try except block to get_dmarc_reports call --- README.rst | 2 +- parsedmarc/cli.py | 33 +++++++++++++++++++-------------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/README.rst b/README.rst index 868a02b..9afbf1a 100644 --- a/README.rst +++ b/README.rst @@ -29,7 +29,7 @@ Features * Parses draft and 1.0 standard aggregate/rua reports * Parses forensic/failure/ruf reports -* Can parse reports from an inbox over IMAP +* Can parse reports from an inbox over IMAP or Microsoft Graph * Transparently handles gzip or zip compressed reports * Consistent data structures * Simple JSON and/or CSV output diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 4f07f15..9dee52e 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -787,21 +787,26 @@ def _main(): exit(1) if mailbox_connection: - reports = get_dmarc_reports_from_mailbox( - connection=mailbox_connection, - delete=opts.mailbox_delete, - batch_size=opts.mailbox_batch_size, - reports_folder=opts.mailbox_reports_folder, - archive_folder=opts.mailbox_archive_folder, - ip_db_path=opts.ip_db_path, - offline=opts.offline, - nameservers=opts.nameservers, - test=opts.mailbox_test, - strip_attachment_payloads=opts.strip_attachment_payloads, - ) + try: + reports = 
get_dmarc_reports_from_mailbox( + connection=mailbox_connection, + delete=opts.mailbox_delete, + batch_size=opts.mailbox_batch_size, + reports_folder=opts.mailbox_reports_folder, + archive_folder=opts.mailbox_archive_folder, + ip_db_path=opts.ip_db_path, + offline=opts.offline, + nameservers=opts.nameservers, + test=opts.mailbox_test, + strip_attachment_payloads=opts.strip_attachment_payloads, + ) - aggregate_reports += reports["aggregate_reports"] - forensic_reports += reports["forensic_reports"] + aggregate_reports += reports["aggregate_reports"] + forensic_reports += reports["forensic_reports"] + + except Exception as error: + logger.error("Mailbox Error: {0}".format(error.__str__())) + exit(1) results = OrderedDict([("aggregate_reports", aggregate_reports), ("forensic_reports", forensic_reports)]) From 754e1d6bc5b1b705f5c123143f508c9d00f9ab08 Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Wed, 6 Apr 2022 11:46:37 -0700 Subject: [PATCH 6/6] remove batch_size from fetch_messages method --- parsedmarc/__init__.py | 4 ++-- parsedmarc/mail/graph.py | 2 +- parsedmarc/mail/imap.py | 2 +- parsedmarc/mail/mailbox_connection.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 5558346..e059798 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -1064,7 +1064,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, connection.create_folder(forensic_reports_folder) connection.create_folder(invalid_reports_folder) - messages = connection.fetch_messages(batch_size, reports_folder) + messages = connection.fetch_messages(reports_folder) total_messages = len(messages) logger.debug("Found {0} messages in {1}".format(len(messages), reports_folder)) @@ -1170,7 +1170,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, results = OrderedDict([("aggregate_reports", aggregate_reports), ("forensic_reports", forensic_reports)]) - total_messages = 
len(connection.fetch_messages(batch_size, reports_folder)) + total_messages = len(connection.fetch_messages(reports_folder)) if not test and not batch_size and total_messages > 0: # Process emails that came in during the last run diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py index ba12f25..f718856 100644 --- a/parsedmarc/mail/graph.py +++ b/parsedmarc/mail/graph.py @@ -50,7 +50,7 @@ class MSGraphConnection(MailboxConnection): else: logger.warning(f'Unknown response {resp.status_code} {resp.json()}') - def fetch_messages(self, batch_size: int, folder_name: str) -> List[str]: + def fetch_messages(self, folder_name: str) -> List[str]: """ Returns a list of message UIDs in the specified folder """ folder_id = self._find_folder_id_from_folder_path(folder_name) result = self._client.get(f'/users/{self.mailbox_name}/mailFolders/{folder_id}/messages?$select=id') diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py index 732cef3..3090b15 100644 --- a/parsedmarc/mail/imap.py +++ b/parsedmarc/mail/imap.py @@ -32,7 +32,7 @@ class IMAPConnection(MailboxConnection): def create_folder(self, folder_name: str): self._client.create_folder(folder_name) - def fetch_messages(self, batch_size, reports_folder: str): + def fetch_messages(self, reports_folder: str): self._client.select_folder(reports_folder) return self._client.search() diff --git a/parsedmarc/mail/mailbox_connection.py b/parsedmarc/mail/mailbox_connection.py index f5750f3..ecaa7f4 100644 --- a/parsedmarc/mail/mailbox_connection.py +++ b/parsedmarc/mail/mailbox_connection.py @@ -9,7 +9,7 @@ class MailboxConnection(ABC): def create_folder(self, folder_name: str): raise NotImplementedError - def fetch_messages(self, batch_size: int, reports_folder: str) -> List[str]: + def fetch_messages(self, reports_folder: str) -> List[str]: raise NotImplementedError def fetch_message(self, message_id) -> str: