# NOTE: the following lines are page-scrape residue from the source listing
# (file browser metadata), preserved here as comments so the file stays valid:
# Files
# parsedmarc/parsedmarc/cli.py
# T
# 2026-03-20 15:28:38 -04:00
# 2018 lines
# 80 KiB
# Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""A CLI for parsing DMARC reports"""
import http.client
import json
import logging
import os
import signal
import sys
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from glob import glob
from multiprocessing import Pipe, Process
from ssl import CERT_NONE, create_default_context
import yaml
from tqdm import tqdm
from parsedmarc import (
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
ParserError,
__version__,
elastic,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
s3,
save_output,
splunk,
syslog,
watch_inbox,
webhook,
)
from parsedmarc.log import logger
from parsedmarc.mail import (
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
setattr(http.client, "_MAXHEADERS", 200)

# Attach a console handler to the package logger for CLI use, using a
# compact "LEVEL:file:line:message" format.
formatter = logging.Formatter(
    fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
    datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
def _str_to_list(s):
"""Converts a comma separated string to a list"""
_list = s.split(",")
return list(map(lambda i: i.lstrip(), _list))
def _configure_logging(log_level, log_file=None):
    """
    Configure logging for the current process.

    This is needed for child processes to properly log messages.

    Args:
        log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
        log_file: Optional path to log file
    """
    from parsedmarc.log import logger

    logger.setLevel(log_level)

    # Attach a console handler at most once per process. The exact type
    # check (rather than isinstance) keeps FileHandler — a StreamHandler
    # subclass — from being counted as a console handler.
    if not any(type(existing) is logging.StreamHandler for existing in logger.handlers):
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter(
                fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
                datefmt="%Y-%m-%d:%H:%M:%S",
            )
        )
        logger.addHandler(console_handler)

    if not log_file:
        return
    try:
        file_handler = logging.FileHandler(log_file, "a")
    except (IOError, OSError, PermissionError) as error:
        # Best-effort: an unwritable log file must not kill the worker.
        logger.warning("Unable to write to log file: {}".format(error))
        return
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
        )
    )
    logger.addHandler(file_handler)
def cli_parse(
    file_path,
    sa,
    nameservers,
    dns_timeout,
    ip_db_path,
    offline,
    always_use_local_files,
    reverse_dns_map_path,
    reverse_dns_map_url,
    normalize_timespan_threshold_hours,
    conn,
    log_level=logging.ERROR,
    log_file=None,
):
    """Parse one report file in a worker process (for multiprocessing).

    The parsed results — or the ParserError, if parsing fails — are sent
    back to the parent through the pipe along with the file path, and the
    pipe is always closed afterwards.

    Args:
        file_path: Path to the report file
        sa: Strip attachment payloads flag
        nameservers: List of nameservers
        dns_timeout: DNS timeout
        ip_db_path: Path to IP database
        offline: Offline mode flag
        always_use_local_files: Always use local files flag
        reverse_dns_map_path: Path to reverse DNS map
        reverse_dns_map_url: URL to reverse DNS map
        normalize_timespan_threshold_hours: Timespan threshold
        conn: Pipe connection for IPC
        log_level: Logging level for this process
        log_file: Optional path to log file
    """
    # Child processes do not inherit handler setup; configure logging here.
    _configure_logging(log_level, log_file)
    try:
        parsed = parse_report_file(
            file_path,
            ip_db_path=ip_db_path,
            offline=offline,
            always_use_local_files=always_use_local_files,
            reverse_dns_map_path=reverse_dns_map_path,
            reverse_dns_map_url=reverse_dns_map_url,
            nameservers=nameservers,
            dns_timeout=dns_timeout,
            strip_attachment_payloads=sa,
            normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
        )
    except ParserError as parse_error:
        # Ship the exception object itself so the parent can report it.
        conn.send([parse_error, file_path])
    else:
        conn.send([parsed, file_path])
    finally:
        conn.close()
class ConfigurationError(Exception):
    """Raised when a configuration file has missing or invalid settings."""
