mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-05-12 06:55:25 +00:00
Merge pull request #307 from nathanthorpe/gmail_mailbox_integration
Refactor Gmail integration with MailboxConnection interface
This commit is contained in:
+5
-10
@@ -150,6 +150,7 @@ For example
|
||||
|
||||
[mailbox]
|
||||
watch = True
|
||||
delete = False
|
||||
|
||||
[elasticsearch]
|
||||
hosts = 127.0.0.1:9200
|
||||
@@ -171,10 +172,8 @@ For example
|
||||
[gmail_api]
|
||||
credentials_file = /path/to/credentials.json # Get this file from console.google.com. See https://developers.google.com/identity/protocols/oauth2
|
||||
token_file = /path/to/token.json # This file will be generated automatically
|
||||
delete = False # Delete reports after successful processing
|
||||
scopes = https://mail.google.com/
|
||||
include_spam_trash=True
|
||||
reports_label=DMARC
|
||||
scopes = https://mail.google.com/
|
||||
include_spam_trash=True
|
||||
|
||||
|
||||
The full set of configuration options are:
|
||||
@@ -200,8 +199,8 @@ The full set of configuration options are:
|
||||
Setting this to a number larger than one can improve performance when processing thousands of files
|
||||
|
||||
- ``mailbox``
|
||||
- ``reports_folder`` - str: The mailbox folder where the incoming reports can be found (Default: INBOX)
|
||||
- ``archive_folder`` - str: The mailbox folder to sort processed emails into (Default: Archive)
|
||||
- ``reports_folder`` - str: The mailbox folder (or label for Gmail) where the incoming reports can be found (Default: INBOX)
|
||||
- ``archive_folder`` - str: The mailbox folder (or label for Gmail) to sort processed emails into (Default: Archive)
|
||||
- ``watch`` - bool: Use the IMAP ``IDLE`` command to process messages as they arrive, or poll MS Graph or the Gmail API for new messages
|
||||
- ``delete`` - bool: Delete messages after processing them, instead of archiving them
|
||||
- ``test`` - bool: Do not move or delete messages
|
||||
@@ -275,12 +274,8 @@ The full set of configuration options are:
|
||||
- ``gmail_api``
|
||||
- ``gmail_api_credentials_file`` - str: Path to file containing the credentials, None to disable (Default: None)
|
||||
- ``gmail_api_token_file`` - str: Path to save the token file (Default: .token)
|
||||
- ``gmail_api_reports_label`` - str: Label to use when searching for reports to parse (Default: INBOX)
|
||||
- ``gmail_api_archive_label`` - str: Label to apply to processed reports (Default: DMARC Archive)
|
||||
- ``gmail_api_include_spam_trash`` - bool: Include messages in Spam and Trash when searching reports (Default: False)
|
||||
- ``gmail_api_scopes`` - str: Comma separated list of scopes to use when acquiring credentials (Default: https://www.googleapis.com/auth/gmail.modify)
|
||||
- ``gmail_api_delete`` - bool: Delete messages after processing them, instead of archiving them (Default: False)
|
||||
- ``gmail_api_test`` - bool: Do not move or delete messages (Default: False)
|
||||
|
||||
.. warning::
|
||||
|
||||
|
||||
+11
-229
@@ -29,14 +29,6 @@ from lxml import etree
|
||||
from mailsuite.smtp import send_email
|
||||
|
||||
from parsedmarc.mail import MailboxConnection
|
||||
|
||||
from google.auth.transport.requests import Request
|
||||
from google.oauth2.credentials import Credentials
|
||||
from google_auth_oauthlib.flow import InstalledAppFlow
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
from base64 import urlsafe_b64decode
|
||||
|
||||
from parsedmarc.utils import get_base_domain, get_ip_address_info
|
||||
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
|
||||
from parsedmarc.utils import parse_email
|
||||
@@ -1042,10 +1034,11 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
|
||||
nameservers (list): A list of DNS nameservers to query
|
||||
dns_timeout (float): Set the DNS query timeout
|
||||
strip_attachment_payloads (bool): Remove attachment payloads from
|
||||
forensic report results
|
||||
forensic report results
|
||||
results (dict): Results from the previous run
|
||||
batch_size (int): Number of messages to read and process before saving
|
||||
create_folders (bool): Whether to create the destination folders (not used in watch)
|
||||
create_folders (bool): Whether to create the destination folders
|
||||
(not used in watch)
|
||||
|
||||
Returns:
|
||||
OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
|
||||
@@ -1093,12 +1086,13 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
|
||||
))
|
||||
msg_content = connection.fetch_message(msg_uid)
|
||||
try:
|
||||
sa = strip_attachment_payloads
|
||||
parsed_email = parse_report_email(msg_content,
|
||||
nameservers=nameservers,
|
||||
dns_timeout=dns_timeout,
|
||||
ip_db_path=ip_db_path,
|
||||
offline=offline,
|
||||
strip_attachment_payloads=strip_attachment_payloads,
|
||||
strip_attachment_payloads=sa,
|
||||
keep_alive=connection.keepalive)
|
||||
if parsed_email["report_type"] == "aggregate":
|
||||
aggregate_reports.append(parsed_email["report"])
|
||||
@@ -1201,221 +1195,6 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
|
||||
return results
|
||||
|
||||
|
||||
def get_gmail_api_creds(token_file="token.json",
                        credentials_file="credentials.json",
                        scopes=None):
    """
    Loads, refreshes, or interactively obtains Gmail API OAuth credentials

    Args:
        token_file (str): Path to the cached token file; it is read when it
            exists and (re)written whenever credentials are refreshed or
            newly obtained
        credentials_file (str): Path to the OAuth client secrets file
        scopes (list): OAuth scopes to request
            (Default: ``['https://www.googleapis.com/auth/gmail.modify']``)

    Returns:
        The credentials object to pass to the Gmail API client
    """
    # A fresh list per call avoids sharing one mutable default between calls
    if scopes is None:
        scopes = ['https://www.googleapis.com/auth/gmail.modify']

    creds = None
    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, scopes)

    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentials_file, scopes)
            # run_console() depends on the deprecated out-of-band OAuth
            # flow; use the local redirect server without forcing a browser
            creds = flow.run_local_server(open_browser=False)
        # Save the credentials for the next run
        with open(token_file, 'w') as token:
            token.write(creds.to_json())
    return creds
