diff --git a/docs/source/index.md b/docs/source/index.md index 0b82ff5..1b27df2 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -351,6 +351,19 @@ The full set of configuration options are: (Default: `https://www.googleapis.com/auth/gmail.modify`) - `oauth2_port` - int: The TCP port for the local server to listen on for the OAuth2 response (Default: 8080) +- `log_analytics` + - `client_id` - str: The app registration's client ID + - `client_secret` - str: The app registration's client secret + - `tenant_id` - str: The tenant id where the app registration resides + - `dce` - str: The Data Collection Endpoint (DCE). Example: `https://{DCE-NAME}.{REGION}.ingest.monitor.azure.com`. + - `dcr_immutable_id` - str: The immutable ID of the Data Collection Rule (DCR) + - `dcr_aggregate_stream` - str: The stream name for aggregate reports in the DCR + - `dcr_forensic_stream` - str: The stream name for the forensic reports in the DCR + + :::{note} + Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal). 
+ ::: + :::{warning} It is **strongly recommended** to **not** use the `nameservers` diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 578de06..42dae36 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -20,7 +20,7 @@ from tqdm import tqdm from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \ parse_report_file, get_dmarc_reports_from_mbox, elastic, kafkaclient, \ splunk, save_output, email_results, ParserError, __version__, \ - InvalidDMARCReport, s3, syslog + InvalidDMARCReport, s3, syslog, loganalytics from parsedmarc.mail import IMAPConnection, MSGraphConnection, GmailConnection from parsedmarc.mail.graph import AuthMethod @@ -170,6 +170,29 @@ def _main(): forensic_reports_) except splunk.SplunkError as e: logger.error("Splunk HEC error: {0}".format(e.__str__())) + if opts.la_dce: + try: + la_client = loganalytics.LogAnalyticsClient( + client_id=opts.la_client_id, + client_secret=opts.la_client_secret, + tenant_id=opts.la_tenant_id, + dce=opts.la_dce, + dcr_immutable_id=opts.la_dcr_immutable_id, + dcr_aggregate_stream=opts.la_dcr_aggregate_stream, + dcr_forensic_stream=opts.la_dcr_forensic_stream + ) + la_client.publish_results( + reports_, + opts.save_aggregate, + opts.save_forensic) + except loganalytics.LogAnalyticsException as e: + logger.error("Log Analytics error: {0}".format(e.__str__())) + except Exception as e: + logger.error( + "Unknown error occurred" + + " during the publishing" + + " to Log Analytics: " + + e.__str__()) arg_parser = ArgumentParser(description="Parses DMARC reports") arg_parser.add_argument("-c", "--config-file", @@ -313,7 +336,14 @@ def _main(): log_file=args.log_file, n_procs=1, chunk_size=1, - ip_db_path=None + ip_db_path=None, + la_client_id=None, + la_client_secret=None, + la_tenant_id=None, + la_dce=None, + la_dcr_immutable_id=None, + la_dcr_aggregate_stream=None, + la_dcr_forensic_stream=None ) args = arg_parser.parse_args() @@ -721,6 +751,22 @@ def _main(): if "oauth2_port" in gmail_api_config: 
opts.gmail_api_oauth2_port = \ gmail_api_config.get("oauth2_port", 8080) + if "log_analytics" in config.sections(): + log_analytics_config = config["log_analytics"] + opts.la_client_id = \ + log_analytics_config.get("client_id") + opts.la_client_secret = \ + log_analytics_config.get("client_secret") + opts.la_tenant_id = \ + log_analytics_config.get("tenant_id") + opts.la_dce = \ + log_analytics_config.get("dce") + opts.la_dcr_immutable_id = \ + log_analytics_config.get("dcr_immutable_id") + opts.la_dcr_aggregate_stream = \ + log_analytics_config.get("dcr_aggregate_stream") + opts.la_dcr_forensic_stream = \ + log_analytics_config.get("dcr_forensic_stream") logger.setLevel(logging.ERROR) diff --git a/parsedmarc/loganalytics.py b/parsedmarc/loganalytics.py new file mode 100644 index 0000000..9ca1496 --- /dev/null +++ b/parsedmarc/loganalytics.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +from parsedmarc.log import logger +from azure.core.exceptions import HttpResponseError +from azure.identity import ClientSecretCredential +from azure.monitor.ingestion import LogsIngestionClient + + +class LogAnalyticsException(Exception): + """Raised when a Log Analytics error occurs""" + + +class LogAnalyticsConfig(): + """ + The LogAnalyticsConfig class is used to define the configuration + for the Log Analytics Client. + + Properties: + client_id (str): + The client ID of the service principal. + client_secret (str): + The client secret of the service principal. + tenant_id (str): + The tenant ID where + the service principal resides. + dce (str): + The Data Collection Endpoint (DCE) + used by the Data Collection Rule (DCR). + dcr_immutable_id (str): + The immutable ID of + the Data Collection Rule (DCR). + dcr_aggregate_stream (str): + The Stream name where + the Aggregate DMARC reports + need to be pushed. + dcr_forensic_stream (str): + The Stream name where + the Forensic DMARC reports + need to be pushed. 
+ """ + def __init__( + self, + client_id: str, + client_secret: str, + tenant_id: str, + dce: str, + dcr_immutable_id: str, + dcr_aggregate_stream: str, + dcr_forensic_stream: str): + self.client_id = client_id + self.client_secret = client_secret + self.tenant_id = tenant_id + self.dce = dce + self.dcr_immutable_id = dcr_immutable_id + self.dcr_aggregate_stream = dcr_aggregate_stream + self.dcr_forensic_stream = dcr_forensic_stream + + +class LogAnalyticsClient(object): + """ + The LogAnalyticsClient is used to push + the generated DMARC reports to Log Analytics + via Data Collection Rules. + """ + def __init__( + self, + client_id: str, + client_secret: str, + tenant_id: str, + dce: str, + dcr_immutable_id: str, + dcr_aggregate_stream: str, + dcr_forensic_stream: str): + self.conf = LogAnalyticsConfig( + client_id=client_id, + client_secret=client_secret, + tenant_id=tenant_id, + dce=dce, + dcr_immutable_id=dcr_immutable_id, + dcr_aggregate_stream=dcr_aggregate_stream, + dcr_forensic_stream=dcr_forensic_stream + ) + if ( + not self.conf.client_id or + not self.conf.client_secret or + not self.conf.tenant_id or + not self.conf.dce or + not self.conf.dcr_immutable_id): + raise LogAnalyticsException( + "Invalid configuration. " + + "One or more required settings are missing.") + + def publish_json( + self, + results, + logs_client: LogsIngestionClient, + dcr_stream: str): + """ + Background function to publish given + DMARC reprot to specific Data Collection Rule. + + Args: + results (list): + The results generated by parsedmarc. + logs_client (LogsIngestionClient): + The client used to send the DMARC reports. + dcr_stream (str): + The stream name where the DMARC reports needs to be pushed. 
+ """ + try: + logs_client.upload(self.conf.dcr_immutable_id, dcr_stream, results) + except HttpResponseError as e: + raise LogAnalyticsException( + "Upload failed: {error}" + .format(error=e)) + + def publish_results( + self, + results, + save_aggregate: bool, + save_forensic: bool): + """ + Function to publish DMARC reports to Log Analytics + via Data Collection Rules (DCR). + Look below for docs: + https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + + Args: + results (list): + The DMARC reports (Aggregate & Forensic) + save_aggregate (bool): + Whether Aggregate reports can be saved into Log Analytics + save_forensic (bool): + Whether Forensic reports can be saved into Log Analytics + """ + conf = self.conf + credential = ClientSecretCredential( + tenant_id=conf.tenant_id, + client_id=conf.client_id, + client_secret=conf.client_secret + ) + logs_client = LogsIngestionClient(conf.dce, credential=credential) + if ( + results['aggregate_reports'] and + conf.dcr_aggregate_stream and + len(results['aggregate_reports']) > 0 and + save_aggregate): + logger.info("Publishing aggregate reports.") + self.publish_json( + results['aggregate_reports'], + logs_client, + conf.dcr_aggregate_stream) + logger.info("Successfully pushed aggregate reports.") + if ( + results['forensic_reports'] and + conf.dcr_forensic_stream and + len(results['forensic_reports']) > 0 and + save_forensic): + logger.info("Publishing forensic reports.") + self.publish_json( + results['forensic_reports'], + logs_client, + conf.dcr_forensic_stream) + logger.info("Successfully pushed forensic reports.") diff --git a/pyproject.toml b/pyproject.toml index 96e1d36..6b5fb7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ classifiers = [ ] dependencies = [ "azure-identity>=1.8.0", + "azure-monitor-ingestion>=1.0.0", "boto3>=1.16.63", "dateparser>=1.1.1", "dnspython>=2.0.0", diff --git a/requirements.txt b/requirements.txt index 6aa66ab..57f8d07 100644 --- 
a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,7 @@ lxml>=4.4.0 boto3>=1.16.63 msgraph-core>=0.2.2 azure-identity>=1.8.0 +azure-monitor-ingestion>=1.0.0 google-api-core>=2.4.0 google-api-python-client>=2.35.0 google-auth>=2.3.3