def _parse_config_file(config_file, opts):
"""Parse a config file and update opts in place.
Args:
config_file: Path to the .ini config file
opts: Namespace object to update with parsed values
Returns:
index_prefix_domain_map or None
Raises:
ConfigurationError: If required settings are missing or invalid.
"""
abs_path = os.path.abspath(config_file)
if not os.path.exists(abs_path):
raise ConfigurationError("A file does not exist at {0}".format(abs_path))
opts.silent = True
config = ConfigParser()
index_prefix_domain_map = None
config.read(config_file)
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
)
if "index_prefix_domain_map" in general_config:
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = bool(
general_config.getboolean("strip_attachment_payloads")
)
if "output" in general_config:
opts.output = general_config["output"]
if "aggregate_json_filename" in general_config:
opts.aggregate_json_filename = general_config["aggregate_json_filename"]
if "forensic_json_filename" in general_config:
opts.forensic_json_filename = general_config["forensic_json_filename"]
if "smtp_tls_json_filename" in general_config:
opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"]
if "aggregate_csv_filename" in general_config:
opts.aggregate_csv_filename = general_config["aggregate_csv_filename"]
if "forensic_csv_filename" in general_config:
opts.forensic_csv_filename = general_config["forensic_csv_filename"]
if "smtp_tls_csv_filename" in general_config:
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config:
opts.dns_timeout = general_config.getfloat("dns_timeout")
if opts.dns_timeout is None:
opts.dns_timeout = 2
if "dns_test_address" in general_config:
opts.dns_test_address = general_config["dns_test_address"]
if "nameservers" in general_config:
opts.nameservers = _str_to_list(general_config["nameservers"])
# nameservers pre-flight check
dummy_hostname = None
try:
dummy_hostname = get_reverse_dns(
opts.dns_test_address,
nameservers=opts.nameservers,
timeout=opts.dns_timeout,
)
except Exception as ns_error:
raise ConfigurationError(
"DNS pre-flight check failed: {}".format(ns_error)
)
if not dummy_hostname:
raise ConfigurationError(
"DNS pre-flight check failed: no PTR record for {} from {}".format(
opts.dns_test_address, opts.nameservers
)
)
if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
if "save_forensic" in general_config:
opts.save_forensic = bool(general_config.getboolean("save_forensic"))
if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
if "debug" in general_config:
opts.debug = bool(general_config.getboolean("debug"))
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
opts.fail_on_output_error = bool(
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config:
opts.log_file = general_config["log_file"]
if "n_procs" in general_config:
opts.n_procs = general_config.getint("n_procs")
if "ip_db_path" in general_config:
opts.ip_db_path = general_config["ip_db_path"]
else:
opts.ip_db_path = None
if "always_use_local_files" in general_config:
opts.always_use_local_files = bool(
general_config.getboolean("always_use_local_files")
)
if "local_reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"]
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_map_url"]
if "prettify_json" in general_config:
opts.prettify_json = bool(general_config.getboolean("prettify_json"))
if "mailbox" in config.sections():
mailbox_config = config["mailbox"]
if "msgraph" in config.sections():
opts.mailbox_reports_folder = "Inbox"
if "reports_folder" in mailbox_config:
opts.mailbox_reports_folder = mailbox_config["reports_folder"]
if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config:
opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
if "delete" in mailbox_config:
opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
if "test" in mailbox_config:
opts.mailbox_test = bool(mailbox_config.getboolean("test"))
if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
if "check_timeout" in mailbox_config:
opts.mailbox_check_timeout = mailbox_config.getint("check_timeout")
if "since" in mailbox_config:
opts.mailbox_since = mailbox_config["since"]
if "imap" in config.sections():
imap_config = config["imap"]
if "watch" in imap_config:
logger.warning(
"Starting in 8.0.0, the watch option has been "
"moved from the imap configuration section to "
"the mailbox configuration section."
)
if "host" in imap_config:
opts.imap_host = imap_config["host"]
else:
raise ConfigurationError(
"host setting missing from the imap config section"
)
if "port" in imap_config:
opts.imap_port = imap_config.getint("port")
if "timeout" in imap_config:
opts.imap_timeout = imap_config.getint("timeout")
if "max_retries" in imap_config:
opts.imap_max_retries = imap_config.getint("max_retries")
if "ssl" in imap_config:
opts.imap_ssl = bool(imap_config.getboolean("ssl"))
if "skip_certificate_verification" in imap_config:
opts.imap_skip_certificate_verification = bool(
imap_config.getboolean("skip_certificate_verification")
)
if "user" in imap_config:
opts.imap_user = imap_config["user"]
else:
raise ConfigurationError(
"user setting missing from the imap config section"
)
if "password" in imap_config:
opts.imap_password = imap_config["password"]
else:
raise ConfigurationError(
"password setting missing from the imap config section"
)
if "reports_folder" in imap_config:
opts.mailbox_reports_folder = imap_config["reports_folder"]
logger.warning(
"Use of the reports_folder option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "archive_folder" in imap_config:
opts.mailbox_archive_folder = imap_config["archive_folder"]
logger.warning(
"Use of the archive_folder option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "watch" in imap_config:
opts.mailbox_watch = bool(imap_config.getboolean("watch"))
logger.warning(
"Use of the watch option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "delete" in imap_config:
logger.warning(
"Use of the delete option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "test" in imap_config:
opts.mailbox_test = bool(imap_config.getboolean("test"))
logger.warning(
"Use of the test option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "batch_size" in imap_config:
opts.mailbox_batch_size = imap_config.getint("batch_size")
logger.warning(
"Use of the batch_size option in the imap "
"configuration section has been deprecated. "
"Use this option in the mailbox configuration "
"section instead."
)
if "msgraph" in config.sections():
graph_config = config["msgraph"]
opts.graph_token_file = graph_config.get("token_file", ".token")
if "auth_method" not in graph_config:
logger.info(
"auth_method setting missing from the "
"msgraph config section "
"defaulting to UsernamePassword"
)
opts.graph_auth_method = AuthMethod.UsernamePassword.name
else:
opts.graph_auth_method = graph_config["auth_method"]
if opts.graph_auth_method == AuthMethod.UsernamePassword.name:
if "user" in graph_config:
opts.graph_user = graph_config["user"]
else:
raise ConfigurationError(
"user setting missing from the msgraph config section"
)
if "password" in graph_config:
opts.graph_password = graph_config["password"]
else:
raise ConfigurationError(
"password setting missing from the msgraph config section"
)
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
raise ConfigurationError(
"client_secret setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.DeviceCode.name:
if "user" in graph_config:
opts.graph_user = graph_config["user"]
if opts.graph_auth_method != AuthMethod.UsernamePassword.name:
if "tenant_id" in graph_config:
opts.graph_tenant_id = graph_config["tenant_id"]
else:
raise ConfigurationError(
"tenant_id setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.ClientSecret.name:
if "client_secret" in graph_config:
opts.graph_client_secret = graph_config["client_secret"]
else:
raise ConfigurationError(
"client_secret setting missing from the msgraph config section"
)
if opts.graph_auth_method == AuthMethod.Certificate.name:
if "certificate_path" in graph_config:
opts.graph_certificate_path = graph_config["certificate_path"]
else:
raise ConfigurationError(
"certificate_path setting missing from the msgraph config section"
)
if "certificate_password" in graph_config:
opts.graph_certificate_password = graph_config["certificate_password"]
if "client_id" in graph_config:
opts.graph_client_id = graph_config["client_id"]
else:
raise ConfigurationError(
"client_id setting missing from the msgraph config section"
)
if "mailbox" in graph_config:
opts.graph_mailbox = graph_config["mailbox"]
elif opts.graph_auth_method != AuthMethod.UsernamePassword.name:
raise ConfigurationError(
"mailbox setting missing from the msgraph config section"
)
if "graph_url" in graph_config:
opts.graph_url = graph_config["graph_url"]
if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool(
graph_config.getboolean("allow_unencrypted_storage")
)
if "elasticsearch" in config:
elasticsearch_config = config["elasticsearch"]
if "hosts" in elasticsearch_config:
opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the elasticsearch config section"
)
if "timeout" in elasticsearch_config:
timeout = elasticsearch_config.getfloat("timeout")
opts.elasticsearch_timeout = timeout
if "number_of_shards" in elasticsearch_config:
number_of_shards = elasticsearch_config.getint("number_of_shards")
opts.elasticsearch_number_of_shards = number_of_shards
if "number_of_replicas" in elasticsearch_config:
number_of_replicas = elasticsearch_config.getint("number_of_replicas")
opts.elasticsearch_number_of_replicas = number_of_replicas
if "index_suffix" in elasticsearch_config:
opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"]
if "index_prefix" in elasticsearch_config:
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
if "monthly_indexes" in elasticsearch_config:
monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
opts.elasticsearch_monthly_indexes = monthly
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
opts.elasticsearch_password = elasticsearch_config["password"]
# Until 8.20
if "apiKey" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["apiKey"]
# Since 8.20
if "api_key" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["api_key"]
if "opensearch" in config:
opensearch_config = config["opensearch"]
if "hosts" in opensearch_config:
opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the opensearch config section"
)
if "timeout" in opensearch_config:
timeout = opensearch_config.getfloat("timeout")
opts.opensearch_timeout = timeout
if "number_of_shards" in opensearch_config:
number_of_shards = opensearch_config.getint("number_of_shards")
opts.opensearch_number_of_shards = number_of_shards
if "number_of_replicas" in opensearch_config:
number_of_replicas = opensearch_config.getint("number_of_replicas")
opts.opensearch_number_of_replicas = number_of_replicas
if "index_suffix" in opensearch_config:
opts.opensearch_index_suffix = opensearch_config["index_suffix"]
if "index_prefix" in opensearch_config:
opts.opensearch_index_prefix = opensearch_config["index_prefix"]
if "monthly_indexes" in opensearch_config:
monthly = bool(opensearch_config.getboolean("monthly_indexes"))
opts.opensearch_monthly_indexes = monthly
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
opts.opensearch_password = opensearch_config["password"]
# Until 8.20
if "apiKey" in opensearch_config:
opts.opensearch_api_key = opensearch_config["apiKey"]
# Since 8.20
if "api_key" in opensearch_config:
opts.opensearch_api_key = opensearch_config["api_key"]
if "auth_type" in opensearch_config:
opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower()
elif "authentication_type" in opensearch_config:
opts.opensearch_auth_type = (
opensearch_config["authentication_type"].strip().lower()
)
if "aws_region" in opensearch_config:
opts.opensearch_aws_region = opensearch_config["aws_region"].strip()
if "aws_service" in opensearch_config:
opts.opensearch_aws_service = opensearch_config["aws_service"].strip()
if "splunk_hec" in config.sections():
hec_config = config["splunk_hec"]
if "url" in hec_config:
opts.hec = hec_config["url"]
else:
raise ConfigurationError(
"url setting missing from the splunk_hec config section"
)
if "token" in hec_config:
opts.hec_token = hec_config["token"]
else:
raise ConfigurationError(
"token setting missing from the splunk_hec config section"
)
if "index" in hec_config:
opts.hec_index = hec_config["index"]
else:
raise ConfigurationError(
"index setting missing from the splunk_hec config section"
)
if "skip_certificate_verification" in hec_config:
opts.hec_skip_certificate_verification = hec_config[
"skip_certificate_verification"
]
if "kafka" in config.sections():
kafka_config = config["kafka"]
if "hosts" in kafka_config:
opts.kafka_hosts = _str_to_list(kafka_config["hosts"])
else:
raise ConfigurationError(
"hosts setting missing from the kafka config section"
)
if "user" in kafka_config:
opts.kafka_username = kafka_config["user"]
if "password" in kafka_config:
opts.kafka_password = kafka_config["password"]
if "ssl" in kafka_config:
opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
if "skip_certificate_verification" in kafka_config:
kafka_verify = bool(
kafka_config.getboolean("skip_certificate_verification")
)
opts.kafka_skip_certificate_verification = kafka_verify
if "aggregate_topic" in kafka_config:
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
else:
raise ConfigurationError(
"aggregate_topic setting missing from the kafka config section"
)
if "forensic_topic" in kafka_config:
opts.kafka_forensic_topic = kafka_config["forensic_topic"]
else:
logger.critical(
"forensic_topic setting missing from the kafka config section"
)
if "smtp_tls_topic" in kafka_config:
opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
else:
logger.critical(
"forensic_topic setting missing from the splunk_hec config section"
)
if "smtp" in config.sections():
smtp_config = config["smtp"]
if "host" in smtp_config:
opts.smtp_host = smtp_config["host"]
else:
raise ConfigurationError(
"host setting missing from the smtp config section"
)
if "port" in smtp_config:
opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config:
opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
if "skip_certificate_verification" in smtp_config:
smtp_verify = bool(smtp_config.getboolean("skip_certificate_verification"))
opts.smtp_skip_certificate_verification = smtp_verify
if "user" in smtp_config:
opts.smtp_user = smtp_config["user"]
else:
raise ConfigurationError(
"user setting missing from the smtp config section"
)
if "password" in smtp_config:
opts.smtp_password = smtp_config["password"]
else:
raise ConfigurationError(
"password setting missing from the smtp config section"
)
if "from" in smtp_config:
opts.smtp_from = smtp_config["from"]
else:
logger.critical("from setting missing from the smtp config section")
if "to" in smtp_config:
opts.smtp_to = _str_to_list(smtp_config["to"])
else:
logger.critical("to setting missing from the smtp config section")
if "subject" in smtp_config:
opts.smtp_subject = smtp_config["subject"]
if "attachment" in smtp_config:
opts.smtp_attachment = smtp_config["attachment"]
if "message" in smtp_config:
opts.smtp_message = smtp_config["message"]
if "s3" in config.sections():
s3_config = config["s3"]
if "bucket" in s3_config:
opts.s3_bucket = s3_config["bucket"]
else:
raise ConfigurationError(
"bucket setting missing from the s3 config section"
)
if "path" in s3_config:
opts.s3_path = s3_config["path"]
if opts.s3_path.startswith("/"):
opts.s3_path = opts.s3_path[1:]
if opts.s3_path.endswith("/"):
opts.s3_path = opts.s3_path[:-1]
else:
opts.s3_path = ""
if "region_name" in s3_config:
opts.s3_region_name = s3_config["region_name"]
if "endpoint_url" in s3_config:
opts.s3_endpoint_url = s3_config["endpoint_url"]
if "access_key_id" in s3_config:
opts.s3_access_key_id = s3_config["access_key_id"]
if "secret_access_key" in s3_config:
opts.s3_secret_access_key = s3_config["secret_access_key"]
if "syslog" in config.sections():
syslog_config = config["syslog"]
if "server" in syslog_config:
opts.syslog_server = syslog_config["server"]
else:
raise ConfigurationError(
"server setting missing from the syslog config section"
)
if "port" in syslog_config:
opts.syslog_port = syslog_config["port"]
else:
opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = syslog_config["cafile_path"]
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = syslog_config["certfile_path"]
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
)
opts.gmail_api_paginate_messages = bool(
gmail_api_config.getboolean("paginate_messages", True)
)
default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify"
opts.gmail_api_scopes = gmail_api_config.get("scopes", default_gmail_api_scope)
opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes)
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"service_account_user"
).strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"delegated_user"
).strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
opts.maildir_path = maildir_api_config.get("maildir_path")
opts.maildir_create = bool(
maildir_api_config.getboolean("maildir_create", fallback=False)
)
if "log_analytics" in config.sections():
log_analytics_config = config["log_analytics"]
opts.la_client_id = log_analytics_config.get("client_id")
opts.la_client_secret = log_analytics_config.get("client_secret")
opts.la_tenant_id = log_analytics_config.get("tenant_id")
opts.la_dce = log_analytics_config.get("dce")
opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id")
opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream")
opts.la_dcr_forensic_stream = log_analytics_config.get("dcr_forensic_stream")
opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream")
if "gelf" in config.sections():
gelf_config = config["gelf"]
if "host" in gelf_config:
opts.gelf_host = gelf_config["host"]
else:
raise ConfigurationError(
"host setting missing from the gelf config section"
)
if "port" in gelf_config:
opts.gelf_port = gelf_config["port"]
else:
raise ConfigurationError(
"port setting missing from the gelf config section"
)
if "mode" in gelf_config:
opts.gelf_mode = gelf_config["mode"]
else:
raise ConfigurationError(
"mode setting missing from the gelf config section"
)
if "webhook" in config.sections():
webhook_config = config["webhook"]
if "aggregate_url" in webhook_config:
opts.webhook_aggregate_url = webhook_config["aggregate_url"]
if "forensic_url" in webhook_config:
opts.webhook_forensic_url = webhook_config["forensic_url"]
if "smtp_tls_url" in webhook_config:
opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"]
if "timeout" in webhook_config:
opts.webhook_timeout = webhook_config.getint("timeout")
return index_prefix_domain_map
def _init_output_clients(opts):
    """Create output clients based on current opts.

    Returns:
        dict of client instances keyed by name.

    Raises:
        ConfigurationError: If a required output client cannot be created.
    """
    clients = {}
    # Output clients are only initialized when at least one kind of report
    # is actually being saved.
    if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
        if opts.elasticsearch_hosts:
            # Index names are assembled as <prefix><base>_<suffix>
            es_aggregate_index = "dmarc_aggregate"
            es_forensic_index = "dmarc_forensic"
            es_smtp_tls_index = "smtp_tls"
            if opts.elasticsearch_index_suffix:
                suffix = opts.elasticsearch_index_suffix
                es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
                es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
                es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
            if opts.elasticsearch_index_prefix:
                prefix = opts.elasticsearch_index_prefix
                es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
                es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
                es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
            # Default to a 60 second timeout when none was configured
            elastic_timeout_value = (
                float(opts.elasticsearch_timeout)
                if opts.elasticsearch_timeout is not None
                else 60.0
            )
            elastic.set_hosts(
                opts.elasticsearch_hosts,
                use_ssl=opts.elasticsearch_ssl,
                ssl_cert_path=opts.elasticsearch_ssl_cert_path,
                username=opts.elasticsearch_username,
                password=opts.elasticsearch_password,
                api_key=opts.elasticsearch_api_key,
                timeout=elastic_timeout_value,
            )
            elastic.migrate_indexes(
                aggregate_indexes=[es_aggregate_index],
                forensic_indexes=[es_forensic_index],
            )
        if opts.opensearch_hosts:
            # Same index-name assembly as the Elasticsearch branch above
            os_aggregate_index = "dmarc_aggregate"
            os_forensic_index = "dmarc_forensic"
            os_smtp_tls_index = "smtp_tls"
            if opts.opensearch_index_suffix:
                suffix = opts.opensearch_index_suffix
                os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
                os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
                os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
            if opts.opensearch_index_prefix:
                prefix = opts.opensearch_index_prefix
                os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
                os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
                os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
            # Default to a 60 second timeout when none was configured
            opensearch_timeout_value = (
                float(opts.opensearch_timeout)
                if opts.opensearch_timeout is not None
                else 60.0
            )
            opensearch.set_hosts(
                opts.opensearch_hosts,
                use_ssl=opts.opensearch_ssl,
                ssl_cert_path=opts.opensearch_ssl_cert_path,
                username=opts.opensearch_username,
                password=opts.opensearch_password,
                api_key=opts.opensearch_api_key,
                timeout=opensearch_timeout_value,
                auth_type=opts.opensearch_auth_type,
                aws_region=opts.opensearch_aws_region,
                aws_service=opts.opensearch_aws_service,
            )
            opensearch.migrate_indexes(
                aggregate_indexes=[os_aggregate_index],
                forensic_indexes=[os_forensic_index],
            )
        if opts.s3_bucket:
            clients["s3_client"] = s3.S3Client(
                bucket_name=opts.s3_bucket,
                bucket_path=opts.s3_path,
                region_name=opts.s3_region_name,
                endpoint_url=opts.s3_endpoint_url,
                access_key_id=opts.s3_access_key_id,
                secret_access_key=opts.s3_secret_access_key,
            )
        if opts.syslog_server:
            # Fall back to the documented syslog defaults when the
            # corresponding opts were never populated
            clients["syslog_client"] = syslog.SyslogClient(
                server_name=opts.syslog_server,
                server_port=int(opts.syslog_port),
                protocol=opts.syslog_protocol or "udp",
                cafile_path=opts.syslog_cafile_path,
                certfile_path=opts.syslog_certfile_path,
                keyfile_path=opts.syslog_keyfile_path,
                timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
                retry_attempts=opts.syslog_retry_attempts
                if opts.syslog_retry_attempts is not None
                else 3,
                retry_delay=opts.syslog_retry_delay
                if opts.syslog_retry_delay is not None
                else 5,
            )
        if opts.hec:
            if opts.hec_token is None or opts.hec_index is None:
                raise ConfigurationError(
                    "HEC token and HEC index are required when using HEC URL"
                )
            verify = True
            if opts.hec_skip_certificate_verification:
                verify = False
            clients["hec_client"] = splunk.HECClient(
                opts.hec, opts.hec_token, opts.hec_index, verify=verify
            )
        if opts.kafka_hosts:
            ssl_context = None
            if opts.kafka_skip_certificate_verification:
                # Explicitly opt out of hostname/cert checks for Kafka TLS
                logger.debug("Skipping Kafka certificate verification")
                ssl_context = create_default_context()
                ssl_context.check_hostname = False
                ssl_context.verify_mode = CERT_NONE
            clients["kafka_client"] = kafkaclient.KafkaClient(
                opts.kafka_hosts,
                username=opts.kafka_username,
                password=opts.kafka_password,
                ssl_context=ssl_context,
            )
        if opts.gelf_host:
            clients["gelf_client"] = gelf.GelfClient(
                host=opts.gelf_host,
                port=int(opts.gelf_port),
                mode=opts.gelf_mode,
            )
        if (
            opts.webhook_aggregate_url
            or opts.webhook_forensic_url
            or opts.webhook_smtp_tls_url
        ):
            # A single webhook client handles all three report types
            clients["webhook_client"] = webhook.WebhookClient(
                aggregate_url=opts.webhook_aggregate_url,
                forensic_url=opts.webhook_forensic_url,
                smtp_tls_url=opts.webhook_smtp_tls_url,
                timeout=opts.webhook_timeout,
            )
    return clients
def _main():
"""Called when the module is executed"""
def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None:
return None
if "policy_published" in report:
domain = report["policy_published"]["domain"]
elif "reported_domain" in report:
domain = report("reported_domain")
elif "policies" in report:
domain = report["policies"][0]["domain"]
if domain:
domain = get_base_domain(domain)
for prefix in index_prefix_domain_map:
if domain in index_prefix_domain_map[prefix]:
prefix = (
prefix.lower()
.strip()
.strip("_")
.replace(" ", "_")
.replace("-", "_")
)
prefix = f"{prefix}_"
return prefix
return None
def process_reports(reports_):
    """Print, save, and ship the parsed reports to every configured output.

    Closure over ``opts``, ``clients``, and ``get_index_prefix`` from
    ``_main``. Each destination is attempted independently so one failing
    output does not block the others; failures are collected in
    ``output_errors`` and raised as a single ParserError at the end when
    ``opts.fail_on_output_error`` is set.
    """
    output_errors = []

    def log_output_error(destination, error):
        # Log the failure immediately and record it so it can be made
        # fatal after all destinations have been attempted.
        message = f"{destination} Error: {error}"
        logger.error(message)
        output_errors.append(message)

    indent_value = 2 if opts.prettify_json else None
    output_str = "{0}\n".format(
        json.dumps(reports_, ensure_ascii=False, indent=indent_value)
    )
    if not opts.silent:
        print(output_str)
    if opts.output:
        save_output(
            reports_,
            output_directory=opts.output,
            aggregate_json_filename=opts.aggregate_json_filename,
            forensic_json_filename=opts.forensic_json_filename,
            smtp_tls_json_filename=opts.smtp_tls_json_filename,
            aggregate_csv_filename=opts.aggregate_csv_filename,
            forensic_csv_filename=opts.forensic_csv_filename,
            smtp_tls_csv_filename=opts.smtp_tls_csv_filename,
        )
    # Optional output clients created by _init_output_clients(); each is
    # None/absent when its backend is not configured.
    kafka_client = clients.get("kafka_client")
    s3_client = clients.get("s3_client")
    syslog_client = clients.get("syslog_client")
    hec_client = clients.get("hec_client")
    gelf_client = clients.get("gelf_client")
    webhook_client = clients.get("webhook_client")
    kafka_aggregate_topic = opts.kafka_aggregate_topic
    kafka_forensic_topic = opts.kafka_forensic_topic
    kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
    if opts.save_aggregate:
        for report in reports_["aggregate_reports"]:
            try:
                if opts.elasticsearch_hosts:
                    shards = opts.elasticsearch_number_of_shards
                    replicas = opts.elasticsearch_number_of_replicas
                    elastic.save_aggregate_report_to_elasticsearch(
                        report,
                        index_suffix=opts.elasticsearch_index_suffix,
                        index_prefix=opts.elasticsearch_index_prefix
                        or get_index_prefix(report),
                        monthly_indexes=opts.elasticsearch_monthly_indexes,
                        number_of_shards=shards,
                        number_of_replicas=replicas,
                    )
            except elastic.AlreadySaved as warning:
                # Duplicate document: warn but do not treat as an output error
                logger.warning(warning.__str__())
            except elastic.ElasticsearchError as error_:
                log_output_error("Elasticsearch", error_.__str__())
            except Exception as error_:
                log_output_error("Elasticsearch exception", error_.__str__())
            try:
                if opts.opensearch_hosts:
                    shards = opts.opensearch_number_of_shards
                    replicas = opts.opensearch_number_of_replicas
                    opensearch.save_aggregate_report_to_opensearch(
                        report,
                        index_suffix=opts.opensearch_index_suffix,
                        index_prefix=opts.opensearch_index_prefix
                        or get_index_prefix(report),
                        monthly_indexes=opts.opensearch_monthly_indexes,
                        number_of_shards=shards,
                        number_of_replicas=replicas,
                    )
            except opensearch.AlreadySaved as warning:
                logger.warning(warning.__str__())
            except opensearch.OpenSearchError as error_:
                log_output_error("OpenSearch", error_.__str__())
            except Exception as error_:
                log_output_error("OpenSearch exception", error_.__str__())
            try:
                if kafka_client:
                    kafka_client.save_aggregate_reports_to_kafka(
                        report, kafka_aggregate_topic
                    )
            except Exception as error_:
                log_output_error("Kafka", error_.__str__())
            try:
                if s3_client:
                    s3_client.save_aggregate_report_to_s3(report)
            except Exception as error_:
                log_output_error("S3", error_.__str__())
            try:
                if syslog_client:
                    syslog_client.save_aggregate_report_to_syslog(report)
            except Exception as error_:
                log_output_error("Syslog", error_.__str__())
            try:
                if gelf_client:
                    gelf_client.save_aggregate_report_to_gelf(report)
            except Exception as error_:
                log_output_error("GELF", error_.__str__())
            try:
                if opts.webhook_aggregate_url and webhook_client:
                    indent_value = 2 if opts.prettify_json else None
                    webhook_client.save_aggregate_report_to_webhook(
                        json.dumps(report, ensure_ascii=False, indent=indent_value)
                    )
            except Exception as error_:
                log_output_error("Webhook", error_.__str__())
        # Splunk HEC receives the whole batch at once, not per report
        if hec_client:
            try:
                aggregate_reports_ = reports_["aggregate_reports"]
                if len(aggregate_reports_) > 0:
                    hec_client.save_aggregate_reports_to_splunk(aggregate_reports_)
            except splunk.SplunkError as e:
                log_output_error("Splunk HEC", e.__str__())
    if opts.save_forensic:
        for report in reports_["forensic_reports"]:
            try:
                shards = opts.elasticsearch_number_of_shards
                replicas = opts.elasticsearch_number_of_replicas
                if opts.elasticsearch_hosts:
                    elastic.save_forensic_report_to_elasticsearch(
                        report,
                        index_suffix=opts.elasticsearch_index_suffix,
                        index_prefix=opts.elasticsearch_index_prefix
                        or get_index_prefix(report),
                        monthly_indexes=opts.elasticsearch_monthly_indexes,
                        number_of_shards=shards,
                        number_of_replicas=replicas,
                    )
            except elastic.AlreadySaved as warning:
                logger.warning(warning.__str__())
            except elastic.ElasticsearchError as error_:
                log_output_error("Elasticsearch", error_.__str__())
            except InvalidDMARCReport as error_:
                log_output_error("Invalid DMARC report", error_.__str__())
            try:
                shards = opts.opensearch_number_of_shards
                replicas = opts.opensearch_number_of_replicas
                if opts.opensearch_hosts:
                    opensearch.save_forensic_report_to_opensearch(
                        report,
                        index_suffix=opts.opensearch_index_suffix,
                        index_prefix=opts.opensearch_index_prefix
                        or get_index_prefix(report),
                        monthly_indexes=opts.opensearch_monthly_indexes,
                        number_of_shards=shards,
                        number_of_replicas=replicas,
                    )
            except opensearch.AlreadySaved as warning:
                logger.warning(warning.__str__())
            except opensearch.OpenSearchError as error_:
                log_output_error("OpenSearch", error_.__str__())
            except InvalidDMARCReport as error_:
                log_output_error("Invalid DMARC report", error_.__str__())
            try:
                if kafka_client:
                    kafka_client.save_forensic_reports_to_kafka(
                        report, kafka_forensic_topic
                    )
            except Exception as error_:
                log_output_error("Kafka", error_.__str__())
            try:
                if s3_client:
                    s3_client.save_forensic_report_to_s3(report)
            except Exception as error_:
                log_output_error("S3", error_.__str__())
            try:
                if syslog_client:
                    syslog_client.save_forensic_report_to_syslog(report)
            except Exception as error_:
                log_output_error("Syslog", error_.__str__())
            try:
                if gelf_client:
                    gelf_client.save_forensic_report_to_gelf(report)
            except Exception as error_:
                log_output_error("GELF", error_.__str__())
            try:
                if opts.webhook_forensic_url and webhook_client:
                    indent_value = 2 if opts.prettify_json else None
                    webhook_client.save_forensic_report_to_webhook(
                        json.dumps(report, ensure_ascii=False, indent=indent_value)
                    )
            except Exception as error_:
                log_output_error("Webhook", error_.__str__())
        if hec_client:
            try:
                forensic_reports_ = reports_["forensic_reports"]
                if len(forensic_reports_) > 0:
                    hec_client.save_forensic_reports_to_splunk(forensic_reports_)
            except splunk.SplunkError as e:
                log_output_error("Splunk HEC", e.__str__())
    if opts.save_smtp_tls:
        for report in reports_["smtp_tls_reports"]:
            try:
                shards = opts.elasticsearch_number_of_shards
                replicas = opts.elasticsearch_number_of_replicas
                if opts.elasticsearch_hosts:
                    elastic.save_smtp_tls_report_to_elasticsearch(
                        report,
                        index_suffix=opts.elasticsearch_index_suffix,
                        index_prefix=opts.elasticsearch_index_prefix
                        or get_index_prefix(report),
                        monthly_indexes=opts.elasticsearch_monthly_indexes,
                        number_of_shards=shards,
                        number_of_replicas=replicas,
                    )
            except elastic.AlreadySaved as warning:
                logger.warning(warning.__str__())
            except elastic.ElasticsearchError as error_:
                log_output_error("Elasticsearch", error_.__str__())
            except InvalidDMARCReport as error_:
                log_output_error("Invalid DMARC report", error_.__str__())
            try:
                shards = opts.opensearch_number_of_shards
                replicas = opts.opensearch_number_of_replicas
                if opts.opensearch_hosts:
                    opensearch.save_smtp_tls_report_to_opensearch(
                        report,
                        index_suffix=opts.opensearch_index_suffix,
                        index_prefix=opts.opensearch_index_prefix
                        or get_index_prefix(report),
                        monthly_indexes=opts.opensearch_monthly_indexes,
                        number_of_shards=shards,
                        number_of_replicas=replicas,
                    )
            except opensearch.AlreadySaved as warning:
                logger.warning(warning.__str__())
            except opensearch.OpenSearchError as error_:
                log_output_error("OpenSearch", error_.__str__())
            except InvalidDMARCReport as error_:
                log_output_error("Invalid DMARC report", error_.__str__())
            try:
                if kafka_client:
                    # Note: the Kafka SMTP TLS API takes a list of reports
                    kafka_client.save_smtp_tls_reports_to_kafka(
                        [report], kafka_smtp_tls_topic
                    )
            except Exception as error_:
                log_output_error("Kafka", error_.__str__())
            try:
                if s3_client:
                    s3_client.save_smtp_tls_report_to_s3(report)
            except Exception as error_:
                log_output_error("S3", error_.__str__())
            try:
                if syslog_client:
                    syslog_client.save_smtp_tls_report_to_syslog(report)
            except Exception as error_:
                log_output_error("Syslog", error_.__str__())
            try:
                if gelf_client:
                    gelf_client.save_smtp_tls_report_to_gelf(report)
            except Exception as error_:
                log_output_error("GELF", error_.__str__())
            try:
                if opts.webhook_smtp_tls_url and webhook_client:
                    indent_value = 2 if opts.prettify_json else None
                    webhook_client.save_smtp_tls_report_to_webhook(
                        json.dumps(report, ensure_ascii=False, indent=indent_value)
                    )
            except Exception as error_:
                log_output_error("Webhook", error_.__str__())
        if hec_client:
            try:
                smtp_tls_reports_ = reports_["smtp_tls_reports"]
                if len(smtp_tls_reports_) > 0:
                    hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_)
            except splunk.SplunkError as e:
                log_output_error("Splunk HEC", e.__str__())
    if opts.la_dce:
        # Azure Log Analytics publishes all report types in one call
        try:
            la_client = loganalytics.LogAnalyticsClient(
                client_id=opts.la_client_id,
                client_secret=opts.la_client_secret,
                tenant_id=opts.la_tenant_id,
                dce=opts.la_dce,
                dcr_immutable_id=opts.la_dcr_immutable_id,
                dcr_aggregate_stream=opts.la_dcr_aggregate_stream,
                dcr_forensic_stream=opts.la_dcr_forensic_stream,
                dcr_smtp_tls_stream=opts.la_dcr_smtp_tls_stream,
            )
            la_client.publish_results(
                reports_,
                opts.save_aggregate,
                opts.save_forensic,
                opts.save_smtp_tls,
            )
        except loganalytics.LogAnalyticsException as e:
            log_output_error("Log Analytics", e.__str__())
        except Exception as e:
            log_output_error("Log Analytics", f"Unknown publishing error: {e}")
    if opts.fail_on_output_error and output_errors:
        raise ParserError(
            "Output destination failures detected: {0}".format(
                " | ".join(output_errors)
            )
        )
