#!/usr/bin/python
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


# DESCRIPTION :
#
# This UI is intended to be used by the factory autotest suite to
# provide factory operators feedback on test status and control over
# execution order.
#
# In short, the UI is composed of a 'console' panel on the bottom of
# the screen which displays the autotest log, and there is also a
# 'test list' panel on the right hand side of the screen.  The
# majority of the screen is dedicated to tests, which are executed in
# separate processes, but instructed to display their own UIs in this
# dedicated area whenever possible.  Tests in the test list are
# executed in order by default, but can be activated on demand via
# associated keyboard shortcuts.  As tests are run, their status is
# color-indicated to the operator -- greyed out means untested, yellow
# means active, green passed and red failed.


import gobject
import gtk
import imp
import os
import pango
import select
import signal
import subprocess
import sys
import time

import common
import factory
import factory_state
import factory_ui_lib as ful
import utils

from gtk import gdk
from Xlib import X
from Xlib.display import Display as X_Display


# These definitions expose the factory test classes directly in this
# namespace, so that the test_list that is sent from the control file
# can use cleaner syntax (e.g. OperatorTest(...) instead of
# factory.OperatorTest(...)).  They are done as plain aliases, as
# opposed to "from factory import <class>", to work around Python
# namespace wackiness -- the from syntax does not work, creating new
# classes.
OperatorTest = factory.OperatorTest
InformationScreen = factory.InformationScreen
AutomatedSequence = factory.AutomatedSequence
AutomatedSubTest = factory.AutomatedSubTest
AutomatedRebootSubTest = factory.AutomatedRebootSubTest


# Layout constants for the test list labels (TestLabelBox).  The *_SIZE
# tuples are passed as the size= argument of ful.make_label()
# (presumably (width, height) in pixels -- TODO confirm against
# factory_ui_lib).
# EN / ZW are the two per-test text labels (test.label_en / test.label_zw).
_LABEL_EN_SIZE = (170, 35)
_LABEL_ZW_SIZE = (70, 35)
_LABEL_EN_FONT = pango.FontDescription('courier new extra-condensed 16')
_LABEL_ZW_FONT = pango.FontDescription('normal 12')
# T is the small label showing the keyboard shortcut ('C-<key>').
_LABEL_T_SIZE = (40, 35)
_LABEL_T_FONT = pango.FontDescription('arial ultra-condensed 10')
# Foreground used for label text while a test is still untested.
_LABEL_UNTESTED_FG = gtk.gdk.color_parse('grey40')
# Background of the right hand side box that holds the test list.
_LABEL_TROUGH_COLOR = gtk.gdk.color_parse('grey20')
# Status label shown for each subtest (SubTestLabelBox).
_LABEL_STATUS_SIZE = (140, 30)
_LABEL_STATUS_FONT = pango.FontDescription(
    'courier new bold extra-condensed 16')
# Font of the 'no active test' placeholder label.
_OTHER_LABEL_FONT = pango.FontDescription('courier new condensed 20')

# Label sizes used for the subtest (ST) rows of an automated sequence.
_ST_LABEL_EN_SIZE = (250, 35)
_ST_LABEL_ZW_SIZE = (150, 35)

class Console:
    '''Display a progress log.  Implemented by launching a borderless
    terminal (urxvt) at a strategic location, and running tail against
    the console log.

    The terminal process is killed when the Console object is garbage
    collected, so the creator must keep a reference to the object for
    as long as the log should stay on screen.'''

    def __init__(self, allocation):
        '''Spawn the terminal showing factory.CONSOLE_LOG_PATH.

        Args:
          allocation: object with x and y attributes; they are used as
              the position part of the terminal geometry string.
        '''
        # Set before anything can fail, so that __del__ is safe even if
        # Popen raises (e.g. urxvt is not installed).
        self._proc = None
        xterm_coords = '145x13+%d+%d' % (allocation.x, allocation.y)
        factory.log('xterm_coords = %s' % xterm_coords)
        xterm_opts = ('-bg black -fg lightgray -bw 0 -g %s' % xterm_coords)
        # The tail command is kept as one list element so that bash -c
        # receives it as a single argument.
        xterm_cmd = (('urxvt %s -e bash -c ' % xterm_opts).split() +
                     ['tail -f "%s"' % factory.CONSOLE_LOG_PATH])
        factory.log('xterm_cmd = %s' % xterm_cmd)
        self._proc = subprocess.Popen(xterm_cmd)

    def __del__(self):
        '''Kill the terminal process, if one was started.'''
        factory.log('console_proc __del__')
        proc = getattr(self, '_proc', None)
        if proc is None:
            # __init__ did not get as far as starting the process.
            return
        try:
            proc.kill()
        except OSError as e:
            # Most likely the terminal is already gone; nothing to clean up.
            factory.log('console_proc kill failed: %s' % repr(e))


