blob: a29540ece4fccc1e304fa3a78506e922491c8538 [file] [log] [blame]
#!/usr/bin/env vpython3
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for lockfile.py"""
import logging
import os
import shutil
import sys
import tempfile
import threading
import unittest
if sys.version_info.major == 2:
import mock
import Queue
else:
from unittest import mock
import queue as Queue
DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, DEPOT_TOOLS_ROOT)
from testing_support import coverage_utils
import lockfile
class LockTest(unittest.TestCase):
def setUp(self):
self.cache_dir = tempfile.mkdtemp(prefix='lockfile')
self.addCleanup(shutil.rmtree, self.cache_dir, ignore_errors=True)
def testLock(self):
with lockfile.lock(self.cache_dir):
# cached dir locked, attempt to lock it again
with self.assertRaises(lockfile.LockError):
with lockfile.lock(self.cache_dir):
pass
with lockfile.lock(self.cache_dir):
pass
@mock.patch('time.sleep')
def testLockConcurrent(self, sleep_mock):
'''testLockConcurrent simulates what happens when two separate processes try
to acquire the same file lock with timeout.'''
# Queues q_f1 and q_sleep are used to controll execution of individual
# threads.
q_f1 = Queue.Queue()
q_sleep = Queue.Queue()
results = Queue.Queue()
def side_effect(arg):
'''side_effect is called when with l.lock is blocked. In this unit test
case, it comes from f2.'''
logging.debug('sleep: started')
q_sleep.put(True)
logging.debug('sleep: waiting for q_sleep to be consumed')
q_sleep.join()
logging.debug('sleep: waiting for result before exiting')
results.get(timeout=1)
logging.debug('sleep: exiting')
sleep_mock.side_effect = side_effect
def f1():
'''f1 enters first in l.lock (controlled via q_f1). It then waits for
side_effect to put a message in queue q_sleep.'''
logging.debug('f1 started, locking')
with lockfile.lock(self.cache_dir, timeout=1):
logging.debug('f1: locked')
q_f1.put(True)
logging.debug('f1: waiting on q_f1 to be consumed')
q_f1.join()
logging.debug('f1: done waiting on q_f1, getting q_sleep')
q_sleep.get(timeout=1)
results.put(True)
logging.debug('f1: lock released')
q_sleep.task_done()
logging.debug('f1: exiting')
def f2():
'''f2 enters second in l.lock (controlled by q_f1).'''
logging.debug('f2: started, consuming q_f1')
q_f1.get(timeout=1) # wait for f1 to execute lock
q_f1.task_done()
logging.debug('f2: done waiting for q_f1, locking')
with lockfile.lock(self.cache_dir, timeout=1):
logging.debug('f2: locked')
results.put(True)
t1 = threading.Thread(target=f1)
t1.start()
t2 = threading.Thread(target=f2)
t2.start()
t1.join()
t2.join()
# One result was consumed by side_effect, we expect only one in the queue.
self.assertEqual(1, results.qsize())
sleep_mock.assert_called_once_with(1)
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
sys.exit(
coverage_utils.covered_main(
(os.path.join(DEPOT_TOOLS_ROOT, 'git_cache.py')),
required_percentage=0))