# Command-line interface. Only a small set of options are exposed as CLI
# flags; everything else (outputs, mailboxes, credentials) comes from the
# --config-file, which _parse_config_file() applies on top of the defaults
# collected in the ``opts`` Namespace below.
arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument(
    "-c",
    "--config-file",
    help="a path to a configuration file (--silent implied)",
)
arg_parser.add_argument(
    "file_path",
    nargs="*",
    # Fixed: the help string ended with a stray apostrophe ("mbox files'")
    help="one or more paths to aggregate or forensic "
    "report files, emails, or mbox files",
)
strip_attachment_help = "remove attachment payloads from forensic report output"
arg_parser.add_argument(
    "--strip-attachment-payloads", help=strip_attachment_help, action="store_true"
)
arg_parser.add_argument(
    "-o", "--output", help="write output files to the given directory"
)
arg_parser.add_argument(
    "--aggregate-json-filename",
    help="filename for the aggregate JSON output file",
    default="aggregate.json",
)
arg_parser.add_argument(
    "--forensic-json-filename",
    help="filename for the forensic JSON output file",
    default="forensic.json",
)
arg_parser.add_argument(
    "--smtp-tls-json-filename",
    help="filename for the SMTP TLS JSON output file",
    default="smtp_tls.json",
)
arg_parser.add_argument(
    "--aggregate-csv-filename",
    help="filename for the aggregate CSV output file",
    default="aggregate.csv",
)
arg_parser.add_argument(
    "--forensic-csv-filename",
    help="filename for the forensic CSV output file",
    default="forensic.csv",
)
arg_parser.add_argument(
    "--smtp-tls-csv-filename",
    help="filename for the SMTP TLS CSV output file",
    default="smtp_tls.csv",
)
arg_parser.add_argument(
    "-n", "--nameservers", nargs="+", help="nameservers to query"
)
arg_parser.add_argument(
    "-t",
    "--dns_timeout",
    help="number of seconds to wait for an answer from DNS (default: 2.0)",
    type=float,
    default=2.0,
)
arg_parser.add_argument(
    "--offline",
    action="store_true",
    help="do not make online queries for geolocation or DNS",
)
arg_parser.add_argument(
    "-s", "--silent", action="store_true", help="only print errors"
)
arg_parser.add_argument(
    "-w",
    "--warnings",
    action="store_true",
    help="print warnings in addition to errors",
)
arg_parser.add_argument(
    "--verbose", action="store_true", help="more verbose output"
)
arg_parser.add_argument(
    "--debug", action="store_true", help="print debugging information"
)
arg_parser.add_argument("--log-file", default=None, help="output logging to a file")
arg_parser.add_argument(
    "--no-prettify-json",
    action="store_false",
    dest="prettify_json",
    help="output JSON in a single line without indentation",
)
arg_parser.add_argument("-v", "--version", action="version", version=__version__)
# Accumulators for parsed reports collected from files, mbox archives,
# and mailbox connections further down in _main.
aggregate_reports = []
forensic_reports = []
smtp_tls_reports = []
args = arg_parser.parse_args()
# ``opts`` carries every tunable with its default; CLI args seed it here
# and _parse_config_file() overrides attributes from the config file.
opts = Namespace(
    file_path=args.file_path,
    config_file=args.config_file,
    offline=args.offline,
    strip_attachment_payloads=args.strip_attachment_payloads,
    output=args.output,
    aggregate_csv_filename=args.aggregate_csv_filename,
    aggregate_json_filename=args.aggregate_json_filename,
    forensic_csv_filename=args.forensic_csv_filename,
    forensic_json_filename=args.forensic_json_filename,
    smtp_tls_json_filename=args.smtp_tls_json_filename,
    smtp_tls_csv_filename=args.smtp_tls_csv_filename,
    nameservers=args.nameservers,
    dns_test_address="1.1.1.1",
    silent=args.silent,
    warnings=args.warnings,
    dns_timeout=args.dns_timeout,
    debug=args.debug,
    verbose=args.verbose,
    prettify_json=args.prettify_json,
    save_aggregate=False,
    save_forensic=False,
    save_smtp_tls=False,
    mailbox_reports_folder="INBOX",
    mailbox_archive_folder="Archive",
    mailbox_watch=False,
    mailbox_delete=False,
    mailbox_test=False,
    mailbox_batch_size=10,
    mailbox_check_timeout=30,
    mailbox_since=None,
    imap_host=None,
    imap_skip_certificate_verification=False,
    imap_ssl=True,
    imap_port=993,
    imap_timeout=30,
    imap_max_retries=4,
    imap_user=None,
    imap_password=None,
    graph_auth_method=None,
    graph_user=None,
    graph_password=None,
    graph_client_id=None,
    graph_client_secret=None,
    graph_certificate_path=None,
    graph_certificate_password=None,
    graph_tenant_id=None,
    graph_mailbox=None,
    graph_allow_unencrypted_storage=False,
    graph_url="https://graph.microsoft.com",
    hec=None,
    hec_token=None,
    hec_index=None,
    hec_skip_certificate_verification=False,
    elasticsearch_hosts=None,
    elasticsearch_timeout=60,
    elasticsearch_number_of_shards=1,
    elasticsearch_number_of_replicas=0,
    elasticsearch_index_suffix=None,
    elasticsearch_index_prefix=None,
    elasticsearch_ssl=True,
    elasticsearch_ssl_cert_path=None,
    elasticsearch_monthly_indexes=False,
    elasticsearch_username=None,
    elasticsearch_password=None,
    elasticsearch_api_key=None,
    opensearch_hosts=None,
    opensearch_timeout=60,
    opensearch_number_of_shards=1,
    opensearch_number_of_replicas=0,
    opensearch_index_suffix=None,
    opensearch_index_prefix=None,
    opensearch_ssl=True,
    opensearch_ssl_cert_path=None,
    opensearch_monthly_indexes=False,
    opensearch_username=None,
    opensearch_password=None,
    opensearch_api_key=None,
    opensearch_auth_type="basic",
    opensearch_aws_region=None,
    opensearch_aws_service="es",
    kafka_hosts=None,
    kafka_username=None,
    kafka_password=None,
    kafka_aggregate_topic=None,
    kafka_forensic_topic=None,
    kafka_smtp_tls_topic=None,
    kafka_ssl=False,
    kafka_skip_certificate_verification=False,
    smtp_host=None,
    smtp_port=25,
    smtp_ssl=False,
    smtp_skip_certificate_verification=False,
    smtp_user=None,
    smtp_password=None,
    smtp_from=None,
    smtp_to=[],
    smtp_subject="parsedmarc report",
    smtp_message="Please see the attached DMARC results.",
    s3_bucket=None,
    s3_path=None,
    s3_region_name=None,
    s3_endpoint_url=None,
    s3_access_key_id=None,
    s3_secret_access_key=None,
    syslog_server=None,
    syslog_port=None,
    syslog_protocol=None,
    syslog_cafile_path=None,
    syslog_certfile_path=None,
    syslog_keyfile_path=None,
    syslog_timeout=None,
    syslog_retry_attempts=None,
    syslog_retry_delay=None,
    gmail_api_credentials_file=None,
    gmail_api_token_file=None,
    gmail_api_include_spam_trash=False,
    gmail_api_paginate_messages=True,
    gmail_api_scopes=[],
    gmail_api_oauth2_port=8080,
    gmail_api_auth_mode="installed_app",
    gmail_api_service_account_user=None,
    maildir_path=None,
    maildir_create=False,
    log_file=args.log_file,
    n_procs=1,
    ip_db_path=None,
    always_use_local_files=False,
    reverse_dns_map_path=None,
    reverse_dns_map_url=None,
    la_client_id=None,
    la_client_secret=None,
    la_tenant_id=None,
    la_dce=None,
    la_dcr_immutable_id=None,
    la_dcr_aggregate_stream=None,
    la_dcr_forensic_stream=None,
    la_dcr_smtp_tls_stream=None,
    gelf_host=None,
    gelf_port=None,
    gelf_mode=None,
    webhook_aggregate_url=None,
    webhook_forensic_url=None,
    webhook_smtp_tls_url=None,
    webhook_timeout=60,
    normalize_timespan_threshold_hours=24.0,
    fail_on_output_error=False,
)
# NOTE: the original code re-ran arg_parser.parse_args() here; ``args`` was
# already populated above, so the redundant re-parse has been removed.
# Apply the optional config file on top of ``opts``, then derive the
# effective log level and optional file handler.
index_prefix_domain_map = None
if args.config_file:
    try:
        index_prefix_domain_map = _parse_config_file(args.config_file, opts)
    except ConfigurationError as e:
        logger.error(str(e))
        exit(-1)
