| #!/usr/bin/env python |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import ctypes |
| import select |
| |
| import xi2 |
| import xlib |
| |
| |
| class XI2Reader(object): |
| """A reader to create connection to X server and read x input events.""" |
| def __init__(self, display_name=':0'): |
| """Constructor |
| |
| Args: |
| display_name: The X window display name. |
| """ |
| self._display = xlib.XOpenDisplay(display_name) |
| self._window = xlib.XDefaultRootWindow(self._display) |
| self._data = [] |
| |
| self._register() |
| |
| # Consumes the very first traffic within the connection with X server. |
| xlib.XFlush(self._display) |
| |
| def _register(self): |
| """Registers device and events to listen on""" |
| mask = xi2.XIEventMask() |
| mask.deviceid = xi2.XIAllDevices |
| mask.mask_len = xi2.XIMaskLen(xi2.XI_RawMotion) |
| mask.mask = ctypes.cast((ctypes.c_ubyte * mask.mask_len)(), |
| ctypes.POINTER(ctypes.c_ubyte)) |
| |
| self._set_mask(mask.mask, xi2.XI_RawKeyPress) |
| self._set_mask(mask.mask, xi2.XI_RawKeyRelease) |
| self._set_mask(mask.mask, xi2.XI_RawButtonPress) |
| self._set_mask(mask.mask, xi2.XI_RawButtonRelease) |
| self._set_mask(mask.mask, xi2.XI_RawMotion) |
| |
| xi2.XISelectEvents(self._display, self._window, ctypes.pointer(mask), 1) |
| xlib.XSelectInput(self._display, self._window, ctypes.c_long(0)) |
| |
| def _set_mask(self, ptr, event): |
| """Sets event mask""" |
| val = xi2.XISetMask(ptr, event) |
| ptr[event >> 3] = val |
| |
| def get_valuator_names(self, device_id): |
| """Gets the valuator names for device. |
| |
| Return: |
| An dictionary maps valuator index to descriptive names. |
| Sample output: |
| { |
| 0: 'Rel X', |
| 1: 'Rel Y', |
| 2: 'Abs Start Timestamp', |
| 3: 'Abs End Timestamp', |
| 4: 'Rel Vert Wheel', |
| 5: 'Rel Horiz Wheel' |
| } |
| """ |
| num_devices = ctypes.c_int() |
| device = xi2.XIQueryDevice(self._display, device_id, |
| ctypes.pointer(num_devices)).contents |
| |
| valuator_names = [] |
| for i in range(device.num_classes): |
| if device.classes[i].contents.type == xi2.XIValuatorClass: |
| valuator_class_info = ctypes.cast(device.classes[i], |
| ctypes.POINTER(xi2.XIValuatorClassInfo)).contents |
| valuator_names.append(xlib.XGetAtomName(reader._display, |
| valuator_class_info.label)) |
| valuator_names_dict = {} |
| for i in range(len(valuator_names)): |
| valuator_names_dict[i] = valuator_names[i] |
| return valuator_names_dict |
| |
| def get_connection_number(self): |
| """Gets the file descriptor number for the connection with X server""" |
| return xlib.XConnectionNumber(reader._display) |
| |
| def read_pending_events(self): |
| """Read all the new event datas. |
| |
| Return: |
| An array contains all event data with event type and valuator |
| values. Sample format: |
| { |
| 'deviceid': 11, |
| 'evtype': 17, |
| 'time': 406752437L, |
| 'valuators': { |
| 0: (396.0, -38.0), |
| 1: (578.0, -21.0), |
| 2: (22802890.0, 22802890.0), |
| 3: (26145746.0, 26145746.0) |
| } |
| } |
| """ |
| data = [] |
| while xlib.XPending(self._display): |
| xevent = xlib.XEvent() |
| xlib.XNextEvent(self._display, ctypes.pointer(xevent)) |
| cookie = xevent.xcookie |
| |
| # Get event valuator_data |
| result = xlib.XGetEventData(self._display, ctypes.pointer(cookie)) |
| if (not result or cookie.type != xlib.GenericEvent): |
| continue |
| |
| raw_event_ptr = ctypes.cast(cookie.data, |
| ctypes.POINTER(xi2.XIRawEvent)) |
| raw_event = raw_event_ptr.contents |
| valuator_state = raw_event.valuators |
| |
| # Two value arrays |
| val_ptr = valuator_state.values |
| val_idx = 0 |
| raw_val_ptr = raw_event.raw_values |
| raw_val_idx = 0 |
| |
| valuator_data = {} |
| for i in range(valuator_state.mask_len): |
| if xi2.XIMaskIsSet(valuator_state.mask, i): |
| valuator_data[i] = (val_ptr[val_idx], |
| raw_val_ptr[raw_val_idx]) |
| val_idx += 1 |
| raw_val_idx += 1 |
| data.append({'deviceid': raw_event.deviceid, |
| 'evtype': cookie.evtype, |
| 'time': raw_event.time, |
| 'valuators': valuator_data}) |
| return data |
| |
| |
| if __name__ == '__main__': |
| reader = XI2Reader() |
| fd = reader.get_connection_number() |
| |
| while True: |
| rl, _, _ = select.select([fd], [], []) |
| if fd not in rl: |
| break |
| print reader.read_pending_events() |