From 158d63d2058ea3c48058d551dc2e2fd326771e95 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Tue, 2 Dec 2025 12:59:03 -0500 Subject: [PATCH] Complete annotations on elastic.py --- parsedmarc/elastic.py | 112 ++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 52 deletions(-) diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index ed3ceab..27a5e96 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + +from typing import Optional, Union + from collections import OrderedDict from elasticsearch_dsl.search import Q @@ -89,15 +93,15 @@ class _AggregateReportDoc(Document): dkim_results = Nested(_DKIMResult) spf_results = Nested(_SPFResult) - def add_policy_override(self, type_, comment): + def add_policy_override(self, type_: str, comment: str): self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) - def add_dkim_result(self, domain, selector, result): + def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult): self.dkim_results.append( _DKIMResult(domain=domain, selector=selector, result=result) ) - def add_spf_result(self, domain, scope, result): + def add_spf_result(self, domain: str, scope: str, result: _SPFResult): self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) def save(self, **kwargs): @@ -133,21 +137,21 @@ class _ForensicSampleDoc(InnerDoc): body = Text() attachments = Nested(_EmailAttachmentDoc) - def add_to(self, display_name, address): + def add_to(self, display_name: str, address: str): self.to.append(_EmailAddressDoc(display_name=display_name, address=address)) - def add_reply_to(self, display_name, address): + def add_reply_to(self, display_name: str, address: str): self.reply_to.append( _EmailAddressDoc(display_name=display_name, address=address) ) - def add_cc(self, display_name, address): + def add_cc(self, display_name: str, address: str): self.cc.append(_EmailAddressDoc(display_name=display_name, address=address)) - def add_bcc(self, display_name, address): + def add_bcc(self, display_name: str, address: str): self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address)) - def add_attachment(self, filename, content_type, sha256): + def add_attachment(self, filename: str, content_type: str, sha256: str): self.attachments.append( _EmailAttachmentDoc( filename=filename, content_type=content_type, sha256=sha256 @@ -199,15 +203,15 @@ class _SMTPTLSPolicyDoc(InnerDoc): def add_failure_details( self, - result_type, - ip_address, - receiving_ip, - receiving_mx_helo, - failed_session_count, - sending_mta_ip=None, - receiving_mx_hostname=None, - additional_information_uri=None, - failure_reason_code=None, + result_type: str, + ip_address: str, + receiving_ip: str, + receiving_mx_helo: str, + failed_session_count: int, + sending_mta_ip: Optional[str] = None, + receiving_mx_hostname: Optional[str] = None, + additional_information_uri: Optional[str] = None, + failure_reason_code: Union[str, int, None] = None, ): _details = _SMTPTLSFailureDetailsDoc( result_type=result_type, @@ -237,13 +241,14 @@ class _SMTPTLSReportDoc(Document): def add_policy( self, - policy_type, - policy_domain, - successful_session_count, - failed_session_count, - policy_string=None, - mx_host_patterns=None, - failure_details=None, + policy_type: str, + policy_domain: str, + successful_session_count: int, + failed_session_count: int, + *, + policy_string: Optional[str] = None, + mx_host_patterns: Optional[list[str]] = None, + failure_details: Optional[str] = None, ): self.policies.append( policy_type=policy_type, @@ -261,19 +266,20 @@ class AlreadySaved(ValueError): def set_hosts( - hosts, - use_ssl=False, - ssl_cert_path=None, - username=None, - password=None, - api_key=None, - timeout=60.0, + hosts: Union[str, list[str]], + *, + use_ssl: Optional[bool] = False, + ssl_cert_path: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + timeout: Optional[float] = 60.0, ): """ Sets the Elasticsearch hosts to use Args: - hosts (str): A single hostname or URL, or list of hostnames or URLs + hosts: A single hostname or URL, or list of hostnames or URLs use_ssl (bool): Use a HTTPS connection to the server ssl_cert_path (str): Path to the certificate chain username (str): The username to use for authentication @@ -298,7 +304,7 @@ def set_hosts( connections.create_connection(**conn_params) -def create_indexes(names, settings=None): +def create_indexes(names: list[str], settings: Optional[dict[str, any]] = None): """ Create Elasticsearch indexes @@ -321,7 +327,9 @@ def create_indexes(names, settings=None): raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__())) -def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): +def migrate_indexes( + aggregate_indexes: Optional[list[str]] = None, forensic_indexes: Optional[list[str]] = None +): """ Updates index mappings @@ -368,12 +376,12 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): def save_aggregate_report_to_elasticsearch( - aggregate_report, - index_suffix=None, - index_prefix=None, - monthly_indexes=False, - number_of_shards=1, - number_of_replicas=0, + aggregate_report: OrderedDict[str, any], + index_suffix: Optional[str] = None, + index_prefix: Optional[str] =None, + monthly_indexes: Optional[bool] = False, + number_of_shards: Optional[int]=1, + number_of_replicas: Optional[int]=0, ): """ Saves a parsed DMARC aggregate report to Elasticsearch @@ -530,12 +538,12 @@ def save_aggregate_report_to_elasticsearch( def save_forensic_report_to_elasticsearch( - forensic_report, - index_suffix=None, - index_prefix=None, - monthly_indexes=False, - number_of_shards=1, - number_of_replicas=0, + forensic_report: OrderedDict[str, any], + index_suffix: Optional[any]=None, + index_prefix: Optional[str]=None, + monthly_indexes: Optional[bool]=False, + number_of_shards:int = 1, + number_of_replicas:int = 0, ): """ Saves a parsed DMARC forensic report to Elasticsearch @@ -697,12 +705,12 @@ def save_forensic_report_to_elasticsearch( def save_smtp_tls_report_to_elasticsearch( - report, - index_suffix=None, - index_prefix=None, - monthly_indexes=False, - number_of_shards=1, - number_of_replicas=0, + report: OrderedDict[str, any], + index_suffix:str = None, + index_prefix:str = None, + monthly_indexes: Optional[bool] = False, + number_of_shards: Optional[int] = 1, + number_of_replicas: Optional[int]=0, ): """ Saves a parsed SMTP TLS report to Elasticsearch