blob: 4f0a0e0ee4426131d52a4e0cc94799badf99e845 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This file defines helper functions for putting entries into elasticsearch.
"""Utils for sending metadata to elasticsearch
Elasticsearch is a key-value store NOSQL database.
Source is here: https://github.com/elasticsearch/elasticsearch
We will be using es to store our metadata.
For example, if we wanted to store the following metadata:
metadata = {
'host_id': 1
'job_id': 20
'time_start': 100000
'time_recorded': 100006
}
The following call will send metadata to the default es server.
autotest_es.post(index, metadata)
We can also specify which port and host to use.
Using for testing: Sometimes, when we choose a single index
to put entries into, we want to clear that index of all
entries before running our tests. Use clear_index function.
(see es_utils_functionaltest.py for an example)
This file also contains methods for sending queries to es. Currently,
the query (json dict) we send to es is quite complicated (but flexible).
For example, the below query returns job_id, host_id, and job_start
for all job_ids in [0, 99999] and host_id matching 10.
range_eq_query = {
'fields': ['job_id', 'host_id', 'job_start'],
'query': {
'filtered': {
'query': {
'match': {
'host_id': 10,
}
}
'filter': {
'range': {
'job_id': {
'gte': 0,
'lte': 99999,
}
}
}
}
}
}
To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server. The query() function can be used to construct a
query with certain parameters and execute it all in one call.
"""
import es_utils
import common
from autotest_lib.client.common_lib import global_config
# Server and ports for elasticsearch (for metadata use only)
METADATA_ES_SERVER = global_config.global_config.get_config_value(
'CROS', 'ES_HOST', default=None)
ES_PORT = global_config.global_config.get_config_value(
'CROS', 'ES_PORT', type=int, default=9200)
ES_UDP_PORT = global_config.global_config.get_config_value(
'CROS', 'ES_UDP_PORT', type=int, default=9700)
# Whether to use http. udp is very little overhead (around 3 ms) compared to
# using http (tcp) takes ~ 500 ms for the first connection and 50-100ms for
# subsequent connections.
ES_USE_HTTP = global_config.global_config.get_config_value(
'CROS', 'ES_USE_HTTP', type=bool, default=False)
# If CLIENT/metadata_index is not set, INDEX_METADATA falls back to
# autotest instance name (SERVER/hostname).
INDEX_METADATA = global_config.global_config.get_config_value(
'CLIENT', 'metadata_index', type=str, default=None)
if not INDEX_METADATA:
INDEX_METADATA = global_config.global_config.get_config_value(
'SERVER', 'hostname', type=str, default='localhost')
DEFAULT_BULK_POST_RETRIES = 5
def post(use_http=ES_USE_HTTP, host=METADATA_ES_SERVER, port=ES_PORT,
timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
udp_port=ES_UDP_PORT,
*args, **kwargs):
"""This function takes a series of arguments which are passed to the
es_utils.ESMetadata constructor, and any other arguments are passed to
its post() function. For an explanation of each, see those functions in
es_utils.
"""
if not host:
return
esmd = es_utils.ESMetadata(use_http=use_http, host=host, port=port,
timeout=timeout, index=index, udp_port=udp_port)
return esmd.post(*args, **kwargs)
def bulk_post(data_list, host=METADATA_ES_SERVER, port=ES_PORT,
timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
retries=DEFAULT_BULK_POST_RETRIES, *args, **kwargs):
"""This function takes a series of arguments which are passed to the
es_utils.ESMetadata constructor, and a list of metadata, then upload to
Elasticsearch server using Elasticsearch bulk API. This can greatly nhance
the performance of uploading data using HTTP.
For an explanation of each argument, see those functions in es_utils.
"""
if not host:
return True
esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
timeout=timeout, index=index,
udp_port=ES_UDP_PORT)
# bulk post may fail due to the amount of data, retry several times.
for _ in range(retries):
if esmd.bulk_post(data_list, *args, **kwargs):
return True
return False
def execute_query(host=METADATA_ES_SERVER, port=ES_PORT,
timeout=es_utils.DEFAULT_TIMEOUT, index=INDEX_METADATA,
*args, **kwargs):
"""This function takes a series of arguments which are passed to the
es_utils.ESMetadata constructor, and any other arguments are passed to
its execute_query() function. For an explanation of each, see those
functions in es_utils.
"""
esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
timeout=timeout, index=index, udp_port=0)
return esmd.execute_query(*args, **kwargs)
def query(host=METADATA_ES_SERVER, port=ES_PORT,
timeout=es_utils.DEFAULT_TIMEOUT,
index=INDEX_METADATA, *args, **kwargs):
"""This function takes a series of arguments which are passed to the
es_utils.ESMetadata constructor, and any other arguments are passed to
its query() function. For an explanation of each, see those functions in
es_utils.
"""
esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
timeout=timeout, index=index, udp_port=0)
return esmd.query(*args, **kwargs)