|
||||
|
||||
|
||||
def _gmail_api_label_id(service, labels, wanted, purpose, logger,
                        match_id=False):
    """
    Returns the ID and name of a Gmail label, creating the label if needed

    Args:
        service: The Gmail API service object
        labels (list): Label resources already fetched from the API
        wanted (str): The label name (or ID when ``match_id`` is set)
        purpose (str): What the label is for; only used in the debug log
        logger: The logger to write to
        match_id (bool): Also accept ``wanted`` as a label ID

    Returns:
        tuple: ``(label_id, label_name)``
    """
    for label in labels:
        if (match_id and wanted == label['id']) or wanted == label['name']:
            return label['id'], label['name']

    logger.debug("Creating label {0} for {1}".format(wanted, purpose))
    created = service.users().labels().create(
        userId='me',
        body={'name': wanted,
              'messageListVisibility': 'show'}).execute()
    return created['id'], wanted


def _gmail_api_archive(service, logger, kind, msg_uids, reports_label,
                       dest_label, add_label_ids, remove_label_ids,
                       error_prefix):
    """
    Relabels processed report messages, logging (not raising) failures
    """
    if len(msg_uids) == 0:
        return

    logger.debug("Moving {0} report messages from {1} to {2}".format(
        kind, reports_label, dest_label))
    total = len(msg_uids)
    for i, msg_uid in enumerate(msg_uids, start=1):
        logger.debug("Moving message {0} of {1}: UID {2}".format(
            i, total, msg_uid))
        try:
            service.users().messages().modify(
                userId="me", id=msg_uid,
                body={"addLabelIds": add_label_ids,
                      "removeLabelIds": remove_label_ids}).execute()
        except Exception as e:
            # Best effort: one failed move must not stop the others
            e = "Error moving message UID {0}: {1}".format(msg_uid, e)
            logger.error("{0}: {1}".format(error_prefix, e))


