parsedmarc/parsedmarc/cli.py
Sean Whalen c5f432c460 Add optional IPinfo Lite REST API with MMDB fallback (#717)
* Add optional IPinfo Lite REST API with MMDB fallback

When [general] ipinfo_api_token (or PARSEDMARC_GENERAL_IPINFO_API_TOKEN)
is set, every IP lookup hits https://api.ipinfo.io/lite/<ip> first for
fresh country + ASN data. On HTTP 429 (rate limit) or 402 (quota
exhausted), the API is disabled for the rest of the run and lookups fall
back to the bundled / cached MMDB; transient network errors fall back
per request without disabling the API. An invalid token (401/403) raises
InvalidIPinfoAPIKey, which the CLI catches and exits fatally, including
at startup via a probe lookup, so operators notice misconfiguration
immediately. Added ipinfo_api_url as a base-URL override for mirrors or
proxies.
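
A minimal sketch of that flow, assuming requests and a maxminddb-style
reader; apart from InvalidIPinfoAPIKey and _normalize_ip_record()
(described below), every name and the auth scheme here is illustrative:

    import requests

    _api_disabled = False  # set once on 429/402, for the rest of the run

    def lookup_ip(ip, token, mmdb_reader):
        global _api_disabled
        if token and not _api_disabled:
            try:
                resp = requests.get(
                    f"https://api.ipinfo.io/lite/{ip}",
                    headers={"Authorization": f"Bearer {token}"},  # assumed scheme
                    timeout=5,
                )
                if resp.status_code in (429, 402):
                    _api_disabled = True  # stop trying the API this run
                elif resp.status_code in (401, 403):
                    raise InvalidIPinfoAPIKey("IPinfo rejected the API token")
                else:
                    resp.raise_for_status()
                    return _normalize_ip_record(resp.json())
            except requests.RequestException:
                pass  # transient error: fall back for this request only
        return _normalize_ip_record(mmdb_reader.get(ip))  # bundled / cached MMDB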

The API token is never logged. A new _normalize_ip_record() helper is
shared between the API path and the MMDB path so both paths produce the
same normalized shape (country code, integer asn, asn_name, asn_domain).
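
For example, both paths might yield a record like (values illustrative):

    {"country": "US", "asn": 13335,
     "asn_name": "Cloudflare, Inc.", "asn_domain": "cloudflare.com"}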

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* IPinfo API: cool down and retry instead of permanent disable

Previously a single 429 or 402 disabled the API for the whole run. Now
each event sets a cooldown (using Retry-After when present, defaulting to
5 minutes for rate limits and 1 hour for quota exhaustion). Once the
cooldown expires, the next lookup retries; a successful retry logs
"IPinfo API recovered" once at info level so operators can see that the
service came back. Repeat rate-limit responses after the first event
stay at debug level to avoid log spam.
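
A sketch of the cooldown choice (defaults as described above; the
Retry-After parsing is simplified to integer seconds only):

    def _cooldown_seconds(resp):
        """Honor Retry-After when present, else use per-status defaults."""
        retry_after = resp.headers.get("Retry-After", "")
        if retry_after.isdigit():
            return int(retry_after)
        return 300 if resp.status_code == 429 else 3600  # 5 min / 1 hour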

The test now targets parsedmarc.log (the logger that actually emits)
instead of the parsedmarc parent: cli._main() sets the child logger's
level to ERROR, and assertLogs on the parent cannot see warnings that
are filtered before propagation. The test also exercises the
cooldown-then-recovery path.
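
Roughly (the trigger helper and assertion text are illustrative):

    with self.assertLogs("parsedmarc.log", level="WARNING") as cm:
        trigger_rate_limited_lookup()  # hypothetical test helper
    self.assertTrue(any("rate limit" in line for line in cm.output))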

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* IPinfo API: log plan and quota from /me at startup

The configure-time probe now hits https://ipinfo.io/me first. That
endpoint is documented as quota-free, so it doubles as a cost-free
token check; we use it both to validate the token and to surface plan /
month-to-date usage / remaining-quota numbers at info level:

  IPinfo API configured — plan: Lite, usage: 12345/50000 this month, 37655 remaining

Field names in /me have drifted across IPinfo plan generations, so the
summary formatter probes a few aliases before giving up. If /me is
unreachable (e.g. a custom mirror behind ipinfo_api_url, or a network
error), we fall back to the original 1.1.1.1 lookup probe, which still
validates the token and logs a generic "configured" message.
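
A sketch of that alias probing (every field name here is a guess at one
of the drifted spellings, not a documented schema):

    def _plan_summary(me: dict) -> str:
        plan = me.get("plan") or me.get("tier") or "unknown"
        used = me.get("requests", {}).get("month") or me.get("usage")
        limit = me.get("requests", {}).get("limit") or me.get("quota")
        if isinstance(used, int) and isinstance(limit, int):
            return (f"plan: {plan}, usage: {used}/{limit} this month, "
                    f"{limit - used} remaining")
        return f"plan: {plan}"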

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Drop speculative ipinfo_api_url override

It was added to mirror ip_db_url, but the two serve different needs:
ip_db_url has a real use (internal hosting of the MMDB), while an
authenticated IPinfo API isn't something anyone mirrors, and /me was
always hardcoded anyway, making the override half-baked. YAGNI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* AGENTS.md: warn against speculative config options

A new section under Configuration spells out that every option is
permanent surface area and must come from a real user need rather than
from pattern-matching a nearby option. It cites the removed
ipinfo_api_url as the canonical cautionary tale so the next session
doesn't reintroduce it, and calls out "override the base URL" /
"configurable retries" as common YAGNI traps.

It also requires that new options land fully wired in one PR (INI
schema, _parse_config, Namespace defaults, docs, SIGHUP-reload path)
rather than half-implemented.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Rename [general] ip_db_url to ipinfo_url

The bundled MMDB is specifically IPinfo Lite, so the option name
should say so. ip_db_url stays accepted as a deprecated alias and
logs a warning when used; env-var equivalents accept either spelling
via the existing PARSEDMARC_{SECTION}_{KEY} machinery.

Updated the AGENTS.md cautionary tale to refer to ipinfo_url (with
the note about the alias) so the anti-pattern example still reads
correctly post-rename.
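
Spellings after the rename (the URL value is illustrative):

    [general]
    ipinfo_url = https://example.internal/ipinfo-lite.mmdb
    # deprecated alias, still accepted with a warning:
    # ip_db_url = https://example.internal/ipinfo-lite.mmdb
    # env equivalents: PARSEDMARC_GENERAL_IPINFO_URL or
    # PARSEDMARC_GENERAL_IP_DB_URL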

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Fix testPSLDownload to reflect .akamaiedge.net override

The PSL carries c.akamaiedge.net as a public suffix, but
psl_overrides.txt intentionally folds .akamaiedge.net so that every
Akamai CDN-customer PTR (the aXXXX-XX.cXXXXX.akamaiedge.net pattern)
clusters under a single akamaiedge.net display key. The override was
added in 2978436 as a deliberate design decision for source
attribution; the test assertion simply predates it.

Updated the comment to explain why the override wins over the live PSL
here, so the next reader doesn't reach for the PSL answer again.
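
Illustratively, using the real parsedmarc.utils.get_base_domain (the
PTR value is made up):

    get_base_domain("a1234-56.c78901.akamaiedge.net")
    # -> "akamaiedge.net" with the override in place; without it, these
    #    PTRs would not all cluster under one display key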

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 10:11:37 -04:00


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""A CLI for parsing DMARC reports"""
import http.client
import json
import logging
import os
import signal
import sys
import time
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from glob import glob
from multiprocessing import Pipe, Process
from ssl import CERT_NONE, create_default_context
import yaml
from tqdm import tqdm
from parsedmarc import (
REVERSE_DNS_MAP,
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
__version__,
elastic,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
s3,
save_output,
splunk,
syslog,
watch_inbox,
webhook,
)
from parsedmarc.log import logger
from parsedmarc.mail import (
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import (
InvalidIPinfoAPIKey,
configure_ipinfo_api,
get_base_domain,
get_reverse_dns,
is_mbox,
load_ip_db,
load_psl_overrides,
load_reverse_dns_map,
)
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
setattr(http.client, "_MAXHEADERS", 200)
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
def _str_to_list(s):
"""Converts a comma separated string to a list"""
_list = s.split(",")
return list(map(lambda i: i.lstrip(), _list))
def _expand_path(p: str) -> str:
"""Expand ``~`` and ``$VAR`` references in a file path."""
return os.path.expanduser(os.path.expandvars(p))
# All known INI config section names, used for env var resolution.
_KNOWN_SECTIONS = frozenset(
{
"general",
"mailbox",
"imap",
"msgraph",
"elasticsearch",
"opensearch",
"splunk_hec",
"kafka",
"smtp",
"s3",
"syslog",
"gmail_api",
"maildir",
"log_analytics",
"gelf",
"webhook",
}
)
def _resolve_section_key(suffix: str) -> tuple:
"""Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.
Uses longest-prefix matching against known section names so that
multi-word sections like ``splunk_hec`` are handled correctly.
Returns ``(None, None)`` when no known section matches.
"""
suffix_lower = suffix.lower()
best_section = None
best_key = None
for section in _KNOWN_SECTIONS:
section_prefix = section + "_"
if suffix_lower.startswith(section_prefix):
key = suffix_lower[len(section_prefix) :]
if key and (best_section is None or len(section) > len(best_section)):
best_section = section
best_key = key
return best_section, best_key
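# Example resolutions (illustrative inputs):
#   _resolve_section_key("IMAP_PASSWORD")    -> ("imap", "password")
#   _resolve_section_key("SPLUNK_HEC_TOKEN") -> ("splunk_hec", "token")
#   _resolve_section_key("NOT_A_SECTION_X")  -> (None, None)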
def _apply_env_overrides(config: ConfigParser) -> None:
"""Inject ``PARSEDMARC_*`` environment variables into *config*.
Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
(or create) the corresponding config-file values. Sections are created
automatically when they do not yet exist.
"""
prefix = "PARSEDMARC_"
# Short aliases that don't follow the PARSEDMARC_{SECTION}_{KEY} pattern.
_ENV_ALIASES = {
"DEBUG": ("general", "debug"),
"PARSEDMARC_DEBUG": ("general", "debug"),
}
for env_key, env_value in os.environ.items():
if env_key in _ENV_ALIASES:
section, key = _ENV_ALIASES[env_key]
elif env_key.startswith(prefix) and env_key != "PARSEDMARC_CONFIG_FILE":
suffix = env_key[len(prefix) :]
section, key = _resolve_section_key(suffix)
else:
continue
if section is None:
logger.debug("Ignoring unrecognized env var: %s", env_key)
continue
if not config.has_section(section):
config.add_section(section)
config.set(section, key, env_value)
logger.debug("Config override from env: [%s] %s", section, key)
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
This is needed for child processes to properly log messages.
Args:
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
log_file: Optional path to log file
"""
# Get the logger
from parsedmarc.log import logger
# Set the log level
logger.setLevel(log_level)
# Add StreamHandler with formatter if not already present
# Check if we already have a StreamHandler to avoid duplicates
# Use exact type check to distinguish from FileHandler subclass
has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
if not has_stream_handler:
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Add FileHandler if log_file is specified
if log_file:
try:
fh = logging.FileHandler(log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except (IOError, OSError, PermissionError) as error:
logger.warning("Unable to write to log file: {}".format(error))
def cli_parse(
file_path,
sa,
nameservers,
dns_timeout,
dns_retries,
ip_db_path,
offline,
always_use_local_files,
reverse_dns_map_path,
reverse_dns_map_url,
normalize_timespan_threshold_hours,
conn,
log_level=logging.ERROR,
log_file=None,
):
"""Separated this function for multiprocessing
Args:
file_path: Path to the report file
sa: Strip attachment payloads flag
nameservers: List of nameservers
dns_timeout: DNS timeout
dns_retries: Number of DNS retries on transient errors
ip_db_path: Path to IP database
offline: Offline mode flag
always_use_local_files: Always use local files flag
reverse_dns_map_path: Path to reverse DNS map
reverse_dns_map_url: URL to reverse DNS map
normalize_timespan_threshold_hours: Timespan threshold
conn: Pipe connection for IPC
log_level: Logging level for this process
log_file: Optional path to log file
"""
# Configure logging in this child process
_configure_logging(log_level, log_file)
try:
file_results = parse_report_file(
file_path,
ip_db_path=ip_db_path,
offline=offline,
always_use_local_files=always_use_local_files,
reverse_dns_map_path=reverse_dns_map_path,
reverse_dns_map_url=reverse_dns_map_url,
nameservers=nameservers,
dns_timeout=dns_timeout,
dns_retries=dns_retries,
strip_attachment_payloads=sa,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
)
conn.send([file_results, file_path])
except ParserError as error:
conn.send([error, file_path])
finally:
conn.close()
class ConfigurationError(Exception):
"""Raised when a configuration file has missing or invalid settings."""
pass
def _load_config(config_file: str | None = None) -> ConfigParser:
"""Load configuration from an INI file and/or environment variables.
Args:
config_file: Optional path to an .ini config file.
Returns:
A ``ConfigParser`` populated from the file (if given) and from any
``PARSEDMARC_*`` environment variables.
Raises:
ConfigurationError: If *config_file* is given but does not exist.
"""
config = ConfigParser(interpolation=None)
if config_file is not None:
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
if not os.access(abs_path, os.R_OK):
raise ConfigurationError(
"Unable to read {0} — check file permissions".format(abs_path)
)
config.read(config_file)
_apply_env_overrides(config)
return config
def _parse_config(config: ConfigParser, opts):
"""Apply a loaded ``ConfigParser`` to *opts* in place.
Args:
config: A ``ConfigParser`` (from ``_load_config``).
opts: Namespace object to update with parsed values.
Returns:
index_prefix_domain_map or None
Raises:
ConfigurationError: If required settings are missing or invalid.
"""
opts.silent = True
index_prefix_domain_map = None
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = bool(
general_config.getboolean("strip_attachment_payloads")
)
if "output" in general_config:
opts.output = _expand_path(general_config["output"])
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
if "forensic_json_filename" in general_config:
opts.forensic_json_filename = general_config["forensic_json_filename"]
if "smtp_tls_json_filename" in general_config:
opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"]
if "aggregate_csv_filename" in general_config:
opts.aggregate_csv_filename = general_config["aggregate_csv_filename"]
if "forensic_csv_filename" in general_config:
opts.forensic_csv_filename = general_config["forensic_csv_filename"]
if "smtp_tls_csv_filename" in general_config:
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config:
opts.dns_timeout = general_config.getfloat("dns_timeout")
if opts.dns_timeout is None:
opts.dns_timeout = 2
if "dns_retries" in general_config:
opts.dns_retries = general_config.getint("dns_retries")
if opts.dns_retries is None:
opts.dns_retries = 0
if "dns_test_address" in general_config:
opts.dns_test_address = general_config["dns_test_address"]
if "nameservers" in general_config:
opts.nameservers = _str_to_list(general_config["nameservers"])
# nameservers pre-flight check
dummy_hostname = None
try:
dummy_hostname = get_reverse_dns(
opts.dns_test_address,
nameservers=opts.nameservers,
timeout=opts.dns_timeout,
)
except Exception as ns_error:
raise ConfigurationError(
"DNS pre-flight check failed: {}".format(ns_error)
) from ns_error
if not dummy_hostname:
raise ConfigurationError(
"DNS pre-flight check failed: no PTR record for {} from {}".format(
opts.dns_test_address, opts.nameservers
)
)
if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
if "save_forensic" in general_config:
opts.save_forensic = bool(general_config.getboolean("save_forensic"))
if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
if "debug" in general_config:
opts.debug = bool(general_config.getboolean("debug"))
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
opts.fail_on_output_error = bool(
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config:
opts.log_file = _expand_path(general_config["log_file"])
if "n_procs" in general_config:
opts.n_procs = general_config.getint("n_procs")
if "ip_db_path" in general_config:
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
else:
opts.ip_db_path = None
if "ipinfo_url" in general_config:
opts.ipinfo_url = general_config["ipinfo_url"]
elif "ip_db_url" in general_config:
# ``ip_db_url`` is the pre-9.10 name for the same option. Accept
# it as a deprecated alias; prefer ``ipinfo_url`` going forward.
opts.ipinfo_url = general_config["ip_db_url"]
logger.warning("[general] ip_db_url is deprecated; rename it to ipinfo_url")
if "ipinfo_api_token" in general_config:
opts.ipinfo_api_token = general_config["ipinfo_api_token"]
if "always_use_local_files" in general_config:
opts.always_use_local_files = bool(
general_config.getboolean("always_use_local_files")
)
if "local_reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = _expand_path(
general_config["local_reverse_dns_map_path"]
)
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
if "local_psl_overrides_path" in general_config:
opts.psl_overrides_path = _expand_path(
general_config["local_psl_overrides_path"]
)
if "psl_overrides_url" in general_config:
opts.psl_overrides_url = general_config["psl_overrides_url"]
if "prettify_json" in general_config:
opts.prettify_json = bool(general_config.getboolean("prettify_json"))
if "mailbox" in config.sections():
mailbox_config = config["mailbox"]
if "msgraph" in config.sections():
opts.mailbox_reports_folder = "Inbox"
if "reports_folder" in mailbox_config:
opts.mailbox_reports_folder = mailbox_config["reports_folder"]
if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config:
opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
if "delete" in mailbox_config:
opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
if "test" in mailbox_config:
opts.mailbox_test = bool(mailbox_config.getboolean("test"))
if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
if "check_timeout" in mailbox_config:
opts.mailbox_check_timeout = mailbox_config.getint("check_timeout")
if "since" in mailbox_config:
opts.mailbox_since = mailbox_config["since"]
if "imap" in config.sections():
imap_config = config["imap"]
if "watch" in imap_config:
logger.warning(
"Starting in 8.0.0, the watch option has been "
"moved from the imap configuration section to "
"the mailbox configuration section."
)
if "host" in imap_config:
opts.imap_host = imap_config["host"]
else:
raise ConfigurationError(
"host setting missing from the imap config section"
)
if "port" in imap_config:
opts.imap_port = imap_config.getint("port")
if "timeout" in imap_config:
opts.imap_timeout = imap_config.getint("timeout")
if "max_retries" in imap_config:
opts.imap_max_retries = imap_config.getint("max_retries")
if "ssl" in imap_config:
opts.imap_ssl = bool(imap_config.getboolean("ssl"))
if "skip_certificate_verification" in imap_config:
opts.imap_skip_certificate_verification = bool(
imap_config.getboolean("skip_certificate_verification")
)
if "user" in imap_config:
opts.imap_user = imap_config["user"]
else:
raise ConfigurationError(
"user setting missing from the imap config section"
)
if "password" in imap_config:
opts.imap_password = imap_config["password"]
else:
raise ConfigurationError(
"password setting missing from the imap config section"
)
if "reports_folder" in imap_config:
opts.mailbox_reports_folder = imap_config["reports_folder"]
logger.warning(
"Use of the reports_folder option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "archive_folder" in imap_config:
opts.mailbox_archive_folder = imap_config["archive_folder"]
logger.warning(
"Use of the archive_folder option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "watch" in imap_config:
opts.mailbox_watch = bool(imap_config.getboolean("watch"))
logger.warning(
"Use of the watch option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "delete" in imap_config:
logger.warning(
"Use of the delete option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "test" in imap_config:
opts.mailbox_test = bool(imap_config.getboolean("test"))
logger.warning(
"Use of the test option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "batch_size" in imap_config:
opts.mailbox_batch_size = imap_config.getint("batch_size")
logger.warning(
"Use of the batch_size option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "msgraph" in config.sections():
graph_config = config["msgraph"]
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
if "auth_method" not in graph_config:
logger.info(
"auth_method setting missing from the "
"msgraph config section; "
"defaulting to UsernamePassword"
)
opts.graph_auth_method = AuthMethod.UsernamePassword.name
else:
opts.graph_auth_method = graph_config["auth_method"]
if opts.graph_auth_method == AuthMethod.UsernamePassword.name:
if "user" in graph_config:
opts.graph_user = graph_config["user"]
else:
raise ConfigurationError(
"user setting missing from the msgraph config section"
)
if "password" in graph_config:
opts.graph_password = graph_config["password"]
else:
raise ConfigurationError(
"password setting missing from the msgraph config section"
)
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
raise ConfigurationError(
"client_secret setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.DeviceCode.name:
if "user" in graph_config:
opts.graph_user = graph_config["user"]
if opts.graph_auth_method != AuthMethod.UsernamePassword.name:
if "tenant_id" in graph_config:
opts.graph_tenant_id = graph_config["tenant_id"]
else:
raise ConfigurationError(
"tenant_id setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.ClientSecret.name:
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
raise ConfigurationError(
"client_secret setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.Certificate.name:
if "certificate_path" in graph_config:
opts.graph_certificate_path = _expand_path(
graph_config["certificate_path"]
)
else:
raise ConfigurationError(
"certificate_path setting missing from the msgraph config section"
)
if "certificate_password" in graph_config:
opts.graph_certificate_password = graph_config["certificate_password"]
if "client_id" in graph_config:
opts.graph_client_id = graph_config["client_id"]
else:
raise ConfigurationError(
"client_id setting missing from the msgraph config section"
)
if "mailbox" in graph_config:
opts.graph_mailbox = graph_config["mailbox"]
elif opts.graph_auth_method != AuthMethod.UsernamePassword.name:
raise ConfigurationError(
"mailbox setting missing from the msgraph config section"
)
if "graph_url" in graph_config:
opts.graph_url = graph_config["graph_url"]
elif "url" in graph_config:
opts.graph_url = graph_config["url"]
if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool(
graph_config.getboolean("allow_unencrypted_storage")
)
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the elasticsearch config section"
)
if "timeout" in elasticsearch_config:
timeout = elasticsearch_config.getfloat("timeout")
opts.elasticsearch_timeout = timeout
if "number_of_shards" in elasticsearch_config:
number_of_shards = elasticsearch_config.getint("number_of_shards")
opts.elasticsearch_number_of_shards = number_of_shards
if "number_of_replicas" in elasticsearch_config:
number_of_replicas = elasticsearch_config.getint("number_of_replicas")
opts.elasticsearch_number_of_replicas = number_of_replicas
if "index_suffix" in elasticsearch_config:
opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"]
if "index_prefix" in elasticsearch_config:
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
if "monthly_indexes" in elasticsearch_config:
monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
opts.elasticsearch_monthly_indexes = monthly
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = _expand_path(
elasticsearch_config["cert_path"]
)
if "skip_certificate_verification" in elasticsearch_config:
opts.elasticsearch_skip_certificate_verification = bool(
elasticsearch_config.getboolean("skip_certificate_verification")
)
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
opts.elasticsearch_password = elasticsearch_config["password"]
# Until 8.20
if "apiKey" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["apiKey"]
# Since 8.20
if "api_key" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["api_key"]
if "opensearch" in config:
opensearch_config = config["opensearch"]
if "hosts" in opensearch_config:
opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the opensearch config section"
)
if "timeout" in opensearch_config:
timeout = opensearch_config.getfloat("timeout")
opts.opensearch_timeout = timeout
if "number_of_shards" in opensearch_config:
number_of_shards = opensearch_config.getint("number_of_shards")
opts.opensearch_number_of_shards = number_of_shards
if "number_of_replicas" in opensearch_config:
number_of_replicas = opensearch_config.getint("number_of_replicas")
opts.opensearch_number_of_replicas = number_of_replicas
if "index_suffix" in opensearch_config:
opts.opensearch_index_suffix = opensearch_config["index_suffix"]
if "index_prefix" in opensearch_config:
opts.opensearch_index_prefix = opensearch_config["index_prefix"]
if "monthly_indexes" in opensearch_config:
monthly = bool(opensearch_config.getboolean("monthly_indexes"))
opts.opensearch_monthly_indexes = monthly
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
if "skip_certificate_verification" in opensearch_config:
opts.opensearch_skip_certificate_verification = bool(
opensearch_config.getboolean("skip_certificate_verification")
)
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
opts.opensearch_password = opensearch_config["password"]
# Until 8.20
if "apiKey" in opensearch_config:
opts.opensearch_api_key = opensearch_config["apiKey"]
# Since 8.20
if "api_key" in opensearch_config:
opts.opensearch_api_key = opensearch_config["api_key"]
if "auth_type" in opensearch_config:
opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower()
elif "authentication_type" in opensearch_config:
opts.opensearch_auth_type = (
opensearch_config["authentication_type"].strip().lower()
)
if "aws_region" in opensearch_config:
opts.opensearch_aws_region = opensearch_config["aws_region"].strip()
if "aws_service" in opensearch_config:
opts.opensearch_aws_service = opensearch_config["aws_service"].strip()
if "splunk_hec" in config.sections():
hec_config = config["splunk_hec"]
if "url" in hec_config:
opts.hec = hec_config["url"]
else:
raise ConfigurationError(
"url setting missing from the splunk_hec config section"
)
if "token" in hec_config:
opts.hec_token = hec_config["token"]
else:
raise ConfigurationError(
"token setting missing from the splunk_hec config section"
)
if "index" in hec_config:
opts.hec_index = hec_config["index"]
else:
raise ConfigurationError(
"index setting missing from the splunk_hec config section"
)
if "skip_certificate_verification" in hec_config:
opts.hec_skip_certificate_verification = bool(
hec_config.getboolean("skip_certificate_verification", fallback=False)
)
if "kafka" in config.sections():
kafka_config = config["kafka"]
if "hosts" in kafka_config:
opts.kafka_hosts = _str_to_list(kafka_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the kafka config section"
)
if "user" in kafka_config:
opts.kafka_username = kafka_config["user"]
if "password" in kafka_config:
opts.kafka_password = kafka_config["password"]
if "ssl" in kafka_config:
opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
if "skip_certificate_verification" in kafka_config:
kafka_verify = bool(
kafka_config.getboolean("skip_certificate_verification")
)
opts.kafka_skip_certificate_verification = kafka_verify
if "aggregate_topic" in kafka_config:
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
else:
raise ConfigurationError(
"aggregate_topic setting missing from the kafka config section"
)
if "forensic_topic" in kafka_config:
opts.kafka_forensic_topic = kafka_config["forensic_topic"]
else:
raise ConfigurationError(
"forensic_topic setting missing from the kafka config section"
)
if "smtp_tls_topic" in kafka_config:
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
else:
raise ConfigurationError(
"smtp_tls_topic setting missing from the kafka config section"
)
if "smtp" in config.sections():
smtp_config = config["smtp"]
if "host" in smtp_config:
opts.smtp_host = smtp_config["host"]
else:
raise ConfigurationError(
"host setting missing from the smtp config section"
)
if "port" in smtp_config:
opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config:
opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
if "skip_certificate_verification" in smtp_config:
smtp_verify = bool(smtp_config.getboolean("skip_certificate_verification"))
opts.smtp_skip_certificate_verification = smtp_verify
if "user" in smtp_config:
opts.smtp_user = smtp_config["user"]
else:
raise ConfigurationError(
"user setting missing from the smtp config section"
)
if "password" in smtp_config:
opts.smtp_password = smtp_config["password"]
else:
raise ConfigurationError(
"password setting missing from the smtp config section"
)
if "from" in smtp_config:
opts.smtp_from = smtp_config["from"]
else:
logger.critical("from setting missing from the smtp config section")
if "to" in smtp_config:
opts.smtp_to = _str_to_list(smtp_config["to"])
else:
logger.critical("to setting missing from the smtp config section")
if "subject" in smtp_config:
opts.smtp_subject = smtp_config["subject"]
if "attachment" in smtp_config:
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
if "s3" in config.sections():
s3_config = config["s3"]
if "bucket" in s3_config:
opts.s3_bucket = s3_config["bucket"]
else:
raise ConfigurationError(
"bucket setting missing from the s3 config section"
)
if "path" in s3_config:
opts.s3_path = s3_config["path"]
if opts.s3_path.startswith("/"):
opts.s3_path = opts.s3_path[1:]
if opts.s3_path.endswith("/"):
opts.s3_path = opts.s3_path[:-1]
else:
opts.s3_path = ""
if "region_name" in s3_config:
opts.s3_region_name = s3_config["region_name"]
if "endpoint_url" in s3_config:
opts.s3_endpoint_url = s3_config["endpoint_url"]
if "access_key_id" in s3_config:
opts.s3_access_key_id = s3_config["access_key_id"]
if "secret_access_key" in s3_config:
opts.s3_secret_access_key = s3_config["secret_access_key"]
if "syslog" in config.sections():
syslog_config = config["syslog"]
if "server" in syslog_config:
opts.syslog_server = syslog_config["server"]
else:
raise ConfigurationError(
"server setting missing from the syslog config section"
)
if "port" in syslog_config:
opts.syslog_port = syslog_config["port"]
else:
opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
gmail_creds = gmail_api_config.get("credentials_file")
opts.gmail_api_credentials_file = (
_expand_path(gmail_creds) if gmail_creds else gmail_creds
)
opts.gmail_api_token_file = _expand_path(
gmail_api_config.get("token_file", ".token")
)
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
)
opts.gmail_api_paginate_messages = bool(
gmail_api_config.getboolean("paginate_messages", True)
)
default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify"
opts.gmail_api_scopes = gmail_api_config.get("scopes", default_gmail_api_scope)
opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes)
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config[
"service_account_user"
].strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config[
"delegated_user"
].strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
maildir_p = maildir_api_config.get(
"maildir_path", maildir_api_config.get("path")
)
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
opts.maildir_create = bool(
maildir_api_config.getboolean(
"maildir_create",
fallback=maildir_api_config.getboolean("create", fallback=False),
)
)
if "log_analytics" in config.sections():
log_analytics_config = config["log_analytics"]
opts.la_client_id = log_analytics_config.get("client_id")
opts.la_client_secret = log_analytics_config.get("client_secret")
opts.la_tenant_id = log_analytics_config.get("tenant_id")
opts.la_dce = log_analytics_config.get("dce")
opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id")
opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream")
opts.la_dcr_forensic_stream = log_analytics_config.get("dcr_forensic_stream")
opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream")
if "gelf" in config.sections():
gelf_config = config["gelf"]
if "host" in gelf_config:
opts.gelf_host = gelf_config["host"]
else:
raise ConfigurationError(
"host setting missing from the gelf config section"
)
if "port" in gelf_config:
opts.gelf_port = gelf_config["port"]
else:
raise ConfigurationError(
"port setting missing from the gelf config section"
)
if "mode" in gelf_config:
opts.gelf_mode = gelf_config["mode"]
else:
raise ConfigurationError(
"mode setting missing from the gelf config section"
)
if "webhook" in config.sections():
webhook_config = config["webhook"]
if "aggregate_url" in webhook_config:
opts.webhook_aggregate_url = webhook_config["aggregate_url"]
if "forensic_url" in webhook_config:
opts.webhook_forensic_url = webhook_config["forensic_url"]
if "smtp_tls_url" in webhook_config:
opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"]
if "timeout" in webhook_config:
opts.webhook_timeout = webhook_config.getint("timeout")
return index_prefix_domain_map
class _ElasticsearchHandle:
"""Sentinel so Elasticsearch participates in _close_output_clients."""
def close(self):
try:
conn = elastic.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
elastic.connections.remove_connection("default")
except Exception:
pass
class _OpenSearchHandle:
"""Sentinel so OpenSearch participates in _close_output_clients."""
def close(self):
try:
conn = opensearch.connections.get_connection()
if not isinstance(conn, str):
conn.close()
except Exception:
pass
try:
opensearch.connections.remove_connection("default")
except Exception:
pass
def _init_output_clients(opts):
"""Create output clients based on current opts.
Returns:
dict of client instances keyed by name.
Raises:
ConfigurationError: If required settings for a client are missing.
RuntimeError: If an output client fails to initialize.
"""
clients = {}
try:
if opts.s3_bucket:
logger.debug("Initializing S3 client: bucket=%s", opts.s3_bucket)
clients["s3_client"] = s3.S3Client(
bucket_name=opts.s3_bucket,
bucket_path=opts.s3_path,
region_name=opts.s3_region_name,
endpoint_url=opts.s3_endpoint_url,
access_key_id=opts.s3_access_key_id,
secret_access_key=opts.s3_secret_access_key,
)
except Exception as e:
raise RuntimeError(f"S3: {e}") from e
try:
if opts.syslog_server:
logger.debug(
"Initializing syslog client: server=%s:%s",
opts.syslog_server,
opts.syslog_port,
)
clients["syslog_client"] = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception as e:
raise RuntimeError(f"Syslog: {e}") from e
if opts.hec:
if opts.hec_token is None or opts.hec_index is None:
raise ConfigurationError(
"HEC token and HEC index are required when using HEC URL"
)
try:
logger.debug("Initializing Splunk HEC client: url=%s", opts.hec)
verify = True
if opts.hec_skip_certificate_verification:
verify = False
clients["hec_client"] = splunk.HECClient(
opts.hec, opts.hec_token, opts.hec_index, verify=verify
)
except Exception as e:
raise RuntimeError(f"Splunk HEC: {e}") from e
try:
if opts.kafka_hosts:
logger.debug("Initializing Kafka client: hosts=%s", opts.kafka_hosts)
ssl_context = None
if opts.kafka_skip_certificate_verification:
logger.debug("Skipping Kafka certificate verification")
ssl_context = create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_NONE
clients["kafka_client"] = kafkaclient.KafkaClient(
opts.kafka_hosts,
username=opts.kafka_username,
password=opts.kafka_password,
ssl_context=ssl_context,
)
except Exception as e:
raise RuntimeError(f"Kafka: {e}") from e
try:
if opts.gelf_host:
logger.debug(
"Initializing GELF client: host=%s:%s",
opts.gelf_host,
opts.gelf_port,
)
clients["gelf_client"] = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception as e:
raise RuntimeError(f"GELF: {e}") from e
try:
if (
opts.webhook_aggregate_url
or opts.webhook_forensic_url
or opts.webhook_smtp_tls_url
):
logger.debug("Initializing webhook client")
clients["webhook_client"] = webhook.WebhookClient(
aggregate_url=opts.webhook_aggregate_url,
forensic_url=opts.webhook_forensic_url,
smtp_tls_url=opts.webhook_smtp_tls_url,
timeout=opts.webhook_timeout,
)
except Exception as e:
raise RuntimeError(f"Webhook: {e}") from e
# Elasticsearch and OpenSearch mutate module-level global state via
# connections.create_connection(), which cannot be rolled back if a later
# step fails. Initialise them last so that all other clients are created
# successfully first; this minimizes the window for partial-init problems
# during config reload.
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
try:
if opts.elasticsearch_hosts:
logger.debug(
"Initializing Elasticsearch client: hosts=%s, ssl=%s",
opts.elasticsearch_hosts,
opts.elasticsearch_ssl,
)
es_aggregate_index = "dmarc_aggregate"
es_forensic_index = "dmarc_forensic"
es_smtp_tls_index = "smtp_tls"
if opts.elasticsearch_index_suffix:
suffix = opts.elasticsearch_index_suffix
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
if opts.elasticsearch_index_prefix:
prefix = opts.elasticsearch_index_prefix
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
forensic_indexes=[es_forensic_index],
)
clients["elasticsearch"] = _ElasticsearchHandle()
except Exception as e:
raise RuntimeError(f"Elasticsearch: {e}") from e
try:
if opts.opensearch_hosts:
logger.debug(
"Initializing OpenSearch client: hosts=%s, ssl=%s",
opts.opensearch_hosts,
opts.opensearch_ssl,
)
os_aggregate_index = "dmarc_aggregate"
os_forensic_index = "dmarc_forensic"
os_smtp_tls_index = "smtp_tls"
if opts.opensearch_index_suffix:
suffix = opts.opensearch_index_suffix
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
if opts.opensearch_index_prefix:
prefix = opts.opensearch_index_prefix
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
forensic_indexes=[os_forensic_index],
)
clients["opensearch"] = _OpenSearchHandle()
except Exception as e:
raise RuntimeError(f"OpenSearch: {e}") from e
return clients
def _close_output_clients(clients):
"""Close output clients that hold persistent connections.
Clients that do not expose a ``close`` method are silently skipped.
Errors during closing are logged as warnings and do not propagate.
Args:
clients: dict of client instances returned by :func:`_init_output_clients`.
"""
for name, client in clients.items():
if hasattr(client, "close"):
try:
client.close()
except Exception:
logger.warning("Error closing %s", name, exc_info=True)
def _main():
"""Called when the module is executed"""
def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None:
return None
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report["reported_domain"]
elif "policies" in report:
domain = report["policies"][0]["policy_domain"]
if domain:
domain = get_base_domain(domain)
if domain:
domain = domain.lower()
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
output_errors = []
def log_output_error(destination, error):
message = f"{destination} Error: {error}"
logger.error(message)
output_errors.append(message)
if index_prefix_domain_map is not None:
filtered_tls = []
for report in reports_.get("smtp_tls_reports", []):
if get_index_prefix(report) is not None:
filtered_tls.append(report)
else:
domain = "unknown"
if "policies" in report and report["policies"]:
domain = report["policies"][0].get("policy_domain", "unknown")
logger.debug(
"Ignoring SMTP TLS report for domain not in "
"index_prefix_domain_map: %s",
domain,
)
reports_["smtp_tls_reports"] = filtered_tls
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
)
if not opts.silent:
print(output_str)
if opts.output:
save_output(
reports_,
output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename,
forensic_json_filename=opts.forensic_json_filename,
smtp_tls_json_filename=opts.smtp_tls_json_filename,
aggregate_csv_filename=opts.aggregate_csv_filename,
forensic_csv_filename=opts.forensic_csv_filename,
smtp_tls_csv_filename=opts.smtp_tls_csv_filename,
)
kafka_client = clients.get("kafka_client")
s3_client = clients.get("s3_client")
syslog_client = clients.get("syslog_client")
hec_client = clients.get("hec_client")
gelf_client = clients.get("gelf_client")
webhook_client = clients.get("webhook_client")
kafka_aggregate_topic = opts.kafka_aggregate_topic
kafka_forensic_topic = opts.kafka_forensic_topic
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
if opts.save_aggregate:
for report in reports_["aggregate_reports"]:
try:
if opts.elasticsearch_hosts:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
elastic.save_aggregate_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.elasticsearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__())
except Exception as error_:
log_output_error("Elasticsearch exception", error_.__str__())
try:
if opts.opensearch_hosts:
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
opensearch.save_aggregate_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.opensearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__())
except Exception as error_:
log_output_error("OpenSearch exception", error_.__str__())
try:
if kafka_client:
kafka_client.save_aggregate_reports_to_kafka(
report, kafka_aggregate_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_aggregate_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if syslog_client:
syslog_client.save_aggregate_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_aggregate_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_aggregate_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_aggregate_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
log_output_error("Webhook", error_.__str__())
if hec_client:
try:
aggregate_reports_ = reports_["aggregate_reports"]
if len(aggregate_reports_) > 0:
hec_client.save_aggregate_reports_to_splunk(aggregate_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.save_forensic:
for report in reports_["forensic_reports"]:
try:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
if opts.elasticsearch_hosts:
elastic.save_forensic_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.elasticsearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
if opts.opensearch_hosts:
opensearch.save_forensic_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.opensearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
if kafka_client:
kafka_client.save_forensic_reports_to_kafka(
report, kafka_forensic_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_forensic_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if syslog_client:
syslog_client.save_forensic_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_forensic_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_forensic_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_forensic_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
log_output_error("Webhook", error_.__str__())
if hec_client:
try:
forensic_reports_ = reports_["forensic_reports"]
if len(forensic_reports_) > 0:
hec_client.save_forensic_reports_to_splunk(forensic_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.save_smtp_tls:
for report in reports_["smtp_tls_reports"]:
try:
shards = opts.elasticsearch_number_of_shards
replicas = opts.elasticsearch_number_of_replicas
if opts.elasticsearch_hosts:
elastic.save_smtp_tls_report_to_elasticsearch(
report,
index_suffix=opts.elasticsearch_index_suffix,
index_prefix=opts.elasticsearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.elasticsearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
shards = opts.opensearch_number_of_shards
replicas = opts.opensearch_number_of_replicas
if opts.opensearch_hosts:
opensearch.save_smtp_tls_report_to_opensearch(
report,
index_suffix=opts.opensearch_index_suffix,
index_prefix=opts.opensearch_index_prefix
or get_index_prefix(report),
monthly_indexes=opts.opensearch_monthly_indexes,
number_of_shards=shards,
number_of_replicas=replicas,
)
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__())
except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__())
try:
if kafka_client:
kafka_client.save_smtp_tls_reports_to_kafka(
[report], kafka_smtp_tls_topic
)
except Exception as error_:
log_output_error("Kafka", error_.__str__())
try:
if s3_client:
s3_client.save_smtp_tls_report_to_s3(report)
except Exception as error_:
log_output_error("S3", error_.__str__())
try:
if syslog_client:
syslog_client.save_smtp_tls_report_to_syslog(report)
except Exception as error_:
log_output_error("Syslog", error_.__str__())
try:
if gelf_client:
gelf_client.save_smtp_tls_report_to_gelf(report)
except Exception as error_:
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_smtp_tls_url and webhook_client:
indent_value = 2 if opts.prettify_json else None
webhook_client.save_smtp_tls_report_to_webhook(
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
log_output_error("Webhook", error_.__str__())
if hec_client:
try:
smtp_tls_reports_ = reports_["smtp_tls_reports"]
if len(smtp_tls_reports_) > 0:
hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_)
except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__())
if opts.la_dce:
try:
la_client = loganalytics.LogAnalyticsClient(
client_id=opts.la_client_id,
client_secret=opts.la_client_secret,
tenant_id=opts.la_tenant_id,
dce=opts.la_dce,
dcr_immutable_id=opts.la_dcr_immutable_id,
dcr_aggregate_stream=opts.la_dcr_aggregate_stream,
dcr_forensic_stream=opts.la_dcr_forensic_stream,
dcr_smtp_tls_stream=opts.la_dcr_smtp_tls_stream,
)
la_client.publish_results(
reports_,
opts.save_aggregate,
opts.save_forensic,
opts.save_smtp_tls,
)
except loganalytics.LogAnalyticsException as e:
log_output_error("Log Analytics", e.__str__())
except Exception as e:
log_output_error("Log Analytics", f"Unknown publishing error: {e}")
if opts.fail_on_output_error and output_errors:
raise ParserError(
"Output destination failures detected: {0}".format(
" | ".join(output_errors)
)
)
arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument(
"-c",
"--config-file",
help="a path to a configuration file (--silent implied)",
)
arg_parser.add_argument(
"file_path",
nargs="*",
help="one or more paths to aggregate or forensic "
"report files, emails, or mbox files'",
)
strip_attachment_help = "remove attachment payloads from forensic report output"
arg_parser.add_argument(
"--strip-attachment-payloads", help=strip_attachment_help, action="store_true"
)
arg_parser.add_argument(
"-o", "--output", help="write output files to the given directory"
)
arg_parser.add_argument(
"--aggregate-json-filename",
help="filename for the aggregate JSON output file",
default="aggregate.json",
)
arg_parser.add_argument(
"--forensic-json-filename",
help="filename for the forensic JSON output file",
default="forensic.json",
)
arg_parser.add_argument(
"--smtp-tls-json-filename",
help="filename for the SMTP TLS JSON output file",
default="smtp_tls.json",
)
arg_parser.add_argument(
"--aggregate-csv-filename",
help="filename for the aggregate CSV output file",
default="aggregate.csv",
)
arg_parser.add_argument(
"--forensic-csv-filename",
help="filename for the forensic CSV output file",
default="forensic.csv",
)
arg_parser.add_argument(
"--smtp-tls-csv-filename",
help="filename for the SMTP TLS CSV output file",
default="smtp_tls.csv",
)
arg_parser.add_argument(
"-n", "--nameservers", nargs="+", help="nameservers to query"
)
arg_parser.add_argument(
"-t",
"--dns_timeout",
help="number of seconds to wait for an answer from DNS (default: 2.0)",
type=float,
default=2.0,
)
arg_parser.add_argument(
"--dns-retries",
dest="dns_retries",
help="number of times to retry DNS queries on timeout or other "
"transient errors (default: 0)",
type=int,
default=0,
)
arg_parser.add_argument(
"--offline",
action="store_true",
help="do not make online queries for geolocation or DNS",
)
arg_parser.add_argument(
"-s", "--silent", action="store_true", help="only print errors"
)
arg_parser.add_argument(
"-w",
"--warnings",
action="store_true",
help="print warnings in addition to errors",
)
arg_parser.add_argument(
"--verbose", action="store_true", help="more verbose output"
)
arg_parser.add_argument(
"--debug", action="store_true", help="print debugging information"
)
arg_parser.add_argument("--log-file", default=None, help="output logging to a file")
arg_parser.add_argument(
"--no-prettify-json",
action="store_false",
dest="prettify_json",
help="output JSON in a single line without indentation",
)
arg_parser.add_argument("-v", "--version", action="version", version=__version__)
aggregate_reports = []
forensic_reports = []
smtp_tls_reports = []
args = arg_parser.parse_args()
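# Baseline option set with hardcoded defaults. _parse_config() below
# overlays values from the config file and environment on top of the
# CLI arguments captured here.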
opts = Namespace(
file_path=args.file_path,
config_file=args.config_file,
offline=args.offline,
strip_attachment_payloads=args.strip_attachment_payloads,
output=args.output,
aggregate_csv_filename=args.aggregate_csv_filename,
aggregate_json_filename=args.aggregate_json_filename,
forensic_csv_filename=args.forensic_csv_filename,
forensic_json_filename=args.forensic_json_filename,
smtp_tls_json_filename=args.smtp_tls_json_filename,
smtp_tls_csv_filename=args.smtp_tls_csv_filename,
nameservers=args.nameservers,
dns_test_address="1.1.1.1",
silent=args.silent,
warnings=args.warnings,
dns_timeout=args.dns_timeout,
dns_retries=args.dns_retries,
debug=args.debug,
verbose=args.verbose,
prettify_json=args.prettify_json,
save_aggregate=False,
save_forensic=False,
save_smtp_tls=False,
mailbox_reports_folder="INBOX",
mailbox_archive_folder="Archive",
mailbox_watch=False,
mailbox_delete=False,
mailbox_test=False,
mailbox_batch_size=10,
mailbox_check_timeout=30,
mailbox_since=None,
imap_host=None,
imap_skip_certificate_verification=False,
imap_ssl=True,
imap_port=993,
imap_timeout=30,
imap_max_retries=4,
imap_user=None,
imap_password=None,
graph_auth_method=None,
graph_user=None,
graph_password=None,
graph_client_id=None,
graph_client_secret=None,
graph_certificate_path=None,
graph_certificate_password=None,
graph_tenant_id=None,
graph_mailbox=None,
graph_token_file=None,
graph_allow_unencrypted_storage=False,
graph_url="https://graph.microsoft.com",
hec=None,
hec_token=None,
hec_index=None,
hec_skip_certificate_verification=False,
elasticsearch_hosts=None,
elasticsearch_timeout=60,
elasticsearch_number_of_shards=1,
elasticsearch_number_of_replicas=0,
elasticsearch_index_suffix=None,
elasticsearch_index_prefix=None,
elasticsearch_ssl=True,
elasticsearch_ssl_cert_path=None,
elasticsearch_skip_certificate_verification=False,
elasticsearch_monthly_indexes=False,
elasticsearch_username=None,
elasticsearch_password=None,
elasticsearch_api_key=None,
opensearch_hosts=None,
opensearch_timeout=60,
opensearch_number_of_shards=1,
opensearch_number_of_replicas=0,
opensearch_index_suffix=None,
opensearch_index_prefix=None,
opensearch_ssl=True,
opensearch_ssl_cert_path=None,
opensearch_skip_certificate_verification=False,
opensearch_monthly_indexes=False,
opensearch_username=None,
opensearch_password=None,
opensearch_api_key=None,
opensearch_auth_type="basic",
opensearch_aws_region=None,
opensearch_aws_service="es",
kafka_hosts=None,
kafka_username=None,
kafka_password=None,
kafka_aggregate_topic=None,
kafka_forensic_topic=None,
kafka_smtp_tls_topic=None,
kafka_ssl=False,
kafka_skip_certificate_verification=False,
smtp_host=None,
smtp_port=25,
smtp_ssl=False,
smtp_skip_certificate_verification=False,
smtp_user=None,
smtp_password=None,
smtp_from=None,
smtp_to=[],
smtp_subject="parsedmarc report",
smtp_message="Please see the attached DMARC results.",
s3_bucket=None,
s3_path=None,
s3_region_name=None,
s3_endpoint_url=None,
s3_access_key_id=None,
s3_secret_access_key=None,
syslog_server=None,
syslog_port=None,
syslog_protocol=None,
syslog_cafile_path=None,
syslog_certfile_path=None,
syslog_keyfile_path=None,
syslog_timeout=None,
syslog_retry_attempts=None,
syslog_retry_delay=None,
gmail_api_credentials_file=None,
gmail_api_token_file=None,
gmail_api_include_spam_trash=False,
gmail_api_paginate_messages=True,
gmail_api_scopes=[],
gmail_api_oauth2_port=8080,
gmail_api_auth_mode="installed_app",
gmail_api_service_account_user=None,
maildir_path=None,
maildir_create=False,
log_file=args.log_file,
n_procs=1,
ip_db_path=None,
ipinfo_url=None,
ipinfo_api_token=None,
always_use_local_files=False,
reverse_dns_map_path=None,
reverse_dns_map_url=None,
psl_overrides_path=None,
psl_overrides_url=None,
la_client_id=None,
la_client_secret=None,
la_tenant_id=None,
la_dce=None,
la_dcr_immutable_id=None,
la_dcr_aggregate_stream=None,
la_dcr_forensic_stream=None,
la_dcr_smtp_tls_stream=None,
gelf_host=None,
gelf_port=None,
gelf_mode=None,
webhook_aggregate_url=None,
webhook_forensic_url=None,
webhook_smtp_tls_url=None,
webhook_timeout=60,
normalize_timespan_threshold_hours=24.0,
fail_on_output_error=False,
)
# Snapshot opts as set from CLI args / hardcoded defaults, before any config
# file is applied. On each SIGHUP reload we restore this baseline first so
# that sections removed from the config file actually take effect.
opts_from_cli = Namespace(**vars(opts))
index_prefix_domain_map = None
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
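# PARSEDMARC_* environment variables (other than PARSEDMARC_CONFIG_FILE
# itself) count as configuration, so a config file is optional.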
has_env_config = any(
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
for k in os.environ
)
if config_file or has_env_config:
try:
config = _load_config(config_file)
index_prefix_domain_map = _parse_config(config, opts)
except ConfigurationError as e:
logger.critical(str(e))
exit(-1)
logger.setLevel(logging.ERROR)
if opts.warnings:
logger.setLevel(logging.WARNING)
if opts.verbose:
logger.setLevel(logging.INFO)
if opts.debug:
logger.setLevel(logging.DEBUG)
if opts.log_file:
try:
fh = logging.FileHandler(opts.log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except Exception as error:
logger.warning("Unable to write to log file: {}".format(error))
opts.active_log_file = opts.log_file
if (
opts.imap_host is None
and opts.graph_client_id is None
and opts.gmail_api_credentials_file is None
and opts.maildir_path is None
and len(opts.file_path) == 0
):
logger.error("You must supply input files or a mailbox connection")
exit(1)
logger.info("Starting parsedmarc")
load_ip_db(
always_use_local_file=opts.always_use_local_files,
local_file_path=opts.ip_db_path,
url=opts.ipinfo_url,
offline=opts.offline,
)
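# The optional IPinfo API is configured up front so that an invalid
# token fails fast at startup rather than on the first lookup.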
if opts.ipinfo_api_token and not opts.offline:
try:
configure_ipinfo_api(opts.ipinfo_api_token)
except InvalidIPinfoAPIKey as e:
logger.critical(str(e))
exit(1)
load_psl_overrides(
always_use_local_file=opts.always_use_local_files,
local_file_path=opts.psl_overrides_path,
url=opts.psl_overrides_url,
offline=opts.offline,
)
# Initialize output clients (with retry for transient connection errors)
clients = {}
max_retries = 4
retry_delay = 5
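# Up to five attempts with exponential backoff: 5s, 10s, 20s, then 40s
# between retries. Configuration errors are fatal immediately.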
for attempt in range(max_retries + 1):
try:
clients = _init_output_clients(opts)
break
except ConfigurationError as e:
logger.critical(str(e))
exit(1)
except Exception as error_:
if attempt < max_retries:
logger.warning(
"Output client error (attempt %d/%d, retrying in %ds): %s",
attempt + 1,
max_retries + 1,
retry_delay,
error_,
)
time.sleep(retry_delay)
retry_delay *= 2
else:
logger.error("Output client error: {0}".format(error_))
exit(1)
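# Expand shell globs, then split mbox files out of the input set; mbox
# files are parsed via get_dmarc_reports_from_mbox() instead of the
# per-file worker processes.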
file_paths = []
mbox_paths = []
for file_path in args.file_path:
file_paths += glob(file_path)
for file_path in file_paths:
if is_mbox(file_path):
mbox_paths.append(file_path)
file_paths = list(set(file_paths))
mbox_paths = list(set(mbox_paths))
for mbox_path in mbox_paths:
file_paths.remove(mbox_path)
counter = 0
results = []
pbar = None
if sys.stdout.isatty():
pbar = tqdm(total=len(file_paths))
n_procs = int(opts.n_procs or 1)
if n_procs < 1:
n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
processes = []
connections = []
for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)):
if proc_index >= len(file_paths):
break
parent_conn, child_conn = Pipe()
connections.append(parent_conn)
process = Process(
target=cli_parse,
args=(
file_paths[proc_index],
opts.strip_attachment_payloads,
opts.nameservers,
opts.dns_timeout,
opts.dns_retries,
opts.ip_db_path,
opts.offline,
opts.always_use_local_files,
opts.reverse_dns_map_path,
opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn,
current_log_level,
current_log_file,
),
)
processes.append(process)
for proc in processes:
proc.start()
for conn in connections:
results.append(conn.recv())
for proc in processes:
proc.join()
if pbar is not None:
counter += len(processes)
pbar.update(len(processes))
if pbar is not None:
pbar.close()
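# Collate worker results. Aggregate reports are de-duplicated by
# organization name + report ID; forensic and SMTP TLS reports are
# passed through as-is.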
for result in results:
if isinstance(result[0], ParserError) or result[0] is None:
logger.error("Failed to parse {0} - {1}".format(result[1], result[0]))
else:
if result[0]["report_type"] == "aggregate":
report_org = result[0]["report"]["report_metadata"]["org_name"]
report_id = result[0]["report"]["report_metadata"]["report_id"]
report_key = f"{report_org}_{report_id}"
if report_key not in SEEN_AGGREGATE_REPORT_IDS:
SEEN_AGGREGATE_REPORT_IDS[report_key] = True
aggregate_reports.append(result[0]["report"])
else:
logger.debug(
"Skipping duplicate aggregate report "
f"from {report_org} with ID: {report_id}"
)
elif result[0]["report_type"] == "forensic":
forensic_reports.append(result[0]["report"])
elif result[0]["report_type"] == "smtp_tls":
smtp_tls_reports.append(result[0]["report"])
for mbox_path in mbox_paths:
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
strip = opts.strip_attachment_payloads
reports = get_dmarc_reports_from_mbox(
mbox_path,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
dns_retries=opts.dns_retries,
strip_attachment_payloads=strip,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
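# At most one mailbox connection is used per run; if several are
# configured, later blocks overwrite earlier ones (IMAP, then MS Graph,
# then Gmail API, then Maildir).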
mailbox_connection = None
mailbox_batch_size_value = 10
mailbox_check_timeout_value = 30
normalize_timespan_threshold_hours_value = 24.0
if opts.imap_host:
try:
if opts.imap_user is None or opts.imap_password is None:
logger.error(
"IMAP user and password must be specified if host is specified"
)
exit(1)
ssl = True
verify = True
if opts.imap_skip_certificate_verification:
logger.debug("Skipping IMAP certificate verification")
verify = False
if not opts.imap_ssl:
ssl = False
imap_timeout = (
int(opts.imap_timeout) if opts.imap_timeout is not None else 30
)
imap_max_retries = (
int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
)
imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
mailbox_connection = IMAPConnection(
host=opts.imap_host,
port=imap_port_value,
ssl=ssl,
verify=verify,
timeout=imap_timeout,
max_retries=imap_max_retries,
user=opts.imap_user,
password=opts.imap_password,
)
except Exception:
logger.exception("IMAP Error")
exit(1)
if opts.graph_client_id:
try:
mailbox = opts.graph_mailbox or opts.graph_user
mailbox_connection = MSGraphConnection(
auth_method=opts.graph_auth_method,
mailbox=mailbox,
tenant_id=opts.graph_tenant_id,
client_id=opts.graph_client_id,
client_secret=opts.graph_client_secret,
certificate_path=opts.graph_certificate_path,
certificate_password=opts.graph_certificate_password,
username=opts.graph_user,
password=opts.graph_password,
token_file=opts.graph_token_file,
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
graph_url=opts.graph_url,
)
except Exception:
logger.exception("MS Graph Error")
exit(1)
if opts.gmail_api_credentials_file:
if opts.mailbox_delete:
if "https://mail.google.com/" not in opts.gmail_api_scopes:
logger.error(
"Message deletion requires scope"
" 'https://mail.google.com/'. "
"Add the scope and remove token file "
"to acquire proper access."
)
opts.mailbox_delete = False
try:
mailbox_connection = GmailConnection(
credentials_file=opts.gmail_api_credentials_file,
token_file=opts.gmail_api_token_file,
scopes=opts.gmail_api_scopes,
include_spam_trash=opts.gmail_api_include_spam_trash,
paginate_messages=opts.gmail_api_paginate_messages,
reports_folder=opts.mailbox_reports_folder,
oauth2_port=opts.gmail_api_oauth2_port,
auth_mode=opts.gmail_api_auth_mode,
service_account_user=opts.gmail_api_service_account_user,
)
except Exception:
logger.exception("Gmail API Error")
exit(1)
if opts.maildir_path:
try:
mailbox_connection = MaildirConnection(
maildir_path=opts.maildir_path,
maildir_create=opts.maildir_create,
)
except Exception:
logger.exception("Maildir Error")
exit(1)
if mailbox_connection:
mailbox_batch_size_value = (
int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
try:
reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection,
delete=opts.mailbox_delete,
batch_size=mailbox_batch_size_value,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
nameservers=opts.nameservers,
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
dns_retries=opts.dns_retries,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
except Exception:
logger.exception("Mailbox Error")
exit(1)
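# All input sources have been parsed; push the combined results through
# every configured output.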
parsing_results: ParsingResults = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
try:
process_reports(parsing_results)
except ParserError as error:
logger.error(str(error))
exit(1)
if opts.smtp_host:
try:
verify = True
if opts.smtp_skip_certificate_verification:
verify = False
smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
smtp_to_value = (
list(opts.smtp_to)
if isinstance(opts.smtp_to, list)
else _str_to_list(str(opts.smtp_to))
)
email_results(
parsing_results,
opts.smtp_host,
opts.smtp_from,
smtp_to_value,
port=smtp_port_value,
verify=verify,
username=opts.smtp_user,
password=opts.smtp_password,
subject=opts.smtp_subject,
require_encryption=opts.smtp_ssl,
)
except Exception:
logger.exception("Failed to email results")
exit(1)
# SIGHUP-based config reload for watch mode
_reload_requested = False
def _handle_sighup(signum, frame):
nonlocal _reload_requested
# Logging is not async-signal-safe; only set the flag here.
# The log message is emitted from the main loop when the flag is read.
_reload_requested = True
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, _handle_sighup)
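# Operators can trigger a reload with e.g. `kill -HUP <pid>`. Platforms
# without SIGHUP (notably Windows) skip registration, so watch mode
# runs without config reload there.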
if mailbox_connection and opts.mailbox_watch:
logger.info("Watching for email - Quit with ctrl-c")
while True:
# Re-check mailbox_watch in case a config reload disabled watch mode
if not opts.mailbox_watch:
logger.info(
"Mailbox watch disabled in reloaded configuration, stopping watcher"
)
break
try:
watch_inbox(
mailbox_connection=mailbox_connection,
callback=process_reports,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
delete=opts.mailbox_delete,
test=opts.mailbox_test,
check_timeout=mailbox_check_timeout_value,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
dns_retries=opts.dns_retries,
strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=mailbox_batch_size_value,
since=opts.mailbox_since,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
config_reloading=lambda: _reload_requested,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
exit(1)
except ParserError as error:
logger.error(str(error))
exit(1)
if not _reload_requested:
break
# Reload configuration — emit the log message here (not in the
# signal handler, which is not async-signal-safe), then clear the
# flag so that any new SIGHUP arriving while we reload will be
# captured for the next iteration rather than being silently dropped.
logger.info("SIGHUP received, config will reload after current batch")
_reload_requested = False
logger.info("Reloading configuration...")
try:
# Build a fresh opts starting from CLI-only defaults so that
# sections removed from the config file actually take effect.
new_opts = Namespace(**vars(opts_from_cli))
new_config = _load_config(config_file)
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
new_clients = _init_output_clients(new_opts)
# All steps succeeded — commit the changes atomically.
_close_output_clients(clients)
clients = new_clients
index_prefix_domain_map = new_index_prefix_domain_map
# Reload the reverse DNS map so changes to the
# map path/URL in the config take effect. PSL overrides
# are reloaded alongside it so map entries that depend on
# a folded base domain keep working.
load_reverse_dns_map(
REVERSE_DNS_MAP,
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.reverse_dns_map_path,
url=new_opts.reverse_dns_map_url,
offline=new_opts.offline,
psl_overrides_path=new_opts.psl_overrides_path,
psl_overrides_url=new_opts.psl_overrides_url,
)
# Reload the IP database so changes to the
# db path/URL in the config take effect.
load_ip_db(
always_use_local_file=new_opts.always_use_local_files,
local_file_path=new_opts.ip_db_path,
url=new_opts.ipinfo_url,
offline=new_opts.offline,
)
# Re-apply IPinfo API settings. Passing a falsy token disables
# the API; a rotated token is also picked up here. An invalid
# token is fatal even on reload, since the operator explicitly
# configured it.
try:
configure_ipinfo_api(
new_opts.ipinfo_api_token if not new_opts.offline else None,
)
except InvalidIPinfoAPIKey as e:
logger.critical(str(e))
exit(1)
for k, v in vars(new_opts).items():
setattr(opts, k, v)
# Update watch parameters from reloaded config
mailbox_batch_size_value = (
int(opts.mailbox_batch_size)
if opts.mailbox_batch_size is not None
else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
# Update log level
logger.setLevel(logging.ERROR)
if opts.warnings:
logger.setLevel(logging.WARNING)
if opts.verbose:
logger.setLevel(logging.INFO)
if opts.debug:
logger.setLevel(logging.DEBUG)
# Refresh FileHandler if log_file changed
old_log_file = getattr(opts, "active_log_file", None)
new_log_file = opts.log_file
if old_log_file != new_log_file:
# Remove old FileHandlers
for h in list(logger.handlers):
if isinstance(h, logging.FileHandler):
h.close()
logger.removeHandler(h)
# Add new FileHandler if configured
if new_log_file:
try:
fh = logging.FileHandler(new_log_file, "a")
file_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s"
" - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(file_formatter)
logger.addHandler(fh)
except Exception as log_error:
logger.warning(
"Unable to write to log file: {}".format(log_error)
)
opts.active_log_file = new_log_file
logger.info("Configuration reloaded successfully")
except Exception:
logger.exception(
"Config reload failed, continuing with previous config"
)
if __name__ == "__main__":
_main()