# Default to errors only; each verbosity flag lowers the threshold further.
logger.setLevel(logging.ERROR)
if opts.warnings:
    logger.setLevel(logging.WARNING)
if opts.verbose:
    logger.setLevel(logging.INFO)
if opts.debug:
    logger.setLevel(logging.DEBUG)
if opts.log_file:
    try:
        fh = logging.FileHandler(opts.log_file, "a")
        formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
        )
        fh.setFormatter(formatter)
        logger.addHandler(fh)
    except Exception as error:
        # Best-effort: a bad log path should not abort parsing
        logger.warning("Unable to write to log file: {}".format(error))
# At least one input source (files or a mailbox connection) is required.
if (
    opts.imap_host is None
    and opts.graph_client_id is None
    and opts.gmail_api_credentials_file is None
    and opts.maildir_path is None
    and len(opts.file_path) == 0
):
    logger.error("You must supply input files or a mailbox connection")
    exit(1)
logger.info("Starting parsedmarc")
# Initialize output clients
try:
    clients = _init_output_clients(opts)
except elastic.ElasticsearchError:
    logger.exception("Elasticsearch Error")
    exit(1)
except opensearch.OpenSearchError:
    logger.exception("OpenSearch Error")
    exit(1)
except ConfigurationError as e:
    logger.error(str(e))
    exit(1)
