From f2febf21d368da4699944836084f214de569a28f Mon Sep 17 00:00:00 2001 From: Kili Date: Mon, 9 Mar 2026 22:35:38 +0100 Subject: [PATCH] Add fail_on_output_error CLI option for sink failures (#672) * Add fail-on-output-error option and CLI regression test * Broaden fail_on_output_error coverage for disabled and multi-sink paths --- docs/source/usage.md | 3 + parsedmarc/cli.py | 99 ++++++++++++++++----------- tests.py | 159 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 220 insertions(+), 41 deletions(-) diff --git a/docs/source/usage.md b/docs/source/usage.md index 2af1cc3..9d24e6b 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -146,6 +146,9 @@ The full set of configuration options are: - `dns_timeout` - float: DNS timeout period - `debug` - bool: Print debugging messages - `silent` - bool: Only print errors (Default: `True`) + - `fail_on_output_error` - bool: Exit with a non-zero status code if + any configured output destination fails while saving/publishing + reports (Default: `False`) - `log_file` - str: Write log messages to a file at this path - `n_procs` - int: Number of process to run in parallel when parsing in CLI mode (Default: `1`) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 599e507..b285c6d 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -194,6 +194,13 @@ def _main(): return None def process_reports(reports_): + output_errors = [] + + def log_output_error(destination, error): + message = f"{destination} Error: {error}" + logger.error(message) + output_errors.append(message) + indent_value = 2 if opts.prettify_json else None output_str = "{0}\n".format( json.dumps(reports_, ensure_ascii=False, indent=indent_value) @@ -230,11 +237,9 @@ def _main(): except elastic.AlreadySaved as warning: logger.warning(warning.__str__()) except elastic.ElasticsearchError as error_: - logger.error("Elasticsearch Error: {0}".format(error_.__str__())) + log_output_error("Elasticsearch", error_.__str__()) except Exception as error_: - logger.error( - "Elasticsearch exception error: {}".format(error_.__str__()) - ) + log_output_error("Elasticsearch exception", error_.__str__()) try: if opts.opensearch_hosts: @@ -252,11 +257,9 @@ def _main(): except opensearch.AlreadySaved as warning: logger.warning(warning.__str__()) except opensearch.OpenSearchError as error_: - logger.error("OpenSearch Error: {0}".format(error_.__str__())) + log_output_error("OpenSearch", error_.__str__()) except Exception as error_: - logger.error( - "OpenSearch exception error: {}".format(error_.__str__()) - ) + log_output_error("OpenSearch exception", error_.__str__()) try: if opts.kafka_hosts: @@ -264,25 +267,25 @@ def _main(): report, kafka_aggregate_topic ) except Exception as error_: - logger.error("Kafka Error: {0}".format(error_.__str__())) + log_output_error("Kafka", error_.__str__()) try: if opts.s3_bucket: s3_client.save_aggregate_report_to_s3(report) except Exception as error_: - logger.error("S3 Error: {0}".format(error_.__str__())) + log_output_error("S3", error_.__str__()) try: if opts.syslog_server: syslog_client.save_aggregate_report_to_syslog(report) except Exception as error_: - logger.error("Syslog Error: {0}".format(error_.__str__())) + log_output_error("Syslog", error_.__str__()) try: if opts.gelf_host: gelf_client.save_aggregate_report_to_gelf(report) except Exception as error_: - logger.error("GELF Error: {0}".format(error_.__str__())) + log_output_error("GELF", error_.__str__()) try: if opts.webhook_aggregate_url: @@ -291,7 +294,7 @@ def _main(): json.dumps(report, ensure_ascii=False, indent=indent_value) ) except Exception as error_: - logger.error("Webhook Error: {0}".format(error_.__str__())) + log_output_error("Webhook", error_.__str__()) if opts.hec: try: @@ -299,7 +302,7 @@ def _main(): if len(aggregate_reports_) > 0: hec_client.save_aggregate_reports_to_splunk(aggregate_reports_) except splunk.SplunkError as e: - logger.error("Splunk HEC error: {0}".format(e.__str__())) + log_output_error("Splunk HEC", e.__str__()) if opts.save_forensic: for report in reports_["forensic_reports"]: @@ -319,9 +322,9 @@ def _main(): except elastic.AlreadySaved as warning: logger.warning(warning.__str__()) except elastic.ElasticsearchError as error_: - logger.error("Elasticsearch Error: {0}".format(error_.__str__())) + log_output_error("Elasticsearch", error_.__str__()) except InvalidDMARCReport as error_: - logger.error(error_.__str__()) + log_output_error("Invalid DMARC report", error_.__str__()) try: shards = opts.opensearch_number_of_shards @@ -339,9 +342,9 @@ def _main(): except opensearch.AlreadySaved as warning: logger.warning(warning.__str__()) except opensearch.OpenSearchError as error_: - logger.error("OpenSearch Error: {0}".format(error_.__str__())) + log_output_error("OpenSearch", error_.__str__()) except InvalidDMARCReport as error_: - logger.error(error_.__str__()) + log_output_error("Invalid DMARC report", error_.__str__()) try: if opts.kafka_hosts: @@ -349,25 +352,25 @@ def _main(): report, kafka_forensic_topic ) except Exception as error_: - logger.error("Kafka Error: {0}".format(error_.__str__())) + log_output_error("Kafka", error_.__str__()) try: if opts.s3_bucket: s3_client.save_forensic_report_to_s3(report) except Exception as error_: - logger.error("S3 Error: {0}".format(error_.__str__())) + log_output_error("S3", error_.__str__()) try: if opts.syslog_server: syslog_client.save_forensic_report_to_syslog(report) except Exception as error_: - logger.error("Syslog Error: {0}".format(error_.__str__())) + log_output_error("Syslog", error_.__str__()) try: if opts.gelf_host: gelf_client.save_forensic_report_to_gelf(report) except Exception as error_: - logger.error("GELF Error: {0}".format(error_.__str__())) + log_output_error("GELF", error_.__str__()) try: if opts.webhook_forensic_url: @@ -376,7 +379,7 @@ def _main(): json.dumps(report, ensure_ascii=False, indent=indent_value) ) except Exception as error_: - logger.error("Webhook Error: {0}".format(error_.__str__())) + log_output_error("Webhook", error_.__str__()) if opts.hec: try: @@ -384,7 +387,7 @@ def _main(): if len(forensic_reports_) > 0: hec_client.save_forensic_reports_to_splunk(forensic_reports_) except splunk.SplunkError as e: - logger.error("Splunk HEC error: {0}".format(e.__str__())) + log_output_error("Splunk HEC", e.__str__()) if opts.save_smtp_tls: for report in reports_["smtp_tls_reports"]: @@ -404,9 +407,9 @@ def _main(): except elastic.AlreadySaved as warning: logger.warning(warning.__str__()) except elastic.ElasticsearchError as error_: - logger.error("Elasticsearch Error: {0}".format(error_.__str__())) + log_output_error("Elasticsearch", error_.__str__()) except InvalidDMARCReport as error_: - logger.error(error_.__str__()) + log_output_error("Invalid DMARC report", error_.__str__()) try: shards = opts.opensearch_number_of_shards @@ -424,9 +427,9 @@ def _main(): except opensearch.AlreadySaved as warning: logger.warning(warning.__str__()) except opensearch.OpenSearchError as error_: - logger.error("OpenSearch Error: {0}".format(error_.__str__())) + log_output_error("OpenSearch", error_.__str__()) except InvalidDMARCReport as error_: - logger.error(error_.__str__()) + log_output_error("Invalid DMARC report", error_.__str__()) try: if opts.kafka_hosts: @@ -434,25 +437,25 @@ def _main(): smtp_tls_reports, kafka_smtp_tls_topic ) except Exception as error_: - logger.error("Kafka Error: {0}".format(error_.__str__())) + log_output_error("Kafka", error_.__str__()) try: if opts.s3_bucket: s3_client.save_smtp_tls_report_to_s3(report) except Exception as error_: - logger.error("S3 Error: {0}".format(error_.__str__())) + log_output_error("S3", error_.__str__()) try: if opts.syslog_server: syslog_client.save_smtp_tls_report_to_syslog(report) except Exception as error_: - logger.error("Syslog Error: {0}".format(error_.__str__())) + log_output_error("Syslog", error_.__str__()) try: if opts.gelf_host: gelf_client.save_smtp_tls_report_to_gelf(report) except Exception as error_: - logger.error("GELF Error: {0}".format(error_.__str__())) + log_output_error("GELF", error_.__str__()) try: if opts.webhook_smtp_tls_url: @@ -461,7 +464,7 @@ def _main(): json.dumps(report, ensure_ascii=False, indent=indent_value) ) except Exception as error_: - logger.error("Webhook Error: {0}".format(error_.__str__())) + log_output_error("Webhook", error_.__str__()) if opts.hec: try: @@ -469,7 +472,7 @@ def _main(): if len(smtp_tls_reports_) > 0: hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_) except splunk.SplunkError as e: - logger.error("Splunk HEC error: {0}".format(e.__str__())) + log_output_error("Splunk HEC", e.__str__()) if opts.la_dce: try: @@ -490,14 +493,16 @@ def _main(): opts.save_smtp_tls, ) except loganalytics.LogAnalyticsException as e: - logger.error("Log Analytics error: {0}".format(e.__str__())) + log_output_error("Log Analytics", e.__str__()) except Exception as e: - logger.error( - "Unknown error occurred" - + " during the publishing" - + " to Log Analytics: " - + e.__str__() + log_output_error("Log Analytics", f"Unknown publishing error: {e}") + + if opts.fail_on_output_error and output_errors: + raise ParserError( + "Output destination failures detected: {0}".format( + " | ".join(output_errors) ) + ) arg_parser = ArgumentParser(description="Parses DMARC reports") arg_parser.add_argument( @@ -739,6 +744,7 @@ def _main(): webhook_smtp_tls_url=None, webhook_timeout=60, normalize_timespan_threshold_hours=24.0, + fail_on_output_error=False, ) args = arg_parser.parse_args() @@ -821,6 +827,10 @@ def _main(): opts.silent = bool(general_config.getboolean("silent")) if "warnings" in general_config: opts.warnings = bool(general_config.getboolean("warnings")) + if "fail_on_output_error" in general_config: + opts.fail_on_output_error = bool( + general_config.getboolean("fail_on_output_error") + ) if "log_file" in general_config: opts.log_file = general_config["log_file"] if "n_procs" in general_config: @@ -1834,7 +1844,11 @@ def _main(): "smtp_tls_reports": smtp_tls_reports, } - process_reports(parsing_results) + try: + process_reports(parsing_results) + except ParserError as error: + logger.error(error.__str__()) + exit(1) if opts.smtp_host: try: @@ -1890,6 +1904,9 @@ def _main(): except FileExistsError as error: logger.error("{0}".format(error.__str__())) exit(1) + except ParserError as error: + logger.error(error.__str__()) + exit(1) if __name__ == "__main__": diff --git a/tests.py b/tests.py index a0b5e5c..ecfbc88 100755 --- a/tests.py +++ b/tests.py @@ -279,6 +279,165 @@ aws_service = aoss self.assertEqual(mock_set_hosts.call_args.kwargs.get("auth_type"), "awssigv4") self.assertEqual(mock_set_hosts.call_args.kwargs.get("aws_region"), "eu-west-1") self.assertEqual(mock_set_hosts.call_args.kwargs.get("aws_service"), "aoss") + + @patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch") + @patch("parsedmarc.cli.elastic.migrate_indexes") + @patch("parsedmarc.cli.elastic.set_hosts") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.IMAPConnection") + def testFailOnOutputErrorExits( + self, + mock_imap_connection, + mock_get_reports, + _mock_set_hosts, + _mock_migrate_indexes, + mock_save_aggregate, + ): + """CLI should exit with code 1 when fail_on_output_error is enabled""" + mock_imap_connection.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [{"policy_published": {"domain": "example.com"}}], + "forensic_reports": [], + "smtp_tls_reports": [], + } + mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError( + "simulated output failure" + ) + + config = """[general] +save_aggregate = true +fail_on_output_error = true +silent = true + +[imap] +host = imap.example.com +user = test-user +password = test-password + +[elasticsearch] +hosts = localhost +""" + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file: + config_file.write(config) + config_path = config_file.name + self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]): + with self.assertRaises(SystemExit) as ctx: + parsedmarc.cli._main() + + self.assertEqual(ctx.exception.code, 1) + mock_save_aggregate.assert_called_once() + + @patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch") + @patch("parsedmarc.cli.elastic.migrate_indexes") + @patch("parsedmarc.cli.elastic.set_hosts") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.IMAPConnection") + def testOutputErrorDoesNotExitWhenDisabled( + self, + mock_imap_connection, + mock_get_reports, + _mock_set_hosts, + _mock_migrate_indexes, + mock_save_aggregate, + ): + mock_imap_connection.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [{"policy_published": {"domain": "example.com"}}], + "forensic_reports": [], + "smtp_tls_reports": [], + } + mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError( + "simulated output failure" + ) + + config = """[general] +save_aggregate = true +fail_on_output_error = false +silent = true + +[imap] +host = imap.example.com +user = test-user +password = test-password + +[elasticsearch] +hosts = localhost +""" + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file: + config_file.write(config) + config_path = config_file.name + self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]): + parsedmarc.cli._main() + + mock_save_aggregate.assert_called_once() + + @patch("parsedmarc.cli.opensearch.save_forensic_report_to_opensearch") + @patch("parsedmarc.cli.opensearch.migrate_indexes") + @patch("parsedmarc.cli.opensearch.set_hosts") + @patch("parsedmarc.cli.elastic.save_forensic_report_to_elasticsearch") + @patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch") + @patch("parsedmarc.cli.elastic.migrate_indexes") + @patch("parsedmarc.cli.elastic.set_hosts") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.IMAPConnection") + def testFailOnOutputErrorExitsWithMultipleSinkErrors( + self, + mock_imap_connection, + mock_get_reports, + _mock_es_set_hosts, + _mock_es_migrate, + mock_save_aggregate, + _mock_save_forensic_elastic, + _mock_os_set_hosts, + _mock_os_migrate, + mock_save_forensic_opensearch, + ): + mock_imap_connection.return_value = object() + mock_get_reports.return_value = { + "aggregate_reports": [{"policy_published": {"domain": "example.com"}}], + "forensic_reports": [{"reported_domain": "example.com"}], + "smtp_tls_reports": [], + } + mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError( + "aggregate sink failed" + ) + mock_save_forensic_opensearch.side_effect = parsedmarc.cli.opensearch.OpenSearchError( + "forensic sink failed" + ) + + config = """[general] +save_aggregate = true +save_forensic = true +fail_on_output_error = true +silent = true + +[imap] +host = imap.example.com +user = test-user +password = test-password + +[elasticsearch] +hosts = localhost + +[opensearch] +hosts = localhost +""" + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file: + config_file.write(config) + config_path = config_file.name + self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]): + with self.assertRaises(SystemExit) as ctx: + parsedmarc.cli._main() + + self.assertEqual(ctx.exception.code, 1) + mock_save_aggregate.assert_called_once() + mock_save_forensic_opensearch.assert_called_once() class _FakeGraphResponse: def __init__(self, status_code, payload=None, text=""): self.status_code = status_code