mirror of
https://github.com/domainaware/parsedmarc.git
synced 2026-05-01 17:52:31 +00:00
c5f432c460
* Add optional IPinfo Lite REST API with MMDB fallback
Configure [general] ipinfo_api_token (or PARSEDMARC_GENERAL_IPINFO_API_TOKEN)
and every IP lookup hits https://api.ipinfo.io/lite/<ip> first for fresh
country + ASN data. On HTTP 429 (rate-limit) or 402 (quota), the API is
disabled for the rest of the run and lookups fall through to the bundled /
cached MMDB; transient network errors fall through per-request without
disabling the API. An invalid token (401/403) raises InvalidIPinfoAPIKey,
which the CLI catches and exits fatally — including at startup via a probe
lookup so operators notice misconfiguration immediately. Added
ipinfo_api_url as a base-URL override for mirrors or proxies.
The API token is never logged. A new _normalize_ip_record() helper is
shared between the API path and the MMDB path so both paths produce the
same normalized shape (country code, asn int, asn_name, asn_domain).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* IPinfo API: cool down and retry instead of permanent disable
Previously a single 429 or 402 disabled the API for the whole run. Now
each event sets a cooldown (using Retry-After when present, defaulting to
5 minutes for rate limits and 1 hour for quota exhaustion). Once the
cooldown expires the next lookup retries; a successful retry logs
"IPinfo API recovered" once at info level so operators can see service
came back. Repeat rate-limit responses after the first event stay at
debug to avoid log spam.
Test now targets parsedmarc.log (the actual emitting logger) instead of
the parsedmarc parent — cli._main() sets the child's level to ERROR,
and assertLogs on the parent can't see warnings filtered before
propagation. Test also exercises the cooldown-then-recovery path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* IPinfo API: log plan and quota from /me at startup
Configure-time probe now hits https://ipinfo.io/me first. That endpoint
is documented as quota-free and doubles as a free-of-quota token check,
so we use it to both validate the token and surface plan / month-to-date
usage / remaining-quota numbers at info level:
IPinfo API configured — plan: Lite, usage: 12345/50000 this month, 37655 remaining
Field names in /me have drifted across IPinfo plan generations, so the
summary formatter probes a few aliases before giving up. If /me is
unreachable (custom mirror behind ipinfo_api_url, network error) we
fall back to the original 1.1.1.1 lookup probe, which still validates
the token and logs a generic "configured" message.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Drop speculative ipinfo_api_url override
It was added mirroring ip_db_url, but the two serve different needs.
ip_db_url has a real use (internal hosting of the MMDB); an
authenticated IPinfo API isn't something anyone mirrors, and /me was
always hardcoded anyway, making the override half-baked. YAGNI.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* AGENTS.md: warn against speculative config options
New section under Configuration spelling out that every option is
permanent surface area and must come from a real user need rather than
pattern-matching a nearby option. Cites the removed ipinfo_api_url as
the canonical cautionary tale so the next session doesn't reintroduce
it, and calls out "override the base URL" / "configurable retries" as
common YAGNI traps.
Also requires that new options land fully wired in one PR (INI schema,
_parse_config, Namespace defaults, docs, SIGHUP-reload path) rather
than half-implemented.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Rename [general] ip_db_url to ipinfo_url
The bundled MMDB is specifically IPinfo Lite, so the option name
should say so. ip_db_url stays accepted as a deprecated alias and
logs a warning when used; env-var equivalents accept either spelling
via the existing PARSEDMARC_{SECTION}_{KEY} machinery.
Updated the AGENTS.md cautionary tale to refer to ipinfo_url (with
the note about the alias) so the anti-pattern example still reads
correctly post-rename.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Fix testPSLDownload to reflect .akamaiedge.net override
PSL carries c.akamaiedge.net as a public suffix, but
psl_overrides.txt intentionally folds .akamaiedge.net so every
Akamai CDN-customer PTR (the aXXXX-XX.cXXXXX.akamaiedge.net pattern)
clusters under one akamaiedge.net display key. The override was added
in 2978436 as a design decision for source attribution; the test
assertion just predates it.
Updated the comment to explain why override wins over the live PSL
here so the next reader doesn't reach for the PSL answer again.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Sean Whalen <seanthegeek@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2450 lines
97 KiB
Python
2450 lines
97 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""A CLI for parsing DMARC reports"""
|
|
|
|
import http.client
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
from argparse import ArgumentParser, Namespace
|
|
from configparser import ConfigParser
|
|
from glob import glob
|
|
from multiprocessing import Pipe, Process
|
|
from ssl import CERT_NONE, create_default_context
|
|
|
|
import yaml
|
|
from tqdm import tqdm
|
|
|
|
from parsedmarc import (
|
|
REVERSE_DNS_MAP,
|
|
SEEN_AGGREGATE_REPORT_IDS,
|
|
InvalidDMARCReport,
|
|
ParserError,
|
|
__version__,
|
|
elastic,
|
|
email_results,
|
|
gelf,
|
|
get_dmarc_reports_from_mailbox,
|
|
get_dmarc_reports_from_mbox,
|
|
kafkaclient,
|
|
loganalytics,
|
|
opensearch,
|
|
parse_report_file,
|
|
s3,
|
|
save_output,
|
|
splunk,
|
|
syslog,
|
|
watch_inbox,
|
|
webhook,
|
|
)
|
|
from parsedmarc.log import logger
|
|
from parsedmarc.mail import (
|
|
GmailConnection,
|
|
IMAPConnection,
|
|
MaildirConnection,
|
|
MSGraphConnection,
|
|
)
|
|
from parsedmarc.mail.graph import AuthMethod
|
|
from parsedmarc.types import ParsingResults
|
|
from parsedmarc.utils import (
|
|
InvalidIPinfoAPIKey,
|
|
configure_ipinfo_api,
|
|
get_base_domain,
|
|
get_reverse_dns,
|
|
is_mbox,
|
|
load_ip_db,
|
|
load_psl_overrides,
|
|
load_reverse_dns_map,
|
|
)
|
|
|
|
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
|
|
# private stdlib attribute and may not exist in type stubs.
|
|
setattr(http.client, "_MAXHEADERS", 200)
|
|
|
|
formatter = logging.Formatter(
|
|
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
|
|
datefmt="%Y-%m-%d:%H:%M:%S",
|
|
)
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
|
|
|
|
def _str_to_list(s):
|
|
"""Converts a comma separated string to a list"""
|
|
_list = s.split(",")
|
|
return list(map(lambda i: i.lstrip(), _list))
|
|
|
|
|
|
def _expand_path(p: str) -> str:
|
|
"""Expand ``~`` and ``$VAR`` references in a file path."""
|
|
return os.path.expanduser(os.path.expandvars(p))
|
|
|
|
|
|
# All known INI config section names, used for env var resolution.
|
|
_KNOWN_SECTIONS = frozenset(
|
|
{
|
|
"general",
|
|
"mailbox",
|
|
"imap",
|
|
"msgraph",
|
|
"elasticsearch",
|
|
"opensearch",
|
|
"splunk_hec",
|
|
"kafka",
|
|
"smtp",
|
|
"s3",
|
|
"syslog",
|
|
"gmail_api",
|
|
"maildir",
|
|
"log_analytics",
|
|
"gelf",
|
|
"webhook",
|
|
}
|
|
)
|
|
|
|
|
|
def _resolve_section_key(suffix: str) -> tuple:
|
|
"""Resolve an env var suffix like ``IMAP_PASSWORD`` to ``('imap', 'password')``.
|
|
|
|
Uses longest-prefix matching against known section names so that
|
|
multi-word sections like ``splunk_hec`` are handled correctly.
|
|
|
|
Returns ``(None, None)`` when no known section matches.
|
|
"""
|
|
suffix_lower = suffix.lower()
|
|
|
|
best_section = None
|
|
best_key = None
|
|
for section in _KNOWN_SECTIONS:
|
|
section_prefix = section + "_"
|
|
if suffix_lower.startswith(section_prefix):
|
|
key = suffix_lower[len(section_prefix) :]
|
|
if key and (best_section is None or len(section) > len(best_section)):
|
|
best_section = section
|
|
best_key = key
|
|
|
|
return best_section, best_key
|
|
|
|
|
|
def _apply_env_overrides(config: ConfigParser) -> None:
|
|
"""Inject ``PARSEDMARC_*`` environment variables into *config*.
|
|
|
|
Environment variables matching ``PARSEDMARC_{SECTION}_{KEY}`` override
|
|
(or create) the corresponding config-file values. Sections are created
|
|
automatically when they do not yet exist.
|
|
"""
|
|
prefix = "PARSEDMARC_"
|
|
|
|
# Short aliases that don't follow the PARSEDMARC_{SECTION}_{KEY} pattern.
|
|
_ENV_ALIASES = {
|
|
"DEBUG": ("general", "debug"),
|
|
"PARSEDMARC_DEBUG": ("general", "debug"),
|
|
}
|
|
|
|
for env_key, env_value in os.environ.items():
|
|
if env_key in _ENV_ALIASES:
|
|
section, key = _ENV_ALIASES[env_key]
|
|
elif env_key.startswith(prefix) and env_key != "PARSEDMARC_CONFIG_FILE":
|
|
suffix = env_key[len(prefix) :]
|
|
section, key = _resolve_section_key(suffix)
|
|
else:
|
|
continue
|
|
|
|
if section is None:
|
|
logger.debug("Ignoring unrecognized env var: %s", env_key)
|
|
continue
|
|
|
|
if not config.has_section(section):
|
|
config.add_section(section)
|
|
|
|
config.set(section, key, env_value)
|
|
logger.debug("Config override from env: [%s] %s", section, key)
|
|
|
|
|
|
def _configure_logging(log_level, log_file=None):
|
|
"""
|
|
Configure logging for the current process.
|
|
This is needed for child processes to properly log messages.
|
|
|
|
Args:
|
|
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
|
|
log_file: Optional path to log file
|
|
"""
|
|
# Get the logger
|
|
from parsedmarc.log import logger
|
|
|
|
# Set the log level
|
|
logger.setLevel(log_level)
|
|
|
|
# Add StreamHandler with formatter if not already present
|
|
# Check if we already have a StreamHandler to avoid duplicates
|
|
# Use exact type check to distinguish from FileHandler subclass
|
|
has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
|
|
|
|
if not has_stream_handler:
|
|
formatter = logging.Formatter(
|
|
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
|
|
datefmt="%Y-%m-%d:%H:%M:%S",
|
|
)
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
|
|
# Add FileHandler if log_file is specified
|
|
if log_file:
|
|
try:
|
|
fh = logging.FileHandler(log_file, "a")
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
|
|
)
|
|
fh.setFormatter(formatter)
|
|
logger.addHandler(fh)
|
|
except (IOError, OSError, PermissionError) as error:
|
|
logger.warning("Unable to write to log file: {}".format(error))
|
|
|
|
|
|
def cli_parse(
|
|
file_path,
|
|
sa,
|
|
nameservers,
|
|
dns_timeout,
|
|
dns_retries,
|
|
ip_db_path,
|
|
offline,
|
|
always_use_local_files,
|
|
reverse_dns_map_path,
|
|
reverse_dns_map_url,
|
|
normalize_timespan_threshold_hours,
|
|
conn,
|
|
log_level=logging.ERROR,
|
|
log_file=None,
|
|
):
|
|
"""Separated this function for multiprocessing
|
|
|
|
Args:
|
|
file_path: Path to the report file
|
|
sa: Strip attachment payloads flag
|
|
nameservers: List of nameservers
|
|
dns_timeout: DNS timeout
|
|
dns_retries: Number of DNS retries on transient errors
|
|
ip_db_path: Path to IP database
|
|
offline: Offline mode flag
|
|
always_use_local_files: Always use local files flag
|
|
reverse_dns_map_path: Path to reverse DNS map
|
|
reverse_dns_map_url: URL to reverse DNS map
|
|
normalize_timespan_threshold_hours: Timespan threshold
|
|
conn: Pipe connection for IPC
|
|
log_level: Logging level for this process
|
|
log_file: Optional path to log file
|
|
"""
|
|
# Configure logging in this child process
|
|
_configure_logging(log_level, log_file)
|
|
|
|
try:
|
|
file_results = parse_report_file(
|
|
file_path,
|
|
ip_db_path=ip_db_path,
|
|
offline=offline,
|
|
always_use_local_files=always_use_local_files,
|
|
reverse_dns_map_path=reverse_dns_map_path,
|
|
reverse_dns_map_url=reverse_dns_map_url,
|
|
nameservers=nameservers,
|
|
dns_timeout=dns_timeout,
|
|
dns_retries=dns_retries,
|
|
strip_attachment_payloads=sa,
|
|
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
|
|
)
|
|
conn.send([file_results, file_path])
|
|
except ParserError as error:
|
|
conn.send([error, file_path])
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
class ConfigurationError(Exception):
|
|
"""Raised when a configuration file has missing or invalid settings."""
|
|
|
|
pass
|
|
|
|
|
|
def _load_config(config_file: str | None = None) -> ConfigParser:
|
|
"""Load configuration from an INI file and/or environment variables.
|
|
|
|
Args:
|
|
config_file: Optional path to an .ini config file.
|
|
|
|
Returns:
|
|
A ``ConfigParser`` populated from the file (if given) and from any
|
|
``PARSEDMARC_*`` environment variables.
|
|
|
|
Raises:
|
|
ConfigurationError: If *config_file* is given but does not exist.
|
|
"""
|
|
config = ConfigParser(interpolation=None)
|
|
if config_file is not None:
|
|
abs_path = os.path.abspath(config_file)
|
|
if not os.path.exists(abs_path):
|
|
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
|
|
if not os.access(abs_path, os.R_OK):
|
|
raise ConfigurationError(
|
|
"Unable to read {0} — check file permissions".format(abs_path)
|
|
)
|
|
config.read(config_file)
|
|
_apply_env_overrides(config)
|
|
return config
|
|
|
|
|
|
def _parse_config(config: ConfigParser, opts):
|
|
"""Apply a loaded ``ConfigParser`` to *opts* in place.
|
|
|
|
Args:
|
|
config: A ``ConfigParser`` (from ``_load_config``).
|
|
opts: Namespace object to update with parsed values.
|
|
|
|
Returns:
|
|
index_prefix_domain_map or None
|
|
|
|
Raises:
|
|
ConfigurationError: If required settings are missing or invalid.
|
|
"""
|
|
opts.silent = True
|
|
index_prefix_domain_map = None
|
|
if "general" in config.sections():
|
|
general_config = config["general"]
|
|
if "silent" in general_config:
|
|
opts.silent = bool(general_config.getboolean("silent"))
|
|
if "normalize_timespan_threshold_hours" in general_config:
|
|
opts.normalize_timespan_threshold_hours = general_config.getfloat(
|
|
"normalize_timespan_threshold_hours"
|
|
)
|
|
if "index_prefix_domain_map" in general_config:
|
|
with open(_expand_path(general_config["index_prefix_domain_map"])) as f:
|
|
index_prefix_domain_map = yaml.safe_load(f)
|
|
if "offline" in general_config:
|
|
opts.offline = bool(general_config.getboolean("offline"))
|
|
if "strip_attachment_payloads" in general_config:
|
|
opts.strip_attachment_payloads = bool(
|
|
general_config.getboolean("strip_attachment_payloads")
|
|
)
|
|
if "output" in general_config:
|
|
opts.output = _expand_path(general_config["output"])
|
|
if "aggregate_json_filename" in general_config:
|
|
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
|
|
if "forensic_json_filename" in general_config:
|
|
opts.forensic_json_filename = general_config["forensic_json_filename"]
|
|
if "smtp_tls_json_filename" in general_config:
|
|
opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"]
|
|
if "aggregate_csv_filename" in general_config:
|
|
opts.aggregate_csv_filename = general_config["aggregate_csv_filename"]
|
|
if "forensic_csv_filename" in general_config:
|
|
opts.forensic_csv_filename = general_config["forensic_csv_filename"]
|
|
if "smtp_tls_csv_filename" in general_config:
|
|
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
|
|
if "dns_timeout" in general_config:
|
|
opts.dns_timeout = general_config.getfloat("dns_timeout")
|
|
if opts.dns_timeout is None:
|
|
opts.dns_timeout = 2
|
|
if "dns_retries" in general_config:
|
|
opts.dns_retries = general_config.getint("dns_retries")
|
|
if opts.dns_retries is None:
|
|
opts.dns_retries = 0
|
|
if "dns_test_address" in general_config:
|
|
opts.dns_test_address = general_config["dns_test_address"]
|
|
if "nameservers" in general_config:
|
|
opts.nameservers = _str_to_list(general_config["nameservers"])
|
|
# nameservers pre-flight check
|
|
dummy_hostname = None
|
|
try:
|
|
dummy_hostname = get_reverse_dns(
|
|
opts.dns_test_address,
|
|
nameservers=opts.nameservers,
|
|
timeout=opts.dns_timeout,
|
|
)
|
|
except Exception as ns_error:
|
|
raise ConfigurationError(
|
|
"DNS pre-flight check failed: {}".format(ns_error)
|
|
) from ns_error
|
|
if not dummy_hostname:
|
|
raise ConfigurationError(
|
|
"DNS pre-flight check failed: no PTR record for {} from {}".format(
|
|
opts.dns_test_address, opts.nameservers
|
|
)
|
|
)
|
|
if "save_aggregate" in general_config:
|
|
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
|
|
if "save_forensic" in general_config:
|
|
opts.save_forensic = bool(general_config.getboolean("save_forensic"))
|
|
if "save_smtp_tls" in general_config:
|
|
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
|
|
if "debug" in general_config:
|
|
opts.debug = bool(general_config.getboolean("debug"))
|
|
if "verbose" in general_config:
|
|
opts.verbose = bool(general_config.getboolean("verbose"))
|
|
if "warnings" in general_config:
|
|
opts.warnings = bool(general_config.getboolean("warnings"))
|
|
if "fail_on_output_error" in general_config:
|
|
opts.fail_on_output_error = bool(
|
|
general_config.getboolean("fail_on_output_error")
|
|
)
|
|
if "log_file" in general_config:
|
|
opts.log_file = _expand_path(general_config["log_file"])
|
|
if "n_procs" in general_config:
|
|
opts.n_procs = general_config.getint("n_procs")
|
|
if "ip_db_path" in general_config:
|
|
opts.ip_db_path = _expand_path(general_config["ip_db_path"])
|
|
else:
|
|
opts.ip_db_path = None
|
|
if "ipinfo_url" in general_config:
|
|
opts.ipinfo_url = general_config["ipinfo_url"]
|
|
elif "ip_db_url" in general_config:
|
|
# ``ip_db_url`` is the pre-9.10 name for the same option. Accept
|
|
# it as a deprecated alias; prefer ``ipinfo_url`` going forward.
|
|
opts.ipinfo_url = general_config["ip_db_url"]
|
|
logger.warning("[general] ip_db_url is deprecated; rename it to ipinfo_url")
|
|
if "ipinfo_api_token" in general_config:
|
|
opts.ipinfo_api_token = general_config["ipinfo_api_token"]
|
|
if "always_use_local_files" in general_config:
|
|
opts.always_use_local_files = bool(
|
|
general_config.getboolean("always_use_local_files")
|
|
)
|
|
if "local_reverse_dns_map_path" in general_config:
|
|
opts.reverse_dns_map_path = _expand_path(
|
|
general_config["local_reverse_dns_map_path"]
|
|
)
|
|
if "reverse_dns_map_url" in general_config:
|
|
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
|
|
if "local_psl_overrides_path" in general_config:
|
|
opts.psl_overrides_path = _expand_path(
|
|
general_config["local_psl_overrides_path"]
|
|
)
|
|
if "psl_overrides_url" in general_config:
|
|
opts.psl_overrides_url = general_config["psl_overrides_url"]
|
|
if "prettify_json" in general_config:
|
|
opts.prettify_json = bool(general_config.getboolean("prettify_json"))
|
|
|
|
if "mailbox" in config.sections():
|
|
mailbox_config = config["mailbox"]
|
|
if "msgraph" in config.sections():
|
|
opts.mailbox_reports_folder = "Inbox"
|
|
if "reports_folder" in mailbox_config:
|
|
opts.mailbox_reports_folder = mailbox_config["reports_folder"]
|
|
if "archive_folder" in mailbox_config:
|
|
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
|
|
if "watch" in mailbox_config:
|
|
opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
|
|
if "delete" in mailbox_config:
|
|
opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
|
|
if "test" in mailbox_config:
|
|
opts.mailbox_test = bool(mailbox_config.getboolean("test"))
|
|
if "batch_size" in mailbox_config:
|
|
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
|
|
if "check_timeout" in mailbox_config:
|
|
opts.mailbox_check_timeout = mailbox_config.getint("check_timeout")
|
|
if "since" in mailbox_config:
|
|
opts.mailbox_since = mailbox_config["since"]
|
|
|
|
if "imap" in config.sections():
|
|
imap_config = config["imap"]
|
|
if "watch" in imap_config:
|
|
logger.warning(
|
|
"Starting in 8.0.0, the watch option has been "
|
|
"moved from the imap configuration section to "
|
|
"the mailbox configuration section."
|
|
)
|
|
if "host" in imap_config:
|
|
opts.imap_host = imap_config["host"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"host setting missing from the imap config section"
|
|
)
|
|
if "port" in imap_config:
|
|
opts.imap_port = imap_config.getint("port")
|
|
if "timeout" in imap_config:
|
|
opts.imap_timeout = imap_config.getint("timeout")
|
|
if "max_retries" in imap_config:
|
|
opts.imap_max_retries = imap_config.getint("max_retries")
|
|
if "ssl" in imap_config:
|
|
opts.imap_ssl = bool(imap_config.getboolean("ssl"))
|
|
if "skip_certificate_verification" in imap_config:
|
|
opts.imap_skip_certificate_verification = bool(
|
|
imap_config.getboolean("skip_certificate_verification")
|
|
)
|
|
if "user" in imap_config:
|
|
opts.imap_user = imap_config["user"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"user setting missing from the imap config section"
|
|
)
|
|
if "password" in imap_config:
|
|
opts.imap_password = imap_config["password"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"password setting missing from the imap config section"
|
|
)
|
|
if "reports_folder" in imap_config:
|
|
opts.mailbox_reports_folder = imap_config["reports_folder"]
|
|
logger.warning(
|
|
"Use of the reports_folder option in the imap "
|
|
"configuration section has been deprecated. "
|
|
"Use this option in the mailbox configuration "
|
|
"section instead."
|
|
)
|
|
if "archive_folder" in imap_config:
|
|
opts.mailbox_archive_folder = imap_config["archive_folder"]
|
|
logger.warning(
|
|
"Use of the archive_folder option in the imap "
|
|
"configuration section has been deprecated. "
|
|
"Use this option in the mailbox configuration "
|
|
"section instead."
|
|
)
|
|
if "watch" in imap_config:
|
|
opts.mailbox_watch = bool(imap_config.getboolean("watch"))
|
|
logger.warning(
|
|
"Use of the watch option in the imap "
|
|
"configuration section has been deprecated. "
|
|
"Use this option in the mailbox configuration "
|
|
"section instead."
|
|
)
|
|
if "delete" in imap_config:
|
|
logger.warning(
|
|
"Use of the delete option in the imap "
|
|
"configuration section has been deprecated. "
|
|
"Use this option in the mailbox configuration "
|
|
"section instead."
|
|
)
|
|
if "test" in imap_config:
|
|
opts.mailbox_test = bool(imap_config.getboolean("test"))
|
|
logger.warning(
|
|
"Use of the test option in the imap "
|
|
"configuration section has been deprecated. "
|
|
"Use this option in the mailbox configuration "
|
|
"section instead."
|
|
)
|
|
if "batch_size" in imap_config:
|
|
opts.mailbox_batch_size = imap_config.getint("batch_size")
|
|
logger.warning(
|
|
"Use of the batch_size option in the imap "
|
|
"configuration section has been deprecated. "
|
|
"Use this option in the mailbox configuration "
|
|
"section instead."
|
|
)
|
|
|
|
if "msgraph" in config.sections():
|
|
graph_config = config["msgraph"]
|
|
opts.graph_token_file = _expand_path(graph_config.get("token_file", ".token"))
|
|
|
|
if "auth_method" not in graph_config:
|
|
logger.info(
|
|
"auth_method setting missing from the "
|
|
"msgraph config section "
|
|
"defaulting to UsernamePassword"
|
|
)
|
|
opts.graph_auth_method = AuthMethod.UsernamePassword.name
|
|
else:
|
|
opts.graph_auth_method = graph_config["auth_method"]
|
|
|
|
if opts.graph_auth_method == AuthMethod.UsernamePassword.name:
|
|
if "user" in graph_config:
|
|
opts.graph_user = graph_config["user"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"user setting missing from the msgraph config section"
|
|
)
|
|
if "password" in graph_config:
|
|
opts.graph_password = graph_config["password"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"password setting missing from the msgraph config section"
|
|
)
|
|
if "client_secret" in graph_config:
|
|
opts.graph_client_secret = graph_config["client_secret"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"client_secret setting missing from the msgraph config section"
|
|
)
|
|
|
|
if opts.graph_auth_method == AuthMethod.DeviceCode.name:
|
|
if "user" in graph_config:
|
|
opts.graph_user = graph_config["user"]
|
|
|
|
if opts.graph_auth_method != AuthMethod.UsernamePassword.name:
|
|
if "tenant_id" in graph_config:
|
|
opts.graph_tenant_id = graph_config["tenant_id"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"tenant_id setting missing from the msgraph config section"
|
|
)
|
|
|
|
if opts.graph_auth_method == AuthMethod.ClientSecret.name:
|
|
if "client_secret" in graph_config:
|
|
opts.graph_client_secret = graph_config["client_secret"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"client_secret setting missing from the msgraph config section"
|
|
)
|
|
|
|
if opts.graph_auth_method == AuthMethod.Certificate.name:
|
|
if "certificate_path" in graph_config:
|
|
opts.graph_certificate_path = _expand_path(
|
|
graph_config["certificate_path"]
|
|
)
|
|
else:
|
|
raise ConfigurationError(
|
|
"certificate_path setting missing from the msgraph config section"
|
|
)
|
|
if "certificate_password" in graph_config:
|
|
opts.graph_certificate_password = graph_config["certificate_password"]
|
|
|
|
if "client_id" in graph_config:
|
|
opts.graph_client_id = graph_config["client_id"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"client_id setting missing from the msgraph config section"
|
|
)
|
|
|
|
if "mailbox" in graph_config:
|
|
opts.graph_mailbox = graph_config["mailbox"]
|
|
elif opts.graph_auth_method != AuthMethod.UsernamePassword.name:
|
|
raise ConfigurationError(
|
|
"mailbox setting missing from the msgraph config section"
|
|
)
|
|
|
|
if "graph_url" in graph_config:
|
|
opts.graph_url = graph_config["graph_url"]
|
|
elif "url" in graph_config:
|
|
opts.graph_url = graph_config["url"]
|
|
|
|
if "allow_unencrypted_storage" in graph_config:
|
|
opts.graph_allow_unencrypted_storage = bool(
|
|
graph_config.getboolean("allow_unencrypted_storage")
|
|
)
|
|
|
|
if "elasticsearch" in config:
|
|
elasticsearch_config = config["elasticsearch"]
|
|
if "hosts" in elasticsearch_config:
|
|
opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"])
|
|
else:
|
|
raise ConfigurationError(
|
|
"hosts setting missing from the elasticsearch config section"
|
|
)
|
|
if "timeout" in elasticsearch_config:
|
|
timeout = elasticsearch_config.getfloat("timeout")
|
|
opts.elasticsearch_timeout = timeout
|
|
if "number_of_shards" in elasticsearch_config:
|
|
number_of_shards = elasticsearch_config.getint("number_of_shards")
|
|
opts.elasticsearch_number_of_shards = number_of_shards
|
|
if "number_of_replicas" in elasticsearch_config:
|
|
number_of_replicas = elasticsearch_config.getint("number_of_replicas")
|
|
opts.elasticsearch_number_of_replicas = number_of_replicas
|
|
if "index_suffix" in elasticsearch_config:
|
|
opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"]
|
|
if "index_prefix" in elasticsearch_config:
|
|
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
|
|
if "monthly_indexes" in elasticsearch_config:
|
|
monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
|
|
opts.elasticsearch_monthly_indexes = monthly
|
|
if "ssl" in elasticsearch_config:
|
|
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
|
|
if "cert_path" in elasticsearch_config:
|
|
opts.elasticsearch_ssl_cert_path = _expand_path(
|
|
elasticsearch_config["cert_path"]
|
|
)
|
|
if "skip_certificate_verification" in elasticsearch_config:
|
|
opts.elasticsearch_skip_certificate_verification = bool(
|
|
elasticsearch_config.getboolean("skip_certificate_verification")
|
|
)
|
|
if "user" in elasticsearch_config:
|
|
opts.elasticsearch_username = elasticsearch_config["user"]
|
|
if "password" in elasticsearch_config:
|
|
opts.elasticsearch_password = elasticsearch_config["password"]
|
|
# Until 8.20
|
|
if "apiKey" in elasticsearch_config:
|
|
opts.elasticsearch_api_key = elasticsearch_config["apiKey"]
|
|
# Since 8.20
|
|
if "api_key" in elasticsearch_config:
|
|
opts.elasticsearch_api_key = elasticsearch_config["api_key"]
|
|
|
|
if "opensearch" in config:
|
|
opensearch_config = config["opensearch"]
|
|
if "hosts" in opensearch_config:
|
|
opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"])
|
|
else:
|
|
raise ConfigurationError(
|
|
"hosts setting missing from the opensearch config section"
|
|
)
|
|
if "timeout" in opensearch_config:
|
|
timeout = opensearch_config.getfloat("timeout")
|
|
opts.opensearch_timeout = timeout
|
|
if "number_of_shards" in opensearch_config:
|
|
number_of_shards = opensearch_config.getint("number_of_shards")
|
|
opts.opensearch_number_of_shards = number_of_shards
|
|
if "number_of_replicas" in opensearch_config:
|
|
number_of_replicas = opensearch_config.getint("number_of_replicas")
|
|
opts.opensearch_number_of_replicas = number_of_replicas
|
|
if "index_suffix" in opensearch_config:
|
|
opts.opensearch_index_suffix = opensearch_config["index_suffix"]
|
|
if "index_prefix" in opensearch_config:
|
|
opts.opensearch_index_prefix = opensearch_config["index_prefix"]
|
|
if "monthly_indexes" in opensearch_config:
|
|
monthly = bool(opensearch_config.getboolean("monthly_indexes"))
|
|
opts.opensearch_monthly_indexes = monthly
|
|
if "ssl" in opensearch_config:
|
|
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
|
|
if "cert_path" in opensearch_config:
|
|
opts.opensearch_ssl_cert_path = _expand_path(opensearch_config["cert_path"])
|
|
if "skip_certificate_verification" in opensearch_config:
|
|
opts.opensearch_skip_certificate_verification = bool(
|
|
opensearch_config.getboolean("skip_certificate_verification")
|
|
)
|
|
if "user" in opensearch_config:
|
|
opts.opensearch_username = opensearch_config["user"]
|
|
if "password" in opensearch_config:
|
|
opts.opensearch_password = opensearch_config["password"]
|
|
# Until 8.20
|
|
if "apiKey" in opensearch_config:
|
|
opts.opensearch_api_key = opensearch_config["apiKey"]
|
|
# Since 8.20
|
|
if "api_key" in opensearch_config:
|
|
opts.opensearch_api_key = opensearch_config["api_key"]
|
|
if "auth_type" in opensearch_config:
|
|
opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower()
|
|
elif "authentication_type" in opensearch_config:
|
|
opts.opensearch_auth_type = (
|
|
opensearch_config["authentication_type"].strip().lower()
|
|
)
|
|
if "aws_region" in opensearch_config:
|
|
opts.opensearch_aws_region = opensearch_config["aws_region"].strip()
|
|
if "aws_service" in opensearch_config:
|
|
opts.opensearch_aws_service = opensearch_config["aws_service"].strip()
|
|
|
|
if "splunk_hec" in config.sections():
|
|
hec_config = config["splunk_hec"]
|
|
if "url" in hec_config:
|
|
opts.hec = hec_config["url"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"url setting missing from the splunk_hec config section"
|
|
)
|
|
if "token" in hec_config:
|
|
opts.hec_token = hec_config["token"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"token setting missing from the splunk_hec config section"
|
|
)
|
|
if "index" in hec_config:
|
|
opts.hec_index = hec_config["index"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"index setting missing from the splunk_hec config section"
|
|
)
|
|
if "skip_certificate_verification" in hec_config:
|
|
opts.hec_skip_certificate_verification = bool(
|
|
hec_config.getboolean("skip_certificate_verification", fallback=False)
|
|
)
|
|
|
|
if "kafka" in config.sections():
|
|
kafka_config = config["kafka"]
|
|
if "hosts" in kafka_config:
|
|
opts.kafka_hosts = _str_to_list(kafka_config["hosts"])
|
|
else:
|
|
raise ConfigurationError(
|
|
"hosts setting missing from the kafka config section"
|
|
)
|
|
if "user" in kafka_config:
|
|
opts.kafka_username = kafka_config["user"]
|
|
if "password" in kafka_config:
|
|
opts.kafka_password = kafka_config["password"]
|
|
if "ssl" in kafka_config:
|
|
opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
|
|
if "skip_certificate_verification" in kafka_config:
|
|
kafka_verify = bool(
|
|
kafka_config.getboolean("skip_certificate_verification")
|
|
)
|
|
opts.kafka_skip_certificate_verification = kafka_verify
|
|
if "aggregate_topic" in kafka_config:
|
|
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"aggregate_topic setting missing from the kafka config section"
|
|
)
|
|
if "forensic_topic" in kafka_config:
|
|
opts.kafka_forensic_topic = kafka_config["forensic_topic"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"forensic_topic setting missing from the kafka config section"
|
|
)
|
|
if "smtp_tls_topic" in kafka_config:
|
|
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"smtp_tls_topic setting missing from the kafka config section"
|
|
)
|
|
|
|
if "smtp" in config.sections():
|
|
smtp_config = config["smtp"]
|
|
if "host" in smtp_config:
|
|
opts.smtp_host = smtp_config["host"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"host setting missing from the smtp config section"
|
|
)
|
|
if "port" in smtp_config:
|
|
opts.smtp_port = smtp_config.getint("port")
|
|
if "ssl" in smtp_config:
|
|
opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
|
|
if "skip_certificate_verification" in smtp_config:
|
|
smtp_verify = bool(smtp_config.getboolean("skip_certificate_verification"))
|
|
opts.smtp_skip_certificate_verification = smtp_verify
|
|
if "user" in smtp_config:
|
|
opts.smtp_user = smtp_config["user"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"user setting missing from the smtp config section"
|
|
)
|
|
if "password" in smtp_config:
|
|
opts.smtp_password = smtp_config["password"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"password setting missing from the smtp config section"
|
|
)
|
|
if "from" in smtp_config:
|
|
opts.smtp_from = smtp_config["from"]
|
|
else:
|
|
logger.critical("from setting missing from the smtp config section")
|
|
if "to" in smtp_config:
|
|
opts.smtp_to = _str_to_list(smtp_config["to"])
|
|
else:
|
|
logger.critical("to setting missing from the smtp config section")
|
|
if "subject" in smtp_config:
|
|
opts.smtp_subject = smtp_config["subject"]
|
|
if "attachment" in smtp_config:
|
|
opts.smtp_attachment = _expand_path(smtp_config["attachment"])
|
|
if "message" in smtp_config:
|
|
opts.smtp_message = smtp_config["message"]
|
|
|
|
if "s3" in config.sections():
|
|
s3_config = config["s3"]
|
|
if "bucket" in s3_config:
|
|
opts.s3_bucket = s3_config["bucket"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"bucket setting missing from the s3 config section"
|
|
)
|
|
if "path" in s3_config:
|
|
opts.s3_path = s3_config["path"]
|
|
if opts.s3_path.startswith("/"):
|
|
opts.s3_path = opts.s3_path[1:]
|
|
if opts.s3_path.endswith("/"):
|
|
opts.s3_path = opts.s3_path[:-1]
|
|
else:
|
|
opts.s3_path = ""
|
|
|
|
if "region_name" in s3_config:
|
|
opts.s3_region_name = s3_config["region_name"]
|
|
if "endpoint_url" in s3_config:
|
|
opts.s3_endpoint_url = s3_config["endpoint_url"]
|
|
if "access_key_id" in s3_config:
|
|
opts.s3_access_key_id = s3_config["access_key_id"]
|
|
if "secret_access_key" in s3_config:
|
|
opts.s3_secret_access_key = s3_config["secret_access_key"]
|
|
|
|
if "syslog" in config.sections():
|
|
syslog_config = config["syslog"]
|
|
if "server" in syslog_config:
|
|
opts.syslog_server = syslog_config["server"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"server setting missing from the syslog config section"
|
|
)
|
|
if "port" in syslog_config:
|
|
opts.syslog_port = syslog_config["port"]
|
|
else:
|
|
opts.syslog_port = 514
|
|
if "protocol" in syslog_config:
|
|
opts.syslog_protocol = syslog_config["protocol"]
|
|
else:
|
|
opts.syslog_protocol = "udp"
|
|
if "cafile_path" in syslog_config:
|
|
opts.syslog_cafile_path = _expand_path(syslog_config["cafile_path"])
|
|
if "certfile_path" in syslog_config:
|
|
opts.syslog_certfile_path = _expand_path(syslog_config["certfile_path"])
|
|
if "keyfile_path" in syslog_config:
|
|
opts.syslog_keyfile_path = _expand_path(syslog_config["keyfile_path"])
|
|
if "timeout" in syslog_config:
|
|
opts.syslog_timeout = float(syslog_config["timeout"])
|
|
else:
|
|
opts.syslog_timeout = 5.0
|
|
if "retry_attempts" in syslog_config:
|
|
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
|
|
else:
|
|
opts.syslog_retry_attempts = 3
|
|
if "retry_delay" in syslog_config:
|
|
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
|
|
else:
|
|
opts.syslog_retry_delay = 5
|
|
|
|
if "gmail_api" in config.sections():
|
|
gmail_api_config = config["gmail_api"]
|
|
gmail_creds = gmail_api_config.get("credentials_file")
|
|
opts.gmail_api_credentials_file = (
|
|
_expand_path(gmail_creds) if gmail_creds else gmail_creds
|
|
)
|
|
opts.gmail_api_token_file = _expand_path(
|
|
gmail_api_config.get("token_file", ".token")
|
|
)
|
|
opts.gmail_api_include_spam_trash = bool(
|
|
gmail_api_config.getboolean("include_spam_trash", False)
|
|
)
|
|
opts.gmail_api_paginate_messages = bool(
|
|
gmail_api_config.getboolean("paginate_messages", True)
|
|
)
|
|
default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify"
|
|
opts.gmail_api_scopes = gmail_api_config.get("scopes", default_gmail_api_scope)
|
|
opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes)
|
|
if "oauth2_port" in gmail_api_config:
|
|
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
|
|
if "auth_mode" in gmail_api_config:
|
|
opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip()
|
|
if "service_account_user" in gmail_api_config:
|
|
opts.gmail_api_service_account_user = gmail_api_config[
|
|
"service_account_user"
|
|
].strip()
|
|
elif "delegated_user" in gmail_api_config:
|
|
opts.gmail_api_service_account_user = gmail_api_config[
|
|
"delegated_user"
|
|
].strip()
|
|
|
|
if "maildir" in config.sections():
|
|
maildir_api_config = config["maildir"]
|
|
maildir_p = maildir_api_config.get(
|
|
"maildir_path", maildir_api_config.get("path")
|
|
)
|
|
opts.maildir_path = _expand_path(maildir_p) if maildir_p else maildir_p
|
|
opts.maildir_create = bool(
|
|
maildir_api_config.getboolean(
|
|
"maildir_create",
|
|
fallback=maildir_api_config.getboolean("create", fallback=False),
|
|
)
|
|
)
|
|
|
|
if "log_analytics" in config.sections():
|
|
log_analytics_config = config["log_analytics"]
|
|
opts.la_client_id = log_analytics_config.get("client_id")
|
|
opts.la_client_secret = log_analytics_config.get("client_secret")
|
|
opts.la_tenant_id = log_analytics_config.get("tenant_id")
|
|
opts.la_dce = log_analytics_config.get("dce")
|
|
opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id")
|
|
opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream")
|
|
opts.la_dcr_forensic_stream = log_analytics_config.get("dcr_forensic_stream")
|
|
opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream")
|
|
|
|
if "gelf" in config.sections():
|
|
gelf_config = config["gelf"]
|
|
if "host" in gelf_config:
|
|
opts.gelf_host = gelf_config["host"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"host setting missing from the gelf config section"
|
|
)
|
|
if "port" in gelf_config:
|
|
opts.gelf_port = gelf_config["port"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"port setting missing from the gelf config section"
|
|
)
|
|
if "mode" in gelf_config:
|
|
opts.gelf_mode = gelf_config["mode"]
|
|
else:
|
|
raise ConfigurationError(
|
|
"mode setting missing from the gelf config section"
|
|
)
|
|
|
|
if "webhook" in config.sections():
|
|
webhook_config = config["webhook"]
|
|
if "aggregate_url" in webhook_config:
|
|
opts.webhook_aggregate_url = webhook_config["aggregate_url"]
|
|
if "forensic_url" in webhook_config:
|
|
opts.webhook_forensic_url = webhook_config["forensic_url"]
|
|
if "smtp_tls_url" in webhook_config:
|
|
opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"]
|
|
if "timeout" in webhook_config:
|
|
opts.webhook_timeout = webhook_config.getint("timeout")
|
|
|
|
return index_prefix_domain_map
|
|
|
|
|
|
class _ElasticsearchHandle:
|
|
"""Sentinel so Elasticsearch participates in _close_output_clients."""
|
|
|
|
def close(self):
|
|
try:
|
|
conn = elastic.connections.get_connection()
|
|
if not isinstance(conn, str):
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
elastic.connections.remove_connection("default")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
class _OpenSearchHandle:
|
|
"""Sentinel so OpenSearch participates in _close_output_clients."""
|
|
|
|
def close(self):
|
|
try:
|
|
conn = opensearch.connections.get_connection()
|
|
if not isinstance(conn, str):
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
opensearch.connections.remove_connection("default")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _init_output_clients(opts):
|
|
"""Create output clients based on current opts.
|
|
|
|
Returns:
|
|
dict of client instances keyed by name.
|
|
|
|
Raises:
|
|
ConfigurationError: If a required output client cannot be created.
|
|
"""
|
|
clients = {}
|
|
|
|
try:
|
|
if opts.s3_bucket:
|
|
logger.debug("Initializing S3 client: bucket=%s", opts.s3_bucket)
|
|
clients["s3_client"] = s3.S3Client(
|
|
bucket_name=opts.s3_bucket,
|
|
bucket_path=opts.s3_path,
|
|
region_name=opts.s3_region_name,
|
|
endpoint_url=opts.s3_endpoint_url,
|
|
access_key_id=opts.s3_access_key_id,
|
|
secret_access_key=opts.s3_secret_access_key,
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"S3: {e}") from e
|
|
|
|
try:
|
|
if opts.syslog_server:
|
|
logger.debug(
|
|
"Initializing syslog client: server=%s:%s",
|
|
opts.syslog_server,
|
|
opts.syslog_port,
|
|
)
|
|
clients["syslog_client"] = syslog.SyslogClient(
|
|
server_name=opts.syslog_server,
|
|
server_port=int(opts.syslog_port),
|
|
protocol=opts.syslog_protocol or "udp",
|
|
cafile_path=opts.syslog_cafile_path,
|
|
certfile_path=opts.syslog_certfile_path,
|
|
keyfile_path=opts.syslog_keyfile_path,
|
|
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
|
|
retry_attempts=opts.syslog_retry_attempts
|
|
if opts.syslog_retry_attempts is not None
|
|
else 3,
|
|
retry_delay=opts.syslog_retry_delay
|
|
if opts.syslog_retry_delay is not None
|
|
else 5,
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Syslog: {e}") from e
|
|
|
|
if opts.hec:
|
|
if opts.hec_token is None or opts.hec_index is None:
|
|
raise ConfigurationError(
|
|
"HEC token and HEC index are required when using HEC URL"
|
|
)
|
|
try:
|
|
logger.debug("Initializing Splunk HEC client: url=%s", opts.hec)
|
|
verify = True
|
|
if opts.hec_skip_certificate_verification:
|
|
verify = False
|
|
clients["hec_client"] = splunk.HECClient(
|
|
opts.hec, opts.hec_token, opts.hec_index, verify=verify
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Splunk HEC: {e}") from e
|
|
|
|
try:
|
|
if opts.kafka_hosts:
|
|
logger.debug("Initializing Kafka client: hosts=%s", opts.kafka_hosts)
|
|
ssl_context = None
|
|
if opts.kafka_skip_certificate_verification:
|
|
logger.debug("Skipping Kafka certificate verification")
|
|
ssl_context = create_default_context()
|
|
ssl_context.check_hostname = False
|
|
ssl_context.verify_mode = CERT_NONE
|
|
clients["kafka_client"] = kafkaclient.KafkaClient(
|
|
opts.kafka_hosts,
|
|
username=opts.kafka_username,
|
|
password=opts.kafka_password,
|
|
ssl_context=ssl_context,
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Kafka: {e}") from e
|
|
|
|
try:
|
|
if opts.gelf_host:
|
|
logger.debug(
|
|
"Initializing GELF client: host=%s:%s",
|
|
opts.gelf_host,
|
|
opts.gelf_port,
|
|
)
|
|
clients["gelf_client"] = gelf.GelfClient(
|
|
host=opts.gelf_host,
|
|
port=int(opts.gelf_port),
|
|
mode=opts.gelf_mode,
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"GELF: {e}") from e
|
|
|
|
try:
|
|
if (
|
|
opts.webhook_aggregate_url
|
|
or opts.webhook_forensic_url
|
|
or opts.webhook_smtp_tls_url
|
|
):
|
|
logger.debug("Initializing webhook client")
|
|
clients["webhook_client"] = webhook.WebhookClient(
|
|
aggregate_url=opts.webhook_aggregate_url,
|
|
forensic_url=opts.webhook_forensic_url,
|
|
smtp_tls_url=opts.webhook_smtp_tls_url,
|
|
timeout=opts.webhook_timeout,
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Webhook: {e}") from e
|
|
|
|
# Elasticsearch and OpenSearch mutate module-level global state via
|
|
# connections.create_connection(), which cannot be rolled back if a later
|
|
# step fails. Initialise them last so that all other clients are created
|
|
# successfully first; this minimizes the window for partial-init problems
|
|
# during config reload.
|
|
if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
|
|
try:
|
|
if opts.elasticsearch_hosts:
|
|
logger.debug(
|
|
"Initializing Elasticsearch client: hosts=%s, ssl=%s",
|
|
opts.elasticsearch_hosts,
|
|
opts.elasticsearch_ssl,
|
|
)
|
|
es_aggregate_index = "dmarc_aggregate"
|
|
es_forensic_index = "dmarc_forensic"
|
|
es_smtp_tls_index = "smtp_tls"
|
|
if opts.elasticsearch_index_suffix:
|
|
suffix = opts.elasticsearch_index_suffix
|
|
es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
|
|
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
|
|
es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
|
|
if opts.elasticsearch_index_prefix:
|
|
prefix = opts.elasticsearch_index_prefix
|
|
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
|
|
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
|
|
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
|
|
elastic_timeout_value = (
|
|
float(opts.elasticsearch_timeout)
|
|
if opts.elasticsearch_timeout is not None
|
|
else 60.0
|
|
)
|
|
elastic.set_hosts(
|
|
opts.elasticsearch_hosts,
|
|
use_ssl=opts.elasticsearch_ssl,
|
|
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
|
|
skip_certificate_verification=opts.elasticsearch_skip_certificate_verification,
|
|
username=opts.elasticsearch_username,
|
|
password=opts.elasticsearch_password,
|
|
api_key=opts.elasticsearch_api_key,
|
|
timeout=elastic_timeout_value,
|
|
)
|
|
elastic.migrate_indexes(
|
|
aggregate_indexes=[es_aggregate_index],
|
|
forensic_indexes=[es_forensic_index],
|
|
)
|
|
clients["elasticsearch"] = _ElasticsearchHandle()
|
|
except Exception as e:
|
|
raise RuntimeError(f"Elasticsearch: {e}") from e
|
|
|
|
try:
|
|
if opts.opensearch_hosts:
|
|
logger.debug(
|
|
"Initializing OpenSearch client: hosts=%s, ssl=%s",
|
|
opts.opensearch_hosts,
|
|
opts.opensearch_ssl,
|
|
)
|
|
os_aggregate_index = "dmarc_aggregate"
|
|
os_forensic_index = "dmarc_forensic"
|
|
os_smtp_tls_index = "smtp_tls"
|
|
if opts.opensearch_index_suffix:
|
|
suffix = opts.opensearch_index_suffix
|
|
os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
|
|
os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
|
|
os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
|
|
if opts.opensearch_index_prefix:
|
|
prefix = opts.opensearch_index_prefix
|
|
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
|
|
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
|
|
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
|
|
opensearch_timeout_value = (
|
|
float(opts.opensearch_timeout)
|
|
if opts.opensearch_timeout is not None
|
|
else 60.0
|
|
)
|
|
opensearch.set_hosts(
|
|
opts.opensearch_hosts,
|
|
use_ssl=opts.opensearch_ssl,
|
|
ssl_cert_path=opts.opensearch_ssl_cert_path,
|
|
skip_certificate_verification=opts.opensearch_skip_certificate_verification,
|
|
username=opts.opensearch_username,
|
|
password=opts.opensearch_password,
|
|
api_key=opts.opensearch_api_key,
|
|
timeout=opensearch_timeout_value,
|
|
auth_type=opts.opensearch_auth_type,
|
|
aws_region=opts.opensearch_aws_region,
|
|
aws_service=opts.opensearch_aws_service,
|
|
)
|
|
opensearch.migrate_indexes(
|
|
aggregate_indexes=[os_aggregate_index],
|
|
forensic_indexes=[os_forensic_index],
|
|
)
|
|
clients["opensearch"] = _OpenSearchHandle()
|
|
except Exception as e:
|
|
raise RuntimeError(f"OpenSearch: {e}") from e
|
|
|
|
return clients
|
|
|
|
|
|
def _close_output_clients(clients):
|
|
"""Close output clients that hold persistent connections.
|
|
|
|
Clients that do not expose a ``close`` method are silently skipped.
|
|
Errors during closing are logged as warnings and do not propagate.
|
|
|
|
Args:
|
|
clients: dict of client instances returned by :func:`_init_output_clients`.
|
|
"""
|
|
for name, client in clients.items():
|
|
if hasattr(client, "close"):
|
|
try:
|
|
client.close()
|
|
except Exception:
|
|
logger.warning("Error closing %s", name, exc_info=True)
|
|
|
|
|
|
def _main():
|
|
"""Called when the module is executed"""
|
|
|
|
def get_index_prefix(report):
|
|
domain = None
|
|
if index_prefix_domain_map is None:
|
|
return None
|
|
if "policy_published" in report:
|
|
domain = report["policy_published"]["domain"]
|
|
elif "reported_domain" in report:
|
|
domain = report["reported_domain"]
|
|
elif "policies" in report:
|
|
domain = report["policies"][0]["policy_domain"]
|
|
if domain:
|
|
domain = get_base_domain(domain)
|
|
if domain:
|
|
domain = domain.lower()
|
|
for prefix in index_prefix_domain_map:
|
|
if domain in index_prefix_domain_map[prefix]:
|
|
prefix = (
|
|
prefix.lower()
|
|
.strip()
|
|
.strip("_")
|
|
.replace(" ", "_")
|
|
.replace("-", "_")
|
|
)
|
|
prefix = f"{prefix}_"
|
|
return prefix
|
|
return None
|
|
|
|
def process_reports(reports_):
|
|
output_errors = []
|
|
|
|
def log_output_error(destination, error):
|
|
message = f"{destination} Error: {error}"
|
|
logger.error(message)
|
|
output_errors.append(message)
|
|
|
|
if index_prefix_domain_map is not None:
|
|
filtered_tls = []
|
|
for report in reports_.get("smtp_tls_reports", []):
|
|
if get_index_prefix(report) is not None:
|
|
filtered_tls.append(report)
|
|
else:
|
|
domain = "unknown"
|
|
if "policies" in report and report["policies"]:
|
|
domain = report["policies"][0].get("policy_domain", "unknown")
|
|
logger.debug(
|
|
"Ignoring SMTP TLS report for domain not in "
|
|
"index_prefix_domain_map: %s",
|
|
domain,
|
|
)
|
|
reports_["smtp_tls_reports"] = filtered_tls
|
|
|
|
indent_value = 2 if opts.prettify_json else None
|
|
output_str = "{0}\n".format(
|
|
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
|
|
)
|
|
|
|
if not opts.silent:
|
|
print(output_str)
|
|
if opts.output:
|
|
save_output(
|
|
reports_,
|
|
output_directory=opts.output,
|
|
aggregate_json_filename=opts.aggregate_json_filename,
|
|
forensic_json_filename=opts.forensic_json_filename,
|
|
smtp_tls_json_filename=opts.smtp_tls_json_filename,
|
|
aggregate_csv_filename=opts.aggregate_csv_filename,
|
|
forensic_csv_filename=opts.forensic_csv_filename,
|
|
smtp_tls_csv_filename=opts.smtp_tls_csv_filename,
|
|
)
|
|
|
|
kafka_client = clients.get("kafka_client")
|
|
s3_client = clients.get("s3_client")
|
|
syslog_client = clients.get("syslog_client")
|
|
hec_client = clients.get("hec_client")
|
|
gelf_client = clients.get("gelf_client")
|
|
webhook_client = clients.get("webhook_client")
|
|
|
|
kafka_aggregate_topic = opts.kafka_aggregate_topic
|
|
kafka_forensic_topic = opts.kafka_forensic_topic
|
|
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
|
|
|
|
if opts.save_aggregate:
|
|
for report in reports_["aggregate_reports"]:
|
|
try:
|
|
if opts.elasticsearch_hosts:
|
|
shards = opts.elasticsearch_number_of_shards
|
|
replicas = opts.elasticsearch_number_of_replicas
|
|
elastic.save_aggregate_report_to_elasticsearch(
|
|
report,
|
|
index_suffix=opts.elasticsearch_index_suffix,
|
|
index_prefix=opts.elasticsearch_index_prefix
|
|
or get_index_prefix(report),
|
|
monthly_indexes=opts.elasticsearch_monthly_indexes,
|
|
number_of_shards=shards,
|
|
number_of_replicas=replicas,
|
|
)
|
|
except elastic.AlreadySaved as warning:
|
|
logger.warning(warning.__str__())
|
|
except elastic.ElasticsearchError as error_:
|
|
log_output_error("Elasticsearch", error_.__str__())
|
|
except Exception as error_:
|
|
log_output_error("Elasticsearch exception", error_.__str__())
|
|
|
|
try:
|
|
if opts.opensearch_hosts:
|
|
shards = opts.opensearch_number_of_shards
|
|
replicas = opts.opensearch_number_of_replicas
|
|
opensearch.save_aggregate_report_to_opensearch(
|
|
report,
|
|
index_suffix=opts.opensearch_index_suffix,
|
|
index_prefix=opts.opensearch_index_prefix
|
|
or get_index_prefix(report),
|
|
monthly_indexes=opts.opensearch_monthly_indexes,
|
|
number_of_shards=shards,
|
|
number_of_replicas=replicas,
|
|
)
|
|
except opensearch.AlreadySaved as warning:
|
|
logger.warning(warning.__str__())
|
|
except opensearch.OpenSearchError as error_:
|
|
log_output_error("OpenSearch", error_.__str__())
|
|
except Exception as error_:
|
|
log_output_error("OpenSearch exception", error_.__str__())
|
|
|
|
try:
|
|
if kafka_client:
|
|
kafka_client.save_aggregate_reports_to_kafka(
|
|
report, kafka_aggregate_topic
|
|
)
|
|
except Exception as error_:
|
|
log_output_error("Kafka", error_.__str__())
|
|
|
|
try:
|
|
if s3_client:
|
|
s3_client.save_aggregate_report_to_s3(report)
|
|
except Exception as error_:
|
|
log_output_error("S3", error_.__str__())
|
|
|
|
try:
|
|
if syslog_client:
|
|
syslog_client.save_aggregate_report_to_syslog(report)
|
|
except Exception as error_:
|
|
log_output_error("Syslog", error_.__str__())
|
|
|
|
try:
|
|
if gelf_client:
|
|
gelf_client.save_aggregate_report_to_gelf(report)
|
|
except Exception as error_:
|
|
log_output_error("GELF", error_.__str__())
|
|
|
|
try:
|
|
if opts.webhook_aggregate_url and webhook_client:
|
|
indent_value = 2 if opts.prettify_json else None
|
|
webhook_client.save_aggregate_report_to_webhook(
|
|
json.dumps(report, ensure_ascii=False, indent=indent_value)
|
|
)
|
|
except Exception as error_:
|
|
log_output_error("Webhook", error_.__str__())
|
|
|
|
if hec_client:
|
|
try:
|
|
aggregate_reports_ = reports_["aggregate_reports"]
|
|
if len(aggregate_reports_) > 0:
|
|
hec_client.save_aggregate_reports_to_splunk(aggregate_reports_)
|
|
except splunk.SplunkError as e:
|
|
log_output_error("Splunk HEC", e.__str__())
|
|
|
|
if opts.save_forensic:
|
|
for report in reports_["forensic_reports"]:
|
|
try:
|
|
shards = opts.elasticsearch_number_of_shards
|
|
replicas = opts.elasticsearch_number_of_replicas
|
|
if opts.elasticsearch_hosts:
|
|
elastic.save_forensic_report_to_elasticsearch(
|
|
report,
|
|
index_suffix=opts.elasticsearch_index_suffix,
|
|
index_prefix=opts.elasticsearch_index_prefix
|
|
or get_index_prefix(report),
|
|
monthly_indexes=opts.elasticsearch_monthly_indexes,
|
|
number_of_shards=shards,
|
|
number_of_replicas=replicas,
|
|
)
|
|
except elastic.AlreadySaved as warning:
|
|
logger.warning(warning.__str__())
|
|
except elastic.ElasticsearchError as error_:
|
|
log_output_error("Elasticsearch", error_.__str__())
|
|
except InvalidDMARCReport as error_:
|
|
log_output_error("Invalid DMARC report", error_.__str__())
|
|
|
|
try:
|
|
shards = opts.opensearch_number_of_shards
|
|
replicas = opts.opensearch_number_of_replicas
|
|
if opts.opensearch_hosts:
|
|
opensearch.save_forensic_report_to_opensearch(
|
|
report,
|
|
index_suffix=opts.opensearch_index_suffix,
|
|
index_prefix=opts.opensearch_index_prefix
|
|
or get_index_prefix(report),
|
|
monthly_indexes=opts.opensearch_monthly_indexes,
|
|
number_of_shards=shards,
|
|
number_of_replicas=replicas,
|
|
)
|
|
except opensearch.AlreadySaved as warning:
|
|
logger.warning(warning.__str__())
|
|
except opensearch.OpenSearchError as error_:
|
|
log_output_error("OpenSearch", error_.__str__())
|
|
except InvalidDMARCReport as error_:
|
|
log_output_error("Invalid DMARC report", error_.__str__())
|
|
|
|
try:
|
|
if kafka_client:
|
|
kafka_client.save_forensic_reports_to_kafka(
|
|
report, kafka_forensic_topic
|
|
)
|
|
except Exception as error_:
|
|
log_output_error("Kafka", error_.__str__())
|
|
|
|
try:
|
|
if s3_client:
|
|
s3_client.save_forensic_report_to_s3(report)
|
|
except Exception as error_:
|
|
log_output_error("S3", error_.__str__())
|
|
|
|
try:
|
|
if syslog_client:
|
|
syslog_client.save_forensic_report_to_syslog(report)
|
|
except Exception as error_:
|
|
log_output_error("Syslog", error_.__str__())
|
|
|
|
try:
|
|
if gelf_client:
|
|
gelf_client.save_forensic_report_to_gelf(report)
|
|
except Exception as error_:
|
|
log_output_error("GELF", error_.__str__())
|
|
|
|
try:
|
|
if opts.webhook_forensic_url and webhook_client:
|
|
indent_value = 2 if opts.prettify_json else None
|
|
webhook_client.save_forensic_report_to_webhook(
|
|
json.dumps(report, ensure_ascii=False, indent=indent_value)
|
|
)
|
|
except Exception as error_:
|
|
log_output_error("Webhook", error_.__str__())
|
|
|
|
if hec_client:
|
|
try:
|
|
forensic_reports_ = reports_["forensic_reports"]
|
|
if len(forensic_reports_) > 0:
|
|
hec_client.save_forensic_reports_to_splunk(forensic_reports_)
|
|
except splunk.SplunkError as e:
|
|
log_output_error("Splunk HEC", e.__str__())
|
|
|
|
if opts.save_smtp_tls:
|
|
for report in reports_["smtp_tls_reports"]:
|
|
try:
|
|
shards = opts.elasticsearch_number_of_shards
|
|
replicas = opts.elasticsearch_number_of_replicas
|
|
if opts.elasticsearch_hosts:
|
|
elastic.save_smtp_tls_report_to_elasticsearch(
|
|
report,
|
|
index_suffix=opts.elasticsearch_index_suffix,
|
|
index_prefix=opts.elasticsearch_index_prefix
|
|
or get_index_prefix(report),
|
|
monthly_indexes=opts.elasticsearch_monthly_indexes,
|
|
number_of_shards=shards,
|
|
number_of_replicas=replicas,
|
|
)
|
|
except elastic.AlreadySaved as warning:
|
|
logger.warning(warning.__str__())
|
|
except elastic.ElasticsearchError as error_:
|
|
log_output_error("Elasticsearch", error_.__str__())
|
|
except InvalidDMARCReport as error_:
|
|
log_output_error("Invalid DMARC report", error_.__str__())
|
|
|
|
try:
|
|
shards = opts.opensearch_number_of_shards
|
|
replicas = opts.opensearch_number_of_replicas
|
|
if opts.opensearch_hosts:
|
|
opensearch.save_smtp_tls_report_to_opensearch(
|
|
report,
|
|
index_suffix=opts.opensearch_index_suffix,
|
|
index_prefix=opts.opensearch_index_prefix
|
|
or get_index_prefix(report),
|
|
monthly_indexes=opts.opensearch_monthly_indexes,
|
|
number_of_shards=shards,
|
|
number_of_replicas=replicas,
|
|
)
|
|
except opensearch.AlreadySaved as warning:
|
|
logger.warning(warning.__str__())
|
|
except opensearch.OpenSearchError as error_:
|
|
log_output_error("OpenSearch", error_.__str__())
|
|
except InvalidDMARCReport as error_:
|
|
log_output_error("Invalid DMARC report", error_.__str__())
|
|
|
|
try:
|
|
if kafka_client:
|
|
kafka_client.save_smtp_tls_reports_to_kafka(
|
|
[report], kafka_smtp_tls_topic
|
|
)
|
|
except Exception as error_:
|
|
log_output_error("Kafka", error_.__str__())
|
|
|
|
try:
|
|
if s3_client:
|
|
s3_client.save_smtp_tls_report_to_s3(report)
|
|
except Exception as error_:
|
|
log_output_error("S3", error_.__str__())
|
|
|
|
try:
|
|
if syslog_client:
|
|
syslog_client.save_smtp_tls_report_to_syslog(report)
|
|
except Exception as error_:
|
|
log_output_error("Syslog", error_.__str__())
|
|
|
|
try:
|
|
if gelf_client:
|
|
gelf_client.save_smtp_tls_report_to_gelf(report)
|
|
except Exception as error_:
|
|
log_output_error("GELF", error_.__str__())
|
|
|
|
try:
|
|
if opts.webhook_smtp_tls_url and webhook_client:
|
|
indent_value = 2 if opts.prettify_json else None
|
|
webhook_client.save_smtp_tls_report_to_webhook(
|
|
json.dumps(report, ensure_ascii=False, indent=indent_value)
|
|
)
|
|
except Exception as error_:
|
|
log_output_error("Webhook", error_.__str__())
|
|
|
|
if hec_client:
|
|
try:
|
|
smtp_tls_reports_ = reports_["smtp_tls_reports"]
|
|
if len(smtp_tls_reports_) > 0:
|
|
hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_)
|
|
except splunk.SplunkError as e:
|
|
log_output_error("Splunk HEC", e.__str__())
|
|
|
|
if opts.la_dce:
|
|
try:
|
|
la_client = loganalytics.LogAnalyticsClient(
|
|
client_id=opts.la_client_id,
|
|
client_secret=opts.la_client_secret,
|
|
tenant_id=opts.la_tenant_id,
|
|
dce=opts.la_dce,
|
|
dcr_immutable_id=opts.la_dcr_immutable_id,
|
|
dcr_aggregate_stream=opts.la_dcr_aggregate_stream,
|
|
dcr_forensic_stream=opts.la_dcr_forensic_stream,
|
|
dcr_smtp_tls_stream=opts.la_dcr_smtp_tls_stream,
|
|
)
|
|
la_client.publish_results(
|
|
reports_,
|
|
opts.save_aggregate,
|
|
opts.save_forensic,
|
|
opts.save_smtp_tls,
|
|
)
|
|
except loganalytics.LogAnalyticsException as e:
|
|
log_output_error("Log Analytics", e.__str__())
|
|
except Exception as e:
|
|
log_output_error("Log Analytics", f"Unknown publishing error: {e}")
|
|
|
|
if opts.fail_on_output_error and output_errors:
|
|
raise ParserError(
|
|
"Output destination failures detected: {0}".format(
|
|
" | ".join(output_errors)
|
|
)
|
|
)
|
|
|
|
arg_parser = ArgumentParser(description="Parses DMARC reports")
|
|
arg_parser.add_argument(
|
|
"-c",
|
|
"--config-file",
|
|
help="a path to a configuration file (--silent implied)",
|
|
)
|
|
arg_parser.add_argument(
|
|
"file_path",
|
|
nargs="*",
|
|
help="one or more paths to aggregate or forensic "
|
|
"report files, emails, or mbox files'",
|
|
)
|
|
strip_attachment_help = "remove attachment payloads from forensic report output"
|
|
arg_parser.add_argument(
|
|
"--strip-attachment-payloads", help=strip_attachment_help, action="store_true"
|
|
)
|
|
arg_parser.add_argument(
|
|
"-o", "--output", help="write output files to the given directory"
|
|
)
|
|
arg_parser.add_argument(
|
|
"--aggregate-json-filename",
|
|
help="filename for the aggregate JSON output file",
|
|
default="aggregate.json",
|
|
)
|
|
arg_parser.add_argument(
|
|
"--forensic-json-filename",
|
|
help="filename for the forensic JSON output file",
|
|
default="forensic.json",
|
|
)
|
|
arg_parser.add_argument(
|
|
"--smtp-tls-json-filename",
|
|
help="filename for the SMTP TLS JSON output file",
|
|
default="smtp_tls.json",
|
|
)
|
|
arg_parser.add_argument(
|
|
"--aggregate-csv-filename",
|
|
help="filename for the aggregate CSV output file",
|
|
default="aggregate.csv",
|
|
)
|
|
arg_parser.add_argument(
|
|
"--forensic-csv-filename",
|
|
help="filename for the forensic CSV output file",
|
|
default="forensic.csv",
|
|
)
|
|
arg_parser.add_argument(
|
|
"--smtp-tls-csv-filename",
|
|
help="filename for the SMTP TLS CSV output file",
|
|
default="smtp_tls.csv",
|
|
)
|
|
arg_parser.add_argument(
|
|
"-n", "--nameservers", nargs="+", help="nameservers to query"
|
|
)
|
|
arg_parser.add_argument(
|
|
"-t",
|
|
"--dns_timeout",
|
|
help="number of seconds to wait for an answer from DNS (default: 2.0)",
|
|
type=float,
|
|
default=2.0,
|
|
)
|
|
arg_parser.add_argument(
|
|
"--dns-retries",
|
|
dest="dns_retries",
|
|
help="number of times to retry DNS queries on timeout or other "
|
|
"transient errors (default: 0)",
|
|
type=int,
|
|
default=0,
|
|
)
|
|
arg_parser.add_argument(
|
|
"--offline",
|
|
action="store_true",
|
|
help="do not make online queries for geolocation or DNS",
|
|
)
|
|
arg_parser.add_argument(
|
|
"-s", "--silent", action="store_true", help="only print errors"
|
|
)
|
|
arg_parser.add_argument(
|
|
"-w",
|
|
"--warnings",
|
|
action="store_true",
|
|
help="print warnings in addition to errors",
|
|
)
|
|
arg_parser.add_argument(
|
|
"--verbose", action="store_true", help="more verbose output"
|
|
)
|
|
arg_parser.add_argument(
|
|
"--debug", action="store_true", help="print debugging information"
|
|
)
|
|
arg_parser.add_argument("--log-file", default=None, help="output logging to a file")
|
|
arg_parser.add_argument(
|
|
"--no-prettify-json",
|
|
action="store_false",
|
|
dest="prettify_json",
|
|
help="output JSON in a single line without indentation",
|
|
)
|
|
arg_parser.add_argument("-v", "--version", action="version", version=__version__)
|
|
|
|
aggregate_reports = []
|
|
forensic_reports = []
|
|
smtp_tls_reports = []
|
|
|
|
args = arg_parser.parse_args()
|
|
|
|
opts = Namespace(
|
|
file_path=args.file_path,
|
|
config_file=args.config_file,
|
|
offline=args.offline,
|
|
strip_attachment_payloads=args.strip_attachment_payloads,
|
|
output=args.output,
|
|
aggregate_csv_filename=args.aggregate_csv_filename,
|
|
aggregate_json_filename=args.aggregate_json_filename,
|
|
forensic_csv_filename=args.forensic_csv_filename,
|
|
forensic_json_filename=args.forensic_json_filename,
|
|
smtp_tls_json_filename=args.smtp_tls_json_filename,
|
|
smtp_tls_csv_filename=args.smtp_tls_csv_filename,
|
|
nameservers=args.nameservers,
|
|
dns_test_address="1.1.1.1",
|
|
silent=args.silent,
|
|
warnings=args.warnings,
|
|
dns_timeout=args.dns_timeout,
|
|
dns_retries=args.dns_retries,
|
|
debug=args.debug,
|
|
verbose=args.verbose,
|
|
prettify_json=args.prettify_json,
|
|
save_aggregate=False,
|
|
save_forensic=False,
|
|
save_smtp_tls=False,
|
|
mailbox_reports_folder="INBOX",
|
|
mailbox_archive_folder="Archive",
|
|
mailbox_watch=False,
|
|
mailbox_delete=False,
|
|
mailbox_test=False,
|
|
mailbox_batch_size=10,
|
|
mailbox_check_timeout=30,
|
|
mailbox_since=None,
|
|
imap_host=None,
|
|
imap_skip_certificate_verification=False,
|
|
imap_ssl=True,
|
|
imap_port=993,
|
|
imap_timeout=30,
|
|
imap_max_retries=4,
|
|
imap_user=None,
|
|
imap_password=None,
|
|
graph_auth_method=None,
|
|
graph_user=None,
|
|
graph_password=None,
|
|
graph_client_id=None,
|
|
graph_client_secret=None,
|
|
graph_certificate_path=None,
|
|
graph_certificate_password=None,
|
|
graph_tenant_id=None,
|
|
graph_mailbox=None,
|
|
graph_allow_unencrypted_storage=False,
|
|
graph_url="https://graph.microsoft.com",
|
|
hec=None,
|
|
hec_token=None,
|
|
hec_index=None,
|
|
hec_skip_certificate_verification=False,
|
|
elasticsearch_hosts=None,
|
|
elasticsearch_timeout=60,
|
|
elasticsearch_number_of_shards=1,
|
|
elasticsearch_number_of_replicas=0,
|
|
elasticsearch_index_suffix=None,
|
|
elasticsearch_index_prefix=None,
|
|
elasticsearch_ssl=True,
|
|
elasticsearch_ssl_cert_path=None,
|
|
elasticsearch_skip_certificate_verification=False,
|
|
elasticsearch_monthly_indexes=False,
|
|
elasticsearch_username=None,
|
|
elasticsearch_password=None,
|
|
elasticsearch_api_key=None,
|
|
opensearch_hosts=None,
|
|
opensearch_timeout=60,
|
|
opensearch_number_of_shards=1,
|
|
opensearch_number_of_replicas=0,
|
|
opensearch_index_suffix=None,
|
|
opensearch_index_prefix=None,
|
|
opensearch_ssl=True,
|
|
opensearch_ssl_cert_path=None,
|
|
opensearch_skip_certificate_verification=False,
|
|
opensearch_monthly_indexes=False,
|
|
opensearch_username=None,
|
|
opensearch_password=None,
|
|
opensearch_api_key=None,
|
|
opensearch_auth_type="basic",
|
|
opensearch_aws_region=None,
|
|
opensearch_aws_service="es",
|
|
kafka_hosts=None,
|
|
kafka_username=None,
|
|
kafka_password=None,
|
|
kafka_aggregate_topic=None,
|
|
kafka_forensic_topic=None,
|
|
kafka_smtp_tls_topic=None,
|
|
kafka_ssl=False,
|
|
kafka_skip_certificate_verification=False,
|
|
smtp_host=None,
|
|
smtp_port=25,
|
|
smtp_ssl=False,
|
|
smtp_skip_certificate_verification=False,
|
|
smtp_user=None,
|
|
smtp_password=None,
|
|
smtp_from=None,
|
|
smtp_to=[],
|
|
smtp_subject="parsedmarc report",
|
|
smtp_message="Please see the attached DMARC results.",
|
|
s3_bucket=None,
|
|
s3_path=None,
|
|
s3_region_name=None,
|
|
s3_endpoint_url=None,
|
|
s3_access_key_id=None,
|
|
s3_secret_access_key=None,
|
|
syslog_server=None,
|
|
syslog_port=None,
|
|
syslog_protocol=None,
|
|
syslog_cafile_path=None,
|
|
syslog_certfile_path=None,
|
|
syslog_keyfile_path=None,
|
|
syslog_timeout=None,
|
|
syslog_retry_attempts=None,
|
|
syslog_retry_delay=None,
|
|
gmail_api_credentials_file=None,
|
|
gmail_api_token_file=None,
|
|
gmail_api_include_spam_trash=False,
|
|
gmail_api_paginate_messages=True,
|
|
gmail_api_scopes=[],
|
|
gmail_api_oauth2_port=8080,
|
|
gmail_api_auth_mode="installed_app",
|
|
gmail_api_service_account_user=None,
|
|
maildir_path=None,
|
|
maildir_create=False,
|
|
log_file=args.log_file,
|
|
n_procs=1,
|
|
ip_db_path=None,
|
|
ipinfo_url=None,
|
|
ipinfo_api_token=None,
|
|
always_use_local_files=False,
|
|
reverse_dns_map_path=None,
|
|
reverse_dns_map_url=None,
|
|
psl_overrides_path=None,
|
|
psl_overrides_url=None,
|
|
la_client_id=None,
|
|
la_client_secret=None,
|
|
la_tenant_id=None,
|
|
la_dce=None,
|
|
la_dcr_immutable_id=None,
|
|
la_dcr_aggregate_stream=None,
|
|
la_dcr_forensic_stream=None,
|
|
la_dcr_smtp_tls_stream=None,
|
|
gelf_host=None,
|
|
gelf_port=None,
|
|
gelf_mode=None,
|
|
webhook_aggregate_url=None,
|
|
webhook_forensic_url=None,
|
|
webhook_smtp_tls_url=None,
|
|
webhook_timeout=60,
|
|
normalize_timespan_threshold_hours=24.0,
|
|
fail_on_output_error=False,
|
|
)
|
|
|
|
# Snapshot opts as set from CLI args / hardcoded defaults, before any config
|
|
# file is applied. On each SIGHUP reload we restore this baseline first so
|
|
# that sections removed from the config file actually take effect.
|
|
opts_from_cli = Namespace(**vars(opts))
|
|
|
|
index_prefix_domain_map = None
|
|
|
|
config_file = args.config_file or os.environ.get("PARSEDMARC_CONFIG_FILE")
|
|
has_env_config = any(
|
|
k.startswith("PARSEDMARC_") and k != "PARSEDMARC_CONFIG_FILE"
|
|
for k in os.environ
|
|
)
|
|
|
|
if config_file or has_env_config:
|
|
try:
|
|
config = _load_config(config_file)
|
|
index_prefix_domain_map = _parse_config(config, opts)
|
|
except ConfigurationError as e:
|
|
logger.critical(str(e))
|
|
exit(-1)
|
|
|
|
logger.setLevel(logging.ERROR)
|
|
|
|
if opts.warnings:
|
|
logger.setLevel(logging.WARNING)
|
|
if opts.verbose:
|
|
logger.setLevel(logging.INFO)
|
|
if opts.debug:
|
|
logger.setLevel(logging.DEBUG)
|
|
if opts.log_file:
|
|
try:
|
|
fh = logging.FileHandler(opts.log_file, "a")
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
|
|
)
|
|
fh.setFormatter(formatter)
|
|
logger.addHandler(fh)
|
|
except Exception as error:
|
|
logger.warning("Unable to write to log file: {}".format(error))
|
|
|
|
opts.active_log_file = opts.log_file
|
|
|
|
if (
|
|
opts.imap_host is None
|
|
and opts.graph_client_id is None
|
|
and opts.gmail_api_credentials_file is None
|
|
and opts.maildir_path is None
|
|
and len(opts.file_path) == 0
|
|
):
|
|
logger.error("You must supply input files or a mailbox connection")
|
|
exit(1)
|
|
|
|
logger.info("Starting parsedmarc")
|
|
|
|
load_ip_db(
|
|
always_use_local_file=opts.always_use_local_files,
|
|
local_file_path=opts.ip_db_path,
|
|
url=opts.ipinfo_url,
|
|
offline=opts.offline,
|
|
)
|
|
|
|
if opts.ipinfo_api_token and not opts.offline:
|
|
try:
|
|
configure_ipinfo_api(opts.ipinfo_api_token)
|
|
except InvalidIPinfoAPIKey as e:
|
|
logger.critical(str(e))
|
|
exit(1)
|
|
|
|
load_psl_overrides(
|
|
always_use_local_file=opts.always_use_local_files,
|
|
local_file_path=opts.psl_overrides_path,
|
|
url=opts.psl_overrides_url,
|
|
offline=opts.offline,
|
|
)
|
|
|
|
# Initialize output clients (with retry for transient connection errors)
|
|
clients = {}
|
|
max_retries = 4
|
|
retry_delay = 5
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
clients = _init_output_clients(opts)
|
|
break
|
|
except ConfigurationError as e:
|
|
logger.critical(str(e))
|
|
exit(1)
|
|
except Exception as error_:
|
|
if attempt < max_retries:
|
|
logger.warning(
|
|
"Output client error (attempt %d/%d, retrying in %ds): %s",
|
|
attempt + 1,
|
|
max_retries + 1,
|
|
retry_delay,
|
|
error_,
|
|
)
|
|
time.sleep(retry_delay)
|
|
retry_delay *= 2
|
|
else:
|
|
logger.error("Output client error: {0}".format(error_))
|
|
exit(1)
|
|
|
|
file_paths = []
|
|
mbox_paths = []
|
|
|
|
for file_path in args.file_path:
|
|
file_paths += glob(file_path)
|
|
for file_path in file_paths:
|
|
if is_mbox(file_path):
|
|
mbox_paths.append(file_path)
|
|
|
|
file_paths = list(set(file_paths))
|
|
mbox_paths = list(set(mbox_paths))
|
|
|
|
for mbox_path in mbox_paths:
|
|
file_paths.remove(mbox_path)
|
|
|
|
counter = 0
|
|
|
|
results = []
|
|
|
|
pbar = None
|
|
if sys.stdout.isatty():
|
|
pbar = tqdm(total=len(file_paths))
|
|
|
|
n_procs = int(opts.n_procs or 1)
|
|
if n_procs < 1:
|
|
n_procs = 1
|
|
|
|
# Capture the current log level to pass to child processes
|
|
current_log_level = logger.level
|
|
current_log_file = opts.log_file
|
|
|
|
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
|
|
processes = []
|
|
connections = []
|
|
|
|
for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)):
|
|
if proc_index >= len(file_paths):
|
|
break
|
|
|
|
parent_conn, child_conn = Pipe()
|
|
connections.append(parent_conn)
|
|
|
|
process = Process(
|
|
target=cli_parse,
|
|
args=(
|
|
file_paths[proc_index],
|
|
opts.strip_attachment_payloads,
|
|
opts.nameservers,
|
|
opts.dns_timeout,
|
|
opts.dns_retries,
|
|
opts.ip_db_path,
|
|
opts.offline,
|
|
opts.always_use_local_files,
|
|
opts.reverse_dns_map_path,
|
|
opts.reverse_dns_map_url,
|
|
opts.normalize_timespan_threshold_hours,
|
|
child_conn,
|
|
current_log_level,
|
|
current_log_file,
|
|
),
|
|
)
|
|
processes.append(process)
|
|
|
|
for proc in processes:
|
|
proc.start()
|
|
|
|
for conn in connections:
|
|
results.append(conn.recv())
|
|
|
|
for proc in processes:
|
|
proc.join()
|
|
if pbar is not None:
|
|
counter += 1
|
|
pbar.update(1)
|
|
|
|
if pbar is not None:
|
|
pbar.close()
|
|
|
|
for result in results:
|
|
if isinstance(result[0], ParserError) or result[0] is None:
|
|
logger.error("Failed to parse {0} - {1}".format(result[1], result[0]))
|
|
else:
|
|
if result[0]["report_type"] == "aggregate":
|
|
report_org = result[0]["report"]["report_metadata"]["org_name"]
|
|
report_id = result[0]["report"]["report_metadata"]["report_id"]
|
|
report_key = f"{report_org}_{report_id}"
|
|
if report_key not in SEEN_AGGREGATE_REPORT_IDS:
|
|
SEEN_AGGREGATE_REPORT_IDS[report_key] = True
|
|
aggregate_reports.append(result[0]["report"])
|
|
else:
|
|
logger.debug(
|
|
"Skipping duplicate aggregate report "
|
|
f"from {report_org} with ID: {report_id}"
|
|
)
|
|
elif result[0]["report_type"] == "forensic":
|
|
forensic_reports.append(result[0]["report"])
|
|
elif result[0]["report_type"] == "smtp_tls":
|
|
smtp_tls_reports.append(result[0]["report"])
|
|
|
|
for mbox_path in mbox_paths:
|
|
normalize_timespan_threshold_hours_value = (
|
|
float(opts.normalize_timespan_threshold_hours)
|
|
if opts.normalize_timespan_threshold_hours is not None
|
|
else 24.0
|
|
)
|
|
strip = opts.strip_attachment_payloads
|
|
reports = get_dmarc_reports_from_mbox(
|
|
mbox_path,
|
|
nameservers=opts.nameservers,
|
|
dns_timeout=opts.dns_timeout,
|
|
dns_retries=opts.dns_retries,
|
|
strip_attachment_payloads=strip,
|
|
ip_db_path=opts.ip_db_path,
|
|
always_use_local_files=opts.always_use_local_files,
|
|
reverse_dns_map_path=opts.reverse_dns_map_path,
|
|
reverse_dns_map_url=opts.reverse_dns_map_url,
|
|
offline=opts.offline,
|
|
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
|
|
)
|
|
aggregate_reports += reports["aggregate_reports"]
|
|
forensic_reports += reports["forensic_reports"]
|
|
smtp_tls_reports += reports["smtp_tls_reports"]
|
|
|
|
mailbox_connection = None
|
|
mailbox_batch_size_value = 10
|
|
mailbox_check_timeout_value = 30
|
|
normalize_timespan_threshold_hours_value = 24.0
|
|
|
|
if opts.imap_host:
|
|
try:
|
|
if opts.imap_user is None or opts.imap_password is None:
|
|
logger.error(
|
|
"IMAP user and password must be specified if host is specified"
|
|
)
|
|
exit(1)
|
|
|
|
ssl = True
|
|
verify = True
|
|
if opts.imap_skip_certificate_verification:
|
|
logger.debug("Skipping IMAP certificate verification")
|
|
verify = False
|
|
if not opts.imap_ssl:
|
|
ssl = False
|
|
|
|
imap_timeout = (
|
|
int(opts.imap_timeout) if opts.imap_timeout is not None else 30
|
|
)
|
|
imap_max_retries = (
|
|
int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
|
|
)
|
|
imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
|
|
mailbox_connection = IMAPConnection(
|
|
host=opts.imap_host,
|
|
port=imap_port_value,
|
|
ssl=ssl,
|
|
verify=verify,
|
|
timeout=imap_timeout,
|
|
max_retries=imap_max_retries,
|
|
user=opts.imap_user,
|
|
password=opts.imap_password,
|
|
)
|
|
|
|
except Exception:
|
|
logger.exception("IMAP Error")
|
|
exit(1)
|
|
|
|
if opts.graph_client_id:
|
|
try:
|
|
mailbox = opts.graph_mailbox or opts.graph_user
|
|
mailbox_connection = MSGraphConnection(
|
|
auth_method=opts.graph_auth_method,
|
|
mailbox=mailbox,
|
|
tenant_id=opts.graph_tenant_id,
|
|
client_id=opts.graph_client_id,
|
|
client_secret=opts.graph_client_secret,
|
|
certificate_path=opts.graph_certificate_path,
|
|
certificate_password=opts.graph_certificate_password,
|
|
username=opts.graph_user,
|
|
password=opts.graph_password,
|
|
token_file=opts.graph_token_file,
|
|
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
|
|
graph_url=opts.graph_url,
|
|
)
|
|
|
|
except Exception:
|
|
logger.exception("MS Graph Error")
|
|
exit(1)
|
|
|
|
if opts.gmail_api_credentials_file:
|
|
if opts.mailbox_delete:
|
|
if "https://mail.google.com/" not in opts.gmail_api_scopes:
|
|
logger.error(
|
|
"Message deletion requires scope"
|
|
" 'https://mail.google.com/'. "
|
|
"Add the scope and remove token file "
|
|
"to acquire proper access."
|
|
)
|
|
opts.mailbox_delete = False
|
|
|
|
try:
|
|
mailbox_connection = GmailConnection(
|
|
credentials_file=opts.gmail_api_credentials_file,
|
|
token_file=opts.gmail_api_token_file,
|
|
scopes=opts.gmail_api_scopes,
|
|
include_spam_trash=opts.gmail_api_include_spam_trash,
|
|
paginate_messages=opts.gmail_api_paginate_messages,
|
|
reports_folder=opts.mailbox_reports_folder,
|
|
oauth2_port=opts.gmail_api_oauth2_port,
|
|
auth_mode=opts.gmail_api_auth_mode,
|
|
service_account_user=opts.gmail_api_service_account_user,
|
|
)
|
|
|
|
except Exception:
|
|
logger.exception("Gmail API Error")
|
|
exit(1)
|
|
|
|
if opts.maildir_path:
|
|
try:
|
|
mailbox_connection = MaildirConnection(
|
|
maildir_path=opts.maildir_path,
|
|
maildir_create=opts.maildir_create,
|
|
)
|
|
except Exception:
|
|
logger.exception("Maildir Error")
|
|
exit(1)
|
|
|
|
if mailbox_connection:
|
|
mailbox_batch_size_value = (
|
|
int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
|
|
)
|
|
mailbox_check_timeout_value = (
|
|
int(opts.mailbox_check_timeout)
|
|
if opts.mailbox_check_timeout is not None
|
|
else 30
|
|
)
|
|
normalize_timespan_threshold_hours_value = (
|
|
float(opts.normalize_timespan_threshold_hours)
|
|
if opts.normalize_timespan_threshold_hours is not None
|
|
else 24.0
|
|
)
|
|
try:
|
|
reports = get_dmarc_reports_from_mailbox(
|
|
connection=mailbox_connection,
|
|
delete=opts.mailbox_delete,
|
|
batch_size=mailbox_batch_size_value,
|
|
reports_folder=opts.mailbox_reports_folder,
|
|
archive_folder=opts.mailbox_archive_folder,
|
|
ip_db_path=opts.ip_db_path,
|
|
always_use_local_files=opts.always_use_local_files,
|
|
reverse_dns_map_path=opts.reverse_dns_map_path,
|
|
reverse_dns_map_url=opts.reverse_dns_map_url,
|
|
offline=opts.offline,
|
|
nameservers=opts.nameservers,
|
|
test=opts.mailbox_test,
|
|
strip_attachment_payloads=opts.strip_attachment_payloads,
|
|
since=opts.mailbox_since,
|
|
dns_retries=opts.dns_retries,
|
|
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
|
|
)
|
|
|
|
aggregate_reports += reports["aggregate_reports"]
|
|
forensic_reports += reports["forensic_reports"]
|
|
smtp_tls_reports += reports["smtp_tls_reports"]
|
|
|
|
except Exception:
|
|
logger.exception("Mailbox Error")
|
|
exit(1)
|
|
|
|
parsing_results: ParsingResults = {
|
|
"aggregate_reports": aggregate_reports,
|
|
"forensic_reports": forensic_reports,
|
|
"smtp_tls_reports": smtp_tls_reports,
|
|
}
|
|
|
|
try:
|
|
process_reports(parsing_results)
|
|
except ParserError as error:
|
|
logger.error(error.__str__())
|
|
exit(1)
|
|
|
|
if opts.smtp_host:
|
|
try:
|
|
verify = True
|
|
if opts.smtp_skip_certificate_verification:
|
|
verify = False
|
|
smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
|
|
smtp_to_value = (
|
|
list(opts.smtp_to)
|
|
if isinstance(opts.smtp_to, list)
|
|
else _str_to_list(str(opts.smtp_to))
|
|
)
|
|
email_results(
|
|
parsing_results,
|
|
opts.smtp_host,
|
|
opts.smtp_from,
|
|
smtp_to_value,
|
|
port=smtp_port_value,
|
|
verify=verify,
|
|
username=opts.smtp_user,
|
|
password=opts.smtp_password,
|
|
subject=opts.smtp_subject,
|
|
require_encryption=opts.smtp_ssl,
|
|
)
|
|
except Exception:
|
|
logger.exception("Failed to email results")
|
|
exit(1)
|
|
|
|
# SIGHUP-based config reload for watch mode
|
|
_reload_requested = False
|
|
|
|
def _handle_sighup(signum, frame):
|
|
nonlocal _reload_requested
|
|
# Logging is not async-signal-safe; only set the flag here.
|
|
# The log message is emitted from the main loop when the flag is read.
|
|
_reload_requested = True
|
|
|
|
if hasattr(signal, "SIGHUP"):
|
|
signal.signal(signal.SIGHUP, _handle_sighup)
|
|
|
|
if mailbox_connection and opts.mailbox_watch:
|
|
logger.info("Watching for email - Quit with ctrl-c")
|
|
|
|
while True:
|
|
# Re-check mailbox_watch in case a config reload disabled watch mode
|
|
if not opts.mailbox_watch:
|
|
logger.info(
|
|
"Mailbox watch disabled in reloaded configuration, stopping watcher"
|
|
)
|
|
break
|
|
try:
|
|
watch_inbox(
|
|
mailbox_connection=mailbox_connection,
|
|
callback=process_reports,
|
|
reports_folder=opts.mailbox_reports_folder,
|
|
archive_folder=opts.mailbox_archive_folder,
|
|
delete=opts.mailbox_delete,
|
|
test=opts.mailbox_test,
|
|
check_timeout=mailbox_check_timeout_value,
|
|
nameservers=opts.nameservers,
|
|
dns_timeout=opts.dns_timeout,
|
|
dns_retries=opts.dns_retries,
|
|
strip_attachment_payloads=opts.strip_attachment_payloads,
|
|
batch_size=mailbox_batch_size_value,
|
|
since=opts.mailbox_since,
|
|
ip_db_path=opts.ip_db_path,
|
|
always_use_local_files=opts.always_use_local_files,
|
|
reverse_dns_map_path=opts.reverse_dns_map_path,
|
|
reverse_dns_map_url=opts.reverse_dns_map_url,
|
|
offline=opts.offline,
|
|
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
|
|
config_reloading=lambda: _reload_requested,
|
|
)
|
|
except FileExistsError as error:
|
|
logger.error("{0}".format(error.__str__()))
|
|
exit(1)
|
|
except ParserError as error:
|
|
logger.error(error.__str__())
|
|
exit(1)
|
|
|
|
if not _reload_requested:
|
|
break
|
|
|
|
# Reload configuration — emit the log message here (not in the
|
|
# signal handler, which is not async-signal-safe), then clear the
|
|
# flag so that any new SIGHUP arriving while we reload will be
|
|
# captured for the next iteration rather than being silently dropped.
|
|
logger.info("SIGHUP received, config will reload after current batch")
|
|
_reload_requested = False
|
|
logger.info("Reloading configuration...")
|
|
try:
|
|
# Build a fresh opts starting from CLI-only defaults so that
|
|
# sections removed from the config file actually take effect.
|
|
new_opts = Namespace(**vars(opts_from_cli))
|
|
new_config = _load_config(config_file)
|
|
new_index_prefix_domain_map = _parse_config(new_config, new_opts)
|
|
new_clients = _init_output_clients(new_opts)
|
|
|
|
# All steps succeeded — commit the changes atomically.
|
|
_close_output_clients(clients)
|
|
clients = new_clients
|
|
index_prefix_domain_map = new_index_prefix_domain_map
|
|
|
|
# Reload the reverse DNS map so changes to the
|
|
# map path/URL in the config take effect. PSL overrides
|
|
# are reloaded alongside it so map entries that depend on
|
|
# a folded base domain keep working.
|
|
load_reverse_dns_map(
|
|
REVERSE_DNS_MAP,
|
|
always_use_local_file=new_opts.always_use_local_files,
|
|
local_file_path=new_opts.reverse_dns_map_path,
|
|
url=new_opts.reverse_dns_map_url,
|
|
offline=new_opts.offline,
|
|
psl_overrides_path=new_opts.psl_overrides_path,
|
|
psl_overrides_url=new_opts.psl_overrides_url,
|
|
)
|
|
|
|
# Reload the IP database so changes to the
|
|
# db path/URL in the config take effect.
|
|
load_ip_db(
|
|
always_use_local_file=new_opts.always_use_local_files,
|
|
local_file_path=new_opts.ip_db_path,
|
|
url=new_opts.ipinfo_url,
|
|
offline=new_opts.offline,
|
|
)
|
|
|
|
# Re-apply IPinfo API settings. Passing a falsy token disables
|
|
# the API; a rotated token picks up here too. An invalid token
|
|
# is fatal even on reload — the operator asked for it.
|
|
try:
|
|
configure_ipinfo_api(
|
|
new_opts.ipinfo_api_token if not new_opts.offline else None,
|
|
)
|
|
except InvalidIPinfoAPIKey as e:
|
|
logger.critical(str(e))
|
|
exit(1)
|
|
|
|
for k, v in vars(new_opts).items():
|
|
setattr(opts, k, v)
|
|
|
|
# Update watch parameters from reloaded config
|
|
mailbox_batch_size_value = (
|
|
int(opts.mailbox_batch_size)
|
|
if opts.mailbox_batch_size is not None
|
|
else 10
|
|
)
|
|
mailbox_check_timeout_value = (
|
|
int(opts.mailbox_check_timeout)
|
|
if opts.mailbox_check_timeout is not None
|
|
else 30
|
|
)
|
|
normalize_timespan_threshold_hours_value = (
|
|
float(opts.normalize_timespan_threshold_hours)
|
|
if opts.normalize_timespan_threshold_hours is not None
|
|
else 24.0
|
|
)
|
|
|
|
# Update log level
|
|
logger.setLevel(logging.ERROR)
|
|
if opts.warnings:
|
|
logger.setLevel(logging.WARNING)
|
|
if opts.verbose:
|
|
logger.setLevel(logging.INFO)
|
|
if opts.debug:
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# Refresh FileHandler if log_file changed
|
|
old_log_file = getattr(opts, "active_log_file", None)
|
|
new_log_file = opts.log_file
|
|
if old_log_file != new_log_file:
|
|
# Remove old FileHandlers
|
|
for h in list(logger.handlers):
|
|
if isinstance(h, logging.FileHandler):
|
|
h.close()
|
|
logger.removeHandler(h)
|
|
# Add new FileHandler if configured
|
|
if new_log_file:
|
|
try:
|
|
fh = logging.FileHandler(new_log_file, "a")
|
|
file_formatter = logging.Formatter(
|
|
"%(asctime)s - %(levelname)s"
|
|
" - [%(filename)s:%(lineno)d] - %(message)s"
|
|
)
|
|
fh.setFormatter(file_formatter)
|
|
logger.addHandler(fh)
|
|
except Exception as log_error:
|
|
logger.warning(
|
|
"Unable to write to log file: {}".format(log_error)
|
|
)
|
|
opts.active_log_file = new_log_file
|
|
|
|
logger.info("Configuration reloaded successfully")
|
|
except Exception:
|
|
logger.exception(
|
|
"Config reload failed, continuing with previous config"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
_main()
|