autotest: Extract Servo keyboard flasher from admin_audit

Extraction the logic will allow as re-use it as part of repair flow.

BUG=b:177928411
TEST=run local audit task to flash DUT with/without firmware

Change-Id: I8daf93ff37ee196c1d577ad2f5000a480747d754
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2674822
Tested-by: Otabek Kasimov <otabek@google.com>
Auto-Submit: Otabek Kasimov <otabek@google.com>
Reviewed-by: Garry Wang <xianuowang@chromium.org>
Commit-Queue: Otabek Kasimov <otabek@google.com>
diff --git a/server/cros/servo/keyboard/__init__.py b/server/cros/servo/keyboard/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/server/cros/servo/keyboard/__init__.py
diff --git a/server/cros/servo/keyboard/common.py b/server/cros/servo/keyboard/common.py
new file mode 100644
index 0000000..6eebf8b
--- /dev/null
+++ b/server/cros/servo/keyboard/common.py
@@ -0,0 +1,15 @@
+#!/usr/bin/python2
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# Build relative paths for files with root of autotest_lib.
+
+import os, sys
+dirname = os.path.dirname(sys.modules[__name__].__file__)
+autotest_dir = os.path.abspath(os.path.join(dirname, '../../../..'))
+client_dir = os.path.join(autotest_dir, 'client')
+sys.path.insert(0, client_dir)
+import setup_modules
+sys.path.pop(0)
+setup_modules.setup(base_path=autotest_dir, root_module_name='autotest_lib')
diff --git a/site_utils/admin_audit/data/keyboard.hex b/server/cros/servo/keyboard/data/keyboard.hex
similarity index 100%
rename from site_utils/admin_audit/data/keyboard.hex
rename to server/cros/servo/keyboard/data/keyboard.hex
diff --git a/server/cros/servo/keyboard/servo_keyboard_flasher.py b/server/cros/servo/keyboard/servo_keyboard_flasher.py
new file mode 100644
index 0000000..34a8084
--- /dev/null
+++ b/server/cros/servo/keyboard/servo_keyboard_flasher.py
@@ -0,0 +1,111 @@
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import time
+import os
+
+import common
+from autotest_lib.client.common_lib import error
+from autotest_lib.server.cros import servo_keyboard_utils
+
+
+class ServoKeyboardMapFlasher():
+    """Flash the servo keyboard map on servo."""
+
+    _ATMEGA_RESET_DELAY = 0.2
+    _ATMEGA_FLASH_TIMEOUT = 120
+    _USB_PRESENT_DELAY = 1
+
+    # Command to detect LUFA Keyboard Demo by VID.
+    LSUSB_CMD = 'lsusb -d %s:' % servo_keyboard_utils.ATMEL_USB_VENDOR_ID
+    LSUSB_TIMEOUT = 30
+
+    def is_image_supported(self, host):
+        """Check if servo keyboard map supported on host
+
+        @param host: CrosHost instance
+        """
+        if host.run('hash dfu-programmer', ignore_status=True).exit_status:
+            return False
+        return True
+
+    def update(self, host):
+        """Update servo keyboard map firmware on the host if required.
+
+        The process will verify present of the keyboard firmware on the host
+        and flash it if device was not detected.
+
+        @param host: CrosHost instance
+        """
+        if not self.is_image_supported(host):
+            raise Exception(
+                    'The image is too old that does not have dfu-programmer.')
+
+        try:
+            logging.debug('Starting flashing the keyboard map.')
+            host.servo.set_nocheck('init_usb_keyboard', 'on')
+
+            if self._is_keyboard_present(host):
+                logging.info('Already using the new keyboard map.')
+                return
+
+            self._flash_keyboard_map(host)
+        finally:
+            # Restore the default settings.
+            # Select the chip on the USB mux unless using Servo V4
+            if 'servo_v4' not in host.servo.get_servo_type():
+                host.servo.set('usb_mux_sel4', 'on')
+
+    def _flash_keyboard_map(self, host):
+        """FLash servo keyboard firmware on the host."""
+        servo = host.servo
+        # Boot AVR into DFU mode by enabling the HardWareBoot mode
+        # strapping and reset.
+        servo.set_get_all([
+                'at_hwb:on', 'atmega_rst:on',
+                'sleep:%f' % self._ATMEGA_RESET_DELAY, 'atmega_rst:off',
+                'sleep:%f' % self._ATMEGA_RESET_DELAY, 'at_hwb:off'
+        ])
+
+        time.sleep(self._USB_PRESENT_DELAY)
+        result = host.run(self.LSUSB_CMD,
+                          timeout=self.LSUSB_TIMEOUT).stdout.strip()
+        if not 'Atmel Corp. atmega32u4 DFU bootloader' in result:
+            raise Exception('Not an expected chip: %s', result)
+
+        # Update the keyboard map.
+        bindir = os.path.dirname(os.path.realpath(__file__))
+        local_path = os.path.join(bindir, 'data', 'keyboard.hex')
+        host.send_file(local_path, '/tmp')
+        logging.info('Updating the keyboard map...')
+        host.run('dfu-programmer atmega32u4 erase --force',
+                 timeout=self._ATMEGA_FLASH_TIMEOUT)
+        host.run('dfu-programmer atmega32u4 flash /tmp/keyboard.hex',
+                 timeout=self._ATMEGA_FLASH_TIMEOUT)
+
+        # Reset the chip.
+        servo.set_get_all([
+                'atmega_rst:on',
+                'sleep:%f' % self._ATMEGA_RESET_DELAY, 'atmega_rst:off'
+        ])
+        if self._is_keyboard_present(host):
+            logging.info('Update successfully!')
+        else:
+            raise Exception('Update failed!')
+
+    def _is_keyboard_present(self, host):
+        """Verify if servo keyboard is present on the host.
+
+        The keyboard will be detected as USB device on the host with name:
+                'Atmel Corp. LUFA Keyboard Demo Application'
+        """
+        time.sleep(self._USB_PRESENT_DELAY)
+        result = host.run(self.LSUSB_CMD,
+                          timeout=self.LSUSB_TIMEOUT).stdout.strip()
+        logging.debug('got the result: %s', result)
+        if ('LUFA Keyboard Demo' in result
+                    and servo_keyboard_utils.is_servo_usb_wake_capable(host)):
+            return True
+        return False
diff --git a/site_utils/admin_audit/verifiers.py b/site_utils/admin_audit/verifiers.py
index 8ad7ff7..a000789 100644
--- a/site_utils/admin_audit/verifiers.py
+++ b/site_utils/admin_audit/verifiers.py
@@ -5,18 +5,16 @@
 
 import logging
 
