| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
import glob
import logging
import os
import re
import subprocess
import tempfile
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
| |
| |
class Device(object):
    """Plain record describing one input device (real or emulated)."""

    def __init__(self, input_type):
        """Create a device record with every field but the type unset.

        @param input_type: string of type, e.g. 'touchpad'

        """
        # Kind of device, e.g. 'touchpad'.
        self.input_type = input_type
        # True when the device is backed by an emulation process.
        self.emulated = False
        # Handle of the running emulation process, if any.
        self.emulation_process = None
        # Human readable name, e.g. 'Atmel maXTouch Touchpad'.
        self.name = 'unknown'
        # Firmware id, e.g. '6.0'.
        self.fw_id = None
        # Hardware id, e.g. '90.0'.
        self.hw_id = None
        # Event node, e.g. '/dev/input/event4'.
        self.node = None
        # Sysfs folder, e.g. '/sys/class/input/event4/device/device'.
        self.device_dir = None

    def __str__(self):
        """Return a multi-line, human readable summary of the device."""
        summary = [
            '%s:' % self.input_type,
            ' Name: %s' % self.name,
            ' Node: %s' % self.node,
            ' hw_id: %s' % self.hw_id,
            ' fw_id: %s' % self.fw_id,
            ' Emulated: %s' % self.emulated,
        ]
        return '\n'.join(summary)
