blob: ac38462ed38b3bb9e4cbebf1502b8c8fd81e8563 [file] [log] [blame]
# Copyright 2010 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import shutil
import tempfile
from portage import os
from portage.tests import TestCase
from _emerge.AsynchronousLock import AsynchronousLock
from _emerge.PollScheduler import PollScheduler
class AsynchronousLockTestCase(TestCase):
def testAsynchronousLock(self):
scheduler = PollScheduler().sched_iface
tempdir = tempfile.mkdtemp()
try:
path = os.path.join(tempdir, 'lock_me')
for force_thread in (True, False):
for force_dummy in (True, False):
async_lock = AsynchronousLock(path=path,
scheduler=scheduler, _force_dummy=force_dummy,
_force_thread=force_thread)
async_lock.start()
async_lock.wait()
async_lock.unlock()
self.assertEqual(async_lock.returncode, os.EX_OK)
finally:
shutil.rmtree(tempdir)