blob: dd66736a956b05c1eb734d180d3d9ee6411fa28d [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This library can use Google Storage files as basis for locking.
This is mostly convenient because it works inter-server.
"""
from __future__ import print_function
import fixup_path
fixup_path.FixupPath()
import datetime
import logging
import os
import random
import re
import socket
from chromite.lib.paygen import gslib
from chromite.lib.paygen import utils
class LockProbeError(Exception):
"""Raised when there was an error probing a lock file."""
class LockNotAcquired(Exception):
"""Raised when the lock is already held by another process."""
class Lock(object):
"""This class manages a google storage file as a form of lock.
This class can be used in conjuction with a "with" clause to ensure
the lock is released, or directly.
try:
with gslock.Lock("gs://chromoes-releases/lock-file"):
# Protected code
...
except LockNotAcquired:
# Error handling
...
lock = gslock.Lock("gs://chromoes-releases/lock-file")
try:
lock.Acquire()
except LockNotAcquired:
# Error handling
# Protected code
...
lock.Release()
Locking is strictly atomic, except when timeouts are involved.
It assumes that local server time is in sync with Google Storage server
time.
"""
def __init__(self, gs_path, lock_timeout_mins=120, dry_run=False):
"""Initializer for the lock.
Args:
gs_path:
Path to the potential GS file we use for lock management.
lock_timeout_mins:
How long should an existing lock be considered valid? This timeout
should be long enough that it's never hit unless a server is
unexpectedly rebooted, lost network connectivity or had
some other catastrophic error.
dry_run: do nothing, always succeed
"""
self._gs_path = gs_path
self._timeout = datetime.timedelta(minutes=lock_timeout_mins)
self._contents = repr((socket.gethostname(), os.getpid(), id(self),
random.random()))
self._generation = 0
self._dry_run = dry_run
def _LockExpired(self):
"""Check to see if an existing lock has timed out.
Returns:
True if the lock is expired. False otherwise.
"""
try:
modified = self.LastModified()
except LockProbeError:
# If we couldn't figure out when the file was last modified, it might
# have already been released. In any case, it's probably not safe to try
# to clear the lock, so we'll return False here.
return False
return modified and datetime.datetime.utcnow() > modified + self._timeout
def _AcquireLock(self, filename, retries):
"""Attempt to acquire the lock.
Args:
filename: local file to copy into the lock's contents.
retries: How many times to retry to GS operation to fetch the lock.
Returns:
Whether or not the lock was acquired.
"""
try:
res = gslib.RunGsutilCommand(['cp', '-v', filename, self._gs_path],
generation=self._generation,
redirect_stdout=True, redirect_stderr=True)
m = re.search(r'%s#(\d+)' % self._gs_path, res.error)
if m:
error = None
self._generation = int(m.group(1))
else:
error = 'No generation found.'
self._generation = 0
except gslib.GSLibError as ex:
error = str(ex)
try:
result = gslib.Cat(self._gs_path, generation=self._generation)
except gslib.CatFail as ex:
self._generation = 0
if error:
raise LockNotAcquired(error)
raise LockNotAcquired(ex)
if result == self._contents:
if error is not None:
logging.warning('Lock at %s acquired despite copy error.',
self._gs_path)
elif self._LockExpired() and retries >= 0:
logging.warning('Timing out lock at %s.', self._gs_path)
try:
# Attempt to set our generation to whatever the current generation is.
res = gslib.RunGsutilCommand(['stat', self._gs_path],
redirect_stdout=True)
m = re.search(r'Generation:\s*(\d+)', res.output)
# Make sure the lock is still expired and hasn't been stolen by
# someone else.
if self._LockExpired():
self._generation = int(m.group(1))
except gslib.GSLibError as ex:
logging.warning('Exception while stat-ing %s: %s', self._gs_path, ex)
logging.warning('Lock may have been cleared by someone else')
self._AcquireLock(filename, retries - 1)
else:
self._generation = 0
raise LockNotAcquired(result)
def LastModified(self):
"""Return the lock's last modification time.
If the lock is already acquired, uses in-memory values. Otherwise, probes
the lock file.
Returns:
The UTC time when the lock was last modified. None if corresponding
attribute is missing.
Raises:
LockProbeError: if a (non-acquired) lock is not present.
"""
try:
res = gslib.RunGsutilCommand(['stat', self._gs_path],
redirect_stdout=True)
m = re.search(r'Creation time:\s*(.*)', res.output)
if not m:
raise LockProbeError('Failed to extract creation time.')
return datetime.datetime.strptime(m.group(1), '%a, %d %b %Y %H:%M:%S %Z')
except gslib.GSLibError as ex:
raise LockProbeError(ex)
return None
def Acquire(self):
"""Attempt to acquire the lock.
Will remove an existing lock if it has timed out.
Raises:
LockNotAcquired if it is unable to get the lock.
"""
if self._dry_run:
return
with utils.CreateTempFileWithContents(self._contents) as tmp_file:
self._AcquireLock(tmp_file.name, gslib.RETRY_ATTEMPTS)
def Release(self):
"""Release the lock."""
if self._dry_run:
return
try:
gslib.Remove(self._gs_path, generation=self._generation,
ignore_no_match=True)
except gslib.RemoveFail:
if not self._LockExpired():
raise
logging.warning('Lock at %s expired and was stolen.', self._gs_path)
self._generation = 0
def Renew(self):
"""Resets the timeout on a lock you are holding.
Raises:
LockNotAcquired if it can't Renew the lock for any reason.
"""
if self._dry_run:
return
if int(self._generation) == 0:
raise LockNotAcquired('Lock not held')
self.Acquire()
def __enter__(self):
"""Support for entering a with clause."""
self.Acquire()
return self
def __exit__(self, _type, _value, _traceback):
"""Support for exiting a with clause."""
self.Release()