blob: 411ae32c4e2093e3360828d0e4b6483866c6a574 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Standalone local webserver to acquire fingerprints for user studies."""
from __future__ import print_function
import argparse
from datetime import datetime
import json
import logging
import logging.handlers
import os
import re
import subprocess
import sys
import threading
import time
# The following imports will be available on the test image, but will usually
# be missing in the SDK.
# pylint: disable=import-error
import cherrypy
import gnupg
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
# Use the image conversion library if available.
sys.path.extend(['/usr/local/opt/fpc', '/opt/fpc'])
try:
import fputils
except ImportError:
fputils = None
errors = [
# FP_SENSOR_LOW_IMAGE_QUALITY 1
'retrying...',
# FP_SENSOR_TOO_FAST 2
'keeping your finger still during capture',
# FP_SENSOR_LOW_SENSOR_COVERAGE 3
'centering your finger on the sensor',
]
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
HTML_DIR = os.path.join(SCRIPT_DIR, 'html')
ECTOOL = 'ectool'
# Wait to see a finger on the sensor.
FP_MODE_FINGER_DOWN = 2
# Poll until the finger has left the sensor.
FP_MODE_FINGER_UP = 4
# Capture the current finger image.
FP_MODE_CAPTURE = 8
class FingerWebSocket(WebSocket):
"""Handle the websocket used finger images acquisition and logging."""
FP_MODE_RE = re.compile(r'^FP mode:\s*\(0x([0-9a-fA-F]+)\)', re.MULTILINE)
DIR_FORMAT = '{participant:04d}/{group:s}/{finger:02d}'
FILE_FORMAT = '{finger:02d}_{picture:02d}'
config = {}
pict_dir = '/tmp'
# FpUtils class to process images through the external library.
utils = None
# The optional GNUGPG instance used for encryption.
gpg = None
gpg_recipients: list = None
# The worker thread processing the images.
worker = None
# The current request processed by the worker thread.
current_req = None
# The Condition variable the worker thread waits on to get a new request.
available_req = threading.Condition()
# Force terminating the current processing in the worker thread.
abort_request = False
def set_config(self, arg):
self.config = {
'fingerCount': arg.finger_count,
'enrollmentCount': arg.enrollment_count,
'verificationCount': arg.verification_count
}
self.pict_dir = arg.picture_dir
if fputils:
self.utils = fputils.FpUtils()
if arg.gpg_keyring:
# The verbose flag prints a lot of info to console using print
# directly. We use the logging interface instead.
self.gpg = gnupg.GPG(keyring=arg.gpg_keyring, verbose=False,
options=[
'--no-options',
'--no-default-recipient',
'--trust-model', 'always',
])
self.gpg_recipients = arg.gpg_recipients.split()
if not self.gpg_recipients:
cherrypy.log('Error - GPG Recipients is Empty',
severity=logging.FATAL)
cherrypy.engine.exit()
return
cherrypy.log(f'GPG Recipients: {self.gpg_recipients}')
keyring_list = self.gpg.list_keys()
if not keyring_list:
cherrypy.log('Error - GPG Keyring is Empty',
severity=logging.FATAL)
cherrypy.engine.exit()
return
for k in keyring_list:
cherrypy.log(f'GPG Keyring Key {k["fingerprint"]}:')
for dk, dv in k.items():
if dv:
cherrypy.log(f'\t{dk}: {dv}')
# Check if recipients are in the keyring and perfectly
# match one to one. There could be a mismatch if a generic search
# specifier is used for the recipient that matches more than one
# key in the keyring.
for recipients in self.gpg_recipients:
keyring_list = self.gpg.list_keys(keys=recipients)
if not (keyring_list and len(keyring_list) == 1):
cherrypy.log(
'Error - GPG Recipients do not match specific keys.',
severity=logging.FATAL)
cherrypy.engine.exit()
return
self.worker = threading.Thread(target=self.finger_worker)
self.worker.start()
def closed(self, code, reason=''):
self.abort_request = True
cherrypy.log(f'Websocket closed with code {code} / {reason}')
if not self.worker:
cherrypy.log("Worker thread wasn't running.")
return
cherrypy.log('Stopping worker thread.')
# Wake up the thread so it can exit.
self.available_req.acquire()
self.available_req.notify()
self.available_req.release()
self.worker.join(10.0)
if self.worker.is_alive():
cherrypy.log('Failed to stop worker thread.')
else:
cherrypy.log('Successfully stopped worker thread.')
def received_message(self, m):
if m.is_binary:
return # Not supported
j = json.loads(m.data)
if 'log' in j:
cherrypy.log(j['log'])
if 'finger' in j:
self.finger_request(j)
if 'config' in j:
self.config_request(j)
def make_dirs(self, path):
if not os.path.exists(path):
os.makedirs(path)
def save_to_file(self, data: bytes, file_path: str):
"""Save data bytes to file at file_path.
If GPG is enabled, the .gpg suffix is added to file_path.
"""
if self.gpg:
file_path += '.gpg'
enc = self.gpg.encrypt(data, self.gpg_recipients)
data = enc.data
cherrypy.log(f"Saving file '{file_path}' size {len(data)}")
if not data:
cherrypy.log('Error - Attempted to save empty file',
severity=logging.ERROR)
return
with open(file_path, 'wb') as f:
f.write(data)
def ectool(self, command: str, *args) -> bytes:
"""Run the ectool command and return its stdout as bytes."""
cmdline = [ECTOOL, '--name=cros_fp', command] + list(args)
stdout = b''
while not self.abort_request:
try:
stdout = subprocess.check_output(cmdline)
break
except subprocess.CalledProcessError as e:
cherrypy.log(f"command '{e.cmd}' failed with {e.returncode}")
stdout = b''
return stdout
def ectool_fpmode(self, *args) -> int:
mode = self.ectool('fpmode', *args).decode('utf-8')
match_mode = self.FP_MODE_RE.search(mode)
return int(match_mode.group(1), 16) if match_mode else -1
def finger_wait_done(self, mode):
# Poll until the mode bit has disappeared.
while not self.abort_request and self.ectool_fpmode() & mode:
time.sleep(0.050)
return not self.abort_request
def finger_save_image(self, req):
directory = os.path.join(self.pict_dir, self.DIR_FORMAT.format(**req))
self.make_dirs(directory)
file_base = os.path.join(directory, self.FILE_FORMAT.format(**req))
raw_file = file_base + '.raw'
fmi_file = file_base + '.fmi'
img = self.ectool('fpframe', 'raw')
if not img:
cherrypy.log('Failed to download fpframe')
return
self.save_to_file(img, raw_file)
if self.utils:
rc, fmi = self.utils.image_data_to_fmi(img)
if rc == 0:
self.save_to_file(fmi, fmi_file)
else:
cherrypy.log(f'FMI conversion failed {rc}')
def finger_process(self, req):
# Ensure the user has removed the finger between 2 captures.
if not self.finger_wait_done(FP_MODE_FINGER_UP):
return
# Capture the finger image when the finger is on the sensor.
self.ectool_fpmode('capture', 'vendor')
t0 = time.time()
# Wait for the image being available.
if not self.finger_wait_done(FP_MODE_CAPTURE):
return
t1 = time.time()
# Detect the finger removal before the next capture.
self.ectool_fpmode('fingerup')
# Record the outcome of the capture.
cherrypy.log(
f'Captured finger {req["finger"]:02d}:{req["picture"]:02d}'
f' in {t1 - t0:.2f}s')
req['result'] = 'ok' # ODER req['result'] = errors[ERRNUM_TBD]
# Retrieve the finger image.
self.finger_save_image(req)
# Tell the page about the acquisition result.
self.send(json.dumps(req), False)
def finger_worker(self):
while not self.server_terminated and not self.client_terminated:
self.available_req.acquire()
while not self.current_req and not self.abort_request:
self.available_req.wait()
self.finger_process(self.current_req)
self.current_req = None
self.available_req.release()
def finger_request(self, req):
# Ask the thread to exit the waiting loops
# it will wait on the acquire() below if needed.
self.abort_request = True
# Ask the thread to process the new request.
self.available_req.acquire()
self.abort_request = False
self.current_req = req
self.available_req.notify()
self.available_req.release()
def config_request(self, req):
# Populate the configuration.
req['config'] = self.config
self.send(json.dumps(req), False)
class Root(object):
"""Serve the static HTML/CSS and connect the websocket."""
def __init__(self, cmdline_args):
self.args = cmdline_args
@cherrypy.expose
def index(self):
index_file = os.path.join(SCRIPT_DIR, 'html/index.html')
with open(index_file, encoding='utf-8') as f:
return f.read()
@cherrypy.expose
def finger(self):
cherrypy.request.ws_handler.set_config(self.args)
def main(argv: list):
# Get study parameters from the command-line.
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--finger-count', type=int, default=2,
help='Number of fingers acquired per user')
parser.add_argument('-e', '--enrollment-count', type=int, default=20,
help='Number of enrollment images per finger')
parser.add_argument('-v', '--verification-count', type=int, default=15,
help='Number of verification images per finger')
parser.add_argument('-p', '--port', type=int, default=9000,
help='The port for the webserver')
parser.add_argument('-d', '--picture-dir', default='./fingers',
help='Directory for the fingerprint captures')
parser.add_argument('-l', '--log-dir',
help='Log files directory')
parser.add_argument('-s', '--syslog', action='store_true',
help='Log to syslog')
parser.add_argument('-k', '--gpg-keyring', type=str,
help='Path to the GPG keyring')
parser.add_argument('-r', '--gpg-recipients', type=str,
help='User IDs of GPG recipients separated by space')
args = parser.parse_args(argv)
# GPG can only be used when gpg-keyring and gpg-recipient are specified.
if args.gpg_keyring and not args.gpg_recipients:
parser.error('gpg-recipients must be specified with gpg-keyring')
if args.gpg_recipients and not args.gpg_keyring:
parser.error('gpg-keyring must be specified with gpg-recipients')
if args.gpg_keyring and not os.access(args.gpg_keyring, os.R_OK):
parser.error(f'cannot read gpg-keyring file {args.gpg_keyring}')
# Configure cherrypy server.
cherrypy.config.update({'server.socket_port': args.port})
# Configure logging.
cherrypy.config.update({'log.screen': False})
loggers = []
l = logging.getLogger('cherrypy.access')
l.setLevel(logging.DEBUG)
loggers.append(l)
l = logging.getLogger('cherrypy.error')
l.setLevel(logging.DEBUG)
loggers.append(l)
l = logging.getLogger('gnupg')
l.setLevel(logging.INFO)
loggers.append(l)
if args.log_dir:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
log_name = f'server-{timestamp}.log'
h = logging.handlers.RotatingFileHandler(
filename=os.path.join(args.log_dir, log_name))
for l in loggers:
l.addHandler(h)
if args.syslog:
h = logging.handlers.SysLogHandler(
address='/dev/log',
facility=logging.handlers.SysLogHandler.LOG_LOCAL1)
for l in loggers:
l.addHandler(h)
if not args.log_dir and not args.syslog:
h = logging.StreamHandler()
for l in loggers:
l.addHandler(h)
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()
cherrypy.quickstart(Root(args), '/', config={
'/finger': {'tools.websocket.on': True,
'tools.websocket.handler_cls': FingerWebSocket},
'/static': {'tools.staticdir.on': True,
'tools.staticdir.dir': HTML_DIR}})
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))