# blob: 9526005e7cf4f987fa03c6a5806e73f38ebbeeb1 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import socket
import subprocess
import sys
import time
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
class security_HtpdateHTTP(test.test):
    """Implements regression tests for Date header parsing in htpdate.

    Each test case forks a short-lived listener that answers exactly one
    connection with a canned HTTP response, then runs htpdate against that
    listener. A case fails if htpdate exits with a negative status, i.e. it
    was terminated by a signal.
    """
    version = 1

    # TCP port on localhost that the canned-response listener binds to.
    _PORT = 19998
    # File the listener creates once it is listening; the parent polls for it.
    _FLAG = '/var/run/HtpdateHTTP'
    # Maximum number of one-second polls to wait for the flag file.
    _MAX_FLAG_WAIT = 10
    # Maximum number of request bytes read before the response is sent.
    _MAX_RECV = 1024
    # Binary under test.
    _HTPDATE_BIN = '/usr/sbin/htpdate'

    # Test cases taken from http://crosbug.com/8941#c3.
    _TESTS = {
        'No Date Header': 'HTTP/1.1 200 OK\r\n\r\n',
        'Empty Date': 'HTTP/1.1 200 OK\r\nDate: \r\n\r\n',
        'No GMT':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14 Mar 2012 17:11:52\r\n\r\n',
        'No Seconds':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14 Mar 2012 17:11:\r\n\r\n',
        'No Minutes':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14 Mar 2012 17:\r\n\r\n',
        'No Hours':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14 Mar 2012\r\n\r\n',
        'No Year':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14 Mar\r\n\r\n',
        'No Month':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14\r\n\r\n',
        'Single Digit Day':
            'HTTP/1.1 200 OK\r\nDate: Wed, 7 Mar 2012 17:11:52 GMT\r\n\r\n',
        'Multi Digit Day < 10':
            'HTTP/1.1 200 OK\r\nDate: Wed, 07 Mar 2012 17:11:52 GMT\r\n\r\n',
        'Multi Digit Day > 10':
            'HTTP/1.1 200 OK\r\nDate: Wed, 14 Mar 2012 17:11:52 GMT\r\n\r\n',
    }

    def wait_on_flag(self, flag_file, max_checks):
        """Polls once per second until flag_file exists.

        @param flag_file: path of the file to wait for.
        @param max_checks: maximum number of polls before giving up.

        @return True if the file exists by the end of the wait, else False.
        """
        for _ in range(max_checks):
            if os.path.exists(flag_file):
                return True
            time.sleep(1)
        # One last look after the final sleep.
        return os.path.exists(flag_file)

    def _remove_flag(self):
        """Deletes the flag file; a file that is already absent is fine."""
        try:
            os.unlink(self._FLAG)
        except OSError:
            # Only a failure that leaves the file behind is a real problem.
            if os.path.exists(self._FLAG):
                raise

    def _serve_once(self, test_data):
        """Answers a single connection with test_data, then exits.

        Must only be called in the forked child. It never returns: the
        process always ends through os._exit() so that neither a normal
        finish nor an exception can unwind into the parent's call stack
        and leave a second copy of the test running.

        @param test_data: the raw HTTP response to send to the client.
        """
        status = 1
        try:
            if not isinstance(test_data, bytes):
                # Sockets carry bytes; the canned responses are pure ASCII.
                test_data = test_data.encode('ascii')
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server.bind(('localhost', self._PORT))
                server.listen(0)
                # Touch the flagfile to let the parent know the listen is up.
                open(self._FLAG, 'a').close()
                sock, _ = server.accept()
                try:
                    # The request content is irrelevant; just drain it.
                    sock.recv(self._MAX_RECV)
                    sock.sendall(test_data)
                    sock.shutdown(socket.SHUT_RDWR)
                finally:
                    sock.close()
            finally:
                server.close()
            status = 0
        except Exception:
            logging.exception('Listener for htpdate failed.')
        finally:
            try:
                self._remove_flag()
            finally:
                os._exit(status)

    def _reap_listener(self, pid):
        """Makes sure the listener child is gone and its flag is removed.

        Normally the child has already exited by the time htpdate returns.
        If htpdate never connected, the child is still blocked in accept()
        and would keep the port and the flag file for the next test case,
        so it is killed here.

        @param pid: process id of the forked listener.
        """
        import signal
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # The child no longer exists; nothing to kill.
            pass
        try:
            os.waitpid(pid, 0)
        except OSError:
            # The child was already reaped elsewhere.
            pass
        self._remove_flag()

    def test_case(self, test_name, test_data):
        """Runs htpdate against a listener that replies with test_data.

        @param test_name: human-readable name used in log/failure messages.
        @param test_data: the raw HTTP response the listener sends back.

        @raises error.TestFail: if the listener never signals readiness or
                htpdate is terminated by a signal.
        """
        # A flag left over from an earlier run would make the parent launch
        # htpdate before the new listener is actually up.
        self._remove_flag()

        pid = os.fork()
        if not pid:
            # We're in the child. Note that while it might seem
            # inefficient to set up and tear down this socket per-test
            # in an entirely separate process, testing shows this has
            # some important stability consequences for the test
            # itself. Htpdate in some - but not all - cases will
            # attempt 2 consecutive connections, and we get long hangs
            # during the second attempt if the listening process has
            # not exited. Tearing down the listener from within the
            # long-lived python process is apparently not sufficient
            # to get the connection attempt refused quickly, so we do
            # it in a child that we can exit entirely.
            self._serve_once(test_data)

        # We're in the parent. Spin until the child is ready for us,
        # otherwise we might race and launch htpdate before the
        # listener is set up.
        try:
            if not self.wait_on_flag(self._FLAG, self._MAX_FLAG_WAIT):
                raise error.TestFail('Listener never showed up.')
            proc = subprocess.Popen([self._HTPDATE_BIN, '-q', '-d',
                                     'localhost:%d' % self._PORT])
            retcode = proc.wait()
            logging.debug('Test case %s: Exit status: %d',
                          test_name, retcode)
            # Popen reports death-by-signal as a negative return code.
            if retcode < 0:
                raise error.TestFail('Crashed during %s.' % test_name)
        finally:
            self._reap_listener(pid)

    def run_once(self):
        """Runs every canned-response test case in _TESTS."""
        for test_name, test_data in self._TESTS.items():
            self.test_case(test_name, test_data)