blob: 898c73700a82dd776ddb043f9c4f0d5c16617222 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for heatmap_generator.py."""
import os
import unittest
import unittest.mock as mock
from heatmaps import heatmap_generator
def _write_perf_mmap(pid, tid, addr, size, fp):
print(
"0 0 0 0 PERF_RECORD_MMAP2 %d/%d: "
"[%x(%x) @ 0x0 0:0 0 0] "
"r-xp /opt/google/chrome/chrome\n" % (pid, tid, addr, size),
file=fp,
)
def _write_perf_fork(pid_from, tid_from, pid_to, tid_to, fp):
print(
"0 0 0 0 PERF_RECORD_FORK(%d:%d):(%d:%d)\n"
% (pid_to, tid_to, pid_from, tid_from),
file=fp,
)
def _write_perf_exit(pid_from, tid_from, pid_to, tid_to, fp):
print(
"0 0 0 0 PERF_RECORD_EXIT(%d:%d):(%d:%d)\n"
% (pid_to, tid_to, pid_from, tid_from),
file=fp,
)
def _write_perf_sample(pid, tid, addr, fp):
print(
"0 0 0 0 PERF_RECORD_SAMPLE(IP, 0x2): "
"%d/%d: %x period: 100000 addr: 0" % (pid, tid, addr),
file=fp,
)
print(" ... thread: chrome:%d" % tid, file=fp)
print(" ...... dso: /opt/google/chrome/chrome\n", file=fp)
def _heatmap(file_name, page_size=4096, hugepage=None, analyze=False, top_n=10):
generator = heatmap_generator.HeatmapGenerator(
file_name, page_size, hugepage, "", log_level="none"
) # Don't log to stdout
generator.draw()
if analyze:
generator.analyze("/path/to/chrome", top_n)
def _cleanup(file_name):
files = [
file_name,
"out.txt",
"inst-histo.txt",
"inst-histo-hp.txt",
"inst-histo-sp.txt",
"heat_map.png",
"timeline.png",
"addr2symbol.txt",
]
for f in files:
if os.path.exists(f):
os.remove(f)
class HeatmapGeneratorDrawTests(unittest.TestCase):
"""All of our tests for heatmap_generator.draw() and related."""
def test_with_one_mmap_one_sample(self):
"""Tests one perf record and one sample."""
fname = "test.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
_write_perf_sample(101, 101, 0xABCD101, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname)
self.assertIn("out.txt", os.listdir("."))
with open("out.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 1)
self.assertIn("101/101: 1 0", lines[0])
def test_with_one_mmap_multiple_samples(self):
"""Tests one perf record and three samples."""
fname = "test.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
_write_perf_sample(101, 101, 0xABCD101, f)
_write_perf_sample(101, 101, 0xABCD102, f)
_write_perf_sample(101, 101, 0xABCE102, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname)
self.assertIn("out.txt", os.listdir("."))
with open("out.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 3)
self.assertIn("101/101: 1 0", lines[0])
self.assertIn("101/101: 2 0", lines[1])
self.assertIn("101/101: 3 4096", lines[2])
def test_with_fork_and_exit(self):
"""Tests perf fork and perf exit."""
fname = "test_fork.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
_write_perf_fork(101, 101, 202, 202, f)
_write_perf_sample(101, 101, 0xABCD101, f)
_write_perf_sample(202, 202, 0xABCE101, f)
_write_perf_exit(202, 202, 202, 202, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname)
self.assertIn("out.txt", os.listdir("."))
with open("out.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 2)
self.assertIn("101/101: 1 0", lines[0])
self.assertIn("202/202: 2 4096", lines[1])
def test_hugepage_creates_two_chrome_mmaps(self):
"""Test two chrome mmaps for the same process."""
fname = "test_hugepage.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
_write_perf_fork(101, 101, 202, 202, f)
_write_perf_mmap(202, 202, 0xABCD000, 0x100, f)
_write_perf_mmap(202, 202, 0xABCD300, 0xD00, f)
_write_perf_sample(101, 101, 0xABCD102, f)
_write_perf_sample(202, 202, 0xABCD102, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname)
self.assertIn("out.txt", os.listdir("."))
with open("out.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 2)
self.assertIn("101/101: 1 0", lines[0])
self.assertIn("202/202: 2 0", lines[1])
def test_hugepage_creates_two_chrome_mmaps_fail(self):
"""Test two chrome mmaps for the same process."""
fname = "test_hugepage.txt"
# Cases where first_mmap.size < second_mmap.size
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
_write_perf_fork(101, 101, 202, 202, f)
_write_perf_mmap(202, 202, 0xABCD000, 0x10000, f)
self.addCleanup(_cleanup, fname)
with self.assertRaises(AssertionError) as msg:
_heatmap(fname)
self.assertIn("Original MMAP size", str(msg.exception))
# Cases where first_mmap.address > second_mmap.address
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
_write_perf_fork(101, 101, 202, 202, f)
_write_perf_mmap(202, 202, 0xABCC000, 0x10000, f)
with self.assertRaises(AssertionError) as msg:
_heatmap(fname)
self.assertIn("Original MMAP starting address", str(msg.exception))
# Cases where first_mmap.address + size <
# second_mmap.address + second_mmap.size
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x1000, f)
_write_perf_fork(101, 101, 202, 202, f)
_write_perf_mmap(202, 202, 0xABCD100, 0x10000, f)
with self.assertRaises(AssertionError) as msg:
_heatmap(fname)
self.assertIn("exceeds the end of original MMAP", str(msg.exception))
def test_histogram(self):
"""Tests if the tool can generate correct histogram.
In the tool, histogram is generated from statistics
of perf samples (saved to out.txt). The histogram is
generated by perf-to-inst-page.sh and saved to
inst-histo.txt. It will be used to draw heat maps.
"""
fname = "test_histo.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
for i in range(100):
_write_perf_sample(101, 101, 0xABCD000 + i, f)
_write_perf_sample(101, 101, 0xABCE000 + i, f)
_write_perf_sample(101, 101, 0xABFD000 + i, f)
_write_perf_sample(101, 101, 0xAFCD000 + i, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname)
self.assertIn("inst-histo.txt", os.listdir("."))
with open("inst-histo.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 4)
self.assertIn("100 0", lines[0])
self.assertIn("100 4096", lines[1])
self.assertIn("100 196608", lines[2])
self.assertIn("100 4194304", lines[3])
def test_histogram_two_mb_page(self):
"""Tests handling of 2MB page."""
fname = "test_histo.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
for i in range(100):
_write_perf_sample(101, 101, 0xABCD000 + i, f)
_write_perf_sample(101, 101, 0xABCE000 + i, f)
_write_perf_sample(101, 101, 0xABFD000 + i, f)
_write_perf_sample(101, 101, 0xAFCD000 + i, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname, page_size=2 * 1024 * 1024)
self.assertIn("inst-histo.txt", os.listdir("."))
with open("inst-histo.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 2)
self.assertIn("300 0", lines[0])
self.assertIn("100 4194304", lines[1])
def test_histogram_in_and_out_hugepage(self):
"""Tests handling the case of separating samples in and out huge page."""
fname = "test_histo.txt"
with open(fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
for i in range(100):
_write_perf_sample(101, 101, 0xABCD000 + i, f)
_write_perf_sample(101, 101, 0xABCE000 + i, f)
_write_perf_sample(101, 101, 0xABFD000 + i, f)
_write_perf_sample(101, 101, 0xAFCD000 + i, f)
self.addCleanup(_cleanup, fname)
_heatmap(fname, hugepage=[0, 8192])
file_list = os.listdir(".")
self.assertNotIn("inst-histo.txt", file_list)
self.assertIn("inst-histo-hp.txt", file_list)
self.assertIn("inst-histo-sp.txt", file_list)
with open("inst-histo-hp.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 2)
self.assertIn("100 0", lines[0])
self.assertIn("100 4096", lines[1])
with open("inst-histo-sp.txt") as f:
lines = f.readlines()
self.assertEqual(len(lines), 2)
self.assertIn("100 196608", lines[0])
self.assertIn("100 4194304", lines[1])
class HeatmapGeneratorAnalyzeTests(unittest.TestCase):
"""All of our tests for heatmap_generator.analyze() and related."""
def setUp(self):
# Use the same perf report for testing
self.fname = "test_histo.txt"
with open(self.fname, "w") as f:
_write_perf_mmap(101, 101, 0xABCD000, 0x100, f)
for i in range(10):
_write_perf_sample(101, 101, 0xABCD000 + i, f)
_write_perf_sample(101, 101, 0xABCE000 + i, f)
_write_perf_sample(101, 101, 0xABFD000 + i, f)
self.nm = (
"000000000abcd000 t Func1@Page1\n"
"000000000abcd001 t Func2@Page1\n"
"000000000abcd0a0 t Func3@Page1andFunc1@Page2\n"
"000000000abce010 t Func2@Page2\n"
"000000000abfd000 t Func1@Page3\n"
)
def tearDown(self):
_cleanup(self.fname)
@mock.patch("subprocess.check_output")
def test_analyze_hot_pages_with_hp_top(self, mock_nm):
"""Test if the analyze() can print the top page with hugepage."""
mock_nm.return_value = self.nm
_heatmap(self.fname, hugepage=[0, 8192], analyze=True, top_n=1)
file_list = os.listdir(".")
self.assertIn("addr2symbol.txt", file_list)
with open("addr2symbol.txt") as f:
contents = f.read()
self.assertIn("Func2@Page1 : 9", contents)
self.assertIn("Func1@Page1 : 1", contents)
self.assertIn("Func1@Page3 : 10", contents)
# Only displaying one page in hugepage
self.assertNotIn("Func3@Page1andFunc1@Page2 : 10", contents)
@mock.patch("subprocess.check_output")
def test_analyze_hot_pages_without_hp_top(self, mock_nm):
"""Test if the analyze() can print the top page without hugepage."""
mock_nm.return_value = self.nm
_heatmap(self.fname, analyze=True, top_n=1)
file_list = os.listdir(".")
self.assertIn("addr2symbol.txt", file_list)
with open("addr2symbol.txt") as f:
contents = f.read()
self.assertIn("Func2@Page1 : 9", contents)
self.assertIn("Func1@Page1 : 1", contents)
# Only displaying one page
self.assertNotIn("Func3@Page1andFunc1@Page2 : 10", contents)
self.assertNotIn("Func1@Page3 : 10", contents)
@mock.patch("subprocess.check_output")
def test_analyze_hot_pages_with_hp_top10(self, mock_nm):
"""Test if the analyze() can print with default top 10."""
mock_nm.return_value = self.nm
_heatmap(self.fname, analyze=True)
# Make sure nm command is called correctly.
mock_nm.assert_called_with(["nm", "-n", "/path/to/chrome"])
file_list = os.listdir(".")
self.assertIn("addr2symbol.txt", file_list)
with open("addr2symbol.txt") as f:
contents = f.read()
self.assertIn("Func2@Page1 : 9", contents)
self.assertIn("Func1@Page1 : 1", contents)
self.assertIn("Func3@Page1andFunc1@Page2 : 10", contents)
self.assertIn("Func1@Page3 : 10", contents)
if __name__ == "__main__":
unittest.main()