From dfdffe4947c1a70a024df847c0ab8af62502a5b6 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Fri, 20 Mar 2026 15:00:21 -0400 Subject: [PATCH] Enhance mailbox connection watch method to support reload functionality - Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so. - Modified related tests to accommodate the new method signature. - Changed logger calls from `critical` to `error` for consistency in logging severity. - Added a new settings file for Claude with specific permissions for testing and code checks. --- .claude/settings.json | 16 + CHANGELOG.md | 13 + docs/source/usage.md | 42 +- parsedmarc/__init__.py | 9 +- parsedmarc/cli.py | 1798 +++++++++++++------------ parsedmarc/mail/gmail.py | 4 +- parsedmarc/mail/graph.py | 4 +- parsedmarc/mail/imap.py | 4 +- parsedmarc/mail/mailbox_connection.py | 2 +- parsedmarc/mail/maildir.py | 4 +- tests.py | 20 +- 11 files changed, 1042 insertions(+), 874 deletions(-) create mode 100644 .claude/settings.json diff --git a/.claude/settings.json b/.claude/settings.json new file mode 100644 index 0000000..33dfb92 --- /dev/null +++ b/.claude/settings.json @@ -0,0 +1,16 @@ +{ + "permissions": { + "allow": [ + "Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")", + "Bash(ruff check:*)", + "Bash(ruff format:*)", + "Bash(GITHUB_ACTIONS=true pytest --cov tests.py)", + "Bash(ls tests*)", + "Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)", + "Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)" + ], + "additionalDirectories": [ + "/tmp" + ] + } +} diff --git a/CHANGELOG.md b/CHANGELOG.md index db0de04..dc44fdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## 9.3.0 + +### Added + +- SIGHUP-based configuration 
reload for watch mode — update output + destinations, DNS/GeoIP settings, processing flags, and log level + without restarting the service or interrupting in-progress report + processing. Use `systemctl reload parsedmarc` when running under + systemd. +- Extracted `_parse_config_file()` and `_init_output_clients()` from + `_main()` in `cli.py` to support config reload and reduce code + duplication. + ## 9.2.1 ### Added diff --git a/docs/source/usage.md b/docs/source/usage.md index c97c872..cd4640e 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -404,6 +404,7 @@ The full set of configuration options are: retry_attempts = 3 retry_delay = 5 ``` + - `gmail_api` - `credentials_file` - str: Path to file containing the credentials, None to disable (Default: `None`) @@ -442,7 +443,7 @@ The full set of configuration options are: - `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR :::{note} - Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal). + Information regarding the setup of the Data Collection Rule can be found [in the Azure documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal). ::: - `gelf` - `host` - str: The GELF server name or IP address @@ -602,6 +603,7 @@ After=network.target network-online.target elasticsearch.service [Service] ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini +ExecReload=/bin/kill -HUP $MAINPID User=parsedmarc Group=parsedmarc Restart=always @@ -634,6 +636,44 @@ sudo service parsedmarc restart ::: +### Reloading configuration without restarting + +When running in watch mode, `parsedmarc` supports reloading its +configuration file without restarting the service or interrupting +report processing that is already in progress. 
Send a `SIGHUP` signal +to the process, or use `systemctl reload` if the unit file includes +the `ExecReload` line shown above: + +```bash +sudo systemctl reload parsedmarc +``` + +The reload takes effect after the current batch of reports finishes +processing and all output operations (Elasticsearch, Kafka, S3, etc.) +for that batch have completed. The following settings are reloaded: + +- All output destinations (Elasticsearch, OpenSearch, Kafka, S3, + Splunk, syslog, GELF, webhooks, Log Analytics) +- Multi-tenant index prefix domain map (`index_prefix_domain_map` — + the referenced YAML file is re-read on reload) +- DNS and GeoIP settings (`nameservers`, `dns_timeout`, `ip_db_path`, + `offline`, etc.) +- Processing flags (`strip_attachment_payloads`, `batch_size`, + `check_timeout`, etc.) +- Log level (`debug`, `verbose`, `warnings`, `silent`) + +Mailbox connection settings (IMAP host/credentials, Microsoft Graph, +Gmail API, Maildir path) are **not** reloaded — changing those still +requires a full restart. + +If the new configuration file contains errors, the reload is aborted +and the previous configuration remains active. 
Check the logs for +details: + +```bash +journalctl -u parsedmarc.service -r +``` + To check the status of the service, run: ```bash diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 2188f91..a20501e 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -2195,6 +2195,7 @@ def watch_inbox( batch_size: int = 10, since: Optional[Union[datetime, date, str]] = None, normalize_timespan_threshold_hours: float = 24, + should_reload: Optional[Callable] = None, ): """ Watches the mailbox for new messages and @@ -2222,6 +2223,8 @@ def watch_inbox( batch_size (int): Number of messages to read and process before saving since: Search for messages since certain time normalize_timespan_threshold_hours (float): Normalize timespans beyond this + should_reload: Optional callable that returns True when a config + reload has been requested (e.g. via SIGHUP) """ def check_callback(connection): @@ -2246,7 +2249,11 @@ def watch_inbox( ) callback(res) - mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout) + mailbox_connection.watch( + check_callback=check_callback, + check_timeout=check_timeout, + should_reload=should_reload, + ) def append_json( diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 402e45e..c1ab8a0 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -7,6 +7,7 @@ import http.client import json import logging import os +import signal import sys from argparse import ArgumentParser, Namespace from configparser import ConfigParser @@ -165,6 +166,808 @@ def cli_parse( conn.close() +class ConfigurationError(Exception): + """Raised when a configuration file has missing or invalid settings.""" + + pass + + +def _parse_config_file(config_file, opts): + """Parse a config file and update opts in place. 
+ + Args: + config_file: Path to the .ini config file + opts: Namespace object to update with parsed values + + Returns: + index_prefix_domain_map or None + + Raises: + ConfigurationError: If required settings are missing or invalid. + """ + abs_path = os.path.abspath(config_file) + if not os.path.exists(abs_path): + raise ConfigurationError("A file does not exist at {0}".format(abs_path)) + opts.silent = True + config = ConfigParser() + index_prefix_domain_map = None + config.read(config_file) + if "general" in config.sections(): + general_config = config["general"] + if "silent" in general_config: + opts.silent = bool(general_config.getboolean("silent")) + if "normalize_timespan_threshold_hours" in general_config: + opts.normalize_timespan_threshold_hours = general_config.getfloat( + "normalize_timespan_threshold_hours" + ) + if "index_prefix_domain_map" in general_config: + with open(general_config["index_prefix_domain_map"]) as f: + index_prefix_domain_map = yaml.safe_load(f) + if "offline" in general_config: + opts.offline = bool(general_config.getboolean("offline")) + if "strip_attachment_payloads" in general_config: + opts.strip_attachment_payloads = bool( + general_config.getboolean("strip_attachment_payloads") + ) + if "output" in general_config: + opts.output = general_config["output"] + if "aggregate_json_filename" in general_config: + opts.aggregate_json_filename = general_config["aggregate_json_filename"] + if "forensic_json_filename" in general_config: + opts.forensic_json_filename = general_config["forensic_json_filename"] + if "smtp_tls_json_filename" in general_config: + opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"] + if "aggregate_csv_filename" in general_config: + opts.aggregate_csv_filename = general_config["aggregate_csv_filename"] + if "forensic_csv_filename" in general_config: + opts.forensic_csv_filename = general_config["forensic_csv_filename"] + if "smtp_tls_csv_filename" in general_config: + 
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"] + if "dns_timeout" in general_config: + opts.dns_timeout = general_config.getfloat("dns_timeout") + if opts.dns_timeout is None: + opts.dns_timeout = 2 + if "dns_test_address" in general_config: + opts.dns_test_address = general_config["dns_test_address"] + if "nameservers" in general_config: + opts.nameservers = _str_to_list(general_config["nameservers"]) + # nameservers pre-flight check + dummy_hostname = None + try: + dummy_hostname = get_reverse_dns( + opts.dns_test_address, + nameservers=opts.nameservers, + timeout=opts.dns_timeout, + ) + except Exception as ns_error: + raise ConfigurationError( + "DNS pre-flight check failed: {}".format(ns_error) + ) + if not dummy_hostname: + raise ConfigurationError( + "DNS pre-flight check failed: no PTR record for {} from {}".format( + opts.dns_test_address, opts.nameservers + ) + ) + if "save_aggregate" in general_config: + opts.save_aggregate = bool(general_config.getboolean("save_aggregate")) + if "save_forensic" in general_config: + opts.save_forensic = bool(general_config.getboolean("save_forensic")) + if "save_smtp_tls" in general_config: + opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls")) + if "debug" in general_config: + opts.debug = bool(general_config.getboolean("debug")) + if "verbose" in general_config: + opts.verbose = bool(general_config.getboolean("verbose")) + if "silent" in general_config: + opts.silent = bool(general_config.getboolean("silent")) + if "warnings" in general_config: + opts.warnings = bool(general_config.getboolean("warnings")) + if "fail_on_output_error" in general_config: + opts.fail_on_output_error = bool( + general_config.getboolean("fail_on_output_error") + ) + if "log_file" in general_config: + opts.log_file = general_config["log_file"] + if "n_procs" in general_config: + opts.n_procs = general_config.getint("n_procs") + if "ip_db_path" in general_config: + opts.ip_db_path = 
general_config["ip_db_path"] + else: + opts.ip_db_path = None + if "always_use_local_files" in general_config: + opts.always_use_local_files = bool( + general_config.getboolean("always_use_local_files") + ) + if "reverse_dns_map_path" in general_config: + opts.reverse_dns_map_path = general_config["reverse_dns_map_path"] + if "reverse_dns_map_url" in general_config: + opts.reverse_dns_map_url = general_config["reverse_dns_map_url"] + if "prettify_json" in general_config: + opts.prettify_json = bool(general_config.getboolean("prettify_json")) + + if "mailbox" in config.sections(): + mailbox_config = config["mailbox"] + if "msgraph" in config.sections(): + opts.mailbox_reports_folder = "Inbox" + if "reports_folder" in mailbox_config: + opts.mailbox_reports_folder = mailbox_config["reports_folder"] + if "archive_folder" in mailbox_config: + opts.mailbox_archive_folder = mailbox_config["archive_folder"] + if "watch" in mailbox_config: + opts.mailbox_watch = bool(mailbox_config.getboolean("watch")) + if "delete" in mailbox_config: + opts.mailbox_delete = bool(mailbox_config.getboolean("delete")) + if "test" in mailbox_config: + opts.mailbox_test = bool(mailbox_config.getboolean("test")) + if "batch_size" in mailbox_config: + opts.mailbox_batch_size = mailbox_config.getint("batch_size") + if "check_timeout" in mailbox_config: + opts.mailbox_check_timeout = mailbox_config.getint("check_timeout") + if "since" in mailbox_config: + opts.mailbox_since = mailbox_config["since"] + + if "imap" in config.sections(): + imap_config = config["imap"] + if "watch" in imap_config: + logger.warning( + "Starting in 8.0.0, the watch option has been " + "moved from the imap configuration section to " + "the mailbox configuration section."
+ ) + if "host" in imap_config: + opts.imap_host = imap_config["host"] + else: + raise ConfigurationError( + "host setting missing from the imap config section" + ) + if "port" in imap_config: + opts.imap_port = imap_config.getint("port") + if "timeout" in imap_config: + opts.imap_timeout = imap_config.getint("timeout") + if "max_retries" in imap_config: + opts.imap_max_retries = imap_config.getint("max_retries") + if "ssl" in imap_config: + opts.imap_ssl = bool(imap_config.getboolean("ssl")) + if "skip_certificate_verification" in imap_config: + opts.imap_skip_certificate_verification = bool( + imap_config.getboolean("skip_certificate_verification") + ) + if "user" in imap_config: + opts.imap_user = imap_config["user"] + else: + raise ConfigurationError( + "user setting missing from the imap config section" + ) + if "password" in imap_config: + opts.imap_password = imap_config["password"] + else: + raise ConfigurationError( + "password setting missing from the imap config section" + ) + if "reports_folder" in imap_config: + opts.mailbox_reports_folder = imap_config["reports_folder"] + logger.warning( + "Use of the reports_folder option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "archive_folder" in imap_config: + opts.mailbox_archive_folder = imap_config["archive_folder"] + logger.warning( + "Use of the archive_folder option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "watch" in imap_config: + opts.mailbox_watch = bool(imap_config.getboolean("watch")) + logger.warning( + "Use of the watch option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "delete" in imap_config: + logger.warning( + "Use of the delete option in the imap " + "configuration section has been deprecated. 
" + "Use this option in the mailbox configuration " + "section instead." + ) + if "test" in imap_config: + opts.mailbox_test = bool(imap_config.getboolean("test")) + logger.warning( + "Use of the test option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "batch_size" in imap_config: + opts.mailbox_batch_size = imap_config.getint("batch_size") + logger.warning( + "Use of the batch_size option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + + if "msgraph" in config.sections(): + graph_config = config["msgraph"] + opts.graph_token_file = graph_config.get("token_file", ".token") + + if "auth_method" not in graph_config: + logger.info( + "auth_method setting missing from the " + "msgraph config section " + "defaulting to UsernamePassword" + ) + opts.graph_auth_method = AuthMethod.UsernamePassword.name + else: + opts.graph_auth_method = graph_config["auth_method"] + + if opts.graph_auth_method == AuthMethod.UsernamePassword.name: + if "user" in graph_config: + opts.graph_user = graph_config["user"] + else: + raise ConfigurationError( + "user setting missing from the msgraph config section" + ) + if "password" in graph_config: + opts.graph_password = graph_config["password"] + else: + raise ConfigurationError( + "password setting missing from the msgraph config section" + ) + if "client_secret" in graph_config: + opts.graph_client_secret = graph_config["client_secret"] + else: + raise ConfigurationError( + "client_secret setting missing from the msgraph config section" + ) + + if opts.graph_auth_method == AuthMethod.DeviceCode.name: + if "user" in graph_config: + opts.graph_user = graph_config["user"] + + if opts.graph_auth_method != AuthMethod.UsernamePassword.name: + if "tenant_id" in graph_config: + opts.graph_tenant_id = graph_config["tenant_id"] + else: + raise ConfigurationError( + 
"tenant_id setting missing from the msgraph config section" + ) + + if opts.graph_auth_method == AuthMethod.ClientSecret.name: + if "client_secret" in graph_config: + opts.graph_client_secret = graph_config["client_secret"] + else: + raise ConfigurationError( + "client_secret setting missing from the msgraph config section" + ) + + if opts.graph_auth_method == AuthMethod.Certificate.name: + if "certificate_path" in graph_config: + opts.graph_certificate_path = graph_config["certificate_path"] + else: + raise ConfigurationError( + "certificate_path setting missing from the msgraph config section" + ) + if "certificate_password" in graph_config: + opts.graph_certificate_password = graph_config["certificate_password"] + + if "client_id" in graph_config: + opts.graph_client_id = graph_config["client_id"] + else: + raise ConfigurationError( + "client_id setting missing from the msgraph config section" + ) + + if "mailbox" in graph_config: + opts.graph_mailbox = graph_config["mailbox"] + elif opts.graph_auth_method != AuthMethod.UsernamePassword.name: + raise ConfigurationError( + "mailbox setting missing from the msgraph config section" + ) + + if "graph_url" in graph_config: + opts.graph_url = graph_config["graph_url"] + + if "allow_unencrypted_storage" in graph_config: + opts.graph_allow_unencrypted_storage = bool( + graph_config.getboolean("allow_unencrypted_storage") + ) + + if "elasticsearch" in config: + elasticsearch_config = config["elasticsearch"] + if "hosts" in elasticsearch_config: + opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"]) + else: + raise ConfigurationError( + "hosts setting missing from the elasticsearch config section" + ) + if "timeout" in elasticsearch_config: + timeout = elasticsearch_config.getfloat("timeout") + opts.elasticsearch_timeout = timeout + if "number_of_shards" in elasticsearch_config: + number_of_shards = elasticsearch_config.getint("number_of_shards") + opts.elasticsearch_number_of_shards = number_of_shards + 
if "number_of_replicas" in elasticsearch_config: + number_of_replicas = elasticsearch_config.getint("number_of_replicas") + opts.elasticsearch_number_of_replicas = number_of_replicas + if "index_suffix" in elasticsearch_config: + opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"] + if "index_prefix" in elasticsearch_config: + opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"] + if "monthly_indexes" in elasticsearch_config: + monthly = bool(elasticsearch_config.getboolean("monthly_indexes")) + opts.elasticsearch_monthly_indexes = monthly + if "ssl" in elasticsearch_config: + opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl")) + if "cert_path" in elasticsearch_config: + opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"] + if "user" in elasticsearch_config: + opts.elasticsearch_username = elasticsearch_config["user"] + if "password" in elasticsearch_config: + opts.elasticsearch_password = elasticsearch_config["password"] + # Until 8.20 + if "apiKey" in elasticsearch_config: + opts.elasticsearch_api_key = elasticsearch_config["apiKey"] + # Since 8.20 + if "api_key" in elasticsearch_config: + opts.elasticsearch_api_key = elasticsearch_config["api_key"] + + if "opensearch" in config: + opensearch_config = config["opensearch"] + if "hosts" in opensearch_config: + opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"]) + else: + raise ConfigurationError( + "hosts setting missing from the opensearch config section" + ) + if "timeout" in opensearch_config: + timeout = opensearch_config.getfloat("timeout") + opts.opensearch_timeout = timeout + if "number_of_shards" in opensearch_config: + number_of_shards = opensearch_config.getint("number_of_shards") + opts.opensearch_number_of_shards = number_of_shards + if "number_of_replicas" in opensearch_config: + number_of_replicas = opensearch_config.getint("number_of_replicas") + opts.opensearch_number_of_replicas = number_of_replicas + if 
"index_suffix" in opensearch_config: + opts.opensearch_index_suffix = opensearch_config["index_suffix"] + if "index_prefix" in opensearch_config: + opts.opensearch_index_prefix = opensearch_config["index_prefix"] + if "monthly_indexes" in opensearch_config: + monthly = bool(opensearch_config.getboolean("monthly_indexes")) + opts.opensearch_monthly_indexes = monthly + if "ssl" in opensearch_config: + opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl")) + if "cert_path" in opensearch_config: + opts.opensearch_ssl_cert_path = opensearch_config["cert_path"] + if "user" in opensearch_config: + opts.opensearch_username = opensearch_config["user"] + if "password" in opensearch_config: + opts.opensearch_password = opensearch_config["password"] + # Until 8.20 + if "apiKey" in opensearch_config: + opts.opensearch_api_key = opensearch_config["apiKey"] + # Since 8.20 + if "api_key" in opensearch_config: + opts.opensearch_api_key = opensearch_config["api_key"] + if "auth_type" in opensearch_config: + opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower() + elif "authentication_type" in opensearch_config: + opts.opensearch_auth_type = ( + opensearch_config["authentication_type"].strip().lower() + ) + if "aws_region" in opensearch_config: + opts.opensearch_aws_region = opensearch_config["aws_region"].strip() + if "aws_service" in opensearch_config: + opts.opensearch_aws_service = opensearch_config["aws_service"].strip() + + if "splunk_hec" in config.sections(): + hec_config = config["splunk_hec"] + if "url" in hec_config: + opts.hec = hec_config["url"] + else: + raise ConfigurationError( + "url setting missing from the splunk_hec config section" + ) + if "token" in hec_config: + opts.hec_token = hec_config["token"] + else: + raise ConfigurationError( + "token setting missing from the splunk_hec config section" + ) + if "index" in hec_config: + opts.hec_index = hec_config["index"] + else: + raise ConfigurationError( + "index setting missing from the 
splunk_hec config section" + ) + if "skip_certificate_verification" in hec_config: + opts.hec_skip_certificate_verification = bool( + hec_config.getboolean("skip_certificate_verification") + ) + + if "kafka" in config.sections(): + kafka_config = config["kafka"] + if "hosts" in kafka_config: + opts.kafka_hosts = _str_to_list(kafka_config["hosts"]) + else: + raise ConfigurationError( + "hosts setting missing from the kafka config section" + ) + if "user" in kafka_config: + opts.kafka_username = kafka_config["user"] + if "password" in kafka_config: + opts.kafka_password = kafka_config["password"] + if "ssl" in kafka_config: + opts.kafka_ssl = bool(kafka_config.getboolean("ssl")) + if "skip_certificate_verification" in kafka_config: + kafka_verify = bool( + kafka_config.getboolean("skip_certificate_verification") + ) + opts.kafka_skip_certificate_verification = kafka_verify + if "aggregate_topic" in kafka_config: + opts.kafka_aggregate_topic = kafka_config["aggregate_topic"] + else: + raise ConfigurationError( + "aggregate_topic setting missing from the kafka config section" + ) + if "forensic_topic" in kafka_config: + opts.kafka_forensic_topic = kafka_config["forensic_topic"] + else: + logger.critical( + "forensic_topic setting missing from the kafka config section" + ) + if "smtp_tls_topic" in kafka_config: + opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"] + else: + logger.critical( + "smtp_tls_topic setting missing from the kafka config section" + ) + + if "smtp" in config.sections(): + smtp_config = config["smtp"] + if "host" in smtp_config: + opts.smtp_host = smtp_config["host"] + else: + raise ConfigurationError( + "host setting missing from the smtp config section" + ) + if "port" in smtp_config: + opts.smtp_port = smtp_config.getint("port") + if "ssl" in smtp_config: + opts.smtp_ssl = bool(smtp_config.getboolean("ssl")) + if "skip_certificate_verification" in smtp_config: + smtp_verify = bool(smtp_config.getboolean("skip_certificate_verification")) +
opts.smtp_skip_certificate_verification = smtp_verify + if "user" in smtp_config: + opts.smtp_user = smtp_config["user"] + else: + raise ConfigurationError( + "user setting missing from the smtp config section" + ) + if "password" in smtp_config: + opts.smtp_password = smtp_config["password"] + else: + raise ConfigurationError( + "password setting missing from the smtp config section" + ) + if "from" in smtp_config: + opts.smtp_from = smtp_config["from"] + else: + logger.critical("from setting missing from the smtp config section") + if "to" in smtp_config: + opts.smtp_to = _str_to_list(smtp_config["to"]) + else: + logger.critical("to setting missing from the smtp config section") + if "subject" in smtp_config: + opts.smtp_subject = smtp_config["subject"] + if "attachment" in smtp_config: + opts.smtp_attachment = smtp_config["attachment"] + if "message" in smtp_config: + opts.smtp_message = smtp_config["message"] + + if "s3" in config.sections(): + s3_config = config["s3"] + if "bucket" in s3_config: + opts.s3_bucket = s3_config["bucket"] + else: + raise ConfigurationError( + "bucket setting missing from the s3 config section" + ) + if "path" in s3_config: + opts.s3_path = s3_config["path"] + if opts.s3_path.startswith("/"): + opts.s3_path = opts.s3_path[1:] + if opts.s3_path.endswith("/"): + opts.s3_path = opts.s3_path[:-1] + else: + opts.s3_path = "" + + if "region_name" in s3_config: + opts.s3_region_name = s3_config["region_name"] + if "endpoint_url" in s3_config: + opts.s3_endpoint_url = s3_config["endpoint_url"] + if "access_key_id" in s3_config: + opts.s3_access_key_id = s3_config["access_key_id"] + if "secret_access_key" in s3_config: + opts.s3_secret_access_key = s3_config["secret_access_key"] + + if "syslog" in config.sections(): + syslog_config = config["syslog"] + if "server" in syslog_config: + opts.syslog_server = syslog_config["server"] + else: + raise ConfigurationError( + "server setting missing from the syslog config section" + ) + if "port" in 
syslog_config: + opts.syslog_port = syslog_config["port"] + else: + opts.syslog_port = 514 + if "protocol" in syslog_config: + opts.syslog_protocol = syslog_config["protocol"] + else: + opts.syslog_protocol = "udp" + if "cafile_path" in syslog_config: + opts.syslog_cafile_path = syslog_config["cafile_path"] + if "certfile_path" in syslog_config: + opts.syslog_certfile_path = syslog_config["certfile_path"] + if "keyfile_path" in syslog_config: + opts.syslog_keyfile_path = syslog_config["keyfile_path"] + if "timeout" in syslog_config: + opts.syslog_timeout = float(syslog_config["timeout"]) + else: + opts.syslog_timeout = 5.0 + if "retry_attempts" in syslog_config: + opts.syslog_retry_attempts = int(syslog_config["retry_attempts"]) + else: + opts.syslog_retry_attempts = 3 + if "retry_delay" in syslog_config: + opts.syslog_retry_delay = int(syslog_config["retry_delay"]) + else: + opts.syslog_retry_delay = 5 + + if "gmail_api" in config.sections(): + gmail_api_config = config["gmail_api"] + opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file") + opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token") + opts.gmail_api_include_spam_trash = bool( + gmail_api_config.getboolean("include_spam_trash", False) + ) + opts.gmail_api_paginate_messages = bool( + gmail_api_config.getboolean("paginate_messages", True) + ) + default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify" + opts.gmail_api_scopes = gmail_api_config.get("scopes", default_gmail_api_scope) + opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes) + if "oauth2_port" in gmail_api_config: + opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080) + if "auth_mode" in gmail_api_config: + opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip() + if "service_account_user" in gmail_api_config: + opts.gmail_api_service_account_user = gmail_api_config.get( + "service_account_user" + ).strip() + elif "delegated_user" in gmail_api_config: 
+ opts.gmail_api_service_account_user = gmail_api_config.get( + "delegated_user" + ).strip() + + if "maildir" in config.sections(): + maildir_api_config = config["maildir"] + opts.maildir_path = maildir_api_config.get("maildir_path") + opts.maildir_create = bool( + maildir_api_config.getboolean("maildir_create", fallback=False) + ) + + if "log_analytics" in config.sections(): + log_analytics_config = config["log_analytics"] + opts.la_client_id = log_analytics_config.get("client_id") + opts.la_client_secret = log_analytics_config.get("client_secret") + opts.la_tenant_id = log_analytics_config.get("tenant_id") + opts.la_dce = log_analytics_config.get("dce") + opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id") + opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream") + opts.la_dcr_forensic_stream = log_analytics_config.get("dcr_forensic_stream") + opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream") + + if "gelf" in config.sections(): + gelf_config = config["gelf"] + if "host" in gelf_config: + opts.gelf_host = gelf_config["host"] + else: + raise ConfigurationError( + "host setting missing from the gelf config section" + ) + if "port" in gelf_config: + opts.gelf_port = gelf_config["port"] + else: + raise ConfigurationError( + "port setting missing from the gelf config section" + ) + if "mode" in gelf_config: + opts.gelf_mode = gelf_config["mode"] + else: + raise ConfigurationError( + "mode setting missing from the gelf config section" + ) + + if "webhook" in config.sections(): + webhook_config = config["webhook"] + if "aggregate_url" in webhook_config: + opts.webhook_aggregate_url = webhook_config["aggregate_url"] + if "forensic_url" in webhook_config: + opts.webhook_forensic_url = webhook_config["forensic_url"] + if "smtp_tls_url" in webhook_config: + opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"] + if "timeout" in webhook_config: + opts.webhook_timeout = 
webhook_config.getint("timeout") + + return index_prefix_domain_map + + +def _init_output_clients(opts): + """Create output clients based on current opts. + + Returns: + dict of client instances keyed by name. + + Raises: + ConfigurationError: If a required output client cannot be created. + """ + clients = {} + + if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls: + if opts.elasticsearch_hosts: + es_aggregate_index = "dmarc_aggregate" + es_forensic_index = "dmarc_forensic" + es_smtp_tls_index = "smtp_tls" + if opts.elasticsearch_index_suffix: + suffix = opts.elasticsearch_index_suffix + es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) + es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) + es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix) + if opts.elasticsearch_index_prefix: + prefix = opts.elasticsearch_index_prefix + es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) + es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) + es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) + elastic_timeout_value = ( + float(opts.elasticsearch_timeout) + if opts.elasticsearch_timeout is not None + else 60.0 + ) + elastic.set_hosts( + opts.elasticsearch_hosts, + use_ssl=opts.elasticsearch_ssl, + ssl_cert_path=opts.elasticsearch_ssl_cert_path, + username=opts.elasticsearch_username, + password=opts.elasticsearch_password, + api_key=opts.elasticsearch_api_key, + timeout=elastic_timeout_value, + ) + elastic.migrate_indexes( + aggregate_indexes=[es_aggregate_index], + forensic_indexes=[es_forensic_index], + ) + + if opts.opensearch_hosts: + os_aggregate_index = "dmarc_aggregate" + os_forensic_index = "dmarc_forensic" + os_smtp_tls_index = "smtp_tls" + if opts.opensearch_index_suffix: + suffix = opts.opensearch_index_suffix + os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix) + os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix) + os_smtp_tls_index = 
"{0}_{1}".format(os_smtp_tls_index, suffix) + if opts.opensearch_index_prefix: + prefix = opts.opensearch_index_prefix + os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) + os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) + os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) + opensearch_timeout_value = ( + float(opts.opensearch_timeout) + if opts.opensearch_timeout is not None + else 60.0 + ) + opensearch.set_hosts( + opts.opensearch_hosts, + use_ssl=opts.opensearch_ssl, + ssl_cert_path=opts.opensearch_ssl_cert_path, + username=opts.opensearch_username, + password=opts.opensearch_password, + api_key=opts.opensearch_api_key, + timeout=opensearch_timeout_value, + auth_type=opts.opensearch_auth_type, + aws_region=opts.opensearch_aws_region, + aws_service=opts.opensearch_aws_service, + ) + opensearch.migrate_indexes( + aggregate_indexes=[os_aggregate_index], + forensic_indexes=[os_forensic_index], + ) + + if opts.s3_bucket: + clients["s3_client"] = s3.S3Client( + bucket_name=opts.s3_bucket, + bucket_path=opts.s3_path, + region_name=opts.s3_region_name, + endpoint_url=opts.s3_endpoint_url, + access_key_id=opts.s3_access_key_id, + secret_access_key=opts.s3_secret_access_key, + ) + + if opts.syslog_server: + clients["syslog_client"] = syslog.SyslogClient( + server_name=opts.syslog_server, + server_port=int(opts.syslog_port), + protocol=opts.syslog_protocol or "udp", + cafile_path=opts.syslog_cafile_path, + certfile_path=opts.syslog_certfile_path, + keyfile_path=opts.syslog_keyfile_path, + timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0, + retry_attempts=opts.syslog_retry_attempts + if opts.syslog_retry_attempts is not None + else 3, + retry_delay=opts.syslog_retry_delay + if opts.syslog_retry_delay is not None + else 5, + ) + + if opts.hec: + if opts.hec_token is None or opts.hec_index is None: + raise ConfigurationError( + "HEC token and HEC index are required when using HEC URL" + ) + verify = True + if 
opts.hec_skip_certificate_verification: + verify = False + clients["hec_client"] = splunk.HECClient( + opts.hec, opts.hec_token, opts.hec_index, verify=verify + ) + + if opts.kafka_hosts: + ssl_context = None + if opts.kafka_skip_certificate_verification: + logger.debug("Skipping Kafka certificate verification") + ssl_context = create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = CERT_NONE + clients["kafka_client"] = kafkaclient.KafkaClient( + opts.kafka_hosts, + username=opts.kafka_username, + password=opts.kafka_password, + ssl_context=ssl_context, + ) + + if opts.gelf_host: + clients["gelf_client"] = gelf.GelfClient( + host=opts.gelf_host, + port=int(opts.gelf_port), + mode=opts.gelf_mode, + ) + + if ( + opts.webhook_aggregate_url + or opts.webhook_forensic_url + or opts.webhook_smtp_tls_url + ): + clients["webhook_client"] = webhook.WebhookClient( + aggregate_url=opts.webhook_aggregate_url, + forensic_url=opts.webhook_forensic_url, + smtp_tls_url=opts.webhook_smtp_tls_url, + timeout=opts.webhook_timeout, + ) + + return clients + + def _main(): """Called when the module is executed""" @@ -219,6 +1022,18 @@ def _main(): forensic_csv_filename=opts.forensic_csv_filename, smtp_tls_csv_filename=opts.smtp_tls_csv_filename, ) + + kafka_client = clients.get("kafka_client") + s3_client = clients.get("s3_client") + syslog_client = clients.get("syslog_client") + hec_client = clients.get("hec_client") + gelf_client = clients.get("gelf_client") + webhook_client = clients.get("webhook_client") + + kafka_aggregate_topic = opts.kafka_aggregate_topic + kafka_forensic_topic = opts.kafka_forensic_topic + kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic + if opts.save_aggregate: for report in reports_["aggregate_reports"]: try: @@ -262,7 +1077,7 @@ def _main(): log_output_error("OpenSearch exception", error_.__str__()) try: - if opts.kafka_hosts: + if kafka_client: kafka_client.save_aggregate_reports_to_kafka( report, kafka_aggregate_topic ) @@ 
-270,25 +1085,25 @@ def _main(): log_output_error("Kafka", error_.__str__()) try: - if opts.s3_bucket: + if s3_client: s3_client.save_aggregate_report_to_s3(report) except Exception as error_: log_output_error("S3", error_.__str__()) try: - if opts.syslog_server: + if syslog_client: syslog_client.save_aggregate_report_to_syslog(report) except Exception as error_: log_output_error("Syslog", error_.__str__()) try: - if opts.gelf_host: + if gelf_client: gelf_client.save_aggregate_report_to_gelf(report) except Exception as error_: log_output_error("GELF", error_.__str__()) try: - if opts.webhook_aggregate_url: + if opts.webhook_aggregate_url and webhook_client: indent_value = 2 if opts.prettify_json else None webhook_client.save_aggregate_report_to_webhook( json.dumps(report, ensure_ascii=False, indent=indent_value) @@ -296,7 +1111,7 @@ def _main(): except Exception as error_: log_output_error("Webhook", error_.__str__()) - if opts.hec: + if hec_client: try: aggregate_reports_ = reports_["aggregate_reports"] if len(aggregate_reports_) > 0: @@ -347,7 +1162,7 @@ def _main(): log_output_error("Invalid DMARC report", error_.__str__()) try: - if opts.kafka_hosts: + if kafka_client: kafka_client.save_forensic_reports_to_kafka( report, kafka_forensic_topic ) @@ -355,25 +1170,25 @@ def _main(): log_output_error("Kafka", error_.__str__()) try: - if opts.s3_bucket: + if s3_client: s3_client.save_forensic_report_to_s3(report) except Exception as error_: log_output_error("S3", error_.__str__()) try: - if opts.syslog_server: + if syslog_client: syslog_client.save_forensic_report_to_syslog(report) except Exception as error_: log_output_error("Syslog", error_.__str__()) try: - if opts.gelf_host: + if gelf_client: gelf_client.save_forensic_report_to_gelf(report) except Exception as error_: log_output_error("GELF", error_.__str__()) try: - if opts.webhook_forensic_url: + if opts.webhook_forensic_url and webhook_client: indent_value = 2 if opts.prettify_json else None 
webhook_client.save_forensic_report_to_webhook( json.dumps(report, ensure_ascii=False, indent=indent_value) @@ -381,7 +1196,7 @@ def _main(): except Exception as error_: log_output_error("Webhook", error_.__str__()) - if opts.hec: + if hec_client: try: forensic_reports_ = reports_["forensic_reports"] if len(forensic_reports_) > 0: @@ -432,33 +1247,33 @@ def _main(): log_output_error("Invalid DMARC report", error_.__str__()) try: - if opts.kafka_hosts: + if kafka_client: kafka_client.save_smtp_tls_reports_to_kafka( - smtp_tls_reports, kafka_smtp_tls_topic + reports_["smtp_tls_reports"], kafka_smtp_tls_topic ) except Exception as error_: log_output_error("Kafka", error_.__str__()) try: - if opts.s3_bucket: + if s3_client: s3_client.save_smtp_tls_report_to_s3(report) except Exception as error_: log_output_error("S3", error_.__str__()) try: - if opts.syslog_server: + if syslog_client: syslog_client.save_smtp_tls_report_to_syslog(report) except Exception as error_: log_output_error("Syslog", error_.__str__()) try: - if opts.gelf_host: + if gelf_client: gelf_client.save_smtp_tls_report_to_gelf(report) except Exception as error_: log_output_error("GELF", error_.__str__()) try: - if opts.webhook_smtp_tls_url: + if opts.webhook_smtp_tls_url and webhook_client: indent_value = 2 if opts.prettify_json else None webhook_client.save_smtp_tls_report_to_webhook( json.dumps(report, ensure_ascii=False, indent=indent_value) @@ -466,7 +1281,7 @@ def _main(): except Exception as error_: log_output_error("Webhook", error_.__str__()) - if opts.hec: + if hec_client: try: smtp_tls_reports_ = reports_["smtp_tls_reports"] if len(smtp_tls_reports_) > 0: @@ -598,8 +1413,6 @@ def _main(): args = arg_parser.parse_args() - default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify" - opts = Namespace( file_path=args.file_path, config_file=args.config_file, @@ -750,649 +1563,14 @@ def _main(): ) args = arg_parser.parse_args() + index_prefix_domain_map = None + if args.config_file: - 
abs_path = os.path.abspath(args.config_file) - if not os.path.exists(abs_path): - logger.error("A file does not exist at {0}".format(abs_path)) + try: + index_prefix_domain_map = _parse_config_file(args.config_file, opts) + except ConfigurationError as e: + logger.error(str(e)) exit(-1) - opts.silent = True - config = ConfigParser() - index_prefix_domain_map = None - config.read(args.config_file) - if "general" in config.sections(): - general_config = config["general"] - if "silent" in general_config: - opts.silent = bool(general_config.getboolean("silent")) - if "normalize_timespan_threshold_hours" in general_config: - opts.normalize_timespan_threshold_hours = general_config.getfloat( - "normalize_timespan_threshold_hours" - ) - if "index_prefix_domain_map" in general_config: - with open(general_config["index_prefix_domain_map"]) as f: - index_prefix_domain_map = yaml.safe_load(f) - if "offline" in general_config: - opts.offline = bool(general_config.getboolean("offline")) - if "strip_attachment_payloads" in general_config: - opts.strip_attachment_payloads = bool( - general_config.getboolean("strip_attachment_payloads") - ) - if "output" in general_config: - opts.output = general_config["output"] - if "aggregate_json_filename" in general_config: - opts.aggregate_json_filename = general_config["aggregate_json_filename"] - if "forensic_json_filename" in general_config: - opts.forensic_json_filename = general_config["forensic_json_filename"] - if "smtp_tls_json_filename" in general_config: - opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"] - if "aggregate_csv_filename" in general_config: - opts.aggregate_csv_filename = general_config["aggregate_csv_filename"] - if "forensic_csv_filename" in general_config: - opts.forensic_csv_filename = general_config["forensic_csv_filename"] - if "smtp_tls_csv_filename" in general_config: - opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"] - if "dns_timeout" in general_config: - 
opts.dns_timeout = general_config.getfloat("dns_timeout") - if opts.dns_timeout is None: - opts.dns_timeout = 2 - if "dns_test_address" in general_config: - opts.dns_test_address = general_config["dns_test_address"] - if "nameservers" in general_config: - opts.nameservers = _str_to_list(general_config["nameservers"]) - # nameservers pre-flight check - dummy_hostname = None - try: - dummy_hostname = get_reverse_dns( - opts.dns_test_address, - nameservers=opts.nameservers, - timeout=opts.dns_timeout, - ) - except Exception as ns_error: - logger.critical("DNS pre-flight check failed: {}".format(ns_error)) - exit(-1) - if not dummy_hostname: - logger.critical( - "DNS pre-flight check failed: no PTR record for " - "{} from {}".format(opts.dns_test_address, opts.nameservers) - ) - exit(-1) - if "save_aggregate" in general_config: - opts.save_aggregate = bool(general_config.getboolean("save_aggregate")) - if "save_forensic" in general_config: - opts.save_forensic = bool(general_config.getboolean("save_forensic")) - if "save_smtp_tls" in general_config: - opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls")) - if "debug" in general_config: - opts.debug = bool(general_config.getboolean("debug")) - if "verbose" in general_config: - opts.verbose = bool(general_config.getboolean("verbose")) - if "silent" in general_config: - opts.silent = bool(general_config.getboolean("silent")) - if "warnings" in general_config: - opts.warnings = bool(general_config.getboolean("warnings")) - if "fail_on_output_error" in general_config: - opts.fail_on_output_error = bool( - general_config.getboolean("fail_on_output_error") - ) - if "log_file" in general_config: - opts.log_file = general_config["log_file"] - if "n_procs" in general_config: - opts.n_procs = general_config.getint("n_procs") - if "ip_db_path" in general_config: - opts.ip_db_path = general_config["ip_db_path"] - else: - opts.ip_db_path = None - if "always_use_local_files" in general_config: - 
opts.always_use_local_files = bool( - general_config.getboolean("always_use_local_files") - ) - if "reverse_dns_map_path" in general_config: - opts.reverse_dns_map_path = general_config["reverse_dns_path"] - if "reverse_dns_map_url" in general_config: - opts.reverse_dns_map_url = general_config["reverse_dns_url"] - if "prettify_json" in general_config: - opts.prettify_json = bool(general_config.getboolean("prettify_json")) - - if "mailbox" in config.sections(): - mailbox_config = config["mailbox"] - if "msgraph" in config.sections(): - opts.mailbox_reports_folder = "Inbox" - if "reports_folder" in mailbox_config: - opts.mailbox_reports_folder = mailbox_config["reports_folder"] - if "archive_folder" in mailbox_config: - opts.mailbox_archive_folder = mailbox_config["archive_folder"] - if "watch" in mailbox_config: - opts.mailbox_watch = bool(mailbox_config.getboolean("watch")) - if "delete" in mailbox_config: - opts.mailbox_delete = bool(mailbox_config.getboolean("delete")) - if "test" in mailbox_config: - opts.mailbox_test = bool(mailbox_config.getboolean("test")) - if "batch_size" in mailbox_config: - opts.mailbox_batch_size = mailbox_config.getint("batch_size") - if "check_timeout" in mailbox_config: - opts.mailbox_check_timeout = mailbox_config.getint("check_timeout") - if "since" in mailbox_config: - opts.mailbox_since = mailbox_config["since"] - - if "imap" in config.sections(): - imap_config = config["imap"] - if "watch" in imap_config: - logger.warning( - "Starting in 8.0.0, the watch option has been " - "moved from the imap configuration section to " - "the mailbox configuration section." 
- ) - if "host" in imap_config: - opts.imap_host = imap_config["host"] - else: - logger.error("host setting missing from the imap config section") - exit(-1) - if "port" in imap_config: - opts.imap_port = imap_config.getint("port") - if "timeout" in imap_config: - opts.imap_timeout = imap_config.getint("timeout") - if "max_retries" in imap_config: - opts.imap_max_retries = imap_config.getint("max_retries") - if "ssl" in imap_config: - opts.imap_ssl = bool(imap_config.getboolean("ssl")) - if "skip_certificate_verification" in imap_config: - opts.imap_skip_certificate_verification = bool( - imap_config.getboolean("skip_certificate_verification") - ) - if "user" in imap_config: - opts.imap_user = imap_config["user"] - else: - logger.critical("user setting missing from the imap config section") - exit(-1) - if "password" in imap_config: - opts.imap_password = imap_config["password"] - else: - logger.critical("password setting missing from the imap config section") - exit(-1) - if "reports_folder" in imap_config: - opts.mailbox_reports_folder = imap_config["reports_folder"] - logger.warning( - "Use of the reports_folder option in the imap " - "configuration section has been deprecated. " - "Use this option in the mailbox configuration " - "section instead." - ) - if "archive_folder" in imap_config: - opts.mailbox_archive_folder = imap_config["archive_folder"] - logger.warning( - "Use of the archive_folder option in the imap " - "configuration section has been deprecated. " - "Use this option in the mailbox configuration " - "section instead." - ) - if "watch" in imap_config: - opts.mailbox_watch = bool(imap_config.getboolean("watch")) - logger.warning( - "Use of the watch option in the imap " - "configuration section has been deprecated. " - "Use this option in the mailbox configuration " - "section instead." - ) - if "delete" in imap_config: - logger.warning( - "Use of the delete option in the imap " - "configuration section has been deprecated. 
" - "Use this option in the mailbox configuration " - "section instead." - ) - if "test" in imap_config: - opts.mailbox_test = bool(imap_config.getboolean("test")) - logger.warning( - "Use of the test option in the imap " - "configuration section has been deprecated. " - "Use this option in the mailbox configuration " - "section instead." - ) - if "batch_size" in imap_config: - opts.mailbox_batch_size = imap_config.getint("batch_size") - logger.warning( - "Use of the batch_size option in the imap " - "configuration section has been deprecated. " - "Use this option in the mailbox configuration " - "section instead." - ) - - if "msgraph" in config.sections(): - graph_config = config["msgraph"] - opts.graph_token_file = graph_config.get("token_file", ".token") - - if "auth_method" not in graph_config: - logger.info( - "auth_method setting missing from the " - "msgraph config section " - "defaulting to UsernamePassword" - ) - opts.graph_auth_method = AuthMethod.UsernamePassword.name - else: - opts.graph_auth_method = graph_config["auth_method"] - - if opts.graph_auth_method == AuthMethod.UsernamePassword.name: - if "user" in graph_config: - opts.graph_user = graph_config["user"] - else: - logger.critical( - "user setting missing from the msgraph config section" - ) - exit(-1) - if "password" in graph_config: - opts.graph_password = graph_config["password"] - else: - logger.critical( - "password setting missing from the msgraph config section" - ) - exit(-1) - if "client_secret" in graph_config: - opts.graph_client_secret = graph_config["client_secret"] - else: - logger.critical( - "client_secret setting missing from the msgraph config section" - ) - exit(-1) - - if opts.graph_auth_method == AuthMethod.DeviceCode.name: - if "user" in graph_config: - opts.graph_user = graph_config["user"] - - if opts.graph_auth_method != AuthMethod.UsernamePassword.name: - if "tenant_id" in graph_config: - opts.graph_tenant_id = graph_config["tenant_id"] - else: - logger.critical( - 
"tenant_id setting missing from the msgraph config section" - ) - exit(-1) - - if opts.graph_auth_method == AuthMethod.ClientSecret.name: - if "client_secret" in graph_config: - opts.graph_client_secret = graph_config["client_secret"] - else: - logger.critical( - "client_secret setting missing from the msgraph config section" - ) - exit(-1) - - if opts.graph_auth_method == AuthMethod.Certificate.name: - if "certificate_path" in graph_config: - opts.graph_certificate_path = graph_config["certificate_path"] - else: - logger.critical( - "certificate_path setting missing from the msgraph config section" - ) - exit(-1) - if "certificate_password" in graph_config: - opts.graph_certificate_password = graph_config[ - "certificate_password" - ] - - if "client_id" in graph_config: - opts.graph_client_id = graph_config["client_id"] - else: - logger.critical( - "client_id setting missing from the msgraph config section" - ) - exit(-1) - - if "mailbox" in graph_config: - opts.graph_mailbox = graph_config["mailbox"] - elif opts.graph_auth_method != AuthMethod.UsernamePassword.name: - logger.critical( - "mailbox setting missing from the msgraph config section" - ) - exit(-1) - - if "graph_url" in graph_config: - opts.graph_url = graph_config["graph_url"] - - if "allow_unencrypted_storage" in graph_config: - opts.graph_allow_unencrypted_storage = bool( - graph_config.getboolean("allow_unencrypted_storage") - ) - - if "elasticsearch" in config: - elasticsearch_config = config["elasticsearch"] - if "hosts" in elasticsearch_config: - opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"]) - else: - logger.critical( - "hosts setting missing from the elasticsearch config section" - ) - exit(-1) - if "timeout" in elasticsearch_config: - timeout = elasticsearch_config.getfloat("timeout") - opts.elasticsearch_timeout = timeout - if "number_of_shards" in elasticsearch_config: - number_of_shards = elasticsearch_config.getint("number_of_shards") - 
opts.elasticsearch_number_of_shards = number_of_shards - if "number_of_replicas" in elasticsearch_config: - number_of_replicas = elasticsearch_config.getint( - "number_of_replicas" - ) - opts.elasticsearch_number_of_replicas = number_of_replicas - if "index_suffix" in elasticsearch_config: - opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"] - if "index_prefix" in elasticsearch_config: - opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"] - if "monthly_indexes" in elasticsearch_config: - monthly = bool(elasticsearch_config.getboolean("monthly_indexes")) - opts.elasticsearch_monthly_indexes = monthly - if "ssl" in elasticsearch_config: - opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl")) - if "cert_path" in elasticsearch_config: - opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"] - if "user" in elasticsearch_config: - opts.elasticsearch_username = elasticsearch_config["user"] - if "password" in elasticsearch_config: - opts.elasticsearch_password = elasticsearch_config["password"] - # Until 8.20 - if "apiKey" in elasticsearch_config: - opts.elasticsearch_api_key = elasticsearch_config["apiKey"] - # Since 8.20 - if "api_key" in elasticsearch_config: - opts.elasticsearch_api_key = elasticsearch_config["api_key"] - - if "opensearch" in config: - opensearch_config = config["opensearch"] - if "hosts" in opensearch_config: - opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"]) - else: - logger.critical( - "hosts setting missing from the opensearch config section" - ) - exit(-1) - if "timeout" in opensearch_config: - timeout = opensearch_config.getfloat("timeout") - opts.opensearch_timeout = timeout - if "number_of_shards" in opensearch_config: - number_of_shards = opensearch_config.getint("number_of_shards") - opts.opensearch_number_of_shards = number_of_shards - if "number_of_replicas" in opensearch_config: - number_of_replicas = opensearch_config.getint("number_of_replicas") - 
opts.opensearch_number_of_replicas = number_of_replicas - if "index_suffix" in opensearch_config: - opts.opensearch_index_suffix = opensearch_config["index_suffix"] - if "index_prefix" in opensearch_config: - opts.opensearch_index_prefix = opensearch_config["index_prefix"] - if "monthly_indexes" in opensearch_config: - monthly = bool(opensearch_config.getboolean("monthly_indexes")) - opts.opensearch_monthly_indexes = monthly - if "ssl" in opensearch_config: - opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl")) - if "cert_path" in opensearch_config: - opts.opensearch_ssl_cert_path = opensearch_config["cert_path"] - if "user" in opensearch_config: - opts.opensearch_username = opensearch_config["user"] - if "password" in opensearch_config: - opts.opensearch_password = opensearch_config["password"] - # Until 8.20 - if "apiKey" in opensearch_config: - opts.opensearch_api_key = opensearch_config["apiKey"] - # Since 8.20 - if "api_key" in opensearch_config: - opts.opensearch_api_key = opensearch_config["api_key"] - if "auth_type" in opensearch_config: - opts.opensearch_auth_type = ( - opensearch_config["auth_type"].strip().lower() - ) - elif "authentication_type" in opensearch_config: - opts.opensearch_auth_type = ( - opensearch_config["authentication_type"].strip().lower() - ) - if "aws_region" in opensearch_config: - opts.opensearch_aws_region = opensearch_config["aws_region"].strip() - if "aws_service" in opensearch_config: - opts.opensearch_aws_service = opensearch_config["aws_service"].strip() - - if "splunk_hec" in config.sections(): - hec_config = config["splunk_hec"] - if "url" in hec_config: - opts.hec = hec_config["url"] - else: - logger.critical( - "url setting missing from the splunk_hec config section" - ) - exit(-1) - if "token" in hec_config: - opts.hec_token = hec_config["token"] - else: - logger.critical( - "token setting missing from the splunk_hec config section" - ) - exit(-1) - if "index" in hec_config: - opts.hec_index = 
hec_config["index"] - else: - logger.critical( - "index setting missing from the splunk_hec config section" - ) - exit(-1) - if "skip_certificate_verification" in hec_config: - opts.hec_skip_certificate_verification = hec_config[ - "skip_certificate_verification" - ] - - if "kafka" in config.sections(): - kafka_config = config["kafka"] - if "hosts" in kafka_config: - opts.kafka_hosts = _str_to_list(kafka_config["hosts"]) - else: - logger.critical("hosts setting missing from the kafka config section") - exit(-1) - if "user" in kafka_config: - opts.kafka_username = kafka_config["user"] - if "password" in kafka_config: - opts.kafka_password = kafka_config["password"] - if "ssl" in kafka_config: - opts.kafka_ssl = bool(kafka_config.getboolean("ssl")) - if "skip_certificate_verification" in kafka_config: - kafka_verify = bool( - kafka_config.getboolean("skip_certificate_verification") - ) - opts.kafka_skip_certificate_verification = kafka_verify - if "aggregate_topic" in kafka_config: - opts.kafka_aggregate_topic = kafka_config["aggregate_topic"] - else: - logger.critical( - "aggregate_topic setting missing from the kafka config section" - ) - exit(-1) - if "forensic_topic" in kafka_config: - opts.kafka_forensic_topic = kafka_config["forensic_topic"] - else: - logger.critical( - "forensic_topic setting missing from the kafka config section" - ) - if "smtp_tls_topic" in kafka_config: - opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"] - else: - logger.critical( - "forensic_topic setting missing from the splunk_hec config section" - ) - - if "smtp" in config.sections(): - smtp_config = config["smtp"] - if "host" in smtp_config: - opts.smtp_host = smtp_config["host"] - else: - logger.critical("host setting missing from the smtp config section") - exit(-1) - if "port" in smtp_config: - opts.smtp_port = smtp_config.getint("port") - if "ssl" in smtp_config: - opts.smtp_ssl = bool(smtp_config.getboolean("ssl")) - if "skip_certificate_verification" in smtp_config: - 
smtp_verify = bool( - smtp_config.getboolean("skip_certificate_verification") - ) - opts.smtp_skip_certificate_verification = smtp_verify - if "user" in smtp_config: - opts.smtp_user = smtp_config["user"] - else: - logger.critical("user setting missing from the smtp config section") - exit(-1) - if "password" in smtp_config: - opts.smtp_password = smtp_config["password"] - else: - logger.critical("password setting missing from the smtp config section") - exit(-1) - if "from" in smtp_config: - opts.smtp_from = smtp_config["from"] - else: - logger.critical("from setting missing from the smtp config section") - if "to" in smtp_config: - opts.smtp_to = _str_to_list(smtp_config["to"]) - else: - logger.critical("to setting missing from the smtp config section") - if "subject" in smtp_config: - opts.smtp_subject = smtp_config["subject"] - if "attachment" in smtp_config: - opts.smtp_attachment = smtp_config["attachment"] - if "message" in smtp_config: - opts.smtp_message = smtp_config["message"] - - if "s3" in config.sections(): - s3_config = config["s3"] - if "bucket" in s3_config: - opts.s3_bucket = s3_config["bucket"] - else: - logger.critical("bucket setting missing from the s3 config section") - exit(-1) - if "path" in s3_config: - opts.s3_path = s3_config["path"] - if opts.s3_path.startswith("/"): - opts.s3_path = opts.s3_path[1:] - if opts.s3_path.endswith("/"): - opts.s3_path = opts.s3_path[:-1] - else: - opts.s3_path = "" - - if "region_name" in s3_config: - opts.s3_region_name = s3_config["region_name"] - if "endpoint_url" in s3_config: - opts.s3_endpoint_url = s3_config["endpoint_url"] - if "access_key_id" in s3_config: - opts.s3_access_key_id = s3_config["access_key_id"] - if "secret_access_key" in s3_config: - opts.s3_secret_access_key = s3_config["secret_access_key"] - - if "syslog" in config.sections(): - syslog_config = config["syslog"] - if "server" in syslog_config: - opts.syslog_server = syslog_config["server"] - else: - logger.critical("server setting 
missing from the syslog config section") - exit(-1) - if "port" in syslog_config: - opts.syslog_port = syslog_config["port"] - else: - opts.syslog_port = 514 - if "protocol" in syslog_config: - opts.syslog_protocol = syslog_config["protocol"] - else: - opts.syslog_protocol = "udp" - if "cafile_path" in syslog_config: - opts.syslog_cafile_path = syslog_config["cafile_path"] - if "certfile_path" in syslog_config: - opts.syslog_certfile_path = syslog_config["certfile_path"] - if "keyfile_path" in syslog_config: - opts.syslog_keyfile_path = syslog_config["keyfile_path"] - if "timeout" in syslog_config: - opts.syslog_timeout = float(syslog_config["timeout"]) - else: - opts.syslog_timeout = 5.0 - if "retry_attempts" in syslog_config: - opts.syslog_retry_attempts = int(syslog_config["retry_attempts"]) - else: - opts.syslog_retry_attempts = 3 - if "retry_delay" in syslog_config: - opts.syslog_retry_delay = int(syslog_config["retry_delay"]) - else: - opts.syslog_retry_delay = 5 - - if "gmail_api" in config.sections(): - gmail_api_config = config["gmail_api"] - opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file") - opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token") - opts.gmail_api_include_spam_trash = bool( - gmail_api_config.getboolean("include_spam_trash", False) - ) - opts.gmail_api_paginate_messages = bool( - gmail_api_config.getboolean("paginate_messages", True) - ) - opts.gmail_api_scopes = gmail_api_config.get( - "scopes", default_gmail_api_scope - ) - opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes) - if "oauth2_port" in gmail_api_config: - opts.gmail_api_oauth2_port = gmail_api_config.getint( - "oauth2_port", 8080 - ) - if "auth_mode" in gmail_api_config: - opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip() - if "service_account_user" in gmail_api_config: - opts.gmail_api_service_account_user = gmail_api_config.get( - "service_account_user" - ).strip() - elif "delegated_user" in 
gmail_api_config: - opts.gmail_api_service_account_user = gmail_api_config.get( - "delegated_user" - ).strip() - - if "maildir" in config.sections(): - maildir_api_config = config["maildir"] - opts.maildir_path = maildir_api_config.get("maildir_path") - opts.maildir_create = bool( - maildir_api_config.getboolean("maildir_create", fallback=False) - ) - - if "log_analytics" in config.sections(): - log_analytics_config = config["log_analytics"] - opts.la_client_id = log_analytics_config.get("client_id") - opts.la_client_secret = log_analytics_config.get("client_secret") - opts.la_tenant_id = log_analytics_config.get("tenant_id") - opts.la_dce = log_analytics_config.get("dce") - opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id") - opts.la_dcr_aggregate_stream = log_analytics_config.get( - "dcr_aggregate_stream" - ) - opts.la_dcr_forensic_stream = log_analytics_config.get( - "dcr_forensic_stream" - ) - opts.la_dcr_smtp_tls_stream = log_analytics_config.get( - "dcr_smtp_tls_stream" - ) - - if "gelf" in config.sections(): - gelf_config = config["gelf"] - if "host" in gelf_config: - opts.gelf_host = gelf_config["host"] - else: - logger.critical("host setting missing from the gelf config section") - exit(-1) - if "port" in gelf_config: - opts.gelf_port = gelf_config["port"] - else: - logger.critical("port setting missing from the gelf config section") - exit(-1) - if "mode" in gelf_config: - opts.gelf_mode = gelf_config["mode"] - else: - logger.critical("mode setting missing from the gelf config section") - exit(-1) - - if "webhook" in config.sections(): - webhook_config = config["webhook"] - if "aggregate_url" in webhook_config: - opts.webhook_aggregate_url = webhook_config["aggregate_url"] - if "forensic_url" in webhook_config: - opts.webhook_forensic_url = webhook_config["forensic_url"] - if "smtp_tls_url" in webhook_config: - opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"] - if "timeout" in webhook_config: - opts.webhook_timeout = 
webhook_config.getint("timeout") logger.setLevel(logging.ERROR) @@ -1425,174 +1603,21 @@ def _main(): logger.info("Starting parsedmarc") - if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls: - try: - if opts.elasticsearch_hosts: - es_aggregate_index = "dmarc_aggregate" - es_forensic_index = "dmarc_forensic" - es_smtp_tls_index = "smtp_tls" - if opts.elasticsearch_index_suffix: - suffix = opts.elasticsearch_index_suffix - es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) - es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) - es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix) - if opts.elasticsearch_index_prefix: - prefix = opts.elasticsearch_index_prefix - es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) - es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) - es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) - elastic_timeout_value = ( - float(opts.elasticsearch_timeout) - if opts.elasticsearch_timeout is not None - else 60.0 - ) - elastic.set_hosts( - opts.elasticsearch_hosts, - use_ssl=opts.elasticsearch_ssl, - ssl_cert_path=opts.elasticsearch_ssl_cert_path, - username=opts.elasticsearch_username, - password=opts.elasticsearch_password, - api_key=opts.elasticsearch_api_key, - timeout=elastic_timeout_value, - ) - elastic.migrate_indexes( - aggregate_indexes=[es_aggregate_index], - forensic_indexes=[es_forensic_index], - ) - except elastic.ElasticsearchError: - logger.exception("Elasticsearch Error") - exit(1) - - try: - if opts.opensearch_hosts: - os_aggregate_index = "dmarc_aggregate" - os_forensic_index = "dmarc_forensic" - os_smtp_tls_index = "smtp_tls" - if opts.opensearch_index_suffix: - suffix = opts.opensearch_index_suffix - os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix) - os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix) - os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix) - if opts.opensearch_index_prefix: - prefix 
= opts.opensearch_index_prefix - os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) - os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) - os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) - opensearch_timeout_value = ( - float(opts.opensearch_timeout) - if opts.opensearch_timeout is not None - else 60.0 - ) - opensearch.set_hosts( - opts.opensearch_hosts, - use_ssl=opts.opensearch_ssl, - ssl_cert_path=opts.opensearch_ssl_cert_path, - username=opts.opensearch_username, - password=opts.opensearch_password, - api_key=opts.opensearch_api_key, - timeout=opensearch_timeout_value, - auth_type=opts.opensearch_auth_type, - aws_region=opts.opensearch_aws_region, - aws_service=opts.opensearch_aws_service, - ) - opensearch.migrate_indexes( - aggregate_indexes=[os_aggregate_index], - forensic_indexes=[os_forensic_index], - ) - except opensearch.OpenSearchError: - logger.exception("OpenSearch Error") - exit(1) - - if opts.s3_bucket: - try: - s3_client = s3.S3Client( - bucket_name=opts.s3_bucket, - bucket_path=opts.s3_path, - region_name=opts.s3_region_name, - endpoint_url=opts.s3_endpoint_url, - access_key_id=opts.s3_access_key_id, - secret_access_key=opts.s3_secret_access_key, - ) - except Exception as error_: - logger.error("S3 Error: {0}".format(error_.__str__())) - - if opts.syslog_server: - try: - syslog_client = syslog.SyslogClient( - server_name=opts.syslog_server, - server_port=int(opts.syslog_port), - protocol=opts.syslog_protocol or "udp", - cafile_path=opts.syslog_cafile_path, - certfile_path=opts.syslog_certfile_path, - keyfile_path=opts.syslog_keyfile_path, - timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0, - retry_attempts=opts.syslog_retry_attempts - if opts.syslog_retry_attempts is not None - else 3, - retry_delay=opts.syslog_retry_delay - if opts.syslog_retry_delay is not None - else 5, - ) - except Exception as error_: - logger.error("Syslog Error: {0}".format(error_.__str__())) - - if opts.hec: - 
if opts.hec_token is None or opts.hec_index is None: - logger.error("HEC token and HEC index are required when using HEC URL") - exit(1) - - verify = True - if opts.hec_skip_certificate_verification: - verify = False - hec_client = splunk.HECClient( - opts.hec, opts.hec_token, opts.hec_index, verify=verify - ) - - if opts.kafka_hosts: - try: - ssl_context = None - if opts.kafka_skip_certificate_verification: - logger.debug("Skipping Kafka certificate verification") - ssl_context = create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = CERT_NONE - kafka_client = kafkaclient.KafkaClient( - opts.kafka_hosts, - username=opts.kafka_username, - password=opts.kafka_password, - ssl_context=ssl_context, - ) - except Exception as error_: - logger.error("Kafka Error: {0}".format(error_.__str__())) - - if opts.gelf_host: - try: - gelf_client = gelf.GelfClient( - host=opts.gelf_host, - port=int(opts.gelf_port), - mode=opts.gelf_mode, - ) - except Exception as error_: - logger.error("GELF Error: {0}".format(error_.__str__())) - - if ( - opts.webhook_aggregate_url - or opts.webhook_forensic_url - or opts.webhook_smtp_tls_url - ): - try: - webhook_client = webhook.WebhookClient( - aggregate_url=opts.webhook_aggregate_url, - forensic_url=opts.webhook_forensic_url, - smtp_tls_url=opts.webhook_smtp_tls_url, - timeout=opts.webhook_timeout, - ) - except Exception as error_: - logger.error("Webhook Error: {0}".format(error_.__str__())) - - kafka_aggregate_topic = opts.kafka_aggregate_topic - kafka_forensic_topic = opts.kafka_forensic_topic - kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic + # Initialize output clients + try: + clients = _init_output_clients(opts) + except elastic.ElasticsearchError: + logger.exception("Elasticsearch Error") + exit(1) + except opensearch.OpenSearchError: + logger.exception("OpenSearch Error") + exit(1) + except ConfigurationError as e: + logger.error(str(e)) + exit(1) + except Exception as error_: + 
logger.error("Output client error: {0}".format(error_)) + exit(1) file_paths = [] mbox_paths = [] @@ -1897,36 +1922,95 @@ def _main(): logger.exception("Failed to email results") exit(1) + # SIGHUP-based config reload for watch mode + _reload_requested = False + + def _handle_sighup(signum, frame): + nonlocal _reload_requested + _reload_requested = True + logger.info("SIGHUP received, config will reload after current batch") + + if hasattr(signal, "SIGHUP"): + signal.signal(signal.SIGHUP, _handle_sighup) + if mailbox_connection and opts.mailbox_watch: logger.info("Watching for email - Quit with ctrl-c") - try: - watch_inbox( - mailbox_connection=mailbox_connection, - callback=process_reports, - reports_folder=opts.mailbox_reports_folder, - archive_folder=opts.mailbox_archive_folder, - delete=opts.mailbox_delete, - test=opts.mailbox_test, - check_timeout=mailbox_check_timeout_value, - nameservers=opts.nameservers, - dns_timeout=opts.dns_timeout, - strip_attachment_payloads=opts.strip_attachment_payloads, - batch_size=mailbox_batch_size_value, - since=opts.mailbox_since, - ip_db_path=opts.ip_db_path, - always_use_local_files=opts.always_use_local_files, - reverse_dns_map_path=opts.reverse_dns_map_path, - reverse_dns_map_url=opts.reverse_dns_map_url, - offline=opts.offline, - normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value, - ) - except FileExistsError as error: - logger.error("{0}".format(error.__str__())) - exit(1) - except ParserError as error: - logger.error(error.__str__()) - exit(1) + while True: + _reload_requested = False + try: + watch_inbox( + mailbox_connection=mailbox_connection, + callback=process_reports, + reports_folder=opts.mailbox_reports_folder, + archive_folder=opts.mailbox_archive_folder, + delete=opts.mailbox_delete, + test=opts.mailbox_test, + check_timeout=mailbox_check_timeout_value, + nameservers=opts.nameservers, + dns_timeout=opts.dns_timeout, + strip_attachment_payloads=opts.strip_attachment_payloads, + 
batch_size=mailbox_batch_size_value, + since=opts.mailbox_since, + ip_db_path=opts.ip_db_path, + always_use_local_files=opts.always_use_local_files, + reverse_dns_map_path=opts.reverse_dns_map_path, + reverse_dns_map_url=opts.reverse_dns_map_url, + offline=opts.offline, + normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value, + should_reload=lambda: _reload_requested, + ) + except FileExistsError as error: + logger.error("{0}".format(error.__str__())) + exit(1) + except ParserError as error: + logger.error(error.__str__()) + exit(1) + + if not _reload_requested: + break + + # Reload configuration + logger.info("Reloading configuration...") + old_opts_snapshot = Namespace(**vars(opts)) + try: + index_prefix_domain_map = _parse_config_file(args.config_file, opts) + clients = _init_output_clients(opts) + + # Update watch parameters from reloaded config + mailbox_batch_size_value = ( + int(opts.mailbox_batch_size) + if opts.mailbox_batch_size is not None + else 10 + ) + mailbox_check_timeout_value = ( + int(opts.mailbox_check_timeout) + if opts.mailbox_check_timeout is not None + else 30 + ) + normalize_timespan_threshold_hours_value = ( + float(opts.normalize_timespan_threshold_hours) + if opts.normalize_timespan_threshold_hours is not None + else 24.0 + ) + + # Update log level + logger.setLevel(logging.ERROR) + if opts.warnings: + logger.setLevel(logging.WARNING) + if opts.verbose: + logger.setLevel(logging.INFO) + if opts.debug: + logger.setLevel(logging.DEBUG) + + logger.info("Configuration reloaded successfully") + except Exception: + logger.exception( + "Config reload failed, continuing with previous config" + ) + # Restore old opts + for k, v in vars(old_opts_snapshot).items(): + setattr(opts, k, v) if __name__ == "__main__": diff --git a/parsedmarc/mail/gmail.py b/parsedmarc/mail/gmail.py index ac8f453..fd9fb64 100644 --- a/parsedmarc/mail/gmail.py +++ b/parsedmarc/mail/gmail.py @@ -175,11 +175,13 @@ class 
GmailConnection(MailboxConnection): # Not needed pass - def watch(self, check_callback, check_timeout): + def watch(self, check_callback, check_timeout, should_reload=None): """Checks the mailbox for new messages every n seconds""" while True: sleep(check_timeout) check_callback(self) + if should_reload and should_reload(): + return @lru_cache(maxsize=10) def _find_label_id_for_label(self, label_name: str) -> str: diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py index 05154f7..2b12e6e 100644 --- a/parsedmarc/mail/graph.py +++ b/parsedmarc/mail/graph.py @@ -278,11 +278,13 @@ class MSGraphConnection(MailboxConnection): # Not needed pass - def watch(self, check_callback, check_timeout): + def watch(self, check_callback, check_timeout, should_reload=None): """Checks the mailbox for new messages every n seconds""" while True: sleep(check_timeout) check_callback(self) + if should_reload and should_reload(): + return @lru_cache(maxsize=10) def _find_folder_id_from_folder_path(self, folder_name: str) -> str: diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py index 3252807..d9d8bb4 100644 --- a/parsedmarc/mail/imap.py +++ b/parsedmarc/mail/imap.py @@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection): def keepalive(self): self._client.noop() - def watch(self, check_callback, check_timeout): + def watch(self, check_callback, check_timeout, should_reload=None): """ Use an IDLE IMAP connection to parse incoming emails, and pass the results to a callback function @@ -111,3 +111,5 @@ class IMAPConnection(MailboxConnection): except Exception as e: logger.warning("IMAP connection error. {0}. 
Reconnecting...".format(e)) sleep(check_timeout) + if should_reload and should_reload(): + return diff --git a/parsedmarc/mail/mailbox_connection.py b/parsedmarc/mail/mailbox_connection.py index 21f1d92..670af28 100644 --- a/parsedmarc/mail/mailbox_connection.py +++ b/parsedmarc/mail/mailbox_connection.py @@ -28,5 +28,5 @@ class MailboxConnection(ABC): def keepalive(self): raise NotImplementedError - def watch(self, check_callback, check_timeout): + def watch(self, check_callback, check_timeout, should_reload=None): raise NotImplementedError diff --git a/parsedmarc/mail/maildir.py b/parsedmarc/mail/maildir.py index c3ea8ae..255cea0 100644 --- a/parsedmarc/mail/maildir.py +++ b/parsedmarc/mail/maildir.py @@ -63,10 +63,12 @@ class MaildirConnection(MailboxConnection): def keepalive(self): return - def watch(self, check_callback, check_timeout): + def watch(self, check_callback, check_timeout, should_reload=None): while True: try: check_callback(self) except Exception as e: logger.warning("Maildir init error. 
{0}".format(e)) + if should_reload and should_reload(): + return sleep(check_timeout) diff --git a/tests.py b/tests.py index efc4b25..3c6c89a 100755 --- a/tests.py +++ b/tests.py @@ -1277,7 +1277,7 @@ class TestMailboxWatchSince(unittest.TestCase): def testWatchInboxPassesSinceToMailboxFetch(self): mailbox_connection = SimpleNamespace() - def fake_watch(check_callback, check_timeout): + def fake_watch(check_callback, check_timeout, should_reload=None): check_callback(mailbox_connection) raise _BreakLoop() @@ -1445,7 +1445,7 @@ mailbox = shared@example.com parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "certificate_path setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1517,7 +1517,7 @@ user = owner@example.com parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "password setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1674,7 +1674,7 @@ mailbox = shared@example.com parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "client_secret setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1706,7 +1706,7 @@ mailbox = shared@example.com parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "tenant_id setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1738,7 +1738,7 @@ tenant_id = tenant-id parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + 
mock_logger.error.assert_called_once_with( "mailbox setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1808,7 +1808,7 @@ mailbox = shared@example.com parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "tenant_id setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1839,7 +1839,7 @@ tenant_id = tenant-id parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "mailbox setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1871,7 +1871,7 @@ certificate_path = /tmp/msgraph-cert.pem parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "tenant_id setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called() @@ -1903,7 +1903,7 @@ certificate_path = /tmp/msgraph-cert.pem parsedmarc.cli._main() self.assertEqual(system_exit.exception.code, -1) - mock_logger.critical.assert_called_once_with( + mock_logger.error.assert_called_once_with( "mailbox setting missing from the msgraph config section" ) mock_graph_connection.assert_not_called()