Add fail_on_output_error CLI option for sink failures (#672)

* Add fail-on-output-error option and CLI regression test

* Broaden fail_on_output_error coverage for disabled and multi-sink paths
This commit is contained in:
Kili
2026-03-09 22:35:38 +01:00
committed by GitHub
parent 79f47121a4
commit f2febf21d3
3 changed files with 220 additions and 41 deletions
+3
View File
@@ -146,6 +146,9 @@ The full set of configuration options are:
- `dns_timeout` - float: DNS timeout period
- `debug` - bool: Print debugging messages
- `silent` - bool: Only print errors (Default: `True`)
- `fail_on_output_error` - bool: Exit with a non-zero status code if
any configured output destination fails while saving/publishing
reports (Default: `False`)
- `log_file` - str: Write log messages to a file at this path
- `n_procs` - int: Number of processes to run in parallel when
parsing in CLI mode (Default: `1`)
+58 -41
View File
@@ -194,6 +194,13 @@ def _main():
return None
def process_reports(reports_):
output_errors = []
def log_output_error(destination, error):
    """Log a failed output destination and record the failure.

    The message ``"<destination> Error: <error>"`` is sent to
    ``logger.error`` and appended to the enclosing ``output_errors`` list.
    """
    failure = "{0} Error: {1}".format(destination, error)
    logger.error(failure)
    output_errors.append(failure)
indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value)
@@ -230,11 +237,9 @@ def _main():
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
logger.error("Elasticsearch Error: {0}".format(error_.__str__()))
log_output_error("Elasticsearch", error_.__str__())
except Exception as error_:
logger.error(
"Elasticsearch exception error: {}".format(error_.__str__())
)
log_output_error("Elasticsearch exception", error_.__str__())
try:
if opts.opensearch_hosts:
@@ -252,11 +257,9 @@ def _main():
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
logger.error("OpenSearch Error: {0}".format(error_.__str__()))
log_output_error("OpenSearch", error_.__str__())
except Exception as error_:
logger.error(
"OpenSearch exception error: {}".format(error_.__str__())
)
log_output_error("OpenSearch exception", error_.__str__())
try:
if opts.kafka_hosts:
@@ -264,25 +267,25 @@ def _main():
report, kafka_aggregate_topic
)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
log_output_error("Kafka", error_.__str__())
try:
if opts.s3_bucket:
s3_client.save_aggregate_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
log_output_error("S3", error_.__str__())
try:
if opts.syslog_server:
syslog_client.save_aggregate_report_to_syslog(report)
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))
log_output_error("Syslog", error_.__str__())
try:
if opts.gelf_host:
gelf_client.save_aggregate_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_aggregate_url:
@@ -291,7 +294,7 @@ def _main():
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
log_output_error("Webhook", error_.__str__())
if opts.hec:
try:
@@ -299,7 +302,7 @@ def _main():
if len(aggregate_reports_) > 0:
hec_client.save_aggregate_reports_to_splunk(aggregate_reports_)
except splunk.SplunkError as e:
logger.error("Splunk HEC error: {0}".format(e.__str__()))
log_output_error("Splunk HEC", e.__str__())
if opts.save_forensic:
for report in reports_["forensic_reports"]:
@@ -319,9 +322,9 @@ def _main():
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
logger.error("Elasticsearch Error: {0}".format(error_.__str__()))
log_output_error("Elasticsearch", error_.__str__())
except InvalidDMARCReport as error_:
logger.error(error_.__str__())
log_output_error("Invalid DMARC report", error_.__str__())
try:
shards = opts.opensearch_number_of_shards
@@ -339,9 +342,9 @@ def _main():
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
logger.error("OpenSearch Error: {0}".format(error_.__str__()))
log_output_error("OpenSearch", error_.__str__())
except InvalidDMARCReport as error_:
logger.error(error_.__str__())
log_output_error("Invalid DMARC report", error_.__str__())
try:
if opts.kafka_hosts:
@@ -349,25 +352,25 @@ def _main():
report, kafka_forensic_topic
)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
log_output_error("Kafka", error_.__str__())
try:
if opts.s3_bucket:
s3_client.save_forensic_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
log_output_error("S3", error_.__str__())
try:
if opts.syslog_server:
syslog_client.save_forensic_report_to_syslog(report)
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))
log_output_error("Syslog", error_.__str__())
try:
if opts.gelf_host:
gelf_client.save_forensic_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_forensic_url:
@@ -376,7 +379,7 @@ def _main():
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
log_output_error("Webhook", error_.__str__())
if opts.hec:
try:
@@ -384,7 +387,7 @@ def _main():
if len(forensic_reports_) > 0:
hec_client.save_forensic_reports_to_splunk(forensic_reports_)
except splunk.SplunkError as e:
logger.error("Splunk HEC error: {0}".format(e.__str__()))
log_output_error("Splunk HEC", e.__str__())
if opts.save_smtp_tls:
for report in reports_["smtp_tls_reports"]:
@@ -404,9 +407,9 @@ def _main():
except elastic.AlreadySaved as warning:
logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_:
logger.error("Elasticsearch Error: {0}".format(error_.__str__()))
log_output_error("Elasticsearch", error_.__str__())
except InvalidDMARCReport as error_:
logger.error(error_.__str__())
log_output_error("Invalid DMARC report", error_.__str__())
try:
shards = opts.opensearch_number_of_shards
@@ -424,9 +427,9 @@ def _main():
except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_:
logger.error("OpenSearch Error: {0}".format(error_.__str__()))
log_output_error("OpenSearch", error_.__str__())
except InvalidDMARCReport as error_:
logger.error(error_.__str__())
log_output_error("Invalid DMARC report", error_.__str__())
try:
if opts.kafka_hosts:
@@ -434,25 +437,25 @@ def _main():
smtp_tls_reports, kafka_smtp_tls_topic
)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
log_output_error("Kafka", error_.__str__())
try:
if opts.s3_bucket:
s3_client.save_smtp_tls_report_to_s3(report)
except Exception as error_:
logger.error("S3 Error: {0}".format(error_.__str__()))
log_output_error("S3", error_.__str__())
try:
if opts.syslog_server:
syslog_client.save_smtp_tls_report_to_syslog(report)
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))
log_output_error("Syslog", error_.__str__())
try:
if opts.gelf_host:
gelf_client.save_smtp_tls_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))
log_output_error("GELF", error_.__str__())
try:
if opts.webhook_smtp_tls_url:
@@ -461,7 +464,7 @@ def _main():
json.dumps(report, ensure_ascii=False, indent=indent_value)
)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
log_output_error("Webhook", error_.__str__())
if opts.hec:
try:
@@ -469,7 +472,7 @@ def _main():
if len(smtp_tls_reports_) > 0:
hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_)
except splunk.SplunkError as e:
logger.error("Splunk HEC error: {0}".format(e.__str__()))
log_output_error("Splunk HEC", e.__str__())
if opts.la_dce:
try:
@@ -490,14 +493,16 @@ def _main():
opts.save_smtp_tls,
)
except loganalytics.LogAnalyticsException as e:
logger.error("Log Analytics error: {0}".format(e.__str__()))
log_output_error("Log Analytics", e.__str__())
except Exception as e:
logger.error(
"Unknown error occurred"
+ " during the publishing"
+ " to Log Analytics: "
+ e.__str__()
log_output_error("Log Analytics", f"Unknown publishing error: {e}")
if opts.fail_on_output_error and output_errors:
raise ParserError(
"Output destination failures detected: {0}".format(
" | ".join(output_errors)
)
)
arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument(
@@ -739,6 +744,7 @@ def _main():
webhook_smtp_tls_url=None,
webhook_timeout=60,
normalize_timespan_threshold_hours=24.0,
fail_on_output_error=False,
)
args = arg_parser.parse_args()
@@ -821,6 +827,10 @@ def _main():
opts.silent = bool(general_config.getboolean("silent"))
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
opts.fail_on_output_error = bool(
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config:
opts.log_file = general_config["log_file"]
if "n_procs" in general_config:
@@ -1834,7 +1844,11 @@ def _main():
"smtp_tls_reports": smtp_tls_reports,
}
process_reports(parsing_results)
try:
process_reports(parsing_results)
except ParserError as error:
logger.error(error.__str__())
exit(1)
if opts.smtp_host:
try:
@@ -1890,6 +1904,9 @@ def _main():
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
exit(1)
except ParserError as error:
logger.error(error.__str__())
exit(1)
if __name__ == "__main__":
+159
View File
@@ -279,6 +279,165 @@ aws_service = aoss
self.assertEqual(mock_set_hosts.call_args.kwargs.get("auth_type"), "awssigv4")
self.assertEqual(mock_set_hosts.call_args.kwargs.get("aws_region"), "eu-west-1")
self.assertEqual(mock_set_hosts.call_args.kwargs.get("aws_service"), "aoss")
@patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch")
@patch("parsedmarc.cli.elastic.migrate_indexes")
@patch("parsedmarc.cli.elastic.set_hosts")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.IMAPConnection")
def testFailOnOutputErrorExits(
    self,
    mock_imap_connection,
    mock_get_reports,
    _mock_set_hosts,
    _mock_migrate_indexes,
    mock_save_aggregate,
):
    """A failing Elasticsearch save exits with code 1 when
    fail_on_output_error is enabled in the config."""
    # The mailbox yields exactly one aggregate report and nothing else.
    aggregate_report = {"policy_published": {"domain": "example.com"}}
    mock_imap_connection.return_value = object()
    mock_get_reports.return_value = {
        "aggregate_reports": [aggregate_report],
        "forensic_reports": [],
        "smtp_tls_reports": [],
    }
    # Every attempt to save the aggregate report raises.
    mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
        "simulated output failure"
    )

    config_text = """[general]
save_aggregate = true
fail_on_output_error = true
silent = true
[imap]
host = imap.example.com
user = test-user
password = test-password
[elasticsearch]
hosts = localhost
"""
    with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as handle:
        handle.write(config_text)
        config_path = handle.name

    def _remove_config():
        # delete=False above means the test must remove the file itself.
        if os.path.exists(config_path):
            os.remove(config_path)

    self.addCleanup(_remove_config)

    cli_argv = ["parsedmarc", "-c", config_path]
    with patch.object(sys, "argv", cli_argv):
        with self.assertRaises(SystemExit) as exit_ctx:
            parsedmarc.cli._main()

    self.assertEqual(exit_ctx.exception.code, 1)
    mock_save_aggregate.assert_called_once()
@patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch")
@patch("parsedmarc.cli.elastic.migrate_indexes")
@patch("parsedmarc.cli.elastic.set_hosts")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.IMAPConnection")
def testOutputErrorDoesNotExitWhenDisabled(
    self,
    mock_imap_connection,
    mock_get_reports,
    _mock_set_hosts,
    _mock_migrate_indexes,
    mock_save_aggregate,
):
    """A failing Elasticsearch save does not raise SystemExit when
    fail_on_output_error is disabled in the config."""
    # The mailbox yields exactly one aggregate report and nothing else.
    aggregate_report = {"policy_published": {"domain": "example.com"}}
    mock_imap_connection.return_value = object()
    mock_get_reports.return_value = {
        "aggregate_reports": [aggregate_report],
        "forensic_reports": [],
        "smtp_tls_reports": [],
    }
    # Every attempt to save the aggregate report raises.
    mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
        "simulated output failure"
    )

    config_text = """[general]
save_aggregate = true
fail_on_output_error = false
silent = true
[imap]
host = imap.example.com
user = test-user
password = test-password
[elasticsearch]
hosts = localhost
"""
    with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as handle:
        handle.write(config_text)
        config_path = handle.name

    def _remove_config():
        # delete=False above means the test must remove the file itself.
        if os.path.exists(config_path):
            os.remove(config_path)

    self.addCleanup(_remove_config)

    cli_argv = ["parsedmarc", "-c", config_path]
    with patch.object(sys, "argv", cli_argv):
        # Must return normally; a SystemExit here would fail the test.
        parsedmarc.cli._main()

    mock_save_aggregate.assert_called_once()
