autotest: use attestation and tpm_manager client

BUG=b:175510138
TEST=test_that $DUT platform_*

Cq-Depend: chromium:2603198
Change-Id: I34e9bb5e8a6cfa54e710555e5966a53799670b2d
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2589435
Commit-Queue: Yi Chou <yich@google.com>
Tested-by: Yi Chou <yich@google.com>
Reviewed-by: Andrey Pronin <apronin@chromium.org>
Reviewed-by: Leo Lai <cylai@google.com>
diff --git a/client/bin/site_sysinfo.py b/client/bin/site_sysinfo.py
index 94454f0..2cbee2b 100755
--- a/client/bin/site_sysinfo.py
+++ b/client/bin/site_sysinfo.py
@@ -442,8 +442,9 @@
         from autotest_lib.client.cros import cryptohome
         # Get the dictionary attack counter.
         keyval["TPM_DICTIONARY_ATTACK_COUNTER"] = (
-                cryptohome.get_tpm_more_status().get(
-                    'dictionary_attack_counter', 'Failed to query cryptohome'))
+                cryptohome.get_tpm_da_info().get(
+                        'dictionary_attack_counter',
+                        'Failed to query tpm_manager'))
 
         # Return the updated keyvals.
         return keyval
diff --git a/client/cros/cryptohome.py b/client/cros/cryptohome.py
index 904046d..61f6698 100644
--- a/client/cros/cryptohome.py
+++ b/client/cros/cryptohome.py
@@ -18,7 +18,9 @@
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.cros.cros_disks import DBusClient
 
+ATTESTATION_CMD = '/usr/bin/attestation_client'
 CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
+TPM_MANAGER_CMD = '/usr/bin/tpm_manager_client'
 GUEST_USER_NAME = '$guest'
 UNAVAILABLE_ACTION = 'Unknown action or no action given.'
 MOUNT_RETRY_COUNT = 20
@@ -88,63 +90,65 @@
         A TPM status dictionary, for example:
         { 'Enabled': True,
           'Owned': True,
-          'Being Owned': False,
-          'Ready': True,
-          'Password': ''
+          'Ready': True
         }
     """
-    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status')
+    out = __run_cmd(TPM_MANAGER_CMD + ' status --nonsensitive')
     status = {}
-    for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']:
-        match = re.search('TPM %s: (true|false)' % field, out)
+    for field in ['is_enabled', 'is_owned']:
+        match = re.search('%s: (true|false)' % field, out)
         if not match:
             raise ChromiumOSError('Invalid TPM status: "%s".' % out)
         status[field] = match.group(1) == 'true'
-    match = re.search('TPM Password: (\w*)', out)
-    status['Password'] = ''
-    if match:
-        status['Password'] = match.group(1)
+    status['Enabled'] = status['is_enabled']
+    status['Owned'] = status['is_owned']
+    status['Ready'] = status['is_enabled'] and status['is_owned']
     return status
 
 
-def get_tpm_more_status():
-    """Get more of the TPM status.
+def get_tpm_password():
+    """Get the TPM password.
 
     Returns:
-        A TPM more status dictionary, for example:
-        { 'dictionary_attack_lockout_in_effect': False,
-          'attestation_prepared': False,
-          'boot_lockbox_finalized': False,
-          'enabled': True,
-          'owned': True,
-          'owner_password': ''
+        A TPM password
+    """
+    out = __run_cmd(TPM_MANAGER_CMD + ' status')
+    match = re.search('owner_password: (\w*)', out)
+    password = ''
+    if match:
+        hex_pass = match.group(1).decode("hex")
+        password = ''.join(
+                chr(int(hex_pass[i:i + 2], 16))
+                for i in range(0, len(hex_pass), 2))
+    return password
+
+
+def get_tpm_da_info():
+    """Get the TPM dictionary attack information.
+    Returns:
+        A TPM dictionary attack status dictionary, for example:
+        {
           'dictionary_attack_counter': 0,
-          'dictionary_attack_lockout_seconds_remaining': 0,
-          'dictionary_attack_threshold': 10,
-          'attestation_enrolled': False,
-          'initialized': True,
-          'verified_boot_measured': False,
-          'install_lockbox_finalized': True
+          'dictionary_attack_threshold': 200,
+          'dictionary_attack_lockout_in_effect': False,
+          'dictionary_attack_lockout_seconds_remaining': 0
         }
-        An empty dictionary is returned if the command is not supported.
     """
     status = {}
-    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :')
-    if out.startswith(UNAVAILABLE_ACTION):
-        # --action=tpm_more_status only exists >= 41.
-        logging.info('Method not supported!')
-        return status
-    for line in out.splitlines():
+    out = __run_cmd(TPM_MANAGER_CMD + ' get_da_info')
+    for line in out.splitlines()[1:-1]:
         items = line.strip().split(':')
+        if len(items) != 2:
+            continue
         if items[1].strip() == 'false':
             value = False
         elif items[1].strip() == 'true':
             value = True
