Revert "chromite: Remove TreeIsOpen and dependencies."

This reverts commit e057820e36f58e99d052b914b104d1c0eba9fca6.

Reason for revert: Broke the CQ master builder

https://cros-goldeneye.corp.google.com/chromeos/healthmonitoring/buildDetails?buildbucketId=8915697068235833824

19:13:53: ERROR: <type 'exceptions.AttributeError'>: 'module' object has no attribute 'TreeIsClosedException'
Traceback (most recent call last):
  File "/b/s/w/ir/cache/cbuild/repository/chromite/lib/failures_lib.py", line 230, in wrapped_functor
    return functor(*args, **kwargs)
  File "/b/s/w/ir/cache/cbuild/repository/chromite/cbuildbot/stages/sync_stages.py", line 773, in PerformStage
    except validation_pool.TreeIsClosedException as e:
AttributeError: 'module' object has no attribute 'TreeIsClosedException'


Original change's description:
> chromite: Remove TreeIsOpen and dependencies.
> 
> TEST=./run_tests
> BUG=chromium:903420
> 
> Change-Id: If08789053b882127950b9fa45d53437d13131552
> Reviewed-on: https://chromium-review.googlesource.com/1568051
> Commit-Ready: Evan Hernandez <evanhernandez@chromium.org>
> Tested-by: Evan Hernandez <evanhernandez@chromium.org>
> Reviewed-by: Dhanya Ganesh <dhanyaganesh@chromium.org>

Bug: chromium:903420
Change-Id: If81f024e0f679eaef32b0a4744a1ee369e65b73d
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/1576281
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Commit-Queue: Keith Haddow <haddowk@chromium.org>
Tested-by: Keith Haddow <haddowk@chromium.org>
diff --git a/cbuildbot/stages/handle_changes_stages.py b/cbuildbot/stages/handle_changes_stages.py
index 5852016..b68b7b5 100644
--- a/cbuildbot/stages/handle_changes_stages.py
+++ b/cbuildbot/stages/handle_changes_stages.py
@@ -18,6 +18,8 @@
 from chromite.lib import cros_logging as logging
 from chromite.lib import hwtest_results
 from chromite.lib import metrics
+from chromite.lib import timeout_util
+from chromite.lib import tree_status
 
 
 class CommitQueueHandleChangesStage(generic_stages.BuilderStage):
@@ -115,6 +117,39 @@
             build_stages_dict, triage_relevant_changes.STAGE_SYNC))
     return builds_passed_sync_stage
 
