# blob: e9272fcccc5d47d32b53b0178eecebd6281c399d [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import random
import string
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
# Path fragments that are appended to the '/privet/' base URL when building
# request URLs for the privetd web server.
URL_PING = 'ping'
URL_INFO = 'info'
URL_AUTH = 'v3/auth'
URL_PAIRING_CONFIRM = 'v3/pairing/confirm'
URL_PAIRING_START = 'v3/pairing/start'
URL_SETUP_START = 'v3/setup/start'
URL_SETUP_STATUS = 'v3/setup/status'
# NOTE(review): DEFAULT_HTTP_PORT and DEFAULT_HTTPS_PORT are referenced by
# PrivetdHelper, and BOOTSTRAP_CONFIG_* by the PrivetdConfig docstring, but
# none of them is defined in the visible part of this file -- confirm that
# their definitions were not lost.
# Configuration file that is rewritten each time privetd is restarted with a
# specific configuration.
PRIVETD_CONF_FILE_PATH = '/tmp/privetd.conf'
# Not referenced in the visible part of this file; presumably a state file
# path for callers to pass as |state_file_path| -- confirm.
PRIVETD_TEMP_STATE_FILE = '/tmp/privetd.state'
# Two-character device class and three-character model ID; these are the
# fields is_softap_ssid() expects at the start of the SSID suffix.
DEFAULT_DEVICE_CLASS = 'AB' # = development board
DEFAULT_DEVICE_MODEL_ID = 'AAA' # = unregistered
def privetd_is_installed(host=None):
    """Check if the privetd binary is installed.

    @param host: Host object if we're interested in a remote host.  When
            None, the check is run through utils.run instead.
    @return True iff privetd is installed in this system.

    """
    # NOTE(review): the right-hand sides of the |run| assignments and the tail
    # of the run() call were truncated in this copy of the file.  They are
    # restored from the imported |utils| module and from the
    # run(..., ignore_status=True) calls used elsewhere in this file.
    run = utils.run if host is None else host.run
    # The exit status is inspected below, so a non-zero status must not abort
    # the call (assumed meaning of ignore_status -- confirm).
    result = run('if [ -f /usr/bin/privetd ]; then exit 0; fi; exit 1',
                 ignore_status=True)
    return result.exit_status == 0
class PrivetdConfig(object):
"""An object that knows how to restart privetd in various configurations."""
_num_names_generated = 0
@staticmethod
def naive_restart(host=None):
    """Restart privetd without modifying any settings.

    Takes no |self|, so it is declared static and can be called on the class
    or on an instance.

    @param host: Host object if privetd is running on a remote host.  When
            None, the commands are run through utils.run instead.

    """
    # NOTE(review): the right-hand sides of the |run| assignments were
    # truncated in this copy; restored from the imported |utils| module.
    run = utils.run if host is None else host.run
    # Stopping may fail if privetd is not currently running; that is fine.
    run('stop privetd', ignore_status=True)
    run('start privetd')
@staticmethod
def build_unique_device_name():
    """@return a test-unique name for a Privet device."""
    # NOTE(review): NUM_RAND_CHARS is not defined anywhere in the visible
    # part of this file; it must be provided at module scope (or restored
    # here) before this function can run -- TODO confirm the intended value.
    RAND_CHARS = string.ascii_lowercase + string.digits
    rand_token = ''.join(random.choice(RAND_CHARS)
                         for _ in range(NUM_RAND_CHARS))
    # The class-wide counter is part of the name and is bumped on every
    # call, so names generated in sequence differ even if the random tokens
    # happen to collide.  (The second format argument was truncated in this
    # copy; the counter is the only value that fits the '%2d' slot.)
    name = 'CrOS Core %s_%2d' % (rand_token,
                                 PrivetdConfig._num_names_generated)
    PrivetdConfig._num_names_generated += 1
    logging.debug('Generated unique device name %s', name)
    return name
