blob: 3a2ccfb84a21d92c1c454ecd4b5f4789a9d091b7 [file] [log] [blame]
# Copyright 2010-2011 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import signal
import tempfile
from portage import os
from portage import shutil
from portage.tests import TestCase
from portage.util._eventloop.global_event_loop import global_event_loop
from _emerge.AsynchronousLock import AsynchronousLock
class AsynchronousLockTestCase(TestCase):
def _testAsynchronousLock(self):
scheduler = global_event_loop()
tempdir = tempfile.mkdtemp()
try:
path = os.path.join(tempdir, 'lock_me')
for force_async in (True, False):
for force_dummy in (True, False):
async_lock = AsynchronousLock(path=path,
scheduler=scheduler, _force_async=force_async,
_force_thread=True,
_force_dummy=force_dummy)
async_lock.start()
self.assertEqual(async_lock.wait(), os.EX_OK)
self.assertEqual(async_lock.returncode, os.EX_OK)
async_lock.unlock()
async_lock = AsynchronousLock(path=path,
scheduler=scheduler, _force_async=force_async,
_force_process=True)
async_lock.start()
self.assertEqual(async_lock.wait(), os.EX_OK)
self.assertEqual(async_lock.returncode, os.EX_OK)
async_lock.unlock()
finally:
shutil.rmtree(tempdir)
def testAsynchronousLock(self):
self._testAsynchronousLock()
def testAsynchronousLockHardlink(self):
prev_state = os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = "1"
try:
self._testAsynchronousLock()
finally:
os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
if prev_state is not None:
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = prev_state
def _testAsynchronousLockWait(self):
scheduler = global_event_loop()
tempdir = tempfile.mkdtemp()
try:
path = os.path.join(tempdir, 'lock_me')
lock1 = AsynchronousLock(path=path, scheduler=scheduler)
lock1.start()
self.assertEqual(lock1.wait(), os.EX_OK)
self.assertEqual(lock1.returncode, os.EX_OK)
# lock2 requires _force_async=True since the portage.locks
# module is not designed to work as intended here if the
# same process tries to lock the same file more than
# one time concurrently.
lock2 = AsynchronousLock(path=path, scheduler=scheduler,
_force_async=True, _force_process=True)
lock2.start()
# lock2 should be waiting for lock1 to release
self.assertEqual(lock2.poll(), None)
self.assertEqual(lock2.returncode, None)
lock1.unlock()
self.assertEqual(lock2.wait(), os.EX_OK)
self.assertEqual(lock2.returncode, os.EX_OK)
lock2.unlock()
finally:
shutil.rmtree(tempdir)
def testAsynchronousLockWait(self):
self._testAsynchronousLockWait()
def testAsynchronousLockWaitHardlink(self):
prev_state = os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = "1"
try:
self._testAsynchronousLockWait()
finally:
os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
if prev_state is not None:
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = prev_state
def _testAsynchronousLockWaitCancel(self):
scheduler = global_event_loop()
tempdir = tempfile.mkdtemp()
try:
path = os.path.join(tempdir, 'lock_me')
lock1 = AsynchronousLock(path=path, scheduler=scheduler)
lock1.start()
self.assertEqual(lock1.wait(), os.EX_OK)
self.assertEqual(lock1.returncode, os.EX_OK)
lock2 = AsynchronousLock(path=path, scheduler=scheduler,
_force_async=True, _force_process=True)
lock2.start()
# lock2 should be waiting for lock1 to release
self.assertEqual(lock2.poll(), None)
self.assertEqual(lock2.returncode, None)
# Cancel lock2 and then check wait() and returncode results.
lock2.cancel()
self.assertEqual(lock2.wait() == os.EX_OK, False)
self.assertEqual(lock2.returncode == os.EX_OK, False)
self.assertEqual(lock2.returncode is None, False)
lock1.unlock()
finally:
shutil.rmtree(tempdir)
def testAsynchronousLockWaitCancel(self):
self._testAsynchronousLockWaitCancel()
def testAsynchronousLockWaitCancelHardlink(self):
prev_state = os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = "1"
try:
self._testAsynchronousLockWaitCancel()
finally:
os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
if prev_state is not None:
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = prev_state
def _testAsynchronousLockWaitKill(self):
scheduler = global_event_loop()
tempdir = tempfile.mkdtemp()
try:
path = os.path.join(tempdir, 'lock_me')
lock1 = AsynchronousLock(path=path, scheduler=scheduler)
lock1.start()
self.assertEqual(lock1.wait(), os.EX_OK)
self.assertEqual(lock1.returncode, os.EX_OK)
lock2 = AsynchronousLock(path=path, scheduler=scheduler,
_force_async=True, _force_process=True)
lock2.start()
# lock2 should be waiting for lock1 to release
self.assertEqual(lock2.poll(), None)
self.assertEqual(lock2.returncode, None)
# Kill lock2's process and then check wait() and
# returncode results. This is intended to simulate
# a SIGINT sent via the controlling tty.
self.assertEqual(lock2._imp is not None, True)
self.assertEqual(lock2._imp._proc is not None, True)
self.assertEqual(lock2._imp._proc.pid is not None, True)
lock2._imp._kill_test = True
os.kill(lock2._imp._proc.pid, signal.SIGTERM)
self.assertEqual(lock2.wait() == os.EX_OK, False)
self.assertEqual(lock2.returncode == os.EX_OK, False)
self.assertEqual(lock2.returncode is None, False)
lock1.unlock()
finally:
shutil.rmtree(tempdir)
def testAsynchronousLockWaitKill(self):
self._testAsynchronousLockWaitKill()
def testAsynchronousLockWaitKillHardlink(self):
prev_state = os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = "1"
try:
self._testAsynchronousLockWaitKill()
finally:
os.environ.pop("__PORTAGE_TEST_HARDLINK_LOCKS", None)
if prev_state is not None:
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] = prev_state