Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
b1e76c8569 Fix IMAP since option date format to use RFC 3501 compliant DD-Mon-YYYY format
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2025-12-29 17:49:27 +00:00
copilot-swe-agent[bot]
a5de313a57 Initial plan 2025-12-29 17:43:14 +00:00
13 changed files with 40 additions and 376 deletions

View File

@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v5

View File

@@ -1,36 +1,5 @@
# Changelog
## 9.1.0
## Enhancements
- Add TCP and TLS support for syslog output. (#656)
- Skip DNS lookups in GitHub Actions to prevent DNS timeouts during tests timeouts. (#657)
- Remove microseconds from DMARC aggregate report time ranges before parsing them.
## 9.0.10
- Support Python 3.14+
## 9.0.9
### Fixes
- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
## 9.0.8
### Fixes
- Fix logging configuration not propagating to child parser processes (#646).
- Update `mailsuite` dependency to `?=1.11.1` to solve issues with iCloud IMAP (#493).
## 9.0.7
## Fixes
- Fix IMAP `since` option (#PR 645 closes issues #581 and #643).
## 9.0.6
### Fixes

View File

@@ -61,4 +61,4 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Actively maintained |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|

1
ci.ini
View File

@@ -3,7 +3,6 @@ save_aggregate = True
save_forensic = True
save_smtp_tls = True
debug = True
offline = True
[elasticsearch]
hosts = http://localhost:9200

View File

@@ -61,7 +61,7 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Actively maintained |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
```{toctree}
:caption: 'Contents'

View File

@@ -171,8 +171,8 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for a IMAP
IDLE response or the number of seconds until the next
mail check (Default: `30`)
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
Defaults to `1d` if incorrect value is provided.
- `imap`
- `host` - str: The IMAP server hostname or IP address
@@ -240,7 +240,7 @@ The full set of configuration options are:
group and use that as the group id.
```powershell
New-ApplicationAccessPolicy -AccessRight RestrictAccess
New-ApplicationAccessPolicy -AccessRight RestrictAccess
-AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>"
-Description "Restrict access to dmarc reports mailbox."
```
@@ -336,65 +336,13 @@ The full set of configuration options are:
- `secret_access_key` - str: The secret access key (Optional)
- `syslog`
- `server` - str: The Syslog server name or IP address
- `port` - int: The port to use (Default: `514`)
- `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
- `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
- `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
- `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
- `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
- `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
- `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
**Example UDP configuration (default):**
```ini
[syslog]
server = syslog.example.com
port = 514
```
**Example TCP configuration:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tcp
timeout = 10.0
retry_attempts = 5
```
**Example TLS configuration with server verification:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
timeout = 10.0
```
**Example TLS configuration with mutual authentication:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
certfile_path = /path/to/client-cert.pem
keyfile_path = /path/to/client-key.pem
timeout = 10.0
retry_attempts = 3
retry_delay = 5
```
- `port` - int: The UDP port to use (Default: `514`)
- `gmail_api`
- `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file
(Default: `.token`)
:::{note}
credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
:::
@@ -494,7 +442,7 @@ Update the limit to 2k per example:
PUT _cluster/settings
{
"persistent" : {
"cluster.max_shards_per_node" : 2000
"cluster.max_shards_per_node" : 2000
}
}
```

File diff suppressed because one or more lines are too long

View File

@@ -595,12 +595,18 @@ def parsed_smtp_tls_reports_to_csv_rows(
if "mx_host_patterns" in policy:
record["mx_host_patterns"] = "|".join(policy["mx_host_patterns"])
successful_record = record.copy()
successful_record["policy_domain"] = policy["policy_domain"]
successful_record["policy_type"] = policy["policy_type"]
successful_record["policy_domain"] = policy[
"policy_domain"
]
successful_record["policy_type"] = policy[
"policy_type"
]
successful_record["successful_session_count"] = policy[
"successful_session_count"
]
successful_record["failed_session_count"] = policy["failed_session_count"]
successful_record["failed_session_count"] = policy[
"failed_session_count"
]
rows.append(successful_record)
if "failure_details" in policy:
for failure_details in policy["failure_details"]:
@@ -751,8 +757,8 @@ def parse_aggregate_report_xml(
new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"]
begin_ts = int(date_range["begin"].split(".")[0])
end_ts = int(date_range["end"].split(".")[0])
begin_ts = int(date_range["begin"])
end_ts = int(date_range["end"])
span_seconds = end_ts - begin_ts
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
@@ -892,11 +898,7 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
try:
if isinstance(content, str):
try:
file_object = BytesIO(
b64decode(
content.replace("\n", "").replace("\r", ""), validate=True
)
)
file_object = BytesIO(b64decode(content))
except binascii.Error:
return content
header = file_object.read(6)
@@ -1946,9 +1948,7 @@ def get_dmarc_reports_from_mailbox(
"Only days and weeks values in 'since' option are \
considered for IMAP connections. Examples: 2d or 1w"
)
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime(
"%d-%b-%Y"
)
since = (datetime.now(timezone.utc) - timedelta(minutes=_since)).strftime("%d-%b-%Y")
current_time = datetime.now(timezone.utc).strftime("%d-%b-%Y")
elif isinstance(connection, MSGraphConnection):
since = (

View File

@@ -46,7 +46,6 @@ from parsedmarc.mail import (
MSGraphConnection,
)
from parsedmarc.mail.graph import AuthMethod
from parsedmarc.types import ParsingResults
from parsedmarc.utils import get_base_domain, get_reverse_dns, is_mbox
# Increase the max header limit for very large emails. `_MAXHEADERS` is a
@@ -68,48 +67,6 @@ def _str_to_list(s):
return list(map(lambda i: i.lstrip(), _list))
def _configure_logging(log_level, log_file=None):
"""
Configure logging for the current process.
This is needed for child processes to properly log messages.
Args:
log_level: The logging level (e.g., logging.DEBUG, logging.WARNING)
log_file: Optional path to log file
"""
# Get the logger
from parsedmarc.log import logger
# Set the log level
logger.setLevel(log_level)
# Add StreamHandler with formatter if not already present
# Check if we already have a StreamHandler to avoid duplicates
# Use exact type check to distinguish from FileHandler subclass
has_stream_handler = any(type(h) is logging.StreamHandler for h in logger.handlers)
if not has_stream_handler:
formatter = logging.Formatter(
fmt="%(levelname)8s:%(filename)s:%(lineno)d:%(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Add FileHandler if log_file is specified
if log_file:
try:
fh = logging.FileHandler(log_file, "a")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
)
fh.setFormatter(formatter)
logger.addHandler(fh)
except (IOError, OSError, PermissionError) as error:
logger.warning("Unable to write to log file: {}".format(error))
def cli_parse(
file_path,
sa,
@@ -122,29 +79,8 @@ def cli_parse(
reverse_dns_map_url,
normalize_timespan_threshold_hours,
conn,
log_level=logging.ERROR,
log_file=None,
):
"""Separated this function for multiprocessing
Args:
file_path: Path to the report file
sa: Strip attachment payloads flag
nameservers: List of nameservers
dns_timeout: DNS timeout
ip_db_path: Path to IP database
offline: Offline mode flag
always_use_local_files: Always use local files flag
reverse_dns_map_path: Path to reverse DNS map
reverse_dns_map_url: URL to reverse DNS map
normalize_timespan_threshold_hours: Timespan threshold
conn: Pipe connection for IPC
log_level: Logging level for this process
log_file: Optional path to log file
"""
# Configure logging in this child process
_configure_logging(log_level, log_file)
"""Separated this function for multiprocessing"""
try:
file_results = parse_report_file(
file_path,
@@ -697,13 +633,6 @@ def _main():
s3_secret_access_key=None,
syslog_server=None,
syslog_port=None,
syslog_protocol=None,
syslog_cafile_path=None,
syslog_certfile_path=None,
syslog_keyfile_path=None,
syslog_timeout=None,
syslog_retry_attempts=None,
syslog_retry_delay=None,
gmail_api_credentials_file=None,
gmail_api_token_file=None,
gmail_api_include_spam_trash=False,
@@ -1246,28 +1175,6 @@ def _main():
opts.syslog_port = syslog_config["port"]
else:
opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = syslog_config["cafile_path"]
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = syslog_config["certfile_path"]
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
@@ -1465,17 +1372,6 @@ def _main():
syslog_client = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts
if opts.syslog_retry_attempts is not None
else 3,
retry_delay=opts.syslog_retry_delay
if opts.syslog_retry_delay is not None
else 5,
)
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))
@@ -1565,10 +1461,6 @@ def _main():
if n_procs < 1:
n_procs = 1
# Capture the current log level to pass to child processes
current_log_level = logger.level
current_log_file = opts.log_file
for batch_index in range((len(file_paths) + n_procs - 1) // n_procs):
processes = []
connections = []
@@ -1594,8 +1486,6 @@ def _main():
opts.reverse_dns_map_url,
opts.normalize_timespan_threshold_hours,
child_conn,
current_log_level,
current_log_file,
),
)
processes.append(process)
@@ -1798,13 +1688,13 @@ def _main():
logger.exception("Mailbox Error")
exit(1)
parsing_results: ParsingResults = {
results = {
"aggregate_reports": aggregate_reports,
"forensic_reports": forensic_reports,
"smtp_tls_reports": smtp_tls_reports,
}
process_reports(parsing_results)
process_reports(results)
if opts.smtp_host:
try:
@@ -1818,7 +1708,7 @@ def _main():
else _str_to_list(str(opts.smtp_to))
)
email_results(
parsing_results,
results,
opts.smtp_host,
opts.smtp_from,
smtp_to_value,

View File

@@ -1,3 +1,3 @@
__version__ = "9.1.0"
__version__ = "9.0.6"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -6,10 +6,7 @@ from __future__ import annotations
import json
import logging
import logging.handlers
import socket
import ssl
import time
from typing import Any, Optional
from typing import Any
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
@@ -21,150 +18,20 @@ from parsedmarc import (
class SyslogClient(object):
"""A client for Syslog"""
def __init__(
self,
server_name: str,
server_port: int,
protocol: str = "udp",
cafile_path: Optional[str] = None,
certfile_path: Optional[str] = None,
keyfile_path: Optional[str] = None,
timeout: float = 5.0,
retry_attempts: int = 3,
retry_delay: int = 5,
):
def __init__(self, server_name: str, server_port: int):
"""
Initializes the SyslogClient
Args:
server_name (str): The Syslog server
server_port (int): The Syslog port
protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
certfile_path (str): Path to client certificate file for TLS authentication (Optional)
keyfile_path (str): Path to client private key file for TLS authentication (Optional)
timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
retry_delay (int): Delay in seconds between retry attempts (Default: 5)
server_port (int): The Syslog UDP port
"""
self.server_name = server_name
self.server_port = server_port
self.protocol = protocol.lower()
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
# Create the appropriate syslog handler based on protocol
log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
cafile_path,
certfile_path,
keyfile_path,
timeout,
retry_attempts,
retry_delay,
)
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
self.logger.addHandler(log_handler)
def _create_syslog_handler(
self,
server_name: str,
server_port: int,
protocol: str,
cafile_path: Optional[str],
certfile_path: Optional[str],
keyfile_path: Optional[str],
timeout: float,
retry_attempts: int,
retry_delay: int,
) -> logging.handlers.SysLogHandler:
"""
Creates a SysLogHandler with the specified protocol and TLS settings
"""
if protocol == "udp":
# UDP protocol (default, backward compatible)
return logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_DGRAM,
)
elif protocol in ["tcp", "tls"]:
# TCP or TLS protocol with retry logic
for attempt in range(1, retry_attempts + 1):
try:
if protocol == "tcp":
# TCP without TLS
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Set timeout on the socket
if hasattr(handler, "socket") and handler.socket:
handler.socket.settimeout(timeout)
return handler
else:
# TLS protocol
# Create SSL context with secure defaults
ssl_context = ssl.create_default_context()
# Explicitly set minimum TLS version to 1.2 for security
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Configure server certificate verification
if cafile_path:
ssl_context.load_verify_locations(cafile=cafile_path)
# Configure client certificate authentication
if certfile_path and keyfile_path:
ssl_context.load_cert_chain(
certfile=certfile_path,
keyfile=keyfile_path,
)
elif certfile_path or keyfile_path:
# Warn if only one of the two required parameters is provided
self.logger.warning(
"Both certfile_path and keyfile_path are required for "
"client certificate authentication. Client authentication "
"will not be used."
)
# Create TCP handler first
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Wrap socket with TLS
if hasattr(handler, "socket") and handler.socket:
handler.socket = ssl_context.wrap_socket(
handler.socket,
server_hostname=server_name,
)
handler.socket.settimeout(timeout)
return handler
except Exception as e:
if attempt < retry_attempts:
self.logger.warning(
f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
self.logger.error(
f"Syslog connection failed after {retry_attempts} attempts: {e}"
)
raise
else:
raise ValueError(
f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:

View File

@@ -29,7 +29,7 @@ classifiers = [
"Operating System :: OS Independent",
"Programming Language :: Python :: 3"
]
requires-python = ">=3.9"
requires-python = ">=3.9, <3.14"
dependencies = [
"azure-identity>=1.8.0",
"azure-monitor-ingestion>=1.0.0",
@@ -48,7 +48,7 @@ dependencies = [
"imapclient>=2.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.2",
"mailsuite>=1.11.0",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",

View File

@@ -12,9 +12,6 @@ from lxml import etree
import parsedmarc
import parsedmarc.utils
# Detect if running in GitHub Actions to skip DNS lookups
OFFLINE_MODE = os.environ.get("GITHUB_ACTIONS", "false").lower() == "true"
def minify_xml(xml_string):
parser = etree.XMLParser(remove_blank_text=True)
@@ -124,7 +121,7 @@ class Test(unittest.TestCase):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
sample_path, always_use_local_files=True
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
print("Passed!")
@@ -132,7 +129,7 @@ class Test(unittest.TestCase):
def testEmptySample(self):
"""Test empty/unparasable report"""
with self.assertRaises(parsedmarc.ParserError):
parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE)
parsedmarc.parse_report_file("samples/empty.xml")
def testForensicSamples(self):
"""Test sample forensic/ruf/failure DMARC reports"""
@@ -142,12 +139,8 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_email(sample_content)["report"]
parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
print("Passed!")
@@ -159,9 +152,7 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")