diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index dcc9463..4c3b532 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -1451,7 +1451,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, raise IMAPError("DNS resolution failed") except ConnectionRefusedError: raise IMAPError("Connection refused") - except ConnectionResetError: + except (KeyError, ConnectionResetError): logger.debug("IMAP error: Connection reset") logger.debug("Reconnecting watcher") server = imapclient.IMAPClient(host) @@ -1542,7 +1542,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, raise IMAPError("DNS resolution failed") except ConnectionRefusedError: raise IMAPError("Connection refused") - except ConnectionResetError: + except (KeyError, ConnectionResetError): logger.debug("IMAP error: Connection reset") logger.debug("Reconnecting watcher") server = imapclient.IMAPClient(host) @@ -1558,6 +1558,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True, test=test, nameservers=ns, dns_timeout=dt) + callback(res) + server.idle() except ConnectionAbortedError: raise IMAPError("Connection aborted") except TimeoutError: diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 5cf72cc..fec89a9 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -7,6 +7,8 @@ import json from elasticsearch_dsl.search import Q from elasticsearch_dsl import connections, Object, Document, Index, Nested, \ InnerDoc, Integer, Text, Boolean, DateRange, Ip, Date +from elasticsearch.helpers import reindex + from parsedmarc.utils import human_timestamp_to_datetime @@ -29,7 +31,7 @@ class _PublishedPolicy(InnerDoc): p = Text() sp = Text() pct = Integer() - fo = Integer() # TODO: Change this to Text (issue #31) + fo = Text() # TODO: Change this to Text (issue #31) class _DKIMResult(InnerDoc): @@ -211,6 +213,7 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): aggregate_indexes (list): A list of aggregate index names forensic_indexes (list): A list of forensic index names """ + version = 2 if aggregate_indexes is None: aggregate_indexes = [] if forensic_indexes is None: @@ -221,10 +224,32 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None): fo_field = "published_policy.fo" fo = "fo" fo_mapping = aggregate_index.get_field_mapping(fields=[fo_field])[ - aggregate_index_name]["mappings"][doc][fo_field]["mapping"][fo] + aggregate_index_name]["mappings"] + if doc not in fo_mapping: + continue + + fo_mapping = fo_mapping[doc][fo_field]["mapping"][fo] fo_type = fo_mapping["type"] if fo_type == "long": - pass # TODO: Do reindex, delete, and alias here (issue #31) + # TODO: Do reindex, delete, and alias here (issue #31) + new_index_name = "{0}-v{1}".format(aggregate_index_name, version) + body = {"properties": {"published_policy.fo": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + Index(new_index_name).create() + Index(new_index_name).put_mapping(doc_type=doc, body=body) + reindex(connections.get_connection(), aggregate_index_name, new_index_name) + Index(aggregate_index_name).delete() + Index(new_index_name).put_alias(name=aggregate_index_name) + for forensic_index in forensic_indexes: pass