Compare commits

..

6 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
2174f23eb5 Add comprehensive TypedDicts to minimize Any usage in public APIs
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 22:28:19 +00:00
copilot-swe-agent[bot]
febbb107c4 Fix Python 3.9 compatibility: replace pipe union syntax with Union/Optional
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 22:18:57 +00:00
copilot-swe-agent[bot]
9a64b494e7 Fix code review issues: incomplete isinstance and variable name mismatch
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 21:45:21 +00:00
copilot-swe-agent[bot]
e93209c766 Fix function signatures and improve type annotations
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 21:42:25 +00:00
copilot-swe-agent[bot]
d1c22466be Replace OrderedDict with dict and add TypedDict definitions
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-17 21:36:57 +00:00
copilot-swe-agent[bot]
3d1b2522d3 Initial plan 2025-12-17 21:19:30 +00:00
29 changed files with 932 additions and 1492 deletions

View File

@@ -30,7 +30,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v5

292
.vscode/settings.json vendored
View File

@@ -13,154 +13,148 @@
"MD024": false "MD024": false
}, },
"cSpell.words": [ "cSpell.words": [
"adkim", "adkim",
"akamaiedge", "akamaiedge",
"amsmath", "amsmath",
"andrewmcgilvray", "andrewmcgilvray",
"arcname", "arcname",
"aspf", "aspf",
"autoclass", "autoclass",
"automodule", "automodule",
"backported", "backported",
"bellsouth", "bellsouth",
"boto", "boto",
"brakhane", "brakhane",
"Brightmail", "Brightmail",
"CEST", "CEST",
"CHACHA", "CHACHA",
"checkdmarc", "checkdmarc",
"Codecov", "Codecov",
"confnew", "confnew",
"dateparser", "dateparser",
"dateutil", "dateutil",
"Davmail", "Davmail",
"DBIP", "DBIP",
"dearmor", "dearmor",
"deflist", "deflist",
"devel", "devel",
"DMARC", "DMARC",
"Dmarcian", "Dmarcian",
"dnspython", "dnspython",
"dollarmath", "dollarmath",
"dpkg", "dpkg",
"exampleuser", "exampleuser",
"expiringdict", "expiringdict",
"fieldlist", "fieldlist",
"GELF", "GELF",
"genindex", "genindex",
"geoip", "geoip",
"geoipupdate", "geoipupdate",
"Geolite", "Geolite",
"geolocation", "geolocation",
"githubpages", "githubpages",
"Grafana", "Grafana",
"hostnames", "hostnames",
"htpasswd", "htpasswd",
"httpasswd", "httpasswd",
"httplib", "httplib",
"ifhost", "IMAP",
"IMAP", "imapclient",
"imapclient", "infile",
"infile", "Interaktive",
"Interaktive", "IPDB",
"IPDB", "journalctl",
"journalctl", "keepalive",
"kafkaclient", "keyout",
"keepalive", "keyrings",
"keyout", "Leeman",
"keyrings", "libemail",
"Leeman", "linkify",
"libemail", "LISTSERV",
"linkify", "lxml",
"LISTSERV", "mailparser",
"loganalytics", "mailrelay",
"lxml", "mailsuite",
"mailparser", "maxdepth",
"mailrelay", "MAXHEADERS",
"mailsuite", "maxmind",
"maxdepth", "mbox",
"MAXHEADERS", "mfrom",
"maxmind", "michaeldavie",
"mbox", "mikesiegel",
"mfrom", "Mimecast",
"mhdw", "mitigations",
"michaeldavie", "MMDB",
"mikesiegel", "modindex",
"Mimecast", "msgconvert",
"mitigations", "msgraph",
"MMDB", "MSSP",
"modindex", "multiprocess",
"msgconvert", "Munge",
"msgraph", "ndjson",
"MSSP", "newkey",
"multiprocess", "Nhcm",
"Munge", "nojekyll",
"ndjson", "nondigest",
"newkey", "nosecureimap",
"Nhcm", "nosniff",
"nojekyll", "nwettbewerb",
"nondigest", "opensearch",
"nosecureimap", "opensearchpy",
"nosniff", "parsedmarc",
"nwettbewerb", "passsword",
"opensearch", "Postorius",
"opensearchpy", "premade",
"parsedmarc", "procs",
"passsword", "publicsuffix",
"pbar", "publicsuffixlist",
"Postorius", "publixsuffix",
"premade", "pygelf",
"privatesuffix", "pypy",
"procs", "pytest",
"publicsuffix", "quickstart",
"publicsuffixlist", "Reindex",
"publixsuffix", "replyto",
"pygelf", "reversename",
"pypy", "Rollup",
"pytest", "Rpdm",
"quickstart", "SAMEORIGIN",
"Reindex", "sdist",
"replyto", "Servernameone",
"reversename", "setuptools",
"Rollup", "smartquotes",
"Rpdm", "SMTPTLS",
"SAMEORIGIN", "sortlists",
"sdist", "sortmaps",
"Servernameone", "sourcetype",
"setuptools", "STARTTLS",
"smartquotes", "tasklist",
"SMTPTLS", "timespan",
"sortlists", "tlsa",
"sortmaps", "tlsrpt",
"sourcetype", "toctree",
"STARTTLS", "TQDDM",
"tasklist", "tqdm",
"timespan", "truststore",
"tlsa", "Übersicht",
"tlsrpt", "uids",
"toctree", "Uncategorized",
"TQDDM", "unparasable",
"tqdm", "uper",
"truststore", "urllib",
"Übersicht", "Valimail",
"uids", "venv",
"Uncategorized", "Vhcw",
"unparasable", "viewcode",
"uper", "virtualenv",
"urllib", "WBITS",
"Valimail", "webmail",
"venv", "Wettbewerber",
"Vhcw", "Whalen",
"viewcode", "whitespaces",
"virtualenv", "xennn",
"WBITS", "xmltodict",
"webmail", "xpack",
"Wettbewerber", "zscholl"
"Whalen",
"whitespaces",
"xennn",
"xmltodict",
"xpack",
"zscholl"
], ],
} }

View File

@@ -1,54 +1,5 @@
# Changelog # Changelog
## 9.1.0
## Enhancements
- Add TCP and TLS support for syslog output. (#656)
- Skip DNS lookups in GitHub Actions to prevent DNS timeouts during tests. (#657)
- Remove microseconds from DMARC aggregate report time ranges before parsing them.
## 9.0.10
- Support Python 3.14+
## 9.0.9
### Fixes
- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
## 9.0.8
### Fixes
- Fix logging configuration not propagating to child parser processes (#646).
- Update `mailsuite` dependency to `>=1.11.1` to solve issues with iCloud IMAP (#493).
## 9.0.7
## Fixes
- Fix IMAP `since` option (PR #645 closes issues #581 and #643).
## 9.0.6
### Fixes
- Fix #638.
- Fix/clarify report extraction and parsing behavior for multiple input types (bytes, base64 strings, and file-like objects).
- Fix type mismatches that could cause runtime issues in SMTP emailing and CLI option handling.
### Improvements
- Improve type hints across the library (Pylance/Pyright friendliness) and reduce false-positive linter errors.
- Emails in Microsoft 365 are now marked read as they are read. This provides consistency with other mailbox types, and gives you an indication of when emails are being read as they are processed in batches. (Close #625)
### Compatibility / Dependencies
- Set Python requirement to `>=3.9,<3.14`.
- Bump `mailsuite` requirement to `>=1.11.0`.
## 9.0.5 ## 9.0.5
## Fixes ## Fixes

View File

@@ -61,4 +61,4 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) | | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) | | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) | | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Actively maintained | | 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|

1
ci.ini
View File

@@ -3,7 +3,6 @@ save_aggregate = True
save_forensic = True save_forensic = True
save_smtp_tls = True save_smtp_tls = True
debug = True debug = True
offline = True
[elasticsearch] [elasticsearch]
hosts = http://localhost:9200 hosts = http://localhost:9200

View File

@@ -28,13 +28,6 @@
:members: :members:
``` ```
## parsedmarc.types
```{eval-rst}
.. automodule:: parsedmarc.types
:members:
```
## parsedmarc.utils ## parsedmarc.utils
```{eval-rst} ```{eval-rst}

View File

@@ -61,7 +61,7 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) | | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) | | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) | | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Actively maintained | | 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
```{toctree} ```{toctree}
:caption: 'Contents' :caption: 'Contents'

View File

