|
|
|
|
@@ -298,7 +298,9 @@ authentication_type = awssigv4
|
|
|
|
|
aws_region = eu-west-1
|
|
|
|
|
aws_service = aoss
|
|
|
|
|
"""
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
|
|
"w", suffix=".ini", delete=False
|
|
|
|
|
) as config_file:
|
|
|
|
|
config_file.write(config)
|
|
|
|
|
config_path = config_file.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
|
@@ -347,7 +349,9 @@ password = test-password
|
|
|
|
|
[elasticsearch]
|
|
|
|
|
hosts = localhost
|
|
|
|
|
"""
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
|
|
"w", suffix=".ini", delete=False
|
|
|
|
|
) as config_file:
|
|
|
|
|
config_file.write(config)
|
|
|
|
|
config_path = config_file.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
|
@@ -395,7 +399,9 @@ password = test-password
|
|
|
|
|
[elasticsearch]
|
|
|
|
|
hosts = localhost
|
|
|
|
|
"""
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
|
|
"w", suffix=".ini", delete=False
|
|
|
|
|
) as config_file:
|
|
|
|
|
config_file.write(config)
|
|
|
|
|
config_path = config_file.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
|
@@ -435,8 +441,8 @@ hosts = localhost
|
|
|
|
|
mock_save_aggregate.side_effect = parsedmarc.elastic.ElasticsearchError(
|
|
|
|
|
"aggregate sink failed"
|
|
|
|
|
)
|
|
|
|
|
mock_save_forensic_opensearch.side_effect = parsedmarc.cli.opensearch.OpenSearchError(
|
|
|
|
|
"forensic sink failed"
|
|
|
|
|
mock_save_forensic_opensearch.side_effect = (
|
|
|
|
|
parsedmarc.cli.opensearch.OpenSearchError("forensic sink failed")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
config = """[general]
|
|
|
|
|
@@ -456,7 +462,9 @@ hosts = localhost
|
|
|
|
|
[opensearch]
|
|
|
|
|
hosts = localhost
|
|
|
|
|
"""
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as config_file:
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
|
|
"w", suffix=".ini", delete=False
|
|
|
|
|
) as config_file:
|
|
|
|
|
config_file.write(config)
|
|
|
|
|
config_path = config_file.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(config_path) and os.remove(config_path))
|
|
|
|
|
@@ -555,9 +563,7 @@ class TestGmailConnection(unittest.TestCase):
|
|
|
|
|
"from_authorized_user_file",
|
|
|
|
|
return_value=creds,
|
|
|
|
|
):
|
|
|
|
|
returned = _get_creds(
|
|
|
|
|
token_path, "credentials.json", ["scope"], 8080
|
|
|
|
|
)
|
|
|
|
|
returned = _get_creds(token_path, "credentials.json", ["scope"], 8080)
|
|
|
|
|
finally:
|
|
|
|
|
os.remove(token_path)
|
|
|
|
|
self.assertEqual(returned, creds)
|
|
|
|
|
@@ -611,9 +617,7 @@ class TestGmailConnection(unittest.TestCase):
|
|
|
|
|
"from_authorized_user_file",
|
|
|
|
|
return_value=expired_creds,
|
|
|
|
|
):
|
|
|
|
|
returned = _get_creds(
|
|
|
|
|
token_path, "credentials.json", ["scope"], 8080
|
|
|
|
|
)
|
|
|
|
|
returned = _get_creds(token_path, "credentials.json", ["scope"], 8080)
|
|
|
|
|
finally:
|
|
|
|
|
os.remove(token_path)
|
|
|
|
|
|
|
|
|
|
@@ -671,7 +675,9 @@ class TestGraphConnection(unittest.TestCase):
|
|
|
|
|
with patch.object(graph_module, "sleep") as mocked_sleep:
|
|
|
|
|
messages = connection._get_all_messages("/url", batch_size=0, since=None)
|
|
|
|
|
self.assertEqual([msg["id"] for msg in messages], ["1"])
|
|
|
|
|
mocked_sleep.assert_called_once_with(graph_module.GRAPH_REQUEST_RETRY_DELAY_SECONDS)
|
|
|
|
|
mocked_sleep.assert_called_once_with(
|
|
|
|
|
graph_module.GRAPH_REQUEST_RETRY_DELAY_SECONDS
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def testGetAllMessagesRaisesAfterRetryExhaustion(self):
|
|
|
|
|
connection = MSGraphConnection.__new__(MSGraphConnection)
|
|
|
|
|
@@ -715,7 +721,9 @@ class TestGraphConnection(unittest.TestCase):
|
|
|
|
|
def testFetchMessagesPassesSinceAndBatchSize(self):
|
|
|
|
|
connection = MSGraphConnection.__new__(MSGraphConnection)
|
|
|
|
|
connection.mailbox_name = "mailbox@example.com"
|
|
|
|
|
connection._find_folder_id_from_folder_path = MagicMock(return_value="folder-id")
|
|
|
|
|
connection._find_folder_id_from_folder_path = MagicMock(
|
|
|
|
|
return_value="folder-id"
|
|
|
|
|
)
|
|
|
|
|
connection._get_all_messages = MagicMock(return_value=[{"id": "1"}])
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
connection.fetch_messages("Inbox", since="2026-03-01", batch_size=5), ["1"]
|
|
|
|
|
@@ -776,7 +784,9 @@ class TestGraphConnection(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
def testGenerateCredentialDeviceCode(self):
|
|
|
|
|
fake_credential = object()
|
|
|
|
|
with patch.object(graph_module, "_get_cache_args", return_value={"cached": True}):
|
|
|
|
|
with patch.object(
|
|
|
|
|
graph_module, "_get_cache_args", return_value={"cached": True}
|
|
|
|
|
):
|
|
|
|
|
with patch.object(
|
|
|
|
|
graph_module,
|
|
|
|
|
"DeviceCodeCredential",
|
|
|
|
|
@@ -916,14 +926,18 @@ class TestGraphConnection(unittest.TestCase):
|
|
|
|
|
fake_credential.authenticate.assert_called_once_with(scopes=["Mail.ReadWrite"])
|
|
|
|
|
cache_auth.assert_called_once()
|
|
|
|
|
graph_client.assert_called_once()
|
|
|
|
|
self.assertEqual(graph_client.call_args.kwargs.get("scopes"), ["Mail.ReadWrite"])
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
graph_client.call_args.kwargs.get("scopes"), ["Mail.ReadWrite"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def testInitCertificateAuthSkipsInteractiveAuthenticate(self):
|
|
|
|
|
class DummyCertificateCredential:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
fake_credential = DummyCertificateCredential()
|
|
|
|
|
with patch.object(graph_module, "CertificateCredential", DummyCertificateCredential):
|
|
|
|
|
with patch.object(
|
|
|
|
|
graph_module, "CertificateCredential", DummyCertificateCredential
|
|
|
|
|
):
|
|
|
|
|
with patch.object(
|
|
|
|
|
graph_module, "_generate_credential", return_value=fake_credential
|
|
|
|
|
):
|
|
|
|
|
@@ -1023,8 +1037,12 @@ class TestImapConnection(unittest.TestCase):
|
|
|
|
|
with self.assertRaises(_BreakLoop):
|
|
|
|
|
connection.watch(callback, check_timeout=1)
|
|
|
|
|
callback.assert_called_once_with(connection)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestGmailAuthModes(unittest.TestCase):
|
|
|
|
|
@patch("parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file")
|
|
|
|
|
@patch(
|
|
|
|
|
"parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file"
|
|
|
|
|
)
|
|
|
|
|
def testGetCredsServiceAccountWithoutSubject(self, mock_from_service_account_file):
|
|
|
|
|
service_creds = MagicMock()
|
|
|
|
|
service_creds.with_subject.return_value = MagicMock()
|
|
|
|
|
@@ -1046,7 +1064,9 @@ class TestGmailAuthModes(unittest.TestCase):
|
|
|
|
|
)
|
|
|
|
|
service_creds.with_subject.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file")
|
|
|
|
|
@patch(
|
|
|
|
|
"parsedmarc.mail.gmail.service_account.Credentials.from_service_account_file"
|
|
|
|
|
)
|
|
|
|
|
def testGetCredsServiceAccountWithSubject(self, mock_from_service_account_file):
|
|
|
|
|
base_creds = MagicMock()
|
|
|
|
|
delegated_creds = MagicMock()
|
|
|
|
|
@@ -1252,6 +1272,7 @@ class TestImapFallbacks(unittest.TestCase):
|
|
|
|
|
connection.move_message(99, "Archive")
|
|
|
|
|
delete_mock.assert_not_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMailboxWatchSince(unittest.TestCase):
|
|
|
|
|
def testWatchInboxPassesSinceToMailboxFetch(self):
|
|
|
|
|
mailbox_connection = SimpleNamespace()
|
|
|
|
|
@@ -1353,6 +1374,7 @@ class TestMailboxPerformance(unittest.TestCase):
|
|
|
|
|
create_folders=False,
|
|
|
|
|
)
|
|
|
|
|
self.assertEqual(len(connection.fetch_calls), 1)
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
def testCliPassesMsGraphCertificateAuthSettings(
|
|
|
|
|
@@ -1429,6 +1451,79 @@ mailbox = shared@example.com
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
def testCliUsesMsGraphUserAsMailboxForUsernamePasswordAuth(
|
|
|
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
mock_graph_connection.return_value = object()
|
|
|
|
|
mock_get_mailbox_reports.return_value = {
|
|
|
|
|
"aggregate_reports": [],
|
|
|
|
|
"forensic_reports": [],
|
|
|
|
|
"smtp_tls_reports": [],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = UsernamePassword
|
|
|
|
|
client_id = client-id
|
|
|
|
|
client_secret = client-secret
|
|
|
|
|
user = owner@example.com
|
|
|
|
|
password = test-password
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("mailbox"),
|
|
|
|
|
"owner@example.com",
|
|
|
|
|
)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("username"),
|
|
|
|
|
"owner@example.com",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphPasswordForUsernamePasswordAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = UsernamePassword
|
|
|
|
|
client_id = client-id
|
|
|
|
|
client_secret = client-secret
|
|
|
|
|
user = owner@example.com
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"password setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _FakeGraphClient:
|
|
|
|
|
def get(self, url, params=None):
|
|
|
|
|
if "/mailFolders/inbox?$select=id,displayName" in url:
|
|
|
|
|
@@ -1467,15 +1562,14 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
|
|
|
|
connection._request_with_retries = MagicMock(
|
|
|
|
|
side_effect=lambda method_name, *args, **kwargs: getattr(
|
|
|
|
|
connection._client, method_name
|
|
|
|
|
)(
|
|
|
|
|
*args, **kwargs
|
|
|
|
|
)
|
|
|
|
|
)(*args, **kwargs)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
folder_id = connection._find_folder_id_with_parent("Inbox", None)
|
|
|
|
|
self.assertEqual(folder_id, "inbox-id")
|
|
|
|
|
connection._request_with_retries.assert_any_call(
|
|
|
|
|
"get", "/users/shared@example.com/mailFolders?$filter=displayName eq 'Inbox'"
|
|
|
|
|
"get",
|
|
|
|
|
"/users/shared@example.com/mailFolders?$filter=displayName eq 'Inbox'",
|
|
|
|
|
)
|
|
|
|
|
connection._request_with_retries.assert_any_call(
|
|
|
|
|
"get", "/users/shared@example.com/mailFolders/inbox?$select=id,displayName"
|
|
|
|
|
@@ -1488,9 +1582,7 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
|
|
|
|
connection._request_with_retries = MagicMock(
|
|
|
|
|
side_effect=lambda method_name, *args, **kwargs: getattr(
|
|
|
|
|
connection._client, method_name
|
|
|
|
|
)(
|
|
|
|
|
*args, **kwargs
|
|
|
|
|
)
|
|
|
|
|
)(*args, **kwargs)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(RuntimeWarning):
|
|
|
|
|
@@ -1509,5 +1601,314 @@ class TestMSGraphFolderFallback(unittest.TestCase):
|
|
|
|
|
connection._get_well_known_folder_id.assert_not_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMSGraphCliValidation(unittest.TestCase):
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
def testCliPassesMsGraphClientSecretAuthSettings(
|
|
|
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
mock_graph_connection.return_value = object()
|
|
|
|
|
mock_get_mailbox_reports.return_value = {
|
|
|
|
|
"aggregate_reports": [],
|
|
|
|
|
"forensic_reports": [],
|
|
|
|
|
"smtp_tls_reports": [],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = ClientSecret
|
|
|
|
|
client_id = client-id
|
|
|
|
|
client_secret = client-secret
|
|
|
|
|
tenant_id = tenant-id
|
|
|
|
|
mailbox = shared@example.com
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("auth_method"), "ClientSecret"
|
|
|
|
|
)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("client_secret"),
|
|
|
|
|
"client-secret",
|
|
|
|
|
)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("tenant_id"), "tenant-id"
|
|
|
|
|
)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("mailbox"),
|
|
|
|
|
"shared@example.com",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphClientSecretForClientSecretAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = ClientSecret
|
|
|
|
|
client_id = client-id
|
|
|
|
|
tenant_id = tenant-id
|
|
|
|
|
mailbox = shared@example.com
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"client_secret setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphTenantIdForClientSecretAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = ClientSecret
|
|
|
|
|
client_id = client-id
|
|
|
|
|
client_secret = client-secret
|
|
|
|
|
mailbox = shared@example.com
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"tenant_id setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphMailboxForClientSecretAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = ClientSecret
|
|
|
|
|
client_id = client-id
|
|
|
|
|
client_secret = client-secret
|
|
|
|
|
tenant_id = tenant-id
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"mailbox setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
def testCliAllowsMsGraphDeviceCodeWithoutUser(
|
|
|
|
|
self, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
mock_graph_connection.return_value = object()
|
|
|
|
|
mock_get_mailbox_reports.return_value = {
|
|
|
|
|
"aggregate_reports": [],
|
|
|
|
|
"forensic_reports": [],
|
|
|
|
|
"smtp_tls_reports": [],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = DeviceCode
|
|
|
|
|
client_id = client-id
|
|
|
|
|
tenant_id = tenant-id
|
|
|
|
|
mailbox = shared@example.com
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("auth_method"), "DeviceCode"
|
|
|
|
|
)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
mock_graph_connection.call_args.kwargs.get("mailbox"),
|
|
|
|
|
"shared@example.com",
|
|
|
|
|
)
|
|
|
|
|
self.assertIsNone(mock_graph_connection.call_args.kwargs.get("username"))
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphTenantIdForDeviceCodeAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = DeviceCode
|
|
|
|
|
client_id = client-id
|
|
|
|
|
mailbox = shared@example.com
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"tenant_id setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphMailboxForDeviceCodeAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = DeviceCode
|
|
|
|
|
client_id = client-id
|
|
|
|
|
tenant_id = tenant-id
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"mailbox setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphTenantIdForCertificateAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = Certificate
|
|
|
|
|
client_id = client-id
|
|
|
|
|
mailbox = shared@example.com
|
|
|
|
|
certificate_path = /tmp/msgraph-cert.pem
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"tenant_id setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
@patch("parsedmarc.cli.get_dmarc_reports_from_mailbox")
|
|
|
|
|
@patch("parsedmarc.cli.MSGraphConnection")
|
|
|
|
|
@patch("parsedmarc.cli.logger")
|
|
|
|
|
def testCliRequiresMsGraphMailboxForCertificateAuth(
|
|
|
|
|
self, mock_logger, mock_graph_connection, mock_get_mailbox_reports
|
|
|
|
|
):
|
|
|
|
|
config_text = """[general]
|
|
|
|
|
silent = true
|
|
|
|
|
|
|
|
|
|
[msgraph]
|
|
|
|
|
auth_method = Certificate
|
|
|
|
|
client_id = client-id
|
|
|
|
|
tenant_id = tenant-id
|
|
|
|
|
certificate_path = /tmp/msgraph-cert.pem
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as cfg:
|
|
|
|
|
cfg.write(config_text)
|
|
|
|
|
cfg_path = cfg.name
|
|
|
|
|
self.addCleanup(lambda: os.path.exists(cfg_path) and os.remove(cfg_path))
|
|
|
|
|
|
|
|
|
|
with patch.object(sys, "argv", ["parsedmarc", "-c", cfg_path]):
|
|
|
|
|
with self.assertRaises(SystemExit) as system_exit:
|
|
|
|
|
parsedmarc.cli._main()
|
|
|
|
|
|
|
|
|
|
self.assertEqual(system_exit.exception.code, -1)
|
|
|
|
|
mock_logger.critical.assert_called_once_with(
|
|
|
|
|
"mailbox setting missing from the msgraph config section"
|
|
|
|
|
)
|
|
|
|
|
mock_graph_connection.assert_not_called()
|
|
|
|
|
mock_get_mailbox_reports.assert_not_called()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
unittest.main(verbosity=2)
|
|
|
|
|
|