| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Standalone local webserver to acquire fingerprints for user studies.""" |
| |
| from __future__ import print_function |
| |
| import argparse |
| from datetime import datetime |
| from distutils import util |
| import json |
| import logging |
| import logging.handlers |
| import os |
| import re |
| import subprocess |
| import sys |
| import threading |
| import time |
| |
| # The following imports will be available on the test image, but will usually |
| # be missing in the SDK. |
| # pylint: disable=import-error |
| import cherrypy |
| import gnupg |
| from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool |
| from ws4py.websocket import WebSocket |
| |
| # Use the image conversion library if available. |
| sys.path.extend(['/usr/local/opt/fpc', '/opt/fpc']) |
| try: |
| import fputils |
| except ImportError: |
| fputils = None |
| |
| DEFAULT_ARGS = { |
| 'finger-count': 2, |
| 'enrollment-count': 20, |
| 'verification-count': 15, |
| 'port': 9000, |
| 'picture-dir': './fingers', |
| 'syslog': False, |
| 'gpg-keyring': '', |
| 'gpg-recipients': '', |
| } |
| |
| errors = [ |
| # FP_SENSOR_LOW_IMAGE_QUALITY 1 |
| 'retrying...', |
| # FP_SENSOR_TOO_FAST 2 |
| 'keeping your finger still during capture', |
| # FP_SENSOR_LOW_SENSOR_COVERAGE 3 |
| 'centering your finger on the sensor', |
| ] |
| |
| SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) |
| HTML_DIR = os.path.join(SCRIPT_DIR, 'html') |
| |
| ECTOOL = 'ectool' |
| # Wait to see a finger on the sensor. |
| FP_MODE_FINGER_DOWN = 2 |
| # Poll until the finger has left the sensor. |
| FP_MODE_FINGER_UP = 4 |
| # Capture the current finger image. |
| FP_MODE_CAPTURE = 8 |
| |
| |
| class FingerWebSocket(WebSocket): |
| """Handle the websocket used finger images acquisition and logging.""" |
| |
| FP_MODE_RE = re.compile(r'^FP mode:\s*\(0x([0-9a-fA-F]+)\)', re.MULTILINE) |
| DIR_FORMAT = '{participant:04d}/{group:s}/{finger:02d}' |
| FILE_FORMAT = '{finger:02d}_{picture:02d}' |
| |
| config = {} |
| pict_dir = '/tmp' |
| # FpUtils class to process images through the external library. |
| utils = None |
| # The optional GNUGPG instance used for encryption. |
| gpg = None |
| gpg_recipients: list = None |
| # The worker thread processing the images. |
| worker = None |
| # The current request processed by the worker thread. |
| current_req = None |
| # The Condition variable the worker thread waits on to get a new request. |
| available_req = threading.Condition() |
| # Force terminating the current processing in the worker thread. |
| abort_request = False |
| |
| def set_config(self, arg): |
| self.config = { |
| 'fingerCount': arg.finger_count, |
| 'enrollmentCount': arg.enrollment_count, |
| 'verificationCount': arg.verification_count |
| } |
| self.pict_dir = arg.picture_dir |
| if fputils: |
| self.utils = fputils.FpUtils() |
| if arg.gpg_keyring: |
| # The verbose flag prints a lot of info to console using print |
| # directly. We use the logging interface instead. |
| self.gpg = gnupg.GPG(keyring=arg.gpg_keyring, verbose=False, |
| options=[ |
| '--no-options', |
| '--no-default-recipient', |
| '--trust-model', 'always', |
| ]) |
| self.gpg_recipients = arg.gpg_recipients.split() |
| if not self.gpg_recipients: |
| cherrypy.log('Error - GPG Recipients is Empty', |
| severity=logging.FATAL) |
| cherrypy.engine.exit() |
| return |
| cherrypy.log(f'GPG Recipients: {self.gpg_recipients}') |
| |
| keyring_list = self.gpg.list_keys() |
| if not keyring_list: |
| cherrypy.log('Error - GPG Keyring is Empty', |
| severity=logging.FATAL) |
| cherrypy.engine.exit() |
| return |
| for k in keyring_list: |
| cherrypy.log(f'GPG Keyring Key {k["fingerprint"]}:') |
| for dk, dv in k.items(): |
| if dv: |
| cherrypy.log(f'\t{dk}: {dv}') |
| |
| # Check if recipients are in the keyring and perfectly |
| # match one to one. There could be a mismatch if a generic search |
| # specifier is used for the recipient that matches more than one |
| # key in the keyring. |
| for recipients in self.gpg_recipients: |
| keyring_list = self.gpg.list_keys(keys=recipients) |
| if not (keyring_list and len(keyring_list) == 1): |
| cherrypy.log( |
| 'Error - GPG Recipients do not match specific keys.', |
| severity=logging.FATAL) |
| cherrypy.engine.exit() |
| return |
| |
| self.worker = threading.Thread(target=self.finger_worker) |
| self.worker.start() |
| |
| def closed(self, code, reason=''): |
| self.abort_request = True |
| cherrypy.log(f'Websocket closed with code {code} / {reason}') |
| if not self.worker: |
| cherrypy.log("Worker thread wasn't running.") |
| return |
| cherrypy.log('Stopping worker thread.') |
| # Wake up the thread so it can exit. |
| self.available_req.acquire() |
| self.available_req.notify() |
| self.available_req.release() |
| self.worker.join(10.0) |
| if self.worker.is_alive(): |
| cherrypy.log('Failed to stop worker thread.') |
| else: |
| cherrypy.log('Successfully stopped worker thread.') |
| |
| def received_message(self, m): |
| if m.is_binary: |
| return # Not supported |
| j = json.loads(m.data) |
| if 'log' in j: |
| cherrypy.log(j['log']) |
| if 'finger' in j: |
| self.finger_request(j) |
| if 'config' in j: |
| self.config_request(j) |
| |
| def make_dirs(self, path): |
| if not os.path.exists(path): |
| os.makedirs(path) |
| |
| def save_to_file(self, data: bytes, file_path: str): |
| """Save data bytes to file at file_path. |
| |
| If GPG is enabled, the .gpg suffix is added to file_path. |
| """ |
| |
| if self.gpg: |
| file_path += '.gpg' |
| enc = self.gpg.encrypt(data, self.gpg_recipients) |
| data = enc.data |
| |
| cherrypy.log(f"Saving file '{file_path}' size {len(data)}") |
| if not data: |
| cherrypy.log('Error - Attempted to save empty file', |
| severity=logging.ERROR) |
| return |
| |
| with open(file_path, 'wb') as f: |
| f.write(data) |
| |
| def ectool(self, command: str, *args) -> bytes: |
| """Run the ectool command and return its stdout as bytes.""" |
| |
| cmdline = [ECTOOL, '--name=cros_fp', command] + list(args) |
| stdout = b'' |
| while not self.abort_request: |
| try: |
| stdout = subprocess.check_output(cmdline) |
| break |
| except subprocess.CalledProcessError as e: |
| cherrypy.log(f"command '{e.cmd}' failed with {e.returncode}") |
| stdout = b'' |
| return stdout |
| |
| def ectool_fpmode(self, *args) -> int: |
| mode = self.ectool('fpmode', *args).decode('utf-8') |
| match_mode = self.FP_MODE_RE.search(mode) |
| return int(match_mode.group(1), 16) if match_mode else -1 |
| |
| def finger_wait_done(self, mode): |
| # Poll until the mode bit has disappeared. |
| while not self.abort_request and self.ectool_fpmode() & mode: |
| time.sleep(0.050) |
| return not self.abort_request |
| |
| def finger_save_image(self, req): |
| directory = os.path.join(self.pict_dir, self.DIR_FORMAT.format(**req)) |
| self.make_dirs(directory) |
| file_base = os.path.join(directory, self.FILE_FORMAT.format(**req)) |
| raw_file = file_base + '.raw' |
| fmi_file = file_base + '.fmi' |
| img = self.ectool('fpframe', 'raw') |
| if not img: |
| cherrypy.log('Failed to download fpframe') |
| return |
| self.save_to_file(img, raw_file) |
| if self.utils: |
| rc, fmi = self.utils.image_data_to_fmi(img) |
| if rc == 0: |
| self.save_to_file(fmi, fmi_file) |
| else: |
| cherrypy.log(f'FMI conversion failed {rc}') |
| |
| def finger_process(self, req): |
| # Ensure the user has removed the finger between 2 captures. |
| if not self.finger_wait_done(FP_MODE_FINGER_UP): |
| return |
| # Capture the finger image when the finger is on the sensor. |
| self.ectool_fpmode('capture', 'vendor') |
| t0 = time.time() |
| # Wait for the image being available. |
| if not self.finger_wait_done(FP_MODE_CAPTURE): |
| return |
| t1 = time.time() |
| # Detect the finger removal before the next capture. |
| self.ectool_fpmode('fingerup') |
| # Record the outcome of the capture. |
| cherrypy.log( |
| f'Captured finger {req["finger"]:02d}:{req["picture"]:02d}' |
| f' in {t1 - t0:.2f}s') |
| req['result'] = 'ok' # ODER req['result'] = errors[ERRNUM_TBD] |
| # Retrieve the finger image. |
| self.finger_save_image(req) |
| # Tell the page about the acquisition result. |
| self.send(json.dumps(req), False) |
| |
| def finger_worker(self): |
| while not self.server_terminated and not self.client_terminated: |
| self.available_req.acquire() |
| while not self.current_req and not self.abort_request: |
| self.available_req.wait() |
| self.finger_process(self.current_req) |
| self.current_req = None |
| self.available_req.release() |
| |
| def finger_request(self, req): |
| # Ask the thread to exit the waiting loops |
| # it will wait on the acquire() below if needed. |
| self.abort_request = True |
| # Ask the thread to process the new request. |
| self.available_req.acquire() |
| self.abort_request = False |
| self.current_req = req |
| self.available_req.notify() |
| self.available_req.release() |
| |
| def config_request(self, req): |
| # Populate the configuration. |
| req['config'] = self.config |
| self.send(json.dumps(req), False) |
| |
| |
| class Root(object): |
| """Serve the static HTML/CSS and connect the websocket.""" |
| |
| def __init__(self, cmdline_args): |
| self.args = cmdline_args |
| |
| @cherrypy.expose |
| def index(self): |
| index_file = os.path.join(SCRIPT_DIR, 'html/index.html') |
| with open(index_file, encoding='utf-8') as f: |
| return f.read() |
| |
| @cherrypy.expose |
| def finger(self): |
| cherrypy.request.ws_handler.set_config(self.args) |
| |
| |
| def environment_parameters(default_params: dict) -> dict: |
| """Return |default_params| after overriding with environment vars. |
| |
| Given a dictionary of default runtime parameters, return the same |
| dictionary with parameters overridden by their equivalent environment |
| variable. |
| |
| A corresponding environment variable is the uppercase equivalent of the |
| parameter name, with all '-' replaced with '_'. |
| For examples, parameter "log-dir" corresponds to environment variable |
| "LOG_DIR". |
| """ |
| |
| env_params = default_params.copy() |
| for param in default_params: |
| env_var = param.upper().replace('-', '_') |
| arg_type = type(default_params[param]) |
| value = os.environ.get(env_var) |
| if value is not None: |
| try: |
| if arg_type is bool: |
| value = bool(util.strtobool(value)) |
| elif arg_type is type(None): |
| raise Exception('Cannot handle type None in default list.') |
| else: |
| value = arg_type(value) |
| except ValueError: |
| raise ValueError(env_var) |
| env_params[param] = value |
| return env_params |
| |
| |
| def main(argv: list): |
| parser = argparse.ArgumentParser() |
| |
| # Read environment variables as the arg default values. |
| try: |
| env_default = environment_parameters(DEFAULT_ARGS) |
| except ValueError as e: |
| parser.error(f'failed to parse {e}') |
| |
| # Get study parameters from the command-line. |
| parser.add_argument('-f', '--finger-count', type=int, |
| default=env_default['finger-count'], |
| help='Number of fingers acquired per user') |
| parser.add_argument('-e', '--enrollment-count', type=int, |
| default=env_default['enrollment-count'], |
| help='Number of enrollment images per finger') |
| parser.add_argument('-v', '--verification-count', type=int, |
| default=env_default['verification-count'], |
| help='Number of verification images per finger') |
| parser.add_argument('-p', '--port', type=int, |
| default=env_default['port'], |
| help='The port for the webserver') |
| parser.add_argument('-d', '--picture-dir', |
| default=env_default['picture-dir'], |
| help='Directory for the fingerprint captures') |
| parser.add_argument('-l', '--log-dir', |
| default=env_default['log-dir'], |
| help='Log files directory') |
| parser.add_argument('-s', '--syslog', action='store_true', |
| default=env_default['syslog'], |
| help='Log to syslog') |
| parser.add_argument('-k', '--gpg-keyring', type=str, |
| default=env_default['gpg-keyring'], |
| help='Path to the GPG keyring') |
| parser.add_argument('-r', '--gpg-recipients', type=str, |
| default=env_default['gpg-recipients'], |
| help='User IDs of GPG recipients separated by space') |
| args = parser.parse_args(argv) |
| |
| # GPG can only be used when gpg-keyring and gpg-recipient are specified. |
| if args.gpg_keyring and not args.gpg_recipients: |
| parser.error('gpg-recipients must be specified with gpg-keyring') |
| if args.gpg_recipients and not args.gpg_keyring: |
| parser.error('gpg-keyring must be specified with gpg-recipients') |
| if args.gpg_keyring and not os.access(args.gpg_keyring, os.R_OK): |
| parser.error(f'cannot read gpg-keyring file {args.gpg_keyring}') |
| |
| # Configure cherrypy server. |
| cherrypy.config.update({'server.socket_port': args.port}) |
| |
| # Configure logging. |
| cherrypy.config.update({'log.screen': False}) |
| |
| loggers = [] |
| l = logging.getLogger('cherrypy.access') |
| l.setLevel(logging.DEBUG) |
| loggers.append(l) |
| l = logging.getLogger('cherrypy.error') |
| l.setLevel(logging.DEBUG) |
| loggers.append(l) |
| l = logging.getLogger('gnupg') |
| l.setLevel(logging.INFO) |
| loggers.append(l) |
| |
| if args.log_dir: |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| log_name = f'server-{timestamp}.log' |
| h = logging.handlers.RotatingFileHandler( |
| filename=os.path.join(args.log_dir, log_name)) |
| for l in loggers: |
| l.addHandler(h) |
| if args.syslog: |
| h = logging.handlers.SysLogHandler( |
| address='/dev/log', |
| facility=logging.handlers.SysLogHandler.LOG_LOCAL1) |
| for l in loggers: |
| l.addHandler(h) |
| if not args.log_dir and not args.syslog: |
| h = logging.StreamHandler() |
| for l in loggers: |
| l.addHandler(h) |
| |
| WebSocketPlugin(cherrypy.engine).subscribe() |
| cherrypy.tools.websocket = WebSocketTool() |
| |
| cherrypy.quickstart(Root(args), '/', config={ |
| '/finger': {'tools.websocket.on': True, |
| 'tools.websocket.handler_cls': FingerWebSocket}, |
| '/static': {'tools.staticdir.on': True, |
| 'tools.staticdir.dir': HTML_DIR}}) |
| |
| |
| if __name__ == '__main__': |
| sys.exit(main(sys.argv[1:])) |