blob: 5f3a7a56a596ea962136d3a85c89411f20370abc [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import json
import os
import tempfile
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cros_disks import CrosDisksTester
def try_remove(filename):
try:
os.remove(filename)
return True
except OSError:
return False
class CrosDisksFuseTester(CrosDisksTester):
"""Common steps for all FUSE-based tests.
"""
def __init__(self, test, test_configs):
super(CrosDisksFuseTester, self).__init__(test)
self._test_configs = test_configs
def setup_test_case(self, config):
pass
def teardown_test_case(self, config):
pass
def verify_test_case(self, config, mount_result):
pass
def _test_case(self, config):
logging.info('Testing "%s"', config['description'])
self.setup_test_case(config)
try:
source = config['test_mount_source_uri']
fstype = config.get('test_mount_filesystem_type')
options = config['test_mount_options']
expected_mount_completion = {
'status': config['expected_mount_status'],
'source_path': source,
}
if 'expected_mount_path' in config:
expected_mount_completion['mount_path'] = \
config['expected_mount_path']
self.cros_disks.mount(source, fstype, options)
result = self.cros_disks.expect_mount_completion(
expected_mount_completion)
try:
self.verify_test_case(config, result)
finally:
self.cros_disks.unmount(source, ['lazy'])
finally:
self.teardown_test_case(config)
def _run_all_test_cases(self):
try:
for config in self._test_configs:
self._test_case(config)
except RuntimeError:
cmd = 'ls -la %s' % tempfile.gettempdir()
logging.debug(utils.run(cmd))
raise
def get_tests(self):
return [self._run_all_test_cases]
SSH_DIR_PATH = '/home/chronos/user/.ssh'
AUTHORIZED_KEYS = os.path.join(SSH_DIR_PATH, 'authorized_keys')
AUTHORIZED_KEYS_BACKUP = AUTHORIZED_KEYS + '.sshfsbak'
class CrosDisksSshfsTester(CrosDisksFuseTester):
"""A tester to verify sshfs support in CrosDisks.
"""
def __init__(self, test, test_configs):
super(CrosDisksSshfsTester, self).__init__(test, test_configs)
def setup_test_case(self, config):
if os.path.exists(AUTHORIZED_KEYS):
# Make backup of the current authorized_keys
utils.run('mv -f ' + AUTHORIZED_KEYS + ' ' + AUTHORIZED_KEYS_BACKUP,
ignore_status=True)
keyfile = config.get('test_ssh_identity_file')
if keyfile:
self._generate_key(keyfile)
self._register_key(keyfile + '.pub')
knownhosts = config.get('test_ssh_known_hosts_file')
if knownhosts:
self._whitelist_host(knownhosts)
def teardown_test_case(self, config):
keyfile = config.get('test_ssh_identity_file')
if keyfile:
try_remove(keyfile)
try_remove(keyfile + '.pub')
knownhosts = config.get('test_ssh_known_hosts_file')
if knownhosts:
try_remove(knownhosts)
if os.path.exists(AUTHORIZED_KEYS_BACKUP):
# Restore authorized_keys from backup.
utils.run('mv -f ' + AUTHORIZED_KEYS_BACKUP + ' ' + AUTHORIZED_KEYS,
ignore_status=True)
def verify_test_case(self, config, mount_result):
if 'expected_file' in config:
f = config['expected_file']
if not os.path.exists(f):
raise error.TestFail('Expected file "' + f + '" not found')
def _generate_key(self, keyfile):
try_remove(keyfile)
try_remove(keyfile + '.pub')
utils.run('ssh-keygen -b 2048 -t rsa -f "' + keyfile + '" -q -N ""')
os.chmod(keyfile, 0644)
def _register_key(self, pubkey):
utils.run('sudo -u chronos mkdir -p ' + SSH_DIR_PATH,
ignore_status=True)
utils.run('sudo -u chronos cp -f ' + pubkey + ' ' + AUTHORIZED_KEYS)
def _whitelist_host(self, knownhosts):
hostkey = '/mnt/stateful_partition/etc/ssh/ssh_host_ed25519_key.pub'
with open(hostkey, 'rb') as f:
keydata = f.readline().split()
with open(knownhosts, 'wb') as f:
f.write('localhost {} {}\n'.format(keydata[0], keydata[1]))
os.chmod(knownhosts, 0644)
class platform_CrosDisksSshfs(test.test):
version = 1
def run_once(self, *args, **kwargs):
test_configs = []
config_file = '%s/%s' % (self.bindir, kwargs['config_file'])
with open(config_file, 'rb') as f:
test_configs.extend(json.load(f))
tester = CrosDisksSshfsTester(self, test_configs)
tester.run(*args, **kwargs)