GS Cache: extracting a large number of files.

We use "range requests" to extract files. When using a pattern, we don't
know how many files match. If there are too many, say > 700, we cannot
set all the ranges in one single request because our caching server may
respond with a 400 error saying the request header is too long.

In this case, we split the ranges into chunks, say 100 ranges per chunk,
and send them to the caching server one chunk per request.

BUG=chromium:824580
TEST=Ran unit tests.

Change-Id: If1adafcae96ddcb4c9ec1319411ae3298fa6397d
Reviewed-on: https://chromium-review.googlesource.com/1107104
Commit-Ready: Congbin Guo <guocb@chromium.org>
Tested-by: Congbin Guo <guocb@chromium.org>
Reviewed-by: Aviv Keshet <akeshet@chromium.org>
diff --git a/gs_cache/gs_archive_server.py b/gs_cache/gs_archive_server.py
index 70acb9a..db3f9c7 100644
--- a/gs_cache/gs_archive_server.py
+++ b/gs_cache/gs_archive_server.py
@@ -25,6 +25,7 @@
 import fnmatch
 import functools
 import httplib
+import itertools
 import os
 import StringIO
 import subprocess
@@ -43,6 +44,7 @@
 from chromite.lib import gs
 
 _WRITE_BUFFER_SIZE_BYTES = 1024 * 1024  # 1 MB
+_MAX_RANGES_PER_REQUEST = 100
 
 # When extract files from TAR (either compressed or uncompressed), we suppose
 # the TAR exists, so we can call `download` RPC to get it. It's straightforward
@@ -164,6 +166,22 @@
   return found_lines
 
 
+def _split_into_chunks(iterable, size):
+  """Split the iterable into chunks of |size|.
+
+  Examples:
+    >>> list(_split_into_chunks(range(10), 3))
+    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
+    >>> list(_split_into_chunks(xrange(8), 4))
+    [[0, 1, 2, 3], [4, 5, 6, 7]]
+  """
+  iter_ = iter(iterable)
+  chunk = list(itertools.islice(iter_, size))
+  while chunk:
+    yield chunk
+    chunk = list(itertools.islice(iter_, size))
+
+
 class _CachingServer(object):
   r"""The interface of caching server for GsArchiveServer.
 
@@ -439,17 +457,21 @@
     if not found_lines:
       return '{}'
 
-    found_files = [
-        tarfile_utils.TarMemberInfo._make(urllib.unquote(line).rsplit(
+    # Too many ranges may result in error of 'request header too long'. So we
+    # split the files into chunks and request one by one.
+    found_files = _split_into_chunks(
+        [tarfile_utils.TarMemberInfo._make(urllib.unquote(line).rsplit(
             ',', len(tarfile_utils.TarMemberInfo._fields) - 1))
-        for line in found_lines
-    ]
-    ranges = [(int(f.content_start), int(f.content_start) + int(f.size) - 1)
-              for f in found_files]
-    rsp = self._send_range_request(archive, ranges, headers)
+         for line in found_lines],
+        _MAX_RANGES_PER_REQUEST)
 
     streamer = range_response.JsonStreamer()
-    streamer.queue_response(rsp, found_files)
+    for part_of_found_files in found_files:
+      ranges = [(int(f.content_start), int(f.content_start) + int(f.size) - 1)
+                for f in part_of_found_files]
+      rsp = self._send_range_request(archive, ranges, headers)
+      streamer.queue_response(rsp, part_of_found_files)
+
     return streamer.stream()
 
   def _send_range_request(self, archive, ranges, headers):
@@ -469,6 +491,7 @@
         httplib.PARTIAL_CONTENT.
     """
     ranges = ['%d-%d' % r for r in ranges]
+    headers = headers.copy()
     headers['Range'] = 'bytes=%s' % (','.join(ranges))
     rsp = self._caching_server.download(archive, headers=headers)
 
diff --git a/gs_cache/tests/gs_archive_server_test.py b/gs_cache/tests/gs_archive_server_test.py
index 824178c..632d86d 100644
--- a/gs_cache/tests/gs_archive_server_test.py
+++ b/gs_cache/tests/gs_archive_server_test.py
@@ -145,6 +145,9 @@
   def setUp(self):
     """Setup method."""
     self.server = gs_archive_server.GsArchiveServer('')
+    self.list_member_mock = mock.MagicMock()
+    self.list_member_mock.return_value.iter_lines.return_value = [
+        'foo,,,0,3', 'bar,,,3,10', 'baz,,,13,5', 'foo%2Cbar,,,20,10']
 
   def test_list_member(self):
     """Test list_member RPC."""
@@ -174,10 +177,9 @@
           _ = [int(d) for d in file_info[1:]]
 
   def test_extract_from_tar(self):
-    """Test extract a file from a TAR archive."""
+    """Test extracting a file from a TAR archive."""
     with mock.patch.object(self.server, '_caching_server') as cache_server:
-      cache_server.list_member.return_value.iter_lines.return_value = [
-          'foo,_,_,0,3', 'bar,_,_,3,10', 'baz,_,_,13,5']
+      cache_server.list_member = self.list_member_mock
       cache_server.download.return_value.headers = {
           'Content-Range': 'bytes 3-12/*'}
 
@@ -189,6 +191,40 @@
       # Extract an non-exist file. Should return '{}'
       self.assertEqual('{}', self.server.extract('bar.tar', file='footar'))
 
+  def test_extract_two_files_from_tar(self):
+    """Test extracting two files from a TAR archive."""
+    with mock.patch.object(self.server, '_caching_server') as cache_server:
+      cache_server.list_member = self.list_member_mock
+      cache_server.download.return_value.headers = {
+          'Content-Type': 'multipart/byteranges; boundary=xxx'}
+      cache_server.download.return_value.status_code = httplib.PARTIAL_CONTENT
+      # pylint: disable=protected-access
+      gs_archive_server._MAX_RANGES_PER_REQUEST = 100
+
+      with mock.patch('range_response.JsonStreamer'):
+        self.server.extract('bar.tar', file=['bar', 'foo'])
+
+      cache_server.download.assert_called_with(
+          'bar.tar', headers={'Range': 'bytes=0-2,3-12'})
+
+  def test_extract_many_files_from_tar(self):
+    """Test extracting many files which result in a series of range requests."""
+    with mock.patch.object(self.server, '_caching_server') as cache_server:
+      cache_server.list_member = self.list_member_mock
+      cache_server.download.return_value.headers = {
+          'Content-Type': 'multipart/byteranges; boundary=xxx'}
+      cache_server.download.return_value.status_code = httplib.PARTIAL_CONTENT
+
+      # pylint: disable=protected-access
+      gs_archive_server._MAX_RANGES_PER_REQUEST = 1
+      with mock.patch('range_response.JsonStreamer'):
+        self.server.extract('bar.tar', file=['bar', 'foo'])
+
+      cache_server.download.assert_any_call(
+          'bar.tar', headers={'Range': 'bytes=0-2'})
+      cache_server.download.assert_any_call(
+          'bar.tar', headers={'Range': 'bytes=3-12'})
+
   def test_decompress_tgz(self):
     """Test decompress a tgz file."""
     with mock.patch.object(self.server, '_caching_server') as cache_server: