| #!/usr/bin/env python2 |
| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Standalone service to monitor AFE servers and report to ts_mon""" |
| |
import argparse
import logging
import multiprocessing
import sys
import time
import urllib2
| |
| import common |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.frontend.afe.json_rpc import proxy |
| from autotest_lib.server import frontend |
| # import needed to setup host_attributes |
| # pylint: disable=unused-import |
| from autotest_lib.server import site_host_attributes |
| from autotest_lib.site_utils import server_manager_utils |
| from chromite.lib import metrics |
| from chromite.lib import ts_mon_config |
| |
| METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc' |
| METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations' |
| METRIC_TICK = METRIC_ROOT + '/tick' |
| METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error' |
| |
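# Known failure modes mapped to short reason strings; any other exception is
# re-raised by AfeMonitor.run_cmd().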
| FAILURE_REASONS = { |
| proxy.JSONRPCException: 'JSONRPCException', |
| } |
| |

def afe_rpc_call(hostname):
    """Runs the monitoring RPC calls against one AFE server

    @param hostname: hostname of the server to poll
    """
| afe_monitor = AfeMonitor(hostname) |
| try: |
| afe_monitor.run() |
    except Exception:
| metrics.Counter(METRIC_MONITOR_ERROR).increment( |
| fields={'target_hostname': hostname}) |
| logging.exception('Exception when running against host %s', hostname) |
| |
| |
| def update_shards(shards, shards_lock, period=600, stop_event=None): |
| """Updates dict of shards |
| |
| @param shards: list of shards to be updated |
| @param shards_lock: shared lock for accessing shards |
| @param period: time between polls |
| @param stop_event: Event that can be set to stop polling |
| """ |
| while(not stop_event or not stop_event.is_set()): |
| start_time = time.time() |
| |
| logging.debug('Updating Shards') |
| new_shards = set(server_manager_utils.get_shards()) |
| |
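        # Reconcile the shared shards list with the freshly fetched set.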
| with shards_lock: |
| current_shards = set(shards) |
| rm_shards = current_shards - new_shards |
| add_shards = new_shards - current_shards |
| |
            if rm_shards:
                for s in rm_shards:
                    shards.remove(s)
                logging.info('Servers left production: %s', str(rm_shards))

            if add_shards:
                shards.extend(add_shards)
                logging.info('Servers entered production: %s',
                             str(add_shards))
| |
| wait_time = (start_time + period) - time.time() |
| if wait_time > 0: |
| time.sleep(wait_time) |
| |
| |
| def poll_rpc_servers(servers, servers_lock, shards=None, period=60, |
| stop_event=None): |
| """Blocking function that polls all servers and shards |
| |
| @param servers: list of servers to poll |
| @param servers_lock: lock to be used when accessing servers or shards |
| @param shards: list of shards to poll |
    @param period: time between polls, in seconds
| @param stop_event: Event that can be set to stop polling |
| """ |
| pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4) |
| |
    while not stop_event or not stop_event.is_set():
| start_time = time.time() |
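        # Snapshot servers and shards under the lock so the pool maps over a
        # consistent set.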
| with servers_lock: |
            all_servers = set(servers).union(shards or [])
| |
| logging.debug('Starting Server Polling: %s', ', '.join(all_servers)) |
| pool.map(afe_rpc_call, all_servers) |
| |
| logging.debug('Finished Server Polling') |
| |
| metrics.Counter(METRIC_TICK).increment() |
| |
| wait_time = (start_time + period) - time.time() |
| if wait_time > 0: |
| time.sleep(wait_time) |
| |
| |
| class RpcFlightRecorder(object): |
| """Monitors a list of AFE""" |
| def __init__(self, servers, with_shards=True, poll_period=60): |
| """ |
        @param servers: list of AFE servers to monitor
        @param with_shards: also poll and record status of shards
        @param poll_period: interval between polls of all servers, in seconds
| """ |
| self._manager = multiprocessing.Manager() |
| |
| self._poll_period = poll_period |
| |
| self._servers = self._manager.list(servers) |
| self._servers_lock = self._manager.RLock() |
| |
| self._with_shards = with_shards |
| self._shards = self._manager.list() |
| self._update_shards_ps = None |
| self._poll_rpc_server_ps = None |
| |
| self._stop_event = multiprocessing.Event() |
| |
| def start(self): |
| """Call to start recorder""" |
        if self._with_shards:
| shard_args = [self._shards, self._servers_lock] |
| shard_kwargs = {'stop_event': self._stop_event} |
| self._update_shards_ps = multiprocessing.Process( |
| name='update_shards', |
| target=update_shards, |
| args=shard_args, |
| kwargs=shard_kwargs) |
| |
| self._update_shards_ps.start() |
| |
| poll_args = [self._servers, self._servers_lock] |
        poll_kwargs = {'shards': self._shards,
                       'period': self._poll_period,
                       'stop_event': self._stop_event}
| self._poll_rpc_server_ps = multiprocessing.Process( |
| name='poll_rpc_servers', |
| target=poll_rpc_servers, |
| args=poll_args, |
| kwargs=poll_kwargs) |
| |
| self._poll_rpc_server_ps.start() |
| |
| def close(self): |
| """Send close event to all sub processes""" |
| self._stop_event.set() |
| |
| |
    def terminate(self):
| """Terminate processes""" |
| self.close() |
| if self._poll_rpc_server_ps: |
| self._poll_rpc_server_ps.terminate() |
| |
| if self._update_shards_ps: |
| self._update_shards_ps.terminate() |
| |
| if self._manager: |
| self._manager.shutdown() |
| |
| |
| def join(self, timeout=None): |
| """Blocking call until closed and processes complete |
| |
| @param timeout: passed to each process, so could be >timeout""" |
| if self._poll_rpc_server_ps: |
| self._poll_rpc_server_ps.join(timeout) |
| |
| if self._update_shards_ps: |
| self._update_shards_ps.join(timeout) |
| |

def _failed(fields, msg_str, reason, err=None):
    """Mark the current run as failed

    @param fields: ts_mon fields dict to update in place
    @param msg_str: message string used for logging
    @param reason: short string describing why the run failed
    @param err: optional exception to log for extra debug info
    """
| fields['success'] = False |
| fields['failure_reason'] = reason |
| logging.warning("%s failed - %s", msg_str, reason) |
| if err: |
| logging.debug("%s fail_err - %s", msg_str, str(err)) |
| |

class AfeMonitor(object):
| """Object that runs rpc calls against the given afe frontend""" |
| |
| def __init__(self, hostname): |
| """ |
| @param hostname: hostname of server to monitor, string |
| """ |
| self._hostname = hostname |
| self._afe = frontend.AFE(server=self._hostname) |
| self._metric_fields = {'target_hostname': self._hostname} |
| |
| |
| def run_cmd(self, cmd, expected=None): |
| """Runs rpc command and log metrics |
| |
| @param cmd: string of rpc command to send |
| @param expected: expected result of rpc |
| """ |
| metric_fields = self._metric_fields.copy() |
| metric_fields['command'] = cmd |
| metric_fields['success'] = True |
| metric_fields['failure_reason'] = '' |
| |
| with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS, |
| fields=dict(metric_fields), scale=0.001) as f: |
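            # The timer context yields its fields dict; _failed() updates it
            # in place so the recorded metric reflects any failure.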
| |
| msg_str = "%s:%s" % (self._hostname, cmd) |
| |
| |
| try: |
| result = self._afe.run(cmd) |
| logging.debug("%s result = %s", msg_str, result) |
| if expected is not None and expected != result: |
| _failed(f, msg_str, 'IncorrectResponse') |
| |
| except urllib2.HTTPError as e: |
| _failed(f, msg_str, 'HTTPError:%d' % e.code) |
| |
| except Exception as e: |
| _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'), |
| err=e) |
| |
| if type(e) not in FAILURE_REASONS: |
| raise |
| |
| if f['success']: |
| logging.info("%s success", msg_str) |
| |
| |
| def run(self): |
| """Tests server and returns the result""" |
| self.run_cmd('get_server_time') |
| self.run_cmd('ping_db', [True]) |
| |
| |
| def get_parser(): |
| """Returns argparse parser""" |
| parser = argparse.ArgumentParser(description=__doc__) |
| |
| parser.add_argument('-a', '--afe', action='append', default=[], |
| help='Autotest FrontEnd server to monitor') |
| |
| parser.add_argument('-p', '--poll-period', type=int, default=60, |
                        help='How often to poll the AFE servers, in seconds')
| |
| parser.add_argument('--no-shards', action='store_false', dest='with_shards', |
| help='Disable shard updating') |
| |
| return parser |
| |
| |
| def main(argv): |
| """Main function |
| |
| @param argv: commandline arguments passed |
| """ |
| parser = get_parser() |
| options = parser.parse_args(argv[1:]) |
| |
| |
| if not options.afe: |
| options.afe = [global_config.global_config.get_config_value( |
| 'SERVER', 'global_afe_hostname', default='cautotest')] |
| |
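    # indirect=True is expected to flush ts_mon metrics from a separate
    # process; see chromite.lib.ts_mon_config for details.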
| with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder', |
| indirect=True): |
| flight_recorder = RpcFlightRecorder(options.afe, |
| with_shards=options.with_shards, |
| poll_period=options.poll_period) |
| |
| flight_recorder.start() |
| flight_recorder.join() |
| |
| |
| if __name__ == '__main__': |
| main(sys.argv) |