Add comprehensive test coverage for Google SecOps module

- Added 9 new tests covering previously untested functionality
- Tests now cover: backward compatibility, deprecated parameters/functions, API client initialization, API endpoint generation, error handling, helper methods, detection fields structure, and IP enrichment
- All tests use mocking for API calls to avoid requiring actual credentials
- Total test count increased from 4 to 13 tests
- All tests passing with 100% success rate
This commit is contained in:
copilot-swe-agent[bot]
2026-06-04 02:03:47 +00:00
committed by GitHub
parent 67ebe48344
commit f1a91d75ad
+315
View File
@@ -5,7 +5,11 @@ SecOps (Chronicle) UDM format with proper event types, metadata, and field struc
"""
import json
import os
import tempfile
import unittest
import warnings
from unittest.mock import MagicMock, patch
import parsedmarc
@@ -166,6 +170,317 @@ class TestGoogleSecOps(unittest.TestCase):
found_count
), "failed_session_count should be in detection_fields"
def test_backward_compatibility_deprecated_parameters(self):
"""Test backward compatibility with deprecated parameter names."""
from parsedmarc.google_secops import GoogleSecOpsClient
from unittest.mock import patch
# Test include_ruf_payload -> include_failure_payload
with patch("parsedmarc.google_secops.logger.warning") as mock_warning:
client = GoogleSecOpsClient(include_ruf_payload=True, use_stdout=True)
assert mock_warning.call_count >= 1
assert any(
"include_ruf_payload is deprecated" in str(call)
for call in mock_warning.call_args_list
)
assert client.include_failure_payload is True
# Test ruf_payload_max_bytes -> failure_payload_max_bytes
with patch("parsedmarc.google_secops.logger.warning") as mock_warning:
client = GoogleSecOpsClient(ruf_payload_max_bytes=2048, use_stdout=True)
assert mock_warning.call_count >= 1
assert any(
"ruf_payload_max_bytes is deprecated" in str(call)
for call in mock_warning.call_args_list
)
assert client.failure_payload_max_bytes == 2048
# Test both deprecated parameters together
with patch("parsedmarc.google_secops.logger.warning") as mock_warning:
client = GoogleSecOpsClient(
include_ruf_payload=True, ruf_payload_max_bytes=2048, use_stdout=True
)
assert mock_warning.call_count >= 2
assert client.include_failure_payload is True
assert client.failure_payload_max_bytes == 2048
def test_deprecated_forensic_function_alias(self):
"""Test the deprecated save_forensic_report_to_google_secops alias."""
from parsedmarc.google_secops import GoogleSecOpsClient
from unittest.mock import patch
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/failure/dmarc_ruf_report_linkedin.eml"
parsed_file = parsedmarc.parse_report_file(sample_path)
parsed_report = parsed_file["report"]
# Test deprecated function with warning
with patch("parsedmarc.google_secops.logger.warning") as mock_warning:
events = client.save_forensic_report_to_google_secops(parsed_report)
assert mock_warning.call_count >= 1
assert any(
"save_forensic_report_to_google_secops is deprecated" in str(call)
for call in mock_warning.call_args_list
)
assert len(events) > 0
def test_api_client_initialization_error(self):
"""Test that API client initialization fails without required parameters."""
from parsedmarc.google_secops import GoogleSecOpsClient, GoogleSecOpsError
# Test missing credentials when use_stdout=False
with self.assertRaises(GoogleSecOpsError) as context:
GoogleSecOpsClient(use_stdout=False)
assert "api_credentials_file and api_customer_id are required" in str(
context.exception
)
# Test missing customer_id
with self.assertRaises(GoogleSecOpsError) as context:
GoogleSecOpsClient(api_credentials_file="/tmp/fake.json", use_stdout=False)
assert "api_credentials_file and api_customer_id are required" in str(
context.exception
)
def test_get_api_endpoint(self):
"""Test API endpoint URL generation."""
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
client.api_customer_id = "test-customer-123"
client.api_region = "us"
client.api_log_type = "DMARC"
endpoint = client._get_api_endpoint()
assert "us-chronicle.googleapis.com" in endpoint
assert "test-customer-123" in endpoint
assert "DMARC" in endpoint
# Test different region
client.api_region = "europe"
endpoint = client._get_api_endpoint()
assert "europe-chronicle.googleapis.com" in endpoint
def test_helper_methods(self):
"""Test helper methods for severity, description, and timestamp formatting."""
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
# Test _get_severity
assert client._get_severity("reject", False, False) == "HIGH"
assert client._get_severity("quarantine", False, False) == "MEDIUM"
assert client._get_severity("quarantine", True, False) == "LOW"
assert client._get_severity("none", False, False) == "LOW"
# Test _get_description - note the actual signature
desc = client._get_description(
dmarc_pass=False,
spf_result="pass",
dkim_result="fail",
spf_aligned=False,
dkim_aligned=False,
disposition="none",
)
assert "DMARC fail" in desc
assert "disposition=none" in desc
desc_pass = client._get_description(
dmarc_pass=True,
spf_result="pass",
dkim_result="pass",
spf_aligned=True,
dkim_aligned=True,
disposition="none",
)
assert "DMARC pass" in desc_pass
# Test _format_timestamp
timestamp = client._format_timestamp("2024-06-01 12:00:00")
assert timestamp == "2024-06-01T12:00:00+00:00"
# Test with timezone already present
timestamp_tz = client._format_timestamp("2024-06-01T12:00:00+00:00")
assert timestamp_tz == "2024-06-01T12:00:00+00:00"
def test_detection_fields_structure(self):
"""Test that detection_fields are properly structured in all event types."""
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
# Test aggregate report detection fields
sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
parsed_file = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True
)
events = client.save_aggregate_report_to_google_secops(parsed_file["report"])
for event in events:
event_dict = json.loads(event)
detection_fields = event_dict["security_result"][0]["detection_fields"]
# Verify key fields are present
field_keys = [field["key"] for field in detection_fields]
assert "dmarc.disposition" in field_keys
assert "dmarc.policy" in field_keys
assert "dmarc.pass" in field_keys
assert "dmarc.spf_aligned" in field_keys
assert "dmarc.dkim_aligned" in field_keys
assert "dmarc.header_from" in field_keys
assert "dmarc.report_org" in field_keys
assert "dmarc.report_id" in field_keys
def test_ip_enrichment_fields(self):
"""Test that IP enrichment fields are included in detection_fields."""
from parsedmarc.google_secops import GoogleSecOpsClient
client = GoogleSecOpsClient(use_stdout=True)
sample_path = "samples/aggregate/example.net!example.com!1529366400!1529452799.xml"
parsed_file = parsedmarc.parse_report_file(
sample_path, always_use_local_files=True
)
events = client.save_aggregate_report_to_google_secops(parsed_file["report"])
for event in events:
event_dict = json.loads(event)
detection_fields = event_dict["security_result"][0]["detection_fields"]
field_keys = [field["key"] for field in detection_fields]
# Check for IP enrichment fields (if present in the sample data)
# These are optional but should be in detection_fields when present
if "dmarc.source_service_name" in field_keys:
# Verify it's properly structured
for field in detection_fields:
if field["key"] == "dmarc.source_service_name":
assert isinstance(field["value"], str)
@patch("parsedmarc.google_secops.service_account.Credentials")
@patch("parsedmarc.google_secops.requests.Session")
def test_api_event_submission(self, mock_session_class, mock_credentials_class):
"""Test event submission to Chronicle API."""
from parsedmarc.google_secops import GoogleSecOpsClient
# Create a temporary credentials file
with tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False
) as tmp_file:
json.dump(
{
"type": "service_account",
"project_id": "test-project",
"private_key_id": "key123",
"private_key": "-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----",
"client_email": "test@test-project.iam.gserviceaccount.com",
"client_id": "123456789",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
},
tmp_file,
)
tmp_credentials = tmp_file.name
try:
# Mock credentials
mock_creds = MagicMock()
mock_creds.valid = True
mock_creds.token = "test-token"
mock_credentials_class.from_service_account_file.return_value = mock_creds
# Mock session and response
mock_session = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 200
mock_session.post.return_value = mock_response
mock_session_class.return_value = mock_session
# Create client with API mode
client = GoogleSecOpsClient(
api_credentials_file=tmp_credentials,
api_customer_id="test-customer",
api_region="us",
use_stdout=False,
)
# Send test events
test_events = ['{"test": "event1"}', '{"test": "event2"}']
client._send_events_to_api(test_events)
# Verify API was called
mock_session.post.assert_called_once()
call_args = mock_session.post.call_args
# Verify endpoint
assert "us-chronicle.googleapis.com" in call_args[0][0]
# Verify payload structure
payload = call_args[1]["json"]
assert "inline_source" in payload
assert "logs" in payload["inline_source"]
assert len(payload["inline_source"]["logs"]) == 2
finally:
os.unlink(tmp_credentials)
@patch("parsedmarc.google_secops.service_account.Credentials")
@patch("parsedmarc.google_secops.requests.Session")
def test_api_error_handling(self, mock_session_class, mock_credentials_class):
"""Test error handling when Chronicle API returns errors."""
from parsedmarc.google_secops import GoogleSecOpsClient, GoogleSecOpsError
# Create a temporary credentials file
with tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False
) as tmp_file:
json.dump(
{
"type": "service_account",
"project_id": "test-project",
"private_key_id": "key123",
"private_key": "-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----",
"client_email": "test@test-project.iam.gserviceaccount.com",
"client_id": "123456789",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
},
tmp_file,
)
tmp_credentials = tmp_file.name
try:
# Mock credentials
mock_creds = MagicMock()
mock_creds.valid = True
mock_creds.token = "test-token"
mock_credentials_class.from_service_account_file.return_value = mock_creds
# Mock session and error response
mock_session = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.text = "Invalid request"
mock_session.post.return_value = mock_response
mock_session_class.return_value = mock_session
# Create client with API mode
client = GoogleSecOpsClient(
api_credentials_file=tmp_credentials,
api_customer_id="test-customer",
use_stdout=False,
)
# Test error handling
test_events = ['{"test": "event"}']
with self.assertRaises(GoogleSecOpsError) as context:
client._send_events_to_api(test_events)
assert "Chronicle API error: 400" in str(context.exception)
finally:
os.unlink(tmp_credentials)
if __name__ == "__main__":
unittest.main()