Elasticsearch index migration

This commit is contained in:
Sean Whalen
2018-11-17 20:43:55 -05:00
parent 7d2301c5bd
commit aa88d3eeb4
2 changed files with 32 additions and 5 deletions
+4 -2
View File
@@ -1451,7 +1451,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
raise IMAPError("DNS resolution failed")
except ConnectionRefusedError:
raise IMAPError("Connection refused")
except ConnectionResetError:
except (KeyError, ConnectionResetError):
logger.debug("IMAP error: Connection reset")
logger.debug("Reconnecting watcher")
server = imapclient.IMAPClient(host)
@@ -1542,7 +1542,7 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
raise IMAPError("DNS resolution failed")
except ConnectionRefusedError:
raise IMAPError("Connection refused")
except ConnectionResetError:
except (KeyError, ConnectionResetError):
logger.debug("IMAP error: Connection reset")
logger.debug("Reconnecting watcher")
server = imapclient.IMAPClient(host)
@@ -1558,6 +1558,8 @@ def watch_inbox(host, username, password, callback, port=None, ssl=True,
test=test,
nameservers=ns,
dns_timeout=dt)
callback(res)
server.idle()
except ConnectionAbortedError:
raise IMAPError("Connection aborted")
except TimeoutError:
+28 -3
View File
@@ -7,6 +7,8 @@ import json
from elasticsearch_dsl.search import Q
from elasticsearch_dsl import connections, Object, Document, Index, Nested, \
InnerDoc, Integer, Text, Boolean, DateRange, Ip, Date
from elasticsearch.helpers import reindex
from parsedmarc.utils import human_timestamp_to_datetime
@@ -29,7 +31,7 @@ class _PublishedPolicy(InnerDoc):
p = Text()
sp = Text()
pct = Integer()
fo = Integer() # TODO: Change this to Text (issue #31)
fo = Text() # TODO: Change this to Text (issue #31)
class _DKIMResult(InnerDoc):
@@ -211,6 +213,7 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
aggregate_indexes (list): A list of aggregate index names
forensic_indexes (list): A list of forensic index names
"""
version = 2
if aggregate_indexes is None:
aggregate_indexes = []
if forensic_indexes is None:
@@ -221,10 +224,32 @@ def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
fo_field = "published_policy.fo"
fo = "fo"
fo_mapping = aggregate_index.get_field_mapping(fields=[fo_field])[
aggregate_index_name]["mappings"][doc][fo_field]["mapping"][fo]
aggregate_index_name]["mappings"]
if doc not in fo_mapping:
continue
fo_mapping = fo_mapping[doc][fo_field]["mapping"][fo]
fo_type = fo_mapping["type"]
if fo_type == "long":
pass # TODO: Do reindex, delete, and alias here (issue #31)
# TODO: Do reindex, delete, and alias here (issue #31)
new_index_name = "{0}-v{1}".format(aggregate_index_name, version)
body = {"properties": {"published_policy.fo": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
Index(new_index_name).create()
Index(new_index_name).put_mapping(doc_type=doc, body=body)
reindex(connections.get_connection(), aggregate_index_name, new_index_name)
Index(aggregate_index_name).delete()
Index(new_index_name).put_alias(name=aggregate_index_name)
for forensic_index in forensic_indexes:
pass