# Debugging aid: key events are normally consumed by the executing
# tests, so under normal circumstances this handler should never be
# called.

def handle_key_release_event(_, event):
    '''Log the keyval of a key release that reached the base UI.

    Args:
      _: unused first callback argument.
      event: the key event; only its keyval attribute is read.

    Returns:
      Always True.
    '''
    message = 'base ui key event (%s)' % event.keyval
    factory.log(message)
    return True


class TestLabelBox(gtk.EventBox):
    '''One row of the test list: the two text labels of a test plus its
    keyboard shortcut, on a background colored by the test status.'''

    def __init__(self, test, show_shortcut=False):
        '''Build the row for a test.

        Args:
          test: test object; its label_en, label_zw and kbd_shortcut
              attributes are read.
          show_shortcut: if True, and the shortcut character occurs in
              label_en (case-insensitively), its first occurrence is
              rendered bold and underlined.
        '''
        gtk.EventBox.__init__(self)
        self.modify_bg(gtk.STATE_NORMAL, ful.LABEL_COLORS[ful.UNTESTED])

        label_en = ful.make_label(test.label_en, size=_LABEL_EN_SIZE,
                                  font=_LABEL_EN_FONT, alignment=(0.5, 0.5),
                                  fg=_LABEL_UNTESTED_FG)
        label_zw = ful.make_label(test.label_zw, size=_LABEL_ZW_SIZE,
                                  font=_LABEL_ZW_FONT, alignment=(0.5, 0.5),
                                  fg=_LABEL_UNTESTED_FG)
        # Shortcut hint, e.g. 'C-A'; its color is never changed by update().
        label_t = ful.make_label('C-' + test.kbd_shortcut.upper(),
                                 size=_LABEL_T_SIZE,
                                 font=_LABEL_T_FONT, alignment=(0.5, 0.5),
                                 fg=ful.BLACK)

        # build a better label_en with shortcuts
        index_hotkey = test.label_en.upper().find(test.kbd_shortcut.upper())
        if show_shortcut and index_hotkey >= 0:
            attrs = label_en.get_attributes() or pango.AttrList()
            attrs.insert(pango.AttrUnderline(
                    pango.UNDERLINE_LOW, index_hotkey, index_hotkey + 1))
            attrs.insert(pango.AttrWeight(
                    pango.WEIGHT_BOLD, index_hotkey, index_hotkey + 1))
            label_en.set_attributes(attrs)

        hbox = gtk.HBox()
        hbox.pack_start(label_en, False, False)
        hbox.pack_start(label_zw, False, False)
        hbox.pack_start(label_t, False, False)
        self.add(hbox)
        # Only these labels get recolored on status changes.
        self.label_list = [label_en, label_zw]

    def update(self, status):
        '''Recolor the row for a new test status.

        Args:
          status: status value; must be a key of ful.LABEL_COLORS.
        '''
        # Explicit conditional expression instead of the 'and ... or'
        # trick, which picks the wrong branch if the middle operand is
        # ever false.
        label_fg = _LABEL_UNTESTED_FG if status == ful.UNTESTED else ful.BLACK
        for label in self.label_list:
            label.modify_fg(gtk.STATE_NORMAL, label_fg)
        self.modify_bg(gtk.STATE_NORMAL, ful.LABEL_COLORS[status])
        self.queue_draw()


