Code Re-usability through feature pipeline framework

Introduction
In any data science, project feature creation is a common stage where raw data is transformed into features. Often the raw data will be in the form of structured tables having a variety of columns & values. The feature creation stage involves picking out useful columns and utilizing the information to create meaningful features eg. In case of fraud detection, a typical raw data will have columns such as Loss Data & Report Date, during the feature creation stage we can pick these columns and create a more meaningful feature – Report Lag Days as,
Report Lag Days = Report Date - Loss Date
At an abstract level, creating the features means applying computational logic on a subset of columns from raw data.
In practice, usually, the column names are hard-wired during the model building stage. i.e to compute Report Lag Days a traditional way is to just code it as follow,
Report_Lag_Series = raw_data['Loss Date'] - raw_data['Report Date']
Depending on the complexity of the model there could be many such features where we directly refer and use column names specific to given input data. Although, this approach is the easiest & fastest for model building but not scalable due to the following reasons:
- Hard coding of column names hamper the reusability of the code, what if in the next project the column names changes (which often the case) but the computation logic remains the same
- With more features, the code looks clumsy and may not be easy to maintain/understand later on
- The underlying computation logic is not encapsulated
We can improve on the traditional feature coding approach with a more re-usable & generalizable framework – Feature Pipeline!
Feature Pipeline class defines the behavior for managing various transformations. It mainly does two jobs
- Adding the transformation sequence (which are to be applied to input raw data)
- Applying those transformations sequentially
import pandas as pd import numpy as np
class FeaturePipeline():
"""
Create a feature pipeline for a raw data source.
Attributes:
transformations: The feature transformations for the pipeline
"""
def __init__(self, logger=None):
self.transformations = []
self.transformed_df = None
self.logger = logger
self.transformed_df = None
def add_transformation(self, on_cols, transformation_f, transformation_args=None, name=None, transform_type='series'):
"""
Add a feature.
Args:
on_cols: String or Tuple
transformation_f: The function to use to transform the input
transformation_args: Additional arguments for transformation
name: Transformation name
transform_type: Transformation type
Returns:
self
"""
if transformation_f in self.transformations:
print('Transformation: {} already added, skipping...'.format(transformation_f))
return self
transformation_f.logger = self.logger
if type(on_cols) in [str]:
on_cols = [on_cols]
self.transformations.append(
Transformation(on_col=on_cols,
f=transformation_f,
args=transformation_args,
name=name,
transform_type=transform_type
))
return self
def get_args(self, t, df, transformed_df):
"""
Creates argument for transformation
"""
list_of_series = []
on_col_list = []
if isinstance(t.on_col, str):
on_col_list = [t.on_col]
else:
on_col_list = t.on_col
for c in on_col_list:
if c in df.columns:
s = df[c]
list_of_series.append(s)
else:
try:
s = transformed_df[c]
list_of_series.append(s)
except:
raise Exception('Column {} not found in input dataframes'.format(c))
if t.transform_type == 'series':
args = {'ser{}'.format(i+1): l for i, l in enumerate(list_of_series)}
elif t.transform_type == 'dataframe':
arg_df = pd.concat(list_of_series, axis=1)
assert arg_df.shape[0] == list_of_series[0].shape[0], 'Argument dataframe has different shape than concatenated series'
args = {'df' : arg_df}
else:
raise Exception('Unknwon transform type')
if t.transformation_args:
args.update(t.transformation_args)
return args
def apply_transformations(self, df, index_cols=None, reapply_all=False):
"""
Apply feature pipline on raw data
Args:
df: Input DataFrame
Returns:
Features DataFrame
"""
# Deduplicate the input dataframe on columns
df = df.loc[:,~df.columns.duplicated()].copy()
# Create index column
if index_cols:
df['INDEX'] = df[index_cols].apply(lambda x: "".join([v for v in x[index_cols]]), axis=1)
# Deduplicate the input dataframe at INDEX level
df = df.drop_duplicates(['INDEX'])
# Set The new index
index_list = df['INDEX'].tolist()
df.index = index_list
del df['INDEX']
else:
index_list = df.index
transformed_df = None or self.transformed_df
for t in self.transformations:
if t.transformation_applied and (not reapply_all):
print('Transformation: {} applied, skipping...'.format(t))
continue
args = self.get_args(t=t, df=df, transformed_df=transformed_df)
if transformed_df is None:
transformed_df = t.apply(args)
transformed_df.index = index_list
else:
tdf = t.apply(args)
tdf.index = index_list
transformed_df = pd.concat([transformed_df, tdf], axis=1)
self.transformed_df = transformed_df
return transformed_df
A Transformation class below controls the behavior of individual transformation, It encapsulates the underlying computation logic used to create features.
class Transformation():
"""
Encapsulation for individual computation logic used for feature creation
Applies a transformation on one or more Pandas Series
in order to produce one or more features
Args:
on_col: String or Tuple, Series names to use when
creating features
f: The function to use to transform the input
name: The name of the transformed feature (or prefix if >1 features)
args: dictionary of additional arguments of f
transform_type: Whether transform to be applied on series or dataframe object
"""
def __init__(self, on_col, f, name, args=None, transform_type='series'):
self.on_col = on_col
self.transformation_f = f
self.name = name
self.transform_type = transform_type
# Additional arguments for transformation
self.transformation_args = args
self.transformation_applied = False
def apply(self, args):
try:
tdf = self.transformation_f(**args)
except TypeError:
tdf = self.transformation_f(*list(args.values()))
if self.name is not None:
if isinstance(tdf, pd.DataFrame):
if isinstance(self.name, list):
tdf.columns = self.name
else:
tdf.columns = [self.name + "_" + str(col) for col in tdf.columns]
elif isinstance(tdf, pd.Series):
tdf = pd.DataFrame(tdf)
tdf.columns = [self.name]
tdf = tdf.reset_index(drop=True)
self.transformation_applied = True
return tdf
Let’s see these in action!
I am considering a sample dataset for fraudulent claims detection.
Read the sample data
raw_data = pd.read_excel("../data/sample_raw_data.xlsx")
Define computation logics as transforms functions
def transforms_above_threshold(ser1, threshold):
"""
Creates indicators based on claimed amount
ser1: Series of values
threshold: threshold value
"""
s = pd.Series(np.where(ser1 > threshold, 1, 0))
s.index = ser1.index
return s
def transforms_days_between(ser1, ser2):
"""
Difference between Dates in Days
ser1: 'From' Date Series
ser2: 'To' Date Series
"""
s1 = pd.to_datetime(ser1)
s2 = pd.to_datetime(ser2)
ser1 = (s2 - s1).dt.days
# To Date>= From Date
ser1 = ser1.clip(0)
return ser1
Initialize the Feature Pipeline
ff = FeaturePipeline()
Create features using transforms
Let’s create some features which could be relevant to fraudulent claims detection use case!
We will now use the above defined transforms functions to create individual features. For adding any transformation in the feature pipeline we need to specify on what columns the transformation will act, the transformation function if any variable arguments are needed for transformation, and the name of the transformation.
A. High claimed amount indicator
ff.add_transformation(on_cols=('Claimed_Amount'),
transformation_f=transforms_above_threshold,
transformation_args={'threshold': 500},
name='HighClaimedAmount')
B. Travel Length
ff.add_transformation(on_cols=('DepartureDate', 'ReturnDate'),
transformation_f=transforms_days_between,
transformation_args=None,
name='TravelLength')
C. Report lag
ff.add_transformation(on_cols=('LossDate', 'ReportDate'),
transformation_f=transforms_days_between,
transformation_args=None,
name='ReportLag')
D. Hospital stay length
ff.add_transformation(on_cols=('Hospital_Start_Date', 'Hospital_End_Date'),
transformation_f=transforms_days_between,
transformation_args=None,
name='HospitalStayLength')
E. Loss duration since policy inception
ff.add_transformation(on_cols=('POL_Eff_Date', 'LossDate'),
transformation_f=transforms_days_between,
transformation_args=None,
name='LossDurationSincePolicyEffective')
Apply the transformations
features = ff.apply_transformations(df=raw_data, index_cols=[‘ClaimNumber’])
features.describe()
| HighClaimedAmount | TravelLength | ReportLag | HospitalStayLength | LossDurationSincePolicyEffective | |
|---|---|---|---|---|---|
| count | 5000.00000 | 982.000000 | 5000.000000 | 131.000000 | 5000.000000 |
| mean | 0.07860 | 10.178208 | 43.579600 | 4.022901 | 1177.416000 |
| std | 0.26914 | 28.931812 | 85.770203 | 7.125756 | 1570.842842 |
| min | 0.00000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 |
| 25% | 0.00000 | 1.000000 | 10.000000 | 1.000000 | 91.000000 |
| 50% | 0.00000 | 4.000000 | 22.000000 | 2.000000 | 454.500000 |
| 75% | 0.00000 | 9.000000 | 51.000000 | 4.000000 | 1770.250000 |
| max | 1.00000 | 364.000000 | 2224.000000 | 66.000000 | 10833.000000 |
features.head()
| HighClaimedAmount | TravelLength | ReportLag | HospitalStayLength | LossDurationSincePolicyEffective | |
|---|---|---|---|---|---|
| Claim_0 | 0 | NaN | 4 | NaN | 4 |
| Claim_1 | 0 | 2.0 | 11 | NaN | 9 |
| Claim_2 | 0 | NaN | 11 | NaN | 4904 |
| Claim_3 | 0 | NaN | 24 | NaN | 199 |
| Claim_4 | 0 | NaN | 16 | NaN | 1340 |
Finally, we’ll have the features table!
Summary
- Feature Pipeline provides a structured way of handling the feature creation stage which is common in most data science projects
- It encapsulates the underlying computation logic, since no column names are hardcoded this approach enhances the reusability of code
- Computation logic defined once can be reused multiple times for similar feature creation. Observe that for creating duration based features such as Report Lag, Travel Length & Hospital Stay Length we used the same transforms the function
- A separate file can be maintained containing transforms functions that can be used across different projects, this will speed up the process of creating features for another project since we can leverage some of the pre-defined transforms for feature creation
- The proposed framework can help in monitoring the individual transforms, experimenting easily with adding more or deleting some transformations from the pipeline and enhancing the overall readability of the code.
Hope you find the article useful.
Do Like & share if you find this article useful!
The media shown in this article are not owned by Analytics Vidhya and is used at the Author’s discretion.


