| #!/usr/bin/env vpython3 |
| # Copyright 2020 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Unit tests for lockfile.py""" |
| |
| import logging |
| import os |
| import shutil |
| import sys |
| import tempfile |
| import threading |
| import unittest |
| |
| if sys.version_info.major == 2: |
| import mock |
| import Queue |
| else: |
| from unittest import mock |
| import queue as Queue |
| |
| DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| sys.path.insert(0, DEPOT_TOOLS_ROOT) |
| |
| from testing_support import coverage_utils |
| |
| import lockfile |
| |
| |
| class LockTest(unittest.TestCase): |
| def setUp(self): |
| self.cache_dir = tempfile.mkdtemp(prefix='lockfile') |
| self.addCleanup(shutil.rmtree, self.cache_dir, ignore_errors=True) |
| |
| def testLock(self): |
| with lockfile.lock(self.cache_dir): |
| # cached dir locked, attempt to lock it again |
| with self.assertRaises(lockfile.LockError): |
| with lockfile.lock(self.cache_dir): |
| pass |
| |
| with lockfile.lock(self.cache_dir): |
| pass |
| |
| @mock.patch('time.sleep') |
| def testLockConcurrent(self, sleep_mock): |
| '''testLockConcurrent simulates what happens when two separate processes try |
| to acquire the same file lock with timeout.''' |
| # Queues q_f1 and q_sleep are used to controll execution of individual |
| # threads. |
| q_f1 = Queue.Queue() |
| q_sleep = Queue.Queue() |
| results = Queue.Queue() |
| |
| def side_effect(arg): |
| '''side_effect is called when with l.lock is blocked. In this unit test |
| case, it comes from f2.''' |
| logging.debug('sleep: started') |
| q_sleep.put(True) |
| logging.debug('sleep: waiting for q_sleep to be consumed') |
| q_sleep.join() |
| logging.debug('sleep: waiting for result before exiting') |
| results.get(timeout=1) |
| logging.debug('sleep: exiting') |
| |
| sleep_mock.side_effect = side_effect |
| |
| def f1(): |
| '''f1 enters first in l.lock (controlled via q_f1). It then waits for |
| side_effect to put a message in queue q_sleep.''' |
| logging.debug('f1 started, locking') |
| |
| with lockfile.lock(self.cache_dir, timeout=1): |
| logging.debug('f1: locked') |
| q_f1.put(True) |
| logging.debug('f1: waiting on q_f1 to be consumed') |
| q_f1.join() |
| logging.debug('f1: done waiting on q_f1, getting q_sleep') |
| q_sleep.get(timeout=1) |
| results.put(True) |
| |
| logging.debug('f1: lock released') |
| q_sleep.task_done() |
| logging.debug('f1: exiting') |
| |
| def f2(): |
| '''f2 enters second in l.lock (controlled by q_f1).''' |
| logging.debug('f2: started, consuming q_f1') |
| q_f1.get(timeout=1) # wait for f1 to execute lock |
| q_f1.task_done() |
| logging.debug('f2: done waiting for q_f1, locking') |
| |
| with lockfile.lock(self.cache_dir, timeout=1): |
| logging.debug('f2: locked') |
| results.put(True) |
| |
| t1 = threading.Thread(target=f1) |
| t1.start() |
| t2 = threading.Thread(target=f2) |
| t2.start() |
| t1.join() |
| t2.join() |
| |
| # One result was consumed by side_effect, we expect only one in the queue. |
| self.assertEqual(1, results.qsize()) |
| sleep_mock.assert_called_once_with(1) |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig( |
| level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) |
| sys.exit( |
| coverage_utils.covered_main( |
| (os.path.join(DEPOT_TOOLS_ROOT, 'git_cache.py')), |
| required_percentage=0)) |