blob: 32e9b98fd2a15a56628ecfc862ccf5377e973837 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides MTB parser and related packet methods."""
import copy
import logging
import math
import os
import re
import sys
from collections import defaultdict, namedtuple, OrderedDict
from firmware_constants import AXIS, GV, MTB, UNIT, VAL
from geometry.elements import Point
from geometry.two_farthest_clusters import (
get_radii_of_two_minimal_enclosing_circles as get_two_min_radii,
from linux_input import *
# Define TidPacket to keep the point, pressure, and SYN_REPOT time of a packet.
TidPacket = namedtuple('TidPacket', ['syn_time', 'point', 'pressure'])
# Define FingerPath class to keep track of the slot, and a list of tid packets
# of a finger (i.e., a tracking ID).
class FingerPath(namedtuple('FingerPath', ['slot', 'tid_packets'])):
"""This keeps the slot number and the list of tid packets of a finger."""
__slots__ = ()
def get(self, attr):
"""Get the list of the specified attribute, attr (i.e., point,
pressure, or syn_time), of TidPacket for the finger.
return [getattr(tid_packet, attr) for tid_packet in self.tid_packets]
def get_mtb_packets_from_file(event_file):
""" A helper function to get mtb packets by parsing the event file.
@param event_file: an mtb_event file
return Mtb(packets=MtbParser().parse_file(event_file))
def make_pretty_packet(packet):
"""Convert the event list in a packet to a pretty format."""
pretty_packet = []
for event in packet:
pretty_event = []
pretty_event.append('time %.6f,' % event[MTB.EV_TIME])
if event.get(MTB.SYN_REPORT):
pretty_event.append('-------------- SYN_REPORT ------------\n')
ev_type = event[MTB.EV_TYPE]
pretty_event.append('type %d (%s),' % (ev_type, EV_TYPES[ev_type]))
ev_code = event[MTB.EV_CODE]
pretty_event.append('code %d (%s),' %
(ev_code, EV_STRINGS[ev_type][ev_code]))
pretty_event.append('value %d' % event[MTB.EV_VALUE])
pretty_packet.append(' '.join(pretty_event))
return '\n'.join(pretty_packet)
def convert_to_evemu_format(packets):
"""Convert the text event format to the evemu format."""
evemu_output = []
evemu_format = 'E: %.6f %04x %04x %d'
evemu_format_syn_report = 'E: %.6f 0000 0000 0'
for packet in packets:
for event in packet:
if event.get(MTB.SYN_REPORT):
evemu_event = evemu_format_syn_report % event[MTB.EV_TIME]
evemu_event = evemu_format % (event[MTB.EV_TIME],
return evemu_output
def convert_mtplot_file_to_evemu_file(mtplot_file, evemu_dir=None,
evemu_ext='.evemu', force=False):
"""Convert a mtplot event file to an evemu event file.
'one_finger_swipe.dat' is converted to 'one_finger_swipe.evemu.dat'
if not os.path.isfile(mtplot_file):
print 'Error: there is no such file: "%s".' % mtplot_file
return None
# Convert mtplot event format to evemu event format.
mtplot_packets = MtbParser().parse_file(mtplot_file)
evemu_packets = convert_to_evemu_format(mtplot_packets)
# Create the evemu file from the mtplot file.
mtplot_dir, mtplot_filename = os.path.split(mtplot_file)
mtplot_basename, mtplot_ext = os.path.splitext(mtplot_filename)
evemu_dir = evemu_dir if evemu_dir else mtplot_dir
evemu_file = (os.path.join(evemu_dir, mtplot_basename) + evemu_ext +
# Make sure that the file to be created does not exist yet unless force flag
# is set to be True.
if os.path.isfile(evemu_file) and not force:
print 'Warning: the "%s" already exists. Quit.' % evemu_file
return None
# Write the converted evemu events to the evemu file.
with open(evemu_file, 'w') as evemu_f:
except Exception as e:
print 'Error: cannot write data to %s' % evemu_file
return None
return evemu_file
class MtbEvent:
"""Determine what an MTB event is.
This class is just a bundle of a variety of classmethods about
MTB event classification.
def is_ABS_MT_TRACKING_ID(cls, event):
"""Is this event ABS_MT_TRACKING_ID?"""
return (not event.get(MTB.SYN_REPORT) and
event[MTB.EV_TYPE] == EV_ABS and
def is_new_contact(cls, event):
"""Is this packet generating new contact (Tracking ID)?"""
return cls.is_ABS_MT_TRACKING_ID(event) and event[MTB.EV_VALUE] != -1
def is_finger_leaving(cls, event):
"""Is the finger is leaving in this packet?"""
return cls.is_ABS_MT_TRACKING_ID(event) and event[MTB.EV_VALUE] == -1
def is_ABS_MT_SLOT(cls, event):
"""Is this packet ABS_MT_SLOT?"""
return (not event.get(MTB.SYN_REPORT) and
event[MTB.EV_TYPE] == EV_ABS and
def is_ABS_MT_POSITION_X(cls, event):
"""Is this packet ABS_MT_POSITION_X?"""
return (not event.get(MTB.SYN_REPORT) and
event[MTB.EV_TYPE] == EV_ABS and
def is_ABS_MT_POSITION_Y(cls, event):
"""Is this packet ABS_MT_POSITION_Y?"""
return (not event.get(MTB.SYN_REPORT) and
event[MTB.EV_TYPE] == EV_ABS and
def is_ABS_MT_PRESSURE(self, event):
"""Is this packet ABS_MT_PRESSURE?"""
return (not event.get(MTB.SYN_REPORT) and
event[MTB.EV_TYPE] == EV_ABS and
def is_EV_KEY(cls, event):
"""Is this an EV_KEY event?"""
return (not event.get(MTB.SYN_REPORT) and event[MTB.EV_TYPE] == EV_KEY)
def is_BTN_LEFT(cls, event):
"""Is this event BTN_LEFT?"""
return (cls.is_EV_KEY(event) and event[MTB.EV_CODE] == BTN_LEFT)
def is_BTN_LEFT_value(cls, event, value):
"""Is this event BTN_LEFT with value equal to the specified value?"""
return (cls.is_BTN_LEFT(event) and event[MTB.EV_VALUE] == value)
def is_BTN_TOOL_FINGER(cls, event):
"""Is this event BTN_TOOL_FINGER?"""
return (cls.is_EV_KEY(event) and
def is_BTN_TOOL_DOUBLETAP(cls, event):
"""Is this event BTN_TOOL_DOUBLETAP?"""
return (cls.is_EV_KEY(event) and
def is_BTN_TOOL_TRIPLETAP(cls, event):
"""Is this event BTN_TOOL_TRIPLETAP?"""
return (cls.is_EV_KEY(event) and
def is_BTN_TOOL_QUADTAP(cls, event):
"""Is this event BTN_TOOL_QUADTAP?"""
return (cls.is_EV_KEY(event) and
def is_BTN_TOOL_QUINTTAP(cls, event):
"""Is this event BTN_TOOL_QUINTTAP?"""
return (cls.is_EV_KEY(event) and
def is_BTN_TOUCH(cls, event):
"""Is this event BTN_TOUCH?"""
return (cls.is_EV_KEY(event) and
def is_SYN_REPORT(self, event):
"""Determine if this event is SYN_REPORT."""
return event.get(MTB.SYN_REPORT, False)
class MtbEvemu:
"""A simplified class provides MTB utilities for evemu event format."""
def __init__(self, device):
self.mtb = Mtb(device=device)
self.num_tracking_ids = 0
def _convert_event(self, event):
(tv_sec, tv_usec, ev_type, ev_code, ev_value) = event
ev_dict = {MTB.EV_TIME: tv_sec + 0.000001 * tv_usec,
MTB.EV_TYPE: ev_type,
MTB.EV_CODE: ev_code,
MTB.EV_VALUE: ev_value}
return ev_dict
def all_fingers_leaving(self):
"""Is there no finger on the touch device?"""
return self.num_tracking_ids <= 0
def process_event(self, event):
"""Process the event and count existing fingers."""
converted_event = self._convert_event(event)
if MtbEvent.is_new_contact(converted_event):
self.num_tracking_ids += 1
elif MtbEvent.is_finger_leaving(converted_event):
self.num_tracking_ids -= 1
class MtbStateMachine:
"""The state machine for MTB events.
It traces the slots, tracking IDs, x coordinates, y coordinates, etc. If
these values are not changed explicitly, the values are kept across events.
Note that the kernel driver only reports what is changed. Due to its
internal state machine, it is possible that either x or y in
self.point[tid] is None initially even the instance has been created.
def __init__(self):
# Set the default slot to 0 as it may not be displayed in the MTB events
# Some abnormal event files may not display the tracking ID in the
# beginning. To handle this situation, we need to initialize
# the following variables: slot_to_tid, point
# As an example, refer to the following event file which is one of
# the golden samples with this problem.
# tests/data/stationary_finger_shift_with_2nd_finger_tap.dat
self.tid = None
self.slot = 0
self.slot_to_tid = {self.slot: self.tid}
self.point = {self.tid: Point()}
self.pressure = {self.tid: None}
self.syn_time = None
self.new_tid = False
self.number_fingers = 0
self.leaving_slots = []
def _del_leaving_slots(self):
"""Delete the leaving slots. Remove the slots and their tracking IDs."""
for slot in self.leaving_slots:
del self.slot_to_tid[slot]
self.number_fingers -= 1
self.leaving_slots = []
def add_event(self, event):
"""Update the internal states with the event.
@param event: an MTB event
self.new_tid = False
# Switch the slot.
if MtbEvent.is_ABS_MT_SLOT(event):
self.slot = event[MTB.EV_VALUE]
# Get a new tracking ID.
elif MtbEvent.is_new_contact(event):
self.tid = event[MTB.EV_VALUE]
self.slot_to_tid[self.slot] = self.tid
self.new_tid = True
self.point[self.tid] = Point()
self.number_fingers += 1
# A slot is leaving.
# Do not delete this slot until this last packet is reported.
elif MtbEvent.is_finger_leaving(event):
# Update x value.
elif MtbEvent.is_ABS_MT_POSITION_X(event):
self.point[self.slot_to_tid[self.slot]].x = event[MTB.EV_VALUE]
# Update y value.
elif MtbEvent.is_ABS_MT_POSITION_Y(event):
self.point[self.slot_to_tid[self.slot]].y = event[MTB.EV_VALUE]
# Update z value (pressure)
elif MtbEvent.is_ABS_MT_PRESSURE(event):
self.pressure[self.slot_to_tid[self.slot]] = event[MTB.EV_VALUE]
# Use the SYN_REPORT time as the packet time
elif MtbEvent.is_SYN_REPORT(event):
self.syn_time = event[MTB.EV_TIME]
def get_current_tid_data_for_all_tids(self, request_data_ready=True):
"""Get current packet's tid data including the point, the pressure, and
the syn_time for all tids.
@param request_data_ready: if set to true, it will not output
current_tid_data until all data including x, y, pressure,
syn_time, etc. in the packet have been assigned.
current_tid_data = []
for slot, tid in self.slot_to_tid.items():
point = copy.deepcopy(self.point.get(tid))
pressure = self.pressure.get(tid)
# Check if all attributes are non-None values.
# Note: we cannot use
# all([all(point.value()), pressure, self.syn_time])
# E.g., for a point = (0, 300), it will return False
# which is not what we want. We want it to return False
# only when there are None values.
data_ready = all(map(lambda e: e is not None,
list(point.value()) + [pressure, self.syn_time]))
if (not request_data_ready) or data_ready:
tid_packet = TidPacket(self.syn_time, point, pressure)
tid_packet = None
# Even tid_packet is None, we would like to report this tid so that
# its client function get_ordered_finger_paths() could construct
# an ordered dictionary correctly based on the tracking ID.
current_tid_data.append((tid, slot, tid_packet))
return sorted(current_tid_data)
class Mtb:
"""An MTB class providing MTB format related utility methods."""
def __init__(self, device=None, packets=None):
self.device = device
self.packets = packets
def _define_check_event_func_list(self):
"""Define event function lists for various event cycles below."""
self.check_event_func_list = {}
self.MAX_FINGERS = 5
# One-finger touching the device should generate the following events:
# BTN_TOUCH, and BTN_TOOL_FINGER: 0 -> 1 -> 0
self.check_event_func_list[1] = [MtbEvent.is_BTN_TOUCH,
# Two-finger touching the device should generate the following events:
# BTN_TOUCH, and BTN_TOOL_DOUBLETAP: 0 -> 1 -> 0
self.check_event_func_list[2] = [MtbEvent.is_BTN_TOUCH,
# Three-finger touching the device should generate the following events:
# BTN_TOUCH, and BTN_TOOL_TRIPLETAP: 0 -> 1 -> 0
self.check_event_func_list[3] = [MtbEvent.is_BTN_TOUCH,
# Four-finger touching the device should generate the following events:
# BTN_TOUCH, and BTN_TOOL_QUADTAP: 0 -> 1 -> 0
self.check_event_func_list[4] = [MtbEvent.is_BTN_TOUCH,
# Five-finger touching the device should generate the following events:
# BTN_TOUCH, and BTN_TOOL_QUINTTAP: 0 -> 1 -> 0
self.check_event_func_list[5] = [MtbEvent.is_BTN_TOUCH,
# Physical click should generate the following events:
# BTN_LEFT: 0 -> 1 -> 0
self.check_event_func_click = [MtbEvent.is_BTN_LEFT,]
def _calc_movement_for_axis(self, x, prev_x):
"""Calculate the distance moved in an axis."""
return abs(x - prev_x) if prev_x is not None else 0
def _calc_distance(self, (x0, y0), (x1, y1)):
"""Calculate the distance in pixels between two points."""
dist_x = x1 - x0
dist_y = y1 - y0
return math.sqrt(dist_x * dist_x + dist_y * dist_y)
def _init_dict(self, keys, value):
"""Initialize a dictionary over the keys with the same given value.
Note: The following command does not always work:
dict.fromkeys(keys, value)
It works when value is a simple type, e.g., an integer.
However, if value is [] or {}, it does not work correctly.
The reason is that if the value is [] or {}, all the keys would
point to the same list or dictionary, which is not expected
in most cases.
return dict([(key, copy.deepcopy(value)) for key in keys])
def get_number_contacts(self):
"""Get the number of contacts (Tracking IDs)."""
num_contacts = 0
for packet in self.packets:
for event in packet:
if MtbEvent.is_new_contact(event):
num_contacts += 1
return num_contacts
def get_x_y(self, target_slot):
"""Extract x and y positions in the target slot."""
# The default slot is slot 0 if no slot number is assigned.
# The rationale is that evdev is a state machine. It only reports
# the change. Slot 0 would not be reported by evdev if last time
# the last finger left the touch device was at slot 0.
slot = 0
# Should not write "list_x = list_y = []" below.
# They would end up with pointing to the same list.
list_x = []
list_y = []
prev_x = prev_y = None
target_slot_live = False
initial_default_slot_0 = True
for packet in self.packets:
if (slot == target_slot and slot == 0 and not target_slot_live and
target_slot_live = True
initial_default_slot_0 = False
for event in packet:
if MtbEvent.is_ABS_MT_SLOT(event):
slot = event[MTB.EV_VALUE]
if slot == target_slot and not target_slot_live:
target_slot_live = True
if slot != target_slot:
# Update x value if available.
if MtbEvent.is_ABS_MT_POSITION_X(event):
prev_x = event[MTB.EV_VALUE]
# Update y value if available.
elif MtbEvent.is_ABS_MT_POSITION_Y(event):
prev_y = event[MTB.EV_VALUE]
# Check if the finger at the target_slot is leaving.
elif MtbEvent.is_finger_leaving(event):
target_slot_live = False
# If target_slot is alive, and both x and y have
# been assigned values, append the x and y to the list no matter
# whether x or y position is reported in the current packet.
# This also handles the initial condition that no previous x or y
# is reported yet.
if target_slot_live and prev_x and prev_y:
return (list_x, list_y)
def get_ordered_finger_paths(self, request_data_ready=True):
"""Construct the finger paths ordered by the occurrences of the
finger contacts.
@param request_data_ready: if set to true, it will not output the
tid_data in a packet until all data including x, y, pressure,
syn_time, etc. in the packet have been assigned.
The finger_paths mapping the tid to its finger_path looks like
{tid1: finger_path1,
tid2: finger_path2,
where every tid represents a finger.
A finger_path effectively consists of a list of tid_packets of the same
tid in the event file. An example of its structure looks like
tid_packets = [tid_packet0, tid_packet1, tid_packet2, ...]
A tid_packet looks like
[100021.342104, # syn_time
(66, 100), # point
56, # pressure
... # maybe more attributes added later.
This method is applicable when fingers are contacting and leaving
the touch device continuously. The same slot number, e.g., slot 0 or
slot 1, may be used for multiple times.
Note that the finger contact starts at 0. The finger contacts look to
be equal to the slot numbers in practice. However, this assumption
seems not enforced in any document. For safety, we use the ordered
finger paths dict here to guarantee that we could access the ith finger
contact path data correctly.
Also note that we do not sort finger paths by tracking IDs to derive
the ordered dict because tracking IDs may wrap around.
# ordered_finger_paths_dict is an ordered dictionary of
# {tid: FingerPath}
ordered_finger_paths_dict = OrderedDict()
sm = MtbStateMachine()
for packet in self.packets:
# Inject events into the state machine to update its state.
for event in packet:
# If there are N fingers (tids) in a packet, we will have
# N tid_packet's in the current packet. The loop below is to
# append every tid_packet into its corresponding finger_path for
# every tracking id in the current packet.
for tid, slot, tid_packet in sm.get_current_tid_data_for_all_tids(
finger_path = ordered_finger_paths_dict.setdefault(
tid, FingerPath(slot, []))
if tid_packet:
return ordered_finger_paths_dict
def get_ordered_finger_path(self, finger, attr):
"""Extract the specified attribute from the packets of the ith finger
@param finger: the ith finger contact
@param attr: an attribute in a tid packet which could be either
'point', 'pressure', or 'syn_time'
# finger_paths is a list ordered by the occurrences of finger contacts
finger_paths = self.get_ordered_finger_paths().values()
if finger < len(finger_paths) and finger >= 0:
finger_path = finger_paths[finger]
return finger_path.get(attr)
return []
def get_slot_data(self, slot, attr):
"""Extract the attribute data of the specified slot.
@param attr: an attribute in a tid packet which could be either
'point', 'pressure', or 'syn_time'
for finger_path in self.get_ordered_finger_paths().values():
if finger_path.slot == slot:
return finger_path.get(attr)
return []
def get_max_distance_of_all_tracking_ids(self):
"""Get the max moving distance of all tracking IDs."""
return max(max(get_two_min_radii(finger_path.get('point'))
for finger_path in self.get_ordered_finger_paths().values()))
def get_list_of_rocs_of_all_tracking_ids(self):
"""For each tracking ID, compute the minimal enclosing circles.
Return a list of radii of such minimal enclosing circles of
all tracking IDs.
Note: rocs denotes the radii of circles
list_rocs = []
for finger_path in self.get_ordered_finger_paths().values():
# Convert the point coordinates in pixels to in mms.
points_in_mm = [Point(*self.device.pixel_to_mm(p.value()))
for p in finger_path.get('point')]
list_rocs += get_two_min_radii(points_in_mm)
return list_rocs
def get_x_y_multiple_slots(self, target_slots):
"""Extract points in multiple slots.
Only the packets with all specified slots are extracted.
This is useful to collect packets for pinch to zoom.
# Initialize slot_exists dictionary to False
slot_exists = dict.fromkeys(target_slots, False)
# Set the initial slot number to 0 because evdev is a state machine,
# and may not present slot 0.
slot = 0
# Initialize the following dict to []
# Don't use "dict.fromkeys(target_slots, [])"
list_x = self._init_dict(target_slots, [])
list_y = self._init_dict(target_slots, [])
x = self._init_dict(target_slots, None)
y = self._init_dict(target_slots, None)
for packet in self.packets:
for event in packet:
if MtbEvent.is_ABS_MT_SLOT(event):
slot = event[MTB.EV_VALUE]
if slot not in target_slots:
if MtbEvent.is_ABS_MT_TRACKING_ID(event):
if MtbEvent.is_new_contact(event):
slot_exists[slot] = True
elif MtbEvent.is_finger_leaving(event):
slot_exists[slot] = False
elif MtbEvent.is_ABS_MT_POSITION_X(event):
x[slot] = event[MTB.EV_VALUE]
elif MtbEvent.is_ABS_MT_POSITION_Y(event):
y[slot] = event[MTB.EV_VALUE]
# Note:
# - All slot_exists must be True to append x, y positions for the
# slots.
# - All x and y values for all slots must have been reported once.
# (This handles the initial condition that no previous x or y
# is reported yet.)
# - If either x or y positions are reported in the current packet,
# append x and y to the list of that slot.
# (This handles the condition that only x or y is reported.)
# - Even in the case that neither x nor y is reported in current
# packet, cmt driver constructs and passes hwstate to gestures.
if (all(slot_exists.values()) and all(x.values()) and
for s in target_slots:
return (list_x, list_y)
def get_points_multiple_slots(self, target_slots):
"""Get the points in multiple slots."""
list_x, list_y = self.get_x_y_multiple_slots(target_slots)
points_list = [zip(list_x[slot], list_y[slot]) for slot in target_slots]
points_dict = dict(zip(target_slots, points_list))
return points_dict
def get_relative_motion(self, target_slots):
"""Get the relative motion of the two target slots."""
# The slots in target_slots could be (0, 1), (1, 2) or other
# possibilities.
slot_a, slot_b = target_slots
points_dict = self.get_points_multiple_slots(target_slots)
points_slot_a = points_dict[slot_a]
points_slot_b = points_dict[slot_b]
# if only 0 or 1 point observed, the relative motion is 0.
if len(points_slot_a) <= 1 or len(points_slot_b) <= 1:
return 0
distance_begin = self._calc_distance(points_slot_a[0], points_slot_b[0])
distance_end = self._calc_distance(points_slot_a[-1], points_slot_b[-1])
relative_motion = distance_end - distance_begin
return relative_motion
def get_points(self, target_slot):
"""Get the points in the target slot."""
list_x, list_y = self.get_x_y(target_slot)
return zip(list_x, list_y)
def get_distances(self, target_slot):
"""Get the distances of neighbor points in the target slot."""
points = self.get_points(target_slot)
distances = []
for index in range(len(points) - 1):
distance = self._calc_distance(points[index], points[index + 1])
return distances
def get_range(self):
"""Get the min and max values of (x, y) positions."""
min_x = min_y = float('infinity')
max_x = max_y = float('-infinity')
for packet in self.packets:
for event in packet:
if MtbEvent.is_ABS_MT_POSITION_X(event):
x = event[MTB.EV_VALUE]
min_x = min(min_x, x)
max_x = max(max_x, x)
elif MtbEvent.is_ABS_MT_POSITION_Y(event):
y = event[MTB.EV_VALUE]
min_y = min(min_y, y)
max_y = max(max_y, y)
return (min_x, max_x, min_y, max_y)
def get_total_motion(self, target_slot):
"""Get the total motion in the target slot."""
prev_x = prev_y = None
accu_x = accu_y = 0
slot = None
for packet in self.packets:
for event in packet:
if MtbEvent.is_ABS_MT_SLOT(event):
slot = event[MTB.EV_VALUE]
elif (MtbEvent.is_ABS_MT_POSITION_X(event) and
slot == target_slot):
x = event[MTB.EV_VALUE]
accu_x += self._calc_movement_for_axis(x, prev_x)
prev_x = x
elif (MtbEvent.is_ABS_MT_POSITION_Y(event) and
slot == target_slot):
y = event[MTB.EV_VALUE]
accu_y += self._calc_movement_for_axis(y, prev_y)
prev_y = y
return (accu_x, accu_y)
def get_max_distance(self, slot, unit):
"""Get the max distance between any two points of the specified slot."""
points = self.get_slot_data(slot, 'point')
return self.get_max_distance_from_points(points, unit)
def get_max_distance_from_points(self, points, unit):
"""Get the max distance between any two points."""
two_farthest_points = get_two_farthest_points(points)
if len(two_farthest_points) < 2:
return 0
# Convert the point coordinates from pixel to mm if necessary.
if unit == UNIT.MM:
p1, p2 = [Point(*self.device.pixel_to_mm(p.value()))
for p in two_farthest_points]
p1, p2 = two_farthest_points
return p1.distance(p2)
def get_largest_gap_ratio(self, target_slot):
"""Get the largest gap ratio in the target slot.
gap_ratio_with_prev = curr_gap / prev_gap
gap_ratio_with_next = curr_gap / next_gap
This function tries to find the largest gap_ratio_with_prev
with the restriction that gap_ratio_with_next is larger than
The ratio threshold is used to prevent the gaps detected in a swipe.
Note that in a swipe, the gaps tends to become larger and larger.
gaps = self.get_distances(target_slot)
gap_ratios = []
largest_gap_ratio = float('-infinity')
for index in range(1, len(gaps) - 1):
prev_gap = max(gaps[index - 1], 1)
curr_gap = gaps[index]
next_gap = max(gaps[index + 1], 1)
gap_ratio_with_prev = curr_gap / prev_gap
gap_ratio_with_next = curr_gap / next_gap
if (curr_gap >= GAP_LOWER_BOUND and
gap_ratio_with_prev > largest_gap_ratio and
gap_ratio_with_next > RATIO_THRESHOLD_CURR_GAP_TO_NEXT_GAP):
largest_gap_ratio = gap_ratio_with_prev
return largest_gap_ratio
def _is_large(self, numbers, index):
"""Is the number at the index a large number compared to the moving
average of the previous LEN_MOVING_AVERAGE numbers? This is used to
determine if a distance is a level jump."""
if index < self.LEN_MOVING_AVERAGE + 1:
return False
moving_sum = sum(numbers[index - self.LEN_MOVING_AVERAGE : index])
moving_average = float(moving_sum) / self.LEN_MOVING_AVERAGE
cond1 = numbers[index] >= self.LEVEL_JUMP_RATIO * moving_average
cond2 = numbers[index] >= self.LEVEL_JUMP_MAXIUM_ALLOWED
return cond1 and cond2
def _accumulate_level_jumps(self, level_jumps):
"""Accumulate level jumps."""
if not level_jumps:
return []
is_positive = level_jumps[0] > 0
tmp_sum = 0
accumulated_level_jumps = []
for jump in level_jumps:
# If current sign is the same as previous ones, accumulate it.
if (jump > 0) is is_positive:
tmp_sum += jump
# If current jump has changed its sign, reset the tmp_sum to
# accumulate the level_jumps onwards.
tmp_sum = jump
is_positive = not is_positive
if tmp_sum != 0:
return accumulated_level_jumps
def get_largest_accumulated_level_jumps(self, displacements):
"""Get the largest accumulated level jumps in displacements."""
largest_accumulated_level_jumps = 0
if len(displacements) < self.LEN_MOVING_AVERAGE + 1:
return largest_accumulated_level_jumps
# Truncate some elements at both ends of the list which are not stable.
displacements = displacements[self.LEN_DISCARD: -self.LEN_DISCARD]
distances = map(abs, displacements)
# E.g., displacements= [5, 6, 5, 6, 20, 3, 5, 4, 6, 21, 4, ...]
# level_jumps = [20, 21, ...]
level_jumps = [disp for i, disp in enumerate(displacements)
if self._is_large(distances, i)]
# E.g., level_jumps= [20, 21, -18, -25, 22, 18, -19]
# accumulated_level_jumps = [41, -43, 40, -19]
# largest_accumulated_level_jumps = 43
accumulated_level_jumps = self._accumulate_level_jumps(level_jumps)
if accumulated_level_jumps:
abs_accumulated_level_jumps = map(abs, accumulated_level_jumps)
largest_accumulated_level_jumps = max(abs_accumulated_level_jumps)
return largest_accumulated_level_jumps
def get_displacement(self, target_slot):
"""Get the displacement in the target slot."""
displace = [map(lambda p0, p1: p1 - p0, axis[:len(axis) - 1], axis[1:])
for axis in self.get_x_y(target_slot)]
displacement_dict = dict(zip((AXIS.X, AXIS.Y), displace))
return displacement_dict
def calc_displacement(self, numbers):
"""Calculate the displacements in a list of numbers."""
if len(numbers) <= 1:
return []
return [numbers[i + 1] - numbers[i] for i in range(len(numbers) - 1)]
def get_displacements_for_slots(self, min_slot):
"""Get the displacements for slots >= min_slot."""
finger_paths = self.get_ordered_finger_paths()
# Remove those tracking IDs with slots < min_slot
for tid, finger_path in finger_paths.items():
if finger_path.slot < min_slot:
del finger_paths[tid]
# Calculate the displacements of the coordinates in the tracking IDs.
displacements = defaultdict(dict)
for tid, finger_path in finger_paths.items():
finger_path_values = [p.value() for p in finger_path.get('point')]
if finger_path_values:
list_x, list_y = zip(*finger_path_values)
list_x, list_y = [], []
displacements[tid][MTB.SLOT] = finger_path.slot
displacements[tid][AXIS.X] = self.calc_displacement(list_x)
displacements[tid][AXIS.Y] = self.calc_displacement(list_y)
return displacements
def _get_segments_by_length(self, src_list, segment_flag, ratio):
"""Get the segments based on segment_flag and the ratio of the
src_list length (size).
@param src_list: could be list_x, list_y, or list_t
@param segment_flag: indicating which segment (the begin, the end, or
the middle segment) to extract
@param ratio: the ratio of the time interval of the segment
end_size = int(round(len(src_list) * ratio))
if segment_flag == VAL.WHOLE:
return src_list
elif segment_flag == VAL.MIDDLE:
return src_list[end_size: -end_size]
elif segment_flag == VAL.BEGIN:
return src_list[: end_size]
elif segment_flag == VAL.END:
return src_list[-end_size:]
elif segment_flag == VAL.BOTH_ENDS:
bgn_segment = src_list[: end_size]
end_segment = src_list[-end_size:]
return bgn_segment + end_segment
return None
def get_segments_x_and_y(self, ax, ay, segment_flag, ratio):
"""Get the segments for both x and y axes."""
segment_x = self._get_segments_by_length(ax, segment_flag, ratio)
segment_y = self._get_segments_by_length(ay, segment_flag, ratio)
return (segment_x, segment_y)
def get_segments_by_distance(self, list_t, list_coord, segment_flag, ratio):
"""Partition list_coord into the begin, the middle, and the end
segments based on segment_flag and the ratio. And then use the
derived indexes to partition list_t.
@param list_t: the list of syn_report time instants
@param list_coord: could be list_x, list_y
@param segment_flag: indicating which segment (the being, the end, or
the middle segment) to extract
@param ratio: the ratio of the distance of the begin/end segment
with the value between 0.0 and 1.0
def _find_boundary_index(list_coord, boundary_distance):
"""Find the boundary index i such that
abs(list_coord[i] - list_coord[0]) > boundary_distance
@param list_coord: a list of coordinates
@param boundary_distance: the min distance between boundary_coord
and list_coord[0]
for i, c in enumerate(list_coord):
if abs(c - list_coord[0]) > boundary_distance:
return i
end_to_end_distance = abs(list_coord[-1] - list_coord[0])
first_idx_mid_seg = _find_boundary_index(
list_coord, end_to_end_distance * ratio)
last_idx_mid_seg = _find_boundary_index(
list_coord, end_to_end_distance * (1 - ratio))
if segment_flag == VAL.WHOLE:
segment_coord = list_coord
segment_t = list_t
elif segment_flag == VAL.MIDDLE:
segment_coord = list_coord[first_idx_mid_seg:last_idx_mid_seg]
segment_t = list_t[first_idx_mid_seg:last_idx_mid_seg]
elif segment_flag == VAL.BEGIN:
segment_coord = list_coord[:first_idx_mid_seg]
segment_t = list_t[:first_idx_mid_seg]
elif segment_flag == VAL.END:
segment_coord = list_coord[last_idx_mid_seg:]
segment_t = list_t[last_idx_mid_seg:]
segment_coord = []
segment_t = []
return (segment_t, segment_coord)
def get_segments(self, list_t, list_coord, segment_flag, ratio):
"""Partition list_t and list_coord into the segments specified by
the segment_flag and the ratio.
If the list_coord stretches long enough, it represents a normal
line. We will partition list_t and list_coord by distance.
On the other hand, if the min and max values in list_coord are
close to each other, it probably does not represent a line. We will
partition list_t and list_coord by time in this case.
E.g., in the follow cases, it is better to partition using length
instead of using distance.
list_coord = [105, 105, 105, 105, 105, 105, 105, 105, 105, 105] or
list_coord = [104, 105, 105, 105, 105, 105, 105, 105, 105, 105] or
list_coord = [105, 105, 105, 105, 105, 105, 105, 105, 105, 106]
@param list_t: the list of syn_report time instants
@param list_coord: could be list_x, list_y
@param segment_flag: indicating which segment (the being, the end, or
the middle segment) to extract
@param ratio: the ratio of the distance of the begin/end segment
with the value between 0.0 and 1.0
if (max(list_coord) - min(list_coord)) > MIN_STRAIGHT_LINE_DIST:
return self.get_segments_by_distance(list_t, list_coord,
segment_flag, ratio)
return (self._get_segments_by_length(list_t, segment_flag, ratio),
self._get_segments_by_length(list_coord, segment_flag,
def get_reversed_motions(self, target_slot, direction,
segment_flag=VAL.WHOLE, ratio=None):
"""Get the total reversed motions in the specified direction
in the target slot.
Only the reversed motions specified by the segment_flag are taken.
The segment_flag could be
VAL.BEGIN: the begin segment
VAL.MIDDLE : the middle segment
VAL.END : the end segment
VAL.BOTH_ENDS : the segments at both ends
VAL.WHOLE: the whole line
The ratio represents the ratio of the BEGIN or END segment to the whole
If direction is in HORIZONTAL_DIRECTIONS, consider only x axis.
If direction is in VERTICAL_DIRECTIONS, consider only y axis.
If direction is in DIAGONAL_DIRECTIONS, consider both x and y axes.
Assume that the displacements in GV.LR (moving from left to right)
in the X axis are:
[10, 12, 8, -9, -2, 6, 8, 11, 12, 5, 2]
Its total reversed motion = (-9) + (-2) = -11
# Define the axis moving directions dictionary
POSITIVE = 'positive'
NEGATIVE = 'negative'
axis_moving_directions = AXIS_MOVING_DIRECTIONS.get(direction)
func_positive = lambda n: n > 0
func_negative = lambda n: n < 0
reversed_functions = {POSITIVE: func_negative, NEGATIVE: func_positive}
displacement_dict = self.get_displacement(target_slot)
reversed_motions = {}
for axis in AXIS.LIST:
axis_moving_direction = axis_moving_directions.get(axis)
if axis_moving_direction is None:
displacement = displacement_dict[axis]
displacement_segment = self._get_segments_by_length(
displacement, segment_flag, ratio)
reversed_func = reversed_functions[axis_moving_direction]
reversed_motions[axis] = sum(filter(reversed_func,
return reversed_motions
def get_num_packets(self, target_slot):
"""Get the number of packets in the target slot."""
list_x, list_y = self.get_x_y(target_slot)
return len(list_x)
def get_list_syn_time(self, finger):
"""Get the list of syn_time instants from the packets of the ith finger
contact if finger is not None. Otherwise, use all packets.
@param finger : the specified ith finger contact.
If a finger contact is specified, extract only the list of
syn_time from this finger contact.
Otherwise, when the finger contact is set to None, take all
packets into account. Note that the finger contact number
starts from 0.
Note: the last event in a packet, represented as packet[-1], is
'SYN_REPORT' of which the event time is the 'syn_time'.
return (self.get_ordered_finger_path(finger, 'syn_time')
if isinstance(finger, int) else
[packet[-1].get(MTB.EV_TIME) for packet in self.packets])
def _call_check_event_func(self, event, expected_value, check_event_result,
"""Call all functions in check_event_func and return the results.
Note that since check_event_result is a dictionary, it is passed
by reference.
for func in check_event_func:
if func(event):
event_value = event[MTB.EV_VALUE]
check_event_result[func] = (event_value == expected_value)
def _get_event_cycles(self, check_event_func):
"""A generic method to get the number of event cycles.
For a tap, its event cycle looks like:
(1) finger touching the touch device:
BTN_TOUCH: 0 -> 1
(2) finger leaving the touch device:
BTN_TOUCH: 1 -> 0
For a one-finger physical click, its event cycle looks like:
(1) finger clicking and pressing:
BTN_LEFT : 0-> 1
BTN_TOUCH: 0 -> 1
(2) finger leaving:
BTN_LEFT : 1-> 0
BTN_TOUCH: 1 -> 0
This method counts how many such cycles there are in the packets.
# Initialize all check_event_result to False
# when all_events_observed is False and all check_event_result are True
# => all_events_observed is set to True
# when all_events_observed is True and all check_event_result are True
# => all_events_observed is set to False, and
# count is increased by 1
check_event_result = self._init_dict(check_event_func, False)
all_events_observed = False
count = 0
for packet in self.packets:
for event in packet:
if all_events_observed:
expected_value = 0
self._call_check_event_func(event, expected_value,
if all(check_event_result.values()):
all_events_observed = False
check_event_result = self._init_dict(check_event_func,
count += 1
expected_value = 1
self._call_check_event_func(event, expected_value,
if all(check_event_result.values()):
all_events_observed = True
check_event_result = self._init_dict(check_event_func,
return count
def _get_event_cycles_for_num_fingers(self, num_fingers):
return self._get_event_cycles(self.check_event_func_list[num_fingers])
def verify_exact_number_fingers_touch(self, num_fingers):
"""Verify the exact number of fingers touching the device.
Example: for a two-finger touch
2-finger touch cycles should be equal to 1
3/4/5-finger touch cycles should be equal to 0
Don't care about 1-finger touch cycles which is not deterministic.
range_fingers = range(1, self.MAX_FINGERS)
flag_check = self._init_dict(range_fingers, True)
for f in range_fingers:
cycles = self._get_event_cycles_for_num_fingers(f)
if f == num_fingers:
flag_check[f] = cycles == 1
elif f > num_fingers:
flag_check[f] = cycles == 0
return all(flag_check)
def get_physical_clicks(self, num_fingers):
"""Get the count of physical clicks for the given number of fingers."""
flag_fingers_touch = self.verify_exact_number_fingers_touch(num_fingers)
click_cycles = self._get_event_cycles(self.check_event_func_click)
return click_cycles if flag_fingers_touch else 0
def get_raw_physical_clicks(self):
"""Get how many BTN_LEFT click events have been seen.
When calculating the raw BTN_LEFT click count, this method does not
consider whether BTN_LEFT comes with the correct finger (tracking) IDs.
A correct BTN_LEFT click consists of value 1 followed by value 0.
click_count = 0
btn_left_was_pressed = False
for packet in self.packets:
for event in packet:
# when seeing BTN_LEFT value: 0 -> 1
if (MtbEvent.is_BTN_LEFT_value(event, 1) and
not btn_left_was_pressed):
btn_left_was_pressed = True
# when seeing BTN_LEFT value: 1 -> 0
elif (MtbEvent.is_BTN_LEFT_value(event, 0) and
btn_left_was_pressed = False
click_count += 1
return click_count
def get_correct_physical_clicks(self, number_fingers):
"""Get the count of physical clicks correctly overlap with
the given number of fingers.
@param num_fingers: the expected number of fingers when a physical
click is seen
sm = MtbStateMachine()
correct_click_count = 0
click_with_correct_number_fingers = False
for packet in self.packets:
btn_left_was_pressed = False
btn_left_was_released = False
for event in packet:
if MtbEvent.is_BTN_LEFT_value(event, 1):
btn_left_was_pressed = True
elif MtbEvent.is_BTN_LEFT_value(event, 0):
btn_left_was_released = True
# Check the number of fingers only after all events in this packet
# have been processed.
if btn_left_was_pressed or btn_left_was_released:
click_with_correct_number_fingers |= (sm.number_fingers ==
# If BTN_LEFT was released, reset the flag and increase the count.
if btn_left_was_released and click_with_correct_number_fingers:
correct_click_count += 1
click_with_correct_number_fingers = False
return correct_click_count
class MtbParser:
"""Touch device MTB event Parser."""
def __init__(self):
def _get_event_re_patt(self):
"""Construct the regular expression search pattern of MTB events.
An ordinary event looks like
Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316
A SYN_REPORT event looks like
Event: time 10788.289613, -------------- SYN_REPORT ------------
# Get the pattern of an ordinary event
event_patt_time = 'Event:\s*time\s*(\d+\.\d+)'
event_patt_type = 'type\s*(\d+)\s*\(\w+\)'
event_patt_code = 'code\s*(\d+)\s*\(\w+\)'
event_patt_value = 'value\s*(-?\d+)'
event_sep = ',\s*'
event_patt = event_sep.join([event_patt_time,
self.event_re_patt = re.compile(event_patt, re.I)
# Get the pattern of the SYN_REPORT event
event_patt_type_SYN_REPORT = '-+\s*SYN_REPORT\s-+'
event_patt_SYN_REPORT = event_sep.join([event_patt_time,
self.event_re_patt_SYN_REPORT = re.compile(event_patt_SYN_REPORT, re.I)
def _get_event_dict_ordinary(self, line):
"""Construct the event dictionary for an ordinary event."""
result =
ev_dict = {}
if result is not None:
ev_dict[MTB.EV_TIME] = float(
ev_dict[MTB.EV_TYPE] = int(
ev_dict[MTB.EV_CODE] = int(
ev_dict[MTB.EV_VALUE] = int(
return ev_dict
def _get_event_dict_SYN_REPORT(self, line):
"""Construct the event dictionary for a SYN_REPORT event."""
result =
ev_dict = {}
if result is not None:
ev_dict[MTB.EV_TIME] = float(
ev_dict[MTB.SYN_REPORT] = True
return ev_dict
def _get_event_dict(self, line):
"""Construct the event dictionary."""
EVENT_FUNC_LIST = [self._get_event_dict_ordinary,
for get_event_func in EVENT_FUNC_LIST:
ev_dict = get_event_func(line)
if ev_dict:
return ev_dict
return False
def _is_SYN_REPORT(self, ev_dict):
"""Determine if this event is SYN_REPORT."""
return ev_dict.get(MTB.SYN_REPORT, False)
def parse(self, raw_event):
"""Parse the raw event string into a list of event dictionary."""
ev_list = []
packets = []
start_flag = False
for line in raw_event:
ev_dict = self._get_event_dict(line)
if ev_dict:
start_flag = True
if self._is_SYN_REPORT(ev_dict):
ev_list = []
elif start_flag:
logging.warn(' Warn: format problem in event:\n %s' % line)
return packets
def parse_file(self, file_name):
"""Parse raw device events in the given file name."""
packets = None
if os.path.isfile(file_name):
with open(file_name) as f:
packets = self.parse(f)
return packets
if __name__ == '__main__':
# Read a device file, and convert it to pretty packet format.
if len(sys.argv) != 2 or not os.path.exists(sys.argv[1]):
print 'Usage: %s device_file' % sys.argv[0]
with open(sys.argv[1]) as event_file:
packets = MtbParser().parse(event_file)
for packet in packets:
print make_pretty_packet(packet)