4.2.1 - Bug fixes and Kafka support

This commit is contained in:
Sean Whalen
2018-10-10 20:33:17 -04:00
parent 524f9c0327
commit cff1cede46
6 changed files with 58 additions and 34 deletions

View File

@@ -1,8 +1,15 @@
4.2.1
-----
- Fix bug where `parsedmarc` would always try to save to Elasticsearch,
even if only `--hec` was used
- Add options to save reports as a Kafka topic (mikesiegel - #21)
4.2.0
------
- Save each aggregate report record as a separate Splunk event
- Fix IMAP delete action (issue # 20)
- Fix IMAP delete action (#20)
- Suppress Splunk SSL validation warnings
- Change default logging level to `WARNING`

View File

@@ -1216,7 +1216,7 @@ def get_dmarc_reports_from_inbox(host=None,
yield l[i:i + n]
if delete and test:
raise ValueError("delete and test options are mutually exclusive")
raise ValueError("--delete and --test options are mutually exclusive")
if connection is None and (user is None or password is None):
raise ValueError("Must supply a connection, or a username and "

View File

@@ -30,9 +30,9 @@ def _main():
print(output_str)
if args.kafka_hosts:
try:
kafkaClient = kafkaclient.KafkaClient(args.kafka_hosts)
except Exception as error:
logger.error("Kafka Error: {0}".format(error.__str__()))
kafka_client = kafkaclient.KafkaClient(args.kafka_hosts)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
if args.save_aggregate:
for report in reports_["aggregate_reports"]:
try:
@@ -47,7 +47,7 @@ def _main():
exit(1)
try:
if args.kafka_hosts:
kafkaClient.save_aggregate_reports_to_kafka(
kafka_client.save_aggregate_reports_to_kafka(
report, kafka_aggregate_topic)
except Exception as error_:
logger.error("Kafka Error: {0}".format(
@@ -73,9 +73,8 @@ def _main():
error_.__str__()))
try:
if args.kafka_hosts:
kafkaClient.save_forensic_reports_to_kafka(
kafka_client.save_forensic_reports_to_kafka(
report, kafka_forensic_topic)
except Exception as error_:
logger.error("Kafka Error: {0}".format(
error_.__str__()))
@@ -152,10 +151,10 @@ def _main():
" or URLs")
arg_parser.add_argument("--kafka-aggregate-topic",
help="The Kafka topic to publish aggregate "
"reports to.")
"reports to", default="dmarc_aggregate")
arg_parser.add_argument("--kafka-forensic_topic",
help="The Kafka topic to publish forensic reports"
" to.")
" to", default="dmarc_forensic")
arg_parser.add_argument("--save-aggregate", action="store_true",
default=False,
help="Save aggregate reports to search indexes")
@@ -226,7 +225,7 @@ def _main():
es_forensic_index = "{0}_{1}".format(es_forensic_index, suffix)
if args.save_aggregate or args.save_forensic:
if (args.elasticsearch_host is None and args.hec
if (args.elasticsearch_host is None and args.hec is None
and args.kafka_hosts is None):
args.elasticsearch_host = ["localhost:9200"]
try:
@@ -250,14 +249,8 @@ def _main():
args.hec_index,
verify=verify)
kafka_aggregate_topic = "dmarc_aggrregate"
kafka_forensic_topic = "dmarc_forensic"
if args.kafka_aggregate_topic:
kafka_aggregate_topic = args.kafka_aggregate_topic
if args.kafka_forensic_topic:
kafka_forensic_topic = args.kafka_forensic_topic
kafka_aggregate_topic = args.kafka_aggregate_topic
kafka_forensic_topic = args.kafka_forensic_topic
file_paths = []
for file_path in args.file_path:
@@ -333,7 +326,7 @@ def _main():
exit(1)
if args.host and args.watch:
logger.info("Watching for email - Quit with ^c")
logger.info("Watching for email - Quit with ctrl-c")
ssl = True
if args.imap_no_ssl:
ssl = False

View File

@@ -1,11 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import json
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
import json
logger = logging.getLogger("parsedmarc")
class KafkaError(RuntimeError):
"""Raised when a Kafka error occurs"""
@@ -13,12 +16,12 @@ class KafkaError(RuntimeError):
class KafkaClient(object):
def __init__(self, kafka_hosts):
try:
def serializer(v): lambda v: json.dumps(v).encode('utf-8')
self.producer = KafkaProducer(
value_serializer=serializer,
value_serializer=lambda v: json.dumps(v).encode(
'utf-8'),
bootstrap_servers=kafka_hosts)
except NoBrokersAvailable:
raise KafkaError("No Kafka brokers availabe")
raise KafkaError("No Kafka brokers available")
def save_aggregate_reports_to_kafka(self, aggregate_reports,
aggregate_topic):
@@ -27,7 +30,8 @@ class KafkaClient(object):
Args:
aggregate_reports (list): A list of aggregate report dictionaries
to save to kafka
to save to Kafka
aggregate_topic (str): The name of the Kafka topic
"""
if type(aggregate_reports) == dict:
@@ -37,10 +41,19 @@ class KafkaClient(object):
return
try:
logger.debug("Saving aggregate reports to Kafka")
self.producer.send(aggregate_topic, aggregate_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Unknown topic or partition on broker")
self.producer.flush()
raise KafkaError(
"Kafka error: Unknown topic or partition on broker")
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))
try:
self.producer.flush()
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))
def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic):
"""
@@ -48,7 +61,8 @@ class KafkaClient(object):
Args:
forensic_reports (list): A list of forensic report dicts
to save to kafka
to save to Kafka
forensic_topic (str): The name of the Kafka topic
"""
if type(forensic_reports) == dict:
@@ -58,7 +72,16 @@ class KafkaClient(object):
return
try:
logger.debug("Saving forensic reports to Kafka")
self.producer.send(forensic_topic, forensic_reports)
except UnknownTopicOrPartitionError:
raise KafkaError("Unknown topic or partition on broker")
self.producer.flush()
raise KafkaError(
"Kafka error: Unknown topic or partition on broker")
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))
try:
self.producer.flush()
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))

View File

@@ -10,10 +10,11 @@ mail-parser
dateparser
elasticsearch>=6.3.0,<7.0.0
elasticsearch-dsl>=6.2.1,<7.0.0
kafka-python
flake8
sphinx==1.7.9
sphinx_rtd_theme
collective.checkdocs
wheel
rstcheck
kafka-python

View File

@@ -14,7 +14,7 @@ from setuptools import setup
from codecs import open
from os import path
__version__ = "4.2.0k"
__version__ = "4.2.1"
description = "A Python package and CLI for parsing aggregate and " \
"forensic DMARC reports"
@@ -94,7 +94,7 @@ setup(
install_requires=['dnspython', 'publicsuffix', 'xmltodict', 'geoip2',
'urllib3', 'requests', 'imapclient', 'mail-parser',
'dateparser', 'elasticsearch>=6.3.0,<7.0.0',
'elasticsearch-dsl>=6.2.1,<7.0.0'
'elasticsearch-dsl>=6.2.1,<7.0.0', 'kafka-python'
],
entry_points={