blob: e2af41c62d3a137f4f76933218fecc9f31a254f5 [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
from enum import Enum
import pathlib
from typing import Optional
from collection import Collection
import fpsutils
import numpy as np
import numpy.typing as npt
import pandas as pd
class Experiment:
"""Represents a fingerprint study experiment that is being analyzed."""
class Finger(Enum):
Thumb_Left = 0
Thumb_Right = 1
Index_Left = 2
Index_Right = 3
Middle_Left = 4
Middle_Right = 5
class UserGroup(Enum):
A = 0
B = 1
C = 2
D = 3
E = 4
F = 5
class Decision(Enum):
Accept = "ACCEPT"
Reject = "REJECT"
class TableCol(Enum):
"""All known table column names used across different table types."""
Enroll_User = "EnrollUser"
Enroll_Finger = "EnrollFinger"
Enroll_Group = "EnrollGroup"
Verify_User = "VerifyUser"
Verify_Finger = "VerifyFinger"
Verify_Sample = "VerifySample"
Verify_Group = "VerifyGroup"
Decision = "Decision"
User = "User"
Group = "Group"
@classmethod
def all(cls) -> list[Enum]:
return list(level for level in cls)
@classmethod
def all_values(cls) -> list[str]:
return list(level.value for level in cls)
FALSE_TABLE_COLS = [
TableCol.Enroll_User.value,
TableCol.Enroll_Finger.value,
TableCol.Verify_User.value,
TableCol.Verify_Finger.value,
TableCol.Verify_Sample.value,
]
DECISION_TABLE_COLS = [
TableCol.Enroll_User.value,
TableCol.Enroll_Finger.value,
TableCol.Verify_User.value,
TableCol.Verify_Finger.value,
TableCol.Verify_Sample.value,
TableCol.Decision.value,
]
DECISION_TABLE_GROUP_COLS = [
TableCol.Enroll_Group.value,
TableCol.Verify_Group.value,
]
USER_GROUP_TABLE_COLS = [
TableCol.User.value,
TableCol.Group.value,
]
"""Column names used in a user_group mapping table."""
@staticmethod
def _false_table_query(
false_table: pd.DataFrame,
enroll_user_id: Optional[int] = None,
enroll_finger_id: Optional[int] = None,
verify_user_id: Optional[int] = None,
verify_finger_id: Optional[int] = None,
verify_sample_index: Optional[int] = None,
) -> pd.DataFrame:
query_parts: list[str] = []
for arg, col in [
(enroll_user_id, Experiment.TableCol.Enroll_User),
(enroll_finger_id, Experiment.TableCol.Enroll_Finger),
(verify_user_id, Experiment.TableCol.Verify_User),
(verify_finger_id, Experiment.TableCol.Verify_Finger),
(verify_sample_index, Experiment.TableCol.Verify_Sample),
]:
if arg:
query_parts.append(f"({col.value} == {arg})")
query_str = " & ".join(query_parts)
return false_table.query(query_str) if query_str else false_table
@staticmethod
def _false_table_query2(
false_table: pd.DataFrame,
enroll_user_id: Optional[int] = None,
enroll_finger_id: Optional[int] = None,
verify_user_id: Optional[int] = None,
verify_finger_id: Optional[int] = None,
verify_sample_index: Optional[int] = None,
) -> pd.DataFrame:
"""A faster version of `_false_table_query`.
See the unit test benchmarks.
"""
query_cols: list[str] = []
query_vals = ()
for arg, col in [
(enroll_user_id, Experiment.TableCol.Enroll_User),
(enroll_finger_id, Experiment.TableCol.Enroll_Finger),
(verify_user_id, Experiment.TableCol.Verify_User),
(verify_finger_id, Experiment.TableCol.Verify_Finger),
(verify_sample_index, Experiment.TableCol.Verify_Sample),
]:
if arg:
query_cols.append(col.value)
query_vals += (arg,)
if query_cols:
res = false_table[query_cols] == query_vals
return false_table.loc[res.all(axis=1)]
return false_table
def __init__(
self,
# num_enrollment: int,
num_verification: int,
num_fingers: int,
num_users: int,
far_decisions: Optional[pd.DataFrame] = None,
frr_decisions: Optional[pd.DataFrame] = None,
fa_list: Optional[pd.DataFrame] = None,
):
"""Initialize a new experiment."""
# self.num_enrollment = num_enrollment
self.num_verification = num_verification
self.num_fingers = num_fingers
self.num_users = num_users
self._tbl_far_decisions = far_decisions
self._tbl_frr_decisions = frr_decisions
self._tbl_fa_list = fa_list
self._tbl_fr_list = None
self._tbl_user_groups = None
def Describe(self):
print("Users:", self.num_users)
print("Fingers:", self.num_fingers)
print("Verification Samples:", self.num_verification)
print(self.far_decisions().describe())
print(self.fa_table().describe())
def fa_table(self) -> pd.DataFrame:
"""Return an FAR table that only contains the False Acceptances."""
if self._tbl_fa_list is None:
far = self.far_decisions()
fa_list = far.loc[
far[Experiment.TableCol.Decision.value]
== Experiment.Decision.Accept.value
].copy(deep=True)
fa_list.drop(
[Experiment.TableCol.Decision.value], axis=1, inplace=True
)
fa_list.reset_index(drop=True, inplace=True)
self._tbl_fa_list = fa_list
return self._tbl_fa_list
def fr_table(self) -> pd.DataFrame:
"""Return an FRR table that only contains the False Rejections."""
if self._tbl_fr_list is None:
frr = self.frr_decisions()
fr_list = frr.loc[
frr[Experiment.TableCol.Decision.value]
== Experiment.Decision.Reject.value
].copy(deep=True)
fr_list.drop(
[Experiment.TableCol.Decision.value], axis=1, inplace=True
)
fr_list.reset_index(drop=True, inplace=True)
self._tbl_fr_list = fr_list
return self._tbl_fr_list
def has_far_decisions(self) -> bool:
"""Return True if an FAR decision table has been set."""
return self._tbl_far_decisions is not None
def has_frr_decisions(self) -> bool:
"""Return True if an FRR decision table has been set."""
return self._tbl_frr_decisions is not None
def has_user_groups(self) -> bool:
"""Return True if user group mapping table is available."""
return self.user_groups_table() is not None
def user_groups_table(self) -> Optional[pd.DataFrame]:
"""Return the user group mapping table.
This function will search for user-group mapping information in the
following order:
1. Preexisting user groups added through the add_groups function
2. Group mappings in FRR decisions table
3. Group mappings in FAR decisions table
If no group information is found in any of these, None is returned.
Considering we do not know whether all user/groups are represented in
the enrollment or verification set, we scan both. However, we do not
scan both FAR and FRR decision tables.
"""
def exists_with_groups(table: Optional[pd.DataFrame]) -> bool:
"""Return True if `table` exists and contains group cols."""
if table is None:
return False
return fpsutils.has_columns(table, self.DECISION_TABLE_GROUP_COLS)
if self._tbl_user_groups is None:
if exists_with_groups(self._tbl_frr_decisions):
tbl = self._tbl_frr_decisions
elif exists_with_groups(self._tbl_far_decisions):
tbl = self._tbl_far_decisions
else:
return None
assert tbl is not None, "exists_with_groups allowed None table"
enroll_groups = tbl[
[
Experiment.TableCol.Enroll_User.value,
Experiment.TableCol.Enroll_Group.value,
]
].copy(deep=True)
verify_groups = tbl[
[
Experiment.TableCol.Verify_User.value,
Experiment.TableCol.Verify_Group.value,
]
].copy(deep=True)
# Rename columns.
enroll_groups.columns = [
Experiment.TableCol.User.value,
Experiment.TableCol.Group.value,
]
verify_groups.columns = [
Experiment.TableCol.User.value,
Experiment.TableCol.Group.value,
]
user_groups = pd.concat([enroll_groups, verify_groups])
user_groups.drop_duplicates(inplace=True)
user_groups.sort_values(
Experiment.TableCol.User.value, inplace=True
)
user_groups.reset_index(inplace=True, drop=True)
self._tbl_user_groups = user_groups
return self._tbl_user_groups
def user_groups_table_to_csv(self, csv_file_path: pathlib.Path) -> None:
"""Write out the user group mapping table to a CSV file."""
user_groups = self.user_groups_table()
if user_groups is None:
raise ValueError("No user group information found")
user_groups.to_csv(csv_file_path, index=False)
def fa_trials_count(self) -> int:
"""Return the total number of false accept cross matches."""
return self.far_decisions().shape[0]
def fa_count(self) -> int:
"""Return the number of False Acceptances."""
return self.fa_table().shape[0]
def fr_trials_count(self) -> int:
"""Return the total number of false reject cross matches."""
return self.frr_decisions().shape[0]
def fr_count(self) -> int:
"""Return the number of False Rejections."""
return self.fr_table().shape[0]
def far_decisions(self) -> pd.DataFrame:
return self._tbl_far_decisions
def far_decisions_to_csv(
self, csv_file_path: pathlib.Path, exclude_groups: bool = True
) -> None:
"""Write out the FAR decisions to a CSV file.
By default, any added groups will be stripped from the output decisions.
When `exclude_groups` is False, any groups added will be saved within
the the output decisions CSV.
"""
_write_decision_file(
self.far_decisions(), csv_file_path, exclude_groups
)
def frr_decisions(self) -> pd.DataFrame:
return self._tbl_frr_decisions
def frr_decisions_to_csv(
self, csv_file_path: pathlib.Path, exclude_groups: bool = True
) -> None:
"""Write out the FRR decisions to a CSV file.
By default, any added groups will be stripped from the output decisions.
When `exclude_groups` is False, any groups added will be saved within
the the output decisions CSV.
"""
_write_decision_file(
self.frr_decisions(), csv_file_path, exclude_groups
)
def unique_list(self, column: TableCol) -> npt.NDArray:
"""Return a unique and sorted list of items for the given column.
These items are discovered from the FAR decision table.
The returned array is indexable from 0, since they are in a numpy array.
"""
return np.sort(self._tbl_far_decisions[column.value].unique())
def user_list(self) -> npt.NDArray:
"""Return a unique set of sorted User IDs from the verification set.
These are indexable from 0, since they are in a numpy array.
"""
return self.unique_list(Experiment.TableCol.Verify_User)
def finger_list(self) -> npt.NDArray:
"""Return a unique set of sorted Finger IDs from the verification sets.
These are indexable from 0, since they are in a numpy array.
"""
return self.unique_list(Experiment.TableCol.Verify_Finger)
def sample_list(self) -> npt.NDArray:
"""Return a unique set of sorted Sample IDs from the verification sets.
These are indexable from 0, since they are in a numpy array.
"""
return self.unique_list(Experiment.TableCol.Verify_Sample)
def group_list(self) -> npt.NDArray:
"""Return a unique set of sorted Group IDs from the verification sets.
Recall that these groups might have been added by this class itself.
These are indexable from 0, since they are in a numpy array.
"""
return self.unique_list(Experiment.TableCol.Verify_Group)
def fa_counts_by(self, column: TableCol) -> pd.Series:
"""Return a Series of False Accept counts based on `column`.
The series index will be `column` values and the data values will be the
counts. All column values will be represented from `unique_list(column)`.
This function is faster than doing the counts directly from the entire
FAR decision table.
"""
# Ultimately, we want to generate a histogram over the entire FAR/FRR
# decision dataset, but doing so directly with the large DataFrame
# is much too slow.
#
# The fastest method is by reverse constructing the complete
# counts table by using the pre-aggregated fa_table.
# This runs in about 66ms, which is primarily the time to run
# `self.unique_list(column)`.
# This method could be sped up by caching the `unique_list` function.
#
# A similar method might be using the following, which runs
# in about 300ms:
# far[[column, 'Decision']].groupby([column]).sum()
fa_table = self.fa_table()
unique_indices = self.unique_list(column)
non_zero_counts = fa_table[column.value].value_counts()
# We will now backfill the 0 counts for indices that aren't represented
# in the simplified false list. Providing a sorted index list means
# that the output series will have sorted indices.
return non_zero_counts.reindex(index=unique_indices, fill_value=0)
def fr_counts_by(self, column: TableCol) -> pd.Series:
"""Return a Series of False Reject counts based on `column`.
The series index will be `column` values and the data values will be the
counts. All column values will be represented from `unique_list(column)`.
This function is faster than doing the counts directly from the entire
FRR decision table.
"""
# Ultimately, we want to generate a histogram over the entire FAR/FRR
# decision dataset, but doing so directly with the large DataFrame
# is much too slow.
#
# The fastest method is by reverse constructing the complete
# counts table by using the pre-aggregated fa_table.
# This runs in about 66ms, which is primarily the time to run
# `self.unique_list(column)`.
# This method could be sped up by caching the `unique_list` function.
#
# A similar method might be using the following, which runs
# in about 300ms:
# frr[[column, 'Decision']].groupby([column]).sum()
fr_table = self.fr_table()
unique_indices = self.unique_list(column)
non_zero_counts = fr_table[column.value].value_counts()
# We will now backfill the 0 counts for indices that aren't represented
# in the simplified false list. Providing a sorted index list means
# that the output series will have sorted indices.
return non_zero_counts.reindex(index=unique_indices, fill_value=0)
def fa_query(
self,
enroll_user_id: Optional[int] = None,
enroll_finger_id: Optional[int] = None,
verify_user_id: Optional[int] = None,
verify_finger_id: Optional[int] = None,
verify_sample_index: Optional[int] = None,
) -> pd.DataFrame:
return Experiment._false_table_query(
false_table=self.fa_table(),
enroll_user_id=enroll_user_id,
enroll_finger_id=enroll_finger_id,
verify_user_id=verify_user_id,
verify_finger_id=verify_finger_id,
verify_sample_index=verify_sample_index,
)
def fa_query2(
self,
enroll_user_id: Optional[int] = None,
enroll_finger_id: Optional[int] = None,
verify_user_id: Optional[int] = None,
verify_finger_id: Optional[int] = None,
verify_sample_index: Optional[int] = None,
) -> pd.DataFrame:
return Experiment._false_table_query2(
false_table=self.fa_table(),
enroll_user_id=enroll_user_id,
enroll_finger_id=enroll_finger_id,
verify_user_id=verify_user_id,
verify_finger_id=verify_finger_id,
verify_sample_index=verify_sample_index,
)
def add_far_decisions(self, table: pd.DataFrame):
"""Add an FAR decision table to experiment."""
self._tbl_far_decisions = table
def add_far_decisions_from_csv(self, csv_file_path: pathlib.Path):
"""Read FAR decision file and add to experiment."""
self.add_far_decisions(_read_decision_file(csv_file_path))
def add_frr_decisions(self, table: pd.DataFrame):
"""Add an FRR decision table to experiment."""
self._tbl_frr_decisions = table
def add_frr_decisions_from_csv(self, csv_file_path: pathlib.Path):
"""Read FRR decision file and add to experiment."""
self.add_frr_decisions(_read_decision_file(csv_file_path))
def add_groups(self, user_groups: pd.DataFrame):
"""Add the appropriate group columns to all saved tables."""
self._tbl_user_groups = user_groups
if self.has_far_decisions():
self._tbl_far_decisions = _add_groups_to_table(
self._tbl_far_decisions, user_groups
)
if self.has_frr_decisions():
self._tbl_frr_decisions = _add_groups_to_table(
self._tbl_frr_decisions, user_groups
)
if not self._tbl_fa_list is None:
self._tbl_fa_list = _add_groups_to_table(
self._tbl_fa_list, user_groups
)
def add_groups_from_csv(
self,
csv_file_path: pathlib.Path = pathlib.Path("User_groups.csv"),
):
"""Add group information from a user group mapping CSV file."""
user_groups: pd.DataFrame = pd.read_csv(csv_file_path)
# Ensure that the required columns exist.
if not fpsutils.has_columns(
user_groups,
Experiment.USER_GROUP_TABLE_COLS,
):
raise ValueError(
f"CSV file {csv_file_path} doesn't contain columns"
f" {Experiment.USER_GROUP_TABLE_COLS}."
)
self.add_groups(user_groups)
def add_groups_from_collection_dir(self, collection_dir: pathlib.Path):
"""Add the appropriate group columns to all saved tables.
This group information is learned from the subdirectory structure
of the raw collection directory.
"""
collection = Collection(collection_dir)
user_groups = pd.DataFrame(
collection.discover_user_groups(),
columns=[
Experiment.TableCol.User.value,
Experiment.TableCol.Group.value,
],
)
user_groups.sort_values(Experiment.TableCol.User.value, inplace=True)
self.add_groups(user_groups)
def _add_groups_to_table(
tbl: pd.DataFrame, user_groups: pd.DataFrame
) -> pd.DataFrame:
"""Adds the appropriate group columns for any user columns in `tbl`.
This joins the `Group` columns from `users_groups` with any user columns
in `tbl`.
The `user_groups` table is expected to have a `User` and `Group` column.
"""
# Add Group column, if it already contains a User column.
if Experiment.TableCol.User.value in tbl.columns:
tbl = tbl.join(
user_groups.set_index(Experiment.TableCol.Verify_User.value),
on=Experiment.TableCol.Verify_User.value,
)
# Add Verify_Group column, if it already contains a Verify_User column.
if Experiment.TableCol.Verify_User.value in tbl.columns:
tbl = tbl.join(
user_groups.rename(
columns={
Experiment.TableCol.User.value: Experiment.TableCol.Verify_User.value,
Experiment.TableCol.Group.value: Experiment.TableCol.Verify_Group.value,
}
).set_index(Experiment.TableCol.Verify_User.value),
on=Experiment.TableCol.Verify_User.value,
)
# Add Enroll_Group column, if it already contains an Enroll_User column.
if Experiment.TableCol.Enroll_User.value in tbl.columns:
tbl = tbl.join(
user_groups.rename(
columns={
Experiment.TableCol.User.value: Experiment.TableCol.Enroll_User.value,
Experiment.TableCol.Group.value: Experiment.TableCol.Enroll_Group.value,
}
).set_index(Experiment.TableCol.Enroll_User.value),
on=Experiment.TableCol.Enroll_User.value,
)
return tbl
def _read_decision_file(csv_file_path: pathlib.Path) -> pd.DataFrame:
"""Read a CSV decisions file into a DataFrame with supported columns."""
table: pd.DataFrame = pd.read_csv(csv_file_path)
# Ensure that the required columns exist.
if not fpsutils.has_columns(table, Experiment.DECISION_TABLE_COLS):
raise ValueError(
f"CSV file {csv_file_path} doesn't contain columns"
f" {Experiment.DECISION_TABLE_COLS}."
)
return table
def _write_decision_file(
table: pd.DataFrame,
csv_file_path: pathlib.Path,
exclude_groups: bool = True,
) -> None:
"""Write a decisions table out as a CSV file.
Group are removed from the written out table, if `exclude_groups` is True.
"""
if exclude_groups:
table = table[Experiment.DECISION_TABLE_COLS]
# Setting index to False avoids the "index" / primary-key of the
# dataframe from being written out.
table.to_csv(csv_file_path, index=False)