+  def _CheckToTSanity(self):
+    """Check and return whether Top-of-tree is healthy and the tree is open.
+
+    Returns:
+      True iff the tree was open when the pool was acquired and is open now.
+    """
+    tot_sanity = True
+
+    # If the tree was not open when we acquired a pool, do not assume that
+    # tot was sane.
+    if not self.sync_stage.pool.tree_was_open:
+      logging.info('The tree was not open when changes were acquired so we are '
+                   'attributing failures to the broken tree rather than the '
+                   'changes.')
+      tot_sanity = False
+
+    if tot_sanity:
+      try:
+        status = tree_status.WaitForTreeStatus(
+            period=tree_status.DEFAULT_WAIT_FOR_TREE_STATUS_SLEEP,
+            timeout=tree_status.DEFAULT_WAIT_FOR_TREE_STATUS_TIMEOUT,
+            throttled_ok=True)
+        tot_sanity = (status == constants.TREE_OPEN)  # Throttled is not sane.
+      except timeout_util.TimeoutError:
+        logging.warning('Timed out waiting for getting tree status in %s(s).',
+                        tree_status.DEFAULT_WAIT_FOR_TREE_STATUS_TIMEOUT)
+        tot_sanity = False
+
+      if not tot_sanity:
+        logging.info('The tree is not open now, so we are attributing '
+                     'failures to the broken tree rather than the changes.')
+    return tot_sanity
+
   def _HandleCommitQueueFailure(self, failing, inflight, no_stat,
                                 self_destructed):
     """Handle changes in the validation pool upon build failure or timeout.
@@ -173,11 +208,14 @@
           changes, messages, changes_by_config,
           passed_in_history_slaves_by_change, failing, inflight, no_stat)
 
+    tot_sanity = self._CheckToTSanity()
+
     if not self_destructed and inflight:
       # The master didn't destruct itself and some slave(s) timed out due to
       # unknown causes, so only reject infra changes (probably just chromite
       # changes).
-      self.sync_stage.pool.HandleValidationTimeout(changes=changes)
+      self.sync_stage.pool.HandleValidationTimeout(
+          sanity=tot_sanity, changes=changes)
       return
 
     failed_hwtests = None
@@ -200,6 +238,7 @@
     # what changes to reject.
     self.sync_stage.pool.HandleValidationFailure(
         messages,
+        sanity=tot_sanity,
         changes=changes,
         no_stat=no_stat,
         failed_hwtests=failed_hwtests)
diff --git a/cbuildbot/stages/handle_changes_stages_unittest.py b/cbuildbot/stages/handle_changes_stages_unittest.py
index c30096c..4390823 100644
--- a/cbuildbot/stages/handle_changes_stages_unittest.py
+++ b/cbuildbot/stages/handle_changes_stages_unittest.py
@@ -21,6 +21,8 @@
 from chromite.lib import config_lib
 from chromite.lib import constants
 from chromite.lib import hwtest_results
+from chromite.lib import timeout_util
+from chromite.lib import tree_status
 from chromite.lib.buildstore import FakeBuildStore
 
 
@@ -46,6 +48,8 @@
                      '_GetSlaveMappingAndCLActions',
                      return_value=(dict(), []))
     self.PatchObject(clactions, 'GetRelevantChangesForBuilds')
+    self.PatchObject(tree_status, 'WaitForTreeStatus',
+                     return_value=constants.TREE_OPEN)
     self.PatchObject(relevant_changes.RelevantChanges,
                      'GetPreviouslyPassedSlavesForChanges')
     self.mock_record_metrics = self.PatchObject(
@@ -58,10 +62,11 @@
   def tearDown(self):
     cidb.CIDBConnectionFactory.ClearMock()
 
-  def _MockSyncStage(self):
+  def _MockSyncStage(self, tree_was_open=True):
     sync_stage = sync_stages.CommitQueueSyncStage(self._run, self.buildstore)
     sync_stage.pool = mock.MagicMock()
     sync_stage.pool.applied = self.changes
+    sync_stage.pool.tree_was_open = tree_was_open
 
     sync_stage.pool.handle_failure_mock = self.PatchObject(
         sync_stage.pool, 'HandleValidationFailure')
@@ -101,16 +106,46 @@
                      '_GetBuildsPassedSyncStage')
     stage.sync_stage.pool.SubmitPartialPool.return_value = self.changes
 
-  def testHandleCommitQueueFailure(self):
+  def testHandleCommitQueueFailureWithOpenTree(self):
     """Test _HandleCommitQueueFailure with open tree."""
     stage = self.ConstructStage()
     self._MockPartialSubmit(stage)
+    self.PatchObject(tree_status, 'WaitForTreeStatus',
+                     return_value=constants.TREE_OPEN)
     self.PatchObject(generic_stages.BuilderStage,
                      'GetScheduledSlaveBuildbucketIds', return_value=[])
 
     stage._HandleCommitQueueFailure(set(['test1']), set(), set(), False)
     stage.sync_stage.pool.handle_failure_mock.assert_called_once_with(
-        mock.ANY, no_stat=set(), changes=self.changes,
+        mock.ANY, sanity=True, no_stat=set(), changes=self.changes,
+        failed_hwtests=None)
+
+  def testHandleCommitQueueFailureWithThrottledTree(self):
+    """Test that a throttled tree yields sanity=False on failure handling."""
+    stage = self.ConstructStage()
+    self._MockPartialSubmit(stage)
+    self.PatchObject(tree_status, 'WaitForTreeStatus',
+                     return_value=constants.TREE_THROTTLED)
+    self.PatchObject(generic_stages.BuilderStage,
+                     'GetScheduledSlaveBuildbucketIds', return_value=[])
+
+    stage._HandleCommitQueueFailure(set(['test1']), set(), set(), False)
+    stage.sync_stage.pool.handle_failure_mock.assert_called_once_with(
+        mock.ANY, sanity=False, no_stat=set(), changes=self.changes,
+        failed_hwtests=None)
+
+  def testHandleCommitQueueFailureWithClosedTree(self):
+    """Test _HandleCommitQueueFailure with closed tree."""
+    stage = self.ConstructStage()
+    self._MockPartialSubmit(stage)
+    self.PatchObject(tree_status, 'WaitForTreeStatus',
+                     side_effect=timeout_util.TimeoutError())
+    self.PatchObject(generic_stages.BuilderStage,
+                     'GetScheduledSlaveBuildbucketIds', return_value=[])
+
+    stage._HandleCommitQueueFailure(set(['test1']), set(), set(), False)
+    stage.sync_stage.pool.handle_failure_mock.assert_called_once_with(
+        mock.ANY, sanity=False, no_stat=set(), changes=self.changes,
         failed_hwtests=None)
 
   def testHandleCommitQueueFailureWithFailedHWtests(self):
@@ -127,17 +162,19 @@
     mock_get_hwtests = self.PatchObject(
         hwtest_results.HWTestResultManager,
         'GetFailedHWTestsFromCIDB', return_value=mock_failed_hwtests)
+    self.PatchObject(tree_status, 'WaitForTreeStatus',
+                     return_value=constants.TREE_OPEN)
     self.PatchObject(generic_stages.BuilderStage,
                      'GetScheduledSlaveBuildbucketIds', return_value=['123'])
 
     stage._HandleCommitQueueFailure(set(['test1']), set(), set(), False)
     stage.sync_stage.pool.handle_failure_mock.assert_called_once_with(
-        mock.ANY, no_stat=set(), changes=self.changes,
+        mock.ANY, sanity=True, no_stat=set(), changes=self.changes,
         failed_hwtests=mock_failed_hwtests)
     mock_get_hwtests.assert_called_once_with(db, [slave_build_id])
 
   def VerifyStage(self, failing, inflight, no_stat, handle_failure=False,
-                  handle_timeout=False, stage=None,
+                  handle_timeout=False, sane_tot=True, stage=None,
                   all_slaves=None, slave_stages=None, fatal=True,
                   self_destructed=False):
     """Runs and Verifies PerformStage.
@@ -148,6 +185,7 @@
       no_stat: The names of the builders that had no status.
       handle_failure: If True, calls HandleValidationFailure.
       handle_timeout: If True, calls HandleValidationTimeout.
+      sane_tot: If not true, assumes TOT is not sane.
       stage: If set, use this constructed stage, otherwise create own.
       all_slaves: Optional set of all slave configs.
       slave_stages: Optional list of slave stages.
@@ -214,12 +252,12 @@
 
     if handle_failure:
       stage.sync_stage.pool.handle_failure_mock.assert_called_once_with(
-          mock.ANY, no_stat=set(no_stat),
+          mock.ANY, no_stat=set(no_stat), sanity=sane_tot,
           changes=self.other_changes, failed_hwtests=mock.ANY)
 
     if handle_timeout:
       stage.sync_stage.pool.handle_timeout_mock.assert_called_once_with(
-          changes=self.other_changes)
+          sanity=mock.ANY, changes=self.other_changes)
 
   def testCompletionSuccess(self):
     """Verify stage when the completion_stage succeeded."""
diff --git a/cbuildbot/stages/sync_stages.py b/cbuildbot/stages/sync_stages.py
index db13542..8596534 100644
--- a/cbuildbot/stages/sync_stages.py
+++ b/cbuildbot/stages/sync_stages.py
@@ -1062,24 +1062,30 @@
 
     build_id = self._run.attrs.metadata.GetDict().get('build_id')
 
