Compare commits

27 Commits

Author SHA1 Message Date
Sean Whalen 110c6e507d Update docs 2025-12-01 17:04:37 -05:00
Sean Whalen c8cdd90a1e Normalize timespans for aggregate reports in Elasticsearch and Opensearch 2025-12-01 16:34:40 -05:00
Sean Whalen 46a62cc10a Update launch configuration and metadata key for timespan in aggregate report 2025-12-01 16:10:41 -05:00
Sean Whalen 67fe009145 Add sources my name table to the Kibana DMARC Summary dashboard (This matches the table in the Splunk DMARC Aggregate reports dashboard) 2025-11-30 19:43:14 -05:00
Sean Whalen e405e8fa53 Update changelog to correct timespan threshold for DMARC report normalization 2025-11-30 16:17:07 -05:00
Sean Whalen a72d08ceb7 Refactor configuration loading for normalize_timespan_threshold_hours 2025-11-30 16:16:32 -05:00
Sean Whalen 2785e3df34 More fixes for normalize_timespan_threshold_hours: 2025-11-30 13:56:50 -05:00
Sean Whalen f4470a7dd2 Fix normalize_timespan_threshold_hours 2025-11-30 13:46:21 -05:00
Sean Whalen 18b9894a1f Code formatting 2025-11-30 12:40:09 -05:00
Sean Whalen d1791a97d3 Make timespan normalization hours configurable, with a 24.0 default 2025-11-30 12:23:38 -05:00
Sean Whalen 47ca6561c1 Fix changelog version 2025-11-30 10:46:48 -05:00
Sean Whalen a0e18206ce Bump version to 9.0.0 2025-11-29 23:01:04 -05:00
Sean Whalen 9e4ffdd54c Add interval_begin, interval_end, and normalized_timespan to the Splunk report 2025-11-29 21:32:33 -05:00
Sean Whalen 434bd49eb3 Fix normalized_timespan in CSV output for aggregate reports 2025-11-29 21:23:39 -05:00
Sean Whalen 589038d2c9 Add normalized_timespan to CSV output for aggregate reports 2025-11-29 21:17:27 -05:00
Sean Whalen c558224671 Rename normalized_timespan to timespan_requires_normalization and include interval_begin and interval_end in CSV output 2025-11-29 21:16:30 -05:00
Sean Whalen 044aa9e9a0 Include interval_begin in splunk output for accurate timestamping 2025-11-29 20:50:13 -05:00
Sean Whalen 6270468d30 Remove unneeded fields 2025-11-29 17:13:24 -05:00
Sean Whalen 832be7cfa3 Clean up imports 2025-11-29 16:56:12 -05:00
Sean Whalen 04dd11cf54 Fix formatting 2025-11-29 16:51:57 -05:00
Sean Whalen 0b41942916 Always include interval_begin and interval_end in records 2025-11-29 16:46:03 -05:00
Sean Whalen f14a34202f Add morse type hints 2025-11-29 16:33:40 -05:00
Sean Whalen daa6653c29 Bump version to 8.20.0 and update changelog for new report volume normalization 2025-11-29 15:26:25 -05:00
Sean Whalen 45d1093a99 Normalize report volumes when a report timespan exceed 24 hours 2025-11-29 14:52:57 -05:00
Sean Whalen c1a757ca29 Remove outdated launch config 2025-11-29 14:45:21 -05:00
Sean Whalen 69b9d25a99 Revert code formatting 2025-11-29 14:14:54 -05:00
Sean Whalen 94d65f979d Code formatting 2025-11-29 14:04:20 -05:00
37 changed files with 1445 additions and 2316 deletions
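
Most of these commits revolve around the new `normalize_timespan_threshold_hours` setting, which defaults to 24.0 hours. As a rough, non-authoritative sketch of how that option is consumed (the section and option names are taken from the CLI diff further down; the config file path and printed message are placeholders):

```python
# Hypothetical sketch: reading the new normalize_timespan_threshold_hours
# option the same way the CLI diff below does (ConfigParser.getfloat).
# The config path and the print statement are placeholders, not from the source.
from configparser import ConfigParser

config = ConfigParser()
config.read("parsedmarc.ini")  # placeholder path

threshold_hours = 24.0  # default noted in the commit messages above
if "general" in config.sections():
    general = config["general"]
    if "normalize_timespan_threshold_hours" in general:
        threshold_hours = general.getfloat("normalize_timespan_threshold_hours")

print(f"Aggregate reports spanning more than {threshold_hours} hours are normalized")
```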


@@ -24,11 +24,11 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v3
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@v3
with:
images: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
@@ -40,14 +40,16 @@ jobs:
type=semver,pattern={{major}}.{{minor}}
- name: Log in to the Container registry
uses: docker/login-action@v3
# https://github.com/docker/login-action/releases/tag/v2.0.0
uses: docker/login-action@49ed152c8eca782a232dede0303416e8f356c37b
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push Docker image
uses: docker/build-push-action@v6
# https://github.com/docker/build-push-action/releases/tag/v3.0.0
uses: docker/build-push-action@e551b19e49efd4e98792db7592c17c09b89db8d8
with:
context: .
push: ${{ github.event_name == 'release' }}


@@ -15,7 +15,7 @@ jobs:
services:
elasticsearch:
image: elasticsearch:8.19.7
image: elasticsearch:8.18.2
env:
discovery.type: single-node
cluster.name: parsedmarc-cluster
@@ -30,18 +30,18 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install system dependencies
run: |
sudo apt-get -q update
sudo apt-get -qy install libemail-outlook-message-perl
sudo apt-get update
sudo apt-get install -y libemail-outlook-message-perl
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
@@ -65,6 +65,6 @@ jobs:
run: |
hatch build
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}

.vscode/settings.json

@@ -1,166 +1,144 @@
{
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true,
// Let Ruff handle lint fixes + import sorting on save
"editor.codeActionsOnSave": {
"source.fixAll.ruff": "explicit",
"source.organizeImports.ruff": "explicit"
}
},
"markdownlint.config": {
"MD024": false
},
"cSpell.words": [
"adkim",
"akamaiedge",
"amsmath",
"andrewmcgilvray",
"arcname",
"aspf",
"autoclass",
"automodule",
"backported",
"bellsouth",
"boto",
"brakhane",
"Brightmail",
"CEST",
"CHACHA",
"checkdmarc",
"Codecov",
"confnew",
"dateparser",
"dateutil",
"Davmail",
"DBIP",
"dearmor",
"deflist",
"devel",
"DMARC",
"Dmarcian",
"dnspython",
"dollarmath",
"dpkg",
"exampleuser",
"expiringdict",
"fieldlist",
"GELF",
"genindex",
"geoip",
"geoipupdate",
"Geolite",
"geolocation",
"githubpages",
"Grafana",
"hostnames",
"htpasswd",
"httpasswd",
"httplib",
"ifhost",
"IMAP",
"imapclient",
"infile",
"Interaktive",
"IPDB",
"journalctl",
"kafkaclient",
"keepalive",
"keyout",
"keyrings",
"Leeman",
"libemail",
"linkify",
"LISTSERV",
"loganalytics",
"lxml",
"mailparser",
"mailrelay",
"mailsuite",
"maxdepth",
"MAXHEADERS",
"maxmind",
"mbox",
"mfrom",
"mhdw",
"michaeldavie",
"mikesiegel",
"Mimecast",
"mitigations",
"MMDB",
"modindex",
"msgconvert",
"msgraph",
"MSSP",
"multiprocess",
"Munge",
"ndjson",
"newkey",
"Nhcm",
"nojekyll",
"nondigest",
"nosecureimap",
"nosniff",
"nwettbewerb",
"opensearch",
"opensearchpy",
"parsedmarc",
"passsword",
"pbar",
"Postorius",
"premade",
"privatesuffix",
"procs",
"publicsuffix",
"publicsuffixlist",
"publixsuffix",
"pygelf",
"pypy",
"pytest",
"quickstart",
"Reindex",
"replyto",
"reversename",
"Rollup",
"Rpdm",
"SAMEORIGIN",
"sdist",
"Servernameone",
"setuptools",
"smartquotes",
"SMTPTLS",
"sortlists",
"sortmaps",
"sourcetype",
"STARTTLS",
"tasklist",
"timespan",
"tlsa",
"tlsrpt",
"toctree",
"TQDDM",
"tqdm",
"truststore",
"Übersicht",
"uids",
"Uncategorized",
"unparasable",
"uper",
"urllib",
"Valimail",
"venv",
"Vhcw",
"viewcode",
"virtualenv",
"WBITS",
"webmail",
"Wettbewerber",
"Whalen",
"whitespaces",
"xennn",
"xmltodict",
"xpack",
"zscholl"
"adkim",
"akamaiedge",
"amsmath",
"andrewmcgilvray",
"arcname",
"aspf",
"autoclass",
"automodule",
"backported",
"bellsouth",
"boto",
"brakhane",
"Brightmail",
"CEST",
"CHACHA",
"checkdmarc",
"Codecov",
"confnew",
"dateparser",
"dateutil",
"Davmail",
"DBIP",
"dearmor",
"deflist",
"devel",
"DMARC",
"Dmarcian",
"dnspython",
"dollarmath",
"dpkg",
"exampleuser",
"expiringdict",
"fieldlist",
"genindex",
"geoip",
"geoipupdate",
"Geolite",
"geolocation",
"githubpages",
"Grafana",
"hostnames",
"htpasswd",
"httpasswd",
"httplib",
"IMAP",
"imapclient",
"infile",
"Interaktive",
"IPDB",
"journalctl",
"keepalive",
"keyout",
"keyrings",
"Leeman",
"libemail",
"linkify",
"LISTSERV",
"lxml",
"mailparser",
"mailrelay",
"mailsuite",
"maxdepth",
"maxmind",
"mbox",
"mfrom",
"michaeldavie",
"mikesiegel",
"mitigations",
"MMDB",
"modindex",
"msgconvert",
"msgraph",
"MSSP",
"Munge",
"ndjson",
"newkey",
"Nhcm",
"nojekyll",
"nondigest",
"nosecureimap",
"nosniff",
"nwettbewerb",
"opensearch",
"parsedmarc",
"passsword",
"Postorius",
"premade",
"procs",
"publicsuffix",
"publicsuffixlist",
"publixsuffix",
"pygelf",
"pypy",
"pytest",
"quickstart",
"Reindex",
"replyto",
"reversename",
"Rollup",
"Rpdm",
"SAMEORIGIN",
"sdist",
"Servernameone",
"setuptools",
"smartquotes",
"SMTPTLS",
"sortlists",
"sortmaps",
"sourcetype",
"STARTTLS",
"tasklist",
"timespan",
"tlsa",
"tlsrpt",
"toctree",
"TQDDM",
"tqdm",
"truststore",
"Übersicht",
"uids",
"unparasable",
"uper",
"urllib",
"Valimail",
"venv",
"Vhcw",
"viewcode",
"virtualenv",
"WBITS",
"webmail",
"Wettbewerber",
"Whalen",
"whitespaces",
"xennn",
"xmltodict",
"xpack",
"zscholl"
],
}

File diff suppressed because it is too large


@@ -23,10 +23,11 @@ ProofPoint Email Fraud Defense, and Valimail.
## Help Wanted
This project is maintained by one developer. Please consider reviewing the open
[issues](https://github.com/domainaware/parsedmarc/issues) to see how you can
contribute code, documentation, or user support. Assistance on the pinned
issues would be particularly helpful.
This project is maintained by one developer. Please consider
reviewing the open
[issues](https://github.com/domainaware/parsedmarc/issues) to see how
you can contribute code, documentation, or user support. Assistance on
the pinned issues would be particularly helpful.
Thanks to all
[contributors](https://github.com/domainaware/parsedmarc/graphs/contributors)!
@@ -41,24 +42,6 @@ Thanks to all
- Consistent data structures
- Simple JSON and/or CSV output
- Optionally email the results
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for
use with premade dashboards
- Optionally send the results to Elasticsearch, Opensearch, and/or Splunk, for use
with premade dashboards
- Optionally send reports to Apache Kafka
## Python Compatibility
This project supports the following Python versions, which are either actively maintained or are the default versions
for RHEL or Debian.
| Version | Supported | Reason |
|---------|-----------|------------------------------------------------------------|
| < 3.6 | ❌ | End of Life (EOL) |
| 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
| 3.7 | ❌ | End of Life (EOL) |
| 3.8 | ❌ | End of Life (EOL) |
| 3.9 | ✅ | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) |
| 3.10 | ✅ | Actively maintained |
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | ✅ | Actively maintained |


@@ -9,11 +9,12 @@ fi
. venv/bin/activate
pip install .[build]
ruff format .
ruff check .
cd docs
make clean
make html
touch build/html/.nojekyll
if [ -d "../../parsedmarc-docs" ]; then
if [ -d "./../parsedmarc-docs" ]; then
cp -rf build/html/* ../../parsedmarc-docs/
fi
cd ..

ci.ini

@@ -3,7 +3,6 @@ save_aggregate = True
save_forensic = True
save_smtp_tls = True
debug = True
offline = True
[elasticsearch]
hosts = http://localhost:9200


@@ -1,6 +1,8 @@
version: '3.7'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.19.7
image: docker.elastic.co/elasticsearch/elasticsearch:8.3.1
environment:
- network.host=127.0.0.1
- http.host=0.0.0.0
@@ -12,7 +14,7 @@ services:
- xpack.security.enabled=false
- xpack.license.self_generated.type=basic
ports:
- "127.0.0.1:9200:9200"
- 127.0.0.1:9200:9200
ulimits:
memlock:
soft: -1
@@ -28,7 +30,7 @@ services:
retries: 24
opensearch:
image: opensearchproject/opensearch:2
image: opensearchproject/opensearch:2.18.0
environment:
- network.host=127.0.0.1
- http.host=0.0.0.0
@@ -39,7 +41,7 @@ services:
- bootstrap.memory_lock=true
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
ports:
- "127.0.0.1:9201:9200"
- 127.0.0.1:9201:9200
ulimits:
memlock:
soft: -1


@@ -28,13 +28,6 @@
:members:
```
## parsedmarc.types
```{eval-rst}
.. automodule:: parsedmarc.types
:members:
```
## parsedmarc.utils
```{eval-rst}
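
This hunk touches the API docs entry for `parsedmarc.types`. That module's `ParsingResults` shows up later in the CLI diff as a mapping of three report lists; the sketch below assumes it behaves like a TypedDict with exactly those keys, which is an inference from the CLI code in this compare rather than a confirmed definition:

```python
# Hypothetical sketch of the ParsingResults shape used in the CLI diff below.
# Only the import path (parsedmarc.types.ParsingResults) and the three keys
# appear in this compare; the value types here are assumptions.
from typing import Any, TypedDict


class ParsingResults(TypedDict):  # assumed shape, for illustration only
    aggregate_reports: list[dict[str, Any]]
    forensic_reports: list[dict[str, Any]]
    smtp_tls_reports: list[dict[str, Any]]


results: ParsingResults = {
    "aggregate_reports": [],
    "forensic_reports": [],
    "smtp_tls_reports": [],
}
```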


@@ -20,7 +20,7 @@ from parsedmarc import __version__
# -- Project information -----------------------------------------------------
project = "parsedmarc"
copyright = "2018 - 2025, Sean Whalen and contributors"
copyright = "2018 - 2023, Sean Whalen and contributors"
author = "Sean Whalen and contributors"
# The version info for the project you're documenting, acts as replacement for


@@ -45,24 +45,6 @@ and Valimail.
with premade dashboards
- Optionally send reports to Apache Kafka
## Python Compatibility
This project supports the following Python versions, which are either actively maintained or are the default versions
for RHEL or Debian.
| Version | Supported | Reason |
|---------|-----------|------------------------------------------------------------|
| < 3.6 | ❌ | End of Life (EOL) |
| 3.6 | ❌ | Used in RHEL 8, but not supported by project dependencies |
| 3.7 | ❌ | End of Life (EOL) |
| 3.8 | ❌ | End of Life (EOL) |
| 3.9 | ✅ | Supported until August 2026 (Debian 11); May 2032 (RHEL 9) |
| 3.10 | ✅ | Actively maintained |
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | ✅ | Actively maintained |
```{toctree}
:caption: 'Contents'
:maxdepth: 2


@@ -199,7 +199,7 @@ sudo apt-get install libemail-outlook-message-perl
[geoipupdate releases page on github]: https://github.com/maxmind/geoipupdate/releases
[ip to country lite database]: https://db-ip.com/db/download/ip-to-country-lite
[license keys]: https://www.maxmind.com/en/accounts/current/license-key
[maxmind geoipupdate page]: https://dev.maxmind.com/geoip/updating-databases/
[maxmind geoipupdate page]: https://dev.maxmind.com/geoip/geoipupdate/
[maxmind geolite2 country database]: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data
[registering for a free geolite2 account]: https://www.maxmind.com/en/geolite2/signup
[to comply with various privacy regulations]: https://blog.maxmind.com/2019/12/18/significant-changes-to-accessing-and-using-geolite2-databases/


@@ -171,8 +171,8 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for a IMAP
IDLE response or the number of seconds until the next
mail check (Default: `30`)
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}).
Defaults to `1d` if incorrect value is provided.
- `imap`
- `host` - str: The IMAP server hostname or IP address
@@ -240,7 +240,7 @@ The full set of configuration options are:
group and use that as the group id.
```powershell
New-ApplicationAccessPolicy -AccessRight RestrictAccess
New-ApplicationAccessPolicy -AccessRight RestrictAccess
-AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>"
-Description "Restrict access to dmarc reports mailbox."
```
@@ -257,7 +257,7 @@ The full set of configuration options are:
:::
- `user` - str: Basic auth username
- `password` - str: Basic auth password
- `api_key` - str: API key
- `apiKey` - str: API key
- `ssl` - bool: Use an encrypted SSL/TLS connection
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
@@ -280,7 +280,7 @@ The full set of configuration options are:
:::
- `user` - str: Basic auth username
- `password` - str: Basic auth password
- `api_key` - str: API key
- `apiKey` - str: API key
- `ssl` - bool: Use an encrypted SSL/TLS connection
(Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60)
@@ -336,65 +336,13 @@ The full set of configuration options are:
- `secret_access_key` - str: The secret access key (Optional)
- `syslog`
- `server` - str: The Syslog server name or IP address
- `port` - int: The port to use (Default: `514`)
- `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
- `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
- `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
- `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
- `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
- `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
- `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
**Example UDP configuration (default):**
```ini
[syslog]
server = syslog.example.com
port = 514
```
**Example TCP configuration:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tcp
timeout = 10.0
retry_attempts = 5
```
**Example TLS configuration with server verification:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
timeout = 10.0
```
**Example TLS configuration with mutual authentication:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
certfile_path = /path/to/client-cert.pem
keyfile_path = /path/to/client-key.pem
timeout = 10.0
retry_attempts = 3
retry_delay = 5
```
- `port` - int: The UDP port to use (Default: `514`)
- `gmail_api`
- `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file
(Default: `.token`)
:::{note}
credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
:::
@@ -494,7 +442,7 @@ Update the limit to 2k per example:
PUT _cluster/settings
{
"persistent" : {
"cluster.max_shards_per_node" : 2000
"cluster.max_shards_per_node" : 2000
}
}
```
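
Among the documentation changes above is the rename of the Elasticsearch and OpenSearch `apiKey` option to `api_key`. The CLI diff below accepts both spellings for backward compatibility; a minimal sketch of that fallback, with the config file path as a placeholder:

```python
# Hypothetical sketch of the apiKey -> api_key compatibility handling shown in
# the CLI diff below. The section and option names come from this compare; the
# config path is a placeholder.
from configparser import ConfigParser

config = ConfigParser()
config.read("parsedmarc.ini")  # placeholder path

api_key = None
if "elasticsearch" in config:
    es = config["elasticsearch"]
    # Legacy spelling (accepted until 8.20, per the comments in the CLI diff)
    if "apiKey" in es:
        api_key = es["apiKey"]
    # Current spelling (since 8.20) takes precedence if both are present
    if "api_key" in es:
        api_key = es["api_key"]
```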

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large


@@ -3,55 +3,54 @@
"""A CLI for parsing DMARC reports"""
import http.client
import json
import logging
from argparse import Namespace, ArgumentParser
import os
import sys
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from glob import glob
from multiprocessing import Pipe, Process
from ssl import CERT_NONE, create_default_context
import logging
import math
import yaml
from collections import OrderedDict
import json
from ssl import CERT_NONE, create_default_context
from multiprocessing import Pipe, Process
import sys
import http.client
from tqdm import tqdm
from parsedmarc import (
SEEN_AGGREGATE_REPORT_IDS,
InvalidDMARCReport,
get_dmarc_reports_from_mailbox,
watch_inbox,
parse_report_file,
get_dmarc_reports_from_mbox,
elastic,
opensearch,
kafkaclient,
splunk,
save_output,
email_results,
ParserError,
__version__,
elastic,
email_results,
gelf,
get_dmarc_reports_from_mailbox,
get_dmarc_reports_from_mbox,
kafkaclient,
loganalytics,
opensearch,
parse_report_file,
InvalidDMARCReport,
s3,
save_output,
splunk,
syslog,
watch_inbox,
loganalytics,
gelf,
webhook,
)
from parsedmarc.log import logger
from parsedmarc.mail import (
GmailConnection,
IMAPConnection,
MaildirConnection,
MSGraphConnection,
GmailConnection,
MaildirConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
# private stdlib attribute and may not exist in type stubs.
setattr(http.client, "_MAXHEADERS", 200)
from parsedmarc.log import logger
from parsedmarc.utils import is_mbox, get_reverse_dns, get_base_domain
from parsedmarc import SEEN_AGGREGATE_REPORT_IDS
http.client._MAXHEADERS = 200 # pylint:disable=protected-access
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
@@ -68,48 +67,6 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list))
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
This is needed for child processes to properly log messages.
Args:
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
log_file: Optional path to log file
"""
# Get the logger
from parsedmarc.log import logger
# Set the log level
logger.setLevel(log_level)
# Add StreamHandler with formatter if not already present
# Check if we already have a StreamHandler to avoid duplicates
# Use exact type check to distinguish from FileHandler subclass
has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
if not has_stream_handler:
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Add FileHandler if log_file is specified
if log_file:
try:
fh = logging.FileHandler(log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except (IOError, OSError, PermissionError) as error:
logger.warning("Unable to write to log file: {}".format(error))
def cli_parse(
file_path,
sa,
@@ -122,29 +79,8 @@ def cli_parse(
reverse_dns_map_url,
normalize_timespan_threshold_hours,
conn,
log_level=logging.ERROR,
log_file=None,
):
"""Separated this function for multiprocessing
Args:
file_path: Path to the report file
sa: Strip attachment payloads flag
nameservers: List of nameservers
dns_timeout: DNS timeout
ip_db_path: Path to IP database
offline: Offline mode flag
always_use_local_files: Always use local files flag
reverse_dns_map_path: Path to reverse DNS map
reverse_dns_map_url: URL to reverse DNS map
normalize_timespan_threshold_hours: Timespan threshold
conn: Pipe connection for IPC
log_level: Logging level for this process
log_file: Optional path to log file
"""
# Configure logging in this child process
_configure_logging(log_level, log_file)
"""Separated this function for multiprocessing"""
try:
file_results = parse_report_file(
file_path,
@@ -169,7 +105,6 @@ def _main():
"""Called when the module is executed"""
def get_index_prefix(report):
domain = None
if index_prefix_domain_map is None:
return None
if "policy_published" in report:
@@ -203,7 +138,7 @@ def _main():
print(output_str)
if opts.output:
save_output(
reports_,
results,
output_directory=opts.output,
aggregate_json_filename=opts.aggregate_json_filename,
forensic_json_filename=opts.forensic_json_filename,
@@ -658,7 +593,7 @@ def _main():
elasticsearch_monthly_indexes=False,
elasticsearch_username=None,
elasticsearch_password=None,
elasticsearch_api_key=None,
elasticsearch_apiKey=None,
opensearch_hosts=None,
opensearch_timeout=60,
opensearch_number_of_shards=1,
@@ -670,7 +605,7 @@ def _main():
opensearch_monthly_indexes=False,
opensearch_username=None,
opensearch_password=None,
opensearch_api_key=None,
opensearch_apiKey=None,
kafka_hosts=None,
kafka_username=None,
kafka_password=None,
@@ -697,13 +632,6 @@ def _main():
s3_secret_access_key=None,
syslog_server=None,
syslog_port=None,
syslog_protocol=None,
syslog_cafile_path=None,
syslog_certfile_path=None,
syslog_keyfile_path=None,
syslog_timeout=None,
syslog_retry_attempts=None,
syslog_retry_delay=None,
gmail_api_credentials_file=None,
gmail_api_token_file=None,
gmail_api_include_spam_trash=False,
@@ -749,7 +677,7 @@ def _main():
if "general" in config.sections():
general_config = config["general"]
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
opts.silent = general_config.getboolean("silent")
if "normalize_timespan_threshold_hours" in general_config:
opts.normalize_timespan_threshold_hours = general_config.getfloat(
"normalize_timespan_threshold_hours"
@@ -758,10 +686,10 @@ def _main():
with open(general_config["index_prefix_domain_map"]) as f:
index_prefix_domain_map = yaml.safe_load(f)
if "offline" in general_config:
opts.offline = bool(general_config.getboolean("offline"))
opts.offline = general_config.getboolean("offline")
if "strip_attachment_payloads" in general_config:
opts.strip_attachment_payloads = bool(
general_config.getboolean("strip_attachment_payloads")
opts.strip_attachment_payloads = general_config.getboolean(
"strip_attachment_payloads"
)
if "output" in general_config:
opts.output = general_config["output"]
@@ -779,8 +707,6 @@ def _main():
opts.smtp_tls_csv_filename = general_config["smtp_tls_csv_filename"]
if "dns_timeout" in general_config:
opts.dns_timeout = general_config.getfloat("dns_timeout")
if opts.dns_timeout is None:
opts.dns_timeout = 2
if "dns_test_address" in general_config:
opts.dns_test_address = general_config["dns_test_address"]
if "nameservers" in general_config:
@@ -803,19 +729,19 @@ def _main():
)
exit(-1)
if "save_aggregate" in general_config:
opts.save_aggregate = bool(general_config.getboolean("save_aggregate"))
opts.save_aggregate = general_config["save_aggregate"]
if "save_forensic" in general_config:
opts.save_forensic = bool(general_config.getboolean("save_forensic"))
opts.save_forensic = general_config["save_forensic"]
if "save_smtp_tls" in general_config:
opts.save_smtp_tls = bool(general_config.getboolean("save_smtp_tls"))
opts.save_smtp_tls = general_config["save_smtp_tls"]
if "debug" in general_config:
opts.debug = bool(general_config.getboolean("debug"))
opts.debug = general_config.getboolean("debug")
if "verbose" in general_config:
opts.verbose = bool(general_config.getboolean("verbose"))
opts.verbose = general_config.getboolean("verbose")
if "silent" in general_config:
opts.silent = bool(general_config.getboolean("silent"))
opts.silent = general_config.getboolean("silent")
if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings"))
opts.warnings = general_config.getboolean("warnings")
if "log_file" in general_config:
opts.log_file = general_config["log_file"]
if "n_procs" in general_config:
@@ -825,15 +751,15 @@ def _main():
else:
opts.ip_db_path = None
if "always_use_local_files" in general_config:
opts.always_use_local_files = bool(
general_config.getboolean("always_use_local_files")
opts.always_use_local_files = general_config.getboolean(
"always_use_local_files"
)
if "reverse_dns_map_path" in general_config:
opts.reverse_dns_map_path = general_config["reverse_dns_path"]
if "reverse_dns_map_url" in general_config:
opts.reverse_dns_map_url = general_config["reverse_dns_url"]
if "prettify_json" in general_config:
opts.prettify_json = bool(general_config.getboolean("prettify_json"))
opts.prettify_json = general_config.getboolean("prettify_json")
if "mailbox" in config.sections():
mailbox_config = config["mailbox"]
@@ -844,11 +770,11 @@ def _main():
if "archive_folder" in mailbox_config:
opts.mailbox_archive_folder = mailbox_config["archive_folder"]
if "watch" in mailbox_config:
opts.mailbox_watch = bool(mailbox_config.getboolean("watch"))
opts.mailbox_watch = mailbox_config.getboolean("watch")
if "delete" in mailbox_config:
opts.mailbox_delete = bool(mailbox_config.getboolean("delete"))
opts.mailbox_delete = mailbox_config.getboolean("delete")
if "test" in mailbox_config:
opts.mailbox_test = bool(mailbox_config.getboolean("test"))
opts.mailbox_test = mailbox_config.getboolean("test")
if "batch_size" in mailbox_config:
opts.mailbox_batch_size = mailbox_config.getint("batch_size")
if "check_timeout" in mailbox_config:
@@ -872,15 +798,14 @@ def _main():
if "port" in imap_config:
opts.imap_port = imap_config.getint("port")
if "timeout" in imap_config:
opts.imap_timeout = imap_config.getint("timeout")
opts.imap_timeout = imap_config.getfloat("timeout")
if "max_retries" in imap_config:
opts.imap_max_retries = imap_config.getint("max_retries")
if "ssl" in imap_config:
opts.imap_ssl = bool(imap_config.getboolean("ssl"))
opts.imap_ssl = imap_config.getboolean("ssl")
if "skip_certificate_verification" in imap_config:
opts.imap_skip_certificate_verification = bool(
imap_config.getboolean("skip_certificate_verification")
)
imap_verify = imap_config.getboolean("skip_certificate_verification")
opts.imap_skip_certificate_verification = imap_verify
if "user" in imap_config:
opts.imap_user = imap_config["user"]
else:
@@ -908,7 +833,7 @@ def _main():
"section instead."
)
if "watch" in imap_config:
opts.mailbox_watch = bool(imap_config.getboolean("watch"))
opts.mailbox_watch = imap_config.getboolean("watch")
logger.warning(
"Use of the watch option in the imap "
"configuration section has been deprecated. "
@@ -923,7 +848,7 @@ def _main():
"section instead."
)
if "test" in imap_config:
opts.mailbox_test = bool(imap_config.getboolean("test"))
opts.mailbox_test = imap_config.getboolean("test")
logger.warning(
"Use of the test option in the imap "
"configuration section has been deprecated. "
@@ -1017,8 +942,8 @@ def _main():
opts.graph_url = graph_config["graph_url"]
if "allow_unencrypted_storage" in graph_config:
opts.graph_allow_unencrypted_storage = bool(
graph_config.getboolean("allow_unencrypted_storage")
opts.graph_allow_unencrypted_storage = graph_config.getboolean(
"allow_unencrypted_storage"
)
if "elasticsearch" in config:
@@ -1046,22 +971,18 @@ def _main():
if "index_prefix" in elasticsearch_config:
opts.elasticsearch_index_prefix = elasticsearch_config["index_prefix"]
if "monthly_indexes" in elasticsearch_config:
monthly = bool(elasticsearch_config.getboolean("monthly_indexes"))
monthly = elasticsearch_config.getboolean("monthly_indexes")
opts.elasticsearch_monthly_indexes = monthly
if "ssl" in elasticsearch_config:
opts.elasticsearch_ssl = bool(elasticsearch_config.getboolean("ssl"))
opts.elasticsearch_ssl = elasticsearch_config.getboolean("ssl")
if "cert_path" in elasticsearch_config:
opts.elasticsearch_ssl_cert_path = elasticsearch_config["cert_path"]
if "user" in elasticsearch_config:
opts.elasticsearch_username = elasticsearch_config["user"]
if "password" in elasticsearch_config:
opts.elasticsearch_password = elasticsearch_config["password"]
# Until 8.20
if "apiKey" in elasticsearch_config:
opts.elasticsearch_apiKey = elasticsearch_config["apiKey"]
# Since 8.20
if "api_key" in elasticsearch_config:
opts.elasticsearch_apiKey = elasticsearch_config["api_key"]
if "opensearch" in config:
opensearch_config = config["opensearch"]
@@ -1086,22 +1007,18 @@ def _main():
if "index_prefix" in opensearch_config:
opts.opensearch_index_prefix = opensearch_config["index_prefix"]
if "monthly_indexes" in opensearch_config:
monthly = bool(opensearch_config.getboolean("monthly_indexes"))
monthly = opensearch_config.getboolean("monthly_indexes")
opts.opensearch_monthly_indexes = monthly
if "ssl" in opensearch_config:
opts.opensearch_ssl = bool(opensearch_config.getboolean("ssl"))
opts.opensearch_ssl = opensearch_config.getboolean("ssl")
if "cert_path" in opensearch_config:
opts.opensearch_ssl_cert_path = opensearch_config["cert_path"]
if "user" in opensearch_config:
opts.opensearch_username = opensearch_config["user"]
if "password" in opensearch_config:
opts.opensearch_password = opensearch_config["password"]
# Until 8.20
if "apiKey" in opensearch_config:
opts.opensearch_apiKey = opensearch_config["apiKey"]
# Since 8.20
if "api_key" in opensearch_config:
opts.opensearch_apiKey = opensearch_config["api_key"]
if "splunk_hec" in config.sections():
hec_config = config["splunk_hec"]
@@ -1143,11 +1060,9 @@ def _main():
if "password" in kafka_config:
opts.kafka_password = kafka_config["password"]
if "ssl" in kafka_config:
opts.kafka_ssl = bool(kafka_config.getboolean("ssl"))
opts.kafka_ssl = kafka_config.getboolean("ssl")
if "skip_certificate_verification" in kafka_config:
kafka_verify = bool(
kafka_config.getboolean("skip_certificate_verification")
)
kafka_verify = kafka_config.getboolean("skip_certificate_verification")
opts.kafka_skip_certificate_verification = kafka_verify
if "aggregate_topic" in kafka_config:
opts.kafka_aggregate_topic = kafka_config["aggregate_topic"]
@@ -1179,11 +1094,9 @@ def _main():
if "port" in smtp_config:
opts.smtp_port = smtp_config.getint("port")
if "ssl" in smtp_config:
opts.smtp_ssl = bool(smtp_config.getboolean("ssl"))
opts.smtp_ssl = smtp_config.getboolean("ssl")
if "skip_certificate_verification" in smtp_config:
smtp_verify = bool(
smtp_config.getboolean("skip_certificate_verification")
)
smtp_verify = smtp_config.getboolean("skip_certificate_verification")
opts.smtp_skip_certificate_verification = smtp_verify
if "user" in smtp_config:
opts.smtp_user = smtp_config["user"]
@@ -1246,54 +1159,28 @@ def _main():
opts.syslog_port = syslog_config["port"]
else:
opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = syslog_config["cafile_path"]
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = syslog_config["certfile_path"]
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
opts.gmail_api_credentials_file = gmail_api_config.get("credentials_file")
opts.gmail_api_token_file = gmail_api_config.get("token_file", ".token")
opts.gmail_api_include_spam_trash = bool(
gmail_api_config.getboolean("include_spam_trash", False)
opts.gmail_api_include_spam_trash = gmail_api_config.getboolean(
"include_spam_trash", False
)
opts.gmail_api_paginate_messages = bool(
gmail_api_config.getboolean("paginate_messages", True)
opts.gmail_api_paginate_messages = gmail_api_config.getboolean(
"paginate_messages", True
)
opts.gmail_api_scopes = gmail_api_config.get(
"scopes", default_gmail_api_scope
)
opts.gmail_api_scopes = _str_to_list(opts.gmail_api_scopes)
if "oauth2_port" in gmail_api_config:
opts.gmail_api_oauth2_port = gmail_api_config.getint(
"oauth2_port", 8080
)
opts.gmail_api_oauth2_port = gmail_api_config.get("oauth2_port", 8080)
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
opts.maildir_path = maildir_api_config.get("maildir_path")
opts.maildir_create = bool(
maildir_api_config.getboolean("maildir_create", fallback=False)
)
opts.maildir_create = maildir_api_config.get("maildir_create")
if "log_analytics" in config.sections():
log_analytics_config = config["log_analytics"]
@@ -1388,19 +1275,14 @@ def _main():
es_aggregate_index = "{0}{1}".format(prefix, es_aggregate_index)
es_forensic_index = "{0}{1}".format(prefix, es_forensic_index)
es_smtp_tls_index = "{0}{1}".format(prefix, es_smtp_tls_index)
elastic_timeout_value = (
float(opts.elasticsearch_timeout)
if opts.elasticsearch_timeout is not None
else 60.0
)
elastic.set_hosts(
opts.elasticsearch_hosts,
use_ssl=opts.elasticsearch_ssl,
ssl_cert_path=opts.elasticsearch_ssl_cert_path,
username=opts.elasticsearch_username,
password=opts.elasticsearch_password,
api_key=opts.elasticsearch_api_key,
timeout=elastic_timeout_value,
opts.elasticsearch_ssl,
opts.elasticsearch_ssl_cert_path,
opts.elasticsearch_username,
opts.elasticsearch_password,
opts.elasticsearch_apiKey,
timeout=opts.elasticsearch_timeout,
)
elastic.migrate_indexes(
aggregate_indexes=[es_aggregate_index],
@@ -1425,19 +1307,14 @@ def _main():
os_aggregate_index = "{0}{1}".format(prefix, os_aggregate_index)
os_forensic_index = "{0}{1}".format(prefix, os_forensic_index)
os_smtp_tls_index = "{0}{1}".format(prefix, os_smtp_tls_index)
opensearch_timeout_value = (
float(opts.opensearch_timeout)
if opts.opensearch_timeout is not None
else 60.0
)
opensearch.set_hosts(
opts.opensearch_hosts,
use_ssl=opts.opensearch_ssl,
ssl_cert_path=opts.opensearch_ssl_cert_path,
username=opts.opensearch_username,
password=opts.opensearch_password,
api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value,
opts.opensearch_ssl,
opts.opensearch_ssl_cert_path,
opts.opensearch_username,
opts.opensearch_password,
opts.opensearch_apiKey,
timeout=opts.opensearch_timeout,
)
opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index],
@@ -1465,13 +1342,6 @@ def _main():
syslog_client = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts if opts.syslog_retry_attempts is not None else 3,
retry_delay=opts.syslog_retry_delay if opts.syslog_retry_delay is not None else 5,
)
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))
@@ -1553,23 +1423,16 @@ def _main():
results = []
pbar = None
if sys.stdout.isatty():
pbar = tqdm(total=len(file_paths))
n_procs = int(opts.n_procs or 1)
if n_procs < 1:
n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
for batch_index in range(math.ceil(len(file_paths) / opts.n_procs)):
processes = []
connections = []
for proc_index in range(n_procs * batch_index, n_procs * (batch_index + 1)):
for proc_index in range(
opts.n_procs * batch_index, opts.n_procs * (batch_index + 1)
):
if proc_index >= len(file_paths):
break
@@ -1590,8 +1453,6 @@ def _main():
opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn,
current_log_level,
current_log_file,
),
)
processes.append(process)
@@ -1604,15 +1465,12 @@ def _main():
for proc in processes:
proc.join()
if pbar is not None:
if sys.stdout.isatty():
counter += 1
pbar.update(1)
if pbar is not None:
pbar.close()
pbar.update(counter - pbar.n)
for result in results:
if isinstance(result[0], ParserError) or result[0] is None:
if type(result[0]) is ParserError:
logger.error("Failed to parse {0} - {1}".format(result[1], result[0]))
else:
if result[0]["report_type"] == "aggregate":
@@ -1633,11 +1491,6 @@ def _main():
smtp_tls_reports.append(result[0]["report"])
for mbox_path in mbox_paths:
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
strip = opts.strip_attachment_payloads
reports = get_dmarc_reports_from_mbox(
mbox_path,
@@ -1649,17 +1502,13 @@ def _main():
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
aggregate_reports += reports["aggregate_reports"]
forensic_reports += reports["forensic_reports"]
smtp_tls_reports += reports["smtp_tls_reports"]
mailbox_connection = None
mailbox_batch_size_value = 10
mailbox_check_timeout_value = 30
normalize_timespan_threshold_hours_value = 24.0
if opts.imap_host:
try:
if opts.imap_user is None or opts.imap_password is None:
@@ -1672,23 +1521,16 @@ def _main():
if opts.imap_skip_certificate_verification:
logger.debug("Skipping IMAP certificate verification")
verify = False
if not opts.imap_ssl:
if opts.imap_ssl is False:
ssl = False
imap_timeout = (
int(opts.imap_timeout) if opts.imap_timeout is not None else 30
)
imap_max_retries = (
int(opts.imap_max_retries) if opts.imap_max_retries is not None else 4
)
imap_port_value = int(opts.imap_port) if opts.imap_port is not None else 993
mailbox_connection = IMAPConnection(
host=opts.imap_host,
port=imap_port_value,
port=opts.imap_port,
ssl=ssl,
verify=verify,
timeout=imap_timeout,
max_retries=imap_max_retries,
timeout=opts.imap_timeout,
max_retries=opts.imap_max_retries,
user=opts.imap_user,
password=opts.imap_password,
)
@@ -1709,7 +1551,7 @@ def _main():
username=opts.graph_user,
password=opts.graph_password,
token_file=opts.graph_token_file,
allow_unencrypted_storage=bool(opts.graph_allow_unencrypted_storage),
allow_unencrypted_storage=opts.graph_allow_unencrypted_storage,
graph_url=opts.graph_url,
)
@@ -1754,24 +1596,11 @@ def _main():
exit(1)
if mailbox_connection:
mailbox_batch_size_value = (
int(opts.mailbox_batch_size) if opts.mailbox_batch_size is not None else 10
)
mailbox_check_timeout_value = (
int(opts.mailbox_check_timeout)
if opts.mailbox_check_timeout is not None
else 30
)
normalize_timespan_threshold_hours_value = (
float(opts.normalize_timespan_threshold_hours)
if opts.normalize_timespan_threshold_hours is not None
else 24.0
)
try:
reports = get_dmarc_reports_from_mailbox(
connection=mailbox_connection,
delete=opts.mailbox_delete,
batch_size=mailbox_batch_size_value,
batch_size=opts.mailbox_batch_size,
reports_folder=opts.mailbox_reports_folder,
archive_folder=opts.mailbox_archive_folder,
ip_db_path=opts.ip_db_path,
@@ -1783,7 +1612,7 @@ def _main():
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
aggregate_reports += reports["aggregate_reports"]
@@ -1794,31 +1623,27 @@ def _main():
logger.exception("Mailbox Error")
exit(1)
parsing_results: ParsingResults = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
results = OrderedDict(
[
("aggregate_reports", aggregate_reports),
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports),
]
)
process_reports(parsing_results)
process_reports(results)
if opts.smtp_host:
try:
verify = True
if opts.smtp_skip_certificate_verification:
verify = False
smtp_port_value = int(opts.smtp_port) if opts.smtp_port is not None else 25
smtp_to_value = (
list(opts.smtp_to)
if isinstance(opts.smtp_to, list)
else _str_to_list(str(opts.smtp_to))
)
email_results(
parsing_results,
results,
opts.smtp_host,
opts.smtp_from,
smtp_to_value,
port=smtp_port_value,
opts.smtp_to,
port=opts.smtp_port,
verify=verify,
username=opts.smtp_user,
password=opts.smtp_password,
@@ -1840,17 +1665,17 @@ def _main():
archive_folder=opts.mailbox_archive_folder,
delete=opts.mailbox_delete,
test=opts.mailbox_test,
check_timeout=mailbox_check_timeout_value,
check_timeout=opts.mailbox_check_timeout,
nameservers=opts.nameservers,
dns_timeout=opts.dns_timeout,
strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=mailbox_batch_size_value,
batch_size=opts.mailbox_batch_size,
ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path,
reverse_dns_map_url=opts.reverse_dns_map_url,
offline=opts.offline,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours_value,
normalize_timespan_threshold_hours=opts.normalize_timespan_threshold_hours,
)
except FileExistsError as error:
logger.error("{0}".format(error.__str__()))
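
The CLI changes above wire several new syslog options (protocol, TLS certificate paths, timeout, and retries) into `syslog.SyslogClient`. A hedged sketch of such a call, using only keyword names that appear in this diff; the server name and certificate paths are placeholder values borrowed from the documentation examples:

```python
# Hypothetical sketch: constructing syslog.SyslogClient with the keyword
# arguments added in the CLI diff above. Hostname and paths are placeholders.
from parsedmarc import syslog

try:
    syslog_client = syslog.SyslogClient(
        server_name="syslog.example.com",   # placeholder
        server_port=6514,
        protocol="tls",                     # "udp" (default), "tcp", or "tls"
        cafile_path="/path/to/ca-cert.pem",
        certfile_path="/path/to/client-cert.pem",
        keyfile_path="/path/to/client-key.pem",
        timeout=10.0,
        retry_attempts=3,
        retry_delay=5,
    )
except Exception as error:
    # The CLI diff wraps construction in a try/except and logs a Syslog Error
    print(f"Syslog Error: {error}")
```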


@@ -1,3 +1,2 @@
__version__ = "9.0.10"
__version__ = "9.0.0"
USER_AGENT = f"parsedmarc/{__version__}"
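
The `elastic.py` diff that follows moves `set_hosts` to keyword-only arguments and renames `apiKey` to `api_key`, matching the call site in the CLI diff above. A sketch of the newer call shape, with the host URL and credentials as placeholders:

```python
# Hypothetical sketch of the keyword-only elastic.set_hosts call shown in this
# compare. The host URL, credentials, and API key value are placeholders.
from parsedmarc import elastic

elastic.set_hosts(
    "http://localhost:9200",  # a single host URL or a list of them
    use_ssl=False,
    ssl_cert_path=None,
    username=None,
    password=None,
    api_key=None,             # Base64-encoded API key, per the docstring
    timeout=60.0,
)
```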


@@ -1,29 +1,27 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from collections import OrderedDict
from typing import Any, Optional, Union
from elasticsearch.helpers import reindex
from elasticsearch_dsl.search import Q
from elasticsearch_dsl import (
Boolean,
Date,
connections,
Object,
Document,
Index,
Nested,
InnerDoc,
Integer,
Ip,
Nested,
Object,
Search,
Text,
connections,
Boolean,
Ip,
Date,
Search,
)
from elasticsearch_dsl.search import Q
from elasticsearch.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class ElasticsearchError(Exception):
@@ -91,18 +89,18 @@ class _AggregateReportDoc(Document):
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
def add_policy_override(self, type_: str, comment: str):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment)) # pyright: ignore[reportCallIssue]
def add_policy_override(self, type_, comment):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
def add_dkim_result(self, domain, selector, result):
self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result)
) # pyright: ignore[reportCallIssue]
)
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result)) # pyright: ignore[reportCallIssue]
def add_spf_result(self, domain, scope, result):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
def save(self, **kwargs):
self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -135,26 +133,26 @@ class _ForensicSampleDoc(InnerDoc):
body = Text()
attachments = Nested(_EmailAttachmentDoc)
def add_to(self, display_name: str, address: str):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
def add_to(self, display_name, address):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_reply_to(self, display_name: str, address: str):
def add_reply_to(self, display_name, address):
self.reply_to.append(
_EmailAddressDoc(display_name=display_name, address=address)
) # pyright: ignore[reportCallIssue]
)
def add_cc(self, display_name: str, address: str):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
def add_cc(self, display_name, address):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_bcc(self, display_name: str, address: str):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address)) # pyright: ignore[reportCallIssue]
def add_bcc(self, display_name, address):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_attachment(self, filename: str, content_type: str, sha256: str):
def add_attachment(self, filename, content_type, sha256):
self.attachments.append(
_EmailAttachmentDoc(
filename=filename, content_type=content_type, sha256=sha256
)
) # pyright: ignore[reportCallIssue]
)
class _ForensicReportDoc(Document):
@@ -201,15 +199,15 @@ class _SMTPTLSPolicyDoc(InnerDoc):
def add_failure_details(
self,
result_type: Optional[str] = None,
ip_address: Optional[str] = None,
receiving_ip: Optional[str] = None,
receiving_mx_helo: Optional[str] = None,
failed_session_count: Optional[int] = None,
sending_mta_ip: Optional[str] = None,
receiving_mx_hostname: Optional[str] = None,
additional_information_uri: Optional[str] = None,
failure_reason_code: Union[str, int, None] = None,
result_type,
ip_address,
receiving_ip,
receiving_mx_helo,
failed_session_count,
sending_mta_ip=None,
receiving_mx_hostname=None,
additional_information_uri=None,
failure_reason_code=None,
):
_details = _SMTPTLSFailureDetailsDoc(
result_type=result_type,
@@ -222,7 +220,7 @@ class _SMTPTLSPolicyDoc(InnerDoc):
additional_information=additional_information_uri,
failure_reason_code=failure_reason_code,
)
self.failure_details.append(_details) # pyright: ignore[reportCallIssue]
self.failure_details.append(_details)
class _SMTPTLSReportDoc(Document):
@@ -239,14 +237,13 @@ class _SMTPTLSReportDoc(Document):
def add_policy(
self,
policy_type: str,
policy_domain: str,
successful_session_count: int,
failed_session_count: int,
*,
policy_string: Optional[str] = None,
mx_host_patterns: Optional[list[str]] = None,
failure_details: Optional[str] = None,
policy_type,
policy_domain,
successful_session_count,
failed_session_count,
policy_string=None,
mx_host_patterns=None,
failure_details=None,
):
self.policies.append(
policy_type=policy_type,
@@ -256,7 +253,7 @@ class _SMTPTLSReportDoc(Document):
policy_string=policy_string,
mx_host_patterns=mx_host_patterns,
failure_details=failure_details,
) # pyright: ignore[reportCallIssue]
)
class AlreadySaved(ValueError):
@@ -264,25 +261,24 @@ class AlreadySaved(ValueError):
def set_hosts(
hosts: Union[str, list[str]],
*,
use_ssl: bool = False,
ssl_cert_path: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
timeout: float = 60.0,
hosts,
use_ssl=False,
ssl_cert_path=None,
username=None,
password=None,
apiKey=None,
timeout=60.0,
):
"""
Sets the Elasticsearch hosts to use
Args:
hosts (str | list[str]): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
hosts (str): A single hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use a HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
apiKey (str): The Base64 encoded API key to use for authentication
timeout (float): Timeout in seconds
"""
if not isinstance(hosts, list):
@@ -295,14 +291,14 @@ def set_hosts(
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username and password:
if username:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
if apiKey:
conn_params["api_key"] = apiKey
connections.create_connection(**conn_params)
def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def create_indexes(names, settings=None):
"""
Create Elasticsearch indexes
@@ -325,10 +321,7 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
raise ElasticsearchError("Elasticsearch error: {0}".format(e.__str__()))
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
):
def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
"""
Updates index mappings
@@ -367,7 +360,7 @@ def migrate_indexes(
}
Index(new_index_name).create()
Index(new_index_name).put_mapping(doc_type=doc, body=body)
reindex(connections.get_connection(), aggregate_index_name, new_index_name) # pyright: ignore[reportArgumentType]
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
for forensic_index in forensic_indexes:
@@ -375,18 +368,18 @@ def migrate_indexes(
def save_aggregate_report_to_elasticsearch(
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
aggregate_report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
):
"""
Saves a parsed DMARC aggregate report to Elasticsearch
Args:
aggregate_report (dict): A parsed forensic report
aggregate_report (OrderedDict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -402,50 +395,7 @@ def save_aggregate_report_to_elasticsearch(
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType]
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try:
existing = search.execute()
except Exception as error_:
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Elasticsearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
@@ -459,8 +409,8 @@ def save_aggregate_report_to_elasticsearch(
for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
normalized_timespan = record["normalized_timespan"]
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
@@ -468,6 +418,41 @@ def save_aggregate_report_to_elasticsearch(
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise ElasticsearchError(
"Elasticsearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"Elasticsearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"],
@@ -475,9 +460,9 @@ def save_aggregate_report_to_elasticsearch(
org_extra_contact_info=metadata["org_extra_contact_info"],
report_id=metadata["report_id"],
date_range=date_range,
date_begin=begin_date,
date_end=end_date,
normalized_timespan=normalized_timespan,
date_begin=aggregate_report["begin_date"],
date_end=aggregate_report["end_date"],
normalized_timespan=record["normalized_timespan"],
errors=metadata["errors"],
published_policy=published_policy,
source_ip_address=record["source"]["ip_address"],
@@ -527,7 +512,7 @@ def save_aggregate_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
agg_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
agg_doc.meta.index = index
try:
agg_doc.save()
@@ -536,18 +521,18 @@ def save_aggregate_report_to_elasticsearch(
def save_forensic_report_to_elasticsearch(
forensic_report: dict[str, Any],
index_suffix: Optional[Any] = None,
index_prefix: Optional[str] = None,
monthly_indexes: Optional[bool] = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
forensic_report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
):
"""
Saves a parsed DMARC forensic report to Elasticsearch
Args:
forensic_report (dict): A parsed forensic report
forensic_report (OrderedDict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -567,7 +552,7 @@ def save_forensic_report_to_elasticsearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
headers = OrderedDict()
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -581,7 +566,7 @@ def save_forensic_report_to_elasticsearch(
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds))) # pyright: ignore[reportArgumentType]
q = Q(dict(match=dict(arrival_date=arrival_date_epoch_milliseconds)))
from_ = None
to_ = None
@@ -596,7 +581,7 @@ def save_forensic_report_to_elasticsearch(
from_ = dict()
from_["sample.headers.from"] = headers["from"]
from_query = Q(dict(match_phrase=from_)) # pyright: ignore[reportArgumentType]
from_query = Q(dict(match_phrase=from_))
q = q & from_query
if "to" in headers:
# We convert the TO header from a string list to a flat string.
@@ -608,12 +593,12 @@ def save_forensic_report_to_elasticsearch(
to_ = dict()
to_["sample.headers.to"] = headers["to"]
to_query = Q(dict(match_phrase=to_)) # pyright: ignore[reportArgumentType]
to_query = Q(dict(match_phrase=to_))
q = q & to_query
if "subject" in headers:
subject = headers["subject"]
subject_query = {"match_phrase": {"sample.headers.subject": subject}}
q = q & Q(subject_query) # pyright: ignore[reportArgumentType]
q = q & Q(subject_query)
search.query = q
existing = search.execute()
@@ -691,7 +676,7 @@ def save_forensic_report_to_elasticsearch(
number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
)
create_indexes([index], index_settings)
forensic_doc.meta.index = index # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
forensic_doc.meta.index = index
try:
forensic_doc.save()
except Exception as e:
@@ -703,18 +688,18 @@ def save_forensic_report_to_elasticsearch(
def save_smtp_tls_report_to_elasticsearch(
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
):
"""
Saves a parsed SMTP TLS report to Elasticsearch
Args:
report (dict): A parsed SMTP TLS report
report (OrderedDict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -738,10 +723,10 @@ def save_smtp_tls_report_to_elasticsearch(
report["begin_date"] = begin_date
report["end_date"] = end_date
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # pyright: ignore[reportArgumentType]
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "smtp_tls_{0}*".format(index_suffix)
@@ -800,7 +785,7 @@ def save_smtp_tls_report_to_elasticsearch(
policy_doc = _SMTPTLSPolicyDoc(
policy_domain=policy["policy_domain"],
policy_type=policy["policy_type"],
successful_session_count=policy["successful_session_count"],
succesful_session_count=policy["successful_session_count"],
failed_session_count=policy["failed_session_count"],
policy_string=policy_strings,
mx_host_patterns=mx_host_patterns,
@@ -842,10 +827,10 @@ def save_smtp_tls_report_to_elasticsearch(
additional_information_uri=additional_information_uri,
failure_reason_code=failure_reason_code,
)
smtp_tls_doc.policies.append(policy_doc) # pyright: ignore[reportCallIssue]
smtp_tls_doc.policies.append(policy_doc)
create_indexes([index], index_settings)
smtp_tls_doc.meta.index = index # pyright: ignore[reportOptionalMemberAccess, reportAttributeAccessIssue]
smtp_tls_doc.meta.index = index
try:
smtp_tls_doc.save()
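
The hunks above relocate the duplicate-report check relative to the per-record loop and carry each record's normalized_timespan onto the saved document. In both variants the check is the same boolean query: match_phrase clauses on org_name, report_id, and published_policy.domain ANDed with match clauses on the record dates. A condensed, hedged sketch of that composition, assuming the elasticsearch-dsl package the module builds on and an already-configured default connection:

from elasticsearch_dsl import Q, Search

def find_existing_aggregate_report(org_name, report_id, domain, begin_date, end_date,
                                   index_prefix=None, index_suffix=None):
    # Compose the same clauses the hunks use and AND them together.
    query = (
        Q("match_phrase", org_name=org_name)
        & Q("match_phrase", report_id=report_id)
        & Q("match_phrase", **{"published_policy.domain": domain})
        & Q("match", date_begin=begin_date)
        & Q("match", date_end=end_date)
    )
    index = "dmarc_aggregate*"
    if index_suffix is not None:
        index = "dmarc_aggregate_{0}*".format(index_suffix)
    if index_prefix is not None:
        index = "{0}{1}".format(index_prefix, index)
    # A non-empty result is what triggers AlreadySaved in the code above.
    return Search(index=index).query(query).execute()
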

View File

@@ -1,19 +1,17 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
import logging.handlers
import json
import threading
from typing import Any
from pygelf import GelfTcpHandler, GelfTlsHandler, GelfUdpHandler
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
parsed_forensic_reports_to_csv_rows,
parsed_smtp_tls_reports_to_csv_rows,
)
from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler
log_context_data = threading.local()
@@ -50,7 +48,7 @@ class GelfClient(object):
)
self.logger.addHandler(self.handler)
def save_aggregate_report_to_gelf(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_gelf(self, aggregate_reports):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
@@ -58,14 +56,12 @@ class GelfClient(object):
log_context_data.parsedmarc = None
def save_forensic_report_to_gelf(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_gelf(self, forensic_reports):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc forensic report")
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports: dict[str, Any]):
def save_smtp_tls_report_to_gelf(self, smtp_tls_reports):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info("parsedmarc smtptls report")
self.logger.info(json.dumps(row))
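
Both variants of the GELF client stash each CSV row in a module-level threading.local() before logging, so a handler or filter can attach the row to the emitted record as structured fields. A minimal sketch of that pattern, with a plain logging filter standing in for the pygelf handlers (the filter class is illustrative, not part of the module):

import json
import logging
import threading

log_context_data = threading.local()

class ContextFilter(logging.Filter):
    # Copies whatever is stored in log_context_data.parsedmarc onto the
    # log record as a "parsedmarc" attribute for downstream handlers.
    def filter(self, record):
        record.parsedmarc = getattr(log_context_data, "parsedmarc", None)
        return True

logger = logging.getLogger("gelf_example")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.addFilter(ContextFilter())
logger.addHandler(handler)

row = {"org_name": "example.com", "count": 3}
log_context_data.parsedmarc = row
logger.info(json.dumps(row))
log_context_data.parsedmarc = None
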

View File

@@ -1,17 +1,15 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
from ssl import SSLContext, create_default_context
from typing import Any, Optional, Union
from ssl import create_default_context
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
from collections import OrderedDict
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import __version__
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
class KafkaError(RuntimeError):
@@ -20,13 +18,7 @@ class KafkaError(RuntimeError):
class KafkaClient(object):
def __init__(
self,
kafka_hosts: list[str],
*,
ssl: Optional[bool] = False,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_context: Optional[SSLContext] = None,
self, kafka_hosts, ssl=False, username=None, password=None, ssl_context=None
):
"""
Initializes the Kafka client
@@ -36,7 +28,7 @@ class KafkaClient(object):
ssl (bool): Use a SSL/TLS connection
username (str): An optional username
password (str): An optional password
ssl_context (SSLContext): SSL context options
ssl_context: SSL context options
Notes:
``use_ssl=True`` is implied when a username or password are
@@ -46,7 +38,7 @@ class KafkaClient(object):
``$ConnectionString``, and the password is the
Azure Event Hub connection string.
"""
config: dict[str, Any] = dict(
config = dict(
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
bootstrap_servers=kafka_hosts,
client_id="parsedmarc-{0}".format(__version__),
@@ -63,7 +55,7 @@ class KafkaClient(object):
raise KafkaError("No Kafka brokers available")
@staticmethod
def strip_metadata(report: dict[str, Any]):
def strip_metadata(report):
"""
Duplicates org_name, org_email and report_id into JSON root
and removes report_metadata key to bring it more inline
@@ -77,7 +69,7 @@ class KafkaClient(object):
return report
@staticmethod
def generate_date_range(report: dict[str, Any]):
def generate_daterange(report):
"""
Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS
based on begin and end dates for easier parsing in Kibana.
@@ -94,11 +86,7 @@ class KafkaClient(object):
logger.debug("date_range is {}".format(date_range))
return date_range
def save_aggregate_reports_to_kafka(
self,
aggregate_reports: Union[dict[str, Any], list[dict[str, Any]]],
aggregate_topic: str,
):
def save_aggregate_reports_to_kafka(self, aggregate_reports, aggregate_topic):
"""
Saves aggregate DMARC reports to Kafka
@@ -108,14 +96,16 @@ class KafkaClient(object):
aggregate_topic (str): The name of the Kafka topic
"""
if isinstance(aggregate_reports, dict):
if isinstance(aggregate_reports, dict) or isinstance(
aggregate_reports, OrderedDict
):
aggregate_reports = [aggregate_reports]
if len(aggregate_reports) < 1:
return
for report in aggregate_reports:
report["date_range"] = self.generate_date_range(report)
report["date_range"] = self.generate_daterange(report)
report = self.strip_metadata(report)
for slice in report["records"]:
@@ -139,11 +129,7 @@ class KafkaClient(object):
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_forensic_reports_to_kafka(
self,
forensic_reports: Union[dict[str, Any], list[dict[str, Any]]],
forensic_topic: str,
):
def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic):
"""
Saves forensic DMARC reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
@@ -173,11 +159,7 @@ class KafkaClient(object):
except Exception as e:
raise KafkaError("Kafka error: {0}".format(e.__str__()))
def save_smtp_tls_reports_to_kafka(
self,
smtp_tls_reports: Union[list[dict[str, Any]], dict[str, Any]],
smtp_tls_topic: str,
):
def save_smtp_tls_reports_to_kafka(self, smtp_tls_reports, smtp_tls_topic):
"""
Saves SMTP TLS reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
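
save_aggregate_reports_to_kafka accepts either a single parsed report or a list, flattens the metadata via strip_metadata, and sends each record slice through a JSON-serializing producer. A minimal sketch of the list normalization and metadata flattening; the producer config mirrors the one shown above, while the broker address and topic name are placeholders and a reachable broker is assumed:

import json
from kafka import KafkaProducer

def to_report_list(reports):
    # A single report dict is wrapped into a one-element list.
    if isinstance(reports, dict):
        reports = [reports]
    return reports

def strip_metadata(report):
    # Copy org_name, org_email and report_id to the JSON root and drop
    # the nested report_metadata key, as the static method above describes.
    metadata = report["report_metadata"]
    report["org_name"] = metadata["org_name"]
    report["org_email"] = metadata["org_email"]
    report["report_id"] = metadata["report_id"]
    report.pop("report_metadata", None)
    return report

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],  # placeholder broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
sample = {
    "report_metadata": {"org_name": "example.net", "org_email": "noc@example.net",
                        "report_id": "abc123"},
    "records": [],
}
for report in to_report_list(sample):
    producer.send("dmarc_aggregate", strip_metadata(report))  # topic is a placeholder
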

View File

@@ -1,15 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any
from parsedmarc.log import logger
from azure.core.exceptions import HttpResponseError
from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient
from parsedmarc.log import logger
class LogAnalyticsException(Exception):
"""Raised when an Elasticsearch error occurs"""
@@ -108,12 +102,7 @@ class LogAnalyticsClient(object):
"Invalid configuration. " + "One or more required settings are missing."
)
def publish_json(
self,
results,
logs_client: LogsIngestionClient,
dcr_stream: str,
):
def publish_json(self, results, logs_client: LogsIngestionClient, dcr_stream: str):
"""
Background function to publish given
DMARC report to specific Data Collection Rule.
@@ -132,11 +121,7 @@ class LogAnalyticsClient(object):
raise LogAnalyticsException("Upload failed: {error}".format(error=e))
def publish_results(
self,
results: dict[str, Any],
save_aggregate: bool,
save_forensic: bool,
save_smtp_tls: bool,
self, results, save_aggregate: bool, save_forensic: bool, save_smtp_tls: bool
):
"""
Function to publish DMARC and/or SMTP TLS reports to Log Analytics
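
publish_json hands the parsed results to a LogsIngestionClient bound to a Data Collection Endpoint and uploads them to a specific Data Collection Rule stream. A hedged sketch of that call, assuming the azure-monitor-ingestion upload API; the rule ID is passed explicitly here only for illustration, and every identifier is a placeholder:

from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient

credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
)
logs_client = LogsIngestionClient(
    endpoint="https://<dce-name>.ingest.monitor.azure.com",
    credential=credential,
)

def publish_json(results, logs_client, dcr_immutable_id, dcr_stream):
    # Upload a list of JSON-serializable records to the given DCR stream.
    logs_client.upload(rule_id=dcr_immutable_id, stream_name=dcr_stream, logs=results)
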

View File

@@ -1,7 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from base64 import urlsafe_b64decode
from functools import lru_cache
from pathlib import Path
@@ -116,14 +112,14 @@ class GmailConnection(MailboxConnection):
else:
return [id for id in self._fetch_all_message_ids(reports_label_id)]
def fetch_message(self, message_id) -> str:
def fetch_message(self, message_id):
msg = (
self.service.users()
.messages()
.get(userId="me", id=message_id, format="raw")
.execute()
)
return urlsafe_b64decode(msg["raw"]).decode(errors="replace")
return urlsafe_b64decode(msg["raw"])
def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id)
@@ -156,4 +152,3 @@ class GmailConnection(MailboxConnection):
for label in labels:
if label_name == label["id"] or label_name == label["name"]:
return label["id"]
return ""
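
fetch_message retrieves a Gmail message in raw format and decodes the URL-safe Base64 payload; one variant additionally decodes the bytes to text with errors="replace". A standalone sketch of that decoding step (the padding guard is defensive and not part of the module; the service object and message ID are assumed to come from an authenticated googleapiclient session):

from base64 import urlsafe_b64decode

def raw_gmail_message_to_text(msg: dict) -> str:
    # msg is the response of users().messages().get(..., format="raw").execute(),
    # whose "raw" field is URL-safe Base64 with no guarantee of padding.
    raw = msg["raw"]
    raw += "=" * (-len(raw) % 4)  # pad defensively; urlsafe_b64decode requires it
    return urlsafe_b64decode(raw).decode(errors="replace")

example = {"raw": "SGVsbG8sIERNQVJDIQ"}  # "Hello, DMARC!" without padding
print(raw_gmail_message_to_text(example))
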

View File

@@ -1,12 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from enum import Enum
from functools import lru_cache
from pathlib import Path
from time import sleep
from typing import Any, List, Optional, Union
from typing import List, Optional
from azure.identity import (
UsernamePasswordCredential,
@@ -28,7 +24,7 @@ class AuthMethod(Enum):
def _get_cache_args(token_path: Path, allow_unencrypted_storage):
cache_args: dict[str, Any] = {
cache_args = {
"cache_persistence_options": TokenCachePersistenceOptions(
name="parsedmarc", allow_unencrypted_storage=allow_unencrypted_storage
)
@@ -151,9 +147,9 @@ class MSGraphConnection(MailboxConnection):
else:
logger.warning(f"Unknown response {resp.status_code} {resp.json()}")
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
def fetch_messages(self, folder_name: str, **kwargs) -> List[str]:
"""Returns a list of message UIDs in the specified folder"""
folder_id = self._find_folder_id_from_folder_path(reports_folder)
folder_id = self._find_folder_id_from_folder_path(folder_name)
url = f"/users/{self.mailbox_name}/mailFolders/{folder_id}/messages"
since = kwargs.get("since")
if not since:
@@ -166,7 +162,7 @@ class MSGraphConnection(MailboxConnection):
def _get_all_messages(self, url, batch_size, since):
messages: list
params: dict[str, Union[str, int]] = {"$select": "id"}
params = {"$select": "id"}
if since:
params["$filter"] = f"receivedDateTime ge {since}"
if batch_size and batch_size > 0:

View File

@@ -1,9 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import cast
from time import sleep
from imapclient.exceptions import IMAPClientError
@@ -17,14 +11,14 @@ from parsedmarc.mail.mailbox_connection import MailboxConnection
class IMAPConnection(MailboxConnection):
def __init__(
self,
host: str,
user: str,
password: str,
port: int = 993,
ssl: bool = True,
verify: bool = True,
timeout: int = 30,
max_retries: int = 4,
host=None,
user=None,
password=None,
port=None,
ssl=True,
verify=True,
timeout=30,
max_retries=4,
):
self._username = user
self._password = password
@@ -46,18 +40,18 @@ class IMAPConnection(MailboxConnection):
def fetch_messages(self, reports_folder: str, **kwargs):
self._client.select_folder(reports_folder)
since = kwargs.get("since")
if since is not None:
return self._client.search(f"SINCE {since}")
if since:
return self._client.search(["SINCE", since])
else:
return self._client.search()
def fetch_message(self, message_id: int):
return cast(str, self._client.fetch_message(message_id, parse=False))
def fetch_message(self, message_id):
return self._client.fetch_message(message_id, parse=False)
def delete_message(self, message_id: int):
def delete_message(self, message_id: str):
self._client.delete_messages([message_id])
def move_message(self, message_id: int, folder_name: str):
def move_message(self, message_id: str, folder_name: str):
self._client.move_messages([message_id], folder_name)
def keepalive(self):

View File

@@ -1,8 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import ABC
from typing import List
class MailboxConnection(ABC):
@@ -13,16 +10,16 @@ class MailboxConnection(ABC):
def create_folder(self, folder_name: str):
raise NotImplementedError
def fetch_messages(self, reports_folder: str, **kwargs):
def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
raise NotImplementedError
def fetch_message(self, message_id) -> str:
raise NotImplementedError
def delete_message(self, message_id):
def delete_message(self, message_id: str):
raise NotImplementedError
def move_message(self, message_id, folder_name: str):
def move_message(self, message_id: str, folder_name: str):
raise NotImplementedError
def keepalive(self):

View File

@@ -1,21 +1,16 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import mailbox
import os
from time import sleep
from typing import Dict
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
import mailbox
import os
class MaildirConnection(MailboxConnection):
def __init__(
self,
maildir_path: str,
maildir_create: bool = False,
maildir_path=None,
maildir_create=False,
):
self._maildir_path = maildir_path
self._maildir_create = maildir_create
@@ -32,31 +27,27 @@ class MaildirConnection(MailboxConnection):
)
raise Exception(ex)
self._client = mailbox.Maildir(maildir_path, create=maildir_create)
self._subfolder_client: Dict[str, mailbox.Maildir] = {}
self._subfolder_client = {}
def create_folder(self, folder_name: str):
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
self._client.add_folder(folder_name)
def fetch_messages(self, reports_folder: str, **kwargs):
return self._client.keys()
def fetch_message(self, message_id: str) -> str:
msg = self._client.get(message_id)
if msg is not None:
msg = msg.as_string()
if msg is not None:
return msg
return ""
def fetch_message(self, message_id):
return self._client.get(message_id).as_string()
def delete_message(self, message_id: str):
self._client.remove(message_id)
def move_message(self, message_id: str, folder_name: str):
message_data = self._client.get(message_id)
if message_data is None:
return
if folder_name not in self._subfolder_client:
self._subfolder_client[folder_name] = self._client.add_folder(folder_name)
if folder_name not in self._subfolder_client.keys():
self._subfolder_client = mailbox.Maildir(
os.join(self.maildir_path, folder_name), create=self.maildir_create
)
self._subfolder_client[folder_name].add(message_data)
self._client.remove(message_id)
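
Both variants of move_message copy the message into a Maildir subfolder and then remove it from the main mailbox; they differ in whether subfolder handles are cached from add_folder or rebuilt from a path. A minimal sketch using only the standard-library mailbox module (the path is a placeholder):

import mailbox

def move_message(maildir_path: str, message_id: str, folder_name: str) -> None:
    inbox = mailbox.Maildir(maildir_path, create=True)
    message = inbox.get(message_id)
    if message is None:
        return
    # add_folder returns the subfolder's Maildir, creating it if needed,
    # so it doubles as a cache-friendly lookup.
    subfolder = inbox.add_folder(folder_name)
    subfolder.add(message)
    inbox.remove(message_id)
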

View File

@@ -1,29 +1,27 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any, Optional, Union
from collections import OrderedDict
from opensearchpy import (
Boolean,
Date,
Q,
connections,
Object,
Document,
Index,
Nested,
InnerDoc,
Integer,
Ip,
Nested,
Object,
Q,
Search,
Text,
connections,
Boolean,
Ip,
Date,
Search,
)
from opensearchpy.helpers import reindex
from parsedmarc import InvalidForensicReport
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport
class OpenSearchError(Exception):
@@ -91,18 +89,18 @@ class _AggregateReportDoc(Document):
dkim_results = Nested(_DKIMResult)
spf_results = Nested(_SPFResult)
def add_policy_override(self, type_: str, comment: str):
def add_policy_override(self, type_, comment):
self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))
def add_dkim_result(self, domain: str, selector: str, result: _DKIMResult):
def add_dkim_result(self, domain, selector, result):
self.dkim_results.append(
_DKIMResult(domain=domain, selector=selector, result=result)
)
def add_spf_result(self, domain: str, scope: str, result: _SPFResult):
def add_spf_result(self, domain, scope, result):
self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))
def save(self, **kwargs): # pyright: ignore[reportIncompatibleMethodOverride]
def save(self, **kwargs):
self.passed_dmarc = False
self.passed_dmarc = self.spf_aligned or self.dkim_aligned
@@ -135,21 +133,21 @@ class _ForensicSampleDoc(InnerDoc):
body = Text()
attachments = Nested(_EmailAttachmentDoc)
def add_to(self, display_name: str, address: str):
def add_to(self, display_name, address):
self.to.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_reply_to(self, display_name: str, address: str):
def add_reply_to(self, display_name, address):
self.reply_to.append(
_EmailAddressDoc(display_name=display_name, address=address)
)
def add_cc(self, display_name: str, address: str):
def add_cc(self, display_name, address):
self.cc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_bcc(self, display_name: str, address: str):
def add_bcc(self, display_name, address):
self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address))
def add_attachment(self, filename: str, content_type: str, sha256: str):
def add_attachment(self, filename, content_type, sha256):
self.attachments.append(
_EmailAttachmentDoc(
filename=filename, content_type=content_type, sha256=sha256
@@ -201,15 +199,15 @@ class _SMTPTLSPolicyDoc(InnerDoc):
def add_failure_details(
self,
result_type: Optional[str] = None,
ip_address: Optional[str] = None,
receiving_ip: Optional[str] = None,
receiving_mx_helo: Optional[str] = None,
failed_session_count: Optional[int] = None,
sending_mta_ip: Optional[str] = None,
receiving_mx_hostname: Optional[str] = None,
additional_information_uri: Optional[str] = None,
failure_reason_code: Union[str, int, None] = None,
result_type,
ip_address,
receiving_ip,
receiving_mx_helo,
failed_session_count,
sending_mta_ip=None,
receiving_mx_hostname=None,
additional_information_uri=None,
failure_reason_code=None,
):
_details = _SMTPTLSFailureDetailsDoc(
result_type=result_type,
@@ -239,14 +237,13 @@ class _SMTPTLSReportDoc(Document):
def add_policy(
self,
policy_type: str,
policy_domain: str,
successful_session_count: int,
failed_session_count: int,
*,
policy_string: Optional[str] = None,
mx_host_patterns: Optional[list[str]] = None,
failure_details: Optional[str] = None,
policy_type,
policy_domain,
successful_session_count,
failed_session_count,
policy_string=None,
mx_host_patterns=None,
failure_details=None,
):
self.policies.append(
policy_type=policy_type,
@@ -264,25 +261,24 @@ class AlreadySaved(ValueError):
def set_hosts(
hosts: Union[str, list[str]],
*,
use_ssl: Optional[bool] = False,
ssl_cert_path: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
timeout: Optional[float] = 60.0,
hosts,
use_ssl=False,
ssl_cert_path=None,
username=None,
password=None,
apiKey=None,
timeout=60.0,
):
"""
Sets the OpenSearch hosts to use
Args:
hosts (str|list[str]): A single hostname or URL, or list of hostnames or URLs
hosts (str|list): A hostname or URL, or list of hostnames or URLs
use_ssl (bool): Use an HTTPS connection to the server
ssl_cert_path (str): Path to the certificate chain
username (str): The username to use for authentication
password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication
apiKey (str): The Base64 encoded API key to use for authentication
timeout (float): Timeout in seconds
"""
if not isinstance(hosts, list):
@@ -295,14 +291,14 @@ def set_hosts(
conn_params["ca_certs"] = ssl_cert_path
else:
conn_params["verify_certs"] = False
if username and password:
if username:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
if apiKey:
conn_params["api_key"] = apiKey
connections.create_connection(**conn_params)
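
set_hosts collects the connection options into a single dict: CA verification only when a certificate path is given, HTTP basic auth when both credentials are supplied, and an optional Base64 API key. A condensed sketch of that assembly; the hosts, timeout, and use_ssl handling is reconstructed rather than shown in the hunk, and an opensearch-py default connection is assumed:

from opensearchpy import connections

def set_hosts(hosts, use_ssl=False, ssl_cert_path=None,
              username=None, password=None, api_key=None, timeout=60.0):
    if not isinstance(hosts, list):
        hosts = [hosts]
    conn_params = {"hosts": hosts, "timeout": timeout}  # reconstructed keys
    if use_ssl:
        conn_params["use_ssl"] = True
        if ssl_cert_path:
            conn_params["ca_certs"] = ssl_cert_path
        else:
            conn_params["verify_certs"] = False
    if username and password:
        conn_params["http_auth"] = username + ":" + password
    if api_key:
        conn_params["api_key"] = api_key
    connections.create_connection(**conn_params)
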
def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
def create_indexes(names, settings=None):
"""
Create OpenSearch indexes
@@ -325,10 +321,7 @@ def create_indexes(names: list[str], settings: Optional[dict[str, Any]] = None):
raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
def migrate_indexes(
aggregate_indexes: Optional[list[str]] = None,
forensic_indexes: Optional[list[str]] = None,
):
def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
"""
Updates index mappings
@@ -375,18 +368,18 @@ def migrate_indexes(
def save_aggregate_report_to_opensearch(
aggregate_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
aggregate_report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
):
"""
Saves a parsed DMARC aggregate report to OpenSearch
Args:
aggregate_report (dict): A parsed forensic report
aggregate_report (OrderedDict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -402,50 +395,7 @@ def save_aggregate_report_to_opensearch(
org_name = metadata["org_name"]
report_id = metadata["report_id"]
domain = aggregate_report["policy_published"]["domain"]
begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
index_date = begin_date.strftime("%Y-%m-%d")
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
try:
existing = search.execute()
except Exception as error_:
raise OpenSearchError(
"OpenSearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"OpenSearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
published_policy = _PublishedPolicy(
domain=aggregate_report["policy_published"]["domain"],
adkim=aggregate_report["policy_published"]["adkim"],
@@ -459,8 +409,8 @@ def save_aggregate_report_to_opensearch(
for record in aggregate_report["records"]:
begin_date = human_timestamp_to_datetime(record["interval_begin"], to_utc=True)
end_date = human_timestamp_to_datetime(record["interval_end"], to_utc=True)
normalized_timespan = record["normalized_timespan"]
begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
if monthly_indexes:
index_date = begin_date.strftime("%Y-%m")
else:
@@ -468,6 +418,41 @@ def save_aggregate_report_to_opensearch(
aggregate_report["begin_date"] = begin_date
aggregate_report["end_date"] = end_date
date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]
org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix)
else:
search_index = "dmarc_aggregate*"
if index_prefix is not None:
search_index = "{0}{1}".format(index_prefix, search_index)
search = Search(index=search_index)
query = org_name_query & report_id_query & domain_query
query = query & begin_date_query & end_date_query
search.query = query
try:
existing = search.execute()
except Exception as error_:
raise OpenSearchError(
"OpenSearch's search for existing report \
error: {}".format(error_.__str__())
)
if len(existing) > 0:
raise AlreadySaved(
"An aggregate report ID {0} from {1} about {2} "
"with a date range of {3} UTC to {4} UTC already "
"exists in "
"OpenSearch".format(
report_id, org_name, domain, begin_date_human, end_date_human
)
)
agg_doc = _AggregateReportDoc(
xml_schema=aggregate_report["xml_schema"],
org_name=metadata["org_name"],
@@ -475,9 +460,8 @@ def save_aggregate_report_to_opensearch(
org_extra_contact_info=metadata["org_extra_contact_info"],
report_id=metadata["report_id"],
date_range=date_range,
date_begin=begin_date,
date_end=end_date,
normalized_timespan=normalized_timespan,
date_begin=aggregate_report["begin_date"],
date_end=aggregate_report["end_date"],
errors=metadata["errors"],
published_policy=published_policy,
source_ip_address=record["source"]["ip_address"],
@@ -536,18 +520,18 @@ def save_aggregate_report_to_opensearch(
def save_forensic_report_to_opensearch(
forensic_report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
forensic_report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
):
"""
Saves a parsed DMARC forensic report to OpenSearch
Args:
forensic_report (dict): A parsed forensic report
forensic_report (OrderedDict): A parsed forensic report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily
@@ -567,7 +551,7 @@ def save_forensic_report_to_opensearch(
sample_date = forensic_report["parsed_sample"]["date"]
sample_date = human_timestamp_to_datetime(sample_date)
original_headers = forensic_report["parsed_sample"]["headers"]
headers: dict[str, Any] = {}
headers = OrderedDict()
for original_header in original_headers:
headers[original_header.lower()] = original_headers[original_header]
@@ -703,18 +687,18 @@ def save_forensic_report_to_opensearch(
def save_smtp_tls_report_to_opensearch(
report: dict[str, Any],
index_suffix: Optional[str] = None,
index_prefix: Optional[str] = None,
monthly_indexes: bool = False,
number_of_shards: int = 1,
number_of_replicas: int = 0,
report,
index_suffix=None,
index_prefix=None,
monthly_indexes=False,
number_of_shards=1,
number_of_replicas=0,
):
"""
Saves a parsed SMTP TLS report to OpenSearch
Args:
report (dict): A parsed SMTP TLS report
report (OrderedDict): A parsed SMTP TLS report
index_suffix (str): The suffix of the name of the index to save to
index_prefix (str): The prefix of the name of the index to save to
monthly_indexes (bool): Use monthly indexes instead of daily indexes
@@ -724,7 +708,7 @@ def save_smtp_tls_report_to_opensearch(
Raises:
AlreadySaved
"""
logger.info("Saving SMTP TLS report to OpenSearch")
logger.info("Saving aggregate report to OpenSearch")
org_name = report["organization_name"]
report_id = report["report_id"]
begin_date = human_timestamp_to_datetime(report["begin_date"], to_utc=True)
@@ -800,7 +784,7 @@ def save_smtp_tls_report_to_opensearch(
policy_doc = _SMTPTLSPolicyDoc(
policy_domain=policy["policy_domain"],
policy_type=policy["policy_type"],
successful_session_count=policy["successful_session_count"],
succesful_session_count=policy["successful_session_count"],
failed_session_count=policy["failed_session_count"],
policy_string=policy_strings,
mx_host_patterns=mx_host_patterns,


View File

@@ -132,7 +132,6 @@ asu-vei.ru,ASU-VEI,Industrial
atextelecom.com.br,ATEX Telecom,ISP
atmailcloud.com,atmail,Email Provider
ats.ca,ATS Healthcare,Healthcare
att.net,AT&T,ISP
atw.ne.jp,ATW,Web Host
au-net.ne.jp,KDDI,ISP
au.com,au,ISP
@@ -243,7 +242,6 @@ carandainet.com.br,CN Internet,ISP
cardhealth.com,Cardinal Health,Healthcare
cardinal.com,Cardinal Health,Healthcare
cardinalhealth.com,Cardinal Health,Healthcare
cardinalscriptnet.com,Cardinal Health,Healthcare
carecentrix.com,CareCentrix,Healthcare
carleton.edu,Carlton College,Education
carrierzone.com,carrierzone,Email Security
@@ -699,7 +697,6 @@ hdsupply-email.com,HD Supply,Retail
healthall.com,UC Health,Healthcare
healthcaresupplypros.com,Healthcare Supply Pros,Healthcare
healthproductsforyou.com,Health Products For You,Healthcare
healthtouch.com,Cardinal Health,Healthcare
helloserver6.com,1st Source Web,Marketing
helpforcb.com,InterServer,Web Host
helpscout.net,Help Scout,SaaS
@@ -756,8 +753,6 @@ hostwindsdns.com,Hostwinds,Web Host
hotnet.net.il,Hot Net Internet Services,ISP
hp.com,HP,Technology
hringdu.is,Hringdu,ISP
hslda.net,Home School Legal Defense Association (HSLDA),Education
hslda.org,Home School Legal Defense Association (HSLDA),Education
hspherefilter.com,"DynamicNet, Inc. (DNI)",Web Host
htc.net,HTC,ISP
htmlservices.it,HTMLServices.it,MSP
@@ -768,7 +763,6 @@ hughston.com,Hughston Clinic,Healthcare
hvvc.us,Hivelocity,Web Host
i2ts.ne.jp,i2ts,Web Host
i4i.com,i4i,Technology
ibindley.com,Cardinal Health,Healthcare
ice.co.cr,Grupo ICE,Industrial
icehosting.nl,IceHosting,Web Host
icewarpcloud.in,IceWrap,Email Provider
@@ -838,7 +832,6 @@ ip-5-196-151.eu,OVH,Web Host
ip-51-161-36.net,OVH,Web Host
ip-51-195-53.eu,OVH,Web Host
ip-51-254-53.eu,OVH,Web Host
ip-51-38-67.eu,OVH,Web Host
ip-51-77-42.eu,OVH,Web Host
ip-51-83-140.eu,OVH,Web Host
ip-51-89-240.eu,OVH,Web Host
@@ -1224,7 +1217,6 @@ nettoday.co.th,Net Today,Web Host
netventure.pl,Netventure,MSP
netvigator.com,HKT,ISP
netvision.net.il,013 Netvision,ISP
network-tech.com,Network Technologies International (NTI),SaaS
network.kz,network.kz,ISP
network80.com,Network80,Web Host
neubox.net,Neubox,Web Host

View File

@@ -13,6 +13,8 @@ def _main():
csv_headers = ["source_name", "message_count"]
output_rows = []
known_unknown_domains = []
psl_overrides = []
known_domains = []

View File

@@ -1,10 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
from typing import Any
import boto3
from parsedmarc.log import logger
@@ -12,16 +8,16 @@ from parsedmarc.utils import human_timestamp_to_datetime
class S3Client(object):
"""A client for interacting with Amazon S3"""
"""A client for a Amazon S3"""
def __init__(
self,
bucket_name: str,
bucket_path: str,
region_name: str,
endpoint_url: str,
access_key_id: str,
secret_access_key: str,
bucket_name,
bucket_path,
region_name,
endpoint_url,
access_key_id,
secret_access_key,
):
"""
Initializes the S3Client
@@ -51,18 +47,18 @@ class S3Client(object):
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
)
self.bucket = self.s3.Bucket(self.bucket_name) # type: ignore
self.bucket = self.s3.Bucket(self.bucket_name)
def save_aggregate_report_to_s3(self, report: dict[str, Any]):
def save_aggregate_report_to_s3(self, report):
self.save_report_to_s3(report, "aggregate")
def save_forensic_report_to_s3(self, report: dict[str, Any]):
def save_forensic_report_to_s3(self, report):
self.save_report_to_s3(report, "forensic")
def save_smtp_tls_report_to_s3(self, report: dict[str, Any]):
def save_smtp_tls_report_to_s3(self, report):
self.save_report_to_s3(report, "smtp_tls")
def save_report_to_s3(self, report: dict[str, Any], report_type: str):
def save_report_to_s3(self, report, report_type):
if report_type == "smtp_tls":
report_date = report["begin_date"]
report_id = report["report_id"]
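
save_report_to_s3 derives an object key from the report type, date, and ID and uploads the JSON document to the configured bucket path. A hedged sketch of that flow with boto3; the key layout shown is illustrative rather than the module's exact scheme, and credentials are assumed to come from the environment:

import json
import boto3

def save_report_to_s3(report: dict, report_type: str,
                      bucket_name: str, bucket_path: str) -> None:
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_name)
    # Aggregate reports keep their dates under report_metadata;
    # SMTP TLS reports keep begin_date and report_id at the root.
    metadata = report.get("report_metadata", report)
    report_date = metadata["begin_date"]
    report_id = metadata["report_id"]
    # Illustrative key layout: <path>/<type>/<date>/<id>.json
    key = "{0}/{1}/{2}/{3}.json".format(bucket_path, report_type, report_date, report_id)
    bucket.put_object(Key=key, Body=json.dumps(report).encode("utf-8"))
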

View File

@@ -1,14 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import socket
from typing import Any, Union
from urllib.parse import urlparse
import socket
import json
import requests
import urllib3
import requests
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
@@ -28,13 +23,7 @@ class HECClient(object):
# http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
def __init__(
self,
url: str,
access_token: str,
index: str,
source: str = "parsedmarc",
verify=True,
timeout=60,
self, url, access_token, index, source="parsedmarc", verify=True, timeout=60
):
"""
Initializes the HECClient
@@ -48,9 +37,9 @@ class HECClient(object):
timeout (float): Number of seconds to wait for the server to send
data before giving up
"""
parsed_url = urlparse(url)
url = urlparse(url)
self.url = "{0}://{1}/services/collector/event/1.0".format(
parsed_url.scheme, parsed_url.netloc
url.scheme, url.netloc
)
self.access_token = access_token.lstrip("Splunk ")
self.index = index
@@ -59,19 +48,14 @@ class HECClient(object):
self.session = requests.Session()
self.timeout = timeout
self.session.verify = verify
self._common_data: dict[str, Union[str, int, float, dict]] = dict(
host=self.host, source=self.source, index=self.index
)
self._common_data = dict(host=self.host, source=self.source, index=self.index)
self.session.headers = {
"User-Agent": USER_AGENT,
"Authorization": "Splunk {0}".format(self.access_token),
}
def save_aggregate_reports_to_splunk(
self,
aggregate_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
def save_aggregate_reports_to_splunk(self, aggregate_reports):
"""
Saves aggregate DMARC reports to Splunk
@@ -91,7 +75,7 @@ class HECClient(object):
json_str = ""
for report in aggregate_reports:
for record in report["records"]:
new_report: dict[str, Union[str, int, float, dict]] = dict()
new_report = dict()
for metadata in report["report_metadata"]:
new_report[metadata] = report["report_metadata"][metadata]
new_report["interval_begin"] = record["interval_begin"]
@@ -134,10 +118,7 @@ class HECClient(object):
if response["code"] != 0:
raise SplunkError(response["text"])
def save_forensic_reports_to_splunk(
self,
forensic_reports: Union[list[dict[str, Any]], dict[str, Any]],
):
def save_forensic_reports_to_splunk(self, forensic_reports):
"""
Saves forensic DMARC reports to Splunk
@@ -171,9 +152,7 @@ class HECClient(object):
if response["code"] != 0:
raise SplunkError(response["text"])
def save_smtp_tls_reports_to_splunk(
self, reports: Union[list[dict[str, Any]], dict[str, Any]]
):
def save_smtp_tls_reports_to_splunk(self, reports):
"""
Saves aggregate DMARC reports to Splunk
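
The HEC client flattens each aggregate record into an event dict (including interval_begin and interval_end), merges it with the common host/source/index fields, and posts newline-delimited JSON to /services/collector/event/1.0. A small sketch of building one such payload line, assuming the standard HEC event envelope; the sourcetype name is an assumption:

import json

common_data = {"host": "parsedmarc-host", "source": "parsedmarc", "index": "email"}

def hec_event_line(report: dict, record: dict) -> str:
    # Flatten the report metadata plus one record into a single event,
    # as the save_aggregate_reports_to_splunk loop above does.
    event = dict(report["report_metadata"])
    event["interval_begin"] = record["interval_begin"]
    event["interval_end"] = record["interval_end"]
    event["normalized_timespan"] = record.get("normalized_timespan")
    data = dict(common_data)
    data["sourcetype"] = "dmarc:aggregate"  # assumed sourcetype name
    data["event"] = event
    return json.dumps(data)

# Each line can then be concatenated with the others and POSTed to
# https://<splunk-host>:8088/services/collector/event/1.0
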

View File

@@ -1,15 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import logging
import logging.handlers
import socket
import ssl
import time
from typing import Any, Optional
import json
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
@@ -21,161 +14,31 @@ from parsedmarc import (
class SyslogClient(object):
"""A client for Syslog"""
def __init__(
self,
server_name: str,
server_port: int,
protocol: str = "udp",
cafile_path: Optional[str] = None,
certfile_path: Optional[str] = None,
keyfile_path: Optional[str] = None,
timeout: float = 5.0,
retry_attempts: int = 3,
retry_delay: int = 5,
):
def __init__(self, server_name, server_port):
"""
Initializes the SyslogClient
Args:
server_name (str): The Syslog server
server_port (int): The Syslog port
protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
certfile_path (str): Path to client certificate file for TLS authentication (Optional)
keyfile_path (str): Path to client private key file for TLS authentication (Optional)
timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
retry_delay (int): Delay in seconds between retry attempts (Default: 5)
server_port (int): The Syslog UDP port
"""
self.server_name = server_name
self.server_port = server_port
self.protocol = protocol.lower()
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
# Create the appropriate syslog handler based on protocol
log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
cafile_path,
certfile_path,
keyfile_path,
timeout,
retry_attempts,
retry_delay,
)
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
self.logger.addHandler(log_handler)
def _create_syslog_handler(
self,
server_name: str,
server_port: int,
protocol: str,
cafile_path: Optional[str],
certfile_path: Optional[str],
keyfile_path: Optional[str],
timeout: float,
retry_attempts: int,
retry_delay: int,
) -> logging.handlers.SysLogHandler:
"""
Creates a SysLogHandler with the specified protocol and TLS settings
"""
if protocol == "udp":
# UDP protocol (default, backward compatible)
return logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_DGRAM,
)
elif protocol in ["tcp", "tls"]:
# TCP or TLS protocol with retry logic
for attempt in range(1, retry_attempts + 1):
try:
if protocol == "tcp":
# TCP without TLS
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Set timeout on the socket
if hasattr(handler, "socket") and handler.socket:
handler.socket.settimeout(timeout)
return handler
else:
# TLS protocol
# Create SSL context with secure defaults
ssl_context = ssl.create_default_context()
# Explicitly set minimum TLS version to 1.2 for security
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Configure server certificate verification
if cafile_path:
ssl_context.load_verify_locations(cafile=cafile_path)
# Configure client certificate authentication
if certfile_path and keyfile_path:
ssl_context.load_cert_chain(
certfile=certfile_path,
keyfile=keyfile_path,
)
elif certfile_path or keyfile_path:
# Warn if only one of the two required parameters is provided
self.logger.warning(
"Both certfile_path and keyfile_path are required for "
"client certificate authentication. Client authentication "
"will not be used."
)
# Create TCP handler first
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Wrap socket with TLS
if hasattr(handler, "socket") and handler.socket:
handler.socket = ssl_context.wrap_socket(
handler.socket,
server_hostname=server_name,
)
handler.socket.settimeout(timeout)
return handler
except Exception as e:
if attempt < retry_attempts:
self.logger.warning(
f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
self.logger.error(
f"Syslog connection failed after {retry_attempts} attempts: {e}"
)
raise
else:
raise ValueError(
f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
def save_aggregate_report_to_syslog(self, aggregate_reports):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_forensic_report_to_syslog(self, forensic_reports: list[dict[str, Any]]):
def save_forensic_report_to_syslog(self, forensic_reports):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))
def save_smtp_tls_report_to_syslog(self, smtp_tls_reports: list[dict[str, Any]]):
def save_smtp_tls_report_to_syslog(self, smtp_tls_reports):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))
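
The fuller SyslogClient variant in this hunk supports UDP, TCP, and TLS transports, wrapping the handler's TCP socket in an SSL context with a TLS 1.2 floor and optional CA and client certificates. A condensed sketch of the TLS branch, with the retry logic omitted and hostnames and paths as placeholders (a reachable syslog server is assumed, since SysLogHandler connects on construction for TCP):

import logging.handlers
import socket
import ssl
from typing import Optional

def tls_syslog_handler(server_name: str, server_port: int,
                       cafile_path: Optional[str] = None,
                       certfile_path: Optional[str] = None,
                       keyfile_path: Optional[str] = None,
                       timeout: float = 5.0) -> logging.handlers.SysLogHandler:
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if cafile_path:
        context.load_verify_locations(cafile=cafile_path)
    if certfile_path and keyfile_path:
        context.load_cert_chain(certfile=certfile_path, keyfile=keyfile_path)
    handler = logging.handlers.SysLogHandler(
        address=(server_name, server_port), socktype=socket.SOCK_STREAM
    )
    # SysLogHandler exposes its socket; wrap it so messages travel over TLS.
    handler.socket = context.wrap_socket(handler.socket, server_hostname=server_name)
    handler.socket.settimeout(timeout)
    return handler
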

View File

@@ -1,220 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# NOTE: This module is intentionally Python 3.9 compatible.
# - No PEP 604 unions (A | B)
# - No typing.NotRequired / Required (3.11+) to avoid an extra dependency.
# For optional keys, use total=False TypedDicts.
ReportType = Literal["aggregate", "forensic", "smtp_tls"]
class AggregateReportMetadata(TypedDict):
org_name: str
org_email: str
org_extra_contact_info: Optional[str]
report_id: str
begin_date: str
end_date: str
timespan_requires_normalization: bool
original_timespan_seconds: int
errors: List[str]
class AggregatePolicyPublished(TypedDict):
domain: str
adkim: str
aspf: str
p: str
sp: str
pct: str
fo: str
class IPSourceInfo(TypedDict):
ip_address: str
country: Optional[str]
reverse_dns: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
class AggregateAlignment(TypedDict):
spf: bool
dkim: bool
dmarc: bool
class AggregateIdentifiers(TypedDict):
header_from: str
envelope_from: Optional[str]
envelope_to: Optional[str]
class AggregatePolicyOverrideReason(TypedDict):
type: Optional[str]
comment: Optional[str]
class AggregateAuthResultDKIM(TypedDict):
domain: str
result: str
selector: str
class AggregateAuthResultSPF(TypedDict):
domain: str
result: str
scope: str
class AggregateAuthResults(TypedDict):
dkim: List[AggregateAuthResultDKIM]
spf: List[AggregateAuthResultSPF]
class AggregatePolicyEvaluated(TypedDict):
disposition: str
dkim: str
spf: str
policy_override_reasons: List[AggregatePolicyOverrideReason]
class AggregateRecord(TypedDict):
interval_begin: str
interval_end: str
source: IPSourceInfo
count: int
alignment: AggregateAlignment
policy_evaluated: AggregatePolicyEvaluated
disposition: str
identifiers: AggregateIdentifiers
auth_results: AggregateAuthResults
class AggregateReport(TypedDict):
xml_schema: str
report_metadata: AggregateReportMetadata
policy_published: AggregatePolicyPublished
records: List[AggregateRecord]
class EmailAddress(TypedDict):
display_name: Optional[str]
address: str
local: Optional[str]
domain: Optional[str]
class EmailAttachment(TypedDict, total=False):
filename: Optional[str]
mail_content_type: Optional[str]
sha256: Optional[str]
ParsedEmail = TypedDict(
"ParsedEmail",
{
# This is a lightly-specified version of mailsuite/mailparser JSON.
# It focuses on the fields parsedmarc uses in forensic handling.
"headers": Dict[str, Any],
"subject": Optional[str],
"filename_safe_subject": Optional[str],
"date": Optional[str],
"from": EmailAddress,
"to": List[EmailAddress],
"cc": List[EmailAddress],
"bcc": List[EmailAddress],
"attachments": List[EmailAttachment],
"body": Optional[str],
"has_defects": bool,
"defects": Any,
"defects_categories": Any,
},
total=False,
)
class ForensicReport(TypedDict):
feedback_type: Optional[str]
user_agent: Optional[str]
version: Optional[str]
original_envelope_id: Optional[str]
original_mail_from: Optional[str]
original_rcpt_to: Optional[str]
arrival_date: str
arrival_date_utc: str
authentication_results: Optional[str]
delivery_result: Optional[str]
auth_failure: List[str]
authentication_mechanisms: List[str]
dkim_domain: Optional[str]
reported_domain: str
sample_headers_only: bool
source: IPSourceInfo
sample: str
parsed_sample: ParsedEmail
class SMTPTLSFailureDetails(TypedDict):
result_type: str
failed_session_count: int
class SMTPTLSFailureDetailsOptional(SMTPTLSFailureDetails, total=False):
sending_mta_ip: str
receiving_ip: str
receiving_mx_hostname: str
receiving_mx_helo: str
additional_info_uri: str
failure_reason_code: str
ip_address: str
class SMTPTLSPolicySummary(TypedDict):
policy_domain: str
policy_type: str
successful_session_count: int
failed_session_count: int
class SMTPTLSPolicy(SMTPTLSPolicySummary, total=False):
policy_strings: List[str]
mx_host_patterns: List[str]
failure_details: List[SMTPTLSFailureDetailsOptional]
class SMTPTLSReport(TypedDict):
organization_name: str
begin_date: str
end_date: str
contact_info: Union[str, List[str]]
report_id: str
policies: List[SMTPTLSPolicy]
class AggregateParsedReport(TypedDict):
report_type: Literal["aggregate"]
report: AggregateReport
class ForensicParsedReport(TypedDict):
report_type: Literal["forensic"]
report: ForensicReport
class SMTPTLSParsedReport(TypedDict):
report_type: Literal["smtp_tls"]
report: SMTPTLSReport
ParsedReport = Union[AggregateParsedReport, ForensicParsedReport, SMTPTLSParsedReport]
class ParsingResults(TypedDict):
aggregate_reports: List[AggregateReport]
forensic_reports: List[ForensicReport]
smtp_tls_reports: List[SMTPTLSReport]

View File

@@ -1,26 +1,22 @@
# -*- coding: utf-8 -*-
"""Utility functions that might be useful for other projects"""
from __future__ import annotations
import base64
import csv
import hashlib
import io
import json
import logging
import mailbox
import os
import re
import shutil
import subprocess
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from collections import OrderedDict
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Optional, TypedDict, Union, cast
import subprocess
import shutil
import mailparser
from expiringdict import ExpiringDict
import json
import hashlib
import base64
import mailbox
import re
import csv
import io
try:
from importlib.resources import files
@@ -29,19 +25,19 @@ except ImportError:
from importlib.resources import files
import dns.exception
import dns.resolver
from dateutil.parser import parse as parse_date
import dns.reversename
import dns.resolver
import dns.exception
import geoip2.database
import geoip2.errors
import publicsuffixlist
import requests
from dateutil.parser import parse as parse_date
from parsedmarc.log import logger
import parsedmarc.resources.dbip
import parsedmarc.resources.maps
from parsedmarc.constants import USER_AGENT
from parsedmarc.log import logger
parenthesis_regex = re.compile(r"\s*\(.*\)\s*")
@@ -64,42 +60,25 @@ class DownloadError(RuntimeError):
"""Raised when an error occurs when downloading a file"""
class ReverseDNSService(TypedDict):
name: str
type: Optional[str]
ReverseDNSMap = dict[str, ReverseDNSService]
class IPAddressInfo(TypedDict):
ip_address: str
reverse_dns: Optional[str]
country: Optional[str]
base_domain: Optional[str]
name: Optional[str]
type: Optional[str]
def decode_base64(data: str) -> bytes:
def decode_base64(data):
"""
Decodes a base64 string, with padding being optional
Args:
data (str): A base64 encoded string
data: A base64 encoded string
Returns:
bytes: The decoded bytes
"""
data_bytes = bytes(data, encoding="ascii")
missing_padding = len(data_bytes) % 4
data = bytes(data, encoding="ascii")
missing_padding = len(data) % 4
if missing_padding != 0:
data_bytes += b"=" * (4 - missing_padding)
return base64.b64decode(data_bytes)
data += b"=" * (4 - missing_padding)
return base64.b64decode(data)
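
decode_base64 tolerates missing padding by appending "=" until the length is a multiple of four before decoding; the two variants differ mainly in whether the bytes are kept in a separate variable. A worked example of the padding rule:

import base64

def decode_base64(data: str) -> bytes:
    data_bytes = bytes(data, encoding="ascii")
    missing_padding = len(data_bytes) % 4
    if missing_padding != 0:
        data_bytes += b"=" * (4 - missing_padding)
    return base64.b64decode(data_bytes)

assert decode_base64("cGFyc2VkbWFyYw") == b"parsedmarc"  # 14 chars, padded to 16
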
def get_base_domain(domain: str) -> Optional[str]:
def get_base_domain(domain):
"""
Gets the base domain name for the given domain
@@ -123,14 +102,7 @@ def get_base_domain(domain: str) -> Optional[str]:
return publicsuffix
def query_dns(
domain: str,
record_type: str,
*,
cache: Optional[ExpiringDict] = None,
nameservers: Optional[list[str]] = None,
timeout: float = 2.0,
) -> list[str]:
def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0):
"""
Queries DNS
@@ -149,9 +121,9 @@ def query_dns(
record_type = record_type.upper()
cache_key = "{0}_{1}".format(domain, record_type)
if cache:
cached_records = cache.get(cache_key, None)
if isinstance(cached_records, list):
return cast(list[str], cached_records)
records = cache.get(cache_key, None)
if records:
return records
resolver = dns.resolver.Resolver()
timeout = float(timeout)
@@ -165,25 +137,33 @@ def query_dns(
resolver.nameservers = nameservers
resolver.timeout = timeout
resolver.lifetime = timeout
records = list(
map(
lambda r: r.to_text().replace('"', "").rstrip("."),
resolver.resolve(domain, record_type, lifetime=timeout),
if record_type == "TXT":
resource_records = list(
map(
lambda r: r.strings,
resolver.resolve(domain, record_type, lifetime=timeout),
)
)
_resource_record = [
resource_record[0][:0].join(resource_record)
for resource_record in resource_records
if resource_record
]
records = [r.decode() for r in _resource_record]
else:
records = list(
map(
lambda r: r.to_text().replace('"', "").rstrip("."),
resolver.resolve(domain, record_type, lifetime=timeout),
)
)
)
if cache:
cache[cache_key] = records
return records
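
One variant of query_dns special-cases TXT lookups by joining each answer's byte fragments (r.strings) before decoding, while the other normalizes every record type through to_text(), stripping quotes and trailing dots. A small sketch of the fragment-joining step in isolation; the tuples below stand in for dnspython TXT rdata:

def join_txt_fragments(strings_per_answer):
    # Each TXT answer is a sequence of byte fragments; long records are split
    # into 255-byte chunks, so the fragments are concatenated back together.
    joined = [
        fragments[0][:0].join(fragments)  # b"".join, written type-agnostically
        for fragments in strings_per_answer
        if fragments
    ]
    return [record.decode() for record in joined]

answers = [(b"v=spf1 include:_spf.example.com", b" ~all")]
print(join_txt_fragments(answers))  # ['v=spf1 include:_spf.example.com ~all']
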
def get_reverse_dns(
ip_address,
*,
cache: Optional[ExpiringDict] = None,
nameservers: Optional[list[str]] = None,
timeout: float = 2.0,
) -> Optional[str]:
def get_reverse_dns(ip_address, cache=None, nameservers=None, timeout=2.0):
"""
Resolves an IP address to a hostname using a reverse DNS query
@@ -201,7 +181,7 @@ def get_reverse_dns(
try:
address = dns.reversename.from_address(ip_address)
hostname = query_dns(
str(address), "PTR", cache=cache, nameservers=nameservers, timeout=timeout
address, "PTR", cache=cache, nameservers=nameservers, timeout=timeout
)[0]
except dns.exception.DNSException as e:
@@ -211,7 +191,7 @@ def get_reverse_dns(
return hostname
def timestamp_to_datetime(timestamp: int) -> datetime:
def timestamp_to_datetime(timestamp):
"""
Converts a UNIX/DMARC timestamp to a Python ``datetime`` object
@@ -224,7 +204,7 @@ def timestamp_to_datetime(timestamp: int) -> datetime:
return datetime.fromtimestamp(int(timestamp))
def timestamp_to_human(timestamp: int) -> str:
def timestamp_to_human(timestamp):
"""
Converts a UNIX/DMARC timestamp to a human-readable string
@@ -237,9 +217,7 @@ def timestamp_to_human(timestamp: int) -> str:
return timestamp_to_datetime(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def human_timestamp_to_datetime(
human_timestamp: str, *, to_utc: bool = False
) -> datetime:
def human_timestamp_to_datetime(human_timestamp, to_utc=False):
"""
Converts a human-readable timestamp into a Python ``datetime`` object
@@ -258,7 +236,7 @@ def human_timestamp_to_datetime(
return dt.astimezone(timezone.utc) if to_utc else dt
def human_timestamp_to_unix_timestamp(human_timestamp: str) -> int:
def human_timestamp_to_unix_timestamp(human_timestamp):
"""
Converts a human-readable timestamp into a UNIX timestamp
@@ -269,12 +247,10 @@ def human_timestamp_to_unix_timestamp(human_timestamp: str) -> int:
float: The converted timestamp
"""
human_timestamp = human_timestamp.replace("T", " ")
return int(human_timestamp_to_datetime(human_timestamp).timestamp())
return human_timestamp_to_datetime(human_timestamp).timestamp()
def get_ip_address_country(
ip_address: str, *, db_path: Optional[str] = None
) -> Optional[str]:
def get_ip_address_country(ip_address, db_path=None):
"""
Returns the ISO code for the country associated
with the given IPv4 or IPv6 address
@@ -301,7 +277,7 @@ def get_ip_address_country(
]
if db_path is not None:
if not os.path.isfile(db_path):
if os.path.isfile(db_path) is False:
db_path = None
logger.warning(
f"No file exists at {db_path}. Falling back to an "
@@ -338,13 +314,12 @@ def get_ip_address_country(
def get_service_from_reverse_dns_base_domain(
base_domain,
*,
always_use_local_file: bool = False,
local_file_path: Optional[str] = None,
url: Optional[str] = None,
offline: bool = False,
reverse_dns_map: Optional[ReverseDNSMap] = None,
) -> ReverseDNSService:
always_use_local_file=False,
local_file_path=None,
url=None,
offline=False,
reverse_dns_map=None,
):
"""
Returns the service name of a given base domain name from reverse DNS.
@@ -361,6 +336,12 @@ def get_service_from_reverse_dns_base_domain(
the supplied reverse_dns_base_domain and the type will be None
"""
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(name=row["name"], type=row["type"])
base_domain = base_domain.lower().strip()
if url is None:
url = (
@@ -368,24 +349,11 @@ def get_service_from_reverse_dns_base_domain(
"/parsedmarc/master/parsedmarc/"
"resources/maps/base_reverse_dns_map.csv"
)
reverse_dns_map_value: ReverseDNSMap
if reverse_dns_map is None:
reverse_dns_map_value = {}
else:
reverse_dns_map_value = reverse_dns_map
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map_value[key] = {
"name": row["name"],
"type": row["type"],
}
reverse_dns_map = dict()
csv_file = io.StringIO()
if not (offline or always_use_local_file) and len(reverse_dns_map_value) == 0:
if not (offline or always_use_local_file) and len(reverse_dns_map) == 0:
try:
logger.debug(f"Trying to fetch reverse DNS map from {url}...")
headers = {"User-Agent": USER_AGENT}
@@ -402,7 +370,7 @@ def get_service_from_reverse_dns_base_domain(
logging.debug("Response body:")
logger.debug(csv_file.read())
if len(reverse_dns_map_value) == 0:
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
path = str(
files(parsedmarc.resources.maps).joinpath("base_reverse_dns_map.csv")
@@ -411,28 +379,26 @@ def get_service_from_reverse_dns_base_domain(
path = local_file_path
with open(path) as csv_file:
load_csv(csv_file)
service: ReverseDNSService
try:
service = reverse_dns_map_value[base_domain]
service = reverse_dns_map[base_domain]
except KeyError:
service = {"name": base_domain, "type": None}
service = dict(name=base_domain, type=None)
return service
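
The load_csv/lookup pattern above boils down to the following sketch; the sample row and its type value are invented for illustration, while the real data lives in base_reverse_dns_map.csv.

import csv
import io

sample_csv = "base_reverse_dns,name,type\ngoogle.com,Google,Email Provider\n"

reverse_dns_map = {}
for row in csv.DictReader(io.StringIO(sample_csv)):
    key = row["base_reverse_dns"].lower().strip()
    reverse_dns_map[key] = {"name": row["name"], "type": row["type"]}

# Unknown base domains fall back to the domain itself with type None
service = reverse_dns_map.get("example.net", {"name": "example.net", "type": None})
print(service)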
def get_ip_address_info(
ip_address,
*,
ip_db_path: Optional[str] = None,
reverse_dns_map_path: Optional[str] = None,
always_use_local_files: bool = False,
reverse_dns_map_url: Optional[str] = None,
cache: Optional[ExpiringDict] = None,
reverse_dns_map: Optional[ReverseDNSMap] = None,
offline: bool = False,
nameservers: Optional[list[str]] = None,
timeout: float = 2.0,
) -> IPAddressInfo:
ip_db_path=None,
reverse_dns_map_path=None,
always_use_local_files=False,
reverse_dns_map_url=None,
cache=None,
reverse_dns_map=None,
offline=False,
nameservers=None,
timeout=2.0,
):
"""
Returns reverse DNS and country information for the given IP address
@@ -450,27 +416,17 @@ def get_ip_address_info(
timeout (float): Sets the DNS timeout in seconds
Returns:
dict: ``ip_address``, ``reverse_dns``, ``country``
OrderedDict: ``ip_address``, ``reverse_dns``
"""
ip_address = ip_address.lower()
if cache is not None:
cached_info = cache.get(ip_address, None)
if (
cached_info
and isinstance(cached_info, dict)
and "ip_address" in cached_info
):
info = cache.get(ip_address, None)
if info:
logger.debug(f"IP address {ip_address} was found in cache")
return cast(IPAddressInfo, cached_info)
info: IPAddressInfo = {
"ip_address": ip_address,
"reverse_dns": None,
"country": None,
"base_domain": None,
"name": None,
"type": None,
}
return info
info = OrderedDict()
info["ip_address"] = ip_address
if offline:
reverse_dns = None
else:
@@ -480,6 +436,9 @@ def get_ip_address_info(
country = get_ip_address_country(ip_address, db_path=ip_db_path)
info["country"] = country
info["reverse_dns"] = reverse_dns
info["base_domain"] = None
info["name"] = None
info["type"] = None
if reverse_dns is not None:
base_domain = get_base_domain(reverse_dns)
if base_domain is not None:
@@ -504,7 +463,7 @@ def get_ip_address_info(
return info
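
A usage sketch of the combined lookup, assuming these helpers live in parsedmarc.utils as in released versions; 203.0.113.10 is a documentation-range placeholder, and offline=True skips the reverse DNS query.

from parsedmarc.utils import get_ip_address_info

info = get_ip_address_info("203.0.113.10", offline=True)
# Expected keys per the hunk above:
print(info["ip_address"], info["reverse_dns"], info["country"],
      info["base_domain"], info["name"], info["type"])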
def parse_email_address(original_address: str) -> dict[str, Optional[str]]:
def parse_email_address(original_address):
if original_address[0] == "":
display_name = None
else:
@@ -517,15 +476,17 @@ def parse_email_address(original_address: str) -> dict[str, Optional[str]]:
local = address_parts[0].lower()
domain = address_parts[-1].lower()
return {
"display_name": display_name,
"address": address,
"local": local,
"domain": domain,
}
return OrderedDict(
[
("display_name", display_name),
("address", address),
("local", local),
("domain", domain),
]
)
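
A rough equivalent of the address splitting above, assuming the input ultimately comes from something like email.utils.parseaddr(); this sketch takes a raw header string rather than the pre-split pair the function receives.

from email.utils import parseaddr

def split_address(header_value: str) -> dict:
    display_name, address = parseaddr(header_value)
    local, _, domain = address.rpartition("@")
    return {
        "display_name": display_name or None,
        "address": address,
        "local": local.lower(),
        "domain": domain.lower(),
    }

print(split_address("Postmaster <postmaster@example.com>"))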
def get_filename_safe_string(string: str) -> str:
def get_filename_safe_string(string):
"""
Converts a string to a string that is safe for a filename
@@ -547,7 +508,7 @@ def get_filename_safe_string(string: str) -> str:
return string
def is_mbox(path: str) -> bool:
def is_mbox(path):
"""
Checks if the given content is an MBOX mailbox file
@@ -568,7 +529,7 @@ def is_mbox(path: str) -> bool:
return _is_mbox
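
One way to implement the MBOX check described above, as a hedged sketch using the standard library's mailbox module; parsedmarc's actual heuristic may differ.

import mailbox

def looks_like_mbox(path: str) -> bool:
    try:
        # A real mbox exposes at least one "From "-separated message
        return len(mailbox.mbox(path, create=False).keys()) > 0
    except Exception:
        return False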
def is_outlook_msg(content) -> bool:
def is_outlook_msg(content):
"""
Checks if the given content is an Outlook msg OLE/MSG file
@@ -583,7 +544,7 @@ def is_outlook_msg(content) -> bool:
)
def convert_outlook_msg(msg_bytes: bytes) -> bytes:
def convert_outlook_msg(msg_bytes):
"""
Uses the ``msgconvert`` Perl utility to convert an Outlook MSG file to
standard RFC 822 format
@@ -592,7 +553,7 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
msg_bytes (bytes): the content of the .msg file
Returns:
A RFC 822 bytes payload
A RFC 822 string
"""
if not is_outlook_msg(msg_bytes):
raise ValueError("The supplied bytes are not an Outlook MSG file")
@@ -619,9 +580,7 @@ def convert_outlook_msg(msg_bytes: bytes) -> bytes:
return rfc822
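
A sketch of the two steps the docstring describes: detect the OLE/CFB signature, then shell out to msgconvert. The output filename convention (input name with a .eml extension in the working directory) is an assumption about the msgconvert utility.

import os
import subprocess
import tempfile

OLE_MAGIC = b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"  # Compound File Binary signature

def msg_to_rfc822(msg_bytes: bytes) -> bytes:
    if not msg_bytes.startswith(OLE_MAGIC):
        raise ValueError("The supplied bytes are not an Outlook MSG file")
    with tempfile.TemporaryDirectory() as tmp:
        msg_path = os.path.join(tmp, "message.msg")
        with open(msg_path, "wb") as f:
            f.write(msg_bytes)
        subprocess.run(["msgconvert", "message.msg"], cwd=tmp, check=True)
        with open(os.path.join(tmp, "message.eml"), "rb") as f:
            return f.read()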
def parse_email(
data: Union[bytes, str], *, strip_attachment_payloads: bool = False
) -> dict:
def parse_email(data, strip_attachment_payloads=False):
"""
A simplified email parser

View File

@@ -1,9 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any, Optional, Union
import requests
from parsedmarc import logger
@@ -13,13 +7,7 @@ from parsedmarc.constants import USER_AGENT
class WebhookClient(object):
"""A client for webhooks"""
def __init__(
self,
aggregate_url: str,
forensic_url: str,
smtp_tls_url: str,
timeout: Optional[int] = 60,
):
def __init__(self, aggregate_url, forensic_url, smtp_tls_url, timeout=60):
"""
Initializes the WebhookClient
Args:
@@ -38,27 +26,25 @@ class WebhookClient(object):
"Content-Type": "application/json",
}
def save_forensic_report_to_webhook(self, report: str):
def save_forensic_report_to_webhook(self, report):
try:
self._send_to_webhook(self.forensic_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def save_smtp_tls_report_to_webhook(self, report: str):
def save_smtp_tls_report_to_webhook(self, report):
try:
self._send_to_webhook(self.smtp_tls_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def save_aggregate_report_to_webhook(self, report: str):
def save_aggregate_report_to_webhook(self, report):
try:
self._send_to_webhook(self.aggregate_url, report)
except Exception as error_:
logger.error("Webhook Error: {0}".format(error_.__str__()))
def _send_to_webhook(
self, webhook_url: str, payload: Union[bytes, str, dict[str, Any]]
):
def _send_to_webhook(self, webhook_url, payload):
try:
self.session.post(webhook_url, data=payload, timeout=self.timeout)
except Exception as error_:
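
Reduced to its essentials, the client above is a thin wrapper around a requests session; this sketch keeps only the aggregate URL and uses a placeholder endpoint.

import requests

class TinyWebhookClient:
    def __init__(self, aggregate_url: str, timeout: int = 60):
        self.aggregate_url = aggregate_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})

    def save_aggregate_report(self, report: str):
        try:
            self.session.post(self.aggregate_url, data=report, timeout=self.timeout)
        except Exception as error_:
            print(f"Webhook Error: {error_}")

client = TinyWebhookClient("https://example.com/dmarc/aggregate")  # placeholder URL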

View File

@@ -2,7 +2,6 @@
requires = [
"hatchling>=1.27.0",
]
requires_python = ">=3.9,<3.14"
build-backend = "hatchling.build"
[project]
@@ -29,7 +28,6 @@ classifiers = [
"Operating System :: OS Independent",
"Programming Language :: Python :: 3"
]
requires-python = ">=3.9"
dependencies = [
"azure-identity>=1.8.0",
"azure-monitor-ingestion>=1.0.0",
@@ -48,7 +46,7 @@ dependencies = [
"imapclient>=2.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"mailsuite>=1.9.18",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",
@@ -88,11 +86,11 @@ include = [
[tool.hatch.build]
exclude = [
"base_reverse_dns.csv",
"find_bad_utf8.py",
"find_unknown_base_reverse_dns.py",
"unknown_base_reverse_dns.csv",
"sortmaps.py",
"README.md",
"*.bak"
"base_reverse_dns.csv",
"find_bad_utf8.py",
"find_unknown_base_reverse_dns.py",
"unknown_base_reverse_dns.csv",
"sortmaps.py",
"README.md",
"*.bak"
]

tests.py (24 changes, Executable file → Normal file)
View File

@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import os
@@ -12,9 +9,6 @@ from lxml import etree
import parsedmarc
import parsedmarc.utils
# Detect if running in GitHub Actions to skip DNS lookups
OFFLINE_MODE = os.environ.get("GITHUB_ACTIONS", "false").lower() == "true"
def minify_xml(xml_string):
parser = etree.XMLParser(remove_blank_text=True)
@@ -80,7 +74,7 @@ class Test(unittest.TestCase):
print()
file = "samples/extract_report/nice-input.xml"
print("Testing {0}: ".format(file), end="")
xmlout = parsedmarc.extract_report_from_file_path(file)
xmlout = parsedmarc.extract_report(file)
xmlin_file = open("samples/extract_report/nice-input.xml")
xmlin = xmlin_file.read()
xmlin_file.close()
@@ -124,7 +118,7 @@ class Test(unittest.TestCase):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
sample_path, always_use_local_files=True
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
print("Passed!")
@@ -132,7 +126,7 @@ class Test(unittest.TestCase):
def testEmptySample(self):
"""Test empty/unparasable report"""
with self.assertRaises(parsedmarc.ParserError):
parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE)
parsedmarc.parse_report_file("samples/empty.xml")
def testForensicSamples(self):
"""Test sample forensic/ruf/failure DMARC reports"""
@@ -142,12 +136,8 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_email(sample_content)["report"]
parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
print("Passed!")
@@ -159,9 +149,7 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")