blob: 6fc8819f5852df43bbde7c7e3f0d3438d9a67962 [file] [log] [blame]
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Commands to support cros_gestures.

This code is modeled after and derived from the Command class in
gsutil/gslib/command.py for maximum re-use.
"""
# Version string printed by GestureCommand.VersionCommand().
__version__ = '1.0.0'
import datetime
import hashlib
import logging
import optparse
import os
import re
import sys
import cros_gestures_constants
import cros_gestures_utils
from exception import CrosGesturesException
# Shared colorizer used for bold keywords in help text and red warnings.
color = cros_gestures_utils.Color()
# Module-level logger shared by every command in this file.
LOG = logging.getLogger('cros_gestures_commands')
# Format of the 'created' metadata value attached on upload,
# e.g. Mon, 25 Jul 2011 08:20:08
DISPLAY_TIME_FORMAT = '%a, %d %b %Y %H:%M:%S'
# Format of the timestamp embedded in gesture file names,
# e.g. 20110725_082008
FILE_TIME_FORMAT = '%Y%m%d_%H%M%S'
def AddOptions(parser, config_options, admin=False):
  """Attach the 'Command Options' option group to parser.

  Registers the command line options consumed by the commands defined in
  this file.

  Always registered:
    -F, --force-rm: [invalidate/rm] confirmation of irreversible file removal.
    -L, --list-metadata: [ls] most verbose ls command.
    -U, --user-override: [all] substitute an alternate user tag in file
        namespace.
    --print-filenames: [cat] include filename (helps cat of multiple files).

  Registered only when admin is False:
    --download-dir: [download] place downloaded files out of current dir.
    --upload-functionality: [upload] override filename functionality.
    --upload-fwversion: [upload] override filename fwversion.
    --upload-tag: [upload] set a tag (to arbitrarily group files).

  Args:
    parser: optparse parser that receives the new option group.
    config_options: object whose keys() are listed, sorted, in the
        --upload-functionality help text.
    admin: True to leave out the download/upload options.
  """
  # optparse expands this marker to the option's default when printing help.
  default_text = '%default'
  group = optparse.OptionGroup(parser, 'Command Options')

  def _Bold(word):
    """Render word in bold for use inside help text."""
    return color.Color(cros_gestures_utils.Color.BOLD, word)

  def _AddSwitch(short_name, long_name, dest, help_text):
    """Register a boolean switch that defaults to False."""
    group.add_option(short_name, long_name,
                     help=help_text,
                     dest=dest,
                     action='store_true',
                     default=False)

  def _AddValue(long_name, dest, help_text):
    """Register a long-only option that takes a value and defaults to None."""
    group.add_option('', long_name,
                     help=help_text,
                     dest=dest,
                     default=None)

  _AddSwitch('-F', '--force-rm', 'forcerm',
             'Force %s of file [default: %s]' % (_Bold('rm'), default_text))
  _AddSwitch('-L', '--list-metadata', 'listmetadata',
             'Show file metadata with %s [default: %s]' % (
                 _Bold('list'), default_text))
  _AddSwitch('-U', '--user-override', 'useroverride',
             'Override filename user of file [default: %s]' % default_text)
  _AddSwitch('', '--print-filenames', 'catfilenames',
             'Show file names with %s output [default: %s]' % (
                 _Bold('cat'), default_text))

  # Admin commands use different options.
  if not admin:
    _AddValue('--download-dir', 'downloaddir',
              'Set %s directory [default: %s]' % (
                  _Bold('download'), default_text))
    _AddValue('--upload-functionality', 'uploadfunctionality',
              'Set functionality during %s: %s [default: %s]' % (
                  _Bold('upload'),
                  ', '.join(sorted(config_options.keys())),
                  '<from filename>'))
    _AddValue('--upload-fwversion', 'uploadfwversion',
              'Set fwversion during %s [default: %s]' % (
                  _Bold('upload'), default_text))
    _AddValue('--upload-tag', 'uploadtag',
              'Supply %s tag for custom grouping [default: %s]' % (
                  _Bold('upload'), default_text))

  parser.add_option_group(group)
def FixupOptionsFromFilename(source_file, options):
  """Setup default options (metadata) from filename if possible.

  Template filename:
    area-functionalitynamewithunderscores.subname-model-tester-timestamp.dat

  The following hyphen-separated fields are captured from the file name:
    area
    functionality (the period-separated subname(s) after it are matched
        but discarded)
    model
    tester
    timestamp (must follow FILE_TIME_FORMAT)

  Args:
    source_file: base name of the gesture file to parse.
    options: options object updated in place. uploadarea, uploadcreated and
        uploadfunctionality are overwritten; model is filled in when unset;
        userowner is replaced by the file name tester unless useroverride
        is set. options.config_options is consulted for validation.

  Raises:
    CrosGesturesException: the file name does not follow the template, its
        timestamp cannot be parsed, its model conflicts with options.model,
        or its functionality/area pair is not present in
        options.config_options.
  """
  options.uploadarea = None  # Create nonexistent attributes
  options.uploadcreated = None
  # Raw string so the regex escapes are not treated as string escapes.
  filename_parse = re.compile(
      r'([\w]+)-([\w]+)\.[\w.]+-([\w]+)-([\w]+)-([\w]+)')
  m = filename_parse.match(source_file)
  if not m:
    raise CrosGesturesException(
        'This filename is not formatted properly. Expecting: '
        'area-functionality.subname-model-tester-timestamp.dat. '
        'Skipping %s.' % source_file)
  # All five groups are mandatory one-or-more matches, so a successful match
  # guarantees each captured value is a non-empty string.
  gs_area, gs_functionality, gs_model, gs_tester, gs_time = m.groups()
  options.uploadarea = gs_area
  options.uploadfunctionality = gs_functionality
  if not options.model:
    options.model = gs_model
  if gs_model != options.model:
    raise CrosGesturesException(
        '--model (%s) is different than file name model (%s).' %
        (options.model, gs_model))
  if gs_tester != options.userowner:
    LOG.warning(
        '--user (%s) is different than file name tester (%s).',
        options.userowner, gs_tester)
    if not options.useroverride:
      options.userowner = gs_tester
  # The regex accepts any word characters for the timestamp, so a malformed
  # value has to be reported the same way as any other bad file name rather
  # than escaping as a bare ValueError.
  try:
    created = datetime.datetime.strptime(gs_time, FILE_TIME_FORMAT)
  except ValueError:
    raise CrosGesturesException(
        'The timestamp %s in file name %s does not match the expected '
        'format %s.' % (gs_time, source_file, FILE_TIME_FORMAT))
  options.uploadcreated = created.strftime(DISPLAY_TIME_FORMAT)
  # Extra validations.
  if options.uploadfunctionality not in options.config_options:
    raise CrosGesturesException('The config file does not expect this '
                                'functionality: %s' %
                                options.uploadfunctionality)
  if (options.uploadarea not in
      options.config_options[options.uploadfunctionality]):
    raise CrosGesturesException('The area %s is not expected with %s.' %
                                (options.uploadarea,
                                 options.uploadfunctionality))
class GestureUri(object):
  """Very thin wrapper around our gesture uri's.

  Holds the parsed command line options and turns file names into gs://
  uris inside the gesture buckets.
  """

  # Prefix of the two developer buckets produced by MakeGestureUri(). It is
  # anchored and explicit so only the bucket name is ever rewritten, never a
  # later path component that happens to end in 'trusted-dev'.
  _DEV_BUCKET_RE = re.compile(r'^gs://chromeos-gestures-(?:un)?trusted-dev/')

  def __init__(self, options):
    """Remember the options object consulted (and updated) by the helpers."""
    self.options = options

  @staticmethod
  def HasGSUri(uri_str):
    """Check one uri for our provider (case-insensitive 'gs://' prefix)."""
    return uri_str.lower().startswith('gs://')

  @staticmethod
  def HasGSUris(args):
    """Checks whether args contains any provider URIs (like 'gs://').

    Args:
      args: command-line arguments

    Returns:
      True if args contains any provider URIs.
    """
    return any(GestureUri.HasGSUri(uri_str) for uri_str in args)

  def MakeGestureUri(self, uri_str=None):
    """Gesture files are prefaced by their model and ownername.

    Args:
      uri_str: file name (or '*') to place under the model/user folder.

    Returns:
      The gs:// uri of uri_str inside the trusted or untrusted dev bucket,
      chosen by cros_gestures_constants.trusted.

    Raises:
      CrosGesturesException: uri_str is empty, or no model/user is known.
    """
    # Validate explicitly rather than with assert, which is stripped when
    # Python runs with -O.
    if not uri_str:
      raise CrosGesturesException('Unexpected empty uri.')
    # A bare wildcard listing without a model spans every model.
    if uri_str == '*' and not self.options.model:
      self.options.model = '*'
    if not self.options.model:
      raise CrosGesturesException('Please supply a model to MakeGestureUri.')
    if not self.options.userowner:
      raise CrosGesturesException('Please supply a user to MakeGestureUri.')
    if cros_gestures_constants.trusted:
      user_type = 'trusted-dev'
    else:
      user_type = 'untrusted-dev'
    return 'gs://chromeos-gestures-%s/%s/%s/%s' % (user_type,
                                                   self.options.model,
                                                   self.options.userowner,
                                                   uri_str)

  @staticmethod
  def MakeValidUri(uri_str):
    """Gesture files headed to the valid bucket.

    Args:
      uri_str: gs:// uri of a file in one of the dev buckets.

    Returns:
      uri_str with the dev bucket replaced by the valid bucket. A uri that
      does not start with a dev bucket is returned unchanged.

    Raises:
      CrosGesturesException: uri_str is not a gs:// uri.
    """
    if not GestureUri.HasGSUri(uri_str):
      raise CrosGesturesException('Validate requires a gs:// uri.')
    return GestureUri._DEV_BUCKET_RE.sub('gs://chromeos-gestures-valid/',
                                         uri_str, count=1)

  def MakeGestureUris(self, args):
    """Fixup args to be valid gs uri's if needed.

    Arguments that already are gs:// uris pass through untouched. Any other
    argument is reduced to its base name, used to update self.options via
    FixupOptionsFromFilename(), and then expanded with MakeGestureUri().

    Args:
      args: list of file names and/or gs:// uris.

    Returns:
      A new list of gs:// uris, one per entry of args, in the same order.
    """
    new_args = []
    for uri_str in args:
      if GestureUri.HasGSUri(uri_str):
        new_arg = uri_str
      else:
        uri_str = os.path.basename(uri_str)
        FixupOptionsFromFilename(uri_str, self.options)
        new_arg = self.MakeGestureUri(uri_str)
      new_args.append(new_arg)
    return new_args
class GestureCommand(object):
  """Class that contains all our Gesture command code.

  Every *Command method takes (args, options) and returns the status
  produced by RunGSUtil; the code here treats 0 as success.
  """

  def __init__(self, gsutil_bin_dir):
    """Instantiates GestureCommand class. Modeled after gslib/command/Command.

    Args:
      gsutil_bin_dir: bin dir from which gsutil is running.
    """
    self.gsutil_bin_dir = gsutil_bin_dir

  def _FileExists(self, file_name):
    """Helper to see if a fully path-included remote file exists.

    Args:
      file_name: gs:// uri of the remote file.

    Returns:
      True when a quiet 'ls' of file_name returns 0.
    """
    return 0 == cros_gestures_utils.RunGSUtil(self.gsutil_bin_dir, LOG, 'ls',
                                              args=[file_name],
                                              show_output=False)

  def RunGSUtil(self, cmd, headers=None, sub_opts=None, args=None,
                show_output=True):
    """Executes common gsutil run command utility.

    Args:
      cmd: gsutil command name (e.g. 'cp', 'ls', 'rm').
      headers: optional list of header strings passed through to gsutil.
      sub_opts: optional list of command-specific options.
      args: optional list of command arguments.
      show_output: passed through to cros_gestures_utils.RunGSUtil.

    Returns:
      Whatever cros_gestures_utils.RunGSUtil returns.
    """
    return cros_gestures_utils.RunGSUtil(self.gsutil_bin_dir, LOG, cmd,
                                         headers, sub_opts, args, show_output)

  def CatGestureCommand(self, args, options):
    """Cat gesture file(s) from Google Storage.

    Passes the '-h' sub-option to 'cat' when options.catfilenames is set.
    """
    guri = GestureUri(options)
    args = guri.MakeGestureUris(args)
    if options.catfilenames:
      sub_opts = ['-h']
    else:
      sub_opts = None
    return self.RunGSUtil(cmd='cat', sub_opts=sub_opts, args=args)

  def DownloadGestureCommand(self, args, options):
    """Download gesture file(s) from Google Storage.

    Each argument is used both as the local target name and, after
    conversion to a gs:// uri, as the remote source. Remote files that
    cannot be found are logged and skipped.

    Args:
      args: list of file names to download.
      options: parsed options; downloaddir, when set, is created if needed
          and prefixed to every local target.

    Returns:
      The sum of the 'cp' statuses of the files that were attempted.

    Raises:
      CrosGesturesException: a local target file already exists.
    """
    # TODO(Truty): add md5/md5 header and verify.
    local_files = args[:]
    guri = GestureUri(options)
    remote_files = guri.MakeGestureUris(args)
    rc = 0
    # MakeGestureUris() returns one uri per argument, in order, so the two
    # lists can be walked in lockstep.
    for local_file, remote_file in zip(local_files, remote_files):
      if not self._FileExists(remote_file):
        LOG.warning(color.Color(
            cros_gestures_utils.Color.RED, 'File %s not found.' % remote_file))
        continue
      if options.downloaddir:
        if not os.path.exists(options.downloaddir):
          os.makedirs(options.downloaddir)
        local_file = os.path.join(options.downloaddir, local_file)
      if os.path.isfile(local_file):
        raise CrosGesturesException('Local file %s already exists.' %
                                    local_file)
      rc += self.RunGSUtil(cmd='cp', args=[remote_file, local_file])
    return rc

  def InvalidateGestureCommand(self, args, options):
    """Allows a gesture file to be blocked to tests.

    Removes the copy of the first argument that lives in the valid bucket.
    See ValidateGestureCommand() for more detail.

    Raises:
      CrosGesturesException: options.forcerm is not set, or the validated
          file cannot be found.
    """
    if not options.forcerm:
      raise CrosGesturesException(
          'invalidate requires -F to force file removal.')
    guri = GestureUri(options)
    source_file = guri.MakeGestureUris(args)[0]
    target_file = GestureUri.MakeValidUri(source_file)
    if not self._FileExists(target_file):
      raise CrosGesturesException('Validated file %s cannot be found.'
                                  % target_file)
    return self.RunGSUtil(cmd='rm', args=[target_file])

  def ListGesturesCommand(self, args, options):
    """List gestures (and metadata) in Google Storage.

    With no arguments everything under the current model/user is listed.
    options.listmetadata adds the '-L' sub-option.
    """
    guri = GestureUri(options)
    if not args:
      args = [guri.MakeGestureUri('*')]
    else:
      args = guri.MakeGestureUris(args)
    if options.listmetadata:
      sub_opts = ['-L']
      LOG.info('This can take a little longer to retrieve metadata.')
    else:
      sub_opts = None
    return self.RunGSUtil(cmd='ls', sub_opts=sub_opts, args=args)

  def RemoveGesturesCommand(self, args, options):
    """Remove gestures in Google Storage.

    Raises:
      CrosGesturesException: options.forcerm is not set.
    """
    guri = GestureUri(options)
    args = guri.MakeGestureUris(args)
    if not options.forcerm:
      raise CrosGesturesException('rm requires -F to force file removal.')
    return self.RunGSUtil(cmd='rm', args=args)

  def UploadGestureCommand(self, args, options):
    """Upload a single gesture file to Google Storage.

    Only the first argument is uploaded. The file is copied with the
    'project-private' acl and x-goog-meta-* headers built from the options.

    Args:
      args: list whose first entry is the local file to upload.
      options: parsed options; MakeGestureUris() fills in the upload
          metadata derived from the file name.

    Raises:
      CrosGesturesException: the remote file already exists, the local file
          is missing, or functionality/area/model is not set.
    """
    local_file = args[0]
    guri = GestureUri(options)
    remote_file = guri.MakeGestureUris(args)[0]
    if self._FileExists(remote_file):
      raise CrosGesturesException('File %s already exists.' % remote_file,
                                  informational=True)
    if not os.path.isfile(local_file):
      raise CrosGesturesException('Cannot find source file: %s.' % local_file)
    if not options.uploadfunctionality:
      raise CrosGesturesException('upload requires '
                                  '--upload-functionality=functionality. '
                                  '\n\t\tfunctionality is one of:\n\t\t\t%s' %
                                  ('\n\t\t\t'.join(options.config_options)))
    if not options.uploadarea:
      raise CrosGesturesException('upload requires an upload area.')
    if not options.model:
      raise CrosGesturesException('upload requires --model=model#.')
    # uploadfwversion is NOT required.
    # If this is not project-private, then only the owner can mark the
    # file object public-read (we call that 'validation').
    sub_opts = ['-a', 'project-private']
    hprefix = 'x-goog-meta'
    # Hash the raw bytes: binary mode avoids newline translation, and the
    # context manager closes the file even if the read fails.
    with open(local_file, 'rb') as f:
      file_contents = f.read()
    # NOTE(review): Content-MD5 is conventionally the base64 of the binary
    # digest; a hex digest is sent here -- confirm what gsutil expects.
    headers = ["'Content-MD5':%s" % hashlib.md5(file_contents).hexdigest(),
               '"%s-area:%s"' % (hprefix, options.uploadarea),
               '"%s-function:%s"' % (hprefix, options.uploadfunctionality),
               '"%s-fw_version:%s"' % (hprefix, options.uploadfwversion),
               '"%s-model:%s"' % (hprefix, options.model),
               '"%s-created:%s"' % (hprefix, options.uploadcreated)]
    if options.uploadtag:
      headers.append('"%s-tag:%s"' % (hprefix, options.uploadtag))
    return self.RunGSUtil(cmd='cp', headers=headers, sub_opts=sub_opts,
                          args=[local_file, remote_file])

  def ValidateGestureCommand(self, args, options):
    """Allows a gesture file to be consumed by tests.

    Validate makes files available for unauthenticated access. Note that
    in copying the file to the valid bucket, the object's metadata is lost.
    Its metadata remains in the source folder, though, for filtering use.
      1. 'Invalid' files are not accessible by unauthenticated-public.
      2. It is desirable to know the date of the validation. The logic of
         interpreting 'date-validated' is:
           If the file is 'valid', then 'Last mod' is the 'date-validated'.

    Public access of 'validated' files is through:
      http://chromeos-gestures-valid.commondatastorage.googleapis.com/

    Returns:
      The status of the copy when it fails, otherwise the status of the
      'setacl public-read' that follows it.

    Raises:
      CrosGesturesException: the source file cannot be found or the
          validated file already exists.
    """
    guri = GestureUri(options)
    source_file = guri.MakeGestureUris(args)[0]
    target_file = GestureUri.MakeValidUri(source_file)
    if not self._FileExists(source_file):
      raise CrosGesturesException('File %s cannot be found.' % source_file)
    if self._FileExists(target_file):
      raise CrosGesturesException('Validated file %s already exists.'
                                  % target_file)
    rc = self.RunGSUtil(cmd='cp', args=[source_file, target_file])
    # Only open up the acl once the copy has succeeded.
    if not rc:
      rc = self.RunGSUtil(cmd='setacl', args=['public-read', target_file])
    return rc

  def VersionCommand(self, args, options):
    """Print version information for cros_gestures (stderr) and gsutil."""
    msg = color.Color(cros_gestures_utils.Color.BOLD,
                      'cros_gestures version %s\n' % __version__)
    sys.stderr.write(msg)
    return self.RunGSUtil(cmd='ver')