-    # In order to acquire a pool, we need an initialized buildroot.
-    if not git.FindRepoDir(self.repo.directory):
-      self.repo.Initialize()
+    try:
+      # In order to acquire a pool, we need an initialized buildroot.
+      if not git.FindRepoDir(self.repo.directory):
+        self.repo.Initialize()
 
-    query = constants.CQ_READY_QUERY
-    if self._run.options.cq_gerrit_override:
-      query = (self._run.options.cq_gerrit_override, None)
+      query = constants.CQ_READY_QUERY
+      if self._run.options.cq_gerrit_override:
+        query = (self._run.options.cq_gerrit_override, None)
 
-    self.pool = validation_pool.ValidationPool.AcquirePool(
-        overlays=self._run.config.overlays,
-        repo=self.repo,
-        build_number=self._run.buildnumber,
-        builder_name=self._run.GetBuilderName(),
-        buildbucket_id=self._run.options.buildbucket_id,
-        query=query,
-        dryrun=self._run.options.debug,
-        change_filter=self._ChangeFilter,
-        builder_run=self._run)
+      self.pool = validation_pool.ValidationPool.AcquirePool(
+          overlays=self._run.config.overlays,
+          repo=self.repo,
+          build_number=self._run.buildnumber,
+          builder_name=self._run.GetBuilderName(),
+          buildbucket_id=self._run.options.buildbucket_id,
+          query=query,
+          dryrun=self._run.options.debug,
+          check_tree_open=(not self._run.options.debug or
+                           self._run.options.mock_tree_status),
+          change_filter=self._ChangeFilter,
+          builder_run=self._run)
+    except validation_pool.TreeIsClosedException as e:
+      logging.warning(str(e))
+      return None
 
     build_identifier, _ = self._run.GetCIDBHandle()
     build_id = build_identifier.cidb_id
@@ -1957,6 +1963,7 @@
         if k.HasReadyFlag() or status_map[k] != constants.CL_STATUS_FAILED
     }
 
