| # Copyright 2012-2014 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import io |
| import sys |
| from datetime import datetime |
| from time import mktime |
| from email.utils import formatdate, parsedate |
| |
| try: |
| from urllib.request import urlopen as _urlopen |
| import urllib.parse as urllib_parse |
| import urllib.request as urllib_request |
| from urllib.parse import splituser as urllib_parse_splituser |
| except ImportError: |
| from urllib import urlopen as _urlopen |
| import urlparse as urllib_parse |
| import urllib2 as urllib_request |
| from urllib import splituser as urllib_parse_splituser |
| |
# Python 3 has no separate 'long' type; alias it to int so that code in
# this module can call long() on either major version.
if sys.version_info[0] >= 3:
    # pylint: disable=W0622
    long = int

# Slack, in seconds, that covers the gap between the TIMESTAMP recorded
# inside the index and the mtime of the index file itself.
TIMESTAMP_TOLERANCE = 5
| |
| |
def have_pep_476():
    """
    Report whether the stdlib http clients verify ssl certificates by
    default (PEP 476).

    The answer is derived from the presence of the
    _create_unverified_context attribute on the ssl module.

    @returns: bool, True if ssl certificate verification is enabled by
        default
    """
    import ssl
    return hasattr(ssl, '_create_unverified_context')
| |
| |
def urlopen(url, if_modified_since=None):
    """
    Open a url, with extra handling for http and https.

    Any other scheme is handed straight to the stdlib urlopen. For http(s)
    the request is sent with a 'Gentoo Portage' User-Agent, credentials
    embedded in the url are removed from it and registered with a password
    manager instead, and the response is passed through
    CompressedResponseProcessor.

    @param url: the url to open
    @param if_modified_since: optional timestamp; when it is true-ish an
        If-Modified-Since header built by _timestamp_to_http is sent
    @returns: the opened response; if it carries a Last-Modified header, a
        'timestamp' header derived from it via _http_to_timestamp is added
    """
    parts = urllib_parse.urlparse(url)
    if parts.scheme not in ("http", "https"):
        return _urlopen(url)

    # Rebuild the url without the "user:password@" portion of the netloc.
    bare_netloc = urllib_parse_splituser(parts.netloc)[1]
    url = urllib_parse.urlunparse((
        parts.scheme,
        bare_netloc,
        parts.path,
        parts.params,
        parts.query,
        parts.fragment,
    ))

    request = urllib_request.Request(url)
    request.add_header('User-Agent', 'Gentoo Portage')
    if if_modified_since:
        http_date = _timestamp_to_http(if_modified_since)
        request.add_header('If-Modified-Since', http_date)

    # Credentials taken from the original url are offered for any realm.
    password_manager = urllib_request.HTTPPasswordMgrWithDefaultRealm()
    if parts.username is not None:
        password_manager.add_password(
            None, url, parts.username, parts.password)

    handler = CompressedResponseProcessor(password_manager)
    hdl = urllib_request.build_opener(handler).open(request)

    last_modified = hdl.headers.get('last-modified', '')
    if last_modified:
        headers = hdl.headers
        try:
            add_header = headers.add_header
        except AttributeError:
            # Python 2
            add_header = headers.addheader
        add_header('timestamp', _http_to_timestamp(last_modified))
    return hdl
| |
def _timestamp_to_http(timestamp):
    """
    Format an epoch timestamp as an HTTP date string in GMT.

    TIMESTAMP_TOLERANCE is added to the timestamp before formatting.

    The epoch value is handed to formatdate directly. Converting it to a
    naive local datetime and back through mktime first is redundant, and
    that round trip can come back an hour off for timestamps that fall in
    a daylight-saving fall-back hour, where the local wall-clock time is
    ambiguous.

    @param timestamp: seconds since the epoch, as a number or as a string
        of digits
    @returns: str, the date in the form "Sun, 06 Nov 1994 08:49:37 GMT"
    """
    epoch_seconds = float(long(timestamp) + TIMESTAMP_TOLERANCE)
    return formatdate(timeval=epoch_seconds, localtime=False, usegmt=True)
| |
| def _http_to_timestamp(http_datetime_string): |
| tuple = parsedate(http_datetime_string) |
| timestamp = mktime(tuple) |
| return str(long(timestamp)) |
| |
class CompressedResponseProcessor(urllib_request.HTTPBasicAuthHandler):
    """
    Handler for compressed responses.

    On top of the inherited basic-auth handling, every outgoing request
    advertises the bzip2, gzip and deflate content codings, and a response
    that uses one of them is replaced by one whose body is decompressed.
    """

    def http_request(self, req):
        """
        Add the Accept-Encoding header to an outgoing request.

        @param req: the request about to be sent
        @returns: the same request object
        """
        req.add_header('Accept-Encoding', 'bzip2,gzip,deflate')
        return req
    https_request = http_request

    def http_response(self, req, response):
        """
        Decompress the response body according to its Content-Encoding.

        @param req: the request that produced the response (unused)
        @param response: the response received from the server
        @returns: the response itself when its encoding is not one of
            bzip2, gzip or deflate; otherwise a new addinfourl object with
            the decompressed body and the headers, url, code and msg of
            the original response
        """
        # Content-coding names are case-insensitive, so normalise first.
        encoding = response.headers.get('content-encoding') or ''
        encoding = encoding.strip().lower()

        decompressed = None
        if encoding == 'bzip2':
            import bz2
            decompressed = io.BytesIO(bz2.decompress(response.read()))
        elif encoding == 'gzip':
            from gzip import GzipFile
            decompressed = GzipFile(
                fileobj=io.BytesIO(response.read()), mode='r')
        elif encoding == 'deflate':
            import zlib
            # Read the body exactly once: the stream is consumed by the
            # first read(), so reading again in the fallback would only
            # yield empty data and the raw-deflate retry could never work.
            payload = response.read()
            try:
                decompressed = io.BytesIO(zlib.decompress(payload))
            except zlib.error:  # they ignored RFC1950
                decompressed = io.BytesIO(
                    zlib.decompress(payload, -zlib.MAX_WBITS))

        if decompressed is None:
            return response

        wrapped = urllib_request.addinfourl(
            decompressed, response.headers, response.url, response.code)
        wrapped.msg = response.msg
        return wrapped
    https_response = http_response