Refactor CQ logic around the concept of transactions.

Previous CQ behaviour basically was optimistic, but violated
some of the constraints it aimed for- for example, CQ-DEPEND
cycles weren't strictly enforced (two nodes, if one applied, one
didn't, the applied one would be left in place).

The code is now broken down into two phases; resolution of each
desired transaction, and application of each transaction, preferring
the transaction pulling in the most revs (working it's way down
to the smaller transactions- the notion being that long chains are
more of a pain to land, thus give them preference over one offs that
can quickly be rebased).

For the application phase, if a resolved transaction fails to apply,
the repo is *fully* restored to it's original state.  Effectively,
all or none for the commits in that transaction.  Since we generate
overlapping transactions, this isn't an issue for CQ mode- it'll
pick up the sub-transaction w/in the greater one, trying to apply that
(or skipping the sub-transaction if the greater transaction landed).

In this way, we can ensure that all constraints for a change landing
are properly honored.

W/in the resolution phases, we have two modes of operation; frozen,
and non-frozen.  Frozen is long term CQ behaviour (which is preserved);
only work from the patch pool it knows of.

For non frozen, this allows this code to arbitrary resolve gerrit/paladin
dependencies, querying as necessary.  This is required for
I60e1112c849f149f5c5ae0cc1ca6bd752bab72fd (which is a required step
to enable cherry-picking via Icd646d198721a916fd81effd31df30a70718210f).

Via the transaction support, it becomes possible for a patch to be tried
multiple times via separate transactions; this unfortunately required a
full overhaul of the error reporting for CQ; now, instead of assigning
messages onto the patch object itself (which could be involved in
multiple transactions, differing ways of being blocked from being
submitted), exceptions are thrown, chained together with the causes
of each step accessible.

That change unfortunately means that this patch cannot go through CQ
itself- it implicitly changes the pickle format for the 'conflicting_changes'
argument of ValidationPool, and cannot be done in a compatible fashion.  As
such testToTCompatibility is known to fail w/ this CL (it'll work after the
CL is in ToT), and to actually land this CL, we'll have to chump it in.
Is what it is frankly; there isn't a way around it in this case.

Finally, tests were heavily refactored, resulting in some minor
API shifts (renames for consistancy), and chunking the tests up
into logical groups of tests making it easier for debugging-
one starts with TestPatchSeries, then to TestCoreLogic, then to
user visible tests.  This was done since the ordering mentioned above
is literal- if PatchSeries is broken, everything descending from it
is going to misbehave.

BUG=chromium-os:27415, chromium-os:27256, chromium-os:29546
TEST=cbuildbot --remote x86-generic-paladin run
TEST=run_tests.sh; note that validation_pool_unittest.py fails
     testToTCompatibility; per message above, this is unavoidable.

Change-Id: I4325b5cf8ba0d48b21da9ae9043f95ec917c751f
Reviewed-on: https://gerrit.chromium.org/gerrit/20811
Tested-by: Brian Harring <ferringb@chromium.org>
Reviewed-by: David James <davidjames@chromium.org>
diff --git a/buildbot/patch.py b/buildbot/patch.py
index 8cf6b9a..91f50a8 100644
--- a/buildbot/patch.py
+++ b/buildbot/patch.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Copyright (c) 2011-2012 The Chromium OS Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
@@ -12,37 +12,83 @@
 
 from chromite.lib import cros_build_lib
 
+
 class PatchException(Exception):
-  """Exception thrown by GetGerritPatchInfo."""
+  """Base exception class all patch exception derive from."""
+
+  # Unless instances override it, default all exceptions to ToT.
+  inflight = False
+
+  def __init__(self, patch, message=None):
+    Exception.__init__(self)
+    self.patch = patch
+    self.message = message
+    self.args = (patch,)
+    if message is not None:
+      self.args += (message,)
+
+  def __str__(self):
+    return "Change %s failed: %s" % (self.patch, self.message)
 
 
-class ApplyPatchException(Exception):
+class ApplyPatchException(PatchException):
   """Exception thrown if we fail to apply a patch."""
 
   def __init__(self, patch, inflight=False):
-    super(ApplyPatchException, self).__init__()
-    self.patch = patch
+    PatchException.__init__(self, patch)
     self.inflight = inflight
+    self.args += (inflight,)
 
   def __str__(self):