@@ -336,59 +336,7 @@ The full set of configuration options are:
- `secret_access_key` - str: The secret access key (Optional) - `secret_access_key` - str: The secret access key (Optional)
- `syslog` - `syslog`
- `server` - str: The Syslog server name or IP address - `server` - str: The Syslog server name or IP address
- `port` - int: The port to use (Default: `514`) - `port` - int: The UDP port to use (Default: `514`)
- `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
- `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
- `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
- `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
- `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
- `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
- `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
**Example UDP configuration (default):**
```ini
[syslog]
server = syslog.example.com
port = 514
```
**Example TCP configuration:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tcp
timeout = 10.0
retry_attempts = 5
```
**Example TLS configuration with server verification:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
timeout = 10.0
```
**Example TLS configuration with mutual authentication:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
certfile_path = /path/to/client-cert.pem
keyfile_path = /path/to/client-key.pem
timeout = 10.0
retry_attempts = 3
retry_delay = 5
```
- `gmail_api` - `gmail_api`
- `credentials_file` - str: Path to file containing the - `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`) credentials, None to disable (Default: `None`)

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -3,55 +3,53 @@
"""A CLI for parsing DMARC reports""" """A CLI for parsing DMARC reports"""
import http.client from argparse import Namespace, ArgumentParser
import json
import logging
import os import os
import sys
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser from configparser import ConfigParser
from glob import glob from glob import glob
from multiprocessing import Pipe, Process import logging
from ssl import CERT_NONE, create_default_context import math
import yaml import yaml
import json
from ssl import CERT_NONE, create_default_context
from multiprocessing import Pipe, Process
import sys
import http.client
from tqdm import tqdm from tqdm import tqdm
from parsedmarc import ( from parsedmarc import (
SEEN_AGGREGATE_REPORT_IDS, get_dmarc_reports_from_mailbox,
InvalidDMARCReport, watch_inbox,
parse_report_file,
get_dmarc_reports_from_mbox,
elastic,
opensearch,
kafkaclient,
splunk,
save_output,
email_results,
ParserError, ParserError,
__version__, __version__,
elastic, InvalidDMARCReport,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
s3, s3,
save_output,
splunk,
syslog, syslog,
watch_inbox, loganalytics,
gelf,
webhook, webhook,
) )
from parsedmarc.log import logger
from parsedmarc.mail import ( from parsedmarc.mail import (
GmailConnection,
IMAPConnection, IMAPConnection,
MaildirConnection,
MSGraphConnection, MSGraphConnection,
GmailConnection,
MaildirConnection,
) )
from parsedmarc.mail.graph import AuthMethod from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a from parsedmarc.log import logger
# private stdlib attribute and may not exist in type stubs. from parsedmarc.utils import is_mbox, get_reverse_dns, get_base_domain
setattr(http.client, "_MAXHEADERS", 200) from parsedmarc import SEEN_AGGREGATE_REPORT_IDS
http.client._MAXHEADERS = 200 # pylint:disable=protected-access
formatter = logging.Formatter( formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s", fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
@@ -68,48 +66,6 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list)) return list(map(lambda i: i.lstrip(), _list))
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
This is needed for child processes to properly log messages.
Args:
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
log_file: Optional path to log file
"""
# Get the logger
from parsedmarc.log import logger
# Set the log level
logger.setLevel(log_level)
# Add StreamHandler with formatter if not already present
# Check if we already have a StreamHandler to avoid duplicates
# Use exact type check to distinguish from FileHandler subclass
has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
if not has_stream_handler:
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Add FileHandler if log_file is specified
if log_file:
try:
fh = logging.FileHandler(log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except (IOError, OSError, PermissionError) as error:
logger.warning("Unable to write to log file: {}".format(error))
def cli_parse( def cli_parse(
file_path, file_path,
sa, sa,
@@ -122,29 +78,8 @@ def cli_parse(
reverse_dns_map_url, reverse_dns_map_url,
normalize_timespan_threshold_hours, normalize_timespan_threshold_hours,
conn, conn,
log_level=logging.ERROR,
log_file=None,
): ):
"""Separated this function for multiprocessing """Separated this function for multiprocessing"""
Args:
file_path: Path to the report file
sa: Strip attachment payloads flag
nameservers: List of nameservers
dns_timeout: DNS timeout
ip_db_path: Path to IP database
offline: Offline mode flag
always_use_local_files: Always use local files flag
reverse_dns_map_path: Path to reverse DNS map
reverse_dns_map_url: URL to reverse DNS map
normalize_timespan_threshold_hours: Timespan threshold
conn: Pipe connection for IPC
log_level: Logging level for this process
log_file: Optional path to log file
"""
# Configure logging in this child process
_configure_logging(log_level, log_file)
try: try:
file_results = parse_report_file( file_results = parse_report_file(
file_path, file_path,
@@ -169,7 +104,6 @@ def _main():
"""Called when the module is executed""" """Called when the module is executed"""
def get_index_prefix(report): def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None: if index_prefix_domain_map is None:
return None return None
if "policy_published" in report: if "policy_published" in report:
@@ -203,7 +137,7 @@ def _main():
print(output_str) print(output_str)
if opts.output: if opts.output:
save_output( save_output(
reports_, results,
output_directory=opts.output, output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename, aggregate_json_filename=opts.aggregate_json_filename,
forensic_json_filename=opts.forensic_json_filename, forensic_json_filename=opts.forensic_json_filename,
@@ -697,13 +631,6 @@ def _main():
s3_secret_access_key=None, s3_secret_access_key=None,
syslog_server=None, syslog_server=None,
syslog_port=None, syslog_port=None,
syslog_protocol=None,
syslog_cafile_path=None,
syslog_certfile_path=None,
syslog_keyfile_path=None,
syslog_timeout=None,
syslog_retry_attempts=None,
syslog_retry_delay=None,
gmail_api_credentials_file=None, gmail_api_credentials_file=None,
gmail_api_token_file=None, gmail_api_token_file=None,
gmail_api_include_spam_trash=False, gmail_api_include_spam_trash=False,
@@ -749,7 +676,7 @@ def _main():
if "general" in config.sections(): if "general" in config.sections():
general_config = config["general"] general_config = config["general"]
if "silent" in general_config: if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent")) opts.silent = general_config.getboolean("silent")
if "normalize_timespan_threshold_hours" in general_config: if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat( opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours" "normalize_timespan_threshold_hours"
@@ -758,10 +685,10 @@ def _main():
with open(general_config["index_prefix_domain_map"]) as f: with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f) index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config: if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline")) opts.offline = general_config.getboolean("offline")
if "strip_attachment_payloads" in general_config: if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = bool( opts.strip_attachment_payloads = general_config.getboolean(
general_config.getboolean("strip_attachment_payloads") "strip_attachment_payloads"
) )
if "output" in general_config: if "output" in general_config:
opts.output = general_config["output"] opts.output = general_config["output"]
@@ -779,8 +706,6 @@ def _main():
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"] opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config: if "dns_timeout" in general_config:
opts.dns_timeout = general_config.getfloat("dns_timeout") opts.dns_timeout = general_config.getfloat("dns_timeout")
if opts.dns_timeout is None:
opts.dns_timeout = 2
if "dns_test_address" in general_config: if "dns_test_address" in general_config:
opts.dns_test_address = general_config["dns_test_address"] opts.dns_test_address = general_config["dns_test_address"]
if "nameservers" in general_config: if "nameservers" in general_config:
@@ -803,19 +728,19 @@ def _main():
) )
exit(-1) exit(-1)
if "save_aggregate" in general_config: if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate")) opts.save_aggregate = general_config.getboolean("save_aggregate")
if "save_forensic" in general_config: if "save_forensic" in general_config:
opts.save_forensic = bool(general_config.getboolean("save_forensic")) opts.save_forensic = general_config.getboolean("save_forensic")
if "save_smtp_tls" in general_config: if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls")) opts.save_smtp_tls = general_config.getboolean("save_smtp_tls")
if "debug" in general_config: if "debug" in general_config:
opts.debug = bool(general_config.getboolean("debug")) opts.debug = general_config.getboolean("debug")
if "verbose" in general_config: if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose")) opts.verbose = general_config.getboolean("verbose")
if "silent" in general_config: if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent")) opts.silent = general_config.getboolean("silent")
if "warnings" in general_config: if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings")) opts.warnings = general_config.getboolean("warnings")
if "log_file" in general_config: if "log_file" in general_config:
opts.log_file = general_config["log_file"] opts.log_file = general_config["log_file"]
if "n_procs" in general_config: if "n_procs" in general_config:
@@ -825,15 +750,15 @@ def _main():
else: else:
opts.ip_db_path = None opts.ip_db_path = None
if "always_use_local_files" in general_config: if "always_use_local_files" in general_config:
opts.always_use_local_files = bool( opts.always_use_local_files = general_config.getboolean(
general_config.getboolean("always_use_local_files") "always_use_local_files"
) )
if "reverse_dns_map_path" in general_config: if "reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = general_config["reverse_dns_path"] opts.reverse_dns_map_path = general_config["reverse_dns_path"]
if "reverse_dns_map_url" in general_config: if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_url"] opts.reverse_dns_map_url = general_config["reverse_dns_url"]
if "prettify_json" in general_config: if "prettify_json" in general_config:
opts.prettify_json = bool(general_config.getboolean("prettify_json")) opts.prettify_json = general_config.getboolean("prettify_json")
if "mailbox" in config.sections(): if "mailbox" in config.sections():
mailbox_config = config["mailbox"] mailbox_config = config["mailbox"]
@@ -844,11 +769,11 @@ def _main():
if "archive_folder" in mailbox_config: if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"] opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config: if "watch" in mailbox_config:
opts.mailbox_watch = bool(mailbox_config.getboolean("watch")) opts.mailbox_watch = mailbox_config.getboolean("watch")
if "delete" in mailbox_config: if "delete" in mailbox_config:
opts.mailbox_delete = bool(mailbox_config.getboolean("delete")) opts.mailbox_delete = mailbox_config.getboolean("delete")
if "test" in mailbox_config: if "test" in mailbox_config:
opts.mailbox_test = bool(mailbox_config.getboolean("test")) opts.mailbox_test = mailbox_config.getboolean("test")
if "batch_size" in mailbox_config: if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size") opts.mailbox_batch_size = mailbox_config.getint("batch_size")
if "check_timeout" in mailbox_config: if "check_timeout" in mailbox_config:
@@ -872,14 +797,14 @@ def _main():
if "port" in imap_config: if "port" in imap_config:
opts.imap_port = imap_config.getint("port") opts.imap_port = imap_config.getint("port")
if "timeout" in imap_config: if "timeout" in imap_config:
opts.imap_timeout = imap_config.getint("timeout") opts.imap_timeout = imap_config.getfloat("timeout")
if "max_retries" in imap_config: if "max_retries" in imap_config:
opts.imap_max_retries = imap_config.getint("max_retries") opts.imap_max_retries = imap_config.getint("max_retries")
if "ssl" in imap_config: if "ssl" in imap_config:
opts.imap_ssl = bool(imap_config.getboolean("ssl")) opts.imap_ssl = imap_config.getboolean("ssl")
if "skip_certificate_verification" in imap_config: if "skip_certificate_verification" in imap_config:
opts.imap_skip_certificate_verification = bool( opts.imap_skip_certificate_verification = imap_config.getboolean(
imap_config.getboolean("skip_certificate_verification") "skip_certificate_verification"
) )
if "user" in imap_config: if "user" in imap_config:
opts.imap_user = imap_config["user"] opts.imap_user = imap_config["user"]
@@ -908,7 +833,7 @@ def _main():
"section instead." "section instead."
) )
if "watch" in imap_config: if "watch" in imap_config:
opts.mailbox_watch = bool(imap_config.getboolean("watch")) opts.mailbox_watch = imap_config.getboolean("watch")
logger.warning( logger.warning(
"Use of the watch option in the imap " "Use of the watch option in the imap "
"configuration section has been deprecated. " "configuration section has been deprecated. "
@@ -923,7 +848,7 @@ def _main():
"section instead." "section instead."
) )
if "test" in imap_config: if "test" in imap_config:
opts.mailbox_test = bool(imap_config.getboolean("test")) opts.mailbox_test = imap_config.getboolean("test")
logger.warning( logger.warning(
"Use of the test option in the imap " "Use of the test option in the imap "
"configuration section has been deprecated. " "configuration section has been deprecated. "
@@ -1017,8 +942,8 @@ def _main():
opts.graph_url = graph_config["graph_url"] opts.graph_url = graph_config["graph_url"]
if "allow_unencrypted_storage" in graph_config: if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool( opts.graph_allow_unencrypted_storage = graph_config.getboolean(
graph_config.getboolean("allow_unencrypted_storage") "allow_unencrypted_storage"
) )
if "elasticsearch" in config: if "elasticsearch" in config:
@@ -1046,10 +971,10 @@ def _main():
if "index_prefix" in elasticsearch_config: if "index_prefix" in elasticsearch_config:
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"] opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
if "monthly_indexes" in elasticsearch_config: if "monthly_indexes" in elasticsearch_config:
monthly = bool(elasticsearch_config.getboolean("monthly_indexes")) monthly = elasticsearch_config.getboolean("monthly_indexes")
opts.elasticsearch_monthly_indexes = monthly opts.elasticsearch_monthly_indexes = monthly
if "ssl" in elasticsearch_config: if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl")) opts.elasticsearch_ssl = elasticsearch_config.getboolean("ssl")
if "cert_path" in elasticsearch_config: if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"] opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
if "user" in elasticsearch_config: if "user" in elasticsearch_config:
@@ -1086,10 +1011,10 @@ def _main():
if "index_prefix" in opensearch_config: if "index_prefix" in opensearch_config:
opts.opensearch_index_prefix = opensearch_config["index_prefix"] opts.opensearch_index_prefix = opensearch_config["index_prefix"]
if "monthly_indexes" in opensearch_config: if "monthly_indexes" in opensearch_config:
monthly = bool(opensearch_config.getboolean("monthly_indexes")) monthly = opensearch_config.getboolean("monthly_indexes")
opts.opensearch_monthly_indexes = monthly opts.opensearch_monthly_indexes = monthly
if "ssl" in opensearch_config: if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl")) opts.opensearch_ssl = opensearch_config.getboolean("ssl")
if "cert_path" in opensearch_config: if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"] opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
if "user" in opensearch_config: if "user" in opensearch_config:
@@ -1143,11 +1068,9 @@ def _main():
if "password" in kafka_config: if "password" in kafka_config:
opts.kafka_password = kafka_config["password"] opts.kafka_password = kafka_config["password"]
if "ssl" in kafka_config: if "ssl" in kafka_config:
opts.kafka_ssl = bool(kafka_config.getboolean("ssl")) opts.kafka_ssl = kafka_config.getboolean("ssl")
if "skip_certificate_verification" in kafka_config: if "skip_certificate_verification" in kafka_config:
kafka_verify = bool( kafka_verify = kafka_config.getboolean("skip_certificate_verification")
kafka_config.getboolean("skip_certificate_verification")
)
opts.kafka_skip_certificate_verification = kafka_verify opts.kafka_skip_certificate_verification = kafka_verify
if "aggregate_topic" in kafka_config: if "aggregate_topic" in kafka_config:
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"] opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
@@ -1179,11 +1102,9 @@ def _main():
if "port" in smtp_config: if "port" in smtp_config:
opts.smtp_port = smtp_config.getint("port") opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config: if "ssl" in smtp_config:
opts.smtp_ssl = bool(smtp_config.getboolean("ssl")) opts.smtp_ssl = smtp_config.getboolean("ssl")
if "skip_certificate_verification" in smtp_config: if "skip_certificate_verification" in smtp_config:
smtp_verify = bool( smtp_verify = smtp_config.getboolean("skip_certificate_verification")
smtp_config.getboolean("skip_certificate_verification")
)
opts.smtp_skip_certificate_verification = smtp_verify opts.smtp_skip_certificate_verification = smtp_verify
if "user" in smtp_config: if "user" in smtp_config:
opts.smtp_user = smtp_config["user"] opts.smtp_user = smtp_config["user"]
@@ -1246,38 +1167,16 @@ def _main():
opts.syslog_port = syslog_config["port"] opts.syslog_port = syslog_config["port"]
else: else:
opts.syslog_port = 514 opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = syslog_config["cafile_path"]
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = syslog_config["certfile_path"]
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections(): if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"] gmail_api_config = config["gmail_api"]
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file") opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token") opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
opts.gmail_api_include_spam_trash = bool( opts.gmail_api_include_spam_trash = gmail_api_config.getboolean(
gmail_api_config.getboolean("include_spam_trash", False) "include_spam_trash", False
) )
opts.gmail_api_paginate_messages = bool( opts.gmail_api_paginate_messages = gmail_api_config.getboolean(
gmail_api_config.getboolean("paginate_messages", True) "paginate_messages", True
) )
opts.gmail_api_scopes = gmail_api_config.get( opts.gmail_api_scopes = gmail_api_config.get(
"scopes", default_gmail_api_scope "scopes", default_gmail_api_scope
@@ -1291,9 +1190,7 @@ def _main():
if "maildir" in config.sections(): if "maildir" in config.sections():
maildir_api_config = config["maildir"] maildir_api_config = config["maildir"]
opts.maildir_path = maildir_api_config.get("maildir_path") opts.maildir_path = maildir_api_config.get("maildir_path")
opts.maildir_create = bool( opts.maildir_create = maildir_api_config.get("maildir_create")
maildir_api_config.getboolean("maildir_create", fallback=False)
)
if "log_analytics" in config.sections(): if "log_analytics" in config.sections():
log_analytics_config = config["log_analytics"] log_analytics_config = config["log_analytics"]
@@ -1388,11 +1285,6 @@ def _main():
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index) es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index) es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index) es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts( elastic.set_hosts(
opts.elasticsearch_hosts, opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl, use_ssl=opts.elasticsearch_ssl,
@@ -1400,7 +1292,7 @@ def _main():
username=opts.elasticsearch_username, username=opts.elasticsearch_username,
password=opts.elasticsearch_password, password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key, api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value, timeout=opts.elasticsearch_timeout,
) )
elastic.migrate_indexes( elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index], aggregate_indexes=[es_aggregate_index],
@@ -1425,11 +1317,6 @@ def _main():
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index) os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index) os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index) os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts( opensearch.set_hosts(
opts.opensearch_hosts, opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl, use_ssl=opts.opensearch_ssl,
@@ -1437,7 +1324,7 @@ def _main():
username=opts.opensearch_username, username=opts.opensearch_username,
password=opts.opensearch_password, password=opts.opensearch_password,
api_key=opts.opensearch_api_key, api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value, timeout=opts.opensearch_timeout,
) )
opensearch.migrate_indexes( opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index], aggregate_indexes=[os_aggregate_index],
@@ -1465,17 +1352,6 @@ def _main():
syslog_client = syslog.SyslogClient( syslog_client = syslog.SyslogClient(
server_name=opts.syslog_server, server_name=opts.syslog_server,
server_port=int(opts.syslog_port), server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
) )
except Exception as error_: except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__())) logger.error("Syslog Error: {0}".format(error_.__str__()))
@@ -1557,23 +1433,16 @@ def _main():
results = [] results = []
pbar = None
if sys.stdout.isatty(): if sys.stdout.isatty():
pbar = tqdm(total=len(file_paths)) pbar = tqdm(total=len(file_paths))
n_procs = int(opts.n_procs or 1) for batch_index in range(math.ceil(len(file_paths) / opts.n_procs)):
if n_procs < 1:
n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
processes = [] processes = []
connections = [] connections = []
for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)): for proc_index in range(
opts.n_procs * batch_index, opts.n_procs * (batch_index + 1)
):
if proc_index >= len(file_paths): if proc_index >= len(file_paths):
break break
@@ -1594,8 +1463,6 @@ def _main():
opts.reverse_dns_map_url, opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours, opts.normalize_timespan_threshold_hours,
child_conn, child_conn,
current_log_level,
current_log_file,
), ),
) )
processes.append(process) processes.append(process)
@@ -1608,12 +1475,9 @@ def _main():
for proc in processes: for proc in processes:
proc.join() proc.join()
if pbar is not None: if sys.stdout.isatty():
counter += 1 counter += 1
pbar.update(1) pbar.update(counter - pbar.n)
if pbar is not None:
pbar.close()
for result in results: for result in results:
if isinstance(result[0], ParserError) or result[0] is None: if isinstance(result[0], ParserError) or result[0] is None:
@@ -1637,11 +1501,6 @@ def _main():
smtp_tls_reports.append(result[0]["report"]) smtp_tls_reports.append(result[0]["report"])
for mbox_path in mbox_paths: for mbox_path in mbox_paths:
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
strip = opts.strip_attachment_payloads strip = opts.strip_attachment_payloads
reports = get_dmarc_reports_from_mbox( reports = get_dmarc_reports_from_mbox(
mbox_path, mbox_path,
@@ -1653,17 +1512,13 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path, reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url, reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline, offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value, normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
) )
aggregate_reports += reports["aggregate_reports"] aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"] forensic_reports += reports["forensic_reports"]
smtp_tls_reports += reports["smtp_tls_reports"] smtp_tls_reports += reports["smtp_tls_reports"]
mailbox_connection = None mailbox_connection = None
mailbox_batch_size_value = 10
mailbox_check_timeout_value = 30
normalize_timespan_threshold_hours_value = 24.0
if opts.imap_host: if opts.imap_host:
try: try:
if opts.imap_user is None or opts.imap_password is None: if opts.imap_user is None or opts.imap_password is None:
@@ -1679,20 +1534,13 @@ def _main():
if not opts.imap_ssl: if not opts.imap_ssl:
ssl = False ssl = False
imap_timeout = (
int(opts.imap_timeout) if opts.imap_timeout is not None else 30
)
imap_max_retries = (
int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
)
imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
mailbox_connection = IMAPConnection( mailbox_connection = IMAPConnection(
host=opts.imap_host, host=opts.imap_host,
port=imap_port_value, port=opts.imap_port,
ssl=ssl, ssl=ssl,
verify=verify, verify=verify,
timeout=imap_timeout, timeout=opts.imap_timeout,
max_retries=imap_max_retries, max_retries=opts.imap_max_retries,
user=opts.imap_user, user=opts.imap_user,
password=opts.imap_password, password=opts.imap_password,
) )
@@ -1713,7 +1561,7 @@ def _main():
username=opts.graph_user, username=opts.graph_user,
password=opts.graph_password, password=opts.graph_password,
token_file=opts.graph_token_file, token_file=opts.graph_token_file,
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage), allow_unencrypted_storage=opts.graph_allow_unencrypted_storage,
graph_url=opts.graph_url, graph_url=opts.graph_url,
) )
@@ -1758,24 +1606,11 @@ def _main():
exit(1) exit(1)
if mailbox_connection: if mailbox_connection:
mailbox_batch_size_value = (
int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
try: try:
reports = get_dmarc_reports_from_mailbox( reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection, connection=mailbox_connection,
delete=opts.mailbox_delete, delete=opts.mailbox_delete,
batch_size=mailbox_batch_size_value, batch_size=opts.mailbox_batch_size,
reports_folder=opts.mailbox_reports_folder, reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder, archive_folder=opts.mailbox_archive_folder,
ip_db_path=opts.ip_db_path, ip_db_path=opts.ip_db_path,
@@ -1787,7 +1622,7 @@ def _main():
test=opts.mailbox_test, test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads, strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since, since=opts.mailbox_since,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value, normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
) )
aggregate_reports += reports["aggregate_reports"] aggregate_reports += reports["aggregate_reports"]
@@ -1798,31 +1633,27 @@ def _main():
logger.exception("Mailbox Error") logger.exception("Mailbox Error")
exit(1) exit(1)
parsing_results: ParsingResults = { results = dict(
"aggregate_reports": aggregate_reports, [
"forensic_reports": forensic_reports, ("aggregate_reports", aggregate_reports),
"smtp_tls_reports": smtp_tls_reports, ("forensic_reports", forensic_reports),
} ("smtp_tls_reports", smtp_tls_reports),
]
)
process_reports(parsing_results) process_reports(results)
if opts.smtp_host: if opts.smtp_host:
try: try:
verify = True verify = True
if opts.smtp_skip_certificate_verification: if opts.smtp_skip_certificate_verification:
verify = False verify = False
smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
smtp_to_value = (
list(opts.smtp_to)
if isinstance(opts.smtp_to, list)
else _str_to_list(str(opts.smtp_to))
)
email_results( email_results(
parsing_results, results,
opts.smtp_host, opts.smtp_host,
opts.smtp_from, opts.smtp_from,
smtp_to_value, opts.smtp_to,
port=smtp_port_value, port=opts.smtp_port,
verify=verify, verify=verify,
username=opts.smtp_user, username=opts.smtp_user,
password=opts.smtp_password, password=opts.smtp_password,
@@ -1844,17 +1675,17 @@ def _main():
archive_folder=opts.mailbox_archive_folder, archive_folder=opts.mailbox_archive_folder,
delete=opts.mailbox_delete, delete=opts.mailbox_delete,
test=opts.mailbox_test, test=opts.mailbox_test,
check_timeout=mailbox_check_timeout_value, check_timeout=opts.mailbox_check_timeout,
nameservers=opts.nameservers, nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout, dns_timeout=opts.dns_timeout,
strip_attachment_payloads=opts.strip_attachment_payloads, strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=mailbox_batch_size_value, batch_size=opts.mailbox_batch_size,
ip_db_path=opts.ip_db_path, ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files, always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path, reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url, reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline, offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value, normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
) )
except FileExistsError as error: except FileExistsError as error:
logger.error("{0}".format(error.__str__())) logger.error("{0}".format(error.__str__()))

View File

@@ -1,3 +1,3 @@
__version__ = "9.1.0" __version__ = "9.0.5"
USER_AGENT = f"parsedmarc/{__version__}" USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -2,28 +2,29 @@
from __future__ import annotations from __future__ import annotations
from typing import Any, Optional, Union from typing import Optional, Union, Any
from elasticsearch.helpers import reindex
from elasticsearch_dsl.search import Q
from elasticsearch_dsl import ( from elasticsearch_dsl import (
Boolean, connections,
Date, Object,
Document, Document,
Index, Index,
Nested,
InnerDoc, InnerDoc,
Integer, Integer,
Ip,
Nested,
Object,
Search,
Text, Text,
connections, Boolean,
Ip,
Date,
Search,
) )
from elasticsearch_dsl.search import Q from elasticsearch.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class ElasticsearchError(Exception): class ElasticsearchError(Exception):
@@ -92,17 +93,17 @@ class _AggregateReportDoc(Document):
spf_results = Nested(_SPFResult) spf_results = Nested(_SPFResult)
def add_policy_override(self, type_: str, comment: str): def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue] self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult): def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
self.dkim_results.append( self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result) _DKIMResult(domain=domain, selector=selector, result=result)
) # pyright: ignore[reportCallIssue] )
def add_spf_result(self, domain: str, scope: str, result: _SPFResult): def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue] self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride] def save(self, **kwargs):
self.passed_dmarc = False self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -136,25 +137,25 @@ class _ForensicSampleDoc(InnerDoc):
attachments = Nested(_EmailAttachmentDoc) attachments = Nested(_EmailAttachmentDoc)
def add_to(self, display_name: str, address: str): def add_to(self, display_name: str, address: str):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue] self.to.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_reply_to(self, display_name: str, address: str): def add_reply_to(self, display_name: str, address: str):
self.reply_to.append( self.reply_to.append(
_EmailAddressDoc(display_name=display_name, address=address) _EmailAddressDoc(display_name=display_name, address=address)
) # pyright: ignore[reportCallIssue] )
def add_cc(self, display_name: str, address: str): def add_cc(self, display_name: str, address: str):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue] self.cc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_bcc(self, display_name: str, address: str): def add_bcc(self, display_name: str, address: str):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue] self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_attachment(self, filename: str, content_type: str, sha256: str): def add_attachment(self, filename: str, content_type: str, sha256: str):
self.attachments.append( self.attachments.append(
_EmailAttachmentDoc( _EmailAttachmentDoc(
filename=filename, content_type=content_type, sha256=sha256 filename=filename, content_type=content_type, sha256=sha256
) )
) # pyright: ignore[reportCallIssue] )
class _ForensicReportDoc(Document): class _ForensicReportDoc(Document):
@@ -222,7 +223,7 @@ class _SMTPTLSPolicyDoc(InnerDoc):
additional_information=additional_information_uri, additional_information=additional_information_uri,
failure_reason_code=failure_reason_code, failure_reason_code=failure_reason_code,
) )
self.failure_details.append(_details) # pyright: ignore[reportCallIssue] self.failure_details.append(_details)
class _SMTPTLSReportDoc(Document): class _SMTPTLSReportDoc(Document):
@@ -256,7 +257,7 @@ class _SMTPTLSReportDoc(Document):
policy_string=policy_string, policy_string=policy_string,
mx_host_patterns=mx_host_patterns, mx_host_patterns=mx_host_patterns,
failure_details=failure_details, failure_details=failure_details,
) # pyright: ignore[reportCallIssue] )
class AlreadySaved(ValueError): class AlreadySaved(ValueError):
@@ -266,18 +267,18 @@ class AlreadySaved(ValueError):
def set_hosts( def set_hosts(
hosts: Union[str, list[str]], hosts: Union[str, list[str]],
*, *,
use_ssl: bool = False, use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None, ssl_cert_path: Optional[str] = None,
username: Optional[str] = None, username: Optional[str] = None,
password: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, api_key: Optional[str] = None,
timeout: float = 60.0, timeout: Optional[float] = 60.0,
): ):
""" """
Sets the Elasticsearch hosts to use Sets the Elasticsearch hosts to use
Args: Args:
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs hosts (Union[str, list[str]]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain ssl_cert_path (str): Path to the certificate chain
username (str): The username to use for authentication username (str): The username to use for authentication
@@ -367,7 +368,7 @@ def migrate_indexes(
} }
Index(new_index_name).create() Index(new_index_name).create()
Index(new_index_name).put_mapping(doc_type=doc, body=body) Index(new_index_name).put_mapping(doc_type=doc, body=body)
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType] reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete() Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes: for forensic_index in forensic_indexes:
@@ -379,8 +380,8 @@ def save_aggregate_report_to_elasticsearch(
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False, monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1, number_of_shards: Optional[int] = 1,
number_of_replicas: int = 0, number_of_replicas: Optional[int] = 0,
): ):
""" """
Saves a parsed DMARC aggregate report to Elasticsearch Saves a parsed DMARC aggregate report to Elasticsearch
@@ -410,11 +411,11 @@ def save_aggregate_report_to_elasticsearch(
else: else:
index_date = begin_date.strftime("%Y-%m-%d") index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType] report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType] domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType] begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType] end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None: if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix) search_index = "dmarc_aggregate_{0}*".format(index_suffix)
@@ -426,12 +427,13 @@ def save_aggregate_report_to_elasticsearch(
query = org_name_query & report_id_query & domain_query query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query query = query & begin_date_query & end_date_query
search.query = query search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try: try:
existing = search.execute() existing = search.execute()
except Exception as error_: except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise ElasticsearchError( raise ElasticsearchError(
"Elasticsearch's search for existing report \ "Elasticsearch's search for existing report \
error: {}".format(error_.__str__()) error: {}".format(error_.__str__())
@@ -527,7 +529,7 @@ def save_aggregate_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
) )
create_indexes([index], index_settings) create_indexes([index], index_settings)
agg_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue] agg_doc.meta.index = index
try: try:
agg_doc.save() agg_doc.save()
@@ -567,7 +569,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"] sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date) sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"] original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {} headers = dict()
for original_header in original_headers: for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header] headers[original_header.lower()] = original_headers[original_header]
@@ -581,7 +583,7 @@ def save_forensic_report_to_elasticsearch(
if index_prefix is not None: if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index) search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index) search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType] q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
from_ = None from_ = None
to_ = None to_ = None
@@ -596,7 +598,7 @@ def save_forensic_report_to_elasticsearch(
from_ = dict() from_ = dict()
from_["sample.headers.from"] = headers["from"] from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_)) # pyright: ignore[reportArgumentType] from_query = Q(dict(match_phrase=from_))
q = q & from_query q = q & from_query
if "to" in headers: if "to" in headers:
# We convert the TO header from a string list to a flat string. # We convert the TO header from a string list to a flat string.
@@ -608,12 +610,12 @@ def save_forensic_report_to_elasticsearch(
to_ = dict() to_ = dict()
to_["sample.headers.to"] = headers["to"] to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_)) # pyright: ignore[reportArgumentType] to_query = Q(dict(match_phrase=to_))
q = q & to_query q = q & to_query
if "subject" in headers: if "subject" in headers:
subject = headers["subject"] subject = headers["subject"]
subject_query = {"match_phrase": {"sample.headers.subject": subject}} subject_query = {"match_phrase": {"sample.headers.subject": subject}}
q = q & Q(subject_query) # pyright: ignore[reportArgumentType] q = q & Q(subject_query)
search.query = q search.query = q
existing = search.execute() existing = search.execute()
@@ -691,7 +693,7 @@ def save_forensic_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
) )
create_indexes([index], index_settings) create_indexes([index], index_settings)
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] forensic_doc.meta.index = index
try: try:
forensic_doc.save() forensic_doc.save()
except Exception as e: except Exception as e:
@@ -706,9 +708,9 @@ def save_smtp_tls_report_to_elasticsearch(
report: dict[str, Any], report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: bool = False, monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1, number_of_shards: Optional[int] = 1,
number_of_replicas: int = 0, number_of_replicas: Optional[int] = 0,
): ):
""" """
Saves a parsed SMTP TLS report to Elasticsearch Saves a parsed SMTP TLS report to Elasticsearch
@@ -738,10 +740,10 @@ def save_smtp_tls_report_to_elasticsearch(
report["begin_date"] = begin_date report["begin_date"] = begin_date
report["end_date"] = end_date report["end_date"] = end_date
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # pyright: ignore[reportArgumentType] org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType] report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType] begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType] end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None: if index_suffix is not None:
search_index = "smtp_tls_{0}*".format(index_suffix) search_index = "smtp_tls_{0}*".format(index_suffix)
@@ -842,10 +844,10 @@ def save_smtp_tls_report_to_elasticsearch(
additional_information_uri=additional_information_uri, additional_information_uri=additional_information_uri,
failure_reason_code=failure_reason_code, failure_reason_code=failure_reason_code,
) )
smtp_tls_doc.policies.append(policy_doc) # pyright: ignore[reportCallIssue] smtp_tls_doc.policies.append(policy_doc)
create_indexes([index], index_settings) create_indexes([index], index_settings)
smtp_tls_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue] smtp_tls_doc.meta.index = index
try: try:
smtp_tls_doc.save() smtp_tls_doc.save()

View File

@@ -2,18 +2,20 @@
from __future__ import annotations from __future__ import annotations
import logging
import logging.handlers
import threading
from typing import Any from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler import logging
import logging.handlers
import json
import threading
from parsedmarc import ( from parsedmarc import (
parsed_aggregate_reports_to_csv_rows, parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows, parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows,
) )
from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler
log_context_data = threading.local() log_context_data = threading.local()
@@ -50,7 +52,9 @@ class GelfClient(object):
) )
self.logger.addHandler(self.handler) self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]): def save_aggregate_report_to_gelf(
self, aggregate_reports: list[dict[str, Any]]
):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports) rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows: for row in rows:
log_context_data.parsedmarc = row log_context_data.parsedmarc = row
@@ -58,14 +62,14 @@ class GelfClient(object):
log_context_data.parsedmarc = None log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]): def save_forensic_report_to_gelf(
self, forensic_reports: list[dict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports) rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows: for row in rows:
log_context_data.parsedmarc = row self.logger.info(json.dumps(row))
self.logger.info("parsedmarc forensic report")
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]): def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows: for row in rows:
log_context_data.parsedmarc = row self.logger.info(json.dumps(row))
self.logger.info("parsedmarc smtptls report")

View File

@@ -2,16 +2,18 @@
from __future__ import annotations from __future__ import annotations
import json
from ssl import SSLContext, create_default_context
from typing import Any, Optional, Union from typing import Any, Optional, Union
from ssl import SSLContext
import json
from ssl import create_default_context
from kafka import KafkaProducer from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import __version__ from parsedmarc import __version__
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
class KafkaError(RuntimeError): class KafkaError(RuntimeError):
@@ -46,7 +48,7 @@ class KafkaClient(object):
``$ConnectionString``, and the password is the ``$ConnectionString``, and the password is the
Azure Event Hub connection string. Azure Event Hub connection string.
""" """
config: dict[str, Any] = dict( config = dict(
value_serializer=lambda v: json.dumps(v).encode("utf-8"), value_serializer=lambda v: json.dumps(v).encode("utf-8"),
bootstrap_servers=kafka_hosts, bootstrap_servers=kafka_hosts,
client_id="parsedmarc-{0}".format(__version__), client_id="parsedmarc-{0}".format(__version__),

View File

@@ -4,12 +4,11 @@ from __future__ import annotations
from typing import Any from typing import Any
from parsedmarc.log import logger
from azure.core.exceptions import HttpResponseError from azure.core.exceptions import HttpResponseError
from azure.identity import ClientSecretCredential from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient from azure.monitor.ingestion import LogsIngestionClient
from parsedmarc.log import logger
class LogAnalyticsException(Exception): class LogAnalyticsException(Exception):
"""Raised when an Elasticsearch error occurs""" """Raised when an Elasticsearch error occurs"""
@@ -133,7 +132,7 @@ class LogAnalyticsClient(object):
def publish_results( def publish_results(
self, self,
results: dict[str, Any], results: dict[str, dict[str, Any]],
save_aggregate: bool, save_aggregate: bool,
save_forensic: bool, save_forensic: bool,
save_smtp_tls: bool, save_smtp_tls: bool,

View File

@@ -116,14 +116,14 @@ class GmailConnection(MailboxConnection):
else: else:
return [id for id in self._fetch_all_message_ids(reports_label_id)] return [id for id in self._fetch_all_message_ids(reports_label_id)]
def fetch_message(self, message_id) -> str: def fetch_message(self, message_id):
msg = ( msg = (
self.service.users() self.service.users()
.messages() .messages()
.get(userId="me", id=message_id, format="raw") .get(userId="me", id=message_id, format="raw")
.execute() .execute()
) )
return urlsafe_b64decode(msg["raw"]).decode(errors="replace") return urlsafe_b64decode(msg["raw"])
def delete_message(self, message_id: str): def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id) self.service.users().messages().delete(userId="me", id=message_id)

View File

@@ -6,7 +6,7 @@ from enum import Enum
from functools import lru_cache from functools import lru_cache
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
from typing import Any, List, Optional, Union from typing import List, Optional
from azure.identity import ( from azure.identity import (
UsernamePasswordCredential, UsernamePasswordCredential,
@@ -28,7 +28,7 @@ class AuthMethod(Enum):
def _get_cache_args(token_path: Path, allow_unencrypted_storage): def _get_cache_args(token_path: Path, allow_unencrypted_storage):
cache_args: dict[str, Any] = { cache_args = {
"cache_persistence_options": TokenCachePersistenceOptions( "cache_persistence_options": TokenCachePersistenceOptions(
name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage
) )
@@ -151,9 +151,9 @@ class MSGraphConnection(MailboxConnection):
else: else:
logger.warning(f"Unknown response {resp.status_code} {resp.json()}") logger.warning(f"Unknown response {resp.status_code} {resp.json()}")
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]: def fetch_messages(self, folder_name: str, **kwargs) -> List[str]:
"""Returns a list of message UIDs in the specified folder""" """Returns a list of message UIDs in the specified folder"""
folder_id = self._find_folder_id_from_folder_path(reports_folder) folder_id = self._find_folder_id_from_folder_path(folder_name)
url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages" url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages"
since = kwargs.get("since") since = kwargs.get("since")
if not since: if not since:
@@ -166,7 +166,7 @@ class MSGraphConnection(MailboxConnection):
def _get_all_messages(self, url, batch_size, since): def _get_all_messages(self, url, batch_size, since):
messages: list messages: list
params: dict[str, Union[str, int]] = {"$select": "id"} params = {"$select": "id"}
if since: if since:
params["$filter"] = f"receivedDateTime ge {since}" params["$filter"] = f"receivedDateTime ge {since}"
if batch_size and batch_size > 0: if batch_size and batch_size > 0:

View File

@@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from typing import cast from typing import Optional
from time import sleep from time import sleep
@@ -17,14 +17,15 @@ from parsedmarc.mail.mailbox_connection import MailboxConnection
class IMAPConnection(MailboxConnection): class IMAPConnection(MailboxConnection):
def __init__( def __init__(
self, self,
host: str, host: Optional[str] = None,
user: str, *,
password: str, user: Optional[str] = None,
port: int = 993, password: Optional[str] = None,
ssl: bool = True, port: Optional[str] = None,
verify: bool = True, ssl: Optional[bool] = True,
timeout: int = 30, verify: Optional[bool] = True,
max_retries: int = 4, timeout: Optional[int] = 30,
max_retries: Optional[int] = 4,
): ):
self._username = user self._username = user
self._password = password self._password = password
@@ -46,13 +47,13 @@ class IMAPConnection(MailboxConnection):
def fetch_messages(self, reports_folder: str, **kwargs): def fetch_messages(self, reports_folder: str, **kwargs):
self._client.select_folder(reports_folder) self._client.select_folder(reports_folder)
since = kwargs.get("since") since = kwargs.get("since")
if since is not None: if since:
return self._client.search(f"SINCE {since}") return self._client.search(["SINCE", since])
else: else:
return self._client.search() return self._client.search()
def fetch_message(self, message_id: int): def fetch_message(self, message_id: int):
return cast(str, self._client.fetch_message(message_id, parse=False)) return self._client.fetch_message(message_id, parse=False)
def delete_message(self, message_id: int): def delete_message(self, message_id: int):
self._client.delete_messages([message_id]) self._client.delete_messages([message_id])

View File

@@ -13,16 +13,16 @@ class MailboxConnection(ABC):
def create_folder(self, folder_name: str): def create_folder(self, folder_name: str):
raise NotImplementedError raise NotImplementedError
def fetch_messages(self, reports_folder: str, **kwargs): def fetch_messages(self, reports_folder: str, **kwargs) -> list[str]:
raise NotImplementedError raise NotImplementedError
def fetch_message(self, message_id) -> str: def fetch_message(self, message_id) -> str:
raise NotImplementedError raise NotImplementedError
def delete_message(self, message_id): def delete_message(self, message_id: str):
raise NotImplementedError raise NotImplementedError
def move_message(self, message_id, folder_name: str): def move_message(self, message_id: str, folder_name: str):
raise NotImplementedError raise NotImplementedError
def keepalive(self): def keepalive(self):

View File

@@ -2,20 +2,21 @@
from __future__ import annotations from __future__ import annotations
import mailbox from typing import Optional
import os
from time import sleep from time import sleep
from typing import Dict
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection from parsedmarc.mail.mailbox_connection import MailboxConnection
import mailbox
import os
class MaildirConnection(MailboxConnection): class MaildirConnection(MailboxConnection):
def __init__( def __init__(
self, self,
maildir_path: str, maildir_path: Optional[bool] = None,
maildir_create: bool = False, maildir_create: Optional[bool] = False,
): ):
self._maildir_path = maildir_path self._maildir_path = maildir_path
self._maildir_create = maildir_create self._maildir_create = maildir_create
@@ -32,31 +33,27 @@ class MaildirConnection(MailboxConnection):
) )
raise Exception(ex) raise Exception(ex)
self._client = mailbox.Maildir(maildir_path, create=maildir_create) self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._subfolder_client: Dict[str, mailbox.Maildir] = {} self._subfolder_client = {}
def create_folder(self, folder_name: str): def create_folder(self, folder_name: str):
self._subfolder_client[folder_name] = self._client.add_folder(folder_name) self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
self._client.add_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs): def fetch_messages(self, reports_folder: str, **kwargs):
return self._client.keys() return self._client.keys()
def fetch_message(self, message_id: str) -> str: def fetch_message(self, message_id: str):
msg = self._client.get(message_id) return self._client.get(message_id).as_string()
if msg is not None:
msg = msg.as_string()
if msg is not None:
return msg
return ""
def delete_message(self, message_id: str): def delete_message(self, message_id: str):
self._client.remove(message_id) self._client.remove(message_id)
def move_message(self, message_id: str, folder_name: str): def move_message(self, message_id: str, folder_name: str):
message_data = self._client.get(message_id) message_data = self._client.get(message_id)
if message_data is None: if folder_name not in self._subfolder_client.keys():
return self._subfolder_client = mailbox.Maildir(
if folder_name not in self._subfolder_client: os.join(self.maildir_path, folder_name), create=self.maildir_create
self._subfolder_client[folder_name] = self._client.add_folder(folder_name) )
self._subfolder_client[folder_name].add(message_data) self._subfolder_client[folder_name].add(message_data)
self._client.remove(message_id) self._client.remove(message_id)

View File

@@ -2,28 +2,29 @@
from __future__ import annotations from __future__ import annotations
from typing import Any, Optional, Union from typing import Optional, Union, Any
from opensearchpy import ( from opensearchpy import (
Boolean, Q,
Date, connections,
Object,
Document, Document,
Index, Index,
Nested,
InnerDoc, InnerDoc,
Integer, Integer,
Ip,
Nested,
Object,
Q,
Search,
Text, Text,
connections, Boolean,
Ip,
Date,
Search,
) )
from opensearchpy.helpers import reindex from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class OpenSearchError(Exception): class OpenSearchError(Exception):
@@ -102,7 +103,7 @@ class _AggregateReportDoc(Document):
def add_spf_result(self, domain: str, scope: str, result: _SPFResult): def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride] def save(self, **kwargs):
self.passed_dmarc = False self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -378,9 +379,9 @@ def save_aggregate_report_to_opensearch(
aggregate_report: dict[str, Any], aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: bool = False, monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1, number_of_shards: Optional[int] = 1,
number_of_replicas: int = 0, number_of_replicas: Optional[int] = 0,
): ):
""" """
Saves a parsed DMARC aggregate report to OpenSearch Saves a parsed DMARC aggregate report to OpenSearch
@@ -426,12 +427,13 @@ def save_aggregate_report_to_opensearch(
query = org_name_query & report_id_query & domain_query query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query query = query & begin_date_query & end_date_query
search.query = query search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try: try:
existing = search.execute() existing = search.execute()
except Exception as error_: except Exception as error_:
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
raise OpenSearchError( raise OpenSearchError(
"OpenSearch's search for existing report \ "OpenSearch's search for existing report \
error: {}".format(error_.__str__()) error: {}".format(error_.__str__())
@@ -539,7 +541,7 @@ def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any], forensic_report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: bool = False, monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1, number_of_shards: int = 1,
number_of_replicas: int = 0, number_of_replicas: int = 0,
): ):
@@ -567,7 +569,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"] sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date) sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"] original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {} headers = dict()
for original_header in original_headers: for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header] headers[original_header.lower()] = original_headers[original_header]
@@ -706,9 +708,9 @@ def save_smtp_tls_report_to_opensearch(
report: dict[str, Any], report: dict[str, Any],
index_suffix: Optional[str] = None, index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None, index_prefix: Optional[str] = None,
monthly_indexes: bool = False, monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1, number_of_shards: Optional[int] = 1,
number_of_replicas: int = 0, number_of_replicas: Optional[int] = 0,
): ):
""" """
Saves a parsed SMTP TLS report to OpenSearch Saves a parsed SMTP TLS report to OpenSearch

View File

@@ -2,9 +2,9 @@
from __future__ import annotations from __future__ import annotations
import json
from typing import Any from typing import Any
import json
import boto3 import boto3
from parsedmarc.log import logger from parsedmarc.log import logger
@@ -51,7 +51,7 @@ class S3Client(object):
aws_access_key_id=access_key_id, aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key, aws_secret_access_key=secret_access_key,
) )
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore self.bucket: Any = self.s3.Bucket(self.bucket_name)
def save_aggregate_report_to_s3(self, report: dict[str, Any]): def save_aggregate_report_to_s3(self, report: dict[str, Any]):
self.save_report_to_s3(report, "aggregate") self.save_report_to_s3(report, "aggregate")

View File

@@ -2,13 +2,15 @@
from __future__ import annotations from __future__ import annotations
import json
import socket
from typing import Any, Union from typing import Any, Union
from urllib.parse import urlparse
import requests
from urllib.parse import urlparse
import socket
import json
import urllib3 import urllib3
import requests
from parsedmarc.constants import USER_AGENT from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger from parsedmarc.log import logger

View File

@@ -3,13 +3,13 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
import logging.handlers import logging.handlers
import socket
import ssl from typing import Any
import time
from typing import Any, Optional
import json
from parsedmarc import ( from parsedmarc import (
parsed_aggregate_reports_to_csv_rows, parsed_aggregate_reports_to_csv_rows,
@@ -21,161 +21,37 @@ from parsedmarc import (
class SyslogClient(object): class SyslogClient(object):
"""A client for Syslog""" """A client for Syslog"""
def __init__( def __init__(self, server_name: str, server_port: int):
self,
server_name: str,
server_port: int,
protocol: str = "udp",
cafile_path: Optional[str] = None,
certfile_path: Optional[str] = None,
keyfile_path: Optional[str] = None,
timeout: float = 5.0,
retry_attempts: int = 3,
retry_delay: int = 5,
):
""" """
Initializes the SyslogClient Initializes the SyslogClient
Args: Args:
server_name (str): The Syslog server server_name (str): The Syslog server
server_port (int): The Syslog port server_port (int): The Syslog UDP port
protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
certfile_path (str): Path to client certificate file for TLS authentication (Optional)
keyfile_path (str): Path to client private key file for TLS authentication (Optional)
timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
retry_delay (int): Delay in seconds between retry attempts (Default: 5)
""" """
self.server_name = server_name self.server_name = server_name
self.server_port = server_port self.server_port = server_port
self.protocol = protocol.lower()
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.logger = logging.getLogger("parsedmarc_syslog") self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO) self.logger.setLevel(logging.INFO)
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
# Create the appropriate syslog handler based on protocol
log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
cafile_path,
certfile_path,
keyfile_path,
timeout,
retry_attempts,
retry_delay,
)
self.logger.addHandler(log_handler) self.logger.addHandler(log_handler)
def _create_syslog_handler( def save_aggregate_report_to_syslog(
self, self, aggregate_reports: list[dict[str, Any]]
server_name: str, ):
server_port: int,
protocol: str,
cafile_path: Optional[str],
certfile_path: Optional[str],
keyfile_path: Optional[str],
timeout: float,
retry_attempts: int,
retry_delay: int,
) -> logging.handlers.SysLogHandler:
"""
Creates a SysLogHandler with the specified protocol and TLS settings
"""
if protocol == "udp":
# UDP protocol (default, backward compatible)
return logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_DGRAM,
)
elif protocol in ["tcp", "tls"]:
# TCP or TLS protocol with retry logic
for attempt in range(1, retry_attempts + 1):
try:
if protocol == "tcp":
# TCP without TLS
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Set timeout on the socket
if hasattr(handler, "socket") and handler.socket:
handler.socket.settimeout(timeout)
return handler
else:
# TLS protocol
# Create SSL context with secure defaults
ssl_context = ssl.create_default_context()
# Explicitly set minimum TLS version to 1.2 for security
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Configure server certificate verification
if cafile_path:
ssl_context.load_verify_locations(cafile=cafile_path)
# Configure client certificate authentication
if certfile_path and keyfile_path:
ssl_context.load_cert_chain(
certfile=certfile_path,
keyfile=keyfile_path,
)
elif certfile_path or keyfile_path:
# Warn if only one of the two required parameters is provided
self.logger.warning(
"Both certfile_path and keyfile_path are required for "
"client certificate authentication. Client authentication "
"will not be used."
)
# Create TCP handler first
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Wrap socket with TLS
if hasattr(handler, "socket") and handler.socket:
handler.socket = ssl_context.wrap_socket(
handler.socket,
server_hostname=server_name,
)
handler.socket.settimeout(timeout)
return handler
except Exception as e:
if attempt < retry_attempts:
self.logger.warning(
f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
self.logger.error(
f"Syslog connection failed after {retry_attempts} attempts: {e}"
)
raise
else:
raise ValueError(
f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports) rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows: for row in rows:
self.logger.info(json.dumps(row)) self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]): def save_forensic_report_to_syslog(
self, forensic_reports: list[dict[str, Any]]
):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports) rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows: for row in rows:
self.logger.info(json.dumps(row)) self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog(self, smtp_tls_reports: list[dict[str, Any]]): def save_smtp_tls_report_to_syslog(
self, smtp_tls_reports: list[dict[str, Any]]
):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows: for row in rows:
self.logger.info(json.dumps(row)) self.logger.info(json.dumps(row))

View File

@@ -1,220 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# NOTE: This module is intentionally Python 3.9 compatible.
# - No PEP 604 unions (A | B)
# - No typing.NotRequired / Required (3.11+) to avoid an extra dependency.
# For optional keys, use total=False TypedDicts.
# Discriminator naming the three report families parsedmarc handles;
# matches the ``report_type`` tags on the *ParsedReport wrappers below.
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
    """The ``report_metadata`` section of a parsed DMARC aggregate report."""

    org_name: str
    org_email: str
    org_extra_contact_info: Optional[str]
    report_id: str
    # Reporting window boundaries as strings — presumably formatted
    # timestamps; confirm the exact format against the parser.
    begin_date: str
    end_date: str
    # NOTE(review): presumably set when the reported timespan was abnormal
    # and had to be normalized — confirm against the producing code.
    timespan_requires_normalization: bool
    original_timespan_seconds: int
    errors: List[str]
class AggregatePolicyPublished(TypedDict):
    """The ``policy_published`` section of an aggregate report.

    Field names mirror DMARC record tags (adkim, aspf, p, sp, pct, fo).
    """

    domain: str
    adkim: str
    aspf: str
    p: str
    sp: str
    pct: str
    fo: str
class IPSourceInfo(TypedDict):
    """Enrichment data for a source IP address."""

    ip_address: str
    country: Optional[str]
    reverse_dns: Optional[str]
    base_domain: Optional[str]
    # Service attribution — presumably looked up from the reverse-DNS
    # service map; confirm against the enrichment code.
    name: Optional[str]
    type: Optional[str]
# Per-record DMARC alignment verdicts from an aggregate report record.
AggregateAlignment = TypedDict(
    "AggregateAlignment",
    {
        "spf": bool,
        "dkim": bool,
        "dmarc": bool,
    },
)
class AggregateIdentifiers(TypedDict):
    """The ``identifiers`` section of an aggregate report record."""

    header_from: str
    envelope_from: Optional[str]
    envelope_to: Optional[str]
class AggregatePolicyOverrideReason(TypedDict):
    """One policy-override reason; both fields may be absent in the XML."""

    type: Optional[str]
    comment: Optional[str]
# One DKIM entry from a record's auth_results section.
AggregateAuthResultDKIM = TypedDict(
    "AggregateAuthResultDKIM",
    {
        "domain": str,
        "result": str,
        "selector": str,
    },
)
# One SPF entry from a record's auth_results section.
AggregateAuthResultSPF = TypedDict(
    "AggregateAuthResultSPF",
    {
        "domain": str,
        "result": str,
        "scope": str,
    },
)
class AggregateAuthResults(TypedDict):
    """Raw DKIM and SPF authentication results for one record."""

    dkim: List[AggregateAuthResultDKIM]
    spf: List[AggregateAuthResultSPF]
class AggregatePolicyEvaluated(TypedDict):
    """The ``policy_evaluated`` section of a record, with override reasons."""

    disposition: str
    dkim: str
    spf: str
    policy_override_reasons: List[AggregatePolicyOverrideReason]
class AggregateRecord(TypedDict):
    """One ``record`` element of an aggregate report, with source-IP enrichment."""

    interval_begin: str
    interval_end: str
    source: IPSourceInfo
    count: int
    alignment: AggregateAlignment
    policy_evaluated: AggregatePolicyEvaluated
    # NOTE(review): also present inside policy_evaluated — presumably a
    # flattened convenience copy; confirm against the parser.
    disposition: str
    identifiers: AggregateIdentifiers
    auth_results: AggregateAuthResults
class AggregateReport(TypedDict):
    """A fully parsed DMARC aggregate report."""

    xml_schema: str
    report_metadata: AggregateReportMetadata
    policy_published: AggregatePolicyPublished
    records: List[AggregateRecord]
# An email address split into display name, full address, local part
# and domain.
EmailAddress = TypedDict(
    "EmailAddress",
    {
        "display_name": Optional[str],
        "address": str,
        "local": Optional[str],
        "domain": Optional[str],
    },
)
class EmailAttachment(TypedDict, total=False):
    """One attachment of a parsed email sample.

    ``total=False``: every key is optional.
    """

    filename: Optional[str]
    mail_content_type: Optional[str]
    sha256: Optional[str]
# Functional TypedDict syntax is required here because "from" is a Python
# keyword and cannot be declared as a field with class-based syntax.
ParsedEmail = TypedDict(
    "ParsedEmail",
    {
        # This is a lightly-specified version of mailsuite/mailparser JSON.
        # It focuses on the fields parsedmarc uses in forensic handling.
        "headers": Dict[str, Any],
        "subject": Optional[str],
        "filename_safe_subject": Optional[str],
        "date": Optional[str],
        "from": EmailAddress,
        "to": List[EmailAddress],
        "cc": List[EmailAddress],
        "bcc": List[EmailAddress],
        "attachments": List[EmailAttachment],
        "body": Optional[str],
        "has_defects": bool,
        "defects": Any,
        "defects_categories": Any,
    },
    total=False,
)
class ForensicReport(TypedDict):
    """A parsed DMARC forensic (failure) report."""

    feedback_type: Optional[str]
    user_agent: Optional[str]
    version: Optional[str]
    original_envelope_id: Optional[str]
    original_mail_from: Optional[str]
    original_rcpt_to: Optional[str]
    arrival_date: str
    arrival_date_utc: str
    authentication_results: Optional[str]
    delivery_result: Optional[str]
    auth_failure: List[str]
    authentication_mechanisms: List[str]
    dkim_domain: Optional[str]
    reported_domain: str
    # True when only the headers of the offending message were included.
    sample_headers_only: bool
    source: IPSourceInfo
    # The message sample as a string, and its parsed counterpart.
    sample: str
    parsed_sample: ParsedEmail
class SMTPTLSFailureDetails(TypedDict):
    """Required fields of one failure-details entry in an SMTP TLS report.

    Kept as a class (not functional syntax) so
    ``SMTPTLSFailureDetailsOptional`` can inherit from it.
    """

    result_type: str
    failed_session_count: int
class SMTPTLSFailureDetailsOptional(SMTPTLSFailureDetails, total=False):
    """Failure details extended with optional per-session fields.

    ``total=False`` applies only to the keys declared here; the inherited
    keys remain required.
    """

    sending_mta_ip: str
    receiving_ip: str
    receiving_mx_hostname: str
    receiving_mx_helo: str
    additional_info_uri: str
    failure_reason_code: str
    ip_address: str
class SMTPTLSPolicySummary(TypedDict):
    """Required summary fields of one policy block in an SMTP TLS report."""

    policy_domain: str
    policy_type: str
    successful_session_count: int
    failed_session_count: int
class SMTPTLSPolicy(SMTPTLSPolicySummary, total=False):
    """A policy block including its optional string and detail lists."""

    policy_strings: List[str]
    mx_host_patterns: List[str]
    failure_details: List[SMTPTLSFailureDetailsOptional]
class SMTPTLSReport(TypedDict):
    """A parsed SMTP TLS report."""

    organization_name: str
    begin_date: str
    end_date: str
    # A single address or a list of addresses, depending on the source data.
    contact_info: Union[str, List[str]]
    report_id: str
    policies: List[SMTPTLSPolicy]
class AggregateParsedReport(TypedDict):
    """Wrapper tagging an aggregate report with its ``report_type`` discriminator."""

    report_type: Literal["aggregate"]
    report: AggregateReport
class ForensicParsedReport(TypedDict):
    """Wrapper tagging a forensic report with its ``report_type`` discriminator."""

    report_type: Literal["forensic"]
    report: ForensicReport
class SMTPTLSParsedReport(TypedDict):
    """Wrapper tagging an SMTP TLS report with its ``report_type`` discriminator."""

    report_type: Literal["smtp_tls"]
    report: SMTPTLSReport
# Discriminated union over the wrapper types; narrow on the "report_type" key.
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
    """Output of a parsing run, grouped by report family."""

    aggregate_reports: List[AggregateReport]
    forensic_reports: List[ForensicReport]
    smtp_tls_reports: List[SMTPTLSReport]

View File

@@ -4,23 +4,25 @@
from __future__ import annotations from __future__ import annotations
import base64 from typing import Optional, Union, TypedDict, Any
import csv
import hashlib
import io
import json
import logging
import mailbox
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Optional, TypedDict, Union, cast
import mailparser import logging
import os
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from expiringdict import ExpiringDict from expiringdict import ExpiringDict
import tempfile
import subprocess
import shutil
import mailparser
import json
import hashlib
import base64
import mailbox
import re
import csv
import io
try: try:
from importlib.resources import files from importlib.resources import files
@@ -29,19 +31,19 @@ except ImportError:
from importlib.resources import files from importlib.resources import files
import dns.exception from dateutil.parser import parse as parse_date
import dns.resolver
import dns.reversename import dns.reversename
import dns.resolver
import dns.exception
import geoip2.database import geoip2.database
import geoip2.errors import geoip2.errors
import publicsuffixlist import publicsuffixlist
import requests import requests
from dateutil.parser import parse as parse_date
from parsedmarc.log import logger
import parsedmarc.resources.dbip import parsedmarc.resources.dbip
import parsedmarc.resources.maps import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
parenthesis_regex = re.compile(r"\s*\(.*\)\s*") parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
@@ -64,21 +66,12 @@ class DownloadError(RuntimeError):
"""Raised when an error occurs when downloading a file""" """Raised when an error occurs when downloading a file"""
class ReverseDNSService(TypedDict): class EmailAddress(TypedDict):
name: str """Parsed email address information"""
type: Optional[str] display_name: Optional[str]
address: str
local: Optional[str]
ReverseDNSMap = dict[str, ReverseDNSService] domain: Optional[str]
class IPAddressInfo(TypedDict):
ip_address: str
reverse_dns: Optional[str]
country: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
def decode_base64(data: str) -> bytes: def decode_base64(data: str) -> bytes:
@@ -92,14 +85,14 @@ def decode_base64(data: str) -> bytes:
bytes: The decoded bytes bytes: The decoded bytes
""" """
data_bytes = bytes(data, encoding="ascii") data = bytes(data, encoding="ascii")
missing_padding = len(data_bytes) % 4 missing_padding = len(data) % 4
if missing_padding != 0: if missing_padding != 0:
data_bytes += b"=" * (4 - missing_padding) data += b"=" * (4 - missing_padding)
return base64.b64decode(data_bytes) return base64.b64decode(data)
def get_base_domain(domain: str) -> Optional[str]: def get_base_domain(domain: str) -> str:
""" """
Gets the base domain name for the given domain Gets the base domain name for the given domain
@@ -128,8 +121,8 @@ def query_dns(
record_type: str, record_type: str,
*, *,
cache: Optional[ExpiringDict] = None, cache: Optional[ExpiringDict] = None,
nameservers: Optional[list[str]] = None, nameservers: list[str] = None,
timeout: float = 2.0, timeout: int = 2.0,
) -> list[str]: ) -> list[str]:
""" """
Queries DNS Queries DNS
@@ -149,9 +142,9 @@ def query_dns(
record_type = record_type.upper() record_type = record_type.upper()
cache_key = "{0}_{1}".format(domain, record_type) cache_key = "{0}_{1}".format(domain, record_type)
if cache: if cache:
cached_records = cache.get(cache_key, None) records = cache.get(cache_key, None)
if isinstance(cached_records, list): if records:
return cast(list[str], cached_records) return records
resolver = dns.resolver.Resolver() resolver = dns.resolver.Resolver()
timeout = float(timeout) timeout = float(timeout)
@@ -165,12 +158,26 @@ def query_dns(
resolver.nameservers = nameservers resolver.nameservers = nameservers
resolver.timeout = timeout resolver.timeout = timeout
resolver.lifetime = timeout resolver.lifetime = timeout
records = list( if record_type == "TXT":
map( resource_records = list(
lambda r: r.to_text().replace('"', "").rstrip("."), map(
resolver.resolve(domain, record_type, lifetime=timeout), lambda r: r.strings,
resolver.resolve(domain, record_type, lifetime=timeout),
)
)
_resource_record = [
resource_record[0][:0].join(resource_record)
for resource_record in resource_records
if resource_record
]
records = [r.decode() for r in _resource_record]
else:
records = list(
map(
lambda r: r.to_text().replace('"', "").rstrip("."),
resolver.resolve(domain, record_type, lifetime=timeout),
)
) )
)
if cache: if cache:
cache[cache_key] = records cache[cache_key] = records
@@ -181,9 +188,9 @@ def get_reverse_dns(
ip_address, ip_address,
*, *,
cache: Optional[ExpiringDict] = None, cache: Optional[ExpiringDict] = None,
nameservers: Optional[list[str]] = None, nameservers: list[str] = None,
timeout: float = 2.0, timeout: int = 2.0,
) -> Optional[str]: ) -> str:
""" """
Resolves an IP address to a hostname using a reverse DNS query Resolves an IP address to a hostname using a reverse DNS query
@@ -201,7 +208,7 @@ def get_reverse_dns(
try: try:
address = dns.reversename.from_address(ip_address) address = dns.reversename.from_address(ip_address)
hostname = query_dns( hostname = query_dns(
str(address), "PTR", cache=cache, nameservers=nameservers, timeout=timeout address, "PTR", cache=cache, nameservers=nameservers, timeout=timeout
)[0] )[0]
except dns.exception.DNSException as e: except dns.exception.DNSException as e:
@@ -238,7 +245,7 @@ def timestamp_to_human(timestamp: int) -> str:
def human_timestamp_to_datetime( def human_timestamp_to_datetime(
human_timestamp: str, *, to_utc: bool = False human_timestamp: str, *, to_utc: Optional[bool] = False
) -> datetime: ) -> datetime:
""" """
Converts a human-readable timestamp into a Python ``datetime`` object Converts a human-readable timestamp into a Python ``datetime`` object
@@ -269,12 +276,10 @@ def human_timestamp_to_unix_timestamp(human_timestamp: str) -> int:
float: The converted timestamp float: The converted timestamp
""" """
human_timestamp = human_timestamp.replace("T", " ") human_timestamp = human_timestamp.replace("T", " ")
return int(human_timestamp_to_datetime(human_timestamp).timestamp()) return human_timestamp_to_datetime(human_timestamp).timestamp()
def get_ip_address_country( def get_ip_address_country(ip_address: str, *, db_path: Optional[str] = None) -> str:
ip_address: str, *, db_path: Optional[str] = None
) -> Optional[str]:
""" """
Returns the ISO code for the country associated Returns the ISO code for the country associated
with the given IPv4 or IPv6 address with the given IPv4 or IPv6 address
@@ -337,14 +342,14 @@ def get_ip_address_country(
def get_service_from_reverse_dns_base_domain( def get_service_from_reverse_dns_base_domain(
base_domain, base_domain: str,
*, *,
always_use_local_file: bool = False, always_use_local_file: Optional[bool] = False,
local_file_path: Optional[str] = None, local_file_path: Optional[str] = None,
url: Optional[str] = None, url: Optional[str] = None,
offline: bool = False, offline: Optional[bool] = False,
reverse_dns_map: Optional[ReverseDNSMap] = None, reverse_dns_map: Optional[dict[str, Any]] = None,
) -> ReverseDNSService: ) -> dict[str, Any]:
""" """
Returns the service name of a given base domain name from reverse DNS. Returns the service name of a given base domain name from reverse DNS.
@@ -361,6 +366,12 @@ def get_service_from_reverse_dns_base_domain(
the supplied reverse_dns_base_domain and the type will be None the supplied reverse_dns_base_domain and the type will be None
""" """
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(name=row["name"], type=row["type"])
base_domain = base_domain.lower().strip() base_domain = base_domain.lower().strip()
if url is None: if url is None:
url = ( url = (
@@ -368,24 +379,11 @@ def get_service_from_reverse_dns_base_domain(
"/parsedmarc/master/parsedmarc/" "/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv" "resources/maps/base_reverse_dns_map.csv"
) )
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None: if reverse_dns_map is None:
reverse_dns_map_value = {} reverse_dns_map = dict()
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
csv_file = io.StringIO() csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0: if not (offline or always_use_local_file) and len(reverse_dns_map) == 0:
try: try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...") logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT} headers = {"User-Agent": USER_AGENT}
@@ -402,7 +400,7 @@ def get_service_from_reverse_dns_base_domain(
logging.debug("Response body:") logging.debug("Response body:")
logger.debug(csv_file.read()) logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0: if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...") logger.info("Loading included reverse DNS map...")
path = str( path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv") files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
@@ -411,28 +409,27 @@ def get_service_from_reverse_dns_base_domain(
path = local_file_path path = local_file_path
with open(path) as csv_file: with open(path) as csv_file:
load_csv(csv_file) load_csv(csv_file)
service: ReverseDNSService
try: try:
service = reverse_dns_map_value[base_domain] service = reverse_dns_map[base_domain]
except KeyError: except KeyError:
service = {"name": base_domain, "type": None} service = dict(name=base_domain, type=None)
return service return service
def get_ip_address_info( def get_ip_address_info(
ip_address, ip_address: str,
*, *,
ip_db_path: Optional[str] = None, ip_db_path: Optional[str] = None,
reverse_dns_map_path: Optional[str] = None, reverse_dns_map_path: Optional[str] = None,
always_use_local_files: bool = False, always_use_local_files: Optional[bool] = False,
reverse_dns_map_url: Optional[str] = None, reverse_dns_map_url: Optional[str] = None,
cache: Optional[ExpiringDict] = None, cache: Optional[ExpiringDict] = None,
reverse_dns_map: Optional[ReverseDNSMap] = None, reverse_dns_map: Optional[dict[str, Any]] = None,
offline: bool = False, offline: Optional[bool] = False,
nameservers: Optional[list[str]] = None, nameservers: Optional[list[str]] = None,
timeout: float = 2.0, timeout: Optional[float] = 2.0,
) -> IPAddressInfo: ) -> dict[str, Any]:
""" """
Returns reverse DNS and country information for the given IP address Returns reverse DNS and country information for the given IP address
@@ -455,22 +452,12 @@ def get_ip_address_info(
""" """
ip_address = ip_address.lower() ip_address = ip_address.lower()
if cache is not None: if cache is not None:
cached_info = cache.get(ip_address, None) info = cache.get(ip_address, None)
if ( if info:
cached_info
and isinstance(cached_info, dict)
and "ip_address" in cached_info
):
logger.debug(f"IP address {ip_address} was found in cache") logger.debug(f"IP address {ip_address} was found in cache")
return cast(IPAddressInfo, cached_info) return info
info: IPAddressInfo = { info = dict()
"ip_address": ip_address, info["ip_address"] = ip_address
"reverse_dns": None,
"country": None,
"base_domain": None,
"name": None,
"type": None,
}
if offline: if offline:
reverse_dns = None reverse_dns = None
else: else:
@@ -480,6 +467,9 @@ def get_ip_address_info(
country = get_ip_address_country(ip_address, db_path=ip_db_path) country = get_ip_address_country(ip_address, db_path=ip_db_path)
info["country"] = country info["country"] = country
info["reverse_dns"] = reverse_dns info["reverse_dns"] = reverse_dns
info["base_domain"] = None
info["name"] = None
info["type"] = None
if reverse_dns is not None: if reverse_dns is not None:
base_domain = get_base_domain(reverse_dns) base_domain = get_base_domain(reverse_dns)
if base_domain is not None: if base_domain is not None:
@@ -504,7 +494,7 @@ def get_ip_address_info(
return info return info
def parse_email_address(original_address: str) -> dict[str, Optional[str]]: def parse_email_address(original_address: str) -> EmailAddress:
if original_address[0] == "": if original_address[0] == "":
display_name = None display_name = None
else: else:
@@ -568,7 +558,7 @@ def is_mbox(path: str) -> bool:
return _is_mbox return _is_mbox
def is_outlook_msg(content) -> bool: def is_outlook_msg(content: Union[bytes, Any]) -> bool:
""" """
Checks if the given content is an Outlook msg OLE/MSG file Checks if the given content is an Outlook msg OLE/MSG file
@@ -583,7 +573,7 @@ def is_outlook_msg(content) -> bool:
) )
def convert_outlook_msg(msg_bytes: bytes) -> bytes: def convert_outlook_msg(msg_bytes: bytes) -> str:
""" """
Uses the ``msgconvert`` Perl utility to convert an Outlook MS file to Uses the ``msgconvert`` Perl utility to convert an Outlook MS file to
standard RFC 822 format standard RFC 822 format
@@ -592,7 +582,7 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
msg_bytes (bytes): the content of the .msg file msg_bytes (bytes): the content of the .msg file
Returns: Returns:
A RFC 822 bytes payload A RFC 822 string
""" """
if not is_outlook_msg(msg_bytes): if not is_outlook_msg(msg_bytes):
raise ValueError("The supplied bytes are not an Outlook MSG file") raise ValueError("The supplied bytes are not an Outlook MSG file")
@@ -601,13 +591,14 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
os.chdir(tmp_dir) os.chdir(tmp_dir)
with open("sample.msg", "wb") as msg_file: with open("sample.msg", "wb") as msg_file:
msg_file.write(msg_bytes) msg_file.write(msg_bytes)
rfc822_bytes: bytes
try: try:
subprocess.check_call( subprocess.check_call(
["msgconvert", "sample.msg"], stdout=null_file, stderr=null_file ["msgconvert", "sample.msg"], stdout=null_file, stderr=null_file
) )
eml_path = "sample.eml" eml_path = "sample.eml"
with open(eml_path, "rb") as eml_file: with open(eml_path, "rb") as eml_file:
rfc822 = eml_file.read() rfc822_bytes = eml_file.read()
except FileNotFoundError: except FileNotFoundError:
raise EmailParserError( raise EmailParserError(
"Failed to convert Outlook MSG: msgconvert utility not found" "Failed to convert Outlook MSG: msgconvert utility not found"
@@ -616,12 +607,12 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
os.chdir(orig_dir) os.chdir(orig_dir)
shutil.rmtree(tmp_dir) shutil.rmtree(tmp_dir)
return rfc822 return rfc822_bytes.decode("utf-8", errors="replace")
def parse_email( def parse_email(
data: Union[bytes, str], *, strip_attachment_payloads: bool = False data: Union[bytes, str], *, strip_attachment_payloads: Optional[bool] = False
) -> dict: ) -> dict[str, Any]:
""" """
A simplified email parser A simplified email parser
@@ -636,7 +627,8 @@ def parse_email(
if isinstance(data, bytes): if isinstance(data, bytes):
if is_outlook_msg(data): if is_outlook_msg(data):
data = convert_outlook_msg(data) data = convert_outlook_msg(data)
data = data.decode("utf-8", errors="replace") else:
data = data.decode("utf-8", errors="replace")
parsed_email = mailparser.parse_from_string(data) parsed_email = mailparser.parse_from_string(data)
headers = json.loads(parsed_email.headers_json).copy() headers = json.loads(parsed_email.headers_json).copy()
parsed_email = json.loads(parsed_email.mail_json).copy() parsed_email = json.loads(parsed_email.mail_json).copy()

View File

@@ -29,7 +29,7 @@ classifiers = [
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Programming Language :: Python :: 3" "Programming Language :: Python :: 3"
] ]
requires-python = ">=3.9" requires-python = ">=3.9, <3.14"
dependencies = [ dependencies = [
"azure-identity>=1.8.0", "azure-identity>=1.8.0",
"azure-monitor-ingestion>=1.0.0", "azure-monitor-ingestion>=1.0.0",
@@ -48,7 +48,7 @@ dependencies = [
"imapclient>=2.1.0", "imapclient>=2.1.0",
"kafka-python-ng>=2.2.2", "kafka-python-ng>=2.2.2",
"lxml>=4.4.0", "lxml>=4.4.0",
"mailsuite>=1.11.2", "mailsuite>=1.9.18",
"msgraph-core==0.2.2", "msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0", "opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0", "publicsuffixlist>=0.10.0",

19
tests.py Executable file → Normal file
View File

@@ -12,9 +12,6 @@ from lxml import etree
import parsedmarc import parsedmarc
import parsedmarc.utils import parsedmarc.utils
# Detect if running in GitHub Actions to skip DNS lookups
OFFLINE_MODE = os.environ.get("GITHUB_ACTIONS", "false").lower() == "true"
def minify_xml(xml_string): def minify_xml(xml_string):
parser = etree.XMLParser(remove_blank_text=True) parser = etree.XMLParser(remove_blank_text=True)
@@ -124,7 +121,7 @@ class Test(unittest.TestCase):
continue continue
print("Testing {0}: ".format(sample_path), end="") print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file( parsed_report = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE sample_path, always_use_local_files=True
)["report"] )["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report) parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
print("Passed!") print("Passed!")
@@ -132,7 +129,7 @@ class Test(unittest.TestCase):
def testEmptySample(self): def testEmptySample(self):
"""Test empty/unparasable report""" """Test empty/unparasable report"""
with self.assertRaises(parsedmarc.ParserError): with self.assertRaises(parsedmarc.ParserError):
parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE) parsedmarc.parse_report_file("samples/empty.xml")
def testForensicSamples(self): def testForensicSamples(self):
"""Test sample forensic/ruf/failure DMARC reports""" """Test sample forensic/ruf/failure DMARC reports"""
@@ -142,12 +139,8 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="") print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file: with open(sample_path) as sample_file:
sample_content = sample_file.read() sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email( parsed_report = parsedmarc.parse_report_email(sample_content)["report"]
sample_content, offline=OFFLINE_MODE parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report) parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
print("Passed!") print("Passed!")
@@ -159,9 +152,7 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path): if os.path.isdir(sample_path):
continue continue
print("Testing {0}: ".format(sample_path), end="") print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file( parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report) parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!") print("Passed!")