+    is_tree_open = tree_status.IsTreeOpen(throttled_ok=True)
     launch_count = 0
     cl_launch_count = 0
     launch_count_limit = (
@@ -1968,7 +1975,10 @@
       launches.setdefault(frozenset(plan), []).append(config)
 
     for plan, configs in launches.iteritems():
-      if launch_count >= launch_count_limit:
+      if not is_tree_open:
+        logging.info('Tree is closed, not launching configs %r for plan %s.',
+                     configs, cros_patch.GetChangesAsString(plan))
+      elif launch_count >= launch_count_limit:
         logging.info(
             'Hit or exceeded maximum launch count of %s this cycle, '
             'not launching configs %r for plan %s.', launch_count_limit,
@@ -2119,17 +2129,19 @@
       self._ProcessExpiry(c, v[0], v[1], pool, current_db_time)
 
     # Submit changes that are ready to submit, if we can.
-    pool.SubmitNonManifestChanges(reason=constants.STRATEGY_NONMANIFEST)
-    submit_reason = constants.STRATEGY_PRECQ_SUBMIT
-    will_submit = {c: submit_reason for c in will_submit}
-    submitted, _ = pool.SubmitChanges(will_submit)
+    if tree_status.IsTreeOpen(throttled_ok=True):
+      pool.SubmitNonManifestChanges(
+          check_tree_open=False, reason=constants.STRATEGY_NONMANIFEST)
+      submit_reason = constants.STRATEGY_PRECQ_SUBMIT
+      will_submit = {c: submit_reason for c in will_submit}
+      submitted, _ = pool.SubmitChanges(will_submit, check_tree_open=False)
 
-    # Record stats about submissions in monarch.
-    if db:
-      submitted_change_actions = db.GetActionsForChanges(submitted)
-      strategies = {m: constants.STRATEGY_PRECQ_SUBMIT for m in submitted}
-      clactions_metrics.RecordSubmissionMetrics(
-          clactions.CLActionHistory(submitted_change_actions), strategies)
+      # Record stats about submissions in monarch.
+      if db:
+        submitted_change_actions = db.GetActionsForChanges(submitted)
+        strategies = {m: constants.STRATEGY_PRECQ_SUBMIT for m in submitted}
+        clactions_metrics.RecordSubmissionMetrics(
+            clactions.CLActionHistory(submitted_change_actions), strategies)
 
     self._LaunchPreCQsIfNeeded(pool, changes)
 
@@ -2178,5 +2190,6 @@
         buildbucket_id=self._run.options.buildbucket_id,
         query=query,
         dryrun=self._run.options.debug,
+        check_tree_open=False,
         change_filter=self.ProcessChanges,
         builder_run=self._run)
diff --git a/cbuildbot/stages/sync_stages_unittest.py b/cbuildbot/stages/sync_stages_unittest.py
index c7b2be9..d3ff486 100644
--- a/cbuildbot/stages/sync_stages_unittest.py
+++ b/cbuildbot/stages/sync_stages_unittest.py
@@ -46,6 +46,8 @@
 from chromite.lib import osutils
 from chromite.lib import patch as cros_patch
 from chromite.lib import patch_unittest
+from chromite.lib import timeout_util
+from chromite.lib import tree_status
 from chromite.lib.buildstore import FakeBuildStore
 
 # It's normal for unittests to access protected members.
@@ -379,6 +381,8 @@
   def PerformSync(self,
                   committed=False,
                   num_patches=1,
+                  tree_open=True,
+                  tree_throttled=False,
                   pre_cq_status=constants.CL_STATUS_PASSED,
                   runs=0,
                   changes=None,
@@ -390,6 +394,9 @@
       committed: Value to be returned by mock patches' IsChangeCommitted.
                  Default: False.
       num_patches: The number of mock patches to create. Default: 1.
+      tree_open: If True, behave as if tree is open. Default: True.
+      tree_throttled: If True, behave as if tree is throttled
+                      (overriding the tree_open arg). Default: False.
       pre_cq_status: PreCQ status for mock patches. Default: passed.
       runs: The maximum number of times to allow validation_pool.AcquirePool
             to wait for additional changes. runs=0 means never wait for
@@ -409,6 +416,9 @@
         'approval_timestamp',
         time.time() - sync_stages.PreCQLauncherStage.LAUNCH_DELAY * 60)
     changes = changes or [MockPatch(**kwargs)] * num_patches
+    if tree_throttled:
+      for change in changes:
+        change.flags['COMR'] = '2'
     if pre_cq_status is not None:
       config = constants.PRE_CQ_DEFAULT_CONFIGS[0]
       new_build_id = self.buildstore.InsertBuild('Pre cq group', 1, config,
@@ -433,6 +443,23 @@
 
       self.PatchObject(
           gerrit.GerritHelper, 'Query', side_effect=Query, autospec=True)
+      if tree_throttled:
+        self.PatchObject(
+            tree_status,
+            'WaitForTreeStatus',
+            return_value=constants.TREE_THROTTLED,
+            autospec=True)
+      elif tree_open:
+        self.PatchObject(
+            tree_status,
+            'WaitForTreeStatus',
+            return_value=constants.TREE_OPEN,
+            autospec=True)
+      else:
+        self.PatchObject(
+            tree_status,
+            'WaitForTreeStatus',
+            side_effect=timeout_util.TimeoutError())
 
       exit_it = itertools.chain([False] * runs, itertools.repeat(True))
       self.PatchObject(
@@ -620,6 +647,13 @@
     self.assertItemsEqual(self.sync_stage.pool.candidates, changes)
     self.assertItemsEqual(self.sync_stage.pool.non_manifest_changes, [])
 
+  def testTreeClosureBlocksCommit(self):
+    """Test that a closed tree blocks commits (ExitEarlyException is raised)."""
+    self.assertRaises(
+        failures_lib.ExitEarlyException,
+        self._testCommitNonManifestChange,
+        tree_open=False)
+
 
 class PreCQLauncherStageTest(MasterCQSyncTestCase):
   """Tests for the PreCQLauncherStage."""
@@ -945,7 +979,7 @@
     self.PerformSync(pre_cq_status=None, changes=[change], patch_objects=False)
     submit_reason = constants.STRATEGY_PRECQ_SUBMIT
     verified_cls = {c: submit_reason for c in set([change])}
-    m.assert_called_with(verified_cls)
+    m.assert_called_with(verified_cls, check_tree_open=False)
 
   def testSubmitUnableInPreCQ(self):
     change = self._PrepareSubmittableChange()
@@ -1029,6 +1063,17 @@
         count_launches(),
         3 * sync_stages.PreCQLauncherStage.MAX_LAUNCHES_PER_CYCLE_DERIVATIVE)
 
+  def testNoLaunchClosedTree(self):
+    """Test that no pre-CQ config is launched while the tree is closed."""
+    self.PatchObject(tree_status, 'IsTreeOpen', return_value=False)
+    # Create a change that is ready to be tested.
+    change = self._PrepareChangesWithPendingVerifications()[0]
+    change.approval_timestamp = 0
+
+    # Change should still be pending.
+    self.PerformSync(pre_cq_status=None, changes=[change], runs=2)
+    self.assertAllStatuses([change], constants.CL_PRECQ_CONFIG_STATUS_PENDING)
+
   def testDontTestSubmittedPatches(self):
     # Create a change that has been submitted.
     change = self._PrepareChangesWithPendingVerifications()[0]
@@ -1369,6 +1414,10 @@
     """See MasterCQSyncTestCase"""
     self._testDefaultSync()
 
+  def testTreeClosureIsOK(self):
+    """Test that tree closures do not block commits."""
+    self._testCommitNonManifestChange(tree_open=False)
+
   def _GetPreCQStatus(self, change):
     """Helper method to get pre-cq status of a CL from fake_db."""
     action_history = self.fake_db.GetActionsForChanges([change])
diff --git a/cbuildbot/validation_pool.py b/cbuildbot/validation_pool.py
index 5ad86cb..065317f 100644
--- a/cbuildbot/validation_pool.py
+++ b/cbuildbot/validation_pool.py
@@ -15,6 +15,7 @@
 import functools
 import httplib
 import os
+import random
 import time
 from xml.dom import minidom
 
@@ -60,6 +61,28 @@
 # The url prefix of the CL status page.
 CL_STATUS_URL_PREFIX = 'https://chromeos-cl-viewer-ui.googleplex.com/cl_status'
 
+class TreeIsClosedException(Exception):
+  """Raised when the tree is not open while acquiring or submitting changes."""
+
+  def __init__(self, closed_or_throttled=False):
+    """Builds the message telling the user which tree status to set.
+
+    Args:
+      closed_or_throttled: True if the exception is being thrown on a
+                           possibly 'throttled' tree. False if only
+                           thrown on a 'closed' tree. Default: False.
+    """
+    if closed_or_throttled:
+      status_text = 'closed or throttled'
+      opposite_status_text = 'open'
+    else:
+      status_text = 'closed'
+      opposite_status_text = 'throttled or open'
+
+    super(TreeIsClosedException, self).__init__(
+        'Tree is %s.  Please set tree status to %s to '
+        'proceed.' % (status_text, opposite_status_text))
+
 
 class FailedToSubmitAllChangesException(failures_lib.StepFailure):
   """Raised if we fail to submit any change."""
@@ -145,6 +168,12 @@
   method that grabs the commits that are ready for validation.
   """
 
+  # Class-level flag controlling whether we allow CLs to get tried and/or
+  # committed when the tree is throttled.
+  # TODO(sosa): Remove this global once metrics show that this is the direction
+  # we want to go (and remove all additional throttled_ok logic from this
+  # module).
+  THROTTLED_OK = True
   GLOBAL_DRYRUN = False
   DEFAULT_TIMEOUT = 60 * 60 * 4
   SLEEP_TIMEOUT = 30
@@ -190,8 +219,6 @@
       pre_cq_trybot: If set to True, this is a Pre-CQ trybot. (Note: The Pre-CQ
         launcher is NOT considered a Pre-CQ trybot.)
       tree_was_open: Whether the tree was open when the pool was created.
-        IMPORTANT: This field does nothing but cannot be removed due to
-        pickling. Because this class is deprecated, we leave it.
       applied: List of CLs that have been applied to the current repo.
       buildbucket_id: Buildbucket id of the current build as a string or int.
                       None if not buildbucket scheduled.
@@ -274,7 +301,6 @@
     self._buildbucket_id = buildbucket_id
 
     # Set to False if the tree was not open when we acquired changes.
-    # Unused except for picking.
     self.tree_was_open = tree_was_open
 
     # A set of changes filtered by throttling, default to None.
@@ -336,6 +362,7 @@
   @failures_lib.SetFailureType(failures_lib.BuilderFailure)
   def AcquirePreCQPool(cls, *args, **kwargs):
     """See ValidationPool.__init__ for arguments."""
+    kwargs.setdefault('tree_was_open', True)
     kwargs.setdefault('pre_cq_trybot', True)
     kwargs.setdefault('is_master', True)
     kwargs.setdefault('applied', [])
@@ -407,7 +434,7 @@
 
   @classmethod
   def AcquirePool(cls, overlays, repo, build_number, builder_name,
-                  buildbucket_id, query, dryrun=False,
+                  buildbucket_id, query, dryrun=False, check_tree_open=True,
                   change_filter=None, builder_run=None):
     """Acquires the current pool from Gerrit.
 
@@ -425,12 +452,17 @@
       query: constants.CQ_READY_QUERY, PRECQ_READY_QUERY, or a custom
         query description of the form (<query_str>, None).
       dryrun: Don't submit anything to gerrit.
+      check_tree_open: If True, only return when the tree is open.
       change_filter: If set, use change_filter(pool, changes,
         non_manifest_changes) to filter out unwanted patches.
       builder_run: instance used to record CL actions to metadata and cidb.
 
     Returns:
       ValidationPool object.
+
+    Raises:
+      TreeIsClosedException: if the tree is closed (or throttled, if not
+                             |THROTTLED_OK|).
     """
     if change_filter is None:
       change_filter = lambda _, x, y: (x, y)
@@ -450,15 +482,26 @@
                       exc_info=True)
 
     end_time = time.time() + timeout
+    status = constants.TREE_OPEN
 
     while True:
       current_time = time.time()
       time_left = end_time - current_time
+      # Wait until the tree becomes open.
+      if check_tree_open:
+        try:
+          status = tree_status.WaitForTreeStatus(
+              period=cls.SLEEP_TIMEOUT, timeout=time_left,
+              throttled_ok=cls.THROTTLED_OK)
+        except timeout_util.TimeoutError:
+          raise TreeIsClosedException(
+              closed_or_throttled=not cls.THROTTLED_OK)
 
       # Sync so that we are up-to-date on what is committed.
       repo.Sync()
 
       gerrit_query, ready_fn = query
+      tree_was_open = (status == constants.TREE_OPEN)
 
       try:
         experimental_builders = tree_status.GetExperimentalBuilders()
@@ -477,6 +520,7 @@
           is_master=True,
           dryrun=dryrun,
           builder_run=builder_run,
+          tree_was_open=tree_was_open,
           applied=[],
           buildbucket_id=buildbucket_id)
 
@@ -493,6 +537,33 @@
 
     return pool
 
+  def _GetFailStreak(self):
+    """Returns the fail streak for the validation pool.
+
+    Counts consecutive non-passed builds among the last CQ_SEARCH_HISTORY
+    builds of this config (excluding this build), stopping at the first pass;
+    0 if the buildstore isn't ready. Used to size a throttled-tree pool.
+    """
+    # TODO(sosa): Remove Google Storage Fail Streak Counter.
+    build_identifier, _ = self._run.GetCIDBHandle()
+    build_id = build_identifier.cidb_id
+    if not self.buildstore.AreClientsReady():
+      return 0
+
+    builds = self.buildstore.GetBuildHistory(self._run.config.name,
+                                             ValidationPool.CQ_SEARCH_HISTORY,
+                                             ignore_build_id=build_id)
+    number_of_failures = 0
+    # Walk the builds in order (presumably newest first) until one passed;
+    # everything before it counts toward the streak.
+    for build in builds:
+      if build['status'] != constants.BUILDER_STATUS_PASSED:
+        number_of_failures += 1
+      else:
+        break
+
+    return number_of_failures
+
   def AddPendingCommitsIntoPool(self, manifest):
     """Add the pending commits from |manifest| into pool.
 
@@ -607,10 +678,12 @@
     we only complain after REJECTION_GRACE_PERIOD has passed since the patch
     was uploaded.
 
-    This helps in two situations:
-      1) If the developer is in the middle of marking a stack of changes as
+    This helps in three situations:
+      1) crbug.com/705023: if the error is a DependencyError, and the dependent
+         CL was filtered by a throttled tree, do not reject the CL set.
+      2) If the developer is in the middle of marking a stack of changes as
          ready, we won't reject their work until the grace period has passed.
