# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Test signer_payloads_client library."""

from __future__ import print_function

import base64
import os
import shutil
import tempfile

import mock

from chromite.lib import chroot_util
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import gs
from chromite.lib import gs_unittest
from chromite.lib import osutils
from chromite.lib import remote_access
from chromite.lib.paygen import gslock
from chromite.lib.paygen import gspaths
from chromite.lib.paygen import signer_payloads_client


# pylint: disable=protected-access


class SignerPayloadsClientGoogleStorageTest(gs_unittest.AbstractGSContextTest,
                                            cros_test_lib.TempDirTestCase):
  """Test suite for the class SignerPayloadsClientGoogleStorage."""

  orig_timeout = (
      signer_payloads_client.DELAY_CHECKING_FOR_SIGNER_RESULTS_SECONDS)

  def setUp(self):
    """Setup for tests, and store off some standard expected values."""
    self.hash_names = [
        '1.payload.hash',
        '2.payload.hash',
        '3.payload.hash']

    self.build_uri = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
                      'payloads/signing/foo-unique')

    # Some tests depend on this timeout. Make it smaller, then restore.
    signer_payloads_client.DELAY_CHECKING_FOR_SIGNER_RESULTS_SECONDS = 0.01

  def tearDown(self):
    """Teardown after tests, and restore values test might adjust."""
    # Some tests modify this timeout. Restore the original value.
    signer_payloads_client.DELAY_CHECKING_FOR_SIGNER_RESULTS_SECONDS = (
        self.orig_timeout)

  def createStandardClient(self):
    """Test helper method to create a client with standard arguments."""

    client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
        build=gspaths.Build(channel='foo-channel',
                            board='foo-board',
                            version='foo-version',
                            bucket='foo-bucket'),
        work_dir=self.tempdir,
        unique='foo-unique',
        ctx=self.ctx)
    return client

  def testUris(self):
    """Test that the URIs on the client are correct."""

    client = self.createStandardClient()

    expected_build_uri = self.build_uri

    self.assertEqual(
        client.signing_base_dir,
        expected_build_uri)

    self.assertEqual(
        client.archive_uri,
        expected_build_uri + '/payload.hash.tar.bz2')

  def testWorkDir(self):
    """Test that the work_dir is generated/passed correctly."""
    client = self.createStandardClient()
    self.assertIsNotNone(client._work_dir)

    client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
        build=gspaths.Build(), work_dir='/foo-dir')
    self.assertEqual(client._work_dir, '/foo-dir')

  def testCleanSignerFilesByKeyset(self):
    """Test the keyset specific cleanup works as expected."""

    hashes = ('hash-1', 'hash-2')
    keyset = 'foo-keys'

    lock_uri = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                'foo-version,payloads,signing,foo-unique,'
                'foo-keys.payload.signer.instructions.lock')

    signing_dir = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
                   'payloads/signing/foo-unique')

    expected_removals = (
        # Signing Request
        'gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,'
        'payloads,signing,foo-unique,'
        'foo-keys.payload.signer.instructions',

        # Signing Instructions
        signing_dir + '/foo-keys.payload.signer.instructions',

        # Signed Results`
        signing_dir + '/1.payload.hash.foo-keys.signed.bin',
        signing_dir + '/1.payload.hash.foo-keys.signed.bin.md5',
        signing_dir + '/2.payload.hash.foo-keys.signed.bin',
        signing_dir + '/2.payload.hash.foo-keys.signed.bin.md5',
    )

    client = self.createStandardClient()

    # Fake lock failed then acquired.
    lock = self.PatchObject(gslock, 'Lock', autospec=True,
                            side_effect=[gslock.LockNotAcquired(),
                                         mock.MagicMock()])

    # Do the work.
    client._CleanSignerFilesByKeyset(hashes, keyset)

    # Assert locks created with expected lock_uri.
    lock.assert_called_with(lock_uri)

    # Verify all expected files were removed.
    for uri in expected_removals:
      self.gs_mock.assertCommandContains(['rm', uri])

  def testCleanSignerFiles(self):
    """Test that GS cleanup works as expected."""

    hashes = ('hash-1', 'hash-2')
    keysets = ('foo-keys-1', 'foo-keys-2')

    lock_uri1 = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                 'foo-version,payloads,signing,foo-unique,'
                 'foo-keys-1.payload.signer.instructions.lock')

    lock_uri2 = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                 'foo-version,payloads,signing,foo-unique,'
                 'foo-keys-2.payload.signer.instructions.lock')

    signing_dir = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
                   'payloads/signing/foo-unique')

    expected_removals = (
        # Signing Request
        'gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,'
        'payloads,signing,foo-unique,'
        'foo-keys-1.payload.signer.instructions',

        'gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,'
        'payloads,signing,foo-unique,'
        'foo-keys-2.payload.signer.instructions',

        # Signing Instructions
        signing_dir + '/foo-keys-1.payload.signer.instructions',
        signing_dir + '/foo-keys-2.payload.signer.instructions',

        # Signed Results
        signing_dir + '/1.payload.hash.foo-keys-1.signed.bin',
        signing_dir + '/1.payload.hash.foo-keys-1.signed.bin.md5',
        signing_dir + '/2.payload.hash.foo-keys-1.signed.bin',
        signing_dir + '/2.payload.hash.foo-keys-1.signed.bin.md5',
        signing_dir + '/1.payload.hash.foo-keys-2.signed.bin',
        signing_dir + '/1.payload.hash.foo-keys-2.signed.bin.md5',
        signing_dir + '/2.payload.hash.foo-keys-2.signed.bin',
        signing_dir + '/2.payload.hash.foo-keys-2.signed.bin.md5',
    )

    client = self.createStandardClient()

    # Fake lock failed then acquired.
    lock = self.PatchObject(gslock, 'Lock', autospec=True)

    # Do the work.
    client._CleanSignerFiles(hashes, keysets)

    # Check created with lock_uri1, lock_uri2.
    self.assertEqual(lock.call_args_list,
                     [mock.call(lock_uri1), mock.call(lock_uri2)])

    # Verify expected removals.
    for uri in expected_removals:
      self.gs_mock.assertCommandContains(['rm', uri])

    self.gs_mock.assertCommandContains(['rm', signing_dir])

  def testCreateInstructionsUri(self):
    """Test that the expected instructions URI is correct."""

    client = self.createStandardClient()

    signature_uri = client._CreateInstructionsURI('keyset_foo')

    expected_signature_uri = (
        self.build_uri +
        '/keyset_foo.payload.signer.instructions')

    self.assertEqual(signature_uri, expected_signature_uri)

  def testCreateHashNames(self):
    """Test that the expected hash names are generated."""

    client = self.createStandardClient()

    hash_names = client._CreateHashNames(3)

    expected_hash_names = self.hash_names

    self.assertEqual(hash_names, expected_hash_names)

  def testCreateSignatureURIs(self):
    """Test that the expected signature URIs are generated."""

    client = self.createStandardClient()

    signature_uris = client._CreateSignatureURIs(self.hash_names,
                                                 'keyset_foo')

    expected_signature_uris = [
        self.build_uri + '/1.payload.hash.keyset_foo.signed.bin',
        self.build_uri + '/2.payload.hash.keyset_foo.signed.bin',
        self.build_uri + '/3.payload.hash.keyset_foo.signed.bin',
    ]

    self.assertEqual(signature_uris, expected_signature_uris)

  def testCreateArchive(self):
    """Test that we can correctly archive up hash values for the signer."""

    client = self.createStandardClient()

    tmp_dir = None
    hashes = [b'Hash 1', b'Hash 2', b'Hash 3']

    try:
      with tempfile.NamedTemporaryFile() as archive_file:
        client._CreateArchive(archive_file.name, hashes, self.hash_names)

        # Make sure the archive file created exists
        self.assertExists(archive_file.name)

        tmp_dir = tempfile.mkdtemp()

        cmd = ['tar', '-xjf', archive_file.name]
        cros_build_lib.run(
            cmd, stdout=True, stderr=True, cwd=tmp_dir)

        # Check that the expected (and only the expected) contents are present
        extracted_file_names = os.listdir(tmp_dir)
        self.assertEqual(len(extracted_file_names), len(self.hash_names))
        for name in self.hash_names:
          self.assertTrue(name in extracted_file_names)

        # Make sure each file has the expected contents
        for h, hash_name in zip(hashes, self.hash_names):
          with open(os.path.join(tmp_dir, hash_name), 'rb') as f:
            self.assertEqual([h], f.readlines())

    finally:
      # Clean up at the end of the test
      if tmp_dir:
        shutil.rmtree(tmp_dir)

  def testCreateInstructions(self):
    """Test that we can correctly create signer instructions."""

    client = self.createStandardClient()

    instructions = client._CreateInstructions(self.hash_names, 'keyset_foo')

    expected_instructions = """
# Auto-generated instruction file for signing payload hashes.

[insns]
generate_metadata = false
keyset = keyset_foo
channel = foo

input_files = %s
output_names = @BASENAME@.@KEYSET@.signed

[general]
type = update_payload
board = foo-board

archive = payload.hash.tar.bz2

# We reuse version for version rev because we may not know the
# correct versionrev "R24-1.2.3"
version = foo-version
versionrev = foo-version
""" % ' '.join(['1.payload.hash',
                '2.payload.hash',
                '3.payload.hash'])

    self.assertEqual(instructions, expected_instructions)

  def testSignerRequestUri(self):
    """Test that we can create signer request URI."""

    client = self.createStandardClient()

    instructions_uri = client._CreateInstructionsURI('foo_keyset')
    signer_request_uri = client._SignerRequestUri(instructions_uri)

    expected = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                'foo-version,payloads,signing,foo-unique,'
                'foo_keyset.payload.signer.instructions')

    self.assertEqual(signer_request_uri, expected)

  def testWaitForSignaturesInstant(self):
    """Test that we can correctly wait for a list of URIs to be created."""
    uris = ['foo', 'bar', 'is']

    # All Urls exist.
    exists = self.PatchObject(self.ctx, 'Exists', return_value=True)

    client = self.createStandardClient()

    self.assertTrue(client._WaitForSignatures(uris, timeout=0.02))

    # Make sure it really looked for every URL listed.
    self.assertEqual(exists.call_args_list,
                     [mock.call(u) for u in uris])

  def testWaitForSignaturesNever(self):
    """Test that we can correctly timeout waiting for a list of URIs."""
    uris = ['foo', 'bar', 'is']

    # Default mock GSContext behavior is nothing Exists.
    client = self.createStandardClient()
    self.assertFalse(client._WaitForSignatures(uris, timeout=0.02))

    # We don't care which URLs it checked, since it doesn't have to check
    # them all in this case.


