| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging, multiprocessing, os, time |
| import numpy |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.cros.camera import camera_utils |
| from autotest_lib.client.cros.power import sys_power |
| from autotest_lib.client.cros.video import device_capability |
| from autotest_lib.client.common_lib import error |
| |
| try: |
| # HACK: We need to succeed if OpenCV is missing to allow "emerge |
| # autotest-tests" to succeed, as OpenCV is not available in the build |
| # environment. It is available on the target where the test actually runs. |
| import cv2 |
| except ImportError: |
| pass |
| |
| |
def async_suspend(delay_seconds=5, suspend_seconds=10):
    """Suspend the system after a short delay.

    Meant to be run in a worker process (e.g. via
    multiprocessing.Pool.apply_async) while the caller keeps doing work
    that should span the suspend.

    @param delay_seconds: time to sleep before suspending, so that the
            caller has a chance to start its own work first.
    @param suspend_seconds: passed unchanged to sys_power.kernel_suspend();
            presumably the suspend duration in seconds -- confirm against
            sys_power.

    @raises: whatever time.sleep() or sys_power.kernel_suspend() raises;
            the exception is logged here and then re-raised.
    """
    try:
        time.sleep(delay_seconds)  # allow some time to start capturing
        sys_power.kernel_suspend(suspend_seconds)
    except:
        # Deliberately broad: nothing is swallowed, we only log and re-raise.
        # Any exception will be re-raised in main process, but the stack
        # trace will be wrong. Log it here with the correct stack trace.
        logging.exception('suspend raised exception')
        raise
| |
| |
class power_CameraSuspend(test.test):
    """Test camera before & after suspend."""

    version = 1

    def run_once(self, capability, save_images=False):
        """Capture camera frames continuously across a suspend/resume cycle.

        A worker process runs async_suspend() while this process keeps
        reading frames from the camera. Capturing stops once 10 more frames
        have been read after the suspend task finished.

        @param capability: passed to DeviceCapability.ensure_capability()
                before anything else is done.
        @param save_images: if True, the first 200 captured frames are
                written to self.outputdir as JPEG files.

        @raises error.TestError: if no camera is found.
        @raises error.TestFail: if a frame cannot be read, or if two
                consecutive frames are identical.
        Any exception raised by the suspend task is re-raised here as well.
        """
        device_capability.DeviceCapability().ensure_capability(capability)
        # open the camera via opencv
        cam_name, cam_index = camera_utils.find_camera()
        if cam_index is None:
            raise error.TestError('no camera found')
        cam = cv2.VideoCapture(cam_index)
        try:
            # kick off async suspend
            logging.info('starting subprocess to suspend system')
            pool = multiprocessing.Pool(processes=1)
            try:
                # TODO(spang): Move async suspend to library.
                result = pool.apply_async(async_suspend)
                self._capture_across_suspend(cam, cam_name, result,
                                             save_images)
            finally:
                # On success the worker is already idle. On failure this
                # stops a still-pending suspend from firing after the test
                # has exited.
                pool.terminate()
                pool.join()
        finally:
            # Always give the capture device back, even when the test fails.
            cam.release()

    def _capture_across_suspend(self, cam, cam_name, result, save_images):
        """Read frames from cam until 10 frames were read after resume.

        @param cam: opened capture object providing read().
        @param cam_name: camera name, used only in the failure message.
        @param result: async result of the suspend task; polled with
                ready() and checked once with get().
        @param save_images: if True, write the first 200 frames to disk.
        """
        # capture images concurrently with suspend
        capture_start = time.time()
        logging.info('start capturing at %d', capture_start)
        image_count = 0
        resume_count = None  # value of image_count when suspend finished
        last_image = None

        while True:
            # terminate if we've captured a few frames after resume
            if result.ready() and resume_count is None:
                result.get()  # reraise exception, if any
                resume_count = image_count
                logging.info('suspend task finished')
            if resume_count is not None and image_count - resume_count >= 10:
                break

            # capture one frame
            image_ok, image = cam.read()
            image_count += 1
            if not image_ok:
                logging.error('failed capture at image %d', image_count)
                # Format the message explicitly; exception constructors do
                # not interpolate extra arguments the way logging does.
                raise error.TestFail('image capture failed from %s' %
                                     cam_name)

            # write image to disk, if requested
            if save_images and image_count <= 200:
                path = os.path.join(self.outputdir, '%03d.jpg' % image_count)
                cv2.imwrite(path, image)

            # verify camera produces a unique image on each capture
            if last_image is not None and numpy.array_equal(image,
                                                            last_image):
                raise error.TestFail('camera produced two identical images')
            last_image = image

        capture_end = time.time()
        logging.info('done capturing at %d', capture_end)

        logging.info('captured %d frames in %d seconds',
                     image_count, capture_end - capture_start)