blob: f5f8b884487b75db8ce995128da68d6f392dabe3 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides MTB parser and related packet methods."""
import logging
import math
import os
import re
import sys
from linux_input import *
# Include some constants
execfile('', globals())
def make_pretty_packet(packet):
"""Convert the event list in a packet to a pretty format."""
pretty_packet = []
for event in packet:
pretty_event = []
pretty_event.append('time %.6f,' % event[EV_TIME])
if event.get(SYN_REPORT):
pretty_event.append('-------------- SYN_REPORT ------------\n')
ev_type = event[EV_TYPE]
pretty_event.append('type %d (%s),' % (ev_type, EV_TYPES[ev_type]))
ev_code = event[EV_CODE]
pretty_event.append('code %d (%s),' %
(ev_code, EV_STRINGS[ev_type][ev_code]))
pretty_event.append('value %d' % event[EV_VALUE])
pretty_packet.append(' '.join(pretty_event))
return '\n'.join(pretty_packet)
class MTB:
"""An MTB class providing MTB format related utility methods."""
def __init__(self, packets):
self.packets = packets
def _define_check_event_func_list(self):
"""Define event function lists for various event cycles below."""
self.check_event_func_list = {}
self.MAX_FINGERS = 5
# One-finger touching the pad should generate the following events:
# BTN_TOUCH, and BTN_TOOL_FINGER: 0 -> 1 -> 0
self.check_event_func_list[1] = [self._is_BTN_TOUCH,
# Two-finger touching the pad should generate the following events:
# BTN_TOUCH, and BTN_TOOL_DOUBLETAP: 0 -> 1 -> 0
self.check_event_func_list[2] = [self._is_BTN_TOUCH,
# Three-finger touching the pad should generate the following events:
# BTN_TOUCH, and BTN_TOOL_TRIPLETAP: 0 -> 1 -> 0
self.check_event_func_list[3] = [self._is_BTN_TOUCH,
# Four-finger touching the pad should generate the following events:
# BTN_TOUCH, and BTN_TOOL_QUADTAP: 0 -> 1 -> 0
self.check_event_func_list[4] = [self._is_BTN_TOUCH,
# Five-finger touching the pad should generate the following events:
# BTN_TOUCH, and BTN_TOOL_QUINTTAP: 0 -> 1 -> 0
self.check_event_func_list[5] = [self._is_BTN_TOUCH,
# Physical click should generate the following events:
# BTN_LEFT: 0 -> 1 -> 0
self.check_event_func_click = [self._is_BTN_LEFT,]
def _is_ABS_MT_TRACKING_ID(self, event):
"""Is this event ABS_MT_TRACKING_ID?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
def _is_new_contact(self, event):
"""Is this packet generating new contact (Tracking ID)?"""
return self._is_ABS_MT_TRACKING_ID(event) and event[EV_VALUE] != -1
def _is_finger_leaving(self, event):
"""Is the finger is leaving in this packet?"""
return self._is_ABS_MT_TRACKING_ID(event) and event[EV_VALUE] == -1
def _is_ABS_MT_SLOT(self, event):
"""Is this packet ABS_MT_SLOT?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
event[EV_CODE] == ABS_MT_SLOT)
def _is_ABS_MT_POSITION_X(self, event):
"""Is this packet ABS_MT_POSITION_X?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
def _is_ABS_MT_POSITION_Y(self, event):
"""Is this packet ABS_MT_POSITION_Y?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
def _is_EV_KEY(self, event):
"""Is this an EV_KEY event?"""
return (not event.get(SYN_REPORT) and event[EV_TYPE] == EV_KEY)
def _is_BTN_LEFT(self, event):
"""Is this event BTN_LEFT?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_LEFT)
def _is_BTN_TOOL_FINGER(self, event):
"""Is this event BTN_TOOL_FINGER?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_TOOL_FINGER)
def _is_BTN_TOOL_DOUBLETAP(self, event):
"""Is this event BTN_TOOL_DOUBLETAP?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_TOOL_DOUBLETAP)
def _is_BTN_TOOL_TRIPLETAP(self, event):
"""Is this event BTN_TOOL_TRIPLETAP?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_TOOL_TRIPLETAP)
def _is_BTN_TOOL_QUADTAP(self, event):
"""Is this event BTN_TOOL_QUADTAP?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_TOOL_QUADTAP)
def _is_BTN_TOOL_QUINTTAP(self, event):
"""Is this event BTN_TOOL_QUINTTAP?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_TOOL_QUINTTAP)
def _is_BTN_TOUCH(self, event):
"""Is this event BTN_TOUCH?"""
return (self._is_EV_KEY(event) and event[EV_CODE] == BTN_TOUCH)
def _calc_movement_for_axis(self, x, prev_x):
"""Calculate the distance moved in an axis."""
return abs(x - prev_x) if prev_x is not None else 0
def _calc_distance(self, (x0, y0), (x1, y1)):
"""Calculate the distance between two points."""
dist_x = x1 - x0
dist_y = y1 - y0
return math.sqrt(dist_x * dist_x + dist_y * dist_y)
def _init_dict(self, keys, value):
"""Initialize a dictionary over the keys with the same given value.
Note: The following command does not always work:
dict.fromkeys(keys, value)
It works when value is a simple type, e.g., an integer.
However, if value is [] or {}, it does not work correctly.
The reason is that if the value is [] or {}, all the keys would
point to the same list or dictionary, which is not expected
in most cases.
return dict([(key, value) for key in keys])
def get_number_contacts(self):
"""Get the number of contacts (Tracking IDs)."""
num_contacts = 0
for packet in self.packets:
for event in packet:
if self._is_new_contact(event):
num_contacts += 1
return num_contacts
def get_x_y(self, target_slot):
"""Extract x and y positions in the target slot."""
# The default slot is slot 0 if no slot number is assigned.
# The rationale is that evdev is a state machine. It only reports
# the change. Slot 0 would not be reported by evdev if last time
# the last finger left the pad was at slot 0.
slot = 0
# Should not write "list_x = list_y = []" below.
# They would end up with pointing to the same list.
list_x = []
list_y = []
prev_x = prev_y = None
target_slot_live = False
initial_default_slot_0 = True
for packet in self.packets:
if (slot == target_slot and slot == 0 and not target_slot_live and
target_slot_live = True
initial_default_slot_0 = False
for event in packet:
if self._is_ABS_MT_SLOT(event):
slot = event[EV_VALUE]
if slot == target_slot and not target_slot_live:
target_slot_live = True
if slot != target_slot:
# Update x value if available.
if self._is_ABS_MT_POSITION_X(event):
prev_x = event[EV_VALUE]
# Update y value if available.
elif self._is_ABS_MT_POSITION_Y(event):
prev_y = event[EV_VALUE]
# Check if the finger at the target_slot is leaving.
elif self._is_finger_leaving(event):
target_slot_live = False
# If target_slot is alive, and both x and y have
# been assigned values, append the x and y to the list no matter
# whether x or y position is reported in the current packet.
# This also handles the initial condition that no previous x or y
# is reported yet.
if target_slot_live and prev_x and prev_y:
return (list_x, list_y)
def get_points_for_every_tracking_id(self):
"""Extract points in every tracking id.
This method is applicable when fingers are contacting and leaving
the touchpad continuously. The same slot number, e.g., slot 0 or
slot 1, may be used for multiple times.
# The default slot is slot 0 if no slot number is assigned.
slot = 0
# points is a dictionary of lists, where each list holds all of
# the points in a tracking id.
points = {}
tracking_ids_all = []
tracking_ids_live = []
slot_to_tracking_id = {}
tracking_id_to_slot = {}
x = {}
y = {}
for packet in self.packets:
for event in packet:
if self._is_ABS_MT_SLOT(event):
slot = event[EV_VALUE]
# Find a new tracking ID
if self._is_new_contact(event):
tracking_id = event[EV_VALUE]
points[tracking_id] = []
tracking_id_to_slot[tracking_id] = slot
slot_to_tracking_id[slot] = tracking_id
x[tracking_id] = None
y[tracking_id] = None
# A tracking ID is leaving.
elif self._is_finger_leaving(event):
leaving_tracking_id = slot_to_tracking_id[slot]
del slot_to_tracking_id[slot]
# Update x value if available.
elif self._is_ABS_MT_POSITION_X(event):
x[slot_to_tracking_id[slot]] = event[EV_VALUE]
# Update y value if available.
elif self._is_ABS_MT_POSITION_Y(event):
y[slot_to_tracking_id[slot]] = event[EV_VALUE]
for tracking_id in tracking_ids_live:
if x[tracking_id] and y[tracking_id]:
points[tracking_id].append((x[tracking_id], y[tracking_id]))
return points
def _calc_farthest_distance(self, points):
"""Calculate the farthest distance of points."""
return max([self._calc_distance(point, points[0]) for point in points])
def get_max_distance_of_all_tracking_ids(self):
"""Get the max moving distance of all tracking IDs."""
points = self.get_points_for_every_tracking_id()
max_distance = float('-infinity')
for tracking_id in sorted(points.keys()):
distance = self._calc_farthest_distance(points[tracking_id])
max_distance = max(max_distance, distance)
return max_distance
def get_x_y_multiple_slots(self, target_slots):
"""Extract points in multiple slots.
Only the packets with all specified slots are extracted.
This is useful to collect packets for pinch to zoom.
# Initialize slot_exists dictionary to False
slot_exists = dict.fromkeys(target_slots, False)
# Set the initial slot number to 0 because evdev is a state machine,
# and may not present slot 0.
slot = 0
# Initialze the following dict to []
# Don't use "dict.fromkeys(target_slots, [])"
list_x = self._init_dict(target_slots, [])
list_y = self._init_dict(target_slots, [])
x = self._init_dict(target_slots, None)
y = self._init_dict(target_slots, None)
for packet in self.packets:
for event in packet:
if self._is_ABS_MT_SLOT(event):
slot = event[EV_VALUE]
if slot not in target_slots:
if self._is_ABS_MT_TRACKING_ID(event):
if self._is_new_contact(event):
slot_exists[slot] = True
elif self._is_finger_leaving(event):
slot_exists[slot] = False
elif self._is_ABS_MT_POSITION_X(event):
x[slot] = event[EV_VALUE]
elif self._is_ABS_MT_POSITION_Y(event):
y[slot] = event[EV_VALUE]
# Note:
# - All slot_exists must be True to append x, y positions for the
# slots.
# - All x and y values for all slots must have been reported once.
# (This handles the initial condition that no previous x or y
# is reported yet.)
# - If either x or y positions are reported in the current packet,
# append x and y to the list of that slot.
# (This handles the condition that only x or y is reported.)
# - Even in the case that neither x nor y is reported in current
# packet, cmt driver constructs and passes hwstate to gestures.
if (all(slot_exists.values()) and all(x.values()) and
for slot in target_slots:
return (list_x, list_y)
def get_points_multiple_slots(self, target_slots):
"""Get the points in multiple slots."""
list_x, list_y = self.get_x_y_multiple_slots(target_slots)
points_list = [zip(list_x[slot], list_y[slot]) for slot in target_slots]
points_dict = dict(zip(target_slots, points_list))
return points_dict
def get_relative_motion(self, target_slots):
"""Get the relative motion of the two target slots."""
# The slots in target_slots could be (0, 1), (1, 2) or other
# possibilities.
slot_a, slot_b = target_slots
points_dict = self.get_points_multiple_slots(target_slots)
points_slot_a = points_dict[slot_a]
points_slot_b = points_dict[slot_b]
# if only 0 or 1 point observed, the relative motion is 0.
if len(points_slot_a) <= 1 or len(points_slot_b) <= 1:
return 0
distance_begin = self._calc_distance(points_slot_a[0], points_slot_b[0])
distance_end = self._calc_distance(points_slot_a[-1], points_slot_b[-1])
relative_motion = distance_end - distance_begin
return relative_motion
def get_points(self, target_slot):
"""Get the points in the target slot."""
list_x, list_y = self.get_x_y(target_slot)
return zip(list_x, list_y)
def get_distances(self, target_slot):
"""Get the distances of neighbor points in the target slot."""
points = self.get_points(target_slot)
distances = []
for index in range(len(points) - 1):
distance = self._calc_distance(points[index], points[index + 1])
return distances
def get_distances_with_first_point(self, target_slot):
"""Get distances of the points in the target_slot with first point."""
points = self.get_points(target_slot)
point0 = points[0]
distances = [self._calc_distance(point, point0) for point in points]
return distances
def get_range(self):
"""Get the min and max values of (x, y) positions."""
min_x = min_y = float('infinity')
max_x = max_y = float('-infinity')
for packet in self.packets:
for event in packet:
if self._is_ABS_MT_POSITION_X(event):
x = event[EV_VALUE]
min_x = min(min_x, x)
max_x = max(max_x, x)
elif self._is_ABS_MT_POSITION_Y(event):
y = event[EV_VALUE]
min_y = min(min_y, y)
max_y = max(max_y, y)
return (min_x, max_x, min_y, max_y)
def get_total_motion(self, target_slot):
"""Get the total motion in the target slot."""
prev_x = prev_y = None
accu_x = accu_y = 0
slot = None
for packet in self.packets:
for event in packet:
if self._is_ABS_MT_SLOT(event):
slot = event[EV_VALUE]
elif self._is_ABS_MT_POSITION_X(event) and slot == target_slot:
x = event[EV_VALUE]
accu_x += self._calc_movement_for_axis(x, prev_x)
prev_x = x
elif self._is_ABS_MT_POSITION_Y(event) and slot == target_slot:
y = event[EV_VALUE]
accu_y += self._calc_movement_for_axis(y, prev_y)
prev_y = y
return (accu_x, accu_y)
def get_largest_distance(self, target_slot):
"""Get the largest distance of point to the first point."""
distances = self.get_distances_with_first_point(target_slot)
return max(distances)
def get_largest_gap_ratio(self, target_slot):
"""Get the largest gap ratio in the target slot."""
gaps = self.get_distances(target_slot)
gap_ratios = []
for index in range(1, len(gaps) - 1):
prev_gap = max(gaps[index - 1], 1)
curr_gap = gaps[index]
next_gap = max(gaps[index + 1], 1)
gap_ratios.append(2.0 * curr_gap / (prev_gap + next_gap))
largest_gap_ratio = max(gap_ratios) if gap_ratios else 0
return largest_gap_ratio
def get_displacement(self, target_slot):
"""Get the displacement in the target slot."""
displace = [map(lambda p0, p1: p1 - p0, axis[:len(axis) - 1], axis[1:])
for axis in self.get_x_y(target_slot)]
displacement_dict = dict(zip((X, Y), displace))
return displacement_dict
def get_reversed_motions(self, target_slot, direction):
"""Get the total reversed motions in the specified direction
in the target slot.
If direction is HORIZONTAL, consider only x axis.
If direction is VERTICAL, consider only y axis.
If direction is DIAGONAL, consider both x and y axes.
Assume that a list of displacement in some axis looks like
[10, 12, 8, -9, -2, 6, 8, 11, 12, 5, 2]
The number of positive displacements, which is 9, is greater than
the number of negative displacements, which is 2. In this case
(-9) + (-2) = -11 is the total reversed motion in this list.
Should the number of positive items be equal to the number of negative
items, we assume that the one with smaller sum is the reversed motion.
Take this list [10, 22, -9, -2, -3, 10] for example. The numbers of
items for both positive/negative displacements are the same. However,
the sum of negative values is smaller. Hence, the reversed motion is
the sum of negative values.
check_axes = {HORIZONTAL: (X,), VERTICAL: (Y,), DIAGONAL: (X, Y)}
displacement_dict = self.get_displacement(target_slot)
reversed_motions = {}
func_positive = lambda n: n > 0
func_negative = lambda n: n < 0
for axis in check_axes[direction]:
displacement = displacement_dict[axis]
list_positive = filter(func_positive, displacement)
list_negative = filter(func_negative, displacement)
num_positive = len(list_positive)
num_negative = len(list_negative)
sum_positive = sum(list_positive)
sum_negative = sum(list_negative)
if num_positive > num_negative:
reversed_motions[axis] = sum_negative
elif num_positive < num_negative:
reversed_motions[axis] = sum_positive
# Handle the very rare case below when num_positive == num_negative
elif sum_positive >= sum_negative:
reversed_motions[axis] = sum_negative
elif sum_positive < sum_negative:
reversed_motions[axis] = sum_positive
return reversed_motions
def get_num_packets(self, target_slot):
"""Get the number of packets in the target slot."""
list_x, list_y = self.get_x_y(target_slot)
return len(list_x)
def _call_check_event_func(self, event, expected_value, check_event_result,
"""Call all functions in check_event_func and return the results.
Note that since check_event_result is a dictionary, it is passed
by reference.
for func in check_event_func:
if func(event):
check_event_result[func] = (event[EV_VALUE] == expected_value)
def _get_event_cycles(self, check_event_func):
"""A generic method to get the number of event cycles.
For a tap, its event cycle looks like:
(1) finger touching the pad:
BTN_TOUCH: 0 -> 1
(2) finger leaving the pad:
BTN_TOUCH: 1 -> 0
For a one-finger physical click, its event cycle looks like:
(1) finger clicking and pressing:
BTN_LEFT : 0-> 1
BTN_TOUCH: 0 -> 1
(2) finger leaving:
BTN_LEFT : 1-> 0
BTN_TOUCH: 1 -> 0
This method counts how many such cycles there are in the packets.
# Initialize all check_event_result to False
# when all_events_observed is False and all check_event_result are True
# => all_events_observed is set to True
# when all_events_observed is True and all check_event_result are True
# => all_events_observed is set to False, and
# count is increased by 1
check_event_result = self._init_dict(check_event_func, False)
all_events_observed = False
count = 0
for packet in self.packets:
for event in packet:
if all_events_observed:
expected_value = 0
self._call_check_event_func(event, expected_value,
if all(check_event_result.values()):
all_events_observed = False
check_event_result = self._init_dict(check_event_func,
count += 1
expected_value = 1
self._call_check_event_func(event, expected_value,
if all(check_event_result.values()):
all_events_observed = True
check_event_result = self._init_dict(check_event_func,
return count
def _get_event_cycles_for_num_fingers(self, num_fingers):
return self._get_event_cycles(self.check_event_func_list[num_fingers])
def verify_exact_number_fingers_touch(self, num_fingers):
"""Verify the exact number of fingers touching the pad.
Example: for a two-finger touch
2-finger touch cycles should be equal to 1
3/4/5-finger touch cycles should be equal to 0
Don't care about 1-finger touch cycles which is not deterministic.
range_fingers = range(1, self.MAX_FINGERS)
flag_check = self._init_dict(range_fingers, True)
for f in range_fingers:
cycles = self._get_event_cycles_for_num_fingers(f)
if f == num_fingers:
flag_check[f] = cycles == 1
elif f > num_fingers:
flag_check[f] = cycles == 0
return all(flag_check)
def get_physical_clicks(self, num_fingers):
"""Get the count of physical clicks for the given number of fingers."""
flag_fingers_touch = self.verify_exact_number_fingers_touch(num_fingers)
click_cycles = self._get_event_cycles(self.check_event_func_click)
return click_cycles if flag_fingers_touch else 0
class MTBParser:
"""Touchpad MTB event Parser."""
def __init__(self):
def _get_event_re_patt(self):
"""Construct the regular expression search pattern of MTB events.
An ordinary event looks like
Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316
A SYN_REPORT event looks like
Event: time 10788.289613, -------------- SYN_REPORT ------------
# Get the pattern of an ordinary event
event_patt_time = 'Event:\s*time\s*(\d+\.\d+)'
event_patt_type = 'type\s*(\d+)\s*\(\w+\)'
event_patt_code = 'code\s*(\d+)\s*\(\w+\)'
event_patt_value = 'value\s*(-?\d+)'
event_sep = ',\s*'
event_patt = event_sep.join([event_patt_time,
self.event_re_patt = re.compile(event_patt, re.I)
# Get the pattern of the SYN_REPORT event
event_patt_type_SYN_REPORT = '-+\s*SYN_REPORT\s-+'
event_patt_SYN_REPORT = event_sep.join([event_patt_time,
self.event_re_patt_SYN_REPORT = re.compile(event_patt_SYN_REPORT, re.I)
def _get_event_dict_ordinary(self, line):
"""Construct the event dictionary for an ordinary event."""
result =
ev_dict = {}
if result is not None:
ev_dict[EV_TIME] = float(
ev_dict[EV_TYPE] = int(
ev_dict[EV_CODE] = int(
ev_dict[EV_VALUE] = int(
return ev_dict
def _get_event_dict_SYN_REPORT(self, line):
"""Construct the event dictionary for a SYN_REPORT event."""
result =
ev_dict = {}
if result is not None:
ev_dict[EV_TIME] = float(
ev_dict[SYN_REPORT] = True
return ev_dict
def _get_event_dict(self, line):
"""Construct the event dictionary."""
EVENT_FUNC_LIST = [self._get_event_dict_ordinary,
for get_event_func in EVENT_FUNC_LIST:
ev_dict = get_event_func(line)
if ev_dict:
return ev_dict
return False
def _is_SYN_REPORT(self, ev_dict):
"""Determine if this event is SYN_REPORT."""
return ev_dict.get(SYN_REPORT, False)
def parse(self, raw_event):
"""Parse the raw event string into a list of event dictionary."""
ev_list = []
packets = []
start_flag = False
for line in raw_event:
ev_dict = self._get_event_dict(line)
if ev_dict:
start_flag = True
if self._is_SYN_REPORT(ev_dict):
ev_list = []
elif start_flag:
logging.warn(' Warn: format problem in event:\n %s' % line)
return packets
def parse_file(self, file_name):
"""Parse raw device events in the given file name."""
packets = None
if os.path.isfile(file_name):
with open(file_name) as f:
packets = self.parse(f)
return packets
if __name__ == '__main__':
# Read a device file, and convert it to pretty packet format.
if len(sys.argv) != 2 or not os.path.exists(sys.argv[1]):
print 'Usage: %s device_file' % sys.argv[0]
with open(sys.argv[1]) as event_file:
packets = MTBParser().parse(event_file)
for packet in packets:
print make_pretty_packet(packet)