-      2) If the developer marks a big circular stack of changes as ready, and
+      3) If the developer marks a big circular stack of changes as ready, and
          some change in the middle of the stack doesn't apply, the user will
          get a chance to rebase their change before we mark all the changes as
          'not ready'.
@@ -702,6 +775,34 @@
 
       logging.PrintBuildbotLink(s, change.url)
 
+  def FilterChangesForThrottledTree(self):
+    """Apply Throttled Tree logic to select patch candidates.
+
+    This should be called before any CLs are applied.
+
+    If the tree is throttled, we only test a random subset of our candidate
+    changes. Call this to select that subset, and throw away unrelated changes.
+
+    If the tree was open when this pool was created, it does nothing.
+    """
+    if self.tree_was_open:
+      return
+
+    # By filtering the candidates before any calls to Apply, we can make sure
+    # that repeated calls to Apply always consider the same list of candidates.
+    fail_streak = self._GetFailStreak()
+    test_pool_size = max(1, int(len(self.candidates) / (1.5**fail_streak)))
+    random.shuffle(self.candidates)
+
+    removed = self.candidates[test_pool_size:]
+    if removed:
+      self.filtered_set = set(removed)
+      logging.info('As the tree is throttled, it only picks a random subset of '
+                   'candidate changes. Changes not picked up in this run: %s ',
+                   cros_patch.GetChangesAsString(removed))
+
+    self.candidates = self.candidates[:test_pool_size]
+
   def ApplyPoolIntoRepo(self, manifest=None, filter_fn=lambda p: True):
     """Applies changes from pool into the directory specified by the buildroot.
 
@@ -1043,21 +1144,39 @@
 
     return errors
 
-  def SubmitChanges(self, verified_cls):
+  def SubmitChanges(self, verified_cls, check_tree_open=True,
+                    throttled_ok=True):
     """Submits the given changes to Gerrit.
 
     Args:
       verified_cls: A dictionary mapping the fully verified changes to their
         string reasons for submission.
