diff --git a/CHANGELOG.md b/CHANGELOG.md
index d68c0ac..8aa3888 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,15 @@
+4.2.1
+-----
+
+- Fix bug where `parsedmarc` would always try to save to Elasticsearch,
+even if only `--hec` was used
+- Add options to save reports to a Kafka topic (mikesiegel - #21)
+
 4.2.0
 -----
 
 - Save each aggregate report record as a separate Splunk event
-- Fix IMAP delete action (issue # 20)
+- Fix IMAP delete action (#20)
 - Suppress Splunk SSL validation warnings
 - Change default logging level to `WARNING`
 
diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index 72e1ad6..0ee8dd6 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -1216,7 +1216,7 @@ def get_dmarc_reports_from_inbox(host=None,
         yield l[i:i + n]
 
     if delete and test:
-        raise ValueError("delete and test options are mutually exclusive")
+        raise ValueError("--delete and --test options are mutually exclusive")
 
     if connection is None and (user is None or password is None):
         raise ValueError("Must supply a connection, or a username and "
diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py
index 148e572..c9f73b2 100644
--- a/parsedmarc/cli.py
+++ b/parsedmarc/cli.py
@@ -30,9 +30,9 @@ def _main():
             print(output_str)
         if args.kafka_hosts:
             try:
-                kafkaClient = kafkaclient.KafkaClient(args.kafka_hosts)
-            except Exception as error:
-                logger.error("Kafka Error: {0}".format(error.__str__()))
+                kafka_client = kafkaclient.KafkaClient(args.kafka_hosts)
+            except Exception as error_:
+                logger.error("Kafka Error: {0}".format(error_.__str__()))
         if args.save_aggregate:
             for report in reports_["aggregate_reports"]:
                 try:
@@ -47,7 +47,7 @@ def _main():
                     exit(1)
                 try:
                     if args.kafka_hosts:
-                        kafkaClient.save_aggregate_reports_to_kafka(
+                        kafka_client.save_aggregate_reports_to_kafka(
                             report, kafka_aggregate_topic)
                 except Exception as error_:
                     logger.error("Kafka Error: {0}".format(
@@ -73,9 +73,8 @@ def _main():
                         error_.__str__()))
                 try:
                     if args.kafka_hosts:
-                        kafkaClient.save_forensic_reports_to_kafka(
+                        kafka_client.save_forensic_reports_to_kafka(
                             report, kafka_forensic_topic)
-
                 except Exception as error_:
                     logger.error("Kafka Error: {0}".format(
                         error_.__str__()))
@@ -152,10 +151,10 @@ def _main():
                             " or URLs")
     arg_parser.add_argument("--kafka-aggregate-topic",
                             help="The Kafka topic to publish aggregate "
-                                 "reports to.")
+                                 "reports to", default="dmarc_aggregate")
    arg_parser.add_argument("--kafka-forensic_topic",
                             help="The Kafka topic to publish forensic reports"
-                                 " to.")
+                                 " to", default="dmarc_forensic")
     arg_parser.add_argument("--save-aggregate", action="store_true",
                             default=False,
                             help="Save aggregate reports to search indexes")
@@ -226,7 +225,7 @@ def _main():
         es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
 
     if args.save_aggregate or args.save_forensic:
-        if (args.elasticsearch_host is None and args.hec
+        if (args.elasticsearch_host is None and args.hec is None
                 and args.kafka_hosts is None):
             args.elasticsearch_host = ["localhost:9200"]
         try:
@@ -250,14 +249,8 @@ def _main():
                                   args.hec_index,
                                   verify=verify)
 
-    kafka_aggregate_topic = "dmarc_aggrregate"
-    kafka_forensic_topic = "dmarc_forensic"
-
-    if args.kafka_aggregate_topic:
-        kafka_aggregate_topic = args.kafka_aggregate_topic
-
-    if args.kafka_forensic_topic:
-        kafka_forensic_topic = args.kafka_forensic_topic
+    kafka_aggregate_topic = args.kafka_aggregate_topic
+    kafka_forensic_topic = args.kafka_forensic_topic
 
     file_paths = []
     for file_path in args.file_path:
@@ -333,7 +326,7 @@ def _main():
         exit(1)
 
     if args.host and args.watch:
logger.info("Watching for email - Quit with ^c") + logger.info("Watching for email - Quit with ctrl-c") ssl = True if args.imap_no_ssl: ssl = False diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 7f48c64..985736f 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -1,11 +1,14 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- +import logging +import json + from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError -import json +logger = logging.getLogger("parsedmarc") + class KafkaError(RuntimeError): """Raised when a Kafka error occurs""" @@ -13,12 +16,12 @@ class KafkaError(RuntimeError): class KafkaClient(object): def __init__(self, kafka_hosts): try: - def serializer(v): lambda v: json.dumps(v).encode('utf-8') self.producer = KafkaProducer( - value_serializer=serializer, + value_serializer=lambda v: json.dumps(v).encode( + 'utf-8'), bootstrap_servers=kafka_hosts) except NoBrokersAvailable: - raise KafkaError("No Kafka brokers availabe") + raise KafkaError("No Kafka brokers available") def save_aggregate_reports_to_kafka(self, aggregate_reports, aggregate_topic): @@ -27,7 +30,8 @@ class KafkaClient(object): Args: aggregate_reports (list): A list of aggregate report dictionaries - to save to kafka + to save to Kafka + aggregate_topic (str): The name of the Kafka topic """ if type(aggregate_reports) == dict: @@ -37,10 +41,19 @@ class KafkaClient(object): return try: + logger.debug("Saving aggregate reports to Kafka") self.producer.send(aggregate_topic, aggregate_reports) except UnknownTopicOrPartitionError: - raise KafkaError("Unknown topic or partition on broker") - self.producer.flush() + raise KafkaError( + "Kafka error: Unknown topic or partition on broker") + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) + try: + self.producer.flush() + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic): """ @@ -48,7 +61,8 @@ class KafkaClient(object): Args: forensic_reports (list): A list of forensic report dicts - to save to kafka + to save to Kafka + forensic_topic (str): The name of the Kafka topic """ if type(forensic_reports) == dict: @@ -58,7 +72,16 @@ class KafkaClient(object): return try: + logger.debug("Saving forensic reports to Kafka") self.producer.send(forensic_topic, forensic_reports) except UnknownTopicOrPartitionError: - raise KafkaError("Unknown topic or partition on broker") - self.producer.flush() + raise KafkaError( + "Kafka error: Unknown topic or partition on broker") + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) + try: + self.producer.flush() + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) diff --git a/requirements.txt b/requirements.txt index b3a4bc4..6d978e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,10 +10,11 @@ mail-parser dateparser elasticsearch>=6.3.0,<7.0.0 elasticsearch-dsl>=6.2.1,<7.0.0 +kafka-python flake8 sphinx==1.7.9 sphinx_rtd_theme collective.checkdocs wheel rstcheck -kafka-python + diff --git a/setup.py b/setup.py index 8e6412f..17567fc 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ from setuptools import setup from codecs import open from os import path -__version__ = "4.2.0k" +__version__ = "4.2.1" description = "A Python package and CLI for parsing aggregate and " \ "forensic DMARC reports" @@ -94,7 +94,7 @@ setup( 
     install_requires=['dnspython', 'publicsuffix', 'xmltodict', 'geoip2',
                       'urllib3', 'requests', 'imapclient', 'mail-parser',
                       'dateparser', 'elasticsearch>=6.3.0,<7.0.0',
-                      'elasticsearch-dsl>=6.2.1,<7.0.0'
+                      'elasticsearch-dsl>=6.2.1,<7.0.0', 'kafka-python'
                       ],
     entry_points={
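
For reference, a minimal sketch of the Kafka path this change introduces. The client class, method names, and topic defaults come from the diff above; the broker address and the report contents are hypothetical placeholders.

    # -*- coding: utf-8 -*-
    # Sketch of the new parsedmarc.kafkaclient usage (assumes a reachable
    # broker; "localhost:9092" and the report dict are placeholders).
    from parsedmarc import kafkaclient

    # The new CLI defaults for --kafka-aggregate-topic and --kafka-forensic_topic
    aggregate_topic = "dmarc_aggregate"

    # KafkaClient wraps a KafkaProducer that JSON-encodes each value and
    # raises KafkaError if no broker is available.
    client = kafkaclient.KafkaClient(["localhost:9092"])

    # A single dict is also accepted; save_aggregate_reports_to_kafka sends it
    # to the given topic and flushes the producer, raising KafkaError on failure.
    report = {"report_metadata": {"org_name": "example.com"}}  # placeholder
    client.save_aggregate_reports_to_kafka(report, aggregate_topic)

The equivalent CLI run would be along the lines of `parsedmarc --kafka-hosts localhost:9092 --save-aggregate aggregate_report.xml`, with both topic flags left at their new defaults.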