blob: 36d7e0902b6fa85e5ec109b12d2db6d9311c2afa [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import dbus
import logging
import pipes
import re
import shlex
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
# Result of a single dbus-send call.  |sender| and |responder| carry the two
# bus names associated with the call (documented intent: |sender| is the
# temporary bus name of dbus-send and |responder| is the remote process), and
# |response| carries the parsed reply.
DBusSendResult = collections.namedtuple(
    'DBusSendResult', ['sender', 'responder', 'response'])

# Internal only: one parsed key/value pair of a dict entry.
DictEntry = collections.namedtuple('DictEntry', ['key', 'value'])
def _build_token_stream(headerless_dbus_send_output):
"""A tokenizer for dbus-send output.
The output is basically just like splitting on whitespace, except that
strings are kept together by " characters.
@param headerless_dbus_send_output: list of lines of dbus-send output
without the meta-information prefix.
@return list of tokens in dbus-send output.
"""
return shlex.split(' '.join(headerless_dbus_send_output))
def _parse_value(token_stream):
"""Turn a stream of tokens from dbus-send output into native python types.
@param token_stream: output from _build_token_stream() above.
"""
# Assumes properly tokenized output (strings with spaces handled).
# Assumes tokens are pre-stripped
token_type = token_stream.pop(0)
if token_type == 'variant':
token_type = token_stream.pop(0)
if token_type == 'object':
token_type = token_stream.pop(0) # Should be 'path'
token_value = token_stream.pop(0)
INT_TYPES = ('int16', 'uint16', 'int32', 'uint32',
'int64', 'uint64', 'byte')
if token_type in INT_TYPES:
return int(token_value)
if token_type == 'string' or token_type == 'path':
return token_value # shlex removed surrounding " chars.
if token_type == 'boolean':
return token_value == 'true'
if token_type == 'double':
return float(token_value)
if token_type == 'array':
values = []
while token_stream[0] != ']':
values.append(_parse_value(token_stream))
token_stream.pop(0)
if values and all([isinstance(x, DictEntry) for x in values]):
values = dict(values)
return values
if token_type == 'dict':
assert token_value == 'entry('
key = _parse_value(token_stream)
value = _parse_value(token_stream)
assert token_stream.pop(0) == ')'
return DictEntry(key=key, value=value)
raise error.TestError('Unhandled DBus type found: %s' % token_type)
def _parse_dbus_send_output(dbus_send_stdout):
    """Turn dbus-send output into usable Python types.

    This looks like:

    localhost ~ # dbus-send --system --dest=org.chromium.flimflam \
            --print-reply --reply-timeout=2000 / \
            org.chromium.flimflam.Manager.GetProperties
    method return time=1490931987.170070 sender=org.chromium.flimflam -> \
            destination=:1.37 serial=6 reply_serial=2
       array [
          dict entry(
             string "ActiveProfile"
             variant       string "/profile/default"
          )
          dict entry(
             string "ArpGateway"
             variant       boolean true
          )
          ...
       ]

    The |sender| of the result is taken from the header's sender= field and
    the |responder| from its destination field.
    NOTE(review): for a method return the header's sender= is presumably the
    remote process, which looks inverted relative to the description of
    DBusSendResult; confirm against callers before relying on either field.

    @param dbus_send_stdout: string stdout from dbus-send
    @return a DBusSendResult.  Its response is None when the reply has no
            body (i.e. nothing follows the header line).
    @raises error.TestError: if the output is empty or the header line
            cannot be parsed.

    """
    lines = dbus_send_stdout.strip().splitlines()
    if not lines:
        raise error.TestError('dbus-send produced no output to parse.')
    # The first line contains meta-information about the response
    header = lines[0]
    body_lines = lines[1:]

    # Either a unique name such as ":1.37", or a well-known name: dot
    # separated elements made of letters, digits, '_' and '-'.
    dbus_address_pattern = r'[:\d.]+|[a-zA-Z_][a-zA-Z0-9_.\-]*'
    # The header may or may not have a time= field.  That group must be
    # non-capturing so that groups 1 and 2 are the two addresses for both
    # the current and the legacy header format.
    match = re.match(r'method return (?:time=[\d.]+ )?sender=(%s) -> '
                     r'destination=(%s) serial=\d+ reply_serial=\d+' %
                     (dbus_address_pattern, dbus_address_pattern), header)
    # For backward compatibility, match old dbus-send output too.
    # TODO: drop this when dbus is upgraded to 1.10.12+.
    if match is None:
        match = re.match(r'method return sender=(%s) -> dest=(%s) '
                         r'reply_serial=\d+' %
                         (dbus_address_pattern, dbus_address_pattern), header)
    if match is None:
        raise error.TestError('Could not parse dbus-send header: %s' % header)

    sender = match.group(1)
    responder = match.group(2)
    token_stream = _build_token_stream(body_lines)
    # A reply without any return value has nothing after the header.
    ret_val = _parse_value(token_stream) if token_stream else None
    # Note that DBus permits multiple response values, and this is not handled.
    logging.debug('Got DBus response: %r', ret_val)
    return DBusSendResult(sender=sender, responder=responder, response=ret_val)
def _dbus2string(raw_arg):
    """Turn a dbus.* type object into a string that dbus-send expects.

    Supported types are dbus.String, dbus.ObjectPath, dbus.Boolean,
    dbus.Double, dbus.Byte and the 16/32/64 bit signed and unsigned integer
    types.  String and object path arguments are shell-quoted.

    @param raw_arg: dbus.* type object to stringify.
    @return string suitable for dbus-send.
    @raises error.TestError: if |raw_arg| is of an unsupported type.

    """
    numeric_prefixes = (
        (dbus.Int16, 'int16:'),
        (dbus.Int32, 'int32:'),
        (dbus.Int64, 'int64:'),
        (dbus.UInt16, 'uint16:'),
        (dbus.UInt32, 'uint32:'),
        (dbus.UInt64, 'uint64:'),
        (dbus.Double, 'double:'),
        (dbus.Byte, 'byte:'),
    )
    if isinstance(raw_arg, dbus.String):
        # pipes.quote() alone makes the argument safe for the shell.  Also
        # escaping '"' would put a literal backslash into the value sent.
        return pipes.quote('string:%s' % raw_arg)
    if isinstance(raw_arg, dbus.ObjectPath):
        return pipes.quote('objpath:%s' % raw_arg)
    if isinstance(raw_arg, dbus.Boolean):
        return 'boolean:true' if raw_arg else 'boolean:false'
    for prim_type, prefix in numeric_prefixes:
        if isinstance(raw_arg, prim_type):
            return prefix + str(raw_arg)
    raise error.TestError('No support for serializing %r' % raw_arg)
def _build_arg_string(raw_args):
"""Construct a string of arguments to a DBus method as dbus-send expects.
@param raw_args list of dbus.* type objects to seriallize.
@return string suitable for dbus-send.
"""
return ' '.join([_dbus2string(arg) for arg in raw_args])
def dbus_send(bus_name, interface, object_path, method_name, args=None,
              host=None, timeout_seconds=2, tolerate_failures=False,
              user=None):
    """Call a DBus method on the system bus by running dbus-send.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface of object to call method on.
    @param object_path: string DBus path of remote object to call method on.
    @param method_name: string name of method to call.
    @param args: optional list of arguments.  Arguments must be of types
            from the python dbus module.
    @param host: An optional host object if running against a remote host;
            when None the command is run through utils.run.
    @param timeout_seconds: number of seconds to wait for a response.
    @param tolerate_failures: boolean True to ignore problems receiving a
            response (passed to the runner as ignore_status).
    @param user: An optional user name to run dbus-send as (via sudo -u).
    @return the DBusSendResult parsed from the command's stdout, or None if
            the command finished with a non-zero exit status.

    """
    if host is None:
        run = utils.run
    else:
        run = host.run

    timeout_ms = int(timeout_seconds * 1000)
    cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s '
           '%s %s.%s' % (timeout_ms, bus_name, object_path, interface,
                         method_name))
    if user is not None:
        cmd = 'sudo -u %s %s' % (user, cmd)
    if args is not None:
        cmd = ' '.join([cmd, _build_arg_string(args)])

    result = run(cmd, ignore_status=tolerate_failures)
    if result.exit_status == 0:
        return _parse_dbus_send_output(result.stdout)
    logging.debug('%r', result.stdout)
    return None
def get_property(bus_name, interface, object_path, property_name, host=None):
    """Fetch a DBus property by calling 'Get' on dbus.PROPERTIES_IFACE.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface exposing the property.
    @param object_path: string DBus path of remote object to call method on.
    @param property_name: string name of property to get.
    @param host: An optional host object if running against a remote host.
    @return whatever dbus_send() returns for the 'Get' call.

    """
    get_args = [dbus.String(interface), dbus.String(property_name)]
    return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get',
                     args=get_args, host=host)