blob: c30043789dbe5c7fc0f0081a5858bd6dfc3ca38b [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Job leasing utilities
See infra/lucifer for the implementation of job leasing.
https://chromium.googlesource.com/chromiumos/infra/lucifer
Jobs are leased to processes to own and run. A process owning a job
obtains a job lease. Ownership of the lease is established using an
exclusive fcntl lock on the lease file.
If a lease file is older than a few seconds and is not locked, then its
owning process should be considered crashed.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import errno
import fcntl
import logging
import os
import socket
import time
from scandir import scandir
# Module-level logger; Lease.cleanup() and Lease.abort() report through it.
logger = logging.getLogger(__name__)
def get_expired_leases(jobdir):
    """Generate the Leases in jobdir whose lease has expired.

    A lease counts as expired when Lease.expired() says so, i.e. its
    lease file is past the grace period and no longer fcntl locked.

    @param jobdir: job lease file directory
    @returns: iterator of Leases
    """
    for candidate in leases_iter(jobdir):
        if not candidate.expired():
            continue
        yield candidate
def get_timed_out_leases(dbjob_model, jobdir):
    """Generate the Leases of leased Jobs that have timed out.

    A Job is timed out when it has an incomplete host queue entry and
    more than timeout_mins minutes have passed since its created_on.

    @param dbjob_model: Django model for Job
    @param jobdir: job lease file directory
    @returns: iterator of Leases
    """
    # Raw SQL evaluated by the database (uses the server's NOW() and the
    # MySQL-style INTERVAL ... MINUTE syntax).
    timeout_clause = 'created_on + INTERVAL timeout_mins MINUTE < NOW()'
    incomplete = dbjob_model.objects.filter(hostqueueentry__complete=False)
    timed_out = incomplete.extra(where=[timeout_clause]).distinct()
    for _dbjob, lease in _filter_leased(jobdir, timed_out):
        yield lease
def get_marked_aborting_leases(dbjob_model, jobdir):
    """Generate the Leases of leased Jobs that are marked for aborting.

    A Job qualifies when it has a host queue entry flagged aborted and a
    host queue entry that is not yet complete.

    @param dbjob_model: Django model for Job
    @param jobdir: job lease file directory
    @returns: iterator of Leases
    """
    # NOTE(review): kept as two separate filter() calls on purpose; for a
    # multi-valued relation (presumably hostqueueentry is one) Django
    # lets each call match a different related row, so merging them into
    # one filter() would change the query.
    aborting = dbjob_model.objects.filter(hostqueueentry__aborted=True)
    aborting = aborting.filter(hostqueueentry__complete=False)
    aborting = aborting.distinct()
    for _dbjob, lease in _filter_leased(jobdir, aborting):
        yield lease
def leases_iter(jobdir):
    """Generate a Lease for each lease file found in jobdir.

    Directory entries that are not lease files (per _is_lease_entry)
    are skipped.

    @param jobdir: job lease file directory
    @returns: iterator of Leases
    """
    entries = scandir(jobdir)
    for dir_entry in entries:
        if not _is_lease_entry(dir_entry):
            continue
        yield Lease(dir_entry)
class Lease(object):
    """Represents a job lease backed by a lease file in the job directory."""

    # Seconds after a lease file's mtime during which its owning process
    # is not considered dead (the grace period for it to take the lock).
    _FRESH_LIMIT = 5

    def __init__(self, entry):
        """Initialize instance.

        @param entry: scandir.DirEntry instance for the lease file
        """
        self._entry = entry

    @property
    def id(self):
        """Return id of leased job, parsed from the lease file name."""
        return int(self._entry.name)

    def expired(self):
        """Return True if the lease is expired.

        A lease is considered expired if there is no fcntl lock on it
        and the grace period (_FRESH_LIMIT seconds after the lease file's
        mtime) for the owning process to obtain the lock has passed.
        The lease is not considered expired if the owning process
        removed the lease file normally, as an expired lease indicates
        that some error has occurred and clean up operations are needed.

        @returns: bool
        @raises OSError: for stat/open failures other than ENOENT
        """
        try:
            stat_result = self._entry.stat()
        except OSError as e:  # pragma: no cover
            if e.errno == errno.ENOENT:
                return False
            raise
        mtime = stat_result.st_mtime_ns / (10 ** 9)
        if time.time() - mtime < self._FRESH_LIMIT:
            return False
        try:
            return not _fcntl_locked(self._entry.path)
        except OSError as e:
            # The owner may remove the lease file between the stat() above
            # and the open() in the lock check; that is a normal removal,
            # so treat it exactly like the ENOENT case above.
            if e.errno == errno.ENOENT:
                return False
            raise

    def cleanup(self):
        """Remove the lease file and its abort socket file.

        This does not need to be called normally, as the owning process
        should clean up its files.  Failures to remove either file
        (including the file already being gone) are logged as warnings
        rather than raised.
        """
        try:
            os.unlink(self._entry.path)
        except OSError as e:
            logger.warning('Error removing %s: %s', self._entry.path, e)
        try:
            os.unlink(self._sock_path)
        except OSError as e:
            logger.warning('Error removing %s: %s', self._sock_path, e)

    def abort(self):
        """Abort the job.

        This sends a datagram to the abort socket associated with the
        lease.  The client socket is always closed before returning.

        If the socket is closed, either the connect() call or the send()
        call will raise socket.error with ECONNREFUSED.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            logger.debug('Connecting to abort socket %s', self._sock_path)
            sock.connect(self._sock_path)
            logger.debug('Sending abort to %s', self._sock_path)
            # The value sent does not matter.  A bytes literal is required
            # by socket.send on Python 3 and is a plain str on Python 2.
            sent = sock.send(b'abort')
        finally:
            # Release the descriptor even when connect()/send() raises.
            sock.close()
        # TODO(ayatane): I don't know if it is possible for sent to be 0
        assert sent > 0

    @property
    def _sock_path(self):
        """Return the path of the abort socket corresponding to the lease."""
        return self._entry.path + ".sock"
def _filter_leased(jobdir, dbjobs):
    """Pair each Job model that has a lease in jobdir with that Lease.

    @param jobdir: job lease file directory
    @param dbjobs: iterable of Django model Job instances
    @returns: iterator of (Job model, Lease) pairs, in dbjobs order
    """
    # Index the leases by job id once so each dbjob check is a dict lookup.
    leases_by_id = {}
    for lease in leases_iter(jobdir):
        leases_by_id[lease.id] = lease
    for dbjob in dbjobs:
        match = leases_by_id.get(dbjob.id)
        if match is not None:
            yield dbjob, match
def _is_lease_entry(entry):
"""Return True if the DirEntry is for a lease."""
return entry.name.isdigit()
def _fcntl_locked(path):
"""Return True if a file is fcntl locked.
@param path: path to file
"""
fd = os.open(path, os.O_WRONLY)
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
return True
else:
return False
finally:
os.close(fd)