+      check_tree_open: Whether to check that the tree is open before submitting
+        changes. If this is False, TreeIsClosedException will never be raised.
+      throttled_ok: if |check_tree_open|, treat a throttled tree as open
 
     Returns:
       (submitted, errors) where submitted is a set of changes that were
       submitted, and errors is a map {change: error} containing changes that
       failed to submit.
+
+    Raises:
+      TreeIsClosedException: if the tree is closed.
     """
     assert self.is_master, 'Non-master builder calling SubmitPool'
     assert not self.pre_cq_trybot, 'Trybot calling SubmitPool'
 
+    # TODO(pprabhu) It is bad form for master-paladin to do work after its
+    # deadline has passed. Extend the deadline after waiting for slave
+    # completion and ensure that none of the follow up stages go beyond the
+    # deadline.
+    if (check_tree_open and not self.dryrun and not
+        tree_status.IsTreeOpen(period=self.SLEEP_TIMEOUT,
+                               timeout=self.DEFAULT_TIMEOUT,
+                               throttled_ok=throttled_ok)):
+      raise TreeIsClosedException(
+          closed_or_throttled=not throttled_ok)
+
     changes = verified_cls.keys()
     # Filter out changes that were modified during the CQ run.
     filtered_changes, errors = self.FilterModifiedChanges(changes)
@@ -1489,24 +1608,34 @@
           build_id,
           [clactions.CLAction.FromGerritPatchAndAction(change, action, reason)])
 
-  def SubmitNonManifestChanges(self, reason=None):
+  def SubmitNonManifestChanges(self, check_tree_open=True, reason=None):
     """Commits changes to Gerrit from Pool that aren't part of the checkout.
 
     Args:
-      reason: string reason for submission to be recorded in cidb. (Should be
-        None or constant with name STRATEGY_* from constants.py)
-    """
-    verified_cls = {c:reason for c in self.non_manifest_changes}
-    self.SubmitChanges(verified_cls)
-
-  def SubmitPool(self, reason=None):
-    """Commits changes to Gerrit from Pool.  This is only called by a master.
-
-    Args:
+      check_tree_open: Whether to check that the tree is open before submitting
+        changes. If this is False, TreeIsClosedException will never be raised.
       reason: string reason for submission to be recorded in cidb. (Should be
         None or constant with name STRATEGY_* from constants.py)
 
     Raises:
