| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module provides functions to record input events.""" |
| |
| import logging |
| import re |
| import select |
| import subprocess |
| import threading |
| import time |
| |
| from linux_input import EV_MSC, EV_SYN, MSC_SCAN, SYN_REPORT |
| |
| |
# Values carried by EV_MSC/MSC_SCAN events for the three mouse buttons.
# They are declared here because linux_input does not define them.
# NOTE(review): presumably these are the button scan codes as printed by
# evtest and then read back as decimal integers -- confirm against evtest.
(MSC_SCAN_BTN_LEFT,
 MSC_SCAN_BTN_RIGHT,
 MSC_SCAN_BTN_MIDDLE) = (90001, 90002, 90003)
| |
| |
class InputEventRecorderError(Exception):
    """Error raised by the input_event_recorder module.

    It carries no state beyond the message given to Exception.

    """
| |
| |
class Event(object):
    """An input event parsed from one line of evtest output.

    An ordinary event looks like:
        Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316

    A SYN_REPORT event looks like:
        Event: time 10788.289613, -------------- SYN_REPORT ------------

    Events compare equal, and hash equally, when their type, code and value
    are all equal.

    """

    # The patterns are compiled once at class creation rather than on every
    # from_string() call, since from_string() runs for each evtest line.
    _TIME_PATTERN = r'Event:\s*time\s*(\d+\.\d+)'
    _SEPARATOR = r',\s*'

    # Groups: 1 = time, 2 = type, 3 = code, 4 = value.
    # The parenthesized symbolic name is matched as "anything up to the
    # closing parenthesis" so that unnamed codes, which evtest prints as
    # "(?)", are still recognized as events.
    _EVENT_PATTERN = re.compile(
            _SEPARATOR.join([_TIME_PATTERN,
                             r'type\s*(\d+)\s*\([^)]*\)',
                             r'code\s*(\d+)\s*\([^)]*\)',
                             r'value\s*(-?\d+)']),
            re.I)
    _SYN_REPORT_PATTERN = re.compile(
            _SEPARATOR.join([_TIME_PATTERN, r'-+\s*SYN_REPORT\s-+']),
            re.I)


    def __init__(self, type=0, code=0, value=0):
        """Construction of an input event.

        @param type: the event type
        @param code: the event code
        @param value: the event value

        """
        self.type = type
        self.code = code
        self.value = value


    @staticmethod
    def from_string(ev_string):
        """Convert an event string to an event object.

        The string is first checked against the SYN_REPORT format and then
        against the ordinary event format. The time stamp is matched but
        not stored in the returned event.

        @param ev_string: an event string.

        @returns: an event object if the event string conforms to
                  event pattern format. None otherwise (a warning is
                  logged in that case).

        """
        if Event._SYN_REPORT_PATTERN.search(ev_string):
            return Event(EV_SYN, SYN_REPORT, 0)

        result = Event._EVENT_PATTERN.search(ev_string)
        if result:
            return Event(int(result.group(2)),
                         int(result.group(3)),
                         int(result.group(4)))

        logging.warning('not an event: %s', ev_string)
        return None


    def is_syn(self):
        """Determine if the event is a SYN report event.

        @returns: True if it is a SYN report event. False otherwise.

        """
        return self.type == EV_SYN and self.code == SYN_REPORT


    def value_tuple(self):
        """A tuple of the event type, code, and value.

        @returns: the tuple of the event type, code, and value.

        """
        return (self.type, self.code, self.value)


    def __eq__(self, other):
        """Determine if two events are equal.

        @param other: the object to compare with.

        @returns: True if other is an Event with the same type, code and
                  value. False if it is an Event that differs.
                  NotImplemented if other is not an Event, so that Python
                  falls back to its default comparison instead of raising.

        """
        if not isinstance(other, Event):
            return NotImplemented
        return self.value_tuple() == other.value_tuple()


    def __ne__(self, other):
        """Determine if two events differ.

        Defined explicitly because Python 2 does not derive != from ==.

        @param other: the object to compare with.

        @returns: the negation of __eq__, or NotImplemented if other is not
                  an Event.

        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result


    def __hash__(self):
        """A hash consistent with __eq__.

        @returns: the hash of the (type, code, value) tuple.

        """
        return hash(self.value_tuple())


    def __repr__(self):
        """An unambiguous representation of the event for debugging.

        @returns: a string showing the type, code and value of the event.

        """
        return 'Event(type=%r, code=%r, value=%r)' % self.value_tuple()


    def __str__(self):
        """A string representation of the event.

        @returns: the type, code and value separated by single spaces.

        """
        return '%d %d %d' % (self.type, self.code, self.value)
