[autotest] Scheduler, drone_manager, drone_utility stats.

Adds some useful stats to drone manager, handle agents and
drone utility. These stats should help us track processes,
figure out where the drone_manager latency is coming from
and draw correlations between the number of agents scheduled
and drone refresh time.

This cl also moves site_drone_utility's kill_process method
into drone_utility, and modifies the nuke_pids function to only
wait on and kill processes that haven't already died.

TEST=Ran suites.
BUG=chromium:400486
DEPLOY=scheduler

Change-Id: I56e6ee05fa2ae1935435dbc2055d7f99a9a89e5e
Reviewed-on: https://chromium-review.googlesource.com/211769
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/client/common_lib/site_utils.py b/client/common_lib/site_utils.py
index d5fe764..8f44a9e 100644
--- a/client/common_lib/site_utils.py
+++ b/client/common_lib/site_utils.py
@@ -222,9 +222,17 @@
 
     @param pid_list: List of PID's to kill.
     @param signal_queue: Queue of signals to send the PID's to terminate them.
+
+    @return: A mapping of the signal name to the number of processes it
+        was sent to.
     """
+    sig_count = {}
+    # Though this is slightly hacky it beats hardcoding names anyday.
+    sig_names = dict((k, v) for v, k in signal.__dict__.iteritems()
+                     if v.startswith('SIG'))
     for sig in signal_queue:
         logging.debug('Sending signal %s to the following pids:', sig)
+        sig_count[sig_names.get(sig, 'unknown_signal')] = len(pid_list)
         for pid in pid_list:
             logging.debug('Pid %d', pid)
             try:
@@ -233,10 +241,13 @@
                 # The process may have died from a previous signal before we
                 # could kill it.
                 pass
+        pid_list = [pid for pid in pid_list if base_utils.pid_is_alive(pid)]
+        if not pid_list:
+            break
         time.sleep(CHECK_PID_IS_ALIVE_TIMEOUT)
     failed_list = []
     if signal.SIGKILL in signal_queue:
-        return
+        return sig_count
     for pid in pid_list:
         if base_utils.pid_is_alive(pid):
             failed_list.append('Could not kill %d for process name: %s.' % pid,
@@ -244,6 +255,7 @@
     if failed_list:
         raise error.AutoservRunError('Following errors occured: %s' %
                                      failed_list, None)
+    return sig_count
 
 
 def externalize_host(host):
@@ -410,4 +422,4 @@
     try:
         return int(base_utils.system_output('pgrep -o ^X$')) > 0
     except Exception:
-        return False
\ No newline at end of file
+        return False
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index 23fcff2..801fbe5 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -381,6 +381,9 @@
                 info = self._registered_pidfile_info[pidfile_id]
                 if info.num_processes is not None:
                     drone.active_processes += info.num_processes
+        stats.Gauge(self._STATS_KEY).send(
+                '%s.%s' % (drone.hostname.replace('.', '_'),
+                           'active_processes'), drone.active_processes)
 
 
     def _check_drone_process_limit(self, drone):
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index e9f1f53..a4fd0fe 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -345,7 +345,8 @@
     _DRONE_HOST = ssh_host.SSHHost
 
 
-    def create_drone(self, drone_hostname, mock_hostname):
+    def create_drone(self, drone_hostname, mock_hostname,
+                     timestamp_remote_calls=False):
         """Create and initialize a Remote Drone.
 
         @return: A remote drone instance.
@@ -355,7 +356,8 @@
         drones.drone_utility.create_host.expect_call(drone_hostname).and_return(
                 mock_host)
         mock_host.is_up.expect_call().and_return(True)
-        return self._DRONE_CLASS(drone_hostname)
+        return self._DRONE_CLASS(drone_hostname,
+                                 timestamp_remote_calls=timestamp_remote_calls)
 
 
     def create_fake_pidfile_info(self, tag='tag', name='name'):
@@ -473,14 +475,16 @@
     _DRONE_HOST = local_host.LocalHost
 
 
-    def create_drone(self, drone_hostname, mock_hostname):
+    def create_drone(self, drone_hostname, mock_hostname,
+                     timestamp_remote_calls=False):
         """Create and initialize a Remote Drone.
 
         @return: A remote drone instance.
         """
         mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
         self.god.stub_function(drones.drone_utility, 'create_host')
