[autotest] Forward RPCs that update host labels to shards

Other than label_add_hosts and label_remove_hosts,
we have 2 more RPCs that updates host labels.
They are host_add_labels and host_remove_labels.
These RPCs should be forwarded to shards.

BUG=chromium:497887
TEST=puppylab. test_host13 is a host in a shard.
>>> import common
>>> from autotest_lib.server import frontend
>>> afe = frontend.AFE()
>>> h = afe.get_hosts(['test_host13'])[0]
>>> h.add_labels(['l1','l3'])
Adding labels ['l1', 'l3'] to host test_host13
>>> h.remove_labels(['l1'])
DEPLOY=apache

Change-Id: Ic37c026efa009eff3aa5276cbbef8c3e1d62e508
Reviewed-on: https://chromium-review.googlesource.com/276211
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/doctests/001_rpc_test.txt b/frontend/afe/doctests/001_rpc_test.txt
index fd9312e..131650b 100644
--- a/frontend/afe/doctests/001_rpc_test.txt
+++ b/frontend/afe/doctests/001_rpc_test.txt
@@ -242,8 +242,8 @@
 3
 
 # add hosts to labels
->>> rpc_interface.host_add_labels('host1', ['label1'])
->>> rpc_interface.host_add_labels('host2', ['label1', 'label2'])
+>>> rpc_interface.host_add_labels(id='host1', labels=['label1'])
+>>> rpc_interface.host_add_labels(id='host2', labels=['label1', 'label2'])
 
 # check labels for hosts
 >>> data = rpc_interface.get_hosts(hostname='host1')
@@ -265,7 +265,7 @@
 [u'host2']
 
 # remove a host from a label
->>> rpc_interface.host_remove_labels('host2', ['label2'])
+>>> rpc_interface.host_remove_labels(id='host2', labels=['label2'])
 >>> data = rpc_interface.get_hosts(hostname='host1')
 >>> data[0]['labels']
 [u'label1']
@@ -273,8 +273,8 @@
 []
 
 # Cleanup
->>> rpc_interface.host_remove_labels('host2', ['label1'])
->>> rpc_interface.host_remove_labels('host1', ['label1'])
+>>> rpc_interface.host_remove_labels(id='host2', labels=['label1'])
+>>> rpc_interface.host_remove_labels(id='host1', labels=['label1'])
 
 
 # Other interface for new CLI
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index e70a7a6..119d0dc 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -116,6 +116,7 @@
     label.host_set.add(*host_objs)
 
 
+@rpc_utils.route_rpc_to_master
 def label_add_hosts(id, hosts):
     """Adds a label with the given id to the given hosts.
 
@@ -129,11 +130,6 @@
     @raises ValueError: If the id specified is an int/long (label id)
                         while the label does not exist.
     """
-    # This RPC call should be accepted only by master.
-    if utils.is_shard():
-        rpc_utils.route_rpc_to_master('label_add_hosts', id=id, hosts=hosts)
-        return
-
     try:
         label = models.Label.smart_get(id)
     except models.Label.DoesNotExist:
@@ -177,6 +173,7 @@
     models.Label.smart_get(id).host_set.remove(*host_objs)
 
 
+@rpc_utils.route_rpc_to_master
 def label_remove_hosts(id, hosts):
     """Removes a label of the given id from the given hosts.
 
@@ -185,11 +182,6 @@
     @param id: id or name of a label.
     @param hosts: A list of hostnames or ids. More often hostnames.
     """
-    # This RPC call should be accepted only by master.
-    if utils.is_shard():
-        rpc_utils.route_rpc_to_master('label_remove_hosts', id=id, hosts=hosts)
-        return
-
     host_objs = models.Host.smart_get_bulk(hosts)
     rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)
 
@@ -315,23 +307,62 @@
         host.update_object(update_data)
 
 
-def host_add_labels(id, labels):
-    labels = models.Label.smart_get_bulk(labels)
-    host = models.Host.smart_get(id)
+def add_labels_to_host(id, labels):
+    """Adds labels to a given host only in local DB.
 
-    platforms = [label.name for label in labels if label.platform]
+    @param id: id or hostname for a host.
+    @param labels: ids or names for labels.
+    """
+    label_objs = models.Label.smart_get_bulk(labels)
+    models.Host.smart_get(id).labels.add(*label_objs)
+
+
+@rpc_utils.route_rpc_to_master
+def host_add_labels(id, labels):
+    """Adds labels to a given host.
+
+    @param id: id or hostname for a host.
+    @param labels: ids or names for labels.
+
+    @raises ValidationError: If adding more than one platform label.
+    """
+    label_objs = models.Label.smart_get_bulk(labels)
+    platforms = [label.name for label in label_objs if label.platform]
     if len(platforms) > 1:
         raise model_logic.ValidationError(
             {'labels': 'Adding more than one platform label: %s' %
                        ', '.join(platforms)})
