blob: 568eec5782ebaff8ccbeacf0b4ba394c2b3cc79f [file] [log] [blame] [edit]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Standalone local webserver to acquire fingerprints for user studies."""
from __future__ import print_function
import argparse
from datetime import datetime
from distutils import util
import json
import logging
import logging.handlers
import os
import re
import subprocess
import sys
import threading
import time
# The following imports will be available on the test image, but will usually
# be missing in the SDK.
# pylint: disable=import-error
import cherrypy
import gnupg
from ws4py.server.cherrypyserver import WebSocketPlugin
from ws4py.server.cherrypyserver import WebSocketTool
from ws4py.websocket import WebSocket
# Use the image conversion library if available.
sys.path.extend(["/usr/local/opt/fpc", "/opt/fpc"])
try:
import fputils
except ImportError:
fputils = None
DEFAULT_ARGS = {
"finger-count": 2,
"enrollment-count": 20,
"verification-count": 15,
"port": 9000,
"picture-dir": "./fingers",
"syslog": False,
"gpg-keyring": "",
"gpg-recipients": "",
"log-dir": "/var/log/fingerprints",
}
errors = [
# FP_SENSOR_LOW_IMAGE_QUALITY 1
"retrying...",
# FP_SENSOR_TOO_FAST 2
"keeping your finger still during capture",
# FP_SENSOR_LOW_SENSOR_COVERAGE 3
"centering your finger on the sensor",
]
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
HTML_DIR = os.path.join(SCRIPT_DIR, "html")
ECTOOL = "ectool"
# Wait to see a finger on the sensor.
FP_MODE_FINGER_DOWN = 2
# Poll until the finger has left the sensor.
FP_MODE_FINGER_UP = 4
# Capture the current finger image.
FP_MODE_CAPTURE = 8
class FingerWebSocket(WebSocket):
"""Handle the websocket used finger images acquisition and logging."""
FP_MODE_RE = re.compile(r"^FP mode:\s*\(0x([0-9a-fA-F]+)\)", re.MULTILINE)
DIR_FORMAT = "{participant:04d}/{group:s}/{finger:02d}"
FILE_FORMAT = "{finger:02d}_{picture:02d}"
config = {}
pict_dir = "/tmp"
# FpUtils class to process images through the external library.
utils = None
# The optional GNUGPG instance used for encryption.
gpg = None
gpg_recipients: list = None
# The worker thread processing the images.
worker = None
# The current request processed by the worker thread.
current_req = None
# The Condition variable the worker thread waits on to get a new request.
available_req = threading.Condition()
# Force terminating the current processing in the worker thread.
abort_request = False
def set_config(self, arg):
self.config = {
"fingerCount": arg.finger_count,
"enrollmentCount": arg.enrollment_count,
"verificationCount": arg.verification_count,
}
self.pict_dir = arg.picture_dir
if fputils:
self.utils = fputils.FpUtils()
if arg.gpg_keyring:
# The verbose flag prints a lot of info to console using print
# directly. We use the logging interface instead.
self.gpg = gnupg.GPG(
keyring=arg.gpg_keyring,
verbose=False,
options=[
"--no-options",
"--no-default-recipient",
"--trust-model",
"always",
],
)
self.gpg_recipients = arg.gpg_recipients.split()
if not self.gpg_recipients:
cherrypy.log(
"Error - GPG Recipients is Empty", severity=logging.FATAL
)
cherrypy.engine.exit()
return
cherrypy.log(f"GPG Recipients: {self.gpg_recipients}")
keyring_list = self.gpg.list_keys()
if not keyring_list:
cherrypy.log(
"Error - GPG Keyring is Empty", severity=logging.FATAL
)
cherrypy.engine.exit()
return
for k in keyring_list:
cherrypy.log(f'GPG Keyring Key {k["fingerprint"]}:')
for dk, dv in k.items():
if dv:
cherrypy.log(f"\t{dk}: {dv}")
# Check if recipients are in the keyring and perfectly
# match one to one. There could be a mismatch if a generic search
# specifier is used for the recipient that matches more than one
# key in the keyring.
for recipients in self.gpg_recipients:
keyring_list = self.gpg.list_keys(keys=recipients)
if not (keyring_list and len(keyring_list) == 1):
cherrypy.log(
"Error - GPG Recipients do not match specific keys.",
severity=logging.FATAL,
)
cherrypy.engine.exit()
return
self.worker = threading.Thread(target=self.finger_worker)
self.worker.start()
def closed(self, code, reason=""):
self.abort_request = True
cherrypy.log(f"Websocket closed with code {code} / {reason}")
if not self.worker:
cherrypy.log("Worker thread wasn't running.")
return
cherrypy.log("Stopping worker thread.")
# Wake up the thread so it can exit.
self.available_req.acquire()
self.available_req.notify()
self.available_req.release()
self.worker.join(10.0)
if self.worker.is_alive():
cherrypy.log("Failed to stop worker thread.")
else:
cherrypy.log("Successfully stopped worker thread.")
def received_message(self, m):
if m.is_binary:
return # Not supported
j = json.loads(m.data)
if "log" in j:
cherrypy.log(j["log"])
if "finger" in j:
self.finger_request(j)
if "config" in j:
self.config_request(j)
def make_dirs(self, path):
if not os.path.exists(path):
os.makedirs(path)
def save_to_file(self, data: bytes, file_path: str):
"""Save data bytes to file at file_path.
If GPG is enabled, the .gpg suffix is added to file_path.
"""
if self.gpg:
file_path += ".gpg"
enc = self.gpg.encrypt(data, self.gpg_recipients)
data = enc.data
cherrypy.log(f"Saving file '{file_path}' size {len(data)}")
if not data:
cherrypy.log(
"Error - Attempted to save empty file", severity=logging.ERROR
)
return
with open(file_path, "wb") as f:
f.write(data)
def ectool(self, command: str, *args) -> bytes:
"""Run the ectool command and return its stdout as bytes."""
cmdline = [ECTOOL, "--name=cros_fp", command] + list(args)
stdout = b""
while not self.abort_request:
try:
stdout = subprocess.check_output(cmdline)
break
except subprocess.CalledProcessError as e:
cherrypy.log(f"command '{e.cmd}' failed with {e.returncode}")
stdout = b""
return stdout
def ectool_fpmode(self, *args) -> int:
mode = self.ectool("fpmode", *args).decode("utf-8")
match_mode = self.FP_MODE_RE.search(mode)
return int(match_mode.group(1), 16) if match_mode else -1
def finger_wait_done(self, mode):
# Poll until the mode bit has disappeared.
while not self.abort_request and self.ectool_fpmode() & mode:
time.sleep(0.050)
return not self.abort_request
def finger_save_image(self, req):
directory = os.path.join(self.pict_dir, self.DIR_FORMAT.format(**req))
self.make_dirs(directory)
file_base = os.path.join(directory, self.FILE_FORMAT.format(**req))
raw_file = file_base + ".raw"
fmi_file = file_base + ".fmi"
img = self.ectool("fpframe", "raw")
if not img:
cherrypy.log("Failed to download fpframe")
return
self.save_to_file(img, raw_file)
if self.utils:
rc, fmi = self.utils.image_data_to_fmi(img)
if rc == 0:
self.save_to_file(fmi, fmi_file)
else:
cherrypy.log(f"FMI conversion failed {rc}")
def finger_process(self, req):
# Ensure the user has removed the finger between 2 captures.
if not self.finger_wait_done(FP_MODE_FINGER_UP):
return
# Capture the finger image when the finger is on the sensor.
self.ectool_fpmode("capture", "vendor")
t0 = time.time()
# Wait for the image being available.
if not self.finger_wait_done(FP_MODE_CAPTURE):
return
t1 = time.time()
# Detect the finger removal before the next capture.
self.ectool_fpmode("fingerup")
# Record the outcome of the capture.
cherrypy.log(
f'Captured finger {req["finger"]:02d}:{req["picture"]:02d}'
f" in {t1 - t0:.2f}s"
)
req["result"] = "ok" # ODER req['result'] = errors[ERRNUM_TBD]
# Retrieve the finger image.
self.finger_save_image(req)
# Tell the page about the acquisition result.
self.send(json.dumps(req), False)
def finger_worker(self):
while not self.server_terminated and not self.client_terminated:
self.available_req.acquire()
while not self.current_req and not self.abort_request:
self.available_req.wait()
self.finger_process(self.current_req)
self.current_req = None
self.available_req.release()
def finger_request(self, req):
# Ask the thread to exit the waiting loops
# it will wait on the acquire() below if needed.
self.abort_request = True
# Ask the thread to process the new request.
self.available_req.acquire()
self.abort_request = False
self.current_req = req
self.available_req.notify()
self.available_req.release()
def config_request(self, req):
# Populate the configuration.
req["config"] = self.config
self.send(json.dumps(req), False)
class Root(object):
"""Serve the static HTML/CSS and connect the websocket."""
def __init__(self, cmdline_args):
self.args = cmdline_args
@cherrypy.expose
def index(self):
index_file = os.path.join(SCRIPT_DIR, "html/index.html")
with open(index_file, encoding="utf-8") as f:
return f.read()
@cherrypy.expose
def finger(self):
cherrypy.request.ws_handler.set_config(self.args)
def environment_parameters(default_params: dict) -> dict:
"""Return |default_params| after overriding with environment vars.
Given a dictionary of default runtime parameters, return the same
dictionary with parameters overridden by their equivalent environment
variable.
A corresponding environment variable is the uppercase equivalent of the
parameter name, with all '-' replaced with '_'.
For examples, parameter "log-dir" corresponds to environment variable
"LOG_DIR".
"""
env_params = default_params.copy()
for param in default_params:
env_var = param.upper().replace("-", "_")
arg_type = type(default_params[param])
value = os.environ.get(env_var)
if value is not None:
try:
if arg_type is bool:
value = bool(util.strtobool(value))
elif arg_type is type(None):
raise Exception("Cannot handle type None in default list.")
else:
value = arg_type(value)
except ValueError:
raise ValueError(env_var)
env_params[param] = value
return env_params
def main(argv: list):
parser = argparse.ArgumentParser()
# Read environment variables as the arg default values.
try:
env_default = environment_parameters(DEFAULT_ARGS)
except ValueError as e:
parser.error(f"failed to parse {e}")
# Get study parameters from the command-line.
parser.add_argument(
"-f",
"--finger-count",
type=int,
default=env_default["finger-count"],
help="Number of fingers acquired per user",
)
parser.add_argument(
"-e",
"--enrollment-count",
type=int,
default=env_default["enrollment-count"],
help="Number of enrollment images per finger",
)
parser.add_argument(
"-v",
"--verification-count",
type=int,
default=env_default["verification-count"],
help="Number of verification images per finger",
)
parser.add_argument(
"-p",
"--port",
type=int,
default=env_default["port"],
help="The port for the webserver",
)
parser.add_argument(
"-d",
"--picture-dir",
default=env_default["picture-dir"],
help="Directory for the fingerprint captures",
)
parser.add_argument(
"-l",
"--log-dir",
default=env_default["log-dir"],
help="Log files directory",
)
parser.add_argument(
"-s",
"--syslog",
action="store_true",
default=env_default["syslog"],
help="Log to syslog",
)
parser.add_argument(
"-k",
"--gpg-keyring",
type=str,
default=env_default["gpg-keyring"],
help="Path to the GPG keyring",
)
parser.add_argument(
"-r",
"--gpg-recipients",
type=str,
default=env_default["gpg-recipients"],
help="User IDs of GPG recipients separated by space",
)
args = parser.parse_args(argv)
# GPG can only be used when gpg-keyring and gpg-recipient are specified.
if args.gpg_keyring and not args.gpg_recipients:
parser.error("gpg-recipients must be specified with gpg-keyring")
if args.gpg_recipients and not args.gpg_keyring:
parser.error("gpg-keyring must be specified with gpg-recipients")
if args.gpg_keyring and not os.access(args.gpg_keyring, os.R_OK):
parser.error(f"cannot read gpg-keyring file {args.gpg_keyring}")
# Configure cherrypy server.
cherrypy.config.update({"server.socket_port": args.port})
# Configure logging.
cherrypy.config.update({"log.screen": False})
loggers = []
l = logging.getLogger("cherrypy.access")
l.setLevel(logging.DEBUG)
loggers.append(l)
l = logging.getLogger("cherrypy.error")
l.setLevel(logging.DEBUG)
loggers.append(l)
l = logging.getLogger("gnupg")
l.setLevel(logging.INFO)
loggers.append(l)
if args.log_dir:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_name = f"server-{timestamp}.log"
h = logging.handlers.RotatingFileHandler(
filename=os.path.join(args.log_dir, log_name)
)
for l in loggers:
l.addHandler(h)
if args.syslog:
h = logging.handlers.SysLogHandler(
address="/dev/log",
facility=logging.handlers.SysLogHandler.LOG_LOCAL1,
)
for l in loggers:
l.addHandler(h)
if not args.log_dir and not args.syslog:
h = logging.StreamHandler()
for l in loggers:
l.addHandler(h)
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()
cherrypy.quickstart(
Root(args),
"/",
config={
"/finger": {
"tools.websocket.on": True,
"tools.websocket.handler_cls": FingerWebSocket,
},
"/static": {
"tools.staticdir.on": True,
"tools.staticdir.dir": HTML_DIR,
},
},
)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))