blob: 3c4c398fba0502ef255313cf4cc317689d7b8283 [file] [log] [blame] [edit]
#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Display active git branches and code changes in a ChromiumOS workspace."""
import optparse
import os
import re
import subprocess
import sys
import threading
try:
import chromite.lib.git
except ImportError:
lib_location = os.path.join(os.path.dirname(sys.argv[0]), '../../..')
sys.path.append(lib_location)
try:
import chromite.lib.git
except ImportError:
print 'Failed to import chromite library'
sys.exit(1)
import chromite.lib.terminal
Colorizer = chromite.lib.terminal.Color(os.isatty(sys.stdout.fileno()))
class Result(object):
'''A class to synchronize multiple repository request threads.
Multiple parallel running threads collect text information (for instance,
state of different git repositories, one per thread).
Using this class object allows these multiple threads to synchronously
attach collected information to a list.
This class also provides a facility to unblock the master thread once all
parallel running threads have finished running.
To make matters simpler, the number of parallel threads to expect to run is
given to this class during initialization.
Attributes:
thread_lock - a lock object, used as a mutex to control access to the count
of running threads
finished_lock - a lock object, used as a mutex for the master thread. This
lock is locked as soon as the first parallel thread starts
and stays locked until the last parallel thread finishes.
thread_count - an integer, number of parallel threads this object will
have to handle.
fast_report - a boolean, if True - report results as they arrive, do not
wait for all thread results to arrive and get sorted
started_threads - an integer, number of threads started so far.
results - a list of strings, each representing a result of a running
parallel thread.
not_existing - an integer, counter of repos which were not found (most
often happens in minilayouts)
'''
def __init__(self, max_threads, fast_report=False):
'''Initialize the object with the number of parallel threads to handle.'''
self.thread_lock = threading.Lock()
self.finished_lock = threading.Lock()
self.finished_lock.acquire()
self.thread_count = max_threads
self.fast_report = fast_report
self.started_threads = 0
self.results = []
self.not_existing = 0
def ReportThreadStatus(self):
'''Print status line when enabled.
If enabled, print a line showing numbers of started and still running
threads. Make sure this line is printed in the same screen position (using
the ANSI cursor control escape sequence).
'''
if self.fast_report:
return
print '\n%c[1AStarted threads: %3d, remaining threads: %3d' % (
27, self.started_threads, self.thread_count),
def ReportStartedThread(self):
'''Increment started thread count.
And report status, if enabled.
'''
self.thread_lock.acquire()
self.started_threads = self.started_threads + 1
self.ReportThreadStatus()
self.thread_lock.release()
def ReportFinishedThread(self, result=None):
'''Report that a parallel thread finished running.
Each time a finished thread is reported, decrement the running thread
count. Once the running thread count reaches zero - unblock the master
thread lock.
result - a string, representing the output of the thread, could be empty.
Set to None for threads started on non-existing repositories
(happens in minilayouts).
If not empty - ether add this string to the results attribute or
print it, if fast reporting is enabled.
'''
self.thread_lock.acquire()
if result == None:
self.not_existing += 1
elif result:
if self.fast_report:
print result
else:
self.results.append(result)
self.thread_count = self.thread_count - 1
self.ReportThreadStatus()
self.thread_lock.release()
if self.thread_count == 0:
if not self.fast_report:
# Erase the status line.
print '\n%c[1A%c[K%c[1A' % (27, 27, 27),
self.finished_lock.release()
def GetResults(self):
'''Wait till all parallel threads finish and return results.
This function blocks until all parallel threads finish running. Once they
do, sort the results[] list and return it.
'''
with self.finished_lock:
if self.not_existing:
self.results.append(Colorizer.Color(
Colorizer.RED, '%d repos not found' % self.not_existing))
return sorted(self.results)
def RunCommand(path, command):
"""Run a command in a given directory, return stdout."""
return subprocess.Popen(command,
cwd=path,
stdout=subprocess.PIPE).communicate()[0].rstrip()
def GetBranches(full_name, color, revision):
"""Return a list of branch descriptions."""
command = ['git', 'branch', '-vv']
if color:
command.append('--color')
branches = RunCommand(full_name, command).splitlines()
# Find out full sha1 of the tracking branch
command = ['git', 'rev-parse', revision]
tracking_sha1 = RunCommand(full_name, command)
rv = []
for branch in branches:
# Let's get the detached HEAD branch sha1. The branch's string looks as
# follows:
#
# For versions older than git 1.8.x:
# `* (no branch) <sha_1> <description>'
# Newer versions look like:
# `* (detached from <sha_1>) <sha_1> <description>'
#
# The asterisk could be not there, the '(no branch)' substring could be
# enveloped in the color enable/disable escape sequences with spaces
# potentially before and after the escape sequence.
m = re.search(
'\*? +[^ ]*\((no branch|detached from [^)]*)\) *(?:\x1b..)? *([^ ]+) ',
branch)
if m:
branch_sha1 = m.groups(0)[1]
# Does detached HEAD match the tracking branch?
detached_head_match = tracking_sha1.startswith(branch_sha1)
if detached_head_match and len(branches) == 1:
# This is the only local branch in repo, and it matches tracking branch.
return []
else:
if not detached_head_match:
# Does not match the tracking branch sha1, mark it with a prefix
rv.append('%s %s' % (Colorizer.Color(Colorizer.RED, '!!!!!'), branch))
continue
rv.append(branch)
return rv
def GetStatus(full_name):
"""Return a list of files that have modifications."""
command = ['git', 'status', '-s']
return RunCommand(full_name, command).splitlines()
def GetHistory(full_name, author, days):
"""Return a list of oneline log messages.
The messages are for the author going back a specified number of days.
"""
command = ['git', 'log',
'--author=' + author,
'--after=' + '-' + str(days) + 'days',
'--pretty=oneline',
'm/master']
return RunCommand(full_name, command).splitlines()
def ShowBranches(full_name, tracking_branch, color, options):
"""Show all local branches' state.
For a git tree in directory 'full_name' tracking branch named
'tracking_branch', report the status of the tree.
"""
branches = GetBranches(full_name, color, tracking_branch)
status = GetStatus(full_name)
text = []
if options.logs:
history = GetHistory(full_name, options.author, options.days)
else:
history = []
if branches or status or history:
# We want to use the full path for testing, but we want to use the
# relative path for display. Add an empty string to the list to have the
# output sections separated.
text = [ '', Colorizer.Color(Colorizer.BLUE,
os.path.relpath(full_name)), ]
for extra in (branches, status, history):
if extra:
text = text + extra
return text
def ShowDir(project, root, color, options, result):
"""Report active work in a single git repository."""
result.ReportStartedThread()
text = []
try:
full_name = os.path.join(root, project['path'])
if os.path.isdir(full_name):
if chromite.lib.git.IsSHA1(project['revision']):
top_revision = project['revision']
else:
top_revision = project['tracking_branch']
text = ShowBranches(full_name, top_revision, color, options)
except Exception as e:
text = ['Exception %s occured. Arguments: %s' % (
type(e).__name__, ' '.join(e.args))]
result.ReportFinishedThread('\n'.join(text))
def FindRoot():
"""Returns the repo root.
The script could run both inside and outside chroot. When run outside
chroot, it could be invoked from the current directory not in chroot source
tree.
So, if the search based on the current directory fails, try finding the top
directory based on the location of this script (presumably it is in the
chroot source tree).
"""
# Try looking based on current directory
repo_path = chromite.lib.git.FindRepoDir(os.getcwd())
if repo_path is None:
repo_path = chromite.lib.git.FindRepoDir(os.path.dirname(sys.argv[0]))
if repo_path is None:
raise Exception('Failed to find repo root')
return os.path.dirname(repo_path)
def main():
parser = optparse.OptionParser(usage = 'usage: %prog [options]\n')
parser.add_option('-l', '--logs', default=False,
help='Show the last few days of your commits in short '
'form.',
action='store_true',
dest='logs')
parser.add_option('-d', '--days', default=8,
help='Set the number of days of history to show.',
type='int',
dest='days')
parser.add_option('-a', '--author', default=os.environ['USER'],
help='Set the author to filter for.',
type='string',
dest='author')
parser.add_option('-f', '--fast-report', default=False,
help='Print results as they arrive, do not sort.',
action='store_true',
dest='fast_report')
options, arguments = parser.parse_args()
if arguments:
parser.print_usage()
return 1
color = os.isatty(1)
root = FindRoot()
manifest = chromite.lib.git.ManifestCheckout(root)
projects = manifest.ListCheckouts()
result = Result(len(projects), options.fast_report)
for p in projects:
t = threading.Thread(
target=ShowDir,
args=(p, root, color, options, result))
t.start()
print '\n'.join(result.GetResults())
if __name__ == '__main__':
main()