[autotest] Sync suite job keyvals to shards.
As we roll out shard, we need a way to communicate suite-level
metadata to shards. One way to do it is to store suite-level
metadata as job keyvals and sync them to shard, which is what
this CL is about.
Currently, only the keyval 'suite_min_duts' is needed by a shard.
In the future, we can pipe more keyvals to shards as needed like
file_bugs and retry if we were to move bug filing and retry
out of dynamic suites.
CQ-DEPEND=CL:254270
DEPLOY=apache,shard_client
BUG=chromium:432653
TEST=Set up a testing cluster with one master and one shard.
python
>> import common
>> from autotest_lib.server import frontend
>> afe = frontend.AFE(server='localhost:8001')
>> afe.run('shard_heartbeat', shard_hostname='localhost:8004')
Confirm the returned packet has 'suite_keyvals'
TEST=Set up a testing cluster with one master and one shard.
- Create a suite job and confirm suite keyvals are synced to shard.
- Confirm keyvals known to shard won't get synced multiple times.
TEST=unittest
TEST=Run dummy suite with run_suite, confirm everything still works.
Change-Id: I5d7816c2b3f6dfa374a297c09d597573f762c374
Reviewed-on: https://chromium-review.googlesource.com/254230
Tested-by: Fang Deng <fdeng@chromium.org>
Reviewed-by: Dan Shi <dshi@chromium.org>
Commit-Queue: Fang Deng <fdeng@chromium.org>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 5d37be2..6804c32 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -986,12 +986,15 @@
@param known_job_ids: List of ids of jobs the shard already has.
@param known_host_ids: List of ids of hosts the shard already has.
- @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
+ @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
+ (hosts, jobs, suite_job_keyvals).
"""
hosts = models.Host.assign_to_shard(shard, known_host_ids)
jobs = models.Job.assign_to_shard(shard, known_job_ids)
-
- return hosts, jobs
+ parent_job_ids = [job.parent_job_id for job in jobs]
+ suite_job_keyvals = models.JobKeyval.objects.filter(
+ job_id__in=parent_job_ids)
+ return hosts, jobs, suite_job_keyvals
def _persist_records_with_type_sent_from_shard(
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 334340c..2976e95 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -339,8 +339,8 @@
@param known_job_ids: List of ids of jobs the shard already has.
@param known_host_ids: List of ids of hosts the shard already has.
- @returns: Serialized representations of hosts, jobs and their dependencies
- to be inserted into a shard's database.
+ @returns: Serialized representations of hosts, jobs, suite job keyvals
+ and their dependencies to be inserted into a shard's database.
"""
# The following alternatives to sending host and job ids in every heartbeat
# have been considered:
@@ -378,12 +378,13 @@
with timer:
shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
- hosts, jobs = rpc_utils.find_records_for_shard(
+ hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
shard_obj,
known_job_ids=known_job_ids, known_host_ids=known_host_ids)
return {
'hosts': [host.serialize() for host in hosts],
'jobs': [job.serialize() for job in jobs],
+ 'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
}
diff --git a/scheduler/shard/shard_client.py b/scheduler/shard/shard_client.py
index ebb236b..5dfd6af 100755
--- a/scheduler/shard/shard_client.py
+++ b/scheduler/shard/shard_client.py
@@ -121,11 +121,14 @@
"""
hosts_serialized = heartbeat_response['hosts']
jobs_serialized = heartbeat_response['jobs']
+ suite_keyvals_serialized = heartbeat_response['suite_keyvals']
autotest_stats.Gauge(STATS_KEY).send(
'hosts_received', len(hosts_serialized))
autotest_stats.Gauge(STATS_KEY).send(
'jobs_received', len(jobs_serialized))
+ autotest_stats.Gauge(STATS_KEY).send(
+ 'suite_keyvals_received', len(suite_keyvals_serialized))
for host in hosts_serialized:
with transaction.commit_on_success():
@@ -133,11 +136,18 @@
for job in jobs_serialized:
with transaction.commit_on_success():
models.Job.deserialize(job)
+ for keyval in suite_keyvals_serialized:
+ with transaction.commit_on_success():
+ models.JobKeyval.deserialize(keyval)
host_ids = [h['id'] for h in hosts_serialized]
logging.info('Heartbeat response contains hosts %s', host_ids)
job_ids = [j['id'] for j in jobs_serialized]
logging.info('Heartbeat response contains jobs %s', job_ids)
+ parent_jobs_with_keyval = set([kv['job_id']
+ for kv in suite_keyvals_serialized])
+ logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
+ parent_jobs_with_keyval)
# If the master has just sent any jobs that we think have completed,
# re-sync them with the master. This is especially useful when a
diff --git a/scheduler/shard/shard_client_unittest.py b/scheduler/shard/shard_client_unittest.py
index f9c2330..012e8e2 100644
--- a/scheduler/shard/shard_client_unittest.py
+++ b/scheduler/shard/shard_client_unittest.py
@@ -4,6 +4,7 @@
import datetime
import mox
+import unittest
import common
@@ -50,7 +51,8 @@
def expect_heartbeat(self, shard_hostname='host1',
known_job_ids=[], known_host_ids=[],
hqes=[], jobs=[],
- side_effect=None, return_hosts=[], return_jobs=[]):
+ side_effect=None, return_hosts=[], return_jobs=[],
+ return_suite_keyvals=[]):
call = self.afe.run(
'shard_heartbeat', shard_hostname=shard_hostname,
hqes=hqes, jobs=jobs,
@@ -63,6 +65,7 @@
call.AndReturn({
'hosts': return_hosts,
'jobs': return_jobs,
+ 'suite_keyvals': return_suite_keyvals,
})
@@ -124,6 +127,7 @@
'owner': u'autotest_system',
'parse_failed_repair': True,
'priority': 40,
+ 'parent_job_id': 0,
'reboot_after': 0,
'reboot_before': 1,
'run_reset': True,
@@ -135,6 +139,13 @@
'timeout_mins': 1440}
+ def _get_sample_serialized_suite_keyvals(self):
+ return {'id': 1,
+ 'job_id': 0,
+ 'key': 'test_key',
+ 'value': 'test_value'}
+
+
def testHeartbeat(self):
"""Trigger heartbeat, verify RPCs and persisting of the responses."""
self.setup_mocks()
@@ -142,8 +153,11 @@
global_config.global_config.override_config_value(
'SHARD', 'shard_hostname', 'host1')
- self.expect_heartbeat(return_hosts=[self._get_sample_serialized_host()],
- return_jobs=[self._get_sample_serialized_job()])
+ self.expect_heartbeat(
+ return_hosts=[self._get_sample_serialized_host()],
+ return_jobs=[self._get_sample_serialized_job()],
+ return_suite_keyvals=[
+ self._get_sample_serialized_suite_keyvals()])
modified_sample_host = self._get_sample_serialized_host()
modified_sample_host['hostname'] = 'host2'
@@ -175,6 +189,10 @@
host = models.Host.objects.get(id=2)
self.assertEqual(host.hostname, 'host1')
+ # Check if suite keyval was saved to DB
+ suite_keyval = models.JobKeyval.objects.filter(job_id=0)[0]
+ self.assertEqual(suite_keyval.key, 'test_key')
+
sut.do_heartbeat()
# Ensure it wasn't overwritten
@@ -296,3 +314,7 @@
sut.loop()
self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+ unittest.main()