@patch("parsedmarc.cli.opensearch.save_forensic_report_to_opensearch")
@patch("parsedmarc.cli.opensearch.migrate_indexes")
@patch("parsedmarc.cli.opensearch.set_hosts")
@patch("parsedmarc.cli.elastic.save_forensic_report_to_elasticsearch")
@patch("parsedmarc.cli.elastic.save_aggregate_report_to_elasticsearch")
@patch("parsedmarc.cli.elastic.migrate_indexes")
@patch("parsedmarc.cli.elastic.set_hosts")
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.IMAPConnection")
def testFailOnOutputErrorExitsWithMultipleSinkErrors(
    self,
    mock_imap_connection,
    mock_get_reports,
    _mock_es_set_hosts,
    _mock_es_migrate,
    mock_save_aggregate,
    _mock_save_forensic_elastic,
    _mock_os_set_hosts,
    _mock_os_migrate,
    mock_save_forensic_opensearch,
):
    """With fail_on_output_error enabled, failures in two different sinks
    still lead to exit code 1, and each failing sink is called once."""
    # One aggregate and one forensic report, so both save paths run.
    aggregate_report = {"policy_published": {"domain": "example.com"}}
    forensic_report = {"reported_domain": "example.com"}
    mock_imap_connection.return_value = object()
    mock_get_reports.return_value = {
        "aggregate_reports": [aggregate_report],
        "forensic_reports": [forensic_report],
        "smtp_tls_reports": [],
    }
    # Elasticsearch fails on the aggregate report; OpenSearch fails on the
    # forensic report. The Elasticsearch forensic save stays a plain mock.
    mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
        "aggregate sink failed"
    )
    mock_save_forensic_opensearch.side_effect = (
        parsedmarc.cli.opensearch.OpenSearchError("forensic sink failed")
    )

    config_text = """[general]
save_aggregate = true
save_forensic = true
fail_on_output_error = true
silent = true
[imap]
host = imap.example.com
user = test-user
password = test-password
[elasticsearch]
hosts = localhost
[opensearch]
hosts = localhost
"""
    with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as handle:
        handle.write(config_text)
        config_path = handle.name

    def _remove_config():
        # delete=False above means the test must remove the file itself.
        if os.path.exists(config_path):
            os.remove(config_path)

    self.addCleanup(_remove_config)

    cli_argv = ["parsedmarc", "-c", config_path]
    with patch.object(sys, "argv", cli_argv):
        with self.assertRaises(SystemExit) as exit_ctx:
            parsedmarc.cli._main()

    self.assertEqual(exit_ctx.exception.code, 1)
    mock_save_aggregate.assert_called_once()
    mock_save_forensic_opensearch.assert_called_once()
class _FakeGraphResponse:
def __init__(self, status_code, payload=None, text=""):
self.status_code = status_code