blob: 9c7e6c9c31be296863ea966e42184ec04adedfc5 [file] [log] [blame] [edit]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import contextlib
import errno
import fcntl
import logging
import os
import sys
import unittest
import mock
if sys.version_info.major < 3:
import subprocess32 as subprocess
else:
import subprocess
from cros_venv import testcases
from cros_venv import flock
logger = logging.getLogger(__name__)
_log_handler = None
def setUpModule():
global _log_handler
root = logging.getLogger()
root.setLevel(logging.DEBUG)
_log_handler = logging.StreamHandler()
_log_handler.setFormatter(logging.Formatter(
'%(asctime)s:%(levelname)s:%(message)s'))
root.addHandler(_log_handler)
def tearDownModule():
root = logging.getLogger()
root.removeHandler(_log_handler)
root.setLevel(logging.WARNING)
class ProcDirTestCase(unittest.TestCase):
"""Tests for _ProcDir."""
def test_getpath(self):
"""Test getpath()."""
got = flock._procdir.getpath(123)
self.assertEqual(got, '/proc/123')
def test_getpid(self):
"""Test getpid()."""
got = flock._procdir.getpid('/proc/123')
self.assertEqual(got, 123)
class IsPidRunningTestCase(unittest.TestCase):
"""Tests for _is_pid_running()."""
@mock.patch('os.kill')
def test_no_such_process(self, kill):
"""Test _is_pid_running() when no such process."""
kill.side_effect = OSError(errno.ESRCH, '')
self.assertFalse(flock._is_pid_running(1))
@mock.patch('os.kill')
def test_operation_not_permitted(self, kill):
"""Test _is_pid_running() when operation not permitted."""
kill.side_effect = OSError(errno.EPERM, '')
self.assertTrue(flock._is_pid_running(1))
@mock.patch('os.kill')
def test_okay(self, _kill):
"""Test _is_pid_running() when operation not permitted."""
self.assertTrue(flock._is_pid_running(1))
class FileLockTestCase(testcases.TmpdirTestCase):
"""Tests for flock (provide code coverage, not full functional testing)."""
def test_happy_path(self):
"""Test happy path."""
with flock.FileLock('lockfile', timeout=0):
pass
def test_other_process_has_lock(self):
"""Test with other process holding lock."""
logger.debug('Starting test')
with _lock('lockfile'):
with self.assertRaises(flock.TimeoutError):
with flock.FileLock('lockfile', timeout=0):
pass # pragma: no cover
def test_with_stale_symlink_lock(self):
"""Test with existing symlink lock.
TODO(ayatane): Old symlink lock file handling can be removed,
see flock.py
"""
os.symlink('/proc/99999', 'lockfile')
with mock.patch.object(flock, '_is_pid_running',
autospec=True) as running:
running.return_value = False
with flock.FileLock('lockfile', timeout=0):
pass
self.assertFalse(os.path.islink('lockfile'))
def test_with_active_symlink_lock(self):
"""Test with existing symlink lock.
TODO(ayatane): Old symlink lock file handling can be removed,
see flock.py
"""
os.symlink('/proc/99999', 'lockfile')
with mock.patch.object(flock, '_is_pid_running',
autospec=True) as running:
running.return_value = True
with self.assertRaises(flock.TimeoutError):
with flock.FileLock('lockfile', timeout=0):
pass # pragma: no cover
class TimeoutErrorTestCase(unittest.TestCase):
"""Tests for Timeout Error"""
def test_str(self):
"""Test basic error string contents."""
e = flock.TimeoutError(filename='foo', timeout=123)
self.assertIn('foo', str(e))
self.assertIn('123', str(e))
_LOCK_SCRIPT = """\
import fcntl
import os
import time
fd = os.open(%(filename)r, os.O_CREAT | os.O_RDWR)
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
print('hi')
while True:
time.sleep(10)
"""
@contextlib.contextmanager
def _lock(filename):
"""Lock a file in another process for testing."""
logger.debug('Grabbing external lock on %s', filename)
with subprocess.Popen(
['python', '-u', '-c', _LOCK_SCRIPT % {'filename': filename}],
stdout=subprocess.PIPE) as proc:
logger.debug('Started external lock process for %s', filename)
proc.stdout.readline()
logger.debug('Finished grabbing external lock on %s', filename)
yield proc
logger.debug('Releasing external lock on %s', filename)
proc.terminate()