#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Sync the database to elasticsearch index for use by Searching tools."""
from __future__ import print_function, absolute_import
from json import dumps
from time import sleep
from threading import Thread
try:
from Queue import Queue
except ImportError: # pragma: no cover
from queue import Queue
from math import ceil
from datetime import datetime
import requests
from six import text_type
from elasticsearch import Elasticsearch, helpers, ElasticsearchException
from .config import get_config
from .root import Root
from .search_render import ELASTIC_INDEX, SearchRender
ELASTIC_CONNECT_ATTEMPTS = 40
ELASTIC_WAIT = 3
ELASTIC_ENDPOINT = get_config().get('elasticsearch', 'url')
SYNC_OBJECTS = [
'keys',
'values',
'relationships',
'transactions',
'projects',
'users',
'instruments',
'institutions',
'groups'
]
[docs]def es_client():
"""Get the elasticsearch client object."""
esclient = Elasticsearch(
[ELASTIC_ENDPOINT],
sniff_on_start=True,
sniff_on_connection_fail=True,
sniffer_timeout=60,
timeout=60
)
mapping_params = {
'properties': {
'key': {'type': 'keyword'},
'value': {'type': 'keyword'},
'key_value_pairs': {
'properties': {
'key': {'type': 'keyword'},
'value': {'type': 'keyword'}
}
},
'release': {
'type': 'keyword'
},
'updated_date': {
'type': 'date'
},
'created_date': {
'type': 'date'
},
'transaction_ids': {
'type': 'text',
'fielddata': True
},
'has_doi': {
'type': 'text',
'fielddata': True
},
'users': {
'properties': {
'keyword': {
'type': 'keyword'
},
}
},
'instruments': {
'properties': {
'keyword': {
'type': 'keyword'
},
}
},
'projects': {
'properties': {
'keyword': {
'type': 'keyword'
},
'closed_date': {
'type': 'date'
},
'actual_start_date': {
'type': 'date'
},
'actual_end_date': {
'type': 'date'
}
}
},
'institutions': {
'properties': {
'keyword': {
'type': 'keyword'
},
}
},
'science_themes': {
'properties': {
'keyword': {
'type': 'keyword'
},
}
},
'instrument_groups': {
'properties': {
'keyword': {
'type': 'keyword'
},
}
}
}
}
# pylint: disable=unexpected-keyword-arg
esclient.indices.create(index=ELASTIC_INDEX, ignore=400)
esclient.indices.put_mapping(
index=ELASTIC_INDEX,
doc_type='doc',
body=dumps(mapping_params)
)
# pylint: enable=unexpected-keyword-arg
return esclient
[docs]def try_es_connect(attempts=0):
"""Recursively try to connect to elasticsearch."""
try:
cli = es_client()
cli.info()
except ElasticsearchException as ex: # pragma: no cover pulled from metadata
if attempts < ELASTIC_CONNECT_ATTEMPTS:
sleep(ELASTIC_WAIT)
attempts += 1
try_es_connect(attempts)
else:
raise ex
[docs]def start_work(work_queue):
"""The main thread for the work."""
cli = es_client()
job = work_queue.get()
while job:
print('Starting {} ({}): {}'.format(job[0], job[1], job[2]))
try_doing_work(cli, job)
work_queue.task_done()
print('Finished {} ({}): {}'.format(job[0], job[1], job[2]))
job = work_queue.get()
work_queue.task_done()
[docs]def try_doing_work(cli, job):
"""Try doing some work even if you fail."""
tries_left = 5
success = False
while not success and tries_left:
try:
helpers.bulk(cli, yield_data(*job))
success = True
except ElasticsearchException: # pragma: no cover
tries_left -= 1
return success
[docs]def yield_data(obj, time_field, page, items_per_page, time_delta):
"""yield objects from obj for bulk ingest."""
get_args = {
'{time_field}': '{epoch}',
'{time_field}_operator': 'gt',
'page_number': '{page}',
'items_per_page': '{items_per_page}',
'recursion_depth': '0',
'recursion_limit': '1'
}
get_list = ['{}={}'.format(key, val) for key, val in get_args.items()]
url = '{base_url}/{orm_obj}?'+'&'.join(get_list)
resp = requests.get(
text_type(url).format(
base_url=get_config().get('metadata', 'endpoint_url'),
orm_obj=obj,
time_field=time_field,
epoch=time_delta.isoformat(),
page=page,
items_per_page=items_per_page
)
)
objs = resp.json()
return SearchRender.generate(obj, objs)
[docs]def create_worker_threads(threads, work_queue):
"""Create the worker threads and return the list."""
work_threads = []
for i in range(threads):
wthread = Thread(target=start_work, args=(work_queue,))
wthread.daemon = True
wthread.start()
work_threads.append(wthread)
return work_threads
[docs]def generate_work(items_per_page, work_queue, time_ago):
"""Generate the work from the db and send it to the work queue."""
now = datetime.now()
time_delta = (now - time_ago).replace(microsecond=0)
for obj in SYNC_OBJECTS:
for time_field in ['created', 'updated']:
resp = requests.get(
text_type('{base_url}/objectinfo/{orm_obj}?{time_field}={epoch}&{time_field}_operator=gt').format(
base_url=get_config().get('metadata', 'endpoint_url'),
time_field=time_field,
orm_obj=obj,
epoch=time_delta.isoformat()
)
)
total_count = resp.json()['record_count']
num_pages = int(ceil(float(total_count) / items_per_page))
for page in range(1, num_pages + 1):
work_queue.put(
(obj, time_field, page, items_per_page, time_delta))
[docs]def search_sync(args):
"""Main search sync subcommand."""
try_es_connect()
Root.try_meta_connect()
work_queue = Queue(32)
work_threads = create_worker_threads(args.threads, work_queue)
generate_work(args.items_per_page, work_queue, args.time_ago)
for i in range(args.threads):
work_queue.put(False)
for wthread in work_threads:
wthread.join()
work_queue.join()