Merge branch 'master' into copilot/support-dmarcbis-reports

This commit is contained in:
Sean Whalen
2026-03-09 17:20:20 -04:00
committed by GitHub
5 changed files with 259 additions and 16 deletions

View File

@@ -10,7 +10,32 @@ on:
branches: [ master ]
jobs:
build:
lint-docs-build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.13"
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install .[build]
- name: Check code style
run: |
ruff check .
- name: Test building documentation
run: |
cd docs
make html
- name: Test building packages
run: |
hatch build
test:
needs: lint-docs-build
runs-on: ubuntu-latest
services:
@@ -46,13 +71,6 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install .[build]
- name: Test building documentation
run: |
cd docs
make html
- name: Check code style
run: |
ruff check .
- name: Run unit tests
run: |
pytest --cov --cov-report=xml tests.py
@@ -61,9 +79,6 @@ jobs:
pip install -e .
parsedmarc --debug -c ci.ini samples/aggregate/*
parsedmarc --debug -c ci.ini samples/failure/*
- name: Test building packages
run: |
hatch build
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:

View File

@@ -394,10 +394,19 @@ The full set of configuration options are:
credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file
(Default: `.token`)
- `auth_mode` - str: Authentication mode, `installed_app` (default)
or `service_account`
- `service_account_user` - str: Delegated mailbox user for Gmail
service account auth (required for domain-wide delegation). Also
accepted as `delegated_user` for backward compatibility.
:::{note}
credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
:::
:::{note}
When `auth_mode = service_account`, `credentials_file` must point to a
Google service account key JSON file, and `token_file` is not used.
:::
- `include_spam_trash` - bool: Include messages in Spam and
Trash when searching reports (Default: `False`)
- `scopes` - str: Comma separated list of scopes to use when

View File

@@ -710,6 +710,8 @@ def _main():
gmail_api_paginate_messages=True,
gmail_api_scopes=[],
gmail_api_oauth2_port=8080,
gmail_api_auth_mode="installed_app",
gmail_api_service_account_user=None,
maildir_path=None,
maildir_create=False,
log_file=args.log_file,
@@ -1295,6 +1297,16 @@ def _main():
opts.gmail_api_oauth2_port = gmail_api_config.getint(
"oauth2_port", 8080
)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"service_account_user"
).strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"delegated_user"
).strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
@@ -1755,6 +1767,8 @@ def _main():
paginate_messages=opts.gmail_api_paginate_messages,
reports_folder=opts.mailbox_reports_folder,
oauth2_port=opts.gmail_api_oauth2_port,
auth_mode=opts.gmail_api_auth_mode,
service_account_user=opts.gmail_api_service_account_user,
)
except Exception:

View File

@@ -10,6 +10,7 @@ from typing import List
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
@@ -18,7 +19,29 @@ from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
def _get_creds(token_file, credentials_file, scopes, oauth2_port):
def _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode="installed_app",
service_account_user=None,
):
normalized_auth_mode = (auth_mode or "installed_app").strip().lower()
if normalized_auth_mode == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=scopes,
)
if service_account_user:
creds = creds.with_subject(service_account_user)
return creds
if normalized_auth_mode != "installed_app":
raise ValueError(
f"Unsupported Gmail auth_mode '{auth_mode}'. "
"Expected 'installed_app' or 'service_account'."
)
creds = None
if Path(token_file).exists():
@@ -47,8 +70,17 @@ class GmailConnection(MailboxConnection):
reports_folder: str,
oauth2_port: int,
paginate_messages: bool,
auth_mode: str = "installed_app",
service_account_user: str | None = None,
):
creds = _get_creds(token_file, credentials_file, scopes, oauth2_port)
creds = _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode=auth_mode,
service_account_user=service_account_user,
)
self.service = build("gmail", "v1", credentials=creds)
self.include_spam_trash = include_spam_trash
self.reports_label_id = self._find_label_id_for_label(reports_folder)
@@ -126,7 +158,7 @@ class GmailConnection(MailboxConnection):
return urlsafe_b64decode(msg["raw"]).decode(errors="replace")
def delete_message(self, message_id: str):
self.service.users().messages().delete(userId="me", id=message_id)
self.service.users().messages().delete(userId="me", id=message_id).execute()
def move_message(self, message_id: str, folder_name: str):
label_id = self._find_label_id_for_label(folder_name)

177
tests.py
View File

@@ -5,6 +5,8 @@ from __future__ import absolute_import, print_function, unicode_literals
import json
import os
import sys
import tempfile
import unittest
from datetime import datetime, timedelta, timezone
@@ -13,8 +15,7 @@ from base64 import urlsafe_b64encode
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from unittest.mock import MagicMock
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from lxml import etree
from googleapiclient.errors import HttpError
@@ -22,6 +23,7 @@ from httplib2 import Response
from imapclient.exceptions import IMAPClientError
import parsedmarc
import parsedmarc.cli
from parsedmarc.mail.gmail import GmailConnection
from parsedmarc.mail.gmail import _get_creds
from parsedmarc.mail.graph import MSGraphConnection
@@ -1739,6 +1741,7 @@ class TestGmailConnection(unittest.TestCase):
messages_api.modify.assert_called_once()
connection.delete_message("m1")
messages_api.delete.assert_called_once_with(userId="me", id="m1")
messages_api.delete.return_value.execute.assert_called_once()
def testGetCredsFromTokenFile(self):
creds = MagicMock()
@@ -2095,7 +2098,177 @@ class TestImapConnection(unittest.TestCase):
with self.assertRaises(_BreakLoop):
connection.watch(callback, check_timeout=1)
callback.assert_called_once_with(connection)
class TestGmailAuthModes(unittest.TestCase):
@patch("parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file")
def testGetCredsServiceAccountWithoutSubject(self, mock_from_service_account_file):
service_creds = MagicMock()
service_creds.with_subject.return_value = MagicMock()
mock_from_service_account_file.return_value = service_creds
creds = gmail_module._get_creds(
token_file=".token",
credentials_file="service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.readonly"],
oauth2_port=8080,
auth_mode="service_account",
service_account_user=None,
)
self.assertIs(creds, service_creds)
mock_from_service_account_file.assert_called_once_with(
"service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.readonly"],
)
service_creds.with_subject.assert_not_called()
@patch("parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file")
def testGetCredsServiceAccountWithSubject(self, mock_from_service_account_file):
base_creds = MagicMock()
delegated_creds = MagicMock()
base_creds.with_subject.return_value = delegated_creds
mock_from_service_account_file.return_value = base_creds
creds = gmail_module._get_creds(
token_file=".token",
credentials_file="service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
oauth2_port=8080,
auth_mode="service_account",
service_account_user="dmarc@example.com",
)
self.assertIs(creds, delegated_creds)
base_creds.with_subject.assert_called_once_with("dmarc@example.com")
def testGetCredsRejectsUnsupportedAuthMode(self):
with self.assertRaises(ValueError):
gmail_module._get_creds(
token_file=".token",
credentials_file="client-secret.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
oauth2_port=8080,
auth_mode="unsupported",
)
@patch("parsedmarc.mail.gmail.Path.exists", return_value=True)
@patch("parsedmarc.mail.gmail.Credentials.from_authorized_user_file")
def testGetCredsInstalledAppStillUsesTokenFile(
self, mock_from_authorized_user_file, _mock_exists
):
token_creds = MagicMock()
token_creds.valid = True
mock_from_authorized_user_file.return_value = token_creds
creds = gmail_module._get_creds(
token_file=".token",
credentials_file="client-secret.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
oauth2_port=8080,
auth_mode="installed_app",
)
self.assertIs(creds, token_creds)
mock_from_authorized_user_file.assert_called_once_with(
".token",
["https://www.googleapis.com/auth/gmail.modify"],
)
@patch("parsedmarc.mail.gmail.GmailConnection._find_label_id_for_label")
@patch("parsedmarc.mail.gmail.build")
@patch("parsedmarc.mail.gmail._get_creds")
def testGmailConnectionPassesAuthModeAndDelegatedUser(
self, mock_get_creds, mock_build, mock_find_label
):
mock_get_creds.return_value = MagicMock()
mock_build.return_value = MagicMock()
mock_find_label.return_value = "INBOX"
gmail_module.GmailConnection(
token_file=".token",
credentials_file="service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
include_spam_trash=False,
reports_folder="INBOX",
oauth2_port=8080,
paginate_messages=True,
auth_mode="service_account",
service_account_user="dmarc@example.com",
)
mock_get_creds.assert_called_once_with(
".token",
"service-account.json",
["https://www.googleapis.com/auth/gmail.modify"],
8080,
auth_mode="service_account",
service_account_user="dmarc@example.com",
)
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.GmailConnection")
def testCliPassesGmailServiceAccountAuthSettings(
self, mock_gmail_connection, mock_get_mailbox_reports
):
mock_gmail_connection.return_value = MagicMock()
mock_get_mailbox_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
config = """[general]
silent = true
[gmail_api]
credentials_file = /tmp/service-account.json
auth_mode = service_account
service_account_user = dmarc@example.com
scopes = https://www.googleapis.com/auth/gmail.modify
"""
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg_file:
cfg_file.write(config)
config_path = cfg_file.name
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
parsedmarc.cli._main()
self.assertEqual(
mock_gmail_connection.call_args.kwargs.get("auth_mode"), "service_account"
)
self.assertEqual(
mock_gmail_connection.call_args.kwargs.get("service_account_user"),
"dmarc@example.com",
)
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.GmailConnection")
def testCliAcceptsDelegatedUserAlias(self, mock_gmail_connection, mock_get_reports):
mock_gmail_connection.return_value = MagicMock()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
config = """[general]
silent = true
[gmail_api]
credentials_file = /tmp/service-account.json
auth_mode = service_account
delegated_user = delegated@example.com
scopes = https://www.googleapis.com/auth/gmail.modify
"""
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg_file:
cfg_file.write(config)
config_path = cfg_file.name
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
parsedmarc.cli._main()
self.assertEqual(
mock_gmail_connection.call_args.kwargs.get("service_account_user"),
"delegated@example.com",
)
if __name__ == "__main__":
unittest.main(verbosity=2)