blob: d2694097b8eb3722bc49adff67d34a894656e5c0 [file] [log] [blame]
from __future__ import unicode_literals
from __future__ import print_function
import sys
import json
import uuid
# Source of truth is DUTPool enum at
# https://cs.chromium.org/chromium/infra/go/src/infra/libs/skylab/inventory/device.proto
MANAGED_POOLS = {
"cq": "DUT_POOL_CQ",
# TODO(gregorynisbet): BVT is obsolete, send stuff to QUOTA intead
"bvt": "DUT_POOL_QUOTA",
"suites": "DUT_POOL_SUITES",
"cts": "DUT_POOL_CTS",
"cts-perbuild": "DUT_POOL_CTS_PERBUILD",
"continuous": "DUT_POOL_CONTINUOUS",
"arc-presubmit": "DUT_POOL_ARC_PRESUBMIT",
"quota": "DUT_POOL_QUOTA",
}
VIDEO_ACCELERATION_WHITELIST = {
"VIDEO_ACCELERATION_H264",
"VIDEO_ACCELERATION_ENC_H264",
"VIDEO_ACCELERATION_VP8",
"VIDEO_ACCELERATION_ENC_VP8",
"VIDEO_ACCELERATION_VP9",
"VIDEO_ACCELERATION_ENC_VP9",
"VIDEO_ACCELERATION_VP9_2",
"VIDEO_ACCELERATION_ENC_VP9_2",
"VIDEO_ACCELERATION_H265",
"VIDEO_ACCELERATION_ENC_H265",
"VIDEO_ACCELERATION_MJPG",
"VIDEO_ACCELERATION_ENC_MJPG",
}
PHASE_WHITELIST = {
"PHASE_INVALID",
"PHASE_EVT",
"PHASE_EVT2",
"PHASE_DVT",
"PHASE_DVT2",
"PHASE_PVT",
"PHASE_PVT2",
"PHASE_PVT3",
"PHASE_MP",
}
CR50_PHASE_WHITELIST = {
"CR50_PHASE_INVALID",
"CR50_PHASE_PREPVT",
"CR50_PHASE_PVT",
}
def _normalize_pools(l):
"""take in the list of pools and distribute them between criticalPools and
self_serve_pools"""
pools = l.get_all_strings("pool")
out = {"criticalPools": [], "self_serve_pools": []}
for pool in pools:
if pool in MANAGED_POOLS:
# convert name to prototype enum for skylab-managed pools
out["criticalPools"].append(MANAGED_POOLS[pool])
else:
# for unmanaged pools preserve the name
out["self_serve_pools"].append(pool)
#TODO(gregorynisbet): reject empty pools too.
if len(out["criticalPools"]) > 1:
sys.stderr.write("multiple critical pools %s\n" % pools)
out["criticalPools"] = ["DUT_POOL_SUITES"]
return out
def _get_chameleon(l):
out = l.get_enum("chameleon", prefix="CHAMELEON_TYPE_")
# send CHAMELEON_TYPE_['HDMI'] -> CHAMELEON_TYPE_HDMI
out = "".join(ch for ch in out if ch not in "[']")
if out == "CHAMELEON_TYPE_INVALID":
return None
if out == "CHAMELEON_TYPE_":
return None
good_val = False
for ch in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMOPQRSTUVWXYZ0123456789":
if out.startswith("CHAMELEON_TYPE_" + ch):
good_val = True
if good_val:
return out
else:
return None
EC_TYPE_ATEST_TO_SK = {
"cros": "EC_TYPE_CHROME_OS",
}
REQUIRED_LABELS = ["board", "model", "sku", "brand"]
class SkylabMissingLabelException(Exception):
pass
class Labels(object):
"""a queryable interface to labels taken from autotest"""
def __init__(self, labels=None):
self.bools = set()
self.strings = {}
if isinstance(labels, Labels):
self.bools.update(labels.bools)
self.strings.update(labels.strings)
elif labels:
for label in labels:
self._add_label(label["name"])
def __len__(self):
return len(self.bools) + len(self.strings)
def __eq__(self, other):
return self.bools == other.bools and self.strings == other.strings
def _add_label(self, name):
"""add a label with a name of the autotest form:
key or key:value.
"""
key, sep, value = name.partition(":")
if sep:
self.strings.setdefault(key, [])
self.strings[key].append(value)
else:
self.bools.add(key)
def get_bool(self, x):
return x in self.bools
def get_string(self, x, default=None):
item = self.strings.get(x, [])
# TODO(gregorynisbet) -- what should we actually do if there's more than
# one value associated with the same key?
if item:
return item[0]
else:
return default
def get_all_strings(self, x, default=None):
return self.strings.get(x, [])
def get_enum(self, x, default=None, prefix=None):
if default is None:
default = "INVALID"
raw = self.get_string(x, default=default)
return prefix + raw.upper()
def get_enum_or_none(self, x, prefix=None):
assert prefix.endswith("_")
raw = self.get_string(x, default=None)
if raw is None:
return None
else:
return prefix + raw.upper()
def bool_keys_starting_with(self, prefix):
"""get the boolean keys beginning with a certain prefix.
Takes time proportional to the number of boolean keys.
"""
for x in self.bools:
if x.startswith(prefix):
yield x
def _cr50_phase(l):
inferred_cr50_phase = l.get_enum("cr50", prefix="CR50_PHASE_")
if inferred_cr50_phase in CR50_PHASE_WHITELIST:
return inferred_cr50_phase
else:
return "CR50_PHASE_INVALID"
def _conductive(l):
out = l.get_string("conductive")
if out is None:
return False
if out in ("False", "false", 0, None, "0", "None", "no", "Flase"):
return False
else:
return True
def _cts_abi(l):
"""The ABI has the structure cts_abi_x86 and cts_abi_arm
instead of the expected cts_abi:x86 and cts_abi_arm
"""
out = []
for abi in ["cts_abi_x86", "cts_abi_arm"]:
if l.get_bool(abi):
out.append(abi.upper())
return out
def _cts_cpu(l):
out = []
for abi in ["cts_cpu_x86", "cts_cpu_arm"]:
if l.get_bool(abi):
out.append(abi.upper())
return out
def _os_type(l):
"""Get the operating system type"""
return l.get_enum("os", prefix="OS_TYPE_")
def _ec_type(l):
"""Get the ec type."""
name = l.get_string("ec")
return EC_TYPE_ATEST_TO_SK.get(name, "EC_TYPE_INVALID")
def _video_acceleration(l):
"""produce a list of enums corresponding
to the video_acc_ keys in the atest format
"""
out = []
for prefix in ["video_acc", "hw_video_acc"]:
for key in l.bool_keys_starting_with(prefix=prefix):
_, delim, suffix = key.rpartition("video_acc_")
assert delim == "video_acc_"
new_label = "VIDEO_ACCELERATION" + "_" + suffix.upper()
if new_label in VIDEO_ACCELERATION_WHITELIST:
out.append(new_label)
return out
def _platform(l):
return l.get_string("platform") or l.get_string("Platform")
def _phase(l):
inferred_phase = l.get_enum("phase", prefix="PHASE_")
if inferred_phase in PHASE_WHITELIST:
return inferred_phase
else:
return "PHASE_INVALID"
def validate_required_fields_for_skylab(skylab_fields):
"""Does 'skylab_fields' have all required fields to add a DUT?
Throw a SkylabMissingLabelException if any mandatory field is not present
@param skylab_fields : a DUT description to be handed to 'skylab add-dut'
@returns: Nothing
"""
try:
labels = skylab_fields["common"]["labels"]
except (KeyError, TypeError, ValueError):
raise ValueError(
'skylab_fields["common"]["labels"] = { ... } is not present')
for label in REQUIRED_LABELS:
if label not in labels or labels[label] is None:
raise SkylabMissingLabelException(label)
return
def process_labels(labels, platform):
"""produce a JSON object of the kind accepted by skylab add-dut
for the labels from autotest
"""
l = Labels(labels)
pools = _normalize_pools(l)
# The enum-type keys below default to None
# except for 'telephony' and 'modem', which default to ''
# This is intentional.
# This function will always return a json-like Python data object,
# even in cases where some normally required fields are missing.
# The explicit None is there as an explicit placeholder.
out = {
# boolean keys in label
"arc": l.get_bool("arc"),
# string keys in label
"board": l.get_string("board", default=None),
"brand": l.get_string("brand-code", default=None),
"cr50Phase": _cr50_phase(l),
"hwidSku": l.get_string("sku", default=None),
"model": l.get_string("model", default=None),
"platform": platform,
"referenceDesign": l.get_string("reference_design"),
# NOTE: the autotest label corresponding to "sku" is
# "device-sku", not "sku"
"sku": l.get_string("device-sku", default=None),
# enum keys
"ecType": _ec_type(l),
"osType": _os_type(l),
"phase": _phase(l),
# list of enum keys
"criticalPools": pools["criticalPools"],
"ctsAbi": _cts_abi(l),
"ctsCpu": _cts_cpu(l),
# list of string keys
"self_serve_pools": pools["self_serve_pools"],
# capabilities substructure
"capabilities": {
# boolean keys in capabilities
"atrus": l.get_bool("atrus"),
"bluetooth": l.get_bool("bluetooth"),
"detachablebase": l.get_bool("detachablebase"),
"flashrom": l.get_bool("flashrom"),
"hotwording": l.get_bool("hotwording"),
"internalDisplay": l.get_bool("internal_display"),
"lucidsleep": l.get_bool("lucidsleep"),
"touchpad": l.get_bool("touchpad"),
"webcam": l.get_bool("webcam"),
# string keys in capabilities
"graphics": l.get_string("graphics", default=None),
"gpuFamily": l.get_string("gpu_family", default=None),
"modem": l.get_string("modem", default=""),
"power": l.get_string("power", default=None),
"storage": l.get_string("storage", default=None),
"telephony": l.get_string("telephony", default=""),
# enum keys in capabilities
"carrier": l.get_enum("carrier", prefix="CARRIER_"),
# video acceleration is its own thing.
"videoAcceleration": _video_acceleration(l),
},
# peripherals substructure
"peripherals": {
"audioBoard": l.get_bool("audio_board"),
"audioBox": l.get_bool("audio_box"),
"audioLoopbackDongle": l.get_bool("audio_loopback_dongle"),
"chameleon": l.get_bool("chameleon"),
"chameleonType": _get_chameleon(l),
"conductive": _conductive(l),
"huddly": l.get_bool("huddly"),
"mimo": l.get_bool("mimo"),
"servo": l.get_bool("servo"),
"stylus": l.get_bool("stylus"),
"wificell": l.get_bool("wificell"),
},
# test hints substructure
"testCoverageHints": {
"chaosDut": l.get_bool("chaos_dut"),
"chromesign": l.get_bool("chromesign"),
"hangoutApp": l.get_bool("hangout_app"),
"meetApp": l.get_bool("meet_app"),
"recoveryTest": l.get_bool("recovery_test"),
"testAudiojack": l.get_bool("test_audio_jack"),
"testHdmiaudio": l.get_bool("test_hdmiaudio"),
"testUsbprinting": l.get_bool("test_usbprinting"),
"usbDetect": l.get_bool("usb_detect"),
},
}
if not out["criticalPools"]:
del out["criticalPools"]
if not out["self_serve_pools"]:
del out["self_serve_pools"]
return out
# accepts: string possibly in camelCase
# returns: string in snake_case
def to_snake_case(str):
out = []
for i, x in enumerate(str):
if i == 0:
out.append(x.lower())
continue
if x.isupper():
out.append("_")
out.append(x.lower())
else:
out.append(x.lower())
return "".join(out)
def write(*args, **kwargs):
print(*args, sep="", end="", **kwargs)
def writeln(*args, **kwargs):
print(*args, sep="", end="\n", **kwargs)
# accepts: key, value, indentation level
# returns: nothing
# emits: textual protobuf format, best effort
def print_textpb_keyval(key, val, level=0):
# repeated field, repeat the key in every stanza
if isinstance(val, (list, tuple)):
for x in val:
# TODO(gregorynisbet): nested lists?
print_textpb_keyval(to_snake_case(key), x, level=level)
# if the value is a dictionary, don't print :
elif isinstance(val, dict):
write((level * " "), to_snake_case(key), " ")
print_textpb(val, level=level)
else:
write((level * " "), to_snake_case(key), ":", " ")
print_textpb(val, level=0)
# accepts: obj, indentation level
# returns: nothing
# emits: textual protobuf format, best effort
def print_textpb(obj, level=0):
# not sure what we want for None
# an empty string seems like a good choice
if obj is None:
writeln((level * " "), '""')
elif isinstance(obj, (bytes, unicode)) and obj.startswith("[IGNORED]"):
writeln((level * " "), json.dumps(str(uuid.uuid4())))
elif isinstance(obj, (int, long, float, bool)):
writeln((level * " "), json.dumps(obj))
elif isinstance(obj, (bytes, unicode)):
# guess that something is not an enum if it
# contains at least one lowercase letter or a space
# or does not contain an underscore
is_enum = True
for ch in obj:
if ch.islower() or ch == " ":
is_enum = False
break
# check for the underscore
is_enum = is_enum and "_" in obj
if is_enum:
writeln((level * " "), obj)
else:
writeln((level * " "), json.dumps(obj))
elif isinstance(obj, dict):
writeln("{")
for key in sorted(obj):
print_textpb_keyval(key=key, val=obj[key], level=(2 + level))
writeln((level * " "), "}")
elif isinstance(obj, (list, tuple)):
raise RuntimeError("No sequences on toplevel")
else:
raise RuntimeError("Unsupported type (%s)" % type(obj))