fake_telemetry: add functionality to handle telemetry requests

This CL adds a new class that lets gs_archive_server handle the
setup_telemetry RPC requests. This class also exracts tar.bz2 files
to the telemetry_src directory and then renames that directory to src.

BUG=chromium:1087989
TEST=Manually tested on chromeos2-devservertest. Output
and exact command can be found at http://gpaste/5618309229707264.

Change-Id: I54163b717ba2b3e093336a27a6935bab9a317e84
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/dev-util/+/2315611
Tested-by: Sanika Kulkarni <sanikak@chromium.org>
Commit-Queue: Sanika Kulkarni <sanikak@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
Auto-Submit: Sanika Kulkarni <sanikak@chromium.org>
diff --git a/gs_cache/fake_telemetry.py b/gs_cache/fake_telemetry.py
index 96e66ed..9cd38a2 100644
--- a/gs_cache/fake_telemetry.py
+++ b/gs_cache/fake_telemetry.py
@@ -12,6 +12,7 @@
 from __future__ import print_function
 
 import cherrypy  # pylint: disable=import-error
+import telemetry_setup
 
 
 def get_config():
@@ -29,4 +30,5 @@
   def GET(self, **kwargs):
     """A URL handler for setting up telemetry."""
     archive_url = kwargs.get('archive_url')
