[autotest] Forward the job created by specifying host label to shard

On AFE, we can create a job by specifying a host label.
Such jobs are not currently forwarded to a shard.
By creating a record in the afe_jobs_dependency_labels table for the job,
we can ensure such jobs are forwarded to the shard.

BUG=chromium:508770
TEST=Puppylab. Create a job with a meta_host from AFE and check if
the job is forwarded to the shard.
DEPLOY=apache

Change-Id: I86388abb7f685f8c6950364ebed95e6b4081f143
Reviewed-on: https://chromium-review.googlesource.com/284924
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
Reviewed-by: Fang Deng <fdeng@chromium.org>
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 699eddf..89cc47d 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -1257,27 +1257,29 @@
     NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))'
 
     SQL_SHARD_JOBS = (
-        'SELECT t1.id FROM afe_jobs t1 '
+        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
         'INNER JOIN afe_host_queue_entries t2  ON '
-        '  (t1.id = t2.job_id '
-        '   AND t2.complete != 1 AND t2.active != 1 '
+        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
         '   %(check_known_jobs)s) '
-        'INNER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) '
+        'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) '
         'WHERE (t3.label_id IN  '
         '  (SELECT label_id FROM afe_shards_labels '
         '   WHERE shard_id = %(shard_id)s) '
-        '  AND t2.complete != 1)'
+        '  OR t2.meta_host IN '
+        '  (SELECT label_id FROM afe_shards_labels '
+        '   WHERE shard_id = %(shard_id)s))'
         )
 
-    # Find frontend jobs that should be sent to shard.
+    # Jobs can be created with assigned hosts and have neither
+    # dependency labels nor a meta_host.
     # We are looking for:
-    #     - a job whose hqe's meta host is null
+    #     - a job whose hqe's meta_host is null
     #     - a job whose hqe has a host
     #     - one of the host's labels matches the shard's label.
     # Non-aborted known jobs, completed jobs, active jobs, jobs
-    # without hqe are exluded as we do for non-frontend jobs.
-    SQL_SHARD_FRONTEND_JOBS = (
-        'SELECT t1.id FROM afe_jobs t1 '
+    # without hqe are excluded as we do with SQL_SHARD_JOBS.
+    SQL_SHARD_JOBS_WITH_HOSTS = (
+        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
         'INNER JOIN afe_host_queue_entries t2 ON '
         '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
         '   AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL '
@@ -1287,8 +1289,9 @@
         '  (SELECT label_id FROM afe_shards_labels '
         '   WHERE shard_id = %(shard_id)s))'
         )
+
     # Even if we had filters about complete, active and aborted
-    # bits in the above two sqls, there is a chance that
+    # bits in the above two SQLs, there is a chance that
     # the result may still contain a job with an hqe with 'complete=1'
     # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.'
     # This happens when a job has two (or more) hqes and at least
@@ -1558,7 +1561,7 @@
             check_known_jobs_exclude = 'AND NOT ' + check_known_jobs
             check_known_jobs_include = 'OR ' + check_known_jobs
 
-        for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_FRONTEND_JOBS]:
+        for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS]:
             query = Job.objects.raw(sql % {
                     'check_known_jobs': check_known_jobs_exclude,
                     'shard_id': shard.id})
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index ac5d2ea..2761c31 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -569,7 +569,6 @@
 def create_new_job(owner, options, host_objects, metahost_objects,
                    atomic_group=None):
     all_host_objects = host_objects + metahost_objects
-    metahost_counts = _get_metahost_counts(metahost_objects)
     dependencies = options.get('dependencies', [])
     synch_count = options.get('synch_count')