blob: 1eacb0ae2955993762ca0a1e9e1a7fd485f5568b [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Standalone local webserver to acquire fingerprints for user studies."""
from __future__ import annotations
from __future__ import print_function
import argparse
import datetime
import json
import logging
import logging.handlers
import os
import pathlib
import re
import subprocess
import sys
import threading
import time
from typing import Any, Literal, Optional, Union
# The following imports will be available on the test image, but will usually
# be missing in the SDK.
# pylint: disable=import-error
import cherrypy
import gnupg
from ws4py import messaging
from ws4py.server.cherrypyserver import WebSocketPlugin
from ws4py.server.cherrypyserver import WebSocketTool
from ws4py.websocket import WebSocket
# Use the image conversion library if available.
sys.path.extend(["/usr/local/opt/fpc", "/opt/fpc"])
try:
import fputils as fpcutils
except ImportError:
fpcutils = None
DEFAULT_ARGS = {
"finger-count": 2,
"enrollment-count": 20,
"verification-count": 15,
"port": 9000,
"picture-dir": "./fingers",
"syslog": False,
"gpg-keyring": "",
"gpg-recipients": "",
"log-dir": "/var/log/fpstudy",
"export-fmi": False,
}
errors = [
# FP_SENSOR_LOW_IMAGE_QUALITY 1
"Please try again by retrying...",
# FP_SENSOR_TOO_FAST 2
"Please try again by keeping your finger still during capture",
# FP_SENSOR_LOW_SENSOR_COVERAGE 3
"Please try again by centering your finger on the sensor",
]
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
HTML_DIR = os.path.join(SCRIPT_DIR, "html")
ECTOOL = "ectool"
# Wait to see a finger on the sensor.
FP_MODE_FINGER_DOWN = 2
# Poll until the finger has left the sensor.
FP_MODE_FINGER_UP = 4
# Capture the current finger image.
FP_MODE_CAPTURE = 8
class FingerWebSocket(WebSocket):
"""Handle the websocket used finger images acquisition and logging."""
FP_MODE_RE = re.compile(r"^FP mode:\s*\(0x([0-9a-fA-F]+)\)", re.MULTILINE)
DIR_FORMAT = "{participant:04d}/{group:s}/{finger:02d}"
FILE_FORMAT = "{finger:02d}_{picture:02d}"
config = {}
pict_dir = pathlib.Path("/tmp")
# fpcutils.FpUtils class to process images through the external library.
fpcutils = None
export_fmi = False
# The optional GNUGPG instance used for encryption.
gpg = None
gpg_recipients: list = None
# The worker thread processing the images.
worker = None
# The current request processed by the worker thread.
current_req = None
# The Condition variable the worker thread waits on to get a new request.
available_req = threading.Condition()
# Force terminating the current processing in the worker thread.
abort_request = False
def get_finger_capture_dir_path(self, req: dict[str, Any]) -> pathlib.Path:
"""Get the path to the current requested finger's capture directory."""
directory = self.pict_dir / pathlib.Path(self.DIR_FORMAT.format(**req))
directory.mkdir(parents=True, exist_ok=True)
return directory
def get_finger_capture_base_path(self, req: dict[str, Any]) -> pathlib.Path:
"""Get the file name/path for the current finger capture.
This is the full path for the current finger and picture without the
file extension.
"""
directory = self.get_finger_capture_dir_path(req)
file_base = directory / pathlib.Path(self.FILE_FORMAT.format(**req))
return file_base
def set_config(self, arg: argparse.Namespace):
self.config = {
"fingerCount": arg.finger_count,
"enrollmentCount": arg.enrollment_count,
"verificationCount": arg.verification_count,
}
self.pict_dir = pathlib.Path(arg.picture_dir)
if fpcutils:
self.fpcutils = fpcutils.FpUtils()
if arg.export_fmi:
self.export_fmi = True
if arg.gpg_keyring:
# The verbose flag prints a lot of info to console using print
# directly. We use the logging interface instead.
self.gpg = gnupg.GPG(
keyring=arg.gpg_keyring,
verbose=False,
options=[
"--no-options",
"--no-default-recipient",
"--trust-model",
"always",
],
)
self.gpg_recipients = arg.gpg_recipients.split()
if not self.gpg_recipients:
cherrypy.log(
"Error - GPG Recipients is Empty", severity=logging.FATAL
)
cherrypy.engine.exit()
return
cherrypy.log(f"GPG Recipients: {self.gpg_recipients}")
keyring_list = self.gpg.list_keys()
if not keyring_list:
cherrypy.log(
"Error - GPG Keyring is Empty", severity=logging.FATAL
)
cherrypy.engine.exit()
return
for k in keyring_list:
cherrypy.log(f'GPG Keyring Key {k["fingerprint"]}:')
for dk, dv in k.items():
if dv:
cherrypy.log(f"\t{dk}: {dv}")
# Check if recipients are in the keyring and perfectly
# match one to one. There could be a mismatch if a generic search
# specifier is used for the recipient that matches more than one
# key in the keyring.
for recipients in self.gpg_recipients:
keyring_list = self.gpg.list_keys(keys=recipients)
if not (keyring_list and len(keyring_list) == 1):
cherrypy.log(
"Error - GPG Recipients do not match specific keys.",
severity=logging.FATAL,
)
cherrypy.engine.exit()
return
self.worker = threading.Thread(target=self.finger_worker)
self.worker.start()
def closed(self, code, reason=""):
self.abort_request = True
cherrypy.log(f"Websocket closed with code {code} / {reason}")
if not self.worker:
cherrypy.log("Worker thread wasn't running.")
return
cherrypy.log("Stopping worker thread.")
# Wake up the thread so it can exit.
self.available_req.acquire()
self.available_req.notify()
self.available_req.release()
self.worker.join(10.0)
if self.worker.is_alive():
cherrypy.log("Failed to stop worker thread.")
else:
cherrypy.log("Successfully stopped worker thread.")
def received_message(
self,
message: Union[messaging.TextMessage, messaging.BinaryMessage],
):
if message.is_binary:
return # Not supported
# The JSON payloads from index.html are string indexed dictionaries.
j: dict[str, Any] = json.loads(message.data)
if "log" in j:
cherrypy.log(j["log"])
if "finger" in j:
self.finger_request(j)
if "config" in j:
self.config_request(j)
def make_dirs(self, path: str):
if not os.path.exists(path):
os.makedirs(path)
def save_to_file(self, data: bytes, file_path: pathlib.Path):
"""Save data bytes to file at file_path.
If GPG is enabled, the .gpg suffix is added to file_path.
"""
if self.gpg:
file_path = file_path.with_suffix(file_path.suffix + ".gpg")
enc = self.gpg.encrypt(data, self.gpg_recipients)
data = enc.data
cherrypy.log(f"Saving file '{file_path}' size {len(data)}")
if not data:
cherrypy.log(
"Error - Attempted to save empty file", severity=logging.ERROR
)
return
file_path.write_bytes(data)
def ectool(self, command: str, *args: str) -> bytes:
"""Run the ectool command and return its stdout as bytes."""
cmdline = [ECTOOL, "--name=cros_fp", command] + list(args)
while not self.abort_request:
status = subprocess.run(cmdline, check=False, capture_output=True)
if status.returncode == 0:
return status.stdout
else:
cherrypy.log(
f"command '{status.args}' failed with {status.returncode}"
)
return b""
def ectool_fpmode(self, *args: str) -> int:
mode = self.ectool("fpmode", *args).decode("utf-8")
match_mode = self.FP_MODE_RE.search(mode)
return int(match_mode.group(1), 16) if match_mode else -1
def finger_wait_done(self, mode: int) -> bool:
"""Wait for the bits in mode to be cleared from fpmode.
Args:
mode: An fpmode that we will wait to be cleared.
Returns:
False only when the request has been aborted. True, otherwise.
"""
# Poll until the mode bit has disappeared.
while not self.abort_request and self.ectool_fpmode() & mode:
time.sleep(0.050)
return not self.abort_request
def capture(
self, capture_mode: Literal["vendor", "pattern0", "pattern1"] = "vendor"
) -> tuple[Optional[bytes], Optional[str]]:
"""Capture one sample from the FPMCU.
This function will start the capture, wait for the capture to complete,
fetch + return the capture bytes.
Args:
capture_mode: "vendor", "pattern0", or "pattern1"
Returns:
The tuple of capture bytes and error result string.
If the capture bytes are None, then an error occurred. Iin this case
the optional error result string should be optionally passed to the
client page.
"""
# Capture the finger image when the finger is on the sensor.
self.ectool_fpmode("capture", capture_mode)
# Wait for the image to become available.
if not self.finger_wait_done(FP_MODE_CAPTURE):
cherrypy.log("Finger wait aborted.")
return (None, None)
# Download the image.
if capture_mode == "vendor":
img = self.ectool("fpframe", "raw")
else:
# The pattern0/1s are intended to be captured using "simple" mode
# and will automatically be converted to PNM format.
img = self.ectool("fpframe")
if not img:
msg = f"Failed to download fpframe for {capture_mode} capture"
cherrypy.log(msg)
return (None, f"{msg}. Call operator.")
return (img, None)
def finger_save_image(self, req: dict[str, Any], img: bytes):
file_base = self.get_finger_capture_base_path(req)
raw_file = file_base.with_suffix(".raw")
fmi_file = file_base.with_suffix(".fmi")
self.save_to_file(img, raw_file)
if self.export_fmi:
if self.fpcutils:
rc, fmi = self.fpcutils.image_data_to_fmi(img)
if rc == 0:
self.save_to_file(fmi, fmi_file)
else:
cherrypy.log(f"FMI conversion failed {rc}")
else:
cherrypy.log("FPC utils are unavailable")
def finger_setup(self, req: dict[str, Any]) -> Optional[str]:
"""Run once before capturing each finger.
The user waits for this function to complete before capturing starts.
The expectation is that the user does not have a finger on the sensor
for this setup routine.
This routine does the following:
1. Capture a pattern0 and pattern1 sample and save it.
2. Query the FPMCU running version and save it.
Recording the version this frequently is probably not necessary,
but it will ensure that we catch accidental version mismatches and
changes across different test devices and different days of
capturing.
Args:
req: The finger capture request from the client page.
Returns:
The optional result string to send to the client page.
None when no result should be sent to the client page.
"""
finger_dir = self.get_finger_capture_dir_path(req)
t0 = time.time()
cherrypy.log(f"Capturing pattern0 for finger {req['finger']:02d}")
img_pattern0, result = self.capture("pattern0")
t1 = time.time()
cherrypy.log(f"Captured pattern0 in {t1 - t0:.2f}s")
if not img_pattern0:
return result
self.save_to_file(img_pattern0, finger_dir / "pattern0.pnm")
t0 = time.time()
# The pattern1 capture is needed for Elan 80SG to do off-chip
# evaluation.
cherrypy.log(f"Capturing pattern1 for finger {req['finger']:02d}")
img_pattern1, result = self.capture("pattern1")
t1 = time.time()
cherrypy.log(f"Captured pattern1 in {t1 - t0:.2f}s")
if not img_pattern1:
return result
self.save_to_file(img_pattern1, finger_dir / "pattern1.pnm")
t0 = time.time()
cherrypy.log(
f"Recording FPMCU FW version for finger {req['finger']:02d}"
)
version_string = self.ectool("version")
t1 = time.time()
self.save_to_file(version_string, finger_dir / "fpmcu_version.txt")
cherrypy.log(f"Recorded FPMCU FW version in {t1 - t0:.2f}s")
return "ok"
def finger_process(self, req: dict[str, Any]) -> Optional[str]:
"""Capture and save fingerprint samples.
Args:
req: The finger capture request from the client page.
Returns:
The optional result string to send to the client page.
None when no result should be sent to the client page.
"""
# Ensure the user has removed the finger between 2 captures or
# before the calibration capture occurs.
# If the finger is already up, this mode will immediately be cleared.
# Also, as a side effect, Elan's matcher will force a resample of
# pattern1 upon fingerup, if the sample taken on boot contained a
# fingerprint.
self.ectool_fpmode("fingerup")
if not self.finger_wait_done(FP_MODE_FINGER_UP):
# Aborted. Don't send a result to client.
return None
if req["action"] == "setup":
return self.finger_setup(req)
if req["action"] != "sample":
raise Exception(
f"Received unexpected action '{req['action']}' from UI"
)
t0 = time.time()
# Capture the finger image when the finger is on the sensor.
img, result = self.capture("vendor")
if not img:
return result
t1 = time.time()
# Record the outcome of the capture.
cherrypy.log(
f'Captured finger {req["finger"]:02d}:{req["picture"]:02d}'
f" in {t1 - t0:.2f}s"
)
self.finger_save_image(req, img)
return "ok"
def finger_worker(self):
def send_result(result_msg: str = "ok"):
"""Send the result of the capture back to the user page.
If the result is anything other than "ok", the page will show
an error message box with the result message and the capture will
be retried.
This result field seems to be designed to accommodate in study
feedback about the capture. The options were enumerated in the
"errors" string list at the top of this python script.
We will use it to surface general errors that should halt capturing,
too.
"""
self.current_req["result"] = result_msg
# Tell the page about the acquisition result.
self.send(json.dumps(self.current_req), False)
while not self.server_terminated and not self.client_terminated:
self.available_req.acquire()
while not self.current_req and not self.abort_request:
self.available_req.wait()
result = self.finger_process(self.current_req)
if result:
send_result(result)
self.current_req = None
self.available_req.release()
def finger_request(self, req: dict[str, Any]):
# Ask the thread to exit the waiting loops
# it will wait on the acquire() below if needed.
self.abort_request = True
# Ask the thread to process the new request.
self.available_req.acquire()
self.abort_request = False
self.current_req = req
self.available_req.notify()
self.available_req.release()
def config_request(self, req: dict[str, Any]):
# Populate the configuration.
req["config"] = self.config
self.send(json.dumps(req), False)
class Root:
"""Serve the static HTML/CSS and connect the websocket."""
def __init__(self, cmdline_args: argparse.Namespace):
self.args = cmdline_args
@cherrypy.expose
def index(self):
index_file = os.path.join(SCRIPT_DIR, "html/index.html")
with open(index_file, encoding="utf-8") as f:
return f.read()
@cherrypy.expose
def finger(self):
cherrypy.request.ws_handler.set_config(self.args)
def environment_parameters(default_params: dict[str, Any]) -> dict[str, Any]:
"""Return |default_params| after overriding with environment vars.
Given a dictionary of default runtime parameters, return the same
dictionary with parameters overridden by their equivalent environment
variable.
A corresponding environment variable is the uppercase equivalent of the
parameter name, with all '-' replaced with '_'.
For examples, parameter "log-dir" corresponds to environment variable
"LOG_DIR".
"""
def parse_bool(bool_str: str) -> bool:
if bool_str.lower() in {"true", "yes"}:
return True
elif bool_str.lower() in {"false", "no"}:
return False
else:
raise ValueError(f"String '{bool_str}' can't be parsed to bool.")
env_params = default_params.copy()
for param in default_params:
env_var = param.upper().replace("-", "_")
arg_type = type(default_params[param])
value = os.environ.get(env_var)
if value is not None:
try:
if arg_type is bool:
value = parse_bool(value)
elif arg_type is type(None):
raise Exception("Cannot handle type None in default list.")
else:
value = arg_type(value)
except ValueError:
raise ValueError(env_var)
env_params[param] = value
return env_params
def main(argv: list[str]) -> int:
parser = argparse.ArgumentParser()
# Read environment variables as the arg default values.
try:
env_default = environment_parameters(DEFAULT_ARGS)
except ValueError as e:
parser.error(f"failed to parse {e}")
# Get study parameters from the command-line.
parser.add_argument(
"-f",
"--finger-count",
type=int,
default=env_default["finger-count"],
help="Number of fingers acquired per user",
)
parser.add_argument(
"-e",
"--enrollment-count",
type=int,
default=env_default["enrollment-count"],
help="Number of enrollment images per finger",
)
parser.add_argument(
"-v",
"--verification-count",
type=int,
default=env_default["verification-count"],
help="Number of verification images per finger",
)
parser.add_argument(
"-p",
"--port",
type=int,
default=env_default["port"],
help="The port for the webserver",
)
parser.add_argument(
"-d",
"--picture-dir",
default=env_default["picture-dir"],
help="Directory for the fingerprint captures",
)
parser.add_argument(
"-l",
"--log-dir",
default=env_default["log-dir"],
help="Log files directory",
)
parser.add_argument(
"-s",
"--syslog",
action="store_true",
default=env_default["syslog"],
help="Log to syslog",
)
parser.add_argument(
"-k",
"--gpg-keyring",
type=str,
default=env_default["gpg-keyring"],
help="Path to the GPG keyring",
)
parser.add_argument(
"-r",
"--gpg-recipients",
type=str,
default=env_default["gpg-recipients"],
help="User IDs of GPG recipients separated by space",
)
parser.add_argument(
"-fmi",
"--export-fmi",
action="store_true",
default=env_default["export-fmi"],
help="Enable export of FMI converted capture.",
)
args = parser.parse_args(argv)
# GPG can only be used when gpg-keyring and gpg-recipient are specified.
if args.gpg_keyring and not args.gpg_recipients:
parser.error("gpg-recipients must be specified with gpg-keyring")
if args.gpg_recipients and not args.gpg_keyring:
parser.error("gpg-keyring must be specified with gpg-recipients")
if args.gpg_keyring and not os.access(args.gpg_keyring, os.R_OK):
parser.error(f"cannot read gpg-keyring file {args.gpg_keyring}")
# Configure cherrypy server.
cherrypy.config.update({"server.socket_port": args.port})
# Configure logging.
cherrypy.config.update({"log.screen": False})
loggers = []
l = logging.getLogger("cherrypy.access")
l.setLevel(logging.DEBUG)
loggers.append(l)
l = logging.getLogger("cherrypy.error")
l.setLevel(logging.DEBUG)
loggers.append(l)
l = logging.getLogger("gnupg")
l.setLevel(logging.INFO)
loggers.append(l)
if args.log_dir:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_name = f"server-{timestamp}.log"
h = logging.handlers.RotatingFileHandler(
filename=os.path.join(args.log_dir, log_name)
)
for l in loggers:
l.addHandler(h)
if args.syslog:
h = logging.handlers.SysLogHandler(
address="/dev/log",
facility=logging.handlers.SysLogHandler.LOG_LOCAL1,
)
for l in loggers:
l.addHandler(h)
if not args.log_dir and not args.syslog:
h = logging.StreamHandler()
for l in loggers:
l.addHandler(h)
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()
cherrypy.quickstart(
Root(args),
"/",
config={
"/finger": {
"tools.websocket.on": True,
"tools.websocket.handler_cls": FingerWebSocket,
},
"/static": {
"tools.staticdir.on": True,
"tools.staticdir.dir": HTML_DIR,
},
},
)
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))