# blob: 4702d83afaf51953d92e395308e344176293a183 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import factory_common
import commands
import glib
import gtk
import logging
import os
import pyudev
import unittest
from autotest_lib.client.common_lib.autotemp import tempfile
from autotest_lib.client.cros.factory.media_util import MediaMonitor
from autotest_lib.client.cros.factory.media_util import MountedMedia
# udev action strings emitted with the mocked 'device-event' signal.
_UDEV_ACTION_INSERT = 'add'
_UDEV_ACTION_REMOVE = 'remove'
# Name and content of the scratch file written under each mount point.
_WRITING_TEST_FILENAME = 'media_util_unittest.test'
_WRITING_TEST_STR = 'Unittest writing test...'
# Partition number used for the fake device names in the partition tests.
_VIRTUAL_PATITION_NUMBER = 3
class TestMountedMedia(unittest.TestCase):
    """Tests MountedMedia against an ext3 image attached to a loop device.

    Every test gets a fresh 1 MiB temp file that is formatted as ext3 and
    attached with losetup in setUp(), then detached and removed in
    tearDown().
    """

    def setUp(self):
        """Creates a temp file to mock as a media device."""
        self._virtual_device = tempfile(unique_id='media_util_unitttest')
        try:
            exit_code, _ = commands.getstatusoutput(
                'truncate -s 1048576 %s && mkfs -F -t ext3 %s' %
                (self._virtual_device.name, self._virtual_device.name))
            self.assertEqual(0, exit_code)
            # 'losetup --show -f' prints the loop device it picked.
            exit_code, ret = commands.getstatusoutput(
                'losetup --show -f %s' % self._virtual_device.name)
            self._free_loop_device = ret
            self.assertEqual(0, exit_code)
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # temp file has to be released here.
            self._virtual_device.clean()
            raise

    def tearDown(self):
        """Detaches the loop device and removes the backing temp file."""
        try:
            exit_code, _ = commands.getstatusoutput(
                'losetup -d %s' % self._free_loop_device)
            self.assertEqual(0, exit_code)
        finally:
            # Clean up the temp file even when detaching failed.
            self._virtual_device.clean()

    def _check_write_and_read(self, path):
        """Writes the test string to a file under path and reads it back.

        Args:
            path: Directory (the mount point) in which the file is created.
        """
        filename = os.path.join(path, _WRITING_TEST_FILENAME)
        with open(filename, 'w') as f:
            f.write(_WRITING_TEST_STR)
        with open(filename, 'r') as f:
            self.assertEqual(_WRITING_TEST_STR, f.readline())

    def testFailToMount(self):
        """Tests the MountedMedia throws exceptions when it fails."""
        def with_wrapper():
            with MountedMedia('/dev/device_not_exist'):
                pass
        self.assertRaises(Exception, with_wrapper)

    def testNormalMount(self):
        """Tests mounting a device without specifying a partition."""
        with MountedMedia(self._free_loop_device) as path:
            self._check_write_and_read(path)

    def testPartitionMountSDA(self):
        """Tests mounting partition.

        This tests mounting partition with devices enumerated
        in alphabets (ex, sda).
        """
        virtual_partition = tempfile(unique_id='virtual_partition',
                                     suffix='sdc%d' %
                                     _VIRTUAL_PATITION_NUMBER)
        try:
            # Replace the temp file with a symlink to the loop device so
            # that "<name without digit><partition>" resolves to it.
            exit_code, _ = commands.getstatusoutput(
                'ln -s -f %s %s' %
                (self._free_loop_device, virtual_partition.name))
            self.assertEqual(0, exit_code)
            # Strip the trailing partition digit to get the device name.
            with MountedMedia(virtual_partition.name[:-1],
                              _VIRTUAL_PATITION_NUMBER) as path:
                self._check_write_and_read(path)
        finally:
            # Clean up even when mounting or an assertion fails.
            virtual_partition.clean()

    def testPartitionMountMMCBLK0(self):
        """Tests mounting partition.

        This tests mounting partition with devices enumerated
        in numbers (ex, mmcblk0), whose partitions carry a 'p' prefix.
        """
        virtual_partition = tempfile(unique_id='virtual_partition',
                                     suffix='mmcblk0p%d' %
                                     _VIRTUAL_PATITION_NUMBER)
        try:
            exit_code, _ = commands.getstatusoutput(
                'ln -s -f %s %s' %
                (self._free_loop_device, virtual_partition.name))
            self.assertEqual(0, exit_code)
            # Strip the trailing 'p<digit>' to get the device name.
            with MountedMedia(virtual_partition.name[:-2],
                              _VIRTUAL_PATITION_NUMBER) as path:
                self._check_write_and_read(path)
        finally:
            # Clean up even when mounting or an assertion fails.
            virtual_partition.clean()

    def testPartitionMountFloppy(self):
        """Tests mounting a device without partition table."""
        with MountedMedia(self._free_loop_device, 1) as path:
            self._check_write_and_read(path)
