5.1.0 - Add support for TLS/SSL and uerename/password auth for Kafka

This commit is contained in:
Sean Whalen
2018-11-27 12:02:30 -05:00
parent f19ea5b950
commit a227b73cfb
5 changed files with 60 additions and 12 deletions

View File

@@ -1,3 +1,8 @@
5.1.0
-----
- Add support for TLS/SSL and uerename/password auth to Kafka
5.0.2
-----

View File

@@ -38,7 +38,7 @@ from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime
from parsedmarc.utils import parse_email
__version__ = "5.0.2"
__version__ = "5.1.0"
logger = logging.getLogger("parsedmarc")
logger.debug("parsedmarc v{0}".format(__version__))

View File

@@ -29,7 +29,11 @@ def _main():
print(output_str)
if args.kafka_hosts:
try:
kafka_client = kafkaclient.KafkaClient(args.kafka_hosts)
kafka_client = kafkaclient.KafkaClient(
args.kafka_hosts,
username=args.kafka_username,
password=args.kafka_password
)
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))
if args.save_aggregate:
@@ -151,8 +155,20 @@ def _main():
help="Skip certificate verification for Splunk "
"HEC")
arg_parser.add_argument("-K", "--kafka-hosts", nargs="*",
help="A list of one or more Kafka hostnames"
" or URLs")
help="A list of one or more Kafka hostnames")
arg_parser.add_argument("--kafka-username",
help=""""An optional Kafka username
Note: For Azure Event hub, this is
literally $ConnectionString""")
arg_parser.add_argument("--kafka-password",
help=""""An optional Kafka password
Note: For Azure Event hub, this is
the Azure-provided connection string""")
arg_parser.add_argument("--kafka-use-ssl",
action="store_true",
help="""Use SSL/TLS to connect to Kafka
(implied when ``--kafka-username or
--kafka-password are provided)""")
arg_parser.add_argument("--kafka-aggregate-topic",
help="The Kafka topic to publish aggregate "
"reports to (Default: dmarc_aggregate)",

View File

@@ -2,6 +2,7 @@
import logging
import json
import ssl
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, UnknownTopicOrPartitionError
@@ -16,16 +17,41 @@ class KafkaError(RuntimeError):
class KafkaClient(object):
def __init__(self, kafka_hosts):
try:
self.producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode(
def __init__(self, kafka_hosts, use_ssl=False, username=None,
password=None):
"""
Initializes the Kafka client
Args:
kafka_hosts (list): A list of Kafka hostnames
(with optional port numbers)
use_ssl (bool): Use a SSL/TLS connection
username (str): An optional username
password (str): An optional password
Notes:
``use_ssl=True`` is implied when a username or password are
supplied.
When using Azure Event Hubs, the username is literally
``$ConnectionString``, and the password is the
Azure Event Hub connection string.
"""
config = dict(value_serializer=lambda v: json.dumps(v).encode(
'utf-8'),
bootstrap_servers=kafka_hosts)
bootstrap_servers=kafka_hosts)
if use_ssl or username or password:
config["security_protocol"] = "SSL"
config["ssl_context"] = ssl.create_default_context()
if username or password:
config["sasl_plain_username"] = username or ""
config["sasl_plain_password"] = password or ""
try:
self.producer = KafkaProducer(**config)
except NoBrokersAvailable:
raise KafkaError("No Kafka brokers available")
def strip_metadata(self, report):
@staticmethod
def strip_metadata(report):
"""
Duplicates org_name, org_email and report_id into JSON root
and removes report_metadata key to bring it more inline
@@ -38,7 +64,8 @@ class KafkaClient(object):
return report
def generate_daterange(self, report):
@staticmethod
def generate_daterange(report):
"""
Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS
based on begin and end dates for easier parsing in Kibana.

View File

@@ -14,7 +14,7 @@ from setuptools import setup
from codecs import open
from os import path
__version__ = "5.0.2"
__version__ = "5.1.0"
description = "A Python package and CLI for parsing aggregate and " \
"forensic DMARC reports"