| #!/usr/bin/python |
| |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for autoupdate.py.""" |
| |
| import json |
| import os |
| import shutil |
| import socket |
| import tempfile |
| import unittest |
| |
| import cherrypy |
| import mox |
| |
| import autoupdate |
| import autoupdate_lib |
| import common_util |
| import devserver_constants as constants |
| import xbuddy |
| |
| |
# Update-request template used by most tests. It is filled in with the %
# operator from a dict providing the keys: client, version, track, board
# (strings) and event_result, event_type (integers, formatted with %d).
_TEST_REQUEST = """
<client_test xmlns:o="http://www.google.com/update2/request" updaterversion="%(client)s" protocol="3.0">
<app version="%(version)s" track="%(track)s" board="%(board)s" />
<updatecheck />
<event eventresult="%(event_result)d" eventtype="%(event_type)d" />
</client_test>"""

# Test request with additional fields needed for full Omaha protocol.
# Takes the same substitution keys as _TEST_REQUEST; the only difference is the
# extra hardware_class attribute on the <app> element.
_FULL_TEST_REQUEST = """
<client_test xmlns:o="http://www.google.com/update2/request" updaterversion="%(client)s" protocol="3.0">
<app version="%(version)s" track="%(track)s" board="%(board)s"
hardware_class="Test Device" />
<updatecheck />
<event eventresult="%(event_result)d" eventtype="%(event_type)d" />
</client_test>"""
| |
| #pylint: disable=W0212 |
class AutoupdateTest(mox.MoxTestBase):
  """Tests for the autoupdate.Autoupdate class.

  setUp() stubs out the common_util size/hash helpers,
  autoupdate_lib.GetUpdateResponse, Autoupdate._GetRemotePayloadAttrs and
  XBuddy._GetArtifact.  Tests that exercise those code paths record the calls
  they expect and then switch to replay mode with self.mox.ReplayAll().
  """

  def setUp(self):
    """Installs the shared stubs and creates per-test temporary directories."""
    mox.MoxTestBase.setUp(self)
    self.mox.StubOutWithMock(common_util, 'GetFileSize')
    self.mox.StubOutWithMock(common_util, 'GetFileSha1')
    self.mox.StubOutWithMock(common_util, 'GetFileSha256')
    self.mox.StubOutWithMock(common_util, 'IsInsideChroot')
    self.mox.StubOutWithMock(autoupdate_lib, 'GetUpdateResponse')
    self.mox.StubOutWithMock(autoupdate.Autoupdate, '_GetRemotePayloadAttrs')
    self.port = 8080
    self.test_board = 'test-board'
    # Scratch directory for files a test fabricates; removed in tearDown().
    self.build_root = tempfile.mkdtemp('autoupdate_build_root')
    self.latest_dir = '12345_af_12-a1'
    self.latest_version = '12345_af_12'
    self.static_image_dir = tempfile.mkdtemp('autoupdate_static_dir')
    self.hostname = '%s:%s' % (socket.gethostname(), self.port)
    # Substitution values for the _TEST_REQUEST / _FULL_TEST_REQUEST templates.
    self.test_dict = {
        'client': 'ChromeOSUpdateEngine-1.0',
        'version': 'ForcedUpdate',
        'track': 'test-channel',
        'board': self.test_board,
        'event_result': 2,
        'event_type': 3
    }
    self.test_data = _TEST_REQUEST % self.test_dict
    # Canned payload attributes returned by the stubbed helpers.
    self.sha1 = 12345
    self.size = 54321
    self.url = 'http://%s/static/update.gz' % self.hostname
    self.payload = 'My payload'
    self.sha256 = 'SHA LA LA'
    cherrypy.request.base = 'http://%s' % self.hostname
    common_util.MkDirP(self.static_image_dir)
    # The XBuddy instance is built before _GetArtifact is stubbed out.
    self._xbuddy = xbuddy.XBuddy(False,
                                 static_dir=self.static_image_dir)
    self.mox.StubOutWithMock(xbuddy.XBuddy, '_GetArtifact')

  def tearDown(self):
    """Removes the temporary directories created by setUp()."""
    # try/finally so that a failure removing the first directory does not
    # leave the second one behind.
    try:
      shutil.rmtree(self.build_root)
    finally:
      shutil.rmtree(self.static_image_dir)

  @staticmethod
  def _CreateEmptyFile(path):
    """Creates (or truncates) an empty file at |path|."""
    with open(path, 'w') as fh:
      fh.write('')

  def _DummyAutoupdateConstructor(self, **kwargs):
    """Creates a dummy autoupdater.  Used to avoid using constructor.

    Args:
      **kwargs: Extra keyword arguments forwarded to autoupdate.Autoupdate.

    Returns:
      An autoupdate.Autoupdate bound to the test XBuddy and static directory.
    """
    return autoupdate.Autoupdate(self._xbuddy,
                                 static_dir=self.static_image_dir,
                                 **kwargs)

  def testGetRightSignedDeltaPayloadDir(self):
    """Test that our directory is what we expect it to be for signed updates."""
    self.mox.StubOutWithMock(common_util, 'GetFileMd5')
    key_path = 'test_key_path'
    src_image = 'test_src_image'
    target_image = 'test_target_image'
    src_hash = '12345'
    target_hash = '67890'
    key_hash = 'abcde'

    common_util.GetFileMd5(src_image).AndReturn(src_hash)
    common_util.GetFileMd5(target_image).AndReturn(target_hash)
    common_util.GetFileMd5(key_path).AndReturn(key_hash)

    self.mox.ReplayAll()
    au_mock = self._DummyAutoupdateConstructor()
    au_mock.private_key = key_path
    update_dir = au_mock.FindCachedUpdateImageSubDir(src_image, target_image)
    # Expected directory name is <src md5>_<target md5>+<key md5>.
    self.assertEqual(os.path.basename(update_dir),
                     '%s_%s+%s' % (src_hash, target_hash, key_hash))
    self.mox.VerifyAll()

  def testGenerateLatestUpdateImage(self):
    """Test default behavior in response to plain update call."""
    latest_label = os.path.join(self.test_board, self.latest_dir)
    # Generate a fake latest image.
    latest_image_dir = os.path.join(self.static_image_dir, latest_label)
    common_util.MkDirP(latest_image_dir)
    self._CreateEmptyFile(
        os.path.join(latest_image_dir, constants.TEST_IMAGE_FILE))

    self.mox.StubOutWithMock(autoupdate.Autoupdate,
                             'GenerateUpdateImageWithCache')
    au_mock = self._DummyAutoupdateConstructor()

    common_util.IsInsideChroot().AndReturn(True)
    self._xbuddy._GetArtifact(
        [''], board=self.test_board, lookup_only=True, image_dir=None,
        version=None).AndReturn((latest_label, constants.TEST_IMAGE_FILE))

    au_mock.GenerateUpdateImageWithCache(
        os.path.join(self.static_image_dir, self.test_board, self.latest_dir,
                     constants.TEST_IMAGE_FILE)).AndReturn('update.gz')

    self.mox.ReplayAll()
    self.assertTrue(au_mock.HandleUpdatePing(self.test_data))
    self.mox.VerifyAll()

  def testHandleUpdatePingForForcedImage(self):
    """Test update response to having a forced image."""
    self.mox.StubOutWithMock(autoupdate.Autoupdate,
                             'GenerateUpdateImageWithCache')
    self.mox.StubOutWithMock(autoupdate.Autoupdate, '_StoreMetadataToFile')
    au_mock = self._DummyAutoupdateConstructor()

    # Generate a fake image.  It lives under build_root (a fresh temporary
    # directory) so that tearDown() removes it and concurrent test runs cannot
    # collide on a shared fixed path.
    forced_image_dir = os.path.join(self.build_root, 'path_to_force')
    forced_image = os.path.join(forced_image_dir, constants.IMAGE_FILE)
    common_util.MkDirP(forced_image_dir)
    self._CreateEmptyFile(forced_image)

    cache_image_dir = os.path.join(self.static_image_dir, 'cache')

    # Mock out GenerateUpdateImageWithCache to make an update file in cache.
    def mock_fn(_image):
      # No good way to introduce an update file during execution, so the
      # payload and its metadata hash are created as a side effect of the
      # mocked call.
      common_util.MkDirP(cache_image_dir)
      self._CreateEmptyFile(
          os.path.join(cache_image_dir, constants.UPDATE_FILE))
      self._CreateEmptyFile(
          os.path.join(cache_image_dir, constants.METADATA_HASH_FILE))

    common_util.IsInsideChroot().AndReturn(True)
    au_mock.GenerateUpdateImageWithCache(forced_image).WithSideEffects(
        mock_fn).AndReturn('cache')

    common_util.GetFileSha1(os.path.join(
        cache_image_dir, 'update.gz')).AndReturn(self.sha1)
    common_util.GetFileSha256(os.path.join(
        cache_image_dir, 'update.gz')).AndReturn(self.sha256)
    common_util.GetFileSize(os.path.join(
        cache_image_dir, 'update.gz')).AndReturn(self.size)
    au_mock._StoreMetadataToFile(cache_image_dir,
                                 mox.IsA(autoupdate.UpdateMetadata))
    forced_url = 'http://%s/static/%s/update.gz' % (self.hostname,
                                                    'cache')
    autoupdate_lib.GetUpdateResponse(
        self.sha1, self.sha256, self.size, forced_url, False, 0, None, None,
        '3.0', False).AndReturn(self.payload)

    self.mox.ReplayAll()
    au_mock.forced_image = forced_image
    self.assertEqual(au_mock.HandleUpdatePing(self.test_data), self.payload)
    self.mox.VerifyAll()

  def testHandleForcePregenerateXBuddy(self):
    """Check pregenerating an xbuddy path.

    A forced image that starts with 'xbuddy:' uses the following path to
    obtain an update.
    """
    self.mox.StubOutWithMock(autoupdate.Autoupdate,
                             'GetUpdateForLabel')
    au_mock = self._DummyAutoupdateConstructor()
    au_mock.forced_image = "xbuddy:b/v/a"

    self._xbuddy._GetArtifact(
        ['b', 'v', 'a'],
        image_dir=None).AndReturn(('label', constants.TEST_IMAGE_FILE))

    au_mock.GetUpdateForLabel(
        autoupdate.FORCED_UPDATE, 'b/v/a').AndReturn('p')
    self.mox.ReplayAll()

    au_mock.PreGenerateUpdate()
    self.mox.VerifyAll()

  def testChangeUrlPort(self):
    """Checks _ChangeUrlPort replaces an existing port or adds a missing one."""
    r = autoupdate._ChangeUrlPort('http://fuzzy:8080/static', 8085)
    self.assertEqual(r, 'http://fuzzy:8085/static')

    r = autoupdate._ChangeUrlPort('http://fuzzy/static', 8085)
    self.assertEqual(r, 'http://fuzzy:8085/static')

    r = autoupdate._ChangeUrlPort('ftp://fuzzy/static', 8085)
    self.assertEqual(r, 'ftp://fuzzy:8085/static')

    r = autoupdate._ChangeUrlPort('ftp://fuzzy', 8085)
    self.assertEqual(r, 'ftp://fuzzy:8085')

  def testHandleHostInfoPing(self):
    """Checks HandleHostInfoPing rejects None and returns host attrs as JSON."""
    au_mock = self._DummyAutoupdateConstructor()
    self.assertRaises(AssertionError, au_mock.HandleHostInfoPing, None)

    # Setup fake host_infos entry and ensure it comes back to us in one piece.
    test_ip = '1.2.3.4'
    au_mock.host_infos.GetInitHostInfo(test_ip).attrs = self.test_dict
    self.assertEqual(
        json.loads(au_mock.HandleHostInfoPing(test_ip)), self.test_dict)

  def testHandleSetUpdatePing(self):
    """Checks HandleSetUpdatePing validates its args and stores the label."""
    au_mock = self._DummyAutoupdateConstructor()
    test_ip = '1.2.3.4'
    test_label = 'test/old-update'
    # A missing IP and/or label must trip an assertion.
    self.assertRaises(
        AssertionError, au_mock.HandleSetUpdatePing, test_ip, None)
    self.assertRaises(
        AssertionError, au_mock.HandleSetUpdatePing, None, test_label)
    self.assertRaises(
        AssertionError, au_mock.HandleSetUpdatePing, None, None)

    au_mock.HandleSetUpdatePing(test_ip, test_label)
    self.assertEqual(
        au_mock.host_infos.GetHostInfo(test_ip).attrs['forced_update_label'],
        test_label)

  def testHandleUpdatePingWithSetUpdate(self):
    """If update is set, it should use the update found in that directory."""
    self.mox.StubOutWithMock(autoupdate.Autoupdate, '_StoreMetadataToFile')
    au_mock = self._DummyAutoupdateConstructor()

    test_label = 'new_update-test/the-new-update'
    new_image_dir = os.path.join(self.static_image_dir, test_label)
    new_url = self.url.replace('update.gz', test_label + '/update.gz')

    # Generate a fake payload.
    common_util.MkDirP(new_image_dir)
    self._CreateEmptyFile(os.path.join(new_image_dir, constants.UPDATE_FILE))
    self._CreateEmptyFile(
        os.path.join(new_image_dir, constants.METADATA_HASH_FILE))

    common_util.GetFileSha1(os.path.join(
        new_image_dir, 'update.gz')).AndReturn(self.sha1)
    common_util.GetFileSha256(os.path.join(
        new_image_dir, 'update.gz')).AndReturn(self.sha256)
    common_util.GetFileSize(os.path.join(
        new_image_dir, 'update.gz')).AndReturn(self.size)
    au_mock._StoreMetadataToFile(new_image_dir,
                                 mox.IsA(autoupdate.UpdateMetadata))
    autoupdate_lib.GetUpdateResponse(
        self.sha1, self.sha256, self.size, new_url, False, 0, None, None,
        '3.0', False).AndReturn(self.payload)

    self.mox.ReplayAll()
    # NOTE(review): '127.0.0.1' is presumably the client address
    # HandleUpdatePing sees when run outside a real request -- confirm against
    # autoupdate.py.
    au_mock.HandleSetUpdatePing('127.0.0.1', test_label)
    self.assertEqual(
        au_mock.host_infos.GetHostInfo('127.0.0.1').
        attrs['forced_update_label'],
        test_label)
    self.assertEqual(au_mock.HandleUpdatePing(self.test_data), self.payload)
    # The forced label must be gone once the update ping has been served.
    self.assertFalse('forced_update_label' in
                     au_mock.host_infos.GetHostInfo('127.0.0.1').attrs)
    self.mox.VerifyAll()

  def testGetVersionFromDir(self):
    """Checks _GetVersionFromDir extracts the version from a build path."""
    au = self._DummyAutoupdateConstructor()

    # New-style version number.
    self.assertEqual(
        au._GetVersionFromDir('/foo/x86-alex/R16-1102.0.2011_09_30_0806-a1'),
        '1102.0.2011_09_30_0806')

  def testCanUpdate(self):
    """Checks _CanUpdate is True only when the second version is newer."""
    au = self._DummyAutoupdateConstructor()

    # When both the client and the server have new-style versions, we should
    # just compare the tokens directly.
    self.assertTrue(
        au._CanUpdate('1098.0.2011_09_28_1635', '1098.0.2011_09_30_0806'))
    self.assertTrue(
        au._CanUpdate('1098.0.2011_09_28_1635', '1100.0.2011_09_26_0000'))
    self.assertFalse(
        au._CanUpdate('1098.0.2011_09_28_1635', '1098.0.2011_09_26_0000'))
    self.assertFalse(
        au._CanUpdate('1098.0.2011_09_28_1635', '1096.0.2011_09_30_0000'))

  def testHandleUpdatePingRemotePayload(self):
    """Checks update pings in remote-payload mode.

    A request without hardware_class must raise DevServerHTTPError; the full
    request must be answered using the attributes of the remote payload.
    """
    remote_urlbase = 'http://remotehost:6666'
    remote_payload_path = 'static/path/to/update.gz'
    remote_url = '/'.join([remote_urlbase, remote_payload_path, 'update.gz'])
    au_mock = self._DummyAutoupdateConstructor(urlbase=remote_urlbase,
                                               payload_path=remote_payload_path,
                                               remote_payload=True)

    incomplete_test_data = _TEST_REQUEST % self.test_dict
    complete_test_data = _FULL_TEST_REQUEST % self.test_dict

    au_mock._GetRemotePayloadAttrs(remote_url).AndReturn(
        autoupdate.UpdateMetadata(self.sha1, self.sha256, self.size, False,
                                  0, ''))
    autoupdate_lib.GetUpdateResponse(
        self.sha1, self.sha256, self.size, remote_url, False, 0, None, None,
        '3.0', False).AndReturn(self.payload)

    self.mox.ReplayAll()
    # This should fail because of missing fields.
    self.assertRaises(common_util.DevServerHTTPError,
                      au_mock.HandleUpdatePing, incomplete_test_data)
    # This should have enough information.
    self.assertEqual(au_mock.HandleUpdatePing(complete_test_data), self.payload)
    self.mox.VerifyAll()
| |
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()