blob: edd5b86ac53068abc34025e9515d5e862769d4b4 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides MTB parser and related packet methods."""
import logging
import math
import os
import re
import sys
from linux_input import *
# Include some constants
execfile('', globals())
def make_pretty_packet(packet):
"""Convert the event list in a packet to a pretty format."""
pretty_packet = []
for event in packet:
pretty_event = []
pretty_event.append('time %.6f,' % event[EV_TIME])
if event.get(SYN_REPORT):
pretty_event.append('-------------- SYN_REPORT ------------\n')
ev_type = event[EV_TYPE]
pretty_event.append('type %d (%s),' % (ev_type, EV_TYPES[ev_type]))
ev_code = event[EV_CODE]
pretty_event.append('code %d (%s),' %
(ev_code, EV_STRINGS[ev_type][ev_code]))
pretty_event.append('value %d' % event[EV_VALUE])
pretty_packet.append(' '.join(pretty_event))
return '\n'.join(pretty_packet)
class MTB:
"""An MTB class providing MTB format related utility methods."""
def __init__(self, packets):
self.packets = packets
def _is_new_contact(self, event):
"""is this packet generating new contact (Tracking ID)?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
event[EV_VALUE] != -1)
def _is_ABS_MT_SLOT(self, event):
"""Is this packet ABS_MT_SLOT?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
event[EV_CODE] == ABS_MT_SLOT)
def _is_ABS_MT_POSITION_X(self, event):
"""Is this packet ABS_MT_POSITION_X?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
def _is_ABS_MT_POSITION_Y(self, event):
"""Is this packet ABS_MT_POSITION_Y?"""
return (not event.get(SYN_REPORT) and
event[EV_TYPE] == EV_ABS and
def _calc_movement_for_axis(self, x, prev_x):
"""Calculate the distance moved in an axis."""
return abs(x - prev_x) if prev_x is not None else 0
def _calc_distance(self, (x0, y0), (x1, y1)):
"""Calculate the distance between two points."""
dist_x = x1 - x0
dist_y = y1 - y0
return math.sqrt(dist_x * dist_x + dist_y * dist_y)
def get_number_contacts(self):
"""Get the number of contacts (Tracking IDs)."""
num_contacts = 0
for packet in self.packets:
for event in packet:
if self._is_new_contact(event):
num_contacts += 1
return num_contacts
def get_x_y(self, target_slot):
"""Extract x and y positions in the target slot."""
slot = 0
list_x = []
list_y = []
prev_x = prev_y = None
for packet in self.packets:
found_flag = False
for event in packet:
if self._is_ABS_MT_SLOT(event):
slot = event[EV_VALUE]
elif self._is_ABS_MT_POSITION_X(event) and slot == target_slot:
prev_x = event[EV_VALUE]
found_flag = True
elif self._is_ABS_MT_POSITION_Y(event) and slot == target_slot:
prev_y = event[EV_VALUE]
found_flag = True
# If either x or y positions are reported in the current packet,
# append the x and y to the list.
# This handles the condition that only x or y is reported.
# This also handles the initial condition that no previous x or y
# is reported yet.
if found_flag and prev_x and prev_y:
return (list_x, list_y)
def get_points(self, target_slot):
"""Get the points in the target slot."""
list_x, list_y = self.get_x_y(target_slot)
return zip(list_x, list_y)
def get_distances(self, target_slot):
"""Get the distances of neighbor points in the target slot."""
points = self.get_points(target_slot)
distances = []
for index in range(len(points) - 1):
distance = self._calc_distance(points[index], points[index + 1])
return distances
def get_distances_with_first_point(self, target_slot):
"""Get distances of the points in the target_slot with first point."""
points = self.get_points(target_slot)
point0 = points[0]
distances = [self._calc_distance(point, point0) for point in points]
return distances
def get_range(self):
"""Get the min and max values of (x, y) positions."""
min_x = min_y = float('infinity')
max_x = max_y = float('-infinity')
for packet in self.packets:
for event in packet:
if self._is_ABS_MT_POSITION_X(event):
x = event[EV_VALUE]
min_x = min(min_x, x)
max_x = max(max_x, x)
elif self._is_ABS_MT_POSITION_Y(event):
y = event[EV_VALUE]
min_y = min(min_y, y)
max_y = max(max_y, y)
return (min_x, max_x, min_y, max_y)
def get_total_motion(self, target_slot):
"""Get the total motion in the target slot."""
prev_x = prev_y = None
accu_x = accu_y = 0
slot = None
for packet in self.packets:
for event in packet:
if self._is_ABS_MT_SLOT(event):
slot = event[EV_VALUE]
elif self._is_ABS_MT_POSITION_X(event) and slot == target_slot:
x = event[EV_VALUE]
accu_x += self._calc_movement_for_axis(x, prev_x)
prev_x = x
elif self._is_ABS_MT_POSITION_Y(event) and slot == target_slot:
y = event[EV_VALUE]
accu_y += self._calc_movement_for_axis(y, prev_y)
prev_y = y
return (accu_x, accu_y)
def get_largest_distance(self, target_slot):
"""Get the largest distance of point to the first point."""
distances = self.get_distances_with_first_point(target_slot)
return max(distances)
def get_largest_gap_ratio(self, target_slot):
"""Get the largest gap ratio in the target slot."""
gaps = self.get_distances(target_slot)
gap_ratios = []
for index in range(1, len(gaps) - 1):
prev_gap = max(gaps[index - 1], 1)
curr_gap = gaps[index]
next_gap = max(gaps[index + 1], 1)
gap_ratios.append(2.0 * curr_gap / (prev_gap + next_gap))
largest_gap_ratio = max(gap_ratios) if gap_ratios else 0
return largest_gap_ratio
class MTBParser:
"""Touchpad MTB event Parser."""
def __init__(self):
def _get_event_re_patt(self):
"""Construct the regular expression search pattern of MTB events.
An ordinary event looks like
Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316
A SYN_REPORT event looks like
Event: time 10788.289613, -------------- SYN_REPORT ------------
# Get the pattern of an ordinary event
event_patt_time = 'Event:\s*time\s*(\d+\.\d+)'
event_patt_type = 'type\s*(\d+)\s*\(\w+\)'
event_patt_code = 'code\s*(\d+)\s*\(\w+\)'
event_patt_value = 'value\s*(-?\d+)'
event_sep = ',\s*'
event_patt = event_sep.join([event_patt_time,
self.event_re_patt = re.compile(event_patt, re.I)
# Get the pattern of the SYN_REPORT event
event_patt_type_SYN_REPORT = '-+\s*SYN_REPORT\s-+'
event_patt_SYN_REPORT = event_sep.join([event_patt_time,
self.event_re_patt_SYN_REPORT = re.compile(event_patt_SYN_REPORT, re.I)
def _get_event_dict_ordinary(self, line):
"""Construct the event dictionary for an ordinary event."""
result =
ev_dict = {}
if result is not None:
ev_dict[EV_TIME] = float(
ev_dict[EV_TYPE] = int(
ev_dict[EV_CODE] = int(
ev_dict[EV_VALUE] = int(
return ev_dict
def _get_event_dict_SYN_REPORT(self, line):
"""Construct the event dictionary for a SYN_REPORT event."""
result =
ev_dict = {}
if result is not None:
ev_dict[EV_TIME] = float(
ev_dict[SYN_REPORT] = True
return ev_dict
def _get_event_dict(self, line):
"""Construct the event dictionary."""
EVENT_FUNC_LIST = [self._get_event_dict_ordinary,
for get_event_func in EVENT_FUNC_LIST:
ev_dict = get_event_func(line)
if ev_dict:
return ev_dict
return False
def _is_SYN_REPORT(self, ev_dict):
"""Determine if this event is SYN_REPORT."""
return ev_dict.get(SYN_REPORT, False)
def parse(self, raw_event):
"""Parse the raw event string into a list of event dictionary."""
ev_list = []
packets = []
start_flag = False
for line in raw_event:
ev_dict = self._get_event_dict(line)
if ev_dict:
start_flag = True
if self._is_SYN_REPORT(ev_dict):
ev_list = []
elif start_flag:
logging.warn(' Warn: format problem in event:\n %s' % line)
return packets
def parse_file(self, file_name):
"""Parse raw device events in the given file name."""
packets = None
if os.path.isfile(file_name):
with open(file_name) as f:
packets = self.parse(f)
return packets
if __name__ == '__main__':
# Read a device file, and convert it to pretty packet format.
if len(sys.argv) != 2 or not os.path.exists(sys.argv[1]):
print 'Usage: %s device_file' % sys.argv[0]
with open(sys.argv[1]) as event_file:
packets = MTBParser().parse(event_file)
for packet in packets:
print make_pretty_packet(packet)