-        local_drone = self._DRONE_CLASS()
+        local_drone = self._DRONE_CLASS(
+                timestamp_remote_calls=timestamp_remote_calls)
         self.god.stub_with(local_drone, '_host', mock_host)
         return local_drone
 
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 30cd336..1b262da 100755
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -11,6 +11,7 @@
 """
 
 
+import argparse
 import pickle, subprocess, os, shutil, sys, time, signal, getpass
 import datetime, traceback, tempfile, itertools, logging
 import common
@@ -30,7 +31,8 @@
 _TEMPORARY_DIRECTORY = 'drone_tmp'
 _TRANSFER_FAILED_FILE = '.transfer_failed'
 
-timer = stats.Timer('drone_utility')
+_STATS_KEY = 'drone_utility'
+timer = stats.Timer(_STATS_KEY)
 
 class _MethodCall(object):
     def __init__(self, method, args, kwargs):
@@ -210,14 +212,27 @@
         return results
 
 
-    def kill_process(self, process):
+    @timer.decorate
+    def kill_processes(self, process_list):
+        """Send signals escalating in severity to the processes in process_list.
+
+        @param process_list: A list of drone_manager.Process objects representing
+            the processes to kill.
+        """
+        kill_proc_key = 'kill_processes'
+        stats.Gauge(_STATS_KEY).send('%s.%s' % (kill_proc_key, 'net'),
+                                     len(process_list))
         signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
         try:
-            utils.nuke_pid(process.pid, signal_queue=signal_queue)
-        except error.AutoservPidAlreadyDeadError:
-            self._warn('Tried to kill a pid:%d that did not exist.' %
-                       process.pid)
-
+            logging.info('List of process to be killed: %s', process_list)
+            sig_counts = utils.nuke_pids(
+                            [process.pid for process in process_list],
+                            signal_queue=signal_queue)
+            for name, count in sig_counts.iteritems():
+                stats.Gauge(_STATS_KEY).send('%s.%s' % (kill_proc_key, name),
+                                             count)
+        except error.AutoservRunError as e:
+            self._warn('Error occured when killing processes. Error: %s' % e)
 
 
     def _convert_old_host_log(self, log_path):
@@ -477,6 +492,14 @@
                           separator))
 
 
+def _parse_args(args):
+    parser = argparse.ArgumentParser(description='Local drone process manager.')
+    parser.add_argument('--call_time',
+                        help='Time this process was invoked from the master',
+                        default=None, type=float)
+    return parser.parse_args(args)
+
+
 SiteDroneUtility = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_drone_utility',
    'SiteDroneUtility', BaseDroneUtility)
@@ -495,6 +518,11 @@
             drone_logging_config.DroneLoggingConfig())
     with timer.get_client('decode'):
         calls = parse_input()
+    args = _parse_args(sys.argv[1:])
+    if args.call_time is not None:
+        stats.Gauge(_STATS_KEY).send('invocation_overhead',
+                                     time.time() - args.call_time)
+
     drone_utility = DroneUtility()
     return_value = drone_utility.execute_calls(calls)
     with timer.get_client('encode'):
diff --git a/scheduler/drones.py b/scheduler/drones.py
index f9d869b..edca060 100644
--- a/scheduler/drones.py
+++ b/scheduler/drones.py
@@ -1,6 +1,10 @@
 #pylint: disable-msg=C0111
 
-import cPickle, os, tempfile, logging
+import cPickle
+import logging
+import os
+import tempfile
+import time
 import common
 from autotest_lib.scheduler import drone_utility, email_manager
 from autotest_lib.client.bin import local_host
@@ -22,7 +26,13 @@
     * allowed_users: set of usernames allowed to use this drone.  if None,
             any user can use this drone.
     """
-    def __init__(self):
+    def __init__(self, timestamp_remote_calls=True):
+        """Instantiate an abstract drone.
+
+        @param timestamp_remote_calls: If true, drone_utility is invoked with
+            the --call_time option and the current time. Currently this is only
+            used for testing.
+        """
         self._calls = []
         self.hostname = None
         self.enabled = True
@@ -31,6 +41,7 @@
         self.allowed_users = None
         self._autotest_install_dir = AUTOTEST_INSTALL_DIR
         self._host = None
+        self.timestamp_remote_calls = timestamp_remote_calls
 
 
     def shutdown(self):
@@ -69,8 +80,12 @@
     def _execute_calls_impl(self, calls):
         if not self._host:
             raise ValueError('Drone cannot execute calls without a host.')
+        drone_utility_cmd = self._drone_utility_path
+        if self.timestamp_remote_calls:
+            drone_utility_cmd = '%s --call_time %s' % (
+                    drone_utility_cmd, time.time())
         logging.info("Running drone_utility on %s", self.hostname)
