blob: a52c5ca25a4a548fa2216d33ed580c531da9d050 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test Utils library."""
from __future__ import print_function
import os
import time
import threading
import multiprocessing
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.lib.paygen import utils
# We access a lot of protected members during testing.
# pylint: disable=protected-access
# Tests involving the memory semaphore should block this long.
ACQUIRE_TIMEOUT = 120
ACQUIRE_SHOULD_BLOCK_TIMEOUT = 20
class TestUtils(cros_test_lib.TempDirTestCase):
"""Test utils methods."""
@classmethod
def setUpClass(cls):
"""Class setup to run system polling quickly in semaphore tests."""
utils.MemoryConsumptionSemaphore.SYSTEM_POLLING_INTERVAL_SECONDS = 0
class MockClock(object):
"""Mock clock that is manually incremented."""
def __call__(self):
"""Return the current mock time."""
return self._now
def __init__(self):
"""Init the clock."""
self._now = 0.0
def add_time(self, n):
"""Add some amount of time."""
self._now += n
def mock_get_system_available(self, how_much):
"""Mock the system's available memory, used to override /proc."""
return lambda: how_much
def testListdirFullpath(self):
file_a = os.path.join(self.tempdir, 'a')
file_b = os.path.join(self.tempdir, 'b')
osutils.Touch(file_a)
osutils.Touch(file_b)
self.assertEqual(sorted(utils.ListdirFullpath(self.tempdir)),
[file_a, file_b])
def testReadLsbRelease(self):
"""Tests that we correctly read the lsb release file."""
path = os.path.join(self.tempdir, 'etc', 'lsb-release')
osutils.WriteFile(path, 'key=value\nfoo=bar\n', makedirs=True)
self.assertEqual(utils.ReadLsbRelease(self.tempdir),
{'key': 'value', 'foo': 'bar'})
def testMassiveMemoryConsumptionSemaphore(self):
"""Tests that we block on not having enough memory."""
# You should never get 2**64 bytes.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=2 ** 64,
single_proc_max_bytes=2 ** 64,
quiescence_time_seconds=0.0)
# You can't get that much.
self.assertEqual(_semaphore.acquire(
ACQUIRE_SHOULD_BLOCK_TIMEOUT).result, False)
def testNoMemoryConsumptionSemaphore(self):
"""Tests that you can acquire a very little amount of memory."""
# You should always get one byte.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=0.0)
# Sure you can have two bytes.
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
def testTotalMaxMemoryConsumptionSemaphore(self):
"""Tests that the total_max is respected."""
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=0,
single_proc_max_bytes=1,
quiescence_time_seconds=0.0,
total_max=3)
# Look at all this memory.
_semaphore._get_system_available = self.mock_get_system_available(2**64)
# Sure you can have three.
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
# Nope, you're now over max.
self.assertEqual(_semaphore.acquire(1).result, False)
def testQuiesceMemoryConsumptionSemaphore(self):
"""Tests that you wait for memory utilization to settle (quiesce)."""
# All you want is two bytes.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=2.0)
# Should want two bytes, have a whole lot.
_semaphore._get_system_available = self.mock_get_system_available(2**64)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
# Should want two bytes, have a whole lot (but you'll block for 2 seconds).
_semaphore._get_system_available = self.mock_get_system_available(2**64 - 2)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
def testUncheckedMemoryConsumptionSemaphore(self):
"""Tests that some acquires work unchecked."""
# You should never get 2**64 bytes (i wish...).
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=2**64,
single_proc_max_bytes=2**64,
quiescence_time_seconds=2.0,
unchecked_acquires=2)
# Nothing available, but we expect unchecked_acquires to allow it.
_semaphore._get_system_available = self.mock_get_system_available(0)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
def testQuiescenceUnblocksMemoryConsumptionSemaphore(self):
"""Test that after a period of time you unblock (due to quiescence)."""
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=2.0,
unchecked_acquires=0)
# Make large amount of memory available, but we expect quiescence
# to block the second task.
_semaphore._get_system_available = self.mock_get_system_available(2**64)
start_time = time.time()
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
# Get the lock or die trying. We spin fast here instead of ACQUIRE_TIMEOUT.
while not _semaphore.acquire(1).result:
continue
_semaphore.release()
# Check that the lock was acquired after quiescence_time_seconds.
end_time = time.time()
# Why 1.8? Because the clock isn't monotonic and we don't want to flake.
self.assertGreaterEqual(end_time - start_time, 1.8)
def testThreadedMemoryConsumptionSemaphore(self):
"""Test many threads simultaneously using the Semaphore."""
initial_memory = 6
# These are lists so we can write nonlocal.
mem_avail = [initial_memory]
good_thread_exits = [0]
mock_clock = TestUtils.MockClock()
lock, exit_lock = threading.Lock(), threading.Lock()
test_threads = 8
# Currently executes in 1.6 seconds a 2 x Xeon Gold 6154 CPUs
get_and_releases = 50
def sub_mem():
with lock:
mem_avail[0] = mem_avail[0] - 1
self.assertGreaterEqual(mem_avail[0], 0)
def add_mem():
with lock:
mem_avail[0] = mem_avail[0] + 1
self.assertGreaterEqual(mem_avail[0], 0)
def get_mem():
with lock:
return mem_avail[0]
# Ask for two bytes available each time.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=0.1,
unchecked_acquires=1,
clock=mock_clock)
_semaphore._get_system_available = get_mem
def hammer_semaphore():
for _ in range(get_and_releases):
while not _semaphore.acquire(0.1).result:
continue
# Simulate 'using the memory'.
sub_mem()
time.sleep(0.1)
add_mem()
_semaphore.release()
with exit_lock:
good_thread_exits[0] = good_thread_exits[0] + 1
threads = [threading.Thread(target=hammer_semaphore)
for _ in range(test_threads)]
for x in threads:
x.daemon = True
x.start()
# ~Maximum 600 seconds realtime, keeps clock ticking for overall timeout.
for _ in range(60000):
time.sleep(0.01)
mock_clock.add_time(0.1)
# Maybe we can break early? (and waste some time for other threads).
threads_dead = [not x.isAlive() for x in threads]
if all(threads_dead):
break
# If we didn't get here a thread did not exit. This is fatal and may
# indicate a deadlock has been introduced.
self.assertEqual(initial_memory, get_mem())
self.assertEqual(good_thread_exits[0], test_threads)
def testMultiProcessedMemoryConsumptionSemaphore(self):
"""Test many processes simultaneously using the Semaphore."""
initial_memory = 6
mem_avail = multiprocessing.Value('I', initial_memory, lock=True)
good_process_exits = multiprocessing.Value('I', 0, lock=True)
n_processes = 8
# Currently executes in 45 seconds a 2 x Xeon Gold 6154 CPUs
get_and_releases = 50
def sub_mem():
with mem_avail.get_lock():
mem_avail.value -= 1
self.assertGreaterEqual(mem_avail.value, 0)
def add_mem():
with mem_avail.get_lock():
mem_avail.value += 1
self.assertLessEqual(mem_avail.value, 6)
def get_mem():
with mem_avail.get_lock():
return mem_avail.value
# Ask for two bytes available each time.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=0.1,
unchecked_acquires=1)
_semaphore._get_system_available = get_mem
def hammer_semaphore():
for _ in range(get_and_releases):
while not _semaphore.acquire(0.1).result:
continue
# Simulate 'using the memory'.
sub_mem()
time.sleep(0.1)
add_mem()
_semaphore.release()
with good_process_exits.get_lock():
good_process_exits.value = good_process_exits.value + 1
processes = [multiprocessing.Process(target=hammer_semaphore)
for _ in range(n_processes)]
for p in processes:
p.daemon = True
p.start()
for p in processes:
p.join()
# If we didn't get here a proc did not exit. This is fatal and may
# indicate a deadlock has been introduced.
self.assertEqual(initial_memory, get_mem())
with good_process_exits.get_lock():
self.assertEqual(good_process_exits.value, n_processes)