except Exception as error_:
    logger.error("Output client error: {0}".format(error_))
    exit(1)
# Expand shell globs, then separate plain report files from mbox archives
# (mbox files are parsed sequentially later, not via worker processes).
file_paths = []
mbox_paths = []
for file_path in args.file_path:
    file_paths += glob(file_path)
for file_path in file_paths:
    if is_mbox(file_path):
        mbox_paths.append(file_path)
file_paths = list(set(file_paths))
mbox_paths = list(set(mbox_paths))
for mbox_path in mbox_paths:
    file_paths.remove(mbox_path)
results = []
pbar = None
# Only show a progress bar on an interactive terminal
if sys.stdout.isatty():
    pbar = tqdm(total=len(file_paths))
n_procs = int(opts.n_procs or 1)
if n_procs < 1:
    n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
# Parse the files in batches of up to n_procs worker processes, one file
# per process; results come back over a Pipe per worker.
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
    processes = []
    connections = []
    for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)):
        if proc_index >= len(file_paths):
            break
        parent_conn, child_conn = Pipe()
        connections.append(parent_conn)
        process = Process(
            target=cli_parse,
            args=(
                file_paths[proc_index],
                opts.strip_attachment_payloads,
                opts.nameservers,
                opts.dns_timeout,
                opts.ip_db_path,
                opts.offline,
                opts.always_use_local_files,
                opts.reverse_dns_map_path,
                opts.reverse_dns_map_url,
                opts.normalize_timespan_threshold_hours,
                child_conn,
                current_log_level,
                current_log_file,
            ),
        )
        processes.append(process)
    for proc in processes:
        proc.start()
    for conn in connections:
        results.append(conn.recv())
    for proc in processes:
        proc.join()
    if pbar is not None:
        # Advance by the number of files handled in this batch. The old
        # code ticked once per batch, so the bar (total=len(file_paths))
        # never reached 100% when n_procs > 1.
        pbar.update(len(processes))
