# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import time
import StringIO
import subprocess

from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import tpm_utils
from autotest_lib.client.cros import constants
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest


class firmware_IntegratedU2F(FirmwareTest):
    """Verify U2F using the on-board cr50 firmware works.

    The test logs in, waits for the prerequisites u2fd needs, and then runs
    the U2FTest binary on the host once for each u2fd flag combination listed
    in U2FD_FLAG_CONFIGS, pressing the power button through servo whenever
    U2FTest asks for the device to be touched.
    """
    version = 1

    U2F_TEST_PATH = '/usr/local/bin/U2FTest'

    # Files whose presence selects the u2fd mode; see set_u2fd_flags().
    U2F_FORCE_PATH = '/var/lib/u2f/force/u2f.force'
    G2F_FORCE_PATH = '/var/lib/u2f/force/g2f.force'
    USER_KEYS_FORCE_PATH = '/var/lib/u2f/force/user_keys.force'

    # Vendor and product id used to locate the HID device in sysfs.
    VID = '18D1'
    PID = '502C'
    # Delay in seconds used between polls and around the power button press.
    SHORT_WAIT = 1

    # Each entry is (log message, u2f, g2f, user_keys). run_once() applies the
    # flags with set_u2fd_flags() and runs U2FTest once per entry, in order.
    U2FD_FLAG_CONFIGS = (
        ('testing u2fd --u2f', True, False, False),
        ('testing u2fd --g2f', False, True, False),
        ('testing u2fd --u2f --user_keys', True, False, True),
        ('testing u2fd --g2f --user_keys', False, True, True),
    )

    def cleanup(self):
        """Remove *.force files, restore the device and run base cleanup.

        The base class cleanup runs even if one of the test specific steps
        raises, so a failure here cannot leave the base class state behind.
        """
        try:
            self.host.run('rm -f /var/lib/u2f/force/*.force')

            # Restart u2fd so that flag change takes effect.
            self.host.run('restart u2fd')

            # Put the device back to a known state; also restarts the device.
            tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        finally:
            super(firmware_IntegratedU2F, self).cleanup()


    def u2fd_is_running(self):
        """Returns True if u2fd is running on the host"""
        return 'running' in self.host.run('status u2fd').stdout


    def cryptohome_ready(self):
        """Return True if cryptohome is running."""
        return 'running' in self.host.run('status cryptohomed').stdout


    def owner_key_exists(self):
        """Return True if constants.OWNER_KEY_FILE exists."""
        logging.info('checking for owner key')
        return self.host.path_exists(constants.OWNER_KEY_FILE)


    def wait_for_policy(self):
        """Wait for cryptohome to be running and for the owner key to exist.

        Raises:
            error.TestError: if cryptohomed is not running within 60 seconds
                    or the owner key does not exist within 120 seconds.
        """
        # Wait for cryptohomed to report that it is running.
        if not utils.wait_for_value(self.cryptohome_ready, True,
                                    timeout_sec=60):
            raise error.TestError('Cryptohome did not start')

        # Wait for the owner key to exist before trying to start u2fd.
        if not utils.wait_for_value(self.owner_key_exists, True,
                                    timeout_sec=120):
            raise error.TestError('Device did not create owner key')


    def attestation_init_complete(self):
        """Return True if prepare_for_enrollment has completed"""
        return 'prepared_for_enrollment: true' in self.host.run(
            'attestation_client status').stdout


    def chaps_init_complete(self):
        """Return True if chaps token initialization has completed"""
        # The text that is checked is printed on stderr, not stdout.
        return 'available with 2 token' in self.host.run(
            'chaps_client --ping').stderr


    def wait_for_cr50(self):
        """Wait for cr50 to complete any OOBE initialization

        Raises:
            error.TestError: if attestation or chaps initialization does not
                    complete within 120 seconds each.
        """
        if not utils.wait_for_value(self.attestation_init_complete,
                                    True, timeout_sec=120):
            raise error.TestError('Attestation initialization did not complete')

        if not utils.wait_for_value(self.chaps_init_complete,
                                    True, timeout_sec=120):
            raise error.TestError('Chaps initialization did not complete')


    def set_u2fd_flags(self, u2f, g2f, user_keys):
        """Replace the u2fd force files and restart u2fd.

        Args:
            u2f: if true, create U2F_FORCE_PATH.
            g2f: if true, create G2F_FORCE_PATH.
            user_keys: if true, create USER_KEYS_FORCE_PATH.

        Raises:
            error.TestFail: if u2fd is not running after the restart.
        """
        # Start by removing all flags.
        self.host.run('rm -f /var/lib/u2f/force/*.force')

        if u2f:
            self.host.run('touch %s' % self.U2F_FORCE_PATH)

        if g2f:
            self.host.run('touch %s' % self.G2F_FORCE_PATH)

        if user_keys:
            self.host.run('touch %s' % self.USER_KEYS_FORCE_PATH)

        # Restart u2fd so that flag change takes effect.
        self.host.run('restart u2fd')

        # Make sure it is still running
        if not self.u2fd_is_running():
            raise error.TestFail('could not start u2fd')
        logging.info('u2fd is running')


    def find_u2f_device(self):
        """Find the U2F device

        Lists the hidraw directory of the HID device matching VID and PID and
        stores the stripped output in self.device ('' if the listing failed).

        Returns:
            0 if the device hasn't been found. Non-zero if it has
        """
        self.device = ''
        path = '/sys/bus/hid/devices/*:%s:%s.*/hidraw' % (self.VID, self.PID)
        try:
            self.device = self.host.run('ls ' + path).stdout.strip()
        except error.AutoservRunError:
            logging.info('Could not find device')
        return len(self.device)


    def update_u2f_device_path(self):
        """Get the integrated u2f device.

        Polls find_u2f_device() for up to 30 seconds and stores the resulting
        device node path in self.dev_path.

        Raises:
            error.TestError: if the device was not found before the timeout.
        """
        start_time = time.time()
        utils.wait_for_value(self.find_u2f_device, max_threshold=1,
                             timeout_sec=30)
        wait_time = int(time.time() - start_time)
        if wait_time:
            logging.info('Took %ss to find device', wait_time)
        if not self.device:
            # Fail here rather than handing the bare '/dev/' to U2FTest.
            raise error.TestError('Could not find U2F device')
        self.dev_path = '/dev/' + self.device


    def check_u2ftest_and_press_power_button(self):
        """Check stdout and press the power button if prompted

        Returns:
            True if the process has terminated.
        """
        time.sleep(self.SHORT_WAIT)
        self.output += self.get_u2ftest_output()
        logging.info(self.output)
        if 'Touch device and hit enter..' in self.output:
            # press the power button
            self.servo.power_short_press()
            logging.info('pressed power button')
            time.sleep(self.SHORT_WAIT)
            # send enter to the test process
            self.u2ftest_job.sp.stdin.write('\n')
            logging.info('hit enter')
            # Drop the text seen so far so that the same prompt is not
            # matched again on the next poll.
            self.output = ''
        return self.u2ftest_job.sp.poll() is not None


    def get_u2ftest_output(self):
        """Read the new output

        Returns:
            The stripped text added to self.stdout since the previous call.
        """
        self.u2ftest_job.process_output()
        self.stdout.seek(self.last_len)
        output = self.stdout.read().strip()
        self.last_len = self.stdout.len
        return output


    def run_u2ftest(self):
        """Run U2FTest with the U2F device

        Starts U2FTest on the host over ssh as a background job, polls it for
        up to 30 seconds answering touch prompts, and then closes it.

        Raises:
            error.TestFail: if the background job could not be created.
            error.TestError: from close_u2ftest() if U2FTest did not exit
                    with status 0.
        """
        self.last_len = 0
        self.output = ''

        u2ftest_cmd = utils.sh_escape('%s %s' % (self.U2F_TEST_PATH,
                                                 self.dev_path))
        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                        u2ftest_cmd)
        self.stdout = StringIO.StringIO()
        # Start running U2FTest in the background. stdin is a pipe so that
        # check_u2ftest_and_press_power_button() can send enter to it.
        self.u2ftest_job = utils.BgJob(full_ssh_command,
                                       nickname='u2ftest',
                                       stdout_tee=self.stdout,
                                       stderr_tee=utils.TEE_TO_LOGS,
                                       stdin=subprocess.PIPE)
        if self.u2ftest_job is None:
            raise error.TestFail('could not start u2ftest')

        try:
            utils.wait_for_value(self.check_u2ftest_and_press_power_button,
                                 expected_value=True, timeout_sec=30)
        finally:
            # Always stop the process, even if polling raised.
            self.close_u2ftest()


    def close_u2ftest(self):
        """Terminate the process and check the results.

        Raises:
            error.TestError: if the exit status of the process is not 0.
        """
        exit_status = utils.nuke_subprocess(self.u2ftest_job.sp)

        stdout = self.stdout.getvalue().strip()
        if stdout:
            logging.debug('stdout of U2FTest:\n%s', stdout)
        if exit_status:
            logging.error('stderr of U2FTest:\n%s', self.output)
            raise error.TestError('U2FTest: %s' % self.output)


    def run_once(self, host):
        """Run U2FTest

        Args:
            host: the host object of the device under test.

        Raises:
            error.TestNAError: if U2F_TEST_PATH does not exist on the host.
        """
        self.host = host

        if not self.host.path_exists(self.U2F_TEST_PATH):
            raise error.TestNAError('Device does not have U2FTest support')

        # u2fd reads files from the user's home dir, so we need to log in.
        self.host.run('/usr/local/autotest/bin/autologin.py')

        # u2fd needs the policy file to exist.
        self.wait_for_policy()

        # Wait for OOBE initialization to complete, as long-running operations
        # (eg RSA key generation) could cause U2F operations to timeout.
        self.wait_for_cr50()

        for message, u2f, g2f, user_keys in self.U2FD_FLAG_CONFIGS:
            logging.info(message)
            self.set_u2fd_flags(u2f, g2f, user_keys)
            # Setting the flags restarts u2fd, which will re-create the u2f
            # device.
            self.update_u2f_device_path()
            self.run_u2ftest()