+      TreeIsClosedException: if the tree is closed.
+    """
+    verified_cls = {c:reason for c in self.non_manifest_changes}
+    self.SubmitChanges(verified_cls,
+                       check_tree_open=check_tree_open)
+
+  def SubmitPool(self, check_tree_open=True, throttled_ok=True, reason=None):
+    """Commits changes to Gerrit from Pool.  This is only called by a master.
+
+    Args:
+      check_tree_open: Whether to check that the tree is open before submitting
+        changes. If this is False, TreeIsClosedException will never be raised.
+      throttled_ok: if |check_tree_open|, treat a throttled tree as open
+      reason: string reason for submission to be recorded in cidb. (Should be
+        None or constant with name STRATEGY_* from constants.py)
+
+    Raises:
+      TreeIsClosedException: if the tree is closed.
       FailedToSubmitAllChangesException: if we can't submit a change.
     """
     # Note that SubmitChanges can throw an exception if it can't
@@ -1516,7 +1645,9 @@
     # knowing).  They *likely* will still fail, but this approach tries
     # to minimize wasting the developers time.
     verified_cls = {c:reason for c in self.applied}
-    submitted, errors = self.SubmitChanges(verified_cls)
+    submitted, errors = self.SubmitChanges(verified_cls,
+                                           check_tree_open=check_tree_open,
+                                           throttled_ok=throttled_ok)
     if errors:
       logging.PrintBuildbotStepText(
           'Submitted %d of %d verified CLs.'
diff --git a/cbuildbot/validation_pool_unittest.py b/cbuildbot/validation_pool_unittest.py
index 4a9eadd..92fc90d 100644
--- a/cbuildbot/validation_pool_unittest.py
+++ b/cbuildbot/validation_pool_unittest.py
@@ -139,6 +139,9 @@
     self.PatchObject(gob_util, 'CreateHttpConn',
                      side_effect=AssertionError('Test should not contact GoB'))
     self.PatchObject(gob_util, 'CheckChange')
+    self.PatchObject(tree_status, 'IsTreeOpen', return_value=True)
+    self.PatchObject(tree_status, 'WaitForTreeStatus',
+                     return_value=constants.TREE_OPEN)
     self.PatchObject(tree_status, 'GetExperimentalBuilders',
                      return_value=[])
     self.fake_db = fake_cidb.FakeCIDBConnection()
@@ -728,25 +731,142 @@
     acquire_changes_mock = self.PatchObject(
         validation_pool.ValidationPool, 'AcquireChanges', return_value=True)
     self.PatchObject(time, 'sleep')
+    tree_status_mock = self.PatchObject(
+        tree_status, 'WaitForTreeStatus', return_value=constants.TREE_OPEN)
 
     query = constants.CQ_READY_QUERY
-    validation_pool.ValidationPool.AcquirePool(
+    pool = validation_pool.ValidationPool.AcquirePool(
         constants.PUBLIC_OVERLAYS, repo, 1, 'buildname', 'bb_id', query,
-        dryrun=False, builder_run=builder_run)
+        dryrun=False, check_tree_open=True, builder_run=builder_run)
 
+    self.assertTrue(pool.tree_was_open)
+    tree_status_mock.assert_called()
     acquire_changes_mock.assert_called()
 
-    # 2) Test need to loop at least once to get changes.
+    # 2) Test, tree open -> need to loop at least once to get changes.
     acquire_changes_mock.reset_mock()
     acquire_changes_mock.configure_mock(side_effect=iter([False, True]))
 
     query = constants.CQ_READY_QUERY
-    validation_pool.ValidationPool.AcquirePool(
+    pool = validation_pool.ValidationPool.AcquirePool(
         constants.PUBLIC_OVERLAYS, repo, 1, 'buildname', 'bb_id', query,
-        dryrun=False, builder_run=builder_run)
+        dryrun=False, check_tree_open=True, builder_run=builder_run)
 
+    self.assertTrue(pool.tree_was_open)
     self.assertEqual(acquire_changes_mock.call_count, 2)
 
+    # 3) Test, tree throttled -> use exponential fallback logic.
+    acquire_changes_mock.reset_mock()
+    acquire_changes_mock.configure_mock(return_value=True, side_effect=None)
+    tree_status_mock.configure_mock(return_value=constants.TREE_THROTTLED)
+
+    query = constants.CQ_READY_QUERY
+    pool = validation_pool.ValidationPool.AcquirePool(
+        constants.PUBLIC_OVERLAYS, repo, 1, 'buildname', 'bb_id', query,
+        dryrun=False, check_tree_open=True, builder_run=builder_run)
+
+    self.assertFalse(pool.tree_was_open)
+
+
+  def testGetFailStreak(self):
+    """Tests that we're correctly able to calculate a fail streak."""
+    # Leave first build as inflight.
+    builder_name = 'master-paladin'
+    slave_pool = self.MakePool(builder_name=builder_name, fake_db=self.fake_db)
+    self.fake_db.buildTable[0]['status'] = constants.BUILDER_STATUS_INFLIGHT
+    self.fake_db.buildTable[0]['build_config'] = builder_name
+    self.assertEqual(slave_pool._GetFailStreak(), 0)
+
+    # Create two passing builds.
+    for i in range(2):
+      self.fake_db.InsertBuild(
+          builder_name, i, builder_name, 'abcdelicious',
+          status=constants.BUILDER_STATUS_PASSED)
+
+    self.assertEqual(slave_pool._GetFailStreak(), 0)
+
+    # Add a fail streak.
+    for i in range(3, 6):
+      self.fake_db.InsertBuild(
+          builder_name, i, builder_name, 'abcdelicious',
+          status=constants.BUILDER_STATUS_FAILED)
+
+    self.assertEqual(slave_pool._GetFailStreak(), 3)
+
+    # Add another success and failure.
+    self.fake_db.InsertBuild(
+        builder_name, 6, builder_name, 'abcdelicious',
+        status=constants.BUILDER_STATUS_PASSED)
+    self.fake_db.InsertBuild(
+        builder_name, 7, builder_name, 'abcdelicious',
+        status=constants.BUILDER_STATUS_FAILED)
+
+    self.assertEqual(slave_pool._GetFailStreak(), 1)
+
+    # Finally just add one last pass and make sure fail streak is wiped.
+    self.fake_db.InsertBuild(
+        builder_name, 8, builder_name, 'abcdelicious',
+        status=constants.BUILDER_STATUS_PASSED)
+
+    self.assertEqual(slave_pool._GetFailStreak(), 0)
+
+  def testFilterChangesForThrottledTree(self):
+    """Tests that we can correctly apply exponential fallback."""
+    patches = self.GetPatches(4)
+    streak_mock = self.PatchObject(
+        validation_pool.ValidationPool, '_GetFailStreak')
+
+    # Perform test.
+    slave_pool = self.MakePool(candidates=patches, tree_was_open=True)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
+    self.assertEqual(len(slave_pool.candidates), 4)
+    self.assertIsNone(slave_pool.filtered_set)
+
+    #
+    # Test when tree is closed with a streak of 1.
+    #
+
+    # pylint: disable=no-value-for-parameter
+    streak_mock.configure_mock(return_value=1)
+
+    # Perform test.
+    slave_pool = self.MakePool(candidates=patches, tree_was_open=False)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
+    self.assertEqual(len(slave_pool.candidates), 2)
+    self.assertEqual(len(slave_pool.filtered_set), 2)
+
+    #
+    # Test when tree is closed with a streak of 2.
+    #
+
+    streak_mock.configure_mock(return_value=2)
+    # Perform test.
+    slave_pool = self.MakePool(candidates=patches, tree_was_open=False)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
+    self.assertEqual(len(slave_pool.candidates), 1)
+    self.assertEqual(len(slave_pool.filtered_set), 3)
+
+    #
+    # Test when tree is closed with a streak of many.
+    #
+
+    # pylint: disable=no-value-for-parameter
+    streak_mock.configure_mock(return_value=200)
+
+    # Perform test.
+    slave_pool = self.MakePool(candidates=patches, tree_was_open=False)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
+    self.assertEqual(len(slave_pool.candidates), 1)
+    self.assertEqual(len(slave_pool.filtered_set), 3)
+
   def _UpdatedDependencyMap(self, dependency_map):
     pool = self.MakePool()
 
