GS Cache: Stream multiple responses content as JSON

When extracting files, we stream the resulting content as JSON. This is
not a problem when the number of files is small. However, when
extracting many files, say > 700, we have to split the request into
sub-requests. We then have to consolidate all the responses and stream
them as *one* JSON stream.

This change refactors the old iter_content_as_json function into a class
(JsonStreamer) to which responses can be queued multiple times via
`queue_response`. Finally, we call `stream` to stream all the queued
responses as JSON.

BUG=chromium:824580
TEST=Ran unit tests.

Change-Id: Ib1ab2f7ed2437c5c5f62412e2b9e92e18af413a3
Reviewed-on: https://chromium-review.googlesource.com/1105482
Commit-Ready: Congbin Guo <guocb@chromium.org>
Tested-by: Congbin Guo <guocb@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
diff --git a/gs_cache/range_response.py b/gs_cache/range_response.py
index 8883003..8b3e45e 100644
--- a/gs_cache/range_response.py
+++ b/gs_cache/range_response.py
@@ -8,8 +8,18 @@
 from __future__ import division
 from __future__ import print_function
 
+import collections
+import itertools
+import json
+import re
+
 import constants
 
+_RANGE_HEADER_SEPARATORS = re.compile('[-/ ]')
+
+_ContentRangeHeader = collections.namedtuple('_ContentRangeHeader',
+                                             ('bytes', 'start', 'end', 'total'))
+
 
 class FormatError(Exception):
   """Exception raised when we parse wrong format of response."""
@@ -19,8 +29,133 @@
   """Exception raised when we cannot get a file match the range."""
 
 
-class FileIterator(object):
-  """The iterator of files in a response of multipart range request.
+class ResponseQueueError(Exception):
+  """Exception raised when trying to queue responses not allowed."""
+
+
+def _get_file_by_range_header(range_header_str, file_name_map):
+  """Get file name and size by the Content-Range header.
+
+  The format of Content-Range header is like:
+    Content-Range: bytes <start>-<end>/<total>
+  We get the <start> and <end> from it and retrieve the file name from
+  |file_name_map|.
+
+  Args:
+    range_header_str: A string of range header.
+    file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
+
+  Returns:
+    A tuple of (filename, size).
+
+  Raises:
+    FormatError: Raised when response content interrupted.
+    NoFileFoundError: Raised when we cannot get a file matches the range.
+  """
+  # Split the part of 'Content-Range:' first if needed.
+  if range_header_str.lower().startswith('content-range:'):
+    range_header_str = range_header_str.split(': ', 1)[1]
+
+  try:
+    range_header = _ContentRangeHeader._make(
+        _RANGE_HEADER_SEPARATORS.split(range_header_str)
+    )
+    size = int(range_header.end) - int(range_header.start) + 1
+  except (IndexError, ValueError):
+    raise FormatError('Wrong format of content range header: %s' %
+                      range_header_str)
+
+  try:
+    filename = file_name_map[(range_header.start, size)]
+  except KeyError:
+    raise NoFileFoundError('Cannot find a file matches the range %s' %
+                           range_header_str)
+
+  return filename, size
+
+
+class JsonStreamer(object):
+  """A class to stream the responses for range requests.
+
+  The class accepts responses and format the file content in all of them as a
+  JSON stream. The format:
+    '{"<filename>": "<content>", "<filename>": "<content>", ...}'
+  """
+
+  def __init__(self):
+    self._files_iter_list = []
+    self._can_add_more_response = True
+
+  def queue_response(self, response, file_info_list):
+    """Add a reponse to the queue to be streamed as JSON.
+
+    We can add either:
+      1. one and only one response for single-part range requests, or
+      2. a series of responses for multi-part range requests.
+
+    Args:
+      response: An instance of requests.Response, which may be the response of a
+        single range request, or a multi-part range request.
+      file_info_list: A list of tarfile_utils.TarMemberInfo. We use it to look
+        up file name by content start offset and size.
+
+    Raises:
+      FormatError: Raised when response to be queued isn't for a range request.
+      ResponseQueueError: Raised when either queuing more than one response for
+        single-part range request, or mixed responses for single-part and
+        multi-part range request.
+    """
+    if not self._can_add_more_response:
+      raise ResponseQueueError(
+          'No more reponses can be added when there was a response for '
+          'single-part range request in the queue!')
+
+    file_name_map = {(f.content_start, int(f.size)): f.filename
+                     for f in file_info_list}
+
+    # Check if the response is for single range, or multi-part range. For a
+    # single range request, the response must have header 'Content-Range'. For a
+    # multi-part ranges request, the Content-Type header must be like
+    # 'multipart/byteranges; ......'.
+    content_range = response.headers.get('Content-Range', None)
+    content_type = response.headers.get('Content-Type', '')
+
+    if content_range:
+      if self._files_iter_list:
+        raise ResponseQueueError(
+            'Cannot queue more than one responses for single-part range '
+            'request, or mix responses for single-part and multi-part.')
+      filename, _ = _get_file_by_range_header(content_range, file_name_map)
+      self._files_iter_list = [iter([(filename, response.content)])]
+      self._can_add_more_response = False
+
+    elif content_type.startswith('multipart/byteranges;'):
+      self._files_iter_list.append(
+          iter(_FileIterator(response, file_name_map)))
+
+    else:
+      raise FormatError('The response is not for a range request.')
+
+  def stream(self):
+    """Yield the series of responses content as a JSON stream.
+
+    Yields:
+      A JSON stream in format described above.
+    """
+    files_iter = itertools.chain(*self._files_iter_list)
+
+    json_encoder = json.JSONEncoder()
+    filename, content = next(files_iter)
+    yield '{%s: %s' % (json_encoder.encode(filename),
+                       json_encoder.encode(content))
+    for filename, content in files_iter:
+      yield ', %s: %s' % (json_encoder.encode(filename),
+                          json_encoder.encode(content))
+    yield '}'
+
+
+class _FileIterator(object):
+  """The iterator of files in a response of multi-part range request.
 
   An example response is like:
 