-        elif items[1].strip().isdigit():
-            value = int(items[1].strip())
+        elif items[1].split('(')[0].strip().isdigit():
+            value = int(items[1].split('(')[0].strip())
         else:
             value = items[1].strip(' "')
-        status[items[0]] = value
+        status[items[0].strip()] = value
     return status
 
 
@@ -210,7 +214,7 @@
 
 def is_tpm_lockout_in_effect():
     """Returns true if the TPM lockout is in effect; false otherwise."""
-    status = get_tpm_more_status()
+    status = get_tpm_da_info()
     return status.get('dictionary_attack_lockout_in_effect', None)
 
 
@@ -233,13 +237,28 @@
     return status
 
 
+def get_install_attribute_status():
+    """Query the install attribute status
+
+    Returns:
+        A status string, which could be:
+          "UNKNOWN"
+          "TPM_NOT_OWNED"
+          "FIRST_INSTALL"
+          "VALID"
+          "INVALID"
+    """
+    out = __run_cmd(CRYPTOHOME_CMD + ' --action=install_attributes_get_status')
+    return out.strip()
+
+
 def get_tpm_attestation_status():
     """Get the TPM attestation status.  Works similar to get_tpm_status().
     """
-    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status')
+    out = __run_cmd(ATTESTATION_CMD + ' status')
     status = {}
-    for field in ['Prepared', 'Enrolled']:
-        match = re.search('Attestation %s: (true|false)' % field, out)
+    for field in ['prepared_for_enrollment', 'enrolled']:
+        match = re.search('%s: (true|false)' % field, out)
         if not match:
             raise ChromiumOSError('Invalid attestation status: "%s".' % out)
         status[field] = match.group(1) == 'true'
@@ -318,7 +337,7 @@
         mounted = False
         while retry < MOUNT_RETRY_COUNT and not mounted:
             time.sleep(1)
-            logging.info("Retry " + str(retry + 1))
+            logging.info("Retry %s", str(retry + 1))
             __run_cmd(' '.join(args))
             # TODO: Remove this additional call to get_user_hash(user) when
             # crbug.com/690994 is fixed
@@ -343,6 +362,7 @@
 
 
 def test_auth(user, password):
+    """Test key auth."""
     cmd = [CRYPTOHOME_CMD, '--action=check_key_ex', '--user=%s' % user,
            '--password=%s' % password, '--async']
     out = __run_cmd(' '.join(cmd))
@@ -351,6 +371,7 @@
 
 
 def add_le_key(user, password, new_password, new_key_label):
