Compare commits

..

9 Commits

Author SHA1 Message Date
Copilot
4d97bd25aa Skip DNS lookups in GitHub Actions to prevent test timeouts (#657)
* Add offline mode for tests in GitHub Actions to skip DNS lookups

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-02-18 18:19:28 -05:00
Copilot
17a612df0c Add TCP and TLS transport support to syslog module (#656)
- Updated parsedmarc/syslog.py to support UDP, TCP, and TLS protocols
- Added protocol parameter with UDP as default for backward compatibility
- Implemented TLS support with CA verification and client certificate auth
- Added retry logic for TCP/TLS connections with configurable attempts and delays
- Updated parsedmarc/cli.py with new config file parsing
- Updated documentation with examples for TCP and TLS configurations

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Remove CLI arguments for syslog options, keep config-file only

Per user request, removed command-line argument options for syslog parameters.
All new syslog options (protocol, TLS settings, timeout, retry) are now only
available via configuration file, consistent with other similar options.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Fix code review issues: remove trailing whitespace and add cert validation

- Removed trailing whitespace from syslog.py and usage.md
- Added warning when only one of certfile_path/keyfile_path is provided
- Improved error handling for incomplete TLS client certificate configuration

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

* Set minimum TLS version to 1.2 for enhanced security

Explicitly configured ssl_context.minimum_version = TLSVersion.TLSv1_2
to ensure only secure TLS versions are used for syslog connections.

Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: seanthegeek <44679+seanthegeek@users.noreply.github.com>
2026-02-18 18:12:59 -05:00
Blackmoon
221bc332ef Fixed a typo in policies.successful_session_count (#654) 2026-02-09 13:57:11 -05:00
Sean Whalen
a2a75f7a81 Fix timestamp parsing in aggregate report by removing fractional seconds 2026-01-21 13:08:48 -05:00
Anael Mobilia
50fcb51577 Update supported Python versions in docs + readme (#652)
* Update README.md

* Update index.md

* Update python-tests.yml
2026-01-19 14:40:01 -05:00
Sean Whalen
dd9ef90773 9.0.10
- Support Python 3.14+
2026-01-17 14:09:18 -05:00
Sean Whalen
0e3a4b0f06 9.0.9
Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
2026-01-08 13:29:23 -05:00
maraspr
343b53ef18 remove newlines before b64decode (#649) 2026-01-08 12:24:20 -05:00
maraspr
792079a3e8 Validate that string is base64 (#648) 2026-01-08 10:15:27 -05:00
15 changed files with 271 additions and 141 deletions

View File

@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v5

View File

@@ -1,5 +1,15 @@
# Changelog
## 9.0.10
- Support Python 3.14+
## 9.0.9
### Fixes
- Validate that a string is base64-encoded before trying to base64 decode it. (PRs #648 and #649)
## 9.0.8
### Fixes

View File

@@ -61,4 +61,4 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
| 3.14 | | Actively maintained |

1
ci.ini
View File

@@ -3,6 +3,7 @@ save_aggregate = True
save_forensic = True
save_smtp_tls = True
debug = True
offline = True
[elasticsearch]
hosts = http://localhost:9200

View File

@@ -29,14 +29,3 @@ token_file = /etc/example/token.json
include_spam_trash = True
paginate_messages = True
scopes = https://www.googleapis.com/auth/gmail.modify
[msgraph]
auth_method = ClientSecret
client_id = 12345678-90ab-cdef-1234-567890abcdef
client_secret = your-client-secret-here
tenant_id = 12345678-90ab-cdef-1234-567890abcdef
mailbox = dmarc-reports@example.com
# Use standard folder names - they work across all locales
# and avoid "Default folder Root not found" errors
reports_folder = Inbox
archive_folder = Archive

View File

@@ -61,7 +61,7 @@ for RHEL or Debian.
| 3.11 | ✅ | Actively maintained; supported until June 2028 (Debian 12) |
| 3.12 | ✅ | Actively maintained; supported until May 2035 (RHEL 10) |
| 3.13 | ✅ | Actively maintained; supported until June 2030 (Debian 13) |
| 3.14 | | Not currently supported due to [this imapclient bug](https://github.com/mjs/imapclient/issues/618)|
| 3.14 | | Actively maintained |
```{toctree}
:caption: 'Contents'

View File

@@ -171,8 +171,8 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for a IMAP
IDLE response or the number of seconds until the next
mail check (Default: `30`)
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}.
Defaults to `1d` if incorrect value is provided.
- `imap`
- `host` - str: The IMAP server hostname or IP address
@@ -229,18 +229,6 @@ The full set of configuration options are:
username, you must grant the app `Mail.ReadWrite.Shared`.
:::
:::{tip}
When configuring folder names (e.g., `reports_folder`, `archive_folder`),
you can use standard folder names like `Inbox`, `Archive`, `Sent Items`, etc.
These will be automatically mapped to Microsoft Graph's well-known folder names,
which works reliably across different mailbox locales and avoids issues with
uninitialized or shared mailboxes. Supported folder names include:
- English: Inbox, Sent Items, Deleted Items, Drafts, Junk Email, Archive, Outbox
- German: Posteingang, Gesendete Elemente, Gelöschte Elemente, Entwürfe, Junk-E-Mail, Archiv
- French: Boîte de réception, Éléments envoyés, Éléments supprimés, Brouillons, Courrier indésirable, Archives
- Spanish: Bandeja de entrada, Elementos enviados, Elementos eliminados, Borradores, Correo no deseado
:::
:::{warning}
If you are using the `ClientSecret` auth method, you need to
grant the `Mail.ReadWrite` (application) permission to the
@@ -252,7 +240,7 @@ The full set of configuration options are:
group and use that as the group id.
```powershell
New-ApplicationAccessPolicy -AccessRight RestrictAccess
New-ApplicationAccessPolicy -AccessRight RestrictAccess
-AppId "<CLIENT_ID>" -PolicyScopeGroupId "<MAILBOX>"
-Description "Restrict access to dmarc reports mailbox."
```
@@ -348,13 +336,65 @@ The full set of configuration options are:
- `secret_access_key` - str: The secret access key (Optional)
- `syslog`
- `server` - str: The Syslog server name or IP address
- `port` - int: The UDP port to use (Default: `514`)
- `port` - int: The port to use (Default: `514`)
- `protocol` - str: The protocol to use: `udp`, `tcp`, or `tls` (Default: `udp`)
- `cafile_path` - str: Path to CA certificate file for TLS server verification (Optional)
- `certfile_path` - str: Path to client certificate file for TLS authentication (Optional)
- `keyfile_path` - str: Path to client private key file for TLS authentication (Optional)
- `timeout` - float: Connection timeout in seconds for TCP/TLS (Default: `5.0`)
- `retry_attempts` - int: Number of retry attempts for failed connections (Default: `3`)
- `retry_delay` - int: Delay in seconds between retry attempts (Default: `5`)
**Example UDP configuration (default):**
```ini
[syslog]
server = syslog.example.com
port = 514
```
**Example TCP configuration:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tcp
timeout = 10.0
retry_attempts = 5
```
**Example TLS configuration with server verification:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
timeout = 10.0
```
**Example TLS configuration with mutual authentication:**
```ini
[syslog]
server = syslog.example.com
port = 6514
protocol = tls
cafile_path = /path/to/ca-cert.pem
certfile_path = /path/to/client-cert.pem
keyfile_path = /path/to/client-key.pem
timeout = 10.0
retry_attempts = 3
retry_delay = 5
```
- `gmail_api`
- `credentials_file` - str: Path to file containing the
credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file
(Default: `.token`)
:::{note}
credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
:::
@@ -454,7 +494,7 @@ Update the limit to 2k per example:
PUT _cluster/settings
{
"persistent" : {
"cluster.max_shards_per_node" : 2000
"cluster.max_shards_per_node" : 2000
}
}
```

File diff suppressed because one or more lines are too long

View File

@@ -751,8 +751,8 @@ def parse_aggregate_report_xml(
new_report_metadata["report_id"] = report_id
date_range = report["report_metadata"]["date_range"]
begin_ts = int(date_range["begin"])
end_ts = int(date_range["end"])
begin_ts = int(date_range["begin"].split(".")[0])
end_ts = int(date_range["end"].split(".")[0])
span_seconds = end_ts - begin_ts
normalize_timespan = span_seconds > normalize_timespan_threshold_hours * 3600
@@ -892,7 +892,11 @@ def extract_report(content: Union[bytes, str, BinaryIO]) -> str:
try:
if isinstance(content, str):
try:
file_object = BytesIO(b64decode(content))
file_object = BytesIO(
b64decode(
content.replace("\n", "").replace("\r", ""), validate=True
)
)
except binascii.Error:
return content
header = file_object.read(6)

View File

@@ -697,6 +697,13 @@ def _main():
s3_secret_access_key=None,
syslog_server=None,
syslog_port=None,
syslog_protocol=None,
syslog_cafile_path=None,
syslog_certfile_path=None,
syslog_keyfile_path=None,
syslog_timeout=None,
syslog_retry_attempts=None,
syslog_retry_delay=None,
gmail_api_credentials_file=None,
gmail_api_token_file=None,
gmail_api_include_spam_trash=False,
@@ -1239,6 +1246,28 @@ def _main():
opts.syslog_port = syslog_config["port"]
else:
opts.syslog_port = 514
if "protocol" in syslog_config:
opts.syslog_protocol = syslog_config["protocol"]
else:
opts.syslog_protocol = "udp"
if "cafile_path" in syslog_config:
opts.syslog_cafile_path = syslog_config["cafile_path"]
if "certfile_path" in syslog_config:
opts.syslog_certfile_path = syslog_config["certfile_path"]
if "keyfile_path" in syslog_config:
opts.syslog_keyfile_path = syslog_config["keyfile_path"]
if "timeout" in syslog_config:
opts.syslog_timeout = float(syslog_config["timeout"])
else:
opts.syslog_timeout = 5.0
if "retry_attempts" in syslog_config:
opts.syslog_retry_attempts = int(syslog_config["retry_attempts"])
else:
opts.syslog_retry_attempts = 3
if "retry_delay" in syslog_config:
opts.syslog_retry_delay = int(syslog_config["retry_delay"])
else:
opts.syslog_retry_delay = 5
if "gmail_api" in config.sections():
gmail_api_config = config["gmail_api"]
@@ -1436,6 +1465,13 @@ def _main():
syslog_client = syslog.SyslogClient(
server_name=opts.syslog_server,
server_port=int(opts.syslog_port),
protocol=opts.syslog_protocol or "udp",
cafile_path=opts.syslog_cafile_path,
certfile_path=opts.syslog_certfile_path,
keyfile_path=opts.syslog_keyfile_path,
timeout=opts.syslog_timeout if opts.syslog_timeout is not None else 5.0,
retry_attempts=opts.syslog_retry_attempts if opts.syslog_retry_attempts is not None else 3,
retry_delay=opts.syslog_retry_delay if opts.syslog_retry_delay is not None else 5,
)
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

View File

@@ -1,3 +1,3 @@
__version__ = "9.0.8"
__version__ = "9.0.10"
USER_AGENT = f"parsedmarc/{__version__}"

View File

@@ -20,59 +20,6 @@ from msgraph.core import GraphClient
from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
# Mapping of common folder names to Microsoft Graph well-known folder names
# This avoids the "Default folder Root not found" error on uninitialized mailboxes
WELL_KNOWN_FOLDER_MAP = {
# English names
"inbox": "inbox",
"sent items": "sentitems",
"sent": "sentitems",
"sentitems": "sentitems",
"deleted items": "deleteditems",
"deleted": "deleteditems",
"deleteditems": "deleteditems",
"trash": "deleteditems",
"drafts": "drafts",
"junk email": "junkemail",
"junk": "junkemail",
"junkemail": "junkemail",
"spam": "junkemail",
"archive": "archive",
"outbox": "outbox",
"conversation history": "conversationhistory",
"conversationhistory": "conversationhistory",
# German names
"posteingang": "inbox",
"gesendete elemente": "sentitems",
"gesendet": "sentitems",
"gelöschte elemente": "deleteditems",
"gelöscht": "deleteditems",
"entwürfe": "drafts",
"junk-e-mail": "junkemail",
"archiv": "archive",
"postausgang": "outbox",
# French names
"boîte de réception": "inbox",
"éléments envoyés": "sentitems",
"envoyés": "sentitems",
"éléments supprimés": "deleteditems",
"supprimés": "deleteditems",
"brouillons": "drafts",
"courrier indésirable": "junkemail",
"archives": "archive",
"boîte d'envoi": "outbox",
# Spanish names
"bandeja de entrada": "inbox",
"elementos enviados": "sentitems",
"enviados": "sentitems",
"elementos eliminados": "deleteditems",
"eliminados": "deleteditems",
"borradores": "drafts",
"correo no deseado": "junkemail",
"archivar": "archive",
"bandeja de salida": "outbox",
}
class AuthMethod(Enum):
DeviceCode = 1
@@ -183,13 +130,6 @@ class MSGraphConnection(MailboxConnection):
self.mailbox_name = mailbox
def create_folder(self, folder_name: str):
# Check if this is a well-known folder - they already exist and cannot be created
if "/" not in folder_name:
well_known_name = WELL_KNOWN_FOLDER_MAP.get(folder_name.lower())
if well_known_name:
logger.debug(f"Folder '{folder_name}' is a well-known folder, skipping creation")
return
sub_url = ""
path_parts = folder_name.split("/")
if len(path_parts) > 1: # Folder is a subFolder
@@ -306,12 +246,6 @@ class MSGraphConnection(MailboxConnection):
parent_folder_id = folder_id
return self._find_folder_id_with_parent(path_parts[-1], parent_folder_id)
else:
# Check if this is a well-known folder name (case-insensitive)
well_known_name = WELL_KNOWN_FOLDER_MAP.get(folder_name.lower())
if well_known_name:
# Use well-known folder name directly to avoid querying uninitialized mailboxes
logger.debug(f"Using well-known folder name '{well_known_name}' for '{folder_name}'")
return well_known_name
return self._find_folder_id_with_parent(folder_name, None)
def _find_folder_id_with_parent(

View File

@@ -6,7 +6,10 @@ from __future__ import annotations
import json
import logging
import logging.handlers
from typing import Any
import socket
import ssl
import time
from typing import Any, Optional
from parsedmarc import (
parsed_aggregate_reports_to_csv_rows,
@@ -18,20 +21,150 @@ from parsedmarc import (
class SyslogClient(object):
"""A client for Syslog"""
def __init__(self, server_name: str, server_port: int):
def __init__(
self,
server_name: str,
server_port: int,
protocol: str = "udp",
cafile_path: Optional[str] = None,
certfile_path: Optional[str] = None,
keyfile_path: Optional[str] = None,
timeout: float = 5.0,
retry_attempts: int = 3,
retry_delay: int = 5,
):
"""
Initializes the SyslogClient
Args:
server_name (str): The Syslog server
server_port (int): The Syslog UDP port
server_port (int): The Syslog port
protocol (str): The protocol to use: "udp", "tcp", or "tls" (Default: "udp")
cafile_path (str): Path to CA certificate file for TLS server verification (Optional)
certfile_path (str): Path to client certificate file for TLS authentication (Optional)
keyfile_path (str): Path to client private key file for TLS authentication (Optional)
timeout (float): Connection timeout in seconds for TCP/TLS (Default: 5.0)
retry_attempts (int): Number of retry attempts for failed connections (Default: 3)
retry_delay (int): Delay in seconds between retry attempts (Default: 5)
"""
self.server_name = server_name
self.server_port = server_port
self.protocol = protocol.lower()
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.logger = logging.getLogger("parsedmarc_syslog")
self.logger.setLevel(logging.INFO)
log_handler = logging.handlers.SysLogHandler(address=(server_name, server_port))
# Create the appropriate syslog handler based on protocol
log_handler = self._create_syslog_handler(
server_name,
server_port,
self.protocol,
cafile_path,
certfile_path,
keyfile_path,
timeout,
retry_attempts,
retry_delay,
)
self.logger.addHandler(log_handler)
def _create_syslog_handler(
self,
server_name: str,
server_port: int,
protocol: str,
cafile_path: Optional[str],
certfile_path: Optional[str],
keyfile_path: Optional[str],
timeout: float,
retry_attempts: int,
retry_delay: int,
) -> logging.handlers.SysLogHandler:
"""
Creates a SysLogHandler with the specified protocol and TLS settings
"""
if protocol == "udp":
# UDP protocol (default, backward compatible)
return logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_DGRAM,
)
elif protocol in ["tcp", "tls"]:
# TCP or TLS protocol with retry logic
for attempt in range(1, retry_attempts + 1):
try:
if protocol == "tcp":
# TCP without TLS
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Set timeout on the socket
if hasattr(handler, "socket") and handler.socket:
handler.socket.settimeout(timeout)
return handler
else:
# TLS protocol
# Create SSL context with secure defaults
ssl_context = ssl.create_default_context()
# Explicitly set minimum TLS version to 1.2 for security
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Configure server certificate verification
if cafile_path:
ssl_context.load_verify_locations(cafile=cafile_path)
# Configure client certificate authentication
if certfile_path and keyfile_path:
ssl_context.load_cert_chain(
certfile=certfile_path,
keyfile=keyfile_path,
)
elif certfile_path or keyfile_path:
# Warn if only one of the two required parameters is provided
self.logger.warning(
"Both certfile_path and keyfile_path are required for "
"client certificate authentication. Client authentication "
"will not be used."
)
# Create TCP handler first
handler = logging.handlers.SysLogHandler(
address=(server_name, server_port),
socktype=socket.SOCK_STREAM,
)
# Wrap socket with TLS
if hasattr(handler, "socket") and handler.socket:
handler.socket = ssl_context.wrap_socket(
handler.socket,
server_hostname=server_name,
)
handler.socket.settimeout(timeout)
return handler
except Exception as e:
if attempt < retry_attempts:
self.logger.warning(
f"Syslog connection attempt {attempt}/{retry_attempts} failed: {e}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
self.logger.error(
f"Syslog connection failed after {retry_attempts} attempts: {e}"
)
raise
else:
raise ValueError(
f"Invalid protocol '{protocol}'. Must be 'udp', 'tcp', or 'tls'."
)
def save_aggregate_report_to_syslog(self, aggregate_reports: list[dict[str, Any]]):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:

View File

@@ -29,7 +29,7 @@ classifiers = [
"Operating System :: OS Independent",
"Programming Language :: Python :: 3"
]
requires-python = ">=3.9, <3.14"
requires-python = ">=3.9"
dependencies = [
"azure-identity>=1.8.0",
"azure-monitor-ingestion>=1.0.0",
@@ -48,7 +48,7 @@ dependencies = [
"imapclient>=2.1.0",
"kafka-python-ng>=2.2.2",
"lxml>=4.4.0",
"mailsuite>=1.11.1",
"mailsuite>=1.11.2",
"msgraph-core==0.2.2",
"opensearch-py>=2.4.2,<=3.0.0",
"publicsuffixlist>=0.10.0",

View File

@@ -12,6 +12,9 @@ from lxml import etree
import parsedmarc
import parsedmarc.utils
# Detect if running in GitHub Actions to skip DNS lookups
OFFLINE_MODE = os.environ.get("GITHUB_ACTIONS", "false").lower() == "true"
def minify_xml(xml_string):
parser = etree.XMLParser(remove_blank_text=True)
@@ -121,7 +124,7 @@ class Test(unittest.TestCase):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True
sample_path, always_use_local_files=True, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_aggregate_reports_to_csv(parsed_report)
print("Passed!")
@@ -129,7 +132,7 @@ class Test(unittest.TestCase):
def testEmptySample(self):
"""Test empty/unparasable report"""
with self.assertRaises(parsedmarc.ParserError):
parsedmarc.parse_report_file("samples/empty.xml")
parsedmarc.parse_report_file("samples/empty.xml", offline=OFFLINE_MODE)
def testForensicSamples(self):
"""Test sample forensic/ruf/failure DMARC reports"""
@@ -139,8 +142,12 @@ class Test(unittest.TestCase):
print("Testing {0}: ".format(sample_path), end="")
with open(sample_path) as sample_file:
sample_content = sample_file.read()
parsed_report = parsedmarc.parse_report_email(sample_content)["report"]
parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
parsed_report = parsedmarc.parse_report_email(
sample_content, offline=OFFLINE_MODE
)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_forensic_reports_to_csv(parsed_report)
print("Passed!")
@@ -152,36 +159,12 @@ class Test(unittest.TestCase):
if os.path.isdir(sample_path):
continue
print("Testing {0}: ".format(sample_path), end="")
parsed_report = parsedmarc.parse_report_file(sample_path)["report"]
parsed_report = parsedmarc.parse_report_file(
sample_path, offline=OFFLINE_MODE
)["report"]
parsedmarc.parsed_smtp_tls_reports_to_csv(parsed_report)
print("Passed!")
def testMSGraphWellKnownFolders(self):
"""Test MSGraph well-known folder name mapping"""
from parsedmarc.mail.graph import WELL_KNOWN_FOLDER_MAP
# Test English folder names
assert WELL_KNOWN_FOLDER_MAP.get("inbox") == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("sent items") == "sentitems"
assert WELL_KNOWN_FOLDER_MAP.get("deleted items") == "deleteditems"
assert WELL_KNOWN_FOLDER_MAP.get("archive") == "archive"
# Test case insensitivity - simulating how the code actually uses it
# This is what happens when user config has "reports_folder = Inbox"
assert WELL_KNOWN_FOLDER_MAP.get("inbox") == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("Inbox".lower()) == "inbox" # User's exact config
assert WELL_KNOWN_FOLDER_MAP.get("INBOX".lower()) == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("Archive".lower()) == "archive"
# Test German folder names
assert WELL_KNOWN_FOLDER_MAP.get("posteingang") == "inbox"
assert WELL_KNOWN_FOLDER_MAP.get("Posteingang".lower()) == "inbox" # Capitalized
assert WELL_KNOWN_FOLDER_MAP.get("archiv") == "archive"
# Test that custom folders don't match
assert WELL_KNOWN_FOLDER_MAP.get("custom_folder") is None
assert WELL_KNOWN_FOLDER_MAP.get("my_reports") is None
if __name__ == "__main__":
unittest.main(verbosity=2)