if pbar is not None:
    pbar.close()
# Collect worker results, de-duplicating aggregate reports by
# org name + report ID across the whole run.
for result in results:
    if isinstance(result[0], ParserError) or result[0] is None:
        logger.error("Failed to parse {0} - {1}".format(result[1], result[0]))
    else:
        if result[0]["report_type"] == "aggregate":
            report_org = result[0]["report"]["report_metadata"]["org_name"]
            report_id = result[0]["report"]["report_metadata"]["report_id"]
            report_key = f"{report_org}_{report_id}"
            if report_key not in SEEN_AGGREGATE_REPORT_IDS:
                SEEN_AGGREGATE_REPORT_IDS[report_key] = True
                aggregate_reports.append(result[0]["report"])
            else:
                logger.debug(
                    "Skipping duplicate aggregate report "
                    f"from {report_org} with ID: {report_id}"
                )
        elif result[0]["report_type"] == "forensic":
            forensic_reports.append(result[0]["report"])
        elif result[0]["report_type"] == "smtp_tls":
            smtp_tls_reports.append(result[0]["report"])
# Parse any mbox archives sequentially in this process and merge their
# reports into the accumulators.
for mbox_path in mbox_paths:
    normalize_timespan_threshold_hours_value = (
        float(opts.normalize_timespan_threshold_hours)
        if opts.normalize_timespan_threshold_hours is not None
        else 24.0
    )
    strip = opts.strip_attachment_payloads
    reports = get_dmarc_reports_from_mbox(
        mbox_path,
        nameservers=opts.nameservers,
        dns_timeout=opts.dns_timeout,
        strip_attachment_payloads=strip,
        ip_db_path=opts.ip_db_path,
        always_use_local_files=opts.always_use_local_files,
        reverse_dns_map_path=opts.reverse_dns_map_path,
        reverse_dns_map_url=opts.reverse_dns_map_url,
        offline=opts.offline,
        normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
    )
    aggregate_reports += reports["aggregate_reports"]
    forensic_reports += reports["forensic_reports"]
    smtp_tls_reports += reports["smtp_tls_reports"]
