Added 'since' option to search for messages since a certain time

- Added `since` option under `mailbox` section to search for messages since a certain time instead of going through the complete mailbox during testing scenarios. Acceptable values -`5m|3h|2d|1w`, units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}). Defaults to `1d` if an incorrect value is provided.
    - Not to mark messages as read if test option is selected (works only for MSGraphConnection)
This commit is contained in:
ramspoluri
2024-05-24 20:43:36 +05:30
parent 13ddc26d70
commit f618f69c6c
6 changed files with 116 additions and 25 deletions

View File

@@ -153,6 +153,9 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for a IMAP
IDLE response or the number of seconds until the next
mail check (Default: `30`)
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}).
Defaults to `1d` if incorrect value is provided.
- `imap`
- `host` - str: The IMAP server hostname or IP address
- `port` - int: The IMAP server port (Default: `993`)

View File

@@ -17,7 +17,7 @@ import zlib
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime
from datetime import datetime, timedelta
from io import BytesIO, StringIO
from typing import Callable
@@ -28,7 +28,8 @@ from lxml import etree
from mailsuite.smtp import send_email
from parsedmarc.log import logger
from parsedmarc.mail import MailboxConnection
from parsedmarc.mail import MailboxConnection, IMAPConnection, \
MSGraphConnection, GmailConnection
from parsedmarc.utils import get_base_domain, get_ip_address_info
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import parse_email
@@ -1371,6 +1372,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
strip_attachment_payloads=False,
results=None,
batch_size=10,
since=None,
create_folders=True):
"""
Fetches and parses DMARC reports from a mailbox
@@ -1393,6 +1395,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
(use 0 for no limit)
since: Search for messages since certain time
(units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"})
create_folders (bool): Whether to create the destination folders
(not used in watch)
@@ -1405,6 +1409,9 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
if connection is None:
raise ValueError("Must supply a connection")
# current_time useful to fetch_messages later in the program
current_time = None
aggregate_reports = []
forensic_reports = []
smtp_tls_reports = []
@@ -1428,12 +1435,44 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
connection.create_folder(smtp_tls_reports_folder)
connection.create_folder(invalid_reports_folder)
messages = connection.fetch_messages(reports_folder, batch_size=batch_size)
if since:
_since = 1440 # default one day
if re.match(r'\d{1,2}[mhd]$', since):
s = re.split(r'(\d+)', since)
match s[2]:
case 'm': _since = int(s[1])
case 'h': _since = int(s[1])*60
case 'd': _since = int(s[1])*60*24
case 'w': _since = int(s[1])*60*24*7
else:
logger.warning("Incorrect format for \'since\' option. \
Provided value:{0}, Expected values:(5m|3h|2d|1w). \
Ignoring option, fetching messages for last 24hrs"
.format(since))
if isinstance(connection, IMAPConnection):
logger.debug("Only days and weeks values in \'since\' option are \
considered for IMAP conections. Examples: 2d or 1w")
since = (datetime.utcnow() - timedelta(minutes=_since)).date()
current_time = datetime.utcnow().date()
elif isinstance(connection, MSGraphConnection):
since = (datetime.utcnow() - timedelta(minutes=_since)) \
.isoformat() + 'Z'
current_time = datetime.utcnow().isoformat() + 'Z'
elif isinstance(connection, GmailConnection):
since = (datetime.utcnow() - timedelta(minutes=_since)) \
.strftime('%s')
current_time = datetime.utcnow().strftime('%s')
else:
pass
messages = connection.fetch_messages(reports_folder, batch_size=batch_size,
since=since)
total_messages = len(messages)
logger.debug("Found {0} messages in {1}".format(len(messages),
reports_folder))
if batch_size:
if batch_size and not since:
message_limit = min(total_messages, batch_size)
else:
message_limit = total_messages
@@ -1445,7 +1484,15 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
logger.debug("Processing message {0} of {1}: UID {2}".format(
i+1, message_limit, msg_uid
))
msg_content = connection.fetch_message(msg_uid)
if isinstance(mailbox, MSGraphConnection):
if test:
msg_content = connection.fetch_message(msg_uid,
mark_read=False)
else:
msg_content = connection.fetch_message(msg_uid,
mark_read=True)
else:
msg_content = connection.fetch_message(msg_uid)
try:
sa = strip_attachment_payloads
parsed_email = parse_report_email(
@@ -1564,7 +1611,11 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports)])
total_messages = len(connection.fetch_messages(reports_folder))
if current_time:
total_messages = len(connection.fetch_messages(reports_folder,
since=current_time))
else:
total_messages = len(connection.fetch_messages(reports_folder))
if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
@@ -1582,7 +1633,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
always_use_local_files=always_use_local_files,
reverse_dns_map_path=reverse_dns_map_path,
reverse_dns_map_url=reverse_dns_map_url,
offline=offline
offline=offline,
since=current_time,
)
return results

View File

@@ -404,6 +404,7 @@ def _main():
mailbox_test=False,
mailbox_batch_size=10,
mailbox_check_timeout=30,
mailbox_since=None,
imap_host=None,
imap_skip_certificate_verification=False,
imap_ssl=True,
@@ -585,6 +586,8 @@ def _main():
if "check_timeout" in mailbox_config:
opts.mailbox_check_timeout = mailbox_config.getint(
"check_timeout")
if "since" in mailbox_config:
opts.mailbox_since = mailbox_config["since"]
if "imap" in config.sections():
imap_config = config["imap"]
@@ -1312,6 +1315,7 @@ def _main():
nameservers=opts.nameservers,
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
)
aggregate_reports += reports["aggregate_reports"]

View File

@@ -67,18 +67,33 @@ class GmailConnection(MailboxConnection):
else:
raise e
def _fetch_all_message_ids(self, reports_label_id, page_token=None):
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
def _fetch_all_message_ids(self, reports_label_id, page_token=None,
since=None):
if since:
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
q=f'after:{since}',
)
.execute()
)
else:
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
)
.execute()
)
.execute()
)
messages = results.get("messages", [])
for message in messages:
yield message["id"]
@@ -90,7 +105,12 @@ class GmailConnection(MailboxConnection):
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
reports_label_id = self._find_label_id_for_label(reports_folder)
return [id for id in self._fetch_all_message_ids(reports_label_id)]
since = kwargs.get('since')
if since:
return [id for id in self._fetch_all_message_ids(reports_label_id,
since=since)]
else:
return [id for id in self._fetch_all_message_ids(reports_label_id)]
def fetch_message(self, message_id):
msg = self.service.users().messages()\

View File

@@ -144,17 +144,22 @@ class MSGraphConnection(MailboxConnection):
folder_id = self._find_folder_id_from_folder_path(folder_name)
url = f'/users/{self.mailbox_name}/mailFolders/' \
f'{folder_id}/messages'
since = kwargs.get('since')
if not since:
since = None
batch_size = kwargs.get('batch_size')
if not batch_size:
batch_size = 0
emails = self._get_all_messages(url, batch_size)
emails = self._get_all_messages(url, batch_size, since)
return [email['id'] for email in emails]
def _get_all_messages(self, url, batch_size):
def _get_all_messages(self, url, batch_size, since):
messages: list
params = {
'$select': 'id'
}
if since:
params['$filter'] = f'receivedDateTime ge {since}'
if batch_size and batch_size > 0:
params['$top'] = batch_size
else:
@@ -165,8 +170,9 @@ class MSGraphConnection(MailboxConnection):
messages = result.json()['value']
# Loop if next page is present and not obtained message limit.
while '@odata.nextLink' in result.json() and (
since is not None or (
batch_size == 0 or
batch_size - len(messages) > 0):
batch_size - len(messages) > 0)):
result = self._client.get(result.json()['@odata.nextLink'])
if result.status_code != 200:
raise RuntimeError(f'Failed to fetch messages {result.text}')
@@ -181,13 +187,15 @@ class MSGraphConnection(MailboxConnection):
raise RuntimeWarning(f"Failed to mark message read"
f"{resp.status_code}: {resp.json()}")
def fetch_message(self, message_id: str):
def fetch_message(self, message_id: str, **kwargs):
url = f'/users/{self.mailbox_name}/messages/{message_id}/$value'
result = self._client.get(url)
if result.status_code != 200:
raise RuntimeWarning(f"Failed to fetch message"
f"{result.status_code}: {result.json()}")
self.mark_message_read(message_id)
mark_read = kwargs.get('mark_read')
if mark_read:
self.mark_message_read(message_id)
return result.text
def delete_message(self, message_id: str):

View File

@@ -31,7 +31,11 @@ class IMAPConnection(MailboxConnection):
def fetch_messages(self, reports_folder: str, **kwargs):
self._client.select_folder(reports_folder)
return self._client.search()
since = kwargs.get('since')
if since:
return self._client.search([u'SINCE', since])
else:
return self._client.search()
def fetch_message(self, message_id):
return self._client.fetch_message(message_id, parse=False)