-    return 'Failed to apply patch %s to %s' % (
+    return 'Change %s no longer cleanly applies against %s.' % (
         self.patch, 'current patch series' if self.inflight else 'ToT')
 
 
-class MalformedChange(Exception):
-  pass
+class DependencyError(PatchException):
+
+  """Exception thrown when a change cannot be applied due to a failure in a
+  dependency."""
+
+  def __init__(self, patch, error):
+    """
+    Args:
+      patch: The GitRepoPatch instance that this exception concerns.
+      error: A PatchException object that can be stringified to describe
+        the error.
+    """
+    PatchException.__init__(self, patch)
+    self.inflight = error.inflight
+    self.error = error
+    self.args += (error,)
+
+  def __str__(self):
+    return "Patch %s depends on %s which has an error: %s" % (
+        self.patch, self.error.patch.id, self.error)
 
 
-class MissingChangeIDException(MalformedChange):
-  """Raised if a patch is missing a Change-ID."""
-
-
-class BrokenCQDepends(MalformedChange):
+class BrokenCQDepends(PatchException):
   """Raised if a patch has a CQ-DEPEND line that is ill formated."""
 
+  def __init__(self, patch, text):
+    PatchException.__init__(self, patch)
+    self.text = text
+    self.args += (text,)
 
-class BrokenChangeID(MalformedChange):
-  """Raised if a patch has an invalid Change-ID."""
+  def __str__(self):
+    return "Change %s has a malformed CQ-DEPEND target: %s" % (
+        self.patch, self.text)
+
+
+class BrokenChangeID(PatchException):
+  """Raised if a patch has an invalid or missing Change-ID."""
+
+  def __init__(self, patch, message):
+    PatchException.__init__(self, patch)
+    self.message = message
+    self.args += (message,)
+
+  def __str__(self):
+    return "Change %s has a broken ChangeId: %s" % (self.patch, self.message)
 
 
 def MakeChangeId(unusable=False):
@@ -322,7 +368,7 @@
     Returns:
       An ordered list of Gerrit revisions that this patch depends on.
     Raises:
-      MissingChangeIDException: If a dependent change is missing its ChangeID.
+      BrokenChangeID: If a dependent change is missing its ChangeID.
     """
     dependencies = []
     logging.info('Checking for Gerrit dependencies for change %s', self)
@@ -383,7 +429,7 @@
       return self.id
     try:
       self.change_id = self.id = self._ParseChangeId(commit_message)
-    except MissingChangeIDException:
+    except BrokenChangeID:
       logging.warning(
           'Change %s, sha1 %s lacks a change-id in its commit '
           'message.  CQ-DEPEND against this rev will not work, nor '
@@ -403,7 +449,7 @@
     git_metadata = re.split('\n{2,}', data.rstrip())[-1]
     change_id_match = self._GIT_CHANGE_ID_RE.findall(git_metadata)
     if not change_id_match:
-      raise MissingChangeIDException('Missing Change-Id in %s', data)
+      raise BrokenChangeID(self, 'Missing Change-Id in %s' % (data,))
 
     # Now, validate it.  This has no real effect on actual gerrit patches,
     # but for local patches the validation is useful for general sanity
@@ -440,7 +486,7 @@
       for chunk in chunks:
         if not chunk.isdigit():
           if not self._VALID_CHANGE_ID_RE.match(chunk):
-            raise BrokenCQDepends(self, match, chunk)
+            raise BrokenCQDepends(self, chunk)
           chunk = FormatChangeId(chunk)
         if chunk not in dependencies:
           dependencies.append(chunk)
@@ -635,7 +681,7 @@
             'Change-Id yielded: %s'
             % (self.change_id, self.sha1, parsed_id))
 
-    except MissingChangeIDException:
+    except BrokenChangeID:
       logging.warning(
           'Change %s, Change-Id %s, sha1 %s lacks a change-id in its commit '
           'message.  This breaks the ability for any dependencies '
@@ -669,10 +715,6 @@
   Args:
     manifest: The manifest object for the checkout in question.
     patches:  A list of user-specified patches, in project[:branch] form.
-
-  Raises:
-    PatchException if:
-      1. The project branch isn't specified and the project isn't on a branch.
   """
   patch_info = []
   for patch in patches:
@@ -686,7 +728,7 @@
     sha1 = result.output.strip()
 
     if not sha1:
-      raise PatchException('No changes found in %s:%s' % (project, branch))
+      cros_build_lib.Die('No changes found in %s:%s' % (project, branch))
 
     patch_info.append(LocalPatch(os.path.join(project_dir, '.git'),
                                         project, branch, tracking_branch,
@@ -718,16 +760,17 @@
     try:
       project, original_branch, ref, tracking_branch, tag = patch.split(':')
     except ValueError:
-      raise PatchException("Unexpected tryjob format.  You may be running an "
-                           "older version of chromite.  Run 'repo sync "
-                           "chromiumos/chromite'.")
+      cros_build_lib.Die(
+          "Unexpected tryjob format.  You may be running an "
+          "older version of chromite.  Run 'repo sync "
+          "chromiumos/chromite'.")
 
     if tag == constants.EXTERNAL_PATCH_TAG:
       push_url = constants.GERRIT_SSH_URL
     elif tag == constants.INTERNAL_PATCH_TAG:
       push_url = constants.GERRIT_INT_SSH_URL
     else:
-      raise PatchException('Bad remote patch format.  Unknown tag %s' % tag)
+      raise ValueError('Bad remote patch format.  Unknown tag %s' % tag)
 
     patch_info.append(UploadedLocalPatch(os.path.join(push_url, project),
                                          project, ref, tracking_branch,
diff --git a/buildbot/patch_unittest.py b/buildbot/patch_unittest.py
index bfdacbc..8a3bd1b 100755
--- a/buildbot/patch_unittest.py
+++ b/buildbot/patch_unittest.py
@@ -13,7 +13,6 @@
 import sys
 import copy
 import shutil
-import tempfile
 import time
 import unittest
 
@@ -261,7 +260,7 @@
     self.assertEqual(patch.GerritDependencies(git1), [cid2, cid1])
 
     patch = self.CommitChangeIdFile(git1, cid3, content='the glass walls.')
-    self.assertRaises(cros_patch.MissingChangeIDException,
+    self.assertRaises(cros_patch.BrokenChangeID,
                       patch.GerritDependencies, git1)
 
   def _CheckPaladin(self, repo, master_id, ids, extra):
@@ -508,7 +507,7 @@
     self.mox.ReplayAll()
 
     self.assertRaises(
-        cros_patch.PatchException,
+        SystemExit,
         cros_patch.PrepareLocalPatches,
         self.manifest, self.patches)
 
diff --git a/buildbot/validation_pool.py b/buildbot/validation_pool.py
index 44d8bf6..1f8b193 100644
--- a/buildbot/validation_pool.py
+++ b/buildbot/validation_pool.py
@@ -8,6 +8,7 @@
 ready for the commit queue to try.
 """
 
+import contextlib
 import json
 import logging
 import sys
@@ -32,16 +33,9 @@
   mox = None
 
 
-def _RunCommand(cmd, dryrun):
-  """Runs the specified shell cmd if dryrun=False."""
-  if dryrun:
-    logging.info('Would have run: %s', ' '.join(cmd))
-  else:
-    cros_build_lib.RunCommand(cmd, error_ok=True)
-
-
 class TreeIsClosedException(Exception):
   """Raised when the tree is closed and we wanted to submit changes."""
+
   def __init__(self):
     super(TreeIsClosedException, self).__init__(
         'TREE IS CLOSED.  PLEASE SET TO OPEN OR THROTTLED TO COMMIT')
@@ -49,32 +43,48 @@
 
 class FailedToSubmitAllChangesException(Exception):
   """Raised if we fail to submit any changes."""
+
   def __init__(self, changes):
     super(FailedToSubmitAllChangesException, self).__init__(
         'FAILED TO SUBMIT ALL CHANGES:  Could not verify that changes %s were '
         'submitted' % ' '.join(str(c) for c in changes))
 
 
+class InternalCQError(cros_patch.PatchException):
+  """Exception thrown when CQ has an unexpected/unhandled error."""
+
+  def __init__(self, patch, message):
+    cros_patch.PatchException.__init__(self, patch, message=message)
+
+  def __str__(self):
+    return "Patch %s failed to apply due to a CQ issue: %s" % (
+        self.patch, self.message)
+
+
 class NoMatchingChangeFoundException(Exception):
   """Raised if we try to apply a non-existent change."""
-  pass
 
 
-def MarkChangeFailedInflight(change):
-  """Set an appropriate error message for an inflight apply failure."""
-  change.apply_error_message = (
-      'Your change conflicted with another change being tested '
-      'in the last validation pool.  Please re-sync, rebase and '
-      're-upload.')
-  return change
+class DependencyNotReadyForCommit(cros_patch.PatchException):
+  """Exception thrown when a required dep isn't satisfied."""
+
+  def __init__(self, patch, unsatisfied_dep):
+    cros_patch.PatchException.__init__(self, patch)
+    self.unsatisfied_dep = unsatisfied_dep
+    self.args += (unsatisfied_dep,)
+
+  def __str__(self):
+    return ("Change %s isn't ready for CQ/commit since its dependency "
+            "%s isn't committed, or marked as Commit-Ready."
+            % (self.patch, self.unsatisfied_dep))
 
 
-def MarkChangeFailedToT(change):
-  """Set an appropriate error message for a ToT apply failure."""
-  change.apply_error_message = (
-      'Your change no longer cleanly applies against ToT.  '
-      'Please re-sync, rebase, and re-upload your change.')
-  return change
+def _RunCommand(cmd, dryrun):
+  """Runs the specified shell cmd if dryrun=False."""
+  if dryrun:
+    logging.info('Would have run: %s', ' '.join(cmd))
+  else:
+    cros_build_lib.RunCommand(cmd, error_ok=True)
 
 
 class HelperPool(object):
@@ -103,6 +113,7 @@
     Returns:
       An appropriately configured HelperPool instance.
     """
+
     external = gerrit_helper.GerritHelper(internal=False) if external else None
     internal = gerrit_helper.GerritHelper(internal=True) if internal else None
     return cls(internal=internal, external=external)
@@ -112,7 +123,14 @@
 
     If no helper is configured, an Exception is raised.
     """
-    if change.internal:
+    return self.GetHelper(change.internal)
+
+  def GetHelper(self, internal=False):
+    """Return the helper to use for internal versus external.
+
+    If no helper is configured, an Exception is raised.
+    """
+    if internal:
       if self._internal:
         return self._internal
     elif self._external:
@@ -122,7 +140,7 @@
         'Asked for an internal=%r helper, but none are allowed in this '
         'configuration.  This strongly points at the possibility of an '
         'internal bug.'
-        % (change.internal,))
+        % (internal,))
 
   def __iter__(self):
     for helper in (self._external, self._internal):
@@ -130,25 +148,63 @@
         yield helper
 
 
-class PatchSeries(object):
+def _PatchWrapException(functor):
+  """Decorator to intercept patch exceptions and wrap them.
 
+  Specifically, for known/handled Exceptions, it intercepts and
+  converts it into a DependencyError- via that, preserving the
+  cause, while casting it into an easier to use form (one that can
+  be chained in addition)."""
+  def f(self, parent, *args, **kwds):
+    try:
+      return functor(self, parent, *args, **kwds)
+    except gerrit_helper.GerritException, e:
+      new_exc = cros_patch.PatchException(parent, e)
+      raise new_exc.__class__, new_exc, sys.exc_info()[2]
+    except cros_patch.PatchException, e:
+      if e.patch.id == parent.id:
+        raise
+      new_exc = cros_patch.DependencyError(parent, e)
+      raise new_exc.__class__, new_exc, sys.exc_info()[2]
+
+  f.__name__ = functor.__name__
+  return f
+
+
+class PatchSeries(object):
   """Class representing a set of patches applied to a repository."""
 
-  def __init__(self, helper_pool=None):
+  def __init__(self, helper_pool=None, force_content_merging=False):
+    self.applied = []
+    self.failed = []
+    self.failed_tot = {}
+    self.force_content_merging = force_content_merging
     self._content_merging = {}
     if helper_pool is None:
       helper_pool = HelperPool.SimpleCreate(internal=True, external=True)
     self._helper_pool = helper_pool
+    # A mapping of ChangeId to exceptions if the patch failed against
+    # ToT.  Primarily used to keep the resolution/applying from going
+    # down known bad paths.
+    self._committed_cache = {}
+    self._lookup_cache = {}
+    self._change_deps_cache = {}
 
   def IsContentMerging(self, change, manifest):
     """Discern if the given change has Content Merging enabled in gerrit.
 
+    Note if the instance was created w/ force_content_merging=True,
+    then this function will lie and always return True to avoid the
+    admin-level access required of <=gerrit-2.1.
+
     Raises:
       AssertionError: If the gerrit helper requested is disallowed.
       GerritException: If there is a failure in querying gerrit.
     Returns:
       True if the change's project has content merging enabled, False if not.
     """
+    if self.force_content_merging:
+      return True
     helper = self._helper_pool.ForChange(change)
 
     if not helper.version.startswith('2.1'):
@@ -163,144 +219,357 @@
 
     return change.project in projects
 
-  def Apply(self, buildroot, changes, dryrun=False):
-    """Applies changes from pool into the directory specified by the buildroot.
+  def _GetGerritPatch(self, change, query):
+    """Query the configured helpers looking for a given change.
 
-    This method applies changes in the order specified.  It also respects
-    dependency order.
+    Args:
+      change: A cros_patch.GitRepoPatch derivative that we're querying
+        on behalf of.
+      query: The ChangeId or Change Number we're searching for.
+    """
+    helper = self._helper_pool.ForChange(change)
+    change = helper.QuerySingleRecord(query, must_match=True)
+    self.InjectLookupCache([change])
+    return change
+
+  @_PatchWrapException
+  def _LookupAndFilterChanges(self, parent, merged, deps, frozen=False):
+    """Given a set of deps (changes), return unsatisfied dependencies.
+
+    Args:
+      parent: The change we're resolving for.
+      merged: A container of changes we should consider as merged already.
+      deps: A sequence of dependencies for the parent that we need to identify
+        as either merged, or needing resolving.
+      frozen: If True, then raise an DependencyNotReady exception if any
+        new dependencies are required by this change that weren't already
+        supplied up front. This is used by the Commit Queue to notify users
+        when a change they have marked as 'Commit Ready' requires a change that
+        has not yet been marked as 'Commit Ready'.
+    Returns:
+      A sequence of cros_patch.GitRepoPatch instances (or derivatives) that
+      need to be resolved for this change to be mergable.
+    """
+    unsatisfied = []
+    for dep in deps:
+      if dep in self._committed_cache:
+        continue
+
+      dep_change = self._lookup_cache.get(dep)
+      if dep_change is not None:
+        if dep_change not in merged and dep_change not in unsatisfied:
+          unsatisfied.append(dep_change)
+        continue
+
+      dep_change = self._GetGerritPatch(parent, dep)
+      if dep_change.IsAlreadyMerged():
+        self.InjectCommittedPatches([dep_change])
+      elif frozen:
+        raise DependencyNotReadyForCommit(parent, dep)
+
+      if dep_change is not None:
+        assert dep == dep_change.id
+
+      unsatisfied.append(dep_change)
+    return unsatisfied
+
+  def CreateTransaction(self, change, buildroot, frozen=False):
+    """Given a change, resolve it into a transaction.
+
+    In this case, a transaction is defined as a group of commits that
+    must land for the given change to be merged- specifically its
+    parent deps, and its CQ-DEPENDS.
+
+    Args:
+      change: A cros_patch.GitRepoPatch instance to generate a transaction
+        for.
+      buildroot: Pathway to the root of a repo checkout to work on.
+      frozen: If True, then resolution is limited purely to what is in
+        the set of allowed changes; essentially, CQ mode.  If False,
+        arbitrary resolution is allowed, pulling changes as necessary
+        to create the transaction.
+    Returns:
+      A sequency of the necessary cros_patch.GitRepoPatch objects for
+      this transaction.
+    """
+    plan, stack = [], []
+    self._ResolveChange(change, buildroot,  plan, stack, frozen=frozen)
+    return plan
+
+  def _ResolveChange(self, change, buildroot, plan, stack, frozen=False):
+    """Helper for resolving a node and its dependencies into the plan.
+
+    No external code should call this; all internal code should invoke this
+    rather than ResolveTransaction since this maintains the necessary stack
+    tracking that is used to detect and handle cyclic dependencies.
+
+    Raises:
+      If the change couldn't be resolved, a DependencyError or
+      cros_patch.PatchException can be raised.
+    """
+    if change.id in self._committed_cache:
+      return
+    if change in stack:
+      # If the requested change is already in the stack, then immediately
+      # return- it's a cycle (requires CQ-DEPEND for it to occur); if
+      # the earlier resolution attempt succeeds, than implicitly this
+      # attempt will.
+      # TODO(ferringb,sosa): this check actually doesn't handle gerrit
+      # change numbers; support for that is broken currently anyways,
+      # but this is one of the spots that needs fixing for that support.
+      return
+    stack.append(change)
+    try:
+      self._PerformResolveChange(buildroot, change, plan,
+                                 stack, frozen=frozen)
+    finally:
+      stack.pop(-1)
+
+  @_PatchWrapException
+  def _GetDepsForChange(self, change, buildroot):
+    """Look up the gerrit/paladin deps for a change
+
+    Raises:
+      DependencyError: Thrown if there is an issue w/ the commits
+        metadata (either couldn't find the parent, or bad CQ-DEPEND).
 
     Returns:
-      A tuple of changes-applied, changes that failed against tot, and
-      changes that failed inflight.
+      A tuple of the change's GerritDependencies(), and PaladinDependencies()
     """
-    # Sets are used for performance reasons where changes_list is used to
-    # maintain ordering when applying changes.
-    changes_that_failed_to_apply_against_other_changes = set()
-    changes_that_failed_to_apply_to_tot = set()
-    changes_applied = set()
-    changes_list = []
+    # TODO(sosa, ferringb): Modify helper logic to allows deps to be specified
+    # across different gerrit instances.
+    val = self._change_deps_cache.get(change)
+    if val is None:
+      val = self._change_deps_cache[change] = (
+          change.GerritDependencies(buildroot),
+          change.PaladinDependencies(buildroot))
+    return val
+
+  def _PerformResolveChange(self, buildroot, change, plan, stack, frozen=False):
+    """Resolve and ultimately add a change into the plan."""
+    # Pull all deps up front, then process them.  Simplifies flow, and
+    # localizes the error closer to the cause.
+    gdeps, pdeps = self._GetDepsForChange(change, buildroot)
+    gdeps = self._LookupAndFilterChanges(change, plan, gdeps, frozen=frozen)
+    pdeps = self._LookupAndFilterChanges(change, plan, pdeps, frozen=frozen)
+
+    def _ProcessDeps(deps):
+      for dep in deps:
+        if dep in plan:
+          continue
+        try:
+          self._ResolveChange(dep, buildroot, plan, stack, frozen=frozen)
+        except cros_patch.PatchException, e:
+          raise cros_patch.DependencyError, \
+                cros_patch.DependencyError(change, e), \
+                sys.exc_info()[2]
+
+    _ProcessDeps(gdeps)
+    plan.append(change)
+    _ProcessDeps(pdeps)
+
+  def InjectCommittedPatches(self, changes):
+    """Record that the given patches are already committed.
+
+    This is primarily useful for external code to notify this object
+    that changes were applied to the tree outside its purview- specifically
+    useful for dependency resolution."""
+    for change in changes:
+      self._committed_cache[change.id] = change
+
+  def InjectLookupCache(self, changes):
+    """Inject into the internal lookup cache the given changes, using them
+    (rather than asking gerrit for them) as needed for dependencies.
+    """
+    for change in changes:
+      self._lookup_cache[change.id] = change
+
+  def Apply(self, buildroot, changes, dryrun=False, frozen=True, manifest=None):
+    """Applies changes from pool into the directory specified by the buildroot.
+
+    This method resolves each given change down into a set of transactions-
+    the change and its dependencies- that must go in, then tries to apply
+    the largest transaction first, working its way down.
+
+    If a transaction cannot be applied, then it is rolled back
+    in full- note that if a change is involved in multiple transactions,
+    if an earlier attempt fails, that change can be retried in a new
+    transaction if the failure wasn't caused by the patch being incompatible
+    to ToT.
+
+    Returns:
+      A tuple of changes-applied, Exceptions for the changes that failed
+      against ToT, and Exceptions that failed inflight;  These exceptions
+      are cros_patch.PatchException instances.
+    """
 
     # Used by content merging checks when we're operating against
     # >=gerrit-2.2.
-    manifest = cros_build_lib.ManifestCheckout.Cached(buildroot)
+    if manifest is None:
+      manifest = cros_build_lib.ManifestCheckout.Cached(buildroot)
 
-    # Maps Change numbers to GerritPatch object for lookup of dependent
-    # changes.
-    change_map = dict((change.id, change) for change in changes)
+    self.InjectLookupCache(changes)
+    resolved, applied, failed = [], [], []
     for change in changes:
-      logging.debug('Trying change %s', change.id)
-      helper = self._helper_pool.ForChange(change)
-      # We've already attempted this change because it was a dependent change
-      # of another change that was ready.
-      if (change in changes_that_failed_to_apply_to_tot or
-          change in changes_applied):
-        continue
-
-      # Change stacks consists of the change plus its dependencies in the order
-      # that they should be applied.
-      change_stack = [change]
-      apply_chain = True
-      deps = []
-
-      dependency_exc = None
-
-      # TODO(sosa): Modify helper logic to allows deps to be specified across
-      # different gerrit instances.
       try:
-        deps.extend(change.GerritDependencies(buildroot))
-        deps.extend(change.PaladinDependencies(buildroot))
-      except cros_patch.MissingChangeIDException as dependency_exc:
-        change.apply_error_message = (
-            'Could not apply change %s because change has a Gerrit Dependency '
-            'that does not contain a ChangeId.  Please remove this dependency '
-            'or update the dependency with a ChangeId.' % change.id)
-      except cros_patch.BrokenCQDepends as dependency_exc:
-        change.apply_error_message = (
-            'Could not apply change %s because change has a malformed '
-            'CQ-DEPEND. CQ-DEPEND must either be the long form Change-ID, '
-            'or the change number.' % change.id)
-      except cros_patch.BrokenChangeID as dependency_exc:
-        change.apply_error_message = (
-            'Could not apply change %s because a parent change has a malformed '
-            'Change-ID.  Please fix and retry.' % change.id)
+        resolved.append((change, self.CreateTransaction(change, buildroot,
+                                                        frozen=frozen)))
+      except cros_patch.PatchException, e:
+        logging.info("Failed creating transaction for %s: %s", change, e)
+        failed.append(e)
 
-      if dependency_exc:
-        logging.error(change.apply_error_message)
-        logging.error(str(dependency_exc))
-        changes_that_failed_to_apply_to_tot.add(change)
-        apply_chain = False
+    if not resolved:
+      # No work to do; either no changes were given to us, or all failed
+      # to be resolved.
+      return [], failed, []
+
+    # Sort by length, falling back to the order the changes were given to us.
+    # This is done to prefer longer transactions (more painful to rebase) over
+    # shorter transactions.
+    position = dict((change, idx) for idx, change in enumerate(changes))
+    def mk_key(data):
+      ids = [x.id for x in data[1]]
+      return -len(ids), position[data[0]]
+    resolved.sort(key=mk_key)
+
+    for inducing_change, transaction_changes in resolved:
+      try:
+        with self._Transaction(buildroot, transaction_changes):
+          logging.debug("Attempting transaction for %s: changes: %s",
+                        inducing_change,
+                        ', '.join(map(str, transaction_changes)))
+          self._ApplyChanges(inducing_change, manifest, transaction_changes,
+                             dryrun=dryrun)
+      except cros_patch.PatchException, e:
+        logging.info("Failed applying transaction for %s: %s",
+                     inducing_change, e)
+        failed.append(e)
+      else:
+        applied.extend(transaction_changes)
+        self.InjectCommittedPatches(transaction_changes)
+
+    # Uniquify while maintaining order.
+    def _uniq(l):
+      s = set()
+      for x in l:
+        if x not in s:
+          yield x
+          s.add(x)
+
+    applied = list(_uniq(applied))
+
+    failed = [x for x in failed if x.patch not in applied]
+    failed_tot = [x for x in failed if not x.inflight]
+    failed_inflight = [x for x in failed if x.inflight]
+    return applied, failed_tot, failed_inflight
+
+  @contextlib.contextmanager
+  def _Transaction(self, buildroot, commits):
+    """ContextManager used to rollback changes to a buildroot if necessary.
+
+    Specifically, if an unhandled non system exception occurs, this context
+    manager will roll back all relevant modifications to the git repos
+    involved.
+
+    Args:
+      buildroot: The manifest checkout we're operating upon, specifically
+        the root of it.
+      commits: A sequence of cros_patch.GitRepoPatch instances that compromise
+        this transaction- this is used to identify exactly what may be changed,
+        thus what needs to be tracked and rolled back if the transaction fails.
+    """
+    # First, the book keeping code; gather required data so we know what
+    # to rollback to should this transaction fail.  Specifically, we track
+    # what was checked out for each involved repo, and if it was a branch,
+    # the sha1 of the branch; that information is enough to rewind us back
+    # to the original repo state.
+    project_state = set(commit.ProjectDir(buildroot) for commit in commits)
+    resets, checkouts = [], []
+    for project_dir in project_state:
+      current_sha1 = cros_build_lib.RunGitCommand(
+          project_dir, ['rev-list', '-n1', 'HEAD']).output.strip()
+      assert current_sha1
+
+      result = cros_build_lib.RunGitCommand(
+          project_dir, ['symbolic-ref', 'HEAD'], error_code_ok=True)
+      if result.returncode == 128: # Detached HEAD.
+        checkouts.append((project_dir, current_sha1))
+      elif result.returncode == 0:
+        checkouts.append((project_dir, result.output.strip()))
+        resets.append((project_dir, current_sha1))
+      else:
+        raise Exception(
+            'Unexpected state from git symbolic-ref HEAD: exit %i\n'
+            'stdout: %s\nstderr: %s'
+            % (result.returncode, result.output, result.error))
+
+    committed_cache = self._committed_cache.copy()
+
+    try:
+      yield
+      # Reaching here means it was applied cleanly, thus return.
+      return
+    except (MemoryError, RuntimeError):
+      # Skip transactional rollback; if these occur, at least via
+      # the scenarios where they're *supposed* to be raised, we really
+      # should let things fail hard here.
+      raise
+    except:
+      # pylint: disable=W0702
+      logging.info("Rewinding transaction: failed changes: %s .",
+                   ', '.join(map(str, commits)))
+      for project_dir, ref in checkouts:
+        cros_build_lib.RunGitCommand(project_dir, ['checkout', ref])
+
+      for project_dir, sha1 in resets:
+        cros_build_lib.RunGitCommand(project_dir, ['reset', '--hard', sha1])
+
+      self._committed_cache = committed_cache
+      raise
+
+  @_PatchWrapException
+  def _ApplyChanges(self, _inducing_change, manifest, changes, dryrun=False):
+    """Apply a given ordered sequence of changes.
+
+    Args:
+      _inducing_change: The core GitRepoPatch instance that lead to this
+        sequence of changes; basically what this transaction was computed from.
+        Needs to be passed in so that the exception wrapping machinery can
+        convert any failures, assigning blame appropriately.
+      manifest: A ManifestCheckout instance representing what we're working on.
+      changes: A ordered sequence of GitRepoPatch instances to apply.
+      dryrun: Whether or not this is considered a production run.
+    """
+    # Bail immediately if we know one of the requisite patches won't apply.
+    for change in changes:
+      failure = self.failed_tot.get(change.id)
+      if failure is not None:
+        raise failure
+
+    applied = []
+    for change in changes:
+      if change.id in self._committed_cache:
         continue
 
-      for dep in deps:
-        dep_change = change_map.get(dep)
-        if not dep_change:
-          # The dep may have been committed already.
-          try:
-            if not helper.IsChangeCommitted(dep, must_match=False):
-              message = ('Could not apply change %s because dependent '
-                         'change %s is not ready to be committed.' % (
-                          change.id, dep))
-              logging.info(message)
-              change.apply_error_message = message
-              changes_that_failed_to_apply_to_tot.add(change)
-              apply_chain = False
-              break
-          except gerrit_helper.QueryNotSpecific:
-            message = ('Change %s could not be handled due to its dependency '
-                       '%s matching multiple branches.' % (change.id, dep))
-            logging.info(message)
-            change.apply_error_message = message
-            changes_that_failed_to_apply_to_tot.add(change)
-            apply_chain = False
-            break
-        else:
-          change_stack.insert(0, dep_change)
-
-      # Should we apply the chain -- i.e. all deps are ready.
-      if not apply_chain:
-        continue
-
-      # Apply changes in change_stack.  For chains that were aborted early,
-      # we still want to apply changes in change_stack because they were
-      # ready to be committed (o/w wouldn't have been in the change_map).
-      for change in change_stack:
-        try:
-          if change in changes_applied:
-            continue
-          elif change in changes_that_failed_to_apply_to_tot:
-            break
-
-          # If we're in dryrun mode, then 3way is always allowed.
-          # Otherwise, allow 3way only if the gerrit project allows it.
-          trivial = False if dryrun else not self.IsContentMerging(change,
-                                                                   manifest)
-
-          change.Apply(buildroot, trivial=trivial)
-
-        except cros_patch.ApplyPatchException as e:
-          if e.inflight:
-            changes_that_failed_to_apply_against_other_changes.add(
-                MarkChangeFailedInflight(change))
-          else:
-            changes_that_failed_to_apply_to_tot.add(
-                MarkChangeFailedToT(change))
-
-        except cros_patch.PatchException, e:
-          changes_that_failed_to_apply_to_tot.add(MarkChangeFailedToT(change))
-        else:
-          # We applied the change successfully.
-          changes_applied.add(change)
-          changes_list.append(change)
-          cros_build_lib.PrintBuildbotLink(str(change), change.url)
-          continue
-        break
+      # If we're in dryrun mode, than force content-merging; else, ask
+      # gerrit (or the underlying git configuration) if content-merging
+      # is allowed for this specific project.
+      if dryrun:
+        force_trivial = False
+      else:
+        force_trivial = not self.IsContentMerging(change, manifest)
+      try:
+        change.Apply(manifest.root, trivial=force_trivial)
+      except cros_patch.PatchException, e:
+        if not e.inflight:
+          self.failed_tot[change.id] = e
+        raise
+      applied.append(change)
+      if hasattr(change, 'url'):
+        cros_build_lib.PrintBuildbotLink(str(change), change.url)
 
     logging.debug('Done investigating changes.  Applied %s',
-                  ' '.join([c.id for c in changes_list]))
-
-    return (changes_list,
-            changes_that_failed_to_apply_to_tot,
-            changes_that_failed_to_apply_against_other_changes)
+                  ' '.join([c.id for c in applied]))
 
 
 class ValidationPool(object):
@@ -359,15 +628,21 @@
       raise ValueError("Invalid builder_name: %r" % (builder_name,))
 
     for changes_name, changes_value in (
-        ('changes', changes), ('non_os_changes', non_os_changes),
-        ('conflicting_changes', conflicting_changes)):
-      if changes_value is None:
+        ('changes', changes), ('non_os_changes', non_os_changes)):
+      if not changes_value:
         continue
       if not all(isinstance(x, cros_patch.GitRepoPatch) for x in changes_value):
         raise ValueError(
             'Invalid %s: all elements must be a GitRepoPatch derivative, got %r'
             % (changes_name, changes_value))
 
+    if conflicting_changes and not all(
+        isinstance(x, cros_patch.PatchException)
+        for x in conflicting_changes):
+      raise ValueError(
+          'Invalid conflicting_changes: all elements must be a '
+          'cros_patch.PatchException derivative, got %r'
+          % (conflicting_changes,))
 
     build_dashboard = self.GetBuildDashboardForOverlays(overlays)
 
@@ -380,6 +655,10 @@
     # See optional args for types of changes.
     self.changes = changes or []
     self.non_manifest_changes = non_os_changes or []
+    # Note, we hold onto these CLs since they conflict against our current CLs
+    # being tested; if our current ones succeed, we notify the user to deal
+    # w/ the conflict.  If the CLs we're testing fail, then there is no
+    # reason we can't try these again in the next run.
     self.changes_that_failed_to_apply_earlier = conflicting_changes or []
 
     # Private vars only used for pickling.
@@ -600,7 +879,7 @@
 
     return changes_in_manifest, changes_not_in_manifest
 
-  def ApplyPoolIntoRepo(self, buildroot):
+  def ApplyPoolIntoRepo(self, buildroot, manifest=None):
     """Applies changes from pool into the directory specified by the buildroot.
 
     This method applies changes in the order specified.  It also respects
@@ -611,7 +890,7 @@
     """
     try:
       applied, failed_tot, failed_inflight = self._patch_series.Apply(
-          buildroot, self.changes, self.dryrun)
+          buildroot, self.changes, self.dryrun, manifest=manifest)
     except (KeyboardInterrupt, RuntimeError, SystemExit):
       raise
     except Exception, e:
@@ -621,14 +900,17 @@
       # Stash a copy of the tb guts, since the next set of steps can
       # wipe it.
       exc = sys.exc_info()
-      cros_build_lib.Error(
-          "Unhandled Exception occured during CQ's Apply: %s\n"
+      msg = (
+          "Unhandled Exception occurred during CQ's Apply: %s\n"
           "Failing the entire series to prevent CQ from going into an "
-          "infinite loop hanging on these CLs.\n"
-          "Affected patches: %s"
-          % (e, ' '.join(x.change_id for x in self.changes)))
+          "infinite loop hanging on these CLs." % (e,))
+      cros_build_lib.Error(
+          "%s\nAffected Patches are: %s", msg,
+          ', '.join(x.change_id for x in self.changes))
       try:
-        self.HandleApplicationFailure(self.changes)
+        self._HandleApplyFailure(
+            [InternalCQError(patch, msg) for patch in self.changes])
+      # pylint: disable=W0703
       except Exception, e:
         if mox is None or not isinstance(e, mox.Error):
           # If it's not a mox error, let it fly.
@@ -637,12 +919,21 @@
 
     if self.is_master:
       for change in applied:
-        self.HandleApplied(change)
+        self._HandleApplySuccess(change)
 
     if failed_tot:
-      logging.info('Changes %s could not be applied cleanly.',
-                  ' '.join([c.id for c in failed_tot]))
-      self.HandleApplicationFailure(failed_tot)
+      logging.info(
+          'The following changes could not cleanly be applied to ToT: %s',
+          ' '.join([c.patch.id for c in failed_tot]))
+      self._HandleApplyFailure(failed_tot)
+
+    if failed_inflight:
+      logging.info(
+          'The following changes could not cleanly be applied against the '
+          'current stack of patches; if this stack fails, they will be tried '
+          'in the next run.  Inflight failed changes: %s',
+          ' '.join([c.patch.id for c in failed_inflight]))
+
     self.changes_that_failed_to_apply_earlier.extend(failed_inflight)
     self.changes = applied
 
@@ -667,29 +958,28 @@
     # We use the default timeout here as while we want some robustness against
     # the tree status being red i.e. flakiness, we don't want to wait too long
     # as validation can become stale.
-    if self.dryrun or ValidationPool._IsTreeOpen():
-      for change in changes:
-        was_change_submitted = False
-        logging.info('Change %s will be submitted', change)
-        try:
-          self.SubmitChange(change)
-          was_change_submitted = self._helper_pool.ForChange(
-              change).IsChangeCommitted(str(change.gerrit_number), self.dryrun)
-        except cros_build_lib.RunCommandError:
-          logging.error('gerrit review --submit failed for change.')
-        finally:
-          if not was_change_submitted:
-            logging.error('Could not submit %s', str(change))
-            self.HandleCouldNotSubmit(change)
-            changes_that_failed_to_submit.append(change)
-
-      if changes_that_failed_to_submit:
-        raise FailedToSubmitAllChangesException(changes_that_failed_to_submit)
-
-    else:
+    if not self.dryrun and not self._IsTreeOpen():
       raise TreeIsClosedException()
 
-  def SubmitChange(self, change):
+    for change in changes:
+      was_change_submitted = False
+      logging.info('Change %s will be submitted', change)
+      try:
+        self._SubmitChange(change)
+        was_change_submitted = self._helper_pool.ForChange(
+            change).IsChangeCommitted(str(change.gerrit_number), self.dryrun)
+      except cros_build_lib.RunCommandError:
+        logging.error('gerrit review --submit failed for change.')
+      finally:
+        if not was_change_submitted:
+          logging.error('Could not submit %s', str(change))
+          self._HandleCouldNotSubmit(change)
+          changes_that_failed_to_submit.append(change)
+
+    if changes_that_failed_to_submit:
+      raise FailedToSubmitAllChangesException(changes_that_failed_to_submit)
+
+  def _SubmitChange(self, change):
     """Submits patch using Gerrit Review."""
     cmd = self._helper_pool.ForChange(change).GetGerritReviewCommand(
         ['--submit', '%s,%s' % (change.gerrit_number, change.patch_number)])
@@ -712,27 +1002,49 @@
       TreeIsClosedException: if the tree is closed.
       FailedToSubmitAllChangesException: if we can't submit a change.
     """
+    # Note that _SubmitChanges can throw an exception if it can't
+    # submit all changes; in that particular case, don't mark the inflight
+    # failures patches as failed in gerrit- some may apply next time we do
+    # a CQ run (since the# submit state has changed, we have no way of
+    # knowing).  They *likely* will still fail, but this approach tries
+    # to minimize wasting the developers time.
     self._SubmitChanges(self.changes)
     if self.changes_that_failed_to_apply_earlier:
-      self.HandleApplicationFailure(self.changes_that_failed_to_apply_earlier)
+      self._HandleApplyFailure(self.changes_that_failed_to_apply_earlier)
 
-  def HandleApplicationFailure(self, changes):
+  def _HandleApplyFailure(self, failures):
     """Handles changes that were not able to be applied cleanly.
 
     Args:
       changes: GerritPatch's to handle.
     """
-    for change in changes:
-      logging.info('Change %s did not apply cleanly.', change.change_id)
+    for failure in failures:
+      logging.info('Change %s did not apply cleanly.', failure.patch)
       if self.is_master:
-        self.HandleCouldNotApply(change)
+        self._HandleCouldNotApply(failure)
+
+  def _HandleCouldNotApply(self, failure):
+    """Handler for when Paladin fails to apply a change.
+
+    This handler notifies set CodeReview-2 to the review forcing the developer
+    to re-upload a rebased change.
+
+    Args:
+      change: GerritPatch instance to operate upon.
+    """
+    msg = 'The Commit Queue failed to apply your change in %(build_log)s .'
+    msg += '  %(failure)s'
+    self._SendNotification(failure.patch, msg, failure=failure)
+    self._helper_pool.ForChange(failure.patch).RemoveCommitReady(
+        failure.patch, dryrun=self.dryrun)
 
   def HandleValidationFailure(self, failed_stage=None, exception=None):
     """Handles failed changes by removing them from next Validation Pools."""
     logging.info('Validation failed for all changes.')
     for change in self.changes:
       logging.info('Validation failed for change %s.', change)
-      self.HandleCouldNotVerify(change, failed_stage, exception)
+      self._HandleCouldNotVerify(change, failed_stage=failed_stage,
+                                 exception=exception)
 
   def HandleValidationTimeout(self):
     """Handles changes that timed out."""
@@ -749,11 +1061,20 @@
           change, dryrun=self.dryrun)
 
   def _SendNotification(self, change, msg, **kwargs):
-    msg %= dict(build_log=self.build_log, **kwargs)
+    d = dict(build_log=self.build_log, **kwargs)
+    try:
+      msg %= d
+    except (TypeError, ValueError), e:
+      logging.error(
+          "Generation of message %s for change %s failed: dict was %r, "
+          "exception %s", msg, change, d, e)
+      raise e.__class__(
+          "Generation of message %s for change %s failed: dict was %r, "
+          "exception %s" % (msg, change, d, e))
     PaladinMessage(msg, change, self._helper_pool.ForChange(change)).Send(
         self.dryrun)
 
-  def HandleCouldNotSubmit(self, change):
+  def _HandleCouldNotSubmit(self, change):
     """Handler that is called when Paladin can't submit a change.
 
     This should be rare, but if an admin overrides the commit queue and commits
@@ -770,7 +1091,7 @@
     self._helper_pool.ForChange(change).RemoveCommitReady(
         change, dryrun=self.dryrun)
 
-  def HandleCouldNotVerify(self, change, failed_stage=None, exception=None):
+  def _HandleCouldNotVerify(self, change, failed_stage=None, exception=None):
     """Handler for when Paladin fails to validate a change.
 
     This handler notifies set Verified-1 to the review forcing the developer
@@ -783,49 +1104,19 @@
       exception: The exception object thrown by the first failure.
     """
     if failed_stage and exception:
-      detail = (
-        'Oops! The %s stage failed: '
-        '%s' % (failed_stage, exception)
-      )
+      detail = 'Oops!  The %s stage failed: %s' % (failed_stage, exception)
     else:
-      detail = 'Oops! The Commit Queue failed to verify your change.'
+      detail = 'Oops!  The commit queue failed to verify your change.'
 
-    self._SendNotification(change,
+    self._SendNotification(
+        change,
         '%(detail)s\n\nPlease check whether the failure is your fault: '
         '%(build_log)s . If your change is not at fault, you may mark it as '
-        'ready again.', detail=detail
-    )
+        'ready again.', detail=detail)
     self._helper_pool.ForChange(change).RemoveCommitReady(
         change, dryrun=self.dryrun)
 
-  def HandleCouldNotApply(self, change):
-    """Handler for when Paladin fails to apply a change.
-
-    This handler notifies set CodeReview-2 to the review forcing the developer
-    to re-upload a rebased change.
-
-    Args:
-      change: GerritPatch instance to operate upon.
-    """
-    msg = 'The Commit Queue failed to apply your change in %(build_log)s . '
-    # This is written this way to protect against bugs in CQ itself.  We log
-    # it both to the build output, and mark the change w/ it.
-    extra_msg = getattr(change, 'apply_error_message', None)
-    if extra_msg is None:
-      logging.error(
-          'Change %s was passed to HandleCouldNotApply without an appropriate '
-          'apply_error_message set.  Internal bug.', change)
-      extra_msg = (
-          'Internal CQ issue: extra error info was not given,  Please contact '
-          'the build team and ensure they are aware of this specific change '
-          'failing.')
-
-    msg += extra_msg
-    self._SendNotification(change, msg)
-    self._helper_pool.ForChange(change).RemoveCommitReady(
-        change, dryrun=self.dryrun)
-
-  def HandleApplied(self, change):
+  def _HandleApplySuccess(self, change):
     """Handler for when Paladin successfully applies a change.
 
     This handler notifies a developer that their change is being tried as
diff --git a/buildbot/validation_pool_unittest.py b/buildbot/validation_pool_unittest.py
index bff991c..5894185 100755
--- a/buildbot/validation_pool_unittest.py
+++ b/buildbot/validation_pool_unittest.py
@@ -1,11 +1,15 @@
 #!/usr/bin/python
 
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Copyright (c) 2011-2012 The Chromium OS Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
 """Module that contains unittests for validation_pool module."""
 
+import contextlib
+import copy
+import functools
+import itertools
 import mox
 import os
 import pickle
@@ -13,8 +17,6 @@
 import time
 import unittest
 import urllib
-import copy
-import itertools
 
 import constants
 sys.path.insert(0, constants.SOURCE_ROOT)
@@ -27,9 +29,12 @@
 from chromite.lib import cros_build_lib
 from chromite.lib import cros_test_lib
 
-_CountingSource = itertools.count()
+_GetNumber = iter(itertools.count()).next
 
-# pylint: disable=E1120
+class MockPatch(mox.MockObject):
+
+  def __eq__(self, other):
+    return self.id == getattr(other, 'id')
 
 def GetTestJson(change_id=None):
   """Get usable fake Gerrit patch json data
@@ -47,6 +52,9 @@
 class TestValidationPool(mox.MoxTestBase):
   """Tests methods in validation_pool.ValidationPool."""
 
+
+class base_mixin(object):
+
   def setUp(self):
     mox.MoxTestBase.setUp(self)
     self.mox.StubOutWithMock(validation_pool, '_RunCommand')
@@ -55,115 +63,184 @@
     # Supress all gerrit access; having this occur is generally a sign
     # the code is either misbehaving, or the tests are bad.
     self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'Query')
+    self._patch_counter = (itertools.count(1)).next
+    self.build_root = 'fakebuildroot'
 
-  def MockPatch(self, change_id, patch_number=None):
-    patch = self.mox.CreateMock(cros_patch.GerritPatch)
+  def MockPatch(self, change_id=None, patch_number=None, is_merged=False,
+                project='chromiumos/chromite'):
+    # pylint: disable=W0201
+    # We have to use a custom mock class to fix some brain behaviour of
+    # pymox where multiple separate mocks can easily equal each other
+    # (or not; the behaviour varies depending on stubs used).
+    patch = MockPatch(cros_patch.GerritPatch)
+    self.mox._mock_objects.append(patch)
 
     patch.internal = False
+    if change_id is None:
+      change_id = self._patch_counter()
     patch.change_id = patch.id = 'ChangeId%i' % (change_id,)
     patch.gerrit_number = change_id
     patch.patch_number = (patch_number if patch_number is not None else
-                          _CountingSource.next())
+                          _GetNumber())
     patch.url = 'fake_url/%i' % (change_id,)
     patch.apply_error_message = None
-    patch.project = 'chromiumos/chromite'
+    patch.project = project
+    patch.sha1 = 'sha1-%s' % (patch.change_id,)
+    patch.IsAlreadyMerged = lambda:is_merged
     return patch
 
-  def GetPool(self, *args, **kwds):
-    kwds.setdefault('helper_pool', validation_pool.HelperPool.SimpleCreate())
-    pool = validation_pool.ValidationPool(*args, **kwds)
-    self.mox.StubOutWithMock(pool, '_SendNotification')
-    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, '_SqlQuery')
+  def GetPatches(self, how_many=1):
+    l = [self.MockPatch() for _ in xrange(how_many)]
+    if how_many == 1:
+      return l[0]
+    return l
+
+  def MakeHelper(self, internal=None, external=None):
+    if internal:
+      internal = self.mox.CreateMock(gerrit_helper.GerritHelper)
+      internal.version = '2.1'
+      internal.internal = True
+    if external:
+      external = self.mox.CreateMock(gerrit_helper.GerritHelper)
+      external.internal = False
+      external.version = '2.1'
+    return validation_pool.HelperPool(internal=internal, external=external)
+
+
+# pylint: disable=W0212,R0904
+class TestPatchSeries(base_mixin, mox.MoxTestBase):
+  """Tests the core resolution and applying logic of
+  validation_pool.ValidationPool."""
+
+  def setUp(self):
+    base_mixin.setUp(self)
+    # All tests should set their content merging projects via
+    # SetContentMergingProjects since FindContentMergingProjects
+    # requires admin rights in gerrit.
     self.mox.StubOutWithMock(gerrit_helper.GerritHelper,
                              'FindContentMergingProjects')
-    return pool
 
   @staticmethod
-  def SetPoolsContentMergingProjects(pool, *projects):
-    gerrit_helper.GerritHelper.FindContentMergingProjects().AndReturn(
-        frozenset(projects))
+  def SetContentMergingProjects(series, projects=(), internal=False):
+    helper = series._helper_pool.GetHelper(internal)
+    series._content_merging[helper] = frozenset(projects)
 
-  def testSimpleDepApplyPoolIntoRepo(self):
+  @contextlib.contextmanager
+  def _ValidateTransactionCall(self, build_root, _changes):
+    self.assertEqual(build_root, self.build_root)
+    yield
+
+  def GetPatchSeries(self, helper_pool=None, force_content_merging=False):
+    if helper_pool is None:
+      helper_pool = self.MakeHelper(internal=True, external=True)
+    series = validation_pool.PatchSeries(helper_pool, force_content_merging)
+
+    # Suppress transactions.
+    series._Transaction = self._ValidateTransactionCall
+
+    return series
+
+  def assertPath(self, _patch, return_value, path):
+    self.assertEqual(path, self.build_root)
+    if isinstance(return_value, Exception):
+      raise return_value
+    return return_value
+
+  def SetPatchDeps(self, patch, parents=(), cq=()):
+    patch.GerritDependencies = functools.partial(
+        self.assertPath, patch, parents)
+    patch.PaladinDependencies = functools.partial(
+        self.assertPath, patch, cq)
+    patch.Fetch = functools.partial(
+        self.assertPath, patch, patch.sha1)
+
+  def assertResults(self, series, changes, applied=(), failed_tot=(),
+                    failed_inflight=(), frozen=True, dryrun=False):
+    # Convenience; set the content pool as necessary.
+    for internal in set(x.internal for x in changes):
+      helper = series._helper_pool.GetHelper(internal)
+      series._content_merging.setdefault(helper, frozenset())
+
+    manifest = self.mox.CreateMock(cros_build_lib.ManifestCheckout)
+    manifest.root = self.build_root
+    result = series.Apply(manifest.root, changes, dryrun=dryrun,
+                          frozen=frozen, manifest=manifest)
+
+    _GetIds = lambda seq:[x.id for x in seq]
+    _GetFailedIds = lambda seq:_GetIds(x.patch for x in seq)
+
+    applied_result = _GetIds(result[0])
+    failed_tot_result, failed_inflight_result = map(_GetFailedIds, result[1:])
+
+    applied = _GetIds(applied)
+    failed_tot = _GetIds(failed_tot)
+    failed_inflight = _GetIds(failed_inflight)
+
+    self.assertEqual(
+        [applied, failed_tot, failed_inflight],
+        [applied_result, failed_tot_result, failed_inflight_result])
+
+  def testApplyWithDeps(self):
     """Test that we can apply changes correctly and respect deps.
 
     This tests a simple out-of-order change where change1 depends on change2
     but tries to get applied before change2.  What should happen is that
     we should notice change2 is a dep of change1 and apply it first.
     """
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
+    series = self.GetPatchSeries()
 
-    build_root = 'fakebuildroot'
+    patch1, patch2 = patches = self.GetPatches(2)
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    pool.changes = [patch1, patch2]
-    self.SetPoolsContentMergingProjects(pool)
+    self.SetPatchDeps(patch2)
+    self.SetPatchDeps(patch1, [patch2.id])
 
-    patch1.GerritDependencies(build_root).AndReturn(['ChangeId2'])
-    patch1.PaladinDependencies(build_root).AndReturn([])
-
-    patch2.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch2)
-    patch1.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch1)
+    patch2.Apply(self.build_root, trivial=True)
+    patch1.Apply(self.build_root, trivial=True)
 
     self.mox.ReplayAll()
-    self.assertTrue(pool.ApplyPoolIntoRepo(build_root))
+    self.assertResults(series, patches, [patch2, patch1])
     self.mox.VerifyAll()
 
-  def testSimpleNoApplyPoolIntoRepo(self):
+  @staticmethod
+  def _SetQuery(series, change):
+    helper = series._helper_pool.GetHelper(change.internal)
+    return helper.QuerySingleRecord(change.id, must_match=True)
+
+  def testApplyMissingDep(self):
     """Test that we don't try to apply a change without met dependencies.
 
     Patch2 is in the validation pool that depends on Patch1 (which is not)
     Nothing should get applied.
     """
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    patch2.project = 'fake_project'
-    build_root = 'fakebuildroot'
+    series = self.GetPatchSeries()
 
-    helper = self.mox.CreateMock(gerrit_helper.GerritHelper)
-    helper_pool = validation_pool.HelperPool(external=helper)
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name',
-                        True, False, helper_pool=helper_pool)
-    pool.changes = [patch2]
+    patch1, patch2 = self.GetPatches(2)
 
-    patch2.GerritDependencies(build_root).AndReturn(['ChangeId1'])
-    patch2.PaladinDependencies(build_root).AndReturn([])
-    helper.IsChangeCommitted(patch1.id, must_match=False).AndReturn(False)
-    pool._SendNotification(patch2, mox.StrContains('dependent change'))
-    helper.RemoveCommitReady(patch2, dryrun=False)
+    self.SetPatchDeps(patch2, [patch1.id])
+    self._SetQuery(series, patch1).AndReturn(patch1)
 
     self.mox.ReplayAll()
-    self.assertFalse(pool.ApplyPoolIntoRepo(build_root))
+    self.assertResults(series, [patch2],
+                       [], [patch2])
     self.mox.VerifyAll()
 
-  def testSimpleDepApplyWhenAlreadySubmitted(self):
+  def testApplyWithCommittedDeps(self):
     """Test that we apply a change with dependency already committed."""
-    patch1 = self.MockPatch(1)
+    series = self.GetPatchSeries()
+
+    patch1 = self.MockPatch(1, is_merged=True)
     patch2 = self.MockPatch(2)
-    patch2.project = 'fake_project'
-    build_root = 'fakebuildroot'
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    pool.changes = [patch2]
-    patch2.project = '3way-project'
-    self.SetPoolsContentMergingProjects(pool, '3way-project')
+    self.SetPatchDeps(patch2, [patch1.id])
+    self._SetQuery(series, patch1).AndReturn(patch1)
 
-    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'IsChangeCommitted')
-    gerrit_helper.GerritHelper.IsChangeCommitted(
-        patch1.id, must_match=False).AndReturn(True)
-
-    patch2.GerritDependencies(build_root).AndReturn(['ChangeId1'])
-    patch2.PaladinDependencies(build_root).AndReturn([])
-    patch2.Apply(build_root, trivial=False)
-    pool.HandleApplied(patch2)
+    patch2.Apply(self.build_root, trivial=True)
 
     self.mox.ReplayAll()
-    self.assertTrue(pool.ApplyPoolIntoRepo(build_root))
+    self.assertResults(series, [patch2], [patch2])
     self.mox.VerifyAll()
 
-  def testSimpleDepFailedApplyPoolIntoRepo(self):
+  def testApplyPartialFailures(self):
     """Test that can apply changes correctly when one change fails to apply.
 
     This tests a simple change order where 1 depends on 2 and 1 fails to apply.
@@ -173,87 +250,44 @@
     Since patch1 fails to apply, we should also get a call to handle the
     failure.
     """
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    patch3 = self.MockPatch(3)
-    patch4 = self.MockPatch(4)
-    patch5 = self.MockPatch(5)
-    build_root = 'fakebuildroot'
+    series = self.GetPatchSeries()
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    pool.changes = [patch1, patch2, patch3, patch4, patch5]
-    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'RemoveCommitReady')
-    self.SetPoolsContentMergingProjects(pool)
-    pool.build_log = 'log'
+    patch1, patch2, patch3, patch4 = patches = self.GetPatches(4)
 
-    patch1.GerritDependencies(build_root).AndReturn([])
-    patch1.PaladinDependencies(build_root).AndReturn([])
-    patch1.Apply(build_root, trivial=True).AndRaise(
+    self.SetPatchDeps(patch1)
+    self.SetPatchDeps(patch2, [patch1.id])
+    self.SetPatchDeps(patch3)
+    self.SetPatchDeps(patch4)
+
+    patch1.Apply(self.build_root, trivial=True).AndRaise(
         cros_patch.ApplyPatchException(patch1))
 
-    patch2.GerritDependencies(build_root).AndReturn(['ChangeId1'])
-    patch2.PaladinDependencies(build_root).AndReturn([])
-    patch3.GerritDependencies(build_root).AndReturn([])
-    patch3.PaladinDependencies(build_root).AndReturn([])
-    patch3.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch3)
-
-    # This one should be handled later (not where patch1 is handled.
-    patch4.GerritDependencies(build_root).AndReturn([])
-    patch4.PaladinDependencies(build_root).AndReturn([])
-    patch4.Apply(build_root, trivial=True).AndRaise(
+    patch3.Apply(self.build_root, trivial=True)
+    patch4.Apply(self.build_root, trivial=True).AndRaise(
         cros_patch.ApplyPatchException(patch1, inflight=True))
 
-    patch5.GerritDependencies(build_root).AndReturn([])
-    patch5.PaladinDependencies(build_root).AndReturn([])
-    patch5.Apply(build_root, trivial=True).AndRaise(
-        cros_patch.PatchException())
-
-    validation_pool.MarkChangeFailedToT(patch1)
-    pool.HandleCouldNotApply(patch1)
-    validation_pool.MarkChangeFailedToT(patch5)
-    pool.HandleCouldNotApply(patch5)
-
     self.mox.ReplayAll()
-    # TODO(ferringb): remove the need for this.
-    # Reset before re-running so the error messages don't persist; they're
-    # currently stored on the instances themselves, although that'll be
-    # rectified soon enough
-    for patch in pool.changes:
-      patch.apply_error_message = None
-    self.assertTrue(pool.ApplyPoolIntoRepo(build_root))
-    self.assertTrue(patch4 in pool.changes_that_failed_to_apply_earlier)
+    self.assertResults(series, patches,
+                       [patch3], [patch2, patch1], [patch4])
     self.mox.VerifyAll()
 
-  def testSimpleApplyButMissingChangeIDIntoRepo(self):
+  def testApplyMissingChangeId(self):
     """Test that applies changes correctly with a dep with missing changeid."""
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    build_root = 'fakebuildroot'
+    series = self.GetPatchSeries()
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    self.mox.StubOutWithMock(pool, 'HandleCouldNotApply')
+    patch1, patch2 = patches = self.GetPatches(2)
 
-    pool.changes = [patch1, patch2]
-    pool.build_log = 'log'
-    self.SetPoolsContentMergingProjects(pool)
+    patch1.GerritDependencies(self.build_root).AndRaise(
+        cros_patch.BrokenChangeID(patch1, 'Could not find changeid'))
+    self.SetPatchDeps(patch2)
 
-    patch1.GerritDependencies(build_root).AndRaise(
-        cros_patch.MissingChangeIDException('Could not find changeid'))
-
-    patch2.GerritDependencies(build_root).AndReturn([])
-    patch2.PaladinDependencies(build_root).AndReturn([])
-    patch2.Apply(build_root, trivial=True)
-
-    pool.HandleApplied(patch2)
-    pool.HandleCouldNotApply(patch1)
+    patch2.Apply(self.build_root, trivial=True)
 
     self.mox.ReplayAll()
-    self.assertTrue(pool.ApplyPoolIntoRepo(build_root))
-    self.assertEqual([patch2.id], [x.id for x in pool.changes])
+    self.assertResults(series, patches, [patch2], [patch1], [])
     self.mox.VerifyAll()
 
-  def testMoreComplexDepApplyPoolIntoRepo(self):
+  def testComplexApply(self):
     """More complex deps test.
 
     This tests a total of 2 change chains where the first change we see
@@ -264,128 +298,227 @@
     This test also checks the patch order to verify that Apply re-orders
     correctly based on the chain.
     """
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    patch3 = self.MockPatch(3)
-    patch4 = self.MockPatch(4)
-    patch5 = self.MockPatch(5)
+    series = self.GetPatchSeries()
 
-    build_root = 'fakebuildroot'
+    patch1, patch2, patch3, patch4, patch5 = patches = self.GetPatches(5)
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    pool.changes = [patch1, patch2, patch3, patch4, patch5]
+    self.SetPatchDeps(patch1, [patch2.id])
+    self.SetPatchDeps(patch2)
+    self.SetPatchDeps(patch3, [patch1.id, patch2.id])
+    self.SetPatchDeps(patch4, cq=[patch5.id])
+    self.SetPatchDeps(patch5)
 
-    self.SetPoolsContentMergingProjects(pool)
-
-    patch1.GerritDependencies(build_root).AndReturn(['ChangeId2'])
-    patch1.PaladinDependencies(build_root).AndReturn([])
-    patch3.GerritDependencies(build_root).AndReturn(['ChangeId1', 'ChangeId2'])
-    patch3.PaladinDependencies(build_root).AndReturn([])
-    patch4.GerritDependencies(build_root).AndReturn([])
-    patch4.PaladinDependencies(build_root).AndReturn(['ChangeId5'])
-
-    patch2.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch2)
-    patch1.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch1)
-    patch3.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch3)
-    patch5.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch5)
-    patch4.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch4)
+    for patch in (patch2, patch1, patch3, patch4, patch5):
+      patch.Apply(self.build_root, trivial=True)
 
     self.mox.ReplayAll()
-    self.assertTrue(pool.ApplyPoolIntoRepo(build_root))
-    # Check order.
-    self.assertEquals([x.id for x in pool.changes],
-                      [y.id for y in [patch2, patch1, patch3, patch5, patch4]])
+    self.assertResults(
+        series, patches, [patch2, patch1, patch3, patch4, patch5])
     self.mox.VerifyAll()
 
-  def testNoDepsApplyPoolIntoRepo(self):
+  def testApplyStandalonePatches(self):
     """Simple apply of two changes with no dependent CL's."""
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    build_root = 'fakebuildroot'
+    series = self.GetPatchSeries()
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    pool.changes = [patch1, patch2]
-    self.SetPoolsContentMergingProjects(pool)
+    patches = self.GetPatches(3)
 
-    patch1.GerritDependencies(build_root).AndReturn([])
-    patch1.PaladinDependencies(build_root).AndReturn([])
-    patch2.GerritDependencies(build_root).AndReturn([])
-    patch2.PaladinDependencies(build_root).AndReturn([])
+    for patch in patches:
+      self.SetPatchDeps(patch)
 
-    patch1.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch1)
-
-    patch2.Apply(build_root, trivial=True)
-    pool.HandleApplied(patch2)
+    for patch in patches:
+      patch.Apply(self.build_root, trivial=True)
 
     self.mox.ReplayAll()
-    self.assertTrue(pool.ApplyPoolIntoRepo(build_root))
+    self.assertResults(series, patches, patches)
     self.mox.VerifyAll()
 
-  def testSubmitPoolWithSomeFailures(self):
-    """Tests submitting a pool when some changes fail to be submitted.
 
-    Tests what happens when we try to submit 3 patches with 2 patches failing
-    to submit correctly (one with submit failure and the other not showing up
-    as submitted in Gerrit.
+# pylint: disable=W0212,R0904
+class TestCoreLogic(base_mixin, mox.MoxTestBase):
+  """Tests the core resolution and applying logic of
+  validation_pool.ValidationPool."""
+
+  def setUp(self):
+    base_mixin.setUp(self)
+    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, '_SqlQuery')
+    self.mox.StubOutWithMock(gerrit_helper.GerritHelper,
+                             'FindContentMergingProjects')
+
+  def MakePool(self, overlays=constants.PUBLIC_OVERLAYS, build_number=1,
+               builder_name='foon', is_master=True, dryrun=True, **kwds):
+    handlers = kwds.pop('handlers', False)
+    kwds.setdefault('helper_pool', validation_pool.HelperPool.SimpleCreate())
+    kwds.setdefault('changes', [])
+
+    pool = validation_pool.ValidationPool(
+        overlays, build_number, builder_name, is_master, dryrun, **kwds)
+    self.mox.StubOutWithMock(pool, '_SendNotification')
+    if handlers:
+      self.mox.StubOutWithMock(pool, '_HandleApplySuccess')
+      self.mox.StubOutWithMock(pool, '_HandleApplyFailure')
+      self.mox.StubOutWithMock(pool, '_HandleCouldNotApply')
+    self.mox.StubOutWithMock(pool, '_patch_series')
+    return pool
+
+  def MakeFailure(self, patch, inflight=True):
+    return cros_patch.ApplyPatchException(patch, inflight=inflight)
+
+  def GetPool(self, changes, applied=(), tot=(),
+              inflight=(), dryrun=True, **kwds):
+    pool = self.MakePool(changes=changes, **kwds)
+    applied = list(applied)
+    tot = [self.MakeFailure(x, inflight=False) for x in tot]
+    inflight = [self.MakeFailure(x, inflight=True) for x in inflight]
+    pool._patch_series.Apply(
+        self.build_root, changes, dryrun, manifest=mox.IgnoreArg()
+        ).AndReturn((applied, tot, inflight))
+
+    for patch in applied:
+      pool._HandleApplySuccess(patch).AndReturn(None)
+
+    if tot:
+      pool._HandleApplyFailure(tot).AndReturn(None)
+
+    # We stash this on the pool object so we can reuse it during validation.
+    # We could stash this in the test instances, but that would break
+    # for any tests that do multiple pool instances.
+
+    pool._test_data = (changes, applied, tot, inflight)
+
+    return pool
+
+  def runApply(self, pool, result):
+    self.assertEqual(result, pool.ApplyPoolIntoRepo(self.build_root))
+    self.assertEqual(pool.changes, pool._test_data[1])
+    failed_inflight = pool.changes_that_failed_to_apply_earlier
+    expected_inflight = set(pool._test_data[3])
+    # Intersect the results, since it's possible there were results failed
+    # results that weren't related to the ApplyPoolIntoRepo call.
+    self.assertEqual(set(failed_inflight).intersection(expected_inflight),
+                     expected_inflight)
+
+    # Ensure that no old code/pathways are setting apply_error_message.
+    for patch in pool._test_data[0]:
+      if getattr(patch, 'apply_error_message', None) is not None:
+        raise AssertionError(
+            "patch %s has an apply_error_message that is not None: %s"
+            % (patch, patch.apply_error_message))
+
+    self.assertEqual(pool.changes, pool._test_data[1])
+
+  def testPatchSeriesInteraction(self):
+    """Verify the interaction between PatchSeries and ValidationPool.
+
+    Effectively, this validates data going into PatchSeries, and coming back
+    out; verifies the hand off to _Handle* functions, but no deeper.
     """
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    patch3 = self.MockPatch(3)
+    patches = self.GetPatches(3)
 
-    helper = self.mox.CreateMock(gerrit_helper.GerritHelper)
+    apply_pool = self.GetPool(patches, applied=patches, handlers=True)
+    all_inflight = self.GetPool(patches, inflight=patches, handlers=True)
+    all_tot = self.GetPool(patches, tot=patches, handlers=True)
+    mixed = self.GetPool(patches, tot=patches[0:1], inflight=patches[1:2],
+                         applied=patches[2:3], handlers=True)
 
-    build_root = 'fakebuildroot'
+    self.mox.ReplayAll()
+    self.runApply(apply_pool, True)
+    self.runApply(all_inflight, False)
+    self.runApply(all_tot, False)
+    self.runApply(mixed, True)
+    self.mox.VerifyAll()
 
-    validation_pool.ValidationPool._IsTreeOpen().AndReturn(True)
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False,
-                        helper_pool=validation_pool.HelperPool(external=helper,
-                                                               internal=None))
-    pool.changes = [patch1, patch2, patch3]
-    pool.dryrun = False
+  def testHandleApplySuccess(self):
+    """Validate steps taken for successfull application."""
+    patch = self.GetPatches(1)
+    pool = self.MakePool()
+    pool._SendNotification(patch, mox.StrContains('has picked up your change'))
+    self.mox.ReplayAll()
+    pool._HandleApplySuccess(patch)
+    self.mox.VerifyAll()
 
-    self.mox.StubOutWithMock(pool, 'SubmitChange')
-    self.mox.StubOutWithMock(helper, 'RemoveCommitReady')
-    pool.SubmitChange(patch1)
-    helper.IsChangeCommitted(str(patch1.gerrit_number), False).AndReturn(False)
-    pool.HandleCouldNotSubmit(patch1)
-    result = cros_build_lib.CommandResult(cmd='cmd', returncode=1)
-    pool.SubmitChange(patch2).AndRaise(
-        cros_build_lib.RunCommandError('Failed to submit', result))
-    pool.HandleCouldNotSubmit(patch2)
-    pool.SubmitChange(patch3)
-    helper.IsChangeCommitted(str(patch3.gerrit_number), False).AndReturn(True)
+  def testHandleApplyFailure(self):
+    failures = [cros_patch.ApplyPatchException(x) for x in self.GetPatches(4)]
+
+    notified_patches = failures[:2]
+    unnotified_patches = failures[2:]
+    master_pool = self.MakePool(dryrun=False)
+    slave_pool = self.MakePool(is_master=False)
+
+    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'RemoveCommitReady')
+
+    for failure in notified_patches:
+      master_pool._SendNotification(
+          failure.patch,
+          mox.StrContains('failed to apply your change'),
+          failure=mox.IgnoreArg())
+      # This pylint suppressin shouldn't be necessary, but pylint is invalidly
+      # thinking that the first arg isn't passed in; we suppress it to suppress
+      # the pylnt bug.
+      # pylint: disable=E1120
+      gerrit_helper.GerritHelper.RemoveCommitReady(failure.patch, dryrun=False)
+
+    self.mox.ReplayAll()
+    master_pool._HandleApplyFailure(notified_patches)
+    slave_pool._HandleApplyFailure(unnotified_patches)
+    self.mox.VerifyAll()
+
+  def testSubmitPoolFailures(self):
+    pool = self.MakePool(dryrun=False)
+    patch1, patch2, patch3 = patches = self.GetPatches(3)
+    failed = self.GetPatches(3)
+    pool.changes = patches[:]
+    # While we don't do anything w/ these patches, that's
+    # intentional; we're verifying that it isn't submitted
+    # if there is a failure.
+    pool.changes_that_failed_to_apply_earlier = failed[:]
+
+    self.mox.StubOutWithMock(pool, '_SubmitChange')
+    self.mox.StubOutWithMock(pool, '_HandleCouldNotSubmit')
+
+    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'IsChangeCommitted')
+
+    pool._SubmitChange(patch1).AndReturn(None)
+    gerrit_helper.GerritHelper.IsChangeCommitted(
+        str(patch1.gerrit_number), False).AndReturn(True)
+
+    pool._SubmitChange(patch2).AndReturn(None)
+    gerrit_helper.GerritHelper.IsChangeCommitted(
+        str(patch2.gerrit_number), False).InAnyOrder().AndReturn(False)
+
+    pool._HandleCouldNotSubmit(patch2).InAnyOrder()
+
+    pool._SubmitChange(patch3).AndRaise(
+        cros_build_lib.RunCommandError('blah', None))
+    pool._HandleCouldNotSubmit(patch3).InAnyOrder().AndReturn(None)
+
+    pool._IsTreeOpen().AndReturn(True)
 
     self.mox.ReplayAll()
     self.assertRaises(validation_pool.FailedToSubmitAllChangesException,
-                      validation_pool.ValidationPool.SubmitPool, (pool))
+                      pool.SubmitPool)
     self.mox.VerifyAll()
 
-  def testSimpleSubmitPool(self):
-    """Tests the ability to submit a list of changes."""
-    helper = self.mox.CreateMock(gerrit_helper.GerritHelper)
+  def testSubmitPool(self):
+    pool = self.MakePool(dryrun=False)
+    passed = self.GetPatches(3)
+    failed = self.GetPatches(3)
+    pool.changes = passed
+    pool.changes_that_failed_to_apply_earlier = failed[:]
 
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    build_root = 'fakebuildroot'
+    self.mox.StubOutWithMock(pool, '_SubmitChange')
+    self.mox.StubOutWithMock(pool, '_HandleCouldNotSubmit')
+    self.mox.StubOutWithMock(pool, '_HandleApplyFailure')
 
-    validation_pool.ValidationPool._IsTreeOpen().AndReturn(True)
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False,
-                        helper_pool=validation_pool.HelperPool(external=helper,
-                                                               internal=None))
-    pool.changes = [patch1, patch2]
-    pool.dryrun = False
+    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'IsChangeCommitted')
 
-    self.mox.StubOutWithMock(pool, 'SubmitChange')
-    pool.SubmitChange(patch1)
-    helper.IsChangeCommitted(str(patch1.gerrit_number), False).AndReturn(True)
-    pool.SubmitChange(patch2)
-    helper.IsChangeCommitted(str(patch2.gerrit_number), False).AndReturn(True)
+    for patch in passed:
+      pool._SubmitChange(patch).AndReturn(None)
+      gerrit_helper.GerritHelper.IsChangeCommitted(
+          str(patch.gerrit_number), False).AndReturn(True)
+
+    pool._HandleApplyFailure(failed)
+
+    pool._IsTreeOpen().AndReturn(True)
 
     self.mox.ReplayAll()
     pool.SubmitPool()
@@ -393,23 +526,24 @@
 
   def testSubmitNonManifestChanges(self):
     """Simple test to make sure we can submit non-manifest changes."""
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    helper = self.mox.CreateMock(gerrit_helper.GerritHelper)
-    build_root = 'fakebuildroot'
+    pool = self.MakePool(dryrun=False)
+    patch1, patch2 = passed = self.GetPatches(2)
+    pool.non_manifest_changes = passed[:]
 
-    validation_pool.ValidationPool._IsTreeOpen().AndReturn(True)
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False,
-                        helper_pool=validation_pool.HelperPool(external=helper,
-                                                               internal=None))
-    pool.non_manifest_changes = [patch1, patch2]
-    pool.dryrun = False
+    self.mox.StubOutWithMock(pool, '_SubmitChange')
+    self.mox.StubOutWithMock(pool, '_HandleCouldNotSubmit')
 
-    self.mox.StubOutWithMock(pool, 'SubmitChange')
-    pool.SubmitChange(patch1)
-    helper.IsChangeCommitted(str(patch1.gerrit_number), False).AndReturn(True)
-    pool.SubmitChange(patch2)
-    helper.IsChangeCommitted(str(patch2.gerrit_number), False).AndReturn(True)
+    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'IsChangeCommitted')
+
+    pool._SubmitChange(patch1).AndReturn(None)
+    gerrit_helper.GerritHelper.IsChangeCommitted(
+        str(patch1.gerrit_number), False).AndReturn(True)
+
+    pool._SubmitChange(patch2).AndReturn(None)
+    gerrit_helper.GerritHelper.IsChangeCommitted(
+        str(patch2.gerrit_number), False).AndReturn(True)
+
+    pool._IsTreeOpen().AndReturn(True)
 
     self.mox.ReplayAll()
     pool.SubmitNonManifestChanges()
@@ -417,117 +551,42 @@
 
   def testGerritSubmit(self):
     """Tests submission review string looks correct."""
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
+    pool = self.MakePool(dryrun=False)
 
-    my_patch = cros_patch.GerritPatch(GetTestJson(), False)
-    validation_pool._RunCommand(
-        'ssh -p 29418 gerrit.chromium.org gerrit review '
-        '--submit 1112,2'.split(), False).AndReturn(None)
+    patch = self.GetPatches(1)
+    cmd = ('ssh -p 29418 gerrit.chromium.org gerrit review '
+           '--submit %i,%i' % (patch.gerrit_number, patch.patch_number))
+    validation_pool._RunCommand(cmd.split(), False).AndReturn(None)
     self.mox.ReplayAll()
-    pool.SubmitChange(my_patch)
-    self.mox.VerifyAll()
-
-  def testGerritHandleApplied(self):
-    """Tests review string looks correct."""
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-
-    my_patch = cros_patch.GerritPatch(GetTestJson(), False)
-    pool._SendNotification(my_patch, mox.IgnoreArg())
-
-    self.mox.ReplayAll()
-    pool.HandleApplied(my_patch)
-    self.mox.VerifyAll()
-
-  def testGerritHandleApplyError(self):
-    """Tests review string looks correct."""
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    gerrit_helper.GerritHelper._SqlQuery(
-        mox.IgnoreArg(), dryrun=mox.IgnoreArg(),
-        is_command=True).AndReturn(None)
-
-    my_patch = cros_patch.GerritPatch(GetTestJson(), False)
-    pool._SendNotification(my_patch, mox.IgnoreArg())
-
-    self.mox.ReplayAll()
-    pool.HandleCouldNotApply(my_patch)
-    self.mox.VerifyAll()
-
-  def testGerritHandleSubmitError(self):
-    """Tests review string looks correct."""
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    gerrit_helper.GerritHelper._SqlQuery(
-        mox.IgnoreArg(), dryrun=mox.IgnoreArg(),
-        is_command=True).AndReturn(None)
-
-    my_patch = cros_patch.GerritPatch(GetTestJson(), False)
-    pool._SendNotification(my_patch, mox.IgnoreArg())
-
-    self.mox.ReplayAll()
-    pool.HandleCouldNotSubmit(my_patch)
-    self.mox.VerifyAll()
-
-  def testGerritHandleVerifyError(self):
-    """Tests review string looks correct."""
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    gerrit_helper.GerritHelper._SqlQuery(
-        mox.IgnoreArg(), dryrun=mox.IgnoreArg(),
-        is_command=True).AndReturn(None)
-
-    my_patch = cros_patch.GerritPatch(GetTestJson(), False)
-    pool._SendNotification(my_patch, mox.IgnoreArg(), detail=mox.IgnoreArg())
-
-    self.mox.ReplayAll()
-    pool.HandleCouldNotVerify(my_patch)
-    self.mox.VerifyAll()
-
-  def testGerritHandleVerifyErrorWithException(self):
-    """Tests review string looks correct."""
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    gerrit_helper.GerritHelper._SqlQuery(
-        mox.IgnoreArg(), dryrun=mox.IgnoreArg(),
-        is_command=True).AndReturn(None)
-
-    my_patch = cros_patch.GerritPatch(GetTestJson(), False)
-    failed_stage = 'BuildTarget'
-    error = 'Error compiling chromeos-base/update_engine'
-    pool._SendNotification(my_patch, mox.IgnoreArg(),
-                           detail=mox.And(mox.StrContains(error),
-                                          mox.StrContains(failed_stage)))
-
-    self.mox.ReplayAll()
-    pool.HandleCouldNotVerify(my_patch, failed_stage, Exception(error))
+    pool._SubmitChange(patch)
     self.mox.VerifyAll()
 
   def testUnhandledExceptions(self):
     """Test that CQ doesn't loop due to unhandled Exceptions."""
-    patch1 = self.MockPatch(1)
-    patch2 = self.MockPatch(2)
-    build_root = 'fakebuildroot'
+    pool = self.MakePool(dryrun=False)
+    patches = self.GetPatches(2)
+    pool.changes = patches[:]
 
     class MyException(Exception):
       pass
 
-    pool = self.GetPool(constants.PUBLIC_OVERLAYS, 1, 'build_name', True, False)
-    pool.changes = [patch1, patch2]
-    self.mox.StubOutWithMock(gerrit_helper.GerritHelper, 'RemoveCommitReady')
-    self.SetPoolsContentMergingProjects(pool)
-    pool.build_log = 'log'
+    self.mox.StubOutWithMock(pool._patch_series, 'Apply')
+    # Suppressed because pylint can't tell that we just replaced Apply via mox.
+    # pylint: disable=E1101
+    pool._patch_series.Apply(
+        self.build_root, patches, False, manifest=mox.IgnoreArg()).AndRaise(
+        MyException)
 
-    patch1.GerritDependencies(build_root).AndReturn([])
-    patch1.PaladinDependencies(build_root).AndReturn([])
-    patch1.Apply(build_root, trivial=True).AndRaise(MyException())
-
-    pool.HandleCouldNotApply(patch1)
-    pool.HandleCouldNotApply(patch2)
+    def _ValidateExceptioN(changes):
+      for patch in changes:
+        self.assertTrue(isinstance(patch, validation_pool.InternalCQError),
+                        msg="Expected %s to be type InternalCQError, got %r" %
+                        (patch, type(patch)))
+      self.assertEqual(set(patches),
+                       set(x.patch for x in changes))
 
     self.mox.ReplayAll()
-    # TODO(ferringb): remove the need for this.
-    # Reset before re-running so the error messages don't persist; they're
-    # currently stored on the instances themselves, although that'll be
-    # rectified soon enough
-    for patch in pool.changes:
-      patch.apply_error_message = None
-    self.assertRaises(MyException, pool.ApplyPoolIntoRepo, build_root)
+    self.assertRaises(MyException, pool.ApplyPoolIntoRepo, self.build_root)
     self.mox.VerifyAll()
 
 
@@ -555,7 +614,7 @@
     self.mox.StubOutWithMock(urllib, 'urlopen')
     status_url = 'https://chromiumos-status.appspot.com/current?format=json'
     backoff = 1
-    for attempt in range(retries_500):
+    for _attempt in range(retries_500):
       urllib.urlopen(status_url).AndReturn(return_status)
       return_status.getcode().AndReturn(500)
       time.sleep(backoff)
@@ -660,6 +719,7 @@
     changes = [cros_patch.GerritPatch(GetTestJson(ids[0]), True)]
     non_os = [cros_patch.GerritPatch(GetTestJson(ids[1]), False)]
     conflicting = [cros_patch.GerritPatch(GetTestJson(ids[2]), True)]
+    conflicting = [cros_patch.PatchException(x) for x in conflicting]
     pool = validation_pool.ValidationPool(
         constants.PUBLIC_OVERLAYS, 1, 'testing', True, True,
         changes=changes, non_os_changes=non_os,
@@ -670,14 +730,15 @@
   def _CheckTestData(data):
     results = pickle.loads(data)
     pool, changes, non_os, conflicting = results
-    def _f(source, value):
+    def _f(source, value, getter=lambda x:x):
       assert len(source) == len(value)
       for s_item, v_item in zip(source, value):
-        assert s_item.id == v_item.id
-        assert s_item.internal == v_item.internal
+        assert getter(s_item).id == getter(v_item).id
+        assert getter(s_item).internal == getter(v_item).internal
     _f(pool.changes, changes)
     _f(pool.non_manifest_changes, non_os)
-    _f(pool.changes_that_failed_to_apply_earlier, conflicting)
+    _f(pool.changes_that_failed_to_apply_earlier, conflicting,
+       getter=lambda s:getattr(s, 'patch', s))
     return ''