blob: d31b01a55c5db3c534a8c59577152c63d8e7728f [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Guide the user to perform gestures. Record and validate the gestures."""
import fcntl
import os
import subprocess
import sys
import time
import common_util
import firmware_utils
import fuzzy
import mini_color
import test_conf as conf
import validators
sys.path.append('/usr/local/autotest/bin/input')
import input_device
class TestFlow:
"""Guide the user to perform gestures. Record and validate the gestures."""
def __init__(self, device_geometry, device, win, parser, output):
self.device_geometry = device_geometry
self.device = device
self.device_node = self.device.device_node
self.firmware_version = self.device.get_firmware_version()
self.board = firmware_utils.get_board()
self.output = output
self._get_record_cmd()
self.win = win
self.win.set_prompt(self._get_prompt_result())
self.parser = parser
self.packets = None
self.gesture_file_name = None
self.prefix_space = self.output.get_prefix_space()
self.scores = []
self.gesture_list = conf.gesture_list
self._get_all_gesture_variations()
self.init_flag = False
self.system_device = self._non_blocking_open(self.device_node)
self.evdev_device = input_device.InputEvent()
self.screen_shot = firmware_utils.ScreenShot(self.geometry_str)
def __del__(self):
self.system_device.close()
def _non_blocking_open(self, filename):
"""Open the file in non-blocing mode."""
fd = open(filename)
fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
return fd
def _non_blocking_read(self, dev, fd):
"""Non-blocking read on fd."""
try:
dev.read(fd)
event = (dev.tv_sec, dev.tv_usec, dev.type, dev.code, dev.value)
except Exception, e:
event = None
return event
def _reopen_system_device(self):
"""Close the device and open a new one."""
self.system_device.close()
self.system_device = open(self.device_node)
self.system_device = self._non_blocking_open(self.device_node)
def _get_prompt_next(self):
"""Prompt for next gesture."""
prompt = ("Press SPACE to save this file and go to next test,\n"
" 'm' to save this file and record again,\n"
" 'd' to delete this file and try again,\n"
" 'x' to discard this file and exit.")
return prompt
def _get_prompt_result(self):
"""Prompt to see test result through timeout callback."""
prompt = ("Perform the gesture now.\n"
"See the test result on the right after finger lifted.")
return prompt
def _get_prompt_result_for_keyboard(self):
"""Prompt to see test result using keyboard."""
prompt = ("Press SPACE to see the test result,\n"
" 'd' to delete this file and try again,\n"
" 'x' to exit.")
return prompt
def _get_prompt_no_data(self):
"""Prompt to remind user of performing gestures."""
prompt = ("You need to perform the specified gestures "
"before pressing SPACE.\n")
return prompt + self._get_prompt_result()
def _get_record_cmd(self):
"""Get the device event record command."""
self.record_program = 'mtplot'
if not common_util.program_exists(self.record_program):
msg = 'Error: the program "%s" does not exist in $PATH.'
self.output.print_report(msg % self.record_program)
exit(1)
display_name = firmware_utils.get_display_name()
self.geometry_str = '%dx%d+%d+%d' % self.device_geometry
format_str = '%s %s -d %s -g %s'
self.record_cmd = format_str % (self.record_program,
self.device_node,
display_name,
self.geometry_str)
self.output.print_report('Record program: %s' % self.record_cmd)
def _span_seq(self, seq1, seq2):
"""Span sequence seq1 over sequence seq2.
E.g., seq1 = (('a', 'b'), 'c')
seq2 = ('1', ('2', '3'))
res = (('a', 'b', '1'), ('a', 'b', '2', '3'),
('c', '1'), ('c', '2', '3'))
E.g., seq1 = ('a', 'b')
seq2 = ('1', '2', '3')
res = (('a', '1'), ('a', '2'), ('a', '3'),
('b', '1'), ('b', '2'), ('b', '3'))
E.g., seq1 = (('a', 'b'), ('c', 'd'))
seq2 = ('1', '2', '3')
res = (('a', 'b', '1'), ('a', 'b', '2'), ('a', 'b', '3'),
('c', 'd', '1'), ('c', 'd', '2'), ('c', 'd', '3'))
"""
to_list = lambda s: list(s) if isinstance(s, tuple) else [s]
return tuple(tuple(to_list(s1) + to_list(s2)) for s1 in seq1
for s2 in seq2)
def span_variations(self, seq):
"""Span the variations of a gesture."""
return reduce(self._span_seq, seq) if isinstance(seq[0], tuple) else seq
def _stop(self):
"""Terminate the recording process."""
self.record_proc.poll()
# Terminate the process only when it was not terminated yet.
if self.record_proc.returncode is None:
self.record_proc.terminate()
self.record_proc.wait()
self.output.print_window('')
def _get_gesture_image_name(self):
"""Get the gesture file base name without file extension."""
filepath = os.path.splitext(self.gesture_file_name)[0]
self.gesture_image_name = filepath + '.png'
return filepath
def _stop_record_and_post_image(self):
"""Terminate the recording process."""
self.screen_shot.dump(self._get_gesture_image_name())
self.record_proc.terminate()
self.record_proc.wait()
self.win.set_image(self.gesture_image_name)
def _create_prompt(self, test, variation):
"""Create a color prompt."""
prompt = test.prompt
if isinstance(variation, tuple):
subprompt = reduce(lambda s1, s2: s1 + s2,
tuple(test.subprompt[s] for s in variation))
elif variation is None or test.subprompt is None:
subprompt = None
else:
subprompt = test.subprompt[variation]
if subprompt is None:
color_prompt = prompt
else:
color_prompt = mini_color.color_string(prompt, '{', '}', 'green')
color_prompt = color_prompt.format(*subprompt)
monochrome_prompt = prompt.format(*subprompt)
color_msg_format = mini_color.color_string('\n<%s>:\n%s%s', '<', '>',
'blue')
color_msg = color_msg_format % (test.name, self.prefix_space,
color_prompt)
msg = '%s: %s' % (test.name, monochrome_prompt)
prompt_choice = 'Enter your choice: '
return (msg, color_msg, prompt_choice)
def _choice_exit(self):
"""Procedure to exit."""
self._stop()
if os.path.exists(self.gesture_file_name):
os.remove(self.gesture_file_name)
self.output.print_report(self.deleted_msg)
def _stop_record_and_rm_file(self):
"""Stop recording process and remove the current gesture file."""
self._stop()
if os.path.exists(self.gesture_file_name):
os.remove(self.gesture_file_name)
self.output.print_report(self.deleted_msg)
def _create_gesture_file_name(self, gesture, variation):
"""Create the gesture file name based on its variation."""
if variation is None:
gesture_name = gesture.name
else:
if type(variation) is tuple:
name_list = [gesture.name,] + list(variation)
else:
name_list = [gesture.name, variation]
gesture_name = '.'.join(name_list)
basename = conf.filename.sep.join([
gesture_name,
firmware_utils.get_board(),
'fw_' + self.firmware_version,
firmware_utils.get_current_time_str()])
filename = '.'.join([basename, conf.filename.ext])
return filename
def _add_scores(self, new_scores):
"""Add the new scores of a single gesture to the scores list."""
if new_scores is not None:
self.scores += new_scores
def _final_scores(self, scores):
"""Print the final score."""
# Note: conf.score_aggregator uses a function in fuzzy module.
final_score = eval(conf.score_aggregator)(scores)
self.output.print_report('\nFinal score: %s\n' % str(final_score))
def _handle_user_choice_save_after_parsing(self, next_gesture):
"""Handle user choice for saving the parsed gesture file."""
self.output.print_window('')
self.output.print_report(self.saved_msg)
self._add_scores(self.new_scores)
self.win.set_prompt(self._get_prompt_result())
if self._pre_setup_this_gesture_variation(next_gesture=next_gesture):
# There are more gestures.
self._setup_this_gesture_variation()
else:
# No more gesture.
self._final_scores(self.scores)
self.output.stop()
self.win.stop()
self.packets = None
def _handle_user_choice_discard_after_parsing(self):
"""Handle user choice for discarding the parsed gesture file."""
self.output.print_window('')
self.win.set_prompt(self._get_prompt_result())
self._setup_this_gesture_variation()
self.packets = None
def _handle_user_choice_exit_after_parsing(self):
"""Handle user choice to exit after the gesture file is parsed."""
self._stop_record_and_rm_file()
self.win.stop()
def _handle_user_choice_validate_before_parsing(self):
"""Handle user choice for validating before gesture file is parsed."""
# Parse the device events. Make sure there are events.
self.packets = self.parser.parse_file(self.gesture_file_name)
if self.packets:
# Validate this gesture and get the results.
(self.new_scores, msg_list) = validators.validate(self.packets,
self.gesture,
self.variation)
self.output.print_all(msg_list)
self.gesture_file.close()
self.win.set_prompt(self._get_prompt_next())
self._stop_record_and_post_image()
else:
self.win.set_prompt(self._get_prompt_no_data(), color='red')
def _handle_user_choice_exit_before_parsing(self):
"""Handle user choice to exit before the gesture file is parsed."""
self.gesture_file.close()
self._stop_record_and_rm_file()
self.win.stop()
def _is_parsing_gesture_file_done(self):
"""Is parsing the gesture file done?"""
return self.packets is not None
def user_choice_callback(self, widget, event):
"""A callback to handle the key pressed by the user.
This is the primary GUI event-driven method handling the user input.
"""
choice = event.string
if self._is_parsing_gesture_file_done():
# Save this gesture file and go to next gesture.
if choice in (' ', '\r'):
self._handle_user_choice_save_after_parsing(next_gesture=True)
# Save this file and perform the same gesture again.
elif choice == 'm':
self._handle_user_choice_save_after_parsing(next_gesture=False)
# Discard this file and perform the gesture again.
elif choice == 'd':
self._handle_user_choice_discard_after_parsing()
# The user wants to exit.
elif choice == 'x':
self._handle_user_choice_exit_after_parsing()
# The user presses any wrong key.
else:
self.win.set_prompt(self._get_prompt_next(), color='red')
else:
# Save this gesture file and go to next gesture.
if choice in (' ', '\r'):
self._handle_user_choice_validate_before_parsing()
# Discard this file and perform the gesture again.
elif choice == 'd':
self._handle_user_choice_discard_after_parsing()
elif choice == 'x':
self._handle_user_choice_exit_before_parsing()
# The user presses any wrong key.
else:
self.win.set_prompt(self._get_prompt_result(), color='red')
def _get_all_gesture_variations(self):
"""Get all variations for all gestures."""
gesture_variations_list = []
for gesture in self.gesture_list:
variations = self.span_variations(gesture.variations)
for variation in variations:
gesture_variations_list.append((gesture, variation))
self.gesture_variations = iter(gesture_variations_list)
def gesture_timeout_callback(self):
"""A callback watching whether a gesture has timed out."""
if self.gesture_continues_flag:
self.gesture_continues_flag = False
return True
else:
self.win.set_input_focus()
self._handle_user_choice_validate_before_parsing()
self.win.remove_event_source(self.gesture_file_watch_tag)
self.win.set_input_focus()
return False
def gesture_file_watch_callback(self, fd, condition, evdev_device):
"""A callback to watch the device input."""
# Read the device node continuously until end
event = True
while event:
event = self._non_blocking_read(evdev_device, fd)
self.gesture_continues_flag = True
if (not self.gesture_begins_flag):
self.gesture_begins_flag = True
self.win.register_timeout_add(self.gesture_timeout_callback,
firmware_utils.Gesture.TIMEOUT)
return True
def init_gesture_setup_callback(self, widget, event):
"""A callback to set up environment before a user starts a gesture."""
if not self.init_flag:
self.init_flag = True
self._pre_setup_this_gesture_variation()
self._setup_this_gesture_variation()
def _pre_setup_this_gesture_variation(self, next_gesture=True):
"""Get gesture, variation, filename, prompt, etc."""
if next_gesture:
gesture_variation = next(self.gesture_variations, None)
if gesture_variation is None:
return False
self.gesture, self.variation = gesture_variation
self.gesture_file_name = os.path.join(self.output.log_dir,
self._create_gesture_file_name(self.gesture, self.variation))
(msg, color_msg, _) = self._create_prompt(self.gesture, self.variation)
self.win.set_gesture_name(msg)
self.output.print_report(color_msg)
self.saved_msg = '(saved: %s)\n' % self.gesture_file_name
self.deleted_msg = '(deleted: %s)\n' % self.gesture_file_name
return True
def _setup_this_gesture_variation(self):
"""Fork a new process for mtplot. Add io watch for the gesture file."""
self.gesture_file = open(self.gesture_file_name, 'w')
self.record_proc = subprocess.Popen(self.record_cmd.split(),
stdout=self.gesture_file)
# Set input focus to the firmware window rather than mtplot
time.sleep(0.2)
self.win.set_input_focus()
# Watch if data come in to the monitored file.
self.gesture_begins_flag = False
self._reopen_system_device()
self.gesture_file_watch_tag = self.win.register_io_add_watch(
self.gesture_file_watch_callback, self.system_device,
self.evdev_device)