# Build at most one mailbox connection (IMAP, MS Graph, Gmail API, or
# Maildir). When several are configured, later sections override earlier
# ones, matching the original precedence.
mailbox_connection = None
mailbox_batch_size_value = 10
mailbox_check_timeout_value = 30
normalize_timespan_threshold_hours_value = 24.0
if opts.imap_host:
    try:
        if opts.imap_user is None or opts.imap_password is None:
            # Fixed message typo: "ifhost" -> "if host"
            logger.error(
                "IMAP user and password must be specified if host is specified"
            )
        ssl = True
        verify = True
        if opts.imap_skip_certificate_verification:
            logger.debug("Skipping IMAP certificate verification")
            verify = False
        if not opts.imap_ssl:
            ssl = False
        imap_timeout = (
            int(opts.imap_timeout) if opts.imap_timeout is not None else 30
        )
        imap_max_retries = (
            int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
        )
        imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
        mailbox_connection = IMAPConnection(
            host=opts.imap_host,
            port=imap_port_value,
            ssl=ssl,
            verify=verify,
            timeout=imap_timeout,
            max_retries=imap_max_retries,
            user=opts.imap_user,
            password=opts.imap_password,
        )
    except Exception:
        logger.exception("IMAP Error")
        exit(1)
if opts.graph_client_id:
    try:
        mailbox = opts.graph_mailbox or opts.graph_user
        mailbox_connection = MSGraphConnection(
            auth_method=opts.graph_auth_method,
            mailbox=mailbox,
            tenant_id=opts.graph_tenant_id,
            client_id=opts.graph_client_id,
            client_secret=opts.graph_client_secret,
            certificate_path=opts.graph_certificate_path,
            certificate_password=opts.graph_certificate_password,
            username=opts.graph_user,
            password=opts.graph_password,
            # ``opts`` defines no graph_token_file default, so the bare
            # attribute access raised AttributeError unless the config
            # file set it; fall back to None instead.
            token_file=getattr(opts, "graph_token_file", None),
            allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
            graph_url=opts.graph_url,
        )
    except Exception:
        logger.exception("MS Graph Error")
        exit(1)