+
+    host_obj = models.Host.smart_get(id)
     if len(platforms) == 1:
-        models.Host.check_no_platform([host])
-    host.labels.add(*labels)
+        models.Host.check_no_platform([host_obj])
+
+    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
+                         id=id, labels=labels)
+    add_labels_to_host(id, labels)
 
 
+def remove_labels_from_host(id, labels):
+    """Removes labels from a given host only in local DB.
+
+    @param id: id or hostname for a host.
+    @param labels: ids or names for labels.
+    """
+    label_objs = models.Label.smart_get_bulk(labels)
+    models.Host.smart_get(id).labels.remove(*label_objs)
+
+
+@rpc_utils.route_rpc_to_master
 def host_remove_labels(id, labels):
-    labels = models.Label.smart_get_bulk(labels)
-    models.Host.smart_get(id).labels.remove(*labels)
+    """Removes labels from a given host.
+
+    @param id: id or hostname for a host.
+    @param labels: ids or names for labels.
+    """
+    host_obj = models.Host.smart_get(id)
+    rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
+                         id=id, labels=labels)
+    remove_labels_from_host(id, labels)
 
 
 def get_host_attribute(attribute, **host_filter_data):
diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py
index a0edc7d..d9550d8 100755
--- a/frontend/afe/rpc_interface_unittest.py
+++ b/frontend/afe/rpc_interface_unittest.py
@@ -54,7 +54,8 @@
                           rpc_interface. label_add_hosts, id='platform2',
                           hosts=['host1', 'host2'])
         self.assertRaises(model_logic.ValidationError,
-                          rpc_interface.host_add_labels, 'host1', ['platform2'])
+                          rpc_interface.host_add_labels,
+                          id='host1', labels=['platform2'])
         # make sure the platform didn't get added
         platforms = rpc_interface.get_labels(
             host__hostname__in=['host1', 'host2'], platform=True)
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index c043ae7..569fc42 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -7,6 +7,7 @@
 __author__ = 'showard@google.com (Steve Howard)'
 
 import datetime
+from functools import wraps
 import inspect
 import os
 import sys
@@ -1259,12 +1260,17 @@
             'SHARD', 'global_afe_hostname')
 
 
-def route_rpc_to_master(rpc_name, **kwargs):
+def route_rpc_to_master(func):
     """Route RPC to master AFE.
 
-    @param rpc_name: The name of the rpc.
-    @param **kwargs: The kwargs for the rpc.
+    @param func: The function to decorate
 
+    @returns: The function to replace func with.
     """
-    master_afe = frontend.AFE(server=get_global_afe_hostname())
-    return master_afe.run(rpc_name, **kwargs)
+    @wraps(func)
+    def replacement(**kwargs):
+        if server_utils.is_shard():
+            master_afe = frontend.AFE(server=get_global_afe_hostname())
+            return master_afe.run(func.func_name, **kwargs)
+        return func(**kwargs)
+    return replacement
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 0cd3739..5885e88 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -496,6 +496,7 @@
     return [s.get_details() for s in servers]
 
 
+@rpc_utils.route_rpc_to_master
 def get_stable_version(board=stable_version_utils.DEFAULT):
     """Get stable version for the given board.
 
@@ -504,36 +505,29 @@
              of CROS.stable_cros_version if stable_versinos table does not have
              entry of board DEFAULT.
     """
-    # This RPC call should be accepted only by master.
-    if utils.is_shard():
-        return rpc_utils.route_rpc_to_master('get_stable_version', board=board)
     return stable_version_utils.get(board)
 
 
+@rpc_utils.route_rpc_to_master
 def get_all_stable_versions():
     """Get stable versions for all boards.
 
     @return: A dictionary of board:version.
     """
-    # This RPC call should be accepted only by master.
-    if utils.is_shard():
-        return rpc_utils.route_rpc_to_master('get_all_stable_versions')
     return stable_version_utils.get_all()
 
 
+@rpc_utils.route_rpc_to_master
 def set_stable_version(version, board=stable_version_utils.DEFAULT):
     """Modify stable version for the given board.
 
     @param version: The new value of stable version for given board.
     @param board: Name of the board, default to value `DEFAULT`.
     """
-    # This RPC call should be accepted only by master.
-    if utils.is_shard():
-        return rpc_utils.route_rpc_to_master('set_stable_version',
-                                             version=version, board=board)
     stable_version_utils.set(version=version, board=board)
 
 
+@rpc_utils.route_rpc_to_master
 def delete_stable_version(board):
     """Modify stable version for the given board.
 
@@ -542,10 +536,6 @@
 
     @param board: Name of the board.
     """
-    # This RPC call should be accepted only by master.
-    if utils.is_shard():
-        return rpc_utils.route_rpc_to_master('delete_stable_version',
-                                             board=board)
     stable_version_utils.delete(board=board)