GS Cache: refactor files iteration.
This change refactors files iteration in response of multi-part range
request using coroutine. This will make code easier to understand and
maintain.
BUG=chromium:824580
TEST=Ran unit tests.
Change-Id: I3a261855245e37f6f1a6171369cb9fa27cb5ed70
Reviewed-on: https://chromium-review.googlesource.com/1108797
Commit-Ready: ChromeOS CL Exonerator Bot <chromiumos-cl-exonerator@appspot.gserviceaccount.com>
Tested-by: Congbin Guo <guocb@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
diff --git a/gs_cache/range_response.py b/gs_cache/range_response.py
index 8b3e45e..c8c4dac 100644
--- a/gs_cache/range_response.py
+++ b/gs_cache/range_response.py
@@ -16,6 +16,7 @@
import constants
_RANGE_HEADER_SEPARATORS = re.compile('[-/ ]')
+_ONE_LINE = object() # Special object to indicate data reader to read one line.
_ContentRangeHeader = collections.namedtuple('_ContentRangeHeader',
('bytes', 'start', 'end', 'total'))
@@ -131,7 +132,7 @@
elif content_type.startswith('multipart/byteranges;'):
self._files_iter_list.append(
- iter(_FileIterator(response, file_name_map)))
+ _file_iterator(response, file_name_map))
else:
raise FormatError('The response is not for a range request.')
@@ -154,7 +155,83 @@
yield '}'
-class _FileIterator(object):
+def _data_reader(data_iter):
+ """A coroutine to read data from |data_iter|.
+
+ It accepts two type of parameter:
+ 1. _ONE_LINE: Read one CRLF ended line if possible.
+ 2. An integer N: Read at most N bytes.
+
+ Args:
+ data_iter: An iterator of data source.
+
+ Yields:
+ The data read.
+ """
+ buffered = next(data_iter)
+
+ # Get what to be read in runtime by passing value into the generator. See
+ # https://docs.python.org/2.5/whatsnew/pep-342.html for syntax details.
+ to_be_read = yield
+
+ while True:
+ if to_be_read is _ONE_LINE:
+ parts = buffered.split('\r\n', 1)
+ if len(parts) == 2:
+ line, buffered = parts
+ to_be_read = (yield line)
+ continue
+
+ else: # Read at most |to_be_read| bytes of data.
+ bytes_remaining = to_be_read - len(buffered)
+ if bytes_remaining < 0:
+ read_bytes = buffered[:bytes_remaining]
+ buffered = buffered[bytes_remaining:]
+ to_be_read = (yield read_bytes)
+ continue
+
+ try:
+ buffered += next(data_iter)
+ except StopIteration:
+ break
+
+ if buffered:
+ yield buffered
+
+
+def _read_line(reader):
+ """Read one CRLF ended line from the response.
+
+ Returns:
+ The line read. Return None if nothing to read.
+ """
+ return reader.send(_ONE_LINE)
+
+
+def _read_empty_line(reader):
+ """Read one line and assert it is empty."""
+ try:
+ line = _read_line(reader)
+ except StopIteration:
+ raise FormatError('Expect an empty line, but got EOF.')
+ if line:
+ raise FormatError('Expect an empty line, but got "%s".' % line)
+
+
+def _read_bytes(reader, max_bytes):
+ """Read at most |max_bytes| bytes from the reader.
+
+ Args:
+ reader:
+ max_bytes: An integer of maximum bytes of bytes to read.
+
+ Returns:
+ The bytes read. Return None if nothing to read.
+ """
+ return reader.send(max_bytes)
+
+
+def _file_iterator(response, file_name_map):
"""The iterator of files in a response of multi-part range request.
An example response is like:
@@ -177,118 +254,41 @@
In our application, each part is the content of a file. This class iterates
the files.
+
+ Args:
+ response: An instance of requests.response.
+ file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
+
+ Yields:
+ A pair of (name, content) of the file.
+
+ Raises:
+ FormatError: Raised when response content interrupted.
"""
+ reader = _data_reader(
+ response.iter_content(constants.READ_BUFFER_SIZE_BYTES))
+ reader.next() # initialize the coroutine
- def __init__(self, response, file_name_map):
- """Constructor.
+ _read_empty_line(reader) # The first line is empty.
+ while True:
+ _read_line(reader) # The second line is the boundary.
+ _read_line(reader) # The line sub content type.
+ sub_range_header = _read_line(reader) # The line of sub content range.
+ if sub_range_header is None:
+ break
+ _read_empty_line(reader) # Another empty line.
- Args:
- response: An instance of requests.response.
- file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
- """
- self._response_iter = response.iter_content(
- constants.READ_BUFFER_SIZE_BYTES)
- self._chunk = None
- self._file_name_map = file_name_map
+ filename, size = _get_file_by_range_header(sub_range_header,
+ file_name_map)
+ content = _read_bytes(reader, size)
- def __iter__(self):
- self._chunk = next(self._response_iter)
- return self._iter_files()
+ _read_empty_line(reader) # Every content has a trailing '\r\n'.
- def _read_next_chunk(self):
- """Helper function to read next chunk of data and return current chunk."""
- buffered = self._chunk
- try:
- self._chunk = next(self._response_iter)
- except StopIteration:
- self._chunk = None
+ bytes_read = 0 if content is None else len(content)
+ if bytes_read != size:
+ raise FormatError(
+ '%s: Error in reading content (read %d B, expect %d B)' %
+ (filename, bytes_read, size)
+ )
- return buffered
-
- def _read_line(self):
- """Read one CRLF ended line from the response.
-
- Returns:
- The line read. Return None if nothing to read.
- """
- if self._chunk is None:
- return None
-
- buffered = ''
- while True:
- buffered += self._chunk
- parts = buffered.split('\r\n', 1)
- if len(parts) == 2:
- line, self._chunk = parts
- return line
- else: # No '\r\n' in current chunk. Read one more.
- self._read_next_chunk()
- if self._chunk is None:
- return buffered
-
- def _read_bytes(self, max_bytes):
- """Read at most |max_bytes| bytes from the response.
-
- Args:
- max_bytes: An integer of maximum bytes of bytes to read.
-
- Returns:
- The bytes read. Return None if nothing to read.
- """
- if self._chunk is None:
- return None
-
- buffered = ''
- bytes_remaining = max_bytes
- while True:
- bytes_remaining -= len(self._chunk)
- if bytes_remaining < 0:
- buffered += self._chunk[:bytes_remaining]
- self._chunk = self._chunk[bytes_remaining:]
- return buffered
-
- buffered += self._read_next_chunk()
- if self._chunk is None:
- return buffered
-
- def _read_empty_line(self):
- """Read one line and assert it is empty."""
- line = self._read_line()
- if line is None:
- raise FormatError('Expect an empty line, but got EOF.')
-
- if line:
- raise FormatError('Expect an empty line, but got "%s".' % line)
-
- def _iter_files(self):
- """Iterate the files in the response.
-
- Yields:
- A pair of (name, content) of the file.
-
- Raises:
- FormatError: Raised when response content interrupted.
- """
- self._read_empty_line() # The first line is empty.
- while True:
- self._read_line() # The second line is the boundary.
- self._read_line() # The line sub content type.
- sub_range_header = self._read_line() # The line of sub content range.
- if sub_range_header is None:
- break
- self._read_empty_line() # Another empty line.
-
- filename, size = _get_file_by_range_header(sub_range_header,
- self._file_name_map)
- content = self._read_bytes(size)
-
- self._read_empty_line() # Every content has a trailing '\r\n'.
-
- bytes_read = 0 if content is None else len(content)
- if bytes_read != size:
- raise FormatError(
- '%s: Error in reading content (read %d B, expect %d B)' %
- (filename, bytes_read, size)
- )
-
- yield filename, content
+ yield filename, content
diff --git a/gs_cache/tests/range_response_test.py b/gs_cache/tests/range_response_test.py
index eb0c2c6..8f51649 100644
--- a/gs_cache/tests/range_response_test.py
+++ b/gs_cache/tests/range_response_test.py
@@ -90,8 +90,8 @@
"""Test streaming empty response."""
self.response.iter_content.return_value = iter([''])
self.streamer.queue_response(self.response, self.file_info_list)
- result = ''.join(self.streamer.stream())
- self.assertEqual(result, '')
+ with self.assertRaises(range_response.FormatError):
+ ''.join(self.streamer.stream())
def test_stream__multipart_ranges(self):
"""Test streaming files in one response."""
@@ -105,10 +105,11 @@
self.response.iter_content.return_value = iter(self.good_response)
self.streamer.queue_response(self.response, self.file_info_list)
- # Queue the same response but with different file name map.
- self.response.iter_content.return_value = iter(self.good_response)
+ response2 = mock.MagicMock()
+ response2.headers = self.response.headers
+ response2.iter_content.return_value = iter(self.good_response)
self.streamer.queue_response(
- self.response,
+ response2,
[tarfile_utils.TarMemberInfo('FOO', '', '', '10', '10'),
tarfile_utils.TarMemberInfo('BAR', '', '', '123', '1000')])