-import common
 import base
 import constants
 import servo_updater
-import time
-import os
 import re
 
+import common
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.common_lib import utils as client_utils
 from autotest_lib.server.cros.storage import storage_validate as storage
-from autotest_lib.server.cros import servo_keyboard_utils
+from autotest_lib.server.cros.servo.keyboard import servo_keyboard_flasher
 from autotest_lib.site_utils.admin_audit import rpm_validator
 
 try:
@@ -256,12 +254,6 @@
 class FlashServoKeyboardMapVerifier(base._BaseDUTVerifier):
     """Flash the keyboard map on servo."""
 
-    _ATMEGA_RESET_DELAY = 0.2
-    _USB_PRESENT_DELAY = 1
-
-    # Command to detect LUFA Keyboard Demo by VID.
-    LSUSB_CMD = 'lsusb -d %s:' % servo_keyboard_utils.ATMEL_USB_VENDOR_ID
-
     def _verify(self):
         if not self.host_is_up():
             raise base.AuditError('Host is down')
@@ -269,88 +261,9 @@
             raise base.AuditError('Servo not initialized')
 
         host = self.get_host()
-        servo = host.servo
-        status = STATUS_FAIL
-        try:
-            logging.info('Starting flashing the keyboard map.')
-            status = self._flash_keyboard_map(host, servo)
-            logging.info('Set status: %s', status)
-            if status == STATUS_FAIL:
-                self._send_metrics()
-        except Exception as e:
-            # The possible errors is timeout of commands.
-            logging.info('Failed to flash servo keyboard map; %s', e)
-            self._send_metrics()
-        finally:
-            # Restore the default settings.
-            # Select the chip on the USB mux unless using Servo V4
-            if 'servo_v4' not in servo.get_servo_version():
-                servo.set('usb_mux_sel4', 'on')
-        if status == STATUS_FAIL:
-            raise base.AuditError('Failed to flash keyboard map on servo')
-
-    def _flash_keyboard_map(self, host, servo):
-        if host.run('hash dfu-programmer', ignore_status=True).exit_status:
-            logging.info(
-                'The image is too old that does not have dfu-programmer.')
-            return STATUS_SKIPPED
-
-        servo.set_nocheck('init_usb_keyboard', 'on')
-
-        if self._is_keyboard_present(host):
-            logging.info('Already using the new keyboard map.')
-            return STATUS_SUCCESS
-
-        # Boot AVR into DFU mode by enabling the HardWareBoot mode
-        # strapping and reset.
-        servo.set_get_all(['at_hwb:on',
-                            'atmega_rst:on',
-                            'sleep:%f' % self._ATMEGA_RESET_DELAY,
-                            'atmega_rst:off',
-                            'sleep:%f' % self._ATMEGA_RESET_DELAY,
-                            'at_hwb:off'])
-
-        time.sleep(self._USB_PRESENT_DELAY)
-        result = host.run(self.LSUSB_CMD, timeout=30).stdout.strip()
-        if not 'Atmel Corp. atmega32u4 DFU bootloader' in result:
-            logging.info('Not an expected chip: %s', result)
-            return STATUS_FAIL
-
-        # Update the keyboard map.
-        bindir = os.path.dirname(os.path.realpath(__file__))
-        local_path = os.path.join(bindir, 'data', 'keyboard.hex')
-        host.send_file(local_path, '/tmp')
-        logging.info('Updating the keyboard map...')
-        host.run('dfu-programmer atmega32u4 erase --force', timeout=120)
-        host.run('dfu-programmer atmega32u4 flash /tmp/keyboard.hex',
-                 timeout=120)
-
-        # Reset the chip.
-        servo.set_get_all(['atmega_rst:on',
-                            'sleep:%f' % self._ATMEGA_RESET_DELAY,
-                            'atmega_rst:off'])
-        if self._is_keyboard_present(host):
-            logging.info('Update successfully!')
-            return STATUS_SUCCESS
-
-        logging.info('Update failed!')
-        return STATUS_FAIL
-
-    def _is_keyboard_present(self, host):
-        # Check the result of lsusb.
-        time.sleep(self._USB_PRESENT_DELAY)
-        result = host.run(self.LSUSB_CMD, timeout=30).stdout.strip()
-        logging.info('got the result: %s', result)
-        if ('LUFA Keyboard Demo' in result and
-            servo_keyboard_utils.is_servo_usb_wake_capable(host)):
-            return True
-        return False
-
-    def _send_metrics(self):
-        host = self.get_host()
-        data = {'host': host.hostname, 'status': STATUS_FAIL}
-        metrics.Counter(
-            'chromeos/autotest/audit/servo_keyboard').increment(fields=data)
+        flasher = servo_keyboard_flasher.ServoKeyboardMapFlasher()
+        if flasher.is_image_supported(host):
+            flasher.update(host)
 
 
 class VerifyDUTMacAddress(base._BaseDUTVerifier):