Complete annotations on elastic.py

This commit is contained in:
Sean Whalen
2025-12-02 12:59:03 -05:00
parent f1933b906c
commit 158d63d205

View File

@@ -1,5 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional, Union
from collections import OrderedDict
from elasticsearch_dsl.search import Q
@@ -89,15 +93,15 @@ class _AggregateReportDoc(Document):
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
def add_policy_override(self, type_, comment):
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(self, domain, selector, result):
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result)
)
def add_spf_result(self, domain, scope, result):
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs):
@@ -133,21 +137,21 @@ class _ForensicSampleDoc(InnerDoc):
body = Text()
attachments = Nested(_EmailAttachmentDoc)
def add_to(self, display_name, address):
def add_to(self, display_name: str, address: str):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_reply_to(self, display_name, address):
def add_reply_to(self, display_name: str, address: str):
self.reply_to.append(
_EmailAddressDoc(display_name=display_name, address=address)
)
def add_cc(self, display_name, address):
def add_cc(self, display_name: str, address: str):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_bcc(self, display_name, address):
def add_bcc(self, display_name: str, address: str):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_attachment(self, filename, content_type, sha256):
def add_attachment(self, filename: str, content_type: str, sha256: str):
self.attachments.append(
_EmailAttachmentDoc(
filename=filename, content_type=content_type, sha256=sha256
@@ -199,15 +203,15 @@ class _SMTPTLSPolicyDoc(InnerDoc):
def add_failure_details(
self,
result_type,
ip_address,
receiving_ip,
receiving_mx_helo,
failed_session_count,
sending_mta_ip=None,
receiving_mx_hostname=None,
additional_information_uri=None,
failure_reason_code=None,
result_type: str,
ip_address: str,
receiving_ip: str,
receiving_mx_helo: str,
failed_session_count: int,
sending_mta_ip: Optional[str] = None,
receiving_mx_hostname: Optional[str] = None,
additional_information_uri: Optional[str] = None,
failure_reason_code: Union[str, int, None] = None,
):
_details = _SMTPTLSFailureDetailsDoc(
result_type=result_type,
@@ -237,13 +241,14 @@ class _SMTPTLSReportDoc(Document):
def add_policy(
self,
policy_type,
policy_domain,
successful_session_count,
failed_session_count,
policy_string=None,
mx_host_patterns=None,
failure_details=None,
policy_type: str,
policy_domain: str,
successful_session_count: int,
failed_session_count: int,
*,
policy_string: Optional[str] = None,
mx_host_patterns: Optional[list[str]] = None,
failure_details: Optional[str] = None,
):
self.policies.append(
policy_type=policy_type,
@@ -261,19 +266,20 @@ class AlreadySaved(ValueError):
def set_hosts(
hosts,
use_ssl=False,
ssl_cert_path=None,
username=None,
password=None,
api_key=None,
timeout=60.0,
hosts: Union[str, list[str]],
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
timeout: Optional[float] = 60.0,
):
"""
Sets the Elasticsearch hosts to use
Args:
hosts (str): A single hostname or URL, or list of hostnames or URLs
hosts: A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use a HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
username (str): The username to use for authentication
@@ -298,7 +304,7 @@ def set_hosts(
connections.create_connection(**conn_params)
def create_indexes(names, settings=None):
def create_indexes(names: list[str], settings: Optional[dict[str, any]] = None):
"""
Create Elasticsearch indexes
@@ -321,7 +327,9 @@ def create_indexes(names, settings=None):
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None, forensic_indexes: Optional[list[str]] = None
):
"""
Updates index mappings
@@ -368,12 +376,12 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
def save_aggregate_report_to_elasticsearch(
aggregate_report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
aggregate_report: OrderedDict[str, any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] =None,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int]=1,
number_of_replicas: Optional[int]=0,
):
"""
Saves a parsed DMARC aggregate report to Elasticsearch
@@ -530,12 +538,12 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch(
forensic_report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
forensic_report: OrderedDict[str, any],
index_suffix: Optional[any]=None,
index_prefix: Optional[str]=None,
monthly_indexes: Optional[bool]=False,
number_of_shards:int = 1,
number_of_replicas:int = 0,
):
"""
Saves a parsed DMARC forensic report to Elasticsearch
@@ -697,12 +705,12 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch(
report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
report: OrderedDict[str, any],
index_suffix:str = None,
index_prefix:str = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: Optional[int] = 1,
number_of_replicas: Optional[int]=0,
):
"""
Saves a parsed SMTP TLS report to Elasticsearch