Bulk indexing of all elastic docs

This creates a general-purpose Elastic object cache in the Browbeat
Elastic class. Every call to index_result pushes into a cache, which is
emptied at the end of the workload run, when the cache reaches its
maximum size, or when more than a given amount of time has passed;
by default 1000 entries or 10 minutes. A final flush is called on
destruction of the Elastic object (it even runs on SIGTERM, so in
theory you can ctrl-c and still save your results). The cache is
emptied using 4 parallel workers and the bulk API, so it should be
efficient at inserting large amounts of data.
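
For context, a minimal sketch of that flush path, assuming the
elasticsearch-py client and its helpers module; the host, index name,
and document payload below are placeholders, not Browbeat's actual
values:

from collections import deque

import elasticsearch
from elasticsearch import helpers

es = elasticsearch.Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Each cached item is reshaped into a bulk action: metadata fields
# (_index, _type) plus the document body under _source.
docs = deque({'_index': 'browbeat-demo',
              '_type': 'result',
              '_source': {'iteration': i}} for i in range(1000))

# parallel_bulk streams the actions to the _bulk endpoint from worker
# threads (4 by default) and yields one (ok, info) tuple per document;
# raise_on_error=False reports failures in the tuple instead of raising.
for ok, info in helpers.parallel_bulk(es, docs, thread_count=4,
                                      raise_on_error=False):
    if not ok:
        print("failed to index: {}".format(info))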

If the upload of any one of the objects fails, the rest will not be
inserted and the entire cache, including previously successful inserts,
will be written to disk. A possible improvement is to have the helper
return a list of passed/failed inserts and dump only the failed objects.
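
A rough sketch of that improvement, under the assumption that the
per-document (ok, info) results from helpers.parallel_bulk are
inspected; flush_with_partial_dump and the dump path are hypothetical
names, not part of this change:

import json

from elasticsearch import helpers

def flush_with_partial_dump(es, actions, dump_path='failed-elastic.json'):
    """Bulk-insert actions; dump only the error info for failed documents."""
    failed = []
    # raise_on_error=False makes each failure show up as an (ok, info)
    # tuple rather than aborting the whole upload with an exception.
    for ok, info in helpers.parallel_bulk(es, actions,
                                          raise_on_error=False):
        if not ok:
            failed.append(info)
    if failed:
        with open(dump_path, 'w') as handle:
            json.dump(failed, handle, indent=4, sort_keys=True)
    return len(failed) == 0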

Change-Id: I0fceb5888c6f7d3167320593177d9cdc72504878
jkilpatr 2017-05-30 16:14:15 -04:00
parent 1d3c20394e
commit c02a2890bf


@@ -10,7 +10,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import deque
import elasticsearch
from elasticsearch import helpers
import logging
import json
import datetime
@@ -25,8 +27,12 @@ browbeat_uuid = uuid.uuid4()
class Elastic(object):
def __init__(self, config, workload, tool="browbeat"):
def __init__(self, config, workload, tool="browbeat", cache_size=1000, max_cache_time=10):
self.config = config
self.cache = deque()
self.max_cache_size = cache_size
self.last_upload = datetime.datetime.utcnow()
self.max_cache_age = datetime.timedelta(minutes=max_cache_time)
self.logger = logging.getLogger('browbeat.Elastic')
self.es = elasticsearch.Elasticsearch([
{'host': self.config['elasticsearch']['host'],
@@ -38,6 +44,9 @@ class Elastic(object):
self.index = "{}-{}-{}".format(tool,
workload, today.strftime('%Y.%m.%d'))
def __del__(self):
self.flush_cache()
def load_json(self, result):
json_data = None
self.logger.info("Loading JSON")
@@ -74,28 +83,37 @@ class Elastic(object):
sys.exit(1)
return result
def index_result(
self,
result,
test_name,
result_dir,
identifier='',
_type='result',
_id=None):
# Used to transform the cache into an Elasticsearch bulk-insertable iterable
def cache_insertable_iterable(self):
output = deque()
for item in self.cache:
es_item = {}
es_item['_id'] = item['_id']
es_item['_source'] = item['result']
es_item['_type'] = item['_type']
es_item['_index'] = self.index
output.append(es_item)
return output
def flush_cache(self):
if len(self.cache) == 0:
return True
retry = 2
result['browbeat_uuid'] = str(browbeat_uuid)
result['cloud_name'] = self.config['browbeat']['cloud_name']
result['browbeat_config'] = self.config
for i in range(retry):
try:
self.es.index(index=self.index,
id=_id,
body=result,
doc_type=_type,
refresh=True)
self.logger.debug("Pushed data to Elasticsearch to index {}"
"and browbeat UUID {}".
format(self.index, result['browbeat_uuid']))
to_upload = helpers.parallel_bulk(self.es,
self.cache_insertable_iterable())
counter = 0
num_items = len(self.cache)
for item in to_upload:
counter += 1
self.logger.debug("{} of {} Elastic objects uploaded".format(counter,
num_items))
output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
self.index)
output += " and browbeat UUID {}".format(str(browbeat_uuid))
self.logger.info(output)
self.cache = deque()
self.last_upload = datetime.datetime.utcnow()
return True
except Exception as Err:
self.logger.error(
@@ -103,17 +121,25 @@ class Elastic(object):
" in 10 seconds")
self.logger.error("Exception: {}".format(Err))
time.sleep(10)
if i == (retry - 1):
self.logger.error(
"Pushing Data to Elasticsearch failed in spite of retry,"
" dumping JSON")
elastic_file = os.path.join(
result_dir, test_name + '-' + identifier + '-elastic' + '.' + 'json')
with open(elastic_file, 'w') as result_file:
json.dump(result, result_file,
indent=4, sort_keys=True)
self.logger.info(
"Saved Elasticsearch consumable result JSON to {}".format(elastic_file))
if i == (retry-1):
self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
" dumping JSON for {} cached items".format(len(self.cache)))
for item in self.cache:
filename = item['test_name'] + '-' + item['identifier']
filename += '-elastic' + '.' + 'json'
elastic_file = os.path.join(item['result_dir'],
filename)
with open(elastic_file, 'w') as result_file:
json.dump(item['result'],
result_file,
indent=4,
sort_keys=True)
self.logger.info("Saved Elasticsearch consumable result JSON to {}".
format(elastic_file))
self.cache = deque()
self.last_upload = datetime.datetime.utcnow()
return False
def get_software_metadata(self, index, role, browbeat_uuid):
@@ -233,3 +259,28 @@ class Elastic(object):
return results['hits']['hits']
else:
return False
def index_result(self,
result,
test_name,
result_dir,
identifier='',
_type='result',
_id=None):
data = {}
result['browbeat_uuid'] = str(browbeat_uuid)
result['cloud_name'] = self.config['browbeat']['cloud_name']
result['browbeat_config'] = self.config
data['result'] = result
data['test_name'] = test_name
data['result_dir'] = result_dir
data['identifier'] = identifier
data['_type'] = _type
data['_id'] = _id
self.cache.append(data)
now = datetime.datetime.utcnow()
if len(self.cache) <= self.max_cache_size \
and (now - self.last_upload) <= self.max_cache_age:
return True
else:
return self.flush_cache()
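
For illustration, a hypothetical call pattern for the new caching
behaviour; the config dict, workload name, test name, and result
payloads below are placeholders that would come from an actual
Browbeat run:

# 'config' is assumed to carry the elasticsearch and browbeat sections
# that Elastic.__init__ reads; the values here are not real.
elastic = Elastic(config, 'rally', cache_size=1000, max_cache_time=10)
for i in range(2500):
    # Each call only appends to the in-memory cache; a bulk flush happens
    # once 1000 entries accumulate or 10 minutes pass since the last upload.
    elastic.index_result({'iteration': i}, 'demo-test', '/tmp/results')
# Whatever remains in the cache is pushed when the object is destroyed.
del elastic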