Enhance type hints and argument formatting in multiple files for improved clarity and consistency

This commit is contained in:
Sean Whalen
2025-12-02 17:06:57 -05:00
parent 888d717476
commit d312522ab7
6 changed files with 73 additions and 47 deletions

View File

@@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from base64 import urlsafe_b64decode
from functools import lru_cache
from pathlib import Path

View File

@@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from enum import Enum
from functools import lru_cache
from pathlib import Path

View File

@@ -1,3 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional
from time import sleep
from imapclient.exceptions import IMAPClientError
@@ -11,14 +17,15 @@ from parsedmarc.mail.mailbox_connection import MailboxConnection
class IMAPConnection(MailboxConnection):
def __init__(
self,
host=None,
user=None,
password=None,
port=None,
ssl=True,
verify=True,
timeout=30,
max_retries=4,
host: Optional[str] = None,
*,
user: Optional[str] = None,
password: Optional[str] = None,
port: Optional[str] = None,
ssl: Optional[bool] = True,
verify: Optional[bool] = True,
timeout: Optional[int] = 30,
max_retries: Optional[int] = 4,
):
self._username = user
self._password = password
@@ -45,13 +52,13 @@ class IMAPConnection(MailboxConnection):
else:
return self._client.search()
def fetch_message(self, message_id):
def fetch_message(self, message_id: int):
return self._client.fetch_message(message_id, parse=False)
def delete_message(self, message_id: str):
def delete_message(self, message_id: int):
self._client.delete_messages([message_id])
def move_message(self, message_id: str, folder_name: str):
def move_message(self, message_id: int, folder_name: str):
self._client.move_messages([message_id], folder_name)
def keepalive(self):

View File

@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import ABC
from typing import List
class MailboxConnection(ABC):
@@ -10,7 +13,7 @@ class MailboxConnection(ABC):
def create_folder(self, folder_name: str):
raise NotImplementedError
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
def fetch_messages(self, reports_folder: str, **kwargs) -> list[str]:
raise NotImplementedError
def fetch_message(self, message_id) -> str:

View File

@@ -1,3 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional
from time import sleep
from parsedmarc.log import logger
@@ -9,8 +15,8 @@ import os
class MaildirConnection(MailboxConnection):
def __init__(
self,
maildir_path=None,
maildir_create=False,
maildir_path: Optional[bool] = None,
maildir_create: Optional[bool] = False,
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
@@ -36,7 +42,7 @@ class MaildirConnection(MailboxConnection):
def fetch_messages(self, reports_folder: str, **kwargs):
return self._client.keys()
def fetch_message(self, message_id):
def fetch_message(self, message_id: str):
return self._client.get(message_id).as_string()
def delete_message(self, message_id: str):

View File

@@ -109,12 +109,14 @@ def get_base_domain(domain: str) -> str:
return publicsuffix
def query_dns(domain: str,
record_type: str,
*,
cache: Optional[ExpiringDict] = None,
nameservers: list[str] = None,
timeout:int = 2.0) -> list[str]:
def query_dns(
domain: str,
record_type: str,
*,
cache: Optional[ExpiringDict] = None,
nameservers: list[str] = None,
timeout: int = 2.0,
) -> list[str]:
"""
Queries DNS
@@ -175,11 +177,13 @@ def query_dns(domain: str,
return records
def get_reverse_dns(ip_address,
*,
cache: Optional[ExpiringDict] = None,
nameservers: list[str] = None,
timeout:int = 2.0) -> str:
def get_reverse_dns(
ip_address,
*,
cache: Optional[ExpiringDict] = None,
nameservers: list[str] = None,
timeout: int = 2.0,
) -> str:
"""
Resolves an IP address to a hostname using a reverse DNS query
@@ -207,7 +211,7 @@ def get_reverse_dns(ip_address,
return hostname
def timestamp_to_datetime(timestamp: int) -> datetime:
def timestamp_to_datetime(timestamp: int) -> datetime:
"""
Converts a UNIX/DMARC timestamp to a Python ``datetime`` object
@@ -233,9 +237,9 @@ def timestamp_to_human(timestamp: int) -> str:
return timestamp_to_datetime(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def human_timestamp_to_datetime(human_timestamp: str,
*,
to_utc: Optional[bool] = False) -> datetime:
def human_timestamp_to_datetime(
human_timestamp: str, *, to_utc: Optional[bool] = False
) -> datetime:
"""
Converts a human-readable timestamp into a Python ``datetime`` object
@@ -268,9 +272,7 @@ def human_timestamp_to_unix_timestamp(human_timestamp: str) -> int:
return human_timestamp_to_datetime(human_timestamp).timestamp()
def get_ip_address_country(ip_address:str,
*,
db_path: Optional[str ] = None) -> str:
def get_ip_address_country(ip_address: str, *, db_path: Optional[str] = None) -> str:
"""
Returns the ISO code for the country associated
with the given IPv4 or IPv6 address
@@ -338,7 +340,7 @@ def get_service_from_reverse_dns_base_domain(
always_use_local_file: Optional[bool] = False,
local_file_path: Optional[bool] = None,
url: Optional[bool] = None,
offline: Optional[bool ] = False,
offline: Optional[bool] = False,
reverse_dns_map: Optional[bool] = None,
) -> str:
"""
@@ -411,15 +413,15 @@ def get_service_from_reverse_dns_base_domain(
def get_ip_address_info(
ip_address,
*,
ip_db_path:Optional[str]=None,
reverse_dns_map_path:Optional[str]=None,
always_use_local_files:Optional[bool]=False,
reverse_dns_map_url: Optional[bool ] = None,
cache: Optional[ExpiringDict]=None,
ip_db_path: Optional[str] = None,
reverse_dns_map_path: Optional[str] = None,
always_use_local_files: Optional[bool] = False,
reverse_dns_map_url: Optional[bool] = None,
cache: Optional[ExpiringDict] = None,
reverse_dns_map: Optional[bool] = None,
offline: Optional[bool]=False,
nameservers:Optional[list[str]]=None,
timeout:Optional[float]=2.0,
offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None,
timeout: Optional[float] = 2.0,
) -> OrderedDict[str, str]:
"""
Returns reverse DNS and country information for the given IP address
@@ -485,7 +487,7 @@ def get_ip_address_info(
return info
def parse_email_address(original_address: str) -> OrderedDict[str,str]:
def parse_email_address(original_address: str) -> OrderedDict[str, str]:
if original_address[0] == "":
display_name = None
else:
@@ -602,9 +604,9 @@ def convert_outlook_msg(msg_bytes: bytes) -> str:
return rfc822
def parse_email(data: Union[bytes, str],
*,
strip_attachment_payloads: Optional[bool]=False):
def parse_email(
data: Union[bytes, str], *, strip_attachment_payloads: Optional[bool] = False
):
"""
A simplified email parser