blob: fc9db74a0efb0eb02765edaceab0df191a216daf [file] [log] [blame]
# Copyright 2012-2014 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import io
import sys
from datetime import datetime
from time import mktime
from email.utils import formatdate, parsedate
from urllib.request import urlopen as _urlopen
import urllib.parse as urllib_parse
import urllib.request as urllib_request
from urllib.parse import splituser as urllib_parse_splituser
except ImportError:
from urllib import urlopen as _urlopen
import urlparse as urllib_parse
import urllib2 as urllib_request
from urllib import splituser as urllib_parse_splituser
if sys.hexversion >= 0x3000000:
# pylint: disable=W0622
long = int
# to account for the difference between TIMESTAMP of the index' contents
# and the file-'mtime'
def have_pep_476():
Test whether ssl certificate verification is enabled by default for
stdlib http clients (PEP 476).
@returns: bool, True if ssl certificate verification is enabled by
return hasattr(__import__('ssl'), '_create_unverified_context')
def urlopen(url, if_modified_since=None):
parse_result = urllib_parse.urlparse(url)
if parse_result.scheme not in ("http", "https"):
return _urlopen(url)
netloc = urllib_parse_splituser(parse_result.netloc)[1]
url = urllib_parse.urlunparse((parse_result.scheme, netloc, parse_result.path, parse_result.params, parse_result.query, parse_result.fragment))
password_manager = urllib_request.HTTPPasswordMgrWithDefaultRealm()
request = urllib_request.Request(url)
request.add_header('User-Agent', 'Gentoo Portage')
if if_modified_since:
request.add_header('If-Modified-Since', _timestamp_to_http(if_modified_since))
if parse_result.username is not None:
password_manager.add_password(None, url, parse_result.username, parse_result.password)
auth_handler = CompressedResponseProcessor(password_manager)
opener = urllib_request.build_opener(auth_handler)
hdl =
if hdl.headers.get('last-modified', ''):
add_header = hdl.headers.add_header
except AttributeError:
# Python 2
add_header = hdl.headers.addheader
add_header('timestamp', _http_to_timestamp(hdl.headers.get('last-modified')))
return hdl
def _timestamp_to_http(timestamp):
dt = datetime.fromtimestamp(float(long(timestamp)+TIMESTAMP_TOLERANCE))
stamp = mktime(dt.timetuple())
return formatdate(timeval=stamp, localtime=False, usegmt=True)
def _http_to_timestamp(http_datetime_string):
tuple = parsedate(http_datetime_string)
timestamp = mktime(tuple)
return str(long(timestamp))
class CompressedResponseProcessor(urllib_request.HTTPBasicAuthHandler):
# Handler for compressed responses.
def http_request(self, req):
req.add_header('Accept-Encoding', 'bzip2,gzip,deflate')
return req
https_request = http_request
def http_response(self, req, response):
decompressed = None
if response.headers.get('content-encoding') == 'bzip2':
import bz2
decompressed = io.BytesIO(bz2.decompress(
elif response.headers.get('content-encoding') == 'gzip':
from gzip import GzipFile
decompressed = GzipFile(fileobj=io.BytesIO(, mode='r')
elif response.headers.get('content-encoding') == 'deflate':
import zlib
decompressed = io.BytesIO(zlib.decompress(
except zlib.error: # they ignored RFC1950
decompressed = io.BytesIO(zlib.decompress(, -zlib.MAX_WBITS))
if decompressed:
old_response = response
response = urllib_request.addinfourl(decompressed, old_response.headers, old_response.url, old_response.code)
response.msg = old_response.msg
return response
https_response = http_response