# blob: c47a207887ba7e5c6bbc351bc25b155257bdc146 [file] [log] [blame]
# Copyright 2010-2011 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import dummy_threading
import fcntl
import sys
try:
import threading
except ImportError:
import dummy_threading as threading
import portage
from portage import os
from portage.exception import TryAgain
from portage.locks import lockfile, unlockfile
from _emerge.AbstractPollTask import AbstractPollTask
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollConstants import PollConstants
from _emerge.SpawnProcess import SpawnProcess
class AsynchronousLock(AsynchronousTask):
    """
    This uses the portage.locks module to acquire a lock asynchronously,
    using either a thread (if available) or a subprocess.

    The default behavior is to use a process instead of a thread, since
    there is currently no way to interrupt a thread that is waiting for
    a lock (notably, SIGINT doesn't work because python delivers all
    signals to the main thread).
    """

    __slots__ = ('path', 'scheduler',) + \
        ('_imp', '_force_async', '_force_dummy', '_force_process', \
        '_force_thread', '_waiting')

    # When neither _force_process nor _force_thread is set, the
    # subprocess implementation is chosen (see the class docstring).
    _use_process_by_default = True

    def _start(self):
        """
        Acquire the lock, synchronously if possible.

        Unless _force_async is set, a non-blocking lockfile() call is
        attempted first. If it succeeds, the task completes immediately
        with returncode os.EX_OK. If it raises TryAgain (or _force_async
        is set), the work is delegated to a _LockProcess or _LockThread
        helper, which is started and whose exit is observed via
        _imp_exit().
        """
        if not self._force_async:
            try:
                self._imp = lockfile(self.path,
                    wantnewlockfile=True, flags=os.O_NONBLOCK)
            except TryAgain:
                # The non-blocking attempt did not succeed; fall through
                # to one of the asynchronous helpers below.
                pass
            else:
                # Here _imp holds whatever lockfile() returned, which is
                # what unlock() later hands to unlockfile().
                self.returncode = os.EX_OK
                self.wait()
                return

        # A thread is only used when explicitly forced, or when processes
        # are not the default and a real threading module is available.
        if self._force_process or \
            (not self._force_thread and \
            (self._use_process_by_default or threading is dummy_threading)):
            self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
        else:
            self._imp = _LockThread(path=self.path,
                scheduler=self.scheduler,
                _force_dummy=self._force_dummy)

        self._imp.addExitListener(self._imp_exit)
        self._imp.start()

    def _imp_exit(self, imp):
        """
        Exit listener registered on the helper task in _start().

        @param imp: the helper task that exited (unused here)
        """
        # call exit listeners
        # When _wait() is already in progress it is the one driving the
        # helper, so wait() must not be re-entered from this callback.
        if not self._waiting:
            self.wait()

    def _wait(self):
        """
        Wait for the helper task and adopt its returncode.

        @return: the returncode of this task
        """
        if self.returncode is not None:
            return self.returncode
        self._waiting = True
        try:
            self.returncode = self._imp.wait()
        finally:
            # Clear the flag even if the helper's wait() raises, so that
            # a later _imp_exit() callback is not suppressed forever.
            self._waiting = False
        return self.returncode

    def unlock(self):
        """
        Release the lock held by this task.

        @raise AssertionError: if no lock implementation is set (either
            the task was never started or unlock() was already called)
        """
        if self._imp is None:
            raise AssertionError('not locked')
        if isinstance(self._imp, (_LockProcess, _LockThread)):
            # Asynchronous helpers know how to release their own lock.
            self._imp.unlock()
        else:
            # Synchronous path: _imp is the object returned by lockfile().
            unlockfile(self._imp)
        self._imp = None
class _LockThread(AbstractPollTask):
    """
    Acquire a portage.locks lock asynchronously with the help of a
    background thread. Once the thread holds the lock it writes a byte
    to a pipe, which wakes up a poll loop running in the main thread.

    If the threading module is unavailable then the dummy_threading
    module will be used, and the lock will be acquired synchronously
    (before the start() method returns).
    """

    __slots__ = ('path',) + \
        ('_files', '_force_dummy', '_lock_obj',
        '_thread', '_reg_id',)

    def _start(self):
        """
        Create the notification pipe, register its read end with the
        scheduler and launch the thread that acquires the lock.
        """
        read_fd, write_fd = os.pipe()
        pipes = self._files = {}
        pipes['pipe_read'] = os.fdopen(read_fd, 'rb', 0)
        pipes['pipe_write'] = os.fdopen(write_fd, 'wb', 0)

        # Put both ends of the pipe into non-blocking mode.
        for pipe_file in pipes.values():
            fd = pipe_file.fileno()
            status_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, status_flags | os.O_NONBLOCK)

        self._reg_id = self.scheduler.register(
            pipes['pipe_read'].fileno(),
            PollConstants.POLLIN, self._output_handler)
        self._registered = True

        # _force_dummy selects the dummy_threading implementation even
        # when a real threading module was imported.
        if self._force_dummy:
            thread_factory = dummy_threading.Thread
        else:
            thread_factory = threading.Thread
        self._thread = thread_factory(target=self._run_lock)
        self._thread.start()

    def _run_lock(self):
        """
        Thread target: take the lock, then signal completion by writing
        a single byte to the write end of the pipe.
        """
        self._lock_obj = lockfile(self.path, wantnewlockfile=True)
        notify_pipe = self._files['pipe_write']
        notify_pipe.write(b'\0')

    def _output_handler(self, f, event):
        """
        Scheduler callback for the read end of the pipe. When data has
        arrived the task is unregistered and marked successful.
        """
        data = self._read_buf(self._files['pipe_read'], event)
        if not data:
            return
        self._unregister()
        self.returncode = os.EX_OK
        self.wait()

    def _wait(self):
        """
        Ask the scheduler to service the registered pipe if the task
        has not finished yet, then return the returncode.
        """
        if self.returncode is None and self._registered:
            self.scheduler.schedule(self._reg_id)
        return self.returncode

    def unlock(self):
        """
        Release the lock obtained by the thread.

        @raise AssertionError: if no lock object is held, or if the
            task has not completed yet
        """
        held = self._lock_obj
        if held is None:
            raise AssertionError('not locked')
        if self.returncode is None:
            raise AssertionError('lock not acquired yet')
        unlockfile(held)
        self._lock_obj = None

    def _unregister(self):
        """
        Join the thread, drop the scheduler registration and close both
        pipe ends. Each step is skipped if it was already done.
        """
        self._registered = False

        worker = self._thread
        if worker is not None:
            worker.join()
            self._thread = None

        reg_id = self._reg_id
        if reg_id is not None:
            self.scheduler.unregister(reg_id)
            self._reg_id = None

        open_files = self._files
        if open_files is not None:
            for handle in open_files.values():
                handle.close()
            self._files = None
