GS Cache: Extract a file from TAR.

This change adds support for extracting a file from a tar archive in
Google Storage. It is based on HTTP "Range Requests", which set a
"Range" header on an otherwise normal HTTP download. For example:

   Range: bytes=123-456

This requests the bytes from 123 to 456 (inclusive).
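
For instance, with the `requests` library (an illustration only, not
code from this change):

   import requests

   rsp = requests.get('https://example.com/big.tar',
                      headers={'Range': 'bytes=123-456'})
   assert rsp.status_code == 206  # Partial Content, not plain 200 OK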

The basic workflow of `extract` is:
1. Call `list_member` to get the list of all files in the tar archive.
2. Search for the target file in the list to get its content start
   offset and size.
3. Call `download` with a "Range" header to download just that part
   (see the sketch below).
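
A minimal sketch of this flow in Python (the member list format
follows this change; the helper and server URL are illustrative only):

   import urllib

   import requests

   def extract_from_tar(server, archive, filename):
     # 1. Fetch the member list; each line is
     #    '<filename>,<record_start>,<record_size>,<content_start>,<size>',
     #    with the filename URL percent encoded.
     members = requests.get('%s/list_member/%s' % (server, archive))
     for line in members.iter_lines():
       name, _, _, content_start, size = line.split(',')
       if name != urllib.quote(filename):
         continue
       # 2. Compute the inclusive byte range of the file content.
       start = int(content_start)
       end = start + int(size) - 1
       # 3. Download only that byte range of the archive.
       return requests.get('%s/download/%s' % (server, archive),
                           headers={'Range': 'bytes=%d-%d' % (start, end)})
     raise LookupError('"%s" not found in "%s"' % (filename, archive))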

BUG=chromium:824580
TEST=Ran unit tests.

Change-Id: I2550945932ec67fb7e9a7ae7e75194b3f2377d22
Reviewed-on: https://chromium-review.googlesource.com/1068662
Commit-Ready: Congbin Guo <guocb@chromium.org>
Tested-by: Congbin Guo <guocb@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
diff --git a/gs_cache/gs_archive_server.py b/gs_cache/gs_archive_server.py
index 8977dd7..adeca2c 100644
--- a/gs_cache/gs_archive_server.py
+++ b/gs_cache/gs_archive_server.py
@@ -35,9 +35,12 @@
 from chromite.lib import gs
 
 # some http status codes
+_HTTP_OK = 200
+_HTTP_PARTIAL_CONTENT = 206
 _HTTP_BAD_REQUEST = 400
 _HTTP_UNAUTHORIZED = 401
 _HTTP_NOT_FOUND = 404
+_HTTP_INTERNAL_SERVER_ERROR = 500
 _HTTP_SERVICE_UNAVAILABLE = 503
 
 _READ_BUFFER_SIZE_BYTES = 1024 * 1024  # 1 MB
@@ -55,6 +58,12 @@
   _logger.log(level, extra=cherrypy.request.headers, *args, **kwargs)
 
 
+def _log_filtered_headers(all_headers, filtered_headers, level=logging.DEBUG):
+  """Log only the headers listed in |filtered_headers|."""
+  _log('Filtered headers: %s',
+       {k: all_headers.get(k) for k in filtered_headers}, level=level)
+
+
 def _check_file_extension(filename, ext_names=None):
   """Check the file name and, optionally, the ext name.
 
@@ -80,6 +89,26 @@
   return filename
 
 
+def _safe_get_param(all_params, param_name):
+  """Check if |param_name| is in |all_params|.
+
+  Args:
+    all_params: A dict of all parameters of the call.
+    param_name: The parameter name to be checked.
+
+  Returns:
+    The value of |param_name|.
+
+  Raises:
+    HTTPError: HTTP 400 when the required parameter isn't in |all_params|.
+  """
+  try:
+    return all_params[param_name]
+  except KeyError:
+    raise cherrypy.HTTPError(_HTTP_BAD_REQUEST,
+                             'Parameter "%s" is required!' % param_name)
+
+
 def _to_cherrypy_error(func):
   """A decorator to convert Exceptions raised to proper cherrypy.HTTPError."""
   @functools.wraps(func)
@@ -133,9 +162,13 @@
     """Helper function to generate all RPC calls to the proxy server."""
     url = urlparse.urlunsplit(self._url + ('%s/%s' % (action, path),
                                            urllib.urlencode(args or {}), None))
-    _log('Sending request to proxy: %s', url)
+    _log('Sending request to caching server: %s', url)
+    # The header that controls whether to use or bypass the cache.
+    _log_filtered_headers(headers, ('X-No-Cache',))
     rsp = requests.get(url, headers=headers, stream=True)
-    _log('Proxy response %s', rsp.status_code)
+    _log('Caching server response %s', rsp.status_code)
+    _log_filtered_headers(rsp.headers, ('Content-Type', 'Content-Length',
+                                        'X-Cache', 'Cache-Control', 'Date'))
     rsp.raise_for_status()
     return rsp
 
@@ -143,6 +176,10 @@
     """Call download RPC."""
     return self._call('download', path, headers=headers)
 
+  def list_member(self, path, headers=None):
+    """Call list_member RPC."""
+    return self._call('list_member', path, headers=headers)
+
 
 class GsArchiveServer(object):
   """The backend of Google Storage Cache server."""
@@ -202,14 +239,11 @@
       with tar_tv, contextlib.closing(StringIO.StringIO()) as stream:
         tar_tv.seek(0)
         for info in tarfile_utils.list_tar_members(tar_tv):
-          # some pre-computation for easier use of clients
-          content_end = info.content_start + info.size - 1
-          record_end = info.record_start + info.record_size - 1
-
-          # encode file name using URL percent encoding, so ',' => '%2C'
-          stream.write('%s,%d,%d,%d,%d,%d,%d\n' % (
+          # Encode the file name using URL percent encoding, so ',' in the
+          # file name becomes '%2C'.
+          stream.write('%s,%d,%d,%d,%d\n' % (
               urllib.quote(info.filename), info.record_start, info.record_size,
-              record_end, info.content_start, info.size, content_end))
+              info.content_start, info.size))
 
           if stream.tell() > _WRITE_BUFFER_SIZE_BYTES:
             yield stream.getvalue()
@@ -261,9 +295,85 @@
 
     return content
 
+  @cherrypy.expose
+  @_to_cherrypy_error
+  def extract(self, *args, **kwargs):
+    """Extract a file from a Tar archive.
+
+    Examples:
+      GET /extract/chromeos-image-archive/release/files.tar?file=path/to/file
+
+    Args:
+      *args: All parts of the GS path of the archive, without gs:// prefix.
+      kwargs: file: The URL-encoded path of the file to be extracted.
+
+    Returns:
+      The stream of the extracted file content.
+    """
+    # TODO(guocb): support compressed tar formats
+    archive = _check_file_extension('/'.join(args), ext_names=['.tar'])
+    filename = _safe_get_param(kwargs, 'file')
+    _log('Extracting "%s" from "%s".', filename, archive)
+    return self._extract_file_from_tar(filename, archive)
+
+  def _extract_file_from_tar(self, filename, archive):
+    """Extract file of |filename| from |archive|."""
+    # Call `list_member` and search |filename| in it. If found, create another
+    # "Range Request" to download that range of bytes.
+    all_files = self._caching_server.list_member(
+        archive, headers=cherrypy.request.headers)
+    # Each line has the format '<filename>,<data1>,<data2>...'. The filename
+    # is URL percent encoded, so it contains no ','. Thus searching for
+    # '<filename>,' is good enough to look up the file information.
+    target_str = '%s,' % filename
+
+    for l in all_files.iter_lines(chunk_size=_READ_BUFFER_SIZE_BYTES):
+      # TODO(guocb): The search performance seems OK: for a 60K-line list,
+      # finding a file at the very end takes about 0.1 second. Still, a
+      # faster approach that isn't overly complex would be better.
+
+      # For performance reasons, we don't split the line by ',' and compare
+      # the filename part.
+      if l.startswith(target_str):
+        _log('Found the line for the file: %s', l)
+        file_info = tarfile_utils.TarMemberInfo._make(l.split(','))
+        rsp = self._send_range_request(archive, file_info.content_start,
+                                       file_info.size)
+        rsp.raise_for_status()
+        return rsp.iter_content(_WRITE_BUFFER_SIZE_BYTES)
+
+    raise cherrypy.HTTPError(
+        _HTTP_BAD_REQUEST,
+        'File "%s" is not in archive "%s"!' % (filename, archive))
+
+  def _send_range_request(self, archive, start, size):
+    """Create and send a "Range Request" to caching server.
+
+    Set HTTP Range header and just download the bytes in that "range".
+    https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
+    """
+    headers = cherrypy.request.headers.copy()
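+    # HTTP byte ranges are inclusive at both ends: [start, start + size - 1].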
+    headers['Range'] = 'bytes=%s-%d' % (start, int(start) + int(size) - 1)
+    rsp = self._caching_server.download(archive, headers=headers)
+
+    if rsp.status_code == _HTTP_PARTIAL_CONTENT:
+      # Although this is a partial response, it contains the full content of
+      # the extracted file, so reset the status code and Content-Length.
+      cherrypy.response.status = _HTTP_OK
+      cherrypy.response.headers['Content-Length'] = size
+    else:
+      _log('Expected HTTP_PARTIAL_CONTENT (206), but got %s', rsp.status_code,
+           level=logging.ERROR)
+      rsp.status_code = _HTTP_INTERNAL_SERVER_ERROR
+      rsp.reason = 'Range request failed for "%s"' % archive
+
+    return rsp
+
   # pylint:disable=protected-access
   download._cp_config = {'response.stream': True}
   list_member._cp_config = {'response.stream': True}
+  extract._cp_config = {'response.stream': True}
 
 
 def _url_type(input_string):
diff --git a/gs_cache/tests/gs_archive_server_test.py b/gs_cache/tests/gs_archive_server_test.py
index ae07a39..13d5629 100644
--- a/gs_cache/tests/gs_archive_server_test.py
+++ b/gs_cache/tests/gs_archive_server_test.py
@@ -22,6 +22,7 @@
 from cherrypy.test import helper
 
 import gs_archive_server
+import tarfile_utils
 from chromite.lib import cros_logging as logging
 
 _TESTING_SERVER = 'http://127.0.0.1:8888'
@@ -35,8 +36,13 @@
     },
     'a_tar_file': {
         'path': '%s/control_files.tar' % _DIR,
-        'members_md5': '0d5d60e9f10d41c60dd85a7f0081de5d',
-    }
+        'members_md5': 'e7fda7e72173f764c54e244673387623',
+    },
+    'a_file_from_tar': {
+        'path': 'autotest/frontend/afe/control_file.py',
+        'from': '%s/control_files.tar' % _DIR,
+        'md5': '31c71c463eb44aaae37e3f2c92423291',
+    },
 }
 
 # a tgz file with only one file "bar" which content is "foo\n"
@@ -90,15 +96,13 @@
       rsp.iter_content.return_value = (_A_TAR_FILE[:100], _A_TAR_FILE[100:])
       csv = list(self.server.list_member('baz.tar'))
       self.assertEquals(len(csv), 1)
-      (filename, record_start, record_size, record_end,
-       content_start, size, content_end) = csv[0].split(',')
-      self.assertEquals(filename, 'bar')
-      self.assertEquals(record_start, '0')
-      self.assertEquals(record_size, '1024')
-      self.assertEquals(record_end, '1023')  # 1024 - 1
-      self.assertEquals(content_start, '512')
-      self.assertEquals(size, '4')
-      self.assertEquals(content_end, '515\n')  # 512 + 4 - 1
+      file_info = tarfile_utils.TarMemberInfo._make(
+          csv[0].rstrip('\n').split(','))
+      self.assertEquals(file_info.filename, 'bar')
+      self.assertEquals(file_info.record_start, '0')
+      self.assertEquals(file_info.record_size, '1024')
+      self.assertEquals(file_info.content_start, '512')
+      self.assertEquals(file_info.size, '4')
 
       # test char quoting in file name
       with gzip.open(os.path.join(os.path.dirname(__file__),
@@ -106,11 +110,27 @@
         rsp.iter_content.return_value = f.read()
         members = next(self.server.list_member('baz.tar'))
         for csv in members.rstrip('\n').split('\n'):
-          # each line can be split into 7 elements, even ',' in filename
-          elements = csv.split(',')
-          self.assertEquals(len(elements), 7)
-          # elements from 1 to 6 are integers
-          _ = [int(d) for d in elements[1:7]]
+          file_info = tarfile_utils.TarMemberInfo._make(csv.split(','))
+          # The first element is the filename; all remaining elements are
+          # integers.
+          _ = [int(d) for d in file_info[1:]]
+
+  def test_extract_from_tar(self):
+    """Test extract a file from a TAR archive."""
+    with mock.patch.object(self.server, '_caching_server') as cache_server:
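+      # Each fake line is '<filename>,<record_start>,<record_size>,
+      # <content_start>,<size>'; '_' marks fields this test doesn't use.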
+      cache_server.list_member.return_value.iter_lines.return_value = [
+          'foo,_,_,0,3', 'bar,_,_,3,10', 'baz,_,_,13,5']
+
+      # Extract an existing file.
+      self.server.extract('bar.tar', file='bar')
+      cache_server.download.assert_called_with('bar.tar',
+                                               headers={'Range': 'bytes=3-12'})
+
+      # Extract a nonexistent file.
+      with self.assertRaises(cherrypy.HTTPError):
+        self.server.extract('bar.tar', file='footar')
 
 
 def testing_server_setup():
@@ -129,7 +147,7 @@
 
 
 @unittest.skipUnless(testing_server_setup(), 'Testing servers not available!')
-class GsCacheBackendFunctionalTest(unittest.TestCase):
+class GsCacheBackendIntegrationTest(unittest.TestCase):
   """This is a functional blackbox test
 
   These tests depend on a full setup of the server and proxy server.
@@ -165,6 +183,13 @@
     self.assertEquals(rsp.headers['Content-Type'], 'text/csv;charset=utf-8')
     self._verify_md5(rsp.content, tested_file['members_md5'])
 
+  def test_extract_from_tar(self):
+    """Test extracting a file from a tar."""
+    for k in ('a_file_from_tar',):
+      tested_file = _TEST_DATA[k]
+      rsp = self._get_page('/extract/%(from)s?file=%(path)s' % tested_file)
+      self._verify_md5(rsp.content, tested_file['md5'])
+
 
 if __name__ == "__main__":
   unittest.main()