API reference

parsedmarc

A Python package for parsing DMARC reports

exception parsedmarc.InvalidAggregateReport[source]

Raised when an invalid DMARC aggregate report is encountered

exception parsedmarc.InvalidDMARCReport[source]

Raised when an invalid DMARC report is encountered

exception parsedmarc.InvalidFailureReport[source]

Raised when an invalid DMARC failure report is encountered

parsedmarc.InvalidForensicReport

alias of InvalidFailureReport
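Because the forensic name is an alias rather than a subclass, code that catches either name handles the same exception. A minimal sketch of that behaviour follows; the two class names in it are hypothetical stand-ins, not the package's own classes.

```python
class InvalidFailureReportSketch(Exception):
    """Hypothetical stand-in for an 'invalid failure report' exception."""


# An alias is a second name bound to the same class object, not a new class.
InvalidForensicReportSketch = InvalidFailureReportSketch


def handle(exception_class):
    """Raise the given class and catch it through the alias name."""
    try:
        raise exception_class("bad report")
    except InvalidForensicReportSketch as error:
        return type(error).__name__
```

Existing `except` clauses written against the older name should therefore keep working without changes.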

exception parsedmarc.InvalidSMTPTLSReport[source]

Raised when an invalid SMTP TLS report is encountered

exception parsedmarc.ParserError[source]

Raised whenever the parser fails for some reason

parsedmarc.append_json(filename: str, reports: Sequence[AggregateReport] | Sequence[FailureReport] | Sequence[SMTPTLSReport]) None[source]

Append reports to a JSON array on disk, creating the file if needed.

Reads the existing array (if the file exists and parses cleanly), merges the new reports onto the end, and rewrites the file as a single valid JSON array. An earlier version of this used an open(..., "a+") + seek() + overwrite pattern, but Python’s documentation is explicit that on POSIX, a / a+ writes always go to EOF regardless of seek position — so the second call onto an existing file produced [...],\n[...]-style corrupted output. Read-merge-write is the only way to get a valid JSON array out of repeated appends.
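The read-merge-write approach described above can be sketched with the standard library alone. The function name `append_json_sketch` and the choice to treat an unparsable file as empty are assumptions made for illustration; the package's own implementation may differ in detail.

```python
import json
import os


def append_json_sketch(filename, reports):
    """Load the existing array (if any), extend it, and rewrite the file."""
    existing = []
    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
            if isinstance(loaded, list):
                existing = loaded
        except json.JSONDecodeError:
            # Assumption: a file that does not parse cleanly is treated as empty.
            existing = []
    existing.extend(reports)
    # Mode "w" truncates first, so the file always holds exactly one JSON array.
    with open(filename, "w", encoding="utf-8") as handle:
        json.dump(existing, handle, indent=2)
```

Calling it twice on the same path leaves a single array containing the reports from both calls, which is the property the append-mode approach could not guarantee.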

parsedmarc.email_results(results: ParsingResults, host: str, mail_from: str, mail_to: list[str] | None, *, mail_cc: list[str] | None = None, mail_bcc: list[str] | None = None, port: int = 0, require_encryption: bool = False, verify: bool = True, username: str | None = None, password: str | None = None, subject: str | None = None, attachment_filename: str | None = None, message: str | None = None)[source]

Emails parsing results as a zip file

Parameters:
  • results (dict) – Parsing results

  • host (str) – Mail server hostname or IP address

  • mail_from (str) – The value of the message From header

  • mail_to (list) – A list of addresses to mail to

  • mail_cc (list) – A list of addresses to CC

  • mail_bcc (list) – A list of addresses to BCC

  • port (int) – Port to use

  • require_encryption (bool) – Require a secure connection from the start

  • verify (bool) – Verify the SSL/TLS certificate

  • username (str) – An optional username

  • password (str) – An optional password

  • subject (str) – Overrides the default message subject

  • attachment_filename (str) – Override the default attachment filename

  • message (str) – Override the default plain text body

parsedmarc.extract_report(content: bytes | str | BinaryIO) str[source]

Extracts text from a zip or gzip file, as a base64-encoded string, file-like object, or bytes.

Parameters:
  • content – Report file as a base64-encoded string, file-like object, or bytes

Returns:

The extracted text

Return type:

str
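One plausible way to handle the three input forms is to normalise them to bytes and then check the archive's leading magic bytes. This is a sketch only: `extract_text_sketch` is a hypothetical name, and reading the first member of a zip archive is an assumption rather than documented behaviour.

```python
import base64
import gzip
import io
import zipfile

ZIP_MAGIC = b"PK\x03\x04"   # local file header of a non-empty zip archive
GZIP_MAGIC = b"\x1f\x8b"    # gzip member header


def extract_text_sketch(content):
    """Return the text of a zip, gzip, or uncompressed payload."""
    if isinstance(content, str):
        # Assumption: a str input is base64-encoded file content.
        data = base64.b64decode(content)
    elif isinstance(content, bytes):
        data = content
    else:
        data = content.read()  # file-like object opened in binary mode

    if data.startswith(ZIP_MAGIC):
        with zipfile.ZipFile(io.BytesIO(data)) as archive:
            # Assumption: the report is the first member of the archive.
            data = archive.read(archive.namelist()[0])
    elif data.startswith(GZIP_MAGIC):
        data = gzip.decompress(data)
    return data.decode("utf-8", errors="ignore")
```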

parsedmarc.extract_report_from_file_path(file_path: str | bytes | PathLike[str] | PathLike[bytes]) str[source]

Extracts report from a file at the given file_path

parsedmarc.get_dmarc_reports_from_mailbox(connection: MailboxConnection, *, reports_folder: str = 'INBOX', archive_folder: str = 'Archive', delete: bool = False, test: bool = False, ip_db_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, nameservers: list[str] | None = None, dns_timeout: float = 6.0, dns_retries: int = 0, strip_attachment_payloads: bool = False, results: ParsingResults | None = None, batch_size: int = 10, since: datetime | date | str | None = None, create_folders: bool = True, normalize_timespan_threshold_hours: float = 24) ParsingResults[source]

Fetches and parses DMARC reports from a mailbox

Parameters:
  • connection – A Mailbox connection object

  • reports_folder (str) – The folder where reports can be found

  • archive_folder (str) – The folder to move processed mail to

  • delete (bool) – Delete messages after processing them

  • test (bool) – Do not move or delete messages after processing them

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • offline (bool) – Do not query online for geolocation or DNS

  • nameservers (list) – A list of DNS nameservers to query

  • dns_timeout (float) – Set the DNS query timeout

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Remove attachment payloads from failure report results

  • results (dict) – Results from the previous run

  • batch_size (int) – Number of messages to read and process before saving (use 0 for no limit)

  • since – Search for messages since a certain time (units: "m" = minutes, "h" = hours, "d" = days, "w" = weeks)

  • create_folders (bool) – Whether to create the destination folders (not used in watch)

  • normalize_timespan_threshold_hours (float) – Normalize timespans beyond this

Returns:

Lists of aggregate_reports, failure_reports, and smtp_tls_reports

Return type:

dict
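The unit letters listed for since suggest relative values such as "2d" or "1w". The sketch below shows one way such a string could be turned into a cutoff time; the "number followed by a unit letter" format and the name `since_to_datetime_sketch` are assumptions, and the package's own parsing may accept other forms.

```python
from datetime import datetime, timedelta, timezone

# Unit letters taken from the parameter description above.
UNITS = {"m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def since_to_datetime_sketch(since, now=None):
    """Convert a relative value such as '2d' into an absolute cutoff time."""
    now = now or datetime.now(timezone.utc)
    if len(since) < 2:
        raise ValueError(f"unsupported value: {since!r}")
    value, unit = since[:-1], since[-1]
    if unit not in UNITS or not value.isdigit():
        raise ValueError(f"unsupported value: {since!r}")
    # Build the timedelta keyword (minutes, hours, days, or weeks) from the unit.
    return now - timedelta(**{UNITS[unit]: int(value)})
```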

parsedmarc.get_dmarc_reports_from_mbox(input_: str, *, nameservers: list[str] | None = None, dns_timeout: float = 2.0, dns_retries: int = 0, strip_attachment_payloads: bool = False, ip_db_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, normalize_timespan_threshold_hours: float = 24.0) ParsingResults[source]

Parses a mailbox in mbox format containing e-mails with attached DMARC reports

Parameters:
  • input_ (str) – A path to an mbox file

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • dns_timeout (float) – Sets the DNS timeout in seconds

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Remove attachment payloads from failure report results

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • offline (bool) – Do not make online queries for geolocation or DNS

  • normalize_timespan_threshold_hours (float) – Normalize timespans beyond this

Returns:

Lists of aggregate_reports, failure_reports, and smtp_tls_reports

Return type:

dict

parsedmarc.get_report_zip(results: ParsingResults) bytes[source]

Creates a zip file of parsed report output

Parameters:

results – The parsed results

Returns:

zip file bytes

Return type:

bytes
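Building a zip archive as bytes, without touching disk, can be done with `zipfile` and an in-memory buffer. The sketch below assumes a hypothetical mapping of file names to text; which files the package actually places in the archive is not stated here.

```python
import io
import zipfile


def build_zip_sketch(files):
    """Pack a {filename: text} mapping into zip file bytes held in memory."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for name, text in files.items():
            archive.writestr(name, text)
    # The archive is finalised when the context manager exits.
    return buffer.getvalue()
```

The returned bytes can be attached to an email or written to a file as-is.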

parsedmarc.parse_aggregate_report_file(_input: str | bytes | BinaryIO, *, offline: bool = False, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, ip_db_path: str | None = None, nameservers: list[str] | None = None, dns_timeout: float = 2.0, dns_retries: int = 0, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float = 24.0) AggregateReport[source]

Parses a file at the given path, a file-like object, or bytes as an aggregate DMARC report

Parameters:
  • _input (str | bytes | IO) – A path to a file, a file-like object, or bytes

  • offline (bool) – Do not query online for geolocation or DNS

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • dns_timeout (float) – Sets the DNS timeout in seconds

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • keep_alive (callable) – Keep alive function

  • normalize_timespan_threshold_hours (float) – Normalize timespans beyond this

Returns:

The parsed DMARC aggregate report

Return type:

dict

parsedmarc.parse_aggregate_report_xml(xml: str, *, ip_db_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, nameservers: list[str] | None = None, timeout: float = 2.0, retries: int = 0, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float = 24.0) AggregateReport[source]

Parses a DMARC XML report string and returns a consistent dict

Parameters:
  • xml (str) – A string of DMARC aggregate report XML

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • offline (bool) – Do not query online for geolocation or DNS

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • timeout (float) – Sets the DNS timeout in seconds

  • retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • keep_alive (callable) – Keep alive function

  • normalize_timespan_threshold_hours (float) – Normalize timespans beyond this

Returns:

The parsed aggregate DMARC report

Return type:

dict

parsedmarc.parse_failure_report(feedback_report: str, sample: str, msg_date: datetime, *, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, ip_db_path: str | None = None, nameservers: list[str] | None = None, dns_timeout: float = 2.0, dns_retries: int = 0, strip_attachment_payloads: bool = False) FailureReport[source]

Converts a DMARC failure report and sample to a dict

Parameters:
  • feedback_report (str) – A message’s feedback report as a string

  • sample (str) – The RFC 822 headers or RFC 822 message sample

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • offline (bool) – Do not query online for geolocation or DNS

  • msg_date (datetime) – The message’s date header

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • dns_timeout (float) – Sets the DNS timeout in seconds

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Remove attachment payloads from failure report results

Returns:

A parsed report and sample

Return type:

dict

parsedmarc.parse_forensic_report(feedback_report: str, sample: str, msg_date: datetime, *, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, ip_db_path: str | None = None, nameservers: list[str] | None = None, dns_timeout: float = 2.0, dns_retries: int = 0, strip_attachment_payloads: bool = False) FailureReport

Converts a DMARC failure report and sample to a dict

Parameters:
  • feedback_report (str) – A message’s feedback report as a string

  • sample (str) – The RFC 822 headers or RFC 822 message sample

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • offline (bool) – Do not query online for geolocation or DNS

  • msg_date (datetime) – The message’s date header

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • dns_timeout (float) – Sets the DNS timeout in seconds

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Remove attachment payloads from failure report results

Returns:

A parsed report and sample

Return type:

dict

parsedmarc.parse_report_email(input_: bytes | str, *, offline: bool = False, ip_db_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, nameservers: list[str] | None = None, dns_timeout: float = 2.0, dns_retries: int = 0, strip_attachment_payloads: bool = False, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float = 24.0) AggregateParsedReport | FailureParsedReport | SMTPTLSParsedReport[source]

Parses a DMARC report from an email

Parameters:
  • input_ (bytes | str) – An emailed DMARC report in RFC 822 format, as bytes or a string

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map

  • reverse_dns_map_url (str) – URL to a reverse DNS map

  • offline (bool) – Do not query online for geolocation or DNS

  • nameservers (list) – A list of one or more nameservers to use

  • dns_timeout (float) – Sets the DNS timeout in seconds

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Remove attachment payloads from failure report results

  • keep_alive (callable) – Keep alive function

  • normalize_timespan_threshold_hours (float) – Normalize timespans beyond this

Returns:

  • report_type: aggregate, failure, or SMTP TLS

  • report: The parsed report

Return type:

dict

parsedmarc.parse_report_file(input_: bytes | str | PathLike[str] | PathLike[bytes] | BinaryIO, *, nameservers: list[str] | None = None, dns_timeout: float = 2.0, dns_retries: int = 0, strip_attachment_payloads: bool = False, ip_db_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float = 24) AggregateParsedReport | FailureParsedReport | SMTPTLSParsedReport[source]

Parses a DMARC aggregate or failure file at the given path, a file-like object, or bytes

Parameters:
  • input_ (str | os.PathLike | bytes | BinaryIO) – A path to a file, a file-like object, or bytes

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • dns_timeout (float) – Sets the DNS timeout in seconds

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Remove attachment payloads from failure report results

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map

  • reverse_dns_map_url (str) – URL to a reverse DNS map

  • offline (bool) – Do not make online queries for geolocation or DNS

  • keep_alive (callable) – Keep alive function

Returns:

The parsed DMARC report

Return type:

dict

parsedmarc.parse_smtp_tls_report_json(report: str | bytes) SMTPTLSReport[source]

Parses and validates an SMTP TLS report

parsedmarc.parsed_aggregate_reports_to_csv(reports: AggregateReport | list[AggregateReport]) str[source]

Converts one or more parsed aggregate reports to flat CSV format, including headers

Parameters:

reports – A parsed aggregate report or list of parsed aggregate reports

Returns:

Parsed aggregate report data in flat CSV format, including headers

Return type:

str

parsedmarc.parsed_aggregate_reports_to_csv_rows(reports: AggregateReport | list[AggregateReport]) list[dict[str, Any]][source]

Converts one or more parsed aggregate reports to list of dicts in flat CSV format

Parameters:

reports – A parsed aggregate report or list of parsed aggregate reports

Returns:

Parsed aggregate report data as a list of dicts in flat CSV format

Return type:

list
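"Flat CSV format" here means one single-level dict per record, with the report-level fields repeated on every row. A minimal sketch of that flattening follows; the field names (`org_name`, `report_id`, `records`, `source_ip`, `count`) and both function names are hypothetical and chosen only for illustration.

```python
import csv
import io


def to_rows_sketch(report):
    """Return one flat dict per record, repeating the report-level fields."""
    shared = {"org_name": report["org_name"], "report_id": report["report_id"]}
    return [{**shared, **record} for record in report["records"]]


def rows_to_csv_sketch(rows):
    """Render flat rows as CSV text with a header line."""
    if not rows:
        return ""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return output.getvalue()
```

Keeping the row-building step separate from the CSV rendering step mirrors the split between the `_rows` functions and their CSV counterparts.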

parsedmarc.parsed_failure_reports_to_csv(reports: FailureReport | list[FailureReport]) str[source]

Converts one or more parsed failure reports to flat CSV format, including headers

Parameters:

reports – A parsed failure report or list of parsed failure reports

Returns:

Parsed failure report data in flat CSV format, including headers

Return type:

str

parsedmarc.parsed_failure_reports_to_csv_rows(reports: FailureReport | list[FailureReport]) list[dict[str, Any]][source]

Converts one or more parsed failure reports to a list of dicts in flat CSV format

Parameters:

reports – A parsed failure report or list of parsed failure reports

Returns:

Parsed failure report data as a list of dicts in flat CSV format

Return type:

list

parsedmarc.parsed_forensic_reports_to_csv(reports: FailureReport | list[FailureReport]) str

Converts one or more parsed failure reports to flat CSV format, including headers

Parameters:

reports – A parsed failure report or list of parsed failure reports

Returns:

Parsed failure report data in flat CSV format, including headers

Return type:

str

parsedmarc.parsed_forensic_reports_to_csv_rows(reports: FailureReport | list[FailureReport]) list[dict[str, Any]]

Converts one or more parsed failure reports to a list of dicts in flat CSV format

Parameters:

reports – A parsed failure report or list of parsed failure reports

Returns:

Parsed failure report data as a list of dicts in flat CSV format

Return type:

list

parsedmarc.parsed_smtp_tls_reports_to_csv(reports: SMTPTLSReport | list[SMTPTLSReport]) str[source]

Converts one or more parsed SMTP TLS reports to flat CSV format, including headers

Parameters:

reports – A parsed SMTP TLS report or list of parsed SMTP TLS reports

Returns:

Parsed SMTP TLS report data in flat CSV format, including headers

Return type:

str

parsedmarc.parsed_smtp_tls_reports_to_csv_rows(reports: SMTPTLSReport | list[SMTPTLSReport]) list[dict[str, Any]][source]

Converts one or more parsed SMTP TLS reports into a list of single-layer dict objects suitable for use in a CSV

parsedmarc.save_output(results: ParsingResults, *, output_directory: str = 'output', aggregate_json_filename: str = 'aggregate.json', failure_json_filename: str = 'failure.json', smtp_tls_json_filename: str = 'smtp_tls.json', aggregate_csv_filename: str = 'aggregate.csv', failure_csv_filename: str = 'failure.csv', smtp_tls_csv_filename: str = 'smtp_tls.csv')[source]

Save report data in the given directory

Parameters:
  • results – Parsing results

  • output_directory (str) – The path to the directory to save in

  • aggregate_json_filename (str) – Filename for the aggregate JSON file

  • failure_json_filename (str) – Filename for the failure JSON file

  • smtp_tls_json_filename (str) – Filename for the SMTP TLS JSON file

  • aggregate_csv_filename (str) – Filename for the aggregate CSV file

  • failure_csv_filename (str) – Filename for the failure CSV file

  • smtp_tls_csv_filename (str) – Filename for the SMTP TLS CSV file

parsedmarc.watch_inbox(mailbox_connection: MailboxConnection, callback: Callable, *, reports_folder: str = 'INBOX', archive_folder: str = 'Archive', delete: bool = False, test: bool = False, check_timeout: int = 30, ip_db_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool = False, nameservers: list[str] | None = None, dns_timeout: float = 6.0, dns_retries: int = 0, strip_attachment_payloads: bool = False, batch_size: int = 10, since: datetime | date | str | None = None, normalize_timespan_threshold_hours: float = 24, config_reloading: Callable | None = None)[source]

Watches the mailbox for new messages and sends the results to a callback function

Parameters:
  • mailbox_connection – The mailbox connection object

  • callback – The callback function to receive the parsing results

  • reports_folder (str) – The IMAP folder where reports can be found

  • archive_folder (str) – The folder to move processed mail to

  • delete (bool) – Delete messages after processing them

  • test (bool) – Do not move or delete messages after processing them

  • check_timeout (int) – Number of seconds to wait for an IMAP IDLE response or the number of seconds until the next mail check

  • ip_db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

  • always_use_local_files (bool) – Do not download files

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to a reverse DNS map file

  • offline (bool) – Do not query online for geolocation or DNS

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • dns_timeout (float) – Set the DNS query timeout

  • dns_retries (int) – Number of times to retry DNS queries on timeout or other transient errors

  • strip_attachment_payloads (bool) – Replace attachment payloads in failure report samples with None

  • batch_size (int) – Number of messages to read and process before saving

  • since – Search for messages since a certain time

  • normalize_timespan_threshold_hours (float) – Normalize timespans beyond this

  • config_reloading – Optional callable that returns True when a config reload has been requested (e.g. via SIGHUP)
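A callable suitable for config_reloading could be backed by a flag that a signal handler sets. The sketch below uses only the standard library; the names `request_reload`, `config_reloading`, and `install_sighup_handler` are hypothetical, and how the watcher reacts once the callable returns True is not covered here.

```python
import signal
import threading

# Set when a reload has been requested; safe to check from any thread.
reload_requested = threading.Event()


def request_reload(signum=None, frame=None):
    """Signal handler: record that a config reload was requested."""
    reload_requested.set()


def config_reloading():
    """Return True once a reload has been requested."""
    return reload_requested.is_set()


def install_sighup_handler():
    """Register the handler. Call this from the main thread on POSIX systems."""
    if hasattr(signal, "SIGHUP"):  # SIGHUP does not exist on Windows
        signal.signal(signal.SIGHUP, request_reload)
```

The function `config_reloading` would then be passed as the keyword argument of the same name.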

parsedmarc.elastic

exception parsedmarc.elastic.AlreadySaved[source]

Raised when a report to be saved matches an existing report

exception parsedmarc.elastic.ElasticsearchError[source]

Raised when an Elasticsearch error occurs

parsedmarc.elastic.create_indexes(names: list[str], settings: dict[str, Any] | None = None)[source]

Create Elasticsearch indexes

Parameters:
  • names (list) – A list of index names

  • settings (dict) – Index settings. In Serverless mode, keys in _SERVERLESS_REJECTED_SETTINGS are filtered out and the remaining keys are passed through; defaults are skipped entirely.
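The Serverless filtering described for settings amounts to dropping a fixed set of keys and passing the rest through. A minimal sketch, assuming the rejected set contains `number_of_shards` and `number_of_replicas`; `REJECTED_SKETCH` is a hypothetical stand-in for `_SERVERLESS_REJECTED_SETTINGS`, and the handling of default settings is left out.

```python
# Hypothetical stand-in for the module's _SERVERLESS_REJECTED_SETTINGS.
REJECTED_SKETCH = frozenset({"number_of_shards", "number_of_replicas"})


def filter_settings_sketch(settings, serverless):
    """Drop keys that Serverless rejects; pass everything else through."""
    if not settings:
        return {}
    if not serverless:
        return dict(settings)
    return {key: value for key, value in settings.items() if key not in REJECTED_SKETCH}
```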

parsedmarc.elastic.migrate_indexes(aggregate_indexes: list[str] | None = None, failure_indexes: list[str] | None = None)[source]

Updates index mappings

Parameters:
  • aggregate_indexes (list) – A list of aggregate index names

  • failure_indexes (list) – A list of failure index names

parsedmarc.elastic.save_aggregate_report_to_elasticsearch(aggregate_report: dict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]

Saves a parsed DMARC aggregate report to Elasticsearch

Parameters:
  • aggregate_report (dict) – A parsed aggregate report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved
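The effect of index_prefix, index_suffix, and monthly_indexes on the index name can be pictured as follows. The exact naming scheme is not given in this reference, so the layout below (prefix, base name, suffix, then a date part) and the base name used in the usage note are assumptions; only the daily-versus-monthly distinction comes from the parameter description.

```python
from datetime import date


def index_name_sketch(base, day, *, index_prefix=None, index_suffix=None,
                      monthly_indexes=False):
    """Compose an index name; the layout is an assumption, not the documented format."""
    name = base
    if index_suffix:
        name = f"{name}_{index_suffix}"
    if index_prefix:
        name = f"{index_prefix}{name}"
    # Monthly indexes drop the day, so a whole month shares one index.
    date_part = day.strftime("%Y-%m" if monthly_indexes else "%Y-%m-%d")
    return f"{name}-{date_part}"
```

For example, `index_name_sketch("dmarc_aggregate", date(2024, 3, 5), monthly_indexes=True)` groups every report from March 2024 under a single name, which reduces the number of indexes at the cost of coarser retention control.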

parsedmarc.elastic.save_failure_report_to_elasticsearch(failure_report: dict[str, Any], index_suffix: Any | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]

Saves a parsed DMARC failure report to Elasticsearch

Parameters:
  • failure_report (dict) – A parsed failure report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.elastic.save_forensic_report_to_elasticsearch(failure_report: dict[str, Any], index_suffix: Any | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int = 1, number_of_replicas: int = 0)

Saves a parsed DMARC failure report to Elasticsearch

Parameters:
  • failure_report (dict) – A parsed failure report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.elastic.save_smtp_tls_report_to_elasticsearch(report: dict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]

Saves a parsed SMTP TLS report to Elasticsearch

Parameters:
  • report (dict) – A parsed SMTP TLS report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.elastic.set_hosts(hosts: str | list[str], *, use_ssl: bool = False, ssl_cert_path: str | None = None, skip_certificate_verification: bool = False, username: str | None = None, password: str | None = None, api_key: str | None = None, timeout: float = 60.0, serverless: bool = False)[source]

Sets the Elasticsearch hosts to use

Parameters:
  • hosts (str | list[str]) – A single hostname or URL, or list of hostnames or URLs

  • use_ssl (bool) – Use an HTTPS connection to the server

  • ssl_cert_path (str) – Path to the certificate chain

  • skip_certificate_verification (bool) – Skip certificate verification

  • username (str) – The username to use for authentication

  • password (str) – The password to use for authentication

  • api_key (str) – The Base64 encoded API key to use for authentication

  • timeout (float) – Timeout in seconds

  • serverless (bool) – Target an Elastic Cloud Serverless project. When True, create_indexes strips number_of_shards / number_of_replicas from its settings (which Serverless rejects with HTTP 400) and passes any other settings through unchanged.

parsedmarc.opensearch

exception parsedmarc.opensearch.AlreadySaved[source]

Raised when a report to be saved matches an existing report

exception parsedmarc.opensearch.OpenSearchError[source]

Raised when an OpenSearch error occurs

parsedmarc.opensearch.create_indexes(names: list[str], settings: dict[str, Any] | None = None)[source]

Create OpenSearch indexes

Parameters:
  • names (list) – A list of index names

  • settings (dict) – Index settings

parsedmarc.opensearch.migrate_indexes(aggregate_indexes: list[str] | None = None, failure_indexes: list[str] | None = None)[source]

Updates index mappings

Parameters:
  • aggregate_indexes (list) – A list of aggregate index names

  • failure_indexes (list) – A list of failure index names

parsedmarc.opensearch.save_aggregate_report_to_opensearch(aggregate_report: dict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]

Saves a parsed DMARC aggregate report to OpenSearch

Parameters:
  • aggregate_report (dict) – A parsed aggregate report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.opensearch.save_failure_report_to_opensearch(failure_report: dict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]

Saves a parsed DMARC failure report to OpenSearch

Parameters:
  • failure_report (dict) – A parsed failure report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.opensearch.save_forensic_report_to_opensearch(failure_report: dict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool = False, number_of_shards: int = 1, number_of_replicas: int = 0)

Saves a parsed DMARC failure report to OpenSearch

Parameters:
  • failure_report (dict) – A parsed failure report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.opensearch.save_smtp_tls_report_to_opensearch(report: dict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]

Saves a parsed SMTP TLS report to OpenSearch

Parameters:
  • report (dict) – A parsed SMTP TLS report

  • index_suffix (str) – The suffix of the name of the index to save to

  • index_prefix (str) – The prefix of the name of the index to save to

  • monthly_indexes (bool) – Use monthly indexes instead of daily indexes

  • number_of_shards (int) – The number of shards to use in the index

  • number_of_replicas (int) – The number of replicas to use in the index

Raises:

AlreadySaved

parsedmarc.opensearch.set_hosts(hosts: str | list[str], *, use_ssl: bool | None = False, ssl_cert_path: str | None = None, skip_certificate_verification: bool = False, username: str | None = None, password: str | None = None, api_key: str | None = None, timeout: float | None = 60.0, auth_type: str = 'basic', aws_region: str | None = None, aws_service: str = 'es')[source]

Sets the OpenSearch hosts to use

Parameters:
  • hosts (str | list[str]) – A single hostname or URL, or list of hostnames or URLs

  • use_ssl (bool) – Use an HTTPS connection to the server

  • ssl_cert_path (str) – Path to the certificate chain

  • skip_certificate_verification (bool) – Skip certificate verification

  • username (str) – The username to use for authentication

  • password (str) – The password to use for authentication

  • api_key (str) – The Base64 encoded API key to use for authentication

  • timeout (float) – Timeout in seconds

  • auth_type (str) – OpenSearch auth mode: basic (default) or awssigv4

  • aws_region (str) – AWS region for SigV4 auth (required for awssigv4)

  • aws_service (str) – AWS service for SigV4 signing (default: es)
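The parameter descriptions imply a small consistency rule: auth_type is either basic or awssigv4, and awssigv4 needs aws_region. A sketch of that check follows; `check_auth_args_sketch` is a hypothetical helper, and whether the package validates its arguments this way, or raises these exceptions, is an assumption.

```python
def check_auth_args_sketch(auth_type="basic", aws_region=None, aws_service="es"):
    """Validate the auth-related arguments and return them as a dict."""
    if auth_type not in ("basic", "awssigv4"):
        raise ValueError(f"unsupported auth_type: {auth_type!r}")
    if auth_type == "awssigv4" and not aws_region:
        # SigV4 signatures are region-scoped, so a region must be supplied.
        raise ValueError("aws_region is required when auth_type is 'awssigv4'")
    return {"auth_type": auth_type, "aws_region": aws_region, "aws_service": aws_service}
```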

parsedmarc.splunk

class parsedmarc.splunk.HECClient(url: str, access_token: str, index: str, source: str = 'parsedmarc', verify=True, timeout=60)[source]

Initializes the HECClient

Parameters:
  • url (str) – The URL of the HEC

  • access_token (str) – The HEC access token

  • index (str) – The name of the index

  • source (str) – The source name

  • verify (bool) – Verify SSL certificates

  • timeout (float) – Number of seconds to wait for the server to send data before giving up

close()[source]

Close the underlying HTTP session.

save_aggregate_reports_to_splunk(aggregate_reports: list[dict[str, Any]] | dict[str, Any])[source]

Saves aggregate DMARC reports to Splunk

Parameters:

aggregate_reports – A list of aggregate report dictionaries to save in Splunk

save_failure_reports_to_splunk(failure_reports: list[dict[str, Any]] | dict[str, Any])[source]

Saves failure DMARC reports to Splunk

Parameters:

failure_reports (list) – A list of failure report dictionaries to save in Splunk

save_forensic_reports_to_splunk(failure_reports: list[dict[str, Any]] | dict[str, Any])

Saves failure DMARC reports to Splunk

Parameters:

failure_reports (list) – A list of failure report dictionaries to save in Splunk

save_smtp_tls_reports_to_splunk(reports: list[dict[str, Any]] | dict[str, Any])[source]

Saves SMTP TLS reports to Splunk

Parameters:

reports – A list of SMTP TLS report dictionaries to save in Splunk

exception parsedmarc.splunk.SplunkError[source]

Raised when a Splunk API error occurs

parsedmarc.types

class parsedmarc.types.AggregateAlignment[source]
class parsedmarc.types.AggregateAuthResultDKIM[source]
class parsedmarc.types.AggregateAuthResultSPF[source]
class parsedmarc.types.AggregateAuthResults[source]
class parsedmarc.types.AggregateIdentifiers[source]
class parsedmarc.types.AggregateParsedReport[source]
class parsedmarc.types.AggregatePolicyEvaluated[source]
class parsedmarc.types.AggregatePolicyOverrideReason[source]
class parsedmarc.types.AggregatePolicyPublished[source]
class parsedmarc.types.AggregateRecord[source]
class parsedmarc.types.AggregateReport[source]
class parsedmarc.types.AggregateReportMetadata[source]
class parsedmarc.types.EmailAddress[source]
class parsedmarc.types.EmailAttachment[source]
class parsedmarc.types.FailureParsedReport[source]
class parsedmarc.types.FailureReport[source]
parsedmarc.types.ForensicParsedReport

alias of FailureParsedReport

parsedmarc.types.ForensicReport

alias of FailureReport

class parsedmarc.types.IPSourceInfo[source]
class parsedmarc.types.ParsedEmail
class parsedmarc.types.ParsingResults[source]
class parsedmarc.types.SMTPTLSFailureDetails[source]
class parsedmarc.types.SMTPTLSFailureDetailsOptional[source]
class parsedmarc.types.SMTPTLSParsedReport[source]
class parsedmarc.types.SMTPTLSPolicy[source]
class parsedmarc.types.SMTPTLSPolicySummary[source]
class parsedmarc.types.SMTPTLSReport[source]

parsedmarc.utils

Utility functions that might be useful for other projects

exception parsedmarc.utils.DownloadError[source]

Raised when an error occurs when downloading a file

exception parsedmarc.utils.EmailParserError[source]

Raised when an error parsing the email occurs

class parsedmarc.utils.IPAddressInfo[source]
exception parsedmarc.utils.InvalidIPinfoAPIKey[source]

Raised when the IPinfo API rejects the configured token.

class parsedmarc.utils.ReverseDNSService[source]
parsedmarc.utils.configure_ipinfo_api(token: str | None, *, probe: bool = True) None[source]

Configure the IPinfo Lite REST API as the primary source for IP lookups.

When a token is configured, get_ip_address_db_record() hits the API first for every lookup and falls back to the MMDB on network errors. An invalid token raises InvalidIPinfoAPIKey — the CLI catches that and exits fatally.

Parameters:
  • token – IPinfo API token. None or empty disables the API.

  • probe – If True, verify the token by looking up 1.1.1.1. A 401/403 raises InvalidIPinfoAPIKey; other errors are logged and the token is still accepted so per-request fallback can take over.

parsedmarc.utils.convert_outlook_msg(msg_bytes: bytes) bytes[source]

Uses the msgconvert Perl utility to convert an Outlook MSG file to standard RFC 822 format

Parameters:

msg_bytes (bytes) – the content of the .msg file

Returns:

An RFC 822 bytes payload

parsedmarc.utils.decode_base64(data: str) bytes[source]

Decodes a base64 string, with padding being optional

Parameters:

data (str) – A base64 encoded string

Returns:

The decoded bytes

Return type:

bytes
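
Optional padding can be handled by restoring the missing = characters before decoding. The sketch below uses only the standard library; sketch_decode_base64 is a hypothetical name, and this is one possible approach rather than a copy of the library's implementation.

```python
import base64


def sketch_decode_base64(data: str) -> bytes:
    """Decode base64 text whose trailing '=' padding may be missing."""
    raw = data.encode("ascii")
    # Base64 length must be a multiple of 4; add the missing '=' characters.
    missing = -len(raw) % 4
    return base64.b64decode(raw + b"=" * missing)


decoded = sketch_decode_base64("aGVsbG8")  # "aGVsbG8=" with padding removed
```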

parsedmarc.utils.get_base_domain(domain: str) str | None[source]

Gets the base domain name for the given domain

Note

Results are based on a list of public domain suffixes at https://publicsuffix.org/list/public_suffix_list.dat and overrides included in parsedmarc.resources.maps.psl_overrides.txt

Parameters:

domain (str) – A domain or subdomain

Returns:

The base domain of the given domain

Return type:

str

parsedmarc.utils.get_filename_safe_string(string: str) str[source]

Converts a string to a string that is safe for a filename

Parameters:

string (str) – A string to make safe for a filename

Returns:

A string safe for a filename

Return type:

str

parsedmarc.utils.get_ip_address_country(ip_address: str, *, db_path: str | None = None) str | None[source]

Returns the ISO code for the country associated with the given IPv4 or IPv6 address.

Parameters:
  • ip_address (str) – The IP address to query for

  • db_path (str) – Path to a MMDB file from IPinfo, MaxMind, or DBIP

Returns:

An ISO country code associated with the given IP address

Return type:

str

parsedmarc.utils.get_ip_address_db_record(ip_address: str, *, db_path: str | None = None) _IPDatabaseRecord[source]

Look up an IP and return country + ASN fields.

If the IPinfo Lite API is configured via configure_ipinfo_api(), the API is queried first; any non-fatal failure (rate limit, quota, network) falls through to the MMDB. An invalid API token raises InvalidIPinfoAPIKey and is not caught here.

IPinfo Lite carries country_code, as_name, and as_domain on every record. MaxMind/DBIP country-only databases carry only country, so as_name / as_domain come back None for those users.
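
The API-first lookup with MMDB fallback described above can be sketched as follows. All names here (SketchInvalidKey, sketch_lookup, and the two stand-in lookup functions) are hypothetical, and the returned record is simulated data rather than a real lookup result.

```python
class SketchInvalidKey(Exception):
    """Stand-in for a fatal 'invalid token' error (hypothetical)."""


def sketch_lookup(ip_address, api_lookup, mmdb_lookup):
    """Try the API first; fall back to the local database on soft failures."""
    if api_lookup is not None:
        try:
            return api_lookup(ip_address)
        except SketchInvalidKey:
            # Fatal: a bad token is a configuration error, so let it propagate.
            raise
        except Exception:
            # Non-fatal (rate limit, quota, network): fall through to the MMDB.
            pass
    return mmdb_lookup(ip_address)


def failing_api(ip_address):
    raise ConnectionError("simulated network failure")


def country_only_mmdb(ip_address):
    # A country-only database has no ASN data, so those fields stay None.
    return {"country": "US", "as_name": None, "as_domain": None}


record = sketch_lookup("192.0.2.1", failing_api, country_only_mmdb)
```

Separating the fatal error from the recoverable ones keeps a misconfigured token from being silently masked by the fallback.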

parsedmarc.utils.get_ip_address_info(ip_address, *, ip_db_path: str | None = None, reverse_dns_map_path: str | None = None, always_use_local_files: bool = False, reverse_dns_map_url: str | None = None, cache: ExpiringDict | None = None, reverse_dns_map: dict[str, ReverseDNSService] | None = None, offline: bool = False, nameservers: list[str] | None = None, timeout: float = 2.0, retries: int = 0) IPAddressInfo[source]

Returns reverse DNS and country information for the given IP address

Parameters:
  • ip_address (str) – The IP address to check

  • ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP

  • reverse_dns_map_path (str) – Path to a reverse DNS map file

  • reverse_dns_map_url (str) – URL to the reverse DNS map file

  • always_use_local_files (bool) – Do not download files

  • cache (ExpiringDict) – Cache storage

  • reverse_dns_map (dict) – A reverse DNS map

  • offline (bool) – Do not make online queries for geolocation or DNS

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • timeout (float) – Sets the DNS timeout in seconds

  • retries (int) – Number of times to retry on timeout or other transient errors

Returns:

ip_address, reverse_dns, country

Return type:

dict

parsedmarc.utils.get_reverse_dns(ip_address, *, cache: ExpiringDict | None = None, nameservers: list[str] | None = None, timeout: float = 2.0, retries: int = 0) str | None[source]

Resolves an IP address to a hostname using a reverse DNS query

Parameters:
  • ip_address (str) – The IP address to resolve

  • cache (ExpiringDict) – Cache storage

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)

  • timeout (float) – Sets the DNS query timeout in seconds

  • retries (int) – Number of times to retry on timeout or other transient errors

Returns:

The reverse DNS hostname (if any)

Return type:

str

parsedmarc.utils.get_service_from_reverse_dns_base_domain(base_domain, *, always_use_local_file: bool = False, local_file_path: str | None = None, url: str | None = None, offline: bool = False, reverse_dns_map: dict[str, ReverseDNSService] | None = None) ReverseDNSService[source]

Returns the service name of a given base domain name from reverse DNS.

Parameters:
  • base_domain (str) – The base domain of the reverse DNS lookup

  • always_use_local_file (bool) – Always use a local map file

  • local_file_path (str) – Path to a local map file

  • url (str) – URL to a reverse DNS map

  • offline (bool) – Use the built-in copy of the reverse DNS map

  • reverse_dns_map (dict) – A reverse DNS map

Returns:

A dictionary containing name and type. If the service is unknown, the name will be the supplied base_domain and the type will be None

Return type:

dict

parsedmarc.utils.human_timestamp_to_datetime(human_timestamp: str, *, to_utc: bool = False) datetime[source]

Converts a human-readable timestamp into a Python datetime object

Parameters:
  • human_timestamp (str) – A timestamp string

  • to_utc (bool) – Convert the timestamp to UTC

Returns:

The converted timestamp

Return type:

datetime

parsedmarc.utils.human_timestamp_to_unix_timestamp(human_timestamp: str) int[source]

Converts a human-readable timestamp into a UNIX timestamp

Parameters:

human_timestamp (str) – A timestamp in YYYY-MM-DD HH:MM:SS format

Returns:

The converted timestamp

Return type:

int
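
A minimal sketch of this kind of conversion, using only the standard library. sketch_human_to_unix is a hypothetical name, and treating the input as UTC is an assumption made so the result is deterministic; this reference does not say which timezone the library assumes.

```python
from datetime import datetime, timezone


def sketch_human_to_unix(human_timestamp: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC and return whole seconds."""
    parsed = datetime.strptime(human_timestamp, "%Y-%m-%d %H:%M:%S")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


unix_seconds = sketch_human_to_unix("2024-01-01 00:00:00")
```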

parsedmarc.utils.is_mbox(path: str) bool[source]

Checks if the file at the given path is an MBOX mailbox file

Parameters:

path – Path to the file to check

Returns:

A flag that indicates if the file is an MBOX mailbox file

Return type:

bool

parsedmarc.utils.is_outlook_msg(content) bool[source]

Checks if the given content is an Outlook MSG (OLE) file

Parameters:

content – Content to check

Returns:

A flag that indicates if the file is an Outlook MSG file

Return type:

bool

parsedmarc.utils.load_ip_db(*, always_use_local_file: bool = False, local_file_path: str | None = None, url: str | None = None, offline: bool = False) None[source]

Downloads the IP-to-country MMDB database from a URL and caches it locally. Falls back to the bundled copy on failure or when offline.

Parameters:
  • always_use_local_file – Always use a local/bundled database file

  • local_file_path – Path to a local MMDB file

  • url – URL to the MMDB database file

  • offline – Do not make online requests

parsedmarc.utils.load_psl_overrides(*, always_use_local_file: bool = False, local_file_path: str | None = None, url: str | None = None, offline: bool = False) list[str][source]

Loads the PSL overrides list from a URL or local file.

Clears and repopulates the module-level psl_overrides list in place, then returns it. The URL is tried first; on failure (or when offline/always_use_local_file is set) the local path is used, defaulting to the bundled psl_overrides.txt.

Parameters:
  • always_use_local_file (bool) – Always use a local overrides file

  • local_file_path (str) – Path to a local overrides file

  • url (str) – URL to a PSL overrides file

  • offline (bool) – Use the built-in copy of the overrides

Returns:

The module-level psl_overrides list

Return type:

list[str]

parsedmarc.utils.load_reverse_dns_map(reverse_dns_map: dict[str, ReverseDNSService], *, always_use_local_file: bool = False, local_file_path: str | None = None, url: str | None = None, offline: bool = False, psl_overrides_path: str | None = None, psl_overrides_url: str | None = None) None[source]

Loads the reverse DNS map from a URL or local file.

Clears and repopulates the given map dict in place. If the map is fetched from a URL, that is tried first; on failure (or if offline/local mode is selected) the bundled CSV is used as a fallback.

psl_overrides.txt is reloaded at the same time using the same offline / always_use_local_file flags (with separate path/URL kwargs), so map entries that depend on a recent overrides entry fold correctly.

Parameters:
  • reverse_dns_map (dict) – The map dict to populate (modified in place)

  • always_use_local_file (bool) – Always use a local map file

  • local_file_path (str) – Path to a local map file

  • url (str) – URL to a reverse DNS map

  • offline (bool) – Use the built-in copy of the reverse DNS map

  • psl_overrides_path (str) – Path to a local PSL overrides file

  • psl_overrides_url (str) – URL to a PSL overrides file
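
Because the map is cleared and repopulated in place, every reference to the same dict sees the new entries. The sketch below shows that pattern in isolation; sketch_reload_map and the sample entries are hypothetical and do not reflect the real map contents.

```python
def sketch_reload_map(target: dict, fresh_entries: dict) -> None:
    """Replace the contents of target without rebinding the name."""
    target.clear()
    target.update(fresh_entries)


shared_map = {"old.example": {"name": "Old", "type": "Email Provider"}}
alias = shared_map  # another part of the program holding the same dict
sketch_reload_map(shared_map, {"new.example": {"name": "New", "type": "ISP"}})
```

After the call, alias and shared_map are still the same object, so a long-running process can refresh the map without handing a new dict to every caller.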

parsedmarc.utils.parse_email(data: bytes | str, *, strip_attachment_payloads: bool = False) dict[source]

A simplified email parser

Parameters:
  • data – The RFC 822 message string, or MSG binary

  • strip_attachment_payloads (bool) – Remove attachment payloads

Returns:

Parsed email data

Return type:

dict

parsedmarc.utils.query_dns(domain: str, record_type: str, *, cache: ExpiringDict | None = None, nameservers: list[str] | None = None, timeout: float = 2.0, retries: int = 0, _attempt: int = 0) list[str][source]

Queries DNS

Parameters:
  • domain (str) – The domain or subdomain to query about

  • record_type (str) – The record type to query for

  • cache (ExpiringDict) – Cache storage

  • nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default). Pass parsedmarc.constants.RECOMMENDED_DNS_NAMESERVERS for a cross-provider mix that fails over when one provider’s path is slow or broken.

  • timeout (float) – Overall DNS lifetime budget in seconds per configured nameserver. Per-query UDP attempts are capped at min(1.0, timeout) so dnspython retries within the lifetime on transient UDP packet loss (mirroring dig’s default +tries=3 behavior); with multiple nameservers configured this same cap also makes a slow or broken nameserver fall through to the next quickly.

  • retries (int) – Number of times to retry the whole query after a timeout or other transient error (LifetimeTimeout, NoNameservers, OSError). Failover between configured nameservers happens within each attempt.

Returns:

A list of answers

Return type:

list
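
The relationship between timeout and retries described above can be expressed as simple arithmetic. sketch_dns_timing is a hypothetical helper that only derives the two numbers named in the parameter descriptions; it performs no DNS queries and is not how the library configures its resolver.

```python
def sketch_dns_timing(timeout: float = 2.0, retries: int = 0) -> dict:
    """Derive illustrative timing values from timeout and retries."""
    return {
        # Each UDP attempt is capped so several attempts fit in the lifetime.
        "per_query_timeout": min(1.0, timeout),
        # The whole query runs once, plus once per configured retry.
        "max_whole_query_attempts": retries + 1,
    }


defaults = sketch_dns_timing()
short = sketch_dns_timing(timeout=0.5, retries=2)
```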

parsedmarc.utils.timestamp_to_datetime(timestamp: int) datetime[source]

Converts a UNIX/DMARC timestamp to a Python datetime object

Parameters:

timestamp (int) – The timestamp

Returns:

The converted timestamp as a Python datetime object

Return type:

datetime

parsedmarc.utils.timestamp_to_human(timestamp: int) str[source]

Converts a UNIX/DMARC timestamp to a human-readable string

Parameters:

timestamp – The timestamp

Returns:

The converted timestamp in YYYY-MM-DD HH:MM:SS format

Return type:

str
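
A minimal sketch of producing the YYYY-MM-DD HH:MM:SS format with the standard library. sketch_timestamp_to_human is a hypothetical name, and rendering in UTC is an assumption made so the output does not depend on the local timezone; this reference does not say which timezone the library uses.

```python
from datetime import datetime, timezone


def sketch_timestamp_to_human(timestamp: int) -> str:
    """Format a UNIX timestamp as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    moment = datetime.fromtimestamp(int(timestamp), tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


human = sketch_timestamp_to_human(1704067200)
```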

Indices and tables