diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 7eaeb47..f81aff9 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -30,6 +30,7 @@ class KafkaClient(object): for report in aggregate_reports: self.producer.send(aggregate_topic, report) + self.producer.flush() def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic): @@ -49,3 +50,4 @@ class KafkaClient(object): for report in forensic_reports: self.producer.send(forensic_topic, report) + self.producer.flush()