blob: 53576b9fa3e45bc4f38d8b36c46de4a3e434cb29 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import logging
import syslog
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import process_watcher
from autotest_lib.client.cros import dbus_util
SERVICE_NAME = 'org.chromium.leaderd'
# In test images, we add a supplementary set of rules that expand the DBus
# access policy to tolerate us claiming this name and sending messages to
# these services.
TEST_SERVICE_NAME_PREFIX = 'org.chromium.leaderd.test'
DBUS_INTERFACE_MANAGER = 'org.chromium.leaderd.Manager'
DBUS_INTERFACE_GROUP = 'org.chromium.leaderd.Group'
DBUS_PATH_MANAGER = '/org/chromium/leaderd/Manager'
GROUP_PROPERTY_MEMBERS = 'MemberUUIDs'
GROUP_PROPERTY_LEADER_UUID = 'LeaderUUID'
def _get_bus(bus):
"""Get a dbus.Bus object to use.
@param bus: dbus.Bus object to use or None.
"""
if bus is None:
# This is a global singleton DBus connection that stays
# open until explicitly closed or the process exits.
bus = dbus.SystemBus()
return bus
def get_nth_service_name(n):
"""Get the DBus service name for the Nth instance of leaderd.
@param n: int starting from 0 inclusive.
@return string: DBus service name for Nth instance.
"""
if n is None:
return SERVICE_NAME
return '%s.TestInstance%d' % (TEST_SERVICE_NAME_PREFIX, n)
def confirm_leaderd_started(service_name=SERVICE_NAME,
bus=None,
timeout_seconds=10):
"""Confirm that an instance of leaderd is responding to queries over DBus.
@param service_name: string well known DBus connection name of instance.
@param bus: dbus.Bus instance.
@param timeout_seconds: number of seconds to wait for leaderd to respond.
"""
bus = _get_bus(bus)
start_time = time.time()
while time.time() - start_time < timeout_seconds:
try:
manager = dbus.Interface(
bus.get_object(service_name, DBUS_PATH_MANAGER),
DBUS_INTERFACE_MANAGER)
if manager.Ping() == 'Hello world!':
return
except:
pass
time.sleep(0.5)
raise error.TestFail('Timed out before leaderd at %s started.' %
service_name)
class LeaderdHelper(object):
"""Helper object for manipulating leaderd."""
def start_instance(self,
leaderd_service_name,
peerd_service_name,
handler_name,
verbosity_level=3):
"""Start an instance of leaderd.
@param leaderd_service_name: string well known DBus service name to
claim for this instance of leaderd.
@param peerd_service_name: string well known DBus service name of
peerd service for this leaderd instance to depend on.
@param handler_name: string protocol handler name to use from the
webserver.
@param verbosity_level: integer leaderd logging verbosity level.
@return ProcessWatcher object that should be closed to clean up the
started process.
"""
# Here, we're forced to launch leaderd without upstart, because
# upstart is configured to launch a single instance of leaderd,
# rather than many instances in parallel.
syslog.syslog('Starting leaderd service=%s' % leaderd_service_name)
watcher = process_watcher.ProcessWatcher(
'/usr/bin/leaderd',
args=['--v=%d' % verbosity_level,
'--service_name=%s' % leaderd_service_name, '--peerd_service_name=%s' % peerd_service_name,
'--protocol_handler_name=%s' % handler_name,
],
minijail_config=process_watcher.MinijailConfig(user='leaderd',
group='leaderd'))
watcher.start()
confirm_leaderd_started(service_name=leaderd_service_name)
return watcher
def join_group(self, group_id, instance_number=None, bus=None):
"""Join a group on a given instance of leaderd.
@param group_id: string UUID of group to join.
@param instance_number: instance number of leaderd to join to the group.
If not provided, joins the global leaderd instance to the group.
@param bus: dbus.Bus instance if something other than the system bus
should be used to communicate with this instance.
@return string DBus path of group.
"""
logging.info('Joining leaderd group %s.', group_id)
service_name = get_nth_service_name(instance_number)
bus = _get_bus(bus)
manager = dbus.Interface(
bus.get_object(service_name, DBUS_PATH_MANAGER),
DBUS_INTERFACE_MANAGER)
# Note that this instance of leaderd will leave this group
# when the DBus connection held by |bus| dies.
group_path = manager.JoinGroup(dbus.String(str(group_id)),
dbus.Dictionary(dict(), 'sv'))
return dbus_util.dbus2primitive(group_path)
def set_score(self, group_path, score, instance_number=None, bus=None):
"""Set the score of a particular group.
@param group_path: string DBus path of group to set score on.
@param score: integer score.
@param instance_number: instance number of leaderd to join to the group.
If not provided, joins the global leaderd instance to the group.
@param bus: dbus.Bus instance if something other than the system bus
should be used to communicate with this instance.
"""
bus = _get_bus(bus)
service_name = get_nth_service_name(instance_number)
group = dbus.Interface(
bus.get_object(service_name, group_path),
DBUS_INTERFACE_GROUP)
group.SetScore(dbus.Int32(score))
def get_members(self, group_path, instance_number=None, bus=None):
"""Get the members of a group.
@param group_path: string path of group on DBus.
@param instance_number: integer instance of leaderd to contact.
Defaults to the normal system service instance.
@param bus: instance of dbus.Bus if something other than the system bus
should be used.
@return list of string UUIDs representing peers in the group as
seen by this instance of leaderd.
"""
service_name = get_nth_service_name(instance_number)
bus = _get_bus(bus)
group = dbus.Interface(
bus.get_object(service_name, group_path),
dbus.PROPERTIES_IFACE)
members = group.Get(DBUS_INTERFACE_GROUP, GROUP_PROPERTY_MEMBERS)
return dbus_util.dbus2primitive(members)
def get_leader(self, group_path, instance_number=None, bus=None):
"""Get the leader according to a particular group.
@param group_path: string DBus path of group to set score on.
@param instance_number: instance number of leaderd to join to the group.
If not provided, joins the global leaderd instance to the group.
@param bus: dbus.Bus instance if something other than the system bus
should be used to communicate with this instance.
@return string leader id found on group.
"""
service_name = get_nth_service_name(instance_number)
bus = _get_bus(bus)
group = dbus.Interface(
bus.get_object(service_name, group_path),
dbus.PROPERTIES_IFACE)
leader = group.Get(DBUS_INTERFACE_GROUP, GROUP_PROPERTY_LEADER_UUID)
return dbus_util.dbus2primitive(leader)
def confirm_instances_see_each_other(self, group_paths, expected_peer_ids,
timeout_seconds=5, bus=None):
"""Confirm that some previously created instances agree they're all
members of a group.
@param group_paths: dictionary mapping from instance numbers to string
DBus group paths of the group on those instances.
@param expected_peer_ids: list of string peer ids that should be listed
in each group's membership list.
@param timeout_seconds: number of seconds to wait for convergence.
@param bus: dbus.Bus instance if something other than the system bus
should be used to communicate with this instance.
@return True iff all groups list |expected_peer_ids| as group members.
"""
start_time = time.time()
while time.time() - start_time < timeout_seconds:
for instance_number, group_path in group_paths.iteritems():
actual_peer_ids = sorted(self.get_members(
group_path, instance_number=instance_number, bus=bus))
if actual_peer_ids != expected_peer_ids:
logging.debug('Expected to find group members %r, but got '
'%r on instance %d.',
expected_peer_ids,
actual_peer_ids,
instance_number)
time.sleep(1)
continue # Found an instance with a bad membership list.
return True # All instances agree on the correct membership.
return False # Timed out.
def confirm_instances_follow_leader(self, group_paths, expected_leader_id,
timeout_seconds=30, bus=None):
"""Confirm that some previously created instances agree on a leader.
@param group_paths: dictionary mapping from instance numbers to string
DBus group paths of the group on those instances.
@param expected_leader_id: string identifier of leader (a UUID).
@param timeout_seconds: number of seconds to wait for convergence.
@param bus: dbus.Bus instance if something other than the system bus
should be used to communicate with this instance.
@return True iff all groups give |expected_leader_id| as their leader.
"""
start_time = time.time()
while time.time() - start_time < timeout_seconds:
for instance_number, group_path in group_paths.iteritems():
actual_leader_id = self.get_leader(
group_path, instance_number=instance_number, bus=bus)
if actual_leader_id != expected_leader_id:
logging.debug('Expected to find leader id %s, but got %s '
'on instance %d.',
expected_leader_id,
actual_leader_id,
instance_number)
time.sleep(1)
continue # Found a peer that disagrees about the leader.
return True # Everyone agrees on the expected leader.
return False # Timed out.