class TestMediaMonitor(unittest.TestCase):
    """Tests MediaMonitor callbacks using mocked udev device events.

    A temp file attached to a loop device provides a real block device
    from which the pyudev Device object for the mocked events is built.
    """

    def setUp(self):
        """Creates a temp file to mock as a media device."""
        self._virtual_device = tempfile(unique_id='media_util_unitttest')
        try:
            exit_code, _ = commands.getstatusoutput(
                'truncate -s 1048576 %s' % self._virtual_device.name)
            self.assertEqual(0, exit_code)
            # 'losetup --show -f' prints the loop device it picked.
            exit_code, ret = commands.getstatusoutput(
                'losetup --show -f %s' % self._virtual_device.name)
            self._free_loop_device = ret
            self.assertEqual(0, exit_code)
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # temp file has to be released here.
            self._virtual_device.clean()
            raise

    def tearDown(self):
        """Detaches the loop device and removes the backing temp file."""
        try:
            exit_code, _ = commands.getstatusoutput(
                'losetup -d %s' % self._free_loop_device)
            self.assertEqual(0, exit_code)
        finally:
            # Clean up the temp file even when detaching failed.
            self._virtual_device.clean()

    def testMediaMonitor(self):
        """Tests that insert/remove events reach the registered callbacks.

        The callbacks only record what they received and quit the main
        loop. An assertion raised inside a main-loop callback would be
        swallowed and gtk.main_quit() would never run, hanging the test,
        so all checks are made after the loop has returned.
        """
        def on_insert(dev_path):
            self._inserted_path = dev_path
            self._media_inserted = True
            gtk.main_quit()

        def on_remove(dev_path):
            self._removed_path = dev_path
            self._media_removed = True
            gtk.main_quit()

        def one_time_timer_mock_insert():
            monitor._observer.emit('device-event',
                                   _UDEV_ACTION_INSERT,
                                   self._mock_device)
            # Returning False makes this a one-shot timer.
            return False

        def one_time_timer_mock_remove():
            monitor._observer.emit('device-event',
                                   _UDEV_ACTION_REMOVE,
                                   self._mock_device)
            # Returning False makes this a one-shot timer.
            return False

        self._media_inserted = False
        self._media_removed = False
        self._inserted_path = None
        self._removed_path = None
        self._context = pyudev.Context()
        self._mock_device = pyudev.Device.from_name(
            self._context, 'block',
            os.path.basename(self._free_loop_device))

        # Start the monitor.
        TIMEOUT_SECOND = 1
        monitor = MediaMonitor()
        monitor.start(on_insert=on_insert, on_remove=on_remove)
        try:
            # Simulating the insertion of a valid media device.
            timer_tag = glib.timeout_add_seconds(TIMEOUT_SECOND,
                                                 one_time_timer_mock_insert)
            gtk.main()
            # Simulating the removal of a valid media device.
            glib.source_remove(timer_tag)
            timer_tag = glib.timeout_add_seconds(TIMEOUT_SECOND,
                                                 one_time_timer_mock_remove)
            gtk.main()
        finally:
            # Always stop the monitor, even if the main loop raised.
            monitor.stop()

        self.assertTrue(self._media_inserted)
        self.assertEqual(self._free_loop_device, self._inserted_path)
        self.assertTrue(self._media_removed)
        self.assertEqual(self._free_loop_device, self._removed_path)
# Run all test cases in this module when executed directly as a script.
if __name__ == '__main__':
    unittest.main()