| |
| |
class InputEventRecorder(object):
    """An input event recorder.

    The recorder looks up the /dev/input/eventN node of a device by its
    name, runs evtest on that node in a background thread, and collects
    the (type, code, value) tuple of every event line evtest prints.

    Refer to recording_example() below about how to record input events.

    """

    INPUT_DEVICE_INFO_FILE = '/proc/bus/input/devices'
    SELECT_TIMEOUT_SECS = 1

    def __init__(self, device_name):
        """Construction of input event recorder.

        @param device_name: the device name of the input device node to record.

        @raises InputEventRecorderError: if no device node is found for
                device_name.

        """
        self.device_name = device_name
        self.device_node = self.get_device_node_by_name(device_name)
        if self.device_node is None:
            err_msg = 'Failed to find the device node of %s' % device_name
            raise InputEventRecorderError(err_msg)
        self._recorder = None
        self._recording_thread = None
        self._stop_recording_thread_event = threading.Event()
        self.tmp_file = '/tmp/evtest.dat'
        self.events = []


    def get_device_node_by_name(self, device_name):
        """Get the input device node by name.

        Example of a RN-42 emulated mouse device information looks like

        I: Bus=0005 Vendor=0000 Product=0000 Version=0000
        N: Name="RNBT-A96F"
        P: Phys=6c:29:95:1a:b8:18
        S: Sysfs=/devices/pci0000:00/0000:00:14.0/usb1/1-8/1-8:1.0/bluetooth/hci0/hci0:512:29/0005:0000:0000.0004/input/input15
        U: Uniq=00:06:66:75:a9:6f
        H: Handlers=event12
        B: PROP=0
        B: EV=17
        B: KEY=70000 0 0 0 0
        B: REL=103
        B: MSC=10

        The name is matched case-insensitively as a literal substring of
        the "N: Name=" line. The first matching device that lists an
        eventN handler wins.

        @param device_name: the device name of the target input device node.

        @returns: the corresponding device node of the device, or None if
                  no matching device with an event handler is found.

        """
        name_line_pattern = re.compile(r'N: Name=', re.I)
        # Escape the name so that characters such as '(' or '+' in a device
        # name are matched literally instead of being read as regex syntax.
        target_pattern = re.compile(
                'N: Name=.*%s' % re.escape(device_name), re.I)
        # \d+ (not \d*) so that a match always carries a number to convert.
        event_number_pattern = re.compile(r'H: Handlers=.*event(\d+)', re.I)

        in_target_device = False
        with open(self.INPUT_DEVICE_INFO_FILE) as info:
            for line in info:
                if name_line_pattern.search(line):
                    # Every name line starts the decision afresh, so a
                    # matched device without an event handler cannot pick
                    # up the handler of a later, unrelated device.
                    in_target_device = target_pattern.search(line) is not None
                elif in_target_device:
                    result = event_number_pattern.search(line)
                    if result:
                        return '/dev/input/event%d' % int(result.group(1))
        return None


    def record(self):
        """Record input events.

        Runs evtest on the device node and, for every line it prints,
        writes the line to self.tmp_file and appends the value tuple of
        each recognized event to self.events.

        A stop request is only honoured once evtest has printed nothing for
        SELECT_TIMEOUT_SECS, so output that is still arriving is consumed
        before the loop ends. The loop also ends if evtest closes its
        output. In every case the evtest process is terminated and reaped
        before returning.

        """
        logging.info('Recording input events of %s.', self.device_node)
        # An argument list without a shell makes the child evtest itself,
        # so that terminating the child really stops evtest.
        self._recorder = subprocess.Popen(['evtest', self.device_node],
                                          stdout=subprocess.PIPE)
        evtest_output = self._recorder.stdout
        try:
            with open(self.tmp_file, 'w') as output_f:
                while True:
                    read_list, _, _ = select.select(
                            [evtest_output], [], [], self.SELECT_TIMEOUT_SECS)
                    if read_list:
                        line = evtest_output.readline()
                        if not line:
                            # End of file: evtest has exited. Without this
                            # check select() would keep reporting the pipe
                            # as readable and the loop would spin forever.
                            logging.warning('evtest on %s closed its output.',
                                            self.device_node)
                            break
                        output_f.write(line)
                        ev = Event.from_string(line)
                        if ev:
                            self.events.append(ev.value_tuple())
                    elif self._stop_recording_thread_event.is_set():
                        self._stop_recording_thread_event.clear()
                        break
        finally:
            self._terminate_recorder()


    def _terminate_recorder(self):
        """Terminate the evtest process started by record() and reap it."""
        recorder = self._recorder
        if recorder is None:
            return
        if recorder.poll() is None:
            try:
                recorder.terminate()
            except OSError:
                # evtest exited between poll() and terminate(); nothing
                # is left to terminate.
                pass
        recorder.stdout.close()
        recorder.wait()


    def start(self):
        """Start the recording thread."""
        logging.info('Start recording thread.')
        self._recording_thread = threading.Thread(target=self.record)
        self._recording_thread.start()


    def stop(self):
        """Stop the recording thread.

        Blocks until the recording thread has finished. Does nothing if no
        recording thread has been started.

        """
        logging.info('Stop recording thread.')
        if self._recording_thread is None:
            return
        self._stop_recording_thread_event.set()
        self._recording_thread.join()
        # record() may have ended on its own without consuming the stop
        # request; clear it so that it cannot cut the next recording short.
        self._stop_recording_thread_event.clear()
        self._recording_thread = None


    def clear_events(self):
        """Clear the event list."""
        self.events = []


    def get_events(self):
        """Get the event list.

        @returns: the list of (type, code, value) tuples recorded so far.
        """
        return self.events
