Compare commits

..

3 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
4219306365 Update Python 3.9 version table entry to note Debian 11/RHEL 9 usage
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-03 16:27:53 +00:00
copilot-swe-agent[bot]
a6e009c149 Drop Python 3.9 support: update CI matrix, pyproject.toml, docs, and README
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-03-03 16:20:34 +00:00
copilot-swe-agent[bot]
33384bd612 Initial plan 2026-03-03 16:18:42 +00:00
24 changed files with 104 additions and 2083 deletions

View File

@@ -1,72 +0,0 @@
name: Bug report
description: Report a reproducible parsedmarc bug
title: "[Bug]: "
labels:
- bug
body:
- type: input
id: version
attributes:
label: parsedmarc version
description: Include the parsedmarc version or commit if known.
placeholder: 9.x.x
validations:
required: true
- type: dropdown
id: input_backend
attributes:
label: Input backend
description: Which input path or mailbox backend is involved?
options:
- IMAP
- MS Graph
- Gmail API
- Maildir
- mbox
- Local file / direct parse
- Other
validations:
required: true
- type: textarea
id: environment
attributes:
label: Environment
description: Runtime, container image, OS, Python version, or deployment details.
placeholder: Docker on Debian, Python 3.12, parsedmarc installed from PyPI
validations:
required: true
- type: textarea
id: config
attributes:
label: Sanitized config
description: Include the relevant config fragment with secrets removed.
render: ini
- type: textarea
id: steps
attributes:
label: Steps to reproduce
description: Describe the smallest reproducible sequence you can.
placeholder: |
1. Configure parsedmarc with ...
2. Run ...
3. Observe ...
validations:
required: true
- type: textarea
id: expected_actual
attributes:
label: Expected vs actual behavior
description: What did you expect, and what happened instead?
validations:
required: true
- type: textarea
id: logs
attributes:
label: Logs or traceback
description: Paste sanitized logs or a traceback if available.
render: text
- type: textarea
id: samples
attributes:
label: Sample report availability
description: If you can share a sanitized sample report or message, note that here.

View File

@@ -1,5 +0,0 @@
blank_issues_enabled: true
contact_links:
- name: Security issue
url: https://github.com/domainaware/parsedmarc/security/policy
about: Please use the security policy and avoid filing public issues for undisclosed vulnerabilities.

View File

@@ -1,30 +0,0 @@
name: Feature request
description: Suggest a new feature or behavior change
title: "[Feature]: "
labels:
- enhancement
body:
- type: textarea
id: problem
attributes:
label: Problem statement
description: What workflow or limitation are you trying to solve?
validations:
required: true
- type: textarea
id: proposal
attributes:
label: Proposed behavior
description: Describe the feature or behavior you want.
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternatives considered
description: Describe workarounds or alternative approaches you considered.
- type: textarea
id: impact
attributes:
label: Compatibility or operational impact
description: Note config, output, performance, or deployment implications if relevant.

View File

@@ -1,24 +0,0 @@
## Summary
-
## Why
-
## Testing
-
## Backward Compatibility / Risk
-
## Related Issue
- Closes #
## Checklist
- [ ] Tests added or updated if behavior changed
- [ ] Docs updated if config or user-facing behavior changed

View File