-        result = self._host.run('python %s' % self._drone_utility_path,
+        result = self._host.run('python %s' % drone_utility_cmd,
                                 stdin=cPickle.dumps(calls), stdout_tee=None,
                                 connect_timeout=300)
         try:
@@ -133,8 +148,9 @@
 
 
 class _LocalDrone(_AbstractDrone):
-    def __init__(self):
-        super(_LocalDrone, self).__init__()
+    def __init__(self, timestamp_remote_calls=True):
+        super(_LocalDrone, self).__init__(
+                timestamp_remote_calls=timestamp_remote_calls)
         self.hostname = 'localhost'
         self._host = local_host.LocalHost()
         self._drone_utility = drone_utility.DroneUtility()
@@ -151,8 +167,9 @@
 
 
 class _RemoteDrone(_AbstractDrone):
-    def __init__(self, hostname):
-        super(_RemoteDrone, self).__init__()
+    def __init__(self, hostname, timestamp_remote_calls=True):
+        super(_RemoteDrone, self).__init__(
+                timestamp_remote_calls=timestamp_remote_calls)
         self.hostname = hostname
         self._host = drone_utility.create_host(hostname)
         if not self._host.is_up():
diff --git a/scheduler/drones_unittest.py b/scheduler/drones_unittest.py
index a3c3953..9f9aadc 100755
--- a/scheduler/drones_unittest.py
+++ b/scheduler/drones_unittest.py
@@ -47,7 +47,7 @@
                 stdin=cPickle.dumps(mock_calls), stdout_tee=None,
                 connect_timeout=mock.is_instance_comparator(int)).and_return(
                         mock_result)
-        drone = drones._RemoteDrone('fakehost')
+        drone = drones._RemoteDrone('fakehost', timestamp_remote_calls=False)
         self.assertEqual('mock return', drone._execute_calls_impl(mock_calls))
         self.god.check_playback()
 
@@ -58,7 +58,7 @@
         drones.drone_utility.create_host.expect_call('fakehost').and_return(
                 self._mock_host)
         self._mock_host.is_up.expect_call().and_return(True)
-        drone = drones._RemoteDrone('fakehost')
+        drone = drones._RemoteDrone('fakehost', timestamp_remote_calls=False)
         mock_return={}
         mock_return['results'] = ['mock return']
         mock_return['warnings'] = []
diff --git a/scheduler/site_drone_utility.py b/scheduler/site_drone_utility.py
index 9a8e80f..9171c6d 100644
--- a/scheduler/site_drone_utility.py
+++ b/scheduler/site_drone_utility.py
@@ -9,16 +9,3 @@
 # This is coordinated with site_monitor_db.py.
 def check_parse(process_info):
     return process_info['comm'] == 'site_parse'
-
-
-class SiteDroneUtility(object):
-
-
-    def kill_processes(self, process_list):
-        signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
-        try:
-            logging.info('List of process to be killed: %s', process_list)
-            utils.nuke_pids([process.pid for process in process_list],
-                            signal_queue=signal_queue)
-        except error.AutoservRunError as e:
-            self._warn('Error occured when killing processes. Error: %s' % e)
\ No newline at end of file
diff --git a/scheduler/site_drones.py b/scheduler/site_drones.py
index 2022f64..ff43d92 100644
--- a/scheduler/site_drones.py
+++ b/scheduler/site_drones.py
@@ -10,11 +10,16 @@
     """
 
 
-    def __init__(self):
+    def __init__(self, timestamp_remote_calls=True):
         """
         Add a new private variable _processes_to_kill to _AbstractDrone
+
+        @param timestamp_remote_calls: If true, drone_utility is invoked with
+            the --call_time option and the current time. Currently this is only
+            used for testing.
         """
-        super(_SiteAbstractDrone, self).__init__()
+        super(_SiteAbstractDrone, self).__init__(
+                timestamp_remote_calls=timestamp_remote_calls)
         self._processes_to_kill = []
 
 
diff --git a/scheduler/thread_lib_unittest.py b/scheduler/thread_lib_unittest.py
index 66b59f0..3554cdb 100644
--- a/scheduler/thread_lib_unittest.py
+++ b/scheduler/thread_lib_unittest.py
@@ -29,7 +29,7 @@
         drones.drone_utility.create_host.expect_call(hostname).and_return(
                 self._mock_host)
         self._mock_host.is_up.expect_call().and_return(True)
-        return drones._RemoteDrone(hostname)
+        return drones._RemoteDrone(hostname, timestamp_remote_calls=False)
 
 
     def setUp(self):