blob: 423c12c1e14533d3625f69b8fed57785676b59df [file] [log] [blame]
# Copyright 2016 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module that contains unittests for auth module."""
import time
from unittest import mock
from chromite.lib import auth
from chromite.lib import cros_test_lib
class AuthTest(cros_test_lib.RunCommandTestCase):
"""Test cases for methods in auth."""
def setUp(self):
self.PatchObject(time, "sleep")
self.PatchObject(auth, "GetLuciAuth", return_value="luci-auth")
self.PatchObject(
auth, "GetLuciGitCreds", return_value="git-credential-luci"
)
def testLoginFailed(self):
"""Test Login failing."""
self.rc.AddCmdResult(["luci-auth", "login"], stderr="", returncode=1)
self.assertRaises(auth.AccessTokenError, auth.Login)
def testLoginPassed(self):
"""Test Login working."""
self.rc.AddCmdResult(["luci-auth", "login"], stdout="")
self.assertIsNone(auth.Login())
def testTokenFailed(self):
"""Test Token failing."""
self.rc.AddCmdResult(["luci-auth", "token"], stderr="", returncode=1)
self.assertRaises(auth.AccessTokenError, auth.Token)
def testTokenPassed(self):
"""Test Token working."""
self.rc.AddCmdResult(["luci-auth", "token"], stdout="token")
self.assertEqual(auth.Token(), "token")
def testTokenAndLoginIfNeed(self):
"""Test TokenAndLoginIfNeed."""
# pylint: disable=protected-access
sc_json = "/tmp/service_account.json"
mock_login = self.PatchObject(auth, "Login", return_value=None)
mock_token = self.PatchObject(auth, "Token", return_value="token")
auth._TokenAndLoginIfNeed(service_account_json=sc_json)
mock_token.assert_called_once_with(service_account_json=sc_json)
mock_login.assert_not_called()
mock_login.reset_mock()
mock_token.reset_mock()
auth._TokenAndLoginIfNeed(
service_account_json=sc_json, force_token_renew=True
)
mock_token.assert_called_once_with(service_account_json=sc_json)
mock_login.assert_called_once_with(service_account_json=sc_json)
mock_login.reset_mock()
mock_token = self.PatchObject(
auth, "Token", side_effect=auth.AccessTokenError()
)
self.assertRaises(
auth.AccessTokenError,
auth._TokenAndLoginIfNeed,
service_account_json=sc_json,
)
mock_token.assert_called_once_with(service_account_json=sc_json)
mock_login.assert_called_once_with(service_account_json=sc_json)
mock_login.reset_mock()
mock_token.reset_mock()
self.assertRaises(
auth.AccessTokenError,
auth._TokenAndLoginIfNeed,
service_account_json=sc_json,
force_token_renew=True,
)
mock_token.assert_called_once_with(service_account_json=sc_json)
mock_login.assert_called_once_with(service_account_json=sc_json)
def testGetAccessToken(self):
"""Test GetAccessToken."""
mock_login = self.PatchObject(auth, "Login", return_value=None)
mock_token = self.PatchObject(
auth, "Token", side_effect=auth.AccessTokenError()
)
auth.GetAccessToken()
self.assertEqual(mock_login.call_count, auth.RETRY_GET_ACCESS_TOKEN + 1)
self.assertEqual(mock_token.call_count, auth.RETRY_GET_ACCESS_TOKEN + 1)
mock_login.reset_mock()
mock_token.reset_mock()
auth.GetAccessToken(force_token_renew=True)
self.assertEqual(mock_login.call_count, auth.RETRY_GET_ACCESS_TOKEN + 1)
self.assertEqual(mock_token.call_count, auth.RETRY_GET_ACCESS_TOKEN + 1)
def testGitCredsFailed(self):
"""Test git-credential-luci failing."""
self.rc.AddCmdResult(
["git-credential-luci", "get"], stderr="", returncode=1
)
self.assertRaises(auth.AccessTokenError, auth.GitCreds)
def testGitCredsPassed(self):
"""Test git-credential-luci working."""
stdout = "\n".join(
[
"user=some-luci-user",
"password=some-git-password",
]
)
self.rc.AddCmdResult(["git-credential-luci", "get"], stdout=stdout)
self.assertEqual(auth.GitCreds(), "some-git-password")
def testGitCredsNoPassword(self):
"""Test git-credential-luci returning unknown output."""
stdout = "\n".join(
[
"unknown stdout format, line #1",
"unknown stdout format, line #2",
]
)
self.rc.AddCmdResult(["git-credential-luci", "get"], stdout=stdout)
self.assertRaises(auth.AccessTokenError, auth.GitCreds)
class AuthorizedHttp(cros_test_lib.MockTestCase):
"""Test cases for AuthorizedHttp."""
def setUp(self):
self.mock_http = mock.Mock()
self.mock_resp = mock.Mock()
self.mock_get_token = self.PatchObject(
auth, "GetAccessToken", return_value="token"
)
self.account_json = "service_account_json"
def testAuthorize(self):
self.mock_resp.status = 200
self.mock_http.request.return_value = self.mock_resp, "content"
auth_http = auth.AuthorizedHttp(
auth.GetAccessToken,
self.mock_http,
service_account_json=self.account_json,
)
auth_http.request("url", "GET", body={}, headers={})
auth_http.request("url", "PUT", body={}, headers={})
auth_http.request("url", "POST", body={}, headers={})
self.assertEqual(1, self.mock_get_token.call_count)
self.assertEqual(3, self.mock_http.request.call_count)
self.mock_get_token.assert_called_with(
service_account_json=self.account_json
)
def testAuthorize2(self):
self.mock_resp.status = 401
self.mock_http.request.return_value = self.mock_resp, "content"
auth_http = auth.AuthorizedHttp(
auth.GetAccessToken,
self.mock_http,
service_account_json=self.account_json,
)
auth_http.request("url", "GET", body={}, headers={})
auth_http.request("url", "PUT", body={}, headers={})
auth_http.request("url", "POST", body={}, headers={})
self.assertEqual(4, self.mock_get_token.call_count)
self.assertEqual(6, self.mock_http.request.call_count)
self.mock_get_token.assert_called_with(
service_account_json=self.account_json, force_token_renew=True
)
def testAuthorize3(self):
self.mock_resp.status = 500
self.mock_http.request.return_value = self.mock_resp, "content"
auth_http = auth.AuthorizedHttp(auth.GetAccessToken, self.mock_http)
auth_http.request("url", "GET", body={}, headers={})
auth_http.request("url", "PUT", body={}, headers={})
auth_http.request("url", "POST", body={}, headers={})
self.assertEqual(1, self.mock_get_token.call_count)
self.mock_get_token.assert_called_with()