blob: 72bda8101e03e90e8b51ce69bb02127f4f914a87 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Handle google gdata spreadsheet service."""
import glob
import os
import shutil
import subprocess
import sys
import tempfile
import gdata.gauth
import gdata.spreadsheets.client
from authenticator import SpreadsheetAuthorizer
import mtb
import test_conf as conf
from common_util import print_and_exit
from firmware_constants import GV
SHEET_DESCRIPTION = ('This sheet is autogenerated from autotest/client/'
'site_tests/firmware_TouchMTB/spreadsheet.py')
# 'Touchpad relative pressure readings' spreadsheet key which is copied from the
# url of the spreadsheet https://docs.google.com/a/google.com/spreadsheet/ccc?
# key=0Ah6uZRmm2hgYdG4wX0JaQkVqa2gybTQwMnRfNmxsR1E
TARGET_SPREADSHEET_KEY = '0Ah6uZRmm2hgYdG4wX0JaQkVqa2gybTQwMnRfNmxsR1E'
RESULTS = ['result:', '=B4*I5+J5', '=C4*I5+J5', '=D4*I5+J5', '=E4*I5+J5',
'=F4*I5+J5', '=G4*I5+J5', '=H4*I5+J5']
ACTUALS = ['actual:', '11.945895975', '25.517564775', '46.566217775',
'76.976808975', '107.513063775', '151.746650975', '248.8453439']
AVERAGES = ['mean:', '=AVERAGE(B6:B405)', '=AVERAGE(C6:C405)',
'=AVERAGE(D6:D405)', '=AVERAGE(E6:E405)', '=AVERAGE(F6:F405)',
'=AVERAGE(G6:G405)', '=AVERAGE(H6:H405)', 'slope:', 'icept']
HEADERS = ['', 'size0', 'size1', 'size2', 'size3', 'size4', 'size5', 'size6',
'=Regress(B4:F4,B3:F3)', '=CONTINUE(I5, 1, 2)']
MEASURED = ['H4', 'G4', 'F4', 'E4', 'D4', 'C4', 'B4']
COMPUTED = ['H2', 'G2', 'F2', 'E2', 'D2', 'C2', 'B2']
CELLS = [
RESULTS,
ACTUALS,
AVERAGES,
HEADERS
]
CROSS_SHEET_CELL = '=\'%s\'!%s'
SUMMARY_WORKSHEET_TITLE = 'Summary'
TABLE_COMPUTED = 'Computed Pressures'
TABLE_MEASURED = 'Measured Pressures'
flag_verbose = False
ACTUAL_SURFACE_AREA = [ '11.945895975', '25.517564775', '46.566217775',
'76.976808975', '107.513063775', '151.746650975',
'248.8453439' ]
results_table = { 'A': 'device', 'B': 'size6', 'C': 'size5', 'D': 'size4',
'E': 'size3', 'F': 'size2', 'G': 'size1', 'H': 'size0' }
surface_title = [ 'surface area', '248.8453439', '151.746650975',
'107.513063775', '76.976808975', '46.566217775',
'25.517564775', '11.945895975' ]
diameter_title = [ 'diameter (mm)', '17.8', '13.9', '11.7', '9.9', '7.7',
'5.7', '3.9']
def print_verbose(msg):
"""Print the message if flag_verbose is True.
@param msg: the message to print
"""
if flag_verbose:
print msg
class GestureEventFiles:
"""Get gesture event files and parse the pressure values."""
DEFAULT_RESULT_DIR = 'latest'
FILE_PATTERN = '{}.%s*.dat'.format(conf.PRESSURE_CALIBRATION)
PRESSURE_LIST_MAX_SIZE = 80 * 5
def __init__(self):
self._get_machine_ip()
self._get_result_dir()
self._get_gesture_event_files()
self._get_event_pressures()
self._get_list_of_pressure_dicts()
def __del__(self):
self._cleanup()
def _cleanup(self):
"""Remove the temporary directory that holds the gesture event files."""
if os.path.isdir(self.event_dir):
print 'Removing tmp directory "%s" .... ' % self.event_dir
try:
shutil.rmtree(self.event_dir)
except Exception as e:
msg = 'Error in removing tmp directory ("%s"): %s'
print_and_exit(msg % (self.event_dir, e))
def _cleanup_and_exit(self, err_msg):
"""Clean up and exit with the error message.
@param err_msg: the error message to print
"""
self._cleanup()
print_and_exit(err_msg)
def _get_machine_ip(self):
"""Get the ip address of the chromebook machine."""
msg = '\nEnter the ip address (xx.xx.xx.xx) of the chromebook machine: '
self.machine_ip = raw_input(msg)
def _get_result_dir(self):
"""Get the test result directory located in the chromebook machine."""
print '\nEnter the test result directory located in the machine.'
print 'It is a directory under %s' % conf.log_root_dir
print ('If you have just performed the pressure calibration test '
'on the machine,\n' 'you could just press ENTER to use '
'the default "latest" directory.')
result_dir = raw_input('Enter test result directory: ')
if result_dir == '':
result_dir = self.DEFAULT_RESULT_DIR
self.result_dir = os.path.join(conf.log_root_dir, result_dir)
def _get_gesture_event_files(self):
"""Scp the gesture event files in the result_dir in machine_ip."""
try:
self.event_dir = tempfile.mkdtemp(prefix='touch_firmware_test_')
except Exception as e:
err_msg = 'Error in creating tmp directory (%s): %s'
self._cleanup_and_exit(err_msg % (self.event_dir, e))
# Try to scp the gesture event files from the chromebook machine to
# the event_dir created above on the host.
# An example gesture event file looks like
# pressure_calibration.size0-lumpy-fw_11.27-calibration-20130307.dat
filepath = os.path.join(self.result_dir, self.FILE_PATTERN % '')
cmd = 'scp root@%s:%s %s' % (self.machine_ip, filepath, self.event_dir)
try:
print ('scp gesture event files from "machine_ip:%s" to %s\n' %
(self.machine_ip, self.event_dir))
subprocess.call(cmd.split())
except subprocess.CalledProcessError as e:
self._cleanup_and_exit('Error in executing "%s": %s' % (cmd, e))
def _get_event_pressures(self):
"""Parse the gesture event files to get the pressure values."""
self.pressures = {}
self.len_pressures = {}
for s in GV.SIZE_LIST:
# Get the gesture event file for every finger size.
filepath = os.path.join(self.event_dir, self.FILE_PATTERN % s)
event_files = glob.glob(filepath)
if not event_files:
err_msg = 'Error: there is no gesture event file for size %s'
self._cleanup_and_exit(err_msg % s)
# Use the latest event file for the size if there are multiple ones.
event_files.sort()
event_file = event_files[-1]
# Get the list of pressures in the event file.
mtb_packets = mtb.get_mtb_packets_from_file(event_file)
target_slot = 0
list_z = mtb_packets.get_slot_data(target_slot, 'pressure')
len_z = len(list_z)
if self.PRESSURE_LIST_MAX_SIZE > len_z:
bgn_index = 0
end_index = len_z
else:
# Get the middle segment of the list of pressures.
bgn_index = (len_z - self.PRESSURE_LIST_MAX_SIZE) / 2
end_index = (len_z + self.PRESSURE_LIST_MAX_SIZE) / 2
self.pressures[s] = list_z[bgn_index : end_index]
self.len_pressures[s] = len(self.pressures[s])
def _get_list_of_pressure_dicts(self):
"""Get a list of pressure dictionaries."""
self.list_of_pressure_dicts = []
for index in range(max(self.len_pressures.values())):
pressure_dict = {}
for s in GV.SIZE_LIST:
if index < self.len_pressures[s]:
pressure_dict[s] = str(self.pressures[s][index])
self.list_of_pressure_dicts.append(pressure_dict)
print_verbose(' row %4d: %s' % (index, str(pressure_dict)))
class PressureSpreadsheet(object):
"""A spreadsheet class to perform pressures calibration in worksheets."""
WORKSHEET_ROW_COUNT = 1000
WORKSHEET_COL_COUNT = 20
START_ROW_NUMBER = 2
DATA_BEGIN_COLUMN = 2
COMPUTED_TABLE_ROW = 12
MEASURED_TABLE_ROW = 3
def __init__(self, worksheet_title):
"""Initialize the spreadsheet and the worksheet
@param spreadsheet_title: the spreadsheet title
@param worksheet_title: the worksheet title
"""
self.computed_table = None
self.feed = None
self.measured_table = None
self.number_records = 0
self.spreadsheet_key = TARGET_SPREADSHEET_KEY
self.ss_client = gdata.spreadsheets.client.SpreadsheetsClient()
self.summary_feed = None
self.worksheet_title = worksheet_title
authorizer = SpreadsheetAuthorizer()
if not authorizer.authorize(self.ss_client):
raise "Please check the access permission of the spreadsheet"
self._get_new_worksheet_by_title(worksheet_title)
self._get_summary_tables()
def _get_worksheet_entry_by_title(self, worksheet_title):
"""Check if the worksheet title exists?"""
worksheet_feed = self.ss_client.get_worksheets(self.spreadsheet_key)
for entry in worksheet_feed.entry:
if entry.title.text == worksheet_title:
return entry
return None
def _get_new_worksheet_by_title(self, worksheet_title,
row_count=WORKSHEET_ROW_COUNT,
col_count=WORKSHEET_COL_COUNT):
"""Create a new worksheet using the title.
If the worksheet title already exists, using a new title name such as
"Copy n of title", where n = 2, 3, ..., MAX_TITLE_DUP + 1
@param title: the worksheet title
@param row_count: the number of rows in the worksheet
@param col_count: the number of columns in the worksheet
"""
MAX_TITLE_DUP = 10
new_worksheet_title = worksheet_title
for i in range(2, MAX_TITLE_DUP + 2):
if not self._get_worksheet_entry_by_title(new_worksheet_title):
break
new_worksheet_title = 'Copy %d of %s' % (i, worksheet_title)
self.worksheet_title = new_worksheet_title
else:
msg = 'Too many duplicate copies of the worksheet title: %s.'
print_and_exit(msg % worksheet_title)
# Add the new worksheet and get the worksheet_id.
worksheet_entry = self.ss_client.add_worksheet(self.spreadsheet_key,
new_worksheet_title,
row_count,
col_count)
worksheet_id = worksheet_entry.get_worksheet_id()
self.feed = gdata.spreadsheets.data.build_batch_cells_update(
self.spreadsheet_key, worksheet_id)
self.feed.add_set_cell(1, 1, SHEET_DESCRIPTION)
def _insert_pressure_data(self, list_of_pressure_dicts):
"""Insert the lists of pressures of all finger sizes to a worksheet.
@param list_of_pressure_dicts: a list of pressure dictionaries
"""
# Set column headers for figner sizes
for row in range(len(CELLS)):
for column in range(len(CELLS[row])):
self.feed.add_set_cell(self.START_ROW_NUMBER + row,
column + 1,
CELLS[row][column])
# Insert the pressures row by row
row = self.START_ROW_NUMBER + len(CELLS)
for pressure_dict in list_of_pressure_dicts:
print_verbose(' %s' % str(pressure_dict))
col = self.DATA_BEGIN_COLUMN
for size in GV.SIZE_LIST:
if size in pressure_dict:
self.feed.add_set_cell(row, col, pressure_dict[size])
col = col + 1
row = row + 1
def _insert_summary_record(self, table, row_source):
""" Paste a record into measured/computed table in summary sheet from
a row in source sheet.
Append one record in the table first as the gdata api does not allow
formula in the record data for add_record(). In other word, we have
to fill data with add_set_cell() one-by-one instead.
@param table: Target table entry
@param row_source: row in source sheet
"""
record = { 'device' : self.worksheet_title }
self.ss_client.add_record(self.spreadsheet_key,
table.get_table_id(),
record)
row_target = int(table.data.start_row) + int(table.data.num_rows)
# As there will be one record inserted measured table, we need to
# increase the row number by one for computed_table
if table == self.computed_table:
row_target = row_target + 1
for i in range(len(row_source)):
formula = CROSS_SHEET_CELL % (self.worksheet_title, row_source[i])
self.summary_feed.add_set_cell(row_target,
i + 2,
formula)
def insert_pressures_to_worksheet(self, list_of_pressure_dicts):
"""Insert the lists of pressures of all finger sizes to a new worksheet.
@param list_of_pressure_dicts: a list of pressure dictionaries
"""
self.number_records = len(list_of_pressure_dicts)
print 'Insert the data to device worksheet...'
self._insert_pressure_data(list_of_pressure_dicts)
print 'Finalizing the insertion...'
self.ss_client.batch(self.feed, force=True)
print 'Insert the data to summary worksheet...'
self._insert_summary_record(self.computed_table, COMPUTED)
self._insert_summary_record(self.measured_table, MEASURED)
print 'Finalizing the insertion...'
self.ss_client.batch(self.summary_feed, force=True)
def _get_summary_table(self, title, header_row, start_row):
"""Insert the lists of pressures of all finger sizes to a new worksheet.
@param title: Title of the table
@param header_row: Row of the header in the table
@param start_row: starting row of data in the table
"""
tables = self.ss_client.get_tables(self.spreadsheet_key)
for table in tables.entry:
if table.title.text == title:
return table
# table is not created yet
table = self.ss_client.add_table(self.spreadsheet_key,
title,
title,
SUMMARY_WORKSHEET_TITLE,
header_row,
0,
start_row,
gdata.spreadsheets.data.INSERT_MODE,
results_table)
# add additional table descriptions
if title == 'Computed Pressures':
row = self.COMPUTED_TABLE_ROW - 1
for col in range(len(surface_title)):
self.summary_feed.add_set_cell(row, col + 1, surface_title[col])
for col in range(len(diameter_title)):
self.summary_feed.add_set_cell(row + 1, col + 1,
diameter_title[col])
return table
def _delete_nonexist_table(self, title):
"""Remove the nonexist table entries.
@param title: Title of the table
"""
tables = self.ss_client.get_tables(self.spreadsheet_key)
for table in tables.entry:
if table.title.text == title:
self.ss_client.delete(table)
def _get_summary_tables(self):
"""Insert the results of all finger sizes to summary worksheet."""
entry = self._get_worksheet_entry_by_title(SUMMARY_WORKSHEET_TITLE)
if not entry:
self._delete_nonexist_table(TABLE_MEASURED)
self._delete_nonexist_table(TABLE_COMPUTED)
entry = self.ss_client.add_worksheet(self.spreadsheet_key,
SUMMARY_WORKSHEET_TITLE,
self.WORKSHEET_ROW_COUNT,
self.WORKSHEET_COL_COUNT)
self.summary_feed = gdata.spreadsheets.data.build_batch_cells_update(
self.spreadsheet_key, entry.get_worksheet_id())
table_row = self.MEASURED_TABLE_ROW
self.measured_table = self._get_summary_table(TABLE_MEASURED,
table_row,
table_row + 1)
table_row = self.COMPUTED_TABLE_ROW
self.computed_table = self._get_summary_table(TABLE_COMPUTED,
table_row,
table_row + 1)
def get_worksheet_title():
"""Get the worksheet title."""
worksheet_title = ''
while not worksheet_title:
print '\nInput the name of the new worksheet to insert the events.'
print ('This is usually the board name with the firmware version, '
'e.g., Lumpy 11.27')
worksheet_title = raw_input('Input the new worksheet name: ')
return worksheet_title
def main():
"""Parse the gesture events and insert them to the spreadsheet."""
# Get the gesture event files and parse the events.
list_of_pressure_dicts = GestureEventFiles().list_of_pressure_dicts
# Access the spreadsheet, and create a new worksheet to insert the events.
worksheet_title = get_worksheet_title()
ss = PressureSpreadsheet(worksheet_title)
ss.insert_pressures_to_worksheet(list_of_pressure_dicts)
if __name__ == '__main__':
argc = len(sys.argv)
if argc == 2 and sys.argv[1] == '-v':
flag_verbose = True
elif argc > 2 or argc == 2:
print_and_exit('Usage: %s [-v]' % sys.argv[0])
main()