[Autotest] Allow the syncing of labels added/removed.

This allows us to perform label add/remove actions
on the master and have them passed down to the shards those hosts
are on, by introducing a generic decorator.

The CL also fixes some logging issues with the shard-client,
and a bug with propagating the invalid bit of a host.

TEST=Added/removed labels.
BUG=chromium:431786
DEPLOY=apache

Change-Id: Ic176054e04a5508bb3e967ecc902422628446d32
Reviewed-on: https://chromium-review.googlesource.com/231558
Reviewed-by: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/frontend/afe/doctests/001_rpc_test.txt b/frontend/afe/doctests/001_rpc_test.txt
index b0a16f3..4dd4a29 100644
--- a/frontend/afe/doctests/001_rpc_test.txt
+++ b/frontend/afe/doctests/001_rpc_test.txt
@@ -278,8 +278,8 @@
 
 # Other interface for new CLI
 # add hosts to labels
->>> rpc_interface.label_add_hosts('label1', ['host1'])
->>> rpc_interface.label_add_hosts('label2', ['host1', 'host2'])
+>>> rpc_interface.label_add_hosts(id='label1', hosts=['host1'])
+>>> rpc_interface.label_add_hosts(id='label2', hosts=['host1', 'host2'])
 
 # check labels for hosts
 >>> data = rpc_interface.get_hosts(hostname='host1')
@@ -301,7 +301,7 @@
 [u'host1', u'host2']
 
 # remove a host from a label
->>> rpc_interface.label_remove_hosts('label2', ['host2'])
+>>> rpc_interface.label_remove_hosts(id='label2', hosts=['host2'])
 >>> data = rpc_interface.get_hosts(hostname='host1')
 >>> data[0]['labels']
 [u'label1', u'label2']
@@ -310,11 +310,11 @@
 [u'host1']
 
 # Remove multiple hosts from a label
->>> rpc_interface.label_add_hosts('label2', ['host2'])
+>>> rpc_interface.label_add_hosts(id='label2', hosts=['host2'])
 >>> data = rpc_interface.get_hosts(labels__name='label2')
 >>> [host['hostname'] for host in data]
 [u'host1', u'host2']
->>> rpc_interface.label_remove_hosts('label2', ['host2', 'host1'])
+>>> rpc_interface.label_remove_hosts(id='label2', hosts=['host2', 'host1'])
 >>> rpc_interface.get_hosts(labels__name='label2')
 []
 
@@ -749,11 +749,11 @@
 >>> unused = rpc_interface.add_host(hostname='ah3-blue')
 >>> unused = rpc_interface.add_host(hostname='ah4-blue')
 >>> two_id = rpc_interface.add_label(name='two-label')
->>> rpc_interface.label_add_hosts(two_id, ['ahost1', 'ahost2',
-...                                        'ah3-blue', 'ah4-blue'])
+>>> rpc_interface.label_add_hosts(
+...        id=two_id, hosts=['ahost1', 'ahost2', 'ah3-blue', 'ah4-blue'])
 >>> unused = rpc_interface.add_label(name='red-label')
 >>> blue_id = rpc_interface.add_label(name='blue-label')
->>> rpc_interface.label_add_hosts(blue_id, ['ah3-blue', 'ah4-blue'])
+>>> rpc_interface.label_add_hosts(id=blue_id, hosts=['ah3-blue', 'ah4-blue'])
 
 >>> rpc_interface.atomic_group_add_labels(mini_rack_group_id,
 ...                                       ['one-label', 'two-label',
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index ff8c28b..f8a83f8 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -382,6 +382,7 @@
                                          'hostattribute_set',
                                          'labels',
                                          'shard'])
+    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid'])
 
 
     def custom_deserialize_relation(self, link, data):
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index 0c7130a..a21bd06 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -66,15 +66,39 @@
 def delete_label(id):
     models.Label.smart_get(id).delete()
 
-
+@rpc_utils.forward_multi_host_rpc_to_shards
 def label_add_hosts(id, hosts):
+    """Add the label with the given id to the list of hosts.
+
+    The given label will be created if it doesn't exist, provided the `id`
+    supplied is a label name not an int/long id.
+
+    @param id: An id or label name. More often a label name.
+    @param hosts: A list of hostnames or ids. More often hostnames.
+
+    @raises models.Label.DoesNotExist: If the id specified is an int/long
+        and a label with that id doesn't exist.
+    """
     host_objs = models.Host.smart_get_bulk(hosts)
-    label = models.Label.smart_get(id)
+    try:
+        # In the rare event that we're given an id and not a label name,
+        # it should already exist.
+        label = models.Label.smart_get(id)
+    except models.Label.DoesNotExist:
+        # This matches the type checks in smart_get, which is a hack
+        # in and of itself. The aim here is to create any non-existent
+        # label, which we cannot do if the 'id' specified isn't a label name.
+        if isinstance(id, basestring):
+            label = models.Label.smart_get(add_label(id))
+        else:
+            raise
+
     if label.platform:
         models.Host.check_no_platform(host_objs)
     label.host_set.add(*host_objs)
 
 