| |
| |
class InputPlayback(object):
    """
    Provides an interface for playback and emulating peripherals via evemu-*.

    Example use: player = InputPlayback()
                 player.emulate(property_file=path_to_file)
                 player.find_connected_inputs()
                 player.playback(path_to_file)
                 player.blocking_playback(path_to_file)
                 player.close()

    The object can also be used in a 'with' statement, in which case close()
    is called automatically on exit.

    """

    _DEFAULT_PROPERTY_FILES = {'mouse': 'mouse.prop',
                               'keyboard': 'keyboard.prop'}
    _PLAYBACK_COMMAND = 'evemu-play --insert-slot0 %s < %s'

    # Define the overhead (500 ms) elapsed for launching evemu-play and the
    # latency from event injection to the first event read by Chrome Input
    # thread.
    _PLAYBACK_OVERHEAD_LATENCY = 0.5

    # Extracts the key names (the part after 'KEY_') from a property string.
    # Whole names are compared so that e.g. KEY_QUESTION is not mistaken for
    # KEY_Q, nor KEY_102ND for KEY_1.
    _KEY_NAME_PATTERN = re.compile(r'\bKEY_(\w+)')

    # Define a keyboard as anything with any keys #2 to #248 inclusive,
    # as defined in the linux input header.  This definition includes things
    # like the power button, so reserve the "keyboard" label for things with
    # letters/numbers and define the rest as "other_keyboard".
    _MINIMAL_KEYBOARD_KEYS = ['1', 'Q', 'SPACE']
    _KEYBOARD_KEYS = [
            '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'MINUS', 'EQUAL',
            'BACKSPACE', 'TAB', 'Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I', 'O',
            'P', 'LEFTBRACE', 'RIGHTBRACE', 'ENTER', 'LEFTCTRL', 'A', 'S', 'D',
            'F', 'G', 'H', 'J', 'K', 'L', 'SEMICOLON', 'APOSTROPHE', 'GRAVE',
            'LEFTSHIFT', 'BACKSLASH', 'Z', 'X', 'C', 'V', 'B', 'N', 'M',
            'COMMA', 'DOT', 'SLASH', 'RIGHTSHIFT', 'KPASTERISK', 'LEFTALT',
            'SPACE', 'CAPSLOCK', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8',
            'F9', 'F10', 'NUMLOCK', 'SCROLLLOCK', 'KP7', 'KP8', 'KP9',
            'KPMINUS', 'KP4', 'KP5', 'KP6', 'KPPLUS', 'KP1', 'KP2', 'KP3',
            'KP0', 'KPDOT', 'ZENKAKUHANKAKU', '102ND', 'F11', 'F12', 'RO',
            'KATAKANA', 'HIRAGANA', 'HENKAN', 'KATAKANAHIRAGANA', 'MUHENKAN',
            'KPJPCOMMA', 'KPENTER', 'RIGHTCTRL', 'KPSLASH', 'SYSRQ', 'RIGHTALT',
            'LINEFEED', 'HOME', 'UP', 'PAGEUP', 'LEFT', 'RIGHT', 'END', 'DOWN',
            'PAGEDOWN', 'INSERT', 'DELETE', 'MACRO', 'MUTE', 'VOLUMEDOWN',
            'VOLUMEUP', 'POWER', 'KPEQUAL', 'KPPLUSMINUS', 'PAUSE', 'SCALE',
            'KPCOMMA', 'HANGEUL', 'HANGUEL', 'HANJA', 'YEN', 'LEFTMETA',
            'RIGHTMETA', 'COMPOSE', 'STOP', 'AGAIN', 'PROPS', 'UNDO', 'FRONT',
            'COPY', 'OPEN', 'PASTE', 'FIND', 'CUT', 'HELP', 'MENU', 'CALC',
            'SETUP', 'WAKEUP', 'FILE', 'SENDFILE', 'DELETEFILE', 'XFER',
            'PROG1', 'PROG2', 'WWW', 'MSDOS', 'COFFEE', 'SCREENLOCK',
            'DIRECTION', 'CYCLEWINDOWS', 'MAIL', 'BOOKMARKS', 'COMPUTER',
            'BACK', 'FORWARD', 'CLOSECD', 'EJECTCD', 'EJECTCLOSECD', 'NEXTSONG',
            'PLAYPAUSE', 'PREVIOUSSONG', 'STOPCD', 'RECORD', 'REWIND', 'PHONE',
            'ISO', 'CONFIG', 'HOMEPAGE', 'REFRESH', 'EXIT', 'MOVE', 'EDIT',
            'SCROLLUP', 'SCROLLDOWN', 'KPLEFTPAREN', 'KPRIGHTPAREN', 'NEW',
            'REDO', 'F13', 'F14', 'F15', 'F16', 'F17', 'F18', 'F19', 'F20',
            'F21', 'F22', 'F23', 'F24', 'PLAYCD', 'PAUSECD', 'PROG3', 'PROG4',
            'DASHBOARD', 'SUSPEND', 'CLOSE', 'PLAY', 'FASTFORWARD', 'BASSBOOST',
            'PRINT', 'HP', 'CAMERA', 'SOUND', 'QUESTION', 'EMAIL', 'CHAT',
            'SEARCH', 'CONNECT', 'FINANCE', 'SPORT', 'SHOP', 'ALTERASE',
            'CANCEL', 'BRIGHTNESSDOWN', 'BRIGHTNESSUP', 'MEDIA',
            'SWITCHVIDEOMODE', 'KBDILLUMTOGGLE', 'KBDILLUMDOWN', 'KBDILLUMUP',
            'SEND', 'REPLY', 'FORWARDMAIL', 'SAVE', 'DOCUMENTS', 'BATTERY',
            'BLUETOOTH', 'WLAN', 'UWB', 'UNKNOWN', 'VIDEO_NEXT', 'VIDEO_PREV',
            'BRIGHTNESS_CYCLE', 'BRIGHTNESS_AUTO', 'BRIGHTNESS_ZERO',
            'DISPLAY_OFF', 'WWAN', 'WIMAX', 'RFKILL', 'MICMUTE']


    def __init__(self):
        """Start with no known devices and no emulated device."""
        # Maps input type (e.g. 'touchpad') to the Device chosen for it.
        self.devices = {}
        # Device most recently created through emulate(), if any.
        self._emulated_device = None


    def has(self, input_type):
        """Return True/False if device has a input of given type.

        Only reflects what the last find_connected_inputs() call recorded.

        @param input_type: string of type, e.g. 'touchpad'

        """
        return input_type in self.devices


    def _get_input_events(self):
        """Return a list of all input event nodes."""
        return glob.glob('/dev/input/event*')


    def emulate(self, input_type='mouse', property_file=None):
        """
        Emulate the given input (or default for type) with evemu-device.

        Emulating more than one of the same device type will only allow playback
        on the last one emulated.  The name of the last-emulated device is
        noted to be sure this is the case.

        Property files are made with the evemu-describe command,
        e.g. 'evemu-describe /dev/input/event12 > property_file'.

        @param input_type: 'mouse' or 'keyboard' to use default property files.
                           Need not be specified if supplying own file.
        @param property_file: Property file of device to be emulated.  Generate
                              with 'evemu-describe' command on test image.

        @raises error.TestError: if no usable property file is available or
                                 no new event node shows up after starting
                                 evemu-device.

        """
        new_device = Device(input_type)
        new_device.emulated = True

        # Checks for any previous emulated device and kills the process
        self.close()

        if not property_file:
            if input_type not in self._DEFAULT_PROPERTY_FILES:
                raise error.TestError('Please supply a property file for input '
                                      'type %s' % input_type)
            current_dir = os.path.dirname(os.path.realpath(__file__))
            property_file = os.path.join(
                    current_dir, self._DEFAULT_PROPERTY_FILES[input_type])
        if not os.path.isfile(property_file):
            raise error.TestError('Property file %s not found!' % property_file)

        with open(property_file) as fh:
            name_line = fh.readline()  # Format "N: NAMEOFDEVICE"
        # Strip only the line terminator so that a first line without a
        # trailing newline does not lose the last character of the name.
        new_device.name = name_line[3:].rstrip('\r\n')

        logging.info('Emulating %s %s (%s).', input_type, new_device.name,
                     property_file)
        num_events_before = len(self._get_input_events())
        new_device.emulation_process = subprocess.Popen(
                ['evemu-device', property_file], stdout=subprocess.PIPE)

        self._emulated_device = new_device

        # Ensure there are more input events than there were before.
        try:
            expected = num_events_before + 1
            exception = error.TestError('Error emulating %s!' % input_type)
            utils.poll_for_condition(
                    lambda: len(self._get_input_events()) == expected,
                    exception=exception)
        except error.TestError:
            # Tear down the half-started emulation, then re-raise with the
            # original traceback intact.
            self.close()
            raise


    def _find_device_properties(self, device):
        """Return string of properties for given node.

        Runs evtest on the node with its output redirected to a temporary
        file, waits until evtest has printed its "interrupt to exit" prompt
        and then returns everything written so far.

        @param device: string of the event node, e.g. '/dev/input/event4'.

        @return: string of properties.

        """
        with tempfile.NamedTemporaryFile() as temp_file:
            filename = temp_file.name
            evtest_process = subprocess.Popen(['evtest', device],
                                              stdout=temp_file)

            def find_exit():
                """Polling function for end of output."""
                interrupt_cmd = 'grep "interrupt to exit" %s | wc -l' % filename
                line_count = utils.run(interrupt_cmd).stdout.strip()
                return line_count != '0'

            try:
                utils.poll_for_condition(find_exit)
            finally:
                # Always stop and reap evtest, even if polling failed, so no
                # stray or zombie process is left behind.
                evtest_process.kill()
                evtest_process.wait()
            temp_file.seek(0)
            props = temp_file.read()
        if not isinstance(props, str):
            # The temporary file is binary; under Python 3 this yields bytes,
            # while callers search the result with str patterns.
            props = props.decode('utf-8', 'replace')
        return props


    def _determine_input_type(self, props):
        """Find input type (if any) from a string of properties.

        Pointer-like devices are recognised by the axis and button names
        present in props.  Keyboards are recognised by whole key names, so
        that e.g. KEY_QUESTION does not count as the letter key KEY_Q.

        @param props: string of properties, as printed by evtest.

        @return: string of type, or None

        """
        if 'REL_X' in props and 'REL_Y' in props:
            if 'ABS_MT_POSITION_X' in props and 'ABS_MT_POSITION_Y' in props:
                return 'multitouch_mouse'
            return 'mouse'

        if 'ABS_X' in props and 'ABS_Y' in props:
            # 'BTN_STYLUS' is a substring of 'BTN_STYLUS2', so one test
            # covers both buttons.
            if 'BTN_STYLUS' in props or 'BTN_TOOL_PEN' in props:
                return 'stylus'
            if 'ABS_PRESSURE' in props or 'BTN_TOUCH' in props:
                if ('BTN_LEFT' in props or
                    'BTN_MIDDLE' in props or
                    'BTN_RIGHT' in props or
                    'BTN_TOOL_FINGER' in props):
                    return 'touchpad'
                return 'touchscreen'
            if 'BTN_LEFT' in props:
                return 'touchscreen'

        key_names = set(self._KEY_NAME_PATTERN.findall(props))
        if key_names.intersection(self._MINIMAL_KEYBOARD_KEYS):
            return 'keyboard'
        if key_names.intersection(self._KEYBOARD_KEYS):
            return 'other_keyboard'
        return None


    def _get_contents_of_file(self, filepath):
        """Return the contents of the given file.

        @param filepath: string of path to file

        @returns: contents of file, stripped of surrounding whitespace.
                  Assumes file exists.

        """
        return utils.run('cat %s' % filepath).stdout.strip()


    def _find_input_name(self, device_dir, name=None):
        """Find the associated input* name for the given device directory.

        Looks through <device_dir>/input/input* for the entry whose 'name'
        file matches the given device name, e.g. returns 'input3'.

        @param device_dir: the device directory.
        @param name: the device name.

        @returns: string of the associated input name.

        @raises error.TestError: if no input* entry matches the name.

        """
        input_names = glob.glob(os.path.join(device_dir, 'input', 'input*'))
        for input_name in input_names:
            name_path = os.path.join(input_name, 'name')
            if not os.path.exists(name_path):
                continue
            if name == self._get_contents_of_file(name_path):
                return os.path.basename(input_name)
        # Raise if name could not be matched.
        logging.error('Input names found(%s): %s', device_dir, input_names)
        raise error.TestError('Could not match input* to this device!')


    def _find_device_ids_for_styluses(self, device_dir, name=None):
        """Find the fw_id and hw_id for the stylus in the given directory.

        @param device_dir: the device directory.
        @param name: the device name.

        @returns: firmware id, hardware id for this device.  The firmware id
                  is None when it could not be determined.

        """
        hw_id = 'wacom'  # Wacom styluses don't actually have hwids.
        fw_id = None

        # Find fw_id for wacom styluses via wacom_flash command.  Arguments
        # to this command are wacom_flash (dummy placeholder arg) -a (i2c name)
        # Find i2c name if any /dev/i2c-* link to this device's input event.
        input_name = self._find_input_name(device_dir, name)
        for i2c_path in glob.glob('/dev/i2c-*'):
            class_folder = i2c_path.replace('dev', 'sys/class/i2c-adapter')
            input_folder_path = os.path.join(class_folder, '*', '*',
                                             'input', input_name)
            if not glob.glob(input_folder_path):
                continue
            i2c_name = i2c_path[len('/dev/'):]
            cmd = 'wacom_flash dummy -a %s' % i2c_name
            # Do not throw an exception if wacom_flash does not exist.
            result = utils.run(cmd, ignore_status=True)
            if result.exit_status == 0:
                # The id is the last word of the output; tolerate a
                # successful run that printed nothing.
                words = result.stdout.split()
                if words:
                    fw_id = words[-1]
            # Only the first adapter linked to this input is consulted.
            break

        return fw_id, hw_id


    def _find_device_ids(self, device_dir, input_type, name):
        """Find the fw_id and hw_id for the given device directory.

        Finding fw_id and hw_id applicable only for touchpads, touchscreens,
        and styluses.

        @param device_dir: the device directory.
        @param input_type: string of input type.
        @param name: string of input name.

        @returns: firmware id, hardware id (each None when not found)

        """
        fw_id, hw_id = None, None

        if not device_dir or input_type not in ['touchpad', 'touchscreen',
                                                'stylus']:
            return fw_id, hw_id
        if input_type == 'stylus':
            return self._find_device_ids_for_styluses(device_dir, name)

        # Touch devices with custom drivers usually save this info as a file.
        # When several candidate files exist the last one wins.
        fw_filenames = ['fw_version', 'firmware_version', 'firmware_id']
        for fw_filename in fw_filenames:
            fw_path = os.path.join(device_dir, fw_filename)
            if os.path.exists(fw_path):
                if fw_id:
                    logging.warning('Found new potential fw_id when previous '
                                    'value was %s!', fw_id)
                fw_id = self._get_contents_of_file(fw_path)

        hw_filenames = ['hw_version', 'product_id', 'board_id']
        for hw_filename in hw_filenames:
            hw_path = os.path.join(device_dir, hw_filename)
            if os.path.exists(hw_path):
                if hw_id:
                    logging.warning('Found new potential hw_id when previous '
                                    'value was %s!', hw_id)
                hw_id = self._get_contents_of_file(hw_path)

        # Hw_ids for Weida and 2nd gen Synaptics are different.
        if not hw_id:
            id_folder = os.path.abspath(os.path.join(device_dir, '..', 'id'))
            product_path = os.path.join(id_folder, 'product')
            vendor_path = os.path.join(id_folder, 'vendor')

            if os.path.isfile(product_path):
                product = self._get_contents_of_file(product_path)
                if name.startswith('WD'):  # Weida ts, e.g. sumo
                    if os.path.isfile(vendor_path):
                        vendor = self._get_contents_of_file(vendor_path)
                        hw_id = vendor + product
                else:  # Synaptics tp or ts, e.g. heli, lulu, setzer
                    hw_id = product

        if not fw_id:
            # Fw_ids for 2nd gen Synaptics can only be found via rmi4update.
            # See if any /dev/hidraw* link to this device's input event.
            input_name = self._find_input_name(device_dir, name)
            for hidraw in glob.glob('/dev/hidraw*'):
                class_folder = hidraw.replace('dev', 'sys/class/hidraw')
                input_folder_path = os.path.join(class_folder, 'device',
                                                 'input', input_name)
                if os.path.exists(input_folder_path):
                    fw_id = utils.run('rmi4update -p -d %s' % hidraw,
                                      ignore_status=True).stdout.strip()
                    if fw_id == '':
                        fw_id = None
                    break

        return fw_id, hw_id


    def find_connected_inputs(self):
        """Determine the nodes of all present input devices, if any.

        Cycle through all possible /dev/input/event* and find which ones
        are touchpads, touchscreens, mice, keyboards, etc.
        These nodes can be used for playback later.
        If the type of input is already emulated, prefer that device. Otherwise,
        prefer the last node found of that type (e.g. for multiple touchpads).
        Record the found devices in self.devices.

        """
        self.devices = {}  # Discard any previously seen nodes.

        for event in self._get_input_events():
            properties = self._find_device_properties(event)
            input_type = self._determine_input_type(properties)
            if not input_type:
                continue

            new_device = Device(input_type)
            new_device.node = event

            class_folder = event.replace('dev', 'sys/class')
            name_file = os.path.join(class_folder, 'device', 'name')
            # Start from the Device default so a node without a name file
            # neither raises NameError nor inherits the previous node's name.
            name = new_device.name
            if os.path.isfile(name_file):
                name = self._get_contents_of_file(name_file)
            logging.info('Found %s: %s at %s.', input_type, name, event)

            # If a particular device is expected, make sure name matches.
            emulated = self._emulated_device
            if emulated and emulated.input_type == input_type:
                if emulated.name != name:
                    continue
                new_device.emulated = True
                new_device.emulation_process = emulated.emulation_process
            new_device.name = name

            # Find the devices folder containing power info
            # e.g. /sys/class/event4/device/device
            # Search that folder for hwid and fwid
            device_dir = os.path.join(class_folder, 'device', 'device')
            if os.path.exists(device_dir):
                new_device.device_dir = device_dir
                new_device.fw_id, new_device.hw_id = self._find_device_ids(
                        device_dir, input_type, new_device.name)

            if new_device.emulated:
                self._emulated_device = new_device

            self.devices[input_type] = new_device
            logging.debug(self.devices[input_type])


    def playback(self, filepath, input_type='touchpad'):
        """Playback a given input file.

        Create input file using evemu-record.
        E.g. 'evemu-record $NODE -1 > $FILENAME'

        @param filepath: path to the input file on the DUT.
        @param input_type: name of device type; 'touchpad' by default.
                           Types are returned by the _determine_input_type()
                           function.
                           input_type must be known.  Check using has().

        """
        assert input_type in self.devices, (
                'Unknown input type %s; run find_connected_inputs() and '
                'check has() first.' % input_type)
        node = self.devices[input_type].node
        logging.info('Playing back finger-movement on %s, file=%s.', node,
                     filepath)
        utils.run(self._PLAYBACK_COMMAND % (node, filepath))


    def _get_playback_duration(self, filepath):
        """Return the time spanned by the events of a recording, in seconds.

        Only event lines, of the form 'E: <time> <input>', are considered, so
        comment lines, a name header and blank lines are ignored.

        @param filepath: path to the input file on the DUT.

        @returns: float difference between the last and first event times.

        @raises error.TestError: if the file contains no event lines.

        """
        first = last = None
        with open(filepath) as fh:
            for line in fh:
                fields = line.split()
                if len(fields) < 2 or fields[0] != 'E:':
                    continue
                last = float(fields[1])
                if first is None:
                    first = last
        if first is None:
            raise error.TestError('No events found in %s!' % filepath)
        return last - first


    def blocking_playback(self, filepath, input_type='touchpad'):
        """Playback a given set of inputs and sleep for duration.

        The input file holds event lines of the format 'E: <time> <input>'.
        Find the total time by the difference between the first and last
        event, add the fixed playback overhead, and sleep for whatever part
        of that time the playback call itself did not already take.

        @param filepath: path to the input file on the DUT.
        @param input_type: name of device type; 'touchpad' by default.
                           Types are returned by the _determine_input_type()
                           function.
                           input_type must be known.  Check using has().

        """
        sleep_time = (self._get_playback_duration(filepath) +
                      self._PLAYBACK_OVERHEAD_LATENCY)
        start_time = time.time()
        self.playback(filepath, input_type)
        elapsed_time = time.time() - start_time
        if elapsed_time < sleep_time:
            sleep_time -= elapsed_time
            logging.info('Blocking for %s seconds after playback.', sleep_time)
            time.sleep(sleep_time)


    def blocking_playback_of_default_file(self, filename, input_type='mouse'):
        """Playback a default file and sleep for duration.

        Use a default gesture file for the default keyboard/mouse, saved in
        this folder.
        Device should be emulated first.

        @param filename: the name of the file (path is to this folder).
        @param input_type: name of device type; 'mouse' by default.
                           Types are returned by the _determine_input_type()
                           function.
                           input_type must be known.  Check using has().

        """
        current_dir = os.path.dirname(os.path.realpath(__file__))
        gesture_file = os.path.join(current_dir, filename)
        self.blocking_playback(gesture_file, input_type=input_type)


    def close(self):
        """Kill emulation if necessary.

        Does nothing when no device is being emulated.  A failure to see the
        event node disappear is only logged, never raised.

        """
        if not self._emulated_device:
            return

        num_events_before = len(self._get_input_events())
        device_name = self._emulated_device.name
        process = self._emulated_device.emulation_process

        if process is not None:
            process.kill()
            # Reap the child so it does not linger as a zombie, and release
            # the pipe that was opened for its stdout.
            process.wait()
            if process.stdout:
                process.stdout.close()

        # Ensure there is one fewer input event before returning.
        try:
            expected = num_events_before - 1
            utils.poll_for_condition(
                    lambda: len(self._get_input_events()) == expected,
                    exception=error.TestError())
        except error.TestError:
            logging.warning('Could not kill emulated %s!', device_name)

        self._emulated_device = None


    def __enter__(self):
        """Allow usage in 'with' statements."""
        return self


    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release resources on completion of a 'with' statement."""
        self.close()