def get_dmarc_reports_from_gmail_api(credentials_file=".credentials",
                                     token_file=".token",
                                     reports_label="INBOX",
                                     archive_label="DMARC Archive",
                                     offline=False, ip_db_path=None,
                                     scopes=None, include_spam_trash=False,
                                     nameservers=None, dns_timeout=2.0,
                                     strip_attachment_payloads=False,
                                     delete=False,
                                     test=False, parallel=False):
    """
    Fetches and parses DMARC reports from a Gmail mailbox via the Gmail API

    Args:
        credentials_file (str): Path to the OAuth client secrets file
        token_file (str): Path to the cached OAuth token file
        reports_label (str): Name or ID of the label holding the reports
        archive_label (str): Name or ID of the label applied to processed
            reports
        offline (bool): Passed through to ``parse_report_email``
        ip_db_path (str): Passed through to ``parse_report_email``
        scopes (list): OAuth scopes to request
            (Default: ``['https://mail.google.com/']``)
        include_spam_trash (bool): Include Spam and Trash when listing
        nameservers (list): Passed through to ``parse_report_email``
        dns_timeout (float): Passed through to ``parse_report_email``
        strip_attachment_payloads (bool): Passed through to
            ``parse_report_email``
        delete (bool): Delete processed messages instead of archiving them
        test (bool): Do not move or delete messages
        parallel (bool): Passed through to ``parse_report_email``

    Returns:
        OrderedDict: Lists of ``aggregate_reports`` and ``forensic_reports``
    """
    logger = logging.getLogger("parsedmarc::gmail_api")

    # A fresh list per call avoids sharing one mutable default between calls
    if scopes is None:
        scopes = ['https://mail.google.com/']

    aggregate_reports = []
    forensic_reports = []
    aggregate_report_msg_uids = []
    forensic_report_msg_uids = []

    creds = get_gmail_api_creds(token_file, credentials_file, scopes)
    service = build('gmail', 'v1', credentials=creds)

    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

    invalid_label = "Invalid"
    forensic_label = "Forensic"
    aggregate_label = "Aggregate"

    # The reports/archive labels may be given by ID or by name; the fixed
    # sorting labels are only ever matched by name.
    reports_label_id, reports_label = _gmail_api_label_id(
        service, labels, reports_label, "reports", logger, match_id=True)
    archive_label_id, archive_label = _gmail_api_label_id(
        service, labels, archive_label, "archive", logger, match_id=True)
    forensic_label_id, _ = _gmail_api_label_id(
        service, labels, forensic_label, "forensic reports", logger)
    aggregate_label_id, _ = _gmail_api_label_id(
        service, labels, aggregate_label, "aggregate reports", logger)
    invalid_label_id, _ = _gmail_api_label_id(
        service, labels, invalid_label, "invalid reports", logger)

    # Collect every page of message IDs before touching any message, so
    # relabelling below cannot disturb the listing.
    message_ids = []
    page_token = None
    while True:
        list_kwargs = {'userId': 'me',
                       'includeSpamTrash': include_spam_trash,
                       'labelIds': [reports_label_id]}
        if page_token:
            list_kwargs['pageToken'] = page_token
        results = service.users().messages().list(**list_kwargs).execute()
        message_ids.extend(message['id']
                           for message in results.get('messages', []))
        page_token = results.get('nextPageToken')
        if not page_token:
            break

    for msg_uid in message_ids:
        msg = service.users().messages().get(
            userId='me', id=msg_uid, format="raw").execute()

        try:
            parsed_email = parse_report_email(
                urlsafe_b64decode(msg['raw']), offline,
                ip_db_path, nameservers,
                dns_timeout, strip_attachment_payloads,
                parallel)

            if parsed_email["report_type"] == "aggregate":
                aggregate_reports.append(parsed_email["report"])
                aggregate_report_msg_uids.append(msg_uid)
            elif parsed_email["report_type"] == "forensic":
                forensic_reports.append(parsed_email["report"])
                forensic_report_msg_uids.append(msg_uid)

        except InvalidDMARCReport as error:
            logger.warning(error.__str__())
            if not test:
                logger.debug("Moving message UID {0} to {1}".format(
                    msg_uid, invalid_label))
                # removeLabelIds takes label IDs, not display names
                service.users().messages().modify(
                    userId='me', id=msg_uid,
                    body={'addLabelIds': [invalid_label_id],
                          "removeLabelIds": [reports_label_id]}).execute()

    if not test:
        if delete:
            processed_messages = aggregate_report_msg_uids + \
                                 forensic_report_msg_uids

            number_of_processed_msgs = len(processed_messages)
            for i, msg_uid in enumerate(processed_messages, start=1):
                logger.debug(
                    "Deleting message {0} of {1}: UID {2}".format(
                        i, number_of_processed_msgs, msg_uid))
                try:
                    # The request does nothing until it is executed
                    service.users().messages().delete(
                        userId='me', id=msg_uid).execute()
                except Exception as e:
                    # Best effort: one failed delete must not stop the rest
                    message = "Error deleting message UID"
                    e = "{0} {1}: " "{2}".format(message, msg_uid, e)
                    logger.error("GMail error: {0}".format(e))
        else:
            remove_label_ids = [reports_label_id, 'INBOX']
            _gmail_api_archive(service, logger, "aggregate",
                               aggregate_report_msg_uids,
                               reports_label, aggregate_label,
                               [aggregate_label_id, archive_label_id],
                               remove_label_ids, "Gmail error")
            _gmail_api_archive(service, logger, "forensic",
                               forensic_report_msg_uids,
                               reports_label, forensic_label,
                               [forensic_label_id, archive_label_id],
                               remove_label_ids, "GMail error")

    results = OrderedDict([("aggregate_reports", aggregate_reports),
                           ("forensic_reports", forensic_reports)])

    return results
