| #!/usr/bin/env python |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import cgi |
| import json |
| import logging |
| import logging.handlers |
| import os |
| import sys |
| |
| import common |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib.cros import chrome, xmlrpc_server |
| from autotest_lib.client.cros import constants |
| |
| |
| class InteractiveXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate): |
| """Exposes methods called remotely to create interactive tests. |
| |
| All instance methods of this object without a preceding '_' are exposed via |
| an XML-RPC server. This is not a stateless handler object, which means that |
| if you store state inside the delegate, that state will remain around for |
| future calls. |
| """ |
| |
| def login(self): |
| """Login to the system and open a tab. |
| |
| The tab opened is used by other methods on this server to interact |
| with the user. |
| |
| @return True. |
| |
| """ |
| self._chrome = chrome.Chrome() |
| self._chrome.browser.platform.SetHTTPServerDirectories( |
| os.path.dirname(sys.argv[0])) |
| self._tab = self._chrome.browser.tabs[0] |
| self._tab.Navigate( |
| self._chrome.browser.platform.http_server.UrlOf('shell.html')) |
| |
| return True |
| |
| |
| def set_output(self, html): |
| """Replace the contents of the tab. |
| |
| @param html: HTML document to replace tab contents with. |
| |
| @return True. |
| |
| """ |
| # JSON does a better job of escaping HTML for JavaScript than we could |
| # with string.replace(). |
| html_escaped = json.dumps(html) |
| # Use JavaScript to append the output and scroll to the bottom of the |
| # open tab. |
| self._tab.ExecuteJavaScript('document.body.innerHTML = %s; ' % |
| html_escaped) |
| self._tab.Activate() |
| self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() |
| return True |
| |
| |
| def append_output(self, html): |
| """Append HTML to the contents of the tab. |
| |
| @param html: HTML to append to the existing tab contents. |
| |
| @return True. |
| |
| """ |
| # JSON does a better job of escaping HTML for JavaScript than we could |
| # with string.replace(). |
| html_escaped = json.dumps(html) |
| # Use JavaScript to append the output and scroll to the bottom of the |
| # open tab. |
| self._tab.ExecuteJavaScript( |
| ('document.body.innerHTML += %s; ' % html_escaped) + |
| 'window.scrollTo(0, document.body.scrollHeight);') |
| self._tab.Activate() |
| self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() |
| return True |
| |
| |
| def append_buttons(self, *args): |
| """Append confirmation buttons to the tab. |
| |
| Each button is given an index, 0 for the first button, 1 for the second, |
| and so on. |
| |
| @param title...: Title of button to append. |
| |
| @return True. |
| |
| """ |
| html = '' |
| index = 0 |
| for title in args: |
| onclick = 'submit_button(%d)' % index |
| html += ('<input type="button" value="%s" onclick="%s">' % ( |
| cgi.escape(title), |
| cgi.escape(onclick))) |
| index += 1 |
| return self.append_output(html) |
| |
| |
| def wait_for_button(self, timeout): |
| """Wait for a button to be clicked. |
| |
| Call append_buttons() before this to add buttons to the document. |
| |
| @param timeout: Maximum time, in seconds, to wait for a click. |
| |
| @return index of button that was clicked. |
| |
| """ |
| # Wait for the button to be clicked. |
| utils.poll_for_condition( |
| condition=lambda: |
| self._tab.EvaluateJavaScript('window.__ready') == 1, |
| desc='User clicked on button.', |
| timeout=timeout) |
| # Fetch the result. |
| result = self._tab.EvaluateJavaScript('window.__result') |
| # Reset for the next button. |
| self._tab.ExecuteJavaScript( |
| 'window.__ready = 0; ' |
| 'window.__result = null;') |
| return result |
| |
| |
| def check_for_button(self): |
| """Check whether a button has been clicked. |
| |
| Call append_buttons() before this to add buttons to the document. |
| |
| @return index of button that was clicked or -1 if no button |
| has been clicked. |
| |
| """ |
| if not self._tab.EvaluateJavaScript('window.__ready'): |
| return -1 |
| # Fetch the result. |
| result = self._tab.EvaluateJavaScript('window.__result') |
| # Reset for the next button. |
| self._tab.ExecuteJavaScript( |
| 'window.__ready = 0; ' |
| 'window.__result = null;') |
| return result |
| |
| |
| def append_list(self, name): |
| """Append a results list to the contents of the tab. |
| |
| @param name: Name to use for making modifications to the list. |
| |
| @return True. |
| |
| """ |
| html = '<div id="%s"></div>' % cgi.escape(name) |
| return self.append_output(html) |
| |
| |
| def append_list_item(self, list_name, item_name, html): |
| """Append an item to a results list. |
| |
| @param list_name: Name of list provided to append_list(). |
| @param item_name: Name to use for making modifications to the item. |
| @param html: HTML to place in the list item. |
| |
| @return True. |
| |
| """ |
| # JSON does a better job of escaping HTML for JavaScript than we could |
| # with string.replace(). |
| item_html = '"<div id=\\"%s\\"></div>"' % cgi.escape(item_name) |
| # Use JavaScript to append the output. |
| self._tab.ExecuteJavaScript( |
| 'document.getElementById("%s").innerHTML += %s; ' % ( |
| cgi.escape(list_name), |
| item_html)) |
| self._tab.Activate() |
| self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() |
| return self.replace_list_item(item_name, html) |
| |
| |
| def replace_list_item(self, item_name, html): |
| """Replace an item in a results list. |
| |
| @param item_name: Name of item provided to append_list_item(). |
| @param html: HTML to place in the list item. |
| |
| @return True. |
| |
| """ |
| # JSON does a better job of escaping HTML for JavaScript than we could |
| # with string.replace(). |
| html_escaped = json.dumps(html) |
| # Use JavaScript to append the output. |
| self._tab.ExecuteJavaScript( |
| 'document.getElementById("%s").innerHTML = %s; ' % ( |
| cgi.escape(item_name), |
| html_escaped)) |
| self._tab.Activate() |
| self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() |
| return True |
| |
| |
| def close(self): |
| """Close the browser. |
| |
| @return True. |
| |
| """ |
| if hasattr(self, '_chrome'): |
| self._chrome.browser.Close() |
| return True |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.DEBUG) |
| handler = logging.handlers.SysLogHandler(address='/dev/log') |
| formatter = logging.Formatter( |
| 'interactive_xmlrpc_server: [%(levelname)s] %(message)s') |
| handler.setFormatter(formatter) |
| logging.getLogger().addHandler(handler) |
| logging.debug('interactive_xmlrpc_server main...') |
| server = xmlrpc_server.XmlRpcServer( |
| 'localhost', |
| constants.INTERACTIVE_XMLRPC_SERVER_PORT) |
| server.register_delegate(InteractiveXmlRpcDelegate()) |
| server.run() |