class SubTestLabelBox(gtk.EventBox):
    '''One row describing a subtest: its two text labels, a separator
    and a status label that is rewritten on every status change.'''

    def __init__(self, test):
        '''Build the row for a subtest.

        Args:
          test: subtest object; its label_en and label_zw attributes
              are read.
        '''
        gtk.EventBox.__init__(self)
        self.modify_bg(gtk.STATE_NORMAL, ful.BLACK)

        centered = (0.5, 0.5)
        status_label = ful.make_label(
            ful.UNTESTED,
            size=_LABEL_STATUS_SIZE,
            alignment=(0, 0.5),
            font=_LABEL_STATUS_FONT,
            fg=_LABEL_UNTESTED_FG)
        separator = ful.make_label(
            ' : ', alignment=centered, font=_LABEL_EN_FONT)
        en_label = ful.make_label(
            test.label_en,
            size=_ST_LABEL_EN_SIZE,
            alignment=centered,
            font=_LABEL_EN_FONT)
        zw_label = ful.make_label(
            test.label_zw,
            size=_ST_LABEL_ZW_SIZE,
            alignment=centered,
            font=_LABEL_ZW_FONT)

        # Widgets are packed from the end of the row, in this order.
        row = gtk.HBox()
        for widget in (status_label, separator, zw_label, en_label):
            row.pack_end(widget, False, False)
        self.add(row)

        self.label_status = status_label

    def update(self, status):
        '''Show a new status as the text and color of the status label.

        Args:
          status: status value; used as the label text and as a key
              into ful.LABEL_COLORS.
        '''
        status_label = self.label_status
        status_label.set_text(status)
        status_label.modify_fg(gtk.STATE_NORMAL, ful.LABEL_COLORS[status])
        self.queue_draw()


class UiState():
    '''Tracks which widget is shown in the testing area and which label
    box has to be refreshed when the status of a test changes.'''

    def __init__(self, test_widget_box, automated_sequence_set):
        '''Create the placeholder widget and one status list widget per
        automated sequence.

        Args:
          test_widget_box: container for the testing area; the active
              widget is added to and removed from it.
          automated_sequence_set: iterable of automated sequence tests;
              each must have a subtest_list attribute.
        '''

        def make_empty_test_label_widget():
            label_box = gtk.EventBox()
            label_box.modify_bg(gtk.STATE_NORMAL, ful.BLACK)
            label = ful.make_label('no active test', font=_OTHER_LABEL_FONT,
                                   alignment=(0.5, 0.5))
            label_box.add(label)
            return label_box

        def make_automated_sequence_label_widget(subtest_list):
            vbox = gtk.VBox()
            vbox.set_spacing(0)
            for subtest in subtest_list:
                label_box = SubTestLabelBox(subtest)
                # Register the subtest so its status changes reach the box.
                self.set_label_box(subtest, label_box)
                vbox.pack_start(label_box, False, False)
            return vbox

        # Must exist before the sequence widgets are built below, since
        # building them calls set_label_box().
        self._label_box_map = {}
        self._test_widget_box = test_widget_box
        self._empty_test_widget = make_empty_test_label_widget()
        self._active_test_widget = self._empty_test_widget
        self.active_test = None

        self._test_widget_box.add(self._empty_test_widget)

        self._automated_seq_widget_map = dict(
            (test, make_automated_sequence_label_widget(test.subtest_list))
            for test in automated_sequence_set)

    def status_change_callback(self, test, status):
        '''Forward a status change to the label box registered for test.
        Tests without a registered label box are ignored.'''
        label_box = self._label_box_map.get(test)
        if label_box:
            label_box.update(status)

    def set_label_box(self, test, label_box):
        '''Register label_box as the widget to update for test.'''
        self._label_box_map[test] = label_box

    def set_active_test(self, test_db, test):
        '''Control what kind of widget is shown in the testing area of
        the screen.  For normal operator tests, this is just a label
        saying there is no active test.  The expectation is that the
        operator test itself has a window obscuring this message.  For
        automated sequences, since there is no other window, the no
        active test message is replaced with an updated list of
        subtest status.

        Does nothing if test is None or is already the active test.

        Args:
          test_db: test database, only used to get the id string of
              the test for logging.
          test: the test that became active.
        '''
        if test is None or test == self.active_test:
            return
        factory.log('UI active test is %s' % test_db.get_unique_id_str(test))
        self.active_test = test
        self._test_widget_box.remove(self._active_test_widget)
        # Explicit conditional expression: the 'and ... or' form would
        # silently fall back to the placeholder if the sequence widget
        # ever evaluated as false.
        if isinstance(test, factory.AutomatedSequence):
            active_widget = self._automated_seq_widget_map[test]
        else:
            active_widget = self._empty_test_widget
        self._test_widget_box.add(active_widget)
        self._active_test_widget = active_widget
        self._test_widget_box.show_all()


