blob: 60bcf71d27a46572d9d42c925584ae8e13e33ce2 [file] [log] [blame]
#!/usr/bin/env python2
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import gtest_parallel
import unittest
class LoggerMock(object):
def __init__(self):
self.runtimes = collections.defaultdict(list)
self.exit_codes = collections.defaultdict(list)
self.last_execution_times = collections.defaultdict(list)
self.execution_numbers = collections.defaultdict(list)
def log_exit(self, task):
self.runtimes[task.test_id].append(task.runtime_ms)
self.exit_codes[task.test_id].append(task.exit_code)
self.last_execution_times[task.test_id].append(task.last_execution_time)
self.execution_numbers[task.test_id].append(task.execution_number)
class TimesMock(object):
def __init__(self):
self.last_execution_times = collections.defaultdict(list)
def record_test_time(self, test_binary, test_name, last_execution_time):
test_id = (test_binary, test_name)
self.last_execution_times[test_id].append(last_execution_time)
class TestResultsMock(object):
def __init__(self):
self.results = []
def log(self, test_name, runtime_ms, actual_result):
self.results.append((test_name, runtime_ms, actual_result))
class TaskMockFactory(object):
def __init__(self, test_data):
self.data = test_data
self.passed = []
self.failed = []
def get_task(self, test_id, execution_number=0):
task = TaskMock(test_id, execution_number, self.data[test_id])
if task.exit_code == 0:
self.passed.append(task)
else:
self.failed.append(task)
return task
def __call__(self, test_binary, test_name, test_command, execution_number,
last_execution_time, output_dir):
return self.get_task((test_binary, test_name), execution_number)
class TaskMock(object):
def __init__(self, test_id, execution_number, test_data):
self.test_id = test_id
self.execution_number = execution_number
self.runtime_ms = test_data['runtime_ms'][execution_number]
self.exit_code = test_data['exit_code'][execution_number]
self.last_execution_time = (
test_data['last_execution_time'][execution_number])
self.test_command = None
self.output_dir = None
self.test_binary = test_id[0]
self.test_name = test_id[1]
self.task_id = (test_id[0], test_id[1], execution_number)
def run(self):
pass
class TestTaskManager(unittest.TestCase):
def setUp(self):
self.times = TimesMock()
self.logger = LoggerMock()
self.test_results = TestResultsMock()
self.test_data = [
# Passing task
(('fake_binary', 'Fake.PassingTest'), {
'runtime_ms': [10],
'exit_code': [0],
'last_execution_time': [10],
}),
# Fails once, then succeeds
(('another_binary', 'Fake.Test.FailOnce'), {
'runtime_ms': [21, 22],
'exit_code': [3, 0],
'last_execution_time': [None, 22],
}),
# Fails twice, then succeeds
(('yet_another_binary', 'Fake.Test.FailTwice'), {
'runtime_ms': [23, 25, 24],
'exit_code': [2, 2, 0],
'last_execution_time': [None, None, 24],
}),
# Failing task
(('fake_binary', 'Fake.FailingTest'), {
'runtime_ms': [20, 30, 40],
'exit_code': [1, 1, 1],
'last_execution_time': [None, None, None],
})
]
def test_run_task_basic(self):
repeat = 1
retry_failed = 0
task_mock_factory = TaskMockFactory(dict(self.test_data))
task_manager = gtest_parallel.TaskManager(
self.times, self.logger, self.test_results,
task_mock_factory, retry_failed, repeat)
for test_id, expected in self.test_data:
task = task_mock_factory.get_task(test_id)
task_manager.run_task(task)
self.assertEqual(len(task_manager.started), 0)
self.assertListEqual(
sorted(task.task_id for task in task_manager.passed),
sorted(task.task_id for task in task_mock_factory.passed))
self.assertListEqual(
sorted(task.task_id for task in task_manager.failed),
sorted(task.task_id for task in task_mock_factory.failed))
self.assertEqual(task_manager.global_exit_code, 1)
if __name__ == '__main__':
unittest.main()