blob: 99042bb1bf8068316f1e93ee3f44ce120352547a [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import logging
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.tendo import peerd_config
from autotest_lib.client.cros import chrooted_avahi
from autotest_lib.client.cros.netprotos import interface_host
from autotest_lib.client.cros.netprotos import zeroconf
from autotest_lib.client.cros.tendo import peerd_dbus_helper
class peerd_DiscoverServices(test.test):
"""Test that peerd can correctly discover services over mDNS."""
version = 1
FakeService = collections.namedtuple('FakeService',
'service_id service_info port')
FAKE_HOST_HOSTNAME = 'test-host'
TEST_TIMEOUT_SECONDS = 30
PEER_ID = '123e4567-e89b-12d3-a456-426655440000'
PEER_SERBUS_VERSION = '1.12'
PEER_SERVICES = [FakeService('test-service-0',
{'some_data': 'a value',
'other_data': 'another value',
},
8080),
FakeService('test-service-1',
{'again': 'so much data',
},
8081),
]
SERBUS_SERVICE_NAME = '_serbus'
SERBUS_PROTOCOL = '_tcp'
SERBUS_PORT = 0
SERBUS_TXT_DICT = {'ver': PEER_SERBUS_VERSION,
'id': PEER_ID,
'services': '.'.join([service.service_id
for service in PEER_SERVICES])
}
UNIQUE_PREFIX = 'a_unique_mdns_prefix'
def initialize(self):
# Make sure these are initiallized to None in case we throw
# during self.initialize().
self._chrooted_avahi = None
self._peerd = None
self._host = None
self._zc_listener = None
self._chrooted_avahi = chrooted_avahi.ChrootedAvahi()
self._chrooted_avahi.start()
# Start up a fresh copy of peerd with really verbose logging.
self._peerd = peerd_dbus_helper.make_helper(
peerd_config.PeerdConfig(verbosity_level=3))
# Listen on our half of the interface pair for mDNS advertisements.
self._host = interface_host.InterfaceHost(
self._chrooted_avahi.unchrooted_interface_name)
self._zc_listener = zeroconf.ZeroconfDaemon(self._host,
self.FAKE_HOST_HOSTNAME)
# The queries for hostname/dns_domain are IPCs and therefore relatively
# expensive. Do them just once.
hostname = self._chrooted_avahi.hostname
dns_domain = self._chrooted_avahi.dns_domain
if not hostname or not dns_domain:
raise error.TestFail('Failed to get hostname/domain from avahi.')
self._dns_domain = dns_domain
self._hostname = '%s.%s' % (hostname, dns_domain)
def cleanup(self):
for obj in (self._chrooted_avahi,
self._host,
self._peerd):
if obj is not None:
obj.close()
def _has_expected_peer(self):
peer = self._peerd.has_peer(self.PEER_ID)
if peer is None:
logging.debug('No peer found.')
return False
logging.debug('Found peer=%s', peer)
if len(peer.services) != len(self.PEER_SERVICES):
logging.debug('Found %d services, but expected %d.',
len(peer.services), len(self.PEER_SERVICES))
return False
for service_id, info, port in self.PEER_SERVICES:
service = None
for s in peer.services:
if s.service_id == service_id:
service = s
break
else:
logging.debug('No service %s found.', service_id)
return False
if service.service_info != info:
logging.debug('Invalid info found for service %s, '
'expected %r but got %r.', service_id,
info, service.service_info)
return False
if len(service.service_ips) != 1:
logging.debug('Missing service IP for service %s.',
service_id)
return False
# We're publishing records from a "peer" outside the chroot.
expected_addr = (self._chrooted_avahi.MONITOR_IF_IP.addr, port)
if service.service_ips[0] != expected_addr:
logging.debug('Expected service IP for service %s=%r '
'but got %r.',
service_id, expected_addr, service.service_ips[0])
return False
return True
def run_once(self):
# Expose serbus mDNS records through our fake peer.
self._zc_listener.register_service(
self.UNIQUE_PREFIX,
self.SERBUS_SERVICE_NAME,
self.SERBUS_PROTOCOL,
self.SERBUS_PORT,
['='.join(pair) for pair in self.SERBUS_TXT_DICT.iteritems()])
for service_id, info, port in self.PEER_SERVICES:
self._zc_listener.register_service(
self.UNIQUE_PREFIX,
'_' + service_id,
self.SERBUS_PROTOCOL,
port,
['='.join(pair) for pair in info.iteritems()])
# Look for mDNS records through peerd
self._peerd.start_monitoring([peerd_dbus_helper.TECHNOLOGY_MDNS])
# Wait for advertisements of that service to appear from avahi.
logging.info('Waiting for peerd to discover our services.')
success, duration = self._host.run_until(self._has_expected_peer,
self.TEST_TIMEOUT_SECONDS)
logging.debug('Took %f seconds to find our peer.', duration)
if not success:
raise error.TestFail('Peerd failed to publish suitable DBus '
'proxies in time.')