diff --git a/docs/source/usage.md b/docs/source/usage.md index 6287a98..c97c872 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -203,7 +203,7 @@ The full set of configuration options are: - `password` - str: The IMAP password - `msgraph` - `auth_method` - str: Authentication method, valid types are - `UsernamePassword`, `DeviceCode`, or `ClientSecret` + `UsernamePassword`, `DeviceCode`, `ClientSecret`, or `Certificate` (Default: `UsernamePassword`). - `user` - str: The M365 user, required when the auth method is UsernamePassword @@ -211,6 +211,11 @@ The full set of configuration options are: method is UsernamePassword - `client_id` - str: The app registration's client ID - `client_secret` - str: The app registration's secret + - `certificate_path` - str: Path to a PEM or PKCS12 certificate + including the private key. Required when the auth method is + `Certificate` + - `certificate_password` - str: Optional password for the + certificate file when using `Certificate` auth - `tenant_id` - str: The Azure AD tenant ID. This is required for all auth methods except UsernamePassword. - `mailbox` - str: The mailbox name. This defaults to the @@ -248,6 +253,9 @@ The full set of configuration options are: -Description "Restrict access to dmarc reports mailbox." ``` + The same application permission and mailbox scoping guidance + applies to the `Certificate` auth method. + ::: - `elasticsearch` - `hosts` - str: A comma separated list of hostnames and ports diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index b285c6d..8ea38cf 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -644,6 +644,8 @@ def _main(): graph_password=None, graph_client_id=None, graph_client_secret=None, + graph_certificate_path=None, + graph_certificate_password=None, graph_tenant_id=None, graph_mailbox=None, graph_allow_unencrypted_storage=False, @@ -1012,6 +1014,19 @@ def _main(): ) exit(-1) + if opts.graph_auth_method == AuthMethod.Certificate.name: + if "certificate_path" in graph_config: + opts.graph_certificate_path = graph_config["certificate_path"] + else: + logger.critical( + "certificate_path setting missing from the msgraph config section" + ) + exit(-1) + if "certificate_password" in graph_config: + opts.graph_certificate_password = graph_config[ + "certificate_password" + ] + if "client_id" in graph_config: opts.graph_client_id = graph_config["client_id"] else: @@ -1748,6 +1763,8 @@ def _main(): tenant_id=opts.graph_tenant_id, client_id=opts.graph_client_id, client_secret=opts.graph_client_secret, + certificate_path=opts.graph_certificate_path, + certificate_password=opts.graph_certificate_password, username=opts.graph_user, password=opts.graph_password, token_file=opts.graph_token_file, diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py index 9595a85..7713d45 100644 --- a/parsedmarc/mail/graph.py +++ b/parsedmarc/mail/graph.py @@ -12,6 +12,7 @@ from azure.identity import ( UsernamePasswordCredential, DeviceCodeCredential, ClientSecretCredential, + CertificateCredential, TokenCachePersistenceOptions, AuthenticationRecord, ) @@ -29,6 +30,7 @@ class AuthMethod(Enum): DeviceCode = 1 UsernamePassword = 2 ClientSecret = 3 + Certificate = 4 def _get_cache_args(token_path: Path, allow_unencrypted_storage): @@ -87,6 +89,13 @@ def _generate_credential(auth_method: str, token_path: Path, **kwargs): tenant_id=kwargs["tenant_id"], client_secret=kwargs["client_secret"], ) + elif auth_method == AuthMethod.Certificate.name: + credential = CertificateCredential( + client_id=kwargs["client_id"], + tenant_id=kwargs["tenant_id"], + certificate_path=kwargs["certificate_path"], + password=kwargs.get("certificate_password"), + ) else: raise RuntimeError(f"Auth method {auth_method} not found") return credential @@ -114,12 +123,16 @@ class MSGraphConnection(MailboxConnection): tenant_id: str, token_file: str, allow_unencrypted_storage: bool, + certificate_path: Optional[str] = None, + certificate_password: Optional[Union[str, bytes]] = None, ): token_path = Path(token_file) credential = _generate_credential( auth_method, client_id=client_id, client_secret=client_secret, + certificate_path=certificate_path, + certificate_password=certificate_password, username=username, password=password, tenant_id=tenant_id, @@ -130,7 +143,7 @@ class MSGraphConnection(MailboxConnection): "credential": credential, "cloud": graph_url, } - if not isinstance(credential, ClientSecretCredential): + if not isinstance(credential, (ClientSecretCredential, CertificateCredential)): scopes = ["Mail.ReadWrite"] # Detect if mailbox is shared if mailbox and username != mailbox: diff --git a/tests.py b/tests.py index 5941070..6807477 100755 --- a/tests.py +++ b/tests.py @@ -815,6 +815,31 @@ class TestGraphConnection(unittest.TestCase): client_id="cid", tenant_id="tenant", client_secret="secret" ) + def testGenerateCredentialCertificate(self): + fake_credential = object() + with patch.object( + graph_module, "CertificateCredential", return_value=fake_credential + ) as mocked: + result = _generate_credential( + graph_module.AuthMethod.Certificate.name, + Path("/tmp/token"), + client_id="cid", + client_secret="secret", + certificate_path="/tmp/cert.pem", + certificate_password="secret-pass", + username="user", + password="pass", + tenant_id="tenant", + allow_unencrypted_storage=False, + ) + self.assertIs(result, fake_credential) + mocked.assert_called_once_with( + client_id="cid", + tenant_id="tenant", + certificate_path="/tmp/cert.pem", + password="secret-pass", + ) + def testInitUsesSharedMailboxScopes(self): class FakeCredential: def __init__(self): @@ -847,6 +872,35 @@ class TestGraphConnection(unittest.TestCase): graph_client.call_args.kwargs.get("scopes"), ["Mail.ReadWrite.Shared"] ) + def testInitCertificateAuthSkipsInteractiveAuthenticate(self): + class DummyCertificateCredential: + pass + + fake_credential = DummyCertificateCredential() + with patch.object(graph_module, "CertificateCredential", DummyCertificateCredential): + with patch.object( + graph_module, "_generate_credential", return_value=fake_credential + ): + with patch.object(graph_module, "_cache_auth_record") as cache_auth: + with patch.object(graph_module, "GraphClient") as graph_client: + MSGraphConnection( + auth_method=graph_module.AuthMethod.Certificate.name, + mailbox="shared@example.com", + graph_url="https://graph.microsoft.com", + client_id="cid", + client_secret=None, + certificate_path="/tmp/cert.pem", + certificate_password="secret-pass", + username=None, + password=None, + tenant_id="tenant", + token_file="/tmp/token-file", + allow_unencrypted_storage=False, + ) + cache_auth.assert_not_called() + graph_client.assert_called_once() + self.assertNotIn("scopes", graph_client.call_args.kwargs) + def testCreateFolderAndMoveErrors(self): connection = MSGraphConnection.__new__(MSGraphConnection) connection.mailbox_name = "mailbox@example.com" @@ -1214,6 +1268,49 @@ since = 2d self.assertEqual(system_exit.exception.code, 1) self.assertEqual(mock_watch_inbox.call_args.kwargs.get("since"), "2d") + @patch("parsedmarc.cli.get_dmarc_reports_from_mailbox") + @patch("parsedmarc.cli.MSGraphConnection") + def testCliPassesMsGraphCertificateAuthSettings( + self, mock_graph_connection, mock_get_mailbox_reports + ): + mock_graph_connection.return_value = object() + mock_get_mailbox_reports.return_value = { + "aggregate_reports": [], + "forensic_reports": [], + "smtp_tls_reports": [], + } + + config_text = """[general] +silent = true + +[msgraph] +auth_method = Certificate +client_id = client-id +tenant_id = tenant-id +mailbox = shared@example.com +certificate_path = /tmp/msgraph-cert.pem +certificate_password = cert-pass +""" + + with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg: + cfg.write(config_text) + cfg_path = cfg.name + self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path)) + + with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]): + parsedmarc.cli._main() + + self.assertEqual( + mock_graph_connection.call_args.kwargs.get("auth_method"), "Certificate" + ) + self.assertEqual( + mock_graph_connection.call_args.kwargs.get("certificate_path"), + "/tmp/msgraph-cert.pem", + ) + self.assertEqual( + mock_graph_connection.call_args.kwargs.get("certificate_password"), + "cert-pass", + ) class _FakeGraphClient: def get(self, url, params=None): if "/mailFolders/inbox?$select=id,displayName" in url: