blob: a3cd13a8af185d844d560177942fe7bd0faceb18 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for cros_portage_upgrade.py."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)),
'..', '..'))
from chromite.lib import cros_test_lib
from chromite.lib import gdata_lib
from chromite.lib import upgrade_table as utable
from chromite.scripts import sync_package_status as sps
# pylint: disable=W0212,R0904
class SyncerTest(cros_test_lib.MoxOutputTestCase):
col_amd64 = utable.UpgradeTable.GetColumnName(utable.UpgradeTable.COL_STATE,
'amd64')
col_amd64 = gdata_lib.PrepColNameForSS(col_amd64)
col_arm = utable.UpgradeTable.GetColumnName(utable.UpgradeTable.COL_STATE,
'arm')
col_arm = gdata_lib.PrepColNameForSS(col_arm)
col_x86 = utable.UpgradeTable.GetColumnName(utable.UpgradeTable.COL_STATE,
'x86')
col_x86 = gdata_lib.PrepColNameForSS(col_x86)
def testInit(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
tcomm, scomm = 'TComm', 'SComm'
# Replay script
self.mox.ReplayAll()
# Verify
sps.Syncer.__init__(mocked_syncer, tcomm, scomm)
self.mox.VerifyAll()
self.assertEquals(scomm, mocked_syncer.scomm)
self.assertEquals(tcomm, mocked_syncer.tcomm)
self.assertEquals(None, mocked_syncer.teams)
self.assertEquals(None, mocked_syncer.owners)
self.assertEquals(False, mocked_syncer.pretend)
self.assertEquals(False, mocked_syncer.verbose)
def testReduceTeamName(self):
syncer = sps.Syncer('tcomm_obj', 'scomm_obj')
tests = {
'build/bdavirro': 'build',
'build/rtc': 'build',
'build': 'build',
'UI/zel': 'ui',
'UI': 'ui',
'Build': 'build',
None: None,
}
# Verify
for key in tests:
result = syncer._ReduceTeamName(key)
self.assertEquals(tests[key], result)
def testReduceOwnerName(self):
syncer = sps.Syncer('tcomm_obj', 'scomm_obj')
tests = {
'joe': 'joe',
'Joe': 'joe',
'joe@chromium.org': 'joe',
'Joe@chromium.org': 'joe',
'Joe.Bob@chromium.org': 'joe.bob',
None: None,
}
# Verify
for key in tests:
result = syncer._ReduceOwnerName(key)
self.assertEquals(tests[key], result)
def testSetTeamFilterOK(self):
syncer = sps.Syncer('tcomm_obj', 'scomm_obj')
tests = {
'build:system:ui': set(['build', 'system', 'ui']),
'Build:system:UI': set(['build', 'system', 'ui']),
'kernel': set(['kernel']),
'KERNEL': set(['kernel']),
None: None,
'': None,
}
# Verify
for test in tests:
syncer.SetTeamFilter(test)
self.assertEquals(tests[test], syncer.teams)
def testSetTeamFilterError(self):
syncer = sps.Syncer('tcomm_obj', 'scomm_obj')
# "systems" is not valid (should be "system")
teamarg = 'build:systems'
# Verify
with self.OutputCapturer():
self.assertRaises(SystemExit, sps.Syncer.SetTeamFilter,
syncer, teamarg)
def testSetOwnerFilter(self):
syncer = sps.Syncer('tcomm_obj', 'scomm_obj')
tests = {
'joe:bill:bob': set(['joe', 'bill', 'bob']),
'Joe:Bill:BOB': set(['joe', 'bill', 'bob']),
'joe@chromium.org:bill:bob': set(['joe', 'bill', 'bob']),
'joe': set(['joe']),
'joe@chromium.org': set(['joe']),
'': None,
None: None,
}
# Verify
for test in tests:
syncer.SetOwnerFilter(test)
self.assertEquals(tests[test], syncer.owners)
def testRowPassesFilters(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
row1 = { sps.COL_TEAM: 'build', sps.COL_OWNER: 'joe' }
row2 = { sps.COL_TEAM: 'build', sps.COL_OWNER: 'bob' }
row3 = { sps.COL_TEAM: 'build', sps.COL_OWNER: None }
row4 = { sps.COL_TEAM: None, sps.COL_OWNER: None }
teams1 = set(['build'])
teams2 = set(['kernel'])
teams3 = set(['build', 'ui'])
owners1 = set(['joe'])
owners2 = set(['bob'])
owners3 = set(['joe', 'dan'])
tests = [
{ 'row': row1, 'teams': None, 'owners': None, 'result': True },
{ 'row': row1, 'teams': teams1, 'owners': None, 'result': True },
{ 'row': row1, 'teams': teams2, 'owners': None, 'result': False },
{ 'row': row1, 'teams': teams3, 'owners': None, 'result': True },
{ 'row': row1, 'teams': teams1, 'owners': owners1, 'result': True },
{ 'row': row1, 'teams': None, 'owners': owners2, 'result': False },
{ 'row': row1, 'teams': None, 'owners': owners3, 'result': True },
{ 'row': row2, 'teams': None, 'owners': None, 'result': True },
{ 'row': row2, 'teams': teams1, 'owners': None, 'result': True },
{ 'row': row2, 'teams': teams2, 'owners': None, 'result': False },
{ 'row': row2, 'teams': teams3, 'owners': None, 'result': True },
{ 'row': row2, 'teams': teams1, 'owners': owners1, 'result': False },
{ 'row': row2, 'teams': None, 'owners': owners2, 'result': True },
{ 'row': row2, 'teams': None, 'owners': owners3, 'result': False },
{ 'row': row3, 'teams': None, 'owners': None, 'result': True },
{ 'row': row3, 'teams': teams1, 'owners': None, 'result': True },
{ 'row': row3, 'teams': teams2, 'owners': None, 'result': False },
{ 'row': row3, 'teams': teams3, 'owners': None, 'result': True },
{ 'row': row3, 'teams': teams1, 'owners': owners1, 'result': False },
{ 'row': row3, 'teams': None, 'owners': owners2, 'result': False },
{ 'row': row3, 'teams': None, 'owners': owners3, 'result': False },
{ 'row': row4, 'teams': None, 'owners': None, 'result': True },
{ 'row': row4, 'teams': teams1, 'owners': None, 'result': False },
{ 'row': row4, 'teams': teams1, 'owners': owners1, 'result': False },
{ 'row': row4, 'teams': None, 'owners': owners2, 'result': False },
]
# Replay script
for test in tests:
done = False
if test['teams']:
row_team = test['row'][sps.COL_TEAM]
mocked_syncer._ReduceTeamName(row_team).AndReturn(row_team)
done = row_team not in test['teams']
if not done and test['owners']:
row_owner = test['row'][sps.COL_OWNER]
mocked_syncer._ReduceOwnerName(row_owner).AndReturn(row_owner)
self.mox.ReplayAll()
# Verify
for test in tests:
mocked_syncer.teams = test['teams']
mocked_syncer.owners = test['owners']
result = sps.Syncer._RowPassesFilters(mocked_syncer, test['row'])
msg = ('Expected following row to %s filter, but it did not:\n%r' %
('pass' if test['result'] else 'fail', test['row']))
msg += '\n Using teams filter : %r' % mocked_syncer.teams
msg += '\n Using owners filter: %r' % mocked_syncer.owners
self.assertEquals(test['result'], result, msg)
self.mox.VerifyAll()
def testSyncMissingTrackerColumn(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tcomm = mocked_tcomm
# Replay script
mocked_scomm.GetColumnIndex(sps.COL_TRACKER).AndReturn(None)
self.mox.ReplayAll()
# Verify
self.assertRaises(sps.SyncError, sps.Syncer.Sync, mocked_syncer)
self.mox.VerifyAll()
def testSyncNewIssues(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tcomm = mocked_tcomm
rows = [
{ sps.COL_PACKAGE: 'd/f', sps.COL_TEAM: 'build', sps.COL_OWNER: None },
{ sps.COL_PACKAGE: 'd/f', sps.COL_TEAM: 'build', sps.COL_OWNER: 'joe' },
]
# Replay script
mocked_scomm.GetColumnIndex(sps.COL_TRACKER).AndReturn(1) # Any index ok.
mocked_scomm.GetRows().AndReturn(rows)
for ix in xrange(len(rows)):
mocked_syncer._RowPassesFilters(rows[ix]).AndReturn(True)
mocked_syncer._GenIssueForRow(rows[ix]).AndReturn('NewIssue%d' % ix)
mocked_syncer._GetRowTrackerId(rows[ix]).AndReturn(None)
mocked_syncer._CreateRowIssue(ix + 2, rows[ix], 'NewIssue%d' % ix)
self.mox.ReplayAll()
# Verify
sps.Syncer.Sync(mocked_syncer)
self.mox.VerifyAll()
def testSyncClearIssues(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tcomm = mocked_tcomm
rows = [
{ sps.COL_PACKAGE: 'd/f', sps.COL_TEAM: 'build', sps.COL_OWNER: None },
{ sps.COL_PACKAGE: 'd/f', sps.COL_TEAM: 'build', sps.COL_OWNER: 'joe' },
]
# Replay script
mocked_scomm.GetColumnIndex(sps.COL_TRACKER).AndReturn(1) # Any index ok.
mocked_scomm.GetRows().AndReturn(rows)
for ix in xrange(len(rows)):
mocked_syncer._RowPassesFilters(rows[ix]).AndReturn(True)
mocked_syncer._GenIssueForRow(rows[ix]).AndReturn(None)
mocked_syncer._GetRowTrackerId(rows[ix]).AndReturn(123 + ix)
mocked_syncer._ClearRowIssue(ix + 2, rows[ix])
self.mox.ReplayAll()
# Verify
sps.Syncer.Sync(mocked_syncer)
self.mox.VerifyAll()
def testSyncFilteredOut(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tcomm = mocked_tcomm
rows = [
{ sps.COL_PACKAGE: 'd/f', sps.COL_TEAM: 'build', sps.COL_OWNER: None },
{ sps.COL_PACKAGE: 'd/f', sps.COL_TEAM: 'build', sps.COL_OWNER: 'joe' },
]
# Replay script
mocked_scomm.GetColumnIndex(sps.COL_TRACKER).AndReturn(1) # Any index ok.
mocked_scomm.GetRows().AndReturn(rows)
for ix in xrange(len(rows)):
mocked_syncer._RowPassesFilters(rows[ix]).AndReturn(False)
self.mox.ReplayAll()
# Verify
sps.Syncer.Sync(mocked_syncer)
self.mox.VerifyAll()
def testGetRowValue(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
row = {
self.col_amd64: 'ABC',
self.col_arm: 'XYZ',
self.col_x86: 'FooBar',
sps.COL_TEAM: 'build',
}
# Replay script
self.mox.ReplayAll()
# Verify
result = sps.Syncer._GetRowValue(mocked_syncer, row,
'stateonamd64', 'amd64')
self.assertEquals('ABC', result)
result = sps.Syncer._GetRowValue(mocked_syncer, row,
'stateonarm', 'arm')
self.assertEquals('XYZ', result)
result = sps.Syncer._GetRowValue(mocked_syncer, row,
'stateonamd64', 'amd64')
self.assertEquals('ABC', result)
result = sps.Syncer._GetRowValue(mocked_syncer, row, sps.COL_TEAM)
self.assertEquals('build', result)
self.mox.VerifyAll()
def _TestGenIssueForRowNeedsUpgrade(self, row):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_syncer.default_owner = None
mocked_syncer.scomm = cros_test_lib.EasyAttr(ss_key='SomeSSKey')
# Replay script
for arch in sps.ARCHES:
state = sps.Syncer._GetRowValue(mocked_syncer, row,
utable.UpgradeTable.COL_STATE, arch)
mocked_syncer._GetRowValue(row, utable.UpgradeTable.COL_STATE,
arch).AndReturn(state)
red_team = sps.Syncer._ReduceTeamName(mocked_syncer, row[sps.COL_TEAM])
mocked_syncer._ReduceTeamName(row[sps.COL_TEAM]).AndReturn(red_team)
red_owner = sps.Syncer._ReduceOwnerName(mocked_syncer, row[sps.COL_OWNER])
mocked_syncer._ReduceOwnerName(row[sps.COL_OWNER]).AndReturn(red_owner)
for arch in sps.ARCHES:
mocked_syncer._GetRowValue(row, utable.UpgradeTable.COL_CURRENT_VER,
arch).AndReturn('1')
mocked_syncer._GetRowValue(row,
utable.UpgradeTable.COL_STABLE_UPSTREAM_VER,
arch).AndReturn('2')
mocked_syncer._GetRowValue(row,
utable.UpgradeTable.COL_LATEST_UPSTREAM_VER,
arch).AndReturn('3')
self.mox.ReplayAll()
# Verify
result = sps.Syncer._GenIssueForRow(mocked_syncer, row)
self.mox.VerifyAll()
return result
def testGenIssueForRowNeedsUpgrade1(self):
row = {
self.col_amd64: utable.UpgradeTable.STATE_NEEDS_UPGRADE,
self.col_arm: 'Not important',
self.col_x86: 'Not important',
sps.COL_TEAM: 'build',
sps.COL_OWNER: None,
sps.COL_PACKAGE: 'dev/foo',
}
result = self._TestGenIssueForRowNeedsUpgrade(row)
self.assertEquals(None, result.owner)
self.assertEquals(0, result.id)
self.assertEquals('Untriaged', result.status)
def testGenIssueForRowNeedsUpgrade2(self):
row = {
self.col_amd64: utable.UpgradeTable.STATE_NEEDS_UPGRADE,
self.col_arm: utable.UpgradeTable.STATE_NEEDS_UPGRADE_AND_PATCHED,
self.col_x86: 'Not important',
sps.COL_TEAM: 'build',
sps.COL_OWNER: 'joe',
sps.COL_PACKAGE: 'dev/foo',
}
result = self._TestGenIssueForRowNeedsUpgrade(row)
self.assertEquals('joe@chromium.org', result.owner)
self.assertEquals(0, result.id)
self.assertEquals('Available', result.status)
def testGenIssueForRowNeedsUpgrade3(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
row = {
self.col_amd64: utable.UpgradeTable.STATE_NEEDS_UPGRADE,
self.col_arm: utable.UpgradeTable.STATE_NEEDS_UPGRADE_AND_PATCHED,
self.col_x86: 'Not important',
sps.COL_TEAM: None,
sps.COL_OWNER: 'joe',
sps.COL_PACKAGE: 'dev/foo',
}
# Replay script
for arch in sps.ARCHES:
state = sps.Syncer._GetRowValue(mocked_syncer, row,
utable.UpgradeTable.COL_STATE, arch)
mocked_syncer._GetRowValue(row, utable.UpgradeTable.COL_STATE,
arch).AndReturn(state)
reduced = sps.Syncer._ReduceTeamName(mocked_syncer, row[sps.COL_TEAM])
mocked_syncer._ReduceTeamName(row[sps.COL_TEAM]).AndReturn(reduced)
self.mox.ReplayAll()
# Verify
with self.OutputCapturer():
self.assertRaises(RuntimeError, sps.Syncer._GenIssueForRow,
mocked_syncer, row)
self.mox.VerifyAll()
def testGenIssueForRowNoUpgrade(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
row = {
self.col_amd64: 'Not important',
self.col_arm: 'Not important',
self.col_x86: 'Not important',
sps.COL_TEAM: None,
sps.COL_OWNER: 'joe',
sps.COL_PACKAGE: 'dev/foo',
}
# Replay script
for arch in sps.ARCHES:
state = sps.Syncer._GetRowValue(mocked_syncer, row,
utable.UpgradeTable.COL_STATE, arch)
mocked_syncer._GetRowValue(row, utable.UpgradeTable.COL_STATE,
arch).AndReturn(state)
self.mox.ReplayAll()
# Verify
result = sps.Syncer._GenIssueForRow(mocked_syncer, row)
self.mox.VerifyAll()
self.assertEquals(None, result)
def testGetRowTrackerId(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
row = { sps.COL_TRACKER: '321' }
# Replay script
self.mox.ReplayAll()
# Verify
with self.OutputCapturer():
result = sps.Syncer._GetRowTrackerId(mocked_syncer, row)
self.mox.VerifyAll()
self.assertEquals(321, result)
def testCreateRowIssuePretend(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_syncer.pretend = True
row = { sps.COL_PACKAGE: 'dev/foo' }
# Replay script
self.mox.ReplayAll()
# Verify
with self.OutputCapturer():
sps.Syncer._CreateRowIssue(mocked_syncer, 5, row, 'some_issue')
self.mox.VerifyAll()
def testCreateRowIssue(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tcomm = mocked_tcomm
mocked_syncer.tracker_col_ix = 8
mocked_syncer.pretend = False
row_ix = 5
row = { sps.COL_PACKAGE: 'dev/foo' }
issue = 'SomeIssue'
issue_id = 234
ss_issue_val = 'Hyperlink%d' % issue_id
# Replay script
mocked_scomm.ClearCellValue(row_ix, mocked_syncer.tracker_col_ix)
mocked_tcomm.CreateTrackerIssue(issue).AndReturn(issue_id)
mocked_syncer._GenSSLinkToIssue(issue_id).AndReturn(ss_issue_val)
mocked_scomm.ReplaceCellValue(row_ix, mocked_syncer.tracker_col_ix,
ss_issue_val)
self.mox.ReplayAll()
# Verify
with self.OutputCapturer():
sps.Syncer._CreateRowIssue(mocked_syncer, row_ix, row, issue)
self.mox.VerifyAll()
def testGenSSLinkToIssue(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
issue_id = 123
# Replay script
self.mox.ReplayAll()
# Verify
result = sps.Syncer._GenSSLinkToIssue(mocked_syncer, issue_id)
self.mox.VerifyAll()
self.assertEquals('=hyperlink("crbug.com/123";"123")', result)
def testClearRowIssue(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tracker_col_ix = 8
mocked_syncer.pretend = False
row_ix = 44
row = { sps.COL_PACKAGE: 'dev/foo' }
# Replay script
mocked_scomm.ClearCellValue(row_ix, mocked_syncer.tracker_col_ix)
self.mox.ReplayAll()
# Verify
with self.OutputCapturer():
sps.Syncer._ClearRowIssue(mocked_syncer, row_ix, row)
self.mox.VerifyAll()
def testClearRowIssuePretend(self):
mocked_syncer = self.mox.CreateMock(sps.Syncer)
mocked_scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
mocked_syncer.scomm = mocked_scomm
mocked_syncer.tracker_col_ix = 8
mocked_syncer.pretend = True
row_ix = 44
row = { sps.COL_PACKAGE: 'dev/foo' }
# Replay script
self.mox.ReplayAll()
# Verify
with self.OutputCapturer():
sps.Syncer._ClearRowIssue(mocked_syncer, row_ix, row)
self.mox.VerifyAll()
if __name__ == '__main__':
cros_test_lib.main()