blob: bc9ad3967025859684890bb2f6b6a607e5a03d9b [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test gslock library."""
from __future__ import print_function
import multiprocessing
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import gs
from chromite.lib.paygen import gslock
# We access a lot of protected members during testing.
# pylint: disable=protected-access
def _InProcessAcquire(lock_uri):
"""Acquire a lock in a sub-process, but don't release.
This helper has to be pickleable, so can't be a member of the test class.
Args:
lock_uri: URI of the lock to acquire.
Returns:
boolean telling if this method got the lock.
"""
lock = gslock.Lock(lock_uri)
try:
lock.Acquire()
return True
except gslock.LockNotAcquired:
return False
def _InProcessDoubleAcquire(lock_uri):
"""Acquire a lock in a sub-process, and reacquire it a second time.
Do not release the lock after acquiring.
This helper has to be pickleable, so can't be a member of the test class.
Args:
lock_uri: URI of the lock to acquire.
Returns:
int describing how many times it acquired a lock.
"""
count = 0
lock = gslock.Lock(lock_uri)
try:
lock.Acquire()
count += 1
lock.Acquire()
count += 1
except gslock.LockNotAcquired:
pass
return count
def _InProcessDataUpdate(lock_uri_data_uri):
"""Increment a number in a GS file protected by a lock.
Keeps looking until the lock is acquired, so effectively, blocking. Stores
or increments an integer in the data_uri by one, once.
This helper has to be pickleable, so can't be a member of the test class.
Args:
lock_uri_data_uri: Tuple containing (lock_uri, data_uri). Passed as
a tuple, since multiprocessing.Pool.map only allows
a single argument in.
lock_uri: URI of the lock to acquire.
data_uri: URI of the data file to create/increment.
Returns:
boolean describing if this method got the lock.
"""
lock_uri, data_uri = lock_uri_data_uri
ctx = gs.GSContext()
# Keep trying until the lock is acquired.
while True:
try:
with gslock.Lock(lock_uri):
if ctx.Exists(data_uri):
data = int(ctx.Cat(data_uri)) + 1
else:
data = 1
ctx.CreateWithContents(data_uri, str(data))
return True
except gslock.LockNotAcquired:
pass
class GSLockTest(cros_test_lib.MockTestCase):
"""This test suite covers the GSLock file."""
# For contention tests, how many parallel workers to spawn. To really
# stress test, you can bump it up to 200, but 20 seems to provide good
# coverage w/out sucking up too many resources.
NUM_THREADS = 20
@cros_test_lib.NetworkTest()
def setUp(self):
self.ctx = gs.GSContext()
@cros_test_lib.NetworkTest()
def testLock(self):
"""Test getting a lock."""
# Force a known host name.
self.PatchObject(cros_build_lib, 'MachineDetails', return_value='TestHost')
with gs.TemporaryURL('gslock') as lock_uri:
lock = gslock.Lock(lock_uri)
self.assertFalse(self.ctx.Exists(lock_uri))
lock.Acquire()
self.assertTrue(self.ctx.Exists(lock_uri))
contents = self.ctx.Cat(lock_uri)
self.assertEqual(contents, 'TestHost')
lock.Release()
self.assertFalse(self.ctx.Exists(lock_uri))
@cros_test_lib.NetworkTest()
def testLockRepetition(self):
"""Test aquiring same lock multiple times."""
# Force a known host name.
self.PatchObject(cros_build_lib, 'MachineDetails', return_value='TestHost')
with gs.TemporaryURL('gslock') as lock_uri:
lock = gslock.Lock(lock_uri)
self.assertFalse(self.ctx.Exists(lock_uri))
lock.Acquire()
self.assertTrue(self.ctx.Exists(lock_uri))
lock.Acquire()
self.assertTrue(self.ctx.Exists(lock_uri))
lock.Release()
self.assertFalse(self.ctx.Exists(lock_uri))
lock.Acquire()
self.assertTrue(self.ctx.Exists(lock_uri))
lock.Release()
self.assertFalse(self.ctx.Exists(lock_uri))
@cros_test_lib.NetworkTest()
def testLockConflict(self):
"""Test lock conflict."""
with gs.TemporaryURL('gslock') as lock_uri:
lock1 = gslock.Lock(lock_uri)
lock2 = gslock.Lock(lock_uri)
# Manually lock 1, and ensure lock2 can't lock.
lock1.Acquire()
self.assertRaises(gslock.LockNotAcquired, lock2.Acquire)
lock1.Release()
# Use a with clause on 2, and ensure 1 can't lock.
with lock2:
self.assertRaises(gslock.LockNotAcquired, lock1.Acquire)
# Ensure we can renew a given lock.
lock1.Acquire()
lock1.Renew()
lock1.Release()
# Ensure we get an error renewing a lock we don't hold.
self.assertRaises(gslock.LockNotAcquired, lock1.Renew)
@cros_test_lib.NetworkTest()
def testLockTimeout(self):
"""Test getting a lock when an old timed out one is present."""
with gs.TemporaryURL('gslock') as lock_uri:
# Both locks are always timed out.
lock1 = gslock.Lock(lock_uri, lock_timeout_mins=-1)
lock2 = gslock.Lock(lock_uri, lock_timeout_mins=-1)
lock1.Acquire()
lock2.Acquire()
@cros_test_lib.NetworkTest()
def testRaceToAcquire(self):
"""Have lots of processes race to acquire the same lock."""
count = self.NUM_THREADS
pool = multiprocessing.Pool(processes=count)
with gs.TemporaryURL('gslock') as lock_uri:
results = pool.map(_InProcessAcquire, [lock_uri] * count)
# Clean up the lock since the processes explicitly only acquire.
self.ctx.Remove(lock_uri)
# Ensure that only one of them got the lock.
self.assertEqual(results.count(True), 1)
@cros_test_lib.NetworkTest()
def testRaceToDoubleAcquire(self):
"""Have lots of processes race to double acquire the same lock."""
count = self.NUM_THREADS
pool = multiprocessing.Pool(processes=count)
with gs.TemporaryURL('gslock') as lock_uri:
results = pool.map(_InProcessDoubleAcquire, [lock_uri] * count)
# Clean up the lock sinc the processes explicitly only acquire.
self.ctx.Remove(lock_uri)
# Ensure that only one of them got the lock (and got it twice).
self.assertEqual(results.count(0), count - 1)
self.assertEqual(results.count(2), 1)
@cros_test_lib.NetworkTest()
def testMultiProcessDataUpdate(self):
"""Have lots of processes update a GS file proctected by a lock."""
count = self.NUM_THREADS
pool = multiprocessing.Pool(processes=count)
with gs.TemporaryURL('gslock') as lock_uri:
data_uri = lock_uri + '.data'
results = pool.map(_InProcessDataUpdate,
[(lock_uri, data_uri)] * count)
self.assertEqual(self.ctx.Cat(data_uri), str(count))
# Ensure that all report success
self.assertEqual(results.count(True), count)
@cros_test_lib.NetworkTest()
def testDryrunLock(self):
"""Ensure that lcok can be obtained and released in dry-run mode."""
with gs.TemporaryURL('gslock') as lock_uri:
lock = gslock.Lock(lock_uri, dry_run=True)
self.assertIsNone(lock.Acquire())
self.assertFalse(self.ctx.Exists(lock_uri))
self.assertIsNone(lock.Release())
@cros_test_lib.NetworkTest()
def testDryrunLockRepetition(self):
"""Test aquiring same lock multiple times in dry-run mode."""
with gs.TemporaryURL('gslock') as lock_uri:
lock = gslock.Lock(lock_uri, dry_run=True)
self.assertIsNone(lock.Acquire())
self.assertIsNone(lock.Acquire())
self.assertIsNone(lock.Release())
self.assertIsNone(lock.Acquire())
self.assertIsNone(lock.Release())