Better typing

This commit is contained in:
Sean Whalen
2025-12-29 17:14:54 -05:00
parent 34fa0c145d
commit 1f3a1fc843

View File

@@ -46,6 +46,7 @@ from parsedmarc.mail import (
MSGraphConnection, MSGraphConnection,
) )
from parsedmarc.mail.graph import AuthMethod from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a # Increase the max header limit for very large emails. `_MAXHEADERS` is a
@@ -71,25 +72,22 @@ def _configure_logging(log_level, log_file=None):
""" """
Configure logging for the current process. Configure logging for the current process.
This is needed for child processes to properly log messages. This is needed for child processes to properly log messages.
Args: Args:
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING) log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
log_file: Optional path to log file log_file: Optional path to log file
""" """
# Get the logger # Get the logger
from parsedmarc.log import logger from parsedmarc.log import logger
# Set the log level # Set the log level
logger.setLevel(log_level) logger.setLevel(log_level)
# Add StreamHandler with formatter if not already present # Add StreamHandler with formatter if not already present
# Check if we already have a StreamHandler to avoid duplicates # Check if we already have a StreamHandler to avoid duplicates
# Use exact type check to distinguish from FileHandler subclass # Use exact type check to distinguish from FileHandler subclass
has_stream_handler = any( has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
type(h) is logging.StreamHandler
for h in logger.handlers
)
if not has_stream_handler: if not has_stream_handler:
formatter = logging.Formatter( formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s", fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
@@ -98,7 +96,7 @@ def _configure_logging(log_level, log_file=None):
handler = logging.StreamHandler() handler = logging.StreamHandler()
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
# Add FileHandler if log_file is specified # Add FileHandler if log_file is specified
if log_file: if log_file:
try: try:
@@ -128,7 +126,7 @@ def cli_parse(
log_file=None, log_file=None,
): ):
"""Separated this function for multiprocessing """Separated this function for multiprocessing
Args: Args:
file_path: Path to the report file file_path: Path to the report file
sa: Strip attachment payloads flag sa: Strip attachment payloads flag
@@ -146,7 +144,7 @@ def cli_parse(
""" """
# Configure logging in this child process # Configure logging in this child process
_configure_logging(log_level, log_file) _configure_logging(log_level, log_file)
try: try:
file_results = parse_report_file( file_results = parse_report_file(
file_path, file_path,
@@ -1760,13 +1758,13 @@ def _main():
logger.exception("Mailbox Error") logger.exception("Mailbox Error")
exit(1) exit(1)
results = { parsing_results: ParsingResults = {
"aggregate_reports": aggregate_reports, "aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports, "forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports, "smtp_tls_reports": smtp_tls_reports,
} }
process_reports(results) process_reports(parsing_results)
if opts.smtp_host: if opts.smtp_host:
try: try:
@@ -1780,7 +1778,7 @@ def _main():
else _str_to_list(str(opts.smtp_to)) else _str_to_list(str(opts.smtp_to))
) )
email_results( email_results(
results, parsing_results,
opts.smtp_host, opts.smtp_host,
opts.smtp_from, opts.smtp_from,
smtp_to_value, smtp_to_value,