blob: 04d9a354ae3242f3910d28a1a572767058fd59d6 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import glob
import json
import logging
import os
import pwd
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import autotemp, error
CONFIG_JSON_TEMPLATE = '''
{
"ociVersion": "1.0.0-rc1",
"platform": {
"os": "linux",
"arch": "all"
},
"process": {
"terminal": true,
"user": {
"uid": 0,
"gid": 0
},
"args": [],
"cwd": "/"
},
"root": {
"path": "rootfs",
"readonly": true
},
"hostname": "runc",
"mounts": [
{
"destination": "/",
"type": "bind",
"source": "/",
"options": [
"rbind",
"ro"
]
},
{
"destination": "/proc",
"type": "proc",
"source": "proc",
"options": [
"nodev",
"noexec",
"nosuid"
]
}
],
"hooks": {},
"linux": {
"namespaces": [
{
"type": "cgroup"
},
{
"type": "pid"
},
{
"type": "network"
},
{
"type": "ipc"
},
{
"type": "user"
},
{
"type": "uts"
},
{
"type": "mount"
}
],
"resources": {
"devices": [
{
"allow": false,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 1,
"minor": 5,
"access": "r"
}
]
},
"uidMappings": [
{
"hostID": 1000,
"containerID": 0,
"size": 1
}
],
"gidMappings": [
{
"hostID": 1000,
"containerID": 0,
"size": 1
}
]
}
}
'''
class security_RunOci(test.test):
"""Tests run_oci."""
version = 1
preserve_srcdir = True
def run_test_in_dir(self, test_config, oci_path):
"""
Executes the test in the given directory that points to an OCI image.
@param test_config: The test's configuration in a dict.
@param oci_path: The path of the directory that contains config.json.
"""
result = utils.run(
['/usr/bin/run_oci'] + test_config['run_oci_args'] +
['run', '-c', oci_path, 'test_container'] +
test_config.get('program_extra_argv', '').split(),
ignore_status=True, stderr_is_expected=True, verbose=True,
stdout_tee=utils.TEE_TO_LOGS, stderr_tee=utils.TEE_TO_LOGS)
expected = test_config['expected_result'].strip()
if result.stdout.strip() != expected:
logging.error('stdout mismatch %s != %s',
result.stdout.strip(), expected)
return False
expected_err = test_config.get('expected_stderr', '').strip()
if result.stderr.strip() != expected_err:
logging.error('stderr mismatch %s != %s',
result.stderr.strip(), expected_err)
return False
return True
def run_test(self, name, test_config):
"""
Runs one test from the src directory. Return 0 if the test passes,
return 1 on failure.
@param name: The name of the test.
@param test_config: The test's configuration in a dict.
"""
chronos_uid = pwd.getpwnam('chronos').pw_uid
td = autotemp.tempdir()
os.chown(td.name, chronos_uid, chronos_uid)
with open(os.path.join(td.name, 'config.json'), 'w') as config_file:
config = json.loads(CONFIG_JSON_TEMPLATE)
config['process']['args'] = test_config['program_argv']
if 'overrides' in test_config:
for path, value in test_config['overrides'].iteritems():
node = config
path = path.split('.')
for component in path[:-1]:
if component not in node:
node[component] = {}
node = node[component]
if (path[-1] in node and
isinstance(node[path[-1]], list) and
isinstance(value, list)):
node[path[-1]].extend(value)
else:
node[path[-1]] = value
logging.debug('Running %s with config.json %s',
name, json.dumps(config))
json.dump(config, config_file, indent=2)
rootfs_path = os.path.join(td.name, 'rootfs')
os.mkdir(rootfs_path)
os.chown(rootfs_path, chronos_uid, chronos_uid)
return self.run_test_in_dir(test_config, td.name)
def run_once(self):
"""
Runs each of the tests specified in the source directory.
This test fails if any subtest fails. Sub tests exercise the run_oci
command and check that the correct namespace mappings and mounts are
made. If any subtest fails, this test will fail.
"""
failed = []
ran = 0
for p in glob.glob('%s/test-*.json' % self.srcdir):
name = os.path.basename(p)
logging.info('Running: %s', name)
if not self.run_test(name, json.load(file(p))):
failed.append(name)
ran += 1
if ran == 0:
failed.append('No tests found to run from %s!' % (self.srcdir))
if failed:
logging.error('Failed: %s', failed)
raise error.TestFail('Failed: %s' % failed)