| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for the gdata_lib module.""" |
| |
| from __future__ import print_function |
| |
| import getpass |
| import mox |
| import re |
| |
| import atom.service |
| import gdata.projecthosting.client as gd_ph_client |
| import gdata.spreadsheet.service |
| |
| from chromite.lib import cros_test_lib |
| from chromite.lib import gdata_lib |
| from chromite.lib import osutils |
| |
| |
| # pylint: disable=W0212,E1101 |
| |
| |
class GdataLibTest(cros_test_lib.OutputTestCase):
  """Tests for the helpers that escape/unescape strings for spreadsheets."""

  def testPrepColNameForSS(self):
    """Check PrepColNameForSS against a table of raw -> prepped names."""
    expectations = {
        'foo': 'foo',
        'Foo': 'foo',
        'FOO': 'foo',
        'foo bar': 'foobar',
        'Foo Bar': 'foobar',
        'F O O B A R': 'foobar',
        'Foo/Bar': 'foobar',
        'Foo Bar/Dude': 'foobardude',
        'foo/bar': 'foobar',
    }

    for raw_name, prepped_name in expectations.items():
      self.assertEqual(prepped_name, gdata_lib.PrepColNameForSS(raw_name))
      # Feeding an already-prepped name back in must leave it unchanged.
      self.assertEqual(prepped_name,
                       gdata_lib.PrepColNameForSS(prepped_name))

  def testPrepValForSS(self):
    """Check PrepValForSS against a table of raw -> prepped values."""
    expectations = {
        None: None,
        '': '',
        'foo': 'foo',
        'foo123': 'foo123',
        '123': "'123",
        '1.2': "'1.2",
    }

    for raw_val, prepped_val in expectations.items():
      self.assertEqual(prepped_val, gdata_lib.PrepValForSS(raw_val))

  def testPrepRowForSS(self):
    """Check that PrepRowForSS converts every value of a row dict."""
    vals = {
        None: None,
        '': '',
        'foo': 'foo',
        'foo123': 'foo123',
        '123': "'123",
        '1.2': "'1.2",
    }

    # Build the input row and the row expected back.  The column names are
    # arbitrary; consecutive letters starting at 'a' are used.
    row_in = {}
    row_out = {}
    for offset, (val_in, val_out) in enumerate(vals.items()):
      col = chr(ord('a') + offset)
      row_in[col] = val_in
      row_out[col] = val_out

    self.assertEqual(row_out, gdata_lib.PrepRowForSS(row_in))

  def testScrubValFromSS(self):
    """Check ScrubValFromSS against a table of stored -> scrubbed values."""
    expectations = {
        None: None,
        'foo': 'foo',
        '123': '123',
        "'123": '123',
    }

    for stored_val, scrubbed_val in expectations.items():
      self.assertEqual(scrubbed_val, gdata_lib.ScrubValFromSS(stored_val))
| |
| |
class CredsTest(cros_test_lib.MockOutputTestCase):
  """Tests related to user credentials."""

  USER = 'somedude@chromium.org'
  PASSWORD = 'worldsbestpassword'
  DOCS_TOKEN = 'SomeDocsAuthToken'
  TRACKER_TOKEN = 'SomeTrackerAuthToken'

  @osutils.TempFileDecorator
  def testStoreLoadCreds(self):
    """Round-trip user/password through StoreCreds and LoadCreds."""
    creds = gdata_lib.Creds()

    with self.OutputCapturer():
      # Setting credentials marks them dirty.
      creds.SetCreds(self.USER, self.PASSWORD)
      self.assertEqual(self.USER, creds.user)
      self.assertEqual(self.PASSWORD, creds.password)
      self.assertTrue(creds.creds_dirty)

      # Storing keeps the values and clears the dirty flag.
      creds.StoreCreds(self.tempfile)
      self.assertEqual(self.USER, creds.user)
      self.assertEqual(self.PASSWORD, creds.password)
      self.assertFalse(creds.creds_dirty)

      # Wipe the in-memory values so the load below has to restore them
      # from the file that was just written.
      creds.user = None
      creds.password = None
      self.assertEqual(None, creds.user)
      self.assertEqual(None, creds.password)

      creds.LoadCreds(self.tempfile)
      self.assertEqual(self.USER, creds.user)
      self.assertEqual(self.PASSWORD, creds.password)
      self.assertFalse(creds.creds_dirty)

  @osutils.TempFileDecorator
  def testStoreLoadToken(self):
    """Round-trip auth tokens through StoreAuthToken and LoadAuthToken."""
    creds = gdata_lib.Creds()
    creds.user = self.USER

    with self.OutputCapturer():
      # Setting either token marks the tokens dirty.
      creds.SetDocsAuthToken(self.DOCS_TOKEN)
      self.assertEqual(self.DOCS_TOKEN, creds.docs_auth_token)
      self.assertTrue(creds.token_dirty)
      creds.SetTrackerAuthToken(self.TRACKER_TOKEN)
      self.assertEqual(self.TRACKER_TOKEN, creds.tracker_auth_token)
      self.assertTrue(creds.token_dirty)

      # Storing keeps the values and clears the dirty flag.
      creds.StoreAuthToken(self.tempfile)
      self.assertEqual(self.DOCS_TOKEN, creds.docs_auth_token)
      self.assertEqual(self.TRACKER_TOKEN, creds.tracker_auth_token)
      self.assertFalse(creds.token_dirty)

      # Wipe the in-memory values so the load below has to restore them
      # from the file that was just written.
      creds.docs_auth_token = None
      creds.tracker_auth_token = None
      creds.user = None

      creds.LoadAuthToken(self.tempfile)
      self.assertEqual(self.DOCS_TOKEN, creds.docs_auth_token)
      self.assertEqual(self.TRACKER_TOKEN, creds.tracker_auth_token)
      self.assertFalse(creds.token_dirty)
      self.assertEqual(self.USER, creds.user)

  def testSetCreds(self):
    """SetCreds with an explicit password stores both values."""
    creds = gdata_lib.Creds()

    creds.SetCreds(self.USER, password=self.PASSWORD)

    self.assertEqual(self.USER, creds.user)
    self.assertEqual(self.PASSWORD, creds.password)
    self.assertTrue(creds.creds_dirty)

  def testSetCredsNoPassword(self):
    """SetCreds without a password ends up with the getpass value."""
    # Stand in for the interactive prompt.
    self.PatchObject(getpass, 'getpass', return_value=self.PASSWORD)
    creds = gdata_lib.Creds()

    creds.SetCreds(self.USER)

    self.assertEqual(self.USER, creds.user)
    self.assertEqual(self.PASSWORD, creds.password)
    self.assertTrue(creds.creds_dirty)

  def testSetDocsToken(self):
    """SetDocsAuthToken stores the token and marks tokens dirty."""
    creds = gdata_lib.Creds()

    creds.SetDocsAuthToken(self.DOCS_TOKEN)

    self.assertEqual(self.DOCS_TOKEN, creds.docs_auth_token)
    self.assertTrue(creds.token_dirty)

  def testSetTrackerToken(self):
    """SetTrackerAuthToken stores the token and marks tokens dirty."""
    creds = gdata_lib.Creds()

    creds.SetTrackerAuthToken(self.TRACKER_TOKEN)

    self.assertEqual(self.TRACKER_TOKEN, creds.tracker_auth_token)
    self.assertTrue(creds.token_dirty)
| |
| |
class SpreadsheetRowTest(cros_test_lib.OutputTestCase):
  """Tests related to spreadsheet row interaction."""

  SS_ROW_OBJ = 'SSRowObj'
  SS_ROW_NUM = 5

  def testEmpty(self):
    """A row built without values is empty and rejects item assignment."""
    row = gdata_lib.SpreadsheetRow(self.SS_ROW_OBJ, self.SS_ROW_NUM)

    self.assertEqual(0, len(row))
    self.assertEqual(self.SS_ROW_OBJ, row.ss_row_obj)
    self.assertEqual(self.SS_ROW_NUM, row.ss_row_num)

    # Pass the bound mutator as the callable.  Handing assertRaises the row
    # itself (with the method name as a string argument) only proves that a
    # row is not callable and never reaches __setitem__.
    self.assertRaises(TypeError, row.__setitem__, 'abc', 'xyz')
    self.assertEqual(0, len(row))
    self.assertFalse('abc' in row)

  def testInit(self):
    """A row built from a dict exposes its values and rejects deletion."""
    starting_vals = {'abc': 'xyz', 'foo': 'bar'}
    row = gdata_lib.SpreadsheetRow(self.SS_ROW_OBJ, self.SS_ROW_NUM,
                                   starting_vals)

    self.assertEqual(len(starting_vals), len(row))
    self.assertEqual(starting_vals, row)
    self.assertEqual(row['abc'], 'xyz')
    self.assertTrue('abc' in row)
    self.assertEqual(row['foo'], 'bar')
    self.assertTrue('foo' in row)

    self.assertEqual(self.SS_ROW_OBJ, row.ss_row_obj)
    self.assertEqual(self.SS_ROW_NUM, row.ss_row_num)

    # As in testEmpty: exercise the real __delitem__ rather than calling
    # the (non-callable) row object.
    self.assertRaises(TypeError, row.__delitem__, 'abc')
    self.assertEqual(len(starting_vals), len(row))
    self.assertTrue('abc' in row)
| |
| |
class SpreadsheetCommTest(cros_test_lib.MoxOutputTestCase):
  """Test Spreadsheet communication."""

  SS_KEY = 'TheSSKey'
  WS_NAME = 'TheWSName'
  WS_KEY = 'TheWSKey'

  USER = 'dude'
  PASSWORD = 'shhh'
  TOKEN = 'authtoken'

  COLUMNS = ('greeting', 'name', 'title')
  ROWS = (
      {'greeting': 'Hi', 'name': 'George', 'title': 'Mr.'},
      {'greeting': 'Howdy', 'name': 'Billy Bob', 'title': 'Mr.'},
      {'greeting': 'Yo', 'name': 'Adriane', 'title': 'Ms.'},
  )

  def MockScomm(self, connect=True):
    """Return a mocked SpreadsheetComm.

    Args:
      connect: When true, give the mock a mocked gd_client plus the
        class-level spreadsheet/worksheet identifiers.  Otherwise all of
        those attributes are None.
    """
    scomm = self.mox.CreateMock(gdata_lib.SpreadsheetComm)
    scomm._columns = None
    scomm._rows = None

    if not connect:
      scomm.gd_client = None
      scomm.ss_key = None
      scomm.ws_name = None
      scomm.ws_key = None
      return scomm

    scomm.gd_client = self.mox.CreateMock(gdata_lib.RetrySpreadsheetsService)
    scomm.ss_key = self.SS_KEY
    scomm.ws_name = self.WS_NAME
    scomm.ws_key = self.WS_KEY
    return scomm

  def NewScomm(self, gd_client=None, connect=True):
    """Return a non-mocked SpreadsheetComm.

    Args:
      gd_client: Object to install as the gd_client attribute.
      connect: When true, fill in the class-level spreadsheet/worksheet
        identifiers; otherwise set them to None.
    """
    scomm = gdata_lib.SpreadsheetComm()
    scomm.gd_client = gd_client

    if connect:
      identifiers = (self.SS_KEY, self.WS_NAME, self.WS_KEY)
    else:
      identifiers = (None, None, None)
    scomm.ss_key, scomm.ws_name, scomm.ws_key = identifiers

    return scomm

  def GenerateCreds(self, skip_user=False, skip_token=False):
    """Return a Creds object for tests.

    Args:
      skip_user: When true, leave user and password unset.
      skip_token: When true, leave docs_auth_token unset.
    """
    creds = gdata_lib.Creds()

    if not skip_token:
      creds.docs_auth_token = self.TOKEN

    if not skip_user:
      creds.user = self.USER
      creds.password = self.PASSWORD

    return creds

  def testConnect(self):
    """Connect logs in and then selects the requested worksheet."""
    mocked_scomm = self.MockScomm(connect=False)
    creds = self.GenerateCreds()

    # Record the expected calls.
    mocked_scomm._Login(creds, 'chromiumos')
    mocked_scomm.SetCurrentWorksheet(self.WS_NAME, ss_key=self.SS_KEY)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.Connect(mocked_scomm, creds,
                                      self.SS_KEY, self.WS_NAME)
    self.mox.VerifyAll()

  def testColumns(self):
    """Test the .columns property. Testing a property gets ugly."""
    self.mox.StubOutWithMock(gdata.spreadsheet.service, 'CellQuery')
    mocked_gdclient = self.mox.CreateMock(gdata_lib.RetrySpreadsheetsService)
    scomm = self.NewScomm(gd_client=mocked_gdclient, connect=True)

    query = {'max-row': '1'}

    # Fake Cells feed for the header row: one cell per column, with the
    # first letter of each column name upper-cased.
    header_cells = [
        cros_test_lib.EasyAttr(
            content=cros_test_lib.EasyAttr(text=name[0].upper() + name[1:]))
        for name in self.COLUMNS]
    feed = cros_test_lib.EasyAttr(entry=header_cells)

    # Record the expected calls.
    gdata.spreadsheet.service.CellQuery().AndReturn(query)
    mocked_gdclient.GetCellsFeed(
        self.SS_KEY, self.WS_KEY, query=query).AndReturn(feed)
    self.mox.ReplayAll()

    # Read the property, then verify.
    result = scomm.columns
    del scomm  # Force deletion now before VerifyAll.
    self.mox.VerifyAll()

    self.assertEqual(self.COLUMNS, result)

  def testRows(self):
    """Test the .rows property. Testing a property gets ugly."""
    mocked_gdclient = self.mox.CreateMock(gdata_lib.RetrySpreadsheetsService)
    scomm = self.NewScomm(gd_client=mocked_gdclient, connect=True)

    # Fake List feed covering every row of the worksheet.
    rows = [
        {'col_name': 'Joe', 'col_age': '12', 'col_zip': '12345'},
        {'col_name': 'Bob', 'col_age': '15', 'col_zip': '54321'},
    ]
    entry = [
        cros_test_lib.EasyAttr(
            custom=dict((key, cros_test_lib.EasyAttr(text=val))
                        for key, val in row.items()))
        for row in rows]
    feed = cros_test_lib.EasyAttr(entry=entry)

    # Record the expected calls.
    mocked_gdclient.GetListFeed(self.SS_KEY, self.WS_KEY).AndReturn(feed)
    self.mox.ReplayAll()

    # Read the property, then verify.
    result = scomm.rows
    del scomm  # Force deletion now before VerifyAll.
    self.mox.VerifyAll()
    self.assertEqual(tuple(rows), result)

    # Each returned row carries its spreadsheet row number ...
    self.assertEqual(2, result[0].ss_row_num)
    self.assertEqual(3, result[1].ss_row_num)

    # ... and the feed entry it was built from.
    self.assertEqual(entry[0], result[0].ss_row_obj)
    self.assertEqual(entry[1], result[1].ss_row_obj)

  def testSetCurrentWorksheetStart(self):
    """SetCurrentWorksheet with ss_key on a comm with no worksheet yet."""
    mocked_scomm = self.MockScomm(connect=True)

    # Start from a state where no worksheet has been selected.
    mocked_scomm.ss_key = None
    mocked_scomm.ws_name = None
    mocked_scomm.ws_key = None

    # Record the expected calls.
    mocked_scomm._ClearCache()
    mocked_scomm._GetWorksheetKey(
        self.SS_KEY, self.WS_NAME).AndReturn(self.WS_KEY)
    mocked_scomm._ClearCache()
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.SetCurrentWorksheet(mocked_scomm, self.WS_NAME,
                                                  ss_key=self.SS_KEY)
    self.mox.VerifyAll()

    self.assertEqual(self.SS_KEY, mocked_scomm.ss_key)
    self.assertEqual(self.WS_KEY, mocked_scomm.ws_key)
    self.assertEqual(self.WS_NAME, mocked_scomm.ws_name)

  def testSetCurrentWorksheetRestart(self):
    """SetCurrentWorksheet switching worksheets in the same spreadsheet."""
    mocked_scomm = self.MockScomm(connect=True)

    other_ws_name = 'OtherWSName'
    other_ws_key = 'OtherWSKey'

    # Record the expected calls.
    mocked_scomm._GetWorksheetKey(
        self.SS_KEY, other_ws_name).AndReturn(other_ws_key)
    mocked_scomm._ClearCache()
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.SetCurrentWorksheet(mocked_scomm, other_ws_name)
    self.mox.VerifyAll()

    self.assertEqual(self.SS_KEY, mocked_scomm.ss_key)
    self.assertEqual(other_ws_key, mocked_scomm.ws_key)
    self.assertEqual(other_ws_name, mocked_scomm.ws_name)

  def testClearCache(self):
    """_ClearCache always drops rows; columns only unless keep_columns."""
    rows = 'SomeRows'
    cols = 'SomeColumns'
    scomm = self.NewScomm()

    # (kwargs for _ClearCache, whether the column cache should survive)
    scenarios = (
        ({'keep_columns': True}, True),
        ({'keep_columns': False}, False),
        ({}, False),
    )
    for kwargs, columns_kept in scenarios:
      scomm._rows = rows
      scomm._columns = cols

      scomm._ClearCache(**kwargs)

      self.assertTrue(scomm._rows is None)
      if columns_kept:
        self.assertEqual(cols, scomm._columns)
      else:
        self.assertTrue(scomm._columns is None)

  def testLoginWithUserPassword(self):
    """_Login without a stored token logs in with user/password."""
    mocked_scomm = self.MockScomm(connect=False)
    creds = self.GenerateCreds(skip_token=True)

    self.mox.StubOutClassWithMocks(gdata_lib, 'RetrySpreadsheetsService')

    source = 'SomeSource'

    # Record the expected calls.
    mocked_gdclient = gdata_lib.RetrySpreadsheetsService()
    mocked_gdclient.ProgrammaticLogin()
    mocked_gdclient.GetClientLoginToken().AndReturn(self.TOKEN)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    with self.OutputCapturer():
      gdata_lib.SpreadsheetComm._Login(mocked_scomm, creds, source)
    self.mox.VerifyAll()
    self.assertEqual(self.USER, mocked_gdclient.email)
    self.assertEqual(self.PASSWORD, mocked_gdclient.password)
    self.assertEqual(self.TOKEN, creds.docs_auth_token)
    self.assertEqual(source, mocked_gdclient.source)
    self.assertEqual(mocked_gdclient, mocked_scomm.gd_client)

  def testLoginWithToken(self):
    """_Login with a stored token hands the token to the client."""
    mocked_scomm = self.MockScomm(connect=False)
    creds = self.GenerateCreds(skip_user=True)

    self.mox.StubOutClassWithMocks(gdata_lib, 'RetrySpreadsheetsService')

    source = 'SomeSource'

    # Record the expected calls.
    mocked_gdclient = gdata_lib.RetrySpreadsheetsService()
    mocked_gdclient.SetClientLoginToken(creds.docs_auth_token)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    with self.OutputCapturer():
      gdata_lib.SpreadsheetComm._Login(mocked_scomm, creds, source)
    self.mox.VerifyAll()
    self.assertFalse(hasattr(mocked_gdclient, 'email'))
    self.assertFalse(hasattr(mocked_gdclient, 'password'))
    self.assertEqual(source, mocked_gdclient.source)
    self.assertEqual(mocked_gdclient, mocked_scomm.gd_client)

  def testGetWorksheetKey(self):
    """_GetWorksheetKey fetches the worksheets feed for the spreadsheet."""
    mocked_scomm = self.MockScomm()

    # Only the middle entry carries the worksheet name being looked up.
    wanted = cros_test_lib.EasyAttr(
        title=cros_test_lib.EasyAttr(text=self.WS_NAME),
        id=cros_test_lib.EasyAttr(text='/some/path/%s' % self.WS_KEY))
    entrylist = [
        cros_test_lib.EasyAttr(
            title=cros_test_lib.EasyAttr(text='Foo'), id='NotImportant'),
        wanted,
        cros_test_lib.EasyAttr(
            title=cros_test_lib.EasyAttr(text='Bar'), id='NotImportant'),
    ]
    feed = cros_test_lib.EasyAttr(entry=entrylist)

    # Record the expected calls.
    mocked_scomm.gd_client.GetWorksheetsFeed(self.SS_KEY).AndReturn(feed)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm._GetWorksheetKey(mocked_scomm,
                                               self.SS_KEY, self.WS_NAME)
    self.mox.VerifyAll()

  def testGetColumns(self):
    """GetColumns returns the value of the columns attribute."""
    mocked_scomm = self.MockScomm()
    mocked_scomm.columns = 'SomeColumns'

    # Nothing to record; no mocked calls are expected.
    self.mox.ReplayAll()

    result = gdata_lib.SpreadsheetComm.GetColumns(mocked_scomm)
    self.mox.VerifyAll()
    self.assertEqual('SomeColumns', result)

  def testGetColumnIndex(self):
    """GetColumnIndex returns the 1-based position of a column name."""
    # Note that spreadsheet column indices start at 1.
    mocked_scomm = self.MockScomm()
    mocked_scomm.columns = ['these', 'are', 'column', 'names']

    # Nothing to record; no mocked calls are expected.
    self.mox.ReplayAll()

    result = gdata_lib.SpreadsheetComm.GetColumnIndex(mocked_scomm, 'are')
    self.mox.VerifyAll()
    self.assertEqual(2, result)

  def testGetRows(self):
    """GetRows returns the rows attribute, row metadata included."""
    mocked_scomm = self.MockScomm()
    # Row numbers start at 2 in this fixture.
    mocked_scomm.rows = tuple(
        gdata_lib.SpreadsheetRow('SSRowObj%d' % row_num, row_num, row_dict)
        for row_num, row_dict in enumerate(self.ROWS, 2))

    # Nothing to record; no mocked calls are expected.
    self.mox.ReplayAll()

    result = gdata_lib.SpreadsheetComm.GetRows(mocked_scomm)
    self.mox.VerifyAll()
    self.assertEqual(self.ROWS, result)
    for row_num, row in enumerate(result, 2):
      self.assertEqual(row_num, row.ss_row_num)
      self.assertEqual('SSRowObj%d' % row_num, row.ss_row_obj)

  def testGetRowCacheByCol(self):
    """GetRowCacheByCol keys rows by the value in the given column."""
    mocked_scomm = self.MockScomm()

    # Record the expected calls.
    mocked_scomm.GetRows().AndReturn(self.ROWS)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    result = gdata_lib.SpreadsheetComm.GetRowCacheByCol(mocked_scomm, 'name')
    self.mox.VerifyAll()

    # Every fixture row must be reachable through its 'name' value.
    for row in self.ROWS:
      self.assertEqual(row, result[row['name']])

  def testGetRowCacheByColDuplicates(self):
    """GetRowCacheByCol groups rows that share a value into a list."""
    mocked_scomm = self.MockScomm()

    # First half: copies of the fixture rows with a changed greeting.
    # Second half: the fixture rows themselves.  Row i and row i + half
    # therefore share the same 'name'.
    rows = [dict(row, greeting=row['greeting'] + ' there')
            for row in self.ROWS]
    rows.extend(self.ROWS)

    # Record the expected calls.
    mocked_scomm.GetRows().AndReturn(tuple(rows))
    self.mox.ReplayAll()

    # Run the real method against the mock.
    result = gdata_lib.SpreadsheetComm.GetRowCacheByCol(mocked_scomm, 'name')
    self.mox.VerifyAll()

    # Each cache entry should be the list of both rows sharing that name,
    # in the order they appeared.
    half = len(rows) // 2
    for first, second in zip(rows[:half], rows[half:]):
      name = first['name']
      self.assertEqual(name, second['name'])
      self.assertEqual([first, second], result[name])

  def testInsertRow(self):
    """InsertRow forwards to the client, then clears the row cache."""
    mocked_scomm = self.MockScomm()

    row = 'TheRow'

    # Record the expected calls.
    mocked_scomm.gd_client.InsertRow(row, mocked_scomm.ss_key,
                                     mocked_scomm.ws_key)
    mocked_scomm._ClearCache(keep_columns=True)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.InsertRow(mocked_scomm, row)
    self.mox.VerifyAll()

  def testUpdateRowCellByCell(self):
    """UpdateRowCellByCell replaces each cell whose column has an index."""
    mocked_scomm = self.MockScomm()

    rowIx = 5
    row = {'a': 123, 'b': 234, 'c': 345}
    # Column 'b' has no index, so no cell replacement is expected for it.
    colIndices = {'a': 1, 'b': None, 'c': 4}

    # Record the expected calls, walking the row in dict iteration order.
    for colName, colVal in row.items():
      colIx = colIndices[colName]
      mocked_scomm.GetColumnIndex(colName).AndReturn(colIx)
      if colIx is None:
        continue
      mocked_scomm.ReplaceCellValue(rowIx, colIx, colVal)
    mocked_scomm._ClearCache(keep_columns=True)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.UpdateRowCellByCell(mocked_scomm, rowIx, row)
    self.mox.VerifyAll()

  def testDeleteRow(self):
    """DeleteRow forwards to the client, then clears the row cache."""
    mocked_scomm = self.MockScomm()

    ss_row = 'TheRow'

    # Record the expected calls.
    mocked_scomm.gd_client.DeleteRow(ss_row)
    mocked_scomm._ClearCache(keep_columns=True)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.DeleteRow(mocked_scomm, ss_row)
    self.mox.VerifyAll()

  def testReplaceCellValue(self):
    """ReplaceCellValue updates the cell, then clears the row cache."""
    mocked_scomm = self.MockScomm()

    rowIx, colIx = 14, 4
    val = 'TheValue'

    # Record the expected calls.
    mocked_scomm.gd_client.UpdateCell(rowIx, colIx, val,
                                      mocked_scomm.ss_key, mocked_scomm.ws_key)
    mocked_scomm._ClearCache(keep_columns=True)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.ReplaceCellValue(mocked_scomm, rowIx, colIx, val)
    self.mox.VerifyAll()

  def testClearCellValue(self):
    """ClearCellValue replaces the cell value with None."""
    mocked_scomm = self.MockScomm()

    rowIx, colIx = 14, 4

    # Record the expected calls.
    mocked_scomm.ReplaceCellValue(rowIx, colIx, None)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.SpreadsheetComm.ClearCellValue(mocked_scomm, rowIx, colIx)
    self.mox.VerifyAll()
| |
| |
class IssueCommentTest(cros_test_lib.TestCase):
  """Test creating comments."""

  def testInit(self):
    """The constructor arguments are exposed as .title and .text."""
    expected = {'title': 'Greetings, Earthlings', 'text': 'I come in peace.'}

    comment = gdata_lib.IssueComment(expected['title'], expected['text'])

    self.assertEqual(expected['title'], comment.title)
    self.assertEqual(expected['text'], comment.text)
| |
| |
def createTrackerIssue(tid, labels, owner, status, content, title):
  """Build a fake tracker issue entry out of EasyAttr objects.

  Args:
    tid: Integer issue id; becomes the last component of the .id.text URL.
    labels: Iterable of label strings; each becomes an entry of .label.
    owner: Stored as .owner.username.text.
    status: Stored as .status.text.
    content: Stored as .content.text.
    title: Stored as .title.text.

  Returns:
    The populated cros_test_lib.EasyAttr object.
  """
  attr = cros_test_lib.EasyAttr
  return attr(
      id=attr(text='http://www/some/path/%d' % tid),
      label=[attr(text=label) for label in labels],
      owner=attr(username=attr(text=owner)),
      status=attr(text=status),
      content=attr(text=content),
      title=attr(text=title))
| |
| |
class IssueTest(cros_test_lib.MoxTestCase):
  """Test creating a bug."""

  def testInitOverride(self):
    """Keyword arguments given to Issue show up as attributes."""
    owner = 'somedude@chromium.org'
    status = 'Assigned'

    issue = gdata_lib.Issue(owner=owner, status=status)

    self.assertEqual(owner, issue.owner)
    self.assertEqual(status, issue.status)

  def testInitInvalidOverride(self):
    """An unknown keyword argument is rejected with ValueError."""
    self.assertRaises(ValueError, gdata_lib.Issue,
                      foobar='NotARealAttr')

  def testInitFromTracker(self):
    """InitFromTracker copies the tracker entry fields onto the issue."""
    # Field values for the fake tracker entry.
    fields = {
        'tid': 123,
        'labels': ['Iteration-10', 'Effort-2'],
        'owner': 'thedude@chromium.org',
        'status': 'Available',
        'content': 'The summary message',
        'title': 'The Big Title',
    }
    tissue = createTrackerIssue(**fields)

    mocked_issue = self.mox.CreateMock(gdata_lib.Issue)

    # Record the expected calls.
    mocked_issue.GetTrackerIssueComments(
        fields['tid'], 'TheProject').AndReturn([])
    self.mox.ReplayAll()

    # Run the real method against the mock.
    gdata_lib.Issue.InitFromTracker(mocked_issue, tissue, 'TheProject')
    self.mox.VerifyAll()
    self.assertEqual(fields['tid'], mocked_issue.id)
    self.assertEqual(fields['labels'], mocked_issue.labels)
    self.assertEqual(fields['owner'], mocked_issue.owner)
    self.assertEqual(fields['status'], mocked_issue.status)
    self.assertEqual(fields['content'], mocked_issue.summary)
    self.assertEqual(fields['title'], mocked_issue.title)
    self.assertEqual([], mocked_issue.comments)
| |
| |
class TrackerCommTest(cros_test_lib.MoxOutputTestCase):
  """Test bug tracker communication."""

  def testConnectEmail(self):
    """Connect without a stored token logs in with user/password."""
    source = 'TheSource'
    token = 'TheToken'
    creds = gdata_lib.Creds()
    creds.user = 'dude'
    creds.password = 'shhh'
    creds.tracker_auth_token = None
    self.mox.StubOutClassWithMocks(gd_ph_client, 'ProjectHostingClient')
    mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)

    def _InstallToken(*_args, **_kwargs):
      # Side effect of the mocked ClientLogin: put an auth token on the
      # client, which is bound below before this can run.
      mocked_itclient.auth_token = cros_test_lib.EasyAttr(token_string=token)

    # Record the expected calls.
    mocked_itclient = gd_ph_client.ProjectHostingClient()
    mocked_itclient.ClientLogin(
        creds.user, creds.password, source=source, service='code',
        account_type='GOOGLE').WithSideEffects(_InstallToken)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    with self.OutputCapturer():
      gdata_lib.TrackerComm.Connect(mocked_tcomm, creds, 'TheProject',
                                    source=source)
    self.mox.VerifyAll()
    self.assertEqual(mocked_tcomm.it_client, mocked_itclient)

  def testConnectToken(self):
    """Connect with a stored token wraps it in a ClientLoginToken."""
    source = 'TheSource'
    token = 'TheToken'
    creds = gdata_lib.Creds()
    creds.user = 'dude'
    creds.password = 'shhh'
    creds.tracker_auth_token = token
    mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)

    self.mox.StubOutClassWithMocks(gd_ph_client, 'ProjectHostingClient')
    self.mox.StubOutClassWithMocks(gdata.gauth, 'ClientLoginToken')

    # Record the expected instantiations.
    mocked_itclient = gd_ph_client.ProjectHostingClient()
    mocked_token = gdata.gauth.ClientLoginToken(token)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    with self.OutputCapturer():
      gdata_lib.TrackerComm.Connect(mocked_tcomm, creds, 'TheProject',
                                    source=source)
    self.mox.VerifyAll()
    self.assertEqual(mocked_tcomm.it_client, mocked_itclient)
    self.assertEqual(mocked_itclient.auth_token, mocked_token)

  def testGetTrackerIssueById(self):
    """GetTrackerIssueById queries by id and wraps the first entry."""
    project = 'TheProject'
    mocked_itclient = self.mox.CreateMock(gd_ph_client.ProjectHostingClient)
    tcomm = gdata_lib.TrackerComm()
    tcomm.it_client = mocked_itclient
    tcomm.project_name = project

    self.mox.StubOutClassWithMocks(gd_ph_client, 'Query')
    self.mox.StubOutClassWithMocks(gdata_lib, 'Issue')
    self.mox.StubOutWithMock(gdata_lib.Issue, 'InitFromTracker')

    issue_id = 12345
    feed = cros_test_lib.EasyAttr(entry=['hi', 'there'])

    # Record the expected calls.
    mocked_query = gd_ph_client.Query(issue_id=str(issue_id))
    mocked_itclient.get_issues(
        project, query=mocked_query).AndReturn(feed)
    mocked_issue = gdata_lib.Issue()
    mocked_issue.InitFromTracker(feed.entry[0], project)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    issue = tcomm.GetTrackerIssueById(issue_id)
    self.mox.VerifyAll()
    self.assertEqual(mocked_issue, issue)

  def testGetTrackerIssuesByText(self):
    """GetTrackerIssuesByText searches open issues for the given text."""
    author = 'TheAuthor'
    project = 'TheProject'
    text = 'find me'

    # Field values shared by the fake tracker entry and the expected Issue.
    tissue_id = 1
    tissue_labels = ['auto-filed']
    tissue_owner = 'someone@chromium.org'
    tissue_status = 'Available'
    tissue_content = 'find me in body'
    tissue_title = 'find me in title'

    tissue = createTrackerIssue(tid=tissue_id, labels=tissue_labels,
                                owner=tissue_owner, status=tissue_status,
                                content=tissue_content, title=tissue_title)

    expected_issue = gdata_lib.Issue(
        id=tissue_id, labels=tissue_labels, owner=tissue_owner,
        status=tissue_status, title=tissue_title, summary=tissue_content)

    # Issue.InitFromTracker will call this, so it has to be stubbed.
    self.mox.StubOutWithMock(gdata_lib.Issue, 'GetTrackerIssueComments')

    mocked_itclient = self.mox.CreateMock(gd_ph_client.ProjectHostingClient)

    tcomm = gdata_lib.TrackerComm()
    tcomm.author = author
    tcomm.it_client = mocked_itclient
    tcomm.project_name = project

    # get_issues is expected to receive a Query instance.
    # pylint: disable=E1120
    self.mox.StubOutClassWithMocks(gd_ph_client, 'Query')

    # Record the expected calls.
    mocked_query = gd_ph_client.Query(text_query='%s is:open' % text)
    feed = cros_test_lib.EasyAttr(entry=[tissue])
    mocked_itclient.get_issues(project, query=mocked_query).AndReturn(feed)
    gdata_lib.Issue.GetTrackerIssueComments(1, project).AndReturn([])

    self.mox.ReplayAll()

    found = tcomm.GetTrackerIssuesByText(text)
    self.assertEqual(found, [expected_issue])

  def testCreateTrackerIssue(self):
    """CreateTrackerIssue adds the issue and returns its integer id."""
    author = 'TheAuthor'
    mocked_itclient = self.mox.CreateMock(gd_ph_client.ProjectHostingClient)
    mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
    mocked_tcomm.author = author
    mocked_tcomm.it_client = mocked_itclient
    mocked_tcomm.project_name = 'TheProject'

    issue = cros_test_lib.EasyAttr(title='TheTitle',
                                   summary='TheSummary',
                                   status='TheStatus',
                                   owner='TheOwner',
                                   labels='TheLabels',
                                   ccs=[])

    # Record the expected calls.  The created entry carries the new id as
    # the last component of .id.text.
    created_entry = cros_test_lib.EasyAttr(
        id=cros_test_lib.EasyAttr(text='foo/bar/123'))
    mocked_itclient.add_issue(
        project_name='TheProject',
        title=issue.title,
        content=issue.summary,
        author=author,
        status=issue.status,
        owner=issue.owner,
        labels=issue.labels,
        ccs=issue.ccs).AndReturn(created_entry)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    result = gdata_lib.TrackerComm.CreateTrackerIssue(mocked_tcomm, issue)
    self.mox.VerifyAll()
    self.assertEqual(123, result)

  def testAppendTrackerIssueById(self):
    """AppendTrackerIssueById posts a comment and returns the issue id."""
    author = 'TheAuthor'
    project_name = 'TheProject'
    mocked_itclient = self.mox.CreateMock(gd_ph_client.ProjectHostingClient)
    mocked_tcomm = self.mox.CreateMock(gdata_lib.TrackerComm)
    mocked_tcomm.author = author
    mocked_tcomm.it_client = mocked_itclient
    mocked_tcomm.project_name = project_name

    issue_id = 54321
    comment = 'TheComment'

    # Record the expected calls.
    mocked_itclient.update_issue(project_name=project_name,
                                 issue_id=issue_id,
                                 author=author,
                                 comment=comment,
                                 owner=None)
    self.mox.ReplayAll()

    # Run the real method against the mock.
    result = gdata_lib.TrackerComm.AppendTrackerIssueById(mocked_tcomm,
                                                          issue_id, comment)
    self.mox.VerifyAll()
    self.assertEqual(issue_id, result)
| |
| |
class RetrySpreadsheetsServiceTest(cros_test_lib.MoxOutputTestCase):
  """Test Spreadsheet server retry helper."""

  def testRequest(self):
    """Test that calling request method invokes _RetryRequest wrapper."""
    # pylint: disable=W0212

    self.mox.StubOutWithMock(gdata_lib.RetrySpreadsheetsService,
                             '_RetryRequest')

    # A real RetrySpreadsheetsService is needed here instead of a mock,
    # since the .request method only exists once __init__ has run.
    # Construction is split into __new__ and __init__ so the original
    # rss.request (still the inherited base-class method at that point)
    # can be captured in between.
    service_cls = gdata_lib.RetrySpreadsheetsService
    rss = service_cls.__new__(service_cls)
    orig_request = rss.request
    rss.__init__()

    args = ('GET', 'http://foo.bar')

    # Record the expected calls.
    gdata_lib.RetrySpreadsheetsService._RetryRequest(
        orig_request, *args).AndReturn('wrapped')
    self.mox.ReplayAll()

    # Run the real method.
    retval = rss.request(*args)
    self.mox.VerifyAll()
    self.assertEqual('wrapped', retval)

  def _TestHttpClientRetryRequest(self, statuses):
    """Test retry logic in http_client request during ProgrammaticLogin.

    |statuses| is list of http codes to simulate, where 200 means success.
    """
    expect_success = statuses[-1] == 200

    self.mox.StubOutWithMock(atom.http.ProxiedHttpClient, 'request')
    rss = gdata_lib.RetrySpreadsheetsService()

    login_args = ('POST', 'https://www.google.com/accounts/ClientLogin')

    def _FakeRead():
      return 'Some response text'

    # Record one stubbed request per simulated status code.
    for status in statuses:
      response = cros_test_lib.EasyAttr(status=status, read=_FakeRead)
      atom.http.ProxiedHttpClient.request(
          *login_args, data=mox.IgnoreArg(),
          headers=mox.IgnoreArg()).AndReturn(response)
    self.mox.ReplayAll()

    # Run the login and verify the recorded requests were all made.
    with self.OutputCapturer():
      if expect_success:
        rss.ProgrammaticLogin()
      else:
        self.assertRaises(gdata.service.Error, rss.ProgrammaticLogin)
      self.mox.VerifyAll()

    if expect_success and len(statuses) <= 1:
      # First try worked, expect no warnings.
      self.AssertOutputContainsWarning(invert=True)
    elif expect_success:
      # Warning expected if retries were needed.
      self.AssertOutputContainsWarning()
    else:
      # Retries did not help, request still failed.
      self.AssertOutputContainsWarning(
          regexp=re.compile(r'^Giving up on HTTP request'))

  def testHttpClientRetryRequest(self):
    self._TestHttpClientRetryRequest([200])

  def testHttpClientRetryRequest403(self):
    self._TestHttpClientRetryRequest([403, 200])

  def testHttpClientRetryRequest403x2(self):
    self._TestHttpClientRetryRequest([403, 403, 200])

  def testHttpClientRetryRequest403x3(self):
    self._TestHttpClientRetryRequest([403, 403, 403, 200])

  def testHttpClientRetryRequest403x4(self):
    self._TestHttpClientRetryRequest([403, 403, 403, 403, 200])

  def testHttpClientRetryRequest403x5(self):
    # This one should exhaust the retries.
    self._TestHttpClientRetryRequest([403, 403, 403, 403, 403])

  def _TestRetryRequest(self, statuses):
    """Test retry logic for request method.

    |statuses| is list of http codes to simulate, where 200 means success.
    """
    expect_success = statuses[-1] == 200
    # On success the last response is expected back; on failure, the first.
    if expect_success:
      expected_status_index = len(statuses) - 1
    else:
      expected_status_index = 0

    mocked_ss = self.mox.CreateMock(gdata_lib.RetrySpreadsheetsService)
    args = ('GET', 'http://foo.bar')

    # Record one request per simulated status.  Each response is tagged
    # with its position so the test can tell which one was returned.
    for position, status in enumerate(statuses):
      response = cros_test_lib.EasyAttr(status=status, index=position)
      mocked_ss.request(*args).AndReturn(response)

    self.mox.ReplayAll()

    # Run the real method against the mock.
    with self.OutputCapturer():
      # pylint: disable=W0212
      rval = gdata_lib.RetrySpreadsheetsService._RetryRequest(
          mocked_ss, mocked_ss.request, *args)
      self.mox.VerifyAll()
      self.assertEqual(statuses[expected_status_index], rval.status)
      self.assertEqual(expected_status_index, rval.index)

    if expect_success and expected_status_index <= 0:
      # First try worked, expect no warnings.
      self.AssertOutputContainsWarning(invert=True)
    elif expect_success:
      # Warning expected if retries were needed.
      self.AssertOutputContainsWarning()
    else:
      # Retries did not help, request still failed.
      self.AssertOutputContainsWarning(
          regexp=re.compile(r'^Giving up on HTTP request'))

  def testRetryRequest(self):
    self._TestRetryRequest([200])

  def testRetryRequest403(self):
    self._TestRetryRequest([403, 200])

  def testRetryRequest403x2(self):
    self._TestRetryRequest([403, 403, 200])

  def testRetryRequest403x3(self):
    self._TestRetryRequest([403, 403, 403, 200])

  def testRetryRequest403x4(self):
    self._TestRetryRequest([403, 403, 403, 403, 200])

  def testRetryRequest403x5(self):
    # This one should exhaust the retries.
    self._TestRetryRequest([403, 403, 403, 403, 403])