# Copyright 2014 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Test the archive_lib module."""

import json
import multiprocessing
from pathlib import Path

from chromite.lib import cros_test_lib
from chromite.lib import metadata_lib
from chromite.lib import parallel


class MetadataTest(cros_test_lib.TestCase):
    """Tests the correctness of various metadata methods."""

    def testGetDict(self) -> None:
        starting_dict = {
            "key1": 1,
            "key2": "2",
            "cl_actions": [("a", 1), ("b", 2)],
            "board-metadata": {
                "board-1": {"info": 432},
            },
        }
        metadata = metadata_lib.CBuildbotMetadata(starting_dict)
        ending_dict = metadata.GetDict()
        self.assertEqual(starting_dict, ending_dict)

    def testGetValue(self) -> None:
        """Test GetValue."""
        starting_dict = {"key1": 1, "key2": "2"}
        metadata = metadata_lib.CBuildbotMetadata(starting_dict)
        self.assertEqual(metadata.GetValue("key1"), 1)
        self.assertEqual(metadata.GetValue("key2"), "2")
        self.assertRaises(KeyError, metadata.GetValue, "key3")

    def testGetJSON(self) -> None:
        """Test GetJSON."""
        starting_dict = {
            "key1": 1,
            "key2": "2",
            "path": Path("/foo"),
        }
        metadata = metadata_lib.CBuildbotMetadata(starting_dict)
        sdata = metadata.GetJSON()
        data = json.loads(sdata)
        self.assertEqual(data["key1"], 1)
        self.assertEqual(data["key2"], "2")
        self.assertEqual(data["path"], "/foo")

    def testGetValueWithDefault(self) -> None:
        """Test GetValueWithDefault."""
        starting_dict = {"key1": 1, "key2": "2"}
        metadata = metadata_lib.CBuildbotMetadata(starting_dict)
        self.assertEqual(metadata.GetValueWithDefault("key1"), 1)
        self.assertEqual(metadata.GetValueWithDefault("key2", 2), "2")
        self.assertEqual(metadata.GetValueWithDefault("key3", 3), 3)

    def testUpdateKeyDictWithDict(self) -> None:
        expected_dict = {str(x): x for x in range(20)}
        m = multiprocessing.Manager()
        metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)

        metadata.UpdateKeyDictWithDict("my_dict", expected_dict)

        self.assertEqual(expected_dict, metadata.GetDict()["my_dict"])

    def testExtendKeyListWithList(self) -> None:
        """Test ExtendKeyListWithList."""
        m = multiprocessing.Manager()
        metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)

        expected_list = [str(x) for x in range(20)]
        metadata.ExtendKeyListWithList("my_list", expected_list)
        self.assertEqual(expected_list, metadata.GetDict()["my_list"])

        sub_list_1 = [str(x) for x in range(0, 10)]
        sub_list_2 = [str(x) for x in range(10, 20)]
        metadata.ExtendKeyListWithList("my_list_2", sub_list_1)
        metadata.ExtendKeyListWithList("my_list_2", sub_list_2)
        self.assertEqual(expected_list, metadata.GetDict()["my_list_2"])

    def testUpdateKeyDictWithDictMultiprocess(self) -> None:
        expected_dict = {str(x): x for x in range(20)}
        m = multiprocessing.Manager()
        metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)

        with parallel.BackgroundTaskRunner(metadata.UpdateKeyDictWithDict) as q:
            for k, v in expected_dict.items():
                q.put(["my_dict", {k: v}])

        self.assertEqual(expected_dict, metadata.GetDict()["my_dict"])

    def testUpdateBoardMetadataWithEmptyDict(self) -> None:
        metadata = metadata_lib.CBuildbotMetadata()
        metadata.UpdateBoardDictWithDict("someboard", {})
        self.assertEqual(metadata.GetDict()["board-metadata"]["someboard"], {})

    def testUpdateBoardMetadataWithMultiprocessDict(self) -> None:
        starting_dict = {
            "key1": 1,
            "key2": "2",
            "cl_actions": [("a", 1), ("b", 2)],
            "board-metadata": {
                "board-1": {"info": 432},
            },
        }

        m = multiprocessing.Manager()
        metadata = metadata_lib.CBuildbotMetadata(
            metadata_dict=starting_dict, multiprocess_manager=m
        )

        # pylint: disable=no-member
        update_dict = m.dict()
        update_dict["my_key"] = "some value"
        metadata.UpdateBoardDictWithDict("board-1", update_dict)

        self.assertEqual(
            metadata.GetDict()["board-metadata"]["board-1"]["my_key"],
            "some value",
        )

    def testMultiprocessSafety(self) -> None:
        m = multiprocessing.Manager()
        metadata = metadata_lib.CBuildbotMetadata(multiprocess_manager=m)
        key_dict = {"key1": 1, "key2": 2}
        starting_dict = {
            "key1": 1,
            "key2": "2",
            "key3": key_dict,
            "cl_actions": [("a", 1), ("b", 2)],
            "board-metadata": {
                "board-1": {"info": 432},
            },
        }

        # Test that UpdateWithDict is process-safe
        parallel.RunParallelSteps(
            [lambda: metadata.UpdateWithDict(starting_dict)]
        )
        ending_dict = metadata.GetDict()
        self.assertEqual(starting_dict, ending_dict)

        # Test that UpdateKeyDictWithDict is process-safe
        parallel.RunParallelSteps(
            [lambda: metadata.UpdateKeyDictWithDict("key3", key_dict)]
        )
        ending_dict = metadata.GetDict()
        self.assertEqual(starting_dict, ending_dict)

    def testPerBoardDict(self) -> None:
        starting_per_board_dict = {
            "board-1": {
                "kubrick": 2001,
                "bergman": "persona",
                "hitchcock": "vertigo",
            },
            "board-2": {
                "kubrick": ["barry lyndon", "dr. strangelove"],
                "bergman": "the seventh seal",
            },
        }

        starting_dict = {"board-metadata": starting_per_board_dict}

        m = multiprocessing.Manager()
        metadata = metadata_lib.CBuildbotMetadata(
            metadata_dict=starting_dict, multiprocess_manager=m
        )

        extra_per_board_dict = {
            "board-1": {"kurosawa": "rashomon", "coen brothers": "fargo"},
            "board-3": {
                "hitchcock": "north by northwest",
                "coen brothers": "the big lebowski",
            },
        }

        expected_dict = starting_per_board_dict

        # Write each per board key-value pair to metadata in a separate process.
        with parallel.BackgroundTaskRunner(
            metadata.UpdateBoardDictWithDict
        ) as q:
            for board, board_dict in extra_per_board_dict.items():
                expected_dict.setdefault(board, {}).update(board_dict)
                for k, v in board_dict.items():
                    q.put([board, {k: v}])

        self.assertEqual(expected_dict, metadata.GetDict()["board-metadata"])