+    """Add low entropy key."""
     args = [CRYPTOHOME_CMD, '--action=add_key_ex', '--key_policy=le',
             '--user=%s' % user, '--password=%s' % password,
             '--new_key_label=%s' % new_key_label,
@@ -359,6 +380,7 @@
 
 
 def remove_key(user, password, remove_key_label):
+    """Remove a key."""
     args = [CRYPTOHOME_CMD, '--action=remove_key_ex', '--user=%s' % user,
             '--password=%s' % password,
             '--remove_key_label=%s' % remove_key_label]
@@ -366,6 +388,7 @@
 
 
 def get_supported_key_policies():
+    """Get supported key policies."""
     args = [CRYPTOHOME_CMD, '--action=get_supported_key_policies']
     out = __run_cmd(' '.join(args))
     logging.info(out)
@@ -415,7 +438,7 @@
         return ns_mount_line.split()
 
     try:
-        logging.debug('Active cryptohome mounts:\n' +
+        logging.debug('Active cryptohome mounts:\n%s',
                       utils.system_output('cat %s' % cryptohomed_path))
         mount_line = utils.system_output(
             'grep %s %s' % (mount_point, cryptohomed_path),
@@ -529,7 +552,7 @@
     http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313
     """
     if not credential:
-      return None
+        return None
 
     parts = credential.split('@')
     if len(parts) != 2:
@@ -543,6 +566,7 @@
 
 
 def crash_cryptohomed():
+    """Let cryptohome crash."""
     # Try to kill cryptohomed so we get something to work with.
     pid = __run_cmd('pgrep cryptohomed')
     try:
@@ -616,6 +640,7 @@
 
 
 def change_password(user, password, new_password):
+    """Change user password."""
     args = [
             CRYPTOHOME_CMD,
             '--action=migrate_key_ex',
@@ -682,19 +707,19 @@
 
 
     def __wait_for_specific_signal(self, signal, data):
-      """Wait for the |signal| with matching |data|
-         Returns the resulting dict on success or {} on error.
-      """
-      # Do not bubble up the timeout here, just return {}.
-      result = {}
-      try:
-          result = self.wait_for_signal(signal)
-      except utils.TimeoutError:
-          return {}
-      for k in data.keys():
-          if k not in result or result[k] != data[k]:
+        """Wait for the |signal| with matching |data|
+          Returns the resulting dict on success or {} on error.
+        """
+        # Do not bubble up the timeout here, just return {}.
+        result = {}
+        try:
+            result = self.wait_for_signal(signal)
+        except utils.TimeoutError:
             return {}
-      return result
+        for k in data.keys():
+            if k not in result or result[k] != data[k]:
+                return {}
+        return result
 
 
     # Perform a data-less async call.
diff --git a/client/site_tests/desktopui_CheckRlzPingSent/desktopui_CheckRlzPingSent.py b/client/site_tests/desktopui_CheckRlzPingSent/desktopui_CheckRlzPingSent.py
index 598ff87..18fef0a 100644
--- a/client/site_tests/desktopui_CheckRlzPingSent/desktopui_CheckRlzPingSent.py
+++ b/client/site_tests/desktopui_CheckRlzPingSent/desktopui_CheckRlzPingSent.py
@@ -88,8 +88,8 @@
     def _wait_for_rlz_lock(self):
         """Waits for the DUT to get into locked state after login."""
         def get_install_lockbox_finalized_status():
-            status = cryptohome.get_tpm_more_status()
-            return status.get('install_lockbox_finalized')
+            status = cryptohome.get_install_attribute_status()
+            return status == 'VALID'
 
         try:
             utils.poll_for_condition(
diff --git a/client/site_tests/firmware_Cr50VirtualNVRam/firmware_Cr50VirtualNVRam.py b/client/site_tests/firmware_Cr50VirtualNVRam/firmware_Cr50VirtualNVRam.py
index 9e2775e..3565cd8 100644
--- a/client/site_tests/firmware_Cr50VirtualNVRam/firmware_Cr50VirtualNVRam.py
+++ b/client/site_tests/firmware_Cr50VirtualNVRam/firmware_Cr50VirtualNVRam.py
@@ -2,7 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-import logging, re
+import re
 from autotest_lib.client.bin import test, utils
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.cros import service_stopper
@@ -37,6 +37,7 @@
     return out
 
 def expect_tpmc_error(subcommand, expected_error):
+    """Expect a tpmc error."""
     check_tpmc(subcommand, '.*failed.*%s.*' % expected_error)
 
 class firmware_Cr50VirtualNVRam(test.test):
@@ -49,7 +50,7 @@
         global tpm_pw_hex
         cryptohome.take_tpm_ownership(wait_for_ownership=True)
 
-        tpm_owner_password = cryptohome.get_tpm_status()['Password']
+        tpm_owner_password = cryptohome.get_tpm_password()
         if not tpm_owner_password:
             raise error.TestError('TPM owner password is empty after '
                                   'taking ownership.')
@@ -275,6 +276,7 @@
                           '0x12f');  # TPM_RC_AUTH_UNAVAILABLE
 
     def initialize(self):
+        """Initialize the test."""
         self.__take_tpm_ownership()
         # Stop services that access to the TPM, to be able to use tpmc.
         # Note: for TPM2 the order of re-starting services (they are started
@@ -306,4 +308,5 @@
         self.__read_tests()
 
     def cleanup(self):
+        """Cleanup the test."""
         self._services.restore_services()
diff --git a/client/site_tests/platform_CryptohomeTpmLiveTest/platform_CryptohomeTpmLiveTest.py b/client/site_tests/platform_CryptohomeTpmLiveTest/platform_CryptohomeTpmLiveTest.py
index 5d9b409..232e2c5 100644
--- a/client/site_tests/platform_CryptohomeTpmLiveTest/platform_CryptohomeTpmLiveTest.py
+++ b/client/site_tests/platform_CryptohomeTpmLiveTest/platform_CryptohomeTpmLiveTest.py
@@ -13,9 +13,10 @@
     version = 1
 
     def run_once(self):
+        """Run TPM live tests."""
         cryptohome.take_tpm_ownership(wait_for_ownership=True)
 
-        tpm_owner_password = cryptohome.get_tpm_status()['Password']
+        tpm_owner_password = cryptohome.get_tpm_password()
         if not tpm_owner_password:
             raise error.TestError('TPM owner password is empty after taking '
                                   'ownership.')
diff --git a/client/site_tests/platform_InitLoginPerf/platform_InitLoginPerf.py b/client/site_tests/platform_InitLoginPerf/platform_InitLoginPerf.py
index 62065f0..26baec2 100644
--- a/client/site_tests/platform_InitLoginPerf/platform_InitLoginPerf.py
+++ b/client/site_tests/platform_InitLoginPerf/platform_InitLoginPerf.py
@@ -23,7 +23,9 @@
     @return: Attestation readiness status - True/False.
 
     """
-    return cryptohome.get_tpm_more_status().get('attestation_prepared', False)
+    return cryptohome.get_tpm_attestation_status().get(
+            'prepared_for_enrollment', False)
+
 
 def get_bootstat_timestamp(name, occurrence):
     """Gets the timestamp in ms of the given timestamp name and occurrence
@@ -177,7 +179,7 @@
                                     expected_value=True,
                                     timeout_sec=timeout):
             logging.debug('tpm_more_status: %r',
-                          cryptohome.get_tpm_more_status())
+                          cryptohome.get_tpm_attestation_status())
             raise error.TestFail('Timeout waiting for attestation_prepared')
 
     def get_init_durations(self):