class SignerPayloadsClientIntegrationTest(cros_test_lib.MockTempDirTestCase):
  """Test suite integration with live signer servers."""

  def setUp(self):
    # This is in the real production chromeos-releases, but the listed
    # build has never, and will never exist.
    self.client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
        gspaths.Build(channel='test-channel',
                      board='crostools-client',
                      version='Rxx-Ryy',
                      bucket='chromeos-releases'),
        work_dir=self.tempdir)

  def testDownloadSignatures(self):
    """Test that we can correctly download a list of URIs."""
    def fake_copy(uri, sig):
      """Just write the uri address to the content of the file."""
      osutils.WriteFile(sig, uri, mode='wb')

    self.PatchObject(self.client._ctx, 'Copy', side_effect=fake_copy)

    uris = [b'gs://chromeos-releases-test/sigining-test/foo',
            b'gs://chromeos-releases-test/sigining-test/bar']
    downloads = self.client._DownloadSignatures(uris)
    self.assertEqual(downloads, uris)

  @cros_test_lib.NetworkTest()
  def testGetHashSignatures(self):
    """Integration test that talks to the real signer with test hashes."""
    ctx = gs.GSContext()

    clean_uri = ('gs://chromeos-releases/test-channel/%s/crostools-client/**' %
                 (cros_build_lib.GetRandomString(),))

    # Cleanup before we start
    ctx.Remove(clean_uri, ignore_missing=True)

    try:
      hashes = [b'0' * 32,
                b'1' * 32,
                (b'29834370e415b3124a926c903906f18b'
                 b'3d52e955147f9e6accd67e9512185a63')]

      keysets = ['update_signer']

      expected_sigs_hex = (
          ('ba4c7a86b786c609bf6e4c5fb9c47525608678caa532bea8acc457aa6dd32b43'
           '5f094b331182f2e167682916990c40ff7b6b0128de3fa45ad0fd98041ec36d6f'
           '63b867bcf219804200616590a41a727c2685b48340efb4b480f1ef448fc7bc3f'
           'b1c4b53209e950ecc721b07a52a41d9c025fd25602340c93d5295211308caa29'
           'a03ed18516cf61411c508097d5b47620d643ed357b05213b2b9fa3a3f938d6c4'
           'f52b85c3f9774edc376902458344d1c1cd72bc932f033c076c76fee2400716fe'
           '652306871ba923021ce245e0c778ad9e0e50e87a169b2aea338c4dc8b5c0c716'
           'aabfb6133482e8438b084a09503db27ca546e910f8938f7805a8a76a3b0d0241',),

          ('2d909ca5b33a7fb6f2323ca0bf9de2e4f2266c73da4b6948a517dffa96783e08'
           'ca36411d380f6e8a20011f599d8d73576b2a141a57c0873d089726e24f62c7e0'
           '346ba5fbde68414b0f874b627fb1557a6e9658c8fac96c54f458161ea770982b'
           'fa9fe514120635e5ccb32e8219b9069cb0bf8063fba48d60d649c5af203cccef'
           'ca5dbc2191f81f0215edbdee4ec8c1553e69b83036aca3e840227d317ff6cf8b'
           '968c973f698db1ce59f6871303dcdbe839400c5df4d2e6e505d68890010a4459'
           '6ca9fee77f4db6ea3448d98018437c319fc8c5f4603ef94b04e3a4eafa206b73'
           '91a2640d43128310285bc0f1c7e5060d37c433d663b1c6f01110b9a43f2a74f4',),

          ('23791c99ab937f1ae5d4988afc9ceca39c290ac90e3da9f243f9a0b1c86c3c32'
           'ab7241d43dfc233da412bab989cf02f15a01fe9ea4b2dc7dc9182117547836d6'
           '9310af3aa005ee3a6deb9602bc676dcc103bf3f7831d64ab844b4785c5c8b4b1'
           '4467e6b5ab6bf34c12f7534e0d5140151c8f28e8276e703dd6332c2bab9e7f4a'
           '495215998ff56e476b81bd6b8d765e1f87da50c22cd52c9afa8c43a6528ab898'
           '6d7a273d9136d5aff5c4d95985d16eeec7380539ef963e0784a0de42b42890df'
           'c83702179f69f5c6eca4630807fbc4ab6241017e0942b15feada0b240e9729bf'
           '33bf456bd419da63302477e147963550a45c6cf60925ff48ad7b309fa158dcb2',))

      expected_sigs = [[base64.b16decode(x[0], True)]
                       for x in expected_sigs_hex]

      all_signatures = self.client.GetHashSignatures(hashes, keysets)

      self.assertEqual(all_signatures, expected_sigs)
      self.assertRaises(gs.GSNoSuchKey, ctx.List, clean_uri)

    finally:
      # Cleanup when we are over.
      ctx.Remove(clean_uri, ignore_missing=True)


