mmm_donut: Initial version

Initial version of the Mad Memory Muncher, a Dynamically Organizing
Non-Uniprocess Tester.

This attempts to chew through memory and exercise zram / low memory
conditions in a slightly cleaner way than other similar tools.

- It's intrinsically multiprocess and the different processes work
  together.  Among other benefits, this means we don't get into corner
  cases where one process starts or finishes significantly before the
- The master process can detect if a child has died and stop the test.
- We can summarize the results of all clients easily.

This test is written in python so it is hopefully easy to extend.  It
still attempts to be predictable and performant by allocating pages
itself by calling libc directly and doing its memory manipulation
using numpy (which is written in C).

At the moment this program only has a single, hardcoded test where it
allocates a bunch of memory and then spends time accessing it.  It
could certainly be extended to include additional test modes.

TEST=Use this to genereate some memory pressure

Change-Id: I9d9babc7b2b9700741fa7577dc744aeac66f73a6
Commit-Ready: Douglas Anderson <>
Tested-by: Douglas Anderson <>
Reviewed-by: Sonny Rao <>
diff --git a/ b/
new file mode 100755
index 0000000..9b6494a
--- /dev/null
+++ b/
@@ -0,0 +1,547 @@
+#!/usr/bin/env python2
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Mad Memory Muncher - Dynamically Organizing Non-Uniprocess Tester.
+This program attempts to exercise low memory situations by munching memory
+in a coordinated way across several processes.
+Specifically, there is a single controlling process that keeps communication
+channels open to subprocesses so that all processes can start and stop various
+parts of the test at the same time.  This tester also has various clever ways
+to access memory.
+The main modes are: munch (allocate memory), taste (re-read already
+allocated memory), and chew (modify already allocated memory).  Whenever
+possible we try to put some sane values into memory so that any memory
+compression will behave in a real-world-like way.
+At the moment this program always makes sure that all of the child
+sub-processes are set to have an OOM score of 1000 (easy to kill them) and
+the parent has a default OOM score (unkillable on Chrome OS).  At various
+checkpoints in the test the parent looks for dead children and stops the test.
+- The way this program works is subject to chagne depending on the needs
+  of people stressing memory.  Don't rely on command line arguments staying
+  consistent.  If we need a consistent test, we could fork this or add a
+  consistent subcommand.
+- You should probably have KASAN and slub_debug turned off when running this.
+  If you have those on then you're not doing a real test of the memory system
+  and shouldn't be surprised that it can't keep up.
+    1. Launch one process per CPU and aim for 500 MB swap left.  Re-access
+       memory for 70 seconds and then access/modify memory for 90 seconds:
+         mmm_donut --free_swap=500 --taste=70 --chew=90
+    2. Like #1 but use 200 processes.  Note that since by default each
+       process will access 1MB at a time we'll probably really end up stopping
+       at closer to 300 MB free swap or less (the muncher stops telling
+       sub-processes to allocate when free swap is 500 MB, but then any
+       outstanding allocatoins will finish.
+         mmm_donut -n200 --free_swap=500 --taste=70 --chew=90
+    3. Like #1 but have children allocate 20MB chunks.  This will act to
+       more quickly allocate memory but will also over-allocate a bit more.
+       On a 6-CPU system you might overallocate by 120MB.
+         mmm_donut --free_swap=500 --munch_mbs=20 --taste=70 --chew=90
+from __future__ import print_function
+import argparse
+import ctypes
+import multiprocessing
+import numpy
+import os
+import Queue
+import subprocess
+import sys
+import time
+libc = ctypes.CDLL('') = [ctypes.c_void_p] = None
+libc.valloc.argtypes = [ctypes.c_size_t]
+libc.valloc.restype = ctypes.c_void_p
+# By default, we'll fill memory with data based on the contents of this
+# file.  Ideally it should be a big file and fairly representative of
+# what we expect memory to contain.
+_DEFAULT_FILE_TO_MAP = '/opt/google/chrome/chrome'
+_KB = 1024
+_MB = _KB * _KB
+# For the purpose of this program, a 'word' is 32-bits.
+_WORDS_PER_MB = _MB / 4
+_PAGESIZE = os.sysconf('SC_PAGESIZE')
+class _MemoryMuncher(object):
+    """A class for eating memory.
+    This class has functions in it for efficiently munching up memory.
+    Specifically, it has a few things it can do:
+    munch: This will allocate more memory and fill it with data copied from
+        a prototype datasource.  It will attempt to make this data 'unique'
+        by adding a value to each word based on the current PID.  Allocating
+        is done 1 MB at a time and done with valloc() so we get page-sized
+        allocations.  Copying / making unique is done with numpy to get
+        reasonably efficiency.
+    taste: This will re-read memory (1MB at a time) that's already been munched,
+        which ought to cause it to get paged in.  We read 1 word from each page.
+        Again we use numpy which ought to make it somewhat efficient.
+    chew: This will attempt to read-modify-write memory (1MB at a time) that's
+        already been munched.  This ought to have no huge performance difference
+        than taste.
+    spit: This will release memory allocated by munch.
+    Attributes:
+        num_mbs_allocated: Number of MB that are currently allocated.
+        num_mbs_munched: Number of MB that have been munched in total.  Note
+            that if you munch something and then spit it out it still counts in
+            this number, so munch(30); spit(10); munch(20) => 50.
+        num_mbs_tasted: Number of MB that have been tasted in total.
+        num_mbs_chewed: Number of MB that have been chewed in total.
+    """
+    def __init__(self, proto_data=None):
+        """Create a MemoryMuncher object.
+        Args:
+            proto_data: A numpy.memmap array, or None for the default.  We'll
+                use this as prototype data to copy to our allocated pages.
+        """
+        if not proto_data:
+            proto_data = numpy.memmap(_DEFAULT_FILE_TO_MAP,
+                                      dtype='uint32', mode='r')
+        self._proto_data = proto_data
+        self._num_proto_mbs = len(self._proto_data) / _WORDS_PER_MB
+        self._at_proto_mb = 0
+        # Every time we munch through a chunk we'll add this to each integer to
+        # make the chunk look unique, then increment it.
+        self._unique = os.getpid() << 16
+        self._mbs = []
+        self._last_accessed_mb = -1
+        self.num_mbs_munched = 0
+        self.num_mbs_tasted = 0
+        self.num_mbs_chewed = 0
+    @property
+    def num_mbs_allocated(self):
+        return len(self._mbs)
+    def _alloc_array(self, n, element_type=ctypes.c_uint8):
+        """Allocate a numpy array using libc.valloc (page aligned allocation).
+        Args:
+            n: Number of elements in the array
+            element_type: The type of the element (a ctypes type)
+        """
+        ptr = libc.valloc(n * ctypes.sizeof(element_type))
+        ptr = ctypes.cast(ptr, ctypes.POINTER(element_type))
+        return numpy.ctypeslib.as_array(ptr, shape=(n,))
+    def _free_array(self, arr):
+        """Free a numpy array allocated with _alloc_array
+        Args:
+            arr: The return value from _alloc_array.
+        """
+        ptr = ctypes.cast(arr, ctypes.c_void_p)
+    def munch(self, mbs_to_munch, quick_alloc=False):
+        """Allocate the given number of mbs, filling the memory with data.
+        Args:
+            mbs_to_munch: The number of MBs to allocate.
+            quick_alloc: If true, we'll try to allocate quicker by not using
+                the proto data; we'll just put a unique value in the first
+                word of the page.
+        """
+        for _ in xrange(mbs_to_munch):
+            # Allocate some memory using libc; give back a numpy object
+            mb = self._alloc_array(_WORDS_PER_MB, ctypes.c_uint32)
+            # Copy data from our proto data making it unique by adding a
+            # unique integer to each word.
+            mb[0] = self._unique
+            if quick_alloc:
+                # Don't even bother to zero memory, but put at least something
+                # unique per page
+                mb.reshape((_PAGESIZE, -1)).T[0] = self._unique
+            else:
+                # Copy from the next spot in the prototype
+                # As we copy, add the unique data based on our PID.
+                mb[:] = (self._proto_data[self._at_proto_mb *
+                                          _WORDS_PER_MB:
+                                          (self._at_proto_mb + 1) *
+                                          _WORDS_PER_MB] + self._unique)
+            # Update so we're ready for the next time
+            self._at_proto_mb += 1
+            self._at_proto_mb %= self._num_proto_mbs
+            self._unique += 1
+            self._mbs.append(mb)
+            self.num_mbs_munched += 1
+    def spit(self, mbs_to_spit):
+        """Spit (free) out the oldest munched memory.
+        Args:
+            mbs_to_spit: Number of MBs to spit.
+        """
+        for _ in xrange(mbs_to_spit):
+            if not self._mbs:
+                raise RuntimeError('No more memory to spit out')
+            self._free_array(self._mbs.pop(0))
+    def taste(self, mbs_to_taste):
+        """Access memory that we've chewed through, reading 1 word per page.
+        Args:
+            mbs_to_taste: Number of MBs that we'd like to try to access
+        """
+        if not self._mbs:
+            raise RuntimeError('No memory')
+        mb_num = self._last_accessed_mb
+        for mb_num in xrange(mb_num + 1, mb_num + 1 + mbs_to_taste):
+            mb_num %= len(self._mbs)
+            mb = self._mbs[mb_num]
+            self.num_mbs_tasted += 1
+            # Fancy numpy to access 1 word from each page
+            _ = sum(mb.reshape((-1, _PAGESIZE)).T[0])
+        self._last_accessed_mb = mb_num
+    def chew(self, mbs_to_chew):
+        """Modify memory that we've chewed through, tweaking 1 word per page.
+        Args:
+            mbs_to_chew: Number of MBs that we'd like to try to access
+        """
+        if not self._mbs:
+            raise RuntimeError('No memory')
+        mb_num = self._last_accessed_mb
+        for mb_num in xrange(mb_num + 1, mb_num + 1 + mbs_to_chew):
+            mb_num %= len(self._mbs)
+            mb = self._mbs[mb_num]
+            self.num_mbs_chewed += 1
+            # Fancy numpy to access 1 word from each page; we'll invert each
+            # time as our modification
+            _ = sum(mb.reshape((-1, _PAGESIZE)).T[0])
+        self._last_accessed_mb = mb_num
+class _MemInfo(object):
+    """An object that makes accessing /proc/meminfo easy.
+    When this object is created it will read /proc/meminfo and store all the
+    attributes it finds as integer properties.  All memory quantities are
+    expressed in bytes, so if /proc/meminfo said 'MemFree' was 100 kB then our
+    MemFree attribute will be 102400.
+    """
+    def __init__(self):
+        with open('/proc/meminfo', 'r') as f:
+            for line in f.readlines():
+                name, _, val = line.partition(':')
+                num, _, unit = val.strip().partition(' ')
+                num = int(num)
+                if unit == 'kB':
+                    num *= 1024
+                elif unit != '':
+                    raise RuntimeError('Unexpected meminfo: %s' % line)
+                setattr(self, name, num)
+def _make_self_oomable():
+    """Makes sure that the current process is easily OOMable."""
+    with open('/proc/self/oom_score_adj', 'w') as f:
+        f.write('1000\n')
+def _thread_main(task_num, options, cmd_queue, done_queue):
+    """The main entry point of the worker threads.
+    Threads communicate with the main thread through two queues.  They get
+    commands from the cmd_queue and communicate that they're done by putting
+    their task_num on the done_queue.
+    Args:
+        task_num: The integer ID of this task.
+        options: Options created by _parse_options()
+        cmd_queue: String commands will be put here by the main thread.
+        done_queue: We'll put our task_num on this queue when we're done with
+            our command.
+    """
+    _make_self_oomable()
+    muncher = _MemoryMuncher()
+    munch_mbs = options.munch_mbs
+    taste_mbs = options.taste_mbs
+    chew_mbs = options.chew_mbs
+    try:
+        cmd = None
+        while cmd != 'done':
+            cmd = cmd_queue.get()
+            if cmd == 'status':
+                print(('Task %d: allocated %d MB, munched %d MB, ' +
+                       'tasted %d MB, chewed %d MB') %
+                      (task_num, muncher.num_mbs_allocated,
+                       muncher.num_mbs_munched, muncher.num_mbs_tasted,
+                       muncher.num_mbs_chewed))
+            elif cmd == 'munch':
+                muncher.munch(munch_mbs)
+            elif cmd == 'taste':
+                muncher.taste(chew_mbs)
+            elif cmd == 'chew':
+                muncher.chew(taste_mbs)
+            done_queue.put(task_num)
+    except KeyboardInterrupt:
+        # Don't yell about keyboard interrupts
+        pass
+    finally:
+        print('Task %d is done' % task_num)
+        done_queue.close()
+        cmd_queue.close()
+class WorkerDeadError(RuntimeError):
+    """We throw this when we see that a worker has died."""
+    def __init__(self, task_num):
+        super(WorkerDeadError, self).__init__('Task %d is dead' % task_num)
+        self.task_num = task_num
+def _wait_everyone_done(tasks, done_queue, refill_done_queue=True):
+    """Wait until all of our workers are done.
+    This will wait until all tasks have put their task_num in the done_queue.
+    We'll also check to see if any tasks are dead and we'll raise an exception
+    if we notice this.
+    Args:
+        tasks: The list of our worker tasks.
+        done_queue: Our done queue
+        refill_done_queue: If True then we'll make sure that the done_queue
+            has each task number in it when we're done; if False then we'll
+            leave the done_queue empty.
+    Raises:
+        WorkerDeadError: If we notice something has died.
+    """
+    num_tasks = len(tasks)
+    # We want to see every task number report it's done via the done_queue; if
+    # things are taking too long we'll poll for dead children.
+    done_tasks = set()
+    while len(done_tasks) != num_tasks:
+        try:
+            task_num = done_queue.get(timeout=.5)
+            done_tasks.add(task_num)
+        except Queue.Empty:
+            for task_num, task in enumerate(tasks):
+                if not task.is_alive():
+                    raise WorkerDeadError(task_num)
+    assert done_queue.empty()
+    if not refill_done_queue:
+        return
+    # Add everyone back to the done_queue.
+    for task_num in xrange(num_tasks):
+        done_queue.put(task_num)
+def _end_stage(old_stage_name, tasks, done_queue, cmd_queues):
+    """End the given stage and ask wokers to print status.
+    Args:
+        old_stage_name: We'll print this to tell the user we finished this.
+        tasks: The list of our worker tasks.
+        done_queue: Our done queue
+        cmd_queues: A list of all task command queues.
+    """
+    num_tasks = len(tasks)
+    # Wait, but don't refill the queue since since we'll get the queue
+    # refilled after the workers finish printing their status.
+    _wait_everyone_done(tasks, done_queue, refill_done_queue=False)
+    print('Done with stage %s' % old_stage_name)
+    # Give the system a second to quiesce (TODO: needed?)
+    time.sleep(1)
+    # We'll throw an extra status update; this will refill the done_queue
+    for task_num in xrange(num_tasks):
+        assert cmd_queues[task_num].empty()
+        cmd_queues[task_num].put('status')
+    _wait_everyone_done(tasks, done_queue)
+def _parse_options(args):
+    """Parse command line options.
+    Args:
+        args: sys.argv[1:]
+    Returns:
+        An argparse.ArgumentParser object.
+    """
+    p = subprocess.Popen(['nproc'], stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+    stdout, _ = p.communicate()
+    nproc = int(stdout)
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument(
+        '-n', '--num_tasks', type=int, default=nproc,
+        help='Number of tasks to use (default: %(default)s)'
+    )
+    parser.add_argument(
+        '-z', '--munch_mbs', type=int, default=1,
+        help='Munch this many MB at a time (default: %(default)s)'
+    )
+    parser.add_argument(
+        '-s', '--free_swap', type=int, default=500,
+        help='Stop munching when free swap <= this many MB ' +
+        '(default: %(default)s)'
+    )
+    parser.add_argument(
+        '-t', '--taste', type=int, default=30,
+        help='Taste for this many seconds (default: %(default)s)'
+    )
+    parser.add_argument(
+        '-T', '--taste_mbs', type=int, default=-1,
+        help='Taste this many MB at a time (default: use munch_mbs)'
+    )
+    parser.add_argument(
+        '-c', '--chew', type=int, default=30,
+        help='Chew for this many seconds (default: %(default)s)'
+    )
+    parser.add_argument(
+        '-C', '--chew_mbs', type=int, default=-1,
+        help='Chew this many MB at a time (default: use munch_mbs)'
+    )
+    parser.add_argument(
+        '-F', '--memfree_sleep', type=int, default=0,
+        help='Sleep when memfree is < this many MB (default: %(default)s)'
+    )
+    options = parser.parse_args(args)
+    if options.taste_mbs == -1:
+        options.taste_mbs = options.munch_mbs
+    if options.chew_mbs == -1:
+        options.chew_mbs = options.munch_mbs
+    return options
+def main(args):
+    options = _parse_options(args)
+    num_tasks = options.num_tasks
+    done_queue = multiprocessing.Queue()
+    cmd_queues = [multiprocessing.Queue() for task_num in xrange(num_tasks)]
+    tasks = [
+        multiprocessing.Process(
+            target=_thread_main,
+            args=(task_num, options, cmd_queues[task_num], done_queue)
+        )
+        for task_num in xrange(num_tasks)
+    ]
+    for task in tasks:
+        task.start()
+    print('Starting test.')
+    for task_num in xrange(num_tasks):
+        cmd_queues[task_num].put('status')
+    _wait_everyone_done(tasks, done_queue)
+    try:
+        print('Munching till swap < %d MB free; munch %d MB at a time.' %
+              (options.free_swap, options.munch_mbs))
+        while True:
+            meminfo = _MemInfo()
+            if meminfo.SwapFree < options.free_swap * _MB:
+                break
+            if meminfo.MemFree < options.memfree_sleep * _MB:
+                print('MemFree only %d MB; sleeping' % (meminfo.MemFree / _MB))
+                time.sleep(1)
+                continue
+            task_num = done_queue.get()
+            cmd_queues[task_num].put('munch')
+        _end_stage('munch', tasks, done_queue, cmd_queues)
+        print('Tasting for %d seconds; taste %d MB at a time.' %
+              (options.taste, options.taste_mbs))
+        end_time = time.time() + options.taste
+        while time.time() < end_time:
+            task_num = done_queue.get()
+            cmd_queues[task_num].put('taste')
+        _end_stage('taste', tasks, done_queue, cmd_queues)
+        print('Chewing for %d seconds; chew %d MB at a time.' %
+              (options.chew, options.chew_mbs))
+        end_time = time.time() + options.chew
+        while time.time() < end_time:
+            task_num = done_queue.get()
+            cmd_queues[task_num].put('chew')
+        _end_stage('chew', tasks, done_queue, cmd_queues)
+    except KeyboardInterrupt:
+        pass
+    except WorkerDeadError as error:
+        print('ERROR: %s' % str(error))
+    finally:
+        print('All done I guess; trying to end things nicely.')
+        # Throw in a command to try to get them to quit
+        for cmd_queue in cmd_queues:
+            cmd_queue.put('done')
+        for task in tasks:
+            task.join(10)
+            task.terminate()
+        done_queue.close()
+        for cmd_queue in cmd_queues:
+            cmd_queue.close()
+        print('Quitting')
+    return 0
+if __name__ == '__main__':
+    sys.exit(main(sys.argv[1:]))
diff --git a/pylintrc b/pylintrc
new file mode 100644
index 0000000..838ff38
--- /dev/null
+++ b/pylintrc
@@ -0,0 +1,86 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+# Disable the message, report, category or checker with the given id(s). You
+# can either give multiple identifier separated by comma (,) or put this option
+# multiple times (only on the command line, not in the configuration file where
+# it should appear only once).
+# C0103: Invalid name ""
+# C0111: Missing docstring
+# C0301: Line too long.
+# C0302: Too many lines in module (N)
+# C0321: More than one statement on a single line
+# E1103: Instance has no '*' member (but some types could not be inferred)
+# I0011: Locally disabling warning.
+# I0012: Locally enabling warning.
+# R0201: Method could be a function
+# R0902: Too many instance attributes (N/7)
+# R0903: Too few public methods (N/2)
+# R0911: Too many return statements (N/6)
+# R0912: Too many branches (N/12)
+# R0913: Too many arguments (N/5)
+# R0914: Too many local variables (N/15)
+# R0915: Too many statements (N/50)
+# W0122: Use of the exec statement
+# W0102: Dangerous default value
+# W0141: Used builtin function ''
+# W0142: Used * or ** magic
+# W0212: Access to protected member
+# W0311: Bad indentation.
+# W0312: Found indentation with tabs instead of spaces
+# W0403: Relative import 'constants', should be 'chromite.cbuildbot.constants'
+# W0511: Used when a warning note as FIXME or XXX is detected.
+# W0622: Redefining built-in
+# R0904: Too many public methods
+# R0921: Abstract class not referenced.
+# Tells whether to display a full report or only the messages
+# CHANGE: No report.
+# List of members which are set dynamically and missed by pylint inference
+# system, and so shouldn't trigger E0201 when accessed.
+# CHANGE: Added 'AndReturn', 'InAnyOrder' and 'MultipleTimes' for pymox.
+# CHANGE: Added tempdir for @osutils.TempDirDecorator.
+# Regular expression which should only match correct function names
+# CHANGE: The ChromiumOS standard is different than PEP-8, so we need to
+# redefine this.
+# Common exceptions to ChromiumOS standard:
+# - main: Standard for main function
+# Regular expression which should only match correct method names
+# CHANGE: The ChromiumOS standard is different than PEP-8, so we need to
+# redefine this. Here's what we allow:
+# - CamelCaps, starting with a capital letter.  No underscores in function
+#   names.  Can also have a "_" prefix (private method) or a "test" prefix
+#   (unit test).
+# - Methods that look like __xyz__, which are used to do things like
+#   __init__, __del__, etc.
+# - setUp, tearDown: For unit tests.
+# Minimum lines number of a similarity.