|
||||
|
||||
|
||||
def watch_inbox(mailbox_connection: MailboxConnection,
|
||||
callback: Callable,
|
||||
reports_folder="INBOX",
|
||||
@@ -1425,7 +1204,8 @@ def watch_inbox(mailbox_connection: MailboxConnection,
|
||||
dns_timeout=6.0, strip_attachment_payloads=False,
|
||||
batch_size=None):
|
||||
"""
|
||||
Watches the mailbox for new messages and sends the results to a callback function
|
||||
Watches the mailbox for new messages and
|
||||
sends the results to a callback function
|
||||
Args:
|
||||
mailbox_connection: The mailbox connection object
|
||||
callback: The callback function to receive the parsing results
|
||||
@@ -1446,6 +1226,7 @@ def watch_inbox(mailbox_connection: MailboxConnection,
|
||||
"""
|
||||
|
||||
def check_callback(connection):
|
||||
sa = strip_attachment_payloads
|
||||
res = get_dmarc_reports_from_mailbox(connection=connection,
|
||||
reports_folder=reports_folder,
|
||||
archive_folder=archive_folder,
|
||||
@@ -1455,12 +1236,13 @@ def watch_inbox(mailbox_connection: MailboxConnection,
|
||||
offline=offline,
|
||||
nameservers=nameservers,
|
||||
dns_timeout=dns_timeout,
|
||||
strip_attachment_payloads=strip_attachment_payloads,
|
||||
strip_attachment_payloads=sa,
|
||||
batch_size=batch_size,
|
||||
create_folders=False)
|
||||
callback(res)
|
||||
|
||||
mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
|
||||
mailbox_connection.watch(check_callback=check_callback,
|
||||
check_timeout=check_timeout)
|
||||
|
||||
|
||||
def save_output(results, output_directory="output",
|
||||
|
||||
+42
-47
@@ -20,9 +20,9 @@ from tqdm import tqdm
|
||||
from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \
|
||||
parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \
|
||||
splunk, save_output, email_results, ParserError, __version__, \
|
||||
InvalidDMARCReport, s3, syslog, get_dmarc_reports_from_gmail_api
|
||||
InvalidDMARCReport, s3, syslog
|
||||
|
||||
from parsedmarc.mail import IMAPConnection, MSGraphConnection
|
||||
from parsedmarc.mail import IMAPConnection, MSGraphConnection, GmailConnection
|
||||
|
||||
from parsedmarc.utils import is_mbox
|
||||
|
||||
@@ -240,7 +240,7 @@ def _main():
|
||||
|
||||
args = arg_parser.parse_args()
|
||||
|
||||
gmail_api_scopes = ['https://www.googleapis.com/auth/gmail.modify']
|
||||
default_gmail_api_scope = 'https://www.googleapis.com/auth/gmail.modify'
|
||||
|
||||
opts = Namespace(file_path=args.file_path,
|
||||
config_file=args.config_file,
|
||||
@@ -312,18 +312,14 @@ def _main():
|
||||
s3_path=None,
|
||||
syslog_server=None,
|
||||
syslog_port=None,
|
||||
gmail_api_credentials_file =None,
|
||||
gmail_api_credentials_file=None,
|
||||
gmail_api_token_file=None,
|
||||
gmail_api_reports_label='INBOX',
|
||||
gmail_api_archive_label='DMARC Archive',
|
||||
gmail_api_include_spam_trash=False,
|
||||
gmail_api_scopes=gmail_api_scopes,
|
||||
gmail_api_delete=False,
|
||||
gmail_api_test=False,
|
||||
gmail_api_scopes=[],
|
||||
log_file=args.log_file,
|
||||
n_procs=1,
|
||||
chunk_size=1,
|
||||
ip_db_path = None
|
||||
ip_db_path=None
|
||||
)
|
||||
args = arg_parser.parse_args()
|
||||
|
||||
@@ -632,19 +628,18 @@ def _main():
|
||||
opts.syslog_port = syslog_config["port"]
|
||||
else:
|
||||
opts.syslog_port = 514
|
||||
|
||||
|
||||
if "gmail_api" in config.sections():
|
||||
gmail_api_config = config["gmail_api"]
|
||||
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file",None)
|
||||
opts.gmail_api_token_file = gmail_api_config.get("token_file",".token")
|
||||
opts.gmail_api_reports_label = gmail_api_config.get("reports_label","INBOX")
|
||||
opts.gmail_api_archive_label = gmail_api_config.get("archive_label","DMARC Archive")
|
||||
opts.gmail_api_include_spam_trash = gmail_api_config.getboolean("include_spam_trash",False)
|
||||
opts.gmail_api_scopes = str.split(gmail_api_config.get("scopes",
|
||||
"https://www.googleapis.com/auth/gmail.modify"),
|
||||
",")
|
||||
opts.gmail_api_delete = gmail_api_config.getboolean("delete", None)
|
||||
opts.gmail_api_test = gmail_api_config.getboolean("test", False)
|
||||
opts.gmail_api_credentials_file = \
|
||||
gmail_api_config.get("credentials_file")
|
||||
opts.gmail_api_token_file = \
|
||||
gmail_api_config.get("token_file", ".token")
|
||||
opts.gmail_api_include_spam_trash = \
|
||||
gmail_api_config.getboolean("include_spam_trash", False)
|
||||
opts.gmail_api_scopes = \
|
||||
gmail_api_config.get("scopes",
|
||||
default_gmail_api_scope).split(',')
|
||||
|
||||
logger.setLevel(logging.WARNING)
|
||||
|
||||
@@ -660,8 +655,10 @@ def _main():
|
||||
fh.setFormatter(formatter)
|
||||
logger.addHandler(fh)
|
||||
|
||||
if opts.imap_host is None and opts.graph_user is None and len(opts.file_path) == 0 \
|
||||
and opts.gmail_api_credentials_file is None:
|
||||
if opts.imap_host is None \
|
||||
and opts.graph_user is None \
|
||||
and opts.gmail_api_credentials_file is None \
|
||||
and len(opts.file_path) == 0:
|
||||
logger.error("You must supply input files or a mailbox connection")
|
||||
exit(1)
|
||||
|
||||
@@ -812,6 +809,28 @@ def _main():
|
||||
logger.error("MS Graph Error: {0}".format(error.__str__()))
|
||||
exit(1)
|
||||
|
||||
if opts.gmail_api_credentials_file:
|
||||
if opts.mailbox_delete:
|
||||
if 'https://mail.google.com/' not in opts.gmail_api_scopes:
|
||||
logger.error("Message deletion requires scope"
|
||||
" 'https://mail.google.com/'. "
|
||||
"Add the scope and remove token file "
|
||||
"to acquire proper access.")
|
||||
opts.mailbox_delete = False
|
||||
|
||||
try:
|
||||
mailbox_connection = GmailConnection(
|
||||
credentials_file=opts.gmail_api_credentials_file,
|
||||
token_file=opts.gmail_api_token_file,
|
||||
scopes=opts.gmail_api_scopes,
|
||||
include_spam_trash=opts.gmail_api_include_spam_trash,
|
||||
reports_folder=opts.mailbox_reports_folder
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error("Gmail API Error: {0}".format(error.__str__()))
|
||||
exit(1)
|
||||
|
||||
if mailbox_connection:
|
||||
try:
|
||||
reports = get_dmarc_reports_from_mailbox(
|
||||
@@ -834,30 +853,6 @@ def _main():
|
||||
logger.error("Mailbox Error: {0}".format(error.__str__()))
|
||||
exit(1)
|
||||
|
||||
if opts.gmail_api_credentials_file:
|
||||
if opts.gmail_api_delete:
|
||||
if 'https://mail.google.com/' not in opts.gmail_api_scopes:
|
||||
logger.error("Message deletion requires scope 'https://mail.google.com/'. "
|
||||
"Add the scope and remove token file to acquire proper access.")
|
||||
opts.gmail_api_delete = False
|
||||
|
||||
reports = get_dmarc_reports_from_gmail_api(credentials_file=opts.gmail_api_credentials_file,
|
||||
token_file=opts.gmail_api_token_file,
|
||||
reports_label=opts.gmail_api_reports_label,
|
||||
archive_label=opts.gmail_api_archive_label,
|
||||
offline=opts.offline,
|
||||
ip_db_path=opts.ip_db_path,
|
||||
scopes=opts.gmail_api_scopes,
|
||||
include_spam_trash=opts.gmail_api_include_spam_trash,
|
||||
nameservers=opts.nameservers,
|
||||
dns_timeout=opts.dns_timeout,
|
||||
strip_attachment_payloads=opts.strip_attachment_payloads,
|
||||
delete=opts.gmail_api_delete,
|
||||
test=opts.gmail_api_test)
|
||||
|
||||
aggregate_reports += reports["aggregate_reports"]
|
||||
forensic_reports += reports["forensic_reports"]
|
||||
|
||||
results = OrderedDict([("aggregate_reports", aggregate_reports),
|
||||
("forensic_reports", forensic_reports)])
|
||||
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
from parsedmarc.mail.mailbox_connection import MailboxConnection
|
||||
from parsedmarc.mail.graph import MSGraphConnection
|
||||
from parsedmarc.mail.gmail import GmailConnection
|
||||
from parsedmarc.mail.imap import IMAPConnection
|
||||
|
||||
__all__ = ["MailboxConnection",
|
||||
"MSGraphConnection",
|
||||
"GmailConnection",
|
||||
"IMAPConnection"]
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
import logging
|
||||
from base64 import urlsafe_b64decode
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
from typing import List
|
||||
|
||||
from google.auth.transport.requests import Request
|
||||
from google.oauth2.credentials import Credentials
|
||||
from google_auth_oauthlib.flow import InstalledAppFlow
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
|
||||
from parsedmarc.mail.mailbox_connection import MailboxConnection
|
||||
|
||||
logger = logging.getLogger("parsedmarc")
|
||||
|
||||
|
||||
def _get_creds(token_file, credentials_file, scopes):
    """
    Returns Gmail API credentials, refreshing or requesting them as needed

    Args:
        token_file: Path of the cached token file; it is read when it
            exists and rewritten after a refresh or a new login
        credentials_file: Path of the OAuth client secrets file
        scopes: OAuth scopes to request

    Returns:
        The credentials object
    """
    token_path = Path(token_file)
    credentials = None
    if token_path.exists():
        credentials = Credentials.from_authorized_user_file(token_file,
                                                            scopes)

    # Cached credentials that are still valid need no refresh and no save
    if credentials and credentials.valid:
        return credentials

    can_refresh = (credentials
                   and credentials.expired
                   and credentials.refresh_token)
    if can_refresh:
        credentials.refresh(Request())
    else:
        # No usable cached credentials: let the user log in
        login_flow = InstalledAppFlow.from_client_secrets_file(
            credentials_file, scopes)
        credentials = login_flow.run_local_server(open_browser=False)

    # Persist the credentials for the next run
    with token_path.open('w') as token_handle:
        token_handle.write(credentials.to_json())
    return credentials
|
||||
|
||||
|
||||
class GmailConnection(MailboxConnection):
    """
    A mailbox connection backed by the Gmail API

    Mailbox folders are represented by Gmail labels: moving a message adds
    the destination label and removes the reports label.
    """

    def __init__(self,
                 token_file: str,
                 credentials_file: str,
                 scopes: List[str],
                 include_spam_trash: bool,
                 reports_folder: str):
        """
        Args:
            token_file: Path of the cached OAuth token file
            credentials_file: Path of the OAuth client secrets file
            scopes: OAuth scopes to request
            include_spam_trash: Include Spam and Trash when listing messages
            reports_folder: Name or ID of the label holding incoming reports
        """
        creds = _get_creds(token_file, credentials_file, scopes)
        self.service = build('gmail', 'v1', credentials=creds)
        self.include_spam_trash = include_spam_trash
        # Per-instance cache of label name/ID -> label ID. Only successful
        # lookups are stored, so a label created later is still found.
        self._label_ids = {}
        self._reports_folder = reports_folder
        # May be None here if the label does not exist yet; move_message
        # resolves it again on demand.
        self.reports_label_id = self._find_label_id_for_label(reports_folder)

    def create_folder(self, folder_name: str):
        """ Creates a label named folder_name unless it already exists """
        # Gmail doesn't support the name Archive
        if folder_name == 'Archive':
            return

        logger.debug(f"Creating label {folder_name}")
        request_body = {'name': folder_name, 'messageListVisibility': 'show'}
        try:
            created = self.service.users().labels()\
                .create(userId='me', body=request_body).execute()
        except HttpError as e:
            if e.status_code == 409:
                logger.debug(f'Folder {folder_name} already exists, '
                             f'skipping creation')
                return
            raise
        # Remember the new label's ID to save a later lookup
        if isinstance(created, dict) and created.get('id'):
            self._label_ids[folder_name] = created['id']

    def fetch_messages(self, reports_folder: str) -> List[str]:
        """
        Returns the IDs of all messages carrying the reports_folder label

        Raises:
            RuntimeError: If the label does not exist
        """
        reports_label_id = self._find_label_id_for_label(reports_folder)
        if reports_label_id is None:
            raise RuntimeError(f"label {reports_folder} not found")

        message_ids = []
        page_token = None
        # The API returns results in pages; follow nextPageToken to the end
        while True:
            list_kwargs = {'userId': 'me',
                           'includeSpamTrash': self.include_spam_trash,
                           'labelIds': [reports_label_id]}
            if page_token:
                list_kwargs['pageToken'] = page_token
            results = self.service.users().messages()\
                .list(**list_kwargs)\
                .execute()
            message_ids.extend(message['id']
                               for message in results.get('messages', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        return message_ids

    def fetch_message(self, message_id):
        """ Returns the raw content of a message as decoded bytes """
        msg = self.service.users().messages()\
            .get(userId='me',
                 id=message_id,
                 format="raw"
                 )\
            .execute()
        return urlsafe_b64decode(msg['raw'])

    def delete_message(self, message_id: str):
        """ Deletes the message with the given ID """
        # Building the request does nothing until execute() is called
        self.service.users().messages()\
            .delete(userId='me', id=message_id)\
            .execute()

    def move_message(self, message_id: str, folder_name: str):
        """
        Adds the folder_name label to a message and removes the reports label

        Raises:
            RuntimeError: If the destination label does not exist
        """
        label_id = self._find_label_id_for_label(folder_name)
        if label_id is None:
            raise RuntimeError(f"label {folder_name} not found")
        # The reports label may not have existed when __init__ ran
        if self.reports_label_id is None:
            self.reports_label_id = \
                self._find_label_id_for_label(self._reports_folder)

        logger.debug(f"Moving message UID {message_id} to {folder_name}")
        request_body = {'addLabelIds': [label_id]}
        if self.reports_label_id is not None:
            request_body['removeLabelIds'] = [self.reports_label_id]
        self.service.users().messages()\
            .modify(userId='me',
                    id=message_id,
                    body=request_body)\
            .execute()

    def keepalive(self):
        # Not needed
        pass

    def watch(self, check_callback, check_timeout):
        """ Checks the mailbox for new messages every n seconds"""
        while True:
            sleep(check_timeout)
            check_callback(self)

    def _find_label_id_for_label(self, label_name: str):
        """
        Returns the ID of the label whose ID or name equals label_name,
        or None when there is no such label
        """
        cached_id = self._label_ids.get(label_name)
        if cached_id is not None:
            return cached_id

        results = self.service.users().labels().list(userId='me').execute()
        for label in results.get('labels', []):
            if label_name == label['id'] or label_name == label['name']:
                self._label_ids[label_name] = label['id']
                return label['id']
        return None
|
||||
+33
-17
@@ -35,45 +35,56 @@ class MSGraphConnection(MailboxConnection):
|
||||
if len(path_parts) > 1: # Folder is a subFolder
|
||||
parent_folder_id = None
|
||||
for folder in path_parts[:-1]:
|
||||
parent_folder_id = self._find_folder_id_with_parent(folder, parent_folder_id)
|
||||
parent_folder_id = self._find_folder_id_with_parent(
|
||||
folder, parent_folder_id)
|
||||
sub_url = f'/{parent_folder_id}/childFolders'
|
||||
folder_name = path_parts[-1]
|
||||
|
||||
request_body = {
|
||||
'displayName': folder_name
|
||||
}
|
||||
resp = self._client.post(f'/users/{self.mailbox_name}/mailFolders{sub_url}', json=request_body)
|
||||
request_url = f'/users/{self.mailbox_name}/mailFolders{sub_url}'
|
||||
resp = self._client.post(request_url, json=request_body)
|
||||
if resp.status_code == 409:
|
||||
logger.debug(f'Folder {folder_name} already exists, skipping creation')
|
||||
logger.debug(f'Folder {folder_name} already exists, '
|
||||
f'skipping creation')
|
||||
elif resp.status_code == 201:
|
||||
logger.debug(f'Created folder {folder_name}')
|
||||
else:
|
||||
logger.warning(f'Unknown response {resp.status_code} {resp.json()}')
|
||||
logger.warning(f'Unknown response '
|
||||
f'{resp.status_code} {resp.json()}')
|
||||
|
||||
def fetch_messages(self, folder_name: str) -> List[str]:
|
||||
""" Returns a list of message UIDs in the specified folder """
|
||||
folder_id = self._find_folder_id_from_folder_path(folder_name)
|
||||
result = self._client.get(f'/users/{self.mailbox_name}/mailFolders/{folder_id}/messages?$select=id')
|
||||
url = f'/users/{self.mailbox_name}/mailFolders/' \
|
||||
f'{folder_id}/messages?$select=id'
|
||||
result = self._client.get(url)
|
||||
emails = result.json()['value']
|
||||
return [email['id'] for email in emails]
|
||||
|
||||
def fetch_message(self, message_id: str):
|
||||
result = self._client.get(f'/users/{self.mailbox_name}/messages/{message_id}/$value')
|
||||
url = f'/users/{self.mailbox_name}/messages/{message_id}/$value'
|
||||
result = self._client.get(url)
|
||||
return result.text
|
||||
|
||||
def delete_message(self, message_id: str):
|
||||
resp = self._client.delete(f'/users/{self.mailbox_name}/messages/{message_id}')
|
||||
url = f'/users/{self.mailbox_name}/messages/{message_id}'
|
||||
resp = self._client.delete(url)
|
||||
if resp.status_code != 204:
|
||||
raise RuntimeWarning(f"Failed to delete message {resp.status_code}: {resp.json()}")
|
||||
raise RuntimeWarning(f"Failed to delete message "
|
||||
f"{resp.status_code}: {resp.json()}")
|
||||
|
||||
def move_message(self, message_id: str, folder_name: str):
|
||||
folder_id = self._find_folder_id_from_folder_path(folder_name)
|
||||
request_body = {
|
||||
'destinationId': folder_id
|
||||
}
|
||||
resp = self._client.post(f'/users/{self.mailbox_name}/messages/{message_id}/move', json=request_body)
|
||||
url = f'/users/{self.mailbox_name}/messages/{message_id}/move'
|
||||
resp = self._client.post(url, json=request_body)
|
||||
if resp.status_code != 201:
|
||||
raise RuntimeWarning(f"Failed to move message {resp.status_code}: {resp.json()}")
|
||||
raise RuntimeWarning(f"Failed to move message "
|
||||
f"{resp.status_code}: {resp.json()}")
|
||||
|
||||
def keepalive(self):
|
||||
# Not needed
|
||||
@@ -85,27 +96,32 @@ class MSGraphConnection(MailboxConnection):
|
||||
sleep(check_timeout)
|
||||
check_callback(self)
|
||||
|
||||
@lru_cache(maxsize=100)
|
||||
@lru_cache(maxsize=10)
|
||||
def _find_folder_id_from_folder_path(self, folder_name: str) -> str:
|
||||
path_parts = folder_name.split('/')
|
||||
parent_folder_id = None
|
||||
if len(path_parts) > 1:
|
||||
for folder in path_parts[:-1]:
|
||||
folder_id = self._find_folder_id_with_parent(folder, parent_folder_id)
|
||||
folder_id = self._find_folder_id_with_parent(
|
||||
folder, parent_folder_id)
|
||||
parent_folder_id = folder_id
|
||||
return self._find_folder_id_with_parent(path_parts[-1], parent_folder_id)
|
||||
return self._find_folder_id_with_parent(
|
||||
path_parts[-1], parent_folder_id)
|
||||
else:
|
||||
return self._find_folder_id_with_parent(folder_name, None)
|
||||
|
||||
def _find_folder_id_with_parent(self, folder_name: str, parent_folder_id: Optional[str]):
|
||||
def _find_folder_id_with_parent(self,
|
||||
folder_name: str,
|
||||
parent_folder_id: Optional[str]):
|
||||
sub_url = ''
|
||||
if parent_folder_id is not None:
|
||||
sub_url = f'/{parent_folder_id}/childFolders'
|
||||
folders_resp = self._client.get(f'/users/{self.mailbox_name}/mailFolders{sub_url}')
|
||||
url = f'/users/{self.mailbox_name}/mailFolders{sub_url}'
|
||||
folders_resp = self._client.get(url)
|
||||
folders = folders_resp.json()['value']
|
||||
matched_folders = [folder for folder in folders if folder['displayName'] == folder_name]
|
||||
matched_folders = [folder for folder in folders
|
||||
if folder['displayName'] == folder_name]
|
||||
if len(matched_folders) == 0:
|
||||
raise RuntimeError(f"folder {folder_name} not found")
|
||||
selected_folder = matched_folders[0]
|
||||
return selected_folder['id']
|
||||
|
||||
|
||||
+10
-3
@@ -49,11 +49,18 @@ class IMAPConnection(MailboxConnection):
|
||||
self._client.noop()
|
||||
|
||||
def watch(self, check_callback, check_timeout):
|
||||
""" Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function"""
|
||||
"""
|
||||
Use an IDLE IMAP connection to parse incoming emails,
|
||||
and pass the results to a callback function
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
IMAPClient(host=self._client.host, username=self._username, password=self._password,
|
||||
port=self._client.port, ssl=self._client.ssl, verify=self._verify,
|
||||
IMAPClient(host=self._client.host,
|
||||
username=self._username,
|
||||
password=self._password,
|
||||
port=self._client.port,
|
||||
ssl=self._client.ssl,
|
||||
verify=self._verify,
|
||||
idle_callback=check_callback,
|
||||
idle_timeout=check_timeout)
|
||||
except (timeout, IMAPClientError):
|
||||
|
||||
Reference in New Issue
Block a user