Add Gmail service account auth mode with delegated user support (#676)

This commit is contained in:
Kili
2026-03-09 22:04:30 +01:00
committed by GitHub
parent d49ce6a13f
commit a3c5bb906b
4 changed files with 235 additions and 4 deletions

View File

@@ -394,10 +394,19 @@ The full set of configuration options are:
credentials, None to disable (Default: `None`)
- `token_file` - str: Path to save the token file
(Default: `.token`)
- `auth_mode` - str: Authentication mode, `installed_app` (default)
or `service_account`
- `service_account_user` - str: Delegated mailbox user for Gmail
service account auth (required for domain-wide delegation). Also
accepted as `delegated_user` for backward compatibility.
:::{note}
credentials_file and token_file can be got with [quickstart](https://developers.google.com/gmail/api/quickstart/python).Please change the scope to `https://www.googleapis.com/auth/gmail.modify`.
:::
:::{note}
When `auth_mode = service_account`, `credentials_file` must point to a
Google service account key JSON file, and `token_file` is not used.
:::
- `include_spam_trash` - bool: Include messages in Spam and
Trash when searching reports (Default: `False`)
- `scopes` - str: Comma separated list of scopes to use when

View File

@@ -710,6 +710,8 @@ def _main():
gmail_api_paginate_messages=True,
gmail_api_scopes=[],
gmail_api_oauth2_port=8080,
gmail_api_auth_mode="installed_app",
gmail_api_service_account_user=None,
maildir_path=None,
maildir_create=False,
log_file=args.log_file,
@@ -1287,6 +1289,16 @@ def _main():
opts.gmail_api_oauth2_port = gmail_api_config.getint(
"oauth2_port", 8080
)
if "auth_mode" in gmail_api_config:
opts.gmail_api_auth_mode = gmail_api_config.get("auth_mode").strip()
if "service_account_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"service_account_user"
).strip()
elif "delegated_user" in gmail_api_config:
opts.gmail_api_service_account_user = gmail_api_config.get(
"delegated_user"
).strip()
if "maildir" in config.sections():
maildir_api_config = config["maildir"]
@@ -1741,6 +1753,8 @@ def _main():
paginate_messages=opts.gmail_api_paginate_messages,
reports_folder=opts.mailbox_reports_folder,
oauth2_port=opts.gmail_api_oauth2_port,
auth_mode=opts.gmail_api_auth_mode,
service_account_user=opts.gmail_api_service_account_user,
)
except Exception:

View File

@@ -10,6 +10,7 @@ from typing import List
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
@@ -18,7 +19,29 @@ from parsedmarc.log import logger
from parsedmarc.mail.mailbox_connection import MailboxConnection
def _get_creds(token_file, credentials_file, scopes, oauth2_port):
def _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode="installed_app",
service_account_user=None,
):
normalized_auth_mode = (auth_mode or "installed_app").strip().lower()
if normalized_auth_mode == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=scopes,
)
if service_account_user:
creds = creds.with_subject(service_account_user)
return creds
if normalized_auth_mode != "installed_app":
raise ValueError(
f"Unsupported Gmail auth_mode '{auth_mode}'. "
"Expected 'installed_app' or 'service_account'."
)
creds = None
if Path(token_file).exists():
@@ -47,8 +70,17 @@ class GmailConnection(MailboxConnection):
reports_folder: str,
oauth2_port: int,
paginate_messages: bool,
auth_mode: str = "installed_app",
service_account_user: str | None = None,
):
creds = _get_creds(token_file, credentials_file, scopes, oauth2_port)
creds = _get_creds(
token_file,
credentials_file,
scopes,
oauth2_port,
auth_mode=auth_mode,
service_account_user=service_account_user,
)
self.service = build("gmail", "v1", credentials=creds)
self.include_spam_trash = include_spam_trash
self.reports_label_id = self._find_label_id_for_label(reports_folder)

180
tests.py
View File

