#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for heatmap_generator.py."""


import os
import unittest
import unittest.mock as mock

from heatmaps import heatmap_generator


def _write_perf_mmap(pid, tid, addr, size, fp):
    print(
        "0 0 0 0 PERF_RECORD_MMAP2 %d/%d: "
        "[%x(%x) @ 0x0 0:0 0 0] "
        "r-xp /opt/google/chrome/chrome\n" % (pid, tid, addr, size),
        file=fp,
    )


def _write_perf_fork(pid_from, tid_from, pid_to, tid_to, fp):
    print(
        "0 0 0 0 PERF_RECORD_FORK(%d:%d):(%d:%d)\n"
        % (pid_to, tid_to, pid_from, tid_from),
        file=fp,
    )


def _write_perf_exit(pid_from, tid_from, pid_to, tid_to, fp):
    print(
        "0 0 0 0 PERF_RECORD_EXIT(%d:%d):(%d:%d)\n"
        % (pid_to, tid_to, pid_from, tid_from),
        file=fp,
    )


def _write_perf_sample(pid, tid, addr, fp):
    print(
        "0 0 0 0 PERF_RECORD_SAMPLE(IP, 0x2): "
        "%d/%d: %x period: 100000 addr: 0" % (pid, tid, addr),
        file=fp,
    )
    print(" ... thread: chrome:%d" % tid, file=fp)
    print(" ...... dso: /opt/google/chrome/chrome\n", file=fp)


def _heatmap(file_name, page_size=4096, hugepage=None, analyze=False, top_n=10):
    generator = heatmap_generator.HeatmapGenerator(
        file_name, page_size, hugepage, "", log_level="none"
    )  # Don't log to stdout
    generator.draw()
    if analyze:
        generator.analyze("/path/to/chrome", top_n)


def _cleanup(file_name):
    files = [
        file_name,
        "out.txt",
        "inst-histo.txt",
        "inst-histo-hp.txt",
        "inst-histo-sp.txt",
        "heat_map.png",
        "timeline.png",
        "addr2symbol.txt",
    ]
    for f in files:
        if os.path.exists(f):
            os.remove(f)


