From e82f3e58a11734dc430db41ee57da8af73258199 Mon Sep 17 00:00:00 2001
From: Sean Whalen <44679+seanthegeek@users.noreply.github.com>
Date: Sat, 21 Mar 2026 16:14:48 -0400
Subject: [PATCH] SIGHUP-based configuration reload for watch mode (#697)

* Enhance mailbox connection watch method to support reload functionality

- Updated the `watch` method in `GmailConnection`, `MSGraphConnection`, `IMAPConnection`, `MaildirConnection`, and the abstract `MailboxConnection` class to accept an optional `should_reload` parameter. This allows the method to check if a reload is necessary and exit the loop if so.
- Modified related tests to accommodate the new method signature.
- Changed logger calls from `critical` to `error` for consistency in logging severity.
- Added a new settings file for Claude with specific permissions for testing and code checks.

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#698)

* Initial plan

* Fix reload state consistency, resource leaks, stale opts; add tests

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/3c2e0bb9-7e2d-4efa-aef6-d2b98478b921

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* [WIP] SIGHUP-based configuration reload for watch mode (#699)

* Initial plan

* Fix review comments: ConfigurationError wrapping, duplicate parse args, bool parsing, Kafka required topics, should_reload kwarg, SIGHUP test skips

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/0779003c-ccbe-4d76-9748-801dbc238b96

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based configuration reload: address review feedback (#700)

* Initial plan

* Address review feedback: kafka_ssl, duplicate silent, exception chain, log file reload, should_reload timing

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/a8a43c55-23fa-4471-abe6-7ac966f381f9

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Best-effort initialization for optional output clients in watch mode (#701)

* Initial plan

* Wrap optional output client init in try/except for best-effort initialization

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/59241d4e-1b05-4a92-b2d2-e6d13d10a4fd

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix SIGHUP reload tight-loop in watch mode (#702)

* Initial plan

* Fix _reload_requested tight-loop: reset flag before reload to capture concurrent SIGHUPs

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/879d0bb1-9037-41f7-bc89-f59611956d2e

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Update parsedmarc/cli.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix resource leak when HEC config is invalid in `_init_output_clients()` (#703)

* Initial plan

* Fix resource leak: validate HEC settings before creating any output clients

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/38c73e09-789d-4d41-b75e-bbc61418859d

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Ensure SIGHUP never triggers a new email batch across all watch() implementations (#704)

* Initial plan

* Ensure SIGHUP never starts a new email batch in any watch() implementation

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/45d5be30-8f6b-4200-9bdd-15c655033f17

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* SIGHUP-based config reload for watch mode: address review feedback (#705)

* Initial plan

* Address review feedback: Kafka SSL context, SIGHUP handler safety, test formatting

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
Agent-Logs-Url: https://github.com/domainaware/parsedmarc/sessions/8f2fd48f-32a4-4258-9a89-06f7c7ac29bf

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Reverted changes by copilot that turned errors into warnings

* Enhance usage documentation for config reload: clarify behavior on successful reload and error handling

* Update CHANGELOG.md to reflect config reload enhancements

* Add pytest command to settings for silent output during testing

* Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes.
* Update changelog to not include fixes within the same unreleased version

* Refactor changelog entries for clarity and consistency in configuration reload section

* Fix changelog entry for msgraph configuration check

* Update CHANGELOG.md

* make single list items on one line in the changelog instead of doing hard wraps

* Remove incorrect IMAP changes

* Rename 'should_reload' parameter to 'config_reloading' in mailbox connection methods for clarity

* Restore startup configuration checks

* Improve error logging for Elasticsearch and OpenSearch exceptions

* Bump version to 9.3.0 in constants.py

* Refactor GelfClient methods to use specific report types instead of generic dicts

* Refactor tests to use assertions consistently and improve type hints

---------

Co-authored-by: Sean Whalen
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
---
 .claude/settings.json                 |   17 +
 CHANGELOG.md                          |   20 +-
 docs/source/usage.md                  |   49 +-
 parsedmarc/__init__.py                |   12 +-
 parsedmarc/cli.py                     | 1909 ++++++++++++++-----------
 parsedmarc/constants.py               |    2 +-
 parsedmarc/gelf.py                    |   16 +-
 parsedmarc/kafkaclient.py             |    4 +
 parsedmarc/mail/gmail.py              |    6 +-
 parsedmarc/mail/graph.py              |    6 +-
 parsedmarc/mail/imap.py               |    6 +-
 parsedmarc/mail/mailbox_connection.py |    2 +-
 parsedmarc/mail/maildir.py            |    6 +-
 parsedmarc/s3.py                      |    8 +
 parsedmarc/splunk.py                  |    4 +
 parsedmarc/syslog.py                  |    9 +-
 parsedmarc/webhook.py                 |    4 +
 tests.py                              |  372 ++++-
 18 files changed, 1550 insertions(+), 902 deletions(-)
 create mode 100644 .claude/settings.json

diff --git a/.claude/settings.json b/.claude/settings.json
new file mode 100644
index 0000000..fc59110
--- /dev/null
+++ b/.claude/settings.json
@@ -0,0 +1,17 @@
+{
+  "permissions": {
+    "allow": [
+      "Bash(python -c \"import py_compile; py_compile.compile\\(''parsedmarc/cli.py'', doraise=True\\)\")",
+      "Bash(ruff check:*)",
+      "Bash(ruff format:*)",
+      "Bash(GITHUB_ACTIONS=true pytest --cov tests.py)",
+      "Bash(ls tests*)",
+      "Bash(GITHUB_ACTIONS=true python -m pytest --cov tests.py -x)",
+      "Bash(GITHUB_ACTIONS=true python -m pytest tests.py -x -v)",
+      "Bash(python -m pytest tests.py --no-header -q)"
+    ],
+    "additionalDirectories": [
+      "/tmp"
+    ]
+  }
+}

diff --git a/CHANGELOG.md b/CHANGELOG.md
index db0de04..1b783d4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,28 @@
 # Changelog
 
+## 9.3.0
+
+### Added
+
+- SIGHUP-based configuration reload for watch mode — update output destinations, DNS/GeoIP settings, processing flags, and log level without restarting the service or interrupting in-progress report processing.
+  - Use `systemctl reload parsedmarc` when running under `systemd`.
+  - On a successful reload, old output clients are closed and recreated.
+  - On a failed reload, the previous configuration remains fully active.
+- `close()` methods on `GelfClient`, `KafkaClient`, `SyslogClient`, `WebhookClient`, `HECClient`, and `S3Client` for clean resource teardown on reload.
+- `config_reloading` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload.
+- Elasticsearch and OpenSearch connections are now tracked and cleaned up on reload via `_close_output_clients()`.
+- Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication.
+
+### Fixed
+
+- `get_index_prefix()` crashed on forensic reports with a `TypeError` due to `report()` call syntax instead of `report[]` dict access.
+- Missing `exit(1)` after IMAP user/password validation failure allowed execution to continue with `None` credentials.
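[Editor's note] The `config_reloading` changelog entry above says a pending reload must never start a new email batch. A minimal sketch of how a `watch()`-style loop can honor that contract by checking the callable before each fetch; `fetch_batch`, `process`, and `max_checks` are hypothetical stand-ins, not parsedmarc's actual API:

```python
def watch(fetch_batch, process, config_reloading=None, max_checks=10):
    """Poll for messages, but stop cleanly once a reload is pending."""
    processed = []
    for _ in range(max_checks):
        # Check for a pending reload BEFORE fetching, so a SIGHUP can
        # never trigger a brand-new email batch mid-reload; the caller
        # performs the reload and then re-enters watch().
        if config_reloading is not None and config_reloading():
            break
        batch = fetch_batch()
        if batch:
            processed.extend(process(batch))
    return processed
```

Keeping the parameter optional (defaulting to `None`) preserves backward compatibility for callers that never pass it, which matches why `watch_inbox()` only forwards the kwarg when it is set.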
+
 ## 9.2.1
 
 ### Added
 
-- Better checking of `msconfig` configuration (PR #695)
+- Better checking of `msgraph` configuration (PR #695)
 
 ### Changed
 
diff --git a/docs/source/usage.md b/docs/source/usage.md
index c97c872..2883f13 100644
--- a/docs/source/usage.md
+++ b/docs/source/usage.md
@@ -404,6 +404,7 @@ The full set of configuration options are:
     retry_attempts = 3
     retry_delay = 5
   ```
+
 - `gmail_api`
   - `credentials_file` - str: Path to file containing the credentials, None to disable (Default: `None`)
@@ -442,7 +443,7 @@
   - `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR
 
   :::{note}
-  Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
+  Information regarding the setup of the Data Collection Rule can be found [in the Azure documentation](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
   :::
 - `gelf`
   - `host` - str: The GELF server name or IP address
@@ -602,6 +603,7 @@ After=network.target network-online.target elasticsearch.service
 
 [Service]
 ExecStart=/opt/parsedmarc/venv/bin/parsedmarc -c /etc/parsedmarc.ini
+ExecReload=/bin/kill -HUP $MAINPID
 User=parsedmarc
 Group=parsedmarc
 Restart=always
@@ -634,6 +636,51 @@
 sudo service parsedmarc restart
 ```
 
 :::
 
+### Reloading configuration without restarting
+
+When running in watch mode, `parsedmarc` supports reloading its
+configuration file without restarting the service or interrupting
+report processing that is already in progress. Send a `SIGHUP` signal
+to the process, or use `systemctl reload` if the unit file includes
+the `ExecReload` line shown above:
+
+```bash
+sudo systemctl reload parsedmarc
+```
+
+The reload takes effect after the current batch of reports finishes
+processing and all output operations (Elasticsearch, Kafka, S3, etc.)
+for that batch have completed. The following settings are reloaded:
+
+- All output destinations (Elasticsearch, OpenSearch, Kafka, S3,
+  Splunk, syslog, GELF, webhooks, Log Analytics)
+- Multi-tenant index prefix domain map (`index_prefix_domain_map` —
+  the referenced YAML file is re-read on reload)
+- DNS and GeoIP settings (`nameservers`, `dns_timeout`, `ip_db_path`,
+  `offline`, etc.)
+- Processing flags (`strip_attachment_payloads`, `batch_size`,
+  `check_timeout`, etc.)
+- Log level (`debug`, `verbose`, `warnings`, `silent`)
+
+Mailbox connection settings (IMAP host/credentials, Microsoft Graph,
+Gmail API, Maildir path) are **not** reloaded — changing those still
+requires a full restart.
+
+On a **successful** reload, existing output client connections are
+closed and new ones are created from the updated configuration. The
+service then resumes watching with the new settings.
+
+If the new configuration file contains errors (missing required
+settings, unreachable output destinations, etc.), the **entire reload
+is aborted** — no output clients are replaced and the previous
+configuration remains fully active. This means a typo in one section
+will not take down an otherwise working setup.
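[Editor's note] The all-or-nothing behavior described above can be sketched as a validate-then-swap step: parse and build everything from the new configuration first, and only if that fully succeeds close the old clients and swap in the new ones. Everything below (`reload_config`, the client shape, the logging callback, the local `ConfigurationError`) is a hypothetical illustration under those assumptions, not parsedmarc's implementation:

```python
class ConfigurationError(Exception):
    """Stand-in for a config-validation error type."""


def reload_config(current_clients, parse_new_config, build_clients, log):
    """Return the set of output clients that should be active after a
    reload attempt."""
    try:
        new_opts = parse_new_config()          # may raise ConfigurationError
        new_clients = build_clients(new_opts)  # client creation may also fail
    except Exception as e:
        # Abort the entire reload: nothing has been swapped yet, so the
        # previous configuration (and its clients) stays fully active.
        log("Config reload failed, keeping previous configuration: %s" % e)
        return current_clients
    # Only after every step succeeded: tear down the old clients.
    for client in current_clients:
        client.close()
    return new_clients
```

The key ordering choice is that `close()` runs only after both the parse and the client construction succeed, which is what makes a typo in one config section harmless to the running service.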
+Check the logs for
+details:
+
+```bash
+journalctl -u parsedmarc.service -r
+```
+
 To check the status of the service, run:
 
 ```bash
diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index 2188f91..1b9ddfc 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -2195,6 +2195,7 @@ def watch_inbox(
     batch_size: int = 10,
     since: Optional[Union[datetime, date, str]] = None,
     normalize_timespan_threshold_hours: float = 24,
+    config_reloading: Optional[Callable] = None,
 ):
     """
     Watches the mailbox for new messages and
@@ -2222,6 +2223,8 @@
         batch_size (int): Number of messages to read and process before saving
         since: Search for messages since certain time
         normalize_timespan_threshold_hours (float): Normalize timespans beyond this
+        config_reloading: Optional callable that returns True when a config
+            reload has been requested (e.g. via SIGHUP)
     """
 
     def check_callback(connection):
@@ -2246,7 +2249,14 @@
         )
         callback(res)
 
-    mailbox_connection.watch(check_callback=check_callback, check_timeout=check_timeout)
+    watch_kwargs: dict = {
+        "check_callback": check_callback,
+        "check_timeout": check_timeout,
+    }
+    if config_reloading is not None:
+        watch_kwargs["config_reloading"] = config_reloading
+
+    mailbox_connection.watch(**watch_kwargs)
 
 
 def append_json(
diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py
index 402e45e..b9ae3da 100644
--- a/parsedmarc/cli.py
+++ b/parsedmarc/cli.py
@@ -7,6 +7,7 @@ import http.client
 import json
 import logging
 import os
+import signal
 import sys
 from argparse import ArgumentParser, Namespace
 from configparser import ConfigParser
@@ -165,6 +166,862 @@ def cli_parse(
             conn.close()
 
 
+class ConfigurationError(Exception):
+    """Raised when a configuration file has missing or invalid settings."""
+
+    pass
+
+
+def _parse_config_file(config_file, opts):
+    """Parse a config file and update opts in place.
+ + Args: + config_file: Path to the .ini config file + opts: Namespace object to update with parsed values + + Returns: + index_prefix_domain_map or None + + Raises: + ConfigurationError: If required settings are missing or invalid. + """ + abs_path = os.path.abspath(config_file) + if not os.path.exists(abs_path): + raise ConfigurationError("A file does not exist at {0}".format(abs_path)) + opts.silent = True + config = ConfigParser() + index_prefix_domain_map = None + config.read(config_file) + if "general" in config.sections(): + general_config = config["general"] + if "silent" in general_config: + opts.silent = bool(general_config.getboolean("silent")) + if "normalize_timespan_threshold_hours" in general_config: + opts.normalize_timespan_threshold_hours = general_config.getfloat( + "normalize_timespan_threshold_hours" + ) + if "index_prefix_domain_map" in general_config: + with open(general_config["index_prefix_domain_map"]) as f: + index_prefix_domain_map = yaml.safe_load(f) + if "offline" in general_config: + opts.offline = bool(general_config.getboolean("offline")) + if "strip_attachment_payloads" in general_config: + opts.strip_attachment_payloads = bool( + general_config.getboolean("strip_attachment_payloads") + ) + if "output" in general_config: + opts.output = general_config["output"] + if "aggregate_json_filename" in general_config: + opts.aggregate_json_filename = general_config["aggregate_json_filename"] + if "forensic_json_filename" in general_config: + opts.forensic_json_filename = general_config["forensic_json_filename"] + if "smtp_tls_json_filename" in general_config: + opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"] + if "aggregate_csv_filename" in general_config: + opts.aggregate_csv_filename = general_config["aggregate_csv_filename"] + if "forensic_csv_filename" in general_config: + opts.forensic_csv_filename = general_config["forensic_csv_filename"] + if "smtp_tls_csv_filename" in general_config: + 
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"] + if "dns_timeout" in general_config: + opts.dns_timeout = general_config.getfloat("dns_timeout") + if opts.dns_timeout is None: + opts.dns_timeout = 2 + if "dns_test_address" in general_config: + opts.dns_test_address = general_config["dns_test_address"] + if "nameservers" in general_config: + opts.nameservers = _str_to_list(general_config["nameservers"]) + # nameservers pre-flight check + dummy_hostname = None + try: + dummy_hostname = get_reverse_dns( + opts.dns_test_address, + nameservers=opts.nameservers, + timeout=opts.dns_timeout, + ) + except Exception as ns_error: + raise ConfigurationError( + "DNS pre-flight check failed: {}".format(ns_error) + ) from ns_error + if not dummy_hostname: + raise ConfigurationError( + "DNS pre-flight check failed: no PTR record for {} from {}".format( + opts.dns_test_address, opts.nameservers + ) + ) + if "save_aggregate" in general_config: + opts.save_aggregate = bool(general_config.getboolean("save_aggregate")) + if "save_forensic" in general_config: + opts.save_forensic = bool(general_config.getboolean("save_forensic")) + if "save_smtp_tls" in general_config: + opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls")) + if "debug" in general_config: + opts.debug = bool(general_config.getboolean("debug")) + if "verbose" in general_config: + opts.verbose = bool(general_config.getboolean("verbose")) + if "warnings" in general_config: + opts.warnings = bool(general_config.getboolean("warnings")) + if "fail_on_output_error" in general_config: + opts.fail_on_output_error = bool( + general_config.getboolean("fail_on_output_error") + ) + if "log_file" in general_config: + opts.log_file = general_config["log_file"] + if "n_procs" in general_config: + opts.n_procs = general_config.getint("n_procs") + if "ip_db_path" in general_config: + opts.ip_db_path = general_config["ip_db_path"] + else: + opts.ip_db_path = None + if "always_use_local_files" in 
general_config: + opts.always_use_local_files = bool( + general_config.getboolean("always_use_local_files") + ) + if "local_reverse_dns_map_path" in general_config: + opts.reverse_dns_map_path = general_config["local_reverse_dns_map_path"] + if "reverse_dns_map_url" in general_config: + opts.reverse_dns_map_url = general_config["reverse_dns_map_url"] + if "prettify_json" in general_config: + opts.prettify_json = bool(general_config.getboolean("prettify_json")) + + if "mailbox" in config.sections(): + mailbox_config = config["mailbox"] + if "msgraph" in config.sections(): + opts.mailbox_reports_folder = "Inbox" + if "reports_folder" in mailbox_config: + opts.mailbox_reports_folder = mailbox_config["reports_folder"] + if "archive_folder" in mailbox_config: + opts.mailbox_archive_folder = mailbox_config["archive_folder"] + if "watch" in mailbox_config: + opts.mailbox_watch = bool(mailbox_config.getboolean("watch")) + if "delete" in mailbox_config: + opts.mailbox_delete = bool(mailbox_config.getboolean("delete")) + if "test" in mailbox_config: + opts.mailbox_test = bool(mailbox_config.getboolean("test")) + if "batch_size" in mailbox_config: + opts.mailbox_batch_size = mailbox_config.getint("batch_size") + if "check_timeout" in mailbox_config: + opts.mailbox_check_timeout = mailbox_config.getint("check_timeout") + if "since" in mailbox_config: + opts.mailbox_since = mailbox_config["since"] + + if "imap" in config.sections(): + imap_config = config["imap"] + if "watch" in imap_config: + logger.warning( + "Starting in 8.0.0, the watch option has been " + "moved from the imap configuration section to " + "the mailbox configuration section." 
+ ) + if "host" in imap_config: + opts.imap_host = imap_config["host"] + else: + raise ConfigurationError( + "host setting missing from the imap config section" + ) + if "port" in imap_config: + opts.imap_port = imap_config.getint("port") + if "timeout" in imap_config: + opts.imap_timeout = imap_config.getint("timeout") + if "max_retries" in imap_config: + opts.imap_max_retries = imap_config.getint("max_retries") + if "ssl" in imap_config: + opts.imap_ssl = bool(imap_config.getboolean("ssl")) + if "skip_certificate_verification" in imap_config: + opts.imap_skip_certificate_verification = bool( + imap_config.getboolean("skip_certificate_verification") + ) + if "user" in imap_config: + opts.imap_user = imap_config["user"] + else: + raise ConfigurationError( + "user setting missing from the imap config section" + ) + if "password" in imap_config: + opts.imap_password = imap_config["password"] + else: + raise ConfigurationError( + "password setting missing from the imap config section" + ) + if "reports_folder" in imap_config: + opts.mailbox_reports_folder = imap_config["reports_folder"] + logger.warning( + "Use of the reports_folder option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "archive_folder" in imap_config: + opts.mailbox_archive_folder = imap_config["archive_folder"] + logger.warning( + "Use of the archive_folder option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "watch" in imap_config: + opts.mailbox_watch = bool(imap_config.getboolean("watch")) + logger.warning( + "Use of the watch option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "delete" in imap_config: + logger.warning( + "Use of the delete option in the imap " + "configuration section has been deprecated. 
" + "Use this option in the mailbox configuration " + "section instead." + ) + if "test" in imap_config: + opts.mailbox_test = bool(imap_config.getboolean("test")) + logger.warning( + "Use of the test option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + if "batch_size" in imap_config: + opts.mailbox_batch_size = imap_config.getint("batch_size") + logger.warning( + "Use of the batch_size option in the imap " + "configuration section has been deprecated. " + "Use this option in the mailbox configuration " + "section instead." + ) + + if "msgraph" in config.sections(): + graph_config = config["msgraph"] + opts.graph_token_file = graph_config.get("token_file", ".token") + + if "auth_method" not in graph_config: + logger.info( + "auth_method setting missing from the " + "msgraph config section " + "defaulting to UsernamePassword" + ) + opts.graph_auth_method = AuthMethod.UsernamePassword.name + else: + opts.graph_auth_method = graph_config["auth_method"] + + if opts.graph_auth_method == AuthMethod.UsernamePassword.name: + if "user" in graph_config: + opts.graph_user = graph_config["user"] + else: + raise ConfigurationError( + "user setting missing from the msgraph config section" + ) + if "password" in graph_config: + opts.graph_password = graph_config["password"] + else: + raise ConfigurationError( + "password setting missing from the msgraph config section" + ) + if "client_secret" in graph_config: + opts.graph_client_secret = graph_config["client_secret"] + else: + raise ConfigurationError( + "client_secret setting missing from the msgraph config section" + ) + + if opts.graph_auth_method == AuthMethod.DeviceCode.name: + if "user" in graph_config: + opts.graph_user = graph_config["user"] + + if opts.graph_auth_method != AuthMethod.UsernamePassword.name: + if "tenant_id" in graph_config: + opts.graph_tenant_id = graph_config["tenant_id"] + else: + raise ConfigurationError( + 
"tenant_id setting missing from the msgraph config section" + ) + + if opts.graph_auth_method == AuthMethod.ClientSecret.name: + if "client_secret" in graph_config: + opts.graph_client_secret = graph_config["client_secret"] + else: + raise ConfigurationError( + "client_secret setting missing from the msgraph config section" + ) + + if opts.graph_auth_method == AuthMethod.Certificate.name: + if "certificate_path" in graph_config: + opts.graph_certificate_path = graph_config["certificate_path"] + else: + raise ConfigurationError( + "certificate_path setting missing from the msgraph config section" + ) + if "certificate_password" in graph_config: + opts.graph_certificate_password = graph_config["certificate_password"] + + if "client_id" in graph_config: + opts.graph_client_id = graph_config["client_id"] + else: + raise ConfigurationError( + "client_id setting missing from the msgraph config section" + ) + + if "mailbox" in graph_config: + opts.graph_mailbox = graph_config["mailbox"] + elif opts.graph_auth_method != AuthMethod.UsernamePassword.name: + raise ConfigurationError( + "mailbox setting missing from the msgraph config section" + ) + + if "graph_url" in graph_config: + opts.graph_url = graph_config["graph_url"] + + if "allow_unencrypted_storage" in graph_config: + opts.graph_allow_unencrypted_storage = bool( + graph_config.getboolean("allow_unencrypted_storage") + ) + + if "elasticsearch" in config: + elasticsearch_config = config["elasticsearch"] + if "hosts" in elasticsearch_config: + opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"]) + else: + raise ConfigurationError( + "hosts setting missing from the elasticsearch config section" + ) + if "timeout" in elasticsearch_config: + timeout = elasticsearch_config.getfloat("timeout") + opts.elasticsearch_timeout = timeout + if "number_of_shards" in elasticsearch_config: + number_of_shards = elasticsearch_config.getint("number_of_shards") + opts.elasticsearch_number_of_shards = number_of_shards + 
if "number_of_replicas" in elasticsearch_config: + number_of_replicas = elasticsearch_config.getint("number_of_replicas") + opts.elasticsearch_number_of_replicas = number_of_replicas + if "index_suffix" in elasticsearch_config: + opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"] + if "index_prefix" in elasticsearch_config: + opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"] + if "monthly_indexes" in elasticsearch_config: + monthly = bool(elasticsearch_config.getboolean("monthly_indexes")) + opts.elasticsearch_monthly_indexes = monthly + if "ssl" in elasticsearch_config: + opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl")) + if "cert_path" in elasticsearch_config: + opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"] + if "user" in elasticsearch_config: + opts.elasticsearch_username = elasticsearch_config["user"] + if "password" in elasticsearch_config: + opts.elasticsearch_password = elasticsearch_config["password"] + # Until 8.20 + if "apiKey" in elasticsearch_config: + opts.elasticsearch_api_key = elasticsearch_config["apiKey"] + # Since 8.20 + if "api_key" in elasticsearch_config: + opts.elasticsearch_api_key = elasticsearch_config["api_key"] + + if "opensearch" in config: + opensearch_config = config["opensearch"] + if "hosts" in opensearch_config: + opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"]) + else: + raise ConfigurationError( + "hosts setting missing from the opensearch config section" + ) + if "timeout" in opensearch_config: + timeout = opensearch_config.getfloat("timeout") + opts.opensearch_timeout = timeout + if "number_of_shards" in opensearch_config: + number_of_shards = opensearch_config.getint("number_of_shards") + opts.opensearch_number_of_shards = number_of_shards + if "number_of_replicas" in opensearch_config: + number_of_replicas = opensearch_config.getint("number_of_replicas") + opts.opensearch_number_of_replicas = number_of_replicas + if 
"index_suffix" in opensearch_config: + opts.opensearch_index_suffix = opensearch_config["index_suffix"] + if "index_prefix" in opensearch_config: + opts.opensearch_index_prefix = opensearch_config["index_prefix"] + if "monthly_indexes" in opensearch_config: + monthly = bool(opensearch_config.getboolean("monthly_indexes")) + opts.opensearch_monthly_indexes = monthly + if "ssl" in opensearch_config: + opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl")) + if "cert_path" in opensearch_config: + opts.opensearch_ssl_cert_path = opensearch_config["cert_path"] + if "user" in opensearch_config: + opts.opensearch_username = opensearch_config["user"] + if "password" in opensearch_config: + opts.opensearch_password = opensearch_config["password"] + # Until 8.20 + if "apiKey" in opensearch_config: + opts.opensearch_api_key = opensearch_config["apiKey"] + # Since 8.20 + if "api_key" in opensearch_config: + opts.opensearch_api_key = opensearch_config["api_key"] + if "auth_type" in opensearch_config: + opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower() + elif "authentication_type" in opensearch_config: + opts.opensearch_auth_type = ( + opensearch_config["authentication_type"].strip().lower() + ) + if "aws_region" in opensearch_config: + opts.opensearch_aws_region = opensearch_config["aws_region"].strip() + if "aws_service" in opensearch_config: + opts.opensearch_aws_service = opensearch_config["aws_service"].strip() + + if "splunk_hec" in config.sections(): + hec_config = config["splunk_hec"] + if "url" in hec_config: + opts.hec = hec_config["url"] + else: + raise ConfigurationError( + "url setting missing from the splunk_hec config section" + ) + if "token" in hec_config: + opts.hec_token = hec_config["token"] + else: + raise ConfigurationError( + "token setting missing from the splunk_hec config section" + ) + if "index" in hec_config: + opts.hec_index = hec_config["index"] + else: + raise ConfigurationError( + "index setting missing from the 
splunk_hec config section" + ) + if "skip_certificate_verification" in hec_config: + opts.hec_skip_certificate_verification = bool( + hec_config.getboolean("skip_certificate_verification", fallback=False) + ) + + if "kafka" in config.sections(): + kafka_config = config["kafka"] + if "hosts" in kafka_config: + opts.kafka_hosts = _str_to_list(kafka_config["hosts"]) + else: + raise ConfigurationError( + "hosts setting missing from the kafka config section" + ) + if "user" in kafka_config: + opts.kafka_username = kafka_config["user"] + if "password" in kafka_config: + opts.kafka_password = kafka_config["password"] + if "ssl" in kafka_config: + opts.kafka_ssl = bool(kafka_config.getboolean("ssl")) + if "skip_certificate_verification" in kafka_config: + kafka_verify = bool( + kafka_config.getboolean("skip_certificate_verification") + ) + opts.kafka_skip_certificate_verification = kafka_verify + if "aggregate_topic" in kafka_config: + opts.kafka_aggregate_topic = kafka_config["aggregate_topic"] + else: + raise ConfigurationError( + "aggregate_topic setting missing from the kafka config section" + ) + if "forensic_topic" in kafka_config: + opts.kafka_forensic_topic = kafka_config["forensic_topic"] + else: + raise ConfigurationError( + "forensic_topic setting missing from the kafka config section" + ) + if "smtp_tls_topic" in kafka_config: + opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"] + else: + raise ConfigurationError( + "smtp_tls_topic setting missing from the kafka config section" + ) + + if "smtp" in config.sections(): + smtp_config = config["smtp"] + if "host" in smtp_config: + opts.smtp_host = smtp_config["host"] + else: + raise ConfigurationError( + "host setting missing from the smtp config section" + ) + if "port" in smtp_config: + opts.smtp_port = smtp_config.getint("port") + if "ssl" in smtp_config: + opts.smtp_ssl = bool(smtp_config.getboolean("ssl")) + if "skip_certificate_verification" in smtp_config: + smtp_verify = 
bool(smtp_config.getboolean("skip_certificate_verification")) + opts.smtp_skip_certificate_verification = smtp_verify + if "user" in smtp_config: + opts.smtp_user = smtp_config["user"] + else: + raise ConfigurationError( + "user setting missing from the smtp config section" + ) + if "password" in smtp_config: + opts.smtp_password = smtp_config["password"] + else: + raise ConfigurationError( + "password setting missing from the smtp config section" + ) + if "from" in smtp_config: + opts.smtp_from = smtp_config["from"] + else: + logger.critical("from setting missing from the smtp config section") + if "to" in smtp_config: + opts.smtp_to = _str_to_list(smtp_config["to"]) + else: + logger.critical("to setting missing from the smtp config section") + if "subject" in smtp_config: + opts.smtp_subject = smtp_config["subject"] + if "attachment" in smtp_config: + opts.smtp_attachment = smtp_config["attachment"] + if "message" in smtp_config: + opts.smtp_message = smtp_config["message"] + + if "s3" in config.sections(): + s3_config = config["s3"] + if "bucket" in s3_config: + opts.s3_bucket = s3_config["bucket"] + else: + raise ConfigurationError( + "bucket setting missing from the s3 config section" + ) + if "path" in s3_config: + opts.s3_path = s3_config["path"] + if opts.s3_path.startswith("/"): + opts.s3_path = opts.s3_path[1:] + if opts.s3_path.endswith("/"): + opts.s3_path = opts.s3_path[:-1] + else: + opts.s3_path = "" + + if "region_name" in s3_config: + opts.s3_region_name = s3_config["region_name"] + if "endpoint_url" in s3_config: + opts.s3_endpoint_url = s3_config["endpoint_url"] + if "access_key_id" in s3_config: + opts.s3_access_key_id = s3_config["access_key_id"] + if "secret_access_key" in s3_config: + opts.s3_secret_access_key = s3_config["secret_access_key"] + + if "syslog" in config.sections(): + syslog_config = config["syslog"] + if "server" in syslog_config: + opts.syslog_server = syslog_config["server"] + else: + raise ConfigurationError( + "server 
setting missing from the syslog config section" + ) + if "port" in syslog_config: + opts.syslog_port = syslog_config["port"] + else: + opts.syslog_port = 514 + if "protocol" in syslog_config: + opts.syslog_protocol = syslog_config["protocol"] + else: + opts.syslog_protocol = "udp" + if "cafile_path" in syslog_config: + opts.syslog_cafile_path = syslog_config["cafile_path"] + if "certfile_path" in syslog_config: + opts.syslog_certfile_path = syslog_config["certfile_path"] + if "keyfile_path" in syslog_config: + opts.syslog_keyfile_path = syslog_config["keyfile_path"] + if "timeout" in syslog_config: + opts.syslog_timeout = float(syslog_config["timeout"]) + else: + opts.syslog_timeout = 5.0 + if "retry_attempts" in syslog_config: + opts.syslog_retry_attempts = int(syslog_config["retry_attempts"]) + else: + opts.syslog_retry_attempts = 3 + if "retry_delay" in syslog_config: + opts.syslog_retry_delay = int(syslog_config["retry_delay"]) + else: + opts.syslog_retry_delay = 5 + + if "gmail_api" in config.sections(): + gmail_api_config = config["gmail_api"] + opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file") + opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token") + opts.gmail_api_include_spam_trash = bool( + gmail_api_config.getboolean("include_spam_trash", False) + ) + opts.gmail_api_paginate_messages = bool( + gmail_api_config.getboolean("paginate_messages", True) + ) + default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify" + opts.gmail_api_scopes = gmail_api_config.get("scopes", default_gmail_api_scope) + opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes) + if "oauth2_port" in gmail_api_config: + opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080) + if "auth_mode" in gmail_api_config: + opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip() + if "service_account_user" in gmail_api_config: + opts.gmail_api_service_account_user = gmail_api_config[ + 
"service_account_user" + ].strip() + elif "delegated_user" in gmail_api_config: + opts.gmail_api_service_account_user = gmail_api_config[ + "delegated_user" + ].strip() + + if "maildir" in config.sections(): + maildir_api_config = config["maildir"] + opts.maildir_path = maildir_api_config.get("maildir_path") + opts.maildir_create = bool( + maildir_api_config.getboolean("maildir_create", fallback=False) + ) + + if "log_analytics" in config.sections(): + log_analytics_config = config["log_analytics"] + opts.la_client_id = log_analytics_config.get("client_id") + opts.la_client_secret = log_analytics_config.get("client_secret") + opts.la_tenant_id = log_analytics_config.get("tenant_id") + opts.la_dce = log_analytics_config.get("dce") + opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id") + opts.la_dcr_aggregate_stream = log_analytics_config.get("dcr_aggregate_stream") + opts.la_dcr_forensic_stream = log_analytics_config.get("dcr_forensic_stream") + opts.la_dcr_smtp_tls_stream = log_analytics_config.get("dcr_smtp_tls_stream") + + if "gelf" in config.sections(): + gelf_config = config["gelf"] + if "host" in gelf_config: + opts.gelf_host = gelf_config["host"] + else: + raise ConfigurationError( + "host setting missing from the gelf config section" + ) + if "port" in gelf_config: + opts.gelf_port = gelf_config["port"] + else: + raise ConfigurationError( + "port setting missing from the gelf config section" + ) + if "mode" in gelf_config: + opts.gelf_mode = gelf_config["mode"] + else: + raise ConfigurationError( + "mode setting missing from the gelf config section" + ) + + if "webhook" in config.sections(): + webhook_config = config["webhook"] + if "aggregate_url" in webhook_config: + opts.webhook_aggregate_url = webhook_config["aggregate_url"] + if "forensic_url" in webhook_config: + opts.webhook_forensic_url = webhook_config["forensic_url"] + if "smtp_tls_url" in webhook_config: + opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"] + if "timeout" 
in webhook_config: + opts.webhook_timeout = webhook_config.getint("timeout") + + return index_prefix_domain_map + + +class _ElasticsearchHandle: + """Sentinel so Elasticsearch participates in _close_output_clients.""" + + def close(self): + try: + conn = elastic.connections.get_connection() + if not isinstance(conn, str): + conn.close() + except Exception: + pass + try: + elastic.connections.remove_connection("default") + except Exception: + pass + + +class _OpenSearchHandle: + """Sentinel so OpenSearch participates in _close_output_clients.""" + + def close(self): + try: + conn = opensearch.connections.get_connection() + if not isinstance(conn, str): + conn.close() + except Exception: + pass + try: + opensearch.connections.remove_connection("default") + except Exception: + pass + + +def _init_output_clients(opts): + """Create output clients based on current opts. + + Returns: + dict of client instances keyed by name. + + Raises: + ConfigurationError: If a required output client cannot be created. 
+ """ + clients = {} + + if opts.s3_bucket: + clients["s3_client"] = s3.S3Client( + bucket_name=opts.s3_bucket, + bucket_path=opts.s3_path, + region_name=opts.s3_region_name, + endpoint_url=opts.s3_endpoint_url, + access_key_id=opts.s3_access_key_id, + secret_access_key=opts.s3_secret_access_key, + ) + + if opts.syslog_server: + clients["syslog_client"] = syslog.SyslogClient( + server_name=opts.syslog_server, + server_port=int(opts.syslog_port), + protocol=opts.syslog_protocol or "udp", + cafile_path=opts.syslog_cafile_path, + certfile_path=opts.syslog_certfile_path, + keyfile_path=opts.syslog_keyfile_path, + timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0, + retry_attempts=opts.syslog_retry_attempts + if opts.syslog_retry_attempts is not None + else 3, + retry_delay=opts.syslog_retry_delay + if opts.syslog_retry_delay is not None + else 5, + ) + + if opts.hec: + if opts.hec_token is None or opts.hec_index is None: + raise ConfigurationError( + "HEC token and HEC index are required when using HEC URL" + ) + verify = True + if opts.hec_skip_certificate_verification: + verify = False + clients["hec_client"] = splunk.HECClient( + opts.hec, opts.hec_token, opts.hec_index, verify=verify + ) + + if opts.kafka_hosts: + ssl_context = None + if opts.kafka_skip_certificate_verification: + logger.debug("Skipping Kafka certificate verification") + ssl_context = create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = CERT_NONE + clients["kafka_client"] = kafkaclient.KafkaClient( + opts.kafka_hosts, + username=opts.kafka_username, + password=opts.kafka_password, + ssl_context=ssl_context, + ) + + if opts.gelf_host: + clients["gelf_client"] = gelf.GelfClient( + host=opts.gelf_host, + port=int(opts.gelf_port), + mode=opts.gelf_mode, + ) + + if ( + opts.webhook_aggregate_url + or opts.webhook_forensic_url + or opts.webhook_smtp_tls_url + ): + clients["webhook_client"] = webhook.WebhookClient( + 
aggregate_url=opts.webhook_aggregate_url,
+            forensic_url=opts.webhook_forensic_url,
+            smtp_tls_url=opts.webhook_smtp_tls_url,
+            timeout=opts.webhook_timeout,
+        )
+
+    # Elasticsearch and OpenSearch mutate module-level global state via
+    # connections.create_connection(), which cannot be rolled back if a later
+    # step fails. Initialise them last so that all other clients are created
+    # successfully first; this minimises the window for partial-init problems
+    # during config reload.
+    if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
+        if opts.elasticsearch_hosts:
+            es_aggregate_index = "dmarc_aggregate"
+            es_forensic_index = "dmarc_forensic"
+            es_smtp_tls_index = "smtp_tls"
+            if opts.elasticsearch_index_suffix:
+                suffix = opts.elasticsearch_index_suffix
+                es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
+                es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
+                es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
+            if opts.elasticsearch_index_prefix:
+                prefix = opts.elasticsearch_index_prefix
+                es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
+                es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
+                es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
+            elastic_timeout_value = (
+                float(opts.elasticsearch_timeout)
+                if opts.elasticsearch_timeout is not None
+                else 60.0
+            )
+            elastic.set_hosts(
+                opts.elasticsearch_hosts,
+                use_ssl=opts.elasticsearch_ssl,
+                ssl_cert_path=opts.elasticsearch_ssl_cert_path,
+                username=opts.elasticsearch_username,
+                password=opts.elasticsearch_password,
+                api_key=opts.elasticsearch_api_key,
+                timeout=elastic_timeout_value,
+            )
+            elastic.migrate_indexes(
+                aggregate_indexes=[es_aggregate_index],
+                forensic_indexes=[es_forensic_index],
+            )
+            clients["elasticsearch"] = _ElasticsearchHandle()
+
+        if opts.opensearch_hosts:
+            os_aggregate_index = "dmarc_aggregate"
+            os_forensic_index = "dmarc_forensic"
+            os_smtp_tls_index = "smtp_tls"
+            if opts.opensearch_index_suffix:
+                suffix = opts.opensearch_index_suffix
+                os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
+                os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
+                os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
+            if opts.opensearch_index_prefix:
+                prefix = opts.opensearch_index_prefix
+                os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
+                os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
+                os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
+            opensearch_timeout_value = (
+                float(opts.opensearch_timeout)
+                if opts.opensearch_timeout is not None
+                else 60.0
+            )
+            opensearch.set_hosts(
+                opts.opensearch_hosts,
+                use_ssl=opts.opensearch_ssl,
+                ssl_cert_path=opts.opensearch_ssl_cert_path,
+                username=opts.opensearch_username,
+                password=opts.opensearch_password,
+                api_key=opts.opensearch_api_key,
+                timeout=opensearch_timeout_value,
+                auth_type=opts.opensearch_auth_type,
+                aws_region=opts.opensearch_aws_region,
+                aws_service=opts.opensearch_aws_service,
+            )
+            opensearch.migrate_indexes(
+                aggregate_indexes=[os_aggregate_index],
+                forensic_indexes=[os_forensic_index],
+            )
+            clients["opensearch"] = _OpenSearchHandle()
+
+    return clients
+
+
+def _close_output_clients(clients):
+    """Close output clients that hold persistent connections.
+
+    Clients that do not expose a ``close`` method are silently skipped.
+    Errors during closing are logged as warnings and do not propagate.
+
+    Args:
+        clients: dict of client instances returned by :func:`_init_output_clients`.
+ """ + for name, client in clients.items(): + if hasattr(client, "close"): + try: + client.close() + except Exception: + logger.warning("Error closing %s", name, exc_info=True) + + def _main(): """Called when the module is executed""" @@ -175,7 +1032,7 @@ def _main(): if "policy_published" in report: domain = report["policy_published"]["domain"] elif "reported_domain" in report: - domain = report("reported_domain") + domain = report["reported_domain"] elif "policies" in report: domain = report["policies"][0]["domain"] if domain: @@ -219,6 +1076,18 @@ def _main(): forensic_csv_filename=opts.forensic_csv_filename, smtp_tls_csv_filename=opts.smtp_tls_csv_filename, ) + + kafka_client = clients.get("kafka_client") + s3_client = clients.get("s3_client") + syslog_client = clients.get("syslog_client") + hec_client = clients.get("hec_client") + gelf_client = clients.get("gelf_client") + webhook_client = clients.get("webhook_client") + + kafka_aggregate_topic = opts.kafka_aggregate_topic + kafka_forensic_topic = opts.kafka_forensic_topic + kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic + if opts.save_aggregate: for report in reports_["aggregate_reports"]: try: @@ -262,7 +1131,7 @@ def _main(): log_output_error("OpenSearch exception", error_.__str__()) try: - if opts.kafka_hosts: + if kafka_client: kafka_client.save_aggregate_reports_to_kafka( report, kafka_aggregate_topic ) @@ -270,25 +1139,25 @@ def _main(): log_output_error("Kafka", error_.__str__()) try: - if opts.s3_bucket: + if s3_client: s3_client.save_aggregate_report_to_s3(report) except Exception as error_: log_output_error("S3", error_.__str__()) try: - if opts.syslog_server: + if syslog_client: syslog_client.save_aggregate_report_to_syslog(report) except Exception as error_: log_output_error("Syslog", error_.__str__()) try: - if opts.gelf_host: + if gelf_client: gelf_client.save_aggregate_report_to_gelf(report) except Exception as error_: log_output_error("GELF", error_.__str__()) try: - if 
opts.webhook_aggregate_url: + if opts.webhook_aggregate_url and webhook_client: indent_value = 2 if opts.prettify_json else None webhook_client.save_aggregate_report_to_webhook( json.dumps(report, ensure_ascii=False, indent=indent_value) @@ -296,7 +1165,7 @@ def _main(): except Exception as error_: log_output_error("Webhook", error_.__str__()) - if opts.hec: + if hec_client: try: aggregate_reports_ = reports_["aggregate_reports"] if len(aggregate_reports_) > 0: @@ -347,7 +1216,7 @@ def _main(): log_output_error("Invalid DMARC report", error_.__str__()) try: - if opts.kafka_hosts: + if kafka_client: kafka_client.save_forensic_reports_to_kafka( report, kafka_forensic_topic ) @@ -355,25 +1224,25 @@ def _main(): log_output_error("Kafka", error_.__str__()) try: - if opts.s3_bucket: + if s3_client: s3_client.save_forensic_report_to_s3(report) except Exception as error_: log_output_error("S3", error_.__str__()) try: - if opts.syslog_server: + if syslog_client: syslog_client.save_forensic_report_to_syslog(report) except Exception as error_: log_output_error("Syslog", error_.__str__()) try: - if opts.gelf_host: + if gelf_client: gelf_client.save_forensic_report_to_gelf(report) except Exception as error_: log_output_error("GELF", error_.__str__()) try: - if opts.webhook_forensic_url: + if opts.webhook_forensic_url and webhook_client: indent_value = 2 if opts.prettify_json else None webhook_client.save_forensic_report_to_webhook( json.dumps(report, ensure_ascii=False, indent=indent_value) @@ -381,7 +1250,7 @@ def _main(): except Exception as error_: log_output_error("Webhook", error_.__str__()) - if opts.hec: + if hec_client: try: forensic_reports_ = reports_["forensic_reports"] if len(forensic_reports_) > 0: @@ -432,33 +1301,33 @@ def _main(): log_output_error("Invalid DMARC report", error_.__str__()) try: - if opts.kafka_hosts: + if kafka_client: kafka_client.save_smtp_tls_reports_to_kafka( - smtp_tls_reports, kafka_smtp_tls_topic + [report], kafka_smtp_tls_topic ) except 
Exception as error_: log_output_error("Kafka", error_.__str__()) try: - if opts.s3_bucket: + if s3_client: s3_client.save_smtp_tls_report_to_s3(report) except Exception as error_: log_output_error("S3", error_.__str__()) try: - if opts.syslog_server: + if syslog_client: syslog_client.save_smtp_tls_report_to_syslog(report) except Exception as error_: log_output_error("Syslog", error_.__str__()) try: - if opts.gelf_host: + if gelf_client: gelf_client.save_smtp_tls_report_to_gelf(report) except Exception as error_: log_output_error("GELF", error_.__str__()) try: - if opts.webhook_smtp_tls_url: + if opts.webhook_smtp_tls_url and webhook_client: indent_value = 2 if opts.prettify_json else None webhook_client.save_smtp_tls_report_to_webhook( json.dumps(report, ensure_ascii=False, indent=indent_value) @@ -466,7 +1335,7 @@ def _main(): except Exception as error_: log_output_error("Webhook", error_.__str__()) - if opts.hec: + if hec_client: try: smtp_tls_reports_ = reports_["smtp_tls_reports"] if len(smtp_tls_reports_) > 0: @@ -598,8 +1467,6 @@ def _main(): args = arg_parser.parse_args() - default_gmail_api_scope = "https://www.googleapis.com/auth/gmail.modify" - opts = Namespace( file_path=args.file_path, config_file=args.config_file, @@ -748,651 +1615,20 @@ def _main(): normalize_timespan_threshold_hours=24.0, fail_on_output_error=False, ) - args = arg_parser.parse_args() + + # Snapshot opts as set from CLI args / hardcoded defaults, before any config + # file is applied. On each SIGHUP reload we restore this baseline first so + # that sections removed from the config file actually take effect. 
+    opts_from_cli = Namespace(**vars(opts))
+
+    index_prefix_domain_map = None
     if args.config_file:
-        abs_path = os.path.abspath(args.config_file)
-        if not os.path.exists(abs_path):
-            logger.error("A file does not exist at {0}".format(abs_path))
+        try:
+            index_prefix_domain_map = _parse_config_file(args.config_file, opts)
+        except ConfigurationError as e:
+            logger.critical(str(e))
             exit(-1)
-        opts.silent = True
-        config = ConfigParser()
-        index_prefix_domain_map = None
-        config.read(args.config_file)
-        if "general" in config.sections():
-            general_config = config["general"]
-            if "silent" in general_config:
-                opts.silent = bool(general_config.getboolean("silent"))
-            if "normalize_timespan_threshold_hours" in general_config:
-                opts.normalize_timespan_threshold_hours = general_config.getfloat(
-                    "normalize_timespan_threshold_hours"
-                )
-            if "index_prefix_domain_map" in general_config:
-                with open(general_config["index_prefix_domain_map"]) as f:
-                    index_prefix_domain_map = yaml.safe_load(f)
-            if "offline" in general_config:
-                opts.offline = bool(general_config.getboolean("offline"))
-            if "strip_attachment_payloads" in general_config:
-                opts.strip_attachment_payloads = bool(
-                    general_config.getboolean("strip_attachment_payloads")
-                )
-            if "output" in general_config:
-                opts.output = general_config["output"]
-            if "aggregate_json_filename" in general_config:
-                opts.aggregate_json_filename = general_config["aggregate_json_filename"]
-            if "forensic_json_filename" in general_config:
-                opts.forensic_json_filename = general_config["forensic_json_filename"]
-            if "smtp_tls_json_filename" in general_config:
-                opts.smtp_tls_json_filename = general_config["smtp_tls_json_filename"]
-            if "aggregate_csv_filename" in general_config:
-                opts.aggregate_csv_filename = general_config["aggregate_csv_filename"]
-            if "forensic_csv_filename" in general_config:
-                opts.forensic_csv_filename = general_config["forensic_csv_filename"]
-            if "smtp_tls_csv_filename" in general_config:
-                opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
-            if "dns_timeout" in general_config:
-                opts.dns_timeout = general_config.getfloat("dns_timeout")
-            if opts.dns_timeout is None:
-                opts.dns_timeout = 2
-            if "dns_test_address" in general_config:
-                opts.dns_test_address = general_config["dns_test_address"]
-            if "nameservers" in general_config:
-                opts.nameservers = _str_to_list(general_config["nameservers"])
-                # nameservers pre-flight check
-                dummy_hostname = None
-                try:
-                    dummy_hostname = get_reverse_dns(
-                        opts.dns_test_address,
-                        nameservers=opts.nameservers,
-                        timeout=opts.dns_timeout,
-                    )
-                except Exception as ns_error:
-                    logger.critical("DNS pre-flight check failed: {}".format(ns_error))
-                    exit(-1)
-                if not dummy_hostname:
-                    logger.critical(
-                        "DNS pre-flight check failed: no PTR record for "
-                        "{} from {}".format(opts.dns_test_address, opts.nameservers)
-                    )
-                    exit(-1)
-            if "save_aggregate" in general_config:
-                opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
-            if "save_forensic" in general_config:
-                opts.save_forensic = bool(general_config.getboolean("save_forensic"))
-            if "save_smtp_tls" in general_config:
-                opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
-            if "debug" in general_config:
-                opts.debug = bool(general_config.getboolean("debug"))
-            if "verbose" in general_config:
-                opts.verbose = bool(general_config.getboolean("verbose"))
-            if "silent" in general_config:
-                opts.silent = bool(general_config.getboolean("silent"))
-            if "warnings" in general_config:
-                opts.warnings = bool(general_config.getboolean("warnings"))
-            if "fail_on_output_error" in general_config:
-                opts.fail_on_output_error = bool(
-                    general_config.getboolean("fail_on_output_error")
-                )
-            if "log_file" in general_config:
-                opts.log_file = general_config["log_file"]
-            if "n_procs" in general_config:
-                opts.n_procs = general_config.getint("n_procs")
-            if "ip_db_path" in general_config:
-                opts.ip_db_path = general_config["ip_db_path"]
-            else:
-                opts.ip_db_path = None
-            if "always_use_local_files" in general_config:
-                opts.always_use_local_files = bool(
-                    general_config.getboolean("always_use_local_files")
-                )
-            if "reverse_dns_map_path" in general_config:
-                opts.reverse_dns_map_path = general_config["reverse_dns_path"]
-            if "reverse_dns_map_url" in general_config:
-                opts.reverse_dns_map_url = general_config["reverse_dns_url"]
-            if "prettify_json" in general_config:
-                opts.prettify_json = bool(general_config.getboolean("prettify_json"))
-
-        if "mailbox" in config.sections():
-            mailbox_config = config["mailbox"]
-            if "msgraph" in config.sections():
-                opts.mailbox_reports_folder = "Inbox"
-            if "reports_folder" in mailbox_config:
-                opts.mailbox_reports_folder = mailbox_config["reports_folder"]
-            if "archive_folder" in mailbox_config:
-                opts.mailbox_archive_folder = mailbox_config["archive_folder"]
-            if "watch" in mailbox_config:
-                opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
-            if "delete" in mailbox_config:
-                opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
-            if "test" in mailbox_config:
-                opts.mailbox_test = bool(mailbox_config.getboolean("test"))
-            if "batch_size" in mailbox_config:
-                opts.mailbox_batch_size = mailbox_config.getint("batch_size")
-            if "check_timeout" in mailbox_config:
-                opts.mailbox_check_timeout = mailbox_config.getint("check_timeout")
-            if "since" in mailbox_config:
-                opts.mailbox_since = mailbox_config["since"]
-
-        if "imap" in config.sections():
-            imap_config = config["imap"]
-            if "watch" in imap_config:
-                logger.warning(
-                    "Starting in 8.0.0, the watch option has been "
-                    "moved from the imap configuration section to "
-                    "the mailbox configuration section."
-                )
-            if "host" in imap_config:
-                opts.imap_host = imap_config["host"]
-            else:
-                logger.error("host setting missing from the imap config section")
-                exit(-1)
-            if "port" in imap_config:
-                opts.imap_port = imap_config.getint("port")
-            if "timeout" in imap_config:
-                opts.imap_timeout = imap_config.getint("timeout")
-            if "max_retries" in imap_config:
-                opts.imap_max_retries = imap_config.getint("max_retries")
-            if "ssl" in imap_config:
-                opts.imap_ssl = bool(imap_config.getboolean("ssl"))
-            if "skip_certificate_verification" in imap_config:
-                opts.imap_skip_certificate_verification = bool(
-                    imap_config.getboolean("skip_certificate_verification")
-                )
-            if "user" in imap_config:
-                opts.imap_user = imap_config["user"]
-            else:
-                logger.critical("user setting missing from the imap config section")
-                exit(-1)
-            if "password" in imap_config:
-                opts.imap_password = imap_config["password"]
-            else:
-                logger.critical("password setting missing from the imap config section")
-                exit(-1)
-            if "reports_folder" in imap_config:
-                opts.mailbox_reports_folder = imap_config["reports_folder"]
-                logger.warning(
-                    "Use of the reports_folder option in the imap "
-                    "configuration section has been deprecated. "
-                    "Use this option in the mailbox configuration "
-                    "section instead."
-                )
-            if "archive_folder" in imap_config:
-                opts.mailbox_archive_folder = imap_config["archive_folder"]
-                logger.warning(
-                    "Use of the archive_folder option in the imap "
-                    "configuration section has been deprecated. "
-                    "Use this option in the mailbox configuration "
-                    "section instead."
-                )
-            if "watch" in imap_config:
-                opts.mailbox_watch = bool(imap_config.getboolean("watch"))
-                logger.warning(
-                    "Use of the watch option in the imap "
-                    "configuration section has been deprecated. "
-                    "Use this option in the mailbox configuration "
-                    "section instead."
-                )
-            if "delete" in imap_config:
-                logger.warning(
-                    "Use of the delete option in the imap "
-                    "configuration section has been deprecated. "
-                    "Use this option in the mailbox configuration "
-                    "section instead."
-                )
-            if "test" in imap_config:
-                opts.mailbox_test = bool(imap_config.getboolean("test"))
-                logger.warning(
-                    "Use of the test option in the imap "
-                    "configuration section has been deprecated. "
-                    "Use this option in the mailbox configuration "
-                    "section instead."
-                )
-            if "batch_size" in imap_config:
-                opts.mailbox_batch_size = imap_config.getint("batch_size")
-                logger.warning(
-                    "Use of the batch_size option in the imap "
-                    "configuration section has been deprecated. "
-                    "Use this option in the mailbox configuration "
-                    "section instead."
-                )
-
-        if "msgraph" in config.sections():
-            graph_config = config["msgraph"]
-            opts.graph_token_file = graph_config.get("token_file", ".token")
-
-            if "auth_method" not in graph_config:
-                logger.info(
-                    "auth_method setting missing from the "
-                    "msgraph config section "
-                    "defaulting to UsernamePassword"
-                )
-                opts.graph_auth_method = AuthMethod.UsernamePassword.name
-            else:
-                opts.graph_auth_method = graph_config["auth_method"]
-
-            if opts.graph_auth_method == AuthMethod.UsernamePassword.name:
-                if "user" in graph_config:
-                    opts.graph_user = graph_config["user"]
-                else:
-                    logger.critical(
-                        "user setting missing from the msgraph config section"
-                    )
-                    exit(-1)
-                if "password" in graph_config:
-                    opts.graph_password = graph_config["password"]
-                else:
-                    logger.critical(
-                        "password setting missing from the msgraph config section"
-                    )
-                    exit(-1)
-                if "client_secret" in graph_config:
-                    opts.graph_client_secret = graph_config["client_secret"]
-                else:
-                    logger.critical(
-                        "client_secret setting missing from the msgraph config section"
-                    )
-                    exit(-1)
-
-            if opts.graph_auth_method == AuthMethod.DeviceCode.name:
-                if "user" in graph_config:
-                    opts.graph_user = graph_config["user"]
-
-            if opts.graph_auth_method != AuthMethod.UsernamePassword.name:
-                if "tenant_id" in graph_config:
-                    opts.graph_tenant_id = graph_config["tenant_id"]
-                else:
-                    logger.critical(
"tenant_id setting missing from the msgraph config section" - ) - exit(-1) - - if opts.graph_auth_method == AuthMethod.ClientSecret.name: - if "client_secret" in graph_config: - opts.graph_client_secret = graph_config["client_secret"] - else: - logger.critical( - "client_secret setting missing from the msgraph config section" - ) - exit(-1) - - if opts.graph_auth_method == AuthMethod.Certificate.name: - if "certificate_path" in graph_config: - opts.graph_certificate_path = graph_config["certificate_path"] - else: - logger.critical( - "certificate_path setting missing from the msgraph config section" - ) - exit(-1) - if "certificate_password" in graph_config: - opts.graph_certificate_password = graph_config[ - "certificate_password" - ] - - if "client_id" in graph_config: - opts.graph_client_id = graph_config["client_id"] - else: - logger.critical( - "client_id setting missing from the msgraph config section" - ) - exit(-1) - - if "mailbox" in graph_config: - opts.graph_mailbox = graph_config["mailbox"] - elif opts.graph_auth_method != AuthMethod.UsernamePassword.name: - logger.critical( - "mailbox setting missing from the msgraph config section" - ) - exit(-1) - - if "graph_url" in graph_config: - opts.graph_url = graph_config["graph_url"] - - if "allow_unencrypted_storage" in graph_config: - opts.graph_allow_unencrypted_storage = bool( - graph_config.getboolean("allow_unencrypted_storage") - ) - - if "elasticsearch" in config: - elasticsearch_config = config["elasticsearch"] - if "hosts" in elasticsearch_config: - opts.elasticsearch_hosts = _str_to_list(elasticsearch_config["hosts"]) - else: - logger.critical( - "hosts setting missing from the elasticsearch config section" - ) - exit(-1) - if "timeout" in elasticsearch_config: - timeout = elasticsearch_config.getfloat("timeout") - opts.elasticsearch_timeout = timeout - if "number_of_shards" in elasticsearch_config: - number_of_shards = elasticsearch_config.getint("number_of_shards") - 
-                opts.elasticsearch_number_of_shards = number_of_shards
-            if "number_of_replicas" in elasticsearch_config:
-                number_of_replicas = elasticsearch_config.getint(
-                    "number_of_replicas"
-                )
-                opts.elasticsearch_number_of_replicas = number_of_replicas
-            if "index_suffix" in elasticsearch_config:
-                opts.elasticsearch_index_suffix = elasticsearch_config["index_suffix"]
-            if "index_prefix" in elasticsearch_config:
-                opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
-            if "monthly_indexes" in elasticsearch_config:
-                monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
-                opts.elasticsearch_monthly_indexes = monthly
-            if "ssl" in elasticsearch_config:
-                opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
-            if "cert_path" in elasticsearch_config:
-                opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
-            if "user" in elasticsearch_config:
-                opts.elasticsearch_username = elasticsearch_config["user"]
-            if "password" in elasticsearch_config:
-                opts.elasticsearch_password = elasticsearch_config["password"]
-            # Until 8.20
-            if "apiKey" in elasticsearch_config:
-                opts.elasticsearch_api_key = elasticsearch_config["apiKey"]
-            # Since 8.20
-            if "api_key" in elasticsearch_config:
-                opts.elasticsearch_api_key = elasticsearch_config["api_key"]
-
-        if "opensearch" in config:
-            opensearch_config = config["opensearch"]
-            if "hosts" in opensearch_config:
-                opts.opensearch_hosts = _str_to_list(opensearch_config["hosts"])
-            else:
-                logger.critical(
-                    "hosts setting missing from the opensearch config section"
-                )
-                exit(-1)
-            if "timeout" in opensearch_config:
-                timeout = opensearch_config.getfloat("timeout")
-                opts.opensearch_timeout = timeout
-            if "number_of_shards" in opensearch_config:
-                number_of_shards = opensearch_config.getint("number_of_shards")
-                opts.opensearch_number_of_shards = number_of_shards
-            if "number_of_replicas" in opensearch_config:
-                number_of_replicas = opensearch_config.getint("number_of_replicas")
-                opts.opensearch_number_of_replicas = number_of_replicas
-            if "index_suffix" in opensearch_config:
-                opts.opensearch_index_suffix = opensearch_config["index_suffix"]
-            if "index_prefix" in opensearch_config:
-                opts.opensearch_index_prefix = opensearch_config["index_prefix"]
-            if "monthly_indexes" in opensearch_config:
-                monthly = bool(opensearch_config.getboolean("monthly_indexes"))
-                opts.opensearch_monthly_indexes = monthly
-            if "ssl" in opensearch_config:
-                opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
-            if "cert_path" in opensearch_config:
-                opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
-            if "user" in opensearch_config:
-                opts.opensearch_username = opensearch_config["user"]
-            if "password" in opensearch_config:
-                opts.opensearch_password = opensearch_config["password"]
-            # Until 8.20
-            if "apiKey" in opensearch_config:
-                opts.opensearch_api_key = opensearch_config["apiKey"]
-            # Since 8.20
-            if "api_key" in opensearch_config:
-                opts.opensearch_api_key = opensearch_config["api_key"]
-            if "auth_type" in opensearch_config:
-                opts.opensearch_auth_type = (
-                    opensearch_config["auth_type"].strip().lower()
-                )
-            elif "authentication_type" in opensearch_config:
-                opts.opensearch_auth_type = (
-                    opensearch_config["authentication_type"].strip().lower()
-                )
-            if "aws_region" in opensearch_config:
-                opts.opensearch_aws_region = opensearch_config["aws_region"].strip()
-            if "aws_service" in opensearch_config:
-                opts.opensearch_aws_service = opensearch_config["aws_service"].strip()
-
-        if "splunk_hec" in config.sections():
-            hec_config = config["splunk_hec"]
-            if "url" in hec_config:
-                opts.hec = hec_config["url"]
-            else:
-                logger.critical(
-                    "url setting missing from the splunk_hec config section"
-                )
-                exit(-1)
-            if "token" in hec_config:
-                opts.hec_token = hec_config["token"]
-            else:
-                logger.critical(
-                    "token setting missing from the splunk_hec config section"
-                )
-                exit(-1)
-            if "index" in hec_config:
-                opts.hec_index = hec_config["index"]
-            else:
-                logger.critical(
-                    "index setting missing from the splunk_hec config section"
-                )
-                exit(-1)
-            if "skip_certificate_verification" in hec_config:
-                opts.hec_skip_certificate_verification = hec_config[
-                    "skip_certificate_verification"
-                ]
-
-        if "kafka" in config.sections():
-            kafka_config = config["kafka"]
-            if "hosts" in kafka_config:
-                opts.kafka_hosts = _str_to_list(kafka_config["hosts"])
-            else:
-                logger.critical("hosts setting missing from the kafka config section")
-                exit(-1)
-            if "user" in kafka_config:
-                opts.kafka_username = kafka_config["user"]
-            if "password" in kafka_config:
-                opts.kafka_password = kafka_config["password"]
-            if "ssl" in kafka_config:
-                opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
-            if "skip_certificate_verification" in kafka_config:
-                kafka_verify = bool(
-                    kafka_config.getboolean("skip_certificate_verification")
-                )
-                opts.kafka_skip_certificate_verification = kafka_verify
-            if "aggregate_topic" in kafka_config:
-                opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
-            else:
-                logger.critical(
-                    "aggregate_topic setting missing from the kafka config section"
-                )
-                exit(-1)
-            if "forensic_topic" in kafka_config:
-                opts.kafka_forensic_topic = kafka_config["forensic_topic"]
-            else:
-                logger.critical(
-                    "forensic_topic setting missing from the kafka config section"
-                )
-            if "smtp_tls_topic" in kafka_config:
-                opts.kafka_smtp_tls_topic = kafka_config["smtp_tls_topic"]
-            else:
-                logger.critical(
-                    "forensic_topic setting missing from the splunk_hec config section"
-                )
-
-        if "smtp" in config.sections():
-            smtp_config = config["smtp"]
-            if "host" in smtp_config:
-                opts.smtp_host = smtp_config["host"]
-            else:
-                logger.critical("host setting missing from the smtp config section")
-                exit(-1)
-            if "port" in smtp_config:
-                opts.smtp_port = smtp_config.getint("port")
-            if "ssl" in smtp_config:
-                opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
-            if "skip_certificate_verification" in smtp_config:
-                smtp_verify = bool(
-                    smtp_config.getboolean("skip_certificate_verification")
-                )
-                opts.smtp_skip_certificate_verification = smtp_verify
-            if "user" in smtp_config:
-                opts.smtp_user = smtp_config["user"]
-            else:
-                logger.critical("user setting missing from the smtp config section")
-                exit(-1)
-            if "password" in smtp_config:
-                opts.smtp_password = smtp_config["password"]
-            else:
-                logger.critical("password setting missing from the smtp config section")
-                exit(-1)
-            if "from" in smtp_config:
-                opts.smtp_from = smtp_config["from"]
-            else:
-                logger.critical("from setting missing from the smtp config section")
-            if "to" in smtp_config:
-                opts.smtp_to = _str_to_list(smtp_config["to"])
-            else:
-                logger.critical("to setting missing from the smtp config section")
-            if "subject" in smtp_config:
-                opts.smtp_subject = smtp_config["subject"]
-            if "attachment" in smtp_config:
-                opts.smtp_attachment = smtp_config["attachment"]
-            if "message" in smtp_config:
-                opts.smtp_message = smtp_config["message"]
-
-        if "s3" in config.sections():
-            s3_config = config["s3"]
-            if "bucket" in s3_config:
-                opts.s3_bucket = s3_config["bucket"]
-            else:
-                logger.critical("bucket setting missing from the s3 config section")
-                exit(-1)
-            if "path" in s3_config:
-                opts.s3_path = s3_config["path"]
-                if opts.s3_path.startswith("/"):
-                    opts.s3_path = opts.s3_path[1:]
-                if opts.s3_path.endswith("/"):
-                    opts.s3_path = opts.s3_path[:-1]
-            else:
-                opts.s3_path = ""
-
-            if "region_name" in s3_config:
-                opts.s3_region_name = s3_config["region_name"]
-            if "endpoint_url" in s3_config:
-                opts.s3_endpoint_url = s3_config["endpoint_url"]
-            if "access_key_id" in s3_config:
-                opts.s3_access_key_id = s3_config["access_key_id"]
-            if "secret_access_key" in s3_config:
-                opts.s3_secret_access_key = s3_config["secret_access_key"]
-
-        if "syslog" in config.sections():
-            syslog_config = config["syslog"]
-            if "server" in syslog_config:
-                opts.syslog_server = syslog_config["server"]
-            else:
-                logger.critical("server setting missing from the syslog config section")
-                exit(-1)
-            if "port" in syslog_config:
-                opts.syslog_port = syslog_config["port"]
-            else:
-                opts.syslog_port = 514
-            if "protocol" in syslog_config:
-                opts.syslog_protocol = syslog_config["protocol"]
-            else:
-                opts.syslog_protocol = "udp"
-            if "cafile_path" in syslog_config:
-                opts.syslog_cafile_path = syslog_config["cafile_path"]
-            if "certfile_path" in syslog_config:
-                opts.syslog_certfile_path = syslog_config["certfile_path"]
-            if "keyfile_path" in syslog_config:
-                opts.syslog_keyfile_path = syslog_config["keyfile_path"]
-            if "timeout" in syslog_config:
-                opts.syslog_timeout = float(syslog_config["timeout"])
-            else:
-                opts.syslog_timeout = 5.0
-            if "retry_attempts" in syslog_config:
-                opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
-            else:
-                opts.syslog_retry_attempts = 3
-            if "retry_delay" in syslog_config:
-                opts.syslog_retry_delay = int(syslog_config["retry_delay"])
-            else:
-                opts.syslog_retry_delay = 5
-
-        if "gmail_api" in config.sections():
-            gmail_api_config = config["gmail_api"]
-            opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
-            opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
-            opts.gmail_api_include_spam_trash = bool(
-                gmail_api_config.getboolean("include_spam_trash", False)
-            )
-            opts.gmail_api_paginate_messages = bool(
-                gmail_api_config.getboolean("paginate_messages", True)
-            )
-            opts.gmail_api_scopes = gmail_api_config.get(
-                "scopes", default_gmail_api_scope
-            )
-            opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes)
-            if "oauth2_port" in gmail_api_config:
-                opts.gmail_api_oauth2_port = gmail_api_config.getint(
-                    "oauth2_port", 8080
-                )
-            if "auth_mode" in gmail_api_config:
-                opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
-            if "service_account_user" in gmail_api_config:
-                opts.gmail_api_service_account_user = gmail_api_config.get(
-                    "service_account_user"
-                ).strip()
-            elif "delegated_user" in gmail_api_config:
-                opts.gmail_api_service_account_user = gmail_api_config.get(
-                    "delegated_user"
-                ).strip()
-
-        if "maildir" in config.sections():
-            maildir_api_config = config["maildir"]
-            opts.maildir_path = maildir_api_config.get("maildir_path")
-            opts.maildir_create = bool(
-                maildir_api_config.getboolean("maildir_create", fallback=False)
-            )
-
-        if "log_analytics" in config.sections():
-            log_analytics_config = config["log_analytics"]
-            opts.la_client_id = log_analytics_config.get("client_id")
-            opts.la_client_secret = log_analytics_config.get("client_secret")
-            opts.la_tenant_id = log_analytics_config.get("tenant_id")
-            opts.la_dce = log_analytics_config.get("dce")
-            opts.la_dcr_immutable_id = log_analytics_config.get("dcr_immutable_id")
-            opts.la_dcr_aggregate_stream = log_analytics_config.get(
-                "dcr_aggregate_stream"
-            )
-            opts.la_dcr_forensic_stream = log_analytics_config.get(
-                "dcr_forensic_stream"
-            )
-            opts.la_dcr_smtp_tls_stream = log_analytics_config.get(
-                "dcr_smtp_tls_stream"
-            )
-
-        if "gelf" in config.sections():
-            gelf_config = config["gelf"]
-            if "host" in gelf_config:
-                opts.gelf_host = gelf_config["host"]
-            else:
-                logger.critical("host setting missing from the gelf config section")
-                exit(-1)
-            if "port" in gelf_config:
-                opts.gelf_port = gelf_config["port"]
-            else:
-                logger.critical("port setting missing from the gelf config section")
-                exit(-1)
-            if "mode" in gelf_config:
-                opts.gelf_mode = gelf_config["mode"]
-            else:
-                logger.critical("mode setting missing from the gelf config section")
-                exit(-1)
-
-        if "webhook" in config.sections():
-            webhook_config = config["webhook"]
-            if "aggregate_url" in webhook_config:
-                opts.webhook_aggregate_url = webhook_config["aggregate_url"]
-            if "forensic_url" in webhook_config:
-                opts.webhook_forensic_url = webhook_config["forensic_url"]
-            if "smtp_tls_url" in webhook_config:
-                opts.webhook_smtp_tls_url = webhook_config["smtp_tls_url"]
-            if "timeout" in webhook_config:
-                opts.webhook_timeout =
webhook_config.getint("timeout")

     logger.setLevel(logging.ERROR)
@@ -1413,6 +1649,8 @@ def _main():
         except Exception as error:
             logger.warning("Unable to write to log file: {}".format(error))

+    opts.active_log_file = opts.log_file
+
     if (
         opts.imap_host is None
         and opts.graph_client_id is None
@@ -1425,174 +1663,21 @@ def _main():

     logger.info("Starting parsedmarc")

-    if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls:
-        try:
-            if opts.elasticsearch_hosts:
-                es_aggregate_index = "dmarc_aggregate"
-                es_forensic_index = "dmarc_forensic"
-                es_smtp_tls_index = "smtp_tls"
-                if opts.elasticsearch_index_suffix:
-                    suffix = opts.elasticsearch_index_suffix
-                    es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix)
-                    es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
-                    es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix)
-                if opts.elasticsearch_index_prefix:
-                    prefix = opts.elasticsearch_index_prefix
-                    es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
-                    es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
-                    es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
-                elastic_timeout_value = (
-                    float(opts.elasticsearch_timeout)
-                    if opts.elasticsearch_timeout is not None
-                    else 60.0
-                )
-                elastic.set_hosts(
-                    opts.elasticsearch_hosts,
-                    use_ssl=opts.elasticsearch_ssl,
-                    ssl_cert_path=opts.elasticsearch_ssl_cert_path,
-                    username=opts.elasticsearch_username,
-                    password=opts.elasticsearch_password,
-                    api_key=opts.elasticsearch_api_key,
-                    timeout=elastic_timeout_value,
-                )
-                elastic.migrate_indexes(
-                    aggregate_indexes=[es_aggregate_index],
-                    forensic_indexes=[es_forensic_index],
-                )
-        except elastic.ElasticsearchError:
-            logger.exception("Elasticsearch Error")
-            exit(1)
-
-        try:
-            if opts.opensearch_hosts:
-                os_aggregate_index = "dmarc_aggregate"
-                os_forensic_index = "dmarc_forensic"
-                os_smtp_tls_index = "smtp_tls"
-                if opts.opensearch_index_suffix:
-                    suffix = opts.opensearch_index_suffix
-                    os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix)
-                    os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix)
-                    os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix)
-                if opts.opensearch_index_prefix:
-                    prefix = opts.opensearch_index_prefix
-                    os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
-                    os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
-                    os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
-                opensearch_timeout_value = (
-                    float(opts.opensearch_timeout)
-                    if opts.opensearch_timeout is not None
-                    else 60.0
-                )
-                opensearch.set_hosts(
-                    opts.opensearch_hosts,
-                    use_ssl=opts.opensearch_ssl,
-                    ssl_cert_path=opts.opensearch_ssl_cert_path,
-                    username=opts.opensearch_username,
-                    password=opts.opensearch_password,
-                    api_key=opts.opensearch_api_key,
-                    timeout=opensearch_timeout_value,
-                    auth_type=opts.opensearch_auth_type,
-                    aws_region=opts.opensearch_aws_region,
-                    aws_service=opts.opensearch_aws_service,
-                )
-                opensearch.migrate_indexes(
-                    aggregate_indexes=[os_aggregate_index],
-                    forensic_indexes=[os_forensic_index],
-                )
-        except opensearch.OpenSearchError:
-            logger.exception("OpenSearch Error")
-            exit(1)
-
-    if opts.s3_bucket:
-        try:
-            s3_client = s3.S3Client(
-                bucket_name=opts.s3_bucket,
-                bucket_path=opts.s3_path,
-                region_name=opts.s3_region_name,
-                endpoint_url=opts.s3_endpoint_url,
-                access_key_id=opts.s3_access_key_id,
-                secret_access_key=opts.s3_secret_access_key,
-            )
-        except Exception as error_:
-            logger.error("S3 Error: {0}".format(error_.__str__()))
-
-    if opts.syslog_server:
-        try:
-            syslog_client = syslog.SyslogClient(
-                server_name=opts.syslog_server,
-                server_port=int(opts.syslog_port),
-                protocol=opts.syslog_protocol or "udp",
-                cafile_path=opts.syslog_cafile_path,
-                certfile_path=opts.syslog_certfile_path,
-                keyfile_path=opts.syslog_keyfile_path,
-                timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
-                retry_attempts=opts.syslog_retry_attempts
-                if opts.syslog_retry_attempts is not None
-                else 3,
-                retry_delay=opts.syslog_retry_delay
-                if opts.syslog_retry_delay is not None
-                else 5,
-            )
-        except Exception as error_:
-            logger.error("Syslog Error: {0}".format(error_.__str__()))
-
-    if opts.hec:
-        if opts.hec_token is None or opts.hec_index is None:
-            logger.error("HEC token and HEC index are required when using HEC URL")
-            exit(1)
-
-        verify = True
-        if opts.hec_skip_certificate_verification:
-            verify = False
-        hec_client = splunk.HECClient(
-            opts.hec, opts.hec_token, opts.hec_index, verify=verify
-        )
-
-    if opts.kafka_hosts:
-        try:
-            ssl_context = None
-            if opts.kafka_skip_certificate_verification:
-                logger.debug("Skipping Kafka certificate verification")
-                ssl_context = create_default_context()
-                ssl_context.check_hostname = False
-                ssl_context.verify_mode = CERT_NONE
-            kafka_client = kafkaclient.KafkaClient(
-                opts.kafka_hosts,
-                username=opts.kafka_username,
-                password=opts.kafka_password,
-                ssl_context=ssl_context,
-            )
-        except Exception as error_:
-            logger.error("Kafka Error: {0}".format(error_.__str__()))
-
-    if opts.gelf_host:
-        try:
-            gelf_client = gelf.GelfClient(
-                host=opts.gelf_host,
-                port=int(opts.gelf_port),
-                mode=opts.gelf_mode,
-            )
-        except Exception as error_:
-            logger.error("GELF Error: {0}".format(error_.__str__()))
-
-    if (
-        opts.webhook_aggregate_url
-        or opts.webhook_forensic_url
-        or opts.webhook_smtp_tls_url
-    ):
-        try:
-            webhook_client = webhook.WebhookClient(
-                aggregate_url=opts.webhook_aggregate_url,
-                forensic_url=opts.webhook_forensic_url,
-                smtp_tls_url=opts.webhook_smtp_tls_url,
-                timeout=opts.webhook_timeout,
-            )
-        except Exception as error_:
-            logger.error("Webhook Error: {0}".format(error_.__str__()))
-
-    kafka_aggregate_topic = opts.kafka_aggregate_topic
-    kafka_forensic_topic = opts.kafka_forensic_topic
-    kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
+    # Initialize output clients
+    try:
+        clients = _init_output_clients(opts)
+    except elastic.ElasticsearchError as e:
+        logger.exception("Elasticsearch Error: {0}".format(e))
+        exit(1)
+    except opensearch.OpenSearchError as e:
+        logger.exception("OpenSearch Error: {0}".format(e))
+        exit(1)
+    except ConfigurationError as e:
+        logger.critical(str(e))
+        exit(1)
+    except Exception as error_:
+        logger.error("Output client error: {0}".format(error_))
+        exit(1)

     file_paths = []
     mbox_paths = []
@@ -1724,8 +1809,9 @@ def _main():
         try:
             if opts.imap_user is None or opts.imap_password is None:
                 logger.error(
-                    "IMAP user and password must be specified ifhost is specified"
+                    "IMAP user and password must be specified if host is specified"
                 )
+                exit(1)

             ssl = True
             verify = True
@@ -1897,36 +1983,139 @@ def _main():
             logger.exception("Failed to email results")
             exit(1)

+    # SIGHUP-based config reload for watch mode
+    _reload_requested = False
+
+    def _handle_sighup(signum, frame):
+        nonlocal _reload_requested
+        # Logging is not async-signal-safe; only set the flag here.
+        # The log message is emitted from the main loop when the flag is read.
+        _reload_requested = True
+
+    if hasattr(signal, "SIGHUP"):
+        signal.signal(signal.SIGHUP, _handle_sighup)
+
     if mailbox_connection and opts.mailbox_watch:
         logger.info("Watching for email - Quit with ctrl-c")
-        try:
-            watch_inbox(
-                mailbox_connection=mailbox_connection,
-                callback=process_reports,
-                reports_folder=opts.mailbox_reports_folder,
-                archive_folder=opts.mailbox_archive_folder,
-                delete=opts.mailbox_delete,
-                test=opts.mailbox_test,
-                check_timeout=mailbox_check_timeout_value,
-                nameservers=opts.nameservers,
-                dns_timeout=opts.dns_timeout,
-                strip_attachment_payloads=opts.strip_attachment_payloads,
-                batch_size=mailbox_batch_size_value,
-                since=opts.mailbox_since,
-                ip_db_path=opts.ip_db_path,
-                always_use_local_files=opts.always_use_local_files,
-                reverse_dns_map_path=opts.reverse_dns_map_path,
-                reverse_dns_map_url=opts.reverse_dns_map_url,
-                offline=opts.offline,
-                normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
-            )
-        except FileExistsError as error:
-            logger.error("{0}".format(error.__str__()))
-            exit(1)
-        except ParserError as error:
-            logger.error(error.__str__())
-            exit(1)
+        while True:
+            # Re-check mailbox_watch in case a config reload disabled watch mode
+            if not opts.mailbox_watch:
+                logger.info(
+                    "Mailbox watch disabled in reloaded configuration, stopping watcher"
+                )
+                break
+            try:
+                watch_inbox(
+                    mailbox_connection=mailbox_connection,
+                    callback=process_reports,
+                    reports_folder=opts.mailbox_reports_folder,
+                    archive_folder=opts.mailbox_archive_folder,
+                    delete=opts.mailbox_delete,
+                    test=opts.mailbox_test,
+                    check_timeout=mailbox_check_timeout_value,
+                    nameservers=opts.nameservers,
+                    dns_timeout=opts.dns_timeout,
+                    strip_attachment_payloads=opts.strip_attachment_payloads,
+                    batch_size=mailbox_batch_size_value,
+                    since=opts.mailbox_since,
+                    ip_db_path=opts.ip_db_path,
+                    always_use_local_files=opts.always_use_local_files,
+                    reverse_dns_map_path=opts.reverse_dns_map_path,
+                    reverse_dns_map_url=opts.reverse_dns_map_url,
+                    offline=opts.offline,
+                    normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
+                    config_reloading=lambda: _reload_requested,
+                )
+            except FileExistsError as error:
+                logger.error("{0}".format(error.__str__()))
+                exit(1)
+            except ParserError as error:
+                logger.error(error.__str__())
+                exit(1)
+
+            if not _reload_requested:
+                break
+
+            # Reload configuration — emit the log message here (not in the
+            # signal handler, which is not async-signal-safe), then clear the
+            # flag so that any new SIGHUP arriving while we reload will be
+            # captured for the next iteration rather than being silently dropped.
+            logger.info("SIGHUP received, config will reload after current batch")
+            _reload_requested = False
+            logger.info("Reloading configuration...")
+            try:
+                # Build a fresh opts starting from CLI-only defaults so that
+                # sections removed from the config file actually take effect.
+                new_opts = Namespace(**vars(opts_from_cli))
+                new_index_prefix_domain_map = _parse_config_file(
+                    args.config_file, new_opts
+                )
+                new_clients = _init_output_clients(new_opts)
+
+                # All steps succeeded — commit the changes atomically.
+                _close_output_clients(clients)
+                clients = new_clients
+                index_prefix_domain_map = new_index_prefix_domain_map
+                for k, v in vars(new_opts).items():
+                    setattr(opts, k, v)
+
+                # Update watch parameters from reloaded config
+                mailbox_batch_size_value = (
+                    int(opts.mailbox_batch_size)
+                    if opts.mailbox_batch_size is not None
+                    else 10
+                )
+                mailbox_check_timeout_value = (
+                    int(opts.mailbox_check_timeout)
+                    if opts.mailbox_check_timeout is not None
+                    else 30
+                )
+                normalize_timespan_threshold_hours_value = (
+                    float(opts.normalize_timespan_threshold_hours)
+                    if opts.normalize_timespan_threshold_hours is not None
+                    else 24.0
+                )
+
+                # Update log level
+                logger.setLevel(logging.ERROR)
+                if opts.warnings:
+                    logger.setLevel(logging.WARNING)
+                if opts.verbose:
+                    logger.setLevel(logging.INFO)
+                if opts.debug:
+                    logger.setLevel(logging.DEBUG)
+
+                # Refresh FileHandler if log_file changed
+                old_log_file = getattr(opts, "active_log_file", None)
+                new_log_file = opts.log_file
+                if old_log_file != new_log_file:
+                    # Remove old FileHandlers
+                    for h in list(logger.handlers):
+                        if isinstance(h, logging.FileHandler):
+                            h.close()
+                            logger.removeHandler(h)
+                    # Add new FileHandler if configured
+                    if new_log_file:
+                        try:
+                            fh = logging.FileHandler(new_log_file, "a")
+                            file_formatter = logging.Formatter(
+                                "%(asctime)s - %(levelname)s"
+                                " - [%(filename)s:%(lineno)d] - %(message)s"
+                            )
+                            fh.setFormatter(file_formatter)
+                            logger.addHandler(fh)
+                        except Exception as log_error:
+                            logger.warning(
+                                "Unable to write to log file: {}".format(log_error)
+                            )
+                    opts.active_log_file = new_log_file
+
+                logger.info("Configuration reloaded successfully")
+            except Exception:
+                logger.exception(
+                    "Config reload failed, continuing with previous config"
+                )


 if __name__ == "__main__":
diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py
index f65a0a1..38c0044 100644
--- a/parsedmarc/constants.py
+++ b/parsedmarc/constants.py
@@ -1,3 +1,3 @@
-__version__ = "9.2.1"
+__version__ = "9.3.0"
 USER_AGENT = f"parsedmarc/{__version__}"
diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py
index 67f4f5d..2ac5a5a 100644
--- a/parsedmarc/gelf.py
+++ b/parsedmarc/gelf.py
@@ -3,9 +3,7 @@
 from __future__ import annotations

 import logging
-import logging.handlers
 import threading
-from typing import Any

 from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler

@@ -14,6 +12,7 @@ from parsedmarc import (
     parsed_forensic_reports_to_csv_rows,
     parsed_smtp_tls_reports_to_csv_rows,
 )
+from parsedmarc.types import AggregateReport, ForensicReport, SMTPTLSReport

 log_context_data = threading.local()

@@ -37,7 +36,7 @@ class GelfClient(object):
         """
         self.host = host
         self.port = port
-        self.logger = logging.getLogger("parsedmarc_syslog")
+        self.logger = logging.getLogger("parsedmarc_gelf")
         self.logger.setLevel(logging.INFO)
         self.logger.addFilter(ContextFilter())
         self.gelf_mode = {
@@ -50,7 +49,7 @@ class GelfClient(object):
         )
         self.logger.addHandler(self.handler)

-    def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
+    def save_aggregate_report_to_gelf(self, aggregate_reports: list[AggregateReport]):
         rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
         for row in rows:
             log_context_data.parsedmarc = row
@@ -58,14 +57,19 @@ class GelfClient(object):

         log_context_data.parsedmarc = None

-    def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
+    def save_forensic_report_to_gelf(self, forensic_reports: list[ForensicReport]):
         rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
         for row in rows:
             log_context_data.parsedmarc = row
             self.logger.info("parsedmarc forensic report")

-    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
+    def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: SMTPTLSReport):
         rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
         for row in rows:
             log_context_data.parsedmarc = row
             self.logger.info("parsedmarc smtptls report")
+
+    def close(self):
+        """Remove and close the GELF handler, releasing its connection."""
+        self.logger.removeHandler(self.handler)
+        self.handler.close()
diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py
index e27c9b9..227e102 100644
--- a/parsedmarc/kafkaclient.py
+++ b/parsedmarc/kafkaclient.py
@@ -62,6 +62,10 @@ class KafkaClient(object):
         except NoBrokersAvailable:
             raise KafkaError("No Kafka brokers available")

+    def close(self):
+        """Close the Kafka producer, releasing background threads and sockets."""
+        self.producer.close()
+
     @staticmethod
     def strip_metadata(report: dict[str, Any]):
         """
diff --git a/parsedmarc/mail/gmail.py b/parsedmarc/mail/gmail.py
index ac8f453..924ba7e 100644
--- a/parsedmarc/mail/gmail.py
+++ b/parsedmarc/mail/gmail.py
@@ -175,10 +175,14 @@ class GmailConnection(MailboxConnection):
         # Not needed
         pass

-    def watch(self, check_callback, check_timeout):
+    def watch(self, check_callback, check_timeout, config_reloading=None):
         """Checks the mailbox for new messages every n seconds"""
         while True:
+            if config_reloading and config_reloading():
+                return
             sleep(check_timeout)
+            if config_reloading and config_reloading():
+                return
             check_callback(self)

 @lru_cache(maxsize=10)
diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py
index 05154f7..7df039c 100644
--- a/parsedmarc/mail/graph.py
+++ b/parsedmarc/mail/graph.py
@@ -278,10 +278,14 @@ class MSGraphConnection(MailboxConnection):
         # Not needed
         pass

-    def watch(self, check_callback, check_timeout):
+    def watch(self, check_callback, check_timeout, config_reloading=None):
         """Checks the mailbox for new messages every n seconds"""
         while True:
+            if config_reloading and config_reloading():
+                return
             sleep(check_timeout)
+            if config_reloading and config_reloading():
+                return
             check_callback(self)

 @lru_cache(maxsize=10)
diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py
index 3252807..5201fa7 100644
--- a/parsedmarc/mail/imap.py
+++ b/parsedmarc/mail/imap.py
@@ -81,7 +81,7 @@ class IMAPConnection(MailboxConnection):
     def keepalive(self):
         self._client.noop()

-    def watch(self, check_callback, check_timeout):
+    def watch(self, check_callback, check_timeout, config_reloading=None):
         """
         Use an IDLE IMAP connection to parse incoming emails,
         and pass the results to a callback function
@@ -94,6 +94,8 @@ class IMAPConnection(MailboxConnection):
         check_callback(self)
         while True:
+            if config_reloading and config_reloading():
+                return
             try:
                 IMAPClient(
                     host=self._client.host,
@@ -111,3 +113,5 @@ class IMAPConnection(MailboxConnection):
             except Exception as e:
                 logger.warning("IMAP connection error. {0}. Reconnecting...".format(e))
                 sleep(check_timeout)
+                if config_reloading and config_reloading():
+                    return
diff --git a/parsedmarc/mail/mailbox_connection.py b/parsedmarc/mail/mailbox_connection.py
index 21f1d92..6c94336 100644
--- a/parsedmarc/mail/mailbox_connection.py
+++ b/parsedmarc/mail/mailbox_connection.py
@@ -28,5 +28,5 @@ class MailboxConnection(ABC):
     def keepalive(self):
         raise NotImplementedError

-    def watch(self, check_callback, check_timeout):
+    def watch(self, check_callback, check_timeout, config_reloading=None):
         raise NotImplementedError
diff --git a/parsedmarc/mail/maildir.py b/parsedmarc/mail/maildir.py
index c3ea8ae..fa23dd3 100644
--- a/parsedmarc/mail/maildir.py
+++ b/parsedmarc/mail/maildir.py
@@ -63,10 +63,14 @@ class MaildirConnection(MailboxConnection):
     def keepalive(self):
         return

-    def watch(self, check_callback, check_timeout):
+    def watch(self, check_callback, check_timeout, config_reloading=None):
         while True:
+            if config_reloading and config_reloading():
+                return
             try:
                 check_callback(self)
             except Exception as e:
                 logger.warning("Maildir init error. {0}".format(e))
+            if config_reloading and config_reloading():
+                return
             sleep(check_timeout)
diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py
index d6778fa..99e03b3 100644
--- a/parsedmarc/s3.py
+++ b/parsedmarc/s3.py
@@ -93,3 +93,11 @@ class S3Client(object):
         self.bucket.put_object(
             Body=json.dumps(report), Key=object_path, Metadata=object_metadata
         )
+
+    def close(self):
+        """Clean up the boto3 resource."""
+        try:
+            if self.s3.meta is not None:
+                self.s3.meta.client.close()
+        except Exception:
+            pass
diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py
index 28f7c0f..f96e000 100644
--- a/parsedmarc/splunk.py
+++ b/parsedmarc/splunk.py
@@ -207,3 +207,7 @@ class HECClient(object):
             raise SplunkError(e.__str__())
         if response["code"] != 0:
             raise SplunkError(response["text"])
+
+    def close(self):
+        """Close the underlying HTTP session."""
+        self.session.close()
diff --git a/parsedmarc/syslog.py b/parsedmarc/syslog.py
index d96e56b..ec8e757 100644
--- a/parsedmarc/syslog.py
+++ b/parsedmarc/syslog.py
@@ -57,7 +57,7 @@ class SyslogClient(object):
         self.logger.setLevel(logging.INFO)

         # Create the appropriate syslog handler based on protocol
-        log_handler = self._create_syslog_handler(
+        self.log_handler = self._create_syslog_handler(
             server_name,
             server_port,
             self.protocol,
@@ -69,7 +69,7 @@ class SyslogClient(object):
             retry_delay,
         )

-        self.logger.addHandler(log_handler)
+        self.logger.addHandler(self.log_handler)

     def _create_syslog_handler(
         self,
@@ -179,3 +179,8 @@ class SyslogClient(object):
         rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
         for row in rows:
             self.logger.info(json.dumps(row))
+
+    def close(self):
+        """Remove and close the syslog handler, releasing its socket."""
+        self.logger.removeHandler(self.log_handler)
+        self.log_handler.close()
diff --git a/parsedmarc/webhook.py b/parsedmarc/webhook.py
index 5dd05bf..9b6f66f 100644
--- a/parsedmarc/webhook.py
+++ b/parsedmarc/webhook.py
@@ -63,3 +63,7 @@ class WebhookClient(object):
                 self.session.post(webhook_url, data=payload, timeout=self.timeout)
         except Exception as error_:
             logger.error("Webhook Error: {0}".format(error_.__str__()))
+
+    def close(self):
+        """Close the underlying HTTP session."""
+        self.session.close()
diff --git a/tests.py b/tests.py
index efc4b25..1b70cb0 100755
--- a/tests.py
+++ b/tests.py
@@ -4,6 +4,7 @@
 from __future__ import absolute_import, print_function, unicode_literals

 import os
+import signal
 import sys
 import tempfile
 import unittest
@@ -11,10 +12,11 @@
 from base64 import urlsafe_b64encode
 from glob import glob
 from pathlib import Path
 from tempfile import NamedTemporaryFile, TemporaryDirectory
+from typing import cast
 from types import SimpleNamespace
 from unittest.mock import MagicMock, patch

-from lxml import etree
+from lxml import etree  # type: ignore[import-untyped]
 from googleapiclient.errors import HttpError
 from httplib2 import Response
 from imapclient.exceptions import IMAPClientError
@@ -31,6 +33,7 @@
 from parsedmarc.mail.imap import IMAPConnection
 import parsedmarc.mail.gmail as gmail_module
 import parsedmarc.mail.graph as graph_module
 import parsedmarc.mail.imap as imap_module
+import parsedmarc.elastic
 import parsedmarc.opensearch as opensearch_module
 import parsedmarc.utils

@@ -153,7 +156,7 @@ class Test(unittest.TestCase):
             report_path,
             offline=True,
         )
-        self.assertEqual(result["report_type"], "aggregate")
+        assert result["report_type"] == "aggregate"
         self.assertEqual(result["report"]["report_metadata"]["org_name"], "outlook.com")

     def testParseReportFileAcceptsPathForEmail(self):
@@ -164,7 +167,7 @@
             report_path,
             offline=True,
         )
-        self.assertEqual(result["report_type"], "aggregate")
+        assert result["report_type"] == "aggregate"
         self.assertEqual(result["report"]["report_metadata"]["org_name"], "google.com")

     def testAggregateSamples(self):
@@ -175,10 +178,11 @@
             if os.path.isdir(sample_path):
                 continue
             print("Testing {0}: ".format(sample_path), end="")
-            parsed_report = parsedmarc.parse_report_file(
+            result = parsedmarc.parse_report_file(
                 sample_path, always_use_local_files=True, offline=OFFLINE_MODE
-            )["report"]
-            parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
+            )
+            assert result["report_type"] == "aggregate"
+            parsedmarc.parsed_aggregate_reports_to_csv(result["report"])
             print("Passed!")

     def testEmptySample(self):
@@ -194,13 +198,15 @@
             print("Testing {0}: ".format(sample_path), end="")
             with open(sample_path) as sample_file:
                 sample_content = sample_file.read()
-            parsed_report = parsedmarc.parse_report_email(
+            email_result = parsedmarc.parse_report_email(
                 sample_content, offline=OFFLINE_MODE
-            )["report"]
-            parsed_report = parsedmarc.parse_report_file(
+            )
+            assert email_result["report_type"] == "forensic"
+            result = parsedmarc.parse_report_file(
                 sample_path, offline=OFFLINE_MODE
-            )["report"]
-            parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
+            )
+            assert result["report_type"] == "forensic"
+            parsedmarc.parsed_forensic_reports_to_csv(result["report"])
             print("Passed!")

     def testSmtpTlsSamples(self):
@@ -211,10 +217,11 @@
             if os.path.isdir(sample_path):
                 continue
             print("Testing {0}: ".format(sample_path), end="")
-            parsed_report = parsedmarc.parse_report_file(
+            result = parsedmarc.parse_report_file(
                 sample_path, offline=OFFLINE_MODE
-            )["report"]
-            parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
+            )
+            assert result["report_type"] == "smtp_tls"
+            parsedmarc.parsed_smtp_tls_reports_to_csv(result["report"])
             print("Passed!")

     def testOpenSearchSigV4RequiresRegion(self):
@@ -1277,7 +1284,7 @@ class TestMailboxWatchSince(unittest.TestCase):
     def testWatchInboxPassesSinceToMailboxFetch(self):
         mailbox_connection = SimpleNamespace()

-        def fake_watch(check_callback, check_timeout):
+        def fake_watch(check_callback, check_timeout, config_reloading=None):
             check_callback(mailbox_connection)
             raise _BreakLoop()
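The `config_reloading=None` parameter threaded through every `watch()` implementation (and through `fake_watch` in the test above) turns an infinite polling loop into one that can exit cooperatively when a reload is pending. A minimal sketch of that contract — the function below is a stand-in, not any of parsedmarc's real `watch()` methods:

```python
import itertools


def watch(check_callback, check_timeout, config_reloading=None):
    """Poll the mailbox until config_reloading() reports a pending reload.

    Mirrors the loop shape in the diff: check the callback before each
    poll, and return (rather than loop forever) once a reload is requested.
    """
    for _ in itertools.count():
        if config_reloading and config_reloading():
            return  # reload requested; caller rebuilds state and calls again
        check_callback()
        # A real implementation would sleep(check_timeout) here, and check
        # config_reloading() again after waking, so a SIGHUP that arrives
        # mid-sleep is noticed within one poll interval.


checks = []
reload_after = 3  # pretend SIGHUP arrives after three mailbox checks

watch(
    check_callback=lambda: checks.append(len(checks)),
    check_timeout=30,
    config_reloading=lambda: len(checks) >= reload_after,
)
print(len(checks))  # 3
```

Making the parameter optional (`config_reloading=None`) keeps the change backward compatible: existing callers and third-party `MailboxConnection` subclasses that never pass the callback behave exactly as before.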
@@ -1288,7 +1295,7 @@ class TestMailboxWatchSince(unittest.TestCase):
         ) as mocked:
             with self.assertRaises(_BreakLoop):
                 parsedmarc.watch_inbox(
-                    mailbox_connection=mailbox_connection,
+                    mailbox_connection=cast(parsedmarc.MailboxConnection, mailbox_connection),
                     callback=callback,
                     check_timeout=1,
                     batch_size=10,
@@ -1336,30 +1343,30 @@
         self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d")


-class _DummyMailboxConnection:
+class _DummyMailboxConnection(parsedmarc.MailboxConnection):
     def __init__(self):
-        self.fetch_calls = []
+        self.fetch_calls: list[dict[str, object]] = []

-    def create_folder(self, folder_name):
+    def create_folder(self, folder_name: str):
         return None

-    def fetch_messages(self, reports_folder, **kwargs):
+    def fetch_messages(self, reports_folder: str, **kwargs):
         self.fetch_calls.append({"reports_folder": reports_folder, **kwargs})
         return []

-    def fetch_message(self, message_id, **kwargs):
+    def fetch_message(self, message_id) -> str:
         return ""

     def delete_message(self, message_id):
         return None

-    def move_message(self, message_id, folder_name):
+    def move_message(self, message_id, folder_name: str):
         return None

     def keepalive(self):
         return None

-    def watch(self, check_callback, check_timeout):
+    def watch(self, check_callback, check_timeout, config_reloading=None):
         return None

@@ -1558,7 +1565,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
     def testWellKnownFolderFallback(self):
         connection = MSGraphConnection.__new__(MSGraphConnection)
         connection.mailbox_name = "shared@example.com"
-        connection._client = _FakeGraphClient()
+        connection._client = _FakeGraphClient()  # type: ignore[assignment]
         connection._request_with_retries = MagicMock(
             side_effect=lambda method_name, *args, **kwargs: getattr(
                 connection._client, method_name
@@ -1578,7 +1585,7 @@
     def testUnknownFolderStillFails(self):
         connection = MSGraphConnection.__new__(MSGraphConnection)
         connection.mailbox_name = "shared@example.com"
-        connection._client = _FakeGraphClient()
+        connection._client = _FakeGraphClient()  # type: ignore[assignment]
         connection._request_with_retries = MagicMock(
             side_effect=lambda method_name, *args, **kwargs: getattr(
                 connection._client, method_name
@@ -1910,5 +1917,320 @@ certificate_path = /tmp/msgraph-cert.pem
         mock_get_mailbox_reports.assert_not_called()


+class TestSighupReload(unittest.TestCase):
+    """Tests for SIGHUP-driven configuration reload in watch mode."""
+
+    _BASE_CONFIG = """[general]
+silent = true
+
+[imap]
+host = imap.example.com
+user = user
+password = pass
+
+[mailbox]
+watch = true
+"""
+
+    @unittest.skipUnless(
+        hasattr(signal, "SIGHUP"),
+        "SIGHUP not available on this platform",
+    )
+    @patch("parsedmarc.cli._init_output_clients")
+    @patch("parsedmarc.cli._parse_config_file")
+    @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
+    @patch("parsedmarc.cli.watch_inbox")
+    @patch("parsedmarc.cli.IMAPConnection")
+    def testSighupTriggersReloadAndWatchRestarts(
+        self,
+        mock_imap,
+        mock_watch,
+        mock_get_reports,
+        mock_parse_config,
+        mock_init_clients,
+    ):
+        """SIGHUP causes watch to return, config is re-parsed, and watch restarts."""
+        import signal as signal_module
+
+        mock_imap.return_value = object()
+        mock_get_reports.return_value = {
+            "aggregate_reports": [],
+            "forensic_reports": [],
+            "smtp_tls_reports": [],
+        }
+
+        def parse_side_effect(config_file, opts):
+            opts.imap_host = "imap.example.com"
+            opts.imap_user = "user"
+            opts.imap_password = "pass"
+            opts.mailbox_watch = True
+            return None
+
+        mock_parse_config.side_effect = parse_side_effect
+        mock_init_clients.return_value = {}
+
+        call_count = [0]
+
+        def watch_side_effect(*args, **kwargs):
+            call_count[0] += 1
+            if call_count[0] == 1:
+                # Simulate SIGHUP arriving while watch is running
+                if hasattr(signal_module, "SIGHUP"):
+                    import os
+
+                    os.kill(os.getpid(), signal_module.SIGHUP)
+                return  # Normal return — reload loop will continue
+            else:
+                raise FileExistsError("stop-watch-loop")
+
+        mock_watch.side_effect = watch_side_effect
+
+        with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
+            cfg.write(self._BASE_CONFIG)
+            cfg_path = cfg.name
+        self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
+
+        with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
+            with self.assertRaises(SystemExit) as cm:
+                parsedmarc.cli._main()
+
+        # Exited with code 1 (from FileExistsError handler)
+        self.assertEqual(cm.exception.code, 1)
+        # watch_inbox was called twice: initial run + after reload
+        self.assertEqual(mock_watch.call_count, 2)
+        # _parse_config_file called for initial load + reload
+        self.assertGreaterEqual(mock_parse_config.call_count, 2)
+
+    @unittest.skipUnless(
+        hasattr(signal, "SIGHUP"),
+        "SIGHUP not available on this platform",
+    )
+    @patch("parsedmarc.cli._init_output_clients")
+    @patch("parsedmarc.cli._parse_config_file")
+    @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
+    @patch("parsedmarc.cli.watch_inbox")
+    @patch("parsedmarc.cli.IMAPConnection")
+    def testInvalidConfigOnReloadKeepsPreviousState(
+        self,
+        mock_imap,
+        mock_watch,
+        mock_get_reports,
+        mock_parse_config,
+        mock_init_clients,
+    ):
+        """A failing reload leaves opts and clients unchanged."""
+        import signal as signal_module
+
+        mock_imap.return_value = object()
+        mock_get_reports.return_value = {
+            "aggregate_reports": [],
+            "forensic_reports": [],
+            "smtp_tls_reports": [],
+        }
+
+        # Initial parse sets required opts; reload parse raises
+        initial_map = {"prefix_": ["example.com"]}
+        call_count = [0]
+
+        def parse_side_effect(config_file, opts):
+            call_count[0] += 1
+            opts.imap_host = "imap.example.com"
+            opts.imap_user = "user"
+            opts.imap_password = "pass"
+            opts.mailbox_watch = True
+            if call_count[0] == 1:
+                return initial_map
+            raise RuntimeError("bad config")
+
+        mock_parse_config.side_effect = parse_side_effect
+
+        initial_clients = {"s3_client": MagicMock()}
mock_init_clients.return_value = initial_clients + + watch_calls = [0] + + def watch_side_effect(*args, **kwargs): + watch_calls[0] += 1 + if watch_calls[0] == 1: + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return + else: + raise FileExistsError("stop") + + mock_watch.side_effect = watch_side_effect + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(self._BASE_CONFIG) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit) as cm: + parsedmarc.cli._main() + + self.assertEqual(cm.exception.code, 1) + # watch was still called twice (reload loop continued after failed reload) + self.assertEqual(mock_watch.call_count, 2) + # The failed reload must not have closed the original clients + initial_clients["s3_client"].close.assert_not_called() + + @unittest.skipUnless( + hasattr(signal, "SIGHUP"), + "SIGHUP not available on this platform", + ) + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli._parse_config_file") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testReloadClosesOldClients( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_parse_config, + mock_init_clients, + ): + """Successful reload closes the old output clients before replacing them.""" + import signal as signal_module + + mock_imap.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + + def parse_side_effect(config_file, opts): + opts.imap_host = "imap.example.com" + opts.imap_user = "user" + opts.imap_password = "pass" + opts.mailbox_watch = True + return None + + mock_parse_config.side_effect = parse_side_effect + + old_client = MagicMock() + 
new_client = MagicMock() + init_call = [0] + + def init_side_effect(opts): + init_call[0] += 1 + if init_call[0] == 1: + return {"kafka_client": old_client} + return {"kafka_client": new_client} + + mock_init_clients.side_effect = init_side_effect + + watch_calls = [0] + + def watch_side_effect(*args, **kwargs): + watch_calls[0] += 1 + if watch_calls[0] == 1: + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return + else: + raise FileExistsError("stop") + + mock_watch.side_effect = watch_side_effect + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(self._BASE_CONFIG) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit): + parsedmarc.cli._main() + + # Old client must have been closed when reload succeeded + old_client.close.assert_called_once() + + @unittest.skipUnless( + hasattr(signal, "SIGHUP"), + "SIGHUP not available on this platform", + ) + @patch("parsedmarc.cli._init_output_clients") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.watch_inbox") + @patch("parsedmarc.cli.IMAPConnection") + def testRemovedConfigSectionTakesEffectOnReload( + self, + mock_imap, + mock_watch, + mock_get_reports, + mock_init_clients, + ): + """Removing a config section on reload resets that option to its default.""" + import signal as signal_module + + mock_imap.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + mock_init_clients.return_value = {} + + # First config sets kafka_hosts (with required topics); second removes it. 
+ config_v1 = ( + self._BASE_CONFIG + + "\n[kafka]\nhosts = kafka.example.com:9092\n" + + "aggregate_topic = dmarc_agg\n" + + "forensic_topic = dmarc_forensic\n" + + "smtp_tls_topic = smtp_tls\n" + ) + config_v2 = self._BASE_CONFIG # no [kafka] section + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(config_v1) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + watch_calls = [0] + + def watch_side_effect(*args, **kwargs): + watch_calls[0] += 1 + if watch_calls[0] == 1: + # Rewrite config to remove kafka before triggering reload + with open(cfg_path, "w") as f: + f.write(config_v2) + if hasattr(signal_module, "SIGHUP"): + import os + + os.kill(os.getpid(), signal_module.SIGHUP) + return + else: + raise FileExistsError("stop") + + mock_watch.side_effect = watch_side_effect + + # Capture opts used on each _init_output_clients call + init_opts_captures = [] + + def init_side_effect(opts): + from argparse import Namespace as NS + + init_opts_captures.append(NS(**vars(opts))) + return {} + + mock_init_clients.side_effect = init_side_effect + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + with self.assertRaises(SystemExit): + parsedmarc.cli._main() + + # First init: kafka_hosts should be set from v1 config + self.assertIsNotNone(init_opts_captures[0].kafka_hosts) + # Second init (after reload with v2 config): kafka_hosts should be None + self.assertIsNone(init_opts_captures[1].kafka_hosts) + + if __name__ == "__main__": unittest.main(verbosity=2)