class UnofficialPayloadSignerTest(cros_test_lib.TestCase):
  """Test suit for testing unofficial local payload signer."""
  def setUp(self):
    # UnofficialSignerPayloadsClient need a temporary directory inside chroot so
    # cros_test_lib.TempDirTestCase will not work if we run this unittest
    # outside the chroot.
    with chroot_util.TempDirInChroot(delete=False) as dir_in_chroot:
      self._temp_dir = dir_in_chroot

    self._client = signer_payloads_client.UnofficialSignerPayloadsClient(
        private_key=remote_access.TEST_PRIVATE_KEY, work_dir=self._temp_dir)

  def cleanUp(self):
    shutil.rmtree(self._temp_dir)

  def testExtractPublicKey(self):
    """Tests the correct command is run to extract the public key."""
    with tempfile.NamedTemporaryFile() as public_key:
      self._client.ExtractPublicKey(public_key.name)
      self.assertIn(b'BEGIN PUBLIC KEY', public_key.read())

  def testGetHashSignatures(self):
    """Tests we correclty sign given hashes."""
    hashes = (b'0' * 32, b'1' * 32)
    keyset = 'foo-keys'
    expected_sigs_hex = (
        '4732bf3c12b5795d5f4dd015cf8a65d8294186710f71e1530aa3b10d364ed15dc71ce'
        'f3bed312fd3f805d1b4ee79c1b868f7b8e175199d1c145838044fa3d037d67b142140'
        'a7187cb18cd8fb6897cc88481cb258e9ba87e6508a3eb2670b13542107dfea51417ab'
        'b3ee30c8ea81242d1b69c92b8c531e7b3799a28285c26ce5e9834648cf9601bdaf042'
        '57ffe97111b7497e14e4530ef9d4e9b6cdbf473304fee948af68fb6c992340e20bcf7'
        '8f0c6c28d7ea7d8f35322bc61d5d1456b3b868c16bfca9747887750e2734544b2dc0d'
        '22f68866e6456243ad53fc847d957b7fc1e87d0b4eafbb98b61810a86bf5b587b7241'
        'd0f92ba4323da3fc57ccd883fdc4d',
        '63cefceefb45688c5af557949fd0ec408245f5867e1453d2037c2125511a0ef5d7ead'
        '88cfebe04f6cda176a91707a6002a3a618fbb0ae8c956c0e7b56e1f29fb50c3ec3d47'
        'e786131c07b14d15cf768bbaceefd8d900526d79f08a985df910f31d33c97ec9f1599'
        '02e8478f4d0a1766bb21ea81677c67d11b2b17b0f4cf41599dcadb76549ee0b69badc'
        'a94ba5fae3b54cea75468a7a670991ba595f622d21ccb1f47edf0366503200e4ee7fe'
        'c686908f9099e4fc53f4a963769b42b856d2a8c94a15318d0620b7ed0b425989c599f'
        'fd363390a82c175f4ebab80f46d2f07585f5924fe1014233ba76ca6a2b047baee0231'
        '41ab38a027f56c963dd70550204ad',
    )

    expected_sigs = [[base64.b16decode(x, True)] for x in expected_sigs_hex]

    signatures = self._client.GetHashSignatures(hashes, keyset)

    self.assertEqual(signatures, expected_sigs)
