[autotest] Sync suite job keyvals to shards.

As we roll out shards, we need a way to communicate suite-level
metadata to them. One way to do this is to store the suite-level
metadata as job keyvals and sync those keyvals to the shard, which
is what this CL does.

Currently, only the keyval 'suite_min_duts' is needed by a shard.
In the future, we can pipe more keyvals to shards as needed, e.g.
file_bugs and retry, if we move bug filing and retry out of
dynamic suites.
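
For illustration, a shard-side consumer could read the synced keyval
roughly as below (a sketch only; the helper and its call site are not
part of this CL):

  from autotest_lib.frontend.afe import models

  def _get_suite_min_duts(parent_job_id):
      """Return the synced 'suite_min_duts' value, or None if absent."""
      keyvals = models.JobKeyval.objects.filter(
              job_id=parent_job_id, key='suite_min_duts')
      return int(keyvals[0].value) if keyvals else None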

CQ-DEPEND=CL:254270
DEPLOY=apache,shard_client
BUG=chromium:432653
TEST=Set up a testing cluster with one master and one shard.
python
>>> import common
>>> from autotest_lib.server import frontend
>>> afe = frontend.AFE(server='localhost:8001')
>>> afe.run('shard_heartbeat', shard_hostname='localhost:8004')
Confirm the returned packet has 'suite_keyvals'.
TEST=Set up a testing cluster with one master and one shard.
- Create a suite job and confirm suite keyvals are synced to the shard.
- Confirm keyvals already known to the shard won't get synced multiple
  times.
TEST=unittest
TEST=Run dummy suite with run_suite, confirm everything still works.
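
On the shard, the sync in the second TEST above can be double-checked
from a Django shell by confirming the keyval row landed (a sketch; the
job id is assumed):
python
>>> import common
>>> from autotest_lib.frontend.afe import models
>>> suite_job_id = 123  # assumed: id of the suite job created above
>>> models.JobKeyval.objects.filter(job_id=suite_job_id)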

Change-Id: I5d7816c2b3f6dfa374a297c09d597573f762c374
Reviewed-on: https://chromium-review.googlesource.com/254230
Tested-by: Fang Deng <fdeng@chromium.org>
Reviewed-by: Dan Shi <dshi@chromium.org>
Commit-Queue: Fang Deng <fdeng@chromium.org>
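
For reference, serialized suite keyvals ride the heartbeat response
alongside hosts and jobs; the payload looks roughly like this (values
illustrative):

  {
      'hosts': [...],
      'jobs': [...],
      'suite_keyvals': [
          {'id': 1, 'job_id': 0, 'key': 'suite_min_duts', 'value': '4'},
      ],
  }
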
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 5d37be2..6804c32 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -986,12 +986,15 @@
     @param known_job_ids: List of ids of jobs the shard already has.
     @param known_host_ids: List of ids of hosts the shard already has.
 
-    @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
+    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
+              (hosts, jobs, suite_job_keyvals).
     """
     hosts = models.Host.assign_to_shard(shard, known_host_ids)
     jobs = models.Job.assign_to_shard(shard, known_job_ids)
-
-    return hosts, jobs
+    parent_job_ids = [job.parent_job_id for job in jobs]
+    suite_job_keyvals = models.JobKeyval.objects.filter(
+        job_id__in=parent_job_ids)
+    return hosts, jobs, suite_job_keyvals
 
 
 def _persist_records_with_type_sent_from_shard(
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 334340c..2976e95 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -339,8 +339,8 @@
     @param known_job_ids: List of ids of jobs the shard already has.
     @param known_host_ids: List of ids of hosts the shard already has.
 
-    @returns: Serialized representations of hosts, jobs and their dependencies
-              to be inserted into a shard's database.
+    @returns: Serialized representations of hosts, jobs, suite job keyvals
+              and their dependencies to be inserted into a shard's database.
     """
     # The following alternatives to sending host and job ids in every heartbeat
     # have been considered:
@@ -378,12 +378,13 @@
     with timer:
         shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
         rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
-        hosts, jobs = rpc_utils.find_records_for_shard(
+        hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
             shard_obj,
             known_job_ids=known_job_ids, known_host_ids=known_host_ids)
         return {
             'hosts': [host.serialize() for host in hosts],
             'jobs': [job.serialize() for job in jobs],
+            'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
         }
 
 
diff --git a/scheduler/shard/shard_client.py b/scheduler/shard/shard_client.py
index ebb236b..5dfd6af 100755
--- a/scheduler/shard/shard_client.py
+++ b/scheduler/shard/shard_client.py
@@ -121,11 +121,14 @@
         """
         hosts_serialized = heartbeat_response['hosts']
         jobs_serialized = heartbeat_response['jobs']
+        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
 
         autotest_stats.Gauge(STATS_KEY).send(
             'hosts_received', len(hosts_serialized))
         autotest_stats.Gauge(STATS_KEY).send(
             'jobs_received', len(jobs_serialized))
+        autotest_stats.Gauge(STATS_KEY).send(
+            'suite_keyvals_received', len(suite_keyvals_serialized))
 
         for host in hosts_serialized:
             with transaction.commit_on_success():
@@ -133,11 +136,18 @@
         for job in jobs_serialized:
             with transaction.commit_on_success():
                 models.Job.deserialize(job)
+        for keyval in suite_keyvals_serialized:
+            with transaction.commit_on_success():
+                models.JobKeyval.deserialize(keyval)
 
         host_ids = [h['id'] for h in hosts_serialized]
         logging.info('Heartbeat response contains hosts %s', host_ids)
         job_ids = [j['id'] for j in jobs_serialized]
         logging.info('Heartbeat response contains jobs %s', job_ids)
+        parent_jobs_with_keyval = set([kv['job_id']
+                                       for kv in suite_keyvals_serialized])
+        logging.info('Heartbeat response contains suite_keyvals for jobs %s',
+                     parent_jobs_with_keyval)
 
         # If the master has just sent any jobs that we think have completed,
         # re-sync them with the master. This is especially useful when a
diff --git a/scheduler/shard/shard_client_unittest.py b/scheduler/shard/shard_client_unittest.py
index f9c2330..012e8e2 100644
--- a/scheduler/shard/shard_client_unittest.py
+++ b/scheduler/shard/shard_client_unittest.py
@@ -4,6 +4,7 @@
 
 import datetime
 import mox
+import unittest
 
 import common
 
@@ -50,7 +51,8 @@
     def expect_heartbeat(self, shard_hostname='host1',
                          known_job_ids=[], known_host_ids=[],
                          hqes=[], jobs=[],
-                         side_effect=None, return_hosts=[], return_jobs=[]):
+                         side_effect=None, return_hosts=[], return_jobs=[],
+                         return_suite_keyvals=[]):
         call = self.afe.run(
             'shard_heartbeat', shard_hostname=shard_hostname,
             hqes=hqes, jobs=jobs,
@@ -63,6 +65,7 @@
         call.AndReturn({
                 'hosts': return_hosts,
                 'jobs': return_jobs,
+                'suite_keyvals': return_suite_keyvals,
             })
 
 
@@ -124,6 +127,7 @@
                 'owner': u'autotest_system',
                 'parse_failed_repair': True,
                 'priority': 40,
+                'parent_job_id': 0,
                 'reboot_after': 0,
                 'reboot_before': 1,
                 'run_reset': True,
@@ -135,6 +139,13 @@
                 'timeout_mins': 1440}
 
 
+    def _get_sample_serialized_suite_keyvals(self):
+        return {'id': 1,
+                'job_id': 0,
+                'key': 'test_key',
+                'value': 'test_value'}
+
+
     def testHeartbeat(self):
         """Trigger heartbeat, verify RPCs and persisting of the responses."""
         self.setup_mocks()
@@ -142,8 +153,11 @@
         global_config.global_config.override_config_value(
                 'SHARD', 'shard_hostname', 'host1')
 
-        self.expect_heartbeat(return_hosts=[self._get_sample_serialized_host()],
-                              return_jobs=[self._get_sample_serialized_job()])
+        self.expect_heartbeat(
+                return_hosts=[self._get_sample_serialized_host()],
+                return_jobs=[self._get_sample_serialized_job()],
+                return_suite_keyvals=[
+                        self._get_sample_serialized_suite_keyvals()])
 
         modified_sample_host = self._get_sample_serialized_host()
         modified_sample_host['hostname'] = 'host2'
@@ -175,6 +189,10 @@
         host = models.Host.objects.get(id=2)
         self.assertEqual(host.hostname, 'host1')
 
+        # Check that the suite keyval was saved to the DB.
+        suite_keyval = models.JobKeyval.objects.filter(job_id=0)[0]
+        self.assertEqual(suite_keyval.key, 'test_key')
+
         sut.do_heartbeat()
 
         # Ensure it wasn't overwritten
@@ -296,3 +314,7 @@
         sut.loop()
 
         self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+    unittest.main()