diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 43d2184..4e0fb97 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -119,9 +119,8 @@ class KafkaClient(object): logger.debug("Saving aggregate report to Kafka") self.producer.send(aggregate_topic, slice) except UnknownTopicOrPartitionError: - raise KafkaError( - "Kafka error: Unknown topic or partition" - " on broker") + raise KafkaError( + "Kafka error: Unknown topic or partition on broker") except Exception as e: raise KafkaError( "Kafka error: {0}".format(e.__str__())) @@ -132,34 +131,34 @@ class KafkaClient(object): "Kafka error: {0}".format(e.__str__())) def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic): - """ - Saves forensic DMARC reports to Kafka, sends individual - records (slices) since Kafka requires messages to be <= 1MB - by default. + """ + Saves forensic DMARC reports to Kafka, sends individual + records (slices) since Kafka requires messages to be <= 1MB + by default. - Args: - forensic_reports (list): A list of forensic report dicts - to save to Kafka - forensic_topic (str): The name of the Kafka topic + Args: + forensic_reports (list): A list of forensic report dicts + to save to Kafka + forensic_topic (str): The name of the Kafka topic - """ - if type(forensic_reports) == dict: - forensic_reports = [forensic_reports] + """ + if type(forensic_reports) == dict: + forensic_reports = [forensic_reports] - if len(forensic_reports) < 1: - return + if len(forensic_reports) < 1: + return - try: - logger.debug("Saving forensic reports to Kafka") - self.producer.send(forensic_topic, forensic_reports) - except UnknownTopicOrPartitionError: - raise KafkaError( - "Kafka error: Unknown topic or partition on broker") - except Exception as e: - raise KafkaError( - "Kafka error: {0}".format(e.__str__())) - try: - self.producer.flush() - except Exception as e: - raise KafkaError( - "Kafka error: {0}".format(e.__str__())) + try: + logger.debug("Saving forensic reports to Kafka") + self.producer.send(forensic_topic, forensic_reports) + except UnknownTopicOrPartitionError: + raise KafkaError( + "Kafka error: Unknown topic or partition on broker") + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) + try: + self.producer.flush() + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__()))