if opts.gmail_api_credentials_file:
    if opts.mailbox_delete:
        # Gmail message deletion needs the full-access scope
        if "https://mail.google.com/" not in opts.gmail_api_scopes:
            logger.error(
                "Message deletion requires scope"
                " 'https://mail.google.com/'. "
                "Add the scope and remove token file "
                "to acquire proper access."
            )
            opts.mailbox_delete = False
    try:
        mailbox_connection = GmailConnection(
            credentials_file=opts.gmail_api_credentials_file,
            token_file=opts.gmail_api_token_file,
            scopes=opts.gmail_api_scopes,
            include_spam_trash=opts.gmail_api_include_spam_trash,
            paginate_messages=opts.gmail_api_paginate_messages,
            reports_folder=opts.mailbox_reports_folder,
            oauth2_port=opts.gmail_api_oauth2_port,
            auth_mode=opts.gmail_api_auth_mode,
            service_account_user=opts.gmail_api_service_account_user,
        )
    except Exception:
        logger.exception("Gmail API Error")
        exit(1)
if opts.maildir_path:
    try:
        mailbox_connection = MaildirConnection(
            maildir_path=opts.maildir_path,
            maildir_create=opts.maildir_create,
        )
    except Exception:
        logger.exception("Maildir Error")
        exit(1)
if mailbox_connection:
    mailbox_batch_size_value = (
        int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
    )
    mailbox_check_timeout_value = (
        int(opts.mailbox_check_timeout)
        if opts.mailbox_check_timeout is not None
        else 30
    )
    normalize_timespan_threshold_hours_value = (
        float(opts.normalize_timespan_threshold_hours)
        if opts.normalize_timespan_threshold_hours is not None
        else 24.0
    )
    try:
        reports = get_dmarc_reports_from_mailbox(
            connection=mailbox_connection,
            delete=opts.mailbox_delete,
            batch_size=mailbox_batch_size_value,
            reports_folder=opts.mailbox_reports_folder,
            archive_folder=opts.mailbox_archive_folder,
            ip_db_path=opts.ip_db_path,
            always_use_local_files=opts.always_use_local_files,
            reverse_dns_map_path=opts.reverse_dns_map_path,
            reverse_dns_map_url=opts.reverse_dns_map_url,
            offline=opts.offline,
            nameservers=opts.nameservers,
            test=opts.mailbox_test,
            strip_attachment_payloads=opts.strip_attachment_payloads,
            since=opts.mailbox_since,
            normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
        )
        aggregate_reports += reports["aggregate_reports"]
        forensic_reports += reports["forensic_reports"]
        smtp_tls_reports += reports["smtp_tls_reports"]
    except Exception:
        logger.exception("Mailbox Error")
        exit(1)
# Bundle everything collected so far and push it to the configured outputs.
parsing_results: ParsingResults = {
    "aggregate_reports": aggregate_reports,
    "forensic_reports": forensic_reports,
    "smtp_tls_reports": smtp_tls_reports,
}
try:
    process_reports(parsing_results)
except ParserError as error:
    # Raised when fail_on_output_error is set and a destination failed
    logger.error(error.__str__())
    exit(1)
if opts.smtp_host:
    # Optionally email the results as attachments
    try:
        verify = True
        if opts.smtp_skip_certificate_verification:
            verify = False
        smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
        smtp_to_value = (
            list(opts.smtp_to)
            if isinstance(opts.smtp_to, list)
            else _str_to_list(str(opts.smtp_to))
        )
        email_results(
            parsing_results,
            opts.smtp_host,
            opts.smtp_from,
            smtp_to_value,
            port=smtp_port_value,
            verify=verify,
            username=opts.smtp_user,
            password=opts.smtp_password,
            subject=opts.smtp_subject,
            require_encryption=opts.smtp_ssl,
        )
    except Exception:
        logger.exception("Failed to email results")
        exit(1)
# SIGHUP-based config reload for watch mode
_reload_requested = False

def _handle_sighup(signum, frame):
    # Only set a flag here; the actual reload happens between batches,
    # when watch_inbox's should_reload callback observes it.
    nonlocal _reload_requested
    _reload_requested = True
    logger.info("SIGHUP received, config will reload after current batch")

# SIGHUP does not exist on all platforms (e.g. Windows)
if hasattr(signal, "SIGHUP"):
    signal.signal(signal.SIGHUP, _handle_sighup)
if mailbox_connection and opts.mailbox_watch:
    logger.info("Watching for email - Quit with ctrl-c")
    while True:
        _reload_requested = False
        try:
            watch_inbox(
                mailbox_connection=mailbox_connection,
                callback=process_reports,
                reports_folder=opts.mailbox_reports_folder,
                archive_folder=opts.mailbox_archive_folder,
                delete=opts.mailbox_delete,
                test=opts.mailbox_test,
                check_timeout=mailbox_check_timeout_value,
                nameservers=opts.nameservers,
                dns_timeout=opts.dns_timeout,
                strip_attachment_payloads=opts.strip_attachment_payloads,
                batch_size=mailbox_batch_size_value,
                since=opts.mailbox_since,
                ip_db_path=opts.ip_db_path,
                always_use_local_files=opts.always_use_local_files,
                reverse_dns_map_path=opts.reverse_dns_map_path,
                reverse_dns_map_url=opts.reverse_dns_map_url,
                offline=opts.offline,
                normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
                should_reload=lambda: _reload_requested,
            )
        except FileExistsError as error:
            logger.error("{0}".format(error.__str__()))
            exit(1)
        except ParserError as error:
            logger.error(error.__str__())
            exit(1)
        if not _reload_requested:
            break
        # Reload configuration
        logger.info("Reloading configuration...")
        # Snapshot opts so a failed reload can roll back cleanly
        old_opts_snapshot = Namespace(**vars(opts))
        try:
            index_prefix_domain_map = _parse_config_file(args.config_file, opts)
            clients = _init_output_clients(opts)
            # Update watch parameters from reloaded config
            mailbox_batch_size_value = (
                int(opts.mailbox_batch_size)
                if opts.mailbox_batch_size is not None
                else 10
            )
            mailbox_check_timeout_value = (
                int(opts.mailbox_check_timeout)
                if opts.mailbox_check_timeout is not None
                else 30
            )
            normalize_timespan_threshold_hours_value = (
                float(opts.normalize_timespan_threshold_hours)
                if opts.normalize_timespan_threshold_hours is not None
                else 24.0
            )
            # Update log level
            logger.setLevel(logging.ERROR)
            if opts.warnings:
                logger.setLevel(logging.WARNING)
            if opts.verbose:
                logger.setLevel(logging.INFO)
            if opts.debug:
                logger.setLevel(logging.DEBUG)
            logger.info("Configuration reloaded successfully")
        except Exception:
            logger.exception(
                "Config reload failed, continuing with previous config"
            )
            # Restore old opts
            for k, v in vars(old_opts_snapshot).items():
                setattr(opts, k, v)
# Standard script entry point
if __name__ == "__main__":
    _main()