diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b783d4..4cf9be8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## 9.3.1 + +### Breaking changes + +- Elasticsearch and OpenSearch now verify SSL certificates by default when `ssl = True`, even without a `cert_path` +- Added `skip_certificate_verification` option to the `elasticsearch` and `opensearch` configuration sections for consistency with `splunk_hec` + +### Fixed + +- Splunk HEC `skip_certificate_verification` now works correctly +- SMTP TLS reports no longer fail when saving to multiple output targets (e.g. Elasticsearch and OpenSearch) due to in-place mutation of the report dict +- Output client initialization errors now identify which module failed (e.g. "OpenSearch: ConnectionError..." instead of generic "Output client error") + ## 9.3.0 ### Added diff --git a/docs/source/usage.md b/docs/source/usage.md index 2883f13..e920c53 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -273,6 +273,8 @@ The full set of configuration options are: (Default: `True`) - `timeout` - float: Timeout in seconds (Default: 60) - `cert_path` - str: Path to a trusted certificates + - `skip_certificate_verification` - bool: Skip certificate + verification (not recommended) - `index_suffix` - str: A suffix to apply to the index names - `index_prefix` - str: A prefix to apply to the index names - `monthly_indexes` - bool: Use monthly indexes instead of daily indexes @@ -300,6 +302,8 @@ The full set of configuration options are: (Default: `True`) - `timeout` - float: Timeout in seconds (Default: 60) - `cert_path` - str: Path to a trusted certificates + - `skip_certificate_verification` - bool: Skip certificate + verification (not recommended) - `index_suffix` - str: A suffix to apply to the index names - `index_prefix` - str: A prefix to apply to the index names - `monthly_indexes` - bool: Use monthly indexes instead of daily indexes diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index b9ae3da..6e1beed 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -505,6 +505,10 @@ def _parse_config_file(config_file, opts): opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl")) if "cert_path" in elasticsearch_config: opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"] + if "skip_certificate_verification" in elasticsearch_config: + opts.elasticsearch_skip_certificate_verification = bool( + elasticsearch_config.getboolean("skip_certificate_verification") + ) if "user" in elasticsearch_config: opts.elasticsearch_username = elasticsearch_config["user"] if "password" in elasticsearch_config: @@ -544,6 +548,10 @@ def _parse_config_file(config_file, opts): opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl")) if "cert_path" in opensearch_config: opts.opensearch_ssl_cert_path = opensearch_config["cert_path"] + if "skip_certificate_verification" in opensearch_config: + opts.opensearch_skip_certificate_verification = bool( + opensearch_config.getboolean("skip_certificate_verification") + ) if "user" in opensearch_config: opts.opensearch_username = opensearch_config["user"] if "password" in opensearch_config: @@ -853,77 +861,95 @@ def _init_output_clients(opts): """ clients = {} - if opts.s3_bucket: - clients["s3_client"] = s3.S3Client( - bucket_name=opts.s3_bucket, - bucket_path=opts.s3_path, - region_name=opts.s3_region_name, - endpoint_url=opts.s3_endpoint_url, - access_key_id=opts.s3_access_key_id, - secret_access_key=opts.s3_secret_access_key, - ) + try: + if opts.s3_bucket: + clients["s3_client"] = s3.S3Client( + bucket_name=opts.s3_bucket, + bucket_path=opts.s3_path, + region_name=opts.s3_region_name, + endpoint_url=opts.s3_endpoint_url, + access_key_id=opts.s3_access_key_id, + secret_access_key=opts.s3_secret_access_key, + ) + except Exception as e: + raise RuntimeError(f"S3: {e}") from e - if opts.syslog_server: - clients["syslog_client"] = syslog.SyslogClient( - server_name=opts.syslog_server, - server_port=int(opts.syslog_port), - protocol=opts.syslog_protocol or "udp", - cafile_path=opts.syslog_cafile_path, - certfile_path=opts.syslog_certfile_path, - keyfile_path=opts.syslog_keyfile_path, - timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0, - retry_attempts=opts.syslog_retry_attempts - if opts.syslog_retry_attempts is not None - else 3, - retry_delay=opts.syslog_retry_delay - if opts.syslog_retry_delay is not None - else 5, - ) + try: + if opts.syslog_server: + clients["syslog_client"] = syslog.SyslogClient( + server_name=opts.syslog_server, + server_port=int(opts.syslog_port), + protocol=opts.syslog_protocol or "udp", + cafile_path=opts.syslog_cafile_path, + certfile_path=opts.syslog_certfile_path, + keyfile_path=opts.syslog_keyfile_path, + timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0, + retry_attempts=opts.syslog_retry_attempts + if opts.syslog_retry_attempts is not None + else 3, + retry_delay=opts.syslog_retry_delay + if opts.syslog_retry_delay is not None + else 5, + ) + except Exception as e: + raise RuntimeError(f"Syslog: {e}") from e if opts.hec: if opts.hec_token is None or opts.hec_index is None: raise ConfigurationError( "HEC token and HEC index are required when using HEC URL" ) - verify = True - if opts.hec_skip_certificate_verification: - verify = False - clients["hec_client"] = splunk.HECClient( - opts.hec, opts.hec_token, opts.hec_index, verify=verify - ) + try: + verify = True + if opts.hec_skip_certificate_verification: + verify = False + clients["hec_client"] = splunk.HECClient( + opts.hec, opts.hec_token, opts.hec_index, verify=verify + ) + except Exception as e: + raise RuntimeError(f"Splunk HEC: {e}") from e - if opts.kafka_hosts: - ssl_context = None - if opts.kafka_skip_certificate_verification: - logger.debug("Skipping Kafka certificate verification") - ssl_context = create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = CERT_NONE - clients["kafka_client"] = kafkaclient.KafkaClient( - opts.kafka_hosts, - username=opts.kafka_username, - password=opts.kafka_password, - ssl_context=ssl_context, - ) + try: + if opts.kafka_hosts: + ssl_context = None + if opts.kafka_skip_certificate_verification: + logger.debug("Skipping Kafka certificate verification") + ssl_context = create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = CERT_NONE + clients["kafka_client"] = kafkaclient.KafkaClient( + opts.kafka_hosts, + username=opts.kafka_username, + password=opts.kafka_password, + ssl_context=ssl_context, + ) + except Exception as e: + raise RuntimeError(f"Kafka: {e}") from e - if opts.gelf_host: - clients["gelf_client"] = gelf.GelfClient( - host=opts.gelf_host, - port=int(opts.gelf_port), - mode=opts.gelf_mode, - ) + try: + if opts.gelf_host: + clients["gelf_client"] = gelf.GelfClient( + host=opts.gelf_host, + port=int(opts.gelf_port), + mode=opts.gelf_mode, + ) + except Exception as e: + raise RuntimeError(f"GELF: {e}") from e - if ( - opts.webhook_aggregate_url - or opts.webhook_forensic_url - or opts.webhook_smtp_tls_url - ): - clients["webhook_client"] = webhook.WebhookClient( - aggregate_url=opts.webhook_aggregate_url, - forensic_url=opts.webhook_forensic_url, - smtp_tls_url=opts.webhook_smtp_tls_url, - timeout=opts.webhook_timeout, - ) + try: + if ( + opts.webhook_aggregate_url + or opts.webhook_forensic_url + or opts.webhook_smtp_tls_url + ): + clients["webhook_client"] = webhook.WebhookClient( + aggregate_url=opts.webhook_aggregate_url, + forensic_url=opts.webhook_forensic_url, + smtp_tls_url=opts.webhook_smtp_tls_url, + timeout=opts.webhook_timeout, + ) + except Exception as e: + raise RuntimeError(f"Webhook: {e}") from e # Elasticsearch and OpenSearch mutate module-level global state via # connections.create_connection(), which cannot be rolled back if a later @@ -931,76 +957,84 @@ def _init_output_clients(opts): # successfully first; this minimises the window for partial-init problems # during config reload. if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls: - if opts.elasticsearch_hosts: - es_aggregate_index = "dmarc_aggregate" - es_forensic_index = "dmarc_forensic" - es_smtp_tls_index = "smtp_tls" - if opts.elasticsearch_index_suffix: - suffix = opts.elasticsearch_index_suffix - es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) - es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) - es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix) - if opts.elasticsearch_index_prefix: - prefix = opts.elasticsearch_index_prefix - es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) - es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) - es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) - elastic_timeout_value = ( - float(opts.elasticsearch_timeout) - if opts.elasticsearch_timeout is not None - else 60.0 - ) - elastic.set_hosts( - opts.elasticsearch_hosts, - use_ssl=opts.elasticsearch_ssl, - ssl_cert_path=opts.elasticsearch_ssl_cert_path, - username=opts.elasticsearch_username, - password=opts.elasticsearch_password, - api_key=opts.elasticsearch_api_key, - timeout=elastic_timeout_value, - ) - elastic.migrate_indexes( - aggregate_indexes=[es_aggregate_index], - forensic_indexes=[es_forensic_index], - ) - clients["elasticsearch"] = _ElasticsearchHandle() + try: + if opts.elasticsearch_hosts: + es_aggregate_index = "dmarc_aggregate" + es_forensic_index = "dmarc_forensic" + es_smtp_tls_index = "smtp_tls" + if opts.elasticsearch_index_suffix: + suffix = opts.elasticsearch_index_suffix + es_aggregate_index = "{0}_{1}".format(es_aggregate_index, suffix) + es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix) + es_smtp_tls_index = "{0}_{1}".format(es_smtp_tls_index, suffix) + if opts.elasticsearch_index_prefix: + prefix = opts.elasticsearch_index_prefix + es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) + es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) + es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) + elastic_timeout_value = ( + float(opts.elasticsearch_timeout) + if opts.elasticsearch_timeout is not None + else 60.0 + ) + elastic.set_hosts( + opts.elasticsearch_hosts, + use_ssl=opts.elasticsearch_ssl, + ssl_cert_path=opts.elasticsearch_ssl_cert_path, + skip_certificate_verification=opts.elasticsearch_skip_certificate_verification, + username=opts.elasticsearch_username, + password=opts.elasticsearch_password, + api_key=opts.elasticsearch_api_key, + timeout=elastic_timeout_value, + ) + elastic.migrate_indexes( + aggregate_indexes=[es_aggregate_index], + forensic_indexes=[es_forensic_index], + ) + clients["elasticsearch"] = _ElasticsearchHandle() + except Exception as e: + raise RuntimeError(f"Elasticsearch: {e}") from e - if opts.opensearch_hosts: - os_aggregate_index = "dmarc_aggregate" - os_forensic_index = "dmarc_forensic" - os_smtp_tls_index = "smtp_tls" - if opts.opensearch_index_suffix: - suffix = opts.opensearch_index_suffix - os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix) - os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix) - os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix) - if opts.opensearch_index_prefix: - prefix = opts.opensearch_index_prefix - os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) - os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) - os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) - opensearch_timeout_value = ( - float(opts.opensearch_timeout) - if opts.opensearch_timeout is not None - else 60.0 - ) - opensearch.set_hosts( - opts.opensearch_hosts, - use_ssl=opts.opensearch_ssl, - ssl_cert_path=opts.opensearch_ssl_cert_path, - username=opts.opensearch_username, - password=opts.opensearch_password, - api_key=opts.opensearch_api_key, - timeout=opensearch_timeout_value, - auth_type=opts.opensearch_auth_type, - aws_region=opts.opensearch_aws_region, - aws_service=opts.opensearch_aws_service, - ) - opensearch.migrate_indexes( - aggregate_indexes=[os_aggregate_index], - forensic_indexes=[os_forensic_index], - ) - clients["opensearch"] = _OpenSearchHandle() + try: + if opts.opensearch_hosts: + os_aggregate_index = "dmarc_aggregate" + os_forensic_index = "dmarc_forensic" + os_smtp_tls_index = "smtp_tls" + if opts.opensearch_index_suffix: + suffix = opts.opensearch_index_suffix + os_aggregate_index = "{0}_{1}".format(os_aggregate_index, suffix) + os_forensic_index = "{0}_{1}".format(os_forensic_index, suffix) + os_smtp_tls_index = "{0}_{1}".format(os_smtp_tls_index, suffix) + if opts.opensearch_index_prefix: + prefix = opts.opensearch_index_prefix + os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) + os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) + os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) + opensearch_timeout_value = ( + float(opts.opensearch_timeout) + if opts.opensearch_timeout is not None + else 60.0 + ) + opensearch.set_hosts( + opts.opensearch_hosts, + use_ssl=opts.opensearch_ssl, + ssl_cert_path=opts.opensearch_ssl_cert_path, + skip_certificate_verification=opts.opensearch_skip_certificate_verification, + username=opts.opensearch_username, + password=opts.opensearch_password, + api_key=opts.opensearch_api_key, + timeout=opensearch_timeout_value, + auth_type=opts.opensearch_auth_type, + aws_region=opts.opensearch_aws_region, + aws_service=opts.opensearch_aws_service, + ) + opensearch.migrate_indexes( + aggregate_indexes=[os_aggregate_index], + forensic_indexes=[os_forensic_index], + ) + clients["opensearch"] = _OpenSearchHandle() + except Exception as e: + raise RuntimeError(f"OpenSearch: {e}") from e return clients @@ -1529,6 +1563,7 @@ def _main(): elasticsearch_index_prefix=None, elasticsearch_ssl=True, elasticsearch_ssl_cert_path=None, + elasticsearch_skip_certificate_verification=False, elasticsearch_monthly_indexes=False, elasticsearch_username=None, elasticsearch_password=None, @@ -1541,6 +1576,7 @@ def _main(): opensearch_index_prefix=None, opensearch_ssl=True, opensearch_ssl_cert_path=None, + opensearch_skip_certificate_verification=False, opensearch_monthly_indexes=False, opensearch_username=None, opensearch_password=None, @@ -1666,12 +1702,6 @@ def _main(): # Initialize output clients try: clients = _init_output_clients(opts) - except elastic.ElasticsearchError as e: - logger.exception("Elasticsearch Error: {0}".format(e)) - exit(1) - except opensearch.OpenSearchError as e: - logger.exception("OpenSearch Error: {0}".format(e)) - exit(1) except ConfigurationError as e: logger.critical(str(e)) exit(1) diff --git a/parsedmarc/constants.py b/parsedmarc/constants.py index 38c0044..823c2f9 100644 --- a/parsedmarc/constants.py +++ b/parsedmarc/constants.py @@ -1,3 +1,3 @@ -__version__ = "9.3.0" +__version__ = "9.3.1" USER_AGENT = f"parsedmarc/{__version__}" diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index c823e07..f2e56f2 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -268,6 +268,7 @@ def set_hosts( *, use_ssl: bool = False, ssl_cert_path: Optional[str] = None, + skip_certificate_verification: bool = False, username: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None, @@ -280,6 +281,7 @@ def set_hosts( hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs use_ssl (bool): Use an HTTPS connection to the server ssl_cert_path (str): Path to the certificate chain + skip_certificate_verification (bool): Skip certificate verification username (str): The username to use for authentication password (str): The password to use for authentication api_key (str): The Base64 encoded API key to use for authentication @@ -291,10 +293,11 @@ def set_hosts( if use_ssl: conn_params["use_ssl"] = True if ssl_cert_path: - conn_params["verify_certs"] = True conn_params["ca_certs"] = ssl_cert_path - else: + if skip_certificate_verification: conn_params["verify_certs"] = False + else: + conn_params["verify_certs"] = True if username and password: conn_params["http_auth"] = username + ":" + password if api_key: diff --git a/parsedmarc/opensearch.py b/parsedmarc/opensearch.py index 0e27141..c99ea5b 100644 --- a/parsedmarc/opensearch.py +++ b/parsedmarc/opensearch.py @@ -271,6 +271,7 @@ def set_hosts( *, use_ssl: Optional[bool] = False, ssl_cert_path: Optional[str] = None, + skip_certificate_verification: bool = False, username: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None, @@ -286,6 +287,7 @@ def set_hosts( hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs use_ssl (bool): Use an HTTPS connection to the server ssl_cert_path (str): Path to the certificate chain + skip_certificate_verification (bool): Skip certificate verification username (str): The username to use for authentication password (str): The password to use for authentication api_key (str): The Base64 encoded API key to use for authentication @@ -300,10 +302,11 @@ def set_hosts( if use_ssl: conn_params["use_ssl"] = True if ssl_cert_path: - conn_params["verify_certs"] = True conn_params["ca_certs"] = ssl_cert_path - else: + if skip_certificate_verification: conn_params["verify_certs"] = False + else: + conn_params["verify_certs"] = True normalized_auth_type = (auth_type or "basic").strip().lower() if normalized_auth_type == "awssigv4": if not aws_region: diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py index f96e000..ff660f0 100644 --- a/parsedmarc/splunk.py +++ b/parsedmarc/splunk.py @@ -58,7 +58,7 @@ class HECClient(object): self.source = source self.session = requests.Session() self.timeout = timeout - self.session.verify = verify + self.verify = verify self._common_data: dict[str, Union[str, int, float, dict]] = dict( host=self.host, source=self.source, index=self.index ) @@ -124,10 +124,12 @@ class HECClient(object): data["event"] = new_report.copy() json_str += "{0}\n".format(json.dumps(data)) - if not self.session.verify: + if not self.verify: logger.debug("Skipping certificate verification for Splunk HEC") try: - response = self.session.post(self.url, data=json_str, timeout=self.timeout) + response = self.session.post( + self.url, data=json_str, verify=self.verify, timeout=self.timeout + ) response = response.json() except Exception as e: raise SplunkError(e.__str__()) @@ -161,10 +163,12 @@ class HECClient(object): data["event"] = report.copy() json_str += "{0}\n".format(json.dumps(data)) - if not self.session.verify: + if not self.verify: logger.debug("Skipping certificate verification for Splunk HEC") try: - response = self.session.post(self.url, data=json_str, timeout=self.timeout) + response = self.session.post( + self.url, data=json_str, verify=self.verify, timeout=self.timeout + ) response = response.json() except Exception as e: raise SplunkError(e.__str__()) @@ -198,10 +202,12 @@ class HECClient(object): data["event"] = report.copy() json_str += "{0}\n".format(json.dumps(data)) - if not self.session.verify: + if not self.verify: logger.debug("Skipping certificate verification for Splunk HEC") try: - response = self.session.post(self.url, data=json_str, timeout=self.timeout) + response = self.session.post( + self.url, data=json_str, verify=self.verify, timeout=self.timeout + ) response = response.json() except Exception as e: raise SplunkError(e.__str__()) diff --git a/tests.py b/tests.py index 1f29852..538079a 100755 --- a/tests.py +++ b/tests.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, print_function, unicode_literals +import io import os import signal import sys @@ -1277,6 +1278,22 @@ class TestImapFallbacks(unittest.TestCase): class TestMailboxWatchSince(unittest.TestCase): + def setUp(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = True + self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO) + self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO) + self._stdout_patch.start() + self._stderr_patch.start() + + def tearDown(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = False + self._stderr_patch.stop() + self._stdout_patch.stop() + def testWatchInboxPassesSinceToMailboxFetch(self): mailbox_connection = SimpleNamespace() @@ -1369,6 +1386,22 @@ class _DummyMailboxConnection(parsedmarc.MailboxConnection): class TestMailboxPerformance(unittest.TestCase): + def setUp(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = True + self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO) + self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO) + self._stdout_patch.start() + self._stderr_patch.start() + + def tearDown(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = False + self._stderr_patch.stop() + self._stdout_patch.stop() + def testBatchModeAvoidsExtraFullFetch(self): connection = _DummyMailboxConnection() parsedmarc.get_dmarc_reports_from_mailbox( @@ -1918,6 +1951,22 @@ certificate_path = /tmp/msgraph-cert.pem class TestSighupReload(unittest.TestCase): """Tests for SIGHUP-driven configuration reload in watch mode.""" + def setUp(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = True + self._stdout_patch = patch("sys.stdout", new_callable=io.StringIO) + self._stderr_patch = patch("sys.stderr", new_callable=io.StringIO) + self._stdout_patch.start() + self._stderr_patch.start() + + def tearDown(self): + from parsedmarc.log import logger as _logger + + _logger.disabled = False + self._stderr_patch.stop() + self._stdout_patch.stop() + _BASE_CONFIG = """[general] silent = true