def state_change_handler(fd, cb_condition, test_db, test_state, ui_state):
    '''I/O watch callback: refresh the UI after test state changes.

    Args:
      fd: file object on which one byte is received per state change.
      cb_condition: watch condition passed by the main loop (unused).
      test_db: test database, used to map unique id strings to tests.
      test_state: state store; get_all_tests() returns a mapping from
          unique id string to status.
      ui_state: UiState instance to update.

    Returns:
      Always True.
    '''
    # Currently the 'state change' event (sent via fd) is fired when each
    # single test changes its state, but this handler refreshes all tests
    # at once.  To get best performance, keep draining events for as long
    # as a new one arrives within merge_events_duration seconds, up to
    # max_merged_events, and then do a single refresh.
    events = 0
    max_merged_events = 10
    merge_events_duration = 0.2
    while (events < max_merged_events and
           select.select((fd,), (), (), merge_events_duration)[0]):
        fd.read(1)
        events += 1

    try:
        factory.log('factory_ui: refresh status (merged %s events)' % events)
        for name, status in test_state.get_all_tests().items():
            test = test_db.get_test_by_unique_id_str(name)
            ui_state.status_change_callback(test, status)
            # Subtests are shown as part of their sequence's widget and
            # never become the active test themselves.
            if (status == factory.ACTIVE and
                (not isinstance(test, AutomatedSubTest))):
                ui_state.set_active_test(test_db, test)
    except Exception as e:
        # Best effort: a failed refresh must not escape the callback.
        # The message is formatted here, like every other factory.log
        # call in this file, instead of passing a second argument.
        factory.log('state_change_handler got exception: %s' % repr(e))
    return True

def kill_active_test(control_pid):
    '''Kill the currently active test, if there is one.

    The test is looked up in the 'active_test_data' shared data entry
    and killed directly.  The old way was to notify the main control
    process and let it do the killing:
        os.kill(control_pid, signal.SIGUSR1)

    Args:
      control_pid: pid of the main control process; currently unused.
    '''
    test_data = factory.get_shared_data('active_test_data')
    if test_data is not None:
        factory.log('KILLING active test %s' % repr(test_data))
        try:
            # To remove the dependency on utils, os.kill could be used:
            #   os.kill(test_data[1], signal.SIGTERM)
            utils.nuke_pid(test_data[1])
        except OSError as err:
            factory.log('Failed to kill test: ' + repr(err))

def grab_shortcut_keys(kbd_shortcut_set, control_pid):
    '''Grab Control+<shortcut> on the root window for every shortcut and
    install a main loop watch that reacts to them.

    When a grabbed shortcut is pressed, its name is stored in the
    'activated_kbd_shortcut' shared data entry and the active test is
    killed.

    Args:
      kbd_shortcut_set: iterable of key names understood by
          gdk.keyval_from_name().
      control_pid: pid of the control process, handed on to
          kill_active_test().
    '''
    disp = X_Display()
    root = disp.screen().root

    # We want to receive KeyPress events
    root.change_attributes(event_mask=X.KeyPressMask)

    keycode_map = {}
    for shortcut in kbd_shortcut_set:
        keysym = gdk.keyval_from_name(shortcut)
        keycode = disp.keysym_to_keycode(keysym)
        if not keycode:
            # No key is bound to this keysym.  Keycode 0 must not be
            # passed to grab_key, where it would act as a wildcard.
            factory.log('WARNING: no keycode for shortcut %s, not grabbed' %
                        repr(shortcut))
            continue
        keycode_map[keycode] = shortcut
        root.grab_key(keycode, X.ControlMask, 1,
                      X.GrabModeAsync, X.GrabModeAsync)

    # This flushes the XGrabKey calls to the server.
    for _ in range(root.display.pending_events()):
        root.display.next_event()

    def handle_xevent(src, cond, xhandle=root.display,
                      keycode_map=keycode_map,
                      control_pid=control_pid):
        '''Watch callback: drain pending X events and handle shortcuts.'''
        for _ in range(xhandle.pending_events()):
            xevent = xhandle.next_event()
            if xevent.type != X.KeyPress:
                continue
            # Ignore key presses that are not one of our shortcuts
            # instead of letting a KeyError escape the watch callback.
            shortcut = keycode_map.get(xevent.detail)
            if shortcut is None:
                continue
            factory.set_shared_data('activated_kbd_shortcut', shortcut)
            kill_active_test(control_pid)
        return True

    gobject.io_add_watch(root.display, gobject.IO_IN, handle_xevent)