class HeatmapGeneratorDrawTests(unittest.TestCase):
    """All of our tests for heatmap_generator.draw() and related."""

    def test_with_one_mmap_one_sample(self):
        """Tests one perf record and one sample."""
        fname = "test.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            _write_perf_sample(101, 101, 0xABCD101, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname)
        self.assertIn("out.txt", os.listdir("."))
        with open("out.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 1)
            self.assertIn("101/101: 1 0", lines[0])

    def test_with_one_mmap_multiple_samples(self):
        """Tests one perf record and three samples."""
        fname = "test.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            _write_perf_sample(101, 101, 0xABCD101, f)
            _write_perf_sample(101, 101, 0xABCD102, f)
            _write_perf_sample(101, 101, 0xABCE102, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname)
        self.assertIn("out.txt", os.listdir("."))
        with open("out.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 3)
            self.assertIn("101/101: 1 0", lines[0])
            self.assertIn("101/101: 2 0", lines[1])
            self.assertIn("101/101: 3 4096", lines[2])

    def test_with_fork_and_exit(self):
        """Tests perf fork and perf exit."""
        fname = "test_fork.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            _write_perf_fork(101, 101, 202, 202, f)
            _write_perf_sample(101, 101, 0xABCD101, f)
            _write_perf_sample(202, 202, 0xABCE101, f)
            _write_perf_exit(202, 202, 202, 202, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname)
        self.assertIn("out.txt", os.listdir("."))
        with open("out.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 2)
            self.assertIn("101/101: 1 0", lines[0])
            self.assertIn("202/202: 2 4096", lines[1])

    def test_hugepage_creates_two_chrome_mmaps(self):
        """Test two chrome mmaps for the same process."""
        fname = "test_hugepage.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
            _write_perf_fork(101, 101, 202, 202, f)
            _write_perf_mmap(202, 202, 0xABCD000, 0x100, f)
            _write_perf_mmap(202, 202, 0xABCD300, 0xD00, f)
            _write_perf_sample(101, 101, 0xABCD102, f)
            _write_perf_sample(202, 202, 0xABCD102, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname)
        self.assertIn("out.txt", os.listdir("."))
        with open("out.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 2)
            self.assertIn("101/101: 1 0", lines[0])
            self.assertIn("202/202: 2 0", lines[1])

    def test_hugepage_creates_two_chrome_mmaps_fail(self):
        """Test two chrome mmaps for the same process."""
        fname = "test_hugepage.txt"
        # Cases where first_mmap.size < second_mmap.size
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
            _write_perf_fork(101, 101, 202, 202, f)
            _write_perf_mmap(202, 202, 0xABCD000, 0x10000, f)
        self.addCleanup(_cleanup, fname)
        with self.assertRaises(AssertionError) as msg:
            _heatmap(fname)
        self.assertIn("Original MMAP size", str(msg.exception))

        # Cases where first_mmap.address > second_mmap.address
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
            _write_perf_fork(101, 101, 202, 202, f)
            _write_perf_mmap(202, 202, 0xABCC000, 0x10000, f)
        with self.assertRaises(AssertionError) as msg:
            _heatmap(fname)
        self.assertIn("Original MMAP starting address", str(msg.exception))

        # Cases where first_mmap.address + size <
        # second_mmap.address + second_mmap.size
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
            _write_perf_fork(101, 101, 202, 202, f)
            _write_perf_mmap(202, 202, 0xABCD100, 0x10000, f)
        with self.assertRaises(AssertionError) as msg:
            _heatmap(fname)
        self.assertIn("exceeds the end of original MMAP", str(msg.exception))

    def test_histogram(self):
        """Tests if the tool can generate correct histogram.

        In the tool, histogram is generated from statistics
        of perf samples (saved to out.txt). The histogram is
        generated by perf-to-inst-page.sh and saved to
        inst-histo.txt. It will be used to draw heat maps.
        """
        fname = "test_histo.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            for i in range(100):
                _write_perf_sample(101, 101, 0xABCD000 + i, f)
                _write_perf_sample(101, 101, 0xABCE000 + i, f)
                _write_perf_sample(101, 101, 0xABFD000 + i, f)
                _write_perf_sample(101, 101, 0xAFCD000 + i, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname)
        self.assertIn("inst-histo.txt", os.listdir("."))
        with open("inst-histo.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 4)
            self.assertIn("100 0", lines[0])
            self.assertIn("100 4096", lines[1])
            self.assertIn("100 196608", lines[2])
            self.assertIn("100 4194304", lines[3])

    def test_histogram_two_mb_page(self):
        """Tests handling of 2MB page."""
        fname = "test_histo.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            for i in range(100):
                _write_perf_sample(101, 101, 0xABCD000 + i, f)
                _write_perf_sample(101, 101, 0xABCE000 + i, f)
                _write_perf_sample(101, 101, 0xABFD000 + i, f)
                _write_perf_sample(101, 101, 0xAFCD000 + i, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname, page_size=2 * 1024 * 1024)
        self.assertIn("inst-histo.txt", os.listdir("."))
        with open("inst-histo.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 2)
            self.assertIn("300 0", lines[0])
            self.assertIn("100 4194304", lines[1])

    def test_histogram_in_and_out_hugepage(self):
        """Tests handling the case of separating samples in and out huge page."""
        fname = "test_histo.txt"
        with open(fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            for i in range(100):
                _write_perf_sample(101, 101, 0xABCD000 + i, f)
                _write_perf_sample(101, 101, 0xABCE000 + i, f)
                _write_perf_sample(101, 101, 0xABFD000 + i, f)
                _write_perf_sample(101, 101, 0xAFCD000 + i, f)
        self.addCleanup(_cleanup, fname)
        _heatmap(fname, hugepage=[0, 8192])
        file_list = os.listdir(".")
        self.assertNotIn("inst-histo.txt", file_list)
        self.assertIn("inst-histo-hp.txt", file_list)
        self.assertIn("inst-histo-sp.txt", file_list)
        with open("inst-histo-hp.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 2)
            self.assertIn("100 0", lines[0])
            self.assertIn("100 4096", lines[1])
        with open("inst-histo-sp.txt") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 2)
            self.assertIn("100 196608", lines[0])
            self.assertIn("100 4194304", lines[1])


class HeatmapGeneratorAnalyzeTests(unittest.TestCase):
    """All of our tests for heatmap_generator.analyze() and related."""

    def setUp(self):
        # Use the same perf report for testing
        self.fname = "test_histo.txt"
        with open(self.fname, "w") as f:
            _write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
            for i in range(10):
                _write_perf_sample(101, 101, 0xABCD000 + i, f)
                _write_perf_sample(101, 101, 0xABCE000 + i, f)
                _write_perf_sample(101, 101, 0xABFD000 + i, f)
        self.nm = (
            "000000000abcd000 t Func1@Page1\n"
            "000000000abcd001 t Func2@Page1\n"
            "000000000abcd0a0 t Func3@Page1andFunc1@Page2\n"
            "000000000abce010 t Func2@Page2\n"
            "000000000abfd000 t Func1@Page3\n"
        )

    def tearDown(self):
        _cleanup(self.fname)

    @mock.patch("subprocess.check_output")
    def test_analyze_hot_pages_with_hp_top(self, mock_nm):
        """Test if the analyze() can print the top page with hugepage."""
        mock_nm.return_value = self.nm
        _heatmap(self.fname, hugepage=[0, 8192], analyze=True, top_n=1)
        file_list = os.listdir(".")
        self.assertIn("addr2symbol.txt", file_list)
        with open("addr2symbol.txt") as f:
            contents = f.read()
            self.assertIn("Func2@Page1 : 9", contents)
            self.assertIn("Func1@Page1 : 1", contents)
            self.assertIn("Func1@Page3 : 10", contents)
            # Only displaying one page in hugepage
            self.assertNotIn("Func3@Page1andFunc1@Page2 : 10", contents)

    @mock.patch("subprocess.check_output")
    def test_analyze_hot_pages_without_hp_top(self, mock_nm):
        """Test if the analyze() can print the top page without hugepage."""
        mock_nm.return_value = self.nm
        _heatmap(self.fname, analyze=True, top_n=1)
        file_list = os.listdir(".")
        self.assertIn("addr2symbol.txt", file_list)
        with open("addr2symbol.txt") as f:
            contents = f.read()
            self.assertIn("Func2@Page1 : 9", contents)
            self.assertIn("Func1@Page1 : 1", contents)
            # Only displaying one page
            self.assertNotIn("Func3@Page1andFunc1@Page2 : 10", contents)
            self.assertNotIn("Func1@Page3 : 10", contents)

    @mock.patch("subprocess.check_output")
    def test_analyze_hot_pages_with_hp_top10(self, mock_nm):
        """Test if the analyze() can print with default top 10."""
        mock_nm.return_value = self.nm
        _heatmap(self.fname, analyze=True)
        # Make sure nm command is called correctly.
        mock_nm.assert_called_with(["nm", "-n", "/path/to/chrome"])
        file_list = os.listdir(".")
        self.assertIn("addr2symbol.txt", file_list)
        with open("addr2symbol.txt") as f:
            contents = f.read()
            self.assertIn("Func2@Page1 : 9", contents)
            self.assertIn("Func1@Page1 : 1", contents)
            self.assertIn("Func3@Page1andFunc1@Page2 : 10", contents)
            self.assertIn("Func1@Page3 : 10", contents)


if __name__ == "__main__":
    unittest.main()
