Fix ruff formatting errors, duplicate import, and test mock key names

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-03-09 21:40:28 +00:00
committed by Sean Whalen
parent 82a4ffab56
commit f68671cbd4
3 changed files with 40 additions and 42 deletions

View File

@@ -825,18 +825,14 @@ def parse_aggregate_report_xml(
if policy_published["np"] is not None:
np_ = policy_published["np"]
if np_ not in ("none", "quarantine", "reject"):
logger.warning(
"Invalid np value: {0}".format(np_)
)
logger.warning("Invalid np value: {0}".format(np_))
new_policy_published["np"] = np_
testing = None
if "testing" in policy_published:
if policy_published["testing"] is not None:
testing = policy_published["testing"]
if testing not in ("n", "y"):
logger.warning(
"Invalid testing value: {0}".format(testing)
)
logger.warning("Invalid testing value: {0}".format(testing))
new_policy_published["testing"] = testing
discovery_method = None
if "discovery_method" in policy_published:
@@ -844,9 +840,7 @@ def parse_aggregate_report_xml(
discovery_method = policy_published["discovery_method"]
if discovery_method not in ("psl", "treewalk"):
logger.warning(
"Invalid discovery_method value: {0}".format(
discovery_method
)
"Invalid discovery_method value: {0}".format(discovery_method)
)
new_policy_published["discovery_method"] = discovery_method
new_report["policy_published"] = new_policy_published
@@ -1109,9 +1103,7 @@ def parsed_aggregate_reports_to_csv_rows(
fo = report["policy_published"]["fo"]
np_ = report["policy_published"].get("np", None)
testing = report["policy_published"].get("testing", None)
discovery_method = report["policy_published"].get(
"discovery_method", None
)
discovery_method = report["policy_published"].get("discovery_method", None)
report_dict: dict[str, Any] = dict(
xml_schema=xml_schema,
@@ -2395,9 +2387,7 @@ def save_output(
parsed_aggregate_reports_to_csv(aggregate_reports),
)
append_json(
os.path.join(output_directory, failure_json_filename), failure_reports
)
append_json(os.path.join(output_directory, failure_json_filename), failure_reports)
append_csv(
os.path.join(output_directory, failure_csv_filename),

View File

@@ -105,23 +105,33 @@ class _AggregateReportDoc(Document):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
def add_dkim_result(
self, domain: str, selector: str, result: _DKIMResult,
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: str = None,
):
self.dkim_results.append(
_DKIMResult(
domain=domain, selector=selector, result=result,
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
) # pyright: ignore[reportCallIssue]
def add_spf_result(
self, domain: str, scope: str, result: _SPFResult,
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: str = None,
):
self.spf_results.append(
_SPFResult(
domain=domain, scope=scope, result=result,
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
) # pyright: ignore[reportCallIssue]
@@ -483,9 +493,7 @@ def save_aggregate_report_to_elasticsearch(
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -615,15 +623,12 @@ def save_failure_report_to_elasticsearch(
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(
index_suffix
)
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_failure*,dmarc_forensic*"
if index_prefix is not None:
search_index = ",".join(
"{0}{1}".format(index_prefix, part)
for part in search_index.split(",")
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]

View File

@@ -108,23 +108,33 @@ class _AggregateReportDoc(Document):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(
self, domain: str, selector: str, result: _DKIMResult,
self,
domain: str,
selector: str,
result: _DKIMResult,
human_result: str = None,
):
self.dkim_results.append(
_DKIMResult(
domain=domain, selector=selector, result=result,
domain=domain,
selector=selector,
result=result,
human_result=human_result,
)
)
def add_spf_result(
self, domain: str, scope: str, result: _SPFResult,
self,
domain: str,
scope: str,
result: _SPFResult,
human_result: str = None,
):
self.spf_results.append(
_SPFResult(
domain=domain, scope=scope, result=result,
domain=domain,
scope=scope,
result=result,
human_result=human_result,
)
)
@@ -512,9 +522,7 @@ def save_aggregate_report_to_opensearch(
fo=aggregate_report["policy_published"]["fo"],
np=aggregate_report["policy_published"].get("np"),
testing=aggregate_report["policy_published"].get("testing"),
discovery_method=aggregate_report["policy_published"].get(
"discovery_method"
),
discovery_method=aggregate_report["policy_published"].get("discovery_method"),
)
for record in aggregate_report["records"]:
@@ -644,15 +652,12 @@ def save_failure_report_to_opensearch(
arrival_date_epoch_milliseconds = int(arrival_date.timestamp() * 1000)
if index_suffix is not None:
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(
index_suffix
)
search_index = "dmarc_failure_{0}*,dmarc_forensic_{0}*".format(index_suffix)
else:
search_index = "dmarc_failure*,dmarc_forensic*"
if index_prefix is not None:
search_index = ",".join(
"{0}{1}".format(index_prefix, part)
for part in search_index.split(",")
"{0}{1}".format(index_prefix, part) for part in search_index.split(",")
)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
@@ -697,9 +702,7 @@ def save_failure_report_to_opensearch(
"A failure sample to {0} from {1} "
"with a subject of {2} and arrival date of {3} "
"already exists in "
"OpenSearch".format(
to_, from_, subject, failure_report["arrival_date_utc"]
)
"OpenSearch".format(to_, from_, subject, failure_report["arrival_date_utc"])
)
parsed_sample = failure_report["parsed_sample"]