| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Test the locking library.""" |
| |
| from __future__ import print_function |
| |
| import itertools |
| import multiprocessing |
| import os |
| import sys |
| import time |
| |
| from chromite.lib import cros_test_lib |
| from chromite.lib import locking |
| from chromite.lib import osutils |
| |
| |
| LOCK_ACQUIRED = 5 |
| LOCK_NOT_ACQUIRED = 6 |
| |
| |
| class LockingTest(cros_test_lib.TempDirTestCase): |
| """Test the Locking class.""" |
| |
| def setUp(self): |
| self.lock_file = os.path.join(self.tempdir, 'lockfile') |
| |
| def _HelperSingleLockTest(self, blocking, shared, locktype): |
| """Helper method that runs a basic test with/without blocking/sharing.""" |
| self.assertFalse(os.path.exists(self.lock_file)) |
| |
| lock = locking.FileLock( |
| self.lock_file, blocking=blocking, locktype=locktype) |
| self.assertFalse(lock.IsLocked()) |
| |
| lock.write_lock() |
| self.assertTrue(lock.IsLocked()) |
| self.assertTrue(os.path.exists(self.lock_file)) |
| |
| # Acquiring the lock again should be safe. |
| lock.lock(shared) |
| self.assertTrue(lock.IsLocked()) |
| |
| lock.close() |
| self.assertFalse(lock.IsLocked()) |
| |
| osutils.SafeUnlink(self.lock_file) |
| |
| def _HelperInsideProcess(self, blocking, shared, locktype=locking.LOCKF): |
| """Helper method that runs a basic test with/without blocking.""" |
| try: |
| lock = locking.FileLock( |
| self.lock_file, blocking=blocking, locktype=locktype) |
| with lock.lock(shared): |
| pass |
| sys.exit(LOCK_ACQUIRED) |
| except locking.LockNotAcquiredError: |
| sys.exit(LOCK_NOT_ACQUIRED) |
| |
| def _HelperStartProcess(self, blocking=False, shared=False): |
| """Create a process and invoke _HelperInsideProcess in it.""" |
| p = multiprocessing.Process(target=self._HelperInsideProcess, |
| args=(blocking, shared)) |
| p.start() |
| |
| # It's highly probably that p will have tried to grab the lock before the |
| # timer expired, but not certain. |
| time.sleep(0.1) |
| |
| return p |
| |
| def _HelperWithProcess(self, expected, blocking=False, shared=False, |
| locktype=locking.LOCKF): |
| """Create a process and invoke _HelperInsideProcess in it.""" |
| p = multiprocessing.Process(target=self._HelperInsideProcess, |
| args=(blocking, shared, locktype)) |
| p.start() |
| p.join() |
| self.assertEquals(p.exitcode, expected) |
| |
| def testSingleLock(self): |
| """Just test getting releasing a lock with options.""" |
| arg_list = [ |
| [True, False], # blocking |
| [True, False], # shared |
| [locking.FLOCK, locking.LOCKF], # locking mechanism |
| ] |
| for args in itertools.product(*arg_list): |
| self._HelperSingleLockTest(*args) |
| |
| def testDoubleLockWithFlock(self): |
| """Tests that double locks do block with flock.""" |
| lock1 = locking.FileLock( |
| self.lock_file, blocking=False, locktype=locking.FLOCK) |
| lock2 = locking.FileLock( |
| self.lock_file, blocking=False, locktype=locking.FLOCK) |
| |
| with lock1.write_lock(): |
| self.assertTrue(lock1.IsLocked()) |
| self.assertFalse(lock2.IsLocked()) |
| |
| self.assertRaises(locking.LockNotAcquiredError, lock2.write_lock) |
| self.assertTrue(lock1.IsLocked()) |
| self.assertFalse(lock2.IsLocked()) |
| |
| self.assertFalse(lock1.IsLocked()) |
| self.assertFalse(lock2.IsLocked()) |
| |
| lock2.unlock() |
| self.assertFalse(lock1.IsLocked()) |
| self.assertFalse(lock2.IsLocked()) |
| |
| def testDoubleLockWithLockf(self): |
| """Tests that double locks don't block with lockf.""" |
| lock1 = locking.FileLock( |
| self.lock_file, blocking=False, locktype=locking.LOCKF) |
| lock2 = locking.FileLock( |
| self.lock_file, blocking=False, locktype=locking.LOCKF) |
| with lock1.write_lock(): |
| self.assertTrue(lock1.IsLocked()) |
| self.assertFalse(lock2.IsLocked()) |
| |
| # With lockf, we can lock the same file twice in the same process. |
| with lock2.write_lock(): |
| self.assertTrue(lock1.IsLocked()) |
| self.assertTrue(lock2.IsLocked()) |
| |
| self.assertFalse(lock1.IsLocked()) |
| self.assertFalse(lock2.IsLocked()) |
| |
| def testContextMgr(self): |
| """Make sure we behave properly with 'with'.""" |
| # Create an instance, and use it in a with. |
| prelock = locking.FileLock(self.lock_file) |
| self._HelperWithProcess(expected=LOCK_ACQUIRED) |
| |
| with prelock.write_lock() as lock: |
| # Assert the instance didn't change. |
| self.assertIs(prelock, lock) |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| |
| self._HelperWithProcess(expected=LOCK_ACQUIRED) |
| |
| # Construct the instance in the with expression. |
| with locking.FileLock(self.lock_file).write_lock() as lock: |
| self.assertIsInstance(lock, locking.FileLock) |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| |
| self._HelperWithProcess(expected=LOCK_ACQUIRED) |
| |
| def testAcquireBeforeWith(self): |
| """Sometimes you want to grab a lock and then return it into 'with'.""" |
| lock = locking.FileLock(self.lock_file, blocking=False) |
| |
| lock.write_lock() |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| |
| with lock: |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| |
| self._HelperWithProcess(expected=LOCK_ACQUIRED) |
| |
| def testSingleProcessLock(self): |
| """Test grabbing the same lock in processes with no conflicts.""" |
| arg_list = [ |
| [LOCK_ACQUIRED], |
| [True, False], # blocking |
| [True, False], # shared |
| [locking.FLOCK, locking.LOCKF], # locking mechanism |
| ] |
| for args in itertools.product(*arg_list): |
| self._HelperWithProcess(*args) |
| |
| def testNonBlockingConflicts(self): |
| """Test that we get a lock conflict for non-blocking locks.""" |
| with locking.FileLock(self.lock_file).write_lock(): |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED, shared=True) |
| |
| # Can grab it after it's released. |
| self._HelperWithProcess(expected=LOCK_ACQUIRED) |
| |
| def testSharedLocks(self): |
| """Test lock conflict for blocking locks.""" |
| # Intial lock is NOT shared. |
| with locking.FileLock(self.lock_file).write_lock(): |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED, shared=True) |
| |
| # Intial lock IS shared. |
| with locking.FileLock(self.lock_file).read_lock(): |
| self._HelperWithProcess(expected=LOCK_ACQUIRED, shared=True) |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED, |
| shared=False) |
| |
| def testBlockingConflicts(self): |
| """Test lock conflict for blocking locks.""" |
| # Intial lock is blocking, exclusive. |
| with locking.FileLock(self.lock_file, blocking=True).write_lock(): |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED, blocking=False) |
| |
| p = self._HelperStartProcess(blocking=True, shared=False) |
| |
| # when the with clause exits, p should unblock and get the lock, setting |
| # its exit code to sucess now. |
| p.join() |
| self.assertEquals(p.exitcode, LOCK_ACQUIRED) |
| |
| # Intial lock is NON blocking. |
| with locking.FileLock(self.lock_file, blocking=False).write_lock(): |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| |
| p = self._HelperStartProcess(blocking=True, shared=False) |
| |
| # when the with clause exits, p should unblock and get the lock, setting |
| # it's exit code to sucess now. |
| p.join() |
| self.assertEquals(p.exitcode, LOCK_ACQUIRED) |
| |
| # Intial lock is shared, blocking lock is exclusive. |
| with locking.FileLock(self.lock_file, blocking=False).read_lock(): |
| self._HelperWithProcess(expected=LOCK_NOT_ACQUIRED) |
| self._HelperWithProcess(expected=LOCK_ACQUIRED, shared=True) |
| |
| p = self._HelperStartProcess(blocking=True, shared=False) |
| q = self._HelperStartProcess(blocking=True, shared=False) |
| |
| # when the with clause exits, p should unblock and get the lock, setting |
| # it's exit code to sucess now. |
| p.join() |
| self.assertEquals(p.exitcode, LOCK_ACQUIRED) |
| q.join() |
| self.assertEquals(p.exitcode, LOCK_ACQUIRED) |
| |
| |
| class PortableLinkLockTest(cros_test_lib.TempDirTestCase): |
| """Test locking.PortableLinkLock class.""" |
| |
| def tearDown(self): |
| """Looks for leaked files from the locking process.""" |
| leaked_files = os.listdir(self.tempdir) |
| self.assertFalse(leaked_files, |
| 'Found unexpected leaked files from locking: %r' % |
| leaked_files) |
| |
| def testLockExclusivity(self): |
| """Test that when we have a lock, someone else can't grab it.""" |
| lock_path = os.path.join(self.tempdir, 'locked_file') |
| with locking.PortableLinkLock(lock_path, max_retry=0): |
| with self.assertRaises(locking.LockNotAcquiredError): |
| with locking.PortableLinkLock(lock_path, max_retry=5, sleep=0.1): |
| self.fail('We acquired a lock twice?') |
| |
| def testCanUnlock(self): |
| """Test that we release locks correctly.""" |
| lock_path = os.path.join(self.tempdir, 'locked_file') |
| with locking.PortableLinkLock(lock_path, max_retry=0): |
| pass |
| with locking.PortableLinkLock(lock_path, max_retry=0): |
| pass |