From 657f34dc2aade617082037e90a7ccaf29a0d5b7f Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Thu, 21 Apr 2022 16:14:34 -0700 Subject: [PATCH] initial pass at integrating gmail with MailboxConnection interface --- README.rst | 7 +- parsedmarc/__init__.py | 215 --------------------------------------- parsedmarc/cli.py | 62 ++++++----- parsedmarc/mail/gmail.py | 84 +++++++++++++++ 4 files changed, 121 insertions(+), 247 deletions(-) create mode 100644 parsedmarc/mail/gmail.py diff --git a/README.rst b/README.rst index b8c51d8..7a9cbd3 100644 --- a/README.rst +++ b/README.rst @@ -150,6 +150,7 @@ For example [mailbox] watch = True + delete = False [elasticsearch] hosts = 127.0.0.1:9200 @@ -171,10 +172,8 @@ For example [gmail_api] credentials_file = /path/to/credentials.json # Get this file from console.google.com. See https://developers.google.com/identity/protocols/oauth2 token_file = /path/to/token.json # This file will be generated automatically - delete = False # Delete reports after successful processing - scopes = https://mail.google.com/ - include_spam_trash=True - reports_label=DMARC + scopes = https://mail.google.com/ + include_spam_trash=True The full set of configuration options are: diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 13e5907..6ae31c6 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -1201,221 +1201,6 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, return results -def get_gmail_api_creds(token_file="token.json",credentials_file="credentials.json",scopes=['https://www.googleapis.com/auth/gmail.modify']): - - creds = None - - if os.path.exists(token_file): - creds = Credentials.from_authorized_user_file(token_file, scopes) - # If there are no (valid) credentials available, let the user log in. 
- if not creds or not creds.valid: - if creds and creds.expired and creds.refresh_token: - creds.refresh(Request()) - else: - flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes) - creds = flow.run_console() - # Save the credentials for the next run - with open(token_file, 'w') as token: - token.write(creds.to_json()) - return creds - - -def get_dmarc_reports_from_gmail_api(credentials_file=".credentials",token_file=".token", - reports_label="INBOX", archive_label = "DMARC Archive", - offline=False, ip_db_path=None, - scopes=['https://mail.google.com/'], include_spam_trash=False, - nameservers=None, dns_timeout=2.0, - strip_attachment_payloads=False,delete=False, - test=False,parallel=False): - - logger = logging.getLogger("parsedmarc::gmail_api") - - aggregate_reports = [] - forensic_reports = [] - aggregate_report_msg_uids = [] - forensic_report_msg_uids = [] - - creds = get_gmail_api_creds(token_file,credentials_file,scopes) - service = build('gmail', 'v1', credentials=creds) - - results = service.users().labels().list(userId='me').execute() - labels = results.get('labels',[]) - - reports_label_id = None - archive_label_id = None - forensic_label_id = None - aggregate_label_id = None - invalid_label_id = None - - invalid_label = "Invalid" - forensic_label = "Forensic" - aggregate_label = "Aggregate" - - for label in labels: - if reports_label == label['id']: - reports_label_id = label['id'] - reports_label = label['name'] - elif reports_label == label['name']: - reports_label_id = label['id'] - - if archive_label == label['id']: - archive_label_id = label['id'] - archive_label == label['name'] - elif archive_label == label['name']: - archive_label_id = label['id'] - - if invalid_label == label['name']: - invalid_label_id = label['id'] - if forensic_label == label['name']: - forensic_label_id = label['id'] - if aggregate_label == label['name']: - aggregate_label_id = label['id'] - if reports_label_id is None: - logger.debug("Creating label 
{0} for reports".format(reports_label)) - label = service.users().labels().create(userId='me', - body={'name': reports_label, - 'messageListVisibility': 'show'}).execute() - reports_label_id = label['id'] - - if archive_label_id is None: - logger.debug("Creating label {0} for archive".format(archive_label)) - label = service.users().labels().create(userId='me', - body={'name': archive_label, - 'messageListVisibility': 'show'}).execute() - archive_label_id = label['id'] - - if forensic_label_id is None: - logger.debug("Creating label {0} for forensic reports".format(forensic_label)) - label = service.users().labels().create(userId='me', - body={'name': forensic_label, - 'messageListVisibility': 'show'}).execute() - forensic_label_id = label['id'] - - if aggregate_label_id is None: - logger.debug("Creating label {0} for aggregate reports".format(aggregate_label)) - label = service.users().labels().create(userId='me', - body={'name': aggregate_label, - 'messageListVisibility': 'show'}).execute() - aggregate_label_id = label['id'] - - if invalid_label_id is None: - logger.debug("Creating label {0} for invalid reports".format(invalid_label)) - label = service.users().labels().create(userId='me', - body={'name': invalid_label, - 'messageListVisibility': 'show'}).execute() - invalid_label_id = label['id'] - - results = service.users().messages().list(userId='me', includeSpamTrash=include_spam_trash, - labelIds=[reports_label_id]).execute() - messages = results.get('messages', []) - total_messages = results['resultSizeEstimate'] - - while(messages): - for message in messages: - msg_uid = message['id'] - msg = service.users().messages().get(userId='me',id=msg_uid,format="raw").execute() - - try: - parsed_email = parse_report_email(urlsafe_b64decode(msg['raw']),offline, - ip_db_path,nameservers, - dns_timeout,strip_attachment_payloads, - parallel) - - if parsed_email["report_type"] == "aggregate": - aggregate_reports.append(parsed_email["report"]) - 
aggregate_report_msg_uids.append(msg_uid) - elif parsed_email["report_type"] == "forensic": - forensic_reports.append(parsed_email["report"]) - forensic_report_msg_uids.append(msg_uid) - - except InvalidDMARCReport as error: - logger.warning(error.__str__()) - if not test: - logger.debug("Moving message UID {0} to {1}".format(msg_uid, invalid_label)) - service.users().messages().modify(userId='me',id=msg_uid, - body={'addLabelIds': [invalid_label_id], "removeLabelIds":[reports_label]}).execute() - - if 'nextPageToken' in results: - results = service.users().messages().list(userId='me',includeSpamTrash=include_spam_trash, - labelIds=[reports_label],nextToken=results['nextPageToken']).execute() - messages = results.get('messages',[]) - total_messages = results['resultSizeEstimate'] - else: - break - - if not test: - if delete: - processed_messages = aggregate_report_msg_uids + \ - forensic_report_msg_uids - - number_of_processed_msgs = len(processed_messages) - for i in range(number_of_processed_msgs): - msg_uid = processed_messages[i] - logger.debug( - "Deleting message {0} of {1}: UID {2}".format( - i + 1, number_of_processed_msgs, msg_uid)) - try: - r = service.users().messages().delete(userId='me',id=msg_uid) - if(r): - raise Exception(r) - except Exception as e: - message = "Error deleting message UID" - e = "{0} {1}: " "{2}".format(message, msg_uid, e) - logger.error("GMail error: {0}".format(e)) - else: - if len(aggregate_report_msg_uids) > 0: - log_message = "Moving aggregate report messages from" - logger.debug( - "{0} {1} to {2}".format( - log_message, reports_label, - aggregate_label)) - number_of_agg_report_msgs = len(aggregate_report_msg_uids) - for i in range(number_of_agg_report_msgs): - msg_uid = aggregate_report_msg_uids[i] - logger.debug( - "Moving message {0} of {1}: UID {2}".format( - i+1, number_of_agg_report_msgs, msg_uid)) - try: - r = service.users().messages().modify(userId="me",id=msg_uid, - body={"addLabelIds": [aggregate_label_id, 
archive_label_id], - "removeLabelIds":[reports_label_id,'INBOX']}).execute() - if(r): - raise Exception(r) - - except Exception as e: - message = "Error moving message UID" - e = "{0} {1}: {2}".format(message, msg_uid, e) - logger.error("Gmail error: {0}".format(e)) - if len(forensic_report_msg_uids) > 0: - message = "Moving forensic report messages from" - logger.debug( - "{0} {1} to {2}".format(message, - reports_label, - forensic_label)) - number_of_forensic_msgs = len(forensic_report_msg_uids) - for i in range(number_of_forensic_msgs): - msg_uid = forensic_report_msg_uids[i] - message = "Moving message" - logger.debug("{0} {1} of {2}: UID {3}".format( - message, - i + 1, number_of_forensic_msgs, msg_uid)) - try: - r = service.users().messages().modify(userId="me",id=msg_uid, - body={"addLabelIds": [forensic_label_id, archive_label_id], - "removeLabelIds":[reports_label_id,'INBOX']}).execute() - if(r): - raise Exception(r) - except Exception as e: - e = "Error moving message UID {0}: {1}".format( - msg_uid, e) - logger.error("GMail error: {0}".format(e)) - - results = OrderedDict([("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports)]) - - return results - - def watch_inbox(mailbox_connection: MailboxConnection, callback: Callable, reports_folder="INBOX", diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index e4d8c6d..28ea1de 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -20,9 +20,10 @@ from tqdm import tqdm from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \ parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \ splunk, save_output, email_results, ParserError, __version__, \ - InvalidDMARCReport, s3, syslog, get_dmarc_reports_from_gmail_api + InvalidDMARCReport, s3, syslog from parsedmarc.mail import IMAPConnection, MSGraphConnection +from parsedmarc.mail.gmail import GmailConnection from parsedmarc.utils import is_mbox @@ -632,14 +633,14 @@ def _main(): if "gmail_api" in 
config.sections(): gmail_api_config = config["gmail_api"] - opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file",None) - opts.gmail_api_token_file = gmail_api_config.get("token_file",".token") - opts.gmail_api_reports_label = gmail_api_config.get("reports_label","INBOX") - opts.gmail_api_archive_label = gmail_api_config.get("archive_label","DMARC Archive") - opts.gmail_api_include_spam_trash = gmail_api_config.getboolean("include_spam_trash",False) - opts.gmail_api_scopes = str.split(gmail_api_config.get("scopes","https://www.googleapis.com/auth/gmail.modify"),",") - opts.gmail_api_delete = gmail_api_config.getboolean("delete",None) - opts.gmail_api_test = gmail_api_config.getboolean("test",False) + opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file", None) + opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token") + opts.gmail_api_reports_label = gmail_api_config.get("reports_label", "INBOX") + opts.gmail_api_archive_label = gmail_api_config.get("archive_label", "DMARC Archive") + opts.gmail_api_include_spam_trash = gmail_api_config.getboolean("include_spam_trash", False) + opts.gmail_api_scopes = str.split(gmail_api_config.get("scopes", "https://www.googleapis.com/auth/gmail.modify"),",") + opts.gmail_api_delete = gmail_api_config.getboolean("delete", None) + opts.gmail_api_test = gmail_api_config.getboolean("test", False) logger.setLevel(logging.WARNING) @@ -655,7 +656,10 @@ def _main(): fh.setFormatter(formatter) logger.addHandler(fh) - if opts.imap_host is None and opts.graph_user is None and len(opts.file_path) == 0 and opts.gmail_api_credentials_file is None: + if opts.imap_host is None \ + and opts.graph_user is None \ + and opts.gmail_api_credentials_file is None \ + and len(opts.file_path) == 0: logger.error("You must supply input files or a mailbox connection") exit(1) @@ -806,6 +810,26 @@ def _main(): logger.error("MS Graph Error: {0}".format(error.__str__())) exit(1) + if 
opts.gmail_api_credentials_file: + if opts.gmail_api_delete: + if 'https://mail.google.com/' not in opts.gmail_api_scopes: + logger.error("Message deletion requires scope 'https://mail.google.com/'. " + "Add the scope and remove token file to acquire proper access.") + opts.gmail_api_delete = False + + try: + token_file = opts.gmail_api_token_file or "token.json" + mailbox_connection = GmailConnection( + credentials_file=opts.gmail_api_credentials_file, + token_file=token_file, + scopes=opts.gmail_api_scopes, + include_spam_trash=opts.gmail_api_include_spam_trash, + ) + + except Exception as error: + logger.error("Gmail API Error: {0}".format(error.__str__())) + exit(1) + if mailbox_connection: try: reports = get_dmarc_reports_from_mailbox( @@ -828,24 +852,6 @@ def _main(): logger.error("Mailbox Error: {0}".format(error.__str__())) exit(1) - if opts.gmail_api_credentials_file: - if opts.gmail_api_delete: - if 'https://mail.google.com/' not in opts.gmail_api_scopes: - logger.error("Message deletion requires scope 'https://mail.google.com/'. 
Add the scope and remove token file to acquire proper access.") - opts.gmail_api_delete = False - - reports = get_dmarc_reports_from_gmail_api(credentials_file=opts.gmail_api_credentials_file,token_file=opts.gmail_api_token_file, - reports_label=opts.gmail_api_reports_label, archive_label=opts.gmail_api_archive_label, - offline=opts.offline, ip_db_path=opts.ip_db_path, - scopes = opts.gmail_api_scopes, include_spam_trash= opts.gmail_api_include_spam_trash, - nameservers=opts.nameservers, dns_timeout=opts.dns_timeout, - strip_attachment_payloads=opts.strip_attachment_payloads, - delete=opts.gmail_api_delete, test = opts.gmail_api_test) - - aggregate_reports += reports["aggregate_reports"] - forensic_reports += reports["forensic_reports"] - - results = OrderedDict([("aggregate_reports", aggregate_reports), ("forensic_reports", forensic_reports)]) diff --git a/parsedmarc/mail/gmail.py b/parsedmarc/mail/gmail.py new file mode 100644 index 0000000..0c84733 --- /dev/null +++ b/parsedmarc/mail/gmail.py @@ -0,0 +1,84 @@ +import logging +from base64 import urlsafe_b64decode +from pathlib import Path +from time import sleep +from typing import List + +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from googleapiclient.discovery import build + +from parsedmarc import MailboxConnection + +logger = logging.getLogger("parsedmarc") + + +def _get_creds(token_file, credentials_file, scopes): + creds = None + + if Path(token_file).exists(): + creds = Credentials.from_authorized_user_file(token_file, scopes) + # If there are no (valid) credentials available, let the user log in. 
+    if not creds or not creds.valid:
+        if creds and creds.expired and creds.refresh_token:
+            creds.refresh(Request())
+        else:
+            flow = InstalledAppFlow.from_client_secrets_file(credentials_file, scopes)
+            creds = flow.run_console()
+        # Save the credentials for the next run
+        with open(token_file, 'w') as token:
+            token.write(creds.to_json())
+    return creds
+
+
+class GmailConnection(MailboxConnection):
+    """MailboxConnection implementation backed by the Gmail REST API.
+
+    Mailbox "folders" are mapped onto Gmail labels. Every folder argument
+    accepted by this class may be either a label name or a label ID.
+    """
+
+    def __init__(self, token_file, credentials_file, scopes, include_spam_trash):
+        """Obtain OAuth credentials and build the Gmail API client.
+
+        Args:
+            token_file: Path of the cached user token (read, and rewritten
+                by ``_get_creds`` when the credentials are refreshed).
+            credentials_file: Path of the OAuth client secrets file.
+            scopes: List of OAuth scopes to request.
+            include_spam_trash: Passed as ``includeSpamTrash`` when listing
+                messages.
+        """
+        creds = _get_creds(token_file, credentials_file, scopes)
+        self.service = build('gmail', 'v1', credentials=creds)
+        self.include_spam_trash = include_spam_trash
+        # ID of the label most recently listed by fetch_messages().
+        # move_message() removes it so that a "move" does not leave the
+        # message behind in the folder it was fetched from.
+        self._reports_label_id = None
+
+    def create_folder(self, folder_name: str):
+        """Create the label ``folder_name`` unless it already exists.
+
+        Args:
+            folder_name: Name of the label to create.
+        """
+        # Only create labels that are missing, as the previous Gmail
+        # integration did; re-creating an existing label on every run is
+        # expected to be rejected by the API.
+        if self._find_label_id_for_label(folder_name):
+            logger.debug("Label {0} already exists".format(folder_name))
+            return
+        logger.debug("Creating label {0}".format(folder_name))
+        request_body = {'name': folder_name, 'messageListVisibility': 'show'}
+        self.service.users().labels().create(userId='me',
+                                             body=request_body).execute()
+
+    def fetch_messages(self, reports_folder: str) -> List[str]:
+        """Return the IDs of all messages carrying the given label.
+
+        Args:
+            reports_folder: Name or ID of the label to list.
+
+        Returns:
+            The message IDs from every result page, in API order.
+
+        Raises:
+            ValueError: If no label matches ``reports_folder``.
+        """
+        reports_label_id = self._require_label_id(reports_folder)
+        self._reports_label_id = reports_label_id
+        message_ids = []
+        page_token = None
+        # The API returns results one page at a time; keep requesting pages
+        # until the response no longer carries a nextPageToken.
+        while True:
+            results = self.service.users().messages().list(
+                userId='me',
+                includeSpamTrash=self.include_spam_trash,
+                labelIds=[reports_label_id],
+                pageToken=page_token,
+            ).execute()
+            message_ids.extend(message['id']
+                               for message in results.get('messages', []))
+            page_token = results.get('nextPageToken')
+            if not page_token:
+                return message_ids
+
+    def fetch_message(self, message_id):
+        """Return the raw content of one message as bytes.
+
+        Args:
+            message_id: Gmail message ID, as returned by fetch_messages().
+        """
+        msg = self.service.users().messages().get(userId='me', id=message_id,
+                                                  format="raw").execute()
+        return urlsafe_b64decode(msg['raw'])
+
+    def delete_message(self, message_id: str):
+        """Delete one message.
+
+        Args:
+            message_id: Gmail message ID.
+        """
+        # The request object does nothing until execute() is called.
+        self.service.users().messages().delete(userId='me',
+                                               id=message_id).execute()
+
+    def move_message(self, message_id: str, folder_name: str):
+        """Move one message to the label ``folder_name``.
+
+        The destination label is added and the label last listed by
+        fetch_messages(), if any, is removed.
+
+        Args:
+            message_id: Gmail message ID.
+            folder_name: Name or ID of the destination label.
+
+        Raises:
+            ValueError: If no label matches ``folder_name``.
+        """
+        label_id = self._require_label_id(folder_name)
+        logger.debug("Moving message UID {0} to {1}".format(message_id, folder_name))
+        request_body = {'addLabelIds': [label_id]}
+        source_label_id = self._reports_label_id
+        # Never add and remove the same label in a single request.
+        if source_label_id and source_label_id != label_id:
+            request_body['removeLabelIds'] = [source_label_id]
+        self.service.users().messages().modify(userId='me', id=message_id,
+                                               body=request_body).execute()
+
+    def keepalive(self):
+        """Do nothing; this connection needs no keepalive."""
+
+    def watch(self, check_callback, check_timeout):
+        """Check the mailbox for new messages every ``check_timeout`` seconds.
+
+        Loops forever: sleeps, then calls ``check_callback(self)``.
+
+        Args:
+            check_callback: Callable invoked with this connection.
+            check_timeout: Seconds to sleep before each callback.
+        """
+        while True:
+            sleep(check_timeout)
+            check_callback(self)
+
+    def _require_label_id(self, label_name: str) -> str:
+        """Return the ID for ``label_name``, raising ValueError if unknown."""
+        label_id = self._find_label_id_for_label(label_name)
+        if not label_id:
+            raise ValueError("Gmail label not found: {0}".format(label_name))
+        return label_id
+
+    def _find_label_id_for_label(self, label_name: str) -> str:
+        """Return the ID of the label whose ID or name equals ``label_name``.
+
+        Returns:
+            The label ID, or an empty string when no label matches.
+        """
+        results = self.service.users().labels().list(userId='me').execute()
+        for label in results.get('labels', []):
+            if label_name in (label['id'], label['name']):
+                return label['id']
+        return ""