Fetch autotest result keyval and parse perf data for further analysis.

This is the first part of the code that analyzes audio autotest results
to identify bad dongles. The original script was written by Owen Lin.
I'm taking over the work and continuing the fight against bad dongles.

BUG=chromium:307379
TEST=1. Download job tags from cautotest as tko_query.csv
     2. python fetch_test_data.py tko_query.csv -o fetch_result
     3. python filter_test_data.py fetch_result -o filter_result

Change-Id: I5bd8820535892e966ea6f9e8299bc17023bf43ff
Reviewed-on: https://chromium-review.googlesource.com/212249
Reviewed-by: Chinyue Chen <chinyue@chromium.org>
Commit-Queue: Chinyue Chen <chinyue@chromium.org>
Tested-by: Chinyue Chen <chinyue@chromium.org>
diff --git a/audio-scripts/fetch_test_data.py b/audio-scripts/fetch_test_data.py
new file mode 100755
index 0000000..f80e821
--- /dev/null
+++ b/audio-scripts/fetch_test_data.py
@@ -0,0 +1,121 @@
+#!/usr/bin/python
+#
+# Copyright 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Fetch autotest results and parse perf data for further analysis.
+
+This script needs a list of job tags, downloaded from cautotest, to know which
+test data to fetch. Here is how to get the job tags for recent audio tests.
+
+Open http://cautotest/new_tko/#tab_id=spreadsheet_view
+
+Use the following query:
+
+test_name in (
+  'audio_AlsaLoopback',
+  'audio_CrasLoopback')
+AND platform not in ('mario')
+AND test_started_time > '2014-08-01'
+
+Choose "Job tag" as Rows and "Status" as Columns.
+
+Download the result as CSV file by clicking on "Export to CSV".
+'''
+
+import argparse
+import logging
+import re
+import sys
+import threading
+import utils
+
+
+# Regexp for matching test job tag.
+RE_TEST_TAG = re.compile('\d+-chromeos-test/[\w-]+')
+
+# Regexp for matching label keyval from test output. Example label format:
+#   butterfly-release/R34-5120.0.0/audio/audiovideo_LineOutToMicInLoopback
+RE_LABEL = re.compile('([\w-]+)-\w+/R(\d+)-(\d+\.\d+\.\d+)/\w+/(\w+)')
+
+# Regexp for matching perf data from test output.
+RE_PERF_KEYVAL = re.compile('(.+){perf}=(.*)')
+
+# Lock used to prevent output messages from being interleaved.
+_output_lock = threading.Lock()
+
+
+def test_tag_iter(input_file):
+  for line in input_file:
+    m = RE_TEST_TAG.search(line)
+    if m is not None:
+      yield m.group(0)
+
+
+def parse_keyval(content):
+  keyval = {}
+  for line in content.splitlines():
+    key, value = line.split('=', 1)
+    keyval[key.strip()] = value.strip()
+  return keyval
+
+
+def parse_test_info_keyval(test):
+  # Get information from label keyval.
+  label = parse_keyval(utils.autotest_cat(test.tag, 'keyval'))['label']
+  match = RE_LABEL.match(label)
+  if match is None:
+    raise RuntimeError('failed to parse label: %s' % label)
+  test.platform, test.release, test.build, test.test_name = match.groups()
+
+
+def parse_perf_result_keyval(test):
+  try:
+    content = utils.autotest_cat(test.tag, '%s/results/keyval' % test.test_name)
+  except IOError:  # File not found on autotest GS storage.
+    return
+
+  for line in content.splitlines():
+    m = RE_PERF_KEYVAL.match(line)
+    if m is not None:
+      test.perf_dict[m.group(1)] = m.group(2)
+
+
+def fetch_and_print_test(tag, output):
+  try:
+    test = utils.TestObject(tag)
+    parse_test_info_keyval(test)
+    parse_perf_result_keyval(test)
+    with _output_lock:
+      output.write('%s\n' % str(test))
+  except Exception:
+    # Log the exception and continue.
+    logging.exception('failed to extract data: %s', tag)
+
+
+def parse_arguments():
+  parser = argparse.ArgumentParser(
+      description='Fetch the test results of specified tests.')
+  parser.add_argument(
+      'input', type=argparse.FileType('r'), nargs='?', default=sys.stdin,
+      help='input file, a list of tests\' tags. (default stdin)')
+  parser.add_argument(
+      '--jobs', '-j', type=int, nargs='?', default=32,
+      help='tests to fetch simultaneously (default 32)')
+  parser.add_argument(
+      '--output', '-o', type=argparse.FileType('w'), nargs='?',
+      default=sys.stdout, help='the output file. (default stdout)')
+  return parser.parse_args()
+
+
+def main():
+  args = parse_arguments()
+  job_iter = (lambda: fetch_and_print_test(t, args.output)
+              for t in test_tag_iter(args.input))
+
+  utils.run_in_pool(job_iter, pool_size=args.jobs)
+
+
+if __name__ == '__main__':
+  main()
diff --git a/audio-scripts/filter_test_data.py b/audio-scripts/filter_test_data.py
new file mode 100755
index 0000000..8c4c2d7
--- /dev/null
+++ b/audio-scripts/filter_test_data.py
@@ -0,0 +1,63 @@
+#!/usr/bin/python
+#
+# Copyright 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Filter test data.
+
+Test results without any perf data are filtered out.
+Test-specific filter logic can also be added to filter_test.
+'''
+
+import argparse
+import re
+import sys
+import utils
+
+
+RE_FORMAT_CONVERSION_RMS = re.compile(r'rms_value_\d+_\d+')
+
+
+def average(numbers):
+  total = 0
+  count = 0
+  for v in numbers:
+    total += v
+    count += 1
+  return total / count
+
+
+def filter_test(test):
+  if len(test.perf_dict) == 0:
+    return None
+  if test.test_name == 'audio_CRASFormatConversion':
+    value = average(float(v) for k, v in test.perf_dict.iteritems()
+                    if RE_FORMAT_CONVERSION_RMS.match(k))
+    test.perf_dict = {'average_rms_value': value}
+  return test
+
+
+def parse_arguments():
+  parser = argparse.ArgumentParser(description='Filter the test data.')
+  parser.add_argument(
+      'input', type=argparse.FileType('r'), nargs='?', default=sys.stdin,
+      help='input file, a list of tests\' tags. (default stdin)')
+  parser.add_argument(
+      '--output', '-o', type=argparse.FileType('w'), nargs='?',
+      default=sys.stdout, help='the output file. (default stdout)')
+  return parser.parse_args()
+
+
+def main():
+  args = parse_arguments()
+  for line in args.input:
+    test = utils.TestObject.parse(line)
+    if test is not None:
+      test = filter_test(test)
+    if test is not None:
+      args.output.write('%s\n' % str(test))
+
+
+if __name__ == '__main__':
+  main()
diff --git a/audio-scripts/utils.py b/audio-scripts/utils.py
new file mode 100644
index 0000000..6586246
--- /dev/null
+++ b/audio-scripts/utils.py
@@ -0,0 +1,105 @@
+#!/usr/bin/python
+#
+# Copyright 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Utility functions for audio scripts.'''
+
+import copy
+import logging
+import subprocess
+import threading
+
+
+AUTOTEST_GS_URL_FORMAT = 'gs://chromeos-autotest-results/%s/%s'
+
+_popen_lock = threading.Lock()
+
+
+class TestObject(object):
+  '''Object for holding test data.'''
+
+  def __init__(self, tag):
+    self.tag = tag
+    self.hostname = tag.split('/')[1]
+    self.test_name = ''
+    self.platform = ''
+    self.release = ''
+    self.build = ''
+    self.perf_dict = {}
+
+  @classmethod
+  def parse(cls, line):
+    '''Parse string of comma-separated fields into TestObject.
+
+    The string to be parsed should be in comma-separated format:
+      tag, platform, test_name, hostname, release, build, perf_dict
+
+    Example:
+      11932127-chromeos-test/chromeos4-row2-rack10-host6, link,
+      audio_CrasLoopback, chromeos4-row2-rack10-host6, 37, 5914.0.0,
+      rms_value: 0.668121
+    '''
+    values = [x.strip() for x in line.split(',')]
+    test = cls(values[0])
+    (test.platform, test.test_name, test.hostname, test.release,
+     test.build) = values[1:6]
+    test.perf_dict = dict(x.split(': ', 1) for x in values[6:])
+    return test
+
+  def __str__(self):
+    return ', '.join(str(x) for x in (
+        [self.tag, self.platform, self.test_name, self.hostname, self.release,
+         self.build] +
+        ['%s: %s' % item for item in self.perf_dict.iteritems()]))
+
+  def __repr__(self):
+    return str(self)
+
+  def clone(self):
+    return copy.copy(self)
+
+
+def run_in_pool(functions, pool_size=8):
+  lock = threading.Lock()
+
+  def next_task():
+    try:
+      with lock:
+        return next(functions)
+    except StopIteration:
+      return None
+
+  def work():
+    task = next_task()
+    while task:
+      task()
+      task = next_task()
+
+  threads = [threading.Thread(target=work) for _ in xrange(pool_size)]
+
+  for t in threads:
+    t.start()
+
+  for t in threads:
+    t.join()
+
+
+def execute(args):
+  with _popen_lock:
+    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+  out, err = p.communicate()
+  return out, err, p.wait()
+
+
+def autotest_cat(job_tag, file_path):
+  gs_url = AUTOTEST_GS_URL_FORMAT % (job_tag, file_path)
+  out, err, ret = execute(['gsutil', 'cat', gs_url])
+  if ret != 0:
+    if 'InvalidUriError' in err:
+      raise IOError(err)
+    else:
+      logging.error('command failed, return code: %d', ret)
+      raise RuntimeError(err)
+  return out