GS Cache: Stream multiple responses content as JSON
When extracting files, we stream the resulting content as JSON. This
works fine when the number of files is small. However, when extracting
many files, say > 700, we have to split the request into sub-requests.
We then have to consolidate all of the responses and stream them as
*one* JSON stream.
This change refactors the old iter_content_as_json function into a class
to which responses can be queued multiple times. Finally, we call
`stream` to stream all of the queued responses as JSON.
BUG=chromium:824580
TEST=Ran unit tests.
Change-Id: Ib1ab2f7ed2437c5c5f62412e2b9e92e18af413a3
Reviewed-on: https://chromium-review.googlesource.com/1105482
Commit-Ready: Congbin Guo <guocb@chromium.org>
Tested-by: Congbin Guo <guocb@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
diff --git a/gs_cache/range_response.py b/gs_cache/range_response.py
index 8883003..8b3e45e 100644
--- a/gs_cache/range_response.py
+++ b/gs_cache/range_response.py
@@ -8,8 +8,18 @@
from __future__ import division
from __future__ import print_function
+import collections
+import itertools
+import json
+import re
+
import constants
+_RANGE_HEADER_SEPARATORS = re.compile('[-/ ]')
+
+_ContentRangeHeader = collections.namedtuple('_ContentRangeHeader',
+ ('bytes', 'start', 'end', 'total'))
+
class FormatError(Exception):
"""Exception raised when we parse wrong format of response."""
@@ -19,8 +29,133 @@
"""Exception raised when we cannot get a file match the range."""
-class FileIterator(object):
- """The iterator of files in a response of multipart range request.
+class ResponseQueueError(Exception):
+ """Exception raised when trying to queue responses not allowed."""
+
+
+def _get_file_by_range_header(range_header_str, file_name_map):
+ """Get file name and size by the Content-Range header.
+
+ The format of Content-Range header is like:
+ Content-Range: bytes <start>-<end>/<total>
+ We get the <start> and <end> from it and retrieve the file name from
+ |file_name_map|.
+
+ Args:
+ range_header_str: A string of range header.
+ file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
+
+ Returns:
+ A tuple of (filename, size).
+
+ Raises:
+ FormatError: Raised when the content range header has a wrong format.
+ NoFileFoundError: Raised when we cannot get a file matches the range.
+ """
+ # Split the part of 'Content-Range:' first if needed.
+ if range_header_str.lower().startswith('content-range:'):
+ range_header_str = range_header_str.split(': ', 1)[1]
+
+ try:
+ range_header = _ContentRangeHeader._make(
+ _RANGE_HEADER_SEPARATORS.split(range_header_str)
+ )
+ size = int(range_header.end) - int(range_header.start) + 1
+ except (IndexError, ValueError):
+ raise FormatError('Wrong format of content range header: %s' %
+ range_header_str)
+
+ try:
+ filename = file_name_map[(range_header.start, size)]
+ except KeyError:
+ raise NoFileFoundError('Cannot find a file matches the range %s' %
+ range_header_str)
+
+ return filename, size
+
+
+class JsonStreamer(object):
+ """A class to stream the responses for range requests.
+
+ The class accepts responses and format the file content in all of them as a
+ JSON stream. The format:
+ '{"<filename>": "<content>", "<filename>": "<content>", ...}'
+ """
+
+ def __init__(self):
+ self._files_iter_list = []
+ self._can_add_more_response = True
+
+ def queue_response(self, response, file_info_list):
+ """Add a response to the queue to be streamed as JSON.
+
+ We can add either:
+ 1. one and only one response for single-part range requests, or
+ 2. a series of responses for multi-part range requests.
+
+ Args:
+ response: An instance of requests.Response, which may be the response of a
+ single range request, or a multi-part range request.
+ file_info_list: A list of tarfile_utils.TarMemberInfo. We use it to look
+ up file name by content start offset and size.
+
+ Raises:
+ FormatError: Raised when response to be queued isn't for a range request.
+ ResponseQueueError: Raised when either queuing more than one response for
+ single-part range request, or mixed responses for single-part and
+ multi-part range request.
+ """
+ if not self._can_add_more_response:
+ raise ResponseQueueError(
+ 'No more reponses can be added when there was a response for '
+ 'single-part range request in the queue!')
+
+ file_name_map = {(f.content_start, int(f.size)): f.filename
+ for f in file_info_list}
+
+ # Check if the response is for single range, or multi-part range. For a
+ # single range request, the response must have header 'Content-Range'. For a
+ # multi-part ranges request, the Content-Type header must be like
+ # 'multipart/byteranges; ......'.
+ content_range = response.headers.get('Content-Range', None)
+ content_type = response.headers.get('Content-Type', '')
+
+ if content_range:
+ if self._files_iter_list:
+ raise ResponseQueueError(
+ 'Cannot queue more than one responses for single-part range '
+ 'request, or mix responses for single-part and multi-part.')
+ filename, _ = _get_file_by_range_header(content_range, file_name_map)
+ self._files_iter_list = [iter([(filename, response.content)])]
+ self._can_add_more_response = False
+
+ elif content_type.startswith('multipart/byteranges;'):
+ self._files_iter_list.append(
+ iter(_FileIterator(response, file_name_map)))
+
+ else:
+ raise FormatError('The response is not for a range request.')
+
+ def stream(self):
+ """Yield the series of responses content as a JSON stream.
+
+ Yields:
+ A JSON stream in format described above.
+ """
+ files_iter = itertools.chain(*self._files_iter_list)
+
+ json_encoder = json.JSONEncoder()
+ filename, content = next(files_iter)
+ yield '{%s: %s' % (json_encoder.encode(filename),
+ json_encoder.encode(content))
+ for filename, content in files_iter:
+ yield ', %s: %s' % (json_encoder.encode(filename),
+ json_encoder.encode(content))
+ yield '}'
+
+
+class _FileIterator(object):
+ """The iterator of files in a response of multi-part range request.
An example response is like:
@@ -44,19 +179,17 @@
the files.
"""
- def __init__(self, response, file_info_list):
+ def __init__(self, response, file_name_map):
"""Constructor.
Args:
response: An instance of requests.response.
- file_info_list: A list of tarfile_utils.TarMemberInfo. We use it to look
- up file name by content start offset and size.
+ file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
"""
self._response_iter = response.iter_content(
constants.READ_BUFFER_SIZE_BYTES)
self._chunk = None
- self._file_name_map = {(f.content_start, int(f.size)): f.filename
- for f in file_info_list}
+ self._file_name_map = file_name_map
def __iter__(self):
self._chunk = next(self._response_iter)
@@ -135,7 +268,6 @@
Raises:
FormatError: Raised when response content interrupted.
- NoFileFoundError: Raised when we cannot get a file matches the range.
"""
self._read_empty_line() # The first line is empty.
while True:
@@ -146,20 +278,10 @@
break
self._read_empty_line() # Another empty line.
- # The header format is: "Content-Range: bytes START-END/TOTAL"
- try:
- start, end = sub_range_header.split(' ')[2].split('/')[0].split('-')
- size = int(end) - int(start) + 1
- except (IndexError, ValueError):
- raise FormatError('Wrong format of sub content range header: %s' %
- sub_range_header)
- try:
- filename = self._file_name_map[(start, size)]
- except KeyError:
- raise NoFileFoundError('Cannot find a file matches the range %s' %
- sub_range_header)
-
+ filename, size = _get_file_by_range_header(sub_range_header,
+ self._file_name_map)
content = self._read_bytes(size)
+
self._read_empty_line() # Every content has a trailing '\r\n'.
bytes_read = 0 if content is None else len(content)
diff --git a/gs_cache/tests/range_response_test.py b/gs_cache/tests/range_response_test.py
index 9cfd28b..eb0c2c6 100644
--- a/gs_cache/tests/range_response_test.py
+++ b/gs_cache/tests/range_response_test.py
@@ -9,6 +9,7 @@
from __future__ import division
from __future__ import print_function
+import json
import unittest
import mock
@@ -18,23 +19,64 @@
# pylint: disable=protected-access
-class FileIteratorTest(unittest.TestCase):
- """Test class for FileIterator."""
+class JsonStreamerBasicTest(unittest.TestCase):
+ """Basic test case for range_response.JsonStreamer."""
+
+ def setUp(self):
+ self.streamer = range_response.JsonStreamer()
+ self.single_part_response = mock.MagicMock()
+ self.single_part_response.headers = {'Content-Range': 'bytes 100-1099/*'}
+ self.single_part_response.content = 'A' * 1000
+ self.file_info_list = [tarfile_utils.TarMemberInfo('foo', '', '', '100',
+ '1000')]
+
+ def test_single_part_response(self):
+ """Test handling of single-part response."""
+ self.streamer.queue_response(self.single_part_response, self.file_info_list)
+ result = json.loads(''.join(self.streamer.stream()))
+ self.assertDictEqual(result, {'foo': 'A' * 1000})
+
+ def test_add_response_not_for_range_request(self):
+ """Test add response which not for range request."""
+ response = mock.MagicMock()
+ response.headers = {}
+ with self.assertRaises(range_response.FormatError):
+ self.streamer.queue_response(response, [])
+
+ def test_add_two_single_part_response(self):
+ """Test adding two single-part response."""
+ self.streamer.queue_response(self.single_part_response, self.file_info_list)
+ with self.assertRaises(range_response.ResponseQueueError):
+ self.streamer.queue_response(self.single_part_response, [])
+
+ def test_add_single_part_after_a_multi_part(self):
+ """Test adding a single-part response after some multi-part responses."""
+ response = mock.MagicMock()
+ response.headers = {
+ 'Content-Type': 'multipart/byteranges; boundary=boundary',
+ }
+ response.iter_content.return_value = iter([''])
+ self.streamer.queue_response(response, self.file_info_list)
+
+ with self.assertRaises(range_response.ResponseQueueError):
+ self.streamer.queue_response(self.single_part_response, [])
+
+
+class MultiPartResponseTest(unittest.TestCase):
+ """Test class for handling one response of multi-part range request."""
def setUp(self):
self.response = mock.MagicMock()
+ self.response.headers = {
+ 'Content-Type': 'multipart/byteranges; boundary=boundary',
+ }
self.file_info_list = [
tarfile_utils.TarMemberInfo('foo', '', '', '10', '10'),
tarfile_utils.TarMemberInfo('bar', '', '', '123', '1000')]
- def test_iter_files_empty_response(self):
- self.response.iter_content.return_value = iter([''])
- files = list(range_response.FileIterator(self.response, mock.MagicMock()))
- self.assertListEqual(files, [])
+ self.streamer = range_response.JsonStreamer()
- def test_iter_files(self):
- """Test iterating files in the response."""
- self.response.iter_content.return_value = iter([
+ self.good_response = [
'\r\nboundary\r\nContent-Type: some/type\r',
'\nContent-Range: bytes 10-19/T\r\n\r\n012',
'3456789\r\nboundary\r\nContent-Type: some',
@@ -42,39 +84,77 @@
'\n\r\n' + 'a' * 400,
'a' * 600,
'\r\nboundary--\r\n',
- ])
+ ]
- files = list(range_response.FileIterator(self.response,
- self.file_info_list))
- self.assertListEqual(files, [('foo', '0123456789'), ('bar', 'a' * 1000)])
+ def test_stream__empty_response(self):
+ """Test streaming empty response."""
+ self.response.iter_content.return_value = iter([''])
+ self.streamer.queue_response(self.response, self.file_info_list)
+ result = ''.join(self.streamer.stream())
+ self.assertEqual(result, '')
- def test_iter_files_file_not_found(self):
- """Test iter_files which cannot find file names."""
+ def test_stream__multipart_ranges(self):
+ """Test streaming files in one response."""
+ self.response.iter_content.return_value = iter(self.good_response)
+ self.streamer.queue_response(self.response, self.file_info_list)
+ result = json.loads(''.join(self.streamer.stream()))
+ self.assertDictEqual(result, {'foo': '0123456789', 'bar': 'a' * 1000})
+
+ def test_stream__two_multipart_ranges(self):
+ """Test streaming files in two responses."""
+ self.response.iter_content.return_value = iter(self.good_response)
+ self.streamer.queue_response(self.response, self.file_info_list)
+
+ # Queue the same response but with different file name map.
+ self.response.iter_content.return_value = iter(self.good_response)
+ self.streamer.queue_response(
+ self.response,
+ [tarfile_utils.TarMemberInfo('FOO', '', '', '10', '10'),
+ tarfile_utils.TarMemberInfo('BAR', '', '', '123', '1000')])
+
+ result = json.loads(''.join(self.streamer.stream()))
+ self.assertDictEqual(result, {'foo': '0123456789', 'FOO': '0123456789',
+ 'bar': 'a' * 1000, 'BAR': 'a' * 1000})
+
+ def test_stream__file_not_found(self):
+ """Test streaming which cannot find file names."""
self.response.iter_content.return_value = iter([
'\r\nboundary\r\nContent-Type: some/type\r',
'\nContent-Range: bytes 10-19/T\r\n\r\n012',
'3456789\r\n',
'\r\nboundary--\r\n',
])
+ self.streamer.queue_response(self.response, [])
with self.assertRaises(range_response.NoFileFoundError):
- list(range_response.FileIterator(self.response, []))
+ list(self.streamer.stream())
- def test_iter_files_bad_range_header(self):
- """Test iter_files with bad range header."""
+ def test_stream__bad_sub_range_header(self):
+ """Test streaming with bad range header."""
self.response.iter_content.return_value = iter([
'\r\nboundary\r\nContent-Type: some/type\r',
'\nContent-RangeXXXXXXXXXXXXXXX'
])
+ self.streamer.queue_response(self.response, [])
with self.assertRaises(range_response.FormatError):
- list(range_response.FileIterator(self.response, []))
+ list(self.streamer.stream())
- def test_iter_files_bad_size(self):
- """Test iter_files with bad file size."""
+ def test_stream__bad_size(self):
+ """Test streaming with bad file size."""
self.response.iter_content.return_value = iter([
'\r\nboundary\r\nContent-Type: some/type\r',
'\nContent-Range: bytes 10-19/T\r\n\r\n012',
'34\r\n',
'\r\nboundary--\r\n',
])
+ self.streamer.queue_response(self.response, self.file_info_list)
with self.assertRaises(range_response.FormatError):
- list(range_response.FileIterator(self.response, self.file_info_list))
+ list(self.streamer.stream())
+
+ def test_stream__single_range(self):
+ """Test formatting a single range response."""
+ self.response.headers = {'Content-Type': 'some/type',
+ 'Content-Range': 'bytes 10-19/*'}
+ self.response.content = 'x' * 10
+ self.streamer.queue_response(self.response, self.file_info_list)
+ result = ''.join(self.streamer.stream())
+ self.assertEqual(result, json.dumps({'foo': self.response.content}))