blob: 9376bbae6945577b71b4c3b5fb3468a7213e030e [file] [log] [blame]
#!/usr/bin/python
"""
Step file creator/editor.
@copyright: Red Hat Inc 2009
@author: mgoldish@redhat.com (Michael Goldish)
@version: "20090401"
"""
import pygtk, gtk, gobject, time, os, commands, logging
import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.virt import virt_utils, ppm_utils, virt_step_editor
from autotest_lib.client.virt import kvm_monitor
pygtk.require('2.0')
class StepMaker(virt_step_editor.StepMakerWindow):
"""
Application used to create a step file. It will grab your input to the
virtual machine and record it on a 'step file', that can be played
making it possible to do unattended installs.
"""
# Constructor
def __init__(self, vm, steps_filename, tempdir, params):
virt_step_editor.StepMakerWindow.__init__(self)
self.vm = vm
self.steps_filename = steps_filename
self.steps_data_dir = ppm_utils.get_data_dir(steps_filename)
self.tempdir = tempdir
self.screendump_filename = os.path.join(tempdir, "scrdump.ppm")
self.params = params
if not os.path.exists(self.steps_data_dir):
os.makedirs(self.steps_data_dir)
self.steps_file = open(self.steps_filename, "w")
self.vars_file = open(os.path.join(self.steps_data_dir, "vars"), "w")
self.step_num = 1
self.run_time = 0
self.update_delay = 1000
self.prev_x = 0
self.prev_y = 0
self.vars = {}
self.timer_id = None
self.time_when_done_clicked = time.time()
self.time_when_actions_completed = time.time()
self.steps_file.write("# Generated by Step Maker\n")
self.steps_file.write("# Generated on %s\n" % time.asctime())
self.steps_file.write("# uname -a: %s\n" %
commands.getoutput("uname -a"))
self.steps_file.flush()
self.vars_file.write("# This file lists the vars used during recording"
" with Step Maker\n")
self.vars_file.flush()
# Done/Break HBox
hbox = gtk.HBox(spacing=10)
self.user_vbox.pack_start(hbox)
hbox.show()
self.button_break = gtk.Button("Break")
self.button_break.connect("clicked", self.event_break_clicked)
hbox.pack_start(self.button_break)
self.button_break.show()
self.button_done = gtk.Button("Done")
self.button_done.connect("clicked", self.event_done_clicked)
hbox.pack_start(self.button_done)
self.button_done.show()
# Set window title
self.window.set_title("Step Maker")
# Connect "capture" button
self.button_capture.connect("clicked", self.event_capture_clicked)
# Switch to run mode
self.switch_to_run_mode()
def destroy(self, widget):
self.vm.monitor.cmd("cont")
self.steps_file.close()
self.vars_file.close()
virt_step_editor.StepMakerWindow.destroy(self, widget)
# Utilities
def redirect_timer(self, delay=0, func=None):
if self.timer_id != None:
gobject.source_remove(self.timer_id)
self.timer_id = None
if func != None:
self.timer_id = gobject.timeout_add(delay, func,
priority=gobject.PRIORITY_LOW)
def switch_to_run_mode(self):
# Set all widgets to their default states
self.clear_state(clear_screendump=False)
# Enable/disable some widgets
self.button_break.set_sensitive(True)
self.button_done.set_sensitive(False)
self.data_vbox.set_sensitive(False)
# Give focus to the Break button
self.button_break.grab_focus()
# Start the screendump timer
self.redirect_timer(100, self.update)
# Resume the VM
self.vm.monitor.cmd("cont")
def switch_to_step_mode(self):
# Set all widgets to their default states
self.clear_state(clear_screendump=False)
# Enable/disable some widgets
self.button_break.set_sensitive(False)
self.button_done.set_sensitive(True)
self.data_vbox.set_sensitive(True)
# Give focus to the keystrokes entry widget
self.entry_keys.grab_focus()
# Start the screendump timer
self.redirect_timer()
# Stop the VM
self.vm.monitor.cmd("stop")
# Events in step mode
def update(self):
self.redirect_timer()
if os.path.exists(self.screendump_filename):
os.unlink(self.screendump_filename)
try:
self.vm.monitor.screendump(self.screendump_filename, debug=False)
except kvm_monitor.MonitorError, e:
logging.warn(e)
else:
self.set_image_from_file(self.screendump_filename)
self.redirect_timer(self.update_delay, self.update)
return True
def event_break_clicked(self, widget):
if not self.vm.is_alive():
self.message("The VM doesn't seem to be alive.", "Error")
return
# Switch to step mode
self.switch_to_step_mode()
# Compute time elapsed since last click on "Done" and add it
# to self.run_time
self.run_time += time.time() - self.time_when_done_clicked
# Set recording time widget
self.entry_time.set_text("%.2f" % self.run_time)
# Update screendump ID
self.update_screendump_id(self.steps_data_dir)
# By default, check the barrier checkbox
self.check_barrier.set_active(True)
# Set default sleep and barrier timeout durations
time_delta = time.time() - self.time_when_actions_completed
if time_delta < 1.0: time_delta = 1.0
self.spin_sleep.set_value(round(time_delta))
self.spin_barrier_timeout.set_value(round(time_delta * 5))
# Set window title
self.window.set_title("Step Maker -- step %d at time %.2f" %
(self.step_num, self.run_time))
def event_done_clicked(self, widget):
# Get step lines and screendump
lines = self.get_step_lines(self.steps_data_dir)
if lines == None:
return
# Get var values from user and write them to vars file
vars = {}
for line in lines.splitlines():
words = line.split()
if words and words[0] == "var":
varname = words[1]
if varname in self.vars.keys():
val = self.vars[varname]
elif varname in vars.keys():
val = vars[varname]
elif varname in self.params.keys():
val = self.params[varname]
vars[varname] = val
else:
val = self.inputdialog("$%s =" % varname, "Variable")
if val == None:
return
vars[varname] = val
for varname in vars.keys():
self.vars_file.write("%s=%s\n" % (varname, vars[varname]))
self.vars.update(vars)
# Write step lines to file
self.steps_file.write("# " + "-" * 32 + "\n")
self.steps_file.write(lines)
# Flush buffers of both files
self.steps_file.flush()
self.vars_file.flush()
# Remember the current time
self.time_when_done_clicked = time.time()
# Switch to run mode
self.switch_to_run_mode()
# Send commands to VM
for line in lines.splitlines():
words = line.split()
if not words:
continue
elif words[0] == "key":
self.vm.send_key(words[1])
elif words[0] == "var":
val = self.vars.get(words[1])
if not val:
continue
self.vm.send_string(val)
elif words[0] == "mousemove":
self.vm.monitor.mouse_move(-8000, -8000)
time.sleep(0.5)
self.vm.monitor.mouse_move(words[1], words[2])
time.sleep(0.5)
elif words[0] == "mouseclick":
self.vm.monitor.mouse_button(words[1])
time.sleep(0.1)
self.vm.monitor.mouse_button(0)
# Remember the current time
self.time_when_actions_completed = time.time()
# Move on to next step
self.step_num += 1
def event_capture_clicked(self, widget):
self.message("Mouse actions disabled (for now).", "Sorry")
return
self.image_width_backup = self.image_width
self.image_height_backup = self.image_height
self.image_data_backup = self.image_data
gtk.gdk.pointer_grab(self.event_box.window, False,
gtk.gdk.BUTTON_PRESS_MASK |
gtk.gdk.BUTTON_RELEASE_MASK)
# Create empty cursor
pix = gtk.gdk.Pixmap(self.event_box.window, 1, 1, 1)
color = gtk.gdk.Color()
cursor = gtk.gdk.Cursor(pix, pix, color, color, 0, 0)
self.event_box.window.set_cursor(cursor)
gtk.gdk.display_get_default().warp_pointer(gtk.gdk.screen_get_default(),
self.prev_x, self.prev_y)
self.redirect_event_box_input(
self.event_capture_button_press,
self.event_capture_button_release,
self.event_capture_scroll)
self.redirect_timer(10, self.update_capture)
self.vm.monitor.cmd("cont")
# Events in mouse capture mode
def update_capture(self):
self.redirect_timer()
(screen, x, y, flags) = gtk.gdk.display_get_default().get_pointer()
self.mouse_click_coords[0] = int(x * self.spin_sensitivity.get_value())
self.mouse_click_coords[1] = int(y * self.spin_sensitivity.get_value())
delay = self.spin_latency.get_value() / 1000
if (x, y) != (self.prev_x, self.prev_y):
self.vm.monitor.mouse_move(-8000, -8000)
time.sleep(delay)
self.vm.monitor.mouse_move(self.mouse_click_coords[0],
self.mouse_click_coords[1])
time.sleep(delay)
self.prev_x = x
self.prev_y = y
if os.path.exists(self.screendump_filename):
os.unlink(self.screendump_filename)
try:
self.vm.monitor.screendump(self.screendump_filename, debug=False)
except kvm_monitor.MonitorError, e:
logging.warn(e)
else:
self.set_image_from_file(self.screendump_filename)
self.redirect_timer(int(self.spin_latency.get_value()),
self.update_capture)
return True
def event_capture_button_press(self, widget,event):
pass
def event_capture_button_release(self, widget,event):
gtk.gdk.pointer_ungrab()
self.event_box.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.CROSSHAIR))
self.redirect_event_box_input(
self.event_button_press,
self.event_button_release,
None,
None,
self.event_expose)
self.redirect_timer()
self.vm.monitor.cmd("stop")
self.mouse_click_captured = True
self.mouse_click_button = event.button
self.set_image(self.image_width_backup, self.image_height_backup,
self.image_data_backup)
self.check_mousemove.set_sensitive(True)
self.check_mouseclick.set_sensitive(True)
self.check_mousemove.set_active(True)
self.check_mouseclick.set_active(True)
self.update_mouse_click_info()
def event_capture_scroll(self, widget, event):
if event.direction == gtk.gdk.SCROLL_UP:
direction = 1
else:
direction = -1
self.spin_sensitivity.set_value(self.spin_sensitivity.get_value() +
direction)
pass
def run_stepmaker(test, params, env):
vm = env.get_vm(params.get("main_vm"))
if not vm:
raise error.TestError("VM object not found in environment")
if not vm.is_alive():
raise error.TestError("VM seems to be dead; Step Maker requires a"
" living VM")
steps_filename = params.get("steps")
if not steps_filename:
raise error.TestError("Steps filename not specified")
steps_filename = virt_utils.get_path(test.bindir, steps_filename)
if os.path.exists(steps_filename):
raise error.TestError("Steps file %s already exists" % steps_filename)
StepMaker(vm, steps_filename, test.debugdir, params)
gtk.main()