blob: 2937777397b88bc041ef0887eaa7dc652029dabe [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
''' A module for capturing X events '''
import logging
import os
import subprocess
import tempfile
import time
from trackpad_util import Display, read_trackpad_test_conf
class Xcapture(object):
    ''' A class to capture X events

    On construction a full-screen, override-redirect popup window is
    created through autox and an xev subprocess is attached to that window.
    xev writes into one temporary file shared by all gestures. For each
    gesture the caller is expected to call start(), optionally wait(),
    and then stop(), which moves the events collected so far into a
    per-gesture file under xcapture_dir. terminate() releases everything.
    '''

    def __init__(self, error, conf_path, autox):
        ''' Create the popup window and launch the capture program

        error: object providing the TestError and TestFail exception classes
        conf_path: path handed to read_trackpad_test_conf() by wait()
        autox: object used to create, query and sync the popup window

        Raises error.TestError if the output directory cannot be created or
        the capture program cannot be started, and error.TestFail if the
        popup window does not reach the requested geometry in time.
        '''
        # Set X display server and xauthority.
        self.display = Display()
        self.display.set_environ()

        self.xcapture_dir = '/tmp/xevent'
        self.fd = None
        self.proc = None
        self.error = error
        self.conf_path = conf_path

        # Create the directory if not existent. Try first and inspect the
        # result afterwards: this avoids a check-then-create race and also
        # rejects a non-directory that happens to occupy the path.
        try:
            os.mkdir(self.xcapture_dir)
        except OSError:
            if not os.path.isdir(self.xcapture_dir):
                err_msg = 'Fail to make directory: %s' % self.xcapture_dir
                raise self.error.TestError(err_msg)

        # Create a popup window with override-redirect so that we can
        # manipulate its geometry.
        # win_x, win_y: the coordinates of the popup window
        # win_width, win_height: the width and height of the popup window
        self.win_x = 0
        self.win_y = 0
        self.win_width = self.display.screen.width_in_pixels
        self.win_height = self.display.screen.height_in_pixels
        win_geometry = (self.win_x, self.win_y,
                        self.win_width, self.win_height)

        self.ax = autox

        # Create the popup window
        self.popup_win = self.ax.create_and_map_window(
                x=self.win_x,
                y=self.win_y,
                width=self.win_width,
                height=self.win_height,
                title='Xcapture_Popup',
                override_redirect=True)
        popup_info = self.ax.get_window_info(self.popup_win.id)

        # Check that the popup window appears in the required position.
        # The default timeout of await_condition is 10 seconds, which looks
        # reasonable here too.
        try:
            self.ax.await_condition(
                    lambda: popup_info.get_geometry() == win_geometry,
                    desc='Check window 0x%x\'s geometry' % self.popup_win.id)
        except self.ax.ConditionTimeoutError as exception:
            raise self.error.TestFail('Timed out on condition: %s' %
                                      str(exception))

        # Create a tmp file to capture the X events for all gesture files
        self.fd_all = tempfile.NamedTemporaryFile()
        self.xcapture_file_all = self.fd_all.name

        # Launch the capture process
        self.xcapture_cmd = 'xev -id 0x%x' % int(self.popup_win.id)
        self._launch(self.fd_all)

        logging.info('X events will be saved in %s', self.xcapture_dir)
        logging.info('X events capture program: %s', self.xcapture_cmd)
        logging.info('X events capture program geometry: %dx%d+%d+%d',
                     self.win_width, self.win_height,
                     self.win_x, self.win_y)

    def _open_file(self, filename):
        ''' Open filename in 'w+' mode and return the file object

        Raises error.TestError if the file cannot be opened.
        '''
        try:
            fd = open(filename, 'w+')
        except (IOError, OSError):
            # Only I/O failures are translated; a bare except would also
            # turn KeyboardInterrupt/SystemExit into a TestError.
            err_msg = 'Cannot open file to save X events: %s'
            raise self.error.TestError(err_msg % filename)
        return fd

    def _launch(self, fd):
        ''' Launch the capture program with its stdout redirected to fd

        Raises error.TestError if the program cannot be started.
        '''
        try:
            self.proc = subprocess.Popen(self.xcapture_cmd.split(),
                                         stdout=fd)
        except (OSError, ValueError):
            # Popen raises OSError when the program cannot be executed and
            # ValueError when it is called with invalid arguments.
            err_msg = 'Cannot start capture program: %s' % self.xcapture_cmd
            raise self.error.TestError(err_msg)

    def start(self, filename):
        ''' Prepare to capture the X events of one gesture file

        Moves the cursor to the center of the display and opens
        <xcapture_dir>/<filename>.xev, the file that stop() will fill.
        The capture program itself is already running; it was launched by
        the constructor.
        '''
        self.display.move_cursor_to_center()
        self.xcapture_file = (os.path.join(self.xcapture_dir, filename) +
                              '.xev')
        self.fd = self._open_file(self.xcapture_file)

    def wait(self):
        ''' Wait until timeout or max_post_replay_time expires.

        The wait loop is terminated if either of the conditions is true:
        (Cond 1) Normal timeout: there are no more X events coming in
                 before timeout; or
        (Cond 2) Endless xevents: the duration of X event emission after the
                 completion of playback, typically observed in coasting,
                 exceeds max_post_replay_time. In this case, the X events
                 keep coming in for a while. We need to interrupt it after
                 max_post_replay_time expires so that the waiting will not
                 last forever due to a possible driver bug.

        Returns True when Cond 1 ended the wait and False when Cond 2 did.
        '''
        timeout_str = 'xcapture_timeout'
        max_time_str = 'xcapture_max_post_replay_time'
        conf_path = self.conf_path
        timeout = read_trackpad_test_conf(timeout_str, conf_path)
        max_post_replay_time = read_trackpad_test_conf(max_time_str,
                                                       conf_path)
        # Poll ten times per timeout period.
        interval = timeout / 10.0

        with open(self.xcapture_file_all) as fd_all:
            now = latest_event_time = start_time = time.time()
            # Cond2: keep looping while cond2 does not occur
            while now - start_time <= max_post_replay_time:
                time.sleep(interval)
                now = time.time()
                # Any newly readable data counts as a fresh X event.
                if fd_all.read() != '':
                    latest_event_time = now
                # Cond1: if cond1_normal_timeout occurs, exit the loop
                if now - latest_event_time > timeout:
                    return True

        # Cond2 occurs
        max_warn = 'Warning: max_post_replay_time (%d seconds) expires'
        logging.info(max_warn, max_post_replay_time)
        return False

    def read(self):
        ''' Return the lines of the current per-gesture X event file '''
        with open(self.xcapture_file) as fd:
            return fd.readlines()

    def stop(self):
        ''' Make a copy of the X events and close the file

        Does nothing when no per-gesture file is open. When the shared
        capture file is still open, its content is copied into the
        per-gesture file and the shared file is then emptied.
        '''
        if self.fd is None or self.fd.closed:
            return

        if self.fd_all is not None and not self.fd_all.closed:
            # Make a copy of the X events for this gesture file
            self.fd_all.seek(0, os.SEEK_SET)
            self.fd.write(self.fd_all.read())
            # Truncate xcapture_file_all
            self.fd_all.seek(0, os.SEEK_SET)
            self.fd_all.truncate()

        # Close the X event capture file for this gesture file, pushing
        # its content to disk first.
        self.fd.flush()
        os.fsync(self.fd.fileno())
        self.fd.close()
        self.fd = None

    def terminate(self):
        ''' Terminate the X capture subprocess and destroy the popup window '''
        if self.fd_all is not None and not self.fd_all.closed:
            self.fd_all.close()
        self.fd_all = None

        # Terminate the X capture subprocess. Skip this when no process was
        # ever launched, and forget the handle afterwards so that a stale
        # process is never signalled again.
        if self.proc is not None:
            self.proc.terminate()
            # Follow up with kill in case the terminate request is ignored.
            self.proc.kill()
            self.proc.wait()
            self.proc = None

        # Destroy the popup window
        self.popup_win.destroy()
        self.ax.sync()