blob: 7186bf4ee7b7a5ae20ef5fa9158cd849c2b8be57 [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
''' A module for capturing X events '''
import logging
import os
import re
import subprocess
import tempfile
import time
import common_util
import trackpad_util
import Xevent
from trackpad_util import Display, read_trackpad_test_conf
from Xevent import reset_x_input_prop, set_x_input_prop
class Mtplot:
''' Create a mtplot window for Xcapture to listen to '''
def __init__(self, display, error):
display.set_environ()
self.error = error
self._create_mtplot_window()
self.id = self._get_mtplot_win_id()
if self.id is None:
raise error.TestError('Failure on deriving window id in focus.')
def _create_mtplot_window(self):
trackpad_device_file, msg = trackpad_util.get_trackpad_device_file()
mtplot_cmd = 'mtplot -c 0 %s' % trackpad_device_file
self.null_file = open('/dev/null')
try:
self.proc = subprocess.Popen(mtplot_cmd.split(),
stdout=self.null_file)
except:
err_msg = 'Cannot start program: %s' % mtplot_cmd
raise self.error.TestError(err_msg)
time.sleep(0.1)
if self.proc.poll() is not None:
raise self.error.TestError('Failure on "%s" [%d]' %
(mtplot_cmd, self.proc.returncode))
logging.info('"%s" has been launched.' % mtplot_cmd)
def _get_mtplot_win_id(self):
'''Use xwininfo to derive the window id of mtplot.'''
# A typical mtplot window info in xwininfo is
# 0x800001 "mtplot": () 2560x1700+0+0 +0+0
cmd = 'xwininfo -root -tree | grep mtplot'
win_info = common_util.simple_system_output(cmd)
win_id = win_info.strip().split()[0] if win_info else None
return win_id
def destroy(self):
''' Destroy the mtplot process. '''
self.proc.terminate()
self.proc.kill()
self.proc.wait()
self.null_file.close()
class Xcapture:
''' A class to capture X events '''
def __init__(self, error, conf_path):
# Set X display server and xauthority.
self.display = Display()
self.display.set_environ()
self.xcapture_dir = '/tmp/xevent'
self.fd = None
self.proc = None
self.error = error
self.conf_path = conf_path
# Create the directory if not existent.
if not os.path.exists(self.xcapture_dir):
try:
os.mkdir(self.xcapture_dir)
except OSError:
err_msg = 'Fail to make directory: %s' % self.xcapture_dir
raise self.error.TestError(err_msg)
# Create a tmp file to capture the X events for all gesture files
self.fd_all = tempfile.NamedTemporaryFile()
self.xcapture_file_all = self.fd_all.name
# Enable X Scroll Buttons and Tap Enable if they are not enabled yet
self.scroll_butons = set_x_input_prop(Xevent.X_PROP_SCROLL_BUTTONS)
self.tap_enable = set_x_input_prop(Xevent.X_PROP_TAP_ENABLE)
# Launch the capture process
mtplot_gui = read_trackpad_test_conf('mtplot_gui', conf_path)
if mtplot_gui:
self.mtplot = Mtplot(self.display, error)
self.xcapture_cmd = 'xev -id %s' % self.mtplot.id
else:
self.mtplot = None
self.xcapture_cmd = 'xev -geometry %s' % self._root_geometry()
self._launch(self.fd_all)
logging.info('X events will be saved in %s' % self.xcapture_dir)
logging.info('X events capture command: %s' % self.xcapture_cmd)
def _root_geometry(self):
"""Get the geometry of the root window.
The geometry string looks like:
-geometry 2560x1700+0+0
"""
cmd = 'xwininfo -root | grep geometry'
geometry_str = common_util.simple_system_output(cmd)
_, geometry = geometry_str.split()
return geometry
def _open_file(self, filename):
try:
fd = open(filename, 'w+')
except:
err_msg = 'Cannot open file to save X events: %s'
raise self.error.TestError(err_msg % filename)
return fd
def _launch(self, fd):
''' Launch the capture program '''
try:
self.proc = subprocess.Popen(self.xcapture_cmd.split(), stdout=fd)
except:
err_msg = 'Cannot start capture program: %s' % self.xcapture_cmd
raise self.error.TestError(err_msg)
def start(self, filename):
''' Start capture program '''
self.display.move_cursor_to_center()
self.xcapture_file = os.path.join(self.xcapture_dir, filename) + '.xev'
self.fd = self._open_file(self.xcapture_file)
def wait(self):
''' Wait until timeout or max_post_replay_time expires.
The wait loop is terminated if either of the conditions is true:
(Cond 1) Normal timeout: there are no more X events coming in
before timeout; or
(Cond 2) Endless xevents: the duration of X event emission after the
completion of playback, typically observed in coasting,
exceeds max_post_replay_time. In this case, the X events
keep coming in for a while. We need to interrupt it after
max_post_replay_time expires so that the waiting will not
last forever due to a possible driver bug.
'''
timeout_str = 'xcapture_timeout'
max_time_str = 'xcapture_max_post_replay_time'
conf_path = self.conf_path
timeout = read_trackpad_test_conf(timeout_str, conf_path)
max_post_replay_time = read_trackpad_test_conf(max_time_str, conf_path)
interval = timeout / 10.0
with open(self.xcapture_file_all) as fd_all:
now = latest_event_time = start_time = time.time()
# Cond2: keep looping while cond2 does not occur
while (now - start_time <= max_post_replay_time):
time.sleep(interval)
now = time.time()
if fd_all.read():
latest_event_time = now
# Cond1: if cond1_normal_timeout occurs, exit the loop
if (now - latest_event_time > timeout):
return True
else:
# Cond2 occurs
max_warn = 'Warning: max_post_replay_time (%d seconds) expires'
logging.info(max_warn % max_post_replay_time)
return False
def read(self):
''' Read packet data from the device file '''
with open(self.xcapture_file) as fd:
return fd.readlines()
def stop(self):
''' Make a copy of the X events and close the file '''
fd_is_open = self.fd is not None and not self.fd.closed
fd_all_is_open = self.fd_all is not None and not self.fd_all.closed
if fd_is_open:
if fd_all_is_open:
# Make a copy of the X events for this gesture file
self.fd_all.seek(os.SEEK_SET)
self.fd.write(self.fd_all.read())
# Truncate xcapture_file_all
self.fd_all.seek(os.SEEK_SET)
self.fd_all.truncate()
# Close the X event capture file for this gesture file
self.fd.flush()
os.fsync(self.fd.fileno())
self.fd.close()
self.fd = None
def terminate(self):
''' Terminate the X capture subprocess and destroy the popup window '''
# Terminate the X capture subprocess
if self.fd_all is not None and not self.fd_all.closed:
self.fd_all.close()
self.fd_all = None
self.proc.terminate()
self.proc.kill()
self.proc.wait()
# Destroy the mtplot window if exists.
if self.mtplot:
self.mtplot.destroy()
# Reset X Scroll Buttons and Tap Enable if they were disabled originally
reset_x_input_prop(self.scroll_butons)
reset_x_input_prop(self.tap_enable)