class _LockProcess(AbstractPollTask):
    """
    This uses the portage.locks module to acquire a lock asynchronously,
    using a subprocess. After the lock is acquired, the process
    writes to a pipe in order to notify a poll loop running in the main
    process. The unlock() method notifies the subprocess to release the
    lock and exit.
    """

    __slots__ = ('path', 'scheduler',) + \
        ('_proc', '_files', '_reg_id')

    def _start(self):
        """
        Spawn lock-helper.py for self.path, connected through two pipes.

        'pipe_in' is the end this process reads the helper's
        notification from; 'pipe_out' is the end this process writes to
        in unlock(). The opposite ends are handed to the helper via
        fd_pipes (as its fds 0 and 1) and are always closed in this
        process afterwards.
        """
        in_pr, in_pw = os.pipe()
        out_pr, out_pw = os.pipe()
        self._files = {}
        self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0)
        self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0)
        try:
            # Only the end that is polled needs to be non-blocking.
            fcntl.fcntl(in_pr, fcntl.F_SETFL,
                fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
            self._reg_id = self.scheduler.register(in_pr,
                PollConstants.POLLIN, self._output_handler)
            self._registered = True
            self._proc = SpawnProcess(
                args=[portage._python_interpreter,
                    os.path.join(portage._bin_path, 'lock-helper.py'),
                    self.path],
                env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
                fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
                scheduler=self.scheduler)
            self._proc.addExitListener(self._proc_exit)
            self._proc.start()
        finally:
            # The helper-side ends are not used by this process. Close
            # them even when registering or spawning raises, so the
            # descriptors are not leaked on the error path.
            os.close(out_pr)
            os.close(in_pw)

    def _proc_exit(self, proc):
        """
        Exit listener for the helper process.

        @param proc: the SpawnProcess instance that exited
        @raise AssertionError: if the helper's returncode is not os.EX_OK
        """
        if proc.returncode != os.EX_OK:
            # There's no good reason for locks to fail.
            raise AssertionError('lock process failed with returncode %s' \
                % (proc.returncode,))

    def _wait(self):
        """
        Ask the scheduler to service the registered pipe if the task
        has not finished yet.

        @return: the returncode of this task
        """
        if self.returncode is not None:
            return self.returncode
        if self._registered:
            self.scheduler.schedule(self._reg_id)
        return self.returncode

    def _output_handler(self, f, event):
        """
        Scheduler callback for 'pipe_in'. Data arriving from the helper
        is the notification described in the class docstring: the task
        is unregistered and completes with os.EX_OK.
        """
        buf = self._read_buf(self._files['pipe_in'], event)
        if buf:
            self._unregister()
            self.returncode = os.EX_OK
            self.wait()

    def _unregister(self):
        """
        Drop the scheduler registration and close 'pipe_in'.

        'pipe_out' is deliberately left open here because unlock()
        still needs to write to it.
        """
        self._registered = False
        if self._reg_id is not None:
            self.scheduler.unregister(self._reg_id)
            self._reg_id = None
        if self._files is not None:
            try:
                pipe_in = self._files.pop('pipe_in')
            except KeyError:
                # Already closed by an earlier call.
                pass
            else:
                pipe_in.close()

    def unlock(self):
        """
        Tell the helper process to release the lock, then wait for it
        to exit.

        @raise AssertionError: if there is no helper process, or if the
            task has not completed yet
        """
        if self._proc is None:
            raise AssertionError('not locked')
        if self.returncode is None:
            raise AssertionError('lock not acquired yet')
        # A single byte on the helper's input is the release signal.
        self._files['pipe_out'].write(b'\0')
        self._files['pipe_out'].close()
        self._files = None
        self._proc.wait()
        self._proc = None