blob: 6b011f5e0bc7181896d956797783d02ab8daa2de [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Standalone local webserver to acquire fingerprints for user studies."""
from __future__ import print_function
import argparse
from datetime import datetime
import json
import os
import re
import subprocess
import sys
import threading
import time
import cherrypy
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
# use the image conversion library if available
sys.path.extend(['/usr/local/opt/fpc', '/opt/fpc'])
try:
import fputils
except ImportError:
fputils = None
errors = [
# FP_SENSOR_LOW_IMAGE_QUALITY 1
'retrying...',
# FP_SENSOR_TOO_FAST 2
'keeping your finger still during capture',
# FP_SENSOR_LOW_SENSOR_COVERAGE 3
'centering your finger on the sensor',
]
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
HTML_DIR = os.path.join(SCRIPT_DIR, 'html')
ECTOOL = 'ectool'
# Wait to see a finger on the sensor
FP_MODE_FINGER_DOWN = 2
# Poll until the finger has left the sensor
FP_MODE_FINGER_UP = 4
# Capture the current finger image
FP_MODE_CAPTURE = 8
class FingerWebSocket(WebSocket):
"""Handle the websocket used finger images acquisition and logging."""
FP_MODE_RE = re.compile(r'^FP mode:\s*\(0x([0-9a-fA-F]+)\)', re.MULTILINE)
DIR_FORMAT = '{participant:04d}/{group:s}/{finger:02d}'
FILE_FORMAT = '{finger:02d}_{picture:02d}'
config = {}
pict_dir = '/tmp'
# FpUtils class to process images through the external library.
utils = None
# The worker thread processing the images.
worker = None
# The current request processed by the worker thread.
current_req = None
# The Condition variable the worker thread waits on to get a new request.
available_req = threading.Condition()
# Force terminating the current processing in the worker thread.
abort_request = False
def set_config(self, arg):
self.config = {
'fingerCount': arg.finger_count,
'enrollmentCount': arg.enrollment_count,
'verificationCount': arg.verification_count
}
self.pict_dir = arg.picture_dir
if fputils:
self.utils = fputils.FpUtils()
self.worker = threading.Thread(target=self.finger_worker)
self.worker.start()
def closed(self, code, reason=''):
self.abort_request = True
cherrypy.log('Websocket closed with code %d / %s' % (code, reason))
if not self.worker:
cherrypy.log("Worker thread wasn't running.")
return
cherrypy.log('Stopping worker thread.')
# Wake up the thread so it can exit.
self.available_req.acquire()
self.available_req.notify()
self.available_req.release()
self.worker.join(10.0)
if self.worker.isAlive():
cherrypy.log('Failed to stop worker thread.')
else:
cherrypy.log('Successfully stopped worker thread.')
def received_message(self, m):
if m.is_binary:
return # Not supported
j = json.loads(m.data)
if 'log' in j:
cherrypy.log(j['log'])
if 'finger' in j:
self.finger_request(j)
if 'config' in j:
self.config_request(j)
def make_dirs(self, path):
# Ensure the image directory exists
if not os.path.exists(path):
os.makedirs(path)
def ectool(self, command: str, *params) -> bytes:
"""Run the ectool command and return its stdout as bytes"""
cmdline = [ECTOOL, '--name=cros_fp', command] + list(params)
stdout = b''
while not self.abort_request:
try:
stdout = subprocess.check_output(cmdline)
break
except subprocess.CalledProcessError as e:
cherrypy.log("command '%s' failed with %d" %
(e.cmd, e.returncode))
stdout = b''
# try again
return stdout
def ectool_fpmode(self, *params) -> int:
mode = self.ectool('fpmode', *params).decode('utf-8')
match_mode = self.FP_MODE_RE.search(mode)
return int(match_mode.group(1), 16) if match_mode else -1
def finger_wait_done(self, mode):
# Poll until the mode bit has disappeared
while not self.abort_request and self.ectool_fpmode() & mode:
time.sleep(0.050)
return not self.abort_request
def finger_save_image(self, req):
directory = os.path.join(self.pict_dir, self.DIR_FORMAT.format(**req))
self.make_dirs(directory)
file_base = os.path.join(directory, self.FILE_FORMAT.format(**req))
raw_file = file_base + '.raw'
fmi_file = file_base + '.fmi'
img = self.ectool('fpframe', 'raw')
if not img:
return
cherrypy.log("Saving file '%s' size %d" % (raw_file, len(img)))
with open(raw_file, 'wb') as f:
f.write(img)
if self.utils:
rc, fmi = self.utils.image_data_to_fmi(img)
if rc == 0:
with open(fmi_file, 'wb') as f:
f.write(fmi)
else:
cherrypy.log('FMI conversion failed %d' % (rc))
def finger_process(self, req):
# Ensure the user has removed the finger between 2 captures
if not self.finger_wait_done(FP_MODE_FINGER_UP):
return
# Capture the finger image when the finger is on the sensor
self.ectool_fpmode('capture', 'vendor')
t0 = time.time()
# Wait for the image being available
if not self.finger_wait_done(FP_MODE_CAPTURE):
return
t1 = time.time()
# detect the finger removal before the next capture
self.ectool_fpmode('fingerup')
# record the outcome of the capture
cherrypy.log('Captured finger %02d:%02d in %.2fs' % (req['finger'],
req['picture'],
t1 - t0))
req['result'] = 'ok' # ODER req['result'] = errors[ERRNUM_TBD]
# retrieve the finger image
self.finger_save_image(req)
# tell the page about the acquisition result
self.send(json.dumps(req), False)
def finger_worker(self):
while not self.client_terminated:
self.available_req.acquire()
while not self.current_req and not self.abort_request:
self.available_req.wait()
self.finger_process(self.current_req)
self.current_req = None
self.available_req.release()
def finger_request(self, req):
# ask the thread to exit the waiting loops
# it will wait on the acquire() below if needed
self.abort_request = True
# ask the thread to process the new request
self.available_req.acquire()
self.abort_request = False
self.current_req = req
self.available_req.notify()
self.available_req.release()
def config_request(self, req):
# Populate configuration.
req['config'] = self.config
self.send(json.dumps(req), False)
class Root(object):
"""Serve the static HTML/CSS and connect the websocket."""
def __init__(self, cmdline_args):
self.args = cmdline_args
@cherrypy.expose
def index(self):
index_file = os.path.join(SCRIPT_DIR, 'html/index.html')
with open(index_file, encoding='utf-8') as f:
return f.read()
@cherrypy.expose
def finger(self):
cherrypy.request.ws_handler.set_config(self.args)
def main(argv: list):
# Get study parameters from the command-line
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--finger_count', type=int, default=2,
help='Number of fingers acquired per user')
parser.add_argument('-e', '--enrollment_count', type=int, default=20,
help='Number of enrollment images per finger')
parser.add_argument('-v', '--verification_count', type=int, default=15,
help='Number of verification images per finger')
parser.add_argument('-p', '--port', type=int, default=9000,
help='port for the webserver socket')
parser.add_argument('-d', '--picture_dir', default='./fingers',
help='Log files directory')
parser.add_argument('-l', '--log_dir')
args = parser.parse_args(argv)
# Configure cherrypy server
cherrypy.config.update({'server.socket_port': args.port})
if args.log_dir:
log_name = 'server-%s.log' % (datetime.now().strftime('%Y%m%d_%H%M%S'))
cherrypy.config.update({
'log.access_file': os.path.join(args.log_dir, 'access.log'),
'log.error_file': os.path.join(args.log_dir, log_name),
'log.screen': False})
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()
cherrypy.quickstart(Root(args), '/', config={
'/finger': {'tools.websocket.on': True,
'tools.websocket.handler_cls': FingerWebSocket},
'/static': {'tools.staticdir.on': True,
'tools.staticdir.dir': HTML_DIR}})
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))