@@ -44,19 +179,17 @@
   the files.
   """
 
-  def __init__(self, response, file_info_list):
+  def __init__(self, response, file_name_map):
     """Constructor.
 
     Args:
       response: An instance of requests.response.
-      file_info_list: A list of tarfile_utils.TarMemberInfo. We use it to look
-        up file name by content start offset and size.
+      file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
     """
     self._response_iter = response.iter_content(
         constants.READ_BUFFER_SIZE_BYTES)
     self._chunk = None
-    self._file_name_map = {(f.content_start, int(f.size)): f.filename
-                           for f in file_info_list}
+    self._file_name_map = file_name_map
 
   def __iter__(self):
     self._chunk = next(self._response_iter)
@@ -135,7 +268,6 @@
 
     Raises:
       FormatError: Raised when response content interrupted.
-      NoFileFoundError: Raised when we cannot get a file matches the range.
     """
     self._read_empty_line()  # The first line is empty.
     while True:
@@ -146,20 +278,10 @@
         break
       self._read_empty_line()  # Another empty line.
 
-      # The header format is: "Content-Range: bytes START-END/TOTAL"
-      try:
-        start, end = sub_range_header.split(' ')[2].split('/')[0].split('-')
-        size = int(end) - int(start) + 1
-      except (IndexError, ValueError):
-        raise FormatError('Wrong format of sub content range header: %s' %
-                          sub_range_header)
-      try:
-        filename = self._file_name_map[(start, size)]
-      except KeyError:
-        raise NoFileFoundError('Cannot find a file matches the range %s' %
-                               sub_range_header)
-
+      filename, size = _get_file_by_range_header(sub_range_header,
+                                                 self._file_name_map)
       content = self._read_bytes(size)