@@ -4,13 +4,14 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import sys
import tempfile
import unittest
from base64 import urlsafe_b64encode
from glob import glob
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from unittest.mock import MagicMock
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from lxml import etree
from googleapiclient.errors import HttpError
@@ -18,6 +19,7 @@ from httplib2 import Response
from imapclient.exceptions import IMAPClientError
import parsedmarc
import parsedmarc.cli
from parsedmarc.mail.gmail import GmailConnection
from parsedmarc.mail.gmail import _get_creds
from parsedmarc.mail.graph import MSGraphConnection
@@ -613,5 +615,179 @@ class TestImapConnection(unittest.TestCase):
callback.assert_called_once_with(connection)
class TestGmailAuthModes(unittest.TestCase):
@patch("parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file")
def testGetCredsServiceAccountWithoutSubject(self, mock_from_service_account_file):
service_creds = MagicMock()
service_creds.with_subject.return_value = MagicMock()
mock_from_service_account_file.return_value = service_creds
creds = gmail_module._get_creds(
token_file=".token",
credentials_file="service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.readonly"],
oauth2_port=8080,
auth_mode="service_account",
service_account_user=None,
)
self.assertIs(creds, service_creds)
mock_from_service_account_file.assert_called_once_with(
"service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.readonly"],
)
service_creds.with_subject.assert_not_called()
@patch("parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file")
def testGetCredsServiceAccountWithSubject(self, mock_from_service_account_file):
base_creds = MagicMock()
delegated_creds = MagicMock()
base_creds.with_subject.return_value = delegated_creds
mock_from_service_account_file.return_value = base_creds
creds = gmail_module._get_creds(
token_file=".token",
credentials_file="service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
oauth2_port=8080,
auth_mode="service_account",
service_account_user="dmarc@example.com",
)
self.assertIs(creds, delegated_creds)
base_creds.with_subject.assert_called_once_with("dmarc@example.com")
def testGetCredsRejectsUnsupportedAuthMode(self):
with self.assertRaises(ValueError):
gmail_module._get_creds(
token_file=".token",
credentials_file="client-secret.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
oauth2_port=8080,
auth_mode="unsupported",
)
@patch("parsedmarc.mail.gmail.Path.exists", return_value=True)
@patch("parsedmarc.mail.gmail.Credentials.from_authorized_user_file")
def testGetCredsInstalledAppStillUsesTokenFile(
self, mock_from_authorized_user_file, _mock_exists
):
token_creds = MagicMock()
token_creds.valid = True
mock_from_authorized_user_file.return_value = token_creds
creds = gmail_module._get_creds(
token_file=".token",
credentials_file="client-secret.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
oauth2_port=8080,
auth_mode="installed_app",
)
self.assertIs(creds, token_creds)
mock_from_authorized_user_file.assert_called_once_with(
".token",
["https://www.googleapis.com/auth/gmail.modify"],
)
@patch("parsedmarc.mail.gmail.GmailConnection._find_label_id_for_label")
@patch("parsedmarc.mail.gmail.build")
@patch("parsedmarc.mail.gmail._get_creds")
def testGmailConnectionPassesAuthModeAndDelegatedUser(
self, mock_get_creds, mock_build, mock_find_label
):
mock_get_creds.return_value = MagicMock()
mock_build.return_value = MagicMock()
mock_find_label.return_value = "INBOX"
gmail_module.GmailConnection(
token_file=".token",
credentials_file="service-account.json",
scopes=["https://www.googleapis.com/auth/gmail.modify"],
include_spam_trash=False,
reports_folder="INBOX",
oauth2_port=8080,
paginate_messages=True,
auth_mode="service_account",
service_account_user="dmarc@example.com",
)
mock_get_creds.assert_called_once_with(
".token",
"service-account.json",
["https://www.googleapis.com/auth/gmail.modify"],
8080,
auth_mode="service_account",
service_account_user="dmarc@example.com",
)
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.GmailConnection")
def testCliPassesGmailServiceAccountAuthSettings(
self, mock_gmail_connection, mock_get_mailbox_reports
):
mock_gmail_connection.return_value = MagicMock()
mock_get_mailbox_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
config = """[general]
silent = true
[gmail_api]
credentials_file = /tmp/service-account.json
auth_mode = service_account
service_account_user = dmarc@example.com
scopes = https://www.googleapis.com/auth/gmail.modify
"""
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg_file:
cfg_file.write(config)
config_path = cfg_file.name
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
parsedmarc.cli._main()
self.assertEqual(
mock_gmail_connection.call_args.kwargs.get("auth_mode"), "service_account"
)
self.assertEqual(
mock_gmail_connection.call_args.kwargs.get("service_account_user"),
"dmarc@example.com",
)
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
@patch("parsedmarc.cli.GmailConnection")
def testCliAcceptsDelegatedUserAlias(self, mock_gmail_connection, mock_get_reports):
mock_gmail_connection.return_value = MagicMock()
mock_get_reports.return_value = {
"aggregate_reports": [],
"forensic_reports": [],
"smtp_tls_reports": [],
}
config = """[general]
silent = true
[gmail_api]
credentials_file = /tmp/service-account.json
auth_mode = service_account
delegated_user = delegated@example.com
scopes = https://www.googleapis.com/auth/gmail.modify
"""
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg_file:
cfg_file.write(config)
config_path = cfg_file.name
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
with patch.object(sys, "argv", ["parsedmarc", "-c", config_path]):
parsedmarc.cli._main()
self.assertEqual(
mock_gmail_connection.call_args.kwargs.get("service_account_user"),
"delegated@example.com",
)
if __name__ == "__main__":
unittest.main(verbosity=2)