+@rpc_utils.forward_multi_host_rpc_to_shards
 def label_remove_hosts(id, hosts):
     host_objs = models.Host.smart_get_bulk(hosts)
     models.Label.smart_get(id).host_set.remove(*host_objs)
diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py
index 0513456..d4002d2 100755
--- a/frontend/afe/rpc_interface_unittest.py
+++ b/frontend/afe/rpc_interface_unittest.py
@@ -51,8 +51,8 @@
     def test_multiple_platforms(self):
         platform2 = models.Label.objects.create(name='platform2', platform=True)
         self.assertRaises(model_logic.ValidationError,
-                          rpc_interface. label_add_hosts, 'platform2',
-                          ['host1', 'host2'])
+                          rpc_interface. label_add_hosts, id='platform2',
+                          hosts=['host1', 'host2'])
         self.assertRaises(model_logic.ValidationError,
                           rpc_interface.host_add_labels, 'host1', ['platform2'])
         # make sure the platform didn't get added
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 5c0deb1..1f4a694 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -1056,6 +1056,47 @@
     return replacement
 
 
+def forward_multi_host_rpc_to_shards(func):
+    """This decorator forwards rpc calls that modify multiple hosts.
+
+    If a host is assigned to a shard, rpcs that change its attributes should be
+    forwarded to the shard. Some calls however, take a list of hosts and a
+    single id to modify, eg: label_add_hosts. This wrapper will sift through
+    the list of hosts, find each of their shards, and forward the rpc for
+    those hosts to that shard before calling the local version of the given rpc.
+
+    This assumes:
+        1. The rpc call uses `smart_get` to retrieve host objects, not the
+           stock django `get` call. This is true for most, if not all rpcs in
+           the rpc_interface.
+        2. The kwargs to the function contain either a list of host ids or
+           hostnames, keyed under 'hosts'. This is true for all the rpc
+           functions that use 'smart_get'.
+
+    @param func: The function to decorate
+
+    @returns: The function to replace func with.
+    """
+    def replacement(**kwargs):
+        if not is_shard():
+            # Figure out which hosts are on which shards.
+            shard_host_map = {}
+            for host in models.Host.smart_get_bulk(kwargs['hosts']):
+                if host.shard:
+                    shard_host_map.setdefault(
+                            host.shard.hostname, []).append(host.hostname)
+
+            # Execute the rpc against the appropriate shards. Each shard gets
+            # a copy of kwargs so the local call below still sees all hosts.
+            for shard, hostnames in shard_host_map.iteritems():
+                shard_kwargs = dict(kwargs, hosts=hostnames)
+                run_rpc_on_multiple_hostnames(func.func_name, [shard],
+                                              **shard_kwargs)
+        return func(**kwargs)
+
+    return replacement
+
+
 def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
     """Runs an rpc to multiple AFEs
 
diff --git a/scheduler/shard/shard_client.py b/scheduler/shard/shard_client.py
index fbba520..55e7e0a 100755
--- a/scheduler/shard/shard_client.py
+++ b/scheduler/shard/shard_client.py
@@ -17,13 +17,11 @@
 from autotest_lib.frontend import setup_django_environment
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.common_lib import global_config
-from autotest_lib.client.common_lib import logging_manager
 from autotest_lib.client.common_lib.cros.graphite import stats
 from autotest_lib.frontend.afe import models, rpc_utils
 from autotest_lib.scheduler import email_manager
+from autotest_lib.scheduler import scheduler_lib
 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
-from autotest_lib.scheduler.shard import shard_logging_config
-
 
 
 """
@@ -197,7 +195,8 @@
         """
         job_ids = list(models.Job.objects.filter(
             hostqueueentry__complete=False).values_list('id', flat=True))
-        host_ids = list(models.Host.objects.values_list('id', flat=True))
+        host_ids = list(models.Host.objects.filter(
+                invalid=0).values_list('id', flat=True))
         return job_ids, host_ids
 
 
@@ -313,8 +312,9 @@
     parser = argparse.ArgumentParser(description='Shard client.')
     options = parser.parse_args()
 
-    logging_manager.configure_logging(
-        shard_logging_config.ShardLoggingConfig())
+    scheduler_lib.setup_logging(
+            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
+            None, timestamped_logfile_prefix='shard_client')
 
     logging.info("Setting signal handler.")
     signal.signal(signal.SIGINT, handle_signal)
diff --git a/scheduler/shard/shard_logging_config.py b/scheduler/shard/shard_logging_config.py
deleted file mode 100644
index fe9f99c..0000000
--- a/scheduler/shard/shard_logging_config.py
+++ /dev/null
@@ -1,12 +0,0 @@
-#pylint: disable-msg=C0111
-import common
-import logging, os
-from autotest_lib.client.common_lib import logging_config
-
-class ShardLoggingConfig(logging_config.LoggingConfig):
-    """Logging configuration for the shard client."""
-
-    def configure_logging(self, verbose=False):
-        super(ShardLoggingConfig, self).configure_logging(
-                                                  use_console=self.use_console,
-                                                  verbose=verbose)