+
       self._read_empty_line()  # Every content has a trailing '\r\n'.
 
       bytes_read = 0 if content is None else len(content)
diff --git a/gs_cache/tests/range_response_test.py b/gs_cache/tests/range_response_test.py
index 9cfd28b..eb0c2c6 100644
--- a/gs_cache/tests/range_response_test.py
+++ b/gs_cache/tests/range_response_test.py
@@ -9,6 +9,7 @@
 from __future__ import division
 from __future__ import print_function
 
+import json
 import unittest
 
 import mock
@@ -18,23 +19,64 @@
 
 
 # pylint: disable=protected-access
-class FileIteratorTest(unittest.TestCase):
-  """Test class for FileIterator."""
+class JsonStreamerBasicTest(unittest.TestCase):
+  """Basic test case for range_response.JsonStreamer."""
+
+  def setUp(self):
+    self.streamer = range_response.JsonStreamer()
+    self.single_part_response = mock.MagicMock()
+    self.single_part_response.headers = {'Content-Range': 'bytes 100-1099/*'}
+    self.single_part_response.content = 'A' * 1000
+    self.file_info_list = [tarfile_utils.TarMemberInfo('foo', '', '', '100',
+                                                       '1000')]
+
+  def test_single_part_response(self):
+    """Test handling of single-part response."""
+    self.streamer.queue_response(self.single_part_response, self.file_info_list)
+    result = json.loads(''.join(self.streamer.stream()))
+    self.assertDictEqual(result, {'foo': 'A' * 1000})
+
+  def test_add_response_not_for_range_request(self):
+    """Test add response which not for range request."""
+    response = mock.MagicMock()
+    response.headers = {}
+    with self.assertRaises(range_response.FormatError):
+      self.streamer.queue_response(response, [])
+
+  def test_add_two_single_part_response(self):
+    """Test adding two single-part response."""
+    self.streamer.queue_response(self.single_part_response, self.file_info_list)
+    with self.assertRaises(range_response.ResponseQueueError):
+      self.streamer.queue_response(self.single_part_response, [])
+
+  def test_add_single_part_after_a_multi_part(self):
+    """Test adding a single-part response after some multi-part responses."""
+    response = mock.MagicMock()
+    response.headers = {
+        'Content-Type': 'multipart/byteranges; boundary=boundary',
+    }
+    response.iter_content.return_value = iter([''])
+    self.streamer.queue_response(response, self.file_info_list)
+
+    with self.assertRaises(range_response.ResponseQueueError):
+      self.streamer.queue_response(self.single_part_response, [])
+
+
+class MultiPartResponseTest(unittest.TestCase):
+  """Test class for handling one response of multi-part range request."""
 
   def setUp(self):
     self.response = mock.MagicMock()
+    self.response.headers = {
+        'Content-Type': 'multipart/byteranges; boundary=boundary',
+    }
     self.file_info_list = [
         tarfile_utils.TarMemberInfo('foo', '', '', '10', '10'),
         tarfile_utils.TarMemberInfo('bar', '', '', '123', '1000')]
 
-  def test_iter_files_empty_response(self):
-    self.response.iter_content.return_value = iter([''])
-    files = list(range_response.FileIterator(self.response, mock.MagicMock()))
-    self.assertListEqual(files, [])
+    self.streamer = range_response.JsonStreamer()
 
-  def test_iter_files(self):
-    """Test iterating files in the response."""
-    self.response.iter_content.return_value = iter([
+    self.good_response = [
         '\r\nboundary\r\nContent-Type: some/type\r',
         '\nContent-Range: bytes 10-19/T\r\n\r\n012',
         '3456789\r\nboundary\r\nContent-Type: some',
@@ -42,39 +84,77 @@
         '\n\r\n' + 'a' * 400,
         'a' * 600,
         '\r\nboundary--\r\n',
-    ])
+    ]
 