def main(test_list, status_file_path, control_pid):
    '''Build the factory UI window and run the gtk main loop.

    This process is launched by the autotest suite_Factory control
    process, which should be identified by the <control pid> cmdline
    argument.  When operators press keyboard shortcuts, the shortcut
    value is published by set_shared_data() and the active test is
    killed (see grab_shortcut_keys() and kill_active_test()).

    Args:
      test_list: list of top level tests; one label row is created for
          each of them.
      status_file_path: not used by this function.
      control_pid: pid of the control process, handed on to
          grab_shortcut_keys().
    '''

    test_db = factory.TestDatabase(test_list)
    test_state = factory_state.get_instance()

    window = gtk.Window(gtk.WINDOW_TOPLEVEL)
    window.connect('destroy', lambda _: gtk.main_quit())
    window.modify_bg(gtk.STATE_NORMAL, ful.BLACK)

    screen = window.get_screen()
    if (screen is None):
        factory.log('ERROR: communication with the X server is not working, ' +
                    'could not find a working screen.  UI exiting.')
        sys.exit(1)

    # The window covers the whole screen.
    screen_size = (screen.get_width(), screen.get_height())
    window.set_size_request(*screen_size)

    # Right hand side: the test list.
    label_trough = gtk.VBox()
    label_trough.set_spacing(0)

    rhs_box = gtk.EventBox()
    rhs_box.modify_bg(gtk.STATE_NORMAL, _LABEL_TROUGH_COLOR)
    rhs_box.add(label_trough)

    # Bottom of the left hand side: space reserved for the console
    # terminal, which is placed over it further below.
    console_box = gtk.EventBox()
    console_box.set_size_request(-1, 180)
    console_box.modify_bg(gtk.STATE_NORMAL, ful.BLACK)

    # Rest of the left hand side: the testing area.
    test_widget_box = gtk.Alignment(xalign=0.5, yalign=0.5)
    test_widget_box.set_size_request(-1, -1)

    lhs_box = gtk.VBox()
    lhs_box.pack_end(console_box, False, False)
    lhs_box.pack_start(test_widget_box)
    lhs_box.pack_start(ful.make_hsep(3), False, False)

    base_box = gtk.HBox()
    base_box.pack_end(rhs_box, False, False)
    base_box.pack_end(ful.make_vsep(3), False, False)
    base_box.pack_start(lhs_box)

    window.connect('key-release-event', handle_key_release_event)
    window.add_events(gtk.gdk.KEY_RELEASE_MASK)

    ui_state = UiState(test_widget_box, test_db.get_automated_sequences())

    # One label row per top level test, each followed by a separator.
    for test in test_list:
        label_box = TestLabelBox(test, True)
        ui_state.set_label_box(test, label_box)
        label_trough.pack_start(label_box, False, False)
        label_trough.pack_start(ful.make_hsep(), False, False)

    window.add(base_box)
    window.show_all()

    kbd_shortcut_db = factory.KbdShortcutDatabase(test_list, test_db)
    grab_shortcut_keys(kbd_shortcut_db.get_shortcut_keys(), control_pid)

    ful.hide_cursor(window.window)

    # Publish the size of the testing area as shared data.
    test_widget_allocation = test_widget_box.get_allocation()
    test_widget_size = (test_widget_allocation.width,
                        test_widget_allocation.height)
    factory.set_shared_data('test_widget_size', test_widget_size)

    # Keep a reference for the lifetime of main(): the Console kills its
    # terminal process when it is deleted.
    console = Console(console_box.get_allocation())
    # State change events arrive on stdin.
    gobject.io_add_watch(sys.stdin, gobject.IO_IN, state_change_handler,
                         test_db, test_state, ui_state)

    # This is to notify suite_Factory/control that UI is ready.
    sys.stdout.write('Factory UI Ready\n')
    sys.stdout.flush()

    factory.log('factory_ui setup done, starting gtk.main()...')
    gtk.main()
    factory.log('factory_ui gtk.main() finished, exiting.')

if __name__ == '__main__':

    if len(sys.argv) != 4:
        print ('usage: %s <test list path> <status file path> <control pid>' %
               sys.argv[0])
        # Stop here; otherwise the unpacking below fails with ValueError.
        sys.exit(1)
    test_list_path, status_file_path, control_pid_str = sys.argv[1:]
    control_pid = int(control_pid_str)

    # The test list file is Python code that defines TEST_LIST, using
    # the test classes exposed at the top of this module.
    execfile(test_list_path)

    main(TEST_LIST, status_file_path, control_pid)
