| #!/usr/bin/python2 |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import mox |
| import socket |
| import unittest |
| |
| from config import rpm_config |
| import rpm_dispatcher |
| |
| DUT_SAME_RPM1 = 'chromeos-rack8e-hostbs1' |
| DUT_SAME_RPM2 = 'chromeos-rack8e-hostbs2' |
| RPM_HOSTNAME = 'chromeos-rack8e-rpm1' |
| DUT_DIFFERENT_RPM = 'chromeos-rack1-hostbs1' |
| FAKE_DISPATCHER_URI = 'fake-dispatcher' |
| FAKE_DISPATCHER_PORT = 9999 |
| FRONT_END_PORT = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port') |
| PROPER_URI_FORMAT = 'http://%s:%d' |
| |
| |
| class TestRPMDispatcher(mox.MoxTestBase): |
| """ |
| Simple unit tests to verify that the RPM Dispatcher properly registers with |
| the frontend server, and also initializes and reuses the same RPM Controller |
| for DUT requests on the same RPM. |
| |
| queue_request is the only public method of RPM Dispatcher, however its logic |
| is simple and relies mostly on the private methods; therefore, I am testing |
| primarily RPMDispatcher initialization and _get_rpm_controller (which calls |
| _create_rpm_controller) to verify correct implementation. |
| """ |
| |
| def setUp(self): |
| super(TestRPMDispatcher, self).setUp() |
| self.frontend_mock = self.mox.CreateMockAnything() |
| expected_uri = PROPER_URI_FORMAT % (FAKE_DISPATCHER_URI, |
| FAKE_DISPATCHER_PORT) |
| self.frontend_mock.register_dispatcher(expected_uri) |
| rpm_dispatcher.xmlrpclib.ServerProxy = self.mox.CreateMockAnything() |
| frontend_uri = 'http://%s:%d' % (socket.gethostname(), FRONT_END_PORT) |
| rpm_dispatcher.xmlrpclib.ServerProxy(frontend_uri).AndReturn( |
| self.frontend_mock) |
| rpm_dispatcher.atexit = self.mox.CreateMockAnything() |
| rpm_dispatcher.atexit.register(mox.IgnoreArg()) |
| self.mox.ReplayAll() |
| self.dispatcher = rpm_dispatcher.RPMDispatcher(FAKE_DISPATCHER_URI, |
| FAKE_DISPATCHER_PORT) |
| |
| |
| def testRegistration(self): |
| """ |
| Make sure that as a dispatcher is initialized it properly registered |
| with the frontend server. |
| """ |
| self.mox.VerifyAll() |
| |
| |
| def testGetSameRPMController(self): |
| """ |
| Make sure that calls to _get_rpm_controller with DUT hostnames that |
| belong to the same RPM device create and retrieve the same RPMController |
| instance. |
| """ |
| controller1 = self.dispatcher._get_rpm_controller(RPM_HOSTNAME) |
| controller2 = self.dispatcher._get_rpm_controller(RPM_HOSTNAME) |
| self.assertEquals(controller1, controller2) |
| |
| |
| def testGetDifferentRPMController(self): |
| """ |
| Make sure that calls to _get_rpm_controller with DUT hostnames that |
| belong to the different RPM device create and retrieve different |
| RPMController instances. |
| """ |
| controller1 = self.dispatcher._get_rpm_controller(DUT_SAME_RPM1) |
| controller2 = self.dispatcher._get_rpm_controller(DUT_DIFFERENT_RPM) |
| self.assertNotEquals(controller1, controller2) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |