| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import math |
| import threading |
| |
| import common |
| from autotest_lib.client.common_lib import env |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.client.common_lib.cros import retry |
| from autotest_lib.frontend.afe.json_rpc import proxy |
| from autotest_lib.server import frontend |
| try: |
| from chromite.lib import retry_util |
| from chromite.lib import timeout_util |
| except ImportError: |
| logging.warn('Unable to import chromite.') |
| retry_util = None |
| timeout_util = None |
| |
| try: |
| from chromite.lib import metrics |
| except ImportError: |
| logging.warn('Unable to import metrics from chromite.') |
| metrics = utils.metrics_mock |
| |
| |
| def convert_timeout_to_retry(backoff, timeout_min, delay_sec): |
| """Compute the number of retry attempts for use with chromite.retry_util. |
| |
| @param backoff: The exponential backoff factor. |
| @param timeout_min: The maximum amount of time (in minutes) to sleep. |
| @param delay_sec: The amount to sleep (in seconds) between each attempt. |
| |
| @return: The number of retry attempts in the case of exponential backoff. |
| """ |
| # Estimate the max_retry in the case of exponential backoff: |
| # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r) |
| # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) ) |
| # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff)) |
| # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff) |
| total_sleep = timeout_min * 60 |
| numerator = math.log10(1 - (total_sleep / delay_sec) * (1 - backoff)) |
| denominator = math.log10(backoff) |
| return int(math.ceil(numerator / denominator)) |
| |
| |
| class RetryingAFE(frontend.AFE): |
| """Wrapper around frontend.AFE that retries all RPCs. |
| |
| Timeout for retries and delay between retries are configurable. |
| """ |
| def __init__(self, timeout_min=30, delay_sec=10, **dargs): |
| """Constructor |
| |
| @param timeout_min: timeout in minutes until giving up. |
| @param delay_sec: pre-jittered delay between retries in seconds. |
| """ |
| self.timeout_min = timeout_min |
| self.delay_sec = delay_sec |
| super(RetryingAFE, self).__init__(**dargs) |
| |
| |
| def set_timeout(self, timeout_min): |
| """Set timeout minutes for the AFE server. |
| |
| @param timeout_min: The timeout minutes for AFE server. |
| """ |
| self.timeout_min = timeout_min |
| |
| |
| def run(self, call, **dargs): |
| """Method for running RPC call. |
| |
| @param call: A string RPC call. |
| @param dargs: the parameters of the RPC call. |
| """ |
| if retry_util is None: |
| raise ImportError('Unable to import chromite. Please consider ' |
| 'running build_externals to build site packages.') |
| # exc_retry: We retry if this exception is raised. |
| # raiselist: Exceptions that we raise immediately if caught. |
| exc_retry = Exception |
| raiselist = (ImportError, error.RPCException, proxy.JSONRPCException, |
| timeout_util.TimeoutError, error.ControlFileNotFound) |
| backoff = 2 |
| max_retry = convert_timeout_to_retry(backoff, self.timeout_min, |
| self.delay_sec) |
| |
| def _run(self, call, **dargs): |
| return super(RetryingAFE, self).run(call, **dargs) |
| |
| def handler(exc): |
| """Check if exc is an exc_retry or if it's in raiselist. |
| |
| @param exc: An exception. |
| |
| @return: True if exc is an exc_retry and is not |
| in raiselist. False otherwise. |
| """ |
| is_exc_to_check = isinstance(exc, exc_retry) |
| is_in_raiselist = isinstance(exc, raiselist) |
| return is_exc_to_check and not is_in_raiselist |
| |
| # If the call is not in main thread, signal can't be used to abort the |
| # call. In that case, use a basic retry which does not enforce timeout |
| # if the process hangs. |
| @retry.retry(Exception, timeout_min=self.timeout_min, |
| delay_sec=self.delay_sec, |
| raiselist=[ImportError, error.RPCException, |
| proxy.ValidationError]) |
| def _run_in_child_thread(self, call, **dargs): |
| return super(RetryingAFE, self).run(call, **dargs) |
| |
| if isinstance(threading.current_thread(), threading._MainThread): |
| # Set the keyword argument for GenericRetry |
| dargs['sleep'] = self.delay_sec |
| dargs['backoff_factor'] = backoff |
| # timeout_util.Timeout fundamentally relies on sigalrm, and doesn't |
| # work at all in wsgi environment (just emits logs spam). So, don't |
| # use it in wsgi. |
| try: |
| if env.IN_MOD_WSGI: |
| return retry_util.GenericRetry(handler, max_retry, _run, |
| self, call, **dargs) |
| with timeout_util.Timeout(self.timeout_min * 60): |
| return retry_util.GenericRetry(handler, max_retry, _run, |
| self, call, **dargs) |
| except timeout_util.TimeoutError: |
| c = metrics.Counter( |
| 'chromeos/autotest/retrying_afe/retry_timeout') |
| # Reserve field job_details for future use. |
| f = {'destination_server': self.server.split(':')[0], |
| 'call': call, |
| 'job_details': ''} |
| c.increment(fields=f) |
| raise |
| else: |
| return _run_in_child_thread(self, call, **dargs) |
| |
| |
| class RetryingTKO(frontend.TKO): |
| """Wrapper around frontend.TKO that retries all RPCs. |
| |
| Timeout for retries and delay between retries are configurable. |
| """ |
| def __init__(self, timeout_min=30, delay_sec=10, **dargs): |
| """Constructor |
| |
| @param timeout_min: timeout in minutes until giving up. |
| @param delay_sec: pre-jittered delay between retries in seconds. |
| """ |
| self.timeout_min = timeout_min |
| self.delay_sec = delay_sec |
| super(RetryingTKO, self).__init__(**dargs) |
| |
| |
| def run(self, call, **dargs): |
| """Method for running RPC call. |
| |
| @param call: A string RPC call. |
| @param dargs: the parameters of the RPC call. |
| """ |
| @retry.retry(Exception, timeout_min=self.timeout_min, |
| delay_sec=self.delay_sec, |
| raiselist=[ImportError, error.RPCException, |
| proxy.ValidationError]) |
| def _run(self, call, **dargs): |
| return super(RetryingTKO, self).run(call, **dargs) |
| return _run(self, call, **dargs) |