@@ -10,32 +10,7 @@ on:
branches: [ master ] branches: [ master ]
jobs: jobs:
lint-docs-build: build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.13"
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install .[build]
- name: Check code style
run: |
ruff check .
- name: Test building documentation
run: |
cd docs
make html
- name: Test building packages
run: |
hatch build
test:
needs: lint-docs-build
runs-on: ubuntu-latest runs-on: ubuntu-latest
services: services:
@@ -71,6 +46,13 @@ jobs:
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install .[build] pip install .[build]
- name: Test building documentation
run: |
cd docs
make html
- name: Check code style
run: |
ruff check .
- name: Run unit tests - name: Run unit tests
run: | run: |
pytest --cov --cov-report=xml tests.py pytest --cov --cov-report=xml tests.py
@@ -79,6 +61,9 @@ jobs:
pip install -e . pip install -e .
parsedmarc --debug -c ci.ini samples/aggregate/* parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/* parsedmarc --debug -c ci.ini samples/forensic/*
- name: Test building packages
run: |
hatch build
- name: Upload coverage to Codecov - name: Upload coverage to Codecov
uses: codecov/codecov-action@v5 uses: codecov/codecov-action@v5
with: with:

View File

@@ -1,64 +0,0 @@
# AGENTS.md
This file provides guidance to AI agents when working with code in this repository.
## Project Overview
parsedmarc is a Python module and CLI utility for parsing DMARC aggregate (RUA), forensic (RUF), and SMTP TLS reports. It reads reports from IMAP, Microsoft Graph, Gmail API, Maildir, mbox files, or direct file paths, and outputs to JSON/CSV, Elasticsearch, OpenSearch, Splunk, Kafka, S3, Azure Log Analytics, syslog, or webhooks.
## Common Commands
```bash
# Install with dev/build dependencies
pip install .[build]
# Run all tests with coverage
pytest --cov --cov-report=xml tests.py
# Run a single test
pytest tests.py::Test::testAggregateSamples
# Lint and format
ruff check .
ruff format .
# Test CLI with sample reports
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
# Build docs
cd docs && make html
# Build distribution
hatch build
```
To skip DNS lookups during testing, set `GITHUB_ACTIONS=true`.
## Architecture
**Data flow:** Input sources → CLI (`cli.py:_main`) → Parse (`__init__.py`) → Enrich (DNS/GeoIP via `utils.py`) → Output integrations
### Key modules
- `parsedmarc/__init__.py` — Core parsing logic. Main functions: `parse_report_file()`, `parse_report_email()`, `parse_aggregate_report_xml()`, `parse_forensic_report()`, `parse_smtp_tls_report_json()`, `get_dmarc_reports_from_mailbox()`, `watch_inbox()`
- `parsedmarc/cli.py` — CLI entry point (`_main`), config file parsing, output orchestration
- `parsedmarc/types.py` — TypedDict definitions for all report types (`AggregateReport`, `ForensicReport`, `SMTPTLSReport`, `ParsingResults`)
- `parsedmarc/utils.py` — IP/DNS/GeoIP enrichment, base64 decoding, compression handling
- `parsedmarc/mail/` — Polymorphic mail connections: `IMAPConnection`, `GmailConnection`, `MSGraphConnection`, `MaildirConnection`
- `parsedmarc/{elastic,opensearch,splunk,kafkaclient,loganalytics,syslog,s3,webhook,gelf}.py` — Output integrations
### Report type system
`ReportType = Literal["aggregate", "forensic", "smtp_tls"]`. Exception hierarchy: `ParserError``InvalidDMARCReport``InvalidAggregateReport`/`InvalidForensicReport`, and `InvalidSMTPTLSReport`.
### Caching
IP address info cached for 4 hours, seen aggregate report IDs cached for 1 hour (via `ExpiringDict`).
## Code Style
- Ruff for formatting and linting (configured in `.vscode/settings.json`)
- TypedDict for structured data, type hints throughout
- Python ≥3.10 required
- Tests are in a single `tests.py` file using unittest; sample reports live in `samples/`

View File

@@ -1,38 +1,5 @@
# Changelog # Changelog
## 9.2.0
### Added
- OpenSearch AWS SigV4 authentication support (PR #673)
- IMAP move/delete compatibility fallbacks (PR #671)
- `fail_on_output_error` CLI option for sink failures (PR #672)
- Gmail service account auth mode for non-interactive runs (PR #676)
- Microsoft Graph certificate authentication support (PRs #692 and #693)
- Microsoft Graph well-known folder fallback for root listing failures (PR #618 and #684 close #609)
### Fixed
- Pass mailbox since filter through `watch_inbox` callback (PR #670 closes issue #581)
- `parsedmarc.mail.gmail.GmailConnection.delete_message` now properly calls the Gmail API (PR #668)
- Avoid extra mailbox fetch in batch and test mode (PR #691 closes #533)
## 9.1.2
### Fixes
- Fix duplicate detection for normalized aggregate reports in Elasticsearch/OpenSearch (PR #666 fixes issue #665)
## 9.1.1
### Fixes
- Fix the use of Elasticsearch and OpenSearch API keys (PR #660 fixes issue #653)
### Changes
- Drop support for Python 3.9 (PR #661)
## 9.1.0 ## 9.1.0
## Enhancements ## Enhancements

View File

@@ -1,3 +0,0 @@
# CLAUD.md
@AGENTS.md

View File

@@ -1,78 +0,0 @@
# Contributing
Thanks for contributing to parsedmarc.
## Local setup
Use a virtual environment for local development.
```bash
python3 -m venv .venv
. .venv/bin/activate
python -m pip install --upgrade pip
pip install .[build]
```
## Before opening a pull request
Run the checks that match your change:
```bash
ruff check .
pytest --cov --cov-report=xml tests.py
```
If you changed documentation:
```bash
cd docs
make html
```
If you changed CLI behavior or parsing logic, it is also useful to exercise the
sample reports:
```bash
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/forensic/*
```
To skip DNS lookups during tests, set:
```bash
GITHUB_ACTIONS=true
```
## Pull request guidelines
- Keep pull requests small and focused. Separate bug fixes, docs updates, and
repo-maintenance changes where practical.
- Add or update tests when behavior changes.
- Update docs when configuration or user-facing behavior changes.
- Include a short summary, the reason for the change, and the testing you ran.
- Link the related issue when there is one.
## Branch maintenance
Upstream `master` may move quickly. Before asking for review or after another PR
lands, rebase your branch onto the current upstream branch and force-push with
lease if needed:
```bash
git fetch upstream
git rebase upstream/master
git push --force-with-lease
```
## CI and coverage
GitHub Actions is the source of truth for linting, docs, and test status.
Codecov patch coverage is usually the most relevant signal for small PRs. Project
coverage can be noisier when the base comparison is stale, so interpret it in
the context of the actual diff.
## Questions
Use GitHub issues for bugs and feature requests. If you are not sure whether a
change is wanted, opening an issue first is usually the safest path.

View File

@@ -61,4 +61,4 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) | | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) | | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) | | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | ✅ | Supported (requires `imapclient>=3.1.0`) | | 3.14 | ✅ | Actively maintained |

View File

@@ -1,29 +0,0 @@
# Security Policy
## Reporting a vulnerability
Please do not open a public GitHub issue for an undisclosed security
vulnerability. Use GitHub private vulnerability reporting in the Security tab of this project instead.
When reporting a vulnerability, include:
- the affected parsedmarc version or commit
- the component or integration involved
- clear reproduction details if available
- potential impact
- any suggested mitigation or workaround
## Supported versions
Security fixes will be applied to the latest released version and
the current `master` branch.
Older versions will not receive backported fixes.
## Disclosure process
After a report is received, maintainers can validate the issue, assess impact,
and coordinate a fix before public disclosure.
Please avoid publishing proof-of-concept details until maintainers have had a
reasonable opportunity to investigate and release a fix or mitigation.

View File

@@ -1,11 +0,0 @@
codecov:
require_ci_to_pass: true
coverage:
status:
project:
default:
informational: true
patch:
default:
informational: false

View File

@@ -61,7 +61,7 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) | | 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) | | 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) | | 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | ✅ | Supported (requires `imapclient>=3.1.0`) | | 3.14 | ✅ | Actively maintained |
```{toctree} ```{toctree}
:caption: 'Contents' :caption: 'Contents'

View File

@@ -146,9 +146,6 @@ The full set of configuration options are:
- `dns_timeout` - float: DNS timeout period - `dns_timeout` - float: DNS timeout period
- `debug` - bool: Print debugging messages - `debug` - bool: Print debugging messages
- `silent` - bool: Only print errors (Default: `True`) - `silent` - bool: Only print errors (Default: `True`)
- `fail_on_output_error` - bool: Exit with a non-zero status code if
any configured output destination fails while saving/publishing
reports (Default: `False`)
- `log_file` - str: Write log messages to a file at this path - `log_file` - str: Write log messages to a file at this path
- `n_procs` - int: Number of process to run in parallel when - `n_procs` - int: Number of process to run in parallel when
parsing in CLI mode (Default: `1`) parsing in CLI mode (Default: `1`)
@@ -203,7 +200,7 @@ The full set of configuration options are:
- `password` - str: The IMAP password - `password` - str: The IMAP password
- `msgraph` - `msgraph`
- `auth_method` - str: Authentication method, valid types are - `auth_method` - str: Authentication method, valid types are
`UsernamePassword`, `DeviceCode`, `ClientSecret`, or `Certificate` `UsernamePassword`, `DeviceCode`, or `ClientSecret`
(Default: `UsernamePassword`). (Default: `UsernamePassword`).
- `user` - str: The M365 user, required when the auth method is - `user` - str: The M365 user, required when the auth method is
UsernamePassword UsernamePassword
@@ -211,11 +208,6 @@ The full set of configuration options are:
method is UsernamePassword method is UsernamePassword
- `client_id` - str: The app registration's client ID - `client_id` - str: The app registration's client ID
- `client_secret` - str: The app registration's secret - `client_secret` - str: The app registration's secret
- `certificate_path` - str: Path to a PEM or PKCS12 certificate
including the private key. Required when the auth method is
`Certificate`
- `certificate_password` - str: Optional password for the
certificate file when using `Certificate` auth
- `tenant_id` - str: The Azure AD tenant ID. This is required - `tenant_id` - str: The Azure AD tenant ID. This is required
for all auth methods except UsernamePassword. for all auth methods except UsernamePassword.
- `mailbox` - str: The mailbox name. This defaults to the - `mailbox` - str: The mailbox name. This defaults to the
@@ -253,9 +245,6 @@ The full set of configuration options are:
-Description "Restrict access to dmarc reports mailbox." -Description "Restrict access to dmarc reports mailbox."
``` ```
The same application permission and mailbox scoping guidance
applies to the `Certificate` auth method.
::: :::
- `elasticsearch` - `elasticsearch`
- `hosts` - str: A comma separated list of hostnames and ports - `hosts` - str: A comma separated list of hostnames and ports
@@ -292,10 +281,6 @@ The full set of configuration options are:
- `user` - str: Basic auth username - `user` - str: Basic auth username
- `password` - str: Basic auth password - `password` - str: Basic auth password
- `api_key` - str: API key - `api_key` - str: API key
- `auth_type` - str: Authentication type: `basic` (default) or `awssigv4` (the key `authentication_type` is accepted as an alias for this option)
- `aws_region` - str: AWS region for SigV4 authentication
(required when `auth_type = awssigv4`)
- `aws_service` - str: AWS service for SigV4 signing (Default: `es`)
- `ssl` - bool: Use an encrypted SSL/TLS connection - `ssl` - bool: Use an encrypted SSL/TLS connection
(Default: `True`) (Default: `True`)
- `timeout` - float: Timeout in seconds (Default: 60) - `timeout` - float: Timeout in seconds (Default: 60)
@@ -409,19 +394,10 @@ The full set of configuration options are:
credentials, None to disable (Default: `None`) credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file - `token_file` - str: Path to save the token file
(Default: `.token`) (Default: `.token`)
- `auth_mode` - str: Authentication mode, `installed_app` (default)
or `service_account`
- `service_account_user` - str: Delegated mailbox user for Gmail
service account auth (required for domain-wide delegation). Also
accepted as `delegated_user` for backward compatibility.
:::{note} :::{note}
credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`. credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
::: :::
:::{note}
When `auth_mode = service_account`, `credentials_file` must point to a
Google service account key JSON file, and `token_file` is not used.
:::
- `include_spam_trash` - bool: Include messages in Spam and - `include_spam_trash` - bool: Include messages in Spam and
Trash when searching reports (Default: `False`) Trash when searching reports (Default: `False`)
- `scopes` - str: Comma separated list of scopes to use when - `scopes` - str: Comma separated list of scopes to use when
@@ -526,33 +502,6 @@ PUT _cluster/settings
Increasing this value increases resource usage. Increasing this value increases resource usage.
::: :::
## Performance tuning
For large mailbox imports or backfills, parsedmarc can consume a noticeable amount
of memory, especially when it runs on the same host as Elasticsearch or
OpenSearch. The following settings can reduce peak memory usage and make long
imports more predictable:
- Reduce `mailbox.batch_size` to smaller values such as `100-500` instead of
processing a very large message set at once. Smaller batches trade throughput
for lower peak memory use and less sink pressure.
- Keep `n_procs` low for mailbox-heavy runs. In practice, `1-2` workers is often
a safer starting point for large backfills than aggressive parallelism.
- Use `mailbox.since` to process reports in smaller time windows such as `1d`,
`7d`, or another interval that fits the backlog. This makes it easier to catch
up incrementally instead of loading an entire mailbox history in one run.
- Set `strip_attachment_payloads = True` when forensic reports contain large
attachments and you do not need to retain the raw payloads in the parsed
output.
- Prefer running parsedmarc separately from Elasticsearch or OpenSearch, or
reserve enough RAM for both services if they must share a host.
- For very large imports, prefer incremental supervised runs, such as a
scheduler or systemd service, over infrequent massive backfills.
These are operational tuning recommendations rather than hard requirements, but
they are often enough to avoid memory pressure and reduce failures during
high-volume mailbox processing.
## Multi-tenant support ## Multi-tenant support
Starting in `8.19.0`, ParseDMARC provides multi-tenant support by placing data into separate OpenSearch or Elasticsearch index prefixes. To set this up, create a YAML file that is formatted where each key is a tenant name, and the value is a list of domains related to that tenant, not including subdomains, like this: Starting in `8.19.0`, ParseDMARC provides multi-tenant support by placing data into separate OpenSearch or Elasticsearch index prefixes. To set this up, create a YAML file that is formatted where each key is a tenant name, and the value is a list of domains related to that tenant, not including subdomains, like this:

View File

@@ -962,12 +962,10 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
return report return report
def extract_report_from_file_path( def extract_report_from_file_path(file_path: str):
file_path: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
) -> str:
"""Extracts report from a file at the given file_path""" """Extracts report from a file at the given file_path"""
try: try:
with open(os.fspath(file_path), "rb") as report_file: with open(file_path, "rb") as report_file:
return extract_report(report_file.read()) return extract_report(report_file.read())
except FileNotFoundError: except FileNotFoundError:
raise ParserError("File was not found") raise ParserError("File was not found")
@@ -1662,7 +1660,7 @@ def parse_report_email(
def parse_report_file( def parse_report_file(
input_: Union[bytes, str, os.PathLike[str], os.PathLike[bytes], BinaryIO], input_: Union[bytes, str, BinaryIO],
*, *,
nameservers: Optional[list[str]] = None, nameservers: Optional[list[str]] = None,
dns_timeout: float = 2.0, dns_timeout: float = 2.0,
@@ -1679,8 +1677,7 @@ def parse_report_file(
file-like object. or bytes file-like object. or bytes
Args: Args:
input_ (str | os.PathLike | bytes | BinaryIO): A path to a file, input_ (str | bytes | BinaryIO): A path to a file, a file like object, or bytes
a file-like object, or bytes
nameservers (list): A list of one or more nameservers to use nameservers (list): A list of one or more nameservers to use
(Cloudflare's public DNS resolvers by default) (Cloudflare's public DNS resolvers by default)
dns_timeout (float): Sets the DNS timeout in seconds dns_timeout (float): Sets the DNS timeout in seconds
@@ -1697,10 +1694,9 @@ def parse_report_file(
dict: The parsed DMARC report dict: The parsed DMARC report
""" """
file_object: BinaryIO file_object: BinaryIO
if isinstance(input_, (str, os.PathLike)): if isinstance(input_, str):
file_path = os.fspath(input_) logger.debug("Parsing {0}".format(input_))
logger.debug("Parsing {0}".format(file_path)) file_object = open(input_, "rb")
file_object = open(file_path, "rb")
elif isinstance(input_, (bytes, bytearray, memoryview)): elif isinstance(input_, (bytes, bytearray, memoryview)):
file_object = BytesIO(bytes(input_)) file_object = BytesIO(bytes(input_))
else: else:
@@ -2141,17 +2137,14 @@ def get_dmarc_reports_from_mailbox(
"smtp_tls_reports": smtp_tls_reports, "smtp_tls_reports": smtp_tls_reports,
} }
if not test and not batch_size: if current_time:
if current_time: total_messages = len(
total_messages = len( connection.fetch_messages(reports_folder, since=current_time)
connection.fetch_messages(reports_folder, since=current_time) )
)
else:
total_messages = len(connection.fetch_messages(reports_folder))
else: else:
total_messages = 0 total_messages = len(connection.fetch_messages(reports_folder))
if total_messages > 0: if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run # Process emails that came in during the last run
results = get_dmarc_reports_from_mailbox( results = get_dmarc_reports_from_mailbox(
connection=connection, connection=connection,
@@ -2193,7 +2186,6 @@ def watch_inbox(
dns_timeout: float = 6.0, dns_timeout: float = 6.0,
strip_attachment_payloads: bool = False, strip_attachment_payloads: bool = False,
batch_size: int = 10, batch_size: int = 10,
since: Optional[Union[datetime, date, str]] = None,
normalize_timespan_threshold_hours: float = 24, normalize_timespan_threshold_hours: float = 24,
): ):
""" """
@@ -2220,7 +2212,6 @@ def watch_inbox(
strip_attachment_payloads (bool): Replace attachment payloads in strip_attachment_payloads (bool): Replace attachment payloads in
forensic report samples with None forensic report samples with None
batch_size (int): Number of messages to read and process before saving batch_size (int): Number of messages to read and process before saving
since: Search for messages since certain time
normalize_timespan_threshold_hours (float): Normalize timespans beyond this normalize_timespan_threshold_hours (float): Normalize timespans beyond this
""" """
@@ -2240,7 +2231,6 @@ def watch_inbox(
dns_timeout=dns_timeout, dns_timeout=dns_timeout,
strip_attachment_payloads=strip_attachment_payloads, strip_attachment_payloads=strip_attachment_payloads,
batch_size=batch_size, batch_size=batch_size,
since=since,
create_folders=False, create_folders=False,
normalize_timespan_threshold_hours=normalize_timespan_threshold_hours, normalize_timespan_threshold_hours=normalize_timespan_threshold_hours,
) )

View File

@@ -194,13 +194,6 @@ def _main():
return None return None
def process_reports(reports_): def process_reports(reports_):
output_errors = []
def log_output_error(destination, error):
message = f"{destination} Error: {error}"
logger.error(message)
output_errors.append(message)
indent_value = 2 if opts.prettify_json else None indent_value = 2 if opts.prettify_json else None
output_str = "{0}\n".format( output_str = "{0}\n".format(
json.dumps(reports_, ensure_ascii=False, indent=indent_value) json.dumps(reports_, ensure_ascii=False, indent=indent_value)
@@ -237,9 +230,11 @@ def _main():
except elastic.AlreadySaved as warning: except elastic.AlreadySaved as warning:
logger.warning(warning.__str__()) logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_: except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__()) logger.error("Elasticsearch Error: {0}".format(error_.__str__()))
except Exception as error_: except Exception as error_:
log_output_error("Elasticsearch exception", error_.__str__()) logger.error(
"Elasticsearch exception error: {}".format(error_.__str__())
)
try: try:
if opts.opensearch_hosts: if opts.opensearch_hosts:
@@ -257,9 +252,11 @@ def _main():
except opensearch.AlreadySaved as warning: except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__()) logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_: except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__()) logger.error("OpenSearch Error: {0}".format(error_.__str__()))
except Exception as error_: except Exception as error_:
log_output_error("OpenSearch exception", error_.__str__()) logger.error(
"OpenSearch exception error: {}".format(error_.__str__())
)
try: try:
if opts.kafka_hosts: if opts.kafka_hosts:
@@ -267,25 +264,25 @@ def _main():
report, kafka_aggregate_topic report, kafka_aggregate_topic
) )
except Exception as error_: except Exception as error_:
log_output_error("Kafka", error_.__str__()) logger.error("Kafka Error: {0}".format(error_.__str__()))
try: try:
if opts.s3_bucket: if opts.s3_bucket:
s3_client.save_aggregate_report_to_s3(report) s3_client.save_aggregate_report_to_s3(report)
except Exception as error_: except Exception as error_:
log_output_error("S3", error_.__str__()) logger.error("S3 Error: {0}".format(error_.__str__()))
try: try:
if opts.syslog_server: if opts.syslog_server:
syslog_client.save_aggregate_report_to_syslog(report) syslog_client.save_aggregate_report_to_syslog(report)
except Exception as error_: except Exception as error_:
log_output_error("Syslog", error_.__str__()) logger.error("Syslog Error: {0}".format(error_.__str__()))
try: try:
if opts.gelf_host: if opts.gelf_host:
gelf_client.save_aggregate_report_to_gelf(report) gelf_client.save_aggregate_report_to_gelf(report)
except Exception as error_: except Exception as error_:
log_output_error("GELF", error_.__str__()) logger.error("GELF Error: {0}".format(error_.__str__()))
try: try:
if opts.webhook_aggregate_url: if opts.webhook_aggregate_url:
@@ -294,7 +291,7 @@ def _main():
json.dumps(report, ensure_ascii=False, indent=indent_value) json.dumps(report, ensure_ascii=False, indent=indent_value)
) )
except Exception as error_: except Exception as error_:
log_output_error("Webhook", error_.__str__()) logger.error("Webhook Error: {0}".format(error_.__str__()))
if opts.hec: if opts.hec:
try: try:
@@ -302,7 +299,7 @@ def _main():
if len(aggregate_reports_) > 0: if len(aggregate_reports_) > 0:
hec_client.save_aggregate_reports_to_splunk(aggregate_reports_) hec_client.save_aggregate_reports_to_splunk(aggregate_reports_)
except splunk.SplunkError as e: except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__()) logger.error("Splunk HEC error: {0}".format(e.__str__()))
if opts.save_forensic: if opts.save_forensic:
for report in reports_["forensic_reports"]: for report in reports_["forensic_reports"]:
@@ -322,9 +319,9 @@ def _main():
except elastic.AlreadySaved as warning: except elastic.AlreadySaved as warning:
logger.warning(warning.__str__()) logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_: except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__()) logger.error("Elasticsearch Error: {0}".format(error_.__str__()))
except InvalidDMARCReport as error_: except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__()) logger.error(error_.__str__())
try: try:
shards = opts.opensearch_number_of_shards shards = opts.opensearch_number_of_shards
@@ -342,9 +339,9 @@ def _main():
except opensearch.AlreadySaved as warning: except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__()) logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_: except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__()) logger.error("OpenSearch Error: {0}".format(error_.__str__()))
except InvalidDMARCReport as error_: except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__()) logger.error(error_.__str__())
try: try:
if opts.kafka_hosts: if opts.kafka_hosts:
@@ -352,25 +349,25 @@ def _main():
report, kafka_forensic_topic report, kafka_forensic_topic
) )
except Exception as error_: except Exception as error_:
log_output_error("Kafka", error_.__str__()) logger.error("Kafka Error: {0}".format(error_.__str__()))
try: try:
if opts.s3_bucket: if opts.s3_bucket:
s3_client.save_forensic_report_to_s3(report) s3_client.save_forensic_report_to_s3(report)
except Exception as error_: except Exception as error_:
log_output_error("S3", error_.__str__()) logger.error("S3 Error: {0}".format(error_.__str__()))
try: try:
if opts.syslog_server: if opts.syslog_server:
syslog_client.save_forensic_report_to_syslog(report) syslog_client.save_forensic_report_to_syslog(report)
except Exception as error_: except Exception as error_:
log_output_error("Syslog", error_.__str__()) logger.error("Syslog Error: {0}".format(error_.__str__()))
try: try:
if opts.gelf_host: if opts.gelf_host:
gelf_client.save_forensic_report_to_gelf(report) gelf_client.save_forensic_report_to_gelf(report)
except Exception as error_: except Exception as error_:
log_output_error("GELF", error_.__str__()) logger.error("GELF Error: {0}".format(error_.__str__()))
try: try:
if opts.webhook_forensic_url: if opts.webhook_forensic_url:
@@ -379,7 +376,7 @@ def _main():
json.dumps(report, ensure_ascii=False, indent=indent_value) json.dumps(report, ensure_ascii=False, indent=indent_value)
) )
except Exception as error_: except Exception as error_:
log_output_error("Webhook", error_.__str__()) logger.error("Webhook Error: {0}".format(error_.__str__()))
if opts.hec: if opts.hec:
try: try:
@@ -387,7 +384,7 @@ def _main():
if len(forensic_reports_) > 0: if len(forensic_reports_) > 0:
hec_client.save_forensic_reports_to_splunk(forensic_reports_) hec_client.save_forensic_reports_to_splunk(forensic_reports_)
except splunk.SplunkError as e: except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__()) logger.error("Splunk HEC error: {0}".format(e.__str__()))
if opts.save_smtp_tls: if opts.save_smtp_tls:
for report in reports_["smtp_tls_reports"]: for report in reports_["smtp_tls_reports"]:
@@ -407,9 +404,9 @@ def _main():
except elastic.AlreadySaved as warning: except elastic.AlreadySaved as warning:
logger.warning(warning.__str__()) logger.warning(warning.__str__())
except elastic.ElasticsearchError as error_: except elastic.ElasticsearchError as error_:
log_output_error("Elasticsearch", error_.__str__()) logger.error("Elasticsearch Error: {0}".format(error_.__str__()))
except InvalidDMARCReport as error_: except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__()) logger.error(error_.__str__())
try: try:
shards = opts.opensearch_number_of_shards shards = opts.opensearch_number_of_shards
@@ -427,9 +424,9 @@ def _main():
except opensearch.AlreadySaved as warning: except opensearch.AlreadySaved as warning:
logger.warning(warning.__str__()) logger.warning(warning.__str__())
except opensearch.OpenSearchError as error_: except opensearch.OpenSearchError as error_:
log_output_error("OpenSearch", error_.__str__()) logger.error("OpenSearch Error: {0}".format(error_.__str__()))
except InvalidDMARCReport as error_: except InvalidDMARCReport as error_:
log_output_error("Invalid DMARC report", error_.__str__()) logger.error(error_.__str__())
try: try:
if opts.kafka_hosts: if opts.kafka_hosts:
@@ -437,25 +434,25 @@ def _main():
smtp_tls_reports, kafka_smtp_tls_topic smtp_tls_reports, kafka_smtp_tls_topic
) )
except Exception as error_: except Exception as error_:
log_output_error("Kafka", error_.__str__()) logger.error("Kafka Error: {0}".format(error_.__str__()))
try: try:
if opts.s3_bucket: if opts.s3_bucket:
s3_client.save_smtp_tls_report_to_s3(report) s3_client.save_smtp_tls_report_to_s3(report)
except Exception as error_: except Exception as error_:
log_output_error("S3", error_.__str__()) logger.error("S3 Error: {0}".format(error_.__str__()))
try: try:
if opts.syslog_server: if opts.syslog_server:
syslog_client.save_smtp_tls_report_to_syslog(report) syslog_client.save_smtp_tls_report_to_syslog(report)
except Exception as error_: except Exception as error_:
log_output_error("Syslog", error_.__str__()) logger.error("Syslog Error: {0}".format(error_.__str__()))
try: try:
if opts.gelf_host: if opts.gelf_host:
gelf_client.save_smtp_tls_report_to_gelf(report) gelf_client.save_smtp_tls_report_to_gelf(report)
except Exception as error_: except Exception as error_:
log_output_error("GELF", error_.__str__()) logger.error("GELF Error: {0}".format(error_.__str__()))
try: try:
if opts.webhook_smtp_tls_url: if opts.webhook_smtp_tls_url:
@@ -464,7 +461,7 @@ def _main():
json.dumps(report, ensure_ascii=False, indent=indent_value) json.dumps(report, ensure_ascii=False, indent=indent_value)
) )
except Exception as error_: except Exception as error_:
log_output_error("Webhook", error_.__str__()) logger.error("Webhook Error: {0}".format(error_.__str__()))
if opts.hec: if opts.hec:
try: try:
@@ -472,7 +469,7 @@ def _main():
if len(smtp_tls_reports_) > 0: if len(smtp_tls_reports_) > 0:
hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_) hec_client.save_smtp_tls_reports_to_splunk(smtp_tls_reports_)
except splunk.SplunkError as e: except splunk.SplunkError as e:
log_output_error("Splunk HEC", e.__str__()) logger.error("Splunk HEC error: {0}".format(e.__str__()))
if opts.la_dce: if opts.la_dce:
try: try:
@@ -493,16 +490,14 @@ def _main():
opts.save_smtp_tls, opts.save_smtp_tls,
) )
except loganalytics.LogAnalyticsException as e: except loganalytics.LogAnalyticsException as e:
log_output_error("Log Analytics", e.__str__()) logger.error("Log Analytics error: {0}".format(e.__str__()))
except Exception as e: except Exception as e:
log_output_error("Log Analytics", f"Unknown publishing error: {e}") logger.error(
"Unknown error occurred"
if opts.fail_on_output_error and output_errors: + " during the publishing"
raise ParserError( + " to Log Analytics: "
"Output destination failures detected: {0}".format( + e.__str__()
" | ".join(output_errors)
) )
)
arg_parser = ArgumentParser(description="Parses DMARC reports") arg_parser = ArgumentParser(description="Parses DMARC reports")
arg_parser.add_argument( arg_parser.add_argument(
@@ -644,8 +639,6 @@ def _main():
graph_password=None, graph_password=None,
graph_client_id=None, graph_client_id=None,
graph_client_secret=None, graph_client_secret=None,
graph_certificate_path=None,
graph_certificate_password=None,
graph_tenant_id=None, graph_tenant_id=None,
graph_mailbox=None, graph_mailbox=None,
graph_allow_unencrypted_storage=False, graph_allow_unencrypted_storage=False,
@@ -678,9 +671,6 @@ def _main():
opensearch_username=None, opensearch_username=None,
opensearch_password=None, opensearch_password=None,
opensearch_api_key=None, opensearch_api_key=None,
opensearch_auth_type="basic",
opensearch_aws_region=None,
opensearch_aws_service="es",
kafka_hosts=None, kafka_hosts=None,
kafka_username=None, kafka_username=None,
kafka_password=None, kafka_password=None,
@@ -720,8 +710,6 @@ def _main():
gmail_api_paginate_messages=True, gmail_api_paginate_messages=True,
gmail_api_scopes=[], gmail_api_scopes=[],
gmail_api_oauth2_port=8080, gmail_api_oauth2_port=8080,
gmail_api_auth_mode="installed_app",
gmail_api_service_account_user=None,
maildir_path=None, maildir_path=None,
maildir_create=False, maildir_create=False,
log_file=args.log_file, log_file=args.log_file,
@@ -746,7 +734,6 @@ def _main():
webhook_smtp_tls_url=None, webhook_smtp_tls_url=None,
webhook_timeout=60, webhook_timeout=60,
normalize_timespan_threshold_hours=24.0, normalize_timespan_threshold_hours=24.0,
fail_on_output_error=False,
) )
args = arg_parser.parse_args() args = arg_parser.parse_args()
@@ -829,10 +816,6 @@ def _main():
opts.silent = bool(general_config.getboolean("silent")) opts.silent = bool(general_config.getboolean("silent"))
if "warnings" in general_config: if "warnings" in general_config:
opts.warnings = bool(general_config.getboolean("warnings")) opts.warnings = bool(general_config.getboolean("warnings"))
if "fail_on_output_error" in general_config:
opts.fail_on_output_error = bool(
general_config.getboolean("fail_on_output_error")
)
if "log_file" in general_config: if "log_file" in general_config:
opts.log_file = general_config["log_file"] opts.log_file = general_config["log_file"]
if "n_procs" in general_config: if "n_procs" in general_config:
@@ -1014,19 +997,6 @@ def _main():
) )
exit(-1) exit(-1)
if opts.graph_auth_method == AuthMethod.Certificate.name:
if "certificate_path" in graph_config:
opts.graph_certificate_path = graph_config["certificate_path"]
else:
logger.critical(
"certificate_path setting missing from the msgraph config section"
)
exit(-1)
if "certificate_password" in graph_config:
opts.graph_certificate_password = graph_config[
"certificate_password"
]
if "client_id" in graph_config: if "client_id" in graph_config:
opts.graph_client_id = graph_config["client_id"] opts.graph_client_id = graph_config["client_id"]
else: else:
@@ -1088,10 +1058,10 @@ def _main():
opts.elasticsearch_password = elasticsearch_config["password"] opts.elasticsearch_password = elasticsearch_config["password"]
# Until 8.20 # Until 8.20
if "apiKey" in elasticsearch_config: if "apiKey" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["apiKey"] opts.elasticsearch_apiKey = elasticsearch_config["apiKey"]
# Since 8.20 # Since 8.20
if "api_key" in elasticsearch_config: if "api_key" in elasticsearch_config:
opts.elasticsearch_api_key = elasticsearch_config["api_key"] opts.elasticsearch_apiKey = elasticsearch_config["api_key"]
if "opensearch" in config: if "opensearch" in config:
opensearch_config = config["opensearch"] opensearch_config = config["opensearch"]
@@ -1128,20 +1098,10 @@ def _main():
opts.opensearch_password = opensearch_config["password"] opts.opensearch_password = opensearch_config["password"]
# Until 8.20 # Until 8.20
if "apiKey" in opensearch_config: if "apiKey" in opensearch_config:
opts.opensearch_api_key = opensearch_config["apiKey"] opts.opensearch_apiKey = opensearch_config["apiKey"]
# Since 8.20 # Since 8.20
if "api_key" in opensearch_config: if "api_key" in opensearch_config:
opts.opensearch_api_key = opensearch_config["api_key"] opts.opensearch_apiKey = opensearch_config["api_key"]
if "auth_type" in opensearch_config:
opts.opensearch_auth_type = opensearch_config["auth_type"].strip().lower()
elif "authentication_type" in opensearch_config:
opts.opensearch_auth_type = (
opensearch_config["authentication_type"].strip().lower()
)
if "aws_region" in opensearch_config:
opts.opensearch_aws_region = opensearch_config["aws_region"].strip()
if "aws_service" in opensearch_config:
opts.opensearch_aws_service = opensearch_config["aws_service"].strip()
if "splunk_hec" in config.sections(): if "splunk_hec" in config.sections():
hec_config = config["splunk_hec"] hec_config = config["splunk_hec"]
@@ -1327,16 +1287,6 @@ def _main():
opts.gmail_api_oauth2_port = gmail_api_config.getint( opts.gmail_api_oauth2_port = gmail_api_config.getint(
"oauth2_port", 8080 "oauth2_port", 8080
) )
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"service_account_user"
).strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"delegated_user"
).strip()
if "maildir" in config.sections(): if "maildir" in config.sections():
maildir_api_config = config["maildir"] maildir_api_config = config["maildir"]
@@ -1488,9 +1438,6 @@ def _main():
password=opts.opensearch_password, password=opts.opensearch_password,
api_key=opts.opensearch_api_key, api_key=opts.opensearch_api_key,
timeout=opensearch_timeout_value, timeout=opensearch_timeout_value,
auth_type=opts.opensearch_auth_type,
aws_region=opts.opensearch_aws_region,
aws_service=opts.opensearch_aws_service,
) )
opensearch.migrate_indexes( opensearch.migrate_indexes(
aggregate_indexes=[os_aggregate_index], aggregate_indexes=[os_aggregate_index],
@@ -1763,8 +1710,6 @@ def _main():
tenant_id=opts.graph_tenant_id, tenant_id=opts.graph_tenant_id,
client_id=opts.graph_client_id, client_id=opts.graph_client_id,
client_secret=opts.graph_client_secret, client_secret=opts.graph_client_secret,
certificate_path=opts.graph_certificate_path,
certificate_password=opts.graph_certificate_password,
username=opts.graph_user, username=opts.graph_user,
password=opts.graph_password, password=opts.graph_password,
token_file=opts.graph_token_file, token_file=opts.graph_token_file,
@@ -1796,8 +1741,6 @@ def _main():
paginate_messages=opts.gmail_api_paginate_messages, paginate_messages=opts.gmail_api_paginate_messages,
reports_folder=opts.mailbox_reports_folder, reports_folder=opts.mailbox_reports_folder,
oauth2_port=opts.gmail_api_oauth2_port, oauth2_port=opts.gmail_api_oauth2_port,
auth_mode=opts.gmail_api_auth_mode,
service_account_user=opts.gmail_api_service_account_user,
) )
except Exception: except Exception:
@@ -1861,11 +1804,7 @@ def _main():
"smtp_tls_reports": smtp_tls_reports, "smtp_tls_reports": smtp_tls_reports,
} }
try: process_reports(parsing_results)
process_reports(parsing_results)
except ParserError as error:
logger.error(error.__str__())
exit(1)
if opts.smtp_host: if opts.smtp_host:
try: try:
@@ -1910,7 +1849,6 @@ def _main():
dns_timeout=opts.dns_timeout, dns_timeout=opts.dns_timeout,
strip_attachment_payloads=opts.strip_attachment_payloads, strip_attachment_payloads=opts.strip_attachment_payloads,
batch_size=mailbox_batch_size_value, batch_size=mailbox_batch_size_value,
since=opts.mailbox_since,
ip_db_path=opts.ip_db_path, ip_db_path=opts.ip_db_path,
always_use_local_files=opts.always_use_local_files, always_use_local_files=opts.always_use_local_files,
reverse_dns_map_path=opts.reverse_dns_map_path, reverse_dns_map_path=opts.reverse_dns_map_path,
@@ -1921,9 +1859,6 @@ def _main():
except FileExistsError as error: except FileExistsError as error:
logger.error("{0}".format(error.__str__())) logger.error("{0}".format(error.__str__()))
exit(1) exit(1)
except ParserError as error:
logger.error(error.__str__())
exit(1)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,3 +1,3 @@
__version__ = "9.2.0" __version__ = "9.1.0"
USER_AGENT = f"parsedmarc/{__version__}" USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -413,8 +413,8 @@ def save_aggregate_report_to_elasticsearch(
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) # type: ignore
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType] report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) # pyright: ignore[reportArgumentType]
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType] domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) # pyright: ignore[reportArgumentType]
begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date)))) # pyright: ignore[reportArgumentType] begin_date_query = Q(dict(match=dict(date_begin=begin_date))) # pyright: ignore[reportArgumentType]
end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date)))) # pyright: ignore[reportArgumentType] end_date_query = Q(dict(match=dict(date_end=end_date))) # pyright: ignore[reportArgumentType]
if index_suffix is not None: if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix) search_index = "dmarc_aggregate_{0}*".format(index_suffix)

View File

@@ -10,7 +10,6 @@ from typing import List
from google.auth.transport.requests import Request from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
@@ -19,29 +18,7 @@ from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection from parsedmarc.mail.mailbox_connection import MailboxConnection
def _get_creds( def _get_creds(token_file, credentials_file, scopes, oauth2_port):
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode="installed_app",
service_account_user=None,
):
normalized_auth_mode = (auth_mode or "installed_app").strip().lower()
if normalized_auth_mode == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=scopes,
)
if service_account_user:
creds = creds.with_subject(service_account_user)
return creds
if normalized_auth_mode != "installed_app":
raise ValueError(
f"Unsupported Gmail auth_mode '{auth_mode}'. "
"Expected 'installed_app' or 'service_account'."
)
creds = None creds = None
if Path(token_file).exists(): if Path(token_file).exists():
@@ -70,17 +47,8 @@ class GmailConnection(MailboxConnection):
reports_folder: str, reports_folder: str,
oauth2_port: int, oauth2_port: int,
paginate_messages: bool, paginate_messages: bool,
auth_mode: str = "installed_app",
service_account_user: str | None = None,
): ):
creds = _get_creds( creds = _get_creds(token_file, credentials_file, scopes, oauth2_port)
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode=auth_mode,
service_account_user=service_account_user,
)
self.service = build("gmail", "v1", credentials=creds) self.service = build("gmail", "v1", credentials=creds)
self.include_spam_trash = include_spam_trash self.include_spam_trash = include_spam_trash
self.reports_label_id = self._find_label_id_for_label(reports_folder) self.reports_label_id = self._find_label_id_for_label(reports_folder)
@@ -158,7 +126,7 @@ class GmailConnection(MailboxConnection):
return urlsafe_b64decode(msg["raw"]).decode(errors="replace") return urlsafe_b64decode(msg["raw"]).decode(errors="replace")
def delete_message(self, message_id: str): def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id).execute() self.service.users().messages().delete(userId="me", id=message_id)
def move_message(self, message_id: str, folder_name: str): def move_message(self, message_id: str, folder_name: str):
label_id = self._find_label_id_for_label(folder_name) label_id = self._find_label_id_for_label(folder_name)

View File

@@ -12,25 +12,19 @@ from azure.identity import (
UsernamePasswordCredential, UsernamePasswordCredential,
DeviceCodeCredential, DeviceCodeCredential,
ClientSecretCredential, ClientSecretCredential,
CertificateCredential,
TokenCachePersistenceOptions, TokenCachePersistenceOptions,
AuthenticationRecord, AuthenticationRecord,
) )
from msgraph.core import GraphClient from msgraph.core import GraphClient
from requests.exceptions import RequestException
from parsedmarc.log import logger from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection from parsedmarc.mail.mailbox_connection import MailboxConnection
GRAPH_REQUEST_RETRY_ATTEMPTS = 3
GRAPH_REQUEST_RETRY_DELAY_SECONDS = 5
class AuthMethod(Enum): class AuthMethod(Enum):
DeviceCode = 1 DeviceCode = 1
UsernamePassword = 2 UsernamePassword = 2
ClientSecret = 3 ClientSecret = 3
Certificate = 4
def _get_cache_args(token_path: Path, allow_unencrypted_storage): def _get_cache_args(token_path: Path, allow_unencrypted_storage):
@@ -89,55 +83,30 @@ def _generate_credential(auth_method: str, token_path: Path, **kwargs):
tenant_id=kwargs["tenant_id"], tenant_id=kwargs["tenant_id"],
client_secret=kwargs["client_secret"], client_secret=kwargs["client_secret"],
) )
elif auth_method == AuthMethod.Certificate.name:
cert_path = kwargs.get("certificate_path")
if not cert_path:
raise ValueError(
"certificate_path is required when auth_method is 'Certificate'"
)
credential = CertificateCredential(
client_id=kwargs["client_id"],
tenant_id=kwargs["tenant_id"],
certificate_path=cert_path,
password=kwargs.get("certificate_password"),
)
else: else:
raise RuntimeError(f"Auth method {auth_method} not found") raise RuntimeError(f"Auth method {auth_method} not found")
return credential return credential
class MSGraphConnection(MailboxConnection): class MSGraphConnection(MailboxConnection):
_WELL_KNOWN_FOLDERS = {
"inbox": "inbox",
"archive": "archive",
"drafts": "drafts",
"sentitems": "sentitems",
"deleteditems": "deleteditems",
"junkemail": "junkemail",
}
def __init__( def __init__(
self, self,
auth_method: str, auth_method: str,
mailbox: str, mailbox: str,
graph_url: str, graph_url: str,
client_id: str, client_id: str,
client_secret: Optional[str], client_secret: str,
username: Optional[str], username: str,
password: Optional[str], password: str,
tenant_id: str, tenant_id: str,
token_file: str, token_file: str,
allow_unencrypted_storage: bool, allow_unencrypted_storage: bool,
certificate_path: Optional[str] = None,
certificate_password: Optional[Union[str, bytes]] = None,
): ):
token_path = Path(token_file) token_path = Path(token_file)
credential = _generate_credential( credential = _generate_credential(
auth_method, auth_method,
client_id=client_id, client_id=client_id,
client_secret=client_secret, client_secret=client_secret,
certificate_path=certificate_path,
certificate_password=certificate_password,
username=username, username=username,
password=password, password=password,
tenant_id=tenant_id, tenant_id=tenant_id,
@@ -148,10 +117,10 @@ class MSGraphConnection(MailboxConnection):
"credential": credential, "credential": credential,
"cloud": graph_url, "cloud": graph_url,
} }
if not isinstance(credential, (ClientSecretCredential, CertificateCredential)): if not isinstance(credential, ClientSecretCredential):
scopes = ["Mail.ReadWrite"] scopes = ["Mail.ReadWrite"]
# Detect if mailbox is shared # Detect if mailbox is shared
if mailbox and username and username != mailbox: if mailbox and username != mailbox:
scopes = ["Mail.ReadWrite.Shared"] scopes = ["Mail.ReadWrite.Shared"]
auth_record = credential.authenticate(scopes=scopes) auth_record = credential.authenticate(scopes=scopes)
_cache_auth_record(auth_record, token_path) _cache_auth_record(auth_record, token_path)
@@ -160,23 +129,6 @@ class MSGraphConnection(MailboxConnection):
self._client = GraphClient(**client_params) self._client = GraphClient(**client_params)
self.mailbox_name = mailbox self.mailbox_name = mailbox
def _request_with_retries(self, method_name: str, *args, **kwargs):
for attempt in range(1, GRAPH_REQUEST_RETRY_ATTEMPTS + 1):
try:
return getattr(self._client, method_name)(*args, **kwargs)
except RequestException as error:
if attempt == GRAPH_REQUEST_RETRY_ATTEMPTS:
raise
logger.warning(
"Transient MS Graph %s error on attempt %s/%s: %s",
method_name.upper(),
attempt,
GRAPH_REQUEST_RETRY_ATTEMPTS,
error,
)
sleep(GRAPH_REQUEST_RETRY_DELAY_SECONDS)
raise RuntimeError("no retry attempts configured")
def create_folder(self, folder_name: str): def create_folder(self, folder_name: str):
sub_url = "" sub_url = ""
path_parts = folder_name.split("/") path_parts = folder_name.split("/")
@@ -191,7 +143,7 @@ class MSGraphConnection(MailboxConnection):
request_body = {"displayName": folder_name} request_body = {"displayName": folder_name}
request_url = f"/users/{self.mailbox_name}/mailFolders{sub_url}" request_url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
resp = self._request_with_retries("post", request_url, json=request_body) resp = self._client.post(request_url, json=request_body)
if resp.status_code == 409: if resp.status_code == 409:
logger.debug(f"Folder {folder_name} already exists, skipping creation") logger.debug(f"Folder {folder_name} already exists, skipping creation")
elif resp.status_code == 201: elif resp.status_code == 201:
@@ -221,7 +173,7 @@ class MSGraphConnection(MailboxConnection):
params["$top"] = batch_size params["$top"] = batch_size
else: else:
params["$top"] = 100 params["$top"] = 100
result = self._request_with_retries("get", url, params=params) result = self._client.get(url, params=params)
if result.status_code != 200: if result.status_code != 200:
raise RuntimeError(f"Failed to fetch messages {result.text}") raise RuntimeError(f"Failed to fetch messages {result.text}")
messages = result.json()["value"] messages = result.json()["value"]
@@ -229,7 +181,7 @@ class MSGraphConnection(MailboxConnection):
while "@odata.nextLink" in result.json() and ( while "@odata.nextLink" in result.json() and (
since is not None or (batch_size == 0 or batch_size - len(messages) > 0) since is not None or (batch_size == 0 or batch_size - len(messages) > 0)
): ):
result = self._request_with_retries("get", result.json()["@odata.nextLink"]) result = self._client.get(result.json()["@odata.nextLink"])
if result.status_code != 200: if result.status_code != 200:
raise RuntimeError(f"Failed to fetch messages {result.text}") raise RuntimeError(f"Failed to fetch messages {result.text}")
messages.extend(result.json()["value"]) messages.extend(result.json()["value"])
@@ -238,7 +190,7 @@ class MSGraphConnection(MailboxConnection):
def mark_message_read(self, message_id: str): def mark_message_read(self, message_id: str):
"""Marks a message as read""" """Marks a message as read"""
url = f"/users/{self.mailbox_name}/messages/{message_id}" url = f"/users/{self.mailbox_name}/messages/{message_id}"
resp = self._request_with_retries("patch", url, json={"isRead": "true"}) resp = self._client.patch(url, json={"isRead": "true"})
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeWarning( raise RuntimeWarning(
f"Failed to mark message read{resp.status_code}: {resp.json()}" f"Failed to mark message read{resp.status_code}: {resp.json()}"
@@ -246,7 +198,7 @@ class MSGraphConnection(MailboxConnection):
def fetch_message(self, message_id: str, **kwargs): def fetch_message(self, message_id: str, **kwargs):
url = f"/users/{self.mailbox_name}/messages/{message_id}/$value" url = f"/users/{self.mailbox_name}/messages/{message_id}/$value"
result = self._request_with_retries("get", url) result = self._client.get(url)
if result.status_code != 200: if result.status_code != 200:
raise RuntimeWarning( raise RuntimeWarning(
f"Failed to fetch message{result.status_code}: {result.json()}" f"Failed to fetch message{result.status_code}: {result.json()}"
@@ -258,7 +210,7 @@ class MSGraphConnection(MailboxConnection):
def delete_message(self, message_id: str): def delete_message(self, message_id: str):
url = f"/users/{self.mailbox_name}/messages/{message_id}" url = f"/users/{self.mailbox_name}/messages/{message_id}"
resp = self._request_with_retries("delete", url) resp = self._client.delete(url)
if resp.status_code != 204: if resp.status_code != 204:
raise RuntimeWarning( raise RuntimeWarning(
f"Failed to delete message {resp.status_code}: {resp.json()}" f"Failed to delete message {resp.status_code}: {resp.json()}"
@@ -268,7 +220,7 @@ class MSGraphConnection(MailboxConnection):
folder_id = self._find_folder_id_from_folder_path(folder_name) folder_id = self._find_folder_id_from_folder_path(folder_name)
request_body = {"destinationId": folder_id} request_body = {"destinationId": folder_id}
url = f"/users/{self.mailbox_name}/messages/{message_id}/move" url = f"/users/{self.mailbox_name}/messages/{message_id}/move"
resp = self._request_with_retries("post", url, json=request_body) resp = self._client.post(url, json=request_body)
if resp.status_code != 201: if resp.status_code != 201:
raise RuntimeWarning( raise RuntimeWarning(
f"Failed to move message {resp.status_code}: {resp.json()}" f"Failed to move message {resp.status_code}: {resp.json()}"
@@ -296,19 +248,6 @@ class MSGraphConnection(MailboxConnection):
else: else:
return self._find_folder_id_with_parent(folder_name, None) return self._find_folder_id_with_parent(folder_name, None)
def _get_well_known_folder_id(self, folder_name: str) -> Optional[str]:
folder_key = folder_name.lower().replace(" ", "").replace("-", "")
alias = self._WELL_KNOWN_FOLDERS.get(folder_key)
if alias is None:
return None
url = f"/users/{self.mailbox_name}/mailFolders/{alias}?$select=id,displayName"
folder_resp = self._request_with_retries("get", url)
if folder_resp.status_code != 200:
return None
payload = folder_resp.json()
return payload.get("id")
def _find_folder_id_with_parent( def _find_folder_id_with_parent(
self, folder_name: str, parent_folder_id: Optional[str] self, folder_name: str, parent_folder_id: Optional[str]
): ):
@@ -317,12 +256,8 @@ class MSGraphConnection(MailboxConnection):
sub_url = f"/{parent_folder_id}/childFolders" sub_url = f"/{parent_folder_id}/childFolders"
url = f"/users/{self.mailbox_name}/mailFolders{sub_url}" url = f"/users/{self.mailbox_name}/mailFolders{sub_url}"
filter = f"?$filter=displayName eq '{folder_name}'" filter = f"?$filter=displayName eq '{folder_name}'"
folders_resp = self._request_with_retries("get", url + filter) folders_resp = self._client.get(url + filter)
if folders_resp.status_code != 200: if folders_resp.status_code != 200:
if parent_folder_id is None:
well_known_folder_id = self._get_well_known_folder_id(folder_name)
if well_known_folder_id:
return well_known_folder_id
raise RuntimeWarning(f"Failed to list folders.{folders_resp.json()}") raise RuntimeWarning(f"Failed to list folders.{folders_resp.json()}")
folders: list = folders_resp.json()["value"] folders: list = folders_resp.json()["value"]
matched_folders = [ matched_folders = [

View File

@@ -55,28 +55,10 @@ class IMAPConnection(MailboxConnection):
return cast(str, self._client.fetch_message(message_id, parse=False)) return cast(str, self._client.fetch_message(message_id, parse=False))
def delete_message(self, message_id: int): def delete_message(self, message_id: int):
try: self._client.delete_messages([message_id])
self._client.delete_messages([message_id])
except IMAPClientError as error:
logger.warning(
"IMAP delete fallback for message %s due to server error: %s",
message_id,
error,
)
self._client.add_flags([message_id], [r"\Deleted"], silent=True)
self._client.expunge()
def move_message(self, message_id: int, folder_name: str): def move_message(self, message_id: int, folder_name: str):
try: self._client.move_messages([message_id], folder_name)
self._client.move_messages([message_id], folder_name)
except IMAPClientError as error:
logger.warning(
"IMAP move fallback for message %s due to server error: %s",
message_id,
error,
)
self._client.copy([message_id], folder_name)
self.delete_message(message_id)
def keepalive(self): def keepalive(self):
self._client.noop() self._client.noop()

View File

@@ -4,9 +4,7 @@ from __future__ import annotations
from typing import Any, Optional, Union from typing import Any, Optional, Union
import boto3
from opensearchpy import ( from opensearchpy import (
AWSV4SignerAuth,
Boolean, Boolean,
Date, Date,
Document, Document,
@@ -17,7 +15,6 @@ from opensearchpy import (
Nested, Nested,
Object, Object,
Q, Q,
RequestsHttpConnection,
Search, Search,
Text, Text,
connections, connections,
@@ -275,9 +272,6 @@ def set_hosts(
password: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, api_key: Optional[str] = None,
timeout: Optional[float] = 60.0, timeout: Optional[float] = 60.0,
auth_type: str = "basic",
aws_region: Optional[str] = None,
aws_service: str = "es",
): ):
""" """
Sets the OpenSearch hosts to use Sets the OpenSearch hosts to use
@@ -290,9 +284,6 @@ def set_hosts(
password (str): The password to use for authentication password (str): The password to use for authentication
api_key (str): The Base64 encoded API key to use for authentication api_key (str): The Base64 encoded API key to use for authentication
timeout (float): Timeout in seconds timeout (float): Timeout in seconds
auth_type (str): OpenSearch auth mode: basic (default) or awssigv4
aws_region (str): AWS region for SigV4 auth (required for awssigv4)
aws_service (str): AWS service for SigV4 signing (default: es)
""" """
if not isinstance(hosts, list): if not isinstance(hosts, list):
hosts = [hosts] hosts = [hosts]
@@ -304,32 +295,10 @@ def set_hosts(
conn_params["ca_certs"] = ssl_cert_path conn_params["ca_certs"] = ssl_cert_path
else: else:
conn_params["verify_certs"] = False conn_params["verify_certs"] = False
normalized_auth_type = (auth_type or "basic").strip().lower() if username and password:
if normalized_auth_type == "awssigv4": conn_params["http_auth"] = username + ":" + password
if not aws_region: if api_key:
raise OpenSearchError( conn_params["api_key"] = api_key
"OpenSearch AWS SigV4 auth requires 'aws_region' to be set"
)
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
raise OpenSearchError(
"Unable to load AWS credentials for OpenSearch SigV4 authentication"
)
conn_params["http_auth"] = AWSV4SignerAuth(
credentials, aws_region, aws_service
)
conn_params["connection_class"] = RequestsHttpConnection
elif normalized_auth_type == "basic":
if username and password:
conn_params["http_auth"] = username + ":" + password
if api_key:
conn_params["api_key"] = api_key
else:
raise OpenSearchError(
f"Unsupported OpenSearch auth_type '{auth_type}'. "
"Expected 'basic' or 'awssigv4'."
)
connections.create_connection(**conn_params) connections.create_connection(**conn_params)
@@ -444,8 +413,8 @@ def save_aggregate_report_to_opensearch(
org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
domain_query = Q(dict(match_phrase={"published_policy.domain": domain})) domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
begin_date_query = Q(dict(range=dict(date_begin=dict(gte=begin_date)))) begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
end_date_query = Q(dict(range=dict(date_end=dict(lte=end_date)))) end_date_query = Q(dict(match=dict(date_end=end_date)))
if index_suffix is not None: if index_suffix is not None:
search_index = "dmarc_aggregate_{0}*".format(index_suffix) search_index = "dmarc_aggregate_{0}*".format(index_suffix)

View File

@@ -2,7 +2,7 @@
requires = [ requires = [
"hatchling>=1.27.0", "hatchling>=1.27.0",
] ]
requires_python = ">=3.10,<3.15" requires_python = ">=3.10,<3.14"
build-backend = "hatchling.build" build-backend = "hatchling.build"
[project] [project]
@@ -45,7 +45,7 @@ dependencies = [
"google-auth-httplib2>=0.1.0", "google-auth-httplib2>=0.1.0",
"google-auth-oauthlib>=0.4.6", "google-auth-oauthlib>=0.4.6",
"google-auth>=2.3.3", "google-auth>=2.3.3",
"imapclient>=3.1.0", "imapclient>=2.1.0",
"kafka-python-ng>=2.2.2", "kafka-python-ng>=2.2.2",
"lxml>=4.4.0", "lxml>=4.4.0",
"mailsuite>=1.11.2", "mailsuite>=1.11.2",

1349
tests.py

File diff suppressed because it is too large Load Diff