diff --git a/lib/tree_status.py b/lib/tree_status.py
index f507558..82e9eef 100644
--- a/lib/tree_status.py
+++ b/lib/tree_status.py
@@ -115,6 +115,70 @@
     return status_dict.get(TREE_STATUS_STATE)
 
 
+def WaitForTreeStatus(status_url=None, period=1, timeout=1, throttled_ok=False):
+  """Wait for tree status to be open (or throttled, if |throttled_ok|).
+
+  Args:
+    status_url: The status url to check i.e.
+      'https://status.appspot.com/current?format=json'
+    period: How often to poll for status updates.
+    timeout: How long to wait until a tree status is discovered.
+    throttled_ok: is TREE_THROTTLED an acceptable status?
+
+  Returns:
+    The most recent tree status, either constants.TREE_OPEN or
+    constants.TREE_THROTTLED (if |throttled_ok|)
+
+  Raises:
+    timeout_util.TimeoutError if timeout expired before tree reached
+    acceptable status.
+  """
+  if not status_url:
+    status_url = CROS_TREE_STATUS_JSON_URL
+
+  acceptable_states = set([constants.TREE_OPEN])
+  verb = 'open'
+  if throttled_ok:
+    acceptable_states.add(constants.TREE_THROTTLED)
+    verb = 'not be closed'
+
+  timeout = max(timeout, 1)
+
+  def _LogMessage(remaining):
+    logging.info('Waiting for the tree to %s (%s left)...', verb, remaining)
+
+  def _get_status():
+    return _GetStatus(status_url)
+
+  return timeout_util.WaitForReturnValue(
+      acceptable_states, _get_status, timeout=timeout,
+      period=period, side_effect_func=_LogMessage)
+
+
+def IsTreeOpen(status_url=None, period=1, timeout=1, throttled_ok=False):
+  """Return whether the tree is open (or throttled, if |throttled_ok|).
+
+  Args:
+    status_url: The status url to check i.e.
+      'https://status.appspot.com/current?format=json'
+    period: How often to poll for status updates.
+    timeout: How long to wait until a tree status is discovered.
+    throttled_ok: Does TREE_THROTTLED count as open?
+
+  Returns:
+    True if the tree is open (or throttled, if |throttled_ok|). False if
+    timeout expired before tree reached acceptable status.
+  """
+  if not status_url:
+    status_url = CROS_TREE_STATUS_JSON_URL
+
+  try:
+    WaitForTreeStatus(status_url=status_url, period=period, timeout=timeout,
+                      throttled_ok=throttled_ok)
+  except timeout_util.TimeoutError:
+    return False
+  return True
+
 def GetExperimentalBuilders(status_url=None, timeout=1):
   """Polls |status_url| and returns the list of experimental builders.
 
diff --git a/lib/tree_status_unittest.py b/lib/tree_status_unittest.py
index 6ebbfc9..c31e760 100644
--- a/lib/tree_status_unittest.py
+++ b/lib/tree_status_unittest.py
@@ -83,6 +83,59 @@
     self.PatchObject(urllib, 'urlopen', autospec=True,
                      side_effect=return_value)
 
+  def testTreeIsOpen(self):
+    """Tests that we return True if the tree is open."""
+    self._SetupMockTreeStatusResponses(rejected_status_count=5,
+                                       retries_500=5)
+    self.assertTrue(tree_status.IsTreeOpen(status_url=self.status_url,
+                                           period=0))
+
+  def testTreeIsClosed(self):
+    """Tests that we return False if the tree is closed."""
+    self._SetupMockTreeStatusResponses(output_final_status=False)
+    self.assertFalse(tree_status.IsTreeOpen(status_url=self.status_url,
+                                            period=0.1))
+
+  def testTreeIsThrottled(self):
+    """Tests that we return True if the tree is throttled."""
+    self._SetupMockTreeStatusResponses(
+        final_tree_status='Tree is throttled (flaky bug on flaky builder)',
+        final_general_state=constants.TREE_THROTTLED)
+    self.assertTrue(tree_status.IsTreeOpen(status_url=self.status_url,
+                                           throttled_ok=True))
+
+  def testTreeIsThrottledNotOk(self):
+    """Tests that a throttled tree is not open unless throttled_ok is set."""
+    self._SetupMockTreeStatusResponses(
+        rejected_tree_status='Tree is throttled (flaky bug on flaky builder)',
+        rejected_general_state=constants.TREE_THROTTLED,
+        output_final_status=False)
+    self.assertFalse(tree_status.IsTreeOpen(status_url=self.status_url,
+                                            period=0.1))
+
+  def testWaitForStatusOpen(self):
+    """Tests that we can wait for a tree open response."""
+    self._SetupMockTreeStatusResponses()
+    self.assertEqual(tree_status.WaitForTreeStatus(status_url=self.status_url),
+                     constants.TREE_OPEN)
+
+
+  def testWaitForStatusThrottled(self):
+    """Tests that we can wait for a tree throttled response."""
+    self._SetupMockTreeStatusResponses(
+        final_general_state=constants.TREE_THROTTLED)
+    self.assertEqual(tree_status.WaitForTreeStatus(status_url=self.status_url,
+                                                   throttled_ok=True),
+                     constants.TREE_THROTTLED)
+
+  def testWaitForStatusFailure(self):
+    """Tests that waiting times out if the tree never opens."""
+    self._SetupMockTreeStatusResponses(output_final_status=False)
+    self.assertRaises(timeout_util.TimeoutError,
+                      tree_status.WaitForTreeStatus,
+                      status_url=self.status_url,
+                      period=0.1)
+
   def testGetStatusDictParsesMessage(self):
     """Tests that _GetStatusDict parses message correctly."""
     self._SetupMockTreeStatusResponses(