Files
parsedmarc/parsedmarc/cli.py
T
Casper Biering d3510da3a6 feat: graceful SIGTERM/SIGINT shutdown for watch mode and one-shot CLI (#794)
* feat: graceful SIGTERM/SIGINT shutdown for watch mode and one-shot CLI

Previously SIGTERM (systemctl stop, docker stop, Kubernetes pod termination)
killed parsedmarc mid-batch, tearing output writes and silently dropping
buffered Kafka records. Shutdown is now cooperative:

- SIGTERM/SIGINT set a flag that is polled at safe boundaries. The one-shot
  CLI checks it between batches; watch mode passes it as `config_reloading` so
  the mailbox backend -- including the IMAP IDLE loop -- returns once the
  current batch is fully processed. Either way the in-flight batch and its
  output writes finish before the process exits 0.
- Ctrl-C is a double-tap: the first press is graceful, the second
  short-circuits to os._exit(130).
- Output clients are now closed on every exit path (atexit plus a trailing
  close in _main), fixing a long-standing leak where one-shot runs and
  graceful shutdowns never flushed Kafka / closed Elasticsearch / S3 / etc.

Docs: the example systemd unit gains KillSignal=SIGTERM and TimeoutStopSec=60
(keep it above mailbox_check_timeout). Tests cover watch shutdown, the one-shot
between-batch stop, the SIGINT double-tap, and the output-client-close leak.

* test: cover the one-shot mbox-loop shutdown break

Extend the one-shot SIGTERM test to also pass an .mbox path so a single
run exercises both shutdown checkpoints: the file-batch loop break and the
subsequent mbox loop break (which Codecov flagged as the only uncovered
lines on PR #794). is_mbox is keyed by suffix and get_dmarc_reports_from_mbox
is asserted not called, since the mbox loop breaks before reaching it.

* test: narrow signal.getsignal() return before invoking in SIGINT test

signal.getsignal() is typed Callable | int | Handlers | None; calling it
directly fails pyright's callable check. Assert callable() first.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

---------

Co-authored-by: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 20:00:32 -04:00

2678 lines
106 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""A CLI for parsing DMARC reports"""
import atexit
import http.client
import json
import logging
import os
import signal
import sys
import time
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from glob import glob
from multiprocessing import Pipe, Process
from ssl import CERT_NONE, create_default_context
import yaml
from tqdm import tqdm
from parsedmarc import (
REVERSE_DNS_MAP,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
__version__,
elastic,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
postgres,
s3,
save_output,
splunk,
syslog,
watch_inbox,
webhook,
)
from parsedmarc.log import logger
from parsedmarc.mail import (
AuthMethod,
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
)
from parsedmarc.types import ParsingResults
from parsedmarc.utils import (
InvalidIPinfoAPIKey,
configure_ipinfo_api,
get_base_domain,
get_reverse_dns,
is_mbox,
load_ip_db,
load_psl_overrides,
load_reverse_dns_map,
)
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs, which is
# presumably why it is assigned through setattr() rather than directly.
setattr(http.client, "_MAXHEADERS", 200)
# Import-time side effect: attach a stream handler to the shared parsedmarc
# logger so every message carries a "LEVEL:file:line:" prefix.
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
class ConfigurationError(Exception):
    """Signal that the supplied configuration cannot be used.

    Raised when a configuration file has missing or invalid settings.
    """
def _str_to_list(s):
"""Converts a comma separated string to a list"""
_list = s.split(",")
return list(map(lambda i: i.lstrip(), _list))
def _expand_path(p: str) -> str:
"""Expand ``~`` and ``$VAR`` references in a file path."""
return os.path.expanduser(os.path.expandvars(p))
def _expand_file_path_args(paths: list[str]) -> list[str]:
"""Expand CLI file-path arguments into a flat list of file paths.
A path that already exists on disk is taken literally; only a
non-existent path is treated as a glob pattern. This preserves
shell-style wildcard expansion (e.g. a quoted ``samples/*.xml``) while
ensuring that literal filenames containing glob metacharacters
(``[``, ``]``, ``*``, ``?``) are not silently dropped. Emailed DMARC
failure reports are frequently named like
``[Provider DMARC Failure Report] Subject.eml``; ``glob()`` treats the
brackets as a character class, matches nothing, and drops the file
(see <https://docs.python.org/3/library/glob.html>).
"""
expanded: list[str] = []
for path in paths:
if os.path.exists(path):
expanded.append(path)
else:
expanded += glob(path)
return expanded
# All known INI config section names, used for env var resolution.
# ``_resolve_section_key`` matches the lower-cased env var suffix against
# these names (each followed by ``_``) and keeps the longest match, so
# multi-word sections such as ``splunk_hec`` must be listed here verbatim.
_KNOWN_SECTIONS = frozenset(
{
"general",
"mailbox",
"imap",
"msgraph",
"elasticsearch",
"opensearch",
"splunk_hec",
"kafka",
"smtp",
"s3",
"postgresql",
"syslog",
"gmail_api",
"maildir",
"log_analytics",
"gelf",
"webhook",
}
)
# Short aliases that don't follow the PARSEDMARC_{SECTION}_{KEY} pattern.
# Maps the complete environment variable name to its ``(section, key)``
# target. ``_apply_env_overrides`` consults this table before it tests for
# the ``PARSEDMARC_`` prefix, which is why an unprefixed name like ``DEBUG``
# is honoured.
_ENV_ALIASES: dict[str, tuple[str, str]] = {
"DEBUG": ("general", "debug"),
"PARSEDMARC_DEBUG": ("general", "debug"),
}
# Real config keys whose own names end in ``_file``. For these the
# ``PARSEDMARC_..._FILE`` env var is the direct value (a path string),
# not a Docker-secret file reference. Keep in sync with ``_parse_config``
# whenever a new ``*_file`` config key is added.
# Entries are written as the env var name with the ``PARSEDMARC_`` prefix
# removed, because that is the form ``_apply_env_overrides`` compares
# against.
_DIRECT_FILE_KEYS = frozenset(
[
"GENERAL_LOG_FILE",
"MSGRAPH_TOKEN_FILE",
"GMAIL_API_CREDENTIALS_FILE",
"GMAIL_API_TOKEN_FILE",
]
)
def _resolve_section_key(suffix: str) -> tuple:
    """Split an env var suffix into a ``(section, key)`` pair.

    For example ``IMAP_PASSWORD`` resolves to ``('imap', 'password')``.

    The suffix is lower-cased and compared against every name in
    ``_KNOWN_SECTIONS``. When several section names are a prefix of the
    suffix, the longest one wins, so multi-word sections such as
    ``splunk_hec`` are resolved correctly. A section only qualifies when at
    least one character follows its trailing underscore.

    Returns:
        ``(section, key)``, or ``(None, None)`` when no known section
        matches.
    """
    lowered = suffix.lower()
    # A candidate needs "<section>_" as a prefix plus a non-empty remainder.
    candidates = [
        name
        for name in _KNOWN_SECTIONS
        if lowered.startswith(name + "_") and len(lowered) > len(name) + 1
    ]
    if not candidates:
        return None, None
    # Two distinct prefixes of one string cannot share a length, so the
    # longest candidate is unique.
    winner = max(candidates, key=len)
    return winner, lowered[len(winner) + 1 :]
def _read_secret_file(env_key: str, raw_path: str) -> str:
    """Load the secret referenced by a ``PARSEDMARC_..._FILE`` variable.

    Args:
        env_key: Name of the environment variable, used only in the error
            message.
        raw_path: Path taken from that variable; ``~`` and ``$VAR``
            references are expanded before the file is opened.

    Returns:
        The file contents decoded as UTF-8, with every trailing CR/LF
        character removed.

    Raises:
        ConfigurationError: If the file is missing, unreadable, or not
            valid UTF-8. The failure is never silently ignored.
    """
    secret_path = _expand_path(raw_path)
    try:
        with open(secret_path, encoding="utf-8") as handle:
            contents = handle.read()
    except (OSError, UnicodeDecodeError) as exc:
        # Report only the exception class name alongside the path.
        message = "Cannot read secret file for {0}: {1} ({2})".format(
            env_key, secret_path, type(exc).__name__
        )
        raise ConfigurationError(message) from exc
    return contents.rstrip("\r\n")
def _apply_env_overrides(config: ConfigParser) -> None:
    """Copy ``PARSEDMARC_*`` environment variables into *config* in place.

    A variable named ``PARSEDMARC_{SECTION}_{KEY}`` overrides (or creates)
    the matching option; missing sections are added on demand. The names
    listed in ``_ENV_ALIASES`` are accepted as shortcuts, and
    ``PARSEDMARC_CONFIG_FILE`` is always skipped.

    A ``PARSEDMARC_{SECTION}_{KEY}_FILE`` variable is treated as a
    Docker / Kubernetes style secret reference: the option value is read
    from the file it points to. If both the plain variable and its
    ``_FILE`` companion are set, the file wins. Config keys whose real name
    already ends in ``_file`` (see ``_DIRECT_FILE_KEYS``) are exempt and
    keep their direct-value meaning.

    Raises:
        ConfigurationError: Propagated from ``_read_secret_file`` when a
            referenced secret file cannot be read.
    """
    env_prefix = "PARSEDMARC_"
    secret_marker = "_FILE"
    plain_values: dict[tuple[str, str], str] = {}
    secret_values: dict[tuple[str, str], str] = {}

    for name, raw in os.environ.items():
        if name == "PARSEDMARC_CONFIG_FILE":
            continue

        # Aliases are looked up by full name, before the prefix test.
        alias_target = _ENV_ALIASES.get(name)
        if alias_target is not None:
            plain_values[alias_target] = raw
            continue

        if not name.startswith(env_prefix):
            continue

        body = name[len(env_prefix) :]
        wraps_secret = body.endswith(secret_marker) and body not in _DIRECT_FILE_KEYS
        # For a secret reference, resolve the name without its _FILE tail.
        lookup = body[: -len(secret_marker)] if wraps_secret else body
        section, key = _resolve_section_key(lookup)
        if section is None:
            logger.debug("Ignoring unrecognized env var: %s", name)
            continue

        if wraps_secret:
            secret_values[(section, key)] = _read_secret_file(name, raw)
        else:
            plain_values[(section, key)] = raw

    # Secret-file values are merged last so they replace plain ones.
    merged = dict(plain_values)
    merged.update(secret_values)
    for (section, key), resolved in merged.items():
        if not config.has_section(section):
            config.add_section(section)
        config.set(section, key, resolved)
        logger.debug("Config override from env: [%s] %s", section, key)
def _configure_logging(log_level, log_file=None):
    """
    Configure logging for the current process.

    This is needed for child processes to properly log messages. The call
    is safe to repeat: a handler is only attached when an equivalent one is
    not already present on the logger, so log lines are never duplicated.

    Args:
        log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
        log_file: Optional path to log file
    """
    from parsedmarc.log import logger

    logger.setLevel(log_level)

    # FileHandler subclasses StreamHandler, so an exact type check is needed
    # to tell a plain console handler apart from a file handler.
    if not any(type(h) is logging.StreamHandler for h in logger.handlers):
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter(
                fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
                datefmt="%Y-%m-%d:%H:%M:%S",
            )
        )
        logger.addHandler(stream_handler)

    if not log_file:
        return

    # FileHandler stores the absolute path it writes to in ``baseFilename``;
    # skip the file if a handler for it is already attached, otherwise
    # every record would be written to the file more than once.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(h, logging.FileHandler)
        and getattr(h, "baseFilename", None) == target
        for h in logger.handlers
    )
    if already_attached:
        return

    try:
        file_handler = logging.FileHandler(log_file, "a")
    except OSError as error:
        # IOError and PermissionError are both OSError; an unwritable log
        # file is reported but never fatal.
        logger.warning("Unable to write to log file: %s", error)
        return

    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
        )
    )
    logger.addHandler(file_handler)
def cli_parse(
    file_path,
    sa,
    nameservers,
    dns_timeout,
    dns_retries,
    ip_db_path,
    offline,
    always_use_local_files,
    reverse_dns_map_path,
    reverse_dns_map_url,
    normalize_timespan_threshold_hours,
    conn,
    log_level=logging.ERROR,
    log_file=None,
):
    """Parse one report file and send the outcome over a pipe.

    Kept as a standalone function so it can be used for multiprocessing.
    Logging is configured first, then the file is parsed and a two-item
    list is sent through *conn*: ``[results, file_path]`` on success, or
    ``[error, file_path]`` when parsing raises ``ParserError``. Any other
    exception propagates. *conn* is closed in every case.

    Args:
        file_path: Path to the report file
        sa: Strip attachment payloads flag
        nameservers: List of nameservers
        dns_timeout: DNS timeout
        dns_retries: Number of DNS retries on transient errors
        ip_db_path: Path to IP database
        offline: Offline mode flag
        always_use_local_files: Always use local files flag
        reverse_dns_map_path: Path to reverse DNS map
        reverse_dns_map_url: URL to reverse DNS map
        normalize_timespan_threshold_hours: Timespan threshold
        conn: Pipe connection for IPC
        log_level: Logging level for this process
        log_file: Optional path to log file
    """
    # Logging must be set up in this process before any parsing happens.
    _configure_logging(log_level, log_file)

    parse_kwargs = {
        "ip_db_path": ip_db_path,
        "offline": offline,
        "always_use_local_files": always_use_local_files,
        "reverse_dns_map_path": reverse_dns_map_path,
        "reverse_dns_map_url": reverse_dns_map_url,
        "nameservers": nameservers,
        "dns_timeout": dns_timeout,
        "dns_retries": dns_retries,
        "strip_attachment_payloads": sa,
        "normalize_timespan_threshold_hours": normalize_timespan_threshold_hours,
    }
    try:
        try:
            outcome = parse_report_file(file_path, **parse_kwargs)
        except ParserError as parse_error:
            # Send the failure back so it can be reported per file.
            outcome = parse_error
        conn.send([outcome, file_path])
    finally:
        conn.close()
def _load_config(config_file: str | None = None) -> ConfigParser:
    """Build a ``ConfigParser`` from an INI file and/or the environment.

    Args:
        config_file: Optional path to an .ini config file.

    Returns:
        A ``ConfigParser`` (interpolation disabled) holding the file's
        contents, if a file was given, overlaid with any ``PARSEDMARC_*``
        environment variables.

    Raises:
        ConfigurationError: If *config_file* is given but does not exist or
            is not readable.
    """
    parser = ConfigParser(interpolation=None)
    if config_file is None:
        # Environment-only configuration.
        _apply_env_overrides(parser)
        return parser

    # Check up front: ConfigParser.read() silently skips files it cannot
    # open, which would hide a wrong path or a permissions problem.
    location = os.path.abspath(config_file)
    if not os.path.exists(location):
        raise ConfigurationError("A file does not exist at {0}".format(location))
    if not os.access(location, os.R_OK):
        raise ConfigurationError(
            "Unable to read {0} — check file permissions".format(location)
        )

    parser.read(config_file)
    # Environment variables are applied after the file so they take
    # precedence over it.
    _apply_env_overrides(parser)
    return parser
def _parse_config(config: ConfigParser, opts):
"""Apply a loaded ``ConfigParser`` to *opts* in place.
Args:
config: A ``ConfigParser`` (from ``_load_config``).
opts: Namespace object to update with parsed values.
Returns:
index_prefix_domain_map or None
Raises:
ConfigurationError: If required settings are missing or invalid.
"""
opts.silent = True
index_prefix_domain_map = None
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = bool(
general_config.getboolean("strip_attachment_payloads")
)
if "output" in general_config:
opts.output = _expand_path(general_config["output"])
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
if "failure_json_filename" in general_config:
opts.failure_json_filename = general_config["failure_json_filename"]
elif "forensic_json_filename" in general_config:
opts.failure_json_filename = general_config["forensic_json_filename"]
if "smtp_tls_json_filename" in general_config:
opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"]
if "aggregate_csv_filename" in general_config:
opts.aggregate_csv_filename = general_config["aggregate_csv_filename"]
if "failure_csv_filename" in general_config:
opts.failure_csv_filename = general_config["failure_csv_filename"]
elif "forensic_csv_filename" in general_config:
opts.failure_csv_filename = general_config["forensic_csv_filename"]
if "smtp_tls_csv_filename" in general_config:
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config:
opts.dns_timeout = general_config.getfloat("dns_timeout")
if opts.dns_timeout is None:
opts.dns_timeout = 2
if "dns_retries" in general_config:
opts.dns_retries = general_config.getint("dns_retries")
if opts.dns_retries is None:
opts.dns_retries = 0
if "dns_test_address" in general_config:
opts.dns_test_address = general_config["dns_test_address"]
if "nameservers" in general_config:
opts.nameservers = _str_to_list(general_config["nameservers"])
# nameservers pre-flight check
dummy_hostname = None
try:
dummy_hostname = get_reverse_dns(
opts.dns_test_address,
nameservers=opts.nameservers,
timeout=opts.dns_timeout,
)
except Exception as ns_error:
raise ConfigurationError(
"DNS pre-flight check failed: {}".format(ns_error)
) from ns_error
if not dummy_hostname:
raise ConfigurationError(
"DNS pre-flight check failed: no PTR record for {} from {}".format(
opts.dns_test_address, opts.nameservers
)
)
if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
if "save_failure" in general_config:
opts.save_failure = bool(general_config.getboolean("save_failure"))
elif "save_forensic" in general_config:
opts.save_failure = bool(general_config.getboolean("save_forensic"))
if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
if "debug" in general_config:
opts.debug = bool(general_config.getboolean("debug"))
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
opts.fail_on_output_error = bool(
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config:
opts.log_file = _expand_path(general_config["log_file"])
if "n_procs" in general_config:
opts.n_procs = general_config.getint("n_procs")
if "ip_db_path" in general_config:
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
else:
opts.ip_db_path = None
if "ipinfo_url" in general_config:
opts.ipinfo_url = general_config["ipinfo_url"]
elif "ip_db_url" in general_config:
# ``ip_db_url`` is the pre-9.10 name for the same option. Accept
# it as a deprecated alias; prefer ``ipinfo_url`` going forward.
opts.ipinfo_url = general_config["ip_db_url"]
logger.warning("[general] ip_db_url is deprecated; rename it to ipinfo_url")
if "ipinfo_api_token" in general_config:
opts.ipinfo_api_token = general_config["ipinfo_api_token"]
if "always_use_local_files" in general_config:
opts.always_use_local_files = bool(
general_config.getboolean("always_use_local_files")
)
if "local_reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = _expand_path(
general_config["local_reverse_dns_map_path"]
)
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
if "local_psl_overrides_path" in general_config:
opts.psl_overrides_path = _expand_path(
general_config["local_psl_overrides_path"]
)
if "psl_overrides_url" in general_config:
opts.psl_overrides_url = general_config["psl_overrides_url"]
if "prettify_json" in general_config:
opts.prettify_json = bool(general_config.getboolean("prettify_json"))
if "mailbox" in config.sections():
mailbox_config = config["mailbox"]
if "msgraph" in config.sections():
opts.mailbox_reports_folder = "Inbox"
if "reports_folder" in mailbox_config:
opts.mailbox_reports_folder = mailbox_config["reports_folder"]
if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config:
opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
if "delete" in mailbox_config:
opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
if "test" in mailbox_config:
opts.mailbox_test = bool(mailbox_config.getboolean("test"))
if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
if "check_timeout" in mailbox_config:
opts.mailbox_check_timeout = mailbox_config.getint("check_timeout")
if "since" in mailbox_config:
opts.mailbox_since = mailbox_config["since"]
if "imap" in config.sections():
imap_config = config["imap"]
if "watch" in imap_config:
logger.warning(
"Starting in 8.0.0, the watch option has been "
"moved from the imap configuration section to "
"the mailbox configuration section."
)
if "host" in imap_config:
opts.imap_host = imap_config["host"]
else:
raise ConfigurationError(
"host setting missing from the imap config section"
)
if "port" in imap_config:
opts.imap_port = imap_config.getint("port")
if "timeout" in imap_config:
opts.imap_timeout = imap_config.getint("timeout")
if "max_retries" in imap_config:
opts.imap_max_retries = imap_config.getint("max_retries")
if "ssl" in imap_config:
opts.imap_ssl = bool(imap_config.getboolean("ssl"))
if "skip_certificate_verification" in imap_config:
opts.imap_skip_certificate_verification = bool(
imap_config.getboolean("skip_certificate_verification")
)
if "user" in imap_config:
opts.imap_user = imap_config["user"]
else:
raise ConfigurationError(
"user setting missing from the imap config section"
)
if "password" in imap_config:
opts.imap_password = imap_config["password"]
else:
raise ConfigurationError(
"password setting missing from the imap config section"
)
if "reports_folder" in imap_config:
opts.mailbox_reports_folder = imap_config["reports_folder"]
logger.warning(
"Use of the reports_folder option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "archive_folder" in imap_config:
opts.mailbox_archive_folder = imap_config["archive_folder"]
logger.warning(
"Use of the archive_folder option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "watch" in imap_config:
opts.mailbox_watch = bool(imap_config.getboolean("watch"))
logger.warning(
"Use of the watch option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "delete" in imap_config:
logger.warning(
"Use of the delete option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "test" in imap_config:
opts.mailbox_test = bool(imap_config.getboolean("test"))
logger.warning(
"Use of the test option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "batch_size" in imap_config:
opts.mailbox_batch_size = imap_config.getint("batch_size")
logger.warning(
"Use of the batch_size option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "msgraph" in config.sections():
graph_config = config["msgraph"]
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
if "auth_method" not in graph_config:
logger.info(
"auth_method setting missing from the "
"msgraph config section "
"defaulting to UsernamePassword"
)
opts.graph_auth_method = AuthMethod.UsernamePassword.name
else:
opts.graph_auth_method = graph_config["auth_method"]
if opts.graph_auth_method == AuthMethod.UsernamePassword.name:
if "user" in graph_config:
opts.graph_user = graph_config["user"]
else:
raise ConfigurationError(
"user setting missing from the msgraph config section"
)
if "password" in graph_config:
opts.graph_password = graph_config["password"]
else:
raise ConfigurationError(
"password setting missing from the msgraph config section"
)
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
raise ConfigurationError(
"client_secret setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.DeviceCode.name:
if "user" in graph_config:
opts.graph_user = graph_config["user"]
if opts.graph_auth_method != AuthMethod.UsernamePassword.name:
if "tenant_id" in graph_config:
opts.graph_tenant_id = graph_config["tenant_id"]
else:
raise ConfigurationError(
"tenant_id setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.ClientSecret.name:
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
raise ConfigurationError(
"client_secret setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.Certificate.name:
if "certificate_path" in graph_config:
opts.graph_certificate_path = _expand_path(
graph_config["certificate_path"]
)
else:
raise ConfigurationError(
"certificate_path setting missing from the msgraph config section"
)
if "certificate_password" in graph_config:
opts.graph_certificate_password = graph_config["certificate_password"]
if "client_id" in graph_config:
opts.graph_client_id = graph_config["client_id"]
else:
raise ConfigurationError(
"client_id setting missing from the msgraph config section"
)
if "mailbox" in graph_config:
opts.graph_mailbox = graph_config["mailbox"]
elif opts.graph_auth_method != AuthMethod.UsernamePassword.name:
raise ConfigurationError(
"mailbox setting missing from the msgraph config section"
)
if "graph_url" in graph_config:
opts.graph_url = graph_config["graph_url"]
elif "url" in graph_config:
opts.graph_url = graph_config["url"]
if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool(
graph_config.getboolean("allow_unencrypted_storage")
)
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the elasticsearch config section"
)
if "timeout" in elasticsearch_config:
timeout = elasticsearch_config.getfloat("timeout")
opts.elasticsearch_timeout = timeout
if "number_of_shards" in elasticsearch_config:
number_of_shards = elasticsearch_config.getint("number_of_shards")
opts.elasticsearch_number_of_shards = number_of_shards
if "number_of_replicas" in elasticsearch_config:
number_of_replicas = elasticsearch_config.getint("number_of_replicas")
opts.elasticsearch_number_of_replicas = number_of_replicas
if "index_suffix" in elasticsearch_config:
opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"]
if "index_prefix" in elasticsearch_config:
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
if "monthly_indexes" in elasticsearch_config:
monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
opts.elasticsearch_monthly_indexes = monthly
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = _expand_path(
elasticsearch_config["cert_path"]
)
if "skip_certificate_verification" in elasticsearch_config:
opts.elasticsearch_skip_certificate_verification = bool(
elasticsearch_config.getboolean("skip_certificate_verification")
)
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
opts.elasticsearch_password = elasticsearch_config["password"]
# Until 8.20
if "apiKey" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["apiKey"]
# Since 8.20
if "api_key" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["api_key"]
if "serverless" in elasticsearch_config:
opts.elasticsearch_serverless = elasticsearch_config.getboolean(
"serverless"
)
if "opensearch" in config:
opensearch_config = config["opensearch"]
if "hosts" in opensearch_config:
opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the opensearch config section"
)
if "timeout" in opensearch_config:
timeout = opensearch_config.getfloat("timeout")
opts.opensearch_timeout = timeout
if "number_of_shards" in opensearch_config:
number_of_shards = opensearch_config.getint("number_of_shards")
opts.opensearch_number_of_shards = number_of_shards
if "number_of_replicas" in opensearch_config:
number_of_replicas = opensearch_config.getint("number_of_replicas")
opts.opensearch_number_of_replicas = number_of_replicas
if "index_suffix" in opensearch_config:
opts.opensearch_index_suffix = opensearch_config["index_suffix"]
if "index_prefix" in opensearch_config:
opts.opensearch_index_prefix = opensearch_config["index_prefix"]
if "monthly_indexes" in opensearch_config:
monthly = bool(opensearch_config.getboolean("monthly_indexes"))
opts.opensearch_monthly_indexes = monthly
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
if "skip_certificate_verification" in opensearch_config:
opts.opensearch_skip_certificate_verification = bool(
opensearch_config.getboolean("skip_certificate_verification")
)
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
opts.opensearch_password = opensearch_config["password"]
# Until 8.20
if "apiKey" in opensearch_config:
opts.opensearch_api_key = opensearch_config["apiKey"]
# Since 8.20
if "api_key" in opensearch_config:
opts.opensearch_api_key = opensearch_config["api_key"]
if "auth_type" in opensearch_config:
opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower()
elif "authentication_type" in opensearch_config:
opts.opensearch_auth_type = (
opensearch_config["authentication_type"].strip().lower()
)
if "aws_region" in opensearch_config:
opts.opensearch_aws_region = opensearch_config["aws_region"].strip()
if "aws_service" in opensearch_config:
opts.opensearch_aws_service = opensearch_config["aws_service"].strip()
if "splunk_hec" in config.sections():
hec_config = config["splunk_hec"]
if "url" in hec_config:
opts.hec = hec_config["url"]
else:
raise ConfigurationError(
"url setting missing from the splunk_hec config section"
)
if "token" in hec_config:
opts.hec_token = hec_config["token"]
else:
raise ConfigurationError(
"token setting missing from the splunk_hec config section"
)
if "index" in hec_config:
opts.hec_index = hec_config["index"]
else:
raise ConfigurationError(
"index setting missing from the splunk_hec config section"
)
if "skip_certificate_verification" in hec_config:
opts.hec_skip_certificate_verification = bool(
hec_config.getboolean("skip_certificate_verification", fallback=False)
)
if "kafka" in config.sections():
kafka_config = config["kafka"]
if "hosts" in kafka_config:
opts.kafka_hosts = _str_to_list(kafka_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the kafka config section"
)
if "user" in kafka_config:
opts.kafka_username = kafka_config["user"]
if "password" in kafka_config:
opts.kafka_password = kafka_config["password"]
if "ssl" in kafka_config:
opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
if "skip_certificate_verification" in kafka_config:
kafka_verify = bool(
kafka_config.getboolean("skip_certificate_verification")
)
opts.kafka_skip_certificate_verification = kafka_verify
if "aggregate_topic" in kafka_config:
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
else:
raise ConfigurationError(
"aggregate_topic setting missing from the kafka config section"
)
if "failure_topic" in kafka_config:
opts.kafka_failure_topic = kafka_config["failure_topic"]
elif "forensic_topic" in kafka_config:
opts.kafka_failure_topic = kafka_config["forensic_topic"]
else:
raise ConfigurationError(
"failure_topic setting missing from the kafka config section"
)
if "smtp_tls_topic" in kafka_config:
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
else:
raise ConfigurationError(
"smtp_tls_topic setting missing from the kafka config section"
)
if "smtp" in config.sections():
smtp_config = config["smtp"]
if "host" in smtp_config:
opts.smtp_host = smtp_config["host"]
else:
raise ConfigurationError(
"host setting missing from the smtp config section"
)
if "port" in smtp_config:
opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config:
opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
if "skip_certificate_verification" in smtp_config:
smtp_verify = bool(smtp_config.getboolean("skip_certificate_verification"))
opts.smtp_skip_certificate_verification = smtp_verify
if "user" in smtp_config:
opts.smtp_user = smtp_config["user"]
else:
raise ConfigurationError(
"user setting missing from the smtp config section"
)
if "password" in smtp_config:
opts.smtp_password = smtp_config["password"]
else:
raise ConfigurationError(
"password setting missing from the smtp config section"
)
if "from" in smtp_config:
opts.smtp_from = smtp_config["from"]
else:
logger.critical("from setting missing from the smtp config section")
if "to" in smtp_config:
opts.smtp_to = _str_to_list(smtp_config["to"])
else:
logger.critical("to setting missing from the smtp config section")
if "subject" in smtp_config:
opts.smtp_subject = smtp_config["subject"]
if "attachment" in smtp_config:
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
if "s3" in config.sections():
s3_config = config["s3"]
if "bucket" in s3_config:
opts.s3_bucket = s3_config["bucket"]
else:
raise ConfigurationError(
"bucket setting missing from the s3 config section"
)
if "path" in s3_config:
opts.s3_path = s3_config["path"]
if opts.s3_path.startswith("/"):
opts.s3_path = opts.s3_path[1:]
if opts.s3_path.endswith("/"):
opts.s3_path = opts.s3_path[:-1]
else:
opts.s3_path = ""
if "region_name" in s3_config:
opts.s3_region_name = s3_config["region_name"]
if "endpoint_url" in s3_config:
opts.s3_endpoint_url = s3_config["endpoint_url"]
if "access_key_id" in s3_config:
opts.s3_access_key_id = s3_config["access_key_id"]
if "secret_access_key" in s3_config:
opts.s3_secret_access_key = s3_config["secret_access_key"]
if "postgresql" in config.sections():
pg_config = config["postgresql"]
if "connection_string" in pg_config:
opts.postgresql_connection_string = pg_config["connection_string"]
elif "host" in pg_config:
opts.postgresql_host = pg_config["host"]
if "port" in pg_config:
opts.postgresql_port = pg_config.getint("port")
if "user" in pg_config:
opts.postgresql_user = pg_config["user"]
if "password" in pg_config:
opts.postgresql_password = pg_config["password"]
if "database" in pg_config:
opts.postgresql_database = pg_config["database"]
else:
raise ConfigurationError(
"host (or connection_string) setting missing from the "
"postgresql config section"
)
if "syslog" in config.sections():
syslog_config = config["syslog"]
if "server" in syslog_config:
opts.syslog_server = syslog_config["server"]
else:
raise ConfigurationError(
"server setting missing from the syslog config section"
)
if "port" in syslog_config:
opts.syslog_port = syslog_config["port"]
else:
opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
gmail_creds = gmail_api_config.get("credentials_file")
opts.gmail_api_credentials_file = (
_expand_path(gmail_creds) if gmail_creds else gmail_creds
)
opts.gmail_api_token_file = _expand_path(
gmail_api_config.get("token_file", ".token")
)
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
)
opts.gmail_api_paginate_messages = bool(
gmail_api_config.getboolean("paginate_messages", True)
)
default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify"
opts.gmail_api_scopes = gmail_api_config.get("scopes", default_gmail_api_scope)
opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes)
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config[
"service_account_user"
].strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config[
"delegated_user"
].strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
maildir_p = maildir_api_config.get(
"maildir_path", maildir_api_config.get("path")
)
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
opts.maildir_create = bool(
maildir_api_config.getboolean(
"maildir_create",
fallback=maildir_api_config.getboolean("create", fallback=False),
)
)
if "log_analytics" in config.sections():
log_analytics_config = config["log_analytics"]
opts.la_client_id = log_analytics_config.get("client_id")
opts.la_client_secret = log_analytics_config.get("client_secret")
opts.la_tenant_id = log_analytics_config.get("tenant_id")
opts.la_dce = log_analytics_config.get("dce")
opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id")
opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream")
opts.la_dcr_failure_stream = log_analytics_config.get(
"dcr_failure_stream"
) or log_analytics_config.get("dcr_forensic_stream")
opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream")
if "gelf" in config.sections():
gelf_config = config["gelf"]
if "host" in gelf_config:
opts.gelf_host = gelf_config["host"]
else:
raise ConfigurationError(
"host setting missing from the gelf config section"
)
if "port" in gelf_config:
opts.gelf_port = gelf_config["port"]
else:
raise ConfigurationError(
"port setting missing from the gelf config section"
)
if "mode" in gelf_config:
opts.gelf_mode = gelf_config["mode"]
else:
raise ConfigurationError(
"mode setting missing from the gelf config section"
)
if "webhook" in config.sections():
webhook_config = config["webhook"]
if "aggregate_url" in webhook_config:
opts.webhook_aggregate_url = webhook_config["aggregate_url"]
if "failure_url" in webhook_config:
opts.webhook_failure_url = webhook_config["failure_url"]
elif "forensic_url" in webhook_config:
opts.webhook_failure_url = webhook_config["forensic_url"]
if "smtp_tls_url" in webhook_config:
opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"]
if "timeout" in webhook_config:
opts.webhook_timeout = webhook_config.getint("timeout")
return index_prefix_domain_map
class _ElasticsearchHandle:
    """Sentinel that lets Elasticsearch take part in _close_output_clients.

    Instances carry no state of their own; ``close`` operates on the
    connection registry exposed as ``elastic.connections``.
    """

    def close(self):
        """Close the current connection, then drop the "default" alias.

        The two steps are independent and best-effort: an exception raised
        by either one is swallowed, so the other step still runs and nothing
        propagates to the caller.
        """

        def _close_current():
            current = elastic.connections.get_connection()
            # A str result is left untouched; only non-str objects are closed.
            # NOTE(review): presumably the registry can hand back an alias
            # string instead of a client object -- confirm.
            if not isinstance(current, str):
                current.close()

        def _forget_default():
            elastic.connections.remove_connection("default")

        for step in (_close_current, _forget_default):
            try:
                step()
            except Exception:
                pass
class _OpenSearchHandle:
    """Sentinel that lets OpenSearch take part in _close_output_clients.

    Instances carry no state of their own; ``close`` operates on the
    connection registry exposed as ``opensearch.connections``.
    """

    def close(self):
        """Close the current connection, then drop the "default" alias.

        The two steps are independent and best-effort: an exception raised
        by either one is swallowed, so the other step still runs and nothing
        propagates to the caller.
        """

        def _close_current():
            current = opensearch.connections.get_connection()
            # A str result is left untouched; only non-str objects are closed.
            # NOTE(review): presumably the registry can hand back an alias
            # string instead of a client object -- confirm.
            if not isinstance(current, str):
                current.close()

        def _forget_default():
            opensearch.connections.remove_connection("default")

        for step in (_close_current, _forget_default):
            try:
                step()
            except Exception:
                pass
# Keys under which _init_output_clients stores sentinels for module-level
# connection state (see _ElasticsearchHandle / _OpenSearchHandle) rather than
# objects that own a connection themselves.
_GLOBAL_STATE_CLIENT_KEYS = frozenset({"elasticsearch", "opensearch"})


def _migration_index_names(index_prefix, index_suffix):
    """Return the ``(aggregate, failure)`` index names passed to migrate_indexes.

    A truthy suffix is appended after an underscore; a truthy prefix is
    prepended verbatim.
    """
    names = []
    for name in ("dmarc_aggregate", "dmarc_failure"):
        if index_suffix:
            name = f"{name}_{index_suffix}"
        if index_prefix:
            name = f"{index_prefix}{name}"
        names.append(name)
    return tuple(names)


def _release_partial_clients(clients):
    """Best-effort close of clients created before initialization failed.

    Empties *clients*. Errors raised while closing are logged as warnings
    and never propagate, so they cannot mask the original failure.
    """
    while clients:
        name, client = clients.popitem()
        if name in _GLOBAL_STATE_CLIENT_KEYS:
            # Calling close() on these sentinels would remove the module-level
            # "default" connection. That state was never rolled back on
            # failure, so it is left exactly as it was.
            # NOTE(review): presumably a caller that keeps running on its
            # previous configuration after a failed reload still relies on
            # that connection -- confirm.
            continue
        if not hasattr(client, "close"):
            continue
        try:
            client.close()
        except Exception:
            logger.warning("Error closing %s", name, exc_info=True)


def _populate_output_clients(opts, clients):
    """Create every configured output client and store it in *clients*.

    Each client is added to the caller-owned dict as soon as it exists, so
    that when a later step raises, the caller can still close what was
    already created.
    """
    try:
        if opts.s3_bucket:
            logger.debug("Initializing S3 client: bucket=%s", opts.s3_bucket)
            clients["s3_client"] = s3.S3Client(
                bucket_name=opts.s3_bucket,
                bucket_path=opts.s3_path,
                region_name=opts.s3_region_name,
                endpoint_url=opts.s3_endpoint_url,
                access_key_id=opts.s3_access_key_id,
                secret_access_key=opts.s3_secret_access_key,
            )
    except Exception as e:
        raise RuntimeError(f"S3: {e}") from e

    try:
        if opts.postgresql_host or opts.postgresql_connection_string:
            logger.debug("Initializing PostgreSQL client")
            pg_client = postgres.PostgreSQLClient(
                connection_string=opts.postgresql_connection_string,
                host=opts.postgresql_host,
                port=int(opts.postgresql_port or 5432),
                user=opts.postgresql_user,
                password=opts.postgresql_password,
                database=opts.postgresql_database,
            )
            # Register before create_tables() so that a failure there still
            # leaves the client reachable for cleanup.
            clients["postgresql_client"] = pg_client
            pg_client.create_tables()
    except Exception as e:
        raise RuntimeError(f"PostgreSQL: {e}") from e

    try:
        if opts.syslog_server:
            # Fall back to 514 when no port was set, the same way the other
            # syslog options below fall back to their defaults.
            syslog_port = (
                int(opts.syslog_port) if opts.syslog_port is not None else 514
            )
            logger.debug(
                "Initializing syslog client: server=%s:%s",
                opts.syslog_server,
                syslog_port,
            )
            clients["syslog_client"] = syslog.SyslogClient(
                server_name=opts.syslog_server,
                server_port=syslog_port,
                protocol=opts.syslog_protocol or "udp",
                cafile_path=opts.syslog_cafile_path,
                certfile_path=opts.syslog_certfile_path,
                keyfile_path=opts.syslog_keyfile_path,
                timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
                retry_attempts=opts.syslog_retry_attempts
                if opts.syslog_retry_attempts is not None
                else 3,
                retry_delay=opts.syslog_retry_delay
                if opts.syslog_retry_delay is not None
                else 5,
            )
    except Exception as e:
        raise RuntimeError(f"Syslog: {e}") from e

    if opts.hec:
        # Raised outside the try below on purpose: a missing token/index is a
        # configuration mistake and must not be wrapped in RuntimeError.
        if opts.hec_token is None or opts.hec_index is None:
            raise ConfigurationError(
                "HEC token and HEC index are required when using HEC URL"
            )
        try:
            logger.debug("Initializing Splunk HEC client: url=%s", opts.hec)
            verify = not opts.hec_skip_certificate_verification
            clients["hec_client"] = splunk.HECClient(
                opts.hec, opts.hec_token, opts.hec_index, verify=verify
            )
        except Exception as e:
            raise RuntimeError(f"Splunk HEC: {e}") from e

    try:
        if opts.kafka_hosts:
            logger.debug("Initializing Kafka client: hosts=%s", opts.kafka_hosts)
            ssl_context = None
            if opts.kafka_skip_certificate_verification:
                logger.debug("Skipping Kafka certificate verification")
                ssl_context = create_default_context()
                ssl_context.check_hostname = False
                ssl_context.verify_mode = CERT_NONE
            clients["kafka_client"] = kafkaclient.KafkaClient(
                opts.kafka_hosts,
                username=opts.kafka_username,
                password=opts.kafka_password,
                ssl_context=ssl_context,
            )
    except Exception as e:
        raise RuntimeError(f"Kafka: {e}") from e

    try:
        if opts.gelf_host:
            logger.debug(
                "Initializing GELF client: host=%s:%s",
                opts.gelf_host,
                opts.gelf_port,
            )
            clients["gelf_client"] = gelf.GelfClient(
                host=opts.gelf_host,
                port=int(opts.gelf_port),
                mode=opts.gelf_mode,
            )
    except Exception as e:
        raise RuntimeError(f"GELF: {e}") from e

    try:
        if (
            opts.webhook_aggregate_url
            or opts.webhook_failure_url
            or opts.webhook_smtp_tls_url
        ):
            logger.debug("Initializing webhook client")
            clients["webhook_client"] = webhook.WebhookClient(
                aggregate_url=opts.webhook_aggregate_url,
                failure_url=opts.webhook_failure_url,
                smtp_tls_url=opts.webhook_smtp_tls_url,
                timeout=opts.webhook_timeout,
            )
    except Exception as e:
        raise RuntimeError(f"Webhook: {e}") from e

    # Elasticsearch and OpenSearch mutate module-level global state via
    # connections.create_connection(), which cannot be rolled back if a later
    # step fails. Initialise them last so that all other clients are created
    # successfully first; this minimizes the window for partial-init problems
    # during config reload.
    if opts.save_aggregate or opts.save_failure or opts.save_smtp_tls:
        try:
            if opts.elasticsearch_hosts:
                logger.debug(
                    "Initializing Elasticsearch client: hosts=%s, ssl=%s",
                    opts.elasticsearch_hosts,
                    opts.elasticsearch_ssl,
                )
                es_aggregate_index, es_failure_index = _migration_index_names(
                    opts.elasticsearch_index_prefix,
                    opts.elasticsearch_index_suffix,
                )
                elastic_timeout_value = (
                    float(opts.elasticsearch_timeout)
                    if opts.elasticsearch_timeout is not None
                    else 60.0
                )
                elastic.set_hosts(
                    opts.elasticsearch_hosts,
                    use_ssl=opts.elasticsearch_ssl,
                    ssl_cert_path=opts.elasticsearch_ssl_cert_path,
                    skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
                    username=opts.elasticsearch_username,
                    password=opts.elasticsearch_password,
                    api_key=opts.elasticsearch_api_key,
                    timeout=elastic_timeout_value,
                    serverless=opts.elasticsearch_serverless,
                )
                elastic.migrate_indexes(
                    aggregate_indexes=[es_aggregate_index],
                    failure_indexes=[es_failure_index],
                )
                clients["elasticsearch"] = _ElasticsearchHandle()
        except Exception as e:
            raise RuntimeError(f"Elasticsearch: {e}") from e

        try:
            if opts.opensearch_hosts:
                logger.debug(
                    "Initializing OpenSearch client: hosts=%s, ssl=%s",
                    opts.opensearch_hosts,
                    opts.opensearch_ssl,
                )
                os_aggregate_index, os_failure_index = _migration_index_names(
                    opts.opensearch_index_prefix,
                    opts.opensearch_index_suffix,
                )
                opensearch_timeout_value = (
                    float(opts.opensearch_timeout)
                    if opts.opensearch_timeout is not None
                    else 60.0
                )
                opensearch.set_hosts(
                    opts.opensearch_hosts,
                    use_ssl=opts.opensearch_ssl,
                    ssl_cert_path=opts.opensearch_ssl_cert_path,
                    skip_certificate_verification=opts.opensearch_skip_certificate_verification,
                    username=opts.opensearch_username,
                    password=opts.opensearch_password,
                    api_key=opts.opensearch_api_key,
                    timeout=opensearch_timeout_value,
                    auth_type=opts.opensearch_auth_type,
                    aws_region=opts.opensearch_aws_region,
                    aws_service=opts.opensearch_aws_service,
                )
                opensearch.migrate_indexes(
                    aggregate_indexes=[os_aggregate_index],
                    failure_indexes=[os_failure_index],
                )
                clients["opensearch"] = _OpenSearchHandle()
        except Exception as e:
            raise RuntimeError(f"OpenSearch: {e}") from e


def _init_output_clients(opts):
    """Create output clients based on current opts.

    If any client fails to initialize, the clients created earlier in the
    same call are closed before the error is re-raised, so a caller that
    retries does not accumulate open connections. The Elasticsearch and
    OpenSearch sentinels are the exception: the module-level connection they
    stand for is left in place.

    Args:
        opts: namespace of settings; only the output-related attributes
            (``s3_*``, ``postgresql_*``, ``syslog_*``, ``hec*``, ``kafka_*``,
            ``gelf_*``, ``webhook_*``, ``elasticsearch_*``, ``opensearch_*``
            and the ``save_*`` flags) are read.

    Returns:
        dict of client instances keyed by name.

    Raises:
        ConfigurationError: If a HEC URL is set without a HEC token or index.
        RuntimeError: If any client cannot be created. The message is
            prefixed with the client's label (e.g. ``"Kafka: ..."``) and the
            underlying exception is chained as ``__cause__``.
    """
    clients = {}
    try:
        _populate_output_clients(opts, clients)
    except Exception:
        _release_partial_clients(clients)
        raise
    return clients
def _close_output_clients(clients):
    """Drain *clients*, closing every entry that has a ``close`` attribute.

    Entries are removed one at a time with ``popitem()``, so the dict is
    empty afterwards and calling this again on the same dict (e.g. the
    trailing close followed by the atexit safety net) does nothing.
    Entries without a ``close`` attribute are dropped without further action.
    An exception raised by ``close()`` is logged as a warning, with its
    traceback, and never propagates.

    Args:
        clients: dict of client instances returned by :func:`_init_output_clients`.
    """
    while clients:
        label, handle = clients.popitem()
        if not hasattr(handle, "close"):
            continue
        try:
            handle.close()
        except Exception:
            logger.warning("Error closing %s", label, exc_info=True)
def _main():
"""Called when the module is executed"""
def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None:
return None
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["policy_domain"]
if domain:
domain = get_base_domain(domain)
if domain:
domain = domain.lower()
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
output_errors = []
def log_output_error(destination, error):
message = f"{destination} Error: {error}"
logger.error(message)
output_errors.append(message)
if index_prefix_domain_map is not None:
filtered_tls = []
for report in reports_.get("smtp_tls_reports", []):
if get_index_prefix(report) is not None:
filtered_tls.append(report)
else:
domain = "unknown"
if "policies" in report and report["policies"]:
domain = report["policies"][0].get("policy_domain", "unknown")
logger.debug(
"Ignoring SMTP TLS report for domain not in "
"index_prefix_domain_map: %s",
domain,
)
reports_["smtp_tls_reports"] = filtered_tls
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
)
if not opts.silent:
print(output_str)
if opts.output:
save_output(
reports_,
output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename,
failure_json_filename=opts.failure_json_filename,
smtp_tls_json_filename=opts.smtp_tls_json_filename,
aggregate_csv_filename=opts.aggregate_csv_filename,
failure_csv_filename=opts.failure_csv_filename,
smtp_tls_csv_filename=opts.smtp_tls_csv_filename,
)
kafka_client = clients.get("kafka_client")
s3_client = clients.get("s3_client")
syslog_client = clients.get("syslog_client")
hec_client = clients.get("hec_client")
gelf_client = clients.get("gelf_client")
webhook_client = clients.get("webhook_client")
pg_client = clients.get("postgresql_client")
kafka_aggregate_topic = opts.kafka_aggregate_topic
kafka_failure_topic = opts.kafka_failure_topic
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
if opts.save_aggregate:
for report in reports_["aggregate_reports"]:
try:
if opts.elasticsearch_hosts:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
elastic.save_aggregate_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.elasticsearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__())
except Exception as error_:
log_output_error("Elasticsearch exception", error_.__str__())
try:
if opts.opensearch_hosts:
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
opensearch.save_aggregate_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.opensearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__())
except Exception as error_:
log_output_error("OpenSearch exception", error_.__str__())
try:
if kafka_client:
kafka_client.save_aggregate_reports_to_kafka(
report, kafka_aggregate_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_aggregate_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if pg_client:
pg_client.save_aggregate_report_to_postgresql(report)
except postgres.AlreadySaved as warning:
logger.warning(warning.__str__())
except postgres.PostgreSQLError as error_:
log_output_error("PostgreSQL", error_.__str__())
try:
if syslog_client:
syslog_client.save_aggregate_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_aggregate_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_aggregate_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_aggregate_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
log_output_error("Webhook", error_.__str__())
if hec_client:
try:
aggregate_reports_ = reports_["aggregate_reports"]
if len(aggregate_reports_) > 0:
hec_client.save_aggregate_reports_to_splunk(aggregate_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.save_failure:
for report in reports_["failure_reports"]:
try:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
if opts.elasticsearch_hosts:
elastic.save_failure_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.elasticsearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
if opts.opensearch_hosts:
opensearch.save_failure_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.opensearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
if kafka_client:
kafka_client.save_failure_reports_to_kafka(
report, kafka_failure_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_failure_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if pg_client:
pg_client.save_failure_report_to_postgresql(report)
except postgres.AlreadySaved as warning:
logger.warning(warning.__str__())
except postgres.PostgreSQLError as error_:
log_output_error("PostgreSQL", error_.__str__())
try:
if syslog_client:
syslog_client.save_failure_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_failure_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_failure_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_failure_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
log_output_error("Webhook", error_.__str__())
if hec_client:
try:
failure_reports_ = reports_["failure_reports"]
if len(failure_reports_) > 0:
hec_client.save_failure_reports_to_splunk(failure_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.save_smtp_tls:
for report in reports_["smtp_tls_reports"]:
try:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
if opts.elasticsearch_hosts:
elastic.save_smtp_tls_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.elasticsearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
if opts.opensearch_hosts:
opensearch.save_smtp_tls_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.opensearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
if kafka_client:
kafka_client.save_smtp_tls_reports_to_kafka(
[report], kafka_smtp_tls_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_smtp_tls_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if pg_client:
pg_client.save_smtp_tls_report_to_postgresql(report)
except postgres.AlreadySaved as warning:
logger.warning(warning.__str__())
except postgres.PostgreSQLError as error_:
log_output_error("PostgreSQL", error_.__str__())
try:
if syslog_client:
syslog_client.save_smtp_tls_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_smtp_tls_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_smtp_tls_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_smtp_tls_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
log_output_error("Webhook", error_.__str__())
if hec_client:
try:
smtp_tls_reports_ = reports_["smtp_tls_reports"]
if len(smtp_tls_reports_) > 0:
hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.la_dce:
try:
la_client = loganalytics.LogAnalyticsClient(
client_id=opts.la_client_id,
client_secret=opts.la_client_secret,
tenant_id=opts.la_tenant_id,
dce=opts.la_dce,
dcr_immutable_id=opts.la_dcr_immutable_id,
dcr_aggregate_stream=opts.la_dcr_aggregate_stream,
dcr_failure_stream=opts.la_dcr_failure_stream,
dcr_smtp_tls_stream=opts.la_dcr_smtp_tls_stream,
)
la_client.publish_results(
reports_,
opts.save_aggregate,
opts.save_failure,
opts.save_smtp_tls,
)
except loganalytics.LogAnalyticsException as e:
log_output_error("Log Analytics", e.__str__())
except Exception as e:
log_output_error("Log Analytics", f"Unknown publishing error: {e}")
if opts.fail_on_output_error and output_errors:
raise ParserError(
"Output destination failures detected: {0}".format(
" | ".join(output_errors)
)
)
arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument(
"-c",
"--config-file",
help="a path to a configuration file (--silent implied)",
)
arg_parser.add_argument(
"file_path",
nargs="*",
help="one or more paths to aggregate or failure "
"report files, emails, or mbox files'",
)
strip_attachment_help = "remove attachment payloads from failure report output"
arg_parser.add_argument(
"--strip-attachment-payloads", help=strip_attachment_help, action="store_true"
)
arg_parser.add_argument(
"-o", "--output", help="write output files to the given directory"
)
arg_parser.add_argument(
"--aggregate-json-filename",
help="filename for the aggregate JSON output file",
default="aggregate.json",
)
arg_parser.add_argument(
"--failure-json-filename",
help="filename for the failure JSON output file",
default="failure.json",
)
arg_parser.add_argument(
"--smtp-tls-json-filename",
help="filename for the SMTP TLS JSON output file",
default="smtp_tls.json",
)
arg_parser.add_argument(
"--aggregate-csv-filename",
help="filename for the aggregate CSV output file",
default="aggregate.csv",
)
arg_parser.add_argument(
"--failure-csv-filename",
help="filename for the failure CSV output file",
default="failure.csv",
)
arg_parser.add_argument(
"--smtp-tls-csv-filename",
help="filename for the SMTP TLS CSV output file",
default="smtp_tls.csv",
)
arg_parser.add_argument(
"-n", "--nameservers", nargs="+", help="nameservers to query"
)
arg_parser.add_argument(
"-t",
"--dns_timeout",
help="number of seconds to wait for an answer from DNS (default: 2.0)",
type=float,
default=2.0,
)
arg_parser.add_argument(
"--dns-retries",
dest="dns_retries",
help="number of times to retry DNS queries on timeout or other "
"transient errors (default: 0)",
type=int,
default=0,
)
arg_parser.add_argument(
"--offline",
action="store_true",
help="do not make online queries for geolocation or DNS",
)
arg_parser.add_argument(
"-s", "--silent", action="store_true", help="only print errors"
)
arg_parser.add_argument(
"-w",
"--warnings",
action="store_true",
help="print warnings in addition to errors",
)
arg_parser.add_argument(
"--verbose", action="store_true", help="more verbose output"
)
arg_parser.add_argument(
"--debug", action="store_true", help="print debugging information"
)
arg_parser.add_argument("--log-file", default=None, help="output logging to a file")
arg_parser.add_argument(
"--no-prettify-json",
action="store_false",
dest="prettify_json",
help="output JSON in a single line without indentation",
)
arg_parser.add_argument("-v", "--version", action="version", version=__version__)
aggregate_reports = []
failure_reports = []
smtp_tls_reports = []
args = arg_parser.parse_args()
opts = Namespace(
file_path=args.file_path,
config_file=args.config_file,
offline=args.offline,
strip_attachment_payloads=args.strip_attachment_payloads,
output=args.output,
aggregate_csv_filename=args.aggregate_csv_filename,
aggregate_json_filename=args.aggregate_json_filename,
failure_csv_filename=args.failure_csv_filename,
failure_json_filename=args.failure_json_filename,
smtp_tls_json_filename=args.smtp_tls_json_filename,
smtp_tls_csv_filename=args.smtp_tls_csv_filename,
nameservers=args.nameservers,
dns_test_address="1.1.1.1",
silent=args.silent,
warnings=args.warnings,
dns_timeout=args.dns_timeout,
dns_retries=args.dns_retries,
debug=args.debug,
verbose=args.verbose,
prettify_json=args.prettify_json,
save_aggregate=False,
save_failure=False,
save_smtp_tls=False,
mailbox_reports_folder="INBOX",
mailbox_archive_folder="Archive",
mailbox_watch=False,
mailbox_delete=False,
mailbox_test=False,
mailbox_batch_size=10,
mailbox_check_timeout=30,
mailbox_since=None,
imap_host=None,
imap_skip_certificate_verification=False,
imap_ssl=True,
imap_port=993,
imap_timeout=30,
imap_max_retries=4,
imap_user=None,
imap_password=None,
graph_auth_method=None,
graph_user=None,
graph_password=None,
graph_client_id=None,
graph_client_secret=None,
graph_certificate_path=None,
graph_certificate_password=None,
graph_tenant_id=None,
graph_mailbox=None,
graph_allow_unencrypted_storage=False,
graph_url="https://graph.microsoft.com",
hec=None,
hec_token=None,
hec_index=None,
hec_skip_certificate_verification=False,
elasticsearch_hosts=None,
elasticsearch_timeout=60,
elasticsearch_number_of_shards=1,
elasticsearch_number_of_replicas=0,
elasticsearch_index_suffix=None,
elasticsearch_index_prefix=None,
elasticsearch_ssl=True,
elasticsearch_ssl_cert_path=None,
elasticsearch_skip_certificate_verification=False,
elasticsearch_monthly_indexes=False,
elasticsearch_username=None,
elasticsearch_password=None,
elasticsearch_api_key=None,
elasticsearch_serverless=False,
opensearch_hosts=None,
opensearch_timeout=60,
opensearch_number_of_shards=1,
opensearch_number_of_replicas=0,
opensearch_index_suffix=None,
opensearch_index_prefix=None,
opensearch_ssl=True,
opensearch_ssl_cert_path=None,
opensearch_skip_certificate_verification=False,
opensearch_monthly_indexes=False,
opensearch_username=None,
opensearch_password=None,
opensearch_api_key=None,
opensearch_auth_type="basic",
opensearch_aws_region=None,
opensearch_aws_service="es",
kafka_hosts=None,
kafka_username=None,
kafka_password=None,
kafka_aggregate_topic=None,
kafka_failure_topic=None,
kafka_smtp_tls_topic=None,
kafka_ssl=False,
kafka_skip_certificate_verification=False,
smtp_host=None,
smtp_port=25,
smtp_ssl=False,
smtp_skip_certificate_verification=False,
smtp_user=None,
smtp_password=None,
smtp_from=None,
smtp_to=[],
smtp_subject="parsedmarc report",
smtp_message="Please see the attached DMARC results.",
s3_bucket=None,
s3_path=None,
s3_region_name=None,
s3_endpoint_url=None,
s3_access_key_id=None,
s3_secret_access_key=None,
syslog_server=None,
syslog_port=None,
syslog_protocol=None,
syslog_cafile_path=None,
syslog_certfile_path=None,
syslog_keyfile_path=None,
syslog_timeout=None,
syslog_retry_attempts=None,
syslog_retry_delay=None,
gmail_api_credentials_file=None,
gmail_api_token_file=None,
gmail_api_include_spam_trash=False,
gmail_api_paginate_messages=True,
gmail_api_scopes=[],
gmail_api_oauth2_port=8080,
gmail_api_auth_mode="installed_app",
gmail_api_service_account_user=None,
maildir_path=None,
maildir_create=False,
log_file=args.log_file,
n_procs=1,
ip_db_path=None,
ipinfo_url=None,
ipinfo_api_token=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
psl_overrides_path=None,
psl_overrides_url=None,
la_client_id=None,
la_client_secret=None,
la_tenant_id=None,
la_dce=None,
la_dcr_immutable_id=None,
la_dcr_aggregate_stream=None,
la_dcr_failure_stream=None,
la_dcr_smtp_tls_stream=None,
gelf_host=None,
gelf_port=None,
gelf_mode=None,
webhook_aggregate_url=None,
webhook_failure_url=None,
webhook_smtp_tls_url=None,
webhook_timeout=60,
normalize_timespan_threshold_hours=24.0,
postgresql_host=None,
postgresql_port=5432,
postgresql_user=None,
postgresql_password=None,
postgresql_database=None,
postgresql_connection_string=None,
fail_on_output_error=False,
)
# Snapshot opts as set from CLI args / hardcoded defaults, before any config
# file is applied. On each SIGHUP reload we restore this baseline first so
# that sections removed from the config file actually take effect.
opts_from_cli = Namespace(**vars(opts))
index_prefix_domain_map = None
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
has_env_config = any(
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
for k in os.environ
)
if config_file or has_env_config:
try:
config = _load_config(config_file)
index_prefix_domain_map = _parse_config(config, opts)
except ConfigurationError as e:
logger.critical(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
if opts.warnings:
logger.setLevel(logging.WARNING)
if opts.verbose:
logger.setLevel(logging.INFO)
if opts.debug:
logger.setLevel(logging.DEBUG)
if opts.log_file:
try:
fh = logging.FileHandler(opts.log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
and opts.gmail_api_credentials_file is None
and opts.maildir_path is None
and len(opts.file_path) == 0
):
logger.error("You must supply input files or a mailbox connection")
exit(1)
logger.info("Starting parsedmarc")
load_ip_db(
always_use_local_file=opts.always_use_local_files,
local_file_path=opts.ip_db_path,
url=opts.ipinfo_url,
offline=opts.offline,
)
if opts.ipinfo_api_token and not opts.offline:
try:
configure_ipinfo_api(opts.ipinfo_api_token)
except InvalidIPinfoAPIKey as e:
logger.critical(str(e))
exit(1)
load_psl_overrides(
always_use_local_file=opts.always_use_local_files,
local_file_path=opts.psl_overrides_path,
url=opts.psl_overrides_url,
offline=opts.offline,
)
# Initialize output clients (with retry for transient connection errors)
clients = {}
max_retries = 4
retry_delay = 5
for attempt in range(max_retries + 1):
try:
clients = _init_output_clients(opts)
break
except ConfigurationError as e:
logger.critical(str(e))
exit(1)
except Exception as error_:
if attempt < max_retries:
logger.warning(
"Output client error (attempt %d/%d, retrying in %ds): %s",
attempt + 1,
max_retries + 1,
retry_delay,
error_,
)
time.sleep(retry_delay)
retry_delay *= 2
else:
logger.error("Output client error: {0}".format(error_))
exit(1)
# Always close output clients on the way out (normal return,
# exit(N), uncaught exception, or SystemExit from a signal-driven
# shutdown). atexit does NOT fire on os._exit(130) — that's
# intentional for the SIGINT double-tap. The lambda closes whatever
# `clients` currently points at, so a SIGHUP reload that swaps the
# dict in-place is still covered.
atexit.register(lambda: _close_output_clients(clients))
# Signal handlers set a cooperative flag polled at safe checkpoints:
# the one-shot loops check it between batches; the watch loop relies
# on the mailbox backend polling `config_reloading` (which includes
# this flag) between checks, including inside the IMAP IDLE loop, so
# the current batch finishes before the watcher exits. SIGINT is a
# "double tap": the first press is graceful, the second short-circuits
# to os._exit(130). os._exit is async-signal-safe; sys.exit and
# logging are not, so the handlers only set flags / call os._exit.
_reload_requested = False
_shutdown_requested = False
_sigint_count = 0
# SIGHUP handler: record that a configuration reload was requested.
# It only sets the enclosing `_reload_requested` flag; the flag is read by the
# `config_reloading` callback passed to watch_inbox() and checked again after
# watch_inbox() returns, where the reload is performed and the flag cleared.
# `signum` and `frame` are supplied by the signal machinery and are unused.
def _handle_sighup(signum, frame):
nonlocal _reload_requested
_reload_requested = True
# SIGTERM handler: request a cooperative shutdown.
# It only sets the enclosing `_shutdown_requested` flag; the flag is polled
# between file batches, between mbox files, before the one-shot mailbox fetch,
# and (via the `config_reloading` callback) by the watch loop.
# `signum` and `frame` are supplied by the signal machinery and are unused.
def _handle_sigterm(signum, frame):
nonlocal _shutdown_requested
_shutdown_requested = True
# SIGINT handler implementing the two-stage ("double tap") interrupt.
# Every delivery increments `_sigint_count`. The first delivery sets
# `_shutdown_requested` so the polling loops stop at their next checkpoint;
# from the second delivery onward the process is terminated immediately.
# `signum` and `frame` are supplied by the signal machinery and are unused.
def _handle_sigint(signum, frame):
nonlocal _shutdown_requested, _sigint_count
_sigint_count += 1
if _sigint_count >= 2:
# Forced exit with status 130: os._exit() skips atexit callbacks, so the
# registered output-client cleanup does not run on this path.
os._exit(130)
_shutdown_requested = True
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
signal.signal(signal.SIGTERM, _handle_sigterm)
signal.signal(signal.SIGINT, _handle_sigint)
file_paths = _expand_file_path_args(args.file_path)
mbox_paths = []
for file_path in file_paths:
if is_mbox(file_path):
mbox_paths.append(file_path)
file_paths = list(set(file_paths))
mbox_paths = list(set(mbox_paths))
for mbox_path in mbox_paths:
file_paths.remove(mbox_path)
counter = 0
results = []
pbar = None
if sys.stdout.isatty():
pbar = tqdm(total=len(file_paths))
n_procs = int(opts.n_procs or 1)
if n_procs < 1:
n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
# Honor a shutdown request between batches before spawning the
# next pool. Anything already parsed is still in `results` and
# will go through process_reports() in the cleanup path so we
# don't lose work the operator already paid for.
if _shutdown_requested:
logger.info(
"Shutdown requested, stopping file processing after %d batch(es)",
batch_index,
)
break
processes = []
connections = []
for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)):
if proc_index >= len(file_paths):
break
parent_conn, child_conn = Pipe()
connections.append(parent_conn)
process = Process(
target=cli_parse,
args=(
file_paths[proc_index],
opts.strip_attachment_payloads,
opts.nameservers,
opts.dns_timeout,
opts.dns_retries,
opts.ip_db_path,
opts.offline,
opts.always_use_local_files,
opts.reverse_dns_map_path,
opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn,
current_log_level,
current_log_file,
),
)
processes.append(process)
for proc in processes:
proc.start()
for conn in connections:
results.append(conn.recv())
for proc in processes:
proc.join()
if pbar is not None:
counter += 1
pbar.update(1)
if pbar is not None:
pbar.close()
for result in results:
if isinstance(result[0], ParserError) or result[0] is None:
logger.error("Failed to parse {0} - {1}".format(result[1], result[0]))
else:
if result[0]["report_type"] == "aggregate":
report_org = result[0]["report"]["report_metadata"]["org_name"]
report_id = result[0]["report"]["report_metadata"]["report_id"]
report_key = f"{report_org}_{report_id}"
if report_key not in SEEN_AGGREGATE_REPORT_IDS:
SEEN_AGGREGATE_REPORT_IDS[report_key] = True
aggregate_reports.append(result[0]["report"])
else:
logger.debug(
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif result[0]["report_type"] == "failure":
failure_reports.append(result[0]["report"])
elif result[0]["report_type"] == "smtp_tls":
smtp_tls_reports.append(result[0]["report"])
for mbox_path in mbox_paths:
if _shutdown_requested:
logger.info("Shutdown requested, skipping remaining mbox files")
break
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
strip = opts.strip_attachment_payloads
reports = get_dmarc_reports_from_mbox(
mbox_path,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
dns_retries=opts.dns_retries,
strip_attachment_payloads=strip,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
)
aggregate_reports += reports["aggregate_reports"]
failure_reports += reports["failure_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
mailbox_connection = None
mailbox_batch_size_value = 10
mailbox_check_timeout_value = 30
normalize_timespan_threshold_hours_value = 24.0
if opts.imap_host:
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified if host is specified"
)
exit(1)
ssl = True
verify = True
if opts.imap_skip_certificate_verification:
logger.debug("Skipping IMAP certificate verification")
verify = False
if not opts.imap_ssl:
ssl = False
imap_timeout = (
int(opts.imap_timeout) if opts.imap_timeout is not None else 30
)
imap_max_retries = (
int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
)
imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
mailbox_connection = IMAPConnection(
host=opts.imap_host,
port=imap_port_value,
ssl=ssl,
verify=verify,
timeout=imap_timeout,
max_retries=imap_max_retries,
user=opts.imap_user,
password=opts.imap_password,
)
except Exception:
logger.exception("IMAP Error")
exit(1)
if opts.graph_client_id:
try:
mailbox = opts.graph_mailbox or opts.graph_user
mailbox_connection = MSGraphConnection(
auth_method=opts.graph_auth_method,
mailbox=mailbox,
tenant_id=opts.graph_tenant_id,
client_id=opts.graph_client_id,
client_secret=opts.graph_client_secret,
certificate_path=opts.graph_certificate_path,
certificate_password=opts.graph_certificate_password,
username=opts.graph_user,
password=opts.graph_password,
token_file=opts.graph_token_file,
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
graph_url=opts.graph_url,
token_cache_name="parsedmarc",
)
except Exception:
logger.exception("MS Graph Error")
exit(1)
if opts.gmail_api_credentials_file:
if opts.mailbox_delete:
if "https://mail.google.com/" not in opts.gmail_api_scopes:
logger.error(
"Message deletion requires scope"
" 'https://mail.google.com/'. "
"Add the scope and remove token file "
"to acquire proper access."
)
opts.mailbox_delete = False
try:
mailbox_connection = GmailConnection(
credentials_file=opts.gmail_api_credentials_file,
token_file=opts.gmail_api_token_file,
scopes=opts.gmail_api_scopes,
include_spam_trash=opts.gmail_api_include_spam_trash,
paginate_messages=opts.gmail_api_paginate_messages,
reports_folder=opts.mailbox_reports_folder,
oauth2_port=opts.gmail_api_oauth2_port,
auth_mode=opts.gmail_api_auth_mode,
service_account_user=opts.gmail_api_service_account_user,
)
except Exception:
logger.exception("Gmail API Error")
exit(1)
if opts.maildir_path:
try:
mailbox_connection = MaildirConnection(
maildir_path=opts.maildir_path,
maildir_create=opts.maildir_create,
)
except Exception:
logger.exception("Maildir Error")
exit(1)
if mailbox_connection:
mailbox_batch_size_value = (
int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
if mailbox_connection and not _shutdown_requested:
try:
reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection,
delete=opts.mailbox_delete,
batch_size=mailbox_batch_size_value,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
nameservers=opts.nameservers,
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
dns_retries=opts.dns_retries,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
)
aggregate_reports += reports["aggregate_reports"]
failure_reports += reports["failure_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
except Exception:
logger.exception("Mailbox Error")
exit(1)
parsing_results: ParsingResults = {
"aggregate_reports": aggregate_reports,
"failure_reports": failure_reports,
"smtp_tls_reports": smtp_tls_reports,
}
try:
process_reports(parsing_results)
except ParserError as error:
logger.error(error.__str__())
exit(1)
if opts.smtp_host:
try:
verify = True
if opts.smtp_skip_certificate_verification:
verify = False
smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
smtp_to_value = (
list(opts.smtp_to)
if isinstance(opts.smtp_to, list)
else _str_to_list(str(opts.smtp_to))
)
email_results(
parsing_results,
opts.smtp_host,
opts.smtp_from,
smtp_to_value,
port=smtp_port_value,
verify=verify,
username=opts.smtp_user,
password=opts.smtp_password,
subject=opts.smtp_subject,
require_encryption=opts.smtp_ssl,
)
except Exception:
logger.exception("Failed to email results")
exit(1)
if mailbox_connection and opts.mailbox_watch:
logger.info("Watching for email - Ctrl-C once to quit, twice to force")
while True:
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
try:
# `config_reloading` returns True on SIGHUP (reload) or
# SIGTERM/SIGINT (shutdown); the backend polls it between
# checks — including inside the IMAP IDLE loop — and returns
# at a safe boundary once the current batch is processed.
watch_inbox(
mailbox_connection=mailbox_connection,
callback=process_reports,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
delete=opts.mailbox_delete,
test=opts.mailbox_test,
check_timeout=mailbox_check_timeout_value,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
dns_retries=opts.dns_retries,
strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=mailbox_batch_size_value,
since=opts.mailbox_since,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
config_reloading=lambda: _reload_requested or _shutdown_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
exit(1)
except ParserError as error:
logger.error(error.__str__())
exit(1)
# Prioritize shutdown over reload if both flags are set (e.g.
# SIGHUP followed by SIGTERM). atexit closes output clients.
if _shutdown_requested:
logger.info("Shutdown requested, exiting watch loop")
break
if not _reload_requested:
break
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
_reload_requested = False
logger.info("Reloading configuration...")
try:
# Build a fresh opts starting from CLI-only defaults so that
# sections removed from the config file actually take effect.
new_opts = Namespace(**vars(opts_from_cli))
new_config = _load_config(config_file)
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
new_clients = _init_output_clients(new_opts)
# All steps succeeded — commit the changes atomically.
_close_output_clients(clients)
clients = new_clients
index_prefix_domain_map = new_index_prefix_domain_map
# Reload the reverse DNS map so changes to the
# map path/URL in the config take effect. PSL overrides
# are reloaded alongside it so map entries that depend on
# a folded base domain keep working.
load_reverse_dns_map(
REVERSE_DNS_MAP,
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.reverse_dns_map_path,
url=new_opts.reverse_dns_map_url,
offline=new_opts.offline,
psl_overrides_path=new_opts.psl_overrides_path,
psl_overrides_url=new_opts.psl_overrides_url,
)
# Reload the IP database so changes to the
# db path/URL in the config take effect.
load_ip_db(
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.ip_db_path,
url=new_opts.ipinfo_url,
offline=new_opts.offline,
)
# Re-apply IPinfo API settings. Passing a falsy token disables
# the API; a rotated token picks up here too. An invalid token
# is fatal even on reload — the operator asked for it.
try:
configure_ipinfo_api(
new_opts.ipinfo_api_token if not new_opts.offline else None,
)
except InvalidIPinfoAPIKey as e:
logger.critical(str(e))
exit(1)
for k, v in vars(new_opts).items():
setattr(opts, k, v)
# Update watch parameters from reloaded config
mailbox_batch_size_value = (
int(opts.mailbox_batch_size)
if opts.mailbox_batch_size is not None
else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
# Update log level
logger.setLevel(logging.ERROR)
if opts.warnings:
logger.setLevel(logging.WARNING)
if opts.verbose:
logger.setLevel(logging.INFO)
if opts.debug:
logger.setLevel(logging.DEBUG)
# Refresh FileHandler if log_file changed
old_log_file = getattr(opts, "active_log_file", None)
new_log_file = opts.log_file
if old_log_file != new_log_file:
# Remove old FileHandlers
for h in list(logger.handlers):
if isinstance(h, logging.FileHandler):
h.close()
logger.removeHandler(h)
# Add new FileHandler if configured
if new_log_file:
try:
fh = logging.FileHandler(new_log_file, "a")
file_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s"
" - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(file_formatter)
logger.addHandler(fh)
except Exception as log_error:
logger.warning(
"Unable to write to log file: {}".format(log_error)
)
opts.active_log_file = new_log_file
logger.info("Configuration reloaded successfully")
except Exception:
logger.exception(
"Config reload failed, continuing with previous config"
)
# Close output clients on the success path (one-shot or graceful
# watch-loop exit). atexit-registered above is the safety net for
# exit(1) / uncaught-exception paths.
_close_output_clients(clients)
# Script entry point: run the CLI only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
_main()