| |
| |
# Ready-made events for comparing against recorded ones: the SYN_REPORT
# event, and one EV_MSC/MSC_SCAN event per mouse button keyed by button name.
SYN_EVENT = Event(EV_SYN, SYN_REPORT, 0)
MSC_SCAN_BTN_EVENT = dict(
        (button, Event(EV_MSC, MSC_SCAN, scan_value))
        for button, scan_value in (('LEFT', MSC_SCAN_BTN_LEFT),
                                   ('RIGHT', MSC_SCAN_BTN_RIGHT),
                                   ('MIDDLE', MSC_SCAN_BTN_MIDDLE)))
| |
| |
def recording_example():
    """Example code for capturing input events on a Samus.

    Records the touchpad for 5 seconds and prints the (type, code, value)
    tuple of every recorded event.

    For a quick swipe, it outputs events in numeric format:

        (3, 57, 9)
        (3, 53, 641)
        (3, 54, 268)
        (3, 58, 60)
        (3, 48, 88)
        (1, 330, 1)
        (1, 325, 1)
        (3, 0, 641)
        (3, 1, 268)
        (3, 24, 60)
        (0, 0, 0)
        (3, 53, 595)
        (3, 54, 288)
        (3, 0, 595)
        (3, 1, 288)
        (0, 0, 0)
        (3, 57, -1)
        (1, 330, 0)
        (1, 325, 0)
        (3, 24, 0)
        (0, 0, 0)

    The above events in corresponding evtest text format are:

        Event: time .782950, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value 9
        Event: time .782950, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 641
        Event: time .782950, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 268
        Event: time .782950, type 3 (EV_ABS), code 58 (ABS_MT_PRESSURE), value 60
        Event: time .782950, type 3 (EV_ABS), code 59 (?), value 0
        Event: time .782950, type 3 (EV_ABS), code 48 (ABS_MT_TOUCH_MAJOR), value 88
        Event: time .782950, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 1
        Event: time .782950, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 1
        Event: time .782950, type 3 (EV_ABS), code 0 (ABS_X), value 641
        Event: time .782950, type 3 (EV_ABS), code 1 (ABS_Y), value 268
        Event: time .782950, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 60
        Event: time .782950, -------------- SYN_REPORT ------------
        Event: time .798273, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 595
        Event: time .798273, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 288
        Event: time .798273, type 3 (EV_ABS), code 0 (ABS_X), value 595
        Event: time .798273, type 3 (EV_ABS), code 1 (ABS_Y), value 288
        Event: time .798273, -------------- SYN_REPORT ------------
        Event: time .821437, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value -1
        Event: time .821437, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 0
        Event: time .821437, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 0
        Event: time .821437, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 0
        Event: time .821437, -------------- SYN_REPORT ------------

    Only lines that Event.from_string() recognizes yield a tuple; the
    numeric listing above was captured without a tuple for the
    "code 59 (?)" line.
    """
    device_name = 'Atmel maXTouch Touchpad'
    recorder = InputEventRecorder(device_name)
    # Single-argument print(...) produces the same output whether print is
    # a statement (Python 2) or a function (Python 3).
    print('Samus touchpad device name: %s' % recorder.device_name)
    print('Samus touchpad device node: %s' % recorder.device_node)
    print('Please make gestures on the touchpad for up to 5 seconds.')
    recorder.clear_events()
    recorder.start()
    try:
        time.sleep(5)
    finally:
        # Always stop the recorder, even if the sleep is interrupted, so
        # that the recording thread does not keep the process alive.
        recorder.stop()
    for e in recorder.get_events():
        print(e)


if __name__ == '__main__':
    recording_example()