From 8567c73358cf5cc1ed0243e8840e1d5398eaad48 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Fri, 20 Mar 2026 22:19:28 -0400 Subject: [PATCH] Enhance resource management: add close methods for S3Client and HECClient, and improve IMAP connection handling during IDLE. Update CHANGELOG.md for config reload improvements and bug fixes. --- CHANGELOG.md | 24 +++++- parsedmarc/cli.py | 184 +++++++++++++++++++++++----------------- parsedmarc/mail/imap.py | 5 ++ parsedmarc/s3.py | 8 ++ parsedmarc/splunk.py | 4 + 5 files changed, 146 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4e3f91..be3de52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,15 +11,35 @@ systemd. On a successful reload, old output clients are closed and recreated. On a failed reload, the previous configuration remains fully active. -- `close()` methods on GelfClient, KafkaClient, SyslogClient, and - WebhookClient for clean resource teardown on reload. +- `close()` methods on GelfClient, KafkaClient, SyslogClient, + WebhookClient, HECClient, and S3Client for clean resource teardown + on reload. - `should_reload` parameter on all `MailboxConnection.watch()` implementations and `watch_inbox()` to ensure SIGHUP never triggers a new email batch mid-reload. +- Elasticsearch and OpenSearch connections are now tracked and cleaned + up on reload via `_close_output_clients()`. - Extracted `_parse_config_file()` and `_init_output_clients()` from `_main()` in `cli.py` to support config reload and reduce code duplication. +### Fixed + +- `get_index_prefix()` crashed on forensic reports with `TypeError` + due to `report()` instead of `report[]` dict access. +- Missing `exit(1)` after IMAP user/password validation failure + allowed execution to continue with `None` credentials. +- IMAP `watch()` leaked a connection on every IDLE cycle by not + closing the old `IMAPClient` before replacing it. +- Resource leak in `_init_output_clients()` when Splunk HEC + configuration is invalid — the partially-constructed HEC client + is now cleaned up on error. +- Elasticsearch/OpenSearch `set_hosts()` global state was not + rollback-safe on reload failure — init now runs last so other + client failures don't leave stale global connections. +- `active_log_file` was not initialized at startup, causing the + first reload to unnecessarily remove and re-add the FileHandler. + ## 9.2.1 ### Added diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 5212847..c1df138 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -747,15 +747,15 @@ def _parse_config_file(config_file, opts): if "oauth2_port" in gmail_api_config: opts.gmail_api_oauth2_port = gmail_api_config.getint("oauth2_port", 8080) if "auth_mode" in gmail_api_config: - opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip() + opts.gmail_api_auth_mode = gmail_api_config["auth_mode"].strip() if "service_account_user" in gmail_api_config: - opts.gmail_api_service_account_user = gmail_api_config.get( + opts.gmail_api_service_account_user = gmail_api_config[ "service_account_user" - ).strip() + ].strip() elif "delegated_user" in gmail_api_config: - opts.gmail_api_service_account_user = gmail_api_config.get( + opts.gmail_api_service_account_user = gmail_api_config[ "delegated_user" - ).strip() + ].strip() if "maildir" in config.sections(): maildir_api_config = config["maildir"] @@ -810,6 +810,26 @@ def _parse_config_file(config_file, opts): return index_prefix_domain_map +class _ElasticsearchHandle: + """Sentinel so Elasticsearch participates in _close_output_clients.""" + + def close(self): + try: + elastic.connections.remove_connection("default") + except Exception: + pass + + +class _OpenSearchHandle: + """Sentinel so OpenSearch participates in _close_output_clients.""" + + def close(self): + try: + opensearch.connections.remove_connection("default") + except Exception: + pass + + def _init_output_clients(opts): """Create output clients based on current opts. @@ -821,76 +841,6 @@ def _init_output_clients(opts): """ clients = {} - if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls: - if opts.elasticsearch_hosts: - es_aggregate_index = "dmarc_aggregate" - es_forensic_index = "dmarc_forensic" - es_smtp_tls_index = "smtp_tls" - if opts.elasticsearch_index_suffix: - suffix = opts.elasticsearch_index_suffix - es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) - es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) - es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix) - if opts.elasticsearch_index_prefix: - prefix = opts.elasticsearch_index_prefix - es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) - es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) - es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) - elastic_timeout_value = ( - float(opts.elasticsearch_timeout) - if opts.elasticsearch_timeout is not None - else 60.0 - ) - elastic.set_hosts( - opts.elasticsearch_hosts, - use_ssl=opts.elasticsearch_ssl, - ssl_cert_path=opts.elasticsearch_ssl_cert_path, - username=opts.elasticsearch_username, - password=opts.elasticsearch_password, - api_key=opts.elasticsearch_api_key, - timeout=elastic_timeout_value, - ) - elastic.migrate_indexes( - aggregate_indexes=[es_aggregate_index], - forensic_indexes=[es_forensic_index], - ) - - if opts.opensearch_hosts: - os_aggregate_index = "dmarc_aggregate" - os_forensic_index = "dmarc_forensic" - os_smtp_tls_index = "smtp_tls" - if opts.opensearch_index_suffix: - suffix = opts.opensearch_index_suffix - os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix) - os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix) - os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix) - if opts.opensearch_index_prefix: - prefix = opts.opensearch_index_prefix - os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) - os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) - os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) - opensearch_timeout_value = ( - float(opts.opensearch_timeout) - if opts.opensearch_timeout is not None - else 60.0 - ) - opensearch.set_hosts( - opts.opensearch_hosts, - use_ssl=opts.opensearch_ssl, - ssl_cert_path=opts.opensearch_ssl_cert_path, - username=opts.opensearch_username, - password=opts.opensearch_password, - api_key=opts.opensearch_api_key, - timeout=opensearch_timeout_value, - auth_type=opts.opensearch_auth_type, - aws_region=opts.opensearch_aws_region, - aws_service=opts.opensearch_aws_service, - ) - opensearch.migrate_indexes( - aggregate_indexes=[os_aggregate_index], - forensic_indexes=[os_forensic_index], - ) - if opts.s3_bucket: clients["s3_client"] = s3.S3Client( bucket_name=opts.s3_bucket, @@ -963,6 +913,83 @@ def _init_output_clients(opts): timeout=opts.webhook_timeout, ) + # Elasticsearch and OpenSearch mutate module-level global state via + # connections.create_connection(), which cannot be rolled back if a later + # step fails. Initialise them last so that all other clients are created + # successfully first; this minimises the window for partial-init problems + # during config reload. + if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls: + if opts.elasticsearch_hosts: + es_aggregate_index = "dmarc_aggregate" + es_forensic_index = "dmarc_forensic" + es_smtp_tls_index = "smtp_tls" + if opts.elasticsearch_index_suffix: + suffix = opts.elasticsearch_index_suffix + es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) + es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) + es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix) + if opts.elasticsearch_index_prefix: + prefix = opts.elasticsearch_index_prefix + es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) + es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) + es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) + elastic_timeout_value = ( + float(opts.elasticsearch_timeout) + if opts.elasticsearch_timeout is not None + else 60.0 + ) + elastic.set_hosts( + opts.elasticsearch_hosts, + use_ssl=opts.elasticsearch_ssl, + ssl_cert_path=opts.elasticsearch_ssl_cert_path, + username=opts.elasticsearch_username, + password=opts.elasticsearch_password, + api_key=opts.elasticsearch_api_key, + timeout=elastic_timeout_value, + ) + elastic.migrate_indexes( + aggregate_indexes=[es_aggregate_index], + forensic_indexes=[es_forensic_index], + ) + clients["elasticsearch"] = _ElasticsearchHandle() + + if opts.opensearch_hosts: + os_aggregate_index = "dmarc_aggregate" + os_forensic_index = "dmarc_forensic" + os_smtp_tls_index = "smtp_tls" + if opts.opensearch_index_suffix: + suffix = opts.opensearch_index_suffix + os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix) + os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix) + os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix) + if opts.opensearch_index_prefix: + prefix = opts.opensearch_index_prefix + os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) + os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) + os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) + opensearch_timeout_value = ( + float(opts.opensearch_timeout) + if opts.opensearch_timeout is not None + else 60.0 + ) + opensearch.set_hosts( + opts.opensearch_hosts, + use_ssl=opts.opensearch_ssl, + ssl_cert_path=opts.opensearch_ssl_cert_path, + username=opts.opensearch_username, + password=opts.opensearch_password, + api_key=opts.opensearch_api_key, + timeout=opensearch_timeout_value, + auth_type=opts.opensearch_auth_type, + aws_region=opts.opensearch_aws_region, + aws_service=opts.opensearch_aws_service, + ) + opensearch.migrate_indexes( + aggregate_indexes=[os_aggregate_index], + forensic_indexes=[os_forensic_index], + ) + clients["opensearch"] = _OpenSearchHandle() + return clients @@ -993,7 +1020,7 @@ def _main(): if "policy_published" in report: domain = report["policy_published"]["domain"] elif "reported_domain" in report: - domain = report("reported_domain") + domain = report["reported_domain"] elif "policies" in report: domain = report["policies"][0]["domain"] if domain: @@ -1610,6 +1637,8 @@ def _main(): except Exception as error: logger.warning("Unable to write to log file: {}".format(error)) + opts.active_log_file = opts.log_file + if ( opts.imap_host is None and opts.graph_client_id is None @@ -1768,8 +1797,9 @@ def _main(): try: if opts.imap_user is None or opts.imap_password is None: logger.error( - "IMAP user and password must be specified ifhost is specified" + "IMAP user and password must be specified if host is specified" ) + exit(1) ssl = True verify = True diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py index b084bd9..1fd4430 100644 --- a/parsedmarc/mail/imap.py +++ b/parsedmarc/mail/imap.py @@ -90,7 +90,12 @@ class IMAPConnection(MailboxConnection): # IDLE callback sends IMAPClient object, # send back the imap connection object instead def idle_callback_wrapper(client: IMAPClient): + old_client = self._client self._client = client + try: + old_client.logout() + except Exception: + pass check_callback(self) while True: diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py index d6778fa..99e03b3 100644 --- a/parsedmarc/s3.py +++ b/parsedmarc/s3.py @@ -93,3 +93,11 @@ class S3Client(object): self.bucket.put_object( Body=json.dumps(report), Key=object_path, Metadata=object_metadata ) + + def close(self): + """Clean up the boto3 resource.""" + try: + if self.s3.meta is not None: + self.s3.meta.client.close() + except Exception: + pass diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py index 28f7c0f..f96e000 100644 --- a/parsedmarc/splunk.py +++ b/parsedmarc/splunk.py @@ -207,3 +207,7 @@ class HECClient(object): raise SplunkError(e.__str__()) if response["code"] != 0: raise SplunkError(response["text"]) + + def close(self): + """Close the underlying HTTP session.""" + self.session.close()