| # Copyright (c) 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import argparse |
| import copy |
| import csv |
| import logging |
| import os |
| import re |
| import shutil |
| |
# Unique sentinel objects used by ChaosParser.parse_status_log to tag a
# failed test as either a connection failure or an AP configuration failure.
CONNECT_FAIL, CONFIG_FAIL = object(), object()

# Output directory for generated CSV summaries; ChaosParser.parse_results_dir
# deletes it before each run and ChaosParser.create_csv (re)creates it.
RESULTS_DIR = '/tmp/chaos'
| |
| |
class ChaosParser(object):
    """Defines a parser for chaos test results.

    The parser walks a directory of results generated by test_that, reads the
    'keyval' and 'status.log' files of every results sub-directory and prints
    (and optionally writes as CSV) a summary of connect and configuration
    failures.

    """

    def __init__(self, results_dir, create_file, print_config_failures):
        """Constructs a parser interface.

        @param results_dir: complete path to results directory for a chaos test.
        @param create_file: True to create csv files; False otherwise.
        @param print_config_failures: True to print the config info to stdout;
                                      False otherwise.

        """
        self._test_results_dir = results_dir
        self._create_file = create_file
        self._print_config_failures = print_config_failures


    def convert_set_to_string(self, set_list):
        """Converts a set to a single string.

        @param set_list: a set (or any other iterable) to convert

        @returns a string, which is all items separated by the word 'and';
                 an empty string if there are no items.

        """
        return ' and '.join(str(item) for item in set_list)


    def create_csv(self, filename, data_list):
        """Creates a file in .csv format holding a single row.

        The file is written to RESULTS_DIR, which is created if missing.
        I/O errors while writing are logged, not raised.

        @param filename: name for the csv file (without the '.csv' suffix)
        @param data_list: a list of all the info to write to a file

        """
        if not os.path.exists(RESULTS_DIR):
            os.mkdir(RESULTS_DIR)
        path = os.path.join(RESULTS_DIR, filename + '.csv')
        # The csv module wants a binary file on Python 2 (where str is bytes)
        # and a text file opened with newline='' on Python 3.
        if str is bytes:
            open_mode, open_kwargs = 'wb', {}
        else:
            open_mode, open_kwargs = 'w', {'newline': ''}
        try:
            with open(path, open_mode, **open_kwargs) as f:
                csv.writer(f).writerow(data_list)
            logging.info('Created CSV file %s', path)
        except IOError as e:
            logging.error('File operation failed with %s: %s', e.errno,
                          e.strerror)


    def get_ap_name(self, line):
        """Gets the router name from the string passed.

        @param line: Test ERROR string from chaos status.log

        @returns the router name or brand, i.e. the run of word characters
                 and whitespace that follows 'Router name: '.

        @raises AttributeError if the line holds no 'Router name: ' entry.

        """
        router_info = re.search(r'Router name: ([\w\s]+)', line)
        return router_info.group(1)


    def get_ap_mode_chan_freq(self, ssid):
        """Gets the AP band from ssid using channel.

        The SSID is split on '_': the second to last field carries the
        channel (first run of digits in it) and the third to last field
        carries the mode.

        @param ssid: A valid chaos test SSID as a string

        @returns a dict with the AP 'mode' (upper-cased), 'channel' (int) and
                 'band' ('2.4GHz' for channels 1-14, '5GHz' otherwise).

        """
        ssid_fields = ssid.split('_')
        channel_info = ssid_fields[-2]
        mode = ssid_fields[-3]
        # re.split with a capturing group keeps the digits as element 1.
        channel = int(re.split(r'(\d+)', channel_info)[1])
        # TODO Choose if we want to keep band, we never put it in the
        # spreadsheet and is currently unused.
        band = '2.4GHz' if 1 <= channel <= 14 else '5GHz'
        return {'mode': mode.upper(), 'channel': channel, 'band': band}


    def generate_percentage_string(self, passed_tests, total_tests):
        """Creates a pass percentage string in the format x/y (zz%)

        @param passed_tests: int of passed tests
        @param total_tests: int of total tests

        @returns a formatted string as described above. When total_tests is
                 0 the percentage is reported as 0% instead of dividing by
                 zero.

        """
        if total_tests:
            percent = float(passed_tests) / float(total_tests) * 100
        else:
            # Nothing ran (or nothing got past configuration): no pass rate.
            percent = 0.0
        percent_string = str(int(round(percent))) + '%'
        return '%d/%d (%s)' % (passed_tests, total_tests, percent_string)


    def parse_keyval(self, filepath):
        """Parses the 'keyvalue' file to get device details.

        @param filepath: the complete path to the keyval file

        @returns a dict with the keys 'board' and 'version'; each one is
                 'unknown' when the file has no matching entry.

        """
        # Android information does not exist in the keyfile, add temporary
        # information into the dictionary. crbug.com/570408
        lsb_dict = {'board': 'unknown',
                    'version': 'unknown'}
        with open(filepath, 'r') as f:
            for line in f:
                fields = line.split('=')
                if len(fields) < 2:
                    # Not a key=value line.
                    continue
                if 'RELEASE_BOARD' in fields[0]:
                    # Update in place so the 'version' default survives.
                    lsb_dict['board'] = fields[1].rstrip()
                elif 'RELEASE_VERSION' in fields[0]:
                    lsb_dict['version'] = fields[1].rstrip()
        return lsb_dict


    def parse_status_log(self, board, os_version, security, status_log_path):
        """Parses the entire status.log file from chaos test for test failures.

        Prints the connect failure summary, optionally prints the static and
        dynamic configuration failure summaries, and optionally creates three
        CSV files (dynamic config fail, static config fail, connect fail).

        @param board: the board the test was run against as a string
        @param os_version: the version of ChromeOS as a string
        @param security: the security used during the test as a string
        @param status_log_path: complete path to the status.log file

        """
        # Items that can have multiple values
        modes = list()
        channels = list()
        test_fail_aps = list()
        static_config_failures = list()
        dynamic_config_failures = list()
        kernel_version = ""
        fw_version = ""
        total = 0
        with open(status_log_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line.startswith('START\tnetwork_WiFi'):
                    # Do not count PDU failures in total tests run.
                    if 'PDU' in line:
                        continue
                    total += 1
                elif 'kernel_version' in line:
                    # Keep the previous value if the line has no version.
                    kernel_match = re.search(r'[\d.]+', line)
                    if kernel_match:
                        kernel_version = kernel_match.group(0)
                elif 'firmware_version' in line:
                    fw_match = re.search(
                            r"firmware_version': '([\w\s:().]+)", line)
                    if fw_match:
                        fw_version = fw_match.group(1)
                elif line.startswith(('ERROR', 'FAIL')):
                    title_info = line.split()
                    if 'reboot' in title_info:
                        continue
                    test_name = title_info[1]
                    if 'PDU' in test_name:
                        continue

                    if 'Config' in test_name:
                        failure_type = CONFIG_FAIL
                    else:
                        failure_type = CONNECT_FAIL

                    if failure_type is CONFIG_FAIL and 'chromeos' in test_name:
                        # Presumably the name carries the AP hostname here,
                        # not an SSID, so no mode/channel is derived from it.
                        ssid = test_name.split('.')[1].split('_')[0]
                    else:
                        # Get the mode and channel for the AP that failed.
                        ssid = test_name.split('.')[1]
                        network_dict = self.get_ap_mode_chan_freq(ssid)
                        modes.append(network_dict['mode'])
                        channels.append(network_dict['channel'])

                    # Security mismatches and Ping failures are not connect
                    # failures.
                    if ('Ping command' in line or 'correct security' in line
                            or failure_type is CONFIG_FAIL):
                        if 'StaticAPConfigurator' in line:
                            static_config_failures.append(ssid)
                        else:
                            dynamic_config_failures.append(ssid)
                    else:
                        test_fail_aps.append(ssid)
                elif ('END GOOD' in line and
                      ('ChaosConnectDisconnect' in line or
                       'ChaosLongConnect' in line)):
                    test_name = line.split()[2]
                    ssid = test_name.split('.')[1]
                    network_dict = self.get_ap_mode_chan_freq(ssid)
                    modes.append(network_dict['mode'])
                    channels.append(network_dict['channel'])

        config_pass = total - (len(dynamic_config_failures) +
                               len(static_config_failures))
        config_pass_string = self.generate_percentage_string(config_pass,
                                                             total)
        connect_pass = config_pass - len(test_fail_aps)
        connect_pass_string = self.generate_percentage_string(connect_pass,
                                                              config_pass)

        # Sort the unique values so the summary is identical from run to run
        # (plain set iteration order is arbitrary).
        base_csv_list = [board, os_version, fw_version, kernel_version,
                         self.convert_set_to_string(sorted(set(modes))),
                         self.convert_set_to_string(sorted(set(channels))),
                         security]

        static_config_csv_list = (base_csv_list + [config_pass_string] +
                                  static_config_failures)
        dynamic_config_csv_list = (base_csv_list + [config_pass_string] +
                                   dynamic_config_failures)
        connect_csv_list = (base_csv_list + [connect_pass_string] +
                            test_fail_aps)

        print('Connect failure for security: %s' % security)
        print(','.join(connect_csv_list))
        print('\n')

        if self._print_config_failures:
            self.print_config_failures('Static', security,
                                       static_config_csv_list)
            self.print_config_failures('Dynamic', security,
                                       dynamic_config_csv_list)

        if self._create_file:
            self.create_csv('chaos_WiFi_dynamic_config_fail.' + security,
                            dynamic_config_csv_list)
            self.create_csv('chaos_WiFi_static_config_fail.' + security,
                            static_config_csv_list)
            self.create_csv('chaos_WiFi_connect_fail.' + security,
                            connect_csv_list)


    def print_config_failures(self, config_type, security, config_csv_list):
        """Prints out the configuration failures.

        Nothing is printed when the list holds no failure entries.

        @param config_type: string describing the configurator type
        @param security: the security type as a string
        @param config_csv_list: list of the configuration failures

        """
        # 8 because that is the length of the base list plus the pass
        # percentage string; anything beyond that is a failure entry.
        if len(config_csv_list) <= 8:
            return
        print('%s config failures for security: %s' % (config_type, security))
        print(','.join(config_csv_list))
        print('\n')


    def _is_single_test_keyval(self, filepath):
        """Tells whether a keyval file has a 'param-debug_info' entry.

        @param filepath: complete path to a keyval file

        @returns True if the file content contains 'param-debug_info'.

        """
        with open(filepath) as f:
            return 'param-debug_info' in f.read()


    def traverse_results_dir(self, path):
        """Walks through the results directory and get the pathnames for the
        status.log and the keyval files.

        The first 'status.log' found is used. Every 'keyval' file is checked
        and the last one containing 'param-debug_info' is used.

        @param path: complete path to a specific test result directory.

        @returns a dict with absolute pathnames for the 'status.log' and
                 'keyfile' files.

        @raises Exception if either file could not be found.

        """
        status = None
        keyval = None

        for root, _, file_names in os.walk(path):
            for name in file_names:
                current_path = os.path.join(root, name)
                if name == 'status.log' and not status:
                    status = current_path
                elif (name == 'keyval' and
                      self._is_single_test_keyval(current_path)):
                    # This is a keyval file for a single test and not a suite.
                    # Keep scanning this directory: a status.log may follow.
                    keyval = current_path
        if not keyval or not status:
            raise Exception('Did Chaos tests complete successfully? Rerun tests'
                            ' with missing results.')
        return {'status_file': status, 'keyval_file': keyval}


    def parse_results_dir(self):
        """Parses each result directory.

        For each results directory created by test_that, parse it and
        create summary files. Any previous RESULTS_DIR is removed first.

        @raises RuntimeError if no results directory was processed.

        """
        if os.path.exists(RESULTS_DIR):
            shutil.rmtree(RESULTS_DIR)
        test_processed = False
        for results_dir in os.listdir(self._test_results_dir):
            if 'results' not in results_dir:
                continue
            # The text after the first '.' in the directory name is passed on
            # as the security of the run.
            name_fields = results_dir.split('.')
            if len(name_fields) < 2:
                logging.warning('Skipping %s: no \'.\' separated suffix in '
                                'directory name', results_dir)
                continue
            test = name_fields[1]
            path = os.path.join(self._test_results_dir, results_dir)
            status_key_dict = self.traverse_results_dir(path)
            lsb_info = self.parse_keyval(status_key_dict['keyval_file'])
            self.parse_status_log(lsb_info['board'],
                                  lsb_info['version'],
                                  test,
                                  status_key_dict['status_file'])
            test_processed = True
        if not test_processed:
            raise RuntimeError('chaos_parse: Did not find any results directory'
                               ' to process')
| |
| |
def main():
    """Reads the command line options and runs the chaos results parser.

    @raises RuntimeError if no directory was supplied or if the supplied
            directory does not exist.

    """
    logging.basicConfig(level=logging.INFO)

    # -d/--directory is mandatory; the two switches default to off.
    cli = argparse.ArgumentParser()
    cli.add_argument('-d', '--directory',
                     dest='dir_name',
                     required=True,
                     help='Pathname to results generated by test_that')
    cli.add_argument('--create_file',
                     dest='create_file',
                     default=False,
                     action='store_true')
    cli.add_argument('--print_config_failures',
                     dest='print_config_failures',
                     default=False,
                     action='store_true')
    options = cli.parse_args()

    results_path = options.dir_name
    if not results_path:
        raise RuntimeError('chaos_parser: No directory name supplied. Use -h'
                           ' for help')
    if not os.path.exists(results_path):
        raise RuntimeError('chaos_parser: Invalid directory name supplied.')

    chaos_parser = ChaosParser(results_path,
                               options.create_file,
                               options.print_config_failures)
    chaos_parser.parse_results_dir()


if __name__ == '__main__':
    main()