# Stores every constructor argument on the instance; restart_with_config()
# and is_softap_ssid() read these attributes later.
# NOTE(review): this copy of the constructor is damaged and its code is left
# untouched because the missing parts cannot be recovered from this file:
#   * the parameter list after |self| (names, order and defaults) is missing;
#   * the docstring's closing quotes are missing;
#   * the right-hand side of the |device_name| fallback is truncated
#     (presumably a call to build_unique_device_name() -- confirm).
# Docstring nits to fix when the code is restored: |log_verbosity| is
# documented twice, and the |disable_security| entry corresponds to the
# attribute named |disable_pairing_security|.
def __init__(self,
"""Construct a privetd configuration.
@param wifi_bootstrap_mode: one of BOOTSTRAP_CONFIG_* above.
@param gcd_bootstrap_mode: one of BOOTSTRAP_CONFIG_* above.
@param monitor_timeout_seconds: int timeout for the WiFi bootstrapping
state machine.
@param connect_timeout_seconds: int timeout for the WiFi bootstrapping
state machine.
@param bootstrap_timeout_seconds: int timeout for the WiFi bootstrapping
state machine.
@param log_verbosity: int logging verbosity for privetd.
@param state_file_path: string path to privetd state file.
@param clean_state: bool True to clear state from the state file.
|state_file_path| must not be None if this is True.
@param log_verbosity: integer verbosity level of log messages.
@param enable_ping: bool True if we should enable the ping URL
on the privetd web server.
@param http_port: integer port number for the privetd HTTP server.
@param https_port: integer port number for the privetd HTTPS server.
@param device_whitelist: list of string network interface names to
consider exclusively for connectivity monitoring (e.g.
['eth0', 'wlan0']).
@param disable_security: bool True to disable pairing security
@param device_name: string Device name. A 'unique' device name
will be generated if this is unspecified.
@param device_class: string device class, a two character string.
@param device_model_id: string device model ID, a 3 character string.
self.wifi_bootstrap_mode = wifi_bootstrap_mode
self.gcd_bootstrap_mode = gcd_bootstrap_mode
self.monitor_timeout_seconds = monitor_timeout_seconds
self.connect_timeout_seconds = connect_timeout_seconds
self.bootstrap_timeout_seconds = bootstrap_timeout_seconds
self.log_verbosity = log_verbosity
self.clean_state = clean_state
self.state_file_path = state_file_path
self.enable_ping = enable_ping
self.http_port = http_port
self.https_port = https_port
self.device_whitelist = device_whitelist
self.disable_pairing_security = disable_pairing_security
self.device_name = (device_name or
self.device_class = device_class
self.device_model_id = device_model_id
# Visible flow: collect the settings into key=value config lines, build a
# list of PRIVETD_* job flags, stop privetd, write the config file at
# PRIVETD_CONF_FILE_PATH through a shell heredoc, optionally truncate and
# chown the state file, then start privetd with the collected flags.
# Raises error.TestError if |clean_state| is set without a state file path.
# NOTE(review): this copy of the method is damaged and its code is left
# untouched because the missing parts are runtime strings that cannot be
# recovered from this file:
#   * both assignments to |run| are truncated (presumably utils.run and
#     host.run, as the comment about run() below suggests -- confirm);
#   * the conf_dict literal is never closed;
#   * the bodies of the |enable_ping| and |disable_pairing_security|
#     branches are missing, so the flags they add are unknown;
#   * the format arguments of the PRIVETD_DEVICE_WHITELIST flag and of the
#     heredoc command are truncated (the latter presumably joins
#     |conf_lines| -- confirm);
#   * the docstring's closing quotes are missing.
# When restoring: dict.iteritems() exists only on Python 2; items() works on
# both Python 2 and 3.
def restart_with_config(self, host=None):
"""Restart privetd in this configuration.
@param host: Host object if privetd is running on a remote host.
run =
if host is not None:
run =
conf_dict = {
'wifi_bootstrapping_mode': self.wifi_bootstrap_mode,
'gcd_bootstrapping_mode': self.gcd_bootstrap_mode,
'monitor_timeout_seconds': self.monitor_timeout_seconds,
'connect_timeout_seconds': self.connect_timeout_seconds,
'bootstrap_timeout_seconds': self.bootstrap_timeout_seconds,
'device_class': self.device_class,
'device_model_id': self.device_model_id,
'device_name': self.device_name,
flag_list = []
flag_list.append('PRIVETD_LOG_LEVEL=%d' % self.log_verbosity)
flag_list.append('PRIVETD_HTTP_PORT=%d' % self.http_port)
flag_list.append('PRIVETD_HTTPS_PORT=%d' % self.https_port)
if self.enable_ping:
if self.disable_pairing_security:
if self.device_whitelist:
flag_list.append('PRIVETD_DEVICE_WHITELIST=%s' %
if self.state_file_path:
flag_list.append('PRIVETD_STATE_PATH=%s' % self.state_file_path)
run('stop privetd', ignore_status=True)
conf_lines = ['%s=%s' % pair for pair in conf_dict.iteritems()]
# Go through this convoluted shell magic here because we need to create
# this file on both remote and local hosts (see how run() is defined).
run('cat <<EOF >%s\n%s\nEOF\n' % (PRIVETD_CONF_FILE_PATH,
if self.clean_state:
if not self.state_file_path:
raise error.TestError('Cannot clean unknown state file path.')
run('echo > %s' % self.state_file_path)
run('chown privetd:privetd %s' % self.state_file_path)
run('start privetd %s' % ' '.join(flag_list))
def is_softap_ssid(self, ssid):
    """Check whether |ssid| could represent privetd with this config.

    The SSID is accepted when it is at most 31 characters long and has the
    form '<name>.<suffix>', where <suffix> is exactly 10 characters: a
    2-character device class, a 3-character model ID, 2 characters of flags
    and the literal 'prv'.  <name> must start with this config's device name.

    @param ssid: string SSID of network.
    @return True iff this could be a network started by privetd configured
            with settings in |self|.

    """
    logging.debug('Checking whether softAP SSID "%s" '
                  'looks like a privet SSID', ssid)
    if len(ssid) > 31:
        logging.debug('SSID was too long')
        return False
    if '.' not in ssid:
        logging.debug('Missing SSID separator')
        return False
    # Split on the first separator only; anything after it is the suffix.
    name, suffix = ssid.split('.', 1)
    if len(suffix) != 10:
        logging.debug('Suffix was %d characters, rather than 10.',
                      len(suffix))
        return False
    device_class = suffix[0:2]
    model_id = suffix[2:5]
    # suffix[5:7] holds the flags field, which is not validated yet.
    # TODO(wiley) Add flag support
    version = suffix[7:10]
    if version != 'prv':
        logging.debug('Suffix should end with prv, not %s', suffix)
        return False
    # A device class of None disables the class check; a falsy model ID
    # disables the model check.
    if self.device_class is not None and device_class != self.device_class:
        logging.debug('Expected device_class=%s, but got %s',
                      self.device_class, device_class)
        return False
    if self.device_model_id and model_id != self.device_model_id:
        logging.debug('Expected model_id=%s, but got %s',
                      self.device_model_id, model_id)
        return False
    if not name.startswith(self.device_name):
        logging.debug('Expected SSID to start with "%s" but got "%s".',
                      self.device_name, name)
        return False
    return True
class PrivetdHelper(object):
"""Delegate class containing logic useful with privetd."""
def __init__(self, host=None):
    """Construct a helper that issues commands locally or through |host|.

    @param host: Host object if privetd is running on a remote host.  When
            None, commands are run through utils.run instead.

    """
    self._host = host
    # NOTE(review): the right-hand sides of the |_run| assignments were
    # truncated in this copy; restored from the imported |utils| module.
    self._run = utils.run if host is None else host.run
    # NOTE(review): DEFAULT_HTTP_PORT / DEFAULT_HTTPS_PORT are not defined in
    # the visible part of this file -- confirm they exist at module scope.
    self._http_port = DEFAULT_HTTP_PORT
    self._https_port = DEFAULT_HTTPS_PORT
def _build_privet_url(self, path_fragment, use_https=True):
"""Builds a request URL for privet.
@param path_fragment: URL path fragment to be appended to /privet/ URL.
@param use_https: set to False to use 'http' protocol instead of https.
@return The full URL to be used for request.
protocol = 'http'
port = self._http_port
if use_https:
protocol = 'https'
port = self._https_port
hostname = ''
url = '%s://%s:%s/privet/%s' % (protocol, hostname, port, path_fragment)
return url
# Visible flow: build curl arguments from |headers|, run curl through
# self._run up to retry_count + 1 times, read the response body back from
# |output_file| when curl exits with status 0, and return it once the HTTP
# code reported on stdout is '200'.  Raises error.TestFail when the retries
# are exhausted.
# NOTE(review): this copy of the method is damaged and its code is left
# untouched because the missing parts cannot be recovered from this file:
#   * the docstring's closing quotes are missing;
#   * only the header strings are appended to |args|; whatever passed the
#     request body, wrote the HTTP code to stdout and sent the response to
#     |output_file| is missing, and the self._run(...) call is truncated;
#   * |retry_delay| is documented but never used in the visible code
#     (presumably a sleep between retries was lost -- confirm).
# Defects to fix when the code is restored:
#   * |headers| defaults to a shared dict that is mutated below
#     (Content-Type), so the header leaks into later calls; default to None
#     and copy instead;
#   * dict.iteritems() exists only on Python 2, and logging.warn is a
#     deprecated alias of logging.warning;
#   * the docstring mentions peerd; presumably privetd is meant -- confirm.
def _http_request(self, url, request_data=None, retry_count=0,
retry_delay=0.3, headers={}):
"""Sends a GET/POST request to a web server at the given |url|.
If the request fails due to error 111:Connection refused, try it again
after |retry_delay| seconds and repeat this to a max |retry_count|.
This is needed to make sure peerd has a chance to start up and start
responding to HTTP requests.
@param url: URL path to send the request to.
@param request_data: json data to send in POST request.
If None, a GET request is sent with no data.
@param retry_count: max request retry count.
@param retry_delay: retry_delay (in seconds) between retries.
@param headers: optional dictionary of http request headers
@return The string content of the page requested at url.
logging.debug('Requesting %s', url)
args = []
if request_data is not None:
headers['Content-Type'] = 'application/json; charset=utf8'
for header in headers.iteritems():
args.append(': '.join(header))
# TODO(wiley do cert checking
# Write the HTTP code to stdout
output_file = '/tmp/privetd_http_output'
while retry_count >= 0:
result = self._run('curl %s' % url, args=args,
retry_count -= 1
raw_response = ''
success = result.exit_status == 0
http_code = result.stdout
if success:
raw_response = self._run('cat %s' % output_file).stdout
logging.debug('Got raw response: %s', raw_response)
if success and http_code == '200':
return raw_response
if retry_count < 0:
raise error.TestFail('Failed requesting %s (code=%s)' %
(url, http_code))
logging.warn('Failed to connect to host. Retrying...')
def send_privet_request(self, path_fragment, request_data=None,
                        auth_token='Privet anonymous'):
    """Sends a privet request over HTTPS.

    @param path_fragment: URL path fragment to be appended to /privet/ URL.
    @param request_data: json data to send in POST request.  A dict is
            serialized with json.dumps(); any other value is passed on
            unchanged.  If None, a GET request is sent with no data.
    @param auth_token: authorization token to be added as 'Authorization'
            http header using 'Privet' as the auth realm.
    @return the response body parsed as JSON.

    """
    if isinstance(request_data, dict):
        request_data = json.dumps(request_data)
    headers = {'Authorization': auth_token}
    url = self._build_privet_url(path_fragment, use_https=True)
    # The trailing argument of this call was truncated in this copy;
    # |headers| is built just above and is otherwise unused.
    data = self._http_request(url, request_data=request_data,
                              headers=headers)
    try:
        json_data = json.loads(data)
        data = json.dumps(json_data)  # Drop newlines, pretty format.
    finally:
        # Log the response even when it is not valid JSON; in that case the
        # raw text is logged and the parse error still propagates.
        # NOTE(review): the name of the logging call was lost in this copy;
        # info level is assumed -- confirm.
        logging.info('Received /privet/%s response: %s',
                     path_fragment, data)
    return json_data
def ping_server(self, use_https=False):
    """Ping the privetd webserver.

    Reuses port numbers from the last restart request. The server
    must have been restarted with enable_ping=True for this to work.

    @param use_https: set to True to use 'https' protocol instead of 'http'.
    @raises error.TestFail if the server does not answer with the expected
            greeting.

    """
    url = self._build_privet_url(URL_PING, use_https=use_https)
    # Allow a few retries; the request is retried up to 5 more times if it
    # does not succeed.
    content = self._http_request(url, retry_count=5)
    if content != 'Hello, world!':
        raise error.TestFail('Unexpected response from web server: %s.' %
                             content)
def privet_auth(self):
    """Go through pairing and insecure auth.

    Starts a pairing session with crypto 'none', confirms it by echoing the
    device commitment back as the client commitment, then requests an
    owner-scoped token using that same commitment as the auth code.

    @return resulting auth token, formatted as '<tokenType> <accessToken>'.

    """
    data = {'pairing': 'pinCode', 'crypto': 'none'}
    pairing = self.send_privet_request(URL_PAIRING_START, request_data=data)
    data = {'sessionId': pairing['sessionId'],
            'clientCommitment': pairing['deviceCommitment']
    }
    self.send_privet_request(URL_PAIRING_CONFIRM, request_data=data)
    data = {'authCode': pairing['deviceCommitment'],
            'mode': 'pairing',
            'requestedScope': 'owner'
    }
    auth = self.send_privet_request(URL_AUTH, request_data=data)
    auth_token = '%s %s' % (auth['tokenType'], auth['accessToken'])
    return auth_token
def setup_add_wifi_credentials(self, ssid, passphrase, data=None):
    """Add WiFi credentials to the data provided to setup_start().

    @param ssid: string ssid of network to connect to.
    @param passphrase: string passphrase for network.
    @param data: optional dict of information to append to.  It is updated
            in place.  When omitted, a fresh dict is created on every call.
    @return the dict holding the credentials under the 'wifi' key (|data|
            itself when one was passed in).

    """
    # A literal {} default would be shared between calls, so credentials and
    # any later additions by the caller would leak into unrelated requests.
    if data is None:
        data = {}
    data['wifi'] = {'ssid': ssid, 'passphrase': passphrase}
    return data
def setup_start(self, data, auth_token):
    """Provide privetd with credentials for various services.

    @param data: dict of information to give to privetd.  Should be
            formed by one or more calls to setup_add_*() above.
    @param auth_token: string auth token returned from privet_auth()
    @return dict containing the parsed JSON response.

    """
    # The trailing argument of this call was truncated in this copy;
    # |auth_token| is the only parameter that was otherwise unused.
    return self.send_privet_request(URL_SETUP_START, request_data=data,
                                    auth_token=auth_token)
def wifi_setup_was_successful(self, ssid, auth_token):
    """Detect whether privetd thinks bootstrapping has succeeded.

    @param ssid: string network we expect to connect to.
    @param auth_token: string auth token returned from privet_auth()
    @return True iff setup/status reports success in connecting to
            the given network.

    """
    # The trailing argument of this call was truncated in this copy;
    # |auth_token| is the only parameter that was otherwise unused.
    response = self.send_privet_request(URL_SETUP_STATUS,
                                        auth_token=auth_token)
    wifi_status = response['wifi']
    return (wifi_status['status'] == 'success' and
            wifi_status['ssid'] == ssid)