# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import glob, logging, os, re, shutil, subprocess, sys, time

import auth_server, common, constants, cros_logging, cros_ui, cryptohome
import dns_server, login, ownership, pyauto_test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error

class UITest(pyauto_test.PyAutoTest):
    """Base class for tests that drive some portion of the user interface.

    PYAUTO IS DEPRECATED. PLEASE DO NOT USE THIS CLASS. Use Telemetry instead.

    By default subclasses will use the default remote credentials before
    the run_once method is invoked, and will log out at the completion
    of the test case even if an exception is thrown.

    Subclasses can opt out of the automatic login by setting the member
    variable 'auto_login' to False.

    Subclasses can log in with arbitrary credentials by passing
    the 'creds' parameter in their control file.  See the documentation of
    UITest.initialize for more details.

    If your subclass overrides the initialize() or cleanup() methods, it
    should make sure to invoke this class' version of those methods as well.
    The standard super(...) function cannot be used for this, since the base
    test class is not a 'new style' Python class.
    """
    version = 1

    skip_oobe = True
    auto_login = True
    fake_owner = True
    username = None
    password = None

    # Processes that we know crash and are willing to ignore.
    crash_blacklist = []

    # ftrace-related files.
    _ftrace_process_fork_event_enable_file = \
        '/sys/kernel/debug/tracing/events/sched/sched_process_fork/enable'
    _ftrace_process_fork_event_filter_file = \
        '/sys/kernel/debug/tracing/events/sched/sched_process_fork/filter'
    _ftrace_signal_generate_event_enable_file = \
        '/sys/kernel/debug/tracing/events/signal/signal_generate/enable'
    _ftrace_signal_generate_event_filter_file = \
        '/sys/kernel/debug/tracing/events/signal/signal_generate/filter'
    _ftrace_trace_file = '/sys/kernel/debug/tracing/trace'

    _last_chrome_log = ''


    def start_authserver(self, authenticator=None):
        """Spin up a local mock of the Google Accounts server, then spin up
        a local fake DNS server and tell the networking stack to use it.  This
        will trick Chrome into talking to our mock when we login.
        Subclasses can override this method to change this behavior.
        """
        self._authServer = auth_server.GoogleAuthServer(
            authenticator=authenticator)
        self._authServer.run()
        self._dnsServer = dns_server.LocalDns()
        self._dnsServer.run()


    def stop_authserver(self):
        """Tears down fake dns and fake Google Accounts server.  If your
        subclass does not create these objects, you will want to override this
        method as well.
        """
        if hasattr(self, '_authServer'):
            self._authServer.stop()
            del self._authServer
        if hasattr(self, '_dnsServer'):
            try:
                self._dnsServer.stop()
            except utils.TimeoutError as err:
                raise error.TestWarn(err)
            del self._dnsServer


    def start_chrome_event_tracing(self):
        """Start tracing events of a chrome process being created or receiving a
        signal.
        """
        try:
            # Clear the trace buffer.
            utils.open_write_close(self._ftrace_trace_file, '')

            # Trace only chrome process creation events, which we may later use
            # to determine if a chrome process is killed by its parent.
            utils.open_write_close(
                self._ftrace_process_fork_event_filter_file,
                'child_comm==chrome')
            # Trace only chrome processes receiving any signal except for
            # the uninteresting SIGPROF (sig 27 on x86 and arm).
            utils.open_write_close(
                self._ftrace_signal_generate_event_filter_file,
                'comm==chrome && sig!=27')

            # Enable the process_fork event tracing.
            utils.open_write_close(
                self._ftrace_process_fork_event_enable_file, '1')
            # Enable the signal_generate event tracing.
            utils.open_write_close(
                self._ftrace_signal_generate_event_enable_file, '1')
        except IOError as err:
            logging.warning('Failed to start chrome signal tracing: %s', err)


    def stop_chrome_event_tracing(self):
        """Stop tracing events of a chrome process being created or receiving a
        signal.
        """
        try:
            # Disable the process_fork event tracing.
            utils.open_write_close(
                self._ftrace_process_fork_event_enable_file, '0')
            # Disable the signal_generate event tracing.
            utils.open_write_close(
                self._ftrace_signal_generate_event_enable_file, '0')

            # Clear the process_fork event filter.
            utils.open_write_close(
                self._ftrace_process_fork_event_filter_file, '0')
            # Clear the signal_generate event filter.
            utils.open_write_close(
                self._ftrace_signal_generate_event_filter_file, '0')

            # Dump the trace buffer to a log file.
            trace_file = os.path.join(self.resultsdir, 'chrome_event_trace')
            trace_data = utils.read_file(self._ftrace_trace_file)
            utils.open_write_close(trace_file, trace_data)
        except IOError as err:
            logging.warning('Failed to stop chrome signal tracing: %s', err)


    def start_tcpdump(self, iface):
        """Start tcpdump process, if not running already."""
        if not hasattr(self, '_tcpdump'):
            self._tcpdump = subprocess.Popen(
                ['tcpdump', '-i', iface, '-vv'], stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)


    def stop_tcpdump(self, fname_prefix):
        """Stop tcpdump process and save output to a new file."""
        if hasattr(self, '_tcpdump'):
            self._tcpdump.terminate()
            # Save output to a new file
            next_index = len(glob.glob(
                os.path.join(self.resultsdir, '%s-*' % fname_prefix)))
            tcpdump_file = os.path.join(
                self.resultsdir, '%s-%d' % (fname_prefix, next_index))
            logging.info('Saving tcpdump output to %s' % tcpdump_file)
            utils.open_write_close(tcpdump_file, self._tcpdump.communicate()[0])
            del self._tcpdump


    def __log_all_processes(self, fname_prefix):
        """Log all processes to a file.

        Args:
            fname_prefix: Prefix of the log file.
        """
        try:
            next_index = len(glob.glob(
                os.path.join(self.resultsdir, '%s-*' % fname_prefix)))
            log_file = os.path.join(
                self.resultsdir, '%s-%d' % (fname_prefix, next_index))
            utils.open_write_close(log_file, utils.system_output('ps -eF'))
        except (error.CmdError, IOError, OSError) as err:
            logging.warning('Failed to log all processes: %s', err)


    def __perform_ui_diagnostics(self):
        """Save diagnostic logs about UI.

        This includes the output of:
          $ initctl status ui
          $ ps auxwww
        """
        output_file = os.path.join(self.resultsdir, 'ui_diagnostics.txt')
        with open(output_file, 'w') as output_fd:
            print >> output_fd, time.asctime(), '\n'
            cmd = 'initctl status ui'
            print >> output_fd, '$ %s' % cmd
            print >> output_fd, utils.system_output(cmd), '\n'
            cmd = 'ps auxwww'
            print >> output_fd, '$ %s' % cmd
            print >> output_fd, utils.system_output(cmd), '\n'
        logging.info('Saved UI diagnostics to %s' % output_file)


    def __generate_coredumps(self, names):
        """Generate core dump files in results dir for given processes.

        Note that the coredumps are forced via SIGBUS and the processes will be
        terminated. Ideally we should use gdb gcore to create dumps
        non-intrusively. However, the current dumps generated by gcore could not
        be properly read back by gdb, i.e. no reasonable symbolized stack could
        be generated.

        Args:
            names: A list of process names that need to be dumped.
        """

        # Get all pids of named processes.
        pids = []
        for name in names:
            # Get pids of given name, slice [1:] to skip ps's first line 'PID'
            pids = pids + [ pid.strip() for pid in utils.system_output(
                'ps -C %s -o pid' % name).splitlines()[1:]]
        logging.info('Will force core dumps for the following pid: %s' %
            ' '.join(pids))

        # Stop all processes so that forcing dump would change their state.
        for pid in pids:
            utils.system('kill -STOP %s' % pid)

        # Force core dump.
        for pid in pids:
            utils.system('kill -BUS %s' % pid)

        # Resume to let the core dump finish.
        for pid in reversed(pids):
            utils.system('kill -CONT %s' % pid)


    def initialize(self, creds=None, is_creating_owner=False,
                   extra_chrome_flags=[], subtract_extra_chrome_flags=[],
                   *args, **kwargs):
        """Overridden from test.initialize() to log out and (maybe) log in.

        If self.auto_login is True, this will automatically log in using the
        credentials specified by 'creds' at startup, otherwise login will not
        happen.

        Regardless of the state of self.auto_login, the self.username and
        self.password properties will be set to the credentials specified
        by 'creds'.

        Authentication is not performed against live servers.  Instead, we spin
        up a local DNS server that will lie and say that all sites resolve to
        127.0.0.1.  The DNS server tells flimflam via DBus that it should be
        used to resolve addresses.  We then spin up a local httpd that will
        respond to queries at the Google Accounts endpoints.  We clear the DNS
        setting and tear down these servers in cleanup().

        Args:
            creds: String specifying the credentials for this test case.  Can
                be a named set of credentials as defined by
                constants.CREDENTIALS, or a 'username:password' pair.
                Defaults to None -- browse without signing-in.
            is_creating_owner: If the test case is creating a new device owner.
            extra_chrome_flags: Extra chrome flags to pass to chrome, if any.
            subtract_extra_chrome_flags: Remove default flags passed to chrome
                by pyauto, if any.
        """
        # Mark /var/log/messages now; we'll run through all subsequent
        # log messages at the end of the test and log info about processes that
        # crashed.
        self._log_reader = cros_logging.LogReader()
        self._log_reader.set_start_by_current()

        # Do not use gaia even for incognito browsing.
        self.start_authserver()

        # Run tcpdump on 'lo' interface to investigate network
        # issues in the lab during login.
        self.start_tcpdump(iface='lo')

        # Log all processes so that we can correlate PIDs to processes in
        # the chrome signal trace.
        self.__log_all_processes('processes--before-tracing')

        # Start event tracing related to chrome processes.
        self.start_chrome_event_tracing()

        # We yearn for Chrome coredumps...
        open(constants.CHROME_CORE_MAGIC_FILE, 'w').close()

        # The UI must be taken down to ensure that no stale state persists.
        cros_ui.stop()
        (self.username, self.password) = self.__resolve_creds(creds)
        # Ensure there's no stale cryptohome from previous tests.
        try:
            cryptohome.remove_all_vaults()
        except cryptohome.ChromiumOSError as err:
            logging.error(err)

        # Fake ownership unless the test is explicitly testing owner creation.
        if not is_creating_owner:
            logging.info('Faking ownership...')
            ownership.fake_ownership()
            self.fake_owner = True
        else:
            logging.info('Erasing stale owner state.')
            ownership.clear_ownership_files()
            self.fake_owner = False

        try:
            cros_ui.start()
        except:
            self.__perform_ui_diagnostics()
            if not login.wait_for_browser_exit('Chrome crashed during login'):
              self.__generate_coredumps([constants.BROWSER])
            raise

        # Save name of the last chrome log before our test started.
        log_files = glob.glob(constants.CHROME_LOG_DIR + '/chrome_*')
        self._last_chrome_log = max(log_files) if log_files else ''

        pyauto_test.PyAutoTest.initialize(
            self, auto_login=False,
            extra_chrome_flags=extra_chrome_flags,
            subtract_extra_chrome_flags=subtract_extra_chrome_flags,
            *args, **kwargs)
        if self.skip_oobe or self.auto_login:
            self.pyauto.SkipToLogin()
        if self.auto_login:
            self.login(self.username, self.password)
            if is_creating_owner:
                login.wait_for_ownership()


    def __resolve_creds(self, creds):
        """Map credential identifier to username, password and type.
        Args:
          creds: credential identifier to resolve.

        Returns:
          A (username, password) tuple.
        """
        if not creds:
            return [None, None]  # Browse without signing-in.
        if creds[0] == '$':
            if creds not in constants.CREDENTIALS:
                raise error.TestFail('Unknown credentials: %s' % creds)

            (name, passwd) = constants.CREDENTIALS[creds]
            return [cryptohome.canonicalize(name), passwd]

        (name, passwd) = creds.split(':')
        return [cryptohome.canonicalize(name), passwd]


    def take_screenshot(self, fname_prefix, format='png'):
      """Take screenshot and save to a new file in the results dir.

      Args:
        fname_prefix: prefix for the output fname
        format:       string indicating file format ('png', 'jpg', etc)

      Returns:
        the path of the saved screenshot file
      """
      next_index = len(glob.glob(
          os.path.join(self.resultsdir, '%s-*.%s' % (fname_prefix, format))))
      screenshot_file = os.path.join(
          self.resultsdir, '%s-%d.%s' % (fname_prefix, next_index, format))
      logging.info('Saving screenshot to %s.' % screenshot_file)

      old_exc_type = sys.exc_info()[0]
      try:
          utils.system('DISPLAY=:0.0 XAUTHORITY=/home/chronos/.Xauthority '
                       '/usr/local/bin/import -window root -depth 8 %s' %
                       screenshot_file)
      except Exception as err:
          # Do not raise an exception if the screenshot fails while processing
          # another exception.
          if old_exc_type is None:
              raise
          logging.error(err)

      return screenshot_file


    def login(self, username=None, password=None):
        """Log in with a set of credentials.

        This method is called from UITest.initialize(), so you won't need it
        unless your testcase has cause to log in multiple times.  This
        DOES NOT affect self.username or self.password.

        If username and self.username are not defined, logs in as guest.

        Forces a log out if already logged in.
        Blocks until login is complete.

        Args:
            username: username to log in as, defaults to self.username.
            password: password to log in with, defaults to self.password.

        Raises:
            error.TestError, if login has an error
        """
        if self.logged_in():
            self.logout()

        uname = username or self.username
        passwd = password or self.password

        try:
            screenshot_name = 'login-success-screenshot'
            if uname:  # Regular login
                login_error = self.pyauto.Login(username=uname,
                                                password=passwd)
                if login_error:
                    screenshot_name = 'login-error-screenshot'
                    raise error.TestFail(
                        'Error during login (%s, %s): %s.  See the file named '
                        '%s.png in the results folder.' % (uname, passwd,
                        login_error, screenshot_name))
            else:  # Login as guest
                self.pyauto.LoginAsGuest()
                logging.info('Logged in as guest.')
            if not self.logged_in():
                screenshot_name = 'login-bizarre-fail-screenshot'
                raise error.TestFail('Login was successful, but logged_in() '
                                     'returned False. This should not happen. '
                                     'Please check the file named %s.png '
                                     'located in the results folder.' %
                                     screenshot_name)
        except Exception as err:
            if isinstance(err, error.AutotestError):
                raise  # Do not modify our own errors.

            screenshot_name = 'login-fail-screenshot'
            raise error.TestFail('Exception raised during login: %s. See the '
                                 'file named %s.png in the results folder.' %
                                 (err, screenshot_name))
        finally:
            self.take_screenshot(fname_prefix=screenshot_name)
            self.stop_tcpdump(fname_prefix='tcpdump-lo--till-login')

        logging.info('Logged in as %s.  You can verify with the '
                     'file named %s.png located in the results '
                     'folder.' % (uname, screenshot_name))

    def logged_in(self):
        return self.pyauto.GetLoginInfo()['is_logged_in']


    def logout(self):
        """Log out.

        This method is called from UITest.cleanup(), so you won't need it
        unless your testcase needs to test functionality while logged out.
        """
        if not self.logged_in():
            return
        self._save_logs_from_cryptohome()

        try:
            cros_ui.restart(self.pyauto.Logout)
        except:
            self.__perform_ui_diagnostics()
            if not login.wait_for_browser_exit('Chrome crashed during logout'):
              self.__generate_coredumps([constants.BROWSER])
            raise


    def _save_logs_from_cryptohome(self):
        """Recover dirs from cryptohome in case another test run wipes."""
        try:
            for dir in constants.CRYPTOHOME_DIRS_TO_RECOVER:
                dir_path = os.path.join(constants.CRYPTOHOME_MOUNT_PT, dir)
                if os.path.isdir(dir_path):
                    target = os.path.join(self.resultsdir,
                                          '%s-%f' % (dir, time.time()))
                    logging.debug('Saving %s to %s.', dir_path, target)
                    shutil.copytree(src=dir_path, dst=target, symlinks=True)
        except (IOError, OSError, shutil.Error) as err:
            logging.error(err)


    def validate_basic_policy(self, basic_policy):
        # Pull in protobuf definitions.
        sys.path.append(self.srcdir)
        from device_management_backend_pb2 import PolicyFetchResponse
        from device_management_backend_pb2 import PolicyData
        from chrome_device_policy_pb2 import ChromeDeviceSettingsProto
        from chrome_device_policy_pb2 import UserWhitelistProto

        response_proto = PolicyFetchResponse()
        response_proto.ParseFromString(basic_policy)
        ownership.assert_has_policy_data(response_proto)

        poldata = PolicyData()
        poldata.ParseFromString(response_proto.policy_data)
        ownership.assert_has_device_settings(poldata)
        ownership.assert_username(poldata, self.username)

        polval = ChromeDeviceSettingsProto()
        polval.ParseFromString(poldata.policy_value)
        ownership.assert_new_users(polval, True)
        ownership.assert_users_on_whitelist(polval, (self.username,))


    def __log_crashed_processes(self, processes):
        """Runs through the log watched by |watcher| to see if a crash was
        reported for any process names not listed in |processes|. SIGABRT
        crashes in chrome or supplied-chrome during ui restart are ignored.
        """
        ui_restart_begin_regex = re.compile(cros_ui.UI_RESTART_ATTEMPT_MSG)
        crash_regex = re.compile(
            'Received crash notification for ([-\w]+).+ (sig \d+)')
        ui_restart_end_regex = re.compile(cros_ui.UI_RESTART_COMPLETE_MSG)

        in_restart = False
        for line in self._log_reader.get_logs().splitlines():
            if ui_restart_begin_regex.search(line):
                in_restart = True
            elif ui_restart_end_regex.search(line):
                in_restart = False
            else:
                match = crash_regex.search(line)
                if (match and not match.group(1) in processes and
                    not (in_restart and
                         (match.group(1) == constants.BROWSER or
                          match.group(1) == 'supplied_chrome') and
                         match.group(2) == 'sig 6')):
                    self.job.record('INFO', self.tagged_testname,
                                    line[match.start():])


    def execute(self, iterations=None, test_length=None,
                profile_only=None, _get_time=time.time,
                postprocess_profiled_run=None, constraints=(), *args, **kwargs):
        """Wrapper around execute to take a screenshot for any exception."""
        try:
            super(UITest, self).execute(iterations=iterations,
                                        test_length=test_length,
                                        profile_only=profile_only,
                                        _get_time=_get_time,
                                        postprocess_profiled_run=
                                          postprocess_profiled_run,
                                        constraints=constraints,
                                        *args, **kwargs)
        except:
            self.take_screenshot(fname_prefix='test-fail-screenshot')
            raise


    def cleanup(self):
        """Overridden from pyauto_test.cleanup() to log out and restart
           session_manager when the test is complete.
        """
        try:
            # Save all chrome logs created during the test.
            try:
                for fullpath in glob.glob(
                    constants.CHROME_LOG_DIR + '/chrome_*'):
                    if os.path.isfile(fullpath) and \
                        not os.path.islink(fullpath) and \
                        fullpath > self._last_chrome_log:  # ignore old logs
                        shutil.copy2(fullpath, self.resultsdir)

            except (IOError, OSError) as err:
                logging.error(err)

            self._save_logs_from_cryptohome()
            pyauto_test.PyAutoTest.cleanup(self)

            if os.path.isfile(constants.CRYPTOHOMED_LOG):
                try:
                    base = os.path.basename(constants.CRYPTOHOMED_LOG)
                    shutil.copy(constants.CRYPTOHOMED_LOG,
                                os.path.join(self.resultsdir, base))
                except (IOError, OSError) as err:
                    logging.error(err)

            if self.fake_owner:
                logging.info('Erasing fake owner state.')
                ownership.clear_ownership_files()

            self.__log_crashed_processes(self.crash_blacklist)

            if os.path.isfile(constants.CHROME_CORE_MAGIC_FILE):
                os.unlink(constants.CHROME_CORE_MAGIC_FILE)
        finally:
            self.stop_chrome_event_tracing()
            self.__log_all_processes('processes--after-tracing')
            self.stop_tcpdump(fname_prefix='tcpdump-lo--till-end')
            self.stop_authserver()


    def get_auth_endpoint_misses(self):
        if hasattr(self, '_authServer'):
            return self._authServer.get_endpoint_misses()
        else:
            return {}
