| # Copyright 2010 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import dummy_threading |
| import fcntl |
| import sys |
| |
| try: |
| import threading |
| except ImportError: |
| import dummy_threading as threading |
| |
| import portage |
| from portage import os |
| from portage.exception import TryAgain |
| from portage.locks import lockfile, unlockfile |
| from _emerge.AbstractPollTask import AbstractPollTask |
| from _emerge.AsynchronousTask import AsynchronousTask |
| from _emerge.PollConstants import PollConstants |
| from _emerge.SpawnProcess import SpawnProcess |
| |
| class AsynchronousLock(AsynchronousTask): |
| """ |
| This uses the portage.locks module to acquire a lock asynchronously, |
| using either a thread (if available) or a subprocess. |
| """ |
| |
| __slots__ = ('path', 'scheduler',) + \ |
| ('_imp', '_force_async', '_force_dummy', '_force_process', \ |
| '_force_thread', '_waiting') |
| |
| def _start(self): |
| |
| if not self._force_async: |
| try: |
| self._imp = lockfile(self.path, |
| wantnewlockfile=True, flags=os.O_NONBLOCK) |
| except TryAgain: |
| pass |
| else: |
| self.returncode = os.EX_OK |
| self.wait() |
| return |
| |
| if self._force_process or \ |
| (not self._force_thread and threading is dummy_threading): |
| self._imp = _LockProcess(path=self.path, scheduler=self.scheduler) |
| else: |
| self._imp = _LockThread(path=self.path, |
| scheduler=self.scheduler, |
| _force_dummy=self._force_dummy) |
| |
| self._imp.addExitListener(self._imp_exit) |
| self._imp.start() |
| |
| def _imp_exit(self, imp): |
| # call exit listeners |
| if not self._waiting: |
| self.wait() |
| |
| def _wait(self): |
| if self.returncode is not None: |
| return self.returncode |
| self._waiting = True |
| self.returncode = self._imp.wait() |
| self._waiting = False |
| return self.returncode |
| |
| def unlock(self): |
| if self._imp is None: |
| raise AssertionError('not locked') |
| if isinstance(self._imp, (_LockProcess, _LockThread)): |
| self._imp.unlock() |
| else: |
| unlockfile(self._imp) |
| self._imp = None |
| |
| class _LockThread(AbstractPollTask): |
| """ |
| This uses the portage.locks module to acquire a lock asynchronously, |
| using a background thread. After the lock is acquired, the thread |
| writes to a pipe in order to notify a poll loop running in the main |
| thread. |
| |
| If the threading module is unavailable then the dummy_threading |
| module will be used, and the lock will be acquired synchronously |
| (before the start() method returns). |
| """ |
| |
| __slots__ = ('path',) + \ |
| ('_files', '_force_dummy', '_lock_obj', |
| '_thread', '_reg_id',) |
| |
| def _start(self): |
| pr, pw = os.pipe() |
| self._files = {} |
| self._files['pipe_read'] = os.fdopen(pr, 'rb', 0) |
| self._files['pipe_write'] = os.fdopen(pw, 'wb', 0) |
| for k, f in self._files.items(): |
| fcntl.fcntl(f.fileno(), fcntl.F_SETFL, |
| fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) |
| self._reg_id = self.scheduler.register(self._files['pipe_read'].fileno(), |
| PollConstants.POLLIN, self._output_handler) |
| self._registered = True |
| threading_mod = threading |
| if self._force_dummy: |
| threading_mod = dummy_threading |
| self._thread = threading_mod.Thread(target=self._run_lock) |
| self._thread.start() |
| |
| def _run_lock(self): |
| self._lock_obj = lockfile(self.path, wantnewlockfile=True) |
| self._files['pipe_write'].write(b'\0') |
| |
| def _output_handler(self, f, event): |
| buf = self._read_buf(self._files['pipe_read'], event) |
| if buf: |
| self._unregister() |
| self.returncode = os.EX_OK |
| self.wait() |
| |
| def _wait(self): |
| if self.returncode is not None: |
| return self.returncode |
| if self._registered: |
| self.scheduler.schedule(self._reg_id) |
| return self.returncode |
| |
| def unlock(self): |
| if self._lock_obj is None: |
| raise AssertionError('not locked') |
| if self.returncode is None: |
| raise AssertionError('lock not acquired yet') |
| unlockfile(self._lock_obj) |
| self._lock_obj = None |
| |
| def _unregister(self): |
| self._registered = False |
| |
| if self._thread is not None: |
| self._thread.join() |
| self._thread = None |
| |
| if self._reg_id is not None: |
| self.scheduler.unregister(self._reg_id) |
| self._reg_id = None |
| |
| if self._files is not None: |
| for f in self._files.values(): |
| f.close() |
| self._files = None |
| |
| class _LockProcess(AbstractPollTask): |
| """ |
| This uses the portage.locks module to acquire a lock asynchronously, |
| using a subprocess. After the lock is acquired, the process |
| writes to a pipe in order to notify a poll loop running in the main |
| process. The unlock() method notifies the subprocess to release the |
| lock and exit. |
| """ |
| |
| __slots__ = ('path', 'scheduler',) + \ |
| ('_proc', '_files', '_reg_id') |
| |
| def _start(self): |
| in_pr, in_pw = os.pipe() |
| out_pr, out_pw = os.pipe() |
| self._files = {} |
| self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0) |
| self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0) |
| fcntl.fcntl(in_pr, fcntl.F_SETFL, |
| fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK) |
| self._reg_id = self.scheduler.register(in_pr, |
| PollConstants.POLLIN, self._output_handler) |
| self._registered = True |
| self._proc = SpawnProcess( |
| args=[portage._python_interpreter, |
| os.path.join(portage._bin_path, 'lock-helper.py'), self.path], |
| env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path), |
| fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()}, |
| scheduler=self.scheduler) |
| self._proc.addExitListener(self._proc_exit) |
| self._proc.start() |
| os.close(out_pr) |
| os.close(in_pw) |
| |
| def _proc_exit(self, proc): |
| if proc.returncode != os.EX_OK: |
| # There's no good reason for locks to fail. |
| raise AssertionError('lock process failed with returncode %s' \ |
| % (proc.returncode,)) |
| |
| def _wait(self): |
| if self.returncode is not None: |
| return self.returncode |
| if self._registered: |
| self.scheduler.schedule(self._reg_id) |
| return self.returncode |
| |
| def _output_handler(self, f, event): |
| buf = self._read_buf(self._files['pipe_in'], event) |
| if buf: |
| self._unregister() |
| self.returncode = os.EX_OK |
| self.wait() |
| |
| def _unregister(self): |
| self._registered = False |
| |
| if self._reg_id is not None: |
| self.scheduler.unregister(self._reg_id) |
| self._reg_id = None |
| |
| if self._files is not None: |
| try: |
| pipe_in = self._files.pop('pipe_in') |
| except KeyError: |
| pass |
| else: |
| pipe_in.close() |
| |
| def unlock(self): |
| if self._proc is None: |
| raise AssertionError('not locked') |
| if self.returncode is None: |
| raise AssertionError('lock not acquired yet') |
| self._files['pipe_out'].write(b'\0') |
| self._files['pipe_out'].close() |
| self._files = None |
| self._proc.wait() |
| self._proc = None |