-    return 'Fake Telemetry: To be implemented. Archive URL: %s\n' % archive_url
+    with telemetry_setup.TelemetrySetup(archive_url) as tlm:
+      return tlm.Setup()
diff --git a/gs_cache/telemetry_setup.py b/gs_cache/telemetry_setup.py
new file mode 100644
index 0000000..51a178d
--- /dev/null
+++ b/gs_cache/telemetry_setup.py
@@ -0,0 +1,253 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""A class that sets up the environment for telemetry testing."""
+
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import errno
+import os
+import shutil
+import subprocess
+import tempfile
+import threading
+
+import requests
+
+import cherrypy  # pylint: disable=import-error
+
+from chromite.lib import cros_logging as logging
+
+
+# Define module logger.
+_logger = logging.getLogger(__file__)
+
+# Define all GS Cache related constants.
+GS_CACHE_HOSTNAME = '127.0.0.1'
+GS_CACHE_PORT = '8888'
+GS_CACHE_EXRTACT_RPC = 'extract'
+GS_CACHE_BASE_URL = ('http://%s:%s/%s' %
+                     (GS_CACHE_HOSTNAME, GS_CACHE_PORT, GS_CACHE_EXRTACT_RPC))
+
+
+def _log(*args, **kwargs):
+  """A wrapper function of logging.debug/info, etc."""
+  level = kwargs.pop('level', logging.DEBUG)
+  _logger.log(level, extra=cherrypy.request.headers, *args, **kwargs)
+
+
+def _GetBucketAndBuild(archive_url):
+  """Gets the build name from the archive_url.
+
+  Args:
+    archive_url: The archive_url is typically in the format
+        gs://<gs_bucket>/<build_name>. Deduce the bucket and build name from
+        this URL by splitting at the appropriate '/'.
+
+  Returns:
+    Name of the GS bucket as a string.
+    Name of the build as a string.
+  """
+  clean_url = archive_url.strip('gs://')
+  parts = clean_url.split('/')
+  return parts[0], '/'.join(parts[1:])
+
+
+class TelemetrySetupError(Exception):
+  """Exception class used by this module."""
+  pass
+
+
+class LockDict(object):
+  """A dictionary of locks.
+
+  This class provides a thread-safe store of threading.Lock objects, which can
+  be used to regulate access to any set of hashable resources.
+
+  Usage:
+    foo_lock_dict = LockDict()
+    ...
+    with foo_lock_dict.lock('bar'):
+      # Critical section for 'bar'
+  """
+  def __init__(self):
+    self._lock = self._new_lock()
+    self._dict = {}
+
+  @staticmethod
+  def _new_lock():
+    return threading.Lock()
+
+  def lock(self, key):
+    with self._lock:
+      lock = self._dict.get(key)
+      if not lock:
+        lock = self._new_lock()
+        self._dict[key] = lock
+      return lock
+
+
+class TelemetrySetup(object):
+  """Class that sets up the environment for telemetry testing."""
+
+  # Relevant directory paths.
+  _BASE_DIR_PATH = '/home/chromeos-test/images'
+  _PARTIAL_DEPENDENCY_DIR_PATH = 'autotest/packages'
+
+  # Relevant directory names.
+  _TELEMETRY_SRC_DIR_NAME = 'telemetry_src'
+  _TEST_SRC_DIR_NAME = 'test_src'
+  _SRC_DIR_NAME = 'src'
+
+  # Names of the telemetry dependency tarballs.
+  _DEPENDENCIES = [
+      'dep-telemetry_dep.tar.bz2',
+      'dep-page_cycler_dep.tar.bz2',
+      'dep-chrome_test.tar.bz2',
+      'dep-perf_data_dep.tar.bz2',
+  ]
+
+  def __init__(self, archive_url):
+    """Initializes the TelemetrySetup class.
+
+    Args:
+      archive_url: The URL of the archive supplied through the /setup_telemetry
+          request. It is typically in the format gs://<gs_bucket>/<build_name>
+    """
+    self._bucket, self._build = _GetBucketAndBuild(archive_url)
+    self._build_dir = os.path.join(self._BASE_DIR_PATH, self._build)
+    self._temp_dir_path = tempfile.mkdtemp(prefix='gsc-telemetry')
+    self._tlm_src_dir_path = os.path.join(self._build_dir,
+                                          self._TELEMETRY_SRC_DIR_NAME)
+    self._telemetry_lock_dict = LockDict()
+
+  def __enter__(self):
+    """Called while entering context manager; does nothing."""
+    return self
+
+  def __exit__(self, exc_type, exc_value, traceback):
+    """Called while exiting context manager; cleans up temp dirs."""
+    try:
+      shutil.rmtree(self._temp_dir_path)
+    except Exception as e:
+      _log('Something went wrong. Could not delete %s due to exception: %s',
+           self._temp_dir_path, e, level=logging.WARNING)
+
+  def Setup(self):
+    """Sets up the environment for telemetry testing.
+
+    This method downloads the telemetry dependency tarballs and extracts them
+    into a 'src' directory.
+
+    Returns:
+      Path to the src directry where the telemetry dependencies have been
+          downloaded and extracted.
+    """
+    src_folder = os.path.join(self._tlm_src_dir_path, self._SRC_DIR_NAME)
+    test_src = os.path.join(self._tlm_src_dir_path, self._TEST_SRC_DIR_NAME)
+
+    with self._telemetry_lock_dict.lock(self._tlm_src_dir_path):
+      if not os.path.exists(src_folder):
+        self._MkDirP(self._tlm_src_dir_path)
+
+        # Download the required dependency tarballs.
+        for dep in self._DEPENDENCIES:
+          dep_path = self._DownloadFilesFromTar(dep, self._temp_dir_path)
+          if os.path.exists(dep_path):
+            self._ExtractTarball(dep_path, self._tlm_src_dir_path)
+
+        # By default all the tarballs extract to test_src but some parts of
+        # the telemetry code specifically hardcoded to exist inside of 'src'.
+        try:
+          shutil.move(test_src, src_folder)
+        except shutil.Error:
+          raise TelemetrySetupError(
+              'Failure in telemetry setup for build %s. Appears that the '
+              'test_src to src move failed.' % self._build)
+
+    return src_folder
+
+  def _DownloadFilesFromTar(self, filename, dest_path):
+    """Downloads the given tar.bz2 file.
+
+    The given tar.bz2 file is downloaded by calling the 'extract' RPC of
+    gs_archive_server.
+
+    Args:
+      filename: Name of the tar.bz2 file to be downloaded.
+      dest_path: Full path to the directory where it should be downloaded.
+
+    Returns:
+      Full path to the downloaded file.
+
+    Raises:
+      TelemetrySetupError when the download cannot be completed for any reason.
+    """
+    dep_path = os.path.join(dest_path, filename)
+    params = 'file=%s/%s' % (self._PARTIAL_DEPENDENCY_DIR_PATH, filename)
+    partial_url = ('%s/%s/%s/autotest_packages.tar' %
+                   (GS_CACHE_BASE_URL, self._bucket, self._build))
+    url = '%s?%s' % (partial_url, params)
+    resp = requests.get(url)
+    try:
+      resp.raise_for_status()
+      with open(dep_path, 'w') as f:
+        for content in resp.iter_content():
+          f.write(content)
+    except Exception as e:
+      if (isinstance(e, requests.exceptions.HTTPError)
+          and resp.status_code == 404):
+        _log('The request %s returned a 404 Not Found status. This dependency '
+             'could be new and therefore does not exist in this specific '
+             'tarball. Hence, squashing the exception and proceeding.',
+             url, level=logging.ERROR)
+      else:
+        raise TelemetrySetupError('An error occurred while trying to complete '
+                                  'the extract request %s: %s' % (url, str(e)))
+    return dep_path
+
+  def _ExtractTarball(self, tarball_path, dest_path):
+    """Extracts the given tarball into the destination directory.
+
+    Args:
+      tarball_path: Full path to the tarball to be extracted.
+      dest_path: Full path to the directory where the tarball should be
+          extracted.
+
+    Raises:
+      TelemetrySetupError if the method is unable to extract the tarball for
+          any reason.
+    """
+    cmd = ['tar', 'xf', tarball_path, '--directory', dest_path]
+    try:
+      proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+                              stderr=subprocess.PIPE)
+      proc.communicate()
+    except Exception as e:
+      shutil.rmtree(dest_path)
+      raise TelemetrySetupError(
+          'An exception occurred while trying to untar %s into %s: %s' %
+          (tarball_path, dest_path, str(e)))
+
+  def _MkDirP(self, path):
+    """Recursively creates the given directory.
+
+    Args:
+      path: Full path to the directory that needs to the created.
+
+    Raises:
+      TelemetrySetupError is the method is unable to create directories for any
+          reason except OSError EEXIST which indicates that the directory
+          already exists.
+    """
+    try:
+      os.makedirs(path)
+    except Exception as e:
+      if not isinstance(e, OSError) or e.errno != errno.EEXIST:
+        raise TelemetrySetupError(
+            'Could not create directory %s due to %s.' % (path, str(e)))