-    files = list(range_response.FileIterator(self.response,
-                                             self.file_info_list))
-    self.assertListEqual(files, [('foo', '0123456789'), ('bar', 'a' * 1000)])
+  def test_stream__empty_response(self):
+    """Test streaming empty response."""
+    self.response.iter_content.return_value = iter([''])
+    self.streamer.queue_response(self.response, self.file_info_list)
+    result = ''.join(self.streamer.stream())
+    self.assertEqual(result, '')
 
-  def test_iter_files_file_not_found(self):
-    """Test iter_files which cannot find file names."""
+  def test_stream__multipart_ranges(self):
+    """Test streaming files in one response."""
+    self.response.iter_content.return_value = iter(self.good_response)
+    self.streamer.queue_response(self.response, self.file_info_list)
+    result = json.loads(''.join(self.streamer.stream()))
+    self.assertDictEqual(result, {'foo': '0123456789', 'bar': 'a' * 1000})
+
+  def test_stream__two_multipart_ranges(self):
+    """Test streaming files in two responses."""
+    self.response.iter_content.return_value = iter(self.good_response)
+    self.streamer.queue_response(self.response, self.file_info_list)
+
+    # Queue the same response but with different file name map.
+    self.response.iter_content.return_value = iter(self.good_response)
+    self.streamer.queue_response(
+        self.response,
+        [tarfile_utils.TarMemberInfo('FOO', '', '', '10', '10'),
+         tarfile_utils.TarMemberInfo('BAR', '', '', '123', '1000')])
+
+    result = json.loads(''.join(self.streamer.stream()))
+    self.assertDictEqual(result, {'foo': '0123456789', 'FOO': '0123456789',
+                                  'bar': 'a' * 1000, 'BAR': 'a' * 1000})
+
+  def test_stream__file_not_found(self):
+    """Test streaming which cannot find file names."""
     self.response.iter_content.return_value = iter([
         '\r\nboundary\r\nContent-Type: some/type\r',
         '\nContent-Range: bytes 10-19/T\r\n\r\n012',
         '3456789\r\n',
         '\r\nboundary--\r\n',
     ])
+    self.streamer.queue_response(self.response, [])
     with self.assertRaises(range_response.NoFileFoundError):
-      list(range_response.FileIterator(self.response, []))
+      list(self.streamer.stream())
 
-  def test_iter_files_bad_range_header(self):
-    """Test iter_files with bad range header."""
+  def test_stream__bad_sub_range_header(self):
+    """Test streaming with bad range header."""
     self.response.iter_content.return_value = iter([
         '\r\nboundary\r\nContent-Type: some/type\r',
         '\nContent-RangeXXXXXXXXXXXXXXX'
     ])
+    self.streamer.queue_response(self.response, [])
     with self.assertRaises(range_response.FormatError):
-      list(range_response.FileIterator(self.response, []))
+      list(self.streamer.stream())
 
-  def test_iter_files_bad_size(self):
-    """Test iter_files with bad file size."""
+  def test_stream__bad_size(self):
+    """Test streaming with bad file size."""
     self.response.iter_content.return_value = iter([
         '\r\nboundary\r\nContent-Type: some/type\r',
         '\nContent-Range: bytes 10-19/T\r\n\r\n012',
         '34\r\n',
         '\r\nboundary--\r\n',
     ])
+    self.streamer.queue_response(self.response, self.file_info_list)
     with self.assertRaises(range_response.FormatError):
-      list(range_response.FileIterator(self.response, self.file_info_list))
+      list(self.streamer.stream())
+
+  def test_stream__single_range(self):
+    """Test formatting a single range response."""
+    self.response.headers = {'Content-Type': 'some/type',
+                             'Content-Range': 'bytes 10-19/*'}
+    self.response.content = 'x' * 10
+    self.streamer.queue_response(self.response, self.file_info_list)
+    result = ''.join(self.streamer.stream())
+    self.assertEqual(result, json.dumps({'foo': self.response.content}))