Source code for input_checker.checker

from tubular.base import BaseTransformer
from input_checker._version import __version__
from input_checker.exceptions import InputCheckerError

import numpy as np
import pandas as pd


class InputChecker(BaseTransformer):
    """Transformer that compares a dataframe against a benchmark dataframe.

    Up to five checks are run on the dataframe being compared:

    1. Null check: only columns that contain missing values in the benchmark
       dataframe may contain missing values in the comparison dataframe.
    2. Dtype check: every column of the comparison dataframe must have the
       same data type as in the benchmark dataframe.
    3. Categorical value check: categorical columns of the comparison
       dataframe may only contain values present in the benchmark dataframe.
    4. Numerical check: values of numerical columns must lie within the
       minimum / maximum range of the same columns in the benchmark dataframe.
    5. Datetime check: values of datetime columns must not fall before the
       minimum date (and optionally not after the maximum date) of the same
       columns in the benchmark dataframe.

    Checks 1 and 2 cover every column named in ``columns``; when ``columns``
    is not set, all columns of the dataframe passed to ``fit`` are used. The
    categorical, numerical and datetime checks are skipped when the matching
    parameter is None. With the 'infer' option the relevant columns are
    detected from the dtypes of the columns listed in ``columns``.

    ``fit`` fits each individual check to the benchmark dataframe. The fitted
    object can then be saved, loaded later, and used to compare another
    dataframe against the benchmark; any failing checks are reported.

    Parameters
    ----------
    columns : None, list or str
        Model input column names that the column name, null and data type
        checks are generated for. None means all columns of the (fitted)
        benchmark dataframe. A str is taken as a single column name.

    categorical_columns : None, list or 'infer'
        Column names holding categorical data that the categorical level
        checks are generated for. With 'infer' the list is derived from the
        benchmark dataframe dtypes (category, object or bool).

    numerical_columns : None, list, 'infer' or dict
        Column names holding numerical data that the numerical range checks
        are generated for. With 'infer' the list is derived from the benchmark
        dataframe dtypes (names starting with int or float). If a dict, each
        key must be a column of the benchmark dataframe and each value a dict
        with boolean 'maximum' and 'minimum' entries stating whether that
        bound should be checked.

    datetime_columns : None, list, 'infer' or dict
        Column names holding datetime data that the datetime checks are
        generated for. With 'infer' the list is derived from the benchmark
        dataframe dtypes (names starting with datetime). A list or 'infer'
        enables the minimum check only; a dict has the same layout as for
        ``numerical_columns``.

    skip_infer_columns : None or list
        Columns that still get the type and null checks but are left out of
        the 'infer' detection, e.g. id, datetime and text fields.

    Attributes
    ----------
    The following are generated when the class is fitted to a benchmark
    dataframe.

    null_map : dict
        Column name -> 1 if the column may contain nulls, 0 if it may not.

    expected_values : dict
        Column name -> list of values allowed in that categorical column.
        Only generated if the categorical columns parameter is not None.

    column_classes : dict
        Column name -> dtype of the column in the benchmark dataframe.

    numerical_values : dict
        Column name -> dict with the 'minimum' and 'maximum' allowed values
        (None for a bound that is not checked). Only generated if the
        numerical columns parameter is not None.

    datetime_values : dict
        Column name -> dict with the 'minimum' and (optional) 'maximum'
        allowed values. Only generated if the datetime columns parameter is
        not None.

    """

    def __init__(
        self,
        columns=None,
        categorical_columns=None,
        numerical_columns=None,
        datetime_columns=None,
        skip_infer_columns=None,
        **kwds,
    ):
        super().__init__(columns=columns, **kwds)

        # store the constructor arguments exactly as they were given
        self.columns = columns
        self.categorical_columns = categorical_columns
        self.numerical_columns = numerical_columns
        self.datetime_columns = datetime_columns
        self.skip_infer_columns = skip_infer_columns

        none_type = type(None)

        # validate the type of every input; a str is only valid for the
        # categorical / numerical / datetime inputs when it equals "infer"
        self._check_type(self.columns, "input columns", [list, none_type, str])

        inferable_inputs = (
            ("categorical_columns", "categorical columns", [list, str, none_type]),
            ("numerical_columns", "numerical columns", [list, dict, str, none_type]),
            ("datetime_columns", "datetime columns", [list, dict, str, none_type]),
        )
        for attribute, label, accepted_types in inferable_inputs:
            value = getattr(self, attribute)
            self._check_type(value, label, accepted_types)
            if isinstance(value, str):
                self._is_string_value(value, label, "infer")

        self._check_type(
            self.skip_infer_columns, "skip infer columns", [list, none_type]
        )

        # reject empty inputs
        for label, attribute in (
            ("input columns", "columns"),
            ("categorical columns", "categorical_columns"),
            ("numerical columns", "numerical_columns"),
            ("datetime columns", "datetime_columns"),
        ):
            self._is_empty(label, getattr(self, attribute))

        # the other column inputs can only be cross-checked against columns
        # when columns was actually provided
        if columns is not None:
            self._is_listed_in_columns()

        self.version_ = __version__

    def _consolidate_inputs(self, X):
        """Check the class inputs against X and bring them to a common format.

        After this call ``skip_infer_columns`` is a list, any 'infer' option
        has been resolved against the dtypes of X, ``numerical_columns`` /
        ``datetime_columns`` are lists of column names (unless None), and
        ``numerical_dict`` / ``datetime_dict`` hold the per column
        'maximum' / 'minimum' flags.

        Parameters
        ----------
        X : pd.DataFrame
            The training input samples.

        """

        if self.skip_infer_columns is not None:
            self._is_subset("skip infer columns", self.skip_infer_columns, X)
        else:
            self.skip_infer_columns = []

        # resolve the 'infer' options from the dtypes of the benchmark data
        if self.categorical_columns == "infer":
            self.categorical_columns = [
                name
                for name in self.columns
                if X[name].dtypes.name in ["category", "object", "bool"]
                and name not in self.skip_infer_columns
            ]

        if self.numerical_columns == "infer":
            self.numerical_dict = {
                name: {"maximum": True, "minimum": True}
                for name in self.columns
                if str(X[name].dtype).startswith(("int", "float"))
                and name not in self.skip_infer_columns
            }

        if self.datetime_columns == "infer":
            # inferred datetime columns only get the minimum check
            self.datetime_dict = {
                name: {"maximum": False, "minimum": True}
                for name in self.columns
                if str(X[name].dtype).startswith("datetime")
                and name not in self.skip_infer_columns
            }

        # every configured column must exist in the dataframe
        self._is_subset("input columns", self.columns, X)

        if isinstance(self.categorical_columns, list):
            self._is_subset("categorical columns", self.categorical_columns, X)

        # a plain list of numerical columns enables both range checks
        if isinstance(self.numerical_columns, list):
            self._is_subset("numerical columns", self.numerical_columns, X)
            self.numerical_dict = {
                name: {"maximum": True, "minimum": True}
                for name in self.numerical_columns
            }

        # a plain list of datetime columns enables the minimum check only
        if isinstance(self.datetime_columns, list):
            self._is_subset("datetime columns", self.datetime_columns, X)
            self.datetime_dict = {
                name: {"maximum": False, "minimum": True}
                for name in self.datetime_columns
            }

        # a dict is used as given once its keys are known to be columns of X
        if isinstance(self.numerical_columns, dict):
            self._is_subset(
                "numerical dictionary keys", list(self.numerical_columns.keys()), X
            )
            self.numerical_dict = self.numerical_columns

        if self.numerical_columns is not None:
            self.numerical_columns = list(self.numerical_dict.keys())

        if isinstance(self.datetime_columns, dict):
            self._is_subset(
                "datetime dictionary keys", list(self.datetime_columns.keys()), X
            )
            self.datetime_dict = self.datetime_columns

        if self.datetime_columns is not None:
            self.datetime_columns = list(self.datetime_dict.keys())

    def _fit_type_checker(self, X):
        """Record the expected dtype of every checked column.

        Parameters
        ----------
        X : pd.DataFrame
            Data to set expected dtypes from.

        """

        benchmark_dtypes = X[self.columns].dtypes
        self.column_classes = benchmark_dtypes.to_dict()

    def _fit_null_checker(self, X):
        """Record which columns are allowed to contain missing values.

        Sets the null_map attribute: a dictionary keyed by column name whose
        value is 1 when the column has at least one missing value in the
        benchmark dataframe X and 0 otherwise.

        Parameters
        ----------
        X : pd.DataFrame
            The training input samples.

        """

        self.null_map = {
            col: 1 if X[col].isnull().values.any() else 0 for col in self.columns
        }

    def _fit_value_checker(self, X):
        """Record the values seen in each categorical column.

        Sets the expected_values attribute: a dictionary keyed by categorical
        column name holding the list of unique values of that column in the
        benchmark dataframe X.

        Parameters
        ----------
        X : pd.DataFrame
            The training input samples.

        """

        self.expected_values = {
            col: X[col].unique().tolist() for col in self.categorical_columns
        }

    def _fit_numerical_checker(self, X):
        """Record the allowed range of each numerical column.

        Sets the numerical_values attribute: for every column in
        numerical_dict a dictionary with the 'maximum' and 'minimum' of that
        column in the benchmark dataframe X. A bound whose flag in
        numerical_dict is falsy is stored as None.

        Parameters
        ----------
        X : pd.DataFrame
            The training input samples.

        """

        fitted_ranges = {}
        for col, flags in self.numerical_dict.items():
            fitted_ranges[col] = {
                "maximum": X[col].max() if flags["maximum"] else None,
                "minimum": X[col].min() if flags["minimum"] else None,
            }

        self.numerical_values = fitted_ranges

    def _fit_datetime_checker(self, X):
        """Record the allowed range of each datetime column.

        Sets the datetime_values attribute: for every column in
        datetime_columns a dictionary with the 'maximum' and 'minimum' of that
        column in the benchmark dataframe X. A bound whose flag in
        datetime_dict is falsy is stored as None.

        Parameters
        ----------
        X : pd.DataFrame
            The training input samples.

        """

        fitted_ranges = {}
        for col in self.datetime_columns:
            flags = self.datetime_dict[col]
            fitted_ranges[col] = {
                "maximum": X[col].max() if flags["maximum"] else None,
                "minimum": X[col].min() if flags["minimum"] else None,
            }

        self.datetime_values = fitted_ranges
def fit(self, X, y=None):
    """Validate the class inputs and fit every configured check to X.

    The type and null checks are always fitted. The categorical, numerical
    and datetime checks are only fitted when the matching columns attribute
    is not None once the inputs have been consolidated.

    Parameters
    ----------
    X : pd.DataFrame
        The training input samples.

    y : None
        Not used by this transformer; the parameter only exists because the
        sklearn pipeline API requires it.

    Returns
    -------
    self
        The fitted input checker.

    Raises
    ------
    ValueError
        If y is not None.

    """

    if y is not None:
        raise ValueError(
            f"{y} is passed to the fit method which is not required for the input_checker"
        )

    super().fit(X, y)

    self._df_is_empty("input dataframe", X)
    self._consolidate_inputs(X)

    # checks that apply to every column
    self._fit_type_checker(X)
    self._fit_null_checker(X)

    # checks that only run when their columns have been specified
    optional_checks = (
        ("categorical_columns", self._fit_value_checker),
        ("numerical_columns", self._fit_numerical_checker),
        ("datetime_columns", self._fit_datetime_checker),
    )
    for columns_attribute, fit_check in optional_checks:
        if getattr(self, columns_attribute) is not None:
            fit_check(X)

    return self
def _transform_type_checker(self, X, batch_mode=False): """Checks if columns in the comparison dataframe X are of the expected dtypes based on the (fitted) benchmark dataframe . Parameters ---------- X : pd.DataFrame Input data to check column types. batch_mode: bool, default=False Flag indicating if transform is being run in batch mode Returns ------- type_checker_failed_checks : dict Dictionary containing the failed tests, empty if none failed """ self.check_is_fitted(["column_classes"]) # mapping for pandas dtype to Python dtypes type_mappings = { "object": "str", "int": "int", "float": "float", "bool": "bool", "datetime[ns]": "Timestamp", "category": "str", } type_checker_failed_checks = {} for col in self.columns: # skip column if all values in column are missing as the expected # type will be float, nulls will be checked by null check either way if X[col].isnull().all(): continue # compare types by row if operating in batch mode if batch_mode: # remove bytes part of type target_dtype_name = "".join( i for i in self.column_classes[col].name if not i.isdigit() ) # convert pandas dtype to python dtype target_dtype = type_mappings[target_dtype_name] current_dtype = X[col].apply(lambda x: type(x).__name__) # fix nulls dtype to target dtype before comparing actual to expected current_dtype[X[col].isnull()] = target_dtype if (current_dtype != target_dtype).any(): type_checker_failed_checks[col] = {} type_checker_failed_checks[col]["idxs"] = X[ current_dtype != target_dtype ].index.tolist() type_checker_failed_checks[col]["actual"] = current_dtype[ type_checker_failed_checks[col]["idxs"] ].to_dict() type_checker_failed_checks[col]["expected"] = target_dtype # otherwise compare overall pandas dtype else: target_dtype = self.column_classes[col] current_dtype = X[col].dtypes if target_dtype.name == "category": # checking object type == categorical type throws error same_level_check = target_dtype == current_dtype else: same_level_check = current_dtype == target_dtype if 
not same_level_check: type_checker_failed_checks[col] = {} type_checker_failed_checks[col]["actual"] = current_dtype type_checker_failed_checks[col]["expected"] = target_dtype return type_checker_failed_checks def _transform_null_checker(self, X): """Checks if columns with missing values in the comparison dataframe X are the only columns that also contain missing values in the (fitted) benchmark dataframe. Parameters ---------- X : pd.DataFrame Pandas dataframe containing columns to check null values. Returns ------- null_checker_failed_checks : dict Dictionary containing the failed tests, empty if none failed """ self.check_is_fitted(["null_map"]) null_checker_failed_checks = {} for col in self.columns: if self.null_map[col] == 0 and X[col].isnull().any(): null_checker_failed_checks[col] = X[X[col].isnull()].index.tolist() return null_checker_failed_checks def _transform_numerical_checker(self, X, type_fails={}, batch_mode=False): """Checks if values of numerical columns in the comparison dataframe X are inline with the benchmark dataframe. Please note that missing values are not checked as a part of this method, they are handled by the NullValueChecker. Parameters ---------- X : pd.DataFrame The input samples to check take expected values. type_fails : dict, default={} Output dictionary from transform_type_checker. 
batch_mode : bool, default=False Flag indicating if transform is being run in batch mode Returns ------- numerical_checker_failed_checks : dict Dictionary containing the failed tests, empty if none failed """ self.check_is_fitted(["numerical_values"]) numerical_checker_failed_checks = {} for col in self.numerical_columns: # remove rows which failed type checks X_filtered = X.copy() if col in type_fails.keys(): if batch_mode: # remove any rows where type is not float or int ids_to_drop = [ k for k, v in type_fails[col]["actual"].items() if v not in ("float", "int") ] X_filtered = X_filtered.drop(ids_to_drop, axis=0) # if not batch mode and column is not a numerical dtype, drop all rows elif not type_fails[col]["actual"].name.startswith( "float" ) and not type_fails[col]["actual"].name.startswith("int"): X_filtered = X_filtered.drop(X.index, axis=0) min_value = self.numerical_values[col]["minimum"] max_value = self.numerical_values[col]["maximum"] if max_value: if (X_filtered[col] > max_value).any(): above_list = X_filtered[col][X_filtered[col] > max_value].to_dict() above_idxs = X_filtered[col][ X_filtered[col] > max_value ].index.tolist() if col not in numerical_checker_failed_checks: numerical_checker_failed_checks[col] = {} numerical_checker_failed_checks[col]["max idxs"] = above_idxs numerical_checker_failed_checks[col]["maximum"] = above_list if min_value: if (X_filtered[col] < min_value).any(): if col not in numerical_checker_failed_checks: numerical_checker_failed_checks[col] = {} below_list = X_filtered[col][X_filtered[col] < min_value].to_dict() below_idxs = X_filtered[col][ X_filtered[col] < min_value ].index.tolist() numerical_checker_failed_checks[col]["minimum"] = below_list numerical_checker_failed_checks[col]["min idxs"] = below_idxs return numerical_checker_failed_checks def _transform_value_checker(self, X): """Checks if values of categorical columns in the comparison dataframe X are inline with the benchmark dataframe using expected_values 
attribute. Please note that missing values are not checked as a part of this method, they are handled by using the NullValueChecker. Parameters ---------- X : pd.DataFrame The input samples to check take expected values. Returns ------- value_checker_failed_checks : dict Dictionary containing the failed tests, empty if none failed """ self.check_is_fitted(["expected_values"]) value_checker_failed_checks = {} for col in self.categorical_columns: v = self.expected_values[col] if (~X.loc[(~X[col].isnull()), col].isin(v)).any(): unexpected_list_idx = X[ (~X[col].isnull()) & (~X[col].isin(v)) ].index.tolist() value_checker_failed_checks[col] = {} value_checker_failed_checks[col]["idxs"] = unexpected_list_idx value_checker_failed_checks[col]["values"] = ( X[(~X[col].isnull()) & (~X[col].isin(v))][col].unique().tolist() ) return value_checker_failed_checks def _transform_datetime_checker(self, X, type_fails={}, batch_mode=False): """Checks if values of datetime columns in the comparison dataframe X are inline with the benchmark dataframe using datetime_dict attribute. Please note that missing values are not checked as a part of this method, they are handled by using the NullValueChecker. Parameters ---------- X : pd.DataFrame The input samples to check take expected values. type_fails : dict Output dictionary from transform_type_checker. 
batch_mode : bool Flag if transform is being run in batch mode Returns ------- datetime_checker_failed_checks : dict Dictionary containing the failed tests, empty if none failed """ self.check_is_fitted(["datetime_values"]) datetime_checker_failed_checks = {} for col in self.datetime_columns: # remove rows which failed type checks X_filtered = X.copy() if col in type_fails.keys(): if batch_mode: # remove all rows where dtype was not Timestamp X_filtered = X_filtered.drop(type_fails[col]["idxs"], axis=0) # if not batch mode and column is not a datetime dtype, drop all rows elif not type_fails[col]["actual"].name.startswith("datetime"): X_filtered = X_filtered.drop(X.index, axis=0) min_value = self.datetime_values[col]["minimum"] max_value = self.datetime_values[col]["maximum"] if max_value: if (X_filtered[col] > max_value).any(): above_list = X_filtered[col][X_filtered[col] > max_value].to_dict() above_idxs = X_filtered[col][ X_filtered[col] > max_value ].index.tolist() if col not in datetime_checker_failed_checks: datetime_checker_failed_checks[col] = {} datetime_checker_failed_checks[col]["maximum"] = above_list datetime_checker_failed_checks[col]["max idxs"] = above_idxs if min_value: if (X_filtered[col] < min_value).any(): if col not in datetime_checker_failed_checks: datetime_checker_failed_checks[col] = {} below_list = X_filtered[col][X_filtered[col] < min_value].to_dict() below_idxs = X_filtered[col][ X_filtered[col] < min_value ].index.tolist() datetime_checker_failed_checks[col]["minimum"] = below_list datetime_checker_failed_checks[col]["min idxs"] = below_idxs return datetime_checker_failed_checks
def raise_exception_if_checks_fail(
    self,
    type_failed_checks,
    null_failed_checks,
    value_failed_checks,
    numerical_failed_checks,
    datetime_failed_checks,
):
    """Combine the results of all checks and raise if any of them failed.

    The individual results and the combined message are stored on the
    validation_failed_checks attribute before anything is raised.

    Parameters
    ----------
    type_failed_checks : dict
        Details of failed type checker tests, empty if no checks failed.

    null_failed_checks : dict
        Details of failed null checker tests, empty if no checks failed.

    value_failed_checks : dict
        Details of failed categorical checker tests, empty if no checks failed.

    numerical_failed_checks : dict
        Details of failed numerical checker tests, empty if no checks failed.

    datetime_failed_checks : dict
        Details of failed datetime checker tests, empty if no checks failed.

    Raises
    ------
    InputCheckerError
        If the combined message is not empty, i.e. at least one check failed.

    """

    def range_failure_lines(failed_checks):
        """Build the message lines for numerical / datetime range failures."""
        lines = []
        for col, fails in failed_checks.items():
            if "maximum" in fails:
                lines.append(
                    f"Failed maximum value check for column: {col}; Values above maximum: {fails['maximum']}\n"
                )
            if "minimum" in fails:
                lines.append(
                    f"Failed minimum value check for column: {col}; Values below minimum: {fails['minimum']}\n"
                )
        return lines

    message_lines = [
        f"Failed null check for column: {col}\n" for col in null_failed_checks
    ]
    message_lines.extend(
        f"Failed type check for column: {col}; Expected: {fails['expected']}, Found: {fails['actual']}\n"
        for col, fails in type_failed_checks.items()
    )
    message_lines.extend(
        f"Failed categorical check for column: {col}; Unexpected values: {fails['values']}\n"
        for col, fails in value_failed_checks.items()
    )
    message_lines.extend(range_failure_lines(numerical_failed_checks))
    message_lines.extend(range_failure_lines(datetime_failed_checks))

    exception_message = "".join(message_lines)

    self.validation_failed_checks = {
        "Failed type checks": type_failed_checks,
        "Failed null checks": null_failed_checks,
        "Failed categorical checks": value_failed_checks,
        "Failed numerical checks": numerical_failed_checks,
        "Failed datetime checks": datetime_failed_checks,
        "Exception message": exception_message,
    }

    if exception_message:
        raise InputCheckerError(exception_message)
def separate_passes_and_fails(
    self,
    type_failed_checks,
    null_failed_checks,
    value_failed_checks,
    numerical_failed_checks,
    datetime_failed_checks,
    X,
):
    """Split X into rows which pass every check and rows which fail any check.

    Failures are processed in the order categorical, numerical, datetime,
    null, type. For each failure the rows already present in bad_df are
    updated first, then the failing rows are handed to _update_good_bad_df
    to be moved from good_df to bad_df. bad_df carries an extra
    'failed_checks' column describing the failures.

    The dictionaries passed in are not modified.

    Parameters
    ----------
    type_failed_checks : dict
        Details of failed type checker tests, empty if no checks failed.

    null_failed_checks : dict
        Details of failed null checker tests, empty if no checks failed.

    value_failed_checks : dict
        Details of failed categorical checker tests, empty if no checks failed.

    numerical_failed_checks : dict
        Details of failed numerical checker tests, empty if no checks failed.

    datetime_failed_checks : dict
        Details of failed datetime checker tests, empty if no checks failed.

    X : pd.DataFrame
        The dataframe the checks were run on.

    Returns
    -------
    good_df, bad_df : tuple
        Dataframes containing rows which pass checks (good_df) and rows which
        fail checks (bad_df).

    """

    good_df = X.copy(deep=True)
    bad_df = pd.DataFrame(columns=X.columns.values.tolist() + ["failed_checks"])

    def record_failure(idxs, reason_failed, error_info_by_row=None):
        """Register one failure for the rows in idxs.

        The same message is used for rows that already failed an earlier
        check and for rows failing for the first time.
        """
        nonlocal good_df, bad_df

        # rows which previously failed a check get the new failure appended
        bad_df = self._update_bad_df(
            bad_df, idxs, reason_failed, error_info_by_row=error_info_by_row
        )
        # separate failing rows from good_df and move to bad_df
        good_df, bad_df = self._update_good_bad_df(
            good_df, bad_df, idxs, reason_failed, error_info_by_row=error_info_by_row
        )

    def as_day_strings(values_by_row):
        """Return a new dict with each timestamp formatted as a YYYY-MM-DD string."""
        return {
            idx: np.datetime_as_string(value.to_datetime64(), unit="D")
            for idx, value in values_by_row.items()
        }

    # add expected values check failures
    for col, fails in value_failed_checks.items():
        record_failure(
            fails["idxs"],
            f"Failed categorical check for column: {col}. Unexpected values are {fails['values']}",
        )

    # add numerical check failures
    for col, fails in numerical_failed_checks.items():
        if "maximum" in fails:
            record_failure(
                fails["max idxs"],
                f"Failed maximum value check for column: {col}; Value above maximum: ",
                error_info_by_row=fails["maximum"],
            )
        if "minimum" in fails:
            record_failure(
                fails["min idxs"],
                f"Failed minimum value check for column: {col}; Value below minimum: ",
                error_info_by_row=fails["minimum"],
            )

    # add datetime check failures; the offending timestamps are reported as
    # dates, built into a fresh dict so the caller's results stay untouched
    for col, fails in datetime_failed_checks.items():
        if "maximum" in fails:
            record_failure(
                fails["max idxs"],
                f"Failed maximum value check for column: {col}; Value above maximum: ",
                error_info_by_row=as_day_strings(fails["maximum"]),
            )
        if "minimum" in fails:
            record_failure(
                fails["min idxs"],
                f"Failed minimum value check for column: {col}; Value below minimum: ",
                error_info_by_row=as_day_strings(fails["minimum"]),
            )

    # add null check failures
    for col, idxs in null_failed_checks.items():
        record_failure(idxs, f"Failed null check for column: {col}")

    # add type check failures
    for col, fails in type_failed_checks.items():
        record_failure(
            fails["idxs"],
            f"Failed type check for column: {col}; Expected: {fails['expected']}, Found: ",
            error_info_by_row=fails["actual"],
        )

    # indices in bad_df will be out of order, change to match order in original DF
    bad_df = bad_df.loc[[i for i in X.index if i in bad_df.index]]

    return good_df, bad_df
def _update_bad_df(self, bad_df, idxs, reason_failed, error_info_by_row=None): """Method to update 'failed_checks' field of rows with indices in idxs. The field is updated by contentating reason_failed. Parameters ---------- bad_df : pd.DataFrame The dataframe containing rows to update. idxs : list List of indices in bad_df to update. reason_failed : str String to concatenate to 'failed_checks' in bad_df. error_info_by_row: None or dict Additional error information for each record. Has actual value per row ID which is failing check Returns ------- bad_df: pd.DataFrame Dataframe containing rows which failed checks """ if error_info_by_row: if type(error_info_by_row) is not dict: raise TypeError("numerical should either be none or a dict") if sum(bad_df.index.isin(idxs)) == 0: return bad_df elif not error_info_by_row: bad_df.loc[bad_df.index.isin(idxs), "failed_checks"] = bad_df.loc[ bad_df.index.isin(idxs), "failed_checks" ].apply(lambda x: x + "\n" + reason_failed) else: bad_df.loc[bad_df.index.isin(idxs), "failed_checks"] = bad_df.loc[ bad_df.index.isin(idxs) ].apply( lambda x: x["failed_checks"] + "\n" + reason_failed + f"{error_info_by_row[x.name]}", axis=1, ) return bad_df def _update_good_bad_df( self, good_df, bad_df, idxs, reason_failed, error_info_by_row=None ): """Function to separate rows from good_df with indices in idxs and add them to bad_df, along with an extra field 'failed_checks' set to reason failed Parameters ---------- good_df : pd.DataFrame The dataframe containing rows to remove. bad_df : pd.DataFrame The dataframe to which rows will be added. idxs : list List of indices in good_df to remove and add to bad_df reason_failed : str String to assign as'failed_checks' in bad_df. error_info_by_row: None or dict, default=None Additional error information for each record. Has actual value per row ID which is failing check Returns ------- good_df, bad_df : tuple Dataframes containing rows which pass checks (good_df) and rows which fail checks (bad_df). 
""" if error_info_by_row: if type(error_info_by_row) is not dict: raise TypeError("numerical should either be none or a dict") bad_idxs = good_df.loc[good_df.index.isin(idxs)] if not error_info_by_row: bad_idxs = bad_idxs.assign(failed_checks=reason_failed) else: bad_idxs = bad_idxs.assign(failed_checks="") bad_idxs["failed_checks"] = bad_idxs.apply( lambda x: reason_failed + f"{error_info_by_row[x.name]}", axis=1 ) good_df = good_df.loc[~good_df.index.isin(idxs)] bad_df = bad_df.append(bad_idxs) return good_df, bad_df
def transform(self, X, batch_mode=False):
    """Validate a comparison dataframe against the fitted benchmark checks.

    The type and null checks are always run; the categorical, numerical
    and datetime checks are run only when the corresponding column
    attribute is not None.

    Parameters
    ----------
    X : pd.DataFrame
        The new dataframe to validate against the benchmark samples.

    batch_mode : bool, default=False
        When True, the results of the checks are used to split X into two
        dataframes: the records that pass the checks and the records that
        fail them. The failed records have an extra column
        'failed_checks' which contains the reasons for the failed checks.
        When False, the check results are handed to
        raise_exception_if_checks_fail and the comparison dataframe X is
        returned if that call does not raise.

    Returns
    -------
    good_df, bad_df or X : tuple or pd.DataFrame
        A tuple of dataframes with rows passing and failing the checks
        respectively if run in batch mode, otherwise the comparison
        dataframe X.

    Raises
    ------
    ValueError
        If batch_mode is not a bool.

    """
    if not isinstance(batch_mode, bool):
        raise ValueError("batch_mode must be either True or False")

    X = super().transform(X)

    # an empty scoring dataframe is rejected before any check is run
    self._df_is_empty("scoring dataframe", X)

    type_failed_checks = self._transform_type_checker(X, batch_mode)
    null_failed_checks = self._transform_null_checker(X)

    # optional checks: skipped (empty result) when no columns were selected
    value_failed_checks = (
        {} if self.categorical_columns is None else self._transform_value_checker(X)
    )
    numerical_failed_checks = (
        {}
        if self.numerical_columns is None
        else self._transform_numerical_checker(X, type_failed_checks, batch_mode)
    )
    datetime_failed_checks = (
        {}
        if self.datetime_columns is None
        else self._transform_datetime_checker(X, type_failed_checks, batch_mode)
    )

    check_results = (
        type_failed_checks,
        null_failed_checks,
        value_failed_checks,
        numerical_failed_checks,
        datetime_failed_checks,
    )

    if not batch_mode:
        # read test results and raise exception if any have failed
        self.raise_exception_if_checks_fail(*check_results)
        return X

    # split X into passing and failing records based on the test results
    good_df, bad_df = self.separate_passes_and_fails(*check_results, X)
    return good_df, bad_df
def _check_type(self, obj, obj_name, options): """Method to check the type of a given object. Parameters ---------- obj : any Object to check type of. obj_name : str Name of object, used in error message. options : list Expected options for obj. A single type may be passed in list or multiple options can be passed. """ if type(obj) not in options: raise TypeError( f"unexpected type for {obj_name}\n Expected: {options}\n Actual: {type(obj)}" ) def _is_string_value(self, string, string_name, check_value): """Method to check the value of a given string. Parameters ---------- string : any string to check value. string_name : str Name of string, used in error message. check_value : str Expected value for string. """ if string is not check_value: raise ValueError( f"unexpected str option for {string_name}\n Expected: {check_value}\n Actual: {string}" ) def _is_subset(self, obj_name, columns, dataframe): """Method to check if columns are a subset of a dataframe columns. Parameters ---------- obj_name : str Name of object, used in error message. columns : list Lists of subset columns. dataframe : pd.DataFrame Dataframe to check for subset of columns. """ if not set(columns).issubset(dataframe.columns): unexpected_columns = list(set(columns) - set(dataframe.columns)) raise ValueError( f"{obj_name} is not a subset of the training datframe columns\n Unexpected columns: {unexpected_columns}" ) def _is_empty(self, obj_name, obj): """Method to check if an object is empty. Parameters ---------- obj_name : str Name of object, used in error message. obj : any object to run check on. 
""" if obj is not None and not obj: raise ValueError(f"{obj_name} is empty") def _is_listed_in_columns(self): """Method to check if all columns passed are included in the columns attribute.""" col_lst = [] for cols in [ self.categorical_columns, self.numerical_columns, self.datetime_columns, self.skip_infer_columns, ]: if cols is not None and cols != "infer": col_lst += cols cols_diff = sorted(set(col_lst) - set(self.columns)) if len(cols_diff) > 0: raise ValueError( f"Column(s); {cols_diff} are not listed when initialising column attribute" ) def _df_is_empty(self, obj_name, df): """Method to check if a dataframe is empty. Parameters ---------- obj_name : str Name of object, used in error message. df : pd.DataFrame